mirror of https://github.com/grpc/grpc-go.git
bufconn: allow readers to receive data after writers close (#1739)
This commit is contained in:
parent
b0e0950972
commit
2625f0335f
|
@ -101,9 +101,11 @@ type pipe struct {
|
|||
buf []byte
|
||||
w, r int
|
||||
|
||||
wwait sync.Cond
|
||||
rwait sync.Cond
|
||||
closed bool
|
||||
wwait sync.Cond
|
||||
rwait sync.Cond
|
||||
|
||||
closed bool
|
||||
writeClosed bool
|
||||
}
|
||||
|
||||
func newPipe(sz int) *pipe {
|
||||
|
@ -132,6 +134,9 @@ func (p *pipe) Read(b []byte) (n int, err error) {
|
|||
if !p.empty() {
|
||||
break
|
||||
}
|
||||
if p.writeClosed {
|
||||
return 0, io.EOF
|
||||
}
|
||||
p.rwait.Wait()
|
||||
}
|
||||
wasFull := p.full()
|
||||
|
@ -160,7 +165,7 @@ func (p *pipe) Write(b []byte) (n int, err error) {
|
|||
for len(b) > 0 {
|
||||
// Block until p is not full.
|
||||
for {
|
||||
if p.closed {
|
||||
if p.closed || p.writeClosed {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
if !p.full() {
|
||||
|
@ -203,14 +208,24 @@ func (p *pipe) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *pipe) closeWrite() error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.writeClosed = true
|
||||
// Signal all blocked readers and writers to return an error.
|
||||
p.rwait.Broadcast()
|
||||
p.wwait.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
type conn struct {
|
||||
io.ReadCloser
|
||||
io.WriteCloser
|
||||
io.Reader
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (c *conn) Close() error {
|
||||
err1 := c.ReadCloser.Close()
|
||||
err2 := c.WriteCloser.Close()
|
||||
err1 := c.Reader.(*pipe).Close()
|
||||
err2 := c.Writer.(*pipe).closeWrite()
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
|
|
|
@ -94,6 +94,56 @@ func TestConn(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestConnCloseWithData(t *testing.T) {
|
||||
lis := Listen(7)
|
||||
errChan := make(chan error)
|
||||
var lisConn net.Conn
|
||||
go func() {
|
||||
var err error
|
||||
if lisConn, err = lis.Accept(); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
close(errChan)
|
||||
}()
|
||||
dialConn, err := lis.Dial()
|
||||
if err != nil {
|
||||
t.Fatalf("Dial error: %v", err)
|
||||
}
|
||||
if err := <-errChan; err != nil {
|
||||
t.Fatalf("Listen error: %v", err)
|
||||
}
|
||||
|
||||
// Write some data on both sides of the connection.
|
||||
n, err := dialConn.Write([]byte("hello"))
|
||||
if n != 5 || err != nil {
|
||||
t.Fatalf("dialConn.Write([]byte{\"hello\"}) = %v, %v; want 5, <nil>", n, err)
|
||||
}
|
||||
n, err = lisConn.Write([]byte("hello"))
|
||||
if n != 5 || err != nil {
|
||||
t.Fatalf("lisConn.Write([]byte{\"hello\"}) = %v, %v; want 5, <nil>", n, err)
|
||||
}
|
||||
|
||||
// Close dial-side; writes from either side should fail.
|
||||
dialConn.Close()
|
||||
if _, err := lisConn.Write([]byte("hello")); err != io.ErrClosedPipe {
|
||||
t.Fatalf("lisConn.Write() = _, <nil>; want _, <non-nil>")
|
||||
}
|
||||
if _, err := dialConn.Write([]byte("hello")); err != io.ErrClosedPipe {
|
||||
t.Fatalf("dialConn.Write() = _, <nil>; want _, <non-nil>")
|
||||
}
|
||||
|
||||
// Read from both sides; reads on lisConn should work, but dialConn should
|
||||
// fail.
|
||||
buf := make([]byte, 6)
|
||||
if _, err := dialConn.Read(buf); err != io.ErrClosedPipe {
|
||||
t.Fatalf("dialConn.Read(buf) = %v, %v; want _, io.ErrClosedPipe", n, err)
|
||||
}
|
||||
n, err = lisConn.Read(buf)
|
||||
if n != 5 || err != nil {
|
||||
t.Fatalf("lisConn.Read(buf) = %v, %v; want 5, <nil>", n, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListener(t *testing.T) {
|
||||
l := Listen(7)
|
||||
var s net.Conn
|
||||
|
|
Loading…
Reference in New Issue