mirror of https://github.com/grpc/grpc-dotnet.git
Compare commits
15 Commits
master
...
load-balan
Author | SHA1 | Date |
---|---|---|
|
e7abb90957 | |
|
415410bc81 | |
|
1ad9bca016 | |
|
86e3724224 | |
|
fa1ddabf04 | |
|
c952c57e31 | |
|
36f191f27c | |
|
8ed7f0f6d7 | |
|
c299221e32 | |
|
aa389f70d4 | |
|
62e92fe42a | |
|
e03990a16a | |
|
6105cfe489 | |
|
ec478afa4b | |
|
12571ad852 |
|
@ -114,6 +114,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.AspNetCore.Web", "src\
|
|||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.AspNetCore.HealthChecks", "src\Grpc.AspNetCore.HealthChecks\Grpc.AspNetCore.HealthChecks.csproj", "{39A9F2B5-2541-423E-83C9-46C7BFF53F41}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Net.Client.LoadBalancing", "src\Grpc.Net.Client.LoadBalancing\Grpc.Net.Client.LoadBalancing.csproj", "{0F42C32D-BAE2-451B-8BE9-17BF995AE101}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.Net.Client.LoadBalancing.Tests", "test\Grpc.Net.Client.LoadBalancing.Tests\Grpc.Net.Client.LoadBalancing.Tests.csproj", "{78564304-153B-41CF-94F3-B34C5700D9F4}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
|
@ -224,6 +228,14 @@ Global
|
|||
{39A9F2B5-2541-423E-83C9-46C7BFF53F41}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{39A9F2B5-2541-423E-83C9-46C7BFF53F41}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{39A9F2B5-2541-423E-83C9-46C7BFF53F41}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{78564304-153B-41CF-94F3-B34C5700D9F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{78564304-153B-41CF-94F3-B34C5700D9F4}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{78564304-153B-41CF-94F3-B34C5700D9F4}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{78564304-153B-41CF-94F3-B34C5700D9F4}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
|
@ -264,6 +276,8 @@ Global
|
|||
{429EB088-94FF-4F06-8E54-72157089C8C3} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
|
||||
{778DB6EE-E5B2-4875-A209-40010B5A3E21} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
|
||||
{39A9F2B5-2541-423E-83C9-46C7BFF53F41} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
|
||||
{0F42C32D-BAE2-451B-8BE9-17BF995AE101} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
|
||||
{78564304-153B-41CF-94F3-B34C5700D9F4} = {CECC4AE8-9C4E-4727-939B-517CC2E58D65}
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {CD5C2B19-49B4-480A-990C-36D98A719B07}
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<Description>.NET client for gRPC</Description>
|
||||
<PackageTags>gRPC RPC HTTP/2 Load Balancing</PackageTags>
|
||||
|
||||
<IsGrpcPublishedPackage>true</IsGrpcPublishedPackage>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<TargetFramework>netstandard2.1</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
|
||||
<PackageReference Include="DnsClient" Version="1.2.0" />
|
||||
<PackageReference Include="System.Text.Json" Version="4.7.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Grpc.Net.Client\Grpc.Net.Client.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,137 @@
|
|||
// <auto-generated>
|
||||
// Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
// source: Protos/load_balancer.proto
|
||||
// </auto-generated>
|
||||
// Original file comments:
|
||||
// Copyright 2015 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// This file defines the GRPCLB LoadBalancing protocol.
|
||||
//
|
||||
// The canonical version of this proto can be found at
|
||||
// https://github.com/grpc/grpc-proto/blob/master/grpc/lb/v1/load_balancer.proto
|
||||
#pragma warning disable 0414, 1591
|
||||
#region Designer generated code
|
||||
|
||||
using grpc = global::Grpc.Core;
|
||||
|
||||
namespace Grpc.Lb.V1 {
|
||||
public static partial class LoadBalancer
|
||||
{
|
||||
static readonly string __ServiceName = "grpc.lb.v1.LoadBalancer";
|
||||
|
||||
static readonly grpc::Marshaller<global::Grpc.Lb.V1.LoadBalanceRequest> __Marshaller_grpc_lb_v1_LoadBalanceRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Lb.V1.LoadBalanceRequest.Parser.ParseFrom);
|
||||
static readonly grpc::Marshaller<global::Grpc.Lb.V1.LoadBalanceResponse> __Marshaller_grpc_lb_v1_LoadBalanceResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Lb.V1.LoadBalanceResponse.Parser.ParseFrom);
|
||||
|
||||
static readonly grpc::Method<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse> __Method_BalanceLoad = new grpc::Method<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse>(
|
||||
grpc::MethodType.DuplexStreaming,
|
||||
__ServiceName,
|
||||
"BalanceLoad",
|
||||
__Marshaller_grpc_lb_v1_LoadBalanceRequest,
|
||||
__Marshaller_grpc_lb_v1_LoadBalanceResponse);
|
||||
|
||||
/// <summary>Service descriptor</summary>
|
||||
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
|
||||
{
|
||||
get { return global::Grpc.Lb.V1.LoadBalancerReflection.Descriptor.Services[0]; }
|
||||
}
|
||||
|
||||
/// <summary>Base class for server-side implementations of LoadBalancer</summary>
|
||||
[grpc::BindServiceMethod(typeof(LoadBalancer), "BindService")]
|
||||
public abstract partial class LoadBalancerBase
|
||||
{
|
||||
/// <summary>
|
||||
/// Bidirectional rpc to get a list of servers.
|
||||
/// </summary>
|
||||
/// <param name="requestStream">Used for reading requests from the client.</param>
|
||||
/// <param name="responseStream">Used for sending responses back to the client.</param>
|
||||
/// <param name="context">The context of the server-side call handler being invoked.</param>
|
||||
/// <returns>A task indicating completion of the handler.</returns>
|
||||
public virtual global::System.Threading.Tasks.Task BalanceLoad(grpc::IAsyncStreamReader<global::Grpc.Lb.V1.LoadBalanceRequest> requestStream, grpc::IServerStreamWriter<global::Grpc.Lb.V1.LoadBalanceResponse> responseStream, grpc::ServerCallContext context)
|
||||
{
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Client for LoadBalancer</summary>
|
||||
public partial class LoadBalancerClient : grpc::ClientBase<LoadBalancerClient>
|
||||
{
|
||||
/// <summary>Creates a new client for LoadBalancer</summary>
|
||||
/// <param name="channel">The channel to use to make remote calls.</param>
|
||||
public LoadBalancerClient(grpc::ChannelBase channel) : base(channel)
|
||||
{
|
||||
}
|
||||
/// <summary>Creates a new client for LoadBalancer that uses a custom <c>CallInvoker</c>.</summary>
|
||||
/// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
|
||||
public LoadBalancerClient(grpc::CallInvoker callInvoker) : base(callInvoker)
|
||||
{
|
||||
}
|
||||
/// <summary>Protected parameterless constructor to allow creation of test doubles.</summary>
|
||||
protected LoadBalancerClient() : base()
|
||||
{
|
||||
}
|
||||
/// <summary>Protected constructor to allow creation of configured clients.</summary>
|
||||
/// <param name="configuration">The client configuration.</param>
|
||||
protected LoadBalancerClient(ClientBaseConfiguration configuration) : base(configuration)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bidirectional rpc to get a list of servers.
|
||||
/// </summary>
|
||||
/// <param name="headers">The initial metadata to send with the call. This parameter is optional.</param>
|
||||
/// <param name="deadline">An optional deadline for the call. The call will be cancelled if deadline is hit.</param>
|
||||
/// <param name="cancellationToken">An optional token for canceling the call.</param>
|
||||
/// <returns>The call object.</returns>
|
||||
public virtual grpc::AsyncDuplexStreamingCall<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse> BalanceLoad(grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return BalanceLoad(new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
/// <summary>
|
||||
/// Bidirectional rpc to get a list of servers.
|
||||
/// </summary>
|
||||
/// <param name="options">The options for the call.</param>
|
||||
/// <returns>The call object.</returns>
|
||||
public virtual grpc::AsyncDuplexStreamingCall<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse> BalanceLoad(grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.AsyncDuplexStreamingCall(__Method_BalanceLoad, null, options);
|
||||
}
|
||||
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
|
||||
protected override LoadBalancerClient NewInstance(ClientBaseConfiguration configuration)
|
||||
{
|
||||
return new LoadBalancerClient(configuration);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Creates service definition that can be registered with a server</summary>
|
||||
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
|
||||
public static grpc::ServerServiceDefinition BindService(LoadBalancerBase serviceImpl)
|
||||
{
|
||||
return grpc::ServerServiceDefinition.CreateBuilder()
|
||||
.AddMethod(__Method_BalanceLoad, serviceImpl.BalanceLoad).Build();
|
||||
}
|
||||
|
||||
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
|
||||
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
|
||||
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
|
||||
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
|
||||
public static void BindService(grpc::ServiceBinderBase serviceBinder, LoadBalancerBase serviceImpl)
|
||||
{
|
||||
serviceBinder.AddMethod(__Method_BalanceLoad, serviceImpl == null ? null : new grpc::DuplexStreamingServerMethod<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse>(serviceImpl.BalanceLoad));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
#endregion
|
|
@ -0,0 +1,18 @@
|
|||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Grpc.Core;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
|
||||
{
|
||||
/// <summary>
|
||||
/// This abstraction was added to the code base to make policies easy to mock in testing scenarios.
|
||||
/// </summary>
|
||||
internal interface IAsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
|
||||
{
|
||||
public IAsyncStreamReader<TResponse> ResponseStream { get; }
|
||||
public IClientStreamWriter<TRequest> RequestStream { get; }
|
||||
public Task<Metadata> ResponseHeadersAsync { get; }
|
||||
public Status GetStatus();
|
||||
public Metadata GetTrailers();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
using System;
|
||||
using System.Threading;
|
||||
using Grpc.Core;
|
||||
using Grpc.Lb.V1;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
|
||||
{
|
||||
/// <summary>
|
||||
/// This abstraction was added to the code base to make policies easy to mock in testing scenarios.
|
||||
/// </summary>
|
||||
internal interface ILoadBalancerClient : IDisposable
|
||||
{
|
||||
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(Metadata? headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default);
|
||||
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(CallOptions options);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
using System.Threading.Tasks;
|
||||
using Grpc.Core;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
|
||||
{
|
||||
/// <summary>
|
||||
/// This class wrap and delegate AsyncDuplexStreamingCall.
|
||||
/// The reason why it was added described here <seealso cref="IAsyncDuplexStreamingCall{TRequest, TResponse}"/>
|
||||
/// </summary>
|
||||
internal sealed class WrappedAsyncDuplexStreamingCall<TRequest, TResponse> : IAsyncDuplexStreamingCall<TRequest, TResponse>
|
||||
{
|
||||
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _asyncDuplexStreamingCall;
|
||||
|
||||
public WrappedAsyncDuplexStreamingCall(AsyncDuplexStreamingCall<TRequest, TResponse> asyncDuplexStreamingCall)
|
||||
{
|
||||
_asyncDuplexStreamingCall = asyncDuplexStreamingCall;
|
||||
}
|
||||
public IAsyncStreamReader<TResponse> ResponseStream => _asyncDuplexStreamingCall.ResponseStream;
|
||||
|
||||
public IClientStreamWriter<TRequest> RequestStream => _asyncDuplexStreamingCall.RequestStream;
|
||||
|
||||
public Task<Metadata> ResponseHeadersAsync => _asyncDuplexStreamingCall.ResponseHeadersAsync;
|
||||
|
||||
public void Dispose() => _asyncDuplexStreamingCall.Dispose();
|
||||
|
||||
public Status GetStatus() => _asyncDuplexStreamingCall.GetStatus();
|
||||
|
||||
public Metadata GetTrailers() => _asyncDuplexStreamingCall.GetTrailers();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using Grpc.Core;
|
||||
using Grpc.Lb.V1;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
|
||||
{
|
||||
/// <summary>
|
||||
/// This class wrap and delegate LoadBalancerClient.
|
||||
/// The reason why it was added described here <seealso cref="ILoadBalancerClient"/>
|
||||
/// </summary>
|
||||
internal sealed class WrappedLoadBalancerClient : ILoadBalancerClient
|
||||
{
|
||||
private readonly GrpcChannel _channelForLB;
|
||||
private readonly LoadBalancer.LoadBalancerClient _loadBalancerClient;
|
||||
public WrappedLoadBalancerClient(List<GrpcNameResolutionResult> resolutionResult, GrpcChannelOptions channelOptionsForLB)
|
||||
{
|
||||
_channelForLB = GrpcChannel.ForAddress($"http://{resolutionResult[0].Host}:{resolutionResult[0].Port}", channelOptionsForLB);
|
||||
_loadBalancerClient = new LoadBalancer.LoadBalancerClient(_channelForLB);
|
||||
}
|
||||
|
||||
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(CallOptions options)
|
||||
{
|
||||
return new WrappedAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>(_loadBalancerClient.BalanceLoad(options));
|
||||
}
|
||||
|
||||
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(Metadata? headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default)
|
||||
{
|
||||
return new WrappedAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>(_loadBalancerClient.BalanceLoad(headers, deadline, cancellationToken));
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_channelForLB.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,190 @@
|
|||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using Grpc.Lb.V1;
|
||||
using System.Threading;
|
||||
using System.Net;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using System.Linq;
|
||||
using Grpc.Net.Client.LoadBalancing.Policies.Abstraction;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Policies
|
||||
{
|
||||
/// <summary>
|
||||
/// The load balancing policy creates a subchannel to each server address.
|
||||
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
|
||||
///
|
||||
/// Official name of this policy is "grpclb". It is a implementation of an external load balancing also called lookaside or one-arm loadbalancing.
|
||||
/// More: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#external-load-balancing-service
|
||||
/// </summary>
|
||||
public sealed class GrpclbPolicy : IGrpcLoadBalancingPolicy
|
||||
{
|
||||
private TimeSpan _clientStatsReportInterval = TimeSpan.FromSeconds(10);
|
||||
private bool _isSecureConnection = false;
|
||||
private int _requestsCounter = 0;
|
||||
private int _subChannelsSelectionCounter = -1;
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
private ILoggerFactory _loggerFactory = NullLoggerFactory.Instance;
|
||||
private ILoadBalancerClient? _loadBalancerClient;
|
||||
private IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>? _balancingStreaming;
|
||||
private Timer? _timer;
|
||||
|
||||
/// <summary>
|
||||
/// LoggerFactory is configured (injected) when class is being instantiated.
|
||||
/// </summary>
|
||||
public ILoggerFactory LoggerFactory
|
||||
{
|
||||
set
|
||||
{
|
||||
_loggerFactory = value;
|
||||
_logger = value.CreateLogger<GrpclbPolicy>();
|
||||
}
|
||||
}
|
||||
|
||||
internal bool Disposed { get; private set; }
|
||||
|
||||
internal IReadOnlyList<GrpcSubChannel> SubChannels { get; set; } = Array.Empty<GrpcSubChannel>();
|
||||
|
||||
/// <summary>
|
||||
/// Property created for testing purposes, allows setter injection
|
||||
/// </summary>
|
||||
internal ILoadBalancerClient? OverrideLoadBalancerClient { private get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates a subchannel to each server address. Depending on policy this may require additional
|
||||
/// steps eg. reaching out to lookaside loadbalancer.
|
||||
/// </summary>
|
||||
/// <param name="resolutionResult">Resolved list of servers and/or lookaside load balancers.</param>
|
||||
/// <param name="serviceName">The name of the load balanced service (e.g., service.googleapis.com).</param>
|
||||
/// <param name="isSecureConnection">Flag if connection between client and destination server should be secured.</param>
|
||||
/// <returns>List of subchannels.</returns>
|
||||
public async Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection)
|
||||
{
|
||||
if (resolutionResult == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(resolutionResult));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(serviceName))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(serviceName)} not defined");
|
||||
}
|
||||
resolutionResult = resolutionResult.Where(x => x.IsLoadBalancer).ToList();
|
||||
if (resolutionResult.Count == 0)
|
||||
{
|
||||
throw new ArgumentException($"{nameof(resolutionResult)} must contain at least one blancer address");
|
||||
}
|
||||
_isSecureConnection = isSecureConnection;
|
||||
_logger.LogDebug($"Start grpclb policy");
|
||||
_logger.LogDebug($"Start connection to external load balancer");
|
||||
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
|
||||
var channelOptionsForLB = new GrpcChannelOptions()
|
||||
{
|
||||
LoggerFactory = _loggerFactory
|
||||
};
|
||||
_loadBalancerClient = GetLoadBalancerClient(resolutionResult, channelOptionsForLB);
|
||||
_balancingStreaming = _loadBalancerClient.BalanceLoad();
|
||||
var initialRequest = new InitialLoadBalanceRequest() { Name = $"{serviceName}:{resolutionResult[0].Port}" };
|
||||
await _balancingStreaming.RequestStream.WriteAsync(new LoadBalanceRequest() { InitialRequest = initialRequest }).ConfigureAwait(false);
|
||||
await _balancingStreaming.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false);
|
||||
if (_balancingStreaming.ResponseStream.Current.LoadBalanceResponseTypeCase != LoadBalanceResponse.LoadBalanceResponseTypeOneofCase.InitialResponse)
|
||||
{
|
||||
throw new InvalidOperationException("InitialLoadBalanceRequest was not followed by InitialLoadBalanceResponse");
|
||||
}
|
||||
var initialResponse = _balancingStreaming.ResponseStream.Current.InitialResponse;
|
||||
_clientStatsReportInterval = initialResponse.ClientStatsReportInterval.ToTimeSpan();
|
||||
await _balancingStreaming.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false);
|
||||
if (_balancingStreaming.ResponseStream.Current.LoadBalanceResponseTypeCase != LoadBalanceResponse.LoadBalanceResponseTypeOneofCase.ServerList)
|
||||
{
|
||||
throw new InvalidOperationException("InitialLoadBalanceResponse was not followed by ServerList");
|
||||
}
|
||||
UpdateSubChannels(_balancingStreaming.ResponseStream.Current.ServerList);
|
||||
_logger.LogDebug($"SubChannels list created");
|
||||
_timer = new Timer(ReportClientStatsTimerAsync, null, _clientStatsReportInterval, _clientStatsReportInterval);
|
||||
_logger.LogDebug($"Periodic ClientStats reporting enabled");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
|
||||
/// </summary>
|
||||
/// <returns>Selected subchannel.</returns>
|
||||
public GrpcSubChannel GetNextSubChannel()
|
||||
{
|
||||
Interlocked.Increment(ref _requestsCounter);
|
||||
return SubChannels[Interlocked.Increment(ref _subChannelsSelectionCounter) % SubChannels.Count];
|
||||
}
|
||||
|
||||
// async void recommended by Stephen Cleary https://stackoverflow.com/questions/38917818/pass-async-callback-to-timer-constructor
|
||||
private async void ReportClientStatsTimerAsync(object state)
|
||||
{
|
||||
await ReportClientStatsAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task ReportClientStatsAsync()
|
||||
{
|
||||
var requestsCounter = Interlocked.Exchange(ref _requestsCounter, 0);
|
||||
var clientStats = new ClientStats()
|
||||
{
|
||||
NumCallsStarted = requestsCounter,
|
||||
NumCallsFinished = requestsCounter,
|
||||
NumCallsFinishedKnownReceived = requestsCounter,
|
||||
NumCallsFinishedWithClientFailedToSend = 0,
|
||||
Timestamp = Timestamp.FromDateTime(DateTime.UtcNow)
|
||||
};
|
||||
await _balancingStreaming!.RequestStream.WriteAsync(new LoadBalanceRequest() { ClientStats = clientStats }).ConfigureAwait(false);
|
||||
await _balancingStreaming.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false);
|
||||
if (_balancingStreaming.ResponseStream.Current.LoadBalanceResponseTypeCase == LoadBalanceResponse.LoadBalanceResponseTypeOneofCase.ServerList)
|
||||
{
|
||||
UpdateSubChannels(_balancingStreaming.ResponseStream.Current.ServerList);
|
||||
}
|
||||
}
|
||||
|
||||
private void UpdateSubChannels(ServerList serverList)
|
||||
{
|
||||
var result = new List<GrpcSubChannel>();
|
||||
foreach (var server in serverList.Servers)
|
||||
{
|
||||
var ipAddress = new IPAddress(server.IpAddress.ToByteArray()).ToString();
|
||||
var uriBuilder = new UriBuilder();
|
||||
uriBuilder.Host = ipAddress;
|
||||
uriBuilder.Port = server.Port;
|
||||
uriBuilder.Scheme = _isSecureConnection ? "https" : "http";
|
||||
var uri = uriBuilder.Uri;
|
||||
result.Add(new GrpcSubChannel(uri));
|
||||
_logger.LogDebug($"Found a server {uri}");
|
||||
}
|
||||
SubChannels = result;
|
||||
}
|
||||
|
||||
private ILoadBalancerClient GetLoadBalancerClient(List<GrpcNameResolutionResult> resolutionResult, GrpcChannelOptions channelOptionsForLB)
|
||||
{
|
||||
if(OverrideLoadBalancerClient != null)
|
||||
{
|
||||
return OverrideLoadBalancerClient;
|
||||
}
|
||||
return new WrappedLoadBalancerClient(resolutionResult, channelOptionsForLB);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Releases the resources used by the <see cref="GrpclbPolicy"/> class.
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
if (Disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
_timer?.Dispose();
|
||||
_balancingStreaming?.RequestStream.CompleteAsync().Wait(); // close request stream to complete gracefully
|
||||
}
|
||||
finally
|
||||
{
|
||||
_loadBalancerClient?.Dispose();
|
||||
}
|
||||
Disposed = true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Policies
|
||||
{
|
||||
/// <summary>
|
||||
/// The load balancing policy creates a subchannel to each server address.
|
||||
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
|
||||
///
|
||||
/// Official name of this policy is "round_robin". It is a implementation of an balancing-aware client.
|
||||
/// More: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#balancing-aware-client
|
||||
/// </summary>
|
||||
public sealed class RoundRobinPolicy : IGrpcLoadBalancingPolicy
|
||||
{
|
||||
private int _subChannelsSelectionCounter = -1;
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
|
||||
/// <summary>
|
||||
/// LoggerFactory is configured (injected) when class is being instantiated.
|
||||
/// </summary>
|
||||
public ILoggerFactory LoggerFactory
|
||||
{
|
||||
set => _logger = value.CreateLogger<RoundRobinPolicy>();
|
||||
}
|
||||
|
||||
internal IReadOnlyList<GrpcSubChannel> SubChannels { get; set; } = Array.Empty<GrpcSubChannel>();
|
||||
|
||||
/// <summary>
|
||||
/// Creates a subchannel to each server address. Depending on policy this may require additional
|
||||
/// steps eg. reaching out to lookaside loadbalancer.
|
||||
/// </summary>
|
||||
/// <param name="resolutionResult">Resolved list of servers and/or lookaside load balancers.</param>
|
||||
/// <param name="serviceName">The name of the load balanced service (e.g., service.googleapis.com).</param>
|
||||
/// <param name="isSecureConnection">Flag if connection between client and destination server should be secured.</param>
|
||||
/// <returns>List of subchannels.</returns>
|
||||
public Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection)
|
||||
{
|
||||
if (resolutionResult == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(resolutionResult));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(serviceName))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(serviceName)} not defined");
|
||||
}
|
||||
resolutionResult = resolutionResult.Where(x => !x.IsLoadBalancer).ToList();
|
||||
if (resolutionResult.Count == 0)
|
||||
{
|
||||
throw new ArgumentException($"{nameof(resolutionResult)} must contain at least one non-blancer address");
|
||||
}
|
||||
_logger.LogDebug($"Start round_robin policy");
|
||||
var result = resolutionResult.Select(x =>
|
||||
{
|
||||
var uriBuilder = new UriBuilder();
|
||||
uriBuilder.Host = x.Host;
|
||||
uriBuilder.Port = x.Port ?? (isSecureConnection ? 443 : 80);
|
||||
uriBuilder.Scheme = isSecureConnection ? "https" : "http";
|
||||
var uri = uriBuilder.Uri;
|
||||
_logger.LogDebug($"Found a server {uri}");
|
||||
return new GrpcSubChannel(uri);
|
||||
}).ToList();
|
||||
_logger.LogDebug($"SubChannels list created");
|
||||
SubChannels = result;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
|
||||
/// </summary>
|
||||
/// <returns>Selected subchannel.</returns>
|
||||
public GrpcSubChannel GetNextSubChannel()
|
||||
{
|
||||
return SubChannels[Interlocked.Increment(ref _subChannelsSelectionCounter) % SubChannels.Count];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Releases the resources used by the <see cref="RoundRobinPolicy"/> class.
|
||||
/// </summary>
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
#region Copyright notice and license
|
||||
|
||||
// Copyright 2019 The gRPC Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#endregion
|
||||
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("Grpc.Net.Client.LoadBalancing.Tests,PublicKey=" +
|
||||
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
|
||||
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
|
||||
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
|
||||
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
|
||||
|
||||
// For Moq. This assembly needs access to internal types via InternalVisibleTo to be able to mock them
|
||||
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602" +
|
||||
"000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02ba" +
|
||||
"a56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667" +
|
||||
"bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6" +
|
||||
"a7d3113e92484cf7045cc7")]
|
|
@ -0,0 +1,170 @@
|
|||
using DnsClient;
|
||||
using DnsClient.Protocol;
|
||||
using Grpc.Net.Client.LoadBalancing.ResolverPlugins.GrpcServiceConfig;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins
|
||||
{
|
||||
/// <summary>
|
||||
/// Resolver plugin is responsible for name resolution by reaching the authority and return
|
||||
/// a list of resolved addresses (both IP address and port) and a service config.
|
||||
/// More: https://github.com/grpc/grpc/blob/master/doc/naming.md
|
||||
/// </summary>
|
||||
public sealed class DnsClientResolverPlugin : IGrpcResolverPlugin
|
||||
{
|
||||
private DnsClientResolverPluginOptions _options;
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
|
||||
/// <summary>
|
||||
/// LoggerFactory is configured (injected) when class is being instantiated.
|
||||
/// </summary>
|
||||
public ILoggerFactory LoggerFactory
|
||||
{
|
||||
set => _logger = value.CreateLogger<DnsClientResolverPlugin>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Property created for testing purposes, allows setter injection
|
||||
/// </summary>
|
||||
internal IDnsQuery? OverrideDnsClient { private get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <seealso cref="DnsClientResolverPlugin"/> that is capable of searching SRV and TXT records.
|
||||
/// </summary>
|
||||
public DnsClientResolverPlugin() : this(new DnsClientResolverPluginOptions())
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <seealso cref="DnsClientResolverPlugin"/> that is capable of searching SRV and TXT records.
|
||||
/// </summary>
|
||||
/// <param name="options">Options allows override default behaviour.</param>
|
||||
public DnsClientResolverPlugin(DnsClientResolverPluginOptions options)
|
||||
{
|
||||
_options = options;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Name resolution for secified target.
|
||||
/// </summary>
|
||||
/// <param name="target">Server address with scheme.</param>
|
||||
/// <returns>List of resolved servers and/or lookaside load balancers.</returns>
|
||||
public async Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target)
|
||||
{
|
||||
if (target == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(target));
|
||||
}
|
||||
if (!target.Scheme.Equals("dns", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(DnsClientResolverPlugin)} require dns:// scheme to set as target address");
|
||||
}
|
||||
var host = target.Host;
|
||||
var dnsClient = GetDnsClient();
|
||||
if (!_options.DisableTxtServiceConfig)
|
||||
{
|
||||
var serviceConfigDnsQuery = $"_grpc_config.{host}";
|
||||
_logger.LogDebug($"Start TXT lookup for {serviceConfigDnsQuery}");
|
||||
var txtRecords = (await dnsClient.QueryAsync(serviceConfigDnsQuery, QueryType.TXT).ConfigureAwait(false)).Answers.OfType<TxtRecord>().ToArray();
|
||||
_logger.LogDebug($"Number of TXT records found: {txtRecords.Length}");
|
||||
var grpcConfigs = txtRecords.SelectMany(x => x.Text).Where(IsGrpcConfigTxtRecord).ToArray();
|
||||
_logger.LogDebug($"Number of grpc_configs found: {grpcConfigs.Length}");
|
||||
if(grpcConfigs.Length != 0 && TryParseGrpcConfig(grpcConfigs[0], out var serviceConfigs))
|
||||
{
|
||||
_logger.LogDebug($"First grpc_config is selected " + grpcConfigs[0]);
|
||||
_logger.LogDebug($"Parsing JSON grpc_config into service config success");
|
||||
var serviceConfig = serviceConfigs[0];
|
||||
_logger.LogDebug($"Service config defines policies: {string.Join(',', serviceConfig.GetLoadBalancingPolicies())}");
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogDebug($"Parsing JSON grpc_config into service config failed, loading service config is skipped");
|
||||
}
|
||||
}
|
||||
var balancingDnsQuery = $"_grpclb._tcp.{host}";
|
||||
var serversDnsQuery = host;
|
||||
_logger.LogDebug($"Start SRV lookup for {balancingDnsQuery}");
|
||||
_logger.LogDebug($"Start A lookup for {serversDnsQuery}");
|
||||
var balancingDnsQueryTask = dnsClient.QueryAsync(balancingDnsQuery, QueryType.SRV);
|
||||
var serversDnsQueryTask = dnsClient.QueryAsync(serversDnsQuery, QueryType.A);
|
||||
await Task.WhenAll(balancingDnsQueryTask, serversDnsQueryTask).ConfigureAwait(false);
|
||||
var results = balancingDnsQueryTask.Result.Answers.OfType<SrvRecord>().Select(x => ParseSrvRecord(x, true))
|
||||
.Union(serversDnsQueryTask.Result.Answers.OfType<ARecord>().Select(x => ParseARecord(x, target.Port, false))).ToList();
|
||||
if (results.Count == 0)
|
||||
{
|
||||
_logger.LogDebug($"Not found any DNS records");
|
||||
return new List<GrpcNameResolutionResult>();
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private IDnsQuery GetDnsClient()
|
||||
{
|
||||
if (OverrideDnsClient != null)
|
||||
{
|
||||
return OverrideDnsClient;
|
||||
}
|
||||
if (_options.NameServers.Length == 0)
|
||||
{
|
||||
return new LookupClient();
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogDebug($"Override DNS name servers using options");
|
||||
return new LookupClient(_options.NameServers);
|
||||
}
|
||||
}
|
||||
|
||||
private static bool IsGrpcConfigTxtRecord(string txtRecordText)
|
||||
{
|
||||
return txtRecordText.StartsWith("grpc_config=", StringComparison.InvariantCultureIgnoreCase);
|
||||
}
|
||||
|
||||
private static bool TryParseGrpcConfig(string txtRecordText, out ServiceConfig[] serviceConfigs)
|
||||
{
|
||||
try
|
||||
{
|
||||
var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
|
||||
var txtRecordValue = txtRecordText.Substring(12); // remove txt key -> grpc_config=
|
||||
serviceConfigs = JsonSerializer.Deserialize<GrpcConfig[]>(txtRecordValue, options)
|
||||
.Select(x => x.ServiceConfig).ToArray();
|
||||
return true;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
serviceConfigs = Array.Empty<ServiceConfig>();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private GrpcNameResolutionResult ParseSrvRecord(SrvRecord srvRecord, bool isLoadBalancer)
|
||||
{
|
||||
_logger.LogDebug($"Found a SRV record {srvRecord.ToString()}");
|
||||
return new GrpcNameResolutionResult(srvRecord.Target)
|
||||
{
|
||||
Port = srvRecord.Port,
|
||||
IsLoadBalancer = isLoadBalancer,
|
||||
Priority = srvRecord.Priority,
|
||||
Weight = srvRecord.Weight
|
||||
};
|
||||
}
|
||||
|
||||
private GrpcNameResolutionResult ParseARecord(ARecord aRecord, int port, bool isLoadBalancer)
|
||||
{
|
||||
_logger.LogDebug($"Found a A record {aRecord.ToString()}");
|
||||
return new GrpcNameResolutionResult(aRecord.Address.ToString())
|
||||
{
|
||||
Port = port,
|
||||
IsLoadBalancer = isLoadBalancer,
|
||||
Priority = 0,
|
||||
Weight = 0
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
using System;
|
||||
using System.Net;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins
|
||||
{
|
||||
/// <summary>
|
||||
/// An options class for configuring a <see cref="DnsClientResolverPlugin"/>.
|
||||
/// </summary>
|
||||
public sealed class DnsClientResolverPluginOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Allows override dns nameservers used during lookup. Default value is an empty list.
|
||||
/// If an empty list is specified client defaults to machine list of nameservers.
|
||||
/// </summary>
|
||||
public IPEndPoint[] NameServers { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Allows disabling service config lookup. Default value false.
|
||||
/// </summary>
|
||||
public bool DisableTxtServiceConfig { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <seealso cref="DnsClientResolverPluginOptions"/> options class with default values.
|
||||
/// </summary>
|
||||
public DnsClientResolverPluginOptions()
|
||||
{
|
||||
NameServers = Array.Empty<IPEndPoint>();
|
||||
DisableTxtServiceConfig = true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
#pragma warning disable CA1812 // Classes in this file are used for deserialization
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins.GrpcServiceConfig
|
||||
{
|
||||
// based on: https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
|
||||
internal sealed class GrpcConfig
|
||||
{
|
||||
public ServiceConfig ServiceConfig { get; set; } = new ServiceConfig();
|
||||
}
|
||||
|
||||
//based on: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
|
||||
internal sealed class ServiceConfig
|
||||
{
|
||||
// This field is deprecated but currently widely used
|
||||
public string LoadBalancingPolicy { get; set; } = string.Empty;
|
||||
public List<LoadBalancingConfig> LoadBalancingConfig { get; set; } = new List<LoadBalancingConfig>();
|
||||
public string[] GetLoadBalancingPolicies()
|
||||
{
|
||||
if (LoadBalancingConfig.Count != 0)
|
||||
{
|
||||
return LoadBalancingConfig.Select(x => x.GetPolicyName()).ToArray();
|
||||
}
|
||||
if (LoadBalancingPolicy != string.Empty)
|
||||
{
|
||||
return new string[] { LoadBalancingPolicy };
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidOperationException("Invalid ServiceConfig, load balancing policy must be specified");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class LoadBalancingConfig
|
||||
{
|
||||
public PickFirstConfig? PickFirst { get; set; }
|
||||
public RoundRobinConfig? RoundRobin { get; set; }
|
||||
public GrpcLbConfig? Grpclb { get; set; }
|
||||
|
||||
//XDS policy config should be added here in the future
|
||||
|
||||
public string GetPolicyName()
|
||||
{
|
||||
// according to proto file only one configuration can be specified
|
||||
return Grpclb?.ToString() ?? RoundRobin?.ToString() ?? PickFirst?.ToString()
|
||||
?? throw new InvalidOperationException("Load balancing config without policy defined");
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class PickFirstConfig
|
||||
{
|
||||
//This should be left empty, see service_config.proto file
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return "pick_first";
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class RoundRobinConfig
|
||||
{
|
||||
//This should be left empty, see service_config.proto file
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return "round_robin";
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class GrpcLbConfig
|
||||
{
|
||||
public List<LoadBalancingConfig>? ChildPolicy { get; set; }
|
||||
|
||||
public string? ServiceName { get; set; }
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return "grpclb";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins
|
||||
{
|
||||
/// <summary>
|
||||
/// Resolver plugin is responsible for name resolution by reaching the authority and return
|
||||
/// a list of resolved addresses (both IP address and port) and a service config.
|
||||
/// More: https://github.com/grpc/grpc/blob/master/doc/naming.md
|
||||
/// </summary>
|
||||
public sealed class StaticResolverPlugin : IGrpcResolverPlugin
|
||||
{
|
||||
private readonly Func<Uri, List<GrpcNameResolutionResult>> _staticNameResolution;
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
|
||||
/// <summary>
|
||||
/// LoggerFactory is configured (injected) when class is being instantiated.
|
||||
/// </summary>
|
||||
public ILoggerFactory LoggerFactory
|
||||
{
|
||||
set => _logger = value.CreateLogger<StaticResolverPlugin>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <seealso cref="StaticResolverPlugin"/> with configation passed as function parameter.
|
||||
/// </summary>
|
||||
/// <param name="staticNameResolution"></param>
|
||||
public StaticResolverPlugin(Func<Uri, List<GrpcNameResolutionResult>> staticNameResolution)
|
||||
{
|
||||
if(staticNameResolution == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(staticNameResolution));
|
||||
}
|
||||
_staticNameResolution = staticNameResolution;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Name resolution for secified target.
|
||||
/// </summary>
|
||||
/// <param name="target">Server address with scheme.</param>
|
||||
/// <returns>List of resolved servers and/or lookaside load balancers.</returns>
|
||||
public Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target)
|
||||
{
|
||||
_logger.LogDebug($"Using static name resolution");
|
||||
return Task.FromResult(_staticNameResolution(target));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,8 +38,8 @@ namespace Grpc.Net.Client
|
|||
{
|
||||
internal const int DefaultMaxReceiveMessageSize = 1024 * 1024 * 4; // 4 MB
|
||||
|
||||
private readonly ConcurrentDictionary<IMethod, GrpcMethodInfo> _methodInfoCache;
|
||||
private readonly Func<IMethod, GrpcMethodInfo> _createMethodInfoFunc;
|
||||
private readonly ConcurrentDictionary<IMethod, GrpcCallScope> _callScopeCache;
|
||||
private readonly Func<IMethod, GrpcCallScope> _createCallScopeFunc;
|
||||
|
||||
internal Uri Address { get; }
|
||||
internal HttpClient HttpClient { get; }
|
||||
|
@ -51,6 +51,8 @@ namespace Grpc.Net.Client
|
|||
internal List<CallCredentials>? CallCredentials { get; }
|
||||
internal Dictionary<string, ICompressionProvider> CompressionProviders { get; }
|
||||
internal string MessageAcceptEncoding { get; }
|
||||
internal IGrpcResolverPlugin ResolverPlugin { get; }
|
||||
internal IGrpcLoadBalancingPolicy LoadBalancingPolicy { get; }
|
||||
internal bool Disposed { get; private set; }
|
||||
// Timing related options that are set in unit tests
|
||||
internal ISystemClock Clock = SystemClock.Instance;
|
||||
|
@ -61,7 +63,7 @@ namespace Grpc.Net.Client
|
|||
|
||||
internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(address.Authority)
|
||||
{
|
||||
_methodInfoCache = new ConcurrentDictionary<IMethod, GrpcMethodInfo>();
|
||||
_callScopeCache = new ConcurrentDictionary<IMethod, GrpcCallScope>();
|
||||
|
||||
// Dispose the HttpClient if...
|
||||
// 1. No client was specified and so the channel created the HttpClient itself
|
||||
|
@ -76,7 +78,7 @@ namespace Grpc.Net.Client
|
|||
MessageAcceptEncoding = GrpcProtocolHelpers.GetMessageAcceptEncoding(CompressionProviders);
|
||||
LoggerFactory = channelOptions.LoggerFactory ?? NullLoggerFactory.Instance;
|
||||
ThrowOperationCanceledOnCancellation = channelOptions.ThrowOperationCanceledOnCancellation;
|
||||
_createMethodInfoFunc = CreateMethodInfo;
|
||||
_createCallScopeFunc = CreateCallScope;
|
||||
|
||||
if (channelOptions.Credentials != null)
|
||||
{
|
||||
|
@ -88,6 +90,15 @@ namespace Grpc.Net.Client
|
|||
|
||||
ValidateChannelCredentials();
|
||||
}
|
||||
|
||||
ResolverPlugin = channelOptions.ResolverPlugin;
|
||||
ResolverPlugin.LoggerFactory = LoggerFactory;
|
||||
LoadBalancingPolicy = channelOptions.LoadBalancingPolicy;
|
||||
LoadBalancingPolicy.LoggerFactory = LoggerFactory;
|
||||
|
||||
var resolutionResult = ResolverPlugin.StartNameResolutionAsync(Address).GetAwaiter().GetResult();
|
||||
var isSecureConnection = Address.Scheme == Uri.UriSchemeHttps || (Address.Scheme.Equals("dns", StringComparison.OrdinalIgnoreCase) && Address.Port == 443);
|
||||
LoadBalancingPolicy.CreateSubChannelsAsync(resolutionResult, Address.Host, Address.Scheme == Uri.UriSchemeHttps).Wait();
|
||||
}
|
||||
|
||||
private static HttpClient CreateInternalHttpClient()
|
||||
|
@ -107,17 +118,15 @@ namespace Grpc.Net.Client
|
|||
return httpClient;
|
||||
}
|
||||
|
||||
internal GrpcMethodInfo GetCachedGrpcMethodInfo(IMethod method)
|
||||
internal GrpcCallScope GetCachedGrpcCallScope(IMethod method)
|
||||
{
|
||||
return _methodInfoCache.GetOrAdd(method, _createMethodInfoFunc);
|
||||
return _callScopeCache.GetOrAdd(method, _createCallScopeFunc);
|
||||
}
|
||||
|
||||
private GrpcMethodInfo CreateMethodInfo(IMethod method)
|
||||
private GrpcCallScope CreateCallScope(IMethod method)
|
||||
{
|
||||
var uri = new Uri(method.FullName, UriKind.Relative);
|
||||
var scope = new GrpcCallScope(method.Type, uri);
|
||||
|
||||
return new GrpcMethodInfo(scope, new Uri(Address, uri));
|
||||
return new GrpcCallScope(method.Type, uri);
|
||||
}
|
||||
|
||||
private static Dictionary<string, ICompressionProvider> ResolveCompressionProviders(IList<ICompressionProvider>? compressionProviders)
|
||||
|
@ -165,7 +174,7 @@ namespace Grpc.Net.Client
|
|||
{
|
||||
throw new ObjectDisposedException(nameof(GrpcChannel));
|
||||
}
|
||||
|
||||
|
||||
var invoker = new HttpClientCallInvoker(this);
|
||||
|
||||
return invoker;
|
||||
|
@ -275,6 +284,8 @@ namespace Grpc.Net.Client
|
|||
return;
|
||||
}
|
||||
|
||||
LoadBalancingPolicy.Dispose();
|
||||
|
||||
if (_shouldDisposeHttpClient)
|
||||
{
|
||||
HttpClient.Dispose();
|
||||
|
|
|
@ -96,12 +96,24 @@ namespace Grpc.Net.Client
|
|||
/// </summary>
|
||||
public bool ThrowOperationCanceledOnCancellation { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets name resolver.
|
||||
/// </summary>
|
||||
public IGrpcResolverPlugin ResolverPlugin { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets load balancing policy.
|
||||
/// </summary>
|
||||
public IGrpcLoadBalancingPolicy LoadBalancingPolicy { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="GrpcChannelOptions"/> class.
|
||||
/// </summary>
|
||||
public GrpcChannelOptions()
|
||||
{
|
||||
MaxReceiveMessageSize = GrpcChannel.DefaultMaxReceiveMessageSize;
|
||||
LoadBalancingPolicy = new PickFirstPolicy();
|
||||
ResolverPlugin = new NoneResolverPlugin();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Grpc.Net.Client
|
||||
{
|
||||
/// <summary>
|
||||
/// Resolver plugin is responsible for name resolution by reaching the authority and return
|
||||
/// a list of resolved addresses (both IP address and port) and a service config.
|
||||
/// More: https://github.com/grpc/grpc/blob/master/doc/naming.md
|
||||
/// </summary>
|
||||
public interface IGrpcResolverPlugin
|
||||
{
|
||||
/// <summary>
|
||||
/// LoggerFactory is configured (injected) when class is being instantiated.
|
||||
/// </summary>
|
||||
ILoggerFactory LoggerFactory { set; }
|
||||
|
||||
/// <summary>
|
||||
/// Name resolution for secified target.
|
||||
/// </summary>
|
||||
/// <param name="target">Server address with scheme.</param>
|
||||
/// <returns>List of resolved servers and/or lookaside load balancers.</returns>
|
||||
Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The load balancing policy creates a subchannel to each server address.
|
||||
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
|
||||
/// </summary>
|
||||
public interface IGrpcLoadBalancingPolicy : IDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// LoggerFactory is configured (injected) when class is being instantiated.
|
||||
/// </summary>
|
||||
ILoggerFactory LoggerFactory { set; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates a subchannel to each server address. Depending on policy this may require additional
|
||||
/// steps eg. reaching out to lookaside loadbalancer.
|
||||
/// </summary>
|
||||
/// <param name="resolutionResult">Resolved list of servers and/or lookaside load balancers.</param>
|
||||
/// <param name="serviceName">The name of the load balanced service (e.g., service.googleapis.com).</param>
|
||||
/// <param name="isSecureConnection">Flag if connection between client and destination server should be secured.</param>
|
||||
/// <returns>List of subchannels.</returns>
|
||||
Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection);
|
||||
|
||||
/// <summary>
|
||||
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
|
||||
/// </summary>
|
||||
/// <returns>Selected subchannel.</returns>
|
||||
GrpcSubChannel GetNextSubChannel();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resolved address of server or lookaside load balancer.
|
||||
/// </summary>
|
||||
public sealed class GrpcNameResolutionResult
|
||||
{
|
||||
/// <summary>
|
||||
/// Host address.
|
||||
/// </summary>
|
||||
public string Host { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Port.
|
||||
/// </summary>
|
||||
public int? Port { get; set; } = null;
|
||||
|
||||
/// <summary>
|
||||
/// Flag that indicate if machine is load balancer or service.
|
||||
/// </summary>
|
||||
public bool IsLoadBalancer { get; set; } = false;
|
||||
|
||||
/// <summary>
|
||||
/// Priority value, which was obtained from SRV record, for this Host. Default value zero.
|
||||
/// </summary>
|
||||
public int Priority { get; set; } = 0;
|
||||
|
||||
/// <summary>
|
||||
/// Weight value, which was obtained from SRV record, for this Host. Default value zero.
|
||||
/// </summary>
|
||||
public int Weight { get; set; } = 0;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <see cref="GrpcNameResolutionResult"/> with host and unassigned port.
|
||||
/// </summary>
|
||||
/// <param name="host">Host address of machine.</param>
|
||||
/// <param name="port">Machine port.</param>
|
||||
public GrpcNameResolutionResult(string host, int? port = null)
|
||||
{
|
||||
Host = host;
|
||||
Port = port;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Address of server that can handle requests for RPC.
|
||||
/// </summary>
|
||||
public sealed class GrpcSubChannel
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets the server address in Uri form.
|
||||
/// </summary>
|
||||
public Uri Address { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates a <see cref="GrpcSubChannel"/> object with subchannel address.
|
||||
/// </summary>
|
||||
/// <param name="address"></param>
|
||||
public GrpcSubChannel(Uri address)
|
||||
{
|
||||
Address = address;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Assume name was already resolved or pass through to HttpClient to handle
|
||||
/// </summary>
|
||||
internal sealed class NoneResolverPlugin : IGrpcResolverPlugin
|
||||
{
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
public ILoggerFactory LoggerFactory
|
||||
{
|
||||
set => _logger = value.CreateLogger<NoneResolverPlugin>();
|
||||
}
|
||||
|
||||
public Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target)
|
||||
{
|
||||
if (target == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(target));
|
||||
}
|
||||
if (target.Scheme.Equals("dns", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
throw new ArgumentException("dns:// scheme require non-default name resolver in channelOptions.ResolverPlugin");
|
||||
}
|
||||
_logger.LogDebug($"Name resolver using defined target as name resolution");
|
||||
return Task.FromResult(new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult(target.Host, target.Port)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class PickFirstPolicy : IGrpcLoadBalancingPolicy
|
||||
{
|
||||
private ILogger _logger = NullLogger.Instance;
|
||||
public ILoggerFactory LoggerFactory
|
||||
{
|
||||
set => _logger = value.CreateLogger<PickFirstPolicy>();
|
||||
}
|
||||
|
||||
internal IReadOnlyList<GrpcSubChannel> SubChannels { get; set; } = Array.Empty<GrpcSubChannel>();
|
||||
|
||||
public Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection)
|
||||
{
|
||||
if (resolutionResult == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(resolutionResult));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(serviceName))
|
||||
{
|
||||
throw new ArgumentException($"{nameof(serviceName)} not defined");
|
||||
}
|
||||
resolutionResult = resolutionResult.Where(x => !x.IsLoadBalancer).ToList();
|
||||
if (resolutionResult.Count == 0)
|
||||
{
|
||||
throw new ArgumentException($"{nameof(resolutionResult)} must contain at least one non-blancer address");
|
||||
}
|
||||
_logger.LogDebug($"Start first_pick policy");
|
||||
var uriBuilder = new UriBuilder();
|
||||
uriBuilder.Host = resolutionResult[0].Host;
|
||||
uriBuilder.Port = resolutionResult[0].Port ?? (isSecureConnection ? 443 : 80);
|
||||
uriBuilder.Scheme = isSecureConnection ? "https" : "http";
|
||||
var uri = uriBuilder.Uri;
|
||||
var result = new List<GrpcSubChannel> {
|
||||
new GrpcSubChannel(uri)
|
||||
};
|
||||
_logger.LogDebug($"Found a server {uri}");
|
||||
_logger.LogDebug($"SubChannels list created");
|
||||
SubChannels = result;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public GrpcSubChannel GetNextSubChannel()
|
||||
{
|
||||
return SubChannels[0];
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
|
@ -38,6 +38,8 @@ namespace Grpc.Net.Client.Internal
|
|||
_uri = uri;
|
||||
}
|
||||
|
||||
public Uri Uri => _uri;
|
||||
|
||||
public KeyValuePair<string, object> this[int index]
|
||||
{
|
||||
get
|
||||
|
|
|
@ -129,7 +129,9 @@ namespace Grpc.Net.Client.Internal
|
|||
throw new ObjectDisposedException(nameof(GrpcChannel));
|
||||
}
|
||||
|
||||
var methodInfo = Channel.GetCachedGrpcMethodInfo(method);
|
||||
var subchannel = Channel.LoadBalancingPolicy.GetNextSubChannel();
|
||||
var scope = Channel.GetCachedGrpcCallScope(method);
|
||||
var methodInfo = new GrpcMethodInfo(scope, new Uri(subchannel.Address, scope.Uri));
|
||||
var call = new GrpcCall<TRequest, TResponse>(method, methodInfo, options, Channel);
|
||||
|
||||
return call;
|
||||
|
|
|
@ -30,6 +30,12 @@ using System.Runtime.CompilerServices;
|
|||
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
|
||||
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
|
||||
|
||||
[assembly: InternalsVisibleTo("Grpc.Net.Client.LoadBalancing.Tests,PublicKey=" +
|
||||
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
|
||||
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
|
||||
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
|
||||
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
|
||||
|
||||
[assembly: InternalsVisibleTo("Grpc.Net.ClientFactory.Tests,PublicKey=" +
|
||||
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
|
||||
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
|
||||
|
|
|
@ -305,7 +305,11 @@ namespace Microsoft.Extensions.DependencyInjection
|
|||
var os = s.GetRequiredService<IOptionsMonitor<GrpcClientFactoryOptions>>();
|
||||
var clientOptions = os.Get(name);
|
||||
|
||||
httpClient.BaseAddress = clientOptions.Address;
|
||||
// do not set shared baseaddress for subchannels
|
||||
if (clientOptions.Address?.Scheme != "dns")
|
||||
{
|
||||
httpClient.BaseAddress = clientOptions.Address;
|
||||
}
|
||||
|
||||
// Long running server and duplex streaming gRPC requests may not
|
||||
// return any messages for over 100 seconds, triggering a cancellation
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>netcoreapp3.1</TargetFramework>
|
||||
<IsPackable>false</IsPackable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
|
||||
<PackageReference Include="xunit" Version="2.4.0" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
|
||||
<PackageReference Include="coverlet.collector" Version="1.0.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Grpc.Net.Client\Grpc.Net.Client.csproj" />
|
||||
<ProjectReference Include="..\..\src\Grpc.Net.Client.LoadBalancing\Grpc.Net.Client.LoadBalancing.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
|
@ -0,0 +1,250 @@
|
|||
using Google.Protobuf;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Grpc.Core;
|
||||
using Grpc.Lb.V1;
|
||||
using Grpc.Net.Client.LoadBalancing.Policies;
|
||||
using Grpc.Net.Client.LoadBalancing.Policies.Abstraction;
|
||||
using Moq;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Tests.Policies
|
||||
{
|
||||
public sealed class GrpclbPolicyTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ForEmptyServiceName_UseGrpclbPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new GrpclbPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.6.121", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "", false);
|
||||
});
|
||||
Assert.Equal("serviceName not defined", exception.Message);
|
||||
exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, string.Empty, false);
|
||||
});
|
||||
Assert.Equal("serviceName not defined", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForEmptyResolutionPassed_UseGrpclbPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new GrpclbPolicy();
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(new List<GrpcNameResolutionResult>(), "sample-service.contoso.com", false);
|
||||
});
|
||||
Assert.Equal("resolutionResult must contain at least one blancer address", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForServersResolutionOnly_UseGrpclbPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new GrpclbPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.211", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false); // non-balancers are ignored
|
||||
});
|
||||
Assert.Equal("resolutionResult must contain at least one blancer address", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForResolutionResultWithBalancers_UseGrpclbPolicy_CreateSubchannelsForFoundServers()
|
||||
{
|
||||
// Arrange
|
||||
var balancerClientMock = new Mock<ILoadBalancerClient>(MockBehavior.Strict);
|
||||
var balancerStreamMock = new Mock<IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>>(MockBehavior.Strict);
|
||||
var requestStreamMock = new Mock<IClientStreamWriter<LoadBalanceRequest>>(MockBehavior.Loose);
|
||||
|
||||
balancerClientMock.Setup(x => x.Dispose());
|
||||
balancerClientMock.Setup(x => x.BalanceLoad(null, null, It.IsAny<CancellationToken>()))
|
||||
.Returns(balancerStreamMock.Object);
|
||||
|
||||
balancerStreamMock.Setup(x => x.RequestStream).Returns(requestStreamMock.Object);
|
||||
balancerStreamMock.Setup(x => x.ResponseStream).Returns(new TestLoadBalancerResponse(new List<LoadBalanceResponse>
|
||||
{
|
||||
new LoadBalanceResponse()
|
||||
{
|
||||
InitialResponse = GetSampleInitialLoadBalanceResponse()
|
||||
},
|
||||
new LoadBalanceResponse()
|
||||
{
|
||||
ServerList = GetSampleServerList()
|
||||
}
|
||||
}));
|
||||
|
||||
using var policy = new GrpclbPolicy();
|
||||
policy.OverrideLoadBalancerClient = balancerClientMock.Object;
|
||||
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80) { IsLoadBalancer = true }
|
||||
};
|
||||
|
||||
// Act
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
|
||||
var subChannels = policy.SubChannels;
|
||||
|
||||
// Assert
|
||||
Assert.Equal(3, subChannels.Count); // subChannels are created per results from GetSampleLoadBalanceResponse
|
||||
Assert.All(subChannels, subChannel => Assert.Equal("http", subChannel.Address.Scheme));
|
||||
Assert.All(subChannels, subChannel => Assert.Equal(80, subChannel.Address.Port));
|
||||
Assert.All(subChannels, subChannel => Assert.StartsWith("10.1.5.", subChannel.Address.Host));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForLoadBalancerClient_UseGrpclbPolicy_EnsureDisposedResources()
|
||||
{
|
||||
// Arrange
|
||||
var balancerClientMock = new Mock<ILoadBalancerClient>(MockBehavior.Strict);
|
||||
var balancerStreamMock = new Mock<IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>>(MockBehavior.Strict);
|
||||
var requestStreamMock = new Mock<IClientStreamWriter<LoadBalanceRequest>>(MockBehavior.Loose);
|
||||
|
||||
balancerClientMock.Setup(x => x.Dispose()).Verifiable();
|
||||
balancerClientMock.Setup(x => x.BalanceLoad(null, null, It.IsAny<CancellationToken>()))
|
||||
.Returns(balancerStreamMock.Object);
|
||||
|
||||
balancerStreamMock.Setup(x => x.RequestStream).Returns(requestStreamMock.Object);
|
||||
balancerStreamMock.Setup(x => x.ResponseStream).Returns(new TestLoadBalancerResponse(new List<LoadBalanceResponse>
|
||||
{
|
||||
new LoadBalanceResponse()
|
||||
{
|
||||
InitialResponse = GetSampleInitialLoadBalanceResponse()
|
||||
},
|
||||
new LoadBalanceResponse()
|
||||
{
|
||||
ServerList = GetSampleServerList()
|
||||
}
|
||||
}));
|
||||
|
||||
using var policy = new GrpclbPolicy();
|
||||
policy.OverrideLoadBalancerClient = balancerClientMock.Object;
|
||||
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80) { IsLoadBalancer = true }
|
||||
};
|
||||
|
||||
// Act
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
|
||||
var subChannels = policy.SubChannels;
|
||||
|
||||
// Assert
|
||||
policy.Dispose();
|
||||
balancerClientMock.Verify(x => x.Dispose(), Times.Once());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ForGrpcSubChannels_UseGrpclbPolicySelectChannels_SelectChannelsInRoundRobin()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new GrpclbPolicy();
|
||||
var subChannels = new List<GrpcSubChannel>()
|
||||
{
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.210:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.212:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.211:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.213:80").Uri)
|
||||
};
|
||||
policy.SubChannels = subChannels;
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
for (int i = 0; i < 30; i++)
|
||||
{
|
||||
var subChannel = policy.GetNextSubChannel();
|
||||
Assert.Equal(subChannels[i % subChannels.Count].Address.Host, subChannel.Address.Host);
|
||||
Assert.Equal(subChannels[i % subChannels.Count].Address.Port, subChannel.Address.Port);
|
||||
Assert.Equal(subChannels[i % subChannels.Count].Address.Scheme, subChannel.Address.Scheme);
|
||||
}
|
||||
}
|
||||
|
||||
private static InitialLoadBalanceResponse GetSampleInitialLoadBalanceResponse()
|
||||
{
|
||||
var initialResponse = new InitialLoadBalanceResponse();
|
||||
initialResponse.ClientStatsReportInterval = Duration.FromTimeSpan(TimeSpan.FromSeconds(10));
|
||||
initialResponse.LoadBalancerDelegate = string.Empty;
|
||||
return initialResponse;
|
||||
}
|
||||
|
||||
private static ServerList GetSampleServerList()
|
||||
{
|
||||
var serverList = new ServerList();
|
||||
serverList.Servers.Add(new Server()
|
||||
{
|
||||
IpAddress = ByteString.CopyFrom(IPAddress.Parse("10.1.5.211").GetAddressBytes()),
|
||||
Port = 80
|
||||
});
|
||||
serverList.Servers.Add(new Server()
|
||||
{
|
||||
IpAddress = ByteString.CopyFrom(IPAddress.Parse("10.1.5.212").GetAddressBytes()),
|
||||
Port = 80
|
||||
});
|
||||
serverList.Servers.Add(new Server()
|
||||
{
|
||||
IpAddress = ByteString.CopyFrom(IPAddress.Parse("10.1.5.213").GetAddressBytes()),
|
||||
Port = 80
|
||||
});
|
||||
return serverList;
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class TestLoadBalancerResponse : IAsyncStreamReader<LoadBalanceResponse>
|
||||
{
|
||||
private readonly IReadOnlyList<LoadBalanceResponse> _loadBalanceResponses;
|
||||
private int _streamIndex;
|
||||
|
||||
public TestLoadBalancerResponse(IReadOnlyList<LoadBalanceResponse> loadBalanceResponses)
|
||||
{
|
||||
_loadBalanceResponses = loadBalanceResponses;
|
||||
_streamIndex = -1;
|
||||
}
|
||||
|
||||
public LoadBalanceResponse Current => _loadBalanceResponses[_streamIndex];
|
||||
|
||||
public Task<bool> MoveNext(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(++_streamIndex < _loadBalanceResponses.Count);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,179 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Tests.Policies
|
||||
{
|
||||
public sealed class PickFirstPolicyTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ForEmptyServiceName_UsePickFirstPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new PickFirstPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.211", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "", false);
|
||||
});
|
||||
Assert.Equal("serviceName not defined", exception.Message);
|
||||
exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, string.Empty, false);
|
||||
});
|
||||
Assert.Equal("serviceName not defined", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForEmptyResolutionPassed_UsePickFirstPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new PickFirstPolicy();
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(new List<GrpcNameResolutionResult>(), "sample-service.contoso.com", false);
|
||||
});
|
||||
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForBalancersResolutionOnly_UsePickFirstPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new PickFirstPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.6.121", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false); // load balancers are ignored
|
||||
});
|
||||
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForResolutionResults_UsePickFirstPolicy_CreateAmmountSubChannels()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new PickFirstPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.211", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.213", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.214", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
|
||||
var subChannels = policy.SubChannels;
|
||||
|
||||
// Assert
|
||||
Assert.Single(subChannels);
|
||||
Assert.Equal("http", subChannels[0].Address.Scheme);
|
||||
Assert.Equal(80, subChannels[0].Address.Port);
|
||||
Assert.StartsWith("10.1.5.211", subChannels[0].Address.Host);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForResolutionResultWithBalancers_UsePickFirstPolicy_IgnoreBalancersCreateSubchannels()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new PickFirstPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 8443)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.6.121", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.214", 8443)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", true);
|
||||
var subChannels = policy.SubChannels;
|
||||
|
||||
// Assert
|
||||
Assert.Single(subChannels); // load balancers are ignored
|
||||
Assert.Equal("https", subChannels[0].Address.Scheme);
|
||||
Assert.Equal(8443, subChannels[0].Address.Port);
|
||||
Assert.StartsWith("10.1.5.212", subChannels[0].Address.Host);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ForGrpcSubChannels_UsePickFirstPolicySelectChannels_SelectFirstChannel()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new PickFirstPolicy();
|
||||
var subChannels = new List<GrpcSubChannel>()
|
||||
{
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.210:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.212:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.211:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.213:80").Uri)
|
||||
};
|
||||
policy.SubChannels = subChannels;
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
for (int i = 0; i < 30; i++)
|
||||
{
|
||||
var subChannel = policy.GetNextSubChannel();
|
||||
Assert.Equal(subChannels[0].Address.Host, subChannel.Address.Host);
|
||||
Assert.Equal(subChannels[0].Address.Port, subChannel.Address.Port);
|
||||
Assert.Equal(subChannels[0].Address.Scheme, subChannel.Address.Scheme);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,184 @@
|
|||
using Grpc.Net.Client.LoadBalancing.Policies;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Tests.Policies
|
||||
{
|
||||
public sealed class RoundRobinPolicyTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ForEmptyServiceName_UseRoundRobinPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new RoundRobinPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.211", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "", false);
|
||||
});
|
||||
Assert.Equal("serviceName not defined", exception.Message);
|
||||
exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, string.Empty, false);
|
||||
});
|
||||
Assert.Equal("serviceName not defined", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForEmptyResolutionPassed_UseRoundRobinPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new RoundRobinPolicy();
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(new List<GrpcNameResolutionResult>(), "sample-service.contoso.com", false);
|
||||
});
|
||||
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForBalancersResolutionOnly_UseRoundRobinPolicy_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new RoundRobinPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.6.121", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false); // load balancers are ignored
|
||||
});
|
||||
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForResolutionResults_UseRoundRobinPolicy_CreateAmmountSubChannels()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new RoundRobinPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.211", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.213", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.214", 80)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
|
||||
var subChannels = policy.SubChannels;
|
||||
|
||||
// Assert
|
||||
Assert.Equal(4, subChannels.Count);
|
||||
Assert.All(subChannels, subChannel => Assert.Equal("http", subChannel.Address.Scheme));
|
||||
Assert.All(subChannels, subChannel => Assert.Equal(80, subChannel.Address.Port));
|
||||
Assert.All(subChannels, subChannel => Assert.StartsWith("10.1.5.", subChannel.Address.Host));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForResolutionResultWithBalancers_UseRoundRobinPolicy_IgnoreBalancersCreateSubchannels()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new RoundRobinPolicy();
|
||||
var resolutionResults = new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.211", 8443)
|
||||
{
|
||||
IsLoadBalancer = false,
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.212", 8443)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.6.120", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.6.121", 80)
|
||||
{
|
||||
IsLoadBalancer = true
|
||||
},
|
||||
new GrpcNameResolutionResult("10.1.5.214", 8443)
|
||||
{
|
||||
IsLoadBalancer = false
|
||||
}
|
||||
};
|
||||
|
||||
// Act
|
||||
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", true);
|
||||
var subChannels = policy.SubChannels;
|
||||
|
||||
// Assert
|
||||
Assert.Equal(3, subChannels.Count); // load balancers are ignored
|
||||
Assert.All(subChannels, subChannel => Assert.Equal("https", subChannel.Address.Scheme));
|
||||
Assert.All(subChannels, subChannel => Assert.Equal(8443, subChannel.Address.Port));
|
||||
Assert.All(subChannels, subChannel => Assert.StartsWith("10.1.5.", subChannel.Address.Host));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ForGrpcSubChannels_UseRoundRobinPolicySelectChannels_SelectChannelsInRoundRobin()
|
||||
{
|
||||
// Arrange
|
||||
using var policy = new RoundRobinPolicy();
|
||||
var subChannels = new List<GrpcSubChannel>()
|
||||
{
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.210:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.212:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.211:80").Uri),
|
||||
new GrpcSubChannel(new UriBuilder("http://10.1.5.213:80").Uri)
|
||||
};
|
||||
policy.SubChannels = subChannels;
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
for (int i = 0; i < 30; i++)
|
||||
{
|
||||
var subChannel = policy.GetNextSubChannel();
|
||||
Assert.Equal(subChannels[i % subChannels.Count].Address.Host, subChannel.Address.Host);
|
||||
Assert.Equal(subChannels[i % subChannels.Count].Address.Port, subChannel.Address.Port);
|
||||
Assert.Equal(subChannels[i % subChannels.Count].Address.Scheme, subChannel.Address.Scheme);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
using DnsClient;
|
||||
using DnsClient.Protocol;
|
||||
using Grpc.Net.Client.LoadBalancing.ResolverPlugins;
|
||||
using Moq;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Tests.ResolverPlugins
|
||||
{
|
||||
public sealed class DnsClientResolverPluginTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ForTargetWithNonDnsScheme_UseDnsClientResolverPlugin_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
var resolverPlugin = new DnsClientResolverPlugin();
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await resolverPlugin.StartNameResolutionAsync(new Uri("http://sample.host.com"));
|
||||
});
|
||||
await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await resolverPlugin.StartNameResolutionAsync(new Uri("https://sample.host.com"));
|
||||
});
|
||||
await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await resolverPlugin.StartNameResolutionAsync(new Uri("unknown://sample.host.com"));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
[Fact]
|
||||
public async Task ForTargetAndEmptyDnsResults_UseDnsClientResolverPlugin_ReturnNoFinidings()
|
||||
{
|
||||
// Arrange
|
||||
var serviceHostName = "my-service";
|
||||
var txtDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
|
||||
var srvBalancersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
|
||||
var aServersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
|
||||
var dnsClientMock = new Mock<IDnsQuery>(MockBehavior.Strict);
|
||||
|
||||
txtDnsQueryResponse.Setup(x => x.Answers).Returns(new List<TxtRecord>().AsReadOnly());
|
||||
srvBalancersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<SrvRecord>().AsReadOnly());
|
||||
aServersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<SrvRecord>().AsReadOnly());
|
||||
|
||||
dnsClientMock.Setup(x => x.QueryAsync($"_grpc_config.{serviceHostName}", QueryType.TXT, QueryClass.IN, default))
|
||||
.Returns(Task.FromResult(txtDnsQueryResponse.Object));
|
||||
dnsClientMock.Setup(x => x.QueryAsync($"_grpclb._tcp.{serviceHostName}", QueryType.SRV, QueryClass.IN, default))
|
||||
.Returns(Task.FromResult(srvBalancersDnsQueryResponse.Object));
|
||||
dnsClientMock.Setup(x => x.QueryAsync(serviceHostName, QueryType.A, QueryClass.IN, default))
|
||||
.Returns(Task.FromResult(aServersDnsQueryResponse.Object));
|
||||
|
||||
var resolverPlugin = new DnsClientResolverPlugin();
|
||||
resolverPlugin.OverrideDnsClient = dnsClientMock.Object;
|
||||
|
||||
// Act
|
||||
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri($"dns://{serviceHostName}:80"));
|
||||
|
||||
// Assert
|
||||
Assert.Empty(resolutionResult);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForTargetAndBalancerSrvRecords_UseDnsClientResolverPlugin_ReturnBalancers()
|
||||
{
|
||||
// Arrange
|
||||
var serviceHostName = "my-service";
|
||||
var txtDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
|
||||
var srvBalancersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
|
||||
var aServersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
|
||||
var dnsClientMock = new Mock<IDnsQuery>(MockBehavior.Strict);
|
||||
|
||||
txtDnsQueryResponse.Setup(x => x.Answers).Returns(new List<TxtRecord>().AsReadOnly());
|
||||
srvBalancersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<SrvRecord>(GetBalancersSrvRecords(serviceHostName)).AsReadOnly());
|
||||
aServersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<ARecord>(GetServersARecords(serviceHostName)).AsReadOnly());
|
||||
|
||||
dnsClientMock.Setup(x => x.QueryAsync($"_grpc_config.{serviceHostName}", QueryType.TXT, QueryClass.IN, default))
|
||||
.Returns(Task.FromResult(txtDnsQueryResponse.Object));
|
||||
dnsClientMock.Setup(x => x.QueryAsync($"_grpclb._tcp.{serviceHostName}", QueryType.SRV, QueryClass.IN, default))
|
||||
.Returns(Task.FromResult(srvBalancersDnsQueryResponse.Object));
|
||||
dnsClientMock.Setup(x => x.QueryAsync(serviceHostName, QueryType.A, QueryClass.IN, default))
|
||||
.Returns(Task.FromResult(aServersDnsQueryResponse.Object));
|
||||
|
||||
var resolverPlugin = new DnsClientResolverPlugin();
|
||||
resolverPlugin.OverrideDnsClient = dnsClientMock.Object;
|
||||
|
||||
// Act
|
||||
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri($"dns://{serviceHostName}:443"));
|
||||
|
||||
// Assert
|
||||
Assert.Equal(5, resolutionResult.Count);
|
||||
Assert.Equal(2, resolutionResult.Where(x => x.IsLoadBalancer).Count());
|
||||
Assert.All(resolutionResult.Where(x => x.IsLoadBalancer), x => Assert.Equal(80, x.Port));
|
||||
Assert.All(resolutionResult.Where(x => x.IsLoadBalancer), x => Assert.StartsWith("10-1-6-", x.Host));
|
||||
Assert.Equal(3, resolutionResult.Where(x => !x.IsLoadBalancer).Count());
|
||||
Assert.All(resolutionResult.Where(x => !x.IsLoadBalancer), x => Assert.Equal(443, x.Port));
|
||||
Assert.All(resolutionResult.Where(x => !x.IsLoadBalancer), x => Assert.StartsWith("10.1.5.", x.Host));
|
||||
}
|
||||
|
||||
private List<SrvRecord> GetBalancersSrvRecords(string serviceHostName)
|
||||
{
|
||||
return new List<SrvRecord>()
|
||||
{
|
||||
new SrvRecord(new ResourceRecordInfo($"_grpclb._tcp.{serviceHostName}", ResourceRecordType.SRV, QueryClass.IN, 30, 0), 0, 0, 80, DnsString.Parse($"10-1-6-120.{serviceHostName}")),
|
||||
new SrvRecord(new ResourceRecordInfo($"_grpclb._tcp.{serviceHostName}", ResourceRecordType.SRV, QueryClass.IN, 30, 0), 0, 0, 80, DnsString.Parse($"10-1-6-121.{serviceHostName}"))
|
||||
};
|
||||
}
|
||||
|
||||
private List<ARecord> GetServersARecords(string serviceHostName)
|
||||
{
|
||||
return new List<ARecord>()
|
||||
{
|
||||
new ARecord(new ResourceRecordInfo(serviceHostName, ResourceRecordType.A, QueryClass.IN, 30, 0), IPAddress.Parse("10.1.5.211")),
|
||||
new ARecord(new ResourceRecordInfo(serviceHostName, ResourceRecordType.A, QueryClass.IN, 30, 0), IPAddress.Parse("10.1.5.212")),
|
||||
new ARecord(new ResourceRecordInfo(serviceHostName, ResourceRecordType.A, QueryClass.IN, 30, 0), IPAddress.Parse("10.1.5.213"))
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Tests.ResolverPlugins
|
||||
{
|
||||
public sealed class NoneResolverPluginTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ForTarget_UseNoneResolverPlugin_ReturnResolutionResultWithTheSameValue()
|
||||
{
|
||||
// Arrange
|
||||
var resolverPlugin = new NoneResolverPlugin();
|
||||
|
||||
// Act
|
||||
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri("https://sample.host.com"));
|
||||
|
||||
// Assert
|
||||
Assert.Single(resolutionResult);
|
||||
Assert.Equal("sample.host.com", resolutionResult[0].Host);
|
||||
Assert.Equal(443, resolutionResult[0].Port);
|
||||
Assert.False(resolutionResult[0].IsLoadBalancer);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForTargetWithDnsScheme_UseNoneResolverPlugin_ThrowArgumentException()
|
||||
{
|
||||
// Arrange
|
||||
var resolverPlugin = new NoneResolverPlugin();
|
||||
|
||||
// Act
|
||||
// Assert
|
||||
await Assert.ThrowsAsync<ArgumentException>(async () =>
|
||||
{
|
||||
await resolverPlugin.StartNameResolutionAsync(new Uri("dns://sample.host.com"));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
using Grpc.Net.Client.LoadBalancing.ResolverPlugins;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
|
||||
namespace Grpc.Net.Client.LoadBalancing.Tests.ResolverPlugins
|
||||
{
|
||||
public sealed class StaticResolverPluginTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ForStaticResolutionFunction_UseStaticResolverPlugin_ReturnPredefinedValues()
|
||||
{
|
||||
// Arrange
|
||||
Func<Uri, List<GrpcNameResolutionResult>> resolveFunction = (uri) =>
|
||||
{
|
||||
return new List<GrpcNameResolutionResult>()
|
||||
{
|
||||
new GrpcNameResolutionResult("10.1.5.212", 8080),
|
||||
new GrpcNameResolutionResult("10.1.5.213", 8080)
|
||||
};
|
||||
};
|
||||
var resolverPlugin = new StaticResolverPlugin(resolveFunction);
|
||||
|
||||
// Act
|
||||
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri("https://sample.host.com"));
|
||||
|
||||
// Assert
|
||||
Assert.Equal(2, resolutionResult.Count);
|
||||
Assert.Equal("10.1.5.212", resolutionResult[0].Host);
|
||||
Assert.Equal("10.1.5.213", resolutionResult[1].Host);
|
||||
Assert.Equal(8080, resolutionResult[0].Port);
|
||||
Assert.Equal(8080, resolutionResult[1].Port);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -63,17 +63,17 @@ namespace Grpc.Net.Client.Tests
|
|||
|
||||
var logs = testSink.Writes.Where(w => w.LogLevel >= Microsoft.Extensions.Logging.LogLevel.Debug).ToList();
|
||||
|
||||
Assert.AreEqual("Starting gRPC call. Method type: 'Unary', URI: 'https://localhost/ServiceName/MethodName'.", logs[0].State.ToString());
|
||||
AssertScope(logs[0]);
|
||||
Assert.AreEqual("Starting gRPC call. Method type: 'Unary', URI: 'https://localhost/ServiceName/MethodName'.", logs[4].State.ToString());
|
||||
AssertScope(logs[4]);
|
||||
|
||||
Assert.AreEqual("Sending message.", logs[1].State.ToString());
|
||||
AssertScope(logs[1]);
|
||||
Assert.AreEqual("Sending message.", logs[5].State.ToString());
|
||||
AssertScope(logs[5]);
|
||||
|
||||
Assert.AreEqual("Reading message.", logs[2].State.ToString());
|
||||
AssertScope(logs[2]);
|
||||
Assert.AreEqual("Reading message.", logs[6].State.ToString());
|
||||
AssertScope(logs[6]);
|
||||
|
||||
Assert.AreEqual("Finished gRPC call.", logs[3].State.ToString());
|
||||
AssertScope(logs[3]);
|
||||
Assert.AreEqual("Finished gRPC call.", logs[7].State.ToString());
|
||||
AssertScope(logs[7]);
|
||||
|
||||
static void AssertScope(WriteContext log)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue