Add middleware for unwrapping cloud events

Fixes: #74

note: This relies on the fix for dapr/dapr#574 which has been merged.

This change introduces a middleware that can upwrap a *structured* cloud
event. This is the format used by dapr by default now for pub/sub
messaging. Adding the middleware makes it transparent to the developer
whether the data can from a cloud event or was a basic RPC call.

We're adding the middleware for this first since it's the most general
approach. It has a drawback compared with other approaches, performance.

Users could alternatively use the SDK from CloudEvents to read their
data without the middleware.

We might also want to add an MVC formatter in the future, which could do
the unwrapping and deserialization to a user-type in a single operation.
This commit is contained in:
Ryan Nowak 2019-10-13 13:58:01 +11:00
parent 4fb29e44e3
commit a6ef7e451d
8 changed files with 469 additions and 2 deletions

View File

@ -0,0 +1,155 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr
{
using System;
using System.IO;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.WebUtilities;
internal class CloudEventsMiddleware
{
private const string ContentType = "application/cloudevents+json";
private readonly RequestDelegate next;
public CloudEventsMiddleware(RequestDelegate next)
{
this.next = next;
}
public Task InvokeAsync(HttpContext httpContext)
{
// This middleware unwraps any requests with a cloud events (JSON) content type
// and replaces the request body + request content type so that it can be read by a
// non-cloud-events-aware piece of code.
//
// This corresponds to cloud events in the *structured* format:
// https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#13-content-modes
//
// For *binary* format, we don't have to do anything
//
// We don't support batching.
//
// The philosophy here is that we don't report an error for things we don't support, because
// that would block someone from implementing their own support for it. We only report an error
// when something we do support isn't correct.
if (!this.MatchesContentType(httpContext, out var charSet))
{
return this.next(httpContext);
}
return this.ProcessBodyAsync(httpContext, charSet);
}
private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
{
JsonElement json;
if (string.Equals(charSet, Encoding.UTF8.WebName, StringComparison.OrdinalIgnoreCase))
{
json = await JsonSerializer.DeserializeAsync<JsonElement>(httpContext.Request.Body);
}
else
{
using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
{
var text = await reader.ReadToEndAsync();
json = JsonSerializer.Deserialize<JsonElement>(text);
}
}
Stream originalBody;
Stream body;
string originalContentType;
string contentType;
// Data is optional.
if (json.TryGetProperty("data", out var data))
{
body = new MemoryStream();
await JsonSerializer.SerializeAsync<JsonElement>(body, data);
body.Seek(0L, SeekOrigin.Begin);
if (json.TryGetProperty("datacontenttype", out var dataContentType) &&
dataContentType.ValueKind == JsonValueKind.String)
{
contentType = dataContentType.GetString();
// Since S.T.Json always outputs utf-8, we may need to normalize the data content type
// to remove any charset information. We generally just assume utf-8 everywhere, so omitting
// a charset is a safe bet.
if (contentType.Contains("charset") && MediaTypeHeaderValue.TryParse(contentType, out var parsed))
{
parsed.CharSet = null;
contentType = parsed.ToString();
}
}
else
{
// assume JSON is not specified.
contentType = "application/json";
}
}
else
{
body = new MemoryStream();
contentType = null;
}
originalBody = httpContext.Request.Body;
originalContentType = httpContext.Request.ContentType;
try
{
httpContext.Request.Body = body;
httpContext.Request.ContentType = contentType;
await this.next(httpContext);
}
finally
{
httpContext.Request.ContentType = originalContentType;
httpContext.Request.Body = originalBody;
}
}
private bool MatchesContentType(HttpContext httpContext, out string charSet)
{
if (httpContext.Request.ContentType == null)
{
charSet = null;
return false;
}
// Handle cases where the content type includes additional parameters like charset.
// Doing the string comparison up front so we can avoid allocation.
if (!httpContext.Request.ContentType.StartsWith(ContentType))
{
charSet = null;
return false;
}
if (!MediaTypeHeaderValue.TryParse(httpContext.Request.ContentType, out var parsed))
{
charSet = null;
return false;
}
if (parsed.MediaType != ContentType)
{
charSet = null;
return false;
}
charSet = parsed.CharSet ?? "UTF-8";
return true;
}
}
}

View File

@ -0,0 +1,33 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Microsoft.AspNetCore.Builder
{
using System;
using Dapr;
/// <summary>
/// Provides extension methods for <see cref="IApplicationBuilder" />.
/// </summary>
public static class DaprApplicationBuilderExtensions
{
/// <summary>
/// Adds the cloud events middleware to the middleware pipeline. The cloud events middleware will unwrap
/// requests that use the cloud events structured format, allowing the event payload to be read directly.
/// </summary>
/// <param name="builder">An <see cref="IApplicationBuilder" />.</param>
/// <returns>The <see cref="IApplicationBuilder" />.</returns>
public static IApplicationBuilder UseCloudEvents(this IApplicationBuilder builder)
{
if (builder is null)
{
throw new ArgumentNullException(nameof(builder));
}
builder.UseMiddleware<CloudEventsMiddleware>();
return builder;
}
}
}

View File

@ -13,11 +13,18 @@ namespace Dapr.AspNetCore.IntegrationTest.App
public class DaprController : ControllerBase
{
[Topic("B")]
[HttpPost("/topic-b")]
[HttpPost("/B")]
public void TopicB()
{
}
[Topic("register-user")]
[HttpPost("/register-user")]
public ActionResult<UserInfo> RegisterUser(UserInfo user)
{
return user; // echo back the user for testing
}
[HttpPost("/controllerwithoutstateentry/{widget}")]
public async Task AddOneWithoutStateEntry([FromServices]StateClient state, [FromState] Widget widget)
{

View File

@ -37,6 +37,8 @@ namespace Dapr.AspNetCore.IntegrationTest.App
app.UseAuthorization();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();

View File

@ -0,0 +1,15 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr.AspNetCore.IntegrationTest.App
{
using System.ComponentModel.DataAnnotations;
public class UserInfo
{
[Required]
public string Name { get; set; }
}
}

View File

@ -0,0 +1,127 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr.AspNetCore.IntegrationTest
{
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr.AspNetCore.IntegrationTest.App;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
[TestClass]
public class CloudEventsIntegrationTest
{
private readonly JsonSerializerOptions options = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
PropertyNameCaseInsensitive = true,
};
[TestMethod]
public async Task CanSendEmptyStructuredCloudEvent()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/B");
request.Content = new StringContent("{}", Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");
var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
}
}
[TestMethod]
public async Task CanSendStructuredCloudEvent()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/register-user");
request.Content = new StringContent(
JsonSerializer.Serialize(
new
{
data = new
{
name = "jimmy",
},
}),
Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");
var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
var userInfo = await JsonSerializer.DeserializeAsync<UserInfo>(await response.Content.ReadAsStreamAsync(), this.options);
userInfo.Name.Should().Be("jimmy");
}
}
[TestMethod]
public async Task CanSendStructuredCloudEvent_WithContentType()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/register-user");
request.Content = new StringContent(
JsonSerializer.Serialize(
new
{
data = new
{
name = "jimmy",
},
datacontenttype = "text/json",
}),
Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");
var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
var userInfo = await JsonSerializer.DeserializeAsync<UserInfo>(await response.Content.ReadAsStreamAsync(), this.options);
userInfo.Name.Should().Be("jimmy");
}
}
// Yeah, I know, binary isn't a great term for this, it's what the cloudevents spec uses.
// Basically this is here to test that an endpoint can handle requests with and without
// an envelope.
[TestMethod]
public async Task CanSendBinaryCloudEvent_WithContentType()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();
var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/register-user");
request.Content = new StringContent(
JsonSerializer.Serialize(
new
{
name = "jimmy",
}),
Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
var userInfo = await JsonSerializer.DeserializeAsync<UserInfo>(await response.Content.ReadAsStreamAsync(), this.options);
userInfo.Name.Should().Be("jimmy");
}
}
}
}

View File

@ -31,7 +31,7 @@ namespace Dapr.AspNetCore.IntegrationTest
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
json.ValueKind.Should().Be(JsonValueKind.Array);
json.GetArrayLength().Should().Be(2);
json.GetArrayLength().Should().Be(3);
var topics = new List<string>();
foreach (var element in json.EnumerateArray())
{
@ -40,6 +40,7 @@ namespace Dapr.AspNetCore.IntegrationTest
topics.Should().Contain("A");
topics.Should().Contain("B");
topics.Should().Contain("register-user");
}
}
}

View File

@ -0,0 +1,127 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr.AspNetCore.Test
{
using System.IO;
using System.Text;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.VisualStudio.TestTools.UnitTesting;
[TestClass]
public class CloudEventsMiddlewareTest
{
[DataTestMethod]
[DataRow("text/plain")]
[DataRow("application/json")] // "binary" format
[DataRow("application/cloudevents")] // no format
[DataRow("application/cloudevents+xml")] // wrong format
[DataRow("application/cloudevents-batch+json")] // we don't support batch
public async Task InvokeAsync_IgnoresOtherContentTypes(string contentType)
{
var app = new ApplicationBuilder(null);
app.UseCloudEvents();
// Do verification in the scope of the middleware
app.Run(httpContext =>
{
httpContext.Request.ContentType.Should().Be(contentType);
ReadBody(httpContext.Request.Body).Should().Be("Hello, world!");
return Task.CompletedTask;
});
var pipeline = app.Build();
var context = new DefaultHttpContext();
context.Request.ContentType = contentType;
context.Request.Body = MakeBody("Hello, world!");
await pipeline.Invoke(context);
}
[DataTestMethod]
[DataRow(null, null)] // assumes application/json + utf8
[DataRow("application/json", null)] // assumes utf8
[DataRow("application/json", "utf-8")]
[DataRow("application/json", "UTF-8")]
[DataRow("application/person+json", "UTF-16")] // arbitrary content type and charset
public async Task InvokeAsync_ReplacesBodyJson(string dataContentType, string charSet)
{
var encoding = charSet == null ? null : Encoding.GetEncoding(charSet);
var app = new ApplicationBuilder(null);
app.UseCloudEvents();
// Do verification in the scope of the middleware
app.Run(httpContext =>
{
httpContext.Request.ContentType.Should().Be(dataContentType ?? "application/json");
ReadBody(httpContext.Request.Body).Should().Be("{\"name\":\"jimmy\"}");
return Task.CompletedTask;
});
var pipeline = app.Build();
var context = new DefaultHttpContext();
context.Request.ContentType = charSet == null ? "application/cloudevents+json" : $"application/cloudevents+json;charset={charSet}";
context.Request.Body = dataContentType == null ?
MakeBody("{ \"data\": { \"name\":\"jimmy\" } }", encoding) :
MakeBody($"{{ \"datacontenttype\": \"{dataContentType}\", \"data\": {{ \"name\":\"jimmy\" }} }}", encoding);
await pipeline.Invoke(context);
}
// This is a special case. S.T.Json will always output utf8, so we have to reinterpret the charset
// of the datacontenttype.
[TestMethod]
public async Task InvokeAsync_ReplacesBodyJson_NormalizesPayloadCharset()
{
var dataContentType = "application/person+json;charset=UTF-16";
var charSet = "UTF-16";
var encoding = Encoding.GetEncoding(charSet);
var app = new ApplicationBuilder(null);
app.UseCloudEvents();
// Do verification in the scope of the middleware
app.Run(httpContext =>
{
httpContext.Request.ContentType.Should().Be("application/person+json");
ReadBody(httpContext.Request.Body).Should().Be("{\"name\":\"jimmy\"}");
return Task.CompletedTask;
});
var pipeline = app.Build();
var context = new DefaultHttpContext();
context.Request.ContentType = $"application/cloudevents+json;charset={charSet}";
context.Request.Body =
MakeBody($"{{ \"datacontenttype\": \"{dataContentType}\", \"data\": {{ \"name\":\"jimmy\" }} }}", encoding);
await pipeline.Invoke(context);
}
private static Stream MakeBody(string text, Encoding encoding = null)
{
encoding ??= Encoding.UTF8;
var stream = new MemoryStream();
stream.Write(encoding.GetBytes(text));
stream.Seek(0L, SeekOrigin.Begin);
return stream;
}
private static string ReadBody(Stream stream, Encoding encoding = null)
{
encoding ??= Encoding.UTF8;
var bytes = new byte[stream.Length];
stream.Read(bytes, 0, bytes.Length);
return encoding.GetString(bytes);
}
}
}