Compare commits

...

49 Commits
v1.2.2 ... main

Author SHA1 Message Date
Kazuyoshi Kato ababa3fc18
Merge pull request #178 from djdongjin/bump-versions
Add dependabot and bump up golang and dependency versions
2025-01-14 09:57:47 -08:00
Jin Dong 4729a87007 Upgrade go, dependency, CI versions
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-30 01:42:33 +00:00
Fu Wei 3b8c8b7557
Merge pull request #177 from djdongjin/metadata-clone
Add MD.Clone function
2024-12-28 18:29:38 -05:00
Jin Dong 644ecfaa4c Add dependabot CI
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-28 01:39:20 +00:00
Jin Dong 430f734791 Add MD.Clone
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-27 16:12:45 +00:00
Akihiro Suda b71d9dee11
Merge pull request #175 from klihub/fixes/serve-listen-shutdown-race
server: fix a Serve() vs. (immediate) Shutdown() race
2024-10-29 09:31:08 +09:00
Derek McGowan bcc40a4d69
Merge pull request #171 from klihub/devel/sender-side-oversize-rejection
channel: reject oversized messages on the sender side(, too).
2024-09-26 10:15:50 -07:00
Krisztian Litkey c4d96d55ad
server: fix Serve() vs. immediate Shutdown() race.
Fix a race where an asynchronous server.Serve() invoked in a
a goroutine races with an almost immediate server.Shutdown().
If Shutdown() finishes its locked closing of listeners before
Serve() gets around to add the new one, Serve will sit stuck
forever in l.Accept(), unless the caller closes the listener
in addition to Shutdown().

This is probably almost impossible to trigger in real life,
but some of the unit tests, which run the server and client
in the same process, occasionally do trigger this. Then, if
the test tries to verify a final ErrServerClosed error from
Serve() after Shutdown() it gets stuck forever.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-09-16 09:52:00 +03:00
Krisztian Litkey ed6c3ba082
server_test: add Serve()/Shutdown() race test.
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-09-16 09:51:56 +03:00
Krisztian Litkey b5cd6e4b32
channel: allow discovery of overflown message size.
Use a dedicated, grpc Status-compatible error to wrap the
unique grpc status code, the size of the rejected message
and the maximum allowed size when a message is rejected
due to size limitations by the sending side.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-27 11:23:20 +03:00
Krisztian Litkey d8c00dfec3
channel_test: update oversize message test.
Co-authored-by: Alessio Cantillo <cantillo.trd@gmail.com>
Co-authored-by: Qian Zhang <cosmoer@qq.com>
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-21 17:53:44 +03:00
Krisztian Litkey de273bf751
channel: reject oversized messages on the sender side.
Reject oversized messages on the sender side, keeping the
receiver side rejection intact. This should provide minimal
low-level plumbing for clients to attempt application level
corrective actions on the requestor side, if the high-level
protocol is designed with this in mind.

Co-authored-by: Alessio Cantillo <cantillo.trd@gmail.com>
Co-authored-by: Qian Zhang <cosmoer@qq.com>
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-21 17:52:38 +03:00
Fu Wei 3f02183720
Merge pull request #170 from klihub/fixes/oversized-call-test-errmsg 2024-08-20 08:06:14 +08:00
Krisztian Litkey 84e1784f34
server_test: fix error message in TestOversizeCall.
Fix copy-pasted error message in unit test.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-19 19:29:04 +03:00
Phil Estes 655622931d
Merge pull request #169 from thaJeztah/containerd_log
switch to github.com/containerd/log for logs
2024-06-19 17:49:44 -04:00
Sebastiaan van Stijn 4785c70883
switch to github.com/containerd/log for logs
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2024-06-19 23:19:35 +02:00
Kazuyoshi Kato 19d523c66a
Merge pull request #162 from austinvazquez/fix-build-status-badge
Fix CI build status badge in readme
2024-05-15 15:55:56 -07:00
Derek McGowan 196dbef628
Merge pull request #166 from containerd/dependabot/go_modules/google.golang.org/protobuf-1.33.0
Bump google.golang.org/protobuf from 1.31.0 to 1.33.0
2024-05-13 16:04:22 -07:00
Kevin Parsons ef5734239e
Merge pull request #168 from kevpar/deadlock
client: Fix deadlock when writing to pipe blocks
2024-05-13 17:22:09 -05:00
Kevin Parsons 1b4f6f8edb client: Fix deadlock when writing to pipe blocks
Use sendLock to guard the entire stream allocation + write to wire
operation, and streamLock to only guard access to the underlying stream
map. This ensures the following:
- We uphold the constraint that new stream IDs on the wire are always
  increasing, because whoever holds sendLock will be ensured to get the
  next stream ID and be the next to write to the wire.
- Locks are always released in LIFO order. This prevents deadlocks.

Taking sendLock before releasing streamLock means that if a goroutine
blocks writing to the pipe, it can make another goroutine get stuck
trying to take sendLock, and therefore streamLock will be kept locked as
well. This can lead to the receiver goroutine no longer being able to
read responses from the pipe, since it needs to take streamLock when
processing a response. This ultimately leads to a complete deadlock of
the client.

It is reasonable for a server to block writes to the pipe if the client
is not reading responses fast enough. So we can't expect writes to never
block.

I have repro'd the hang with a simple ttrpc client and server. The
client spins up 100 goroutines that spam the server with requests
constantly. After a few seconds of running I can see it hang. I have set
the buffer size for the pipe to 0 to more easily repro, but it would
still be possible to hit with a larger buffer size (just may take a
higher volume of requests or larger payloads).

I also validated that I no longer see the hang with this fix, by leaving
the test client/server running for a few minutes. Obviously not 100%
conclusive, but before I could get a hang within several seconds of
running.

Signed-off-by: Kevin Parsons <kevpar@microsoft.com>
2024-05-13 14:01:59 -07:00
Derek McGowan aa5f2d4e10
Merge pull request #167 from containerd/dependabot/go_modules/golang.org/x/net-0.23.0
Bump golang.org/x/net from 0.17.0 to 0.23.0
2024-05-13 08:32:26 -07:00
dependabot[bot] 13b8289864
Bump golang.org/x/net from 0.17.0 to 0.23.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.17.0 to 0.23.0.
- [Commits](https://github.com/golang/net/compare/v0.17.0...v0.23.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-04-19 12:24:18 +00:00
dependabot[bot] 272c8575a6
Bump google.golang.org/protobuf from 1.31.0 to 1.33.0
Bumps google.golang.org/protobuf from 1.31.0 to 1.33.0.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-13 22:58:10 +00:00
Derek McGowan 4a2816be9b
Merge pull request #161 from austinvazquez/update-ci
Update GitHub Actions CI to resolve deprecation warnings
2024-02-29 08:50:09 -08:00
Austin Vazquez e0f3eadca5
Fix CI build status badge in readme
Signed-off-by: Austin Vazquez <macedonv@amazon.com>
2024-02-29 15:43:55 +00:00
Austin Vazquez 589a593abc
Update GitHub Actions CI to resolve deprecation warnings
Signed-off-by: Austin Vazquez <macedonv@amazon.com>
2024-02-29 15:28:55 +00:00
Derek McGowan faba5896a9
Merge pull request #158 from dmcgowan/update-protobuf
Fix proto3 generation error
2024-02-21 14:10:30 -08:00
Derek McGowan 73b6a9156d Add optional feature in protobuf compiler
Fixes error "is a proto3 file that contains optional fields, but code generator protoc-gen-go-ttrpc hasn't been updated to support optional fields in proto3. Please ask the owner of this code generator to support proto3 optional."

Signed-off-by: Derek McGowan <derek@mcg.dev>
2024-02-21 13:51:52 -08:00
Derek McGowan 90d421ee7e
Merge pull request #157 from mxpv/empty_payload
Fix streaming with empty payloads
2024-02-05 11:11:35 -08:00
Maksym Pavlenko 44ca0096e1 Add comment
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
2024-02-03 09:50:16 -08:00
Maksym Pavlenko 6615f159ba Fix linter
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
2024-02-02 12:14:32 -08:00
Maksym Pavlenko dea99e9d05 Fix handling of empty payloads
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
2024-02-02 12:10:01 -08:00
Maksym Pavlenko 336fc1b6b4 Add integration test to reproduce issue with empty payloads
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
2024-02-02 10:47:36 -08:00
Derek McGowan baadfd8e79
Merge pull request #156 from containerd/dependabot/go_modules/google.golang.org/grpc-1.57.1
Bump google.golang.org/grpc from 1.57.0 to 1.57.1
2023-10-30 08:05:53 -07:00
dependabot[bot] 1e51c4681d
Bump google.golang.org/grpc from 1.57.0 to 1.57.1
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.57.0 to 1.57.1.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.57.0...v1.57.1)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-26 15:40:27 +00:00
Derek McGowan b6bd7ce660
Merge pull request #155 from containerd/dependabot/go_modules/golang.org/x/net-0.17.0
Bump golang.org/x/net from 0.10.0 to 0.17.0
2023-10-26 08:39:44 -07:00
dependabot[bot] bea960d9fe
Bump golang.org/x/net from 0.10.0 to 0.17.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.10.0 to 0.17.0.
- [Commits](https://github.com/golang/net/compare/v0.10.0...v0.17.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-10-11 22:39:50 +00:00
Fu Wei 9c0db2b1c3
Merge pull request #152 from klihub/devel/unary-interceptor-chaining
Implement support for unary interceptor chaining.
2023-09-06 09:39:17 +08:00
Krisztian Litkey 40f227ddbb server: implement UnaryServerInterceptor chaining.
Add a WithChainUnaryServerInterceptor server option to allow
using more that one server side interceptor which will then
get chained and invoked in the order given.

This should allow us to implement opentelemetry instrumentation
as interceptors while allowing users to keep intercepting their
server side calls for other reasons at the same time.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2023-08-25 15:57:58 +03:00
Krisztian Litkey f984c9b178 client: implement UnaryClientInterceptor chaining.
Add a WithChainUnaryClientInterceptor client option to allow
using more that one client call interceptor which will then
get chained and invoked in the order given.

This should allow us to implement opentelemetry instrumentation
as interceptors while allowing users to keep intercepting their
client calls for other reasons at the same time.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2023-08-25 15:57:52 +03:00
Fu Wei b2f0adabbf
Merge pull request #153 from klihub/fixes/UserOnCloseWait-comment
Fix grammar in comment for UserOnCloseWait.
2023-08-08 06:12:19 +08:00
Fu Wei 9287d4978d
Merge pull request #154 from liggitt/genproto
Bump genproto dependency
2023-08-08 06:11:24 +08:00
Jordan Liggitt a2fbc14815
go.mod: google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d
Signed-off-by: Jordan Liggitt <liggitt@google.com>
2023-08-02 11:49:10 -04:00
Jordan Liggitt cf2b85de12
go.mod: bump to supported go version
Signed-off-by: Jordan Liggitt <liggitt@google.com>
2023-08-02 10:28:33 -04:00
Krisztian Litkey 8ca4110ebc Fix comment for UserOnCloseWait.
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2023-07-28 20:02:17 +03:00
Fu Wei 05e0d07d83
Merge pull request #150 from klihub/fixes/server-test/TestClientEOF
server_test: wait for OnClose in TestClientEOF.
2023-07-28 09:22:27 +08:00
Krisztian Litkey e0cd801116 server_test: wait for OnClose in TestClientEOF.
In the test for client Call failing with ErrClosed on a closed
server, wait for the client's OnClose handler to get triggered
to make sure closing the socket had properly been administered
on the client's side. Otherwise trying a new Call() might fail
with some other error than ErrClosed, for instance ENOTCONN.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2023-07-27 17:59:04 +03:00
Fu Wei 712429a9f0
Merge pull request #151 from klihub/fixes/increase-ci-timeout
.github: give more slack for build+tests.
2023-07-25 22:06:23 +08:00
Krisztian Litkey 8d4784675e .github: give more slack for build+tests.
Increase timeout for build+run tests CI workflow
job from 5 to 10 minutes. It regularly times out
during github's busiest hours, typically either on
windows or macos workers.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2023-07-24 14:45:19 +03:00
23 changed files with 795 additions and 173 deletions

16
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,16 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
groups:
golang-x:
patterns:
- "golang.org/x/*"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10

View File

@ -7,7 +7,7 @@ on:
branches: [ main ]
env:
GO_VERSION: 1.20.x
GO_VERSION: 1.23.x
permissions:
contents: read
@ -27,27 +27,21 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
- uses: actions/setup-go@v3
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
with:
version: v1.51.2
version: v1.60.3
args: --timeout=5m
skip-cache: true
working-directory: src/github.com/containerd/ttrpc
- name: golangci-lint errors
run: golangci-lint run
working-directory: src/github.com/containerd/ttrpc
if: ${{ failure() }}
#
# Project checks
#
@ -57,16 +51,15 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/setup-go@v3
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
- uses: containerd/project-checks@v1.1.0
- uses: containerd/project-checks@434a07157608eeaa1d5c8d4dd506154204cd9401 # v1.1.0
with:
working-directory: src/github.com/containerd/ttrpc
@ -78,22 +71,20 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: [1.19.x, 1.20.x]
go: [1.22.x, 1.23.x]
name: ${{ matrix.os }} / ${{ matrix.go }}
runs-on: ${{ matrix.os }}
timeout-minutes: 5
timeout-minutes: 10
steps:
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- name: Check out code
uses: actions/checkout@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ matrix.go }}
- name: Test
working-directory: src/github.com/containerd/ttrpc
@ -119,7 +110,11 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/setup-go@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
id: go
@ -130,12 +125,6 @@ jobs:
echo "GOPATH=${{ github.workspace }}" >> $GITHUB_ENV
echo "${{ github.workspace }}/bin" >> $GITHUB_PATH
- name: Check out code
uses: actions/checkout@v3
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- name: Install dependencies
working-directory: src/github.com/containerd/ttrpc
run: |

View File

@ -6,7 +6,7 @@ linters:
- goimports
- revive
- ineffassign
- vet
- govet
- unused
- misspell
disable:
@ -14,7 +14,7 @@ linters:
linters-settings:
revive:
ignore-generated-headers: true
ignore-generated-header: true
rules:
- name: blank-imports
- name: context-as-argument
@ -45,8 +45,8 @@ linters-settings:
issues:
include:
- EXC0002
exclude-dirs:
- example
run:
timeout: 8m
skip-dirs:
- example

View File

@ -1,6 +1,6 @@
# ttrpc
[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
[![Build Status](https://github.com/containerd/ttrpc/actions/workflows/ci.yml/badge.svg)](https://github.com/containerd/ttrpc/actions/workflows/ci.yml)
GRPC for low-memory environments.

View File

@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
}
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
// TODO: Error on send rather than on recv
//if len(p) > messageLengthMax {
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
//}
if len(p) > messageLengthMax {
return OversizedMessageError(len(p))
}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err
}

View File

@ -89,21 +89,19 @@ func TestReadWriteMessage(t *testing.T) {
func TestMessageOversize(t *testing.T) {
var (
w, r = net.Pipe()
wch, rch = newChannel(w), newChannel(r)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
w, _ = net.Pipe()
wch = newChannel(w)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
)
go func() {
if err := wch.send(1, 1, 0, msg); err != nil {
errs <- err
}
errs <- wch.send(1, 1, 0, msg)
}()
_, _, err := rch.recv()
err := <-errs
if err == nil {
t.Fatalf("error expected reading with small buffer")
t.Fatalf("sending oversized message expected to fail")
}
status, ok := status.FromError(err)
@ -114,12 +112,4 @@ func TestMessageOversize(t *testing.T) {
if status.Code() != codes.ResourceExhausted {
t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
}
select {
case err := <-errs:
if err != nil {
t.Fatal(err)
}
default:
}
}

View File

@ -27,7 +27,7 @@ import (
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/containerd/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@ -71,6 +71,42 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
}
}
// WithChainUnaryClientInterceptor sets the provided chain of client interceptors
func WithChainUnaryClientInterceptor(interceptors ...UnaryClientInterceptor) ClientOpts {
return func(c *Client) {
if len(interceptors) == 0 {
return
}
if c.interceptor != nil {
interceptors = append([]UnaryClientInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
req *Request,
reply *Response,
info *UnaryClientInfo,
final Invoker,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
}
func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker, info *UnaryClientInfo) Invoker {
if len(interceptors) == 0 {
return final
}
return func(
ctx context.Context,
req *Request,
reply *Response,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
// NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background())
@ -85,13 +121,16 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx: ctx,
userCloseFunc: func() {},
userCloseWaitCh: make(chan struct{}),
interceptor: defaultClientInterceptor,
}
for _, o := range opts {
o(c)
}
if c.interceptor == nil {
c.interceptor = defaultClientInterceptor
}
go c.run()
return c
}
@ -286,7 +325,7 @@ func (c *Client) Close() error {
return nil
}
// UserOnCloseWait is used to blocks untils the user's on-close callback
// UserOnCloseWait is used to block until the user's on-close callback
// finishes.
func (c *Client) UserOnCloseWait(ctx context.Context) error {
select {
@ -329,7 +368,7 @@ func (c *Client) receiveLoop() error {
sid := streamID(msg.header.StreamID)
s := c.getStream(sid)
if s == nil {
logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream")
continue
}
@ -337,7 +376,7 @@ func (c *Client) receiveLoop() error {
s.closeWithError(err)
} else {
if err := s.receive(c.ctx, msg); err != nil {
logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
log.G(c.ctx).WithFields(log.Fields{"error": err, "stream": sid}).Error("ttrpc: failed to handle message")
}
}
}
@ -347,25 +386,44 @@ func (c *Client) receiveLoop() error {
// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
c.streamLock.Lock()
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
// This ensures that new stream IDs sent on the wire are always increasing, which is a
// requirement of the TTRPC protocol.
// This use of sendLock could be split into another mutex that covers stream creation + first send,
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
c.sendLock.Lock()
defer c.sendLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
c.streamLock.Unlock()
return nil, ErrClosed
default:
}
// Stream ID should be allocated at same time
s := newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
var s *stream
if err := func() error {
// In the future this could be replaced with a sync.Map instead of streamLock+map.
c.streamLock.Lock()
defer c.streamLock.Unlock()
c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
return ErrClosed
default:
}
s = newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
return nil
}(); err != nil {
return nil, err
}
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
return s, filterCloseErr(err)

View File

@ -18,6 +18,7 @@ package main
import (
"google.golang.org/protobuf/compiler/protogen"
"google.golang.org/protobuf/types/pluginpb"
)
func main() {
@ -30,6 +31,7 @@ func main() {
return nil
},
}.Run(func(gen *protogen.Plugin) error {
gen.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL)
for _, f := range gen.Files {
if !f.Generate {
continue

View File

@ -16,7 +16,10 @@
package ttrpc
import "errors"
import (
"context"
"errors"
)
type serverConfig struct {
handshaker Handshaker
@ -44,9 +47,40 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if c.interceptor != nil {
return errors.New("only one interceptor allowed per server")
return errors.New("only one unchained interceptor allowed per server")
}
c.interceptor = i
return nil
}
}
// WithChainUnaryServerInterceptor sets the provided chain of server interceptors
func WithChainUnaryServerInterceptor(interceptors ...UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if len(interceptors) == 0 {
return nil
}
if c.interceptor != nil {
interceptors = append([]UnaryServerInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
unmarshal Unmarshaler,
info *UnaryServerInfo,
method Method) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
return nil
}
}
func chainUnaryServerInterceptors(info *UnaryServerInfo, method Method, interceptors []UnaryServerInterceptor) Method {
if len(interceptors) == 0 {
return method
}
return func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
}

View File

@ -16,7 +16,12 @@
package ttrpc
import "errors"
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
// ErrProtocol is a general error in the handling the protocol.
@ -32,3 +37,44 @@ var (
// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")
)
// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
type OversizedMessageErr struct {
messageLength int
err error
}
// OversizedMessageError returns an OversizedMessageErr error for the given message
// length if it exceeds the allowed maximum. Otherwise a nil error is returned.
func OversizedMessageError(messageLength int) error {
if messageLength <= messageLengthMax {
return nil
}
return &OversizedMessageErr{
messageLength: messageLength,
err: status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
}
}
// Error returns the error message for the corresponding grpc Status for the error.
func (e *OversizedMessageErr) Error() string {
return e.err.Error()
}
// Unwrap returns the corresponding error with our grpc status code.
func (e *OversizedMessageErr) Unwrap() error {
return e.err
}
// RejectedLength retrieves the rejected message length which triggered the error.
func (e *OversizedMessageErr) RejectedLength() int {
return e.messageLength
}
// MaximumLength retrieves the maximum allowed message length that triggered the error.
func (*OversizedMessageErr) MaximumLength() int {
return messageLengthMax
}

16
go.mod
View File

@ -1,14 +1,16 @@
module github.com/containerd/ttrpc
go 1.13
go 1.22
require (
github.com/containerd/log v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.0
github.com/golang/protobuf v1.5.4
github.com/prometheus/procfs v0.6.0
github.com/sirupsen/logrus v1.8.1
golang.org/x/sys v0.1.0
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63
google.golang.org/grpc v1.27.1
google.golang.org/protobuf v1.27.1
golang.org/x/sys v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.0
)
require github.com/sirupsen/logrus v1.9.3 // indirect

89
go.sum
View File

@ -1,100 +1,69 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63 h1:YzfoEYWbODU5Fbt37+h7X16BWQbad7Q4S6gclTKFXM8=
google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -211,7 +211,7 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03,
0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10,
0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d,
0x32, 0xa0, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
0x32, 0xfa, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a,
0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69,
0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
@ -245,11 +245,17 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc =
0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f,
0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28,
0x01, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72,
0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x12, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x50, 0x61, 0x79, 0x6c,
0x6f, 0x61, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e,
0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x30, 0x01, 0x42, 0x3d, 0x5a,
0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74,
0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74,
0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -278,14 +284,16 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs =
2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum
0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload
0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload
0, // 6: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
0, // 7: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
2, // 8: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
1, // 9: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
3, // 10: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
3, // 11: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
6, // [6:12] is the sub-list for method output_type
0, // [0:6] is the sub-list for method input_type
3, // 6: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:input_type -> google.protobuf.Empty
0, // 7: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload
0, // 8: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload
2, // 9: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum
1, // 10: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part
3, // 11: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty
3, // 12: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty
0, // 13: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:output_type -> ttrpc.integration.streaming.EchoPayload
7, // [7:14] is the sub-list for method output_type
0, // [0:7] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name

View File

@ -34,6 +34,7 @@ service Streaming {
rpc DivideStream(Sum) returns (stream Part);
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload);
}
message EchoPayload {

View File

@ -15,6 +15,7 @@ type TTRPCStreamingService interface {
DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
EmptyPayloadStream(context.Context, *emptypb.Empty, TTRPCStreaming_EmptyPayloadStreamServer) error
}
type TTRPCStreaming_EchoStreamServer interface {
@ -108,6 +109,19 @@ func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
return m, nil
}
type TTRPCStreaming_EmptyPayloadStreamServer interface {
Send(*EchoPayload) error
ttrpc.StreamServer
}
type ttrpcstreamingEmptyPayloadStreamServer struct {
ttrpc.StreamServer
}
func (x *ttrpcstreamingEmptyPayloadStreamServer) Send(m *EchoPayload) error {
return x.StreamServer.SendMsg(m)
}
func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
Methods: map[string]ttrpc.Method{
@ -159,6 +173,17 @@ func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService)
StreamingClient: true,
StreamingServer: true,
},
"EmptyPayloadStream": {
Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
m := new(emptypb.Empty)
if err := stream.RecvMsg(m); err != nil {
return nil, err
}
return nil, svc.EmptyPayloadStream(ctx, m, &ttrpcstreamingEmptyPayloadStreamServer{stream})
},
StreamingClient: false,
StreamingServer: true,
},
},
})
}
@ -170,6 +195,7 @@ type TTRPCStreamingClient interface {
DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
EmptyPayloadStream(context.Context, *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error)
}
type ttrpcstreamingClient struct {
@ -360,3 +386,32 @@ func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) {
}
return m, nil
}
func (c *ttrpcstreamingClient) EmptyPayloadStream(ctx context.Context, req *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) {
stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
StreamingClient: false,
StreamingServer: true,
}, "ttrpc.integration.streaming.Streaming", "EmptyPayloadStream", req)
if err != nil {
return nil, err
}
x := &ttrpcstreamingEmptyPayloadStreamClient{stream}
return x, nil
}
type TTRPCStreaming_EmptyPayloadStreamClient interface {
Recv() (*EchoPayload, error)
ttrpc.ClientStream
}
type ttrpcstreamingEmptyPayloadStreamClient struct {
ttrpc.ClientStream
}
func (x *ttrpcstreamingEmptyPayloadStreamClient) Recv() (*EchoPayload, error) {
m := new(EchoPayload)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

View File

@ -31,6 +31,7 @@ import (
"github.com/containerd/ttrpc"
"github.com/containerd/ttrpc/integration/streaming"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/protobuf/types/known/emptypb"
)
func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) {
@ -190,6 +191,14 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming.
return sendErr
}
func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error {
if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil {
return err
}
return streamer.Send(&streaming.EchoPayload{Seq: 2})
}
func TestStreamingService(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -203,6 +212,7 @@ func TestStreamingService(t *testing.T) {
t.Run("DivideStream", divideStreamTest(ctx, client))
t.Run("EchoNull", echoNullTest(ctx, client))
t.Run("EchoNullStream", echoNullStreamTest(ctx, client))
t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client))
}
func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
@ -385,6 +395,33 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie
}
}
func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) {
return func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
stream, err := client.EmptyPayloadStream(ctx, nil)
if err != nil {
t.Fatal(err)
}
for i := uint32(1); i < 3; i++ {
first, err := stream.Recv()
if err != nil {
t.Fatal(err)
}
if first.Seq != i {
t.Fatalf("unexpected seq: %d != %d", first.Seq, i)
}
}
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("Expected io.EOF, got %v", err)
}
}
}
func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) {
t.Helper()
if a.Msg != b.Msg {

245
interceptor_test.go Normal file
View File

@ -0,0 +1,245 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"context"
"reflect"
"strings"
"testing"
"github.com/containerd/ttrpc/internal"
)
func TestUnaryClientInterceptor(t *testing.T) {
var (
intercepted = false
interceptor = func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
intercepted = true
return i(ctx, req, reply)
}
ctx = context.Background()
server = mustServer(t)(NewServer())
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr, WithUnaryClientInterceptor(interceptor))
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !intercepted {
t.Fatalf("ttrpc client call not intercepted")
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}
func TestChainUnaryClientInterceptor(t *testing.T) {
var (
orderIdx = 0
recorded = []string{}
intercept = func(idx int, tag string) UnaryClientInterceptor {
return func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
if idx != orderIdx {
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
}
recorded = append(recorded, tag)
orderIdx++
return i(ctx, req, reply)
}
}
ctx = context.Background()
server = mustServer(t)(NewServer())
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr,
WithChainUnaryClientInterceptor(),
WithChainUnaryClientInterceptor(
intercept(0, "seen it"),
intercept(1, "been"),
intercept(2, "there"),
intercept(3, "done"),
intercept(4, "that"),
),
)
expected = []string{
"seen it",
"been",
"there",
"done",
"that",
}
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(recorded, expected) {
t.Fatalf("unexpected ttrpc chained client unary interceptor order (%s != %s)",
strings.Join(recorded, " "), strings.Join(expected, " "))
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}
func TestUnaryServerInterceptor(t *testing.T) {
var (
intercepted = false
interceptor = func(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
intercepted = true
return method(ctx, unmarshal)
}
ctx = context.Background()
server = mustServer(t)(NewServer(WithUnaryServerInterceptor(interceptor)))
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr)
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !intercepted {
t.Fatalf("ttrpc server call not intercepted")
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}
func TestChainUnaryServerInterceptor(t *testing.T) {
var (
orderIdx = 0
recorded = []string{}
intercept = func(idx int, tag string) UnaryServerInterceptor {
return func(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
if orderIdx != idx {
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
}
recorded = append(recorded, tag)
orderIdx++
return method(ctx, unmarshal)
}
}
ctx = context.Background()
server = mustServer(t)(NewServer(
WithUnaryServerInterceptor(
intercept(0, "seen it"),
),
WithChainUnaryServerInterceptor(
intercept(1, "been"),
intercept(2, "there"),
intercept(3, "done"),
intercept(4, "that"),
),
))
expected = []string{
"seen it",
"been",
"there",
"done",
"that",
}
testImpl = &testingServer{}
addr, listener = newTestListener(t)
client, cleanup = newTestClient(t, addr)
message = strings.Repeat("a", 16)
reply = strings.Repeat(message, 2)
)
defer listener.Close()
defer cleanup()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
request := &internal.TestPayload{
Foo: message,
}
response := &internal.TestPayload{}
if err := client.Call(ctx, serviceName, "Test", request, response); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(recorded, expected) {
t.Fatalf("unexpected ttrpc chained server unary interceptor order (%s != %s)",
strings.Join(recorded, " "), strings.Join(expected, " "))
}
if response.Foo != reply {
t.Fatalf("unexpected test service reply: %q != %q", response.Foo, reply)
}
}

View File

@ -62,6 +62,34 @@ func (m MD) Append(key string, values ...string) {
}
}
// Clone returns a copy of MD or nil if it's nil.
// It's copied from golang's `http.Header.Clone` implementation:
// https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/net/http/header.go;l=94
func (m MD) Clone() MD {
if m == nil {
return nil
}
// Find total number of values.
nv := 0
for _, vv := range m {
nv += len(vv)
}
sv := make([]string, nv) // shared backing array for headers' values
m2 := make(MD, len(m))
for k, vv := range m {
if vv == nil {
// Preserve nil values.
m2[k] = nil
continue
}
n := copy(sv, vv)
m2[k] = sv[:n:n]
sv = sv[n:]
}
return m2
}
func (m MD) setRequest(r *Request) {
for k, values := range m {
for _, v := range values {

View File

@ -18,6 +18,8 @@ package ttrpc
import (
"context"
"fmt"
"sync"
"testing"
)
@ -106,3 +108,100 @@ func TestMetadataContext(t *testing.T) {
t.Errorf("invalid metadata value: %q", bar)
}
}
func TestMetadataClone(t *testing.T) {
var metadata MD
m2 := metadata.Clone()
if m2 != nil {
t.Error("MD.Clone() on nil metadata should return nil")
}
metadata = MD{"nil": nil, "foo": {"bar"}, "baz": {"qux", "quxx"}}
m2 = metadata.Clone()
if len(metadata) != len(m2) {
t.Errorf("unexpected number of keys: %d, expected: %d", len(m2), len(metadata))
}
for k, v := range metadata {
v2, ok := m2[k]
if !ok {
t.Errorf("key not found: %s", k)
}
if v == nil && v2 == nil {
continue
}
if v == nil || v2 == nil {
t.Errorf("unexpected nil value: %v, expected: %v", v2, v)
}
if len(v) != len(v2) {
t.Errorf("unexpected number of values: %d, expected: %d", len(v2), len(v))
}
for i := range v {
if v[i] != v2[i] {
t.Errorf("unexpected value: %s, expected: %s", v2[i], v[i])
}
}
}
}
func TestMetadataCloneConcurrent(t *testing.T) {
metadata := make(MD)
metadata.Set("foo", "bar")
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m2 := metadata.Clone()
m2.Set("foo", "baz")
}()
}
wg.Wait()
// Concurrent modification should clone the metadata first to avoid panic
// due to concurrent map writes.
if val, ok := metadata.Get("foo"); !ok {
t.Error("metadata not found")
} else if val[0] != "bar" {
t.Errorf("invalid metadata value: %q", val[0])
}
}
func simpleClone(src MD) MD {
md := MD{}
for k, v := range src {
md[k] = append(md[k], v...)
}
return md
}
func BenchmarkMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = metadata.Clone()
}
})
}
}
func BenchmarkSimpleMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = simpleClone(metadata)
}
})
}
}

View File

@ -27,7 +27,7 @@ import (
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/containerd/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@ -74,9 +74,18 @@ func (s *Server) RegisterService(name string, desc *ServiceDesc) {
}
func (s *Server) Serve(ctx context.Context, l net.Listener) error {
s.addListener(l)
s.mu.Lock()
s.addListenerLocked(l)
defer s.closeListener(l)
select {
case <-s.done:
s.mu.Unlock()
return ErrServerClosed
default:
}
s.mu.Unlock()
var (
backoff time.Duration
handshaker = s.config.handshaker
@ -109,7 +118,7 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
}
sleep := time.Duration(rand.Int63n(int64(backoff)))
logrus.WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
log.G(ctx).WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
time.Sleep(sleep)
continue
}
@ -121,14 +130,14 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
approved, handshake, err := handshaker.Handshake(ctx, conn)
if err != nil {
logrus.WithError(err).Error("ttrpc: refusing connection after handshake")
log.G(ctx).WithError(err).Error("ttrpc: refusing connection after handshake")
conn.Close()
continue
}
sc, err := s.newConn(approved, handshake)
if err != nil {
logrus.WithError(err).Error("ttrpc: create connection failed")
log.G(ctx).WithError(err).Error("ttrpc: create connection failed")
conn.Close()
continue
}
@ -188,9 +197,7 @@ func (s *Server) Close() error {
return err
}
func (s *Server) addListener(l net.Listener) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *Server) addListenerLocked(l net.Listener) {
s.listeners[l] = struct{}{}
}
@ -513,12 +520,12 @@ func (c *serverConn) run(sctx context.Context) {
Payload: response.data,
})
if err != nil {
logrus.WithError(err).Error("failed marshaling response")
log.G(ctx).WithError(err).Error("failed marshaling response")
return
}
if err := ch.send(response.id, messageTypeResponse, 0, p); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
log.G(ctx).WithError(err).Error("failed sending message on channel")
return
}
} else {
@ -530,7 +537,7 @@ func (c *serverConn) run(sctx context.Context) {
flags = flags | flagNoData
}
if err := ch.send(response.id, messageTypeData, flags, response.data); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
log.G(ctx).WithError(err).Error("failed sending message on channel")
return
}
}
@ -552,7 +559,7 @@ func (c *serverConn) run(sctx context.Context) {
// requests, so that the client connection is closed
return
}
logrus.WithError(err).Error("error receiving message")
log.G(ctx).WithError(err).Error("error receiving message")
// else, initiate shutdown
case <-shutdown:
return

View File

@ -219,7 +219,7 @@ func TestServerShutdown(t *testing.T) {
// register a service that takes until we tell it to stop
server.Register(serviceName, map[string]Method{
"Test": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Test": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.TestPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -298,6 +298,36 @@ func TestServerClose(t *testing.T) {
checkServerShutdown(t, server)
}
func TestImmediateServerShutdown(t *testing.T) {
var (
ctx = context.Background()
server = mustServer(t)(NewServer())
addr, listener = newTestListener(t)
errs = make(chan error, 1)
_, cleanup = newTestClient(t, addr)
)
defer cleanup()
defer listener.Close()
go func() {
time.Sleep(1 * time.Millisecond)
errs <- server.Serve(ctx, listener)
}()
registerTestingService(server, &testingServer{})
if err := server.Shutdown(ctx); err != nil {
t.Fatal(err)
}
select {
case err := <-errs:
if err != ErrServerClosed {
t.Fatal(err)
}
case <-time.After(2 * time.Second):
t.Fatal("retreiving error from server.Shutdown() timed out")
}
}
func TestOversizeCall(t *testing.T) {
var (
ctx = context.Background()
@ -318,7 +348,7 @@ func TestOversizeCall(t *testing.T) {
Foo: strings.Repeat("a", 1+messageLengthMax),
}
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
t.Fatalf("expected error from non-existent service call")
t.Fatalf("expected error from oversized message")
} else if status, ok := status.FromError(err); !ok {
t.Fatalf("expected status present in error: %v", err)
} else if status.Code() != codes.ResourceExhausted {
@ -363,6 +393,8 @@ func TestClientEOF(t *testing.T) {
t.Fatal(err)
}
client.UserOnCloseWait(ctx)
// server shutdown, but we still make a call.
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
t.Fatalf("expected error when calling against shutdown server")

View File

@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
respond(st, p, stream.StreamingServer, true)
}()
if req.Payload != nil {
// Empty proto messages serialized to 0 payloads,
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
// don't get invoked here, which causes hang on client side.
// See https://github.com/containerd/ttrpc/issues/126
if req.Payload != nil || !info.StreamingClient {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj)
}

View File

@ -38,7 +38,7 @@ func TestStreamClient(t *testing.T) {
desc := &ServiceDesc{
Methods: map[string]Method{
"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Echo": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.EchoPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -49,7 +49,7 @@ func TestStreamClient(t *testing.T) {
},
Streams: map[string]Stream{
"EchoStream": {
Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
Handler: func(_ context.Context, ss StreamServer) (interface{}, error) {
for {
var req internal.EchoPayload
if err := ss.RecvMsg(&req); err != nil {