mirror of https://github.com/dapr/go-sdk.git
[Crypto] Implement support for high-level crypto APIs (#387)
* Updated generated protos
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* Added encrypt method
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* Add Decrypt method
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* Working on tests
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* Added unit tests
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* Sync protos
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* Updated protos
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
* 💄
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
---------
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
b14253b0fe
commit
92c23df717
4
Makefile
4
Makefile
|
@ -41,8 +41,8 @@ tag: ## Creates release tag
|
|||
.PHONY: clean
|
||||
clean: ## Cleans go and generated files in ./dapr/proto/
|
||||
go clean
|
||||
rm -fr ./dapr/proto/common/v1/*
|
||||
rm -fr ./dapr/proto/runtime/v1/*
|
||||
rm -fr ./dapr/proto/common/v1/*.pb.go
|
||||
rm -fr ./dapr/proto/runtime/v1/*.pb.go
|
||||
|
||||
.PHONY: help
|
||||
help: ## Display available commands
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Maximum size, in bytes, for the buffer used by stream invocations: 2KB.
|
||||
const StreamBufferSize = 2 << 10
|
||||
|
||||
// Pool of *[]byte used by stream invocations. Their size is fixed at StreamBufferSize.
|
||||
var bufPool = sync.Pool{
|
||||
New: func() any {
|
||||
// Return a pointer here
|
||||
// See https://github.com/dominikh/go-tools/issues/1336 for explanation
|
||||
b := make([]byte, StreamBufferSize)
|
||||
return &b
|
||||
},
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
@ -17,6 +17,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -149,6 +150,14 @@ type Client interface {
|
|||
// UnlockAlpha1 deletes unlocks a lock from a lock store.
|
||||
UnlockAlpha1(ctx context.Context, storeName string, request *UnlockRequest) (*UnlockResponse, error)
|
||||
|
||||
// Encrypt data read from a stream, returning a readable stream that receives the encrypted data.
|
||||
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
|
||||
Encrypt(ctx context.Context, in io.Reader, opts EncryptOptions) (io.Reader, error)
|
||||
|
||||
// Decrypt data read from a stream, returning a readable stream that receives the decrypted data.
|
||||
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
|
||||
Decrypt(ctx context.Context, in io.Reader, opts DecryptOptions) (io.Reader, error)
|
||||
|
||||
// Shutdown the sidecar.
|
||||
Shutdown(ctx context.Context) error
|
||||
|
||||
|
|
|
@ -0,0 +1,272 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
commonv1pb "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
runtimev1pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
)
|
||||
|
||||
// Encrypt data read from a stream, returning a readable stream that receives the encrypted data.
|
||||
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
|
||||
func (c *GRPCClient) Encrypt(ctx context.Context, in io.Reader, opts EncryptOptions) (io.Reader, error) {
|
||||
// Ensure required options are present
|
||||
// This short-circuits and avoids a call to the runtime
|
||||
if opts.ComponentName == "" {
|
||||
return nil, errors.New("option 'ComponentName' is required")
|
||||
}
|
||||
if opts.KeyName == "" {
|
||||
return nil, errors.New("option 'KeyName' is required")
|
||||
}
|
||||
if opts.KeyWrapAlgorithm == "" {
|
||||
return nil, errors.New("option 'Algorithm' is required")
|
||||
}
|
||||
|
||||
// Create the stream
|
||||
stream, err := c.protoClient.EncryptAlpha1(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Use the context of the stream here.
|
||||
return c.performCryptoOperation(
|
||||
stream.Context(), stream,
|
||||
in, opts,
|
||||
&runtimev1pb.EncryptRequest{},
|
||||
&runtimev1pb.EncryptResponse{},
|
||||
)
|
||||
}
|
||||
|
||||
// Decrypt data read from a stream, returning a readable stream that receives the decrypted data.
|
||||
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
|
||||
func (c *GRPCClient) Decrypt(ctx context.Context, in io.Reader, opts DecryptOptions) (io.Reader, error) {
|
||||
// Ensure required options are present
|
||||
// This short-circuits and avoids a call to the runtime
|
||||
if opts.ComponentName == "" {
|
||||
return nil, errors.New("option 'ComponentName' is required")
|
||||
}
|
||||
|
||||
// Create the stream
|
||||
stream, err := c.protoClient.DecryptAlpha1(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Use the context of the stream here.
|
||||
return c.performCryptoOperation(
|
||||
stream.Context(), stream,
|
||||
in, opts,
|
||||
&runtimev1pb.DecryptRequest{},
|
||||
&runtimev1pb.DecryptResponse{},
|
||||
)
|
||||
}
|
||||
|
||||
func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.ClientStream, in io.Reader, opts cryptoOperationOpts, reqProto runtimev1pb.CryptoRequests, resProto runtimev1pb.CryptoResponses) (io.Reader, error) {
|
||||
var err error
|
||||
// Pipe for writing the response
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
// Send the request in a background goroutine
|
||||
go func() {
|
||||
// Build the options object for the first message
|
||||
optsProto := opts.getProto()
|
||||
|
||||
// Get a buffer from the pool
|
||||
reqBuf := bufPool.Get().(*[]byte)
|
||||
defer func() {
|
||||
bufPool.Put(reqBuf)
|
||||
}()
|
||||
|
||||
// Send the request in chunks
|
||||
var (
|
||||
n int
|
||||
seq uint64
|
||||
done bool
|
||||
)
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
pw.CloseWithError(ctx.Err())
|
||||
return
|
||||
}
|
||||
|
||||
// First message only - add the options
|
||||
if optsProto != nil {
|
||||
reqProto.SetOptions(optsProto)
|
||||
optsProto = nil
|
||||
} else {
|
||||
// Reset the object so we can re-use it
|
||||
reqProto.Reset()
|
||||
}
|
||||
|
||||
n, err = in.Read(*reqBuf)
|
||||
if err == io.EOF {
|
||||
done = true
|
||||
} else if err != nil {
|
||||
pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Send the chunk if there's anything to send
|
||||
if n > 0 {
|
||||
reqProto.SetPayload(&commonv1pb.StreamPayload{
|
||||
Data: (*reqBuf)[:n],
|
||||
Seq: seq,
|
||||
})
|
||||
seq++
|
||||
|
||||
err = stream.SendMsg(reqProto)
|
||||
if errors.Is(err, io.EOF) {
|
||||
// If SendMsg returns an io.EOF error, it usually means that there's a transport-level error
|
||||
// The exact error can only be determined by RecvMsg, so if we encounter an EOF error here, just consider the stream done and let RecvMsg handle the error
|
||||
done = true
|
||||
} else if err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("error sending message: %w", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the loop with the last chunk
|
||||
if done {
|
||||
err = stream.CloseSend()
|
||||
if err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to close the send direction of the stream: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Read the response in another goroutine
|
||||
go func() {
|
||||
var (
|
||||
expectSeq uint64
|
||||
readErr error
|
||||
done bool
|
||||
payload *commonv1pb.StreamPayload
|
||||
)
|
||||
|
||||
// Read until the end of the stream
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
pw.CloseWithError(ctx.Err())
|
||||
return
|
||||
}
|
||||
|
||||
// Read the next chunk
|
||||
readErr = stream.RecvMsg(resProto)
|
||||
if errors.Is(readErr, io.EOF) {
|
||||
// Receiving an io.EOF signifies that the client has stopped sending data over the pipe, so this is the end
|
||||
done = true
|
||||
} else if readErr != nil {
|
||||
pw.CloseWithError(fmt.Errorf("error receiving message: %w", readErr))
|
||||
return
|
||||
}
|
||||
|
||||
// Write the data, if any, into the pipe
|
||||
payload = resProto.GetPayload()
|
||||
if payload != nil {
|
||||
if payload.Seq != expectSeq {
|
||||
pw.CloseWithError(fmt.Errorf("invalid sequence number in chunk: %d (expected: %d)", payload.Seq, expectSeq))
|
||||
return
|
||||
}
|
||||
expectSeq++
|
||||
|
||||
_, readErr = pw.Write(payload.Data)
|
||||
if readErr != nil {
|
||||
pw.CloseWithError(fmt.Errorf("error writing data: %w", readErr))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Stop when done
|
||||
if done {
|
||||
break
|
||||
}
|
||||
|
||||
// Reset the proto
|
||||
resProto.Reset()
|
||||
}
|
||||
|
||||
// Close the writer of the pipe when done
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
// Return the readable stream
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
// Interface for EncryptOptions and DecryptOptions
|
||||
type cryptoOperationOpts interface {
|
||||
getProto() proto.Message
|
||||
}
|
||||
|
||||
// EncryptOptions contains options passed to the Encrypt method.
|
||||
type EncryptOptions struct {
|
||||
// Name of the component. Required.
|
||||
ComponentName string
|
||||
// Name (or name/version) of the key. Required.
|
||||
KeyName string
|
||||
// Key wrapping algorithm to use. Required.
|
||||
// Supported options include: A256KW, A128CBC, A192CBC, A256CBC, RSA-OAEP-256.
|
||||
KeyWrapAlgorithm string
|
||||
// DataEncryptionCipher to use to encrypt data (optional): "aes-gcm" (default) or "chacha20-poly1305"
|
||||
DataEncryptionCipher string
|
||||
// If true, the encrypted document does not contain a key reference.
|
||||
// In that case, calls to the Decrypt method must provide a key reference (name or name/version).
|
||||
// Defaults to false.
|
||||
OmitDecryptionKeyName bool
|
||||
// Key reference to embed in the encrypted document (name or name/version).
|
||||
// This is helpful if the reference of the key used to decrypt the document is different from the one used to encrypt it.
|
||||
// If unset, uses the reference of the key used to encrypt the document (this is the default behavior).
|
||||
// This option is ignored if omit_decryption_key_name is true.
|
||||
DecryptionKeyName string
|
||||
}
|
||||
|
||||
func (o EncryptOptions) getProto() proto.Message {
|
||||
return &runtimev1pb.EncryptRequestOptions{
|
||||
ComponentName: o.ComponentName,
|
||||
KeyName: o.KeyName,
|
||||
KeyWrapAlgorithm: o.KeyWrapAlgorithm,
|
||||
DataEncryptionCipher: o.DataEncryptionCipher,
|
||||
OmitDecryptionKeyName: o.OmitDecryptionKeyName,
|
||||
DecryptionKeyName: o.DecryptionKeyName,
|
||||
}
|
||||
}
|
||||
|
||||
// DecryptOptions contains options passed to the Decrypt method.
|
||||
type DecryptOptions struct {
|
||||
// Name of the component. Required.
|
||||
ComponentName string
|
||||
// Name (or name/version) of the key to decrypt the message.
|
||||
// Overrides any key reference included in the message if present.
|
||||
// This is required if the message doesn't include a key reference (i.e. was created with omit_decryption_key_name set to true).
|
||||
KeyName string
|
||||
}
|
||||
|
||||
func (o DecryptOptions) getProto() proto.Message {
|
||||
return &runtimev1pb.DecryptRequestOptions{
|
||||
ComponentName: o.ComponentName,
|
||||
KeyName: o.KeyName,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,288 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
commonv1 "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
runtimev1pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
||||
)
|
||||
|
||||
func TestEncrypt(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("missing ComponentName", func(t *testing.T) {
|
||||
out, err := testClient.Encrypt(ctx,
|
||||
strings.NewReader("hello world"),
|
||||
EncryptOptions{
|
||||
// ComponentName: "mycomponent",
|
||||
KeyName: "key",
|
||||
KeyWrapAlgorithm: "algorithm",
|
||||
},
|
||||
)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "ComponentName")
|
||||
require.Nil(t, out)
|
||||
})
|
||||
|
||||
t.Run("missing Key", func(t *testing.T) {
|
||||
out, err := testClient.Encrypt(ctx,
|
||||
strings.NewReader("hello world"),
|
||||
EncryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
// KeyName: "key",
|
||||
KeyWrapAlgorithm: "algorithm",
|
||||
},
|
||||
)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "Key")
|
||||
require.Nil(t, out)
|
||||
})
|
||||
|
||||
t.Run("missing Algorithm", func(t *testing.T) {
|
||||
out, err := testClient.Encrypt(ctx,
|
||||
strings.NewReader("hello world"),
|
||||
EncryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
KeyName: "key",
|
||||
// Algorithm: "algorithm",
|
||||
},
|
||||
)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "Algorithm")
|
||||
require.Nil(t, out)
|
||||
})
|
||||
|
||||
t.Run("receiving back data sent", func(t *testing.T) {
|
||||
// The test server doesn't actually encrypt data
|
||||
out, err := testClient.Encrypt(ctx,
|
||||
strings.NewReader("hello world"),
|
||||
EncryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
KeyName: "key",
|
||||
KeyWrapAlgorithm: "algorithm",
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
|
||||
read, err := io.ReadAll(out)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "hello world", string(read))
|
||||
})
|
||||
|
||||
t.Run("error in input stream", func(t *testing.T) {
|
||||
out, err := testClient.Encrypt(ctx,
|
||||
&failingReader{
|
||||
data: strings.NewReader("hello world"),
|
||||
},
|
||||
EncryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
KeyName: "key",
|
||||
KeyWrapAlgorithm: "algorithm",
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
|
||||
_, err = io.ReadAll(out)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "simulated")
|
||||
})
|
||||
|
||||
t.Run("context canceled", func(t *testing.T) {
|
||||
failingCtx, failingCancel := context.WithTimeout(ctx, time.Second)
|
||||
defer failingCancel()
|
||||
|
||||
out, err := testClient.Encrypt(failingCtx,
|
||||
&slowReader{
|
||||
// Should take a lot longer than 1s
|
||||
//nolint:dupword
|
||||
data: strings.NewReader("soft kitty, warm kitty, little ball of fur, happy kitty, sleepy kitty, purr purr purr"),
|
||||
delay: time.Second,
|
||||
},
|
||||
EncryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
KeyName: "key",
|
||||
KeyWrapAlgorithm: "algorithm",
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
|
||||
_, err = io.ReadAll(out)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "context deadline exceeded")
|
||||
})
|
||||
}
|
||||
|
||||
func TestDecrypt(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("missing ComponentName", func(t *testing.T) {
|
||||
out, err := testClient.Decrypt(ctx,
|
||||
strings.NewReader("hello world"),
|
||||
DecryptOptions{
|
||||
// ComponentName: "mycomponent",
|
||||
},
|
||||
)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "ComponentName")
|
||||
require.Nil(t, out)
|
||||
})
|
||||
|
||||
t.Run("receiving back data sent", func(t *testing.T) {
|
||||
// The test server doesn't actually decrypt data
|
||||
out, err := testClient.Decrypt(ctx,
|
||||
strings.NewReader("hello world"),
|
||||
DecryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
|
||||
read, err := io.ReadAll(out)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "hello world", string(read))
|
||||
})
|
||||
|
||||
t.Run("error in input stream", func(t *testing.T) {
|
||||
out, err := testClient.Decrypt(ctx,
|
||||
&failingReader{
|
||||
data: strings.NewReader("hello world"),
|
||||
},
|
||||
DecryptOptions{
|
||||
ComponentName: "mycomponent",
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
|
||||
_, err = io.ReadAll(out)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "simulated")
|
||||
})
|
||||
}
|
||||
|
||||
/* --- Server methods --- */
|
||||
|
||||
func (s *testDaprServer) EncryptAlpha1(stream runtimev1pb.Dapr_EncryptAlpha1Server) error {
|
||||
return s.performCryptoOperation(
|
||||
stream,
|
||||
&runtimev1pb.EncryptRequest{},
|
||||
&runtimev1pb.EncryptResponse{},
|
||||
)
|
||||
}
|
||||
|
||||
func (s *testDaprServer) DecryptAlpha1(stream runtimev1pb.Dapr_DecryptAlpha1Server) error {
|
||||
return s.performCryptoOperation(
|
||||
stream,
|
||||
&runtimev1pb.DecryptRequest{},
|
||||
&runtimev1pb.DecryptResponse{},
|
||||
)
|
||||
}
|
||||
|
||||
func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqProto runtimev1pb.CryptoRequests, resProto runtimev1pb.CryptoResponses) error {
|
||||
// This doesn't really encrypt or decrypt the data and just sends back whatever it receives
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
var (
|
||||
done bool
|
||||
err error
|
||||
expectSeq uint64
|
||||
)
|
||||
first := true
|
||||
for !done && stream.Context().Err() == nil {
|
||||
reqProto.Reset()
|
||||
err = stream.RecvMsg(reqProto)
|
||||
if errors.Is(err, io.EOF) {
|
||||
done = true
|
||||
} else if err != nil {
|
||||
pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if first && !reqProto.HasOptions() {
|
||||
pw.CloseWithError(errors.New("first message must have options"))
|
||||
return
|
||||
} else if !first && reqProto.HasOptions() {
|
||||
pw.CloseWithError(errors.New("messages after first must not have options"))
|
||||
return
|
||||
}
|
||||
first = false
|
||||
|
||||
payload := reqProto.GetPayload()
|
||||
if payload != nil {
|
||||
if payload.Seq != expectSeq {
|
||||
pw.CloseWithError(fmt.Errorf("invalid sequence number: %d (expected: %d)", payload.Seq, expectSeq))
|
||||
return
|
||||
}
|
||||
expectSeq++
|
||||
|
||||
_, err = pw.Write(payload.Data)
|
||||
if err != nil {
|
||||
pw.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
var (
|
||||
done bool
|
||||
n int
|
||||
err error
|
||||
seq uint64
|
||||
)
|
||||
buf := make([]byte, 2<<10)
|
||||
for !done && stream.Context().Err() == nil {
|
||||
resProto.Reset()
|
||||
|
||||
n, err = pr.Read(buf)
|
||||
if errors.Is(err, io.EOF) {
|
||||
done = true
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if n > 0 {
|
||||
resProto.SetPayload(&commonv1.StreamPayload{
|
||||
Seq: seq,
|
||||
Data: buf[:n],
|
||||
})
|
||||
seq++
|
||||
|
||||
err = stream.SendMsg(resProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -13,7 +13,13 @@ limitations under the License.
|
|||
|
||||
package client
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestIsCloudEvent(t *testing.T) {
|
||||
testcases := []struct {
|
||||
|
@ -62,3 +68,53 @@ func TestIsCloudEvent(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Implements an io.Reader that simulates failures (after optionally reading from a stream in full)
|
||||
type failingReader struct {
|
||||
// Data to return before returning an error
|
||||
data io.Reader
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
func (f *failingReader) Read(p []byte) (n int, err error) {
|
||||
f.l.Lock()
|
||||
defer f.l.Unlock()
|
||||
|
||||
if f.data != nil {
|
||||
n, err := f.data.Read(p)
|
||||
if err == nil {
|
||||
return n, nil
|
||||
} else if errors.Is(err, io.EOF) {
|
||||
// Do not return io.EOF as error
|
||||
// Instead, just delete the stream
|
||||
// On the next call, we will return an error
|
||||
f.data = nil
|
||||
return n, nil
|
||||
} else {
|
||||
// Should not happen
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return 0, errors.New("simulated")
|
||||
}
|
||||
|
||||
// Implements an io.Reader that returns data slowly
|
||||
type slowReader struct {
|
||||
// Data to return
|
||||
data io.Reader
|
||||
// Interval between every byte sent
|
||||
delay time.Duration
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
func (s *slowReader) Read(p []byte) (n int, err error) {
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// Sleep
|
||||
time.Sleep(s.delay)
|
||||
|
||||
// Read one byte at a time
|
||||
return s.data.Read(p[0:1])
|
||||
}
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
commonv1pb "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
||||
)
|
||||
|
||||
// CryptoRequests is an interface for EncryptRequest and DecryptRequest.
|
||||
type CryptoRequests interface {
|
||||
proto.Message
|
||||
|
||||
// SetPayload sets the payload.
|
||||
SetPayload(payload *commonv1pb.StreamPayload)
|
||||
// GetPayload returns the payload.
|
||||
GetPayload() *commonv1pb.StreamPayload
|
||||
// Reset the object.
|
||||
Reset()
|
||||
// SetOptions sets the Options property.
|
||||
SetOptions(opts proto.Message)
|
||||
// HasOptions returns true if the Options property is not empty.
|
||||
HasOptions() bool
|
||||
}
|
||||
|
||||
func (x *EncryptRequest) SetPayload(payload *commonv1pb.StreamPayload) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
|
||||
x.Payload = payload
|
||||
}
|
||||
|
||||
func (x *EncryptRequest) SetOptions(opts proto.Message) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
|
||||
x.Options = opts.(*EncryptRequestOptions)
|
||||
}
|
||||
|
||||
func (x *EncryptRequest) HasOptions() bool {
|
||||
return x != nil && x.Options != nil
|
||||
}
|
||||
|
||||
func (x *DecryptRequest) SetPayload(payload *commonv1pb.StreamPayload) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
|
||||
x.Payload = payload
|
||||
}
|
||||
|
||||
func (x *DecryptRequest) SetOptions(opts proto.Message) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
|
||||
x.Options = opts.(*DecryptRequestOptions)
|
||||
}
|
||||
|
||||
func (x *DecryptRequest) HasOptions() bool {
|
||||
return x != nil && x.Options != nil
|
||||
}
|
||||
|
||||
// CryptoResponses is an interface for EncryptResponse and DecryptResponse.
|
||||
type CryptoResponses interface {
|
||||
proto.Message
|
||||
|
||||
// SetPayload sets the payload.
|
||||
SetPayload(payload *commonv1pb.StreamPayload)
|
||||
// GetPayload returns the payload.
|
||||
GetPayload() *commonv1pb.StreamPayload
|
||||
// Reset the object.
|
||||
Reset()
|
||||
}
|
||||
|
||||
func (x *EncryptResponse) SetPayload(payload *commonv1pb.StreamPayload) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
|
||||
x.Payload = payload
|
||||
}
|
||||
|
||||
func (x *DecryptResponse) SetPayload(payload *commonv1pb.StreamPayload) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
|
||||
x.Payload = payload
|
||||
}
|
Loading…
Reference in New Issue