// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace RoutingSample
{
    using System;
    using System.Collections.Generic;
    using System.Text.Json;
    using System.Threading.Tasks;
    using Dapr;
    using Dapr.AspNetCore;
    using Dapr.Client;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    /// <summary>
    /// Startup class.
    /// </summary>
    public class Startup
    {
        /// <summary>
        /// State store name.
        /// </summary>
        public const string StoreName = "statestore";

        /// <summary>
        /// Pubsub component name. "pubsub" is name of the default pub/sub configured by the Dapr CLI.
        /// </summary>
        public const string PubsubName = "pubsub";

        /// <summary>
        /// Topic used both as the dead-letter target of the deposit/withdraw subscriptions
        /// and as the topic served by the "deadLetterTopicRoute" endpoint.
        /// </summary>
        private const string DeadLetterTopicName = "amountDeadLetterTopic";

        /// <summary>
        /// Initializes a new instance of the <see cref="Startup"/> class.
        /// </summary>
        /// <param name="configuration">Configuration.</param>
        public Startup(IConfiguration configuration)
        {
            this.Configuration = configuration;
        }

        /// <summary>
        /// Gets the configuration.
        /// </summary>
        public IConfiguration Configuration { get; }

        /// <summary>
        /// Configures Services.
        /// </summary>
        /// <param name="services">Service Collection.</param>
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddDaprClient();

            // A single shared options instance; it is injected into Configure and used
            // by every handler for both request and response bodies.
            services.AddSingleton(new JsonSerializerOptions()
            {
                PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
                PropertyNameCaseInsensitive = true,
            });
        }

        /// <summary>
        /// Configures Application Builder and WebHost environment.
        /// Maps the balance lookup route and the deposit, multideposit, withdraw and
        /// dead-letter topic routes to the local handler functions defined in this method.
        /// </summary>
        /// <param name="app">Application builder.</param>
        /// <param name="env">Webhost environment.</param>
        /// <param name="serializerOptions">Options for JSON serialization.</param>
        /// <param name="logger">Logger used by the route handlers.</param>
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions, ILogger<Startup> logger)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseCloudEvents();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapSubscribeHandler();

                var depositTopicOptions = new TopicOptions
                {
                    PubsubName = PubsubName,
                    Name = "deposit",
                    DeadLetterTopic = DeadLetterTopicName,
                };

                var withdrawTopicOptions = new TopicOptions
                {
                    PubsubName = PubsubName,
                    Name = "withdraw",
                    DeadLetterTopic = DeadLetterTopicName,
                };

                var multiDepositTopicOptions = new TopicOptions
                {
                    PubsubName = PubsubName,
                    Name = "multideposit",
                };

                var bulkSubscribeTopicOptions = new BulkSubscribeTopicOptions
                {
                    TopicName = "multideposit",
                    MaxMessagesCount = 250,
                    MaxAwaitDurationMs = 1000,
                };

                endpoints.MapGet("{id}", Balance);
                endpoints.MapPost("deposit", Deposit).WithTopic(depositTopicOptions);
                endpoints.MapPost("multideposit", MultiDeposit).WithTopic(multiDepositTopicOptions).WithBulkSubscribe(bulkSubscribeTopicOptions);
                endpoints.MapPost("deadLetterTopicRoute", ViewErrorMessage).WithTopic(PubsubName, DeadLetterTopicName);
                endpoints.MapPost("withdraw", Withdraw).WithTopic(withdrawTopicOptions);
            });

            // Writes the payload to the response as JSON using the shared serializer options.
            Task WriteJsonAsync<T>(HttpContext context, T payload)
            {
                context.Response.ContentType = "application/json";
                return JsonSerializer.SerializeAsync(context.Response.Body, payload, serializerOptions);
            }

            // GET {id}: returns the stored account as JSON, or 404 when no state exists for the id.
            async Task Balance(HttpContext context)
            {
                logger.LogInformation("Enter Balance");
                var client = context.RequestServices.GetRequiredService<DaprClient>();

                var id = (string)context.Request.RouteValues["id"];
                logger.LogInformation("id is {0}", id);

                var account = await client.GetStateAsync<Account>(StoreName, id);
                if (account == null)
                {
                    logger.LogInformation("Account not found");
                    context.Response.StatusCode = 404;
                    return;
                }

                logger.LogInformation("Account balance is {0}", account.Balance);
                await WriteJsonAsync(context, account);
            }

            // POST deposit: adds the amount to the account (creating it if absent) and returns it.
            // Responds 400 for a missing body or a negative amount.
            // NOTE(review): the 400 response is presumably what lets Dapr route the message to
            // the configured dead-letter topic - confirm against the Dapr pub/sub documentation.
            async Task Deposit(HttpContext context)
            {
                logger.LogInformation("Enter Deposit");
                var client = context.RequestServices.GetRequiredService<DaprClient>();

                var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
                if (transaction is null)
                {
                    // A JSON "null" body deserializes to null; reject it instead of dereferencing.
                    logger.LogInformation("Invalid request body");
                    context.Response.StatusCode = 400;
                    return;
                }

                logger.LogInformation("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);

                // Validate before touching the state store so invalid requests cost no round-trip.
                if (transaction.Amount < 0m)
                {
                    logger.LogInformation("Invalid amount");
                    context.Response.StatusCode = 400;
                    return;
                }

                var account = (await client.GetStateAsync<Account>(StoreName, transaction.Id))
                    ?? new Account() { Id = transaction.Id, };

                account.Balance += transaction.Amount;
                await client.SaveStateAsync(StoreName, transaction.Id, account);
                logger.LogInformation("Balance is {0}", account.Balance);

                await WriteJsonAsync(context, account);
            }

            // POST multideposit: applies every entry of a bulk message and answers with one
            // status per entry (SUCCESS, RETRY on exception, DROP for invalid data).
            async Task MultiDeposit(HttpContext context)
            {
                logger.LogInformation("Enter bulk deposit");
                var client = context.RequestServices.GetRequiredService<DaprClient>();

                var bulkMessage = await JsonSerializer.DeserializeAsync<BulkSubscribeMessage<BulkMessageModel<Transaction>>>(
                    context.Request.Body, serializerOptions);

                var entries = new List<BulkSubscribeAppResponseEntry>();

                if (bulkMessage?.Entries != null)
                {
                    foreach (var entry in bulkMessage.Entries)
                    {
                        try
                        {
                            var transaction = entry.Event.Data;
                            if (transaction is null)
                            {
                                logger.LogInformation("Invalid request body");
                                entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.DROP));
                                continue;
                            }

                            logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);

                            if (transaction.Amount < 0m)
                            {
                                // Report only this entry as DROP instead of failing the whole HTTP
                                // request: entries already saved in this batch must still be
                                // acknowledged individually, and retrying cannot fix a bad amount.
                                logger.LogInformation("Invalid amount");
                                entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.DROP));
                                continue;
                            }

                            var state = await client.GetStateEntryAsync<Account>(StoreName, transaction.Id);
                            state.Value ??= new Account() { Id = transaction.Id, };

                            state.Value.Balance += transaction.Amount;
                            logger.LogInformation("Balance is {0}", state.Value.Balance);
                            await state.SaveAsync();

                            entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
                        }
                        catch (Exception e)
                        {
                            // Pass the exception itself so the stack trace is logged, and keep the
                            // message template constant rather than using e.Message as a template.
                            logger.LogError(e, "Failed to process bulk entry {EntryId}", entry.EntryId);
                            entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
                        }
                    }
                }

                await WriteJsonAsync(context, new BulkSubscribeAppResponse(entries));
            }

            // POST deadLetterTopicRoute: logs the amount of a message received on the dead-letter topic.
            async Task ViewErrorMessage(HttpContext context)
            {
                var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);

                // Null-conditional access: a JSON "null" body must not crash the handler.
                logger.LogInformation("The amount cannot be negative: {0}", transaction?.Amount);
            }

            // POST withdraw: subtracts the amount from an existing account and returns it.
            // Responds 400 for a missing body, 404 for an unknown account, 400 for a negative amount.
            async Task Withdraw(HttpContext context)
            {
                logger.LogInformation("Enter Withdraw");
                var client = context.RequestServices.GetRequiredService<DaprClient>();

                var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
                if (transaction is null)
                {
                    logger.LogInformation("Invalid request body");
                    context.Response.StatusCode = 400;
                    return;
                }

                logger.LogInformation("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);

                // The account lookup stays ahead of the amount check so that an unknown
                // account is reported as 404 even when the amount is also invalid.
                var account = await client.GetStateAsync<Account>(StoreName, transaction.Id);
                if (account == null)
                {
                    logger.LogInformation("Account not found");
                    context.Response.StatusCode = 404;
                    return;
                }

                if (transaction.Amount < 0m)
                {
                    logger.LogInformation("Invalid amount");
                    context.Response.StatusCode = 400;
                    return;
                }

                account.Balance -= transaction.Amount;
                await client.SaveStateAsync(StoreName, transaction.Id, account);
                logger.LogInformation("Balance is {0}", account.Balance);

                await WriteJsonAsync(context, account);
            }
        }
    }
}