From ab9422dff9e89ab92cfb3fed24696f337f2584d2 Mon Sep 17 00:00:00 2001 From: Mustafa Arslan Date: Sat, 26 Oct 2024 02:56:14 +0300 Subject: [PATCH] sftp binding component (#3505) Signed-off-by: Mustafa Arslan Signed-off-by: Bernd Verst Co-authored-by: Bernd Verst --- bindings/sftp/sftp.go | 325 +++++++++++++++++++++++++++++++++++++ bindings/sftp/sftp_test.go | 102 ++++++++++++ go.mod | 2 + go.sum | 4 + 4 files changed, 433 insertions(+) create mode 100644 bindings/sftp/sftp.go create mode 100644 bindings/sftp/sftp_test.go diff --git a/bindings/sftp/sftp.go b/bindings/sftp/sftp.go new file mode 100644 index 000000000..960294a44 --- /dev/null +++ b/bindings/sftp/sftp.go @@ -0,0 +1,325 @@ +package sftp + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "reflect" + + sftpClient "github.com/pkg/sftp" + "golang.org/x/crypto/ssh" + "golang.org/x/crypto/ssh/knownhosts" + + "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/kit/logger" + kitmd "github.com/dapr/kit/metadata" +) + +const ( + metadataRootPath = "rootPath" + metadataFileName = "fileName" +) + +// Sftp is a binding for file operations on sftp server. +type Sftp struct { + metadata *sftpMetadata + logger logger.Logger + sftpClient *sftpClient.Client +} + +// sftpMetadata defines the sftp metadata. +type sftpMetadata struct { + RootPath string `json:"rootPath"` + Address string `json:"address"` + Username string `json:"username"` + Password string `json:"password"` + PrivateKey []byte `json:"privateKey"` + PrivateKeyPassphrase []byte `json:"privateKeyPassphrase"` + HostPublicKey []byte `json:"hostPublicKey"` + KnownHostsFile string `json:"knownHostsFile"` + InsecureIgnoreHostKey bool `json:"insecureIgnoreHostKey"` +} + +type createResponse struct { + FileName string `json:"fileName"` +} + +type listResponse struct { + FileName string `json:"fileName"` + IsDirectory bool `json:"isDirectory"` +} + +func NewSftp(logger logger.Logger) bindings.OutputBinding { + return &Sftp{logger: logger} +} + +func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error { + m, err := sftp.parseMetadata(metadata) + if err != nil { + return fmt.Errorf("failed to parse metadata: %w", err) + } + + var auth []ssh.AuthMethod + var hostKeyCallback ssh.HostKeyCallback + + if m.InsecureIgnoreHostKey { + //nolint:gosec + hostKeyCallback = ssh.InsecureIgnoreHostKey() + } else if len(m.KnownHostsFile) > 0 { + hostKeyCallback, err = knownhosts.New(m.KnownHostsFile) + if err != nil { + return fmt.Errorf("sftp binding error: read known host file error: %w", err) + } + } else if len(m.HostPublicKey) > 0 { + var hostPublicKey ssh.PublicKey + hostPublicKey, _, _, _, err = ssh.ParseAuthorizedKey(m.HostPublicKey) + if err != nil { + return fmt.Errorf("sftp binding error: parse host public key error: %w", err) + } + + hostKeyCallback = ssh.FixedHostKey(hostPublicKey) + } + + if hostKeyCallback == nil { + return errors.New("sftp binding error: no host validation method provided") + } + + if len(m.PrivateKey) > 0 { + var signer ssh.Signer + + if len(m.PrivateKeyPassphrase) > 0 { + signer, err = ssh.ParsePrivateKeyWithPassphrase(m.PrivateKey, m.PrivateKeyPassphrase) + if err != nil { + return fmt.Errorf("sftp binding error: parse private key error: %w", err) + } + } else { + signer, err = ssh.ParsePrivateKey(m.PrivateKey) + if err != nil { + return fmt.Errorf("sftp binding error: parse private key error: %w", err) + } + } + + auth = append(auth, ssh.PublicKeys(signer)) + } + + if len(m.Password) > 0 { + auth = append(auth, ssh.Password(m.Password)) + } + + config := &ssh.ClientConfig{ + User: m.Username, + Auth: auth, + HostKeyCallback: hostKeyCallback, + } + + sshClient, err := ssh.Dial("tcp", m.Address, config) + if err != nil { + return fmt.Errorf("sftp binding error: error create ssh client: %w", err) + } + + newSftpClient, err := sftpClient.NewClient(sshClient) + if err != nil { + return fmt.Errorf("sftp binding error: error create sftp client: %w", err) + } + + sftp.metadata = m + sftp.sftpClient = newSftpClient + + return nil +} + +func (sftp *Sftp) parseMetadata(meta bindings.Metadata) (*sftpMetadata, error) { + var m sftpMetadata + err := kitmd.DecodeMetadata(meta.Properties, &m) + if err != nil { + return nil, err + } + + return &m, nil +} + +func (sftp *Sftp) Operations() []bindings.OperationKind { + return []bindings.OperationKind{ + bindings.CreateOperation, + bindings.GetOperation, + bindings.DeleteOperation, + bindings.ListOperation, + } +} + +func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + metadata, err := sftp.metadata.mergeWithRequestMetadata(req) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err) + } + + path, err := metadata.getPath(req.Metadata) + if err != nil { + return nil, fmt.Errorf("sftp binding error: %w", err) + } + + dir, fileName := sftpClient.Split(path) + + err = sftp.sftpClient.MkdirAll(dir) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err) + } + + file, err := sftp.sftpClient.Create(path) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err) + } + + _, err = file.Write(req.Data) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error write file: %w", err) + } + + jsonResponse, err := json.Marshal(createResponse{ + FileName: fileName, + }) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error marshalling create response: %w", err) + } + + return &bindings.InvokeResponse{ + Data: jsonResponse, + Metadata: map[string]string{ + metadataFileName: fileName, + }, + }, nil +} + +func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + metadata, err := sftp.metadata.mergeWithRequestMetadata(req) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err) + } + + path, err := metadata.getPath(req.Metadata) + if err != nil { + return nil, fmt.Errorf("sftp binding error: %w", err) + } + + files, err := sftp.sftpClient.ReadDir(path) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err) + } + + resp := make([]listResponse, len(files)) + + for i, file := range files { + resp[i] = listResponse{ + FileName: file.Name(), + IsDirectory: file.IsDir(), + } + } + + jsonResponse, err := json.Marshal(resp) + if err != nil { + return nil, fmt.Errorf("sftp binding error: cannot marshal list to json: %w", err) + } + + return &bindings.InvokeResponse{ + Data: jsonResponse, + }, nil +} + +func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + metadata, err := sftp.metadata.mergeWithRequestMetadata(req) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err) + } + + path, err := metadata.getPath(req.Metadata) + if err != nil { + return nil, fmt.Errorf("sftp binding error: %w", err) + } + + file, err := sftp.sftpClient.Open(path) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err) + } + + b, err := io.ReadAll(file) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error read file %s: %w", path, err) + } + + return &bindings.InvokeResponse{ + Data: b, + }, nil +} + +func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + metadata, err := sftp.metadata.mergeWithRequestMetadata(req) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err) + } + + path, err := metadata.getPath(req.Metadata) + if err != nil { + return nil, fmt.Errorf("sftp binding error: %w", err) + } + + err = sftp.sftpClient.Remove(path) + if err != nil { + return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err) + } + + return nil, nil +} + +func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + switch req.Operation { + case bindings.CreateOperation: + return sftp.create(ctx, req) + case bindings.GetOperation: + return sftp.get(ctx, req) + case bindings.DeleteOperation: + return sftp.delete(ctx, req) + case bindings.ListOperation: + return sftp.list(ctx, req) + default: + return nil, fmt.Errorf("unsupported operation %s", req.Operation) + } +} + +func (sftp *Sftp) Close() error { + return sftp.sftpClient.Close() +} + +func (metadata sftpMetadata) getPath(requestMetadata map[string]string) (path string, err error) { + if val, ok := kitmd.GetMetadataProperty(requestMetadata, metadataFileName); ok && val != "" { + path = sftpClient.Join(metadata.RootPath, val) + } else { + path = metadata.RootPath + } + + if path == "" { + err = errors.New("required metadata rootPath or fileName missing") + } + + return +} + +// Helper to merge config and request metadata. +func (metadata sftpMetadata) mergeWithRequestMetadata(req *bindings.InvokeRequest) (sftpMetadata, error) { + merged := metadata + + if val, ok := kitmd.GetMetadataProperty(req.Metadata, metadataRootPath); ok && val != "" { + merged.RootPath = val + } + + return merged, nil +} + +// GetComponentMetadata returns the metadata of the component. +func (sftp *Sftp) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { + metadataStruct := sftpMetadata{} + metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType) + return +} diff --git a/bindings/sftp/sftp_test.go b/bindings/sftp/sftp_test.go new file mode 100644 index 000000000..69f7e5bb8 --- /dev/null +++ b/bindings/sftp/sftp_test.go @@ -0,0 +1,102 @@ +package sftp + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dapr/components-contrib/bindings" +) + +func TestParseMeta(t *testing.T) { + t.Run("Has correct metadata", func(t *testing.T) { + m := bindings.Metadata{} + m.Properties = map[string]string{ + "rootPath": "path", + "address": "address", + "username": "user", + "password": "pass", + "privateKey": "cHJpdmF0ZUtleQ==", + "hostPublicKey": "aG9zdFB1YmxpY0tleQ==", + "KnownHostsFile": "/known_hosts", + "insecureIgnoreHostKey": "true", + } + sftp := Sftp{} + meta, err := sftp.parseMetadata(m) + + privateKeyBytes := []byte{0x63, 0x48, 0x4a, 0x70, 0x64, 0x6d, 0x46, 0x30, 0x5a, 0x55, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d} + hostPublicKeyBytes := []byte{0x61, 0x47, 0x39, 0x7a, 0x64, 0x46, 0x42, 0x31, 0x59, 0x6d, 0x78, 0x70, 0x59, 0x30, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d} + + require.NoError(t, err) + assert.Equal(t, "path", meta.RootPath) + assert.Equal(t, "address", meta.Address) + assert.Equal(t, "user", meta.Username) + assert.Equal(t, "pass", meta.Password) + assert.Equal(t, privateKeyBytes, meta.PrivateKey) + assert.Equal(t, hostPublicKeyBytes, meta.HostPublicKey) + assert.Equal(t, "/known_hosts", meta.KnownHostsFile) + assert.True(t, meta.InsecureIgnoreHostKey) + }) +} + +func TestMergeWithRequestMetadata(t *testing.T) { + t.Run("Has merged metadata", func(t *testing.T) { + m := bindings.Metadata{} + m.Properties = map[string]string{ + "rootPath": "path", + "address": "address", + "username": "user", + "password": "pass", + "privateKey": "cHJpdmF0ZUtleQ==", + "hostPublicKey": "aG9zdFB1YmxpY0tleQ==", + "KnownHostsFile": "/known_hosts", + "insecureIgnoreHostKey": "true", + } + sftp := Sftp{} + meta, _ := sftp.parseMetadata(m) + + request := bindings.InvokeRequest{} + request.Metadata = map[string]string{ + "rootPath": "changedpath", + "address": "changedaddress", + "username": "changeduser", + "password": "changedpass", + "privateKey": "changedcHJpdmF0ZUtleQ==", + "hostPublicKey": "changedaG9zdFB1YmxpY0tleQ==", + "KnownHostsFile": "changed/known_hosts", + "insecureSSL": "changedtrue", + } + + mergedMeta, err := meta.mergeWithRequestMetadata(&request) + + privateKeyBytes := []byte{0x63, 0x48, 0x4a, 0x70, 0x64, 0x6d, 0x46, 0x30, 0x5a, 0x55, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d} + hostPublicKeyBytes := []byte{0x61, 0x47, 0x39, 0x7a, 0x64, 0x46, 0x42, 0x31, 0x59, 0x6d, 0x78, 0x70, 0x59, 0x30, 0x74, 0x6c, 0x65, 0x51, 0x3d, 0x3d} + + require.NoError(t, err) + assert.Equal(t, "changedpath", mergedMeta.RootPath) + assert.Equal(t, "address", mergedMeta.Address) + assert.Equal(t, "user", mergedMeta.Username) + assert.Equal(t, "pass", mergedMeta.Password) + assert.Equal(t, privateKeyBytes, mergedMeta.PrivateKey) + assert.Equal(t, hostPublicKeyBytes, mergedMeta.HostPublicKey) + assert.Equal(t, "/known_hosts", mergedMeta.KnownHostsFile) + assert.True(t, meta.InsecureIgnoreHostKey) + }) +} + +func TestGetPath(t *testing.T) { + t.Run("has path", func(t *testing.T) { + m := &sftpMetadata{ + RootPath: "/path", + } + _, err := m.getPath(map[string]string{}) + require.NoError(t, err) + }) + + t.Run("return if error path is empty", func(t *testing.T) { + m := &sftpMetadata{} + _, err := m.getPath(map[string]string{}) + require.Error(t, err) + }) +} diff --git a/go.mod b/go.mod index 7ba8abaac..2f194602f 100644 --- a/go.mod +++ b/go.mod @@ -98,6 +98,7 @@ require ( github.com/oracle/oci-go-sdk/v54 v54.0.0 github.com/pashagolub/pgxmock/v2 v2.12.0 github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pkg/sftp v1.13.6 github.com/puzpuzpuz/xsync/v3 v3.0.0 github.com/rabbitmq/amqp091-go v1.8.1 github.com/redis/go-redis/v9 v9.2.1 @@ -298,6 +299,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/klauspost/compress v1.17.7 // indirect github.com/knadh/koanf v1.4.1 // indirect + github.com/kr/fs v0.1.0 // indirect github.com/kubemq-io/protobuf v1.3.1 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect diff --git a/go.sum b/go.sum index 0b416716e..bc2344e5a 100644 --- a/go.sum +++ b/go.sum @@ -1067,6 +1067,8 @@ github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgSh github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7/go.mod h1:Y2SaZf2Rzd0pXkLVhLlCiAXFCLSXAIbTKDivVgff/AM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -1341,6 +1343,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= +github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= github.com/pkoukk/tiktoken-go v0.1.6 h1:JF0TlJzhTbrI30wCvFuiw6FzP2+/bR+FIxUdgEAcUsw= github.com/pkoukk/tiktoken-go v0.1.6/go.mod h1:9NiV+i9mJKGj1rYOT+njbv+ZwA/zJxYdewGl6qVatpg= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=