Compare commits

...

28 Commits
v1.2.3 ... main

Author SHA1 Message Date
Kazuyoshi Kato ababa3fc18
Merge pull request #178 from djdongjin/bump-versions
Add dependabot and bump up golang and dependency versions
2025-01-14 09:57:47 -08:00
Jin Dong 4729a87007 Upgrade go, dependency, CI versions
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-30 01:42:33 +00:00
Fu Wei 3b8c8b7557
Merge pull request #177 from djdongjin/metadata-clone
Add MD.Clone function
2024-12-28 18:29:38 -05:00
Jin Dong 644ecfaa4c Add dependabot CI
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-28 01:39:20 +00:00
Jin Dong 430f734791 Add MD.Clone
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-27 16:12:45 +00:00
Akihiro Suda b71d9dee11
Merge pull request #175 from klihub/fixes/serve-listen-shutdown-race
server: fix a Serve() vs. (immediate) Shutdown() race
2024-10-29 09:31:08 +09:00
Derek McGowan bcc40a4d69
Merge pull request #171 from klihub/devel/sender-side-oversize-rejection
channel: reject oversized messages on the sender side(, too).
2024-09-26 10:15:50 -07:00
Krisztian Litkey c4d96d55ad
server: fix Serve() vs. immediate Shutdown() race.
Fix a race where an asynchronous server.Serve() invoked in a
a goroutine races with an almost immediate server.Shutdown().
If Shutdown() finishes its locked closing of listeners before
Serve() gets around to add the new one, Serve will sit stuck
forever in l.Accept(), unless the caller closes the listener
in addition to Shutdown().

This is probably almost impossible to trigger in real life,
but some of the unit tests, which run the server and client
in the same process, occasionally do trigger this. Then, if
the test tries to verify a final ErrServerClosed error from
Serve() after Shutdown() it gets stuck forever.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-09-16 09:52:00 +03:00
Krisztian Litkey ed6c3ba082
server_test: add Serve()/Shutdown() race test.
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-09-16 09:51:56 +03:00
Krisztian Litkey b5cd6e4b32
channel: allow discovery of overflown message size.
Use a dedicated, grpc Status-compatible error to wrap the
unique grpc status code, the size of the rejected message
and the maximum allowed size when a message is rejected
due to size limitations by the sending side.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-27 11:23:20 +03:00
Krisztian Litkey d8c00dfec3
channel_test: update oversize message test.
Co-authored-by: Alessio Cantillo <cantillo.trd@gmail.com>
Co-authored-by: Qian Zhang <cosmoer@qq.com>
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-21 17:53:44 +03:00
Krisztian Litkey de273bf751
channel: reject oversized messages on the sender side.
Reject oversized messages on the sender side, keeping the
receiver side rejection intact. This should provide minimal
low-level plumbing for clients to attempt application level
corrective actions on the requestor side, if the high-level
protocol is designed with this in mind.

Co-authored-by: Alessio Cantillo <cantillo.trd@gmail.com>
Co-authored-by: Qian Zhang <cosmoer@qq.com>
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-21 17:52:38 +03:00
Fu Wei 3f02183720
Merge pull request #170 from klihub/fixes/oversized-call-test-errmsg 2024-08-20 08:06:14 +08:00
Krisztian Litkey 84e1784f34
server_test: fix error message in TestOversizeCall.
Fix copy-pasted error message in unit test.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-19 19:29:04 +03:00
Phil Estes 655622931d
Merge pull request #169 from thaJeztah/containerd_log
switch to github.com/containerd/log for logs
2024-06-19 17:49:44 -04:00
Sebastiaan van Stijn 4785c70883
switch to github.com/containerd/log for logs
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2024-06-19 23:19:35 +02:00
Kazuyoshi Kato 19d523c66a
Merge pull request #162 from austinvazquez/fix-build-status-badge
Fix CI build status badge in readme
2024-05-15 15:55:56 -07:00
Derek McGowan 196dbef628
Merge pull request #166 from containerd/dependabot/go_modules/google.golang.org/protobuf-1.33.0
Bump google.golang.org/protobuf from 1.31.0 to 1.33.0
2024-05-13 16:04:22 -07:00
Kevin Parsons ef5734239e
Merge pull request #168 from kevpar/deadlock
client: Fix deadlock when writing to pipe blocks
2024-05-13 17:22:09 -05:00
Kevin Parsons 1b4f6f8edb client: Fix deadlock when writing to pipe blocks
Use sendLock to guard the entire stream allocation + write to wire
operation, and streamLock to only guard access to the underlying stream
map. This ensures the following:
- We uphold the constraint that new stream IDs on the wire are always
  increasing, because whoever holds sendLock will be ensured to get the
  next stream ID and be the next to write to the wire.
- Locks are always released in LIFO order. This prevents deadlocks.

Taking sendLock before releasing streamLock means that if a goroutine
blocks writing to the pipe, it can make another goroutine get stuck
trying to take sendLock, and therefore streamLock will be kept locked as
well. This can lead to the receiver goroutine no longer being able to
read responses from the pipe, since it needs to take streamLock when
processing a response. This ultimately leads to a complete deadlock of
the client.

It is reasonable for a server to block writes to the pipe if the client
is not reading responses fast enough. So we can't expect writes to never
block.

I have repro'd the hang with a simple ttrpc client and server. The
client spins up 100 goroutines that spam the server with requests
constantly. After a few seconds of running I can see it hang. I have set
the buffer size for the pipe to 0 to more easily repro, but it would
still be possible to hit with a larger buffer size (just may take a
higher volume of requests or larger payloads).

I also validated that I no longer see the hang with this fix, by leaving
the test client/server running for a few minutes. Obviously not 100%
conclusive, but before I could get a hang within several seconds of
running.

Signed-off-by: Kevin Parsons <kevpar@microsoft.com>
2024-05-13 14:01:59 -07:00
Derek McGowan aa5f2d4e10
Merge pull request #167 from containerd/dependabot/go_modules/golang.org/x/net-0.23.0
Bump golang.org/x/net from 0.17.0 to 0.23.0
2024-05-13 08:32:26 -07:00
dependabot[bot] 13b8289864
Bump golang.org/x/net from 0.17.0 to 0.23.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.17.0 to 0.23.0.
- [Commits](https://github.com/golang/net/compare/v0.17.0...v0.23.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-04-19 12:24:18 +00:00
dependabot[bot] 272c8575a6
Bump google.golang.org/protobuf from 1.31.0 to 1.33.0
Bumps google.golang.org/protobuf from 1.31.0 to 1.33.0.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-03-13 22:58:10 +00:00
Derek McGowan 4a2816be9b
Merge pull request #161 from austinvazquez/update-ci
Update GitHub Actions CI to resolve deprecation warnings
2024-02-29 08:50:09 -08:00
Austin Vazquez e0f3eadca5
Fix CI build status badge in readme
Signed-off-by: Austin Vazquez <macedonv@amazon.com>
2024-02-29 15:43:55 +00:00
Austin Vazquez 589a593abc
Update GitHub Actions CI to resolve deprecation warnings
Signed-off-by: Austin Vazquez <macedonv@amazon.com>
2024-02-29 15:28:55 +00:00
Derek McGowan faba5896a9
Merge pull request #158 from dmcgowan/update-protobuf
Fix proto3 generation error
2024-02-21 14:10:30 -08:00
Derek McGowan 73b6a9156d Add optional feature in protobuf compiler
Fixes error "is a proto3 file that contains optional fields, but code generator protoc-gen-go-ttrpc hasn't been updated to support optional fields in proto3. Please ask the owner of this code generator to support proto3 optional."

Signed-off-by: Derek McGowan <derek@mcg.dev>
2024-02-21 13:51:52 -08:00
17 changed files with 351 additions and 119 deletions

16
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,16 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
groups:
golang-x:
patterns:
- "golang.org/x/*"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10

View File

@ -7,7 +7,7 @@ on:
branches: [ main ]
env:
GO_VERSION: 1.20.x
GO_VERSION: 1.23.x
permissions:
contents: read
@ -27,27 +27,21 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
- uses: actions/setup-go@v3
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
with:
version: v1.51.2
version: v1.60.3
args: --timeout=5m
skip-cache: true
working-directory: src/github.com/containerd/ttrpc
- name: golangci-lint errors
run: golangci-lint run
working-directory: src/github.com/containerd/ttrpc
if: ${{ failure() }}
#
# Project checks
#
@ -57,16 +51,15 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/setup-go@v3
with:
go-version: ${{ env.GO_VERSION }}
- uses: actions/checkout@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
- uses: containerd/project-checks@v1.1.0
- uses: containerd/project-checks@434a07157608eeaa1d5c8d4dd506154204cd9401 # v1.1.0
with:
working-directory: src/github.com/containerd/ttrpc
@ -78,22 +71,20 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: [1.19.x, 1.20.x]
go: [1.22.x, 1.23.x]
name: ${{ matrix.os }} / ${{ matrix.go }}
runs-on: ${{ matrix.os }}
timeout-minutes: 10
steps:
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- name: Check out code
uses: actions/checkout@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ matrix.go }}
- name: Test
working-directory: src/github.com/containerd/ttrpc
@ -119,7 +110,11 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/setup-go@v3
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
id: go
@ -130,12 +125,6 @@ jobs:
echo "GOPATH=${{ github.workspace }}" >> $GITHUB_ENV
echo "${{ github.workspace }}/bin" >> $GITHUB_PATH
- name: Check out code
uses: actions/checkout@v3
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- name: Install dependencies
working-directory: src/github.com/containerd/ttrpc
run: |

View File

@ -6,7 +6,7 @@ linters:
- goimports
- revive
- ineffassign
- vet
- govet
- unused
- misspell
disable:
@ -14,7 +14,7 @@ linters:
linters-settings:
revive:
ignore-generated-headers: true
ignore-generated-header: true
rules:
- name: blank-imports
- name: context-as-argument
@ -45,8 +45,8 @@ linters-settings:
issues:
include:
- EXC0002
exclude-dirs:
- example
run:
timeout: 8m
skip-dirs:
- example

View File

@ -1,6 +1,6 @@
# ttrpc
[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
[![Build Status](https://github.com/containerd/ttrpc/actions/workflows/ci.yml/badge.svg)](https://github.com/containerd/ttrpc/actions/workflows/ci.yml)
GRPC for low-memory environments.

View File

@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
}
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
// TODO: Error on send rather than on recv
//if len(p) > messageLengthMax {
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
//}
if len(p) > messageLengthMax {
return OversizedMessageError(len(p))
}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err
}

View File

@ -89,21 +89,19 @@ func TestReadWriteMessage(t *testing.T) {
func TestMessageOversize(t *testing.T) {
var (
w, r = net.Pipe()
wch, rch = newChannel(w), newChannel(r)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
w, _ = net.Pipe()
wch = newChannel(w)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
)
go func() {
if err := wch.send(1, 1, 0, msg); err != nil {
errs <- err
}
errs <- wch.send(1, 1, 0, msg)
}()
_, _, err := rch.recv()
err := <-errs
if err == nil {
t.Fatalf("error expected reading with small buffer")
t.Fatalf("sending oversized message expected to fail")
}
status, ok := status.FromError(err)
@ -114,12 +112,4 @@ func TestMessageOversize(t *testing.T) {
if status.Code() != codes.ResourceExhausted {
t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
}
select {
case err := <-errs:
if err != nil {
t.Fatal(err)
}
default:
}
}

View File

@ -27,7 +27,7 @@ import (
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/containerd/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
@ -368,7 +368,7 @@ func (c *Client) receiveLoop() error {
sid := streamID(msg.header.StreamID)
s := c.getStream(sid)
if s == nil {
logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
log.G(c.ctx).WithField("stream", sid).Error("ttrpc: received message on inactive stream")
continue
}
@ -376,7 +376,7 @@ func (c *Client) receiveLoop() error {
s.closeWithError(err)
} else {
if err := s.receive(c.ctx, msg); err != nil {
logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
log.G(c.ctx).WithFields(log.Fields{"error": err, "stream": sid}).Error("ttrpc: failed to handle message")
}
}
}
@ -386,25 +386,44 @@ func (c *Client) receiveLoop() error {
// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
c.streamLock.Lock()
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
// This ensures that new stream IDs sent on the wire are always increasing, which is a
// requirement of the TTRPC protocol.
// This use of sendLock could be split into another mutex that covers stream creation + first send,
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
c.sendLock.Lock()
defer c.sendLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
c.streamLock.Unlock()
return nil, ErrClosed
default:
}
// Stream ID should be allocated at same time
s := newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
var s *stream
if err := func() error {
// In the future this could be replaced with a sync.Map instead of streamLock+map.
c.streamLock.Lock()
defer c.streamLock.Unlock()
c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
return ErrClosed
default:
}
s = newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
return nil
}(); err != nil {
return nil, err
}
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
return s, filterCloseErr(err)

View File

@ -18,6 +18,7 @@ package main
import (
"google.golang.org/protobuf/compiler/protogen"
"google.golang.org/protobuf/types/pluginpb"
)
func main() {
@ -30,6 +31,7 @@ func main() {
return nil
},
}.Run(func(gen *protogen.Plugin) error {
gen.SupportedFeatures = uint64(pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL)
for _, f := range gen.Files {
if !f.Generate {
continue

View File

@ -16,7 +16,12 @@
package ttrpc
import "errors"
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
// ErrProtocol is a general error in the handling the protocol.
@ -32,3 +37,44 @@ var (
// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")
)
// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
type OversizedMessageErr struct {
messageLength int
err error
}
// OversizedMessageError returns an OversizedMessageErr error for the given message
// length if it exceeds the allowed maximum. Otherwise a nil error is returned.
func OversizedMessageError(messageLength int) error {
if messageLength <= messageLengthMax {
return nil
}
return &OversizedMessageErr{
messageLength: messageLength,
err: status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
}
}
// Error returns the error message for the corresponding grpc Status for the error.
func (e *OversizedMessageErr) Error() string {
return e.err.Error()
}
// Unwrap returns the corresponding error with our grpc status code.
func (e *OversizedMessageErr) Unwrap() error {
return e.err
}
// RejectedLength retrieves the rejected message length which triggered the error.
func (e *OversizedMessageErr) RejectedLength() int {
return e.messageLength
}
// MaximumLength retrieves the maximum allowed message length that triggered the error.
func (*OversizedMessageErr) MaximumLength() int {
return messageLengthMax
}

16
go.mod
View File

@ -1,16 +1,16 @@
module github.com/containerd/ttrpc
go 1.19
go 1.22
require (
github.com/containerd/log v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/golang/protobuf v1.5.4
github.com/prometheus/procfs v0.6.0
github.com/sirupsen/logrus v1.8.1
golang.org/x/sys v0.13.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.31.0
golang.org/x/sys v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.0
)
require golang.org/x/net v0.17.0 // indirect
require github.com/sirupsen/logrus v1.9.3 // indirect

52
go.sum
View File

@ -1,23 +1,27 @@
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -29,22 +33,23 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@ -53,11 +58,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d h1:pgIUhmqwKOUlnKna4r6amKdUngdL8DrkpFeV8+VBElY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg=
google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -28,7 +28,7 @@ import (
func TestUnaryClientInterceptor(t *testing.T) {
var (
intercepted = false
interceptor = func(ctx context.Context, req *Request, reply *Response, ci *UnaryClientInfo, i Invoker) error {
interceptor = func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
intercepted = true
return i(ctx, req, reply)
}
@ -73,7 +73,7 @@ func TestChainUnaryClientInterceptor(t *testing.T) {
orderIdx = 0
recorded = []string{}
intercept = func(idx int, tag string) UnaryClientInterceptor {
return func(ctx context.Context, req *Request, reply *Response, ci *UnaryClientInfo, i Invoker) error {
return func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
if idx != orderIdx {
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
}

View File

@ -62,6 +62,34 @@ func (m MD) Append(key string, values ...string) {
}
}
// Clone returns a copy of MD or nil if it's nil.
// It's copied from golang's `http.Header.Clone` implementation:
// https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/net/http/header.go;l=94
func (m MD) Clone() MD {
if m == nil {
return nil
}
// Find total number of values.
nv := 0
for _, vv := range m {
nv += len(vv)
}
sv := make([]string, nv) // shared backing array for headers' values
m2 := make(MD, len(m))
for k, vv := range m {
if vv == nil {
// Preserve nil values.
m2[k] = nil
continue
}
n := copy(sv, vv)
m2[k] = sv[:n:n]
sv = sv[n:]
}
return m2
}
func (m MD) setRequest(r *Request) {
for k, values := range m {
for _, v := range values {

View File

@ -18,6 +18,8 @@ package ttrpc
import (
"context"
"fmt"
"sync"
"testing"
)
@ -106,3 +108,100 @@ func TestMetadataContext(t *testing.T) {
t.Errorf("invalid metadata value: %q", bar)
}
}
func TestMetadataClone(t *testing.T) {
var metadata MD
m2 := metadata.Clone()
if m2 != nil {
t.Error("MD.Clone() on nil metadata should return nil")
}
metadata = MD{"nil": nil, "foo": {"bar"}, "baz": {"qux", "quxx"}}
m2 = metadata.Clone()
if len(metadata) != len(m2) {
t.Errorf("unexpected number of keys: %d, expected: %d", len(m2), len(metadata))
}
for k, v := range metadata {
v2, ok := m2[k]
if !ok {
t.Errorf("key not found: %s", k)
}
if v == nil && v2 == nil {
continue
}
if v == nil || v2 == nil {
t.Errorf("unexpected nil value: %v, expected: %v", v2, v)
}
if len(v) != len(v2) {
t.Errorf("unexpected number of values: %d, expected: %d", len(v2), len(v))
}
for i := range v {
if v[i] != v2[i] {
t.Errorf("unexpected value: %s, expected: %s", v2[i], v[i])
}
}
}
}
func TestMetadataCloneConcurrent(t *testing.T) {
metadata := make(MD)
metadata.Set("foo", "bar")
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m2 := metadata.Clone()
m2.Set("foo", "baz")
}()
}
wg.Wait()
// Concurrent modification should clone the metadata first to avoid panic
// due to concurrent map writes.
if val, ok := metadata.Get("foo"); !ok {
t.Error("metadata not found")
} else if val[0] != "bar" {
t.Errorf("invalid metadata value: %q", val[0])
}
}
func simpleClone(src MD) MD {
md := MD{}
for k, v := range src {
md[k] = append(md[k], v...)
}
return md
}
func BenchmarkMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = metadata.Clone()
}
})
}
}
func BenchmarkSimpleMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = simpleClone(metadata)
}
})
}
}

View File

@ -27,7 +27,7 @@ import (
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/containerd/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@ -74,9 +74,18 @@ func (s *Server) RegisterService(name string, desc *ServiceDesc) {
}
func (s *Server) Serve(ctx context.Context, l net.Listener) error {
s.addListener(l)
s.mu.Lock()
s.addListenerLocked(l)
defer s.closeListener(l)
select {
case <-s.done:
s.mu.Unlock()
return ErrServerClosed
default:
}
s.mu.Unlock()
var (
backoff time.Duration
handshaker = s.config.handshaker
@ -109,7 +118,7 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
}
sleep := time.Duration(rand.Int63n(int64(backoff)))
logrus.WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
log.G(ctx).WithError(err).Errorf("ttrpc: failed accept; backoff %v", sleep)
time.Sleep(sleep)
continue
}
@ -121,14 +130,14 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
approved, handshake, err := handshaker.Handshake(ctx, conn)
if err != nil {
logrus.WithError(err).Error("ttrpc: refusing connection after handshake")
log.G(ctx).WithError(err).Error("ttrpc: refusing connection after handshake")
conn.Close()
continue
}
sc, err := s.newConn(approved, handshake)
if err != nil {
logrus.WithError(err).Error("ttrpc: create connection failed")
log.G(ctx).WithError(err).Error("ttrpc: create connection failed")
conn.Close()
continue
}
@ -188,9 +197,7 @@ func (s *Server) Close() error {
return err
}
func (s *Server) addListener(l net.Listener) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *Server) addListenerLocked(l net.Listener) {
s.listeners[l] = struct{}{}
}
@ -513,12 +520,12 @@ func (c *serverConn) run(sctx context.Context) {
Payload: response.data,
})
if err != nil {
logrus.WithError(err).Error("failed marshaling response")
log.G(ctx).WithError(err).Error("failed marshaling response")
return
}
if err := ch.send(response.id, messageTypeResponse, 0, p); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
log.G(ctx).WithError(err).Error("failed sending message on channel")
return
}
} else {
@ -530,7 +537,7 @@ func (c *serverConn) run(sctx context.Context) {
flags = flags | flagNoData
}
if err := ch.send(response.id, messageTypeData, flags, response.data); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
log.G(ctx).WithError(err).Error("failed sending message on channel")
return
}
}
@ -552,7 +559,7 @@ func (c *serverConn) run(sctx context.Context) {
// requests, so that the client connection is closed
return
}
logrus.WithError(err).Error("error receiving message")
log.G(ctx).WithError(err).Error("error receiving message")
// else, initiate shutdown
case <-shutdown:
return

View File

@ -219,7 +219,7 @@ func TestServerShutdown(t *testing.T) {
// register a service that takes until we tell it to stop
server.Register(serviceName, map[string]Method{
"Test": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Test": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.TestPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -298,6 +298,36 @@ func TestServerClose(t *testing.T) {
checkServerShutdown(t, server)
}
func TestImmediateServerShutdown(t *testing.T) {
var (
ctx = context.Background()
server = mustServer(t)(NewServer())
addr, listener = newTestListener(t)
errs = make(chan error, 1)
_, cleanup = newTestClient(t, addr)
)
defer cleanup()
defer listener.Close()
go func() {
time.Sleep(1 * time.Millisecond)
errs <- server.Serve(ctx, listener)
}()
registerTestingService(server, &testingServer{})
if err := server.Shutdown(ctx); err != nil {
t.Fatal(err)
}
select {
case err := <-errs:
if err != ErrServerClosed {
t.Fatal(err)
}
case <-time.After(2 * time.Second):
t.Fatal("retreiving error from server.Shutdown() timed out")
}
}
func TestOversizeCall(t *testing.T) {
var (
ctx = context.Background()
@ -318,7 +348,7 @@ func TestOversizeCall(t *testing.T) {
Foo: strings.Repeat("a", 1+messageLengthMax),
}
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
t.Fatalf("expected error from non-existent service call")
t.Fatalf("expected error from oversized message")
} else if status, ok := status.FromError(err); !ok {
t.Fatalf("expected status present in error: %v", err)
} else if status.Code() != codes.ResourceExhausted {

View File

@ -38,7 +38,7 @@ func TestStreamClient(t *testing.T) {
desc := &ServiceDesc{
Methods: map[string]Method{
"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Echo": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.EchoPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -49,7 +49,7 @@ func TestStreamClient(t *testing.T) {
},
Streams: map[string]Stream{
"EchoStream": {
Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
Handler: func(_ context.Context, ss StreamServer) (interface{}, error) {
for {
var req internal.EchoPayload
if err := ss.RecvMsg(&req); err != nil {