sftp binding component (#3505)
Signed-off-by: Mustafa Arslan <mustafa.arslan1992@gmail.com> Signed-off-by: Bernd Verst <github@bernd.dev> Co-authored-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
parent
c6bac52cab
commit
ab9422dff9
|
@ -0,0 +1,325 @@
|
|||
package sftp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
|
||||
sftpClient "github.com/pkg/sftp"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"golang.org/x/crypto/ssh/knownhosts"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
"github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/kit/logger"
|
||||
kitmd "github.com/dapr/kit/metadata"
|
||||
)
|
||||
|
||||
const (
|
||||
metadataRootPath = "rootPath"
|
||||
metadataFileName = "fileName"
|
||||
)
|
||||
|
||||
// Sftp is a binding for file operations on sftp server.
|
||||
type Sftp struct {
|
||||
metadata *sftpMetadata
|
||||
logger logger.Logger
|
||||
sftpClient *sftpClient.Client
|
||||
}
|
||||
|
||||
// sftpMetadata defines the sftp metadata.
|
||||
type sftpMetadata struct {
|
||||
RootPath string `json:"rootPath"`
|
||||
Address string `json:"address"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
PrivateKey []byte `json:"privateKey"`
|
||||
PrivateKeyPassphrase []byte `json:"privateKeyPassphrase"`
|
||||
HostPublicKey []byte `json:"hostPublicKey"`
|
||||
KnownHostsFile string `json:"knownHostsFile"`
|
||||
InsecureIgnoreHostKey bool `json:"insecureIgnoreHostKey"`
|
||||
}
|
||||
|
||||
type createResponse struct {
|
||||
FileName string `json:"fileName"`
|
||||
}
|
||||
|
||||
type listResponse struct {
|
||||
FileName string `json:"fileName"`
|
||||
IsDirectory bool `json:"isDirectory"`
|
||||
}
|
||||
|
||||
func NewSftp(logger logger.Logger) bindings.OutputBinding {
|
||||
return &Sftp{logger: logger}
|
||||
}
|
||||
|
||||
func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error {
|
||||
m, err := sftp.parseMetadata(metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse metadata: %w", err)
|
||||
}
|
||||
|
||||
var auth []ssh.AuthMethod
|
||||
var hostKeyCallback ssh.HostKeyCallback
|
||||
|
||||
if m.InsecureIgnoreHostKey {
|
||||
//nolint:gosec
|
||||
hostKeyCallback = ssh.InsecureIgnoreHostKey()
|
||||
} else if len(m.KnownHostsFile) > 0 {
|
||||
hostKeyCallback, err = knownhosts.New(m.KnownHostsFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sftp binding error: read known host file error: %w", err)
|
||||
}
|
||||
} else if len(m.HostPublicKey) > 0 {
|
||||
var hostPublicKey ssh.PublicKey
|
||||
hostPublicKey, _, _, _, err = ssh.ParseAuthorizedKey(m.HostPublicKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sftp binding error: parse host public key error: %w", err)
|
||||
}
|
||||
|
||||
hostKeyCallback = ssh.FixedHostKey(hostPublicKey)
|
||||
}
|
||||
|
||||
if hostKeyCallback == nil {
|
||||
return errors.New("sftp binding error: no host validation method provided")
|
||||
}
|
||||
|
||||
if len(m.PrivateKey) > 0 {
|
||||
var signer ssh.Signer
|
||||
|
||||
if len(m.PrivateKeyPassphrase) > 0 {
|
||||
signer, err = ssh.ParsePrivateKeyWithPassphrase(m.PrivateKey, m.PrivateKeyPassphrase)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sftp binding error: parse private key error: %w", err)
|
||||
}
|
||||
} else {
|
||||
signer, err = ssh.ParsePrivateKey(m.PrivateKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sftp binding error: parse private key error: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
auth = append(auth, ssh.PublicKeys(signer))
|
||||
}
|
||||
|
||||
if len(m.Password) > 0 {
|
||||
auth = append(auth, ssh.Password(m.Password))
|
||||
}
|
||||
|
||||
config := &ssh.ClientConfig{
|
||||
User: m.Username,
|
||||
Auth: auth,
|
||||
HostKeyCallback: hostKeyCallback,
|
||||
}
|
||||
|
||||
sshClient, err := ssh.Dial("tcp", m.Address, config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sftp binding error: error create ssh client: %w", err)
|
||||
}
|
||||
|
||||
newSftpClient, err := sftpClient.NewClient(sshClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sftp binding error: error create sftp client: %w", err)
|
||||
}
|
||||
|
||||
sftp.metadata = m
|
||||
sftp.sftpClient = newSftpClient
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sftp *Sftp) parseMetadata(meta bindings.Metadata) (*sftpMetadata, error) {
|
||||
var m sftpMetadata
|
||||
err := kitmd.DecodeMetadata(meta.Properties, &m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func (sftp *Sftp) Operations() []bindings.OperationKind {
|
||||
return []bindings.OperationKind{
|
||||
bindings.CreateOperation,
|
||||
bindings.GetOperation,
|
||||
bindings.DeleteOperation,
|
||||
bindings.ListOperation,
|
||||
}
|
||||
}
|
||||
|
||||
func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
|
||||
}
|
||||
|
||||
path, err := metadata.getPath(req.Metadata)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: %w", err)
|
||||
}
|
||||
|
||||
dir, fileName := sftpClient.Split(path)
|
||||
|
||||
err = sftp.sftpClient.MkdirAll(dir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err)
|
||||
}
|
||||
|
||||
file, err := sftp.sftpClient.Create(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err)
|
||||
}
|
||||
|
||||
_, err = file.Write(req.Data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error write file: %w", err)
|
||||
}
|
||||
|
||||
jsonResponse, err := json.Marshal(createResponse{
|
||||
FileName: fileName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error marshalling create response: %w", err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: jsonResponse,
|
||||
Metadata: map[string]string{
|
||||
metadataFileName: fileName,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
|
||||
}
|
||||
|
||||
path, err := metadata.getPath(req.Metadata)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: %w", err)
|
||||
}
|
||||
|
||||
files, err := sftp.sftpClient.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err)
|
||||
}
|
||||
|
||||
resp := make([]listResponse, len(files))
|
||||
|
||||
for i, file := range files {
|
||||
resp[i] = listResponse{
|
||||
FileName: file.Name(),
|
||||
IsDirectory: file.IsDir(),
|
||||
}
|
||||
}
|
||||
|
||||
jsonResponse, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: cannot marshal list to json: %w", err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: jsonResponse,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
|
||||
}
|
||||
|
||||
path, err := metadata.getPath(req.Metadata)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: %w", err)
|
||||
}
|
||||
|
||||
file, err := sftp.sftpClient.Open(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err)
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error read file %s: %w", path, err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: b,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
|
||||
}
|
||||
|
||||
path, err := metadata.getPath(req.Metadata)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: %w", err)
|
||||
}
|
||||
|
||||
err = sftp.sftpClient.Remove(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
switch req.Operation {
|
||||
case bindings.CreateOperation:
|
||||
return sftp.create(ctx, req)
|
||||
case bindings.GetOperation:
|
||||
return sftp.get(ctx, req)
|
||||
case bindings.DeleteOperation:
|
||||
return sftp.delete(ctx, req)
|
||||
case bindings.ListOperation:
|
||||
return sftp.list(ctx, req)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
|
||||
}
|
||||
}
|
||||
|
||||
func (sftp *Sftp) Close() error {
|
||||
return sftp.sftpClient.Close()
|
||||
}
|
||||
|
||||
func (metadata sftpMetadata) getPath(requestMetadata map[string]string) (path string, err error) {
|
||||
if val, ok := kitmd.GetMetadataProperty(requestMetadata, metadataFileName); ok && val != "" {
|
||||
path = sftpClient.Join(metadata.RootPath, val)
|
||||
} else {
|
||||
path = metadata.RootPath
|
||||
}
|
||||
|
||||
if path == "" {
|
||||
err = errors.New("required metadata rootPath or fileName missing")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Helper to merge config and request metadata.
|
||||
func (metadata sftpMetadata) mergeWithRequestMetadata(req *bindings.InvokeRequest) (sftpMetadata, error) {
|
||||
merged := metadata
|
||||
|
||||
if val, ok := kitmd.GetMetadataProperty(req.Metadata, metadataRootPath); ok && val != "" {
|
||||
merged.RootPath = val
|
||||
}
|
||||
|
||||
return merged, nil
|
||||
}
|
||||
|
||||
// GetComponentMetadata returns the metadata of the component.
|
||||
func (sftp *Sftp) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
||||
metadataStruct := sftpMetadata{}
|
||||
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
|
||||
return
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
package sftp
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
)
|
||||
|
||||
func TestParseMeta(t *testing.T) {
|
||||
t.Run("Has correct metadata", func(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = map[string]string{
|
||||
"rootPath": "path",
|
||||
"address": "address",
|
||||
"username": "user",
|
||||
"password": "pass",
|
||||
"privateKey": "cHJpdmF0ZUtleQ==",
|
||||
"hostPublicKey": "aG9zdFB1YmxpY0tleQ==",
|
||||
"KnownHostsFile": "/known_hosts",
|
||||
"insecureIgnoreHostKey": "true",
|
||||
}
|
||||
sftp := Sftp{}
|
||||
meta, err := sftp.parseMetadata(m)
|
||||
|
||||
privateKeyBytes := []byte{0x63, 0x48, 0x4a, 0x70, 0x64, 0x6d, 0x46, 0x30, 0x5a, 0x55, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d}
|
||||
hostPublicKeyBytes := []byte{0x61, 0x47, 0x39, 0x7a, 0x64, 0x46, 0x42, 0x31, 0x59, 0x6d, 0x78, 0x70, 0x59, 0x30, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d}
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "path", meta.RootPath)
|
||||
assert.Equal(t, "address", meta.Address)
|
||||
assert.Equal(t, "user", meta.Username)
|
||||
assert.Equal(t, "pass", meta.Password)
|
||||
assert.Equal(t, privateKeyBytes, meta.PrivateKey)
|
||||
assert.Equal(t, hostPublicKeyBytes, meta.HostPublicKey)
|
||||
assert.Equal(t, "/known_hosts", meta.KnownHostsFile)
|
||||
assert.True(t, meta.InsecureIgnoreHostKey)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMergeWithRequestMetadata(t *testing.T) {
|
||||
t.Run("Has merged metadata", func(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = map[string]string{
|
||||
"rootPath": "path",
|
||||
"address": "address",
|
||||
"username": "user",
|
||||
"password": "pass",
|
||||
"privateKey": "cHJpdmF0ZUtleQ==",
|
||||
"hostPublicKey": "aG9zdFB1YmxpY0tleQ==",
|
||||
"KnownHostsFile": "/known_hosts",
|
||||
"insecureIgnoreHostKey": "true",
|
||||
}
|
||||
sftp := Sftp{}
|
||||
meta, _ := sftp.parseMetadata(m)
|
||||
|
||||
request := bindings.InvokeRequest{}
|
||||
request.Metadata = map[string]string{
|
||||
"rootPath": "changedpath",
|
||||
"address": "changedaddress",
|
||||
"username": "changeduser",
|
||||
"password": "changedpass",
|
||||
"privateKey": "changedcHJpdmF0ZUtleQ==",
|
||||
"hostPublicKey": "changedaG9zdFB1YmxpY0tleQ==",
|
||||
"KnownHostsFile": "changed/known_hosts",
|
||||
"insecureSSL": "changedtrue",
|
||||
}
|
||||
|
||||
mergedMeta, err := meta.mergeWithRequestMetadata(&request)
|
||||
|
||||
privateKeyBytes := []byte{0x63, 0x48, 0x4a, 0x70, 0x64, 0x6d, 0x46, 0x30, 0x5a, 0x55, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d}
|
||||
hostPublicKeyBytes := []byte{0x61, 0x47, 0x39, 0x7a, 0x64, 0x46, 0x42, 0x31, 0x59, 0x6d, 0x78, 0x70, 0x59, 0x30, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d}
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "changedpath", mergedMeta.RootPath)
|
||||
assert.Equal(t, "address", mergedMeta.Address)
|
||||
assert.Equal(t, "user", mergedMeta.Username)
|
||||
assert.Equal(t, "pass", mergedMeta.Password)
|
||||
assert.Equal(t, privateKeyBytes, mergedMeta.PrivateKey)
|
||||
assert.Equal(t, hostPublicKeyBytes, mergedMeta.HostPublicKey)
|
||||
assert.Equal(t, "/known_hosts", mergedMeta.KnownHostsFile)
|
||||
assert.True(t, meta.InsecureIgnoreHostKey)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetPath(t *testing.T) {
|
||||
t.Run("has path", func(t *testing.T) {
|
||||
m := &sftpMetadata{
|
||||
RootPath: "/path",
|
||||
}
|
||||
_, err := m.getPath(map[string]string{})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("return if error path is empty", func(t *testing.T) {
|
||||
m := &sftpMetadata{}
|
||||
_, err := m.getPath(map[string]string{})
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
2
go.mod
2
go.mod
|
@ -98,6 +98,7 @@ require (
|
|||
github.com/oracle/oci-go-sdk/v54 v54.0.0
|
||||
github.com/pashagolub/pgxmock/v2 v2.12.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/pkg/sftp v1.13.6
|
||||
github.com/puzpuzpuz/xsync/v3 v3.0.0
|
||||
github.com/rabbitmq/amqp091-go v1.8.1
|
||||
github.com/redis/go-redis/v9 v9.2.1
|
||||
|
@ -298,6 +299,7 @@ require (
|
|||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
|
||||
github.com/klauspost/compress v1.17.7 // indirect
|
||||
github.com/knadh/koanf v1.4.1 // indirect
|
||||
github.com/kr/fs v0.1.0 // indirect
|
||||
github.com/kubemq-io/protobuf v1.3.1 // indirect
|
||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||
github.com/leodido/go-urn v1.2.1 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -1067,6 +1067,8 @@ github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgSh
|
|||
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7/go.mod h1:Y2SaZf2Rzd0pXkLVhLlCiAXFCLSXAIbTKDivVgff/AM=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
|
||||
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
|
@ -1341,6 +1343,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
|||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
|
||||
github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
|
||||
github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
|
||||
github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw=
|
||||
github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
|
|
Loading…
Reference in New Issue