Compare commits

...

15 Commits

Author SHA1 Message Date
Paweł Wichary e7abb90957
Fix receive ServerList after InitialLoadBalanceResponse and use interval 2020-03-30 14:47:12 +02:00
Paweł Wichary 415410bc81
Send service name to load balancer
Based on java implementation
2020-03-30 12:46:22 +02:00
Paweł Wichary 1ad9bca016
Refactor report load 2020-03-30 10:30:56 +02:00
Paweł Wichary 86e3724224
Add test, ensure client is disposed 2020-03-30 09:35:34 +02:00
Paweł Wichary fa1ddabf04
Report load initial implementation 2020-03-29 20:55:55 +02:00
Paweł Wichary c952c57e31
Keep connection loadbalancer open for grpclb policy and dispose 2020-03-29 19:45:31 +02:00
Paweł Wichary 36f191f27c
Make subchannels owned by policies
Based on java implementation
2020-03-29 17:58:52 +02:00
Paweł Wichary 8ed7f0f6d7
Changes to searching dns records
Based on java implementation
2020-03-29 12:50:14 +02:00
Paweł Wichary c299221e32
Disable searching for TXT records by default
Based on java implementation
2020-03-29 12:29:38 +02:00
Paweł Wichary aa389f70d4
Enable subchanels in ClientFactory 2020-03-28 20:21:49 +01:00
Paweł Wichary 62e92fe42a
Add tests for load balancing implementation 2020-03-28 15:32:08 +01:00
Paweł Wichary e03990a16a
Start selecting subChannels from first element and fix parsing issue 2020-03-28 15:32:08 +01:00
Paweł Wichary 6105cfe489
Resolve failing logging test, add project to solution 2020-03-28 15:32:08 +01:00
Paweł Wichary ec478afa4b
Documentation for public types 2020-03-28 15:32:08 +01:00
Paweł Wichary 12571ad852
Implement load balancing policies 2020-03-28 15:32:08 +01:00
30 changed files with 3789 additions and 21 deletions

View File

@ -114,6 +114,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.AspNetCore.Web", "src\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.AspNetCore.HealthChecks", "src\Grpc.AspNetCore.HealthChecks\Grpc.AspNetCore.HealthChecks.csproj", "{39A9F2B5-2541-423E-83C9-46C7BFF53F41}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Grpc.Net.Client.LoadBalancing", "src\Grpc.Net.Client.LoadBalancing\Grpc.Net.Client.LoadBalancing.csproj", "{0F42C32D-BAE2-451B-8BE9-17BF995AE101}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.Net.Client.LoadBalancing.Tests", "test\Grpc.Net.Client.LoadBalancing.Tests\Grpc.Net.Client.LoadBalancing.Tests.csproj", "{78564304-153B-41CF-94F3-B34C5700D9F4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -224,6 +228,14 @@ Global
{39A9F2B5-2541-423E-83C9-46C7BFF53F41}.Debug|Any CPU.Build.0 = Debug|Any CPU
{39A9F2B5-2541-423E-83C9-46C7BFF53F41}.Release|Any CPU.ActiveCfg = Release|Any CPU
{39A9F2B5-2541-423E-83C9-46C7BFF53F41}.Release|Any CPU.Build.0 = Release|Any CPU
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0F42C32D-BAE2-451B-8BE9-17BF995AE101}.Release|Any CPU.Build.0 = Release|Any CPU
{78564304-153B-41CF-94F3-B34C5700D9F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{78564304-153B-41CF-94F3-B34C5700D9F4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78564304-153B-41CF-94F3-B34C5700D9F4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78564304-153B-41CF-94F3-B34C5700D9F4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -264,6 +276,8 @@ Global
{429EB088-94FF-4F06-8E54-72157089C8C3} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
{778DB6EE-E5B2-4875-A209-40010B5A3E21} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
{39A9F2B5-2541-423E-83C9-46C7BFF53F41} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
{0F42C32D-BAE2-451B-8BE9-17BF995AE101} = {8C62055F-8CD7-4859-9001-634D544DF2AE}
{78564304-153B-41CF-94F3-B34C5700D9F4} = {CECC4AE8-9C4E-4727-939B-517CC2E58D65}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD5C2B19-49B4-480A-990C-36D98A719B07}

View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>.NET client for gRPC</Description>
<PackageTags>gRPC RPC HTTP/2 Load Balancing</PackageTags>
<IsGrpcPublishedPackage>true</IsGrpcPublishedPackage>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="DnsClient" Version="1.2.0" />
<PackageReference Include="System.Text.Json" Version="4.7.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Grpc.Net.Client\Grpc.Net.Client.csproj" />
</ItemGroup>
</Project>

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,137 @@
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Protos/load_balancer.proto
// </auto-generated>
// Original file comments:
// Copyright 2015 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file defines the GRPCLB LoadBalancing protocol.
//
// The canonical version of this proto can be found at
// https://github.com/grpc/grpc-proto/blob/master/grpc/lb/v1/load_balancer.proto
#pragma warning disable 0414, 1591
#region Designer generated code
using grpc = global::Grpc.Core;
namespace Grpc.Lb.V1 {
public static partial class LoadBalancer
{
static readonly string __ServiceName = "grpc.lb.v1.LoadBalancer";
static readonly grpc::Marshaller<global::Grpc.Lb.V1.LoadBalanceRequest> __Marshaller_grpc_lb_v1_LoadBalanceRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Lb.V1.LoadBalanceRequest.Parser.ParseFrom);
static readonly grpc::Marshaller<global::Grpc.Lb.V1.LoadBalanceResponse> __Marshaller_grpc_lb_v1_LoadBalanceResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Lb.V1.LoadBalanceResponse.Parser.ParseFrom);
static readonly grpc::Method<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse> __Method_BalanceLoad = new grpc::Method<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse>(
grpc::MethodType.DuplexStreaming,
__ServiceName,
"BalanceLoad",
__Marshaller_grpc_lb_v1_LoadBalanceRequest,
__Marshaller_grpc_lb_v1_LoadBalanceResponse);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
get { return global::Grpc.Lb.V1.LoadBalancerReflection.Descriptor.Services[0]; }
}
/// <summary>Base class for server-side implementations of LoadBalancer</summary>
[grpc::BindServiceMethod(typeof(LoadBalancer), "BindService")]
public abstract partial class LoadBalancerBase
{
/// <summary>
/// Bidirectional rpc to get a list of servers.
/// </summary>
/// <param name="requestStream">Used for reading requests from the client.</param>
/// <param name="responseStream">Used for sending responses back to the client.</param>
/// <param name="context">The context of the server-side call handler being invoked.</param>
/// <returns>A task indicating completion of the handler.</returns>
public virtual global::System.Threading.Tasks.Task BalanceLoad(grpc::IAsyncStreamReader<global::Grpc.Lb.V1.LoadBalanceRequest> requestStream, grpc::IServerStreamWriter<global::Grpc.Lb.V1.LoadBalanceResponse> responseStream, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Client for LoadBalancer</summary>
public partial class LoadBalancerClient : grpc::ClientBase<LoadBalancerClient>
{
/// <summary>Creates a new client for LoadBalancer</summary>
/// <param name="channel">The channel to use to make remote calls.</param>
public LoadBalancerClient(grpc::ChannelBase channel) : base(channel)
{
}
/// <summary>Creates a new client for LoadBalancer that uses a custom <c>CallInvoker</c>.</summary>
/// <param name="callInvoker">The callInvoker to use to make remote calls.</param>
public LoadBalancerClient(grpc::CallInvoker callInvoker) : base(callInvoker)
{
}
/// <summary>Protected parameterless constructor to allow creation of test doubles.</summary>
protected LoadBalancerClient() : base()
{
}
/// <summary>Protected constructor to allow creation of configured clients.</summary>
/// <param name="configuration">The client configuration.</param>
protected LoadBalancerClient(ClientBaseConfiguration configuration) : base(configuration)
{
}
/// <summary>
/// Bidirectional rpc to get a list of servers.
/// </summary>
/// <param name="headers">The initial metadata to send with the call. This parameter is optional.</param>
/// <param name="deadline">An optional deadline for the call. The call will be cancelled if deadline is hit.</param>
/// <param name="cancellationToken">An optional token for canceling the call.</param>
/// <returns>The call object.</returns>
public virtual grpc::AsyncDuplexStreamingCall<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse> BalanceLoad(grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return BalanceLoad(new grpc::CallOptions(headers, deadline, cancellationToken));
}
/// <summary>
/// Bidirectional rpc to get a list of servers.
/// </summary>
/// <param name="options">The options for the call.</param>
/// <returns>The call object.</returns>
public virtual grpc::AsyncDuplexStreamingCall<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse> BalanceLoad(grpc::CallOptions options)
{
return CallInvoker.AsyncDuplexStreamingCall(__Method_BalanceLoad, null, options);
}
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
protected override LoadBalancerClient NewInstance(ClientBaseConfiguration configuration)
{
return new LoadBalancerClient(configuration);
}
}
/// <summary>Creates service definition that can be registered with a server</summary>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
public static grpc::ServerServiceDefinition BindService(LoadBalancerBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_BalanceLoad, serviceImpl.BalanceLoad).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.</summary>
/// <param name="serviceBinder">Service methods will be bound by calling <c>AddMethod</c> on this object.</param>
/// <param name="serviceImpl">An object implementing the server-side handling logic.</param>
public static void BindService(grpc::ServiceBinderBase serviceBinder, LoadBalancerBase serviceImpl)
{
serviceBinder.AddMethod(__Method_BalanceLoad, serviceImpl == null ? null : new grpc::DuplexStreamingServerMethod<global::Grpc.Lb.V1.LoadBalanceRequest, global::Grpc.Lb.V1.LoadBalanceResponse>(serviceImpl.BalanceLoad));
}
}
}
#endregion

View File

@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
{
/// <summary>
/// This abstraction was added to the code base to make policies easy to mock in testing scenarios.
/// </summary>
internal interface IAsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
{
public IAsyncStreamReader<TResponse> ResponseStream { get; }
public IClientStreamWriter<TRequest> RequestStream { get; }
public Task<Metadata> ResponseHeadersAsync { get; }
public Status GetStatus();
public Metadata GetTrailers();
}
}

View File

@ -0,0 +1,16 @@
using System;
using System.Threading;
using Grpc.Core;
using Grpc.Lb.V1;
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
{
/// <summary>
/// This abstraction was added to the code base to make policies easy to mock in testing scenarios.
/// </summary>
internal interface ILoadBalancerClient : IDisposable
{
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(Metadata? headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default);
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(CallOptions options);
}
}

View File

@ -0,0 +1,30 @@
using System.Threading.Tasks;
using Grpc.Core;
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
{
/// <summary>
/// This class wrap and delegate AsyncDuplexStreamingCall.
/// The reason why it was added described here <seealso cref="IAsyncDuplexStreamingCall{TRequest, TResponse}"/>
/// </summary>
internal sealed class WrappedAsyncDuplexStreamingCall<TRequest, TResponse> : IAsyncDuplexStreamingCall<TRequest, TResponse>
{
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _asyncDuplexStreamingCall;
public WrappedAsyncDuplexStreamingCall(AsyncDuplexStreamingCall<TRequest, TResponse> asyncDuplexStreamingCall)
{
_asyncDuplexStreamingCall = asyncDuplexStreamingCall;
}
public IAsyncStreamReader<TResponse> ResponseStream => _asyncDuplexStreamingCall.ResponseStream;
public IClientStreamWriter<TRequest> RequestStream => _asyncDuplexStreamingCall.RequestStream;
public Task<Metadata> ResponseHeadersAsync => _asyncDuplexStreamingCall.ResponseHeadersAsync;
public void Dispose() => _asyncDuplexStreamingCall.Dispose();
public Status GetStatus() => _asyncDuplexStreamingCall.GetStatus();
public Metadata GetTrailers() => _asyncDuplexStreamingCall.GetTrailers();
}
}

View File

@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Grpc.Core;
using Grpc.Lb.V1;
namespace Grpc.Net.Client.LoadBalancing.Policies.Abstraction
{
/// <summary>
/// This class wrap and delegate LoadBalancerClient.
/// The reason why it was added described here <seealso cref="ILoadBalancerClient"/>
/// </summary>
internal sealed class WrappedLoadBalancerClient : ILoadBalancerClient
{
private readonly GrpcChannel _channelForLB;
private readonly LoadBalancer.LoadBalancerClient _loadBalancerClient;
public WrappedLoadBalancerClient(List<GrpcNameResolutionResult> resolutionResult, GrpcChannelOptions channelOptionsForLB)
{
_channelForLB = GrpcChannel.ForAddress($"http://{resolutionResult[0].Host}:{resolutionResult[0].Port}", channelOptionsForLB);
_loadBalancerClient = new LoadBalancer.LoadBalancerClient(_channelForLB);
}
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(CallOptions options)
{
return new WrappedAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>(_loadBalancerClient.BalanceLoad(options));
}
public IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse> BalanceLoad(Metadata? headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default)
{
return new WrappedAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>(_loadBalancerClient.BalanceLoad(headers, deadline, cancellationToken));
}
public void Dispose()
{
_channelForLB.Dispose();
}
}
}

View File

@ -0,0 +1,190 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using Grpc.Lb.V1;
using System.Threading;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using System.Linq;
using Grpc.Net.Client.LoadBalancing.Policies.Abstraction;
using Google.Protobuf.WellKnownTypes;
namespace Grpc.Net.Client.LoadBalancing.Policies
{
/// <summary>
/// The load balancing policy creates a subchannel to each server address.
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
///
/// Official name of this policy is "grpclb". It is a implementation of an external load balancing also called lookaside or one-arm loadbalancing.
/// More: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#external-load-balancing-service
/// </summary>
public sealed class GrpclbPolicy : IGrpcLoadBalancingPolicy
{
private TimeSpan _clientStatsReportInterval = TimeSpan.FromSeconds(10);
private bool _isSecureConnection = false;
private int _requestsCounter = 0;
private int _subChannelsSelectionCounter = -1;
private ILogger _logger = NullLogger.Instance;
private ILoggerFactory _loggerFactory = NullLoggerFactory.Instance;
private ILoadBalancerClient? _loadBalancerClient;
private IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>? _balancingStreaming;
private Timer? _timer;
/// <summary>
/// LoggerFactory is configured (injected) when class is being instantiated.
/// </summary>
public ILoggerFactory LoggerFactory
{
set
{
_loggerFactory = value;
_logger = value.CreateLogger<GrpclbPolicy>();
}
}
internal bool Disposed { get; private set; }
internal IReadOnlyList<GrpcSubChannel> SubChannels { get; set; } = Array.Empty<GrpcSubChannel>();
/// <summary>
/// Property created for testing purposes, allows setter injection
/// </summary>
internal ILoadBalancerClient? OverrideLoadBalancerClient { private get; set; }
/// <summary>
/// Creates a subchannel to each server address. Depending on policy this may require additional
/// steps eg. reaching out to lookaside loadbalancer.
/// </summary>
/// <param name="resolutionResult">Resolved list of servers and/or lookaside load balancers.</param>
/// <param name="serviceName">The name of the load balanced service (e.g., service.googleapis.com).</param>
/// <param name="isSecureConnection">Flag if connection between client and destination server should be secured.</param>
/// <returns>List of subchannels.</returns>
public async Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection)
{
if (resolutionResult == null)
{
throw new ArgumentNullException(nameof(resolutionResult));
}
if (string.IsNullOrWhiteSpace(serviceName))
{
throw new ArgumentException($"{nameof(serviceName)} not defined");
}
resolutionResult = resolutionResult.Where(x => x.IsLoadBalancer).ToList();
if (resolutionResult.Count == 0)
{
throw new ArgumentException($"{nameof(resolutionResult)} must contain at least one blancer address");
}
_isSecureConnection = isSecureConnection;
_logger.LogDebug($"Start grpclb policy");
_logger.LogDebug($"Start connection to external load balancer");
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var channelOptionsForLB = new GrpcChannelOptions()
{
LoggerFactory = _loggerFactory
};
_loadBalancerClient = GetLoadBalancerClient(resolutionResult, channelOptionsForLB);
_balancingStreaming = _loadBalancerClient.BalanceLoad();
var initialRequest = new InitialLoadBalanceRequest() { Name = $"{serviceName}:{resolutionResult[0].Port}" };
await _balancingStreaming.RequestStream.WriteAsync(new LoadBalanceRequest() { InitialRequest = initialRequest }).ConfigureAwait(false);
await _balancingStreaming.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false);
if (_balancingStreaming.ResponseStream.Current.LoadBalanceResponseTypeCase != LoadBalanceResponse.LoadBalanceResponseTypeOneofCase.InitialResponse)
{
throw new InvalidOperationException("InitialLoadBalanceRequest was not followed by InitialLoadBalanceResponse");
}
var initialResponse = _balancingStreaming.ResponseStream.Current.InitialResponse;
_clientStatsReportInterval = initialResponse.ClientStatsReportInterval.ToTimeSpan();
await _balancingStreaming.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false);
if (_balancingStreaming.ResponseStream.Current.LoadBalanceResponseTypeCase != LoadBalanceResponse.LoadBalanceResponseTypeOneofCase.ServerList)
{
throw new InvalidOperationException("InitialLoadBalanceResponse was not followed by ServerList");
}
UpdateSubChannels(_balancingStreaming.ResponseStream.Current.ServerList);
_logger.LogDebug($"SubChannels list created");
_timer = new Timer(ReportClientStatsTimerAsync, null, _clientStatsReportInterval, _clientStatsReportInterval);
_logger.LogDebug($"Periodic ClientStats reporting enabled");
}
/// <summary>
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
/// </summary>
/// <returns>Selected subchannel.</returns>
public GrpcSubChannel GetNextSubChannel()
{
Interlocked.Increment(ref _requestsCounter);
return SubChannels[Interlocked.Increment(ref _subChannelsSelectionCounter) % SubChannels.Count];
}
// async void recommended by Stephen Cleary https://stackoverflow.com/questions/38917818/pass-async-callback-to-timer-constructor
private async void ReportClientStatsTimerAsync(object state)
{
await ReportClientStatsAsync().ConfigureAwait(false);
}
private async Task ReportClientStatsAsync()
{
var requestsCounter = Interlocked.Exchange(ref _requestsCounter, 0);
var clientStats = new ClientStats()
{
NumCallsStarted = requestsCounter,
NumCallsFinished = requestsCounter,
NumCallsFinishedKnownReceived = requestsCounter,
NumCallsFinishedWithClientFailedToSend = 0,
Timestamp = Timestamp.FromDateTime(DateTime.UtcNow)
};
await _balancingStreaming!.RequestStream.WriteAsync(new LoadBalanceRequest() { ClientStats = clientStats }).ConfigureAwait(false);
await _balancingStreaming.ResponseStream.MoveNext(CancellationToken.None).ConfigureAwait(false);
if (_balancingStreaming.ResponseStream.Current.LoadBalanceResponseTypeCase == LoadBalanceResponse.LoadBalanceResponseTypeOneofCase.ServerList)
{
UpdateSubChannels(_balancingStreaming.ResponseStream.Current.ServerList);
}
}
private void UpdateSubChannels(ServerList serverList)
{
var result = new List<GrpcSubChannel>();
foreach (var server in serverList.Servers)
{
var ipAddress = new IPAddress(server.IpAddress.ToByteArray()).ToString();
var uriBuilder = new UriBuilder();
uriBuilder.Host = ipAddress;
uriBuilder.Port = server.Port;
uriBuilder.Scheme = _isSecureConnection ? "https" : "http";
var uri = uriBuilder.Uri;
result.Add(new GrpcSubChannel(uri));
_logger.LogDebug($"Found a server {uri}");
}
SubChannels = result;
}
private ILoadBalancerClient GetLoadBalancerClient(List<GrpcNameResolutionResult> resolutionResult, GrpcChannelOptions channelOptionsForLB)
{
if(OverrideLoadBalancerClient != null)
{
return OverrideLoadBalancerClient;
}
return new WrappedLoadBalancerClient(resolutionResult, channelOptionsForLB);
}
/// <summary>
/// Releases the resources used by the <see cref="GrpclbPolicy"/> class.
/// </summary>
public void Dispose()
{
if (Disposed)
{
return;
}
try
{
_timer?.Dispose();
_balancingStreaming?.RequestStream.CompleteAsync().Wait(); // close request stream to complete gracefully
}
finally
{
_loadBalancerClient?.Dispose();
}
Disposed = true;
}
}
}

View File

@ -0,0 +1,88 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Net.Client.LoadBalancing.Policies
{
/// <summary>
/// The load balancing policy creates a subchannel to each server address.
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
///
/// Official name of this policy is "round_robin". It is a implementation of an balancing-aware client.
/// More: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#balancing-aware-client
/// </summary>
public sealed class RoundRobinPolicy : IGrpcLoadBalancingPolicy
{
private int _subChannelsSelectionCounter = -1;
private ILogger _logger = NullLogger.Instance;
/// <summary>
/// LoggerFactory is configured (injected) when class is being instantiated.
/// </summary>
public ILoggerFactory LoggerFactory
{
set => _logger = value.CreateLogger<RoundRobinPolicy>();
}
internal IReadOnlyList<GrpcSubChannel> SubChannels { get; set; } = Array.Empty<GrpcSubChannel>();
/// <summary>
/// Creates a subchannel to each server address. Depending on policy this may require additional
/// steps eg. reaching out to lookaside loadbalancer.
/// </summary>
/// <param name="resolutionResult">Resolved list of servers and/or lookaside load balancers.</param>
/// <param name="serviceName">The name of the load balanced service (e.g., service.googleapis.com).</param>
/// <param name="isSecureConnection">Flag if connection between client and destination server should be secured.</param>
/// <returns>List of subchannels.</returns>
public Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection)
{
if (resolutionResult == null)
{
throw new ArgumentNullException(nameof(resolutionResult));
}
if (string.IsNullOrWhiteSpace(serviceName))
{
throw new ArgumentException($"{nameof(serviceName)} not defined");
}
resolutionResult = resolutionResult.Where(x => !x.IsLoadBalancer).ToList();
if (resolutionResult.Count == 0)
{
throw new ArgumentException($"{nameof(resolutionResult)} must contain at least one non-blancer address");
}
_logger.LogDebug($"Start round_robin policy");
var result = resolutionResult.Select(x =>
{
var uriBuilder = new UriBuilder();
uriBuilder.Host = x.Host;
uriBuilder.Port = x.Port ?? (isSecureConnection ? 443 : 80);
uriBuilder.Scheme = isSecureConnection ? "https" : "http";
var uri = uriBuilder.Uri;
_logger.LogDebug($"Found a server {uri}");
return new GrpcSubChannel(uri);
}).ToList();
_logger.LogDebug($"SubChannels list created");
SubChannels = result;
return Task.CompletedTask;
}
/// <summary>
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
/// </summary>
/// <returns>Selected subchannel.</returns>
public GrpcSubChannel GetNextSubChannel()
{
return SubChannels[Interlocked.Increment(ref _subChannelsSelectionCounter) % SubChannels.Count];
}
/// <summary>
/// Releases the resources used by the <see cref="RoundRobinPolicy"/> class.
/// </summary>
public void Dispose()
{
}
}
}

View File

@ -0,0 +1,32 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Grpc.Net.Client.LoadBalancing.Tests,PublicKey=" +
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
// For Moq. This assembly needs access to internal types via InternalVisibleTo to be able to mock them
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602" +
"000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02ba" +
"a56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667" +
"bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6" +
"a7d3113e92484cf7045cc7")]

View File

@ -0,0 +1,170 @@
using DnsClient;
using DnsClient.Protocol;
using Grpc.Net.Client.LoadBalancing.ResolverPlugins.GrpcServiceConfig;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins
{
/// <summary>
/// Resolver plugin is responsible for name resolution by reaching the authority and return
/// a list of resolved addresses (both IP address and port) and a service config.
/// More: https://github.com/grpc/grpc/blob/master/doc/naming.md
/// </summary>
public sealed class DnsClientResolverPlugin : IGrpcResolverPlugin
{
private DnsClientResolverPluginOptions _options;
private ILogger _logger = NullLogger.Instance;
/// <summary>
/// LoggerFactory is configured (injected) when class is being instantiated.
/// </summary>
public ILoggerFactory LoggerFactory
{
set => _logger = value.CreateLogger<DnsClientResolverPlugin>();
}
/// <summary>
/// Property created for testing purposes, allows setter injection
/// </summary>
internal IDnsQuery? OverrideDnsClient { private get; set; }
/// <summary>
/// Creates a <seealso cref="DnsClientResolverPlugin"/> that is capable of searching SRV and TXT records.
/// </summary>
public DnsClientResolverPlugin() : this(new DnsClientResolverPluginOptions())
{
}
/// <summary>
/// Creates a <seealso cref="DnsClientResolverPlugin"/> that is capable of searching SRV and TXT records.
/// </summary>
/// <param name="options">Options allows override default behaviour.</param>
public DnsClientResolverPlugin(DnsClientResolverPluginOptions options)
{
_options = options;
}
/// <summary>
/// Name resolution for secified target.
/// </summary>
/// <param name="target">Server address with scheme.</param>
/// <returns>List of resolved servers and/or lookaside load balancers.</returns>
public async Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target)
{
if (target == null)
{
throw new ArgumentNullException(nameof(target));
}
if (!target.Scheme.Equals("dns", StringComparison.OrdinalIgnoreCase))
{
throw new ArgumentException($"{nameof(DnsClientResolverPlugin)} require dns:// scheme to set as target address");
}
var host = target.Host;
var dnsClient = GetDnsClient();
if (!_options.DisableTxtServiceConfig)
{
var serviceConfigDnsQuery = $"_grpc_config.{host}";
_logger.LogDebug($"Start TXT lookup for {serviceConfigDnsQuery}");
var txtRecords = (await dnsClient.QueryAsync(serviceConfigDnsQuery, QueryType.TXT).ConfigureAwait(false)).Answers.OfType<TxtRecord>().ToArray();
_logger.LogDebug($"Number of TXT records found: {txtRecords.Length}");
var grpcConfigs = txtRecords.SelectMany(x => x.Text).Where(IsGrpcConfigTxtRecord).ToArray();
_logger.LogDebug($"Number of grpc_configs found: {grpcConfigs.Length}");
if(grpcConfigs.Length != 0 && TryParseGrpcConfig(grpcConfigs[0], out var serviceConfigs))
{
_logger.LogDebug($"First grpc_config is selected " + grpcConfigs[0]);
_logger.LogDebug($"Parsing JSON grpc_config into service config success");
var serviceConfig = serviceConfigs[0];
_logger.LogDebug($"Service config defines policies: {string.Join(',', serviceConfig.GetLoadBalancingPolicies())}");
}
else
{
_logger.LogDebug($"Parsing JSON grpc_config into service config failed, loading service config is skipped");
}
}
var balancingDnsQuery = $"_grpclb._tcp.{host}";
var serversDnsQuery = host;
_logger.LogDebug($"Start SRV lookup for {balancingDnsQuery}");
_logger.LogDebug($"Start A lookup for {serversDnsQuery}");
var balancingDnsQueryTask = dnsClient.QueryAsync(balancingDnsQuery, QueryType.SRV);
var serversDnsQueryTask = dnsClient.QueryAsync(serversDnsQuery, QueryType.A);
await Task.WhenAll(balancingDnsQueryTask, serversDnsQueryTask).ConfigureAwait(false);
var results = balancingDnsQueryTask.Result.Answers.OfType<SrvRecord>().Select(x => ParseSrvRecord(x, true))
.Union(serversDnsQueryTask.Result.Answers.OfType<ARecord>().Select(x => ParseARecord(x, target.Port, false))).ToList();
if (results.Count == 0)
{
_logger.LogDebug($"Not found any DNS records");
return new List<GrpcNameResolutionResult>();
}
return results;
}
private IDnsQuery GetDnsClient()
{
if (OverrideDnsClient != null)
{
return OverrideDnsClient;
}
if (_options.NameServers.Length == 0)
{
return new LookupClient();
}
else
{
_logger.LogDebug($"Override DNS name servers using options");
return new LookupClient(_options.NameServers);
}
}
private static bool IsGrpcConfigTxtRecord(string txtRecordText)
{
return txtRecordText.StartsWith("grpc_config=", StringComparison.InvariantCultureIgnoreCase);
}
private static bool TryParseGrpcConfig(string txtRecordText, out ServiceConfig[] serviceConfigs)
{
try
{
var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
var txtRecordValue = txtRecordText.Substring(12); // remove txt key -> grpc_config=
serviceConfigs = JsonSerializer.Deserialize<GrpcConfig[]>(txtRecordValue, options)
.Select(x => x.ServiceConfig).ToArray();
return true;
}
catch (Exception)
{
serviceConfigs = Array.Empty<ServiceConfig>();
return false;
}
}
private GrpcNameResolutionResult ParseSrvRecord(SrvRecord srvRecord, bool isLoadBalancer)
{
_logger.LogDebug($"Found a SRV record {srvRecord.ToString()}");
return new GrpcNameResolutionResult(srvRecord.Target)
{
Port = srvRecord.Port,
IsLoadBalancer = isLoadBalancer,
Priority = srvRecord.Priority,
Weight = srvRecord.Weight
};
}
private GrpcNameResolutionResult ParseARecord(ARecord aRecord, int port, bool isLoadBalancer)
{
_logger.LogDebug($"Found a A record {aRecord.ToString()}");
return new GrpcNameResolutionResult(aRecord.Address.ToString())
{
Port = port,
IsLoadBalancer = isLoadBalancer,
Priority = 0,
Weight = 0
};
}
}
}

View File

@ -0,0 +1,31 @@
using System;
using System.Net;
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins
{
/// <summary>
/// An options class for configuring a <see cref="DnsClientResolverPlugin"/>.
/// </summary>
public sealed class DnsClientResolverPluginOptions
{
/// <summary>
/// Allows override dns nameservers used during lookup. Default value is an empty list.
/// If an empty list is specified client defaults to machine list of nameservers.
/// </summary>
public IPEndPoint[] NameServers { get; set; }
/// <summary>
/// Allows disabling service config lookup. Default value false.
/// </summary>
public bool DisableTxtServiceConfig { get; set; }
/// <summary>
/// Creates a <seealso cref="DnsClientResolverPluginOptions"/> options class with default values.
/// </summary>
public DnsClientResolverPluginOptions()
{
NameServers = Array.Empty<IPEndPoint>();
DisableTxtServiceConfig = true;
}
}
}

View File

@ -0,0 +1,84 @@
#pragma warning disable CA1812 // Classes in this file are used for deserialization
using System;
using System.Collections.Generic;
using System.Linq;
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins.GrpcServiceConfig
{
// based on: https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
internal sealed class GrpcConfig
{
public ServiceConfig ServiceConfig { get; set; } = new ServiceConfig();
}
//based on: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
internal sealed class ServiceConfig
{
// This field is deprecated but currently widely used
public string LoadBalancingPolicy { get; set; } = string.Empty;
public List<LoadBalancingConfig> LoadBalancingConfig { get; set; } = new List<LoadBalancingConfig>();
public string[] GetLoadBalancingPolicies()
{
if (LoadBalancingConfig.Count != 0)
{
return LoadBalancingConfig.Select(x => x.GetPolicyName()).ToArray();
}
if (LoadBalancingPolicy != string.Empty)
{
return new string[] { LoadBalancingPolicy };
}
else
{
throw new InvalidOperationException("Invalid ServiceConfig, load balancing policy must be specified");
}
}
}
internal sealed class LoadBalancingConfig
{
public PickFirstConfig? PickFirst { get; set; }
public RoundRobinConfig? RoundRobin { get; set; }
public GrpcLbConfig? Grpclb { get; set; }
//XDS policy config should be added here in the future
public string GetPolicyName()
{
// according to proto file only one configuration can be specified
return Grpclb?.ToString() ?? RoundRobin?.ToString() ?? PickFirst?.ToString()
?? throw new InvalidOperationException("Load balancing config without policy defined");
}
}
internal sealed class PickFirstConfig
{
//This should be left empty, see service_config.proto file
public override string ToString()
{
return "pick_first";
}
}
internal sealed class RoundRobinConfig
{
//This should be left empty, see service_config.proto file
public override string ToString()
{
return "round_robin";
}
}
internal sealed class GrpcLbConfig
{
public List<LoadBalancingConfig>? ChildPolicy { get; set; }
public string? ServiceName { get; set; }
public override string ToString()
{
return "grpclb";
}
}
}

View File

@ -0,0 +1,51 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Grpc.Net.Client.LoadBalancing.ResolverPlugins
{
/// <summary>
/// Resolver plugin is responsible for name resolution by reaching the authority and return
/// a list of resolved addresses (both IP address and port) and a service config.
/// More: https://github.com/grpc/grpc/blob/master/doc/naming.md
/// </summary>
public sealed class StaticResolverPlugin : IGrpcResolverPlugin
{
private readonly Func<Uri, List<GrpcNameResolutionResult>> _staticNameResolution;
private ILogger _logger = NullLogger.Instance;
/// <summary>
/// LoggerFactory is configured (injected) when class is being instantiated.
/// </summary>
public ILoggerFactory LoggerFactory
{
set => _logger = value.CreateLogger<StaticResolverPlugin>();
}
/// <summary>
/// Creates a <seealso cref="StaticResolverPlugin"/> with configation passed as function parameter.
/// </summary>
/// <param name="staticNameResolution"></param>
public StaticResolverPlugin(Func<Uri, List<GrpcNameResolutionResult>> staticNameResolution)
{
if(staticNameResolution == null)
{
throw new ArgumentNullException(nameof(staticNameResolution));
}
_staticNameResolution = staticNameResolution;
}
/// <summary>
/// Name resolution for secified target.
/// </summary>
/// <param name="target">Server address with scheme.</param>
/// <returns>List of resolved servers and/or lookaside load balancers.</returns>
public Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target)
{
_logger.LogDebug($"Using static name resolution");
return Task.FromResult(_staticNameResolution(target));
}
}
}

View File

@ -38,8 +38,8 @@ namespace Grpc.Net.Client
{
internal const int DefaultMaxReceiveMessageSize = 1024 * 1024 * 4; // 4 MB
private readonly ConcurrentDictionary<IMethod, GrpcMethodInfo> _methodInfoCache;
private readonly Func<IMethod, GrpcMethodInfo> _createMethodInfoFunc;
private readonly ConcurrentDictionary<IMethod, GrpcCallScope> _callScopeCache;
private readonly Func<IMethod, GrpcCallScope> _createCallScopeFunc;
internal Uri Address { get; }
internal HttpClient HttpClient { get; }
@ -51,6 +51,8 @@ namespace Grpc.Net.Client
internal List<CallCredentials>? CallCredentials { get; }
internal Dictionary<string, ICompressionProvider> CompressionProviders { get; }
internal string MessageAcceptEncoding { get; }
internal IGrpcResolverPlugin ResolverPlugin { get; }
internal IGrpcLoadBalancingPolicy LoadBalancingPolicy { get; }
internal bool Disposed { get; private set; }
// Timing related options that are set in unit tests
internal ISystemClock Clock = SystemClock.Instance;
@ -61,7 +63,7 @@ namespace Grpc.Net.Client
internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(address.Authority)
{
_methodInfoCache = new ConcurrentDictionary<IMethod, GrpcMethodInfo>();
_callScopeCache = new ConcurrentDictionary<IMethod, GrpcCallScope>();
// Dispose the HttpClient if...
// 1. No client was specified and so the channel created the HttpClient itself
@ -76,7 +78,7 @@ namespace Grpc.Net.Client
MessageAcceptEncoding = GrpcProtocolHelpers.GetMessageAcceptEncoding(CompressionProviders);
LoggerFactory = channelOptions.LoggerFactory ?? NullLoggerFactory.Instance;
ThrowOperationCanceledOnCancellation = channelOptions.ThrowOperationCanceledOnCancellation;
_createMethodInfoFunc = CreateMethodInfo;
_createCallScopeFunc = CreateCallScope;
if (channelOptions.Credentials != null)
{
@ -88,6 +90,15 @@ namespace Grpc.Net.Client
ValidateChannelCredentials();
}
ResolverPlugin = channelOptions.ResolverPlugin;
ResolverPlugin.LoggerFactory = LoggerFactory;
LoadBalancingPolicy = channelOptions.LoadBalancingPolicy;
LoadBalancingPolicy.LoggerFactory = LoggerFactory;
var resolutionResult = ResolverPlugin.StartNameResolutionAsync(Address).GetAwaiter().GetResult();
var isSecureConnection = Address.Scheme == Uri.UriSchemeHttps || (Address.Scheme.Equals("dns", StringComparison.OrdinalIgnoreCase) && Address.Port == 443);
LoadBalancingPolicy.CreateSubChannelsAsync(resolutionResult, Address.Host, Address.Scheme == Uri.UriSchemeHttps).Wait();
}
private static HttpClient CreateInternalHttpClient()
@ -107,17 +118,15 @@ namespace Grpc.Net.Client
return httpClient;
}
internal GrpcMethodInfo GetCachedGrpcMethodInfo(IMethod method)
internal GrpcCallScope GetCachedGrpcCallScope(IMethod method)
{
return _methodInfoCache.GetOrAdd(method, _createMethodInfoFunc);
return _callScopeCache.GetOrAdd(method, _createCallScopeFunc);
}
private GrpcMethodInfo CreateMethodInfo(IMethod method)
private GrpcCallScope CreateCallScope(IMethod method)
{
var uri = new Uri(method.FullName, UriKind.Relative);
var scope = new GrpcCallScope(method.Type, uri);
return new GrpcMethodInfo(scope, new Uri(Address, uri));
return new GrpcCallScope(method.Type, uri);
}
private static Dictionary<string, ICompressionProvider> ResolveCompressionProviders(IList<ICompressionProvider>? compressionProviders)
@ -165,7 +174,7 @@ namespace Grpc.Net.Client
{
throw new ObjectDisposedException(nameof(GrpcChannel));
}
var invoker = new HttpClientCallInvoker(this);
return invoker;
@ -275,6 +284,8 @@ namespace Grpc.Net.Client
return;
}
LoadBalancingPolicy.Dispose();
if (_shouldDisposeHttpClient)
{
HttpClient.Dispose();

View File

@ -96,12 +96,24 @@ namespace Grpc.Net.Client
/// </summary>
public bool ThrowOperationCanceledOnCancellation { get; set; }
/// <summary>
/// Gets or sets name resolver.
/// </summary>
public IGrpcResolverPlugin ResolverPlugin { get; set; }
/// <summary>
/// Gets or sets load balancing policy.
/// </summary>
public IGrpcLoadBalancingPolicy LoadBalancingPolicy { get; set; }
/// <summary>
/// Initializes a new instance of the <see cref="GrpcChannelOptions"/> class.
/// </summary>
public GrpcChannelOptions()
{
MaxReceiveMessageSize = GrpcChannel.DefaultMaxReceiveMessageSize;
LoadBalancingPolicy = new PickFirstPolicy();
ResolverPlugin = new NoneResolverPlugin();
}
}
}

View File

@ -0,0 +1,198 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Grpc.Net.Client
{
/// <summary>
/// Resolver plugin is responsible for name resolution by reaching the authority and return
/// a list of resolved addresses (both IP address and port) and a service config.
/// More: https://github.com/grpc/grpc/blob/master/doc/naming.md
/// </summary>
public interface IGrpcResolverPlugin
{
/// <summary>
/// LoggerFactory is configured (injected) when class is being instantiated.
/// </summary>
ILoggerFactory LoggerFactory { set; }
/// <summary>
/// Name resolution for secified target.
/// </summary>
/// <param name="target">Server address with scheme.</param>
/// <returns>List of resolved servers and/or lookaside load balancers.</returns>
Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target);
}
/// <summary>
/// The load balancing policy creates a subchannel to each server address.
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
/// </summary>
public interface IGrpcLoadBalancingPolicy : IDisposable
{
/// <summary>
/// LoggerFactory is configured (injected) when class is being instantiated.
/// </summary>
ILoggerFactory LoggerFactory { set; }
/// <summary>
/// Creates a subchannel to each server address. Depending on policy this may require additional
/// steps eg. reaching out to lookaside loadbalancer.
/// </summary>
/// <param name="resolutionResult">Resolved list of servers and/or lookaside load balancers.</param>
/// <param name="serviceName">The name of the load balanced service (e.g., service.googleapis.com).</param>
/// <param name="isSecureConnection">Flag if connection between client and destination server should be secured.</param>
/// <returns>List of subchannels.</returns>
Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection);
/// <summary>
/// For each RPC sent, the load balancing policy decides which subchannel (i.e., which server) the RPC should be sent to.
/// </summary>
/// <returns>Selected subchannel.</returns>
GrpcSubChannel GetNextSubChannel();
}
/// <summary>
/// Resolved address of server or lookaside load balancer.
/// </summary>
public sealed class GrpcNameResolutionResult
{
/// <summary>
/// Host address.
/// </summary>
public string Host { get; set; }
/// <summary>
/// Port.
/// </summary>
public int? Port { get; set; } = null;
/// <summary>
/// Flag that indicate if machine is load balancer or service.
/// </summary>
public bool IsLoadBalancer { get; set; } = false;
/// <summary>
/// Priority value, which was obtained from SRV record, for this Host. Default value zero.
/// </summary>
public int Priority { get; set; } = 0;
/// <summary>
/// Weight value, which was obtained from SRV record, for this Host. Default value zero.
/// </summary>
public int Weight { get; set; } = 0;
/// <summary>
/// Creates a <see cref="GrpcNameResolutionResult"/> with host and unassigned port.
/// </summary>
/// <param name="host">Host address of machine.</param>
/// <param name="port">Machine port.</param>
public GrpcNameResolutionResult(string host, int? port = null)
{
Host = host;
Port = port;
}
}
/// <summary>
/// Address of server that can handle requests for RPC.
/// </summary>
public sealed class GrpcSubChannel
{
/// <summary>
/// Gets the server address in Uri form.
/// </summary>
public Uri Address { get; }
/// <summary>
/// Creates a <see cref="GrpcSubChannel"/> object with subchannel address.
/// </summary>
/// <param name="address"></param>
public GrpcSubChannel(Uri address)
{
Address = address;
}
}
/// <summary>
/// Assume name was already resolved or pass through to HttpClient to handle
/// </summary>
internal sealed class NoneResolverPlugin : IGrpcResolverPlugin
{
private ILogger _logger = NullLogger.Instance;
public ILoggerFactory LoggerFactory
{
set => _logger = value.CreateLogger<NoneResolverPlugin>();
}
public Task<List<GrpcNameResolutionResult>> StartNameResolutionAsync(Uri target)
{
if (target == null)
{
throw new ArgumentNullException(nameof(target));
}
if (target.Scheme.Equals("dns", StringComparison.OrdinalIgnoreCase))
{
throw new ArgumentException("dns:// scheme require non-default name resolver in channelOptions.ResolverPlugin");
}
_logger.LogDebug($"Name resolver using defined target as name resolution");
return Task.FromResult(new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult(target.Host, target.Port)
});
}
}
internal sealed class PickFirstPolicy : IGrpcLoadBalancingPolicy
{
private ILogger _logger = NullLogger.Instance;
public ILoggerFactory LoggerFactory
{
set => _logger = value.CreateLogger<PickFirstPolicy>();
}
internal IReadOnlyList<GrpcSubChannel> SubChannels { get; set; } = Array.Empty<GrpcSubChannel>();
public Task CreateSubChannelsAsync(List<GrpcNameResolutionResult> resolutionResult, string serviceName, bool isSecureConnection)
{
if (resolutionResult == null)
{
throw new ArgumentNullException(nameof(resolutionResult));
}
if (string.IsNullOrWhiteSpace(serviceName))
{
throw new ArgumentException($"{nameof(serviceName)} not defined");
}
resolutionResult = resolutionResult.Where(x => !x.IsLoadBalancer).ToList();
if (resolutionResult.Count == 0)
{
throw new ArgumentException($"{nameof(resolutionResult)} must contain at least one non-blancer address");
}
_logger.LogDebug($"Start first_pick policy");
var uriBuilder = new UriBuilder();
uriBuilder.Host = resolutionResult[0].Host;
uriBuilder.Port = resolutionResult[0].Port ?? (isSecureConnection ? 443 : 80);
uriBuilder.Scheme = isSecureConnection ? "https" : "http";
var uri = uriBuilder.Uri;
var result = new List<GrpcSubChannel> {
new GrpcSubChannel(uri)
};
_logger.LogDebug($"Found a server {uri}");
_logger.LogDebug($"SubChannels list created");
SubChannels = result;
return Task.CompletedTask;
}
public GrpcSubChannel GetNextSubChannel()
{
return SubChannels[0];
}
public void Dispose()
{
}
}
}

View File

@ -38,6 +38,8 @@ namespace Grpc.Net.Client.Internal
_uri = uri;
}
public Uri Uri => _uri;
public KeyValuePair<string, object> this[int index]
{
get

View File

@ -129,7 +129,9 @@ namespace Grpc.Net.Client.Internal
throw new ObjectDisposedException(nameof(GrpcChannel));
}
var methodInfo = Channel.GetCachedGrpcMethodInfo(method);
var subchannel = Channel.LoadBalancingPolicy.GetNextSubChannel();
var scope = Channel.GetCachedGrpcCallScope(method);
var methodInfo = new GrpcMethodInfo(scope, new Uri(subchannel.Address, scope.Uri));
var call = new GrpcCall<TRequest, TResponse>(method, methodInfo, options, Channel);
return call;

View File

@ -30,6 +30,12 @@ using System.Runtime.CompilerServices;
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
[assembly: InternalsVisibleTo("Grpc.Net.Client.LoadBalancing.Tests,PublicKey=" +
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
[assembly: InternalsVisibleTo("Grpc.Net.ClientFactory.Tests,PublicKey=" +
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +

View File

@ -305,7 +305,11 @@ namespace Microsoft.Extensions.DependencyInjection
var os = s.GetRequiredService<IOptionsMonitor<GrpcClientFactoryOptions>>();
var clientOptions = os.Get(name);
httpClient.BaseAddress = clientOptions.Address;
// do not set shared baseaddress for subchannels
if (clientOptions.Address?.Scheme != "dns")
{
httpClient.BaseAddress = clientOptions.Address;
}
// Long running server and duplex streaming gRPC requests may not
// return any messages for over 100 seconds, triggering a cancellation

View File

@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
<PackageReference Include="coverlet.collector" Version="1.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Grpc.Net.Client\Grpc.Net.Client.csproj" />
<ProjectReference Include="..\..\src\Grpc.Net.Client.LoadBalancing\Grpc.Net.Client.LoadBalancing.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,250 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Lb.V1;
using Grpc.Net.Client.LoadBalancing.Policies;
using Grpc.Net.Client.LoadBalancing.Policies.Abstraction;
using Moq;
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Grpc.Net.Client.LoadBalancing.Tests.Policies
{
public sealed class GrpclbPolicyTests
{
[Fact]
public async Task ForEmptyServiceName_UseGrpclbPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new GrpclbPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.6.120", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.6.121", 80)
{
IsLoadBalancer = true
}
};
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, "", false);
});
Assert.Equal("serviceName not defined", exception.Message);
exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, string.Empty, false);
});
Assert.Equal("serviceName not defined", exception.Message);
}
[Fact]
public async Task ForEmptyResolutionPassed_UseGrpclbPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new GrpclbPolicy();
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(new List<GrpcNameResolutionResult>(), "sample-service.contoso.com", false);
});
Assert.Equal("resolutionResult must contain at least one blancer address", exception.Message);
}
[Fact]
public async Task ForServersResolutionOnly_UseGrpclbPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new GrpclbPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.211", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.212", 80)
{
IsLoadBalancer = false
}
};
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false); // non-balancers are ignored
});
Assert.Equal("resolutionResult must contain at least one blancer address", exception.Message);
}
[Fact]
public async Task ForResolutionResultWithBalancers_UseGrpclbPolicy_CreateSubchannelsForFoundServers()
{
// Arrange
var balancerClientMock = new Mock<ILoadBalancerClient>(MockBehavior.Strict);
var balancerStreamMock = new Mock<IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>>(MockBehavior.Strict);
var requestStreamMock = new Mock<IClientStreamWriter<LoadBalanceRequest>>(MockBehavior.Loose);
balancerClientMock.Setup(x => x.Dispose());
balancerClientMock.Setup(x => x.BalanceLoad(null, null, It.IsAny<CancellationToken>()))
.Returns(balancerStreamMock.Object);
balancerStreamMock.Setup(x => x.RequestStream).Returns(requestStreamMock.Object);
balancerStreamMock.Setup(x => x.ResponseStream).Returns(new TestLoadBalancerResponse(new List<LoadBalanceResponse>
{
new LoadBalanceResponse()
{
InitialResponse = GetSampleInitialLoadBalanceResponse()
},
new LoadBalanceResponse()
{
ServerList = GetSampleServerList()
}
}));
using var policy = new GrpclbPolicy();
policy.OverrideLoadBalancerClient = balancerClientMock.Object;
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.6.120", 80) { IsLoadBalancer = true }
};
// Act
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
var subChannels = policy.SubChannels;
// Assert
Assert.Equal(3, subChannels.Count); // subChannels are created per results from GetSampleLoadBalanceResponse
Assert.All(subChannels, subChannel => Assert.Equal("http", subChannel.Address.Scheme));
Assert.All(subChannels, subChannel => Assert.Equal(80, subChannel.Address.Port));
Assert.All(subChannels, subChannel => Assert.StartsWith("10.1.5.", subChannel.Address.Host));
}
[Fact]
public async Task ForLoadBalancerClient_UseGrpclbPolicy_EnsureDisposedResources()
{
// Arrange
var balancerClientMock = new Mock<ILoadBalancerClient>(MockBehavior.Strict);
var balancerStreamMock = new Mock<IAsyncDuplexStreamingCall<LoadBalanceRequest, LoadBalanceResponse>>(MockBehavior.Strict);
var requestStreamMock = new Mock<IClientStreamWriter<LoadBalanceRequest>>(MockBehavior.Loose);
balancerClientMock.Setup(x => x.Dispose()).Verifiable();
balancerClientMock.Setup(x => x.BalanceLoad(null, null, It.IsAny<CancellationToken>()))
.Returns(balancerStreamMock.Object);
balancerStreamMock.Setup(x => x.RequestStream).Returns(requestStreamMock.Object);
balancerStreamMock.Setup(x => x.ResponseStream).Returns(new TestLoadBalancerResponse(new List<LoadBalanceResponse>
{
new LoadBalanceResponse()
{
InitialResponse = GetSampleInitialLoadBalanceResponse()
},
new LoadBalanceResponse()
{
ServerList = GetSampleServerList()
}
}));
using var policy = new GrpclbPolicy();
policy.OverrideLoadBalancerClient = balancerClientMock.Object;
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.6.120", 80) { IsLoadBalancer = true }
};
// Act
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
var subChannels = policy.SubChannels;
// Assert
policy.Dispose();
balancerClientMock.Verify(x => x.Dispose(), Times.Once());
}
[Fact]
public void ForGrpcSubChannels_UseGrpclbPolicySelectChannels_SelectChannelsInRoundRobin()
{
// Arrange
using var policy = new GrpclbPolicy();
var subChannels = new List<GrpcSubChannel>()
{
new GrpcSubChannel(new UriBuilder("http://10.1.5.210:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.212:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.211:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.213:80").Uri)
};
policy.SubChannels = subChannels;
// Act
// Assert
for (int i = 0; i < 30; i++)
{
var subChannel = policy.GetNextSubChannel();
Assert.Equal(subChannels[i % subChannels.Count].Address.Host, subChannel.Address.Host);
Assert.Equal(subChannels[i % subChannels.Count].Address.Port, subChannel.Address.Port);
Assert.Equal(subChannels[i % subChannels.Count].Address.Scheme, subChannel.Address.Scheme);
}
}
private static InitialLoadBalanceResponse GetSampleInitialLoadBalanceResponse()
{
var initialResponse = new InitialLoadBalanceResponse();
initialResponse.ClientStatsReportInterval = Duration.FromTimeSpan(TimeSpan.FromSeconds(10));
initialResponse.LoadBalancerDelegate = string.Empty;
return initialResponse;
}
private static ServerList GetSampleServerList()
{
var serverList = new ServerList();
serverList.Servers.Add(new Server()
{
IpAddress = ByteString.CopyFrom(IPAddress.Parse("10.1.5.211").GetAddressBytes()),
Port = 80
});
serverList.Servers.Add(new Server()
{
IpAddress = ByteString.CopyFrom(IPAddress.Parse("10.1.5.212").GetAddressBytes()),
Port = 80
});
serverList.Servers.Add(new Server()
{
IpAddress = ByteString.CopyFrom(IPAddress.Parse("10.1.5.213").GetAddressBytes()),
Port = 80
});
return serverList;
}
}
internal sealed class TestLoadBalancerResponse : IAsyncStreamReader<LoadBalanceResponse>
{
private readonly IReadOnlyList<LoadBalanceResponse> _loadBalanceResponses;
private int _streamIndex;
public TestLoadBalancerResponse(IReadOnlyList<LoadBalanceResponse> loadBalanceResponses)
{
_loadBalanceResponses = loadBalanceResponses;
_streamIndex = -1;
}
public LoadBalanceResponse Current => _loadBalanceResponses[_streamIndex];
public Task<bool> MoveNext(CancellationToken cancellationToken)
{
return Task.FromResult(++_streamIndex < _loadBalanceResponses.Count);
}
}
}

View File

@ -0,0 +1,179 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
namespace Grpc.Net.Client.LoadBalancing.Tests.Policies
{
public sealed class PickFirstPolicyTests
{
[Fact]
public async Task ForEmptyServiceName_UsePickFirstPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new PickFirstPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.211", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.212", 80)
{
IsLoadBalancer = false
}
};
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, "", false);
});
Assert.Equal("serviceName not defined", exception.Message);
exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, string.Empty, false);
});
Assert.Equal("serviceName not defined", exception.Message);
}
[Fact]
public async Task ForEmptyResolutionPassed_UsePickFirstPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new PickFirstPolicy();
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(new List<GrpcNameResolutionResult>(), "sample-service.contoso.com", false);
});
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
}
[Fact]
public async Task ForBalancersResolutionOnly_UsePickFirstPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new PickFirstPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.6.120", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.6.121", 80)
{
IsLoadBalancer = true
}
};
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false); // load balancers are ignored
});
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
}
[Fact]
public async Task ForResolutionResults_UsePickFirstPolicy_CreateAmmountSubChannels()
{
// Arrange
using var policy = new PickFirstPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.211", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.212", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.213", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.214", 80)
{
IsLoadBalancer = false
}
};
// Act
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
var subChannels = policy.SubChannels;
// Assert
Assert.Single(subChannels);
Assert.Equal("http", subChannels[0].Address.Scheme);
Assert.Equal(80, subChannels[0].Address.Port);
Assert.StartsWith("10.1.5.211", subChannels[0].Address.Host);
}
[Fact]
public async Task ForResolutionResultWithBalancers_UsePickFirstPolicy_IgnoreBalancersCreateSubchannels()
{
// Arrange
using var policy = new PickFirstPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.6.120", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.5.212", 8443)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.6.121", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.5.214", 8443)
{
IsLoadBalancer = false
}
};
// Act
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", true);
var subChannels = policy.SubChannels;
// Assert
Assert.Single(subChannels); // load balancers are ignored
Assert.Equal("https", subChannels[0].Address.Scheme);
Assert.Equal(8443, subChannels[0].Address.Port);
Assert.StartsWith("10.1.5.212", subChannels[0].Address.Host);
}
[Fact]
public void ForGrpcSubChannels_UsePickFirstPolicySelectChannels_SelectFirstChannel()
{
// Arrange
using var policy = new PickFirstPolicy();
var subChannels = new List<GrpcSubChannel>()
{
new GrpcSubChannel(new UriBuilder("http://10.1.5.210:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.212:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.211:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.213:80").Uri)
};
policy.SubChannels = subChannels;
// Act
// Assert
for (int i = 0; i < 30; i++)
{
var subChannel = policy.GetNextSubChannel();
Assert.Equal(subChannels[0].Address.Host, subChannel.Address.Host);
Assert.Equal(subChannels[0].Address.Port, subChannel.Address.Port);
Assert.Equal(subChannels[0].Address.Scheme, subChannel.Address.Scheme);
}
}
}
}

View File

@ -0,0 +1,184 @@
using Grpc.Net.Client.LoadBalancing.Policies;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
namespace Grpc.Net.Client.LoadBalancing.Tests.Policies
{
public sealed class RoundRobinPolicyTests
{
[Fact]
public async Task ForEmptyServiceName_UseRoundRobinPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new RoundRobinPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.211", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.212", 80)
{
IsLoadBalancer = false
}
};
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, "", false);
});
Assert.Equal("serviceName not defined", exception.Message);
exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, string.Empty, false);
});
Assert.Equal("serviceName not defined", exception.Message);
}
[Fact]
public async Task ForEmptyResolutionPassed_UseRoundRobinPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new RoundRobinPolicy();
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(new List<GrpcNameResolutionResult>(), "sample-service.contoso.com", false);
});
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
}
[Fact]
public async Task ForBalancersResolutionOnly_UseRoundRobinPolicy_ThrowArgumentException()
{
// Arrange
using var policy = new RoundRobinPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.6.120", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.6.121", 80)
{
IsLoadBalancer = true
}
};
// Act
// Assert
var exception = await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false); // load balancers are ignored
});
Assert.Equal("resolutionResult must contain at least one non-blancer address", exception.Message);
}
[Fact]
public async Task ForResolutionResults_UseRoundRobinPolicy_CreateAmmountSubChannels()
{
// Arrange
using var policy = new RoundRobinPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.211", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.212", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.213", 80)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.5.214", 80)
{
IsLoadBalancer = false
}
};
// Act
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", false);
var subChannels = policy.SubChannels;
// Assert
Assert.Equal(4, subChannels.Count);
Assert.All(subChannels, subChannel => Assert.Equal("http", subChannel.Address.Scheme));
Assert.All(subChannels, subChannel => Assert.Equal(80, subChannel.Address.Port));
Assert.All(subChannels, subChannel => Assert.StartsWith("10.1.5.", subChannel.Address.Host));
}
[Fact]
public async Task ForResolutionResultWithBalancers_UseRoundRobinPolicy_IgnoreBalancersCreateSubchannels()
{
// Arrange
using var policy = new RoundRobinPolicy();
var resolutionResults = new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.211", 8443)
{
IsLoadBalancer = false,
},
new GrpcNameResolutionResult("10.1.5.212", 8443)
{
IsLoadBalancer = false
},
new GrpcNameResolutionResult("10.1.6.120", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.6.121", 80)
{
IsLoadBalancer = true
},
new GrpcNameResolutionResult("10.1.5.214", 8443)
{
IsLoadBalancer = false
}
};
// Act
await policy.CreateSubChannelsAsync(resolutionResults, "sample-service.contoso.com", true);
var subChannels = policy.SubChannels;
// Assert
Assert.Equal(3, subChannels.Count); // load balancers are ignored
Assert.All(subChannels, subChannel => Assert.Equal("https", subChannel.Address.Scheme));
Assert.All(subChannels, subChannel => Assert.Equal(8443, subChannel.Address.Port));
Assert.All(subChannels, subChannel => Assert.StartsWith("10.1.5.", subChannel.Address.Host));
}
[Fact]
public void ForGrpcSubChannels_UseRoundRobinPolicySelectChannels_SelectChannelsInRoundRobin()
{
// Arrange
using var policy = new RoundRobinPolicy();
var subChannels = new List<GrpcSubChannel>()
{
new GrpcSubChannel(new UriBuilder("http://10.1.5.210:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.212:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.211:80").Uri),
new GrpcSubChannel(new UriBuilder("http://10.1.5.213:80").Uri)
};
policy.SubChannels = subChannels;
// Act
// Assert
for (int i = 0; i < 30; i++)
{
var subChannel = policy.GetNextSubChannel();
Assert.Equal(subChannels[i % subChannels.Count].Address.Host, subChannel.Address.Host);
Assert.Equal(subChannels[i % subChannels.Count].Address.Port, subChannel.Address.Port);
Assert.Equal(subChannels[i % subChannels.Count].Address.Scheme, subChannel.Address.Scheme);
}
}
}
}

View File

@ -0,0 +1,126 @@
using DnsClient;
using DnsClient.Protocol;
using Grpc.Net.Client.LoadBalancing.ResolverPlugins;
using Moq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Xunit;
namespace Grpc.Net.Client.LoadBalancing.Tests.ResolverPlugins
{
public sealed class DnsClientResolverPluginTests
{
[Fact]
public async Task ForTargetWithNonDnsScheme_UseDnsClientResolverPlugin_ThrowArgumentException()
{
// Arrange
var resolverPlugin = new DnsClientResolverPlugin();
// Act
// Assert
await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await resolverPlugin.StartNameResolutionAsync(new Uri("http://sample.host.com"));
});
await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await resolverPlugin.StartNameResolutionAsync(new Uri("https://sample.host.com"));
});
await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await resolverPlugin.StartNameResolutionAsync(new Uri("unknown://sample.host.com"));
});
}
[Fact]
public async Task ForTargetAndEmptyDnsResults_UseDnsClientResolverPlugin_ReturnNoFinidings()
{
// Arrange
var serviceHostName = "my-service";
var txtDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
var srvBalancersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
var aServersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
var dnsClientMock = new Mock<IDnsQuery>(MockBehavior.Strict);
txtDnsQueryResponse.Setup(x => x.Answers).Returns(new List<TxtRecord>().AsReadOnly());
srvBalancersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<SrvRecord>().AsReadOnly());
aServersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<SrvRecord>().AsReadOnly());
dnsClientMock.Setup(x => x.QueryAsync($"_grpc_config.{serviceHostName}", QueryType.TXT, QueryClass.IN, default))
.Returns(Task.FromResult(txtDnsQueryResponse.Object));
dnsClientMock.Setup(x => x.QueryAsync($"_grpclb._tcp.{serviceHostName}", QueryType.SRV, QueryClass.IN, default))
.Returns(Task.FromResult(srvBalancersDnsQueryResponse.Object));
dnsClientMock.Setup(x => x.QueryAsync(serviceHostName, QueryType.A, QueryClass.IN, default))
.Returns(Task.FromResult(aServersDnsQueryResponse.Object));
var resolverPlugin = new DnsClientResolverPlugin();
resolverPlugin.OverrideDnsClient = dnsClientMock.Object;
// Act
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri($"dns://{serviceHostName}:80"));
// Assert
Assert.Empty(resolutionResult);
}
[Fact]
public async Task ForTargetAndBalancerSrvRecords_UseDnsClientResolverPlugin_ReturnBalancers()
{
// Arrange
var serviceHostName = "my-service";
var txtDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
var srvBalancersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
var aServersDnsQueryResponse = new Mock<IDnsQueryResponse>(MockBehavior.Strict);
var dnsClientMock = new Mock<IDnsQuery>(MockBehavior.Strict);
txtDnsQueryResponse.Setup(x => x.Answers).Returns(new List<TxtRecord>().AsReadOnly());
srvBalancersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<SrvRecord>(GetBalancersSrvRecords(serviceHostName)).AsReadOnly());
aServersDnsQueryResponse.Setup(x => x.Answers).Returns(new List<ARecord>(GetServersARecords(serviceHostName)).AsReadOnly());
dnsClientMock.Setup(x => x.QueryAsync($"_grpc_config.{serviceHostName}", QueryType.TXT, QueryClass.IN, default))
.Returns(Task.FromResult(txtDnsQueryResponse.Object));
dnsClientMock.Setup(x => x.QueryAsync($"_grpclb._tcp.{serviceHostName}", QueryType.SRV, QueryClass.IN, default))
.Returns(Task.FromResult(srvBalancersDnsQueryResponse.Object));
dnsClientMock.Setup(x => x.QueryAsync(serviceHostName, QueryType.A, QueryClass.IN, default))
.Returns(Task.FromResult(aServersDnsQueryResponse.Object));
var resolverPlugin = new DnsClientResolverPlugin();
resolverPlugin.OverrideDnsClient = dnsClientMock.Object;
// Act
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri($"dns://{serviceHostName}:443"));
// Assert
Assert.Equal(5, resolutionResult.Count);
Assert.Equal(2, resolutionResult.Where(x => x.IsLoadBalancer).Count());
Assert.All(resolutionResult.Where(x => x.IsLoadBalancer), x => Assert.Equal(80, x.Port));
Assert.All(resolutionResult.Where(x => x.IsLoadBalancer), x => Assert.StartsWith("10-1-6-", x.Host));
Assert.Equal(3, resolutionResult.Where(x => !x.IsLoadBalancer).Count());
Assert.All(resolutionResult.Where(x => !x.IsLoadBalancer), x => Assert.Equal(443, x.Port));
Assert.All(resolutionResult.Where(x => !x.IsLoadBalancer), x => Assert.StartsWith("10.1.5.", x.Host));
}
private List<SrvRecord> GetBalancersSrvRecords(string serviceHostName)
{
return new List<SrvRecord>()
{
new SrvRecord(new ResourceRecordInfo($"_grpclb._tcp.{serviceHostName}", ResourceRecordType.SRV, QueryClass.IN, 30, 0), 0, 0, 80, DnsString.Parse($"10-1-6-120.{serviceHostName}")),
new SrvRecord(new ResourceRecordInfo($"_grpclb._tcp.{serviceHostName}", ResourceRecordType.SRV, QueryClass.IN, 30, 0), 0, 0, 80, DnsString.Parse($"10-1-6-121.{serviceHostName}"))
};
}
private List<ARecord> GetServersARecords(string serviceHostName)
{
return new List<ARecord>()
{
new ARecord(new ResourceRecordInfo(serviceHostName, ResourceRecordType.A, QueryClass.IN, 30, 0), IPAddress.Parse("10.1.5.211")),
new ARecord(new ResourceRecordInfo(serviceHostName, ResourceRecordType.A, QueryClass.IN, 30, 0), IPAddress.Parse("10.1.5.212")),
new ARecord(new ResourceRecordInfo(serviceHostName, ResourceRecordType.A, QueryClass.IN, 30, 0), IPAddress.Parse("10.1.5.213"))
};
}
}
}

View File

@ -0,0 +1,39 @@
using System;
using System.Threading.Tasks;
using Xunit;
namespace Grpc.Net.Client.LoadBalancing.Tests.ResolverPlugins
{
public sealed class NoneResolverPluginTests
{
[Fact]
public async Task ForTarget_UseNoneResolverPlugin_ReturnResolutionResultWithTheSameValue()
{
// Arrange
var resolverPlugin = new NoneResolverPlugin();
// Act
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri("https://sample.host.com"));
// Assert
Assert.Single(resolutionResult);
Assert.Equal("sample.host.com", resolutionResult[0].Host);
Assert.Equal(443, resolutionResult[0].Port);
Assert.False(resolutionResult[0].IsLoadBalancer);
}
[Fact]
public async Task ForTargetWithDnsScheme_UseNoneResolverPlugin_ThrowArgumentException()
{
// Arrange
var resolverPlugin = new NoneResolverPlugin();
// Act
// Assert
await Assert.ThrowsAsync<ArgumentException>(async () =>
{
await resolverPlugin.StartNameResolutionAsync(new Uri("dns://sample.host.com"));
});
}
}
}

View File

@ -0,0 +1,36 @@
using Grpc.Net.Client.LoadBalancing.ResolverPlugins;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
namespace Grpc.Net.Client.LoadBalancing.Tests.ResolverPlugins
{
public sealed class StaticResolverPluginTests
{
[Fact]
public async Task ForStaticResolutionFunction_UseStaticResolverPlugin_ReturnPredefinedValues()
{
// Arrange
Func<Uri, List<GrpcNameResolutionResult>> resolveFunction = (uri) =>
{
return new List<GrpcNameResolutionResult>()
{
new GrpcNameResolutionResult("10.1.5.212", 8080),
new GrpcNameResolutionResult("10.1.5.213", 8080)
};
};
var resolverPlugin = new StaticResolverPlugin(resolveFunction);
// Act
var resolutionResult = await resolverPlugin.StartNameResolutionAsync(new Uri("https://sample.host.com"));
// Assert
Assert.Equal(2, resolutionResult.Count);
Assert.Equal("10.1.5.212", resolutionResult[0].Host);
Assert.Equal("10.1.5.213", resolutionResult[1].Host);
Assert.Equal(8080, resolutionResult[0].Port);
Assert.Equal(8080, resolutionResult[1].Port);
}
}
}

View File

@ -63,17 +63,17 @@ namespace Grpc.Net.Client.Tests
var logs = testSink.Writes.Where(w => w.LogLevel >= Microsoft.Extensions.Logging.LogLevel.Debug).ToList();
Assert.AreEqual("Starting gRPC call. Method type: 'Unary', URI: 'https://localhost/ServiceName/MethodName'.", logs[0].State.ToString());
AssertScope(logs[0]);
Assert.AreEqual("Starting gRPC call. Method type: 'Unary', URI: 'https://localhost/ServiceName/MethodName'.", logs[4].State.ToString());
AssertScope(logs[4]);
Assert.AreEqual("Sending message.", logs[1].State.ToString());
AssertScope(logs[1]);
Assert.AreEqual("Sending message.", logs[5].State.ToString());
AssertScope(logs[5]);
Assert.AreEqual("Reading message.", logs[2].State.ToString());
AssertScope(logs[2]);
Assert.AreEqual("Reading message.", logs[6].State.ToString());
AssertScope(logs[6]);
Assert.AreEqual("Finished gRPC call.", logs[3].State.ToString());
AssertScope(logs[3]);
Assert.AreEqual("Finished gRPC call.", logs[7].State.ToString());
AssertScope(logs[7]);
static void AssertScope(WriteContext log)
{