go-sdk/client/crypto.go

273 lines
7.9 KiB
Go

/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"errors"
"fmt"
"io"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)
// Encrypt data read from a stream, returning a readable stream that receives the encrypted data.
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
func (c *GRPCClient) Encrypt(ctx context.Context, in io.Reader, opts EncryptOptions) (io.Reader, error) {
// Ensure required options are present
// This short-circuits and avoids a call to the runtime
if opts.ComponentName == "" {
return nil, errors.New("option 'ComponentName' is required")
}
if opts.KeyName == "" {
return nil, errors.New("option 'KeyName' is required")
}
if opts.KeyWrapAlgorithm == "" {
return nil, errors.New("option 'Algorithm' is required")
}
// Create the stream
stream, err := c.protoClient.EncryptAlpha1(ctx)
if err != nil {
return nil, err
}
// Use the context of the stream here.
return c.performCryptoOperation(
stream.Context(), stream,
in, opts,
&runtimev1pb.EncryptRequest{},
&runtimev1pb.EncryptResponse{},
)
}
// Decrypt data read from a stream, returning a readable stream that receives the decrypted data.
// This method returns an error if the initial call fails. Errors performed during the encryption are received by the out stream.
func (c *GRPCClient) Decrypt(ctx context.Context, in io.Reader, opts DecryptOptions) (io.Reader, error) {
// Ensure required options are present
// This short-circuits and avoids a call to the runtime
if opts.ComponentName == "" {
return nil, errors.New("option 'ComponentName' is required")
}
// Create the stream
stream, err := c.protoClient.DecryptAlpha1(ctx)
if err != nil {
return nil, err
}
// Use the context of the stream here.
return c.performCryptoOperation(
stream.Context(), stream,
in, opts,
&runtimev1pb.DecryptRequest{},
&runtimev1pb.DecryptResponse{},
)
}
func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.ClientStream, in io.Reader, opts cryptoOperationOpts, reqProto runtimev1pb.CryptoRequests, resProto runtimev1pb.CryptoResponses) (io.Reader, error) {
var err error
// Pipe for writing the response
pr, pw := io.Pipe()
// Send the request in a background goroutine
go func() {
// Build the options object for the first message
optsProto := opts.getProto()
// Get a buffer from the pool
reqBuf := bufPool.Get().(*[]byte)
defer func() {
bufPool.Put(reqBuf)
}()
// Send the request in chunks
var (
n int
seq uint64
done bool
)
for {
if ctx.Err() != nil {
pw.CloseWithError(ctx.Err())
return
}
// First message only - add the options
if optsProto != nil {
reqProto.SetOptions(optsProto)
optsProto = nil
} else {
// Reset the object so we can re-use it
reqProto.Reset()
}
n, err = in.Read(*reqBuf)
if err == io.EOF {
done = true
} else if err != nil {
pw.CloseWithError(err)
return
}
// Send the chunk if there's anything to send
if n > 0 {
reqProto.SetPayload(&commonv1pb.StreamPayload{
Data: (*reqBuf)[:n],
Seq: seq,
})
seq++
err = stream.SendMsg(reqProto)
if errors.Is(err, io.EOF) {
// If SendMsg returns an io.EOF error, it usually means that there's a transport-level error
// The exact error can only be determined by RecvMsg, so if we encounter an EOF error here, just consider the stream done and let RecvMsg handle the error
done = true
} else if err != nil {
pw.CloseWithError(fmt.Errorf("error sending message: %w", err))
return
}
}
// Stop the loop with the last chunk
if done {
err = stream.CloseSend()
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to close the send direction of the stream: %w", err))
return
}
break
}
}
}()
// Read the response in another goroutine
go func() {
var (
expectSeq uint64
readErr error
done bool
payload *commonv1pb.StreamPayload
)
// Read until the end of the stream
for {
if ctx.Err() != nil {
pw.CloseWithError(ctx.Err())
return
}
// Read the next chunk
readErr = stream.RecvMsg(resProto)
if errors.Is(readErr, io.EOF) {
// Receiving an io.EOF signifies that the client has stopped sending data over the pipe, so this is the end
done = true
} else if readErr != nil {
pw.CloseWithError(fmt.Errorf("error receiving message: %w", readErr))
return
}
// Write the data, if any, into the pipe
payload = resProto.GetPayload()
if payload != nil {
if payload.GetSeq() != expectSeq {
pw.CloseWithError(fmt.Errorf("invalid sequence number in chunk: %d (expected: %d)", payload.GetSeq(), expectSeq))
return
}
expectSeq++
_, readErr = pw.Write(payload.GetData())
if readErr != nil {
pw.CloseWithError(fmt.Errorf("error writing data: %w", readErr))
return
}
}
// Stop when done
if done {
break
}
// Reset the proto
resProto.Reset()
}
// Close the writer of the pipe when done
pw.Close()
}()
// Return the readable stream
return pr, nil
}
// Interface for EncryptOptions and DecryptOptions
type cryptoOperationOpts interface {
getProto() proto.Message
}
// EncryptOptions contains options passed to the Encrypt method.
type EncryptOptions struct {
// Name of the component. Required.
ComponentName string
// Name (or name/version) of the key. Required.
KeyName string
// Key wrapping algorithm to use. Required.
// Supported options include: A256KW, A128CBC, A192CBC, A256CBC, RSA-OAEP-256.
KeyWrapAlgorithm string
// DataEncryptionCipher to use to encrypt data (optional): "aes-gcm" (default) or "chacha20-poly1305"
DataEncryptionCipher string
// If true, the encrypted document does not contain a key reference.
// In that case, calls to the Decrypt method must provide a key reference (name or name/version).
// Defaults to false.
OmitDecryptionKeyName bool
// Key reference to embed in the encrypted document (name or name/version).
// This is helpful if the reference of the key used to decrypt the document is different from the one used to encrypt it.
// If unset, uses the reference of the key used to encrypt the document (this is the default behavior).
// This option is ignored if omit_decryption_key_name is true.
DecryptionKeyName string
}
func (o EncryptOptions) getProto() proto.Message {
return &runtimev1pb.EncryptRequestOptions{
ComponentName: o.ComponentName,
KeyName: o.KeyName,
KeyWrapAlgorithm: o.KeyWrapAlgorithm,
DataEncryptionCipher: o.DataEncryptionCipher,
OmitDecryptionKeyName: o.OmitDecryptionKeyName,
DecryptionKeyName: o.DecryptionKeyName,
}
}
// DecryptOptions contains options passed to the Decrypt method.
type DecryptOptions struct {
// Name of the component. Required.
ComponentName string
// Name (or name/version) of the key to decrypt the message.
// Overrides any key reference included in the message if present.
// This is required if the message doesn't include a key reference (i.e. was created with omit_decryption_key_name set to true).
KeyName string
}
func (o DecryptOptions) getProto() proto.Message {
return &runtimev1pb.DecryptRequestOptions{
ComponentName: o.ComponentName,
KeyName: o.KeyName,
}
}