merge master

This commit is contained in:
Yuxuan Li 2017-04-21 16:07:34 -07:00
commit bab6b617b7
9 changed files with 497 additions and 2 deletions

View File

@ -333,7 +333,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams
grpcUA := "grpc-go/" + Version
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
return dialContext(ctx, "tcp", addr)
},
)
}
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {

View File

@ -1,4 +1,4 @@
#gRPC Basics: Go
# gRPC Basics: Go
This tutorial provides a basic Go programmer's introduction to working with gRPC. By walking through this example you'll learn how to:

56
go16.go Normal file
View File

@ -0,0 +1,56 @@
// +build go1.6,!go1.7
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"fmt"
"net"
"net/http"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
}
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
req.Cancel = ctx.Done()
if err := req.Write(conn); err != nil {
return fmt.Errorf("failed to write the HTTP request: %v", err)
}
return nil
}

55
go17.go Normal file
View File

@ -0,0 +1,55 @@
// +build go1.7
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"net"
"net/http"
"golang.org/x/net/context"
)
// dialContext connects to the address on the named network.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, address)
}
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
req = req.WithContext(ctx)
if err := req.Write(conn); err != nil {
return err
}
return nil
}

145
proxy.go Normal file
View File

@ -0,0 +1,145 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"golang.org/x/net/context"
)
var (
// errDisabled indicates that proxy is disabled for the address.
errDisabled = errors.New("proxy is disabled for the address")
// The following variable will be overwritten in the tests.
httpProxyFromEnvironment = http.ProxyFromEnvironment
)
func mapAddress(ctx context.Context, address string) (string, error) {
req := &http.Request{
URL: &url.URL{
Scheme: "https",
Host: address,
},
}
url, err := httpProxyFromEnvironment(req)
if err != nil {
return "", err
}
if url == nil {
return "", errDisabled
}
return url.Host, nil
}
// To read a response from a net.Conn, http.ReadResponse() takes a bufio.Reader.
// It's possible that this reader reads more than what's need for the response and stores
// those bytes in the buffer.
// bufConn wraps the original net.Conn and the bufio.Reader to make sure we don't lose the
// bytes in the buffer.
type bufConn struct {
net.Conn
r io.Reader
}
func (c *bufConn) Read(b []byte) (int, error) {
return c.r.Read(b)
}
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, addr string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
}
}()
req := (&http.Request{
Method: http.MethodConnect,
URL: &url.URL{Host: addr},
Header: map[string][]string{"User-Agent": {grpcUA}},
})
if err := sendHTTPRequest(ctx, req, conn); err != nil {
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
}
r := bufio.NewReader(conn)
resp, err := http.ReadResponse(r, req)
if err != nil {
return nil, fmt.Errorf("reading server HTTP response: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
dump, err := httputil.DumpResponse(resp, true)
if err != nil {
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", resp.Status)
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
return &bufConn{Conn: conn, r: r}, nil
}
// newProxyDialer returns a dialer that connects to proxy first if necessary.
// The returned dialer checks if a proxy is necessary, dial to the proxy with the
// provided dialer, does HTTP CONNECT handshake and returns the connection.
func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (conn net.Conn, err error) {
var skipHandshake bool
newAddr, err := mapAddress(ctx, addr)
if err != nil {
if err != errDisabled {
return nil, err
}
skipHandshake = true
newAddr = addr
}
conn, err = dialer(ctx, newAddr)
if err != nil {
return
}
if !skipHandshake {
conn, err = doHTTPConnectHandshake(ctx, conn, addr)
}
return
}
}

192
proxy_test.go Normal file
View File

@ -0,0 +1,192 @@
/*
*
* Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"bufio"
"io"
"net"
"net/http"
"net/url"
"testing"
"time"
"golang.org/x/net/context"
)
const (
envTestAddr = "1.2.3.4:8080"
envProxyAddr = "2.3.4.5:7687"
)
// overwriteAndRestore overwrite function httpProxyFromEnvironment and
// returns a function to restore the default values.
func overwrite(hpfe func(req *http.Request) (*url.URL, error)) func() {
backHPFE := httpProxyFromEnvironment
httpProxyFromEnvironment = hpfe
return func() {
httpProxyFromEnvironment = backHPFE
}
}
func TestMapAddressEnv(t *testing.T) {
// Overwrite the function in the test and restore them in defer.
hpfe := func(req *http.Request) (*url.URL, error) {
if req.URL.Host == envTestAddr {
return &url.URL{
Scheme: "https",
Host: envProxyAddr,
}, nil
}
return nil, nil
}
defer overwrite(hpfe)()
// envTestAddr should be handled by ProxyFromEnvironment.
got, err := mapAddress(context.Background(), envTestAddr)
if err != nil {
t.Error(err)
}
if got != envProxyAddr {
t.Errorf("want %v, got %v", envProxyAddr, got)
}
}
type proxyServer struct {
t *testing.T
lis net.Listener
in net.Conn
out net.Conn
}
func (p *proxyServer) run() {
in, err := p.lis.Accept()
if err != nil {
return
}
p.in = in
req, err := http.ReadRequest(bufio.NewReader(in))
if err != nil {
p.t.Errorf("failed to read CONNECT req: %v", err)
return
}
if req.Method != http.MethodConnect || req.UserAgent() != grpcUA {
resp := http.Response{StatusCode: http.StatusMethodNotAllowed}
resp.Write(p.in)
p.in.Close()
p.t.Errorf("get wrong CONNECT req: %+v", req)
return
}
out, err := net.Dial("tcp", req.URL.Host)
if err != nil {
p.t.Errorf("failed to dial to server: %v", err)
return
}
resp := http.Response{StatusCode: http.StatusOK, Proto: "HTTP/1.0"}
resp.Write(p.in)
p.out = out
go io.Copy(p.in, p.out)
go io.Copy(p.out, p.in)
}
func (p *proxyServer) stop() {
p.lis.Close()
if p.in != nil {
p.in.Close()
}
if p.out != nil {
p.out.Close()
}
}
func TestHTTPConnect(t *testing.T) {
plis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
p := &proxyServer{t: t, lis: plis}
go p.run()
defer p.stop()
blis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
msg := []byte{4, 3, 5, 2}
recvBuf := make([]byte, len(msg), len(msg))
done := make(chan struct{})
go func() {
in, err := blis.Accept()
if err != nil {
t.Errorf("failed to accept: %v", err)
return
}
defer in.Close()
in.Read(recvBuf)
close(done)
}()
// Overwrite the function in the test and restore them in defer.
hpfe := func(req *http.Request) (*url.URL, error) {
return &url.URL{Host: plis.Addr().String()}, nil
}
defer overwrite(hpfe)()
// Dial to proxy server.
dialer := newProxyDialer(func(ctx context.Context, addr string) (net.Conn, error) {
if deadline, ok := ctx.Deadline(); ok {
return net.DialTimeout("tcp", addr, deadline.Sub(time.Now()))
}
return net.Dial("tcp", addr)
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
c, err := dialer(ctx, blis.Addr().String())
if err != nil {
t.Fatalf("http connect Dial failed: %v", err)
}
defer c.Close()
// Send msg on the connection.
c.Write(msg)
<-done
// Check received msg.
if string(recvBuf) != string(msg) {
t.Fatalf("received msg: %v, want %v", recvBuf, msg)
}
}

View File

@ -482,3 +482,5 @@ func min(a, b int) int {
}
return b
}
const grpcUA = "grpc-go/" + Version

View File

@ -268,6 +268,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
select {
case <-t.Error():
// Incur transport error, simply exit.
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
cs.closeTransportStream(ErrClientConnClosing)
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.

View File

@ -1021,6 +1021,41 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
awaitNewConnLogOutput()
}
func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
continue
}
testClientConnCloseAfterGoAwayWithActiveStream(t, e)
}
}
func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
if _, err := tc.FullDuplexCall(context.Background()); err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
}
done := make(chan struct{})
go func() {
te.srv.GracefulStop()
close(done)
}()
time.Sleep(time.Second)
cc.Close()
timeout := time.NewTimer(time.Second)
select {
case <-done:
case <-timeout.C:
t.Fatalf("Test timed-out.")
}
}
func TestFailFast(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {