Compare commits

..

No commits in common. "main" and "v1.2.2" have entirely different histories.
main ... v1.2.2

23 changed files with 174 additions and 796 deletions

View File

@ -1,16 +0,0 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
groups:
golang-x:
patterns:
- "golang.org/x/*"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10

View File

@ -7,7 +7,7 @@ on:
branches: [ main ]
env:
GO_VERSION: 1.23.x
GO_VERSION: 1.20.x
permissions:
contents: read
@ -27,21 +27,27 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@v3
with:
path: src/github.com/containerd/ttrpc
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
- uses: actions/setup-go@v3
with:
go-version: ${{ env.GO_VERSION }}
- name: golangci-lint
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
uses: golangci/golangci-lint-action@v3
with:
version: v1.60.3
version: v1.51.2
args: --timeout=5m
skip-cache: true
working-directory: src/github.com/containerd/ttrpc
- name: golangci-lint errors
run: golangci-lint run
working-directory: src/github.com/containerd/ttrpc
if: ${{ failure() }}
#
# Project checks
#
@ -51,15 +57,16 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
- uses: actions/setup-go@v3
with:
go-version: ${{ env.GO_VERSION }}
- uses: containerd/project-checks@434a07157608eeaa1d5c8d4dd506154204cd9401 # v1.1.0
- uses: actions/checkout@v3
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: containerd/project-checks@v1.1.0
with:
working-directory: src/github.com/containerd/ttrpc
@ -71,20 +78,22 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: [1.22.x, 1.23.x]
go: [1.19.x, 1.20.x]
name: ${{ matrix.os }} / ${{ matrix.go }}
runs-on: ${{ matrix.os }}
timeout-minutes: 10
timeout-minutes: 5
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- name: Check out code
uses: actions/checkout@v3
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ matrix.go }}
- name: Test
working-directory: src/github.com/containerd/ttrpc
@ -110,11 +119,7 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
- uses: actions/setup-go@v3
with:
go-version: ${{ env.GO_VERSION }}
id: go
@ -125,6 +130,12 @@ jobs:
echo "GOPATH=${{ github.workspace }}" >> $GITHUB_ENV
echo "${{ github.workspace }}/bin" >> $GITHUB_PATH
- name: Check out code
uses: actions/checkout@v3
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- name: Install dependencies
working-directory: src/github.com/containerd/ttrpc
run: |

View File

@ -6,7 +6,7 @@ linters:
- goimports
- revive
- ineffassign
- govet
- vet
- unused
- misspell
disable:
@ -14,7 +14,7 @@ linters:
linters-settings:
revive:
ignore-generated-header: true
ignore-generated-headers: true
rules:
- name: blank-imports
- name: context-as-argument
@ -45,8 +45,8 @@ linters-settings:
issues:
include:
- EXC0002
exclude-dirs:
- example
run:
timeout: 8m
skip-dirs:
- example

View File

@ -1,6 +1,6 @@
# ttrpc
[![Build Status](https://github.com/containerd/ttrpc/actions/workflows/ci.yml/badge.svg)](https://github.com/containerd/ttrpc/actions/workflows/ci.yml)
[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
GRPC for low-memory environments.

View File

@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
}
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
if len(p) > messageLengthMax {
return OversizedMessageError(len(p))
}
// TODO: Error on send rather than on recv
//if len(p) > messageLengthMax {
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
//}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err
}

View File

@ -89,19 +89,21 @@ func TestReadWriteMessage(t *testing.T) {
func TestMessageOversize(t *testing.T) {
var (
w, _ = net.Pipe()
wch = newChannel(w)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
w, r = net.Pipe()
wch, rch = newChannel(w), newChannel(r)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
)
go func() {
errs <- wch.send(1, 1, 0, msg)
if err := wch.send(1, 1, 0, msg); err != nil {
errs <- err
}
}()
err := <-errs
_, _, err := rch.recv()
if err == nil {
t.Fatalf("sending oversized message expected to fail")
t.Fatalf("error expected reading with small buffer")
}
status, ok := status.FromError(err)
@ -112,4 +114,12 @@ func TestMessageOversize(t *testing.T) {
if status.Code() != codes.ResourceExhausted {
t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
}
select {
case err := <-errs:
if err != nil {
t.Fatal(err)
}
default:
}
}

View File

@ -27,7 +27,7 @@ import (
"syscall"
"time"
"github.com/containerd/log"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@ -71,42 +71,6 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
}
}
// WithChainUnaryClientInterceptor sets the provided chain of client interceptors
func WithChainUnaryClientInterceptor(interceptors ...UnaryClientInterceptor) ClientOpts {
return func(c *Client) {
if len(interceptors) == 0 {
return
}
if c.interceptor != nil {
interceptors = append([]UnaryClientInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
req *Request,
reply *Response,
info *UnaryClientInfo,
final Invoker,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
}
func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker, info *UnaryClientInfo) Invoker {
if len(interceptors) == 0 {
return final
}
return func(
ctx context.Context,
req *Request,
reply *Response,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
// NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background())
@ -121,16 +85,13 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx: ctx,
userCloseFunc: func() {},
userCloseWaitCh: make(chan struct{}),
interceptor: defaultClientInterceptor,
}
for _, o := range opts {
o(c)
}
if c.interceptor == nil {
c.interceptor = defaultClientInterceptor
}
go c.run()
return c
}
@ -325,7 +286,7 @@ func (c *Client) Close() error {
return nil
}
// UserOnCloseWait is used to block until the user's on-close callback
// UserOnCloseWait is used to blocks untils the user's on-close callback
// finishes.
func (c *Client) UserOnCloseWait(ctx context.Context) error {
select {
@ -368,7 +329,7 @@ func (c *Client) receiveLoop() error {
sid := streamID(msg.header.StreamID)
s := c.getStream(sid)
if s == nil {
log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream")
logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
continue
}
@ -376,7 +337,7 @@ func (c *Client) receiveLoop() error {
s.closeWithError(err)
} else {
if err := s.receive(c.ctx, msg); err != nil {
log.G(c.ctx).WithFields(log.Fields{"error": err, "stream": sid}).Error("ttrpc: failed to handle message")
logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
}
}
}
@ -386,44 +347,25 @@ func (c *Client) receiveLoop() error {
// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
// This ensures that new stream IDs sent on the wire are always increasing, which is a
// requirement of the TTRPC protocol.
// This use of sendLock could be split into another mutex that covers stream creation + first send,
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Lock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
c.streamLock.Unlock()
return nil, ErrClosed
default:
}
var s *stream
if err := func() error {
// In the future this could be replaced with a sync.Map instead of streamLock+map.
c.streamLock.Lock()
defer c.streamLock.Unlock()
// Stream ID should be allocated at same time
s := newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
return ErrClosed
default:
}
s = newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
return nil
}(); err != nil {
return nil, err
}
c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Unlock()
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
return s, filterCloseErr(err)

View File

@ -18,7 +18,6 @@ package main
import (
"google.golang.org/protobuf/compiler/protogen"
"google.golang.org/protobuf/types/pluginpb"
)
func main() {
@ -31,7 +30,6 @@ func main() {
return nil
},
}.Run(func(gen *protogen.Plugin) error {
gen.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL)
for _, f := range gen.Files {
if !f.Generate {
continue

View File

@ -16,10 +16,7 @@
package ttrpc
import (
"context"
"errors"
)
import "errors"
type serverConfig struct {
handshaker Handshaker
@ -47,40 +44,9 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if c.interceptor != nil {
return errors.New("only one unchained interceptor allowed per server")
return errors.New("only one interceptor allowed per server")
}
c.interceptor = i
return nil
}
}
// WithChainUnaryServerInterceptor sets the provided chain of server interceptors
func WithChainUnaryServerInterceptor(interceptors ...UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if len(interceptors) == 0 {
return nil
}
if c.interceptor != nil {
interceptors = append([]UnaryServerInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
unmarshal Unmarshaler,
info *UnaryServerInfo,
method Method) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
return nil
}
}
func chainUnaryServerInterceptors(info *UnaryServerInfo, method Method, interceptors []UnaryServerInterceptor) Method {
if len(interceptors) == 0 {
return method
}
return func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
}

View File

@ -16,12 +16,7 @@
package ttrpc
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
import "errors"
var (
// ErrProtocol is a general error in the handling the protocol.
@ -37,44 +32,3 @@ var (
// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")
)
// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
type OversizedMessageErr struct {
messageLength int
err error
}
// OversizedMessageError returns an OversizedMessageErr error for the given message
// length if it exceeds the allowed maximum. Otherwise a nil error is returned.
func OversizedMessageError(messageLength int) error {
if messageLength <= messageLengthMax {
return nil
}
return &OversizedMessageErr{
messageLength: messageLength,
err: status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
}
}
// Error returns the error message for the corresponding grpc Status for the error.
func (e *OversizedMessageErr) Error() string {
return e.err.Error()
}
// Unwrap returns the corresponding error with our grpc status code.
func (e *OversizedMessageErr) Unwrap() error {
return e.err
}
// RejectedLength retrieves the rejected message length which triggered the error.
func (e *OversizedMessageErr) RejectedLength() int {
return e.messageLength
}
// MaximumLength retrieves the maximum allowed message length that triggered the error.
func (*OversizedMessageErr) MaximumLength() int {
return messageLengthMax
}

16
go.mod
View File

@ -1,16 +1,14 @@
module github.com/containerd/ttrpc
go 1.22
go 1.13
require (
github.com/containerd/log v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.4
github.com/golang/protobuf v1.5.0
github.com/prometheus/procfs v0.6.0
golang.org/x/sys v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.0
github.com/sirupsen/logrus v1.8.1
golang.org/x/sys v0.1.0
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63
google.golang.org/grpc v1.27.1
google.golang.org/protobuf v1.27.1
)
require github.com/sirupsen/logrus v1.9.3 // indirect

89
go.sum
View File

@ -1,69 +1,100 @@
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63 h1:YzfoEYWbODU5Fbt37+h7X16BWQbad7Q4S6gclTKFXM8=
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -211,7 +211,7 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03,
0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10,
0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d,
0x32, 0xfa, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
0x32, 0xa0, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
@ -245,17 +245,11 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f,
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28,
0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x12, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x50, 0x61, 0x79, 0x6c,
0x6f, 0x61, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e,
0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x30, 0x01, 0x42, 0x3d, 0x5a,
0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74,
0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74,
0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
0x01, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72,
0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -284,16 +278,14 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs =
2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum
0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload
0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload
3, // 6: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:input_type -> google.protobuf.Empty
0, // 7: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
0, // 8: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
2, // 9: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
1, // 10: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
3, // 11: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
3, // 12: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
0, // 13: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:output_type -> ttrpc.integration.streaming.EchoPayload
7, // [7:14] is the sub-list for method output_type
0, // [0:7] is the sub-list for method input_type
0, // 6: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
0, // 7: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
2, // 8: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
1, // 9: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
3, // 10: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
3, // 11: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
6, // [6:12] is the sub-list for method output_type
0, // [0:6] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name

View File

@ -34,7 +34,6 @@ service Streaming {
rpc DivideStream(Sum) returns (stream Part);
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload);
}
message EchoPayload {

View File

@ -15,7 +15,6 @@ type TTRPCStreamingService interface {
DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
EmptyPayloadStream(context.Context, *emptypb.Empty, TTRPCStreaming_EmptyPayloadStreamServer) error
}
type TTRPCStreaming_EchoStreamServer interface {
@ -109,19 +108,6 @@ func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
return m, nil
}
type TTRPCStreaming_EmptyPayloadStreamServer interface {
Send(*EchoPayload) error
ttrpc.StreamServer
}
type ttrpcstreamingEmptyPayloadStreamServer struct {
ttrpc.StreamServer
}
func (x *ttrpcstreamingEmptyPayloadStreamServer) Send(m *EchoPayload) error {
return x.StreamServer.SendMsg(m)
}
func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
Methods: map[string]ttrpc.Method{
@ -173,17 +159,6 @@ func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService)
StreamingClient: true,
StreamingServer: true,
},
"EmptyPayloadStream": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
m := new(emptypb.Empty)
if err := stream.RecvMsg(m); err != nil {
return nil, err
}
return nil, svc.EmptyPayloadStream(ctx, m, &ttrpcstreamingEmptyPayloadStreamServer{stream})
},
StreamingClient: false,
StreamingServer: true,
},
},
})
}
@ -195,7 +170,6 @@ type TTRPCStreamingClient interface {
DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
EmptyPayloadStream(context.Context, *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error)
}
type ttrpcstreamingClient struct {
@ -386,32 +360,3 @@ func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) {
}
return m, nil
}
func (c *ttrpcstreamingClient) EmptyPayloadStream(ctx context.Context, req *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: false,
StreamingServer: true,
}, "ttrpc.integration.streaming.Streaming", "EmptyPayloadStream", req)
if err != nil {
return nil, err
}
x := &ttrpcstreamingEmptyPayloadStreamClient{stream}
return x, nil
}
type TTRPCStreaming_EmptyPayloadStreamClient interface {
Recv() (*EchoPayload, error)
ttrpc.ClientStream
}
type ttrpcstreamingEmptyPayloadStreamClient struct {
ttrpc.ClientStream
}
func (x *ttrpcstreamingEmptyPayloadStreamClient) Recv() (*EchoPayload, error) {
m := new(EchoPayload)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

View File

@ -31,7 +31,6 @@ import (
"github.com/containerd/ttrpc"
"github.com/containerd/ttrpc/integration/streaming"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/protobuf/types/known/emptypb"
)
func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
@ -191,14 +190,6 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
return sendErr
}
func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error {
if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil {
return err
}
return streamer.Send(&streaming.EchoPayload{Seq: 2})
}
func TestStreamingService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -212,7 +203,6 @@ func TestStreamingService(t *testing.T) {
t.Run("DivideStream", divideStreamTest(ctx, client))
t.Run("EchoNull", echoNullTest(ctx, client))
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client))
}
func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
@ -395,33 +385,6 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
}
}
func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := client.EmptyPayloadStream(ctx, nil)
if err != nil {
t.Fatal(err)
}
for i := uint32(1); i < 3; i++ {
first, err := stream.Recv()
if err != nil {
t.Fatal(err)
}
if first.Seq != i {
t.Fatalf("unexpected seq: %d != %d", first.Seq, i)
}
}
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("Expected io.EOF, got %v", err)
}
}
}
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
t.Helper()
if a.Msg != b.Msg {

View File

@ -1,245 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"context"
"reflect"
"strings"
"testing"
"github.com/containerd/ttrpc/internal"
)
func TestUnaryClientInterceptor(t *testing.T) {
var (
intercepted = false
interceptor = func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
intercepted = true
return i(ctx, req, reply)
}
ctx = context.Background()
server = mustServer(t)(NewServer())
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr, WithUnaryClientInterceptor(interceptor))
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !intercepted {
t.Fatalf("ttrpc client call not intercepted")
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}
func TestChainUnaryClientInterceptor(t *testing.T) {
var (
orderIdx = 0
recorded = []string{}
intercept = func(idx int, tag string) UnaryClientInterceptor {
return func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
if idx != orderIdx {
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
}
recorded = append(recorded, tag)
orderIdx++
return i(ctx, req, reply)
}
}
ctx = context.Background()
server = mustServer(t)(NewServer())
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr,
WithChainUnaryClientInterceptor(),
WithChainUnaryClientInterceptor(
intercept(0, "seen it"),
intercept(1, "been"),
intercept(2, "there"),
intercept(3, "done"),
intercept(4, "that"),
),
)
expected = []string{
"seen it",
"been",
"there",
"done",
"that",
}
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(recorded, expected) {
t.Fatalf("unexpected ttrpc chained client unary interceptor order (%s != %s)",
strings.Join(recorded, " "), strings.Join(expected, " "))
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}
func TestUnaryServerInterceptor(t *testing.T) {
var (
intercepted = false
interceptor = func(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
intercepted = true
return method(ctx, unmarshal)
}
ctx = context.Background()
server = mustServer(t)(NewServer(WithUnaryServerInterceptor(interceptor)))
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr)
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !intercepted {
t.Fatalf("ttrpc server call not intercepted")
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}
func TestChainUnaryServerInterceptor(t *testing.T) {
var (
orderIdx = 0
recorded = []string{}
intercept = func(idx int, tag string) UnaryServerInterceptor {
return func(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
if orderIdx != idx {
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
}
recorded = append(recorded, tag)
orderIdx++
return method(ctx, unmarshal)
}
}
ctx = context.Background()
server = mustServer(t)(NewServer(
WithUnaryServerInterceptor(
intercept(0, "seen it"),
),
WithChainUnaryServerInterceptor(
intercept(1, "been"),
intercept(2, "there"),
intercept(3, "done"),
intercept(4, "that"),
),
))
expected = []string{
"seen it",
"been",
"there",
"done",
"that",
}
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr)
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(recorded, expected) {
t.Fatalf("unexpected ttrpc chained server unary interceptor order (%s != %s)",
strings.Join(recorded, " "), strings.Join(expected, " "))
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}

View File

@ -62,34 +62,6 @@ func (m MD) Append(key string, values ...string) {
}
}
// Clone returns a copy of MD or nil if it's nil.
// It's copied from golang's `http.Header.Clone` implementation:
// https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/net/http/header.go;l=94
func (m MD) Clone() MD {
if m == nil {
return nil
}
// Find total number of values.
nv := 0
for _, vv := range m {
nv += len(vv)
}
sv := make([]string, nv) // shared backing array for headers' values
m2 := make(MD, len(m))
for k, vv := range m {
if vv == nil {
// Preserve nil values.
m2[k] = nil
continue
}
n := copy(sv, vv)
m2[k] = sv[:n:n]
sv = sv[n:]
}
return m2
}
func (m MD) setRequest(r *Request) {
for k, values := range m {
for _, v := range values {

View File

@ -18,8 +18,6 @@ package ttrpc
import (
"context"
"fmt"
"sync"
"testing"
)
@ -108,100 +106,3 @@ func TestMetadataContext(t *testing.T) {
t.Errorf("invalid metadata value: %q", bar)
}
}
func TestMetadataClone(t *testing.T) {
var metadata MD
m2 := metadata.Clone()
if m2 != nil {
t.Error("MD.Clone() on nil metadata should return nil")
}
metadata = MD{"nil": nil, "foo": {"bar"}, "baz": {"qux", "quxx"}}
m2 = metadata.Clone()
if len(metadata) != len(m2) {
t.Errorf("unexpected number of keys: %d, expected: %d", len(m2), len(metadata))
}
for k, v := range metadata {
v2, ok := m2[k]
if !ok {
t.Errorf("key not found: %s", k)
}
if v == nil && v2 == nil {
continue
}
if v == nil || v2 == nil {
t.Errorf("unexpected nil value: %v, expected: %v", v2, v)
}
if len(v) != len(v2) {
t.Errorf("unexpected number of values: %d, expected: %d", len(v2), len(v))
}
for i := range v {
if v[i] != v2[i] {
t.Errorf("unexpected value: %s, expected: %s", v2[i], v[i])
}
}
}
}
func TestMetadataCloneConcurrent(t *testing.T) {
metadata := make(MD)
metadata.Set("foo", "bar")
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m2 := metadata.Clone()
m2.Set("foo", "baz")
}()
}
wg.Wait()
// Concurrent modification should clone the metadata first to avoid panic
// due to concurrent map writes.
if val, ok := metadata.Get("foo"); !ok {
t.Error("metadata not found")
} else if val[0] != "bar" {
t.Errorf("invalid metadata value: %q", val[0])
}
}
func simpleClone(src MD) MD {
md := MD{}
for k, v := range src {
md[k] = append(md[k], v...)
}
return md
}
func BenchmarkMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = metadata.Clone()
}
})
}
}
func BenchmarkSimpleMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = simpleClone(metadata)
}
})
}
}

View File

@ -27,7 +27,7 @@ import (
"syscall"
"time"
"github.com/containerd/log"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@ -74,18 +74,9 @@ func (s *Server) RegisterService(name string, desc *ServiceDesc) {
}
func (s *Server) Serve(ctx context.Context, l net.Listener) error {
s.mu.Lock()
s.addListenerLocked(l)
s.addListener(l)
defer s.closeListener(l)
select {
case <-s.done:
s.mu.Unlock()
return ErrServerClosed
default:
}
s.mu.Unlock()
var (
backoff time.Duration
handshaker = s.config.handshaker
@ -118,7 +109,7 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
}
sleep := time.Duration(rand.Int63n(int64(backoff)))
log.G(ctx).WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
logrus.WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
time.Sleep(sleep)
continue
}
@ -130,14 +121,14 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
approved, handshake, err := handshaker.Handshake(ctx, conn)
if err != nil {
log.G(ctx).WithError(err).Error("ttrpc: refusing connection after handshake")
logrus.WithError(err).Error("ttrpc: refusing connection after handshake")
conn.Close()
continue
}
sc, err := s.newConn(approved, handshake)
if err != nil {
log.G(ctx).WithError(err).Error("ttrpc: create connection failed")
logrus.WithError(err).Error("ttrpc: create connection failed")
conn.Close()
continue
}
@ -197,7 +188,9 @@ func (s *Server) Close() error {
return err
}
func (s *Server) addListenerLocked(l net.Listener) {
func (s *Server) addListener(l net.Listener) {
s.mu.Lock()
defer s.mu.Unlock()
s.listeners[l] = struct{}{}
}
@ -520,12 +513,12 @@ func (c *serverConn) run(sctx context.Context) {
Payload: response.data,
})
if err != nil {
log.G(ctx).WithError(err).Error("failed marshaling response")
logrus.WithError(err).Error("failed marshaling response")
return
}
if err := ch.send(response.id, messageTypeResponse, 0, p); err != nil {
log.G(ctx).WithError(err).Error("failed sending message on channel")
logrus.WithError(err).Error("failed sending message on channel")
return
}
} else {
@ -537,7 +530,7 @@ func (c *serverConn) run(sctx context.Context) {
flags = flags | flagNoData
}
if err := ch.send(response.id, messageTypeData, flags, response.data); err != nil {
log.G(ctx).WithError(err).Error("failed sending message on channel")
logrus.WithError(err).Error("failed sending message on channel")
return
}
}
@ -559,7 +552,7 @@ func (c *serverConn) run(sctx context.Context) {
// requests, so that the client connection is closed
return
}
log.G(ctx).WithError(err).Error("error receiving message")
logrus.WithError(err).Error("error receiving message")
// else, initiate shutdown
case <-shutdown:
return

View File

@ -219,7 +219,7 @@ func TestServerShutdown(t *testing.T) {
// register a service that takes until we tell it to stop
server.Register(serviceName, map[string]Method{
"Test": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Test": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.TestPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -298,36 +298,6 @@ func TestServerClose(t *testing.T) {
checkServerShutdown(t, server)
}
func TestImmediateServerShutdown(t *testing.T) {
var (
ctx = context.Background()
server = mustServer(t)(NewServer())
addr, listener = newTestListener(t)
errs = make(chan error, 1)
_, cleanup = newTestClient(t, addr)
)
defer cleanup()
defer listener.Close()
go func() {
time.Sleep(1 * time.Millisecond)
errs <- server.Serve(ctx, listener)
}()
registerTestingService(server, &testingServer{})
if err := server.Shutdown(ctx); err != nil {
t.Fatal(err)
}
select {
case err := <-errs:
if err != ErrServerClosed {
t.Fatal(err)
}
case <-time.After(2 * time.Second):
t.Fatal("retreiving error from server.Shutdown() timed out")
}
}
func TestOversizeCall(t *testing.T) {
var (
ctx = context.Background()
@ -348,7 +318,7 @@ func TestOversizeCall(t *testing.T) {
Foo: strings.Repeat("a", 1+messageLengthMax),
}
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
t.Fatalf("expected error from oversized message")
t.Fatalf("expected error from non-existent service call")
} else if status, ok := status.FromError(err); !ok {
t.Fatalf("expected status present in error: %v", err)
} else if status.Code() != codes.ResourceExhausted {
@ -393,8 +363,6 @@ func TestClientEOF(t *testing.T) {
t.Fatal(err)
}
client.UserOnCloseWait(ctx)
// server shutdown, but we still make a call.
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
t.Fatalf("expected error when calling against shutdown server")

View File

@ -140,11 +140,7 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
respond(st, p, stream.StreamingServer, true)
}()
// Empty proto messages serialized to 0 payloads,
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
// don't get invoked here, which causes hang on client side.
// See https://github.com/containerd/ttrpc/issues/126
if req.Payload != nil || !info.StreamingClient {
if req.Payload != nil {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj)
}

View File

@ -38,7 +38,7 @@ func TestStreamClient(t *testing.T) {
desc := &ServiceDesc{
Methods: map[string]Method{
"Echo": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.EchoPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -49,7 +49,7 @@ func TestStreamClient(t *testing.T) {
},
Streams: map[string]Stream{
"EchoStream": {
Handler: func(_ context.Context, ss StreamServer) (interface{}, error) {
Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
for {
var req internal.EchoPayload
if err := ss.RecvMsg(&req); err != nil {