Added support for processing raw messages. (#717)

* Added support for processing raw messages.

* Refactored code and updated subscribeendpoint tests.

* Updated file name to match class name.

* Marked subscription and metadata classes as internal.
This commit is contained in:
Sapinder Pal Singh 2021-08-18 21:56:53 +05:30 committed by GitHub
parent ff25eb0bb1
commit de84c7642d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 175 additions and 24 deletions

View File

@ -5,12 +5,12 @@
namespace Microsoft.AspNetCore.Builder namespace Microsoft.AspNetCore.Builder
{ {
using System; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text.Json; using System.Text.Json;
using System.Threading; using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Dapr; using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing; using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Routing.Patterns; using Microsoft.AspNetCore.Routing.Patterns;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -28,6 +28,24 @@ namespace Microsoft.AspNetCore.Builder
/// <param name="endpoints">The <see cref="IEndpointRouteBuilder" />.</param> /// <param name="endpoints">The <see cref="IEndpointRouteBuilder" />.</param>
/// <returns>The <see cref="IEndpointConventionBuilder" />.</returns> /// <returns>The <see cref="IEndpointConventionBuilder" />.</returns>
public static IEndpointConventionBuilder MapSubscribeHandler(this IEndpointRouteBuilder endpoints) public static IEndpointConventionBuilder MapSubscribeHandler(this IEndpointRouteBuilder endpoints)
{
return CreateSubscribeEndPoint(endpoints);
}
/// <summary>
/// Maps an endpoint that will respond to requests to <c>/dapr/subscribe</c> from the
/// Dapr runtime.
/// </summary>
/// <param name="endpoints">The <see cref="IEndpointRouteBuilder" />.</param>
/// <param name="options">Configuration options</param>
/// <returns>The <see cref="IEndpointConventionBuilder" />.</returns>
/// <seealso cref="MapSubscribeHandler(IEndpointRouteBuilder)"/>
public static IEndpointConventionBuilder MapSubscribeHandler(this IEndpointRouteBuilder endpoints, SubscribeOptions options)
{
return CreateSubscribeEndPoint(endpoints, options);
}
private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRouteBuilder endpoints, SubscribeOptions options = null)
{ {
if (endpoints is null) if (endpoints is null)
{ {
@ -41,13 +59,12 @@ namespace Microsoft.AspNetCore.Builder
.OfType<RouteEndpoint>() .OfType<RouteEndpoint>()
.Where(e => e.Metadata.GetMetadata<ITopicMetadata>()?.Name != null) // only endpoints which have TopicAttribute with not null Name. .Where(e => e.Metadata.GetMetadata<ITopicMetadata>()?.Name != null) // only endpoints which have TopicAttribute with not null Name.
.Distinct() .Distinct()
.Select(e => (e.Metadata.GetMetadata<ITopicMetadata>().PubsubName, e.Metadata.GetMetadata<ITopicMetadata>().Name, e.RoutePattern)); .Select(e => (e.Metadata.GetMetadata<ITopicMetadata>().PubsubName, e.Metadata.GetMetadata<ITopicMetadata>().Name, e.Metadata.GetMetadata<IRawTopicMetadata>()?.EnableRawPayload, e.RoutePattern));
context.Response.ContentType = "application/json";
using var writer = new Utf8JsonWriter(context.Response.BodyWriter);
writer.WriteStartArray();
var logger = context.RequestServices.GetService<ILoggerFactory>().CreateLogger("DaprTopicSubscription"); var logger = context.RequestServices.GetService<ILoggerFactory>().CreateLogger("DaprTopicSubscription");
List<Subscription> subscriptions = new();
foreach (var entry in entries) foreach (var entry in entries)
{ {
// only return topics which have routes without parameters. // only return topics which have routes without parameters.
@ -61,21 +78,35 @@ namespace Microsoft.AspNetCore.Builder
continue; continue;
} }
writer.WriteStartObject();
writer.WriteString("topic", entry.Name);
var route = string.Join("/", var route = string.Join("/",
entry.RoutePattern.PathSegments entry.RoutePattern.PathSegments
.Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>() .Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>()
.Select(part => part.Content)))); .Select(part => part.Content))));
writer.WriteString("route", route); var rawPayload = entry.EnableRawPayload ?? options?.EnableRawPayload;
writer.WriteString("pubsubName", entry.PubsubName); var subscription = new Subscription
writer.WriteEndObject(); {
Topic = entry.Name,
PubsubName = entry.PubsubName,
Route = route
};
if (rawPayload != null)
{
subscription.Metadata = new Metadata
{
RawPayload = rawPayload.ToString().ToLower()
};
}
subscriptions.Add(subscription);
} }
writer.WriteEndArray(); await context.Response.WriteAsync(JsonSerializer.Serialize(subscriptions,
await writer.FlushAsync(); new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
}));
}); });
} }
} }

View File

@ -0,0 +1,18 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr
{
/// <summary>
/// RawMetadata that describes subscribe endpoint to enable or disable processing raw messages.
/// </summary>
public interface IRawTopicMetadata
{
/// <summary>
/// Gets the enable or disable value for processing raw messages.
/// </summary>
bool? EnableRawPayload { get; }
}
}

View File

@ -0,0 +1,18 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr
{
/// <summary>
/// This class defines configurations for the subscribe endpoint.
/// </summary>
public class SubscribeOptions
{
/// <summary>
/// Gets or Sets a value which indicates whether to enable or disable processing raw messages.
/// </summary>
public bool EnableRawPayload { get; set; }
}
}

View File

@ -0,0 +1,44 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr
{
/// <summary>
/// This class defines subscribe endpoint response
/// </summary>
internal class Subscription
{
/// <summary>
/// Gets or sets the topic name.
/// </summary>
public string Topic { get; set; }
/// <summary>
/// Gets or sets the pubsub name
/// </summary>
public string PubsubName { get; set; }
/// <summary>
/// Gets or sets the route
/// </summary>
public string Route { get; set; }
/// <summary>
/// Gets or sets the metadata.
/// </summary>
public Metadata Metadata { get; set; }
}
/// <summary>
/// This class defines the metadata for subscribe endpoint.
/// </summary>
internal class Metadata
{
/// <summary>
/// Gets or sets the rawoayload
/// </summary>
public string RawPayload { get; set; }
}
}

View File

@ -10,7 +10,7 @@ namespace Dapr
/// <summary> /// <summary>
/// Metadata that describes an endpoint as a subscriber to a topic. /// Metadata that describes an endpoint as a subscriber to a topic.
/// </summary> /// </summary>
public class TopicAttribute : Attribute, ITopicMetadata public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata
{ {
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="TopicAttribute" /> class. /// Initializes a new instance of the <see cref="TopicAttribute" /> class.
@ -26,10 +26,29 @@ namespace Dapr
this.PubsubName = pubsubName; this.PubsubName = pubsubName;
} }
/// <summary>
/// Initializes a new instance of the <see cref="TopicAttribute" /> class.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="name">The topic name.</param>
/// <param name="enableRawPayload">The enable/disable raw pay load flag.</param>
public TopicAttribute(string pubsubName, string name, bool enableRawPayload)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
this.Name = name;
this.PubsubName = pubsubName;
this.EnableRawPayload = enableRawPayload;
}
/// <inheritdoc/> /// <inheritdoc/>
public string Name { get; } public string Name { get; }
/// <inheritdoc/> /// <inheritdoc/>
public string PubsubName { get; } public string PubsubName { get; }
/// <inheritdoc/>
public bool? EnableRawPayload { get; }
} }
} }

View File

@ -29,6 +29,18 @@ namespace Dapr.AspNetCore.IntegrationTest.App
{ {
} }
[Topic("pubsub", "D", true)]
[HttpPost("/D")]
public void TopicD()
{
}
[Topic("pubsub", "E", false)]
[HttpPost("/E")]
public void TopicE()
{
}
[Topic("pubsub", "register-user")] [Topic("pubsub", "register-user")]
[HttpPost("/register-user")] [HttpPost("/register-user")]
public ActionResult<UserInfo> RegisterUser(UserInfo user) public ActionResult<UserInfo> RegisterUser(UserInfo user)

View File

@ -8,6 +8,7 @@ namespace Dapr.AspNetCore.IntegrationTest
using System.Collections.Generic; using System.Collections.Generic;
using System.Net.Http; using System.Net.Http;
using System.Text.Json; using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks; using System.Threading.Tasks;
using FluentAssertions; using FluentAssertions;
using Xunit; using Xunit;
@ -30,21 +31,29 @@ namespace Dapr.AspNetCore.IntegrationTest
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream); var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
json.ValueKind.Should().Be(JsonValueKind.Array); json.ValueKind.Should().Be(JsonValueKind.Array);
json.GetArrayLength().Should().Be(5); json.GetArrayLength().Should().Be(7);
var metadata = new List<(string PubsubName, string Topic, string Route)>(); var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload)>();
foreach (var element in json.EnumerateArray()) foreach (var element in json.EnumerateArray())
{ {
var pubsubName = element.GetProperty("pubsubName").GetString(); var pubsubName = element.GetProperty("pubsubName").GetString();
var topic = element.GetProperty("topic").GetString(); var topic = element.GetProperty("topic").GetString();
var route = element.GetProperty("route").GetString(); var route = element.GetProperty("route").GetString();
metadata.Add((pubsubName, topic, route)); var rawPayload = string.Empty;
if(element.TryGetProperty("metadata", out JsonElement metadata))
{
rawPayload = metadata.GetProperty("rawPayload").GetString();
} }
metadata.Should().Contain(("testpubsub", "A", "topic-a")); subscriptions.Add((pubsubName, topic, route,rawPayload));
metadata.Should().Contain(("pubsub", "B", "B")); }
metadata.Should().Contain(("custom-pubsub", "custom-C", "C"));
metadata.Should().Contain(("pubsub", "register-user", "register-user")); subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty));
metadata.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext")); subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty));
subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty));
subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty));
subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty));
subscriptions.Should().Contain(("pubsub", "D", "D", "true"));
subscriptions.Should().Contain(("pubsub", "E", "E", "false"));
} }
} }
} }