Add wait() method to block until Sidecar is up. (#329)

* Add wait() method to block until Sidecar is up.

App might depend on sidecar right away. This PR adds a Wait() method to
enable app to wait for sidecar to be up before invoking the first call.

GRPC client creation on Dapr Go SDK is blocking, so waiting for client
readiness  is less of a problem here than on SDKs where client
connection establishment is async.

Closes #287

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Move Wait its own file.

If anything, this will make testing and the change more localized.

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Adding unresponsive TCP and Unix servers and tests

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Remove comments, clean code up

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Remove a bit of code duplication on tests

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Fix Wait and test server setup.

* Multiple state changes can happen for a single GRPC Connection.
  previous code assume a single one and was failing miserably. Fixed.
* The logic for the test server's tear down was lacking. Fixed

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Rename on aux. method

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Add link to gRPC documentation about connectivity semantics

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Fixing lint errors

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

* Fixing more lint errors

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>

Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>
This commit is contained in:
Tiago Alves Macambira 2022-11-02 10:21:15 -07:00 committed by GitHub
parent b465b1fa07
commit c2dfec6abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 212 additions and 0 deletions

View File

@ -144,6 +144,9 @@ type Client interface {
// Shutdown the sidecar.
Shutdown(ctx context.Context) error
// Wait for a sidecar to become available for at most `timeout` seconds. Returns errWaitTimedOut if timeout is reached.
Wait(ctx context.Context, timeout time.Duration) error
// WithTraceID adds existing trace ID to the outgoing context.
WithTraceID(ctx context.Context, id string) context.Context

52
client/wait.go Normal file
View File

@ -0,0 +1,52 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"errors"
"time"
"google.golang.org/grpc/connectivity"
)
// The following errors are returned from Wait.
var (
// A call to Wait timed out while waiting for a gRPC connection to reach a Ready state.
errWaitTimedOut = errors.New("timed out waiting for client connectivity")
)
func (c *GRPCClient) Wait(ctx context.Context, timeout time.Duration) error {
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// SDKs for other languages implement Wait by attempting to connect to a TCP endpoint
// with a timeout. Go's SDKs handles more endpoints than just TCP ones. To simplify
// the code here, we rely on GRPCs connectivity state management instead.
// See https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
for {
curState := c.connection.GetState()
if curState == connectivity.Ready {
return nil
}
select {
case <-timeoutCtx.Done():
return errWaitTimedOut
default:
// Multiple state changes can happen: keep waiting for a successful one or time out
c.connection.WaitForStateChange(timeoutCtx, curState)
}
}
}

157
client/wait_test.go Normal file
View File

@ -0,0 +1,157 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"net"
"os"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
unresponsiveServerHost = "127.0.0.1"
unresponsiveTCPPort = "0" // Port set to 0 so O.S. auto-selects one for us
unresponsiveUnixSocketFilePath = "/tmp/unresponsive-server.socket"
waitTimeout = 5 * time.Second
connectionTimeout = 4 * waitTimeout // Larger than waitTimeout but still bounded
autoCloseTimeout = 2 * connectionTimeout // Server will close connections after this
)
type Server struct {
listener net.Listener
address string
done chan bool
nClientsSeen uint64
}
func (s *Server) Close() {
close(s.done)
if err := s.listener.Close(); err != nil {
logger.Fatal(err)
}
os.Remove(unresponsiveUnixSocketFilePath)
}
func (s *Server) listenButKeepSilent() {
for {
conn, err := s.listener.Accept() // Accept connections but that's it!
if err != nil {
select {
case <-s.done:
return
default:
logger.Fatal(err)
break
}
} else {
go func(conn net.Conn) {
atomic.AddUint64(&s.nClientsSeen, 1)
time.Sleep(autoCloseTimeout)
conn.Close()
}(conn)
}
}
}
func createUnresponsiveTCPServer() (*Server, error) {
return createUnresponsiveServer("tcp", net.JoinHostPort(unresponsiveServerHost, unresponsiveTCPPort))
}
func createUnresponsiveUnixServer() (*Server, error) {
return createUnresponsiveServer("unix", unresponsiveUnixSocketFilePath)
}
func createUnresponsiveServer(network string, unresponsiveServerAddress string) (*Server, error) {
serverListener, err := net.Listen(network, unresponsiveServerAddress)
if err != nil {
logger.Fatalf("Creation of test server on network %s and address %s failed with error: %v",
network, unresponsiveServerAddress, err)
return nil, err
}
server := &Server{
listener: serverListener,
address: serverListener.Addr().String(),
done: make(chan bool),
nClientsSeen: 0,
}
go server.listenButKeepSilent()
return server, nil
}
func createNonBlockingClient(ctx context.Context, serverAddr string) (client Client, err error) {
conn, err := grpc.DialContext(
ctx,
serverAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
logger.Fatal(err)
return nil, err
}
return NewClientWithConnection(conn), nil
}
func TestGrpcWaitHappyCase(t *testing.T) {
ctx := context.Background()
err := testClient.Wait(ctx, waitTimeout)
assert.NoError(t, err)
}
func TestGrpcWaitUnresponsiveTcpServer(t *testing.T) {
ctx := context.Background()
server, err := createUnresponsiveTCPServer()
assert.NoError(t, err)
defer server.Close()
clientConnectionTimeoutCtx, cancel := context.WithTimeout(ctx, connectionTimeout)
defer cancel()
client, err := createNonBlockingClient(clientConnectionTimeoutCtx, server.address)
assert.NoError(t, err)
err = client.Wait(ctx, waitTimeout)
assert.Error(t, err)
assert.Equal(t, errWaitTimedOut, err)
assert.Equal(t, uint64(1), atomic.LoadUint64(&server.nClientsSeen))
}
func TestGrpcWaitUnresponsiveUnixServer(t *testing.T) {
ctx := context.Background()
server, err := createUnresponsiveUnixServer()
assert.NoError(t, err)
defer server.Close()
clientConnectionTimeoutCtx, cancel := context.WithTimeout(ctx, connectionTimeout)
defer cancel()
client, err := createNonBlockingClient(clientConnectionTimeoutCtx, "unix://"+server.address)
assert.NoError(t, err)
err = client.Wait(ctx, waitTimeout)
assert.Error(t, err)
assert.Equal(t, errWaitTimedOut, err)
assert.Equal(t, uint64(1), atomic.LoadUint64(&server.nClientsSeen))
}