mirror of https://github.com/grpc/grpc-go.git
rls: use UNAVAILABLE instead of status from control plane (#5400)
This commit is contained in:
parent
a0d5484ee3
commit
34e4fc3bb5
|
|
@ -27,10 +27,12 @@ import (
|
||||||
|
|
||||||
"google.golang.org/grpc/balancer"
|
"google.golang.org/grpc/balancer"
|
||||||
"google.golang.org/grpc/balancer/rls/internal/keys"
|
"google.golang.org/grpc/balancer/rls/internal/keys"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
||||||
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -129,9 +131,15 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
|
||||||
// We get here only if the data cache entry has expired. If entry is in
|
// We get here only if the data cache entry has expired. If entry is in
|
||||||
// backoff, delegate to default target or fail the pick.
|
// backoff, delegate to default target or fail the pick.
|
||||||
if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
|
if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
|
||||||
status := dcEntry.status
|
st := dcEntry.status
|
||||||
p.lb.cacheMu.RUnlock()
|
p.lb.cacheMu.RUnlock()
|
||||||
return p.useDefaultPickIfPossible(info, status)
|
|
||||||
|
// Avoid propagating the status code received on control plane RPCs to the
|
||||||
|
// data plane which can lead to unexpected outcomes as we do not control
|
||||||
|
// the status code sent by the control plane. Propagating the status
|
||||||
|
// message received from the control plane is still fine, as it could be
|
||||||
|
// useful for debugging purposes.
|
||||||
|
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
|
||||||
}
|
}
|
||||||
|
|
||||||
// We get here only if the entry has expired and is not in backoff.
|
// We get here only if the entry has expired and is not in backoff.
|
||||||
|
|
@ -220,7 +228,12 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState
|
||||||
|
|
||||||
// Entry is in backoff. Delegate to default target or fail the pick.
|
// Entry is in backoff. Delegate to default target or fail the pick.
|
||||||
case dcEntry.backoffState != nil && dcEntry.backoffTime.After(now):
|
case dcEntry.backoffState != nil && dcEntry.backoffTime.After(now):
|
||||||
return p.useDefaultPickIfPossible(info, dcEntry.status)
|
// Avoid propagating the status code received on control plane RPCs to the
|
||||||
|
// data plane which can lead to unexpected outcomes as we do not control
|
||||||
|
// the status code sent by the control plane. Propagating the status
|
||||||
|
// message received from the control plane is still fine, as it could be
|
||||||
|
// useful for debugging purposes.
|
||||||
|
return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", dcEntry.status.Error())))
|
||||||
|
|
||||||
// Entry has expired, but is not in backoff. Send request and queue pick.
|
// Entry has expired, but is not in backoff. Send request and queue pick.
|
||||||
default:
|
default:
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@ package rls
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -29,6 +28,7 @@ import (
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
|
||||||
rlstest "google.golang.org/grpc/internal/testutils/rls"
|
rlstest "google.golang.org/grpc/internal/testutils/rls"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
"google.golang.org/protobuf/types/known/durationpb"
|
"google.golang.org/protobuf/types/known/durationpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -512,7 +512,7 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T
|
||||||
|
|
||||||
// Set up the fake RLS server to return errors. This will push the cache
|
// Set up the fake RLS server to return errors. This will push the cache
|
||||||
// entry into backoff.
|
// entry into backoff.
|
||||||
var rlsLastErr = errors.New("last RLS request failed")
|
var rlsLastErr = status.Error(codes.DeadlineExceeded, "last RLS request failed")
|
||||||
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
|
rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
|
||||||
return &rlstest.RouteLookupResponse{Err: rlsLastErr}
|
return &rlstest.RouteLookupResponse{Err: rlsLastErr}
|
||||||
})
|
})
|
||||||
|
|
@ -524,7 +524,7 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T
|
||||||
if test.withDefaultTarget {
|
if test.withDefaultTarget {
|
||||||
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
|
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
|
||||||
} else {
|
} else {
|
||||||
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unknown, rlsLastErr)
|
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, rlsLastErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue