// ------------------------------------------------------------ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. // ------------------------------------------------------------ namespace Dapr { using System; using System.Buffers.Binary; using System.IO; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Text.Json; using System.Threading.Tasks; using Google.Protobuf; using Grpc.Core; public class GrpcUtils { internal static readonly MediaTypeHeaderValue GrpcContentTypeHeaderValue = new MediaTypeHeaderValue("application/grpc"); internal static readonly Version ProtocolVersion = new Version(2, 0); internal const string MessageEncodingHeader = "grpc-encoding"; internal const string IdentityGrpcEncoding = "identity"; internal const string StatusTrailer = "grpc-status"; internal const int MessageDelimiterSize = 4; // how many bytes it takes to encode "Message-Length" internal const int HeaderSize = MessageDelimiterSize + 1; // message length + compression flag /// /// Gets the request from protobuf. /// bytes in http request message content contains gRPC Headers and protobuf. /// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md /// /// /// /// public static async Task GetRequestFromRequestMessageAsync(HttpRequestMessage request) where T : IMessage, new() { var bytes = await request.Content.ReadAsByteArrayAsync(); // first 5 bytes in request are grpc headers // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md var parser = new MessageParser(() => new T()); var envelope = parser.ParseFrom(bytes[5..]); return envelope; } public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode) => CreateResponse(statusCode, string.Empty); public static HttpResponseMessage CreateResponse(HttpStatusCode statusCode, string payload) => CreateResponse(statusCode, new StringContent(payload)); public static HttpResponseMessage CreateResponse( HttpStatusCode statusCode, HttpContent payload, StatusCode? grpcStatusCode = StatusCode.OK, string grpcEncoding = null, Version version = null) { payload.Headers.ContentType = GrpcContentTypeHeaderValue; var message = new HttpResponseMessage(statusCode) { Content = payload, Version = version ?? ProtocolVersion }; message.Headers.Add(MessageEncodingHeader, grpcEncoding ?? IdentityGrpcEncoding); if (grpcStatusCode != null) { message.TrailingHeaders.Add(StatusTrailer, grpcStatusCode.Value.ToString("D")); } return message; } public static Task CreateResponseContent(TResponse response) where TResponse : IMessage { return CreateResponseContentCore(new[] { response }); } private static async Task CreateResponseContentCore(TResponse[] responses) where TResponse : IMessage { var ms = new MemoryStream(); foreach (var response in responses) { await WriteResponseAsync(ms, response); } ms.Seek(0, SeekOrigin.Begin); var streamContent = new StreamContent(ms); streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/grpc"); return streamContent; } private static async Task WriteResponseAsync(Stream ms, TResponse response) where TResponse : IMessage { var data = response.ToByteArray(); await WriteHeaderAsync(ms, data.Length, false); await ms.WriteAsync(data); } private static Task WriteHeaderAsync(Stream stream, int length, bool compress = false) { var headerData = new byte[HeaderSize]; // Compression flag headerData[0] = compress ? (byte)1 : (byte)0; // Message length EncodeMessageLength(length, headerData.AsSpan(1)); return stream.WriteAsync(headerData, 0, headerData.Length); } private static void EncodeMessageLength(int messageLength, Span destination) { BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength); } } }