mirror of https://github.com/containerd/ttrpc.git
Compare commits
49 Commits
Author | SHA1 | Date |
---|---|---|
|
ababa3fc18 | |
|
4729a87007 | |
|
3b8c8b7557 | |
|
644ecfaa4c | |
|
430f734791 | |
|
b71d9dee11 | |
|
bcc40a4d69 | |
|
c4d96d55ad | |
|
ed6c3ba082 | |
|
b5cd6e4b32 | |
|
d8c00dfec3 | |
|
de273bf751 | |
|
3f02183720 | |
|
84e1784f34 | |
|
655622931d | |
|
4785c70883 | |
|
19d523c66a | |
|
196dbef628 | |
|
ef5734239e | |
|
1b4f6f8edb | |
|
aa5f2d4e10 | |
|
13b8289864 | |
|
272c8575a6 | |
|
4a2816be9b | |
|
e0f3eadca5 | |
|
589a593abc | |
|
faba5896a9 | |
|
73b6a9156d | |
|
90d421ee7e | |
|
44ca0096e1 | |
|
6615f159ba | |
|
dea99e9d05 | |
|
336fc1b6b4 | |
|
baadfd8e79 | |
|
1e51c4681d | |
|
b6bd7ce660 | |
|
bea960d9fe | |
|
9c0db2b1c3 | |
|
40f227ddbb | |
|
f984c9b178 | |
|
b2f0adabbf | |
|
9287d4978d | |
|
a2fbc14815 | |
|
cf2b85de12 | |
|
8ca4110ebc | |
|
05e0d07d83 | |
|
e0cd801116 | |
|
712429a9f0 | |
|
8d4784675e |
|
@ -0,0 +1,16 @@
|
|||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "gomod"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
open-pull-requests-limit: 10
|
||||
groups:
|
||||
golang-x:
|
||||
patterns:
|
||||
- "golang.org/x/*"
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
open-pull-requests-limit: 10
|
|
@ -7,7 +7,7 @@ on:
|
|||
branches: [ main ]
|
||||
|
||||
env:
|
||||
GO_VERSION: 1.20.x
|
||||
GO_VERSION: 1.23.x
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
@ -27,27 +27,21 @@ jobs:
|
|||
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
path: src/github.com/containerd/ttrpc
|
||||
|
||||
- uses: actions/setup-go@v3
|
||||
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
|
||||
with:
|
||||
go-version: ${{ env.GO_VERSION }}
|
||||
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@v3
|
||||
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
|
||||
with:
|
||||
version: v1.51.2
|
||||
version: v1.60.3
|
||||
args: --timeout=5m
|
||||
skip-cache: true
|
||||
working-directory: src/github.com/containerd/ttrpc
|
||||
|
||||
- name: golangci-lint errors
|
||||
run: golangci-lint run
|
||||
working-directory: src/github.com/containerd/ttrpc
|
||||
if: ${{ failure() }}
|
||||
|
||||
#
|
||||
# Project checks
|
||||
#
|
||||
|
@ -57,16 +51,15 @@ jobs:
|
|||
timeout-minutes: 5
|
||||
|
||||
steps:
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ${{ env.GO_VERSION }}
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
path: src/github.com/containerd/ttrpc
|
||||
fetch-depth: 25
|
||||
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
|
||||
with:
|
||||
go-version: ${{ env.GO_VERSION }}
|
||||
|
||||
- uses: containerd/project-checks@v1.1.0
|
||||
- uses: containerd/project-checks@434a07157608eeaa1d5c8d4dd506154204cd9401 # v1.1.0
|
||||
with:
|
||||
working-directory: src/github.com/containerd/ttrpc
|
||||
|
||||
|
@ -78,22 +71,20 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, macos-latest, windows-latest]
|
||||
go: [1.19.x, 1.20.x]
|
||||
go: [1.22.x, 1.23.x]
|
||||
|
||||
name: ${{ matrix.os }} / ${{ matrix.go }}
|
||||
runs-on: ${{ matrix.os }}
|
||||
timeout-minutes: 5
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ${{ matrix.go }}
|
||||
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v3
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
path: src/github.com/containerd/ttrpc
|
||||
fetch-depth: 25
|
||||
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
|
||||
with:
|
||||
go-version: ${{ matrix.go }}
|
||||
|
||||
- name: Test
|
||||
working-directory: src/github.com/containerd/ttrpc
|
||||
|
@ -119,7 +110,11 @@ jobs:
|
|||
timeout-minutes: 5
|
||||
steps:
|
||||
|
||||
- uses: actions/setup-go@v3
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
path: src/github.com/containerd/ttrpc
|
||||
fetch-depth: 25
|
||||
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
|
||||
with:
|
||||
go-version: ${{ env.GO_VERSION }}
|
||||
id: go
|
||||
|
@ -130,12 +125,6 @@ jobs:
|
|||
echo "GOPATH=${{ github.workspace }}" >> $GITHUB_ENV
|
||||
echo "${{ github.workspace }}/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Check out code
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
path: src/github.com/containerd/ttrpc
|
||||
fetch-depth: 25
|
||||
|
||||
- name: Install dependencies
|
||||
working-directory: src/github.com/containerd/ttrpc
|
||||
run: |
|
||||
|
|
|
@ -6,7 +6,7 @@ linters:
|
|||
- goimports
|
||||
- revive
|
||||
- ineffassign
|
||||
- vet
|
||||
- govet
|
||||
- unused
|
||||
- misspell
|
||||
disable:
|
||||
|
@ -14,7 +14,7 @@ linters:
|
|||
|
||||
linters-settings:
|
||||
revive:
|
||||
ignore-generated-headers: true
|
||||
ignore-generated-header: true
|
||||
rules:
|
||||
- name: blank-imports
|
||||
- name: context-as-argument
|
||||
|
@ -45,8 +45,8 @@ linters-settings:
|
|||
issues:
|
||||
include:
|
||||
- EXC0002
|
||||
exclude-dirs:
|
||||
- example
|
||||
|
||||
run:
|
||||
timeout: 8m
|
||||
skip-dirs:
|
||||
- example
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# ttrpc
|
||||
|
||||
[](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
|
||||
[](https://github.com/containerd/ttrpc/actions/workflows/ci.yml)
|
||||
|
||||
GRPC for low-memory environments.
|
||||
|
||||
|
|
|
@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
|
|||
}
|
||||
|
||||
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
|
||||
// TODO: Error on send rather than on recv
|
||||
//if len(p) > messageLengthMax {
|
||||
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
|
||||
//}
|
||||
if len(p) > messageLengthMax {
|
||||
return OversizedMessageError(len(p))
|
||||
}
|
||||
|
||||
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -89,21 +89,19 @@ func TestReadWriteMessage(t *testing.T) {
|
|||
|
||||
func TestMessageOversize(t *testing.T) {
|
||||
var (
|
||||
w, r = net.Pipe()
|
||||
wch, rch = newChannel(w), newChannel(r)
|
||||
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
|
||||
errs = make(chan error, 1)
|
||||
w, _ = net.Pipe()
|
||||
wch = newChannel(w)
|
||||
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
|
||||
errs = make(chan error, 1)
|
||||
)
|
||||
|
||||
go func() {
|
||||
if err := wch.send(1, 1, 0, msg); err != nil {
|
||||
errs <- err
|
||||
}
|
||||
errs <- wch.send(1, 1, 0, msg)
|
||||
}()
|
||||
|
||||
_, _, err := rch.recv()
|
||||
err := <-errs
|
||||
if err == nil {
|
||||
t.Fatalf("error expected reading with small buffer")
|
||||
t.Fatalf("sending oversized message expected to fail")
|
||||
}
|
||||
|
||||
status, ok := status.FromError(err)
|
||||
|
@ -114,12 +112,4 @@ func TestMessageOversize(t *testing.T) {
|
|||
if status.Code() != codes.ResourceExhausted {
|
||||
t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errs:
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
86
client.go
86
client.go
|
@ -27,7 +27,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/containerd/log"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
@ -71,6 +71,42 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
|
|||
}
|
||||
}
|
||||
|
||||
// WithChainUnaryClientInterceptor sets the provided chain of client interceptors
|
||||
func WithChainUnaryClientInterceptor(interceptors ...UnaryClientInterceptor) ClientOpts {
|
||||
return func(c *Client) {
|
||||
if len(interceptors) == 0 {
|
||||
return
|
||||
}
|
||||
if c.interceptor != nil {
|
||||
interceptors = append([]UnaryClientInterceptor{c.interceptor}, interceptors...)
|
||||
}
|
||||
c.interceptor = func(
|
||||
ctx context.Context,
|
||||
req *Request,
|
||||
reply *Response,
|
||||
info *UnaryClientInfo,
|
||||
final Invoker,
|
||||
) error {
|
||||
return interceptors[0](ctx, req, reply, info,
|
||||
chainUnaryInterceptors(interceptors[1:], final, info))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker, info *UnaryClientInfo) Invoker {
|
||||
if len(interceptors) == 0 {
|
||||
return final
|
||||
}
|
||||
return func(
|
||||
ctx context.Context,
|
||||
req *Request,
|
||||
reply *Response,
|
||||
) error {
|
||||
return interceptors[0](ctx, req, reply, info,
|
||||
chainUnaryInterceptors(interceptors[1:], final, info))
|
||||
}
|
||||
}
|
||||
|
||||
// NewClient creates a new ttrpc client using the given connection
|
||||
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
@ -85,13 +121,16 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
|
|||
ctx: ctx,
|
||||
userCloseFunc: func() {},
|
||||
userCloseWaitCh: make(chan struct{}),
|
||||
interceptor: defaultClientInterceptor,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(c)
|
||||
}
|
||||
|
||||
if c.interceptor == nil {
|
||||
c.interceptor = defaultClientInterceptor
|
||||
}
|
||||
|
||||
go c.run()
|
||||
return c
|
||||
}
|
||||
|
@ -286,7 +325,7 @@ func (c *Client) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UserOnCloseWait is used to blocks untils the user's on-close callback
|
||||
// UserOnCloseWait is used to block until the user's on-close callback
|
||||
// finishes.
|
||||
func (c *Client) UserOnCloseWait(ctx context.Context) error {
|
||||
select {
|
||||
|
@ -329,7 +368,7 @@ func (c *Client) receiveLoop() error {
|
|||
sid := streamID(msg.header.StreamID)
|
||||
s := c.getStream(sid)
|
||||
if s == nil {
|
||||
logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
|
||||
log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream")
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -337,7 +376,7 @@ func (c *Client) receiveLoop() error {
|
|||
s.closeWithError(err)
|
||||
} else {
|
||||
if err := s.receive(c.ctx, msg); err != nil {
|
||||
logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
|
||||
log.G(c.ctx).WithFields(log.Fields{"error": err, "stream": sid}).Error("ttrpc: failed to handle message")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -347,25 +386,44 @@ func (c *Client) receiveLoop() error {
|
|||
// createStream creates a new stream and registers it with the client
|
||||
// Introduce stream types for multiple or single response
|
||||
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
|
||||
c.streamLock.Lock()
|
||||
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
|
||||
// This ensures that new stream IDs sent on the wire are always increasing, which is a
|
||||
// requirement of the TTRPC protocol.
|
||||
// This use of sendLock could be split into another mutex that covers stream creation + first send,
|
||||
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
|
||||
c.sendLock.Lock()
|
||||
defer c.sendLock.Unlock()
|
||||
|
||||
// Check if closed since lock acquired to prevent adding
|
||||
// anything after cleanup completes
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
c.streamLock.Unlock()
|
||||
return nil, ErrClosed
|
||||
default:
|
||||
}
|
||||
|
||||
// Stream ID should be allocated at same time
|
||||
s := newStream(c.nextStreamID, c)
|
||||
c.streams[s.id] = s
|
||||
c.nextStreamID = c.nextStreamID + 2
|
||||
var s *stream
|
||||
if err := func() error {
|
||||
// In the future this could be replaced with a sync.Map instead of streamLock+map.
|
||||
c.streamLock.Lock()
|
||||
defer c.streamLock.Unlock()
|
||||
|
||||
c.sendLock.Lock()
|
||||
defer c.sendLock.Unlock()
|
||||
c.streamLock.Unlock()
|
||||
// Check if closed since lock acquired to prevent adding
|
||||
// anything after cleanup completes
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return ErrClosed
|
||||
default:
|
||||
}
|
||||
|
||||
s = newStream(c.nextStreamID, c)
|
||||
c.streams[s.id] = s
|
||||
c.nextStreamID = c.nextStreamID + 2
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
|
||||
return s, filterCloseErr(err)
|
||||
|
|
|
@ -18,6 +18,7 @@ package main
|
|||
|
||||
import (
|
||||
"google.golang.org/protobuf/compiler/protogen"
|
||||
"google.golang.org/protobuf/types/pluginpb"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -30,6 +31,7 @@ func main() {
|
|||
return nil
|
||||
},
|
||||
}.Run(func(gen *protogen.Plugin) error {
|
||||
gen.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL)
|
||||
for _, f := range gen.Files {
|
||||
if !f.Generate {
|
||||
continue
|
||||
|
|
38
config.go
38
config.go
|
@ -16,7 +16,10 @@
|
|||
|
||||
package ttrpc
|
||||
|
||||
import "errors"
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type serverConfig struct {
|
||||
handshaker Handshaker
|
||||
|
@ -44,9 +47,40 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
|
|||
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
|
||||
return func(c *serverConfig) error {
|
||||
if c.interceptor != nil {
|
||||
return errors.New("only one interceptor allowed per server")
|
||||
return errors.New("only one unchained interceptor allowed per server")
|
||||
}
|
||||
c.interceptor = i
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithChainUnaryServerInterceptor sets the provided chain of server interceptors
|
||||
func WithChainUnaryServerInterceptor(interceptors ...UnaryServerInterceptor) ServerOpt {
|
||||
return func(c *serverConfig) error {
|
||||
if len(interceptors) == 0 {
|
||||
return nil
|
||||
}
|
||||
if c.interceptor != nil {
|
||||
interceptors = append([]UnaryServerInterceptor{c.interceptor}, interceptors...)
|
||||
}
|
||||
c.interceptor = func(
|
||||
ctx context.Context,
|
||||
unmarshal Unmarshaler,
|
||||
info *UnaryServerInfo,
|
||||
method Method) (interface{}, error) {
|
||||
return interceptors[0](ctx, unmarshal, info,
|
||||
chainUnaryServerInterceptors(info, method, interceptors[1:]))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func chainUnaryServerInterceptors(info *UnaryServerInfo, method Method, interceptors []UnaryServerInterceptor) Method {
|
||||
if len(interceptors) == 0 {
|
||||
return method
|
||||
}
|
||||
return func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
return interceptors[0](ctx, unmarshal, info,
|
||||
chainUnaryServerInterceptors(info, method, interceptors[1:]))
|
||||
}
|
||||
}
|
||||
|
|
48
errors.go
48
errors.go
|
@ -16,7 +16,12 @@
|
|||
|
||||
package ttrpc
|
||||
|
||||
import "errors"
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrProtocol is a general error in the handling the protocol.
|
||||
|
@ -32,3 +37,44 @@ var (
|
|||
// ErrStreamClosed is when the streaming connection is closed.
|
||||
ErrStreamClosed = errors.New("ttrpc: stream closed")
|
||||
)
|
||||
|
||||
// OversizedMessageErr is used to indicate refusal to send an oversized message.
|
||||
// It wraps a ResourceExhausted grpc Status together with the offending message
|
||||
// length.
|
||||
type OversizedMessageErr struct {
|
||||
messageLength int
|
||||
err error
|
||||
}
|
||||
|
||||
// OversizedMessageError returns an OversizedMessageErr error for the given message
|
||||
// length if it exceeds the allowed maximum. Otherwise a nil error is returned.
|
||||
func OversizedMessageError(messageLength int) error {
|
||||
if messageLength <= messageLengthMax {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &OversizedMessageErr{
|
||||
messageLength: messageLength,
|
||||
err: status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
|
||||
}
|
||||
}
|
||||
|
||||
// Error returns the error message for the corresponding grpc Status for the error.
|
||||
func (e *OversizedMessageErr) Error() string {
|
||||
return e.err.Error()
|
||||
}
|
||||
|
||||
// Unwrap returns the corresponding error with our grpc status code.
|
||||
func (e *OversizedMessageErr) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// RejectedLength retrieves the rejected message length which triggered the error.
|
||||
func (e *OversizedMessageErr) RejectedLength() int {
|
||||
return e.messageLength
|
||||
}
|
||||
|
||||
// MaximumLength retrieves the maximum allowed message length that triggered the error.
|
||||
func (*OversizedMessageErr) MaximumLength() int {
|
||||
return messageLengthMax
|
||||
}
|
||||
|
|
16
go.mod
16
go.mod
|
@ -1,14 +1,16 @@
|
|||
module github.com/containerd/ttrpc
|
||||
|
||||
go 1.13
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/containerd/log v0.1.0
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
github.com/golang/protobuf v1.5.0
|
||||
github.com/golang/protobuf v1.5.4
|
||||
github.com/prometheus/procfs v0.6.0
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
golang.org/x/sys v0.1.0
|
||||
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63
|
||||
google.golang.org/grpc v1.27.1
|
||||
google.golang.org/protobuf v1.27.1
|
||||
golang.org/x/sys v0.26.0
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38
|
||||
google.golang.org/grpc v1.69.2
|
||||
google.golang.org/protobuf v1.36.0
|
||||
)
|
||||
|
||||
require github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
|
|
89
go.sum
89
go.sum
|
@ -1,100 +1,69 @@
|
|||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
|
||||
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
|
||||
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
|
||||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
|
||||
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
|
||||
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
|
||||
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
|
||||
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63 h1:YzfoEYWbODU5Fbt37+h7X16BWQbad7Q4S6gclTKFXM8=
|
||||
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
|
||||
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
|
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
|
||||
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
|
||||
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
|
||||
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
|
||||
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
|
@ -211,7 +211,7 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
|
|||
0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03,
|
||||
0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10,
|
||||
0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d,
|
||||
0x32, 0xa0, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
|
||||
0x32, 0xfa, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
|
||||
0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
|
||||
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
|
||||
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
|
||||
|
@ -245,11 +245,17 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
|
|||
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f,
|
||||
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
|
||||
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28,
|
||||
0x01, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
|
||||
0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72,
|
||||
0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73,
|
||||
0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
|
||||
0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x12, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x50, 0x61, 0x79, 0x6c,
|
||||
0x6f, 0x61, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
|
||||
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
|
||||
0x79, 0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72,
|
||||
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e,
|
||||
0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x30, 0x01, 0x42, 0x3d, 0x5a,
|
||||
0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74,
|
||||
0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74,
|
||||
0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
|
||||
0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -278,14 +284,16 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs =
|
|||
2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum
|
||||
0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload
|
||||
0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload
|
||||
0, // 6: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||
0, // 7: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||
2, // 8: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
|
||||
1, // 9: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
|
||||
3, // 10: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
|
||||
3, // 11: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
|
||||
6, // [6:12] is the sub-list for method output_type
|
||||
0, // [0:6] is the sub-list for method input_type
|
||||
3, // 6: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:input_type -> google.protobuf.Empty
|
||||
0, // 7: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||
0, // 8: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||
2, // 9: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
|
||||
1, // 10: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
|
||||
3, // 11: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
|
||||
3, // 12: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
|
||||
0, // 13: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:output_type -> ttrpc.integration.streaming.EchoPayload
|
||||
7, // [7:14] is the sub-list for method output_type
|
||||
0, // [0:7] is the sub-list for method input_type
|
||||
0, // [0:0] is the sub-list for extension type_name
|
||||
0, // [0:0] is the sub-list for extension extendee
|
||||
0, // [0:0] is the sub-list for field type_name
|
||||
|
|
|
@ -34,6 +34,7 @@ service Streaming {
|
|||
rpc DivideStream(Sum) returns (stream Part);
|
||||
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
|
||||
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
|
||||
rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload);
|
||||
}
|
||||
|
||||
message EchoPayload {
|
||||
|
|
|
@ -15,6 +15,7 @@ type TTRPCStreamingService interface {
|
|||
DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
|
||||
EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
|
||||
EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
|
||||
EmptyPayloadStream(context.Context, *emptypb.Empty, TTRPCStreaming_EmptyPayloadStreamServer) error
|
||||
}
|
||||
|
||||
type TTRPCStreaming_EchoStreamServer interface {
|
||||
|
@ -108,6 +109,19 @@ func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
|
|||
return m, nil
|
||||
}
|
||||
|
||||
type TTRPCStreaming_EmptyPayloadStreamServer interface {
|
||||
Send(*EchoPayload) error
|
||||
ttrpc.StreamServer
|
||||
}
|
||||
|
||||
type ttrpcstreamingEmptyPayloadStreamServer struct {
|
||||
ttrpc.StreamServer
|
||||
}
|
||||
|
||||
func (x *ttrpcstreamingEmptyPayloadStreamServer) Send(m *EchoPayload) error {
|
||||
return x.StreamServer.SendMsg(m)
|
||||
}
|
||||
|
||||
func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
|
||||
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
|
||||
Methods: map[string]ttrpc.Method{
|
||||
|
@ -159,6 +173,17 @@ func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService)
|
|||
StreamingClient: true,
|
||||
StreamingServer: true,
|
||||
},
|
||||
"EmptyPayloadStream": {
|
||||
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
|
||||
m := new(emptypb.Empty)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, svc.EmptyPayloadStream(ctx, m, &ttrpcstreamingEmptyPayloadStreamServer{stream})
|
||||
},
|
||||
StreamingClient: false,
|
||||
StreamingServer: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
@ -170,6 +195,7 @@ type TTRPCStreamingClient interface {
|
|||
DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
|
||||
EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
|
||||
EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
|
||||
EmptyPayloadStream(context.Context, *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error)
|
||||
}
|
||||
|
||||
type ttrpcstreamingClient struct {
|
||||
|
@ -360,3 +386,32 @@ func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) {
|
|||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (c *ttrpcstreamingClient) EmptyPayloadStream(ctx context.Context, req *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) {
|
||||
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
|
||||
StreamingClient: false,
|
||||
StreamingServer: true,
|
||||
}, "ttrpc.integration.streaming.Streaming", "EmptyPayloadStream", req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &ttrpcstreamingEmptyPayloadStreamClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type TTRPCStreaming_EmptyPayloadStreamClient interface {
|
||||
Recv() (*EchoPayload, error)
|
||||
ttrpc.ClientStream
|
||||
}
|
||||
|
||||
type ttrpcstreamingEmptyPayloadStreamClient struct {
|
||||
ttrpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *ttrpcstreamingEmptyPayloadStreamClient) Recv() (*EchoPayload, error) {
|
||||
m := new(EchoPayload)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/containerd/ttrpc"
|
||||
"github.com/containerd/ttrpc/integration/streaming"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
|
||||
func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
|
||||
|
@ -190,6 +191,14 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
|
|||
return sendErr
|
||||
}
|
||||
|
||||
func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error {
|
||||
if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return streamer.Send(&streaming.EchoPayload{Seq: 2})
|
||||
}
|
||||
|
||||
func TestStreamingService(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
@ -203,6 +212,7 @@ func TestStreamingService(t *testing.T) {
|
|||
t.Run("DivideStream", divideStreamTest(ctx, client))
|
||||
t.Run("EchoNull", echoNullTest(ctx, client))
|
||||
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
|
||||
t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client))
|
||||
}
|
||||
|
||||
func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
|
||||
|
@ -385,6 +395,33 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
|
|||
}
|
||||
}
|
||||
|
||||
func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
stream, err := client.EmptyPayloadStream(ctx, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i := uint32(1); i < 3; i++ {
|
||||
first, err := stream.Recv()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if first.Seq != i {
|
||||
t.Fatalf("unexpected seq: %d != %d", first.Seq, i)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := stream.Recv(); err != io.EOF {
|
||||
t.Fatalf("Expected io.EOF, got %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
|
||||
t.Helper()
|
||||
if a.Msg != b.Msg {
|
||||
|
|
|
@ -0,0 +1,245 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/containerd/ttrpc/internal"
|
||||
)
|
||||
|
||||
func TestUnaryClientInterceptor(t *testing.T) {
|
||||
var (
|
||||
intercepted = false
|
||||
interceptor = func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
|
||||
intercepted = true
|
||||
return i(ctx, req, reply)
|
||||
}
|
||||
|
||||
ctx = context.Background()
|
||||
server = mustServer(t)(NewServer())
|
||||
testImpl = &testingServer{}
|
||||
addr, listener = newTestListener(t)
|
||||
client, cleanup = newTestClient(t, addr, WithUnaryClientInterceptor(interceptor))
|
||||
message = strings.Repeat("a", 16)
|
||||
reply = strings.Repeat(message, 2)
|
||||
)
|
||||
|
||||
defer listener.Close()
|
||||
defer cleanup()
|
||||
|
||||
registerTestingService(server, testImpl)
|
||||
|
||||
go server.Serve(ctx, listener)
|
||||
defer server.Shutdown(ctx)
|
||||
|
||||
request := &internal.TestPayload{
|
||||
Foo: message,
|
||||
}
|
||||
response := &internal.TestPayload{}
|
||||
|
||||
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !intercepted {
|
||||
t.Fatalf("ttrpc client call not intercepted")
|
||||
}
|
||||
|
||||
if response.Foo != reply {
|
||||
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChainUnaryClientInterceptor(t *testing.T) {
|
||||
var (
|
||||
orderIdx = 0
|
||||
recorded = []string{}
|
||||
intercept = func(idx int, tag string) UnaryClientInterceptor {
|
||||
return func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
|
||||
if idx != orderIdx {
|
||||
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
|
||||
}
|
||||
recorded = append(recorded, tag)
|
||||
orderIdx++
|
||||
return i(ctx, req, reply)
|
||||
}
|
||||
}
|
||||
|
||||
ctx = context.Background()
|
||||
server = mustServer(t)(NewServer())
|
||||
testImpl = &testingServer{}
|
||||
addr, listener = newTestListener(t)
|
||||
client, cleanup = newTestClient(t, addr,
|
||||
WithChainUnaryClientInterceptor(),
|
||||
WithChainUnaryClientInterceptor(
|
||||
intercept(0, "seen it"),
|
||||
intercept(1, "been"),
|
||||
intercept(2, "there"),
|
||||
intercept(3, "done"),
|
||||
intercept(4, "that"),
|
||||
),
|
||||
)
|
||||
expected = []string{
|
||||
"seen it",
|
||||
"been",
|
||||
"there",
|
||||
"done",
|
||||
"that",
|
||||
}
|
||||
message = strings.Repeat("a", 16)
|
||||
reply = strings.Repeat(message, 2)
|
||||
)
|
||||
|
||||
defer listener.Close()
|
||||
defer cleanup()
|
||||
|
||||
registerTestingService(server, testImpl)
|
||||
|
||||
go server.Serve(ctx, listener)
|
||||
defer server.Shutdown(ctx)
|
||||
|
||||
request := &internal.TestPayload{
|
||||
Foo: message,
|
||||
}
|
||||
response := &internal.TestPayload{}
|
||||
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(recorded, expected) {
|
||||
t.Fatalf("unexpected ttrpc chained client unary interceptor order (%s != %s)",
|
||||
strings.Join(recorded, " "), strings.Join(expected, " "))
|
||||
}
|
||||
|
||||
if response.Foo != reply {
|
||||
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnaryServerInterceptor(t *testing.T) {
|
||||
var (
|
||||
intercepted = false
|
||||
interceptor = func(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
|
||||
intercepted = true
|
||||
return method(ctx, unmarshal)
|
||||
}
|
||||
|
||||
ctx = context.Background()
|
||||
server = mustServer(t)(NewServer(WithUnaryServerInterceptor(interceptor)))
|
||||
testImpl = &testingServer{}
|
||||
addr, listener = newTestListener(t)
|
||||
client, cleanup = newTestClient(t, addr)
|
||||
message = strings.Repeat("a", 16)
|
||||
reply = strings.Repeat(message, 2)
|
||||
)
|
||||
|
||||
defer listener.Close()
|
||||
defer cleanup()
|
||||
|
||||
registerTestingService(server, testImpl)
|
||||
|
||||
go server.Serve(ctx, listener)
|
||||
defer server.Shutdown(ctx)
|
||||
|
||||
request := &internal.TestPayload{
|
||||
Foo: message,
|
||||
}
|
||||
response := &internal.TestPayload{}
|
||||
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !intercepted {
|
||||
t.Fatalf("ttrpc server call not intercepted")
|
||||
}
|
||||
|
||||
if response.Foo != reply {
|
||||
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChainUnaryServerInterceptor(t *testing.T) {
|
||||
var (
|
||||
orderIdx = 0
|
||||
recorded = []string{}
|
||||
intercept = func(idx int, tag string) UnaryServerInterceptor {
|
||||
return func(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
|
||||
if orderIdx != idx {
|
||||
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
|
||||
}
|
||||
recorded = append(recorded, tag)
|
||||
orderIdx++
|
||||
return method(ctx, unmarshal)
|
||||
}
|
||||
}
|
||||
|
||||
ctx = context.Background()
|
||||
server = mustServer(t)(NewServer(
|
||||
WithUnaryServerInterceptor(
|
||||
intercept(0, "seen it"),
|
||||
),
|
||||
WithChainUnaryServerInterceptor(
|
||||
intercept(1, "been"),
|
||||
intercept(2, "there"),
|
||||
intercept(3, "done"),
|
||||
intercept(4, "that"),
|
||||
),
|
||||
))
|
||||
expected = []string{
|
||||
"seen it",
|
||||
"been",
|
||||
"there",
|
||||
"done",
|
||||
"that",
|
||||
}
|
||||
testImpl = &testingServer{}
|
||||
addr, listener = newTestListener(t)
|
||||
client, cleanup = newTestClient(t, addr)
|
||||
message = strings.Repeat("a", 16)
|
||||
reply = strings.Repeat(message, 2)
|
||||
)
|
||||
|
||||
defer listener.Close()
|
||||
defer cleanup()
|
||||
|
||||
registerTestingService(server, testImpl)
|
||||
|
||||
go server.Serve(ctx, listener)
|
||||
defer server.Shutdown(ctx)
|
||||
|
||||
request := &internal.TestPayload{
|
||||
Foo: message,
|
||||
}
|
||||
response := &internal.TestPayload{}
|
||||
|
||||
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(recorded, expected) {
|
||||
t.Fatalf("unexpected ttrpc chained server unary interceptor order (%s != %s)",
|
||||
strings.Join(recorded, " "), strings.Join(expected, " "))
|
||||
}
|
||||
|
||||
if response.Foo != reply {
|
||||
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
|
||||
}
|
||||
}
|
28
metadata.go
28
metadata.go
|
@ -62,6 +62,34 @@ func (m MD) Append(key string, values ...string) {
|
|||
}
|
||||
}
|
||||
|
||||
// Clone returns a copy of MD or nil if it's nil.
|
||||
// It's copied from golang's `http.Header.Clone` implementation:
|
||||
// https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/net/http/header.go;l=94
|
||||
func (m MD) Clone() MD {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Find total number of values.
|
||||
nv := 0
|
||||
for _, vv := range m {
|
||||
nv += len(vv)
|
||||
}
|
||||
sv := make([]string, nv) // shared backing array for headers' values
|
||||
m2 := make(MD, len(m))
|
||||
for k, vv := range m {
|
||||
if vv == nil {
|
||||
// Preserve nil values.
|
||||
m2[k] = nil
|
||||
continue
|
||||
}
|
||||
n := copy(sv, vv)
|
||||
m2[k] = sv[:n:n]
|
||||
sv = sv[n:]
|
||||
}
|
||||
return m2
|
||||
}
|
||||
|
||||
func (m MD) setRequest(r *Request) {
|
||||
for k, values := range m {
|
||||
for _, v := range values {
|
||||
|
|
|
@ -18,6 +18,8 @@ package ttrpc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
@ -106,3 +108,100 @@ func TestMetadataContext(t *testing.T) {
|
|||
t.Errorf("invalid metadata value: %q", bar)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataClone(t *testing.T) {
|
||||
var metadata MD
|
||||
m2 := metadata.Clone()
|
||||
if m2 != nil {
|
||||
t.Error("MD.Clone() on nil metadata should return nil")
|
||||
}
|
||||
|
||||
metadata = MD{"nil": nil, "foo": {"bar"}, "baz": {"qux", "quxx"}}
|
||||
m2 = metadata.Clone()
|
||||
|
||||
if len(metadata) != len(m2) {
|
||||
t.Errorf("unexpected number of keys: %d, expected: %d", len(m2), len(metadata))
|
||||
}
|
||||
|
||||
for k, v := range metadata {
|
||||
v2, ok := m2[k]
|
||||
if !ok {
|
||||
t.Errorf("key not found: %s", k)
|
||||
}
|
||||
if v == nil && v2 == nil {
|
||||
continue
|
||||
}
|
||||
if v == nil || v2 == nil {
|
||||
t.Errorf("unexpected nil value: %v, expected: %v", v2, v)
|
||||
}
|
||||
if len(v) != len(v2) {
|
||||
t.Errorf("unexpected number of values: %d, expected: %d", len(v2), len(v))
|
||||
}
|
||||
for i := range v {
|
||||
if v[i] != v2[i] {
|
||||
t.Errorf("unexpected value: %s, expected: %s", v2[i], v[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataCloneConcurrent(t *testing.T) {
|
||||
metadata := make(MD)
|
||||
metadata.Set("foo", "bar")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 20; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
m2 := metadata.Clone()
|
||||
m2.Set("foo", "baz")
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
// Concurrent modification should clone the metadata first to avoid panic
|
||||
// due to concurrent map writes.
|
||||
if val, ok := metadata.Get("foo"); !ok {
|
||||
t.Error("metadata not found")
|
||||
} else if val[0] != "bar" {
|
||||
t.Errorf("invalid metadata value: %q", val[0])
|
||||
}
|
||||
}
|
||||
|
||||
func simpleClone(src MD) MD {
|
||||
md := MD{}
|
||||
for k, v := range src {
|
||||
md[k] = append(md[k], v...)
|
||||
}
|
||||
return md
|
||||
}
|
||||
|
||||
func BenchmarkMetadataClone(b *testing.B) {
|
||||
for _, sz := range []int{5, 10, 20, 50} {
|
||||
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
|
||||
metadata := make(MD)
|
||||
for i := 0; i < sz; i++ {
|
||||
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
|
||||
}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = metadata.Clone()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSimpleMetadataClone(b *testing.B) {
|
||||
for _, sz := range []int{5, 10, 20, 50} {
|
||||
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
|
||||
metadata := make(MD)
|
||||
for i := 0; i < sz; i++ {
|
||||
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
|
||||
}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = simpleClone(metadata)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
31
server.go
31
server.go
|
@ -27,7 +27,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/containerd/log"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
@ -74,9 +74,18 @@ func (s *Server) RegisterService(name string, desc *ServiceDesc) {
|
|||
}
|
||||
|
||||
func (s *Server) Serve(ctx context.Context, l net.Listener) error {
|
||||
s.addListener(l)
|
||||
s.mu.Lock()
|
||||
s.addListenerLocked(l)
|
||||
defer s.closeListener(l)
|
||||
|
||||
select {
|
||||
case <-s.done:
|
||||
s.mu.Unlock()
|
||||
return ErrServerClosed
|
||||
default:
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
var (
|
||||
backoff time.Duration
|
||||
handshaker = s.config.handshaker
|
||||
|
@ -109,7 +118,7 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
|
|||
}
|
||||
|
||||
sleep := time.Duration(rand.Int63n(int64(backoff)))
|
||||
logrus.WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
|
||||
log.G(ctx).WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
|
||||
time.Sleep(sleep)
|
||||
continue
|
||||
}
|
||||
|
@ -121,14 +130,14 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
|
|||
|
||||
approved, handshake, err := handshaker.Handshake(ctx, conn)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("ttrpc: refusing connection after handshake")
|
||||
log.G(ctx).WithError(err).Error("ttrpc: refusing connection after handshake")
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
sc, err := s.newConn(approved, handshake)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("ttrpc: create connection failed")
|
||||
log.G(ctx).WithError(err).Error("ttrpc: create connection failed")
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
@ -188,9 +197,7 @@ func (s *Server) Close() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *Server) addListener(l net.Listener) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
func (s *Server) addListenerLocked(l net.Listener) {
|
||||
s.listeners[l] = struct{}{}
|
||||
}
|
||||
|
||||
|
@ -513,12 +520,12 @@ func (c *serverConn) run(sctx context.Context) {
|
|||
Payload: response.data,
|
||||
})
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed marshaling response")
|
||||
log.G(ctx).WithError(err).Error("failed marshaling response")
|
||||
return
|
||||
}
|
||||
|
||||
if err := ch.send(response.id, messageTypeResponse, 0, p); err != nil {
|
||||
logrus.WithError(err).Error("failed sending message on channel")
|
||||
log.G(ctx).WithError(err).Error("failed sending message on channel")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
|
@ -530,7 +537,7 @@ func (c *serverConn) run(sctx context.Context) {
|
|||
flags = flags | flagNoData
|
||||
}
|
||||
if err := ch.send(response.id, messageTypeData, flags, response.data); err != nil {
|
||||
logrus.WithError(err).Error("failed sending message on channel")
|
||||
log.G(ctx).WithError(err).Error("failed sending message on channel")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -552,7 +559,7 @@ func (c *serverConn) run(sctx context.Context) {
|
|||
// requests, so that the client connection is closed
|
||||
return
|
||||
}
|
||||
logrus.WithError(err).Error("error receiving message")
|
||||
log.G(ctx).WithError(err).Error("error receiving message")
|
||||
// else, initiate shutdown
|
||||
case <-shutdown:
|
||||
return
|
||||
|
|
|
@ -219,7 +219,7 @@ func TestServerShutdown(t *testing.T) {
|
|||
|
||||
// register a service that takes until we tell it to stop
|
||||
server.Register(serviceName, map[string]Method{
|
||||
"Test": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
"Test": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
var req internal.TestPayload
|
||||
if err := unmarshal(&req); err != nil {
|
||||
return nil, err
|
||||
|
@ -298,6 +298,36 @@ func TestServerClose(t *testing.T) {
|
|||
checkServerShutdown(t, server)
|
||||
}
|
||||
|
||||
func TestImmediateServerShutdown(t *testing.T) {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
server = mustServer(t)(NewServer())
|
||||
addr, listener = newTestListener(t)
|
||||
errs = make(chan error, 1)
|
||||
_, cleanup = newTestClient(t, addr)
|
||||
)
|
||||
defer cleanup()
|
||||
defer listener.Close()
|
||||
go func() {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
errs <- server.Serve(ctx, listener)
|
||||
}()
|
||||
|
||||
registerTestingService(server, &testingServer{})
|
||||
|
||||
if err := server.Shutdown(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
select {
|
||||
case err := <-errs:
|
||||
if err != ErrServerClosed {
|
||||
t.Fatal(err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("retreiving error from server.Shutdown() timed out")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOversizeCall(t *testing.T) {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
|
@ -318,7 +348,7 @@ func TestOversizeCall(t *testing.T) {
|
|||
Foo: strings.Repeat("a", 1+messageLengthMax),
|
||||
}
|
||||
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
|
||||
t.Fatalf("expected error from non-existent service call")
|
||||
t.Fatalf("expected error from oversized message")
|
||||
} else if status, ok := status.FromError(err); !ok {
|
||||
t.Fatalf("expected status present in error: %v", err)
|
||||
} else if status.Code() != codes.ResourceExhausted {
|
||||
|
@ -363,6 +393,8 @@ func TestClientEOF(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
client.UserOnCloseWait(ctx)
|
||||
|
||||
// server shutdown, but we still make a call.
|
||||
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
|
||||
t.Fatalf("expected error when calling against shutdown server")
|
||||
|
|
|
@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
|
|||
respond(st, p, stream.StreamingServer, true)
|
||||
}()
|
||||
|
||||
if req.Payload != nil {
|
||||
// Empty proto messages serialized to 0 payloads,
|
||||
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
|
||||
// don't get invoked here, which causes hang on client side.
|
||||
// See https://github.com/containerd/ttrpc/issues/126
|
||||
if req.Payload != nil || !info.StreamingClient {
|
||||
unmarshal := func(obj interface{}) error {
|
||||
return protoUnmarshal(req.Payload, obj)
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ func TestStreamClient(t *testing.T) {
|
|||
|
||||
desc := &ServiceDesc{
|
||||
Methods: map[string]Method{
|
||||
"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
"Echo": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
var req internal.EchoPayload
|
||||
if err := unmarshal(&req); err != nil {
|
||||
return nil, err
|
||||
|
@ -49,7 +49,7 @@ func TestStreamClient(t *testing.T) {
|
|||
},
|
||||
Streams: map[string]Stream{
|
||||
"EchoStream": {
|
||||
Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
|
||||
Handler: func(_ context.Context, ss StreamServer) (interface{}, error) {
|
||||
for {
|
||||
var req internal.EchoPayload
|
||||
if err := ss.RecvMsg(&req); err != nil {
|
||||
|
|
Loading…
Reference in New Issue