Compare commits

...

14 Commits
v1.2.5 ... main

Author SHA1 Message Date
Kazuyoshi Kato ababa3fc18
Merge pull request #178 from djdongjin/bump-versions
Add dependabot and bump up golang and dependency versions
2025-01-14 09:57:47 -08:00
Jin Dong 4729a87007 Upgrade go, dependency, CI versions
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-30 01:42:33 +00:00
Fu Wei 3b8c8b7557
Merge pull request #177 from djdongjin/metadata-clone
Add MD.Clone function
2024-12-28 18:29:38 -05:00
Jin Dong 644ecfaa4c Add dependabot CI
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-28 01:39:20 +00:00
Jin Dong 430f734791 Add MD.Clone
Signed-off-by: Jin Dong <djdongjin95@gmail.com>
2024-12-27 16:12:45 +00:00
Akihiro Suda b71d9dee11
Merge pull request #175 from klihub/fixes/serve-listen-shutdown-race
server: fix a Serve() vs. (immediate) Shutdown() race
2024-10-29 09:31:08 +09:00
Derek McGowan bcc40a4d69
Merge pull request #171 from klihub/devel/sender-side-oversize-rejection
channel: reject oversized messages on the sender side(, too).
2024-09-26 10:15:50 -07:00
Krisztian Litkey c4d96d55ad
server: fix Serve() vs. immediate Shutdown() race.
Fix a race where an asynchronous server.Serve() invoked in a
a goroutine races with an almost immediate server.Shutdown().
If Shutdown() finishes its locked closing of listeners before
Serve() gets around to add the new one, Serve will sit stuck
forever in l.Accept(), unless the caller closes the listener
in addition to Shutdown().

This is probably almost impossible to trigger in real life,
but some of the unit tests, which run the server and client
in the same process, occasionally do trigger this. Then, if
the test tries to verify a final ErrServerClosed error from
Serve() after Shutdown() it gets stuck forever.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-09-16 09:52:00 +03:00
Krisztian Litkey ed6c3ba082
server_test: add Serve()/Shutdown() race test.
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-09-16 09:51:56 +03:00
Krisztian Litkey b5cd6e4b32
channel: allow discovery of overflown message size.
Use a dedicated, grpc Status-compatible error to wrap the
unique grpc status code, the size of the rejected message
and the maximum allowed size when a message is rejected
due to size limitations by the sending side.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-27 11:23:20 +03:00
Krisztian Litkey d8c00dfec3
channel_test: update oversize message test.
Co-authored-by: Alessio Cantillo <cantillo.trd@gmail.com>
Co-authored-by: Qian Zhang <cosmoer@qq.com>
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-21 17:53:44 +03:00
Krisztian Litkey de273bf751
channel: reject oversized messages on the sender side.
Reject oversized messages on the sender side, keeping the
receiver side rejection intact. This should provide minimal
low-level plumbing for clients to attempt application level
corrective actions on the requestor side, if the high-level
protocol is designed with this in mind.

Co-authored-by: Alessio Cantillo <cantillo.trd@gmail.com>
Co-authored-by: Qian Zhang <cosmoer@qq.com>
Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-21 17:52:38 +03:00
Fu Wei 3f02183720
Merge pull request #170 from klihub/fixes/oversized-call-test-errmsg 2024-08-20 08:06:14 +08:00
Krisztian Litkey 84e1784f34
server_test: fix error message in TestOversizeCall.
Fix copy-pasted error message in unit test.

Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
2024-08-19 19:29:04 +03:00
14 changed files with 289 additions and 88 deletions

16
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,16 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10
groups:
golang-x:
patterns:
- "golang.org/x/*"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10

View File

@ -7,7 +7,7 @@ on:
branches: [ main ]
env:
GO_VERSION: 1.20.x
GO_VERSION: 1.23.x
permissions:
contents: read
@ -27,27 +27,21 @@ jobs:
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
- uses: actions/setup-go@v5
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
- name: golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@971e284b6050e8a5849b72094c50ab08da042db8 # v6.1.1
with:
version: v1.51.2
version: v1.60.3
args: --timeout=5m
skip-cache: true
working-directory: src/github.com/containerd/ttrpc
- name: golangci-lint errors
run: golangci-lint run
working-directory: src/github.com/containerd/ttrpc
if: ${{ failure() }}
#
# Project checks
#
@ -57,16 +51,15 @@ jobs:
timeout-minutes: 5
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@v5
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
- uses: containerd/project-checks@v1.1.0
- uses: containerd/project-checks@434a07157608eeaa1d5c8d4dd506154204cd9401 # v1.1.0
with:
working-directory: src/github.com/containerd/ttrpc
@ -78,20 +71,18 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
go: [1.19.x, 1.20.x]
go: [1.22.x, 1.23.x]
name: ${{ matrix.os }} / ${{ matrix.go }}
runs-on: ${{ matrix.os }}
timeout-minutes: 10
steps:
- name: Check out code
uses: actions/checkout@v4
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@v5
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ matrix.go }}
@ -119,13 +110,11 @@ jobs:
timeout-minutes: 5
steps:
- name: Check out code
uses: actions/checkout@v4
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
path: src/github.com/containerd/ttrpc
fetch-depth: 25
- uses: actions/setup-go@v5
- uses: actions/setup-go@3041bf56c941b39c61721a86cd11f3bb1338122a # v5.2.0
with:
go-version: ${{ env.GO_VERSION }}
id: go

View File

@ -6,7 +6,7 @@ linters:
- goimports
- revive
- ineffassign
- vet
- govet
- unused
- misspell
disable:
@ -14,7 +14,7 @@ linters:
linters-settings:
revive:
ignore-generated-headers: true
ignore-generated-header: true
rules:
- name: blank-imports
- name: context-as-argument
@ -45,8 +45,8 @@ linters-settings:
issues:
include:
- EXC0002
exclude-dirs:
- example
run:
timeout: 8m
skip-dirs:
- example

View File

@ -143,10 +143,10 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
}
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
// TODO: Error on send rather than on recv
//if len(p) > messageLengthMax {
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
//}
if len(p) > messageLengthMax {
return OversizedMessageError(len(p))
}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err
}

View File

@ -89,21 +89,19 @@ func TestReadWriteMessage(t *testing.T) {
func TestMessageOversize(t *testing.T) {
var (
w, r = net.Pipe()
wch, rch = newChannel(w), newChannel(r)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
w, _ = net.Pipe()
wch = newChannel(w)
msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
errs = make(chan error, 1)
)
go func() {
if err := wch.send(1, 1, 0, msg); err != nil {
errs <- err
}
errs <- wch.send(1, 1, 0, msg)
}()
_, _, err := rch.recv()
err := <-errs
if err == nil {
t.Fatalf("error expected reading with small buffer")
t.Fatalf("sending oversized message expected to fail")
}
status, ok := status.FromError(err)
@ -114,12 +112,4 @@ func TestMessageOversize(t *testing.T) {
if status.Code() != codes.ResourceExhausted {
t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
}
select {
case err := <-errs:
if err != nil {
t.Fatal(err)
}
default:
}
}

View File

@ -16,7 +16,12 @@
package ttrpc
import "errors"
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
// ErrProtocol is a general error in the handling the protocol.
@ -32,3 +37,44 @@ var (
// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")
)
// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
type OversizedMessageErr struct {
messageLength int
err error
}
// OversizedMessageError returns an OversizedMessageErr error for the given message
// length if it exceeds the allowed maximum. Otherwise a nil error is returned.
func OversizedMessageError(messageLength int) error {
if messageLength <= messageLengthMax {
return nil
}
return &OversizedMessageErr{
messageLength: messageLength,
err: status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", messageLength, messageLengthMax),
}
}
// Error returns the error message for the corresponding grpc Status for the error.
func (e *OversizedMessageErr) Error() string {
return e.err.Error()
}
// Unwrap returns the corresponding error with our grpc status code.
func (e *OversizedMessageErr) Unwrap() error {
return e.err
}
// RejectedLength retrieves the rejected message length which triggered the error.
func (e *OversizedMessageErr) RejectedLength() int {
return e.messageLength
}
// MaximumLength retrieves the maximum allowed message length that triggered the error.
func (*OversizedMessageErr) MaximumLength() int {
return messageLengthMax
}

17
go.mod
View File

@ -1,19 +1,16 @@
module github.com/containerd/ttrpc
go 1.19
go 1.22
require (
github.com/containerd/log v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/golang/protobuf v1.5.4
github.com/prometheus/procfs v0.6.0
golang.org/x/sys v0.18.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d
google.golang.org/grpc v1.57.1
google.golang.org/protobuf v1.33.0
golang.org/x/sys v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.0
)
require (
github.com/sirupsen/logrus v1.9.3 // indirect
golang.org/x/net v0.23.0 // indirect
)
require github.com/sirupsen/logrus v1.9.3 // indirect

35
go.sum
View File

@ -5,12 +5,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@ -22,6 +21,7 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -33,8 +33,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -44,11 +44,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@ -57,14 +58,12 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d h1:pgIUhmqwKOUlnKna4r6amKdUngdL8DrkpFeV8+VBElY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/grpc v1.57.1 h1:upNTNqv0ES+2ZOOqACwVtS3Il8M12/+Hz41RCPzAjQg=
google.golang.org/grpc v1.57.1/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -28,7 +28,7 @@ import (
func TestUnaryClientInterceptor(t *testing.T) {
var (
intercepted = false
interceptor = func(ctx context.Context, req *Request, reply *Response, ci *UnaryClientInfo, i Invoker) error {
interceptor = func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
intercepted = true
return i(ctx, req, reply)
}
@ -73,7 +73,7 @@ func TestChainUnaryClientInterceptor(t *testing.T) {
orderIdx = 0
recorded = []string{}
intercept = func(idx int, tag string) UnaryClientInterceptor {
return func(ctx context.Context, req *Request, reply *Response, ci *UnaryClientInfo, i Invoker) error {
return func(ctx context.Context, req *Request, reply *Response, _ *UnaryClientInfo, i Invoker) error {
if idx != orderIdx {
t.Fatalf("unexpected interceptor invocation order (%d != %d)", orderIdx, idx)
}

View File

@ -62,6 +62,34 @@ func (m MD) Append(key string, values ...string) {
}
}
// Clone returns a copy of MD or nil if it's nil.
// It's copied from golang's `http.Header.Clone` implementation:
// https://cs.opensource.google/go/go/+/refs/tags/go1.23.4:src/net/http/header.go;l=94
func (m MD) Clone() MD {
if m == nil {
return nil
}
// Find total number of values.
nv := 0
for _, vv := range m {
nv += len(vv)
}
sv := make([]string, nv) // shared backing array for headers' values
m2 := make(MD, len(m))
for k, vv := range m {
if vv == nil {
// Preserve nil values.
m2[k] = nil
continue
}
n := copy(sv, vv)
m2[k] = sv[:n:n]
sv = sv[n:]
}
return m2
}
func (m MD) setRequest(r *Request) {
for k, values := range m {
for _, v := range values {

View File

@ -18,6 +18,8 @@ package ttrpc
import (
"context"
"fmt"
"sync"
"testing"
)
@ -106,3 +108,100 @@ func TestMetadataContext(t *testing.T) {
t.Errorf("invalid metadata value: %q", bar)
}
}
func TestMetadataClone(t *testing.T) {
var metadata MD
m2 := metadata.Clone()
if m2 != nil {
t.Error("MD.Clone() on nil metadata should return nil")
}
metadata = MD{"nil": nil, "foo": {"bar"}, "baz": {"qux", "quxx"}}
m2 = metadata.Clone()
if len(metadata) != len(m2) {
t.Errorf("unexpected number of keys: %d, expected: %d", len(m2), len(metadata))
}
for k, v := range metadata {
v2, ok := m2[k]
if !ok {
t.Errorf("key not found: %s", k)
}
if v == nil && v2 == nil {
continue
}
if v == nil || v2 == nil {
t.Errorf("unexpected nil value: %v, expected: %v", v2, v)
}
if len(v) != len(v2) {
t.Errorf("unexpected number of values: %d, expected: %d", len(v2), len(v))
}
for i := range v {
if v[i] != v2[i] {
t.Errorf("unexpected value: %s, expected: %s", v2[i], v[i])
}
}
}
}
func TestMetadataCloneConcurrent(t *testing.T) {
metadata := make(MD)
metadata.Set("foo", "bar")
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m2 := metadata.Clone()
m2.Set("foo", "baz")
}()
}
wg.Wait()
// Concurrent modification should clone the metadata first to avoid panic
// due to concurrent map writes.
if val, ok := metadata.Get("foo"); !ok {
t.Error("metadata not found")
} else if val[0] != "bar" {
t.Errorf("invalid metadata value: %q", val[0])
}
}
func simpleClone(src MD) MD {
md := MD{}
for k, v := range src {
md[k] = append(md[k], v...)
}
return md
}
func BenchmarkMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = metadata.Clone()
}
})
}
}
func BenchmarkSimpleMetadataClone(b *testing.B) {
for _, sz := range []int{5, 10, 20, 50} {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
metadata := make(MD)
for i := 0; i < sz; i++ {
metadata.Set("foo"+fmt.Sprint(i), "bar"+fmt.Sprint(i))
}
for i := 0; i < b.N; i++ {
_ = simpleClone(metadata)
}
})
}
}

View File

@ -74,9 +74,18 @@ func (s *Server) RegisterService(name string, desc *ServiceDesc) {
}
func (s *Server) Serve(ctx context.Context, l net.Listener) error {
s.addListener(l)
s.mu.Lock()
s.addListenerLocked(l)
defer s.closeListener(l)
select {
case <-s.done:
s.mu.Unlock()
return ErrServerClosed
default:
}
s.mu.Unlock()
var (
backoff time.Duration
handshaker = s.config.handshaker
@ -188,9 +197,7 @@ func (s *Server) Close() error {
return err
}
func (s *Server) addListener(l net.Listener) {
s.mu.Lock()
defer s.mu.Unlock()
func (s *Server) addListenerLocked(l net.Listener) {
s.listeners[l] = struct{}{}
}

View File

@ -219,7 +219,7 @@ func TestServerShutdown(t *testing.T) {
// register a service that takes until we tell it to stop
server.Register(serviceName, map[string]Method{
"Test": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Test": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.TestPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -298,6 +298,36 @@ func TestServerClose(t *testing.T) {
checkServerShutdown(t, server)
}
func TestImmediateServerShutdown(t *testing.T) {
var (
ctx = context.Background()
server = mustServer(t)(NewServer())
addr, listener = newTestListener(t)
errs = make(chan error, 1)
_, cleanup = newTestClient(t, addr)
)
defer cleanup()
defer listener.Close()
go func() {
time.Sleep(1 * time.Millisecond)
errs <- server.Serve(ctx, listener)
}()
registerTestingService(server, &testingServer{})
if err := server.Shutdown(ctx); err != nil {
t.Fatal(err)
}
select {
case err := <-errs:
if err != ErrServerClosed {
t.Fatal(err)
}
case <-time.After(2 * time.Second):
t.Fatal("retreiving error from server.Shutdown() timed out")
}
}
func TestOversizeCall(t *testing.T) {
var (
ctx = context.Background()
@ -318,7 +348,7 @@ func TestOversizeCall(t *testing.T) {
Foo: strings.Repeat("a", 1+messageLengthMax),
}
if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil {
t.Fatalf("expected error from non-existent service call")
t.Fatalf("expected error from oversized message")
} else if status, ok := status.FromError(err); !ok {
t.Fatalf("expected status present in error: %v", err)
} else if status.Code() != codes.ResourceExhausted {

View File

@ -38,7 +38,7 @@ func TestStreamClient(t *testing.T) {
desc := &ServiceDesc{
Methods: map[string]Method{
"Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
"Echo": func(_ context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req internal.EchoPayload
if err := unmarshal(&req); err != nil {
return nil, err
@ -49,7 +49,7 @@ func TestStreamClient(t *testing.T) {
},
Streams: map[string]Stream{
"EchoStream": {
Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
Handler: func(_ context.Context, ss StreamServer) (interface{}, error) {
for {
var req internal.EchoPayload
if err := ss.RecvMsg(&req); err != nil {