Kafka Producer with Dapr Consumer Sample (#223)

* raw payload sample

Signed-off-by: Fernando Rocha <fernando@diagrid.io>

* updating readme

Signed-off-by: Fernando Rocha <fernando@diagrid.io>

---------

Signed-off-by: Fernando Rocha <fernando@diagrid.io>
Fernando Rocha · 2025-02-04 · commit 2a438ee9d3 (parent ad548a185d)
15 changed files with 824 additions and 0 deletions

@@ -35,6 +35,8 @@ If you are new to Dapr, you may want to review following resources first:
| [Workflow + external endpoint invocation](./workflow-external-invocation) | Demonstrates how to use the Dapr Workflow API to coordinate an order process that includes an activity which uses service invocation for non-Dapr endpoints. |
| [Workflow + multi-app microservice in Python](./workflow-orderprocessing-python) | Demonstrates how to use the Dapr Workflow Python SDK to coordinate an order process across multiple dapr-enabled microservices. |
| [Outbox in .NET](./outbox) | Demonstrates how to use outbox transactions with Redis and MySQL state stores and Redis as the message broker. |
| [Consuming Kafka messages without CloudEvents](./pubsub-raw-payload) | Demonstrates how to integrate a Kafka producer using the Confluent Kafka SDK with a Dapr-powered consumer in .NET applications. |
| [AWS EKS Pod Identity](./dapr-eks-podidentity) | Demonstrates how to set up Dapr with AWS EKS Pod Identity for accessing AWS Secrets Manager. |
## External samples

pubsub-raw-payload/.gitignore (vendored, new file)
@@ -0,0 +1,414 @@
# User-specific files
*.rsuser
*.suo
*.user
*.userosscache
*.sln.docstates
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
# Mono auto generated files
mono_crash.*
# Build results
[Dd]ebug/
[Dd]ebugPublic/
[Rr]elease/
[Rr]eleases/
x64/
x86/
[Ww][Ii][Nn]32/
[Aa][Rr][Mm]/
[Aa][Rr][Mm]64/
bld/
[Bb]in/
[Oo]bj/
[Ll]og/
[Ll]ogs/
# Visual Studio 2015/2017 cache/options directory
.vs/
# Uncomment if you have tasks that create the project's static files in wwwroot
#wwwroot/
# Visual Studio 2017 auto generated files
Generated\ Files/
# MSTest test Results
[Tt]est[Rr]esult*/
[Bb]uild[Ll]og.*
# NUnit
*.VisualState.xml
TestResult.xml
nunit-*.xml
# Build Results of an ATL Project
[Dd]ebugPS/
[Rr]eleasePS/
dlldata.c
# Benchmark Results
BenchmarkDotNet.Artifacts/
# .NET Core
project.lock.json
project.fragment.lock.json
artifacts/
# ASP.NET Scaffolding
ScaffoldingReadMe.txt
# StyleCop
StyleCopReport.xml
# Files built by Visual Studio
*_i.c
*_p.c
*_h.h
*.ilk
*.meta
*.obj
*.iobj
*.pch
*.pdb
*.ipdb
*.pgc
*.pgd
*.rsp
*.sbr
*.tlb
*.tli
*.tlh
*.tmp
*.tmp_proj
*_wpftmp.csproj
*.log
*.tlog
*.vspscc
*.vssscc
.builds
*.pidb
*.svclog
*.scc
# Chutzpah Test files
_Chutzpah*
# Visual C++ cache files
ipch/
*.aps
*.ncb
*.opendb
*.opensdf
*.sdf
*.cachefile
*.VC.db
*.VC.VC.opendb
# Visual Studio profiler
*.psess
*.vsp
*.vspx
*.sap
# Visual Studio Trace Files
*.e2e
# TFS 2012 Local Workspace
$tf/
# Guidance Automation Toolkit
*.gpState
# ReSharper is a .NET coding add-in
_ReSharper*/
*.[Rr]e[Ss]harper
*.DotSettings.user
# TeamCity is a build add-in
_TeamCity*
# DotCover is a Code Coverage Tool
*.dotCover
# AxoCover is a Code Coverage Tool
.axoCover/*
!.axoCover/settings.json
# Coverlet is a free, cross platform Code Coverage Tool
coverage*.json
coverage*.xml
coverage*.info
# Visual Studio code coverage results
*.coverage
*.coveragexml
# NCrunch
_NCrunch_*
.*crunch*.local.xml
nCrunchTemp_*
# MightyMoose
*.mm.*
AutoTest.Net/
# Web workbench (sass)
.sass-cache/
# Installshield output folder
[Ee]xpress/
# DocProject is a documentation generator add-in
DocProject/buildhelp/
DocProject/Help/*.HxT
DocProject/Help/*.HxC
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/Html2
DocProject/Help/html
# Click-Once directory
publish/
# Publish Web Output
*.[Pp]ublish.xml
*.azurePubxml
# Note: Comment the next line if you want to checkin your web deploy settings,
# but database connection strings (with potential passwords) will be unencrypted
*.pubxml
*.publishproj
# Microsoft Azure Web App publish settings. Comment the next line if you want to
# checkin your Azure Web App publish settings, but sensitive information contained
# in these scripts will be unencrypted
PublishScripts/
# NuGet Packages
*.nupkg
# NuGet Symbol Packages
*.snupkg
# The packages folder can be ignored because of Package Restore
**/[Pp]ackages/*
# except build/, which is used as an MSBuild target.
!**/[Pp]ackages/build/
# Uncomment if necessary however generally it will be regenerated when needed
#!**/[Pp]ackages/repositories.config
# NuGet v3's project.json files produces more ignorable files
*.nuget.props
*.nuget.targets
# Microsoft Azure Build Output
csx/
*.build.csdef
# Microsoft Azure Emulator
ecf/
rcf/
# Windows Store app package directories and files
AppPackages/
BundleArtifacts/
Package.StoreAssociation.xml
_pkginfo.txt
*.appx
*.appxbundle
*.appxupload
# Visual Studio cache files
# files ending in .cache can be ignored
*.[Cc]ache
# but keep track of directories ending in .cache
!?*.[Cc]ache/
# Others
ClientBin/
~$*
*~
*.dbmdl
*.dbproj.schemaview
*.jfm
*.pfx
*.publishsettings
orleans.codegen.cs
# Including strong name files can present a security risk
# (https://github.com/github/gitignore/pull/2483#issue-259490424)
#*.snk
# Since there are multiple workflows, uncomment next line to ignore bower_components
# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
#bower_components/
# RIA/Silverlight projects
Generated_Code/
# Backup & report files from converting an old project file
# to a newer Visual Studio version. Backup files are not needed,
# because we have git ;-)
_UpgradeReport_Files/
Backup*/
UpgradeLog*.XML
UpgradeLog*.htm
ServiceFabricBackup/
*.rptproj.bak
# SQL Server files
*.mdf
*.ldf
*.ndf
# Business Intelligence projects
*.rdl.data
*.bim.layout
*.bim_*.settings
*.rptproj.rsuser
*- [Bb]ackup.rdl
*- [Bb]ackup ([0-9]).rdl
*- [Bb]ackup ([0-9][0-9]).rdl
# Microsoft Fakes
FakesAssemblies/
# GhostDoc plugin setting file
*.GhostDoc.xml
# Node.js Tools for Visual Studio
.ntvs_analysis.dat
node_modules/
# Visual Studio 6 build log
*.plg
# Visual Studio 6 workspace options file
*.opt
# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
*.vbw
# Visual Studio 6 auto-generated project file (contains which files were open etc.)
*.vbp
# Visual Studio 6 workspace and project file (working project files containing files to include in project)
*.dsw
*.dsp
# Visual Studio 6 technical files
# Visual Studio LightSwitch build output
**/*.HTMLClient/GeneratedArtifacts
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
_Pvt_Extensions
# Paket dependency manager
.paket/paket.exe
paket-files/
# FAKE - F# Make
.fake/
# CodeRush personal settings
.cr/personal
# Python Tools for Visual Studio (PTVS)
__pycache__/
*.pyc
# Cake - Uncomment if you are using it
# tools/**
# !tools/packages.config
# Tabs Studio
*.tss
# Telerik's JustMock configuration file
*.jmconfig
# BizTalk build output
*.btp.cs
*.btm.cs
*.odx.cs
*.xsd.cs
# OpenCover UI analysis results
OpenCover/
# Azure Stream Analytics local run output
ASALocalRun/
# MSBuild Binary and Structured Log
*.binlog
# NVidia Nsight GPU debugger configuration file
*.nvuser
# MFractors (Xamarin productivity tool) working folder
.mfractor/
# Local History for Visual Studio
.localhistory/
# Visual Studio History (VSHistory) files
.vshistory/
# BeatPulse healthcheck temp database
healthchecksdb
# Backup folder for Package Reference Convert tool in Visual Studio 2017
MigrationBackup/
# Ionide (cross platform F# VS Code tools) working folder
.ionide/
# Fody - auto-generated XML schema
FodyWeavers.xsd
# VS Code files for those working on multiple tools
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
# Local History for Visual Studio Code
.history/
# Windows Installer files from build outputs
*.cab
*.msi
*.msix
*.msm
*.msp
# JetBrains Rider
*.sln.iml
### DotnetCore ###
# .NET Core build folders
bin/
obj/
# Common node modules locations
/node_modules
/wwwroot/node_modules
### VisualStudioCode ###
!.vscode/*.code-snippets
# Local History for Visual Studio Code
# Built Visual Studio Code Extensions
*.vsix
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
.ionide

@@ -0,0 +1,131 @@
# Consuming Kafka messages without CloudEvents
## Sample info
| Attribute | Details |
|--------|--------|
| Dapr runtime version | v1.14.4 |
| Dapr .NET SDK | v1.14.0 |
| Language | C# |
| Environment | Local |
## Overview
This sample demonstrates how to integrate a Kafka producer using the Confluent Kafka SDK with a Dapr-powered consumer in .NET applications. The producer publishes messages directly to Kafka, while the consumer uses Dapr's pub/sub building block to receive them. The messages are not wrapped in CloudEvents envelopes, although wrapping is Dapr's default behavior when publishing and subscribing.
You can find more details about publishing and subscribing to messages without CloudEvents [here](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-raw).
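For comparison, a Dapr-native publisher can also skip the CloudEvents envelope by setting the `rawPayload` metadata on the publish call. A minimal sketch using the Dapr .NET SDK (illustrative only; this sample publishes through Confluent.Kafka instead):
```csharp
using Dapr.Client;

// Illustrative alternative: publish raw JSON through the Dapr sidecar.
var client = new DaprClientBuilder().Build();
await client.PublishEventAsync(
    "pubsub",    // pub/sub component name
    "messages",  // topic
    new { id = "1", content = "hello" },
    new Dictionary<string, string> { ["rawPayload"] = "true" }); // skip CloudEvents wrapping
```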
## Prerequisites
- [.NET 8 SDK](https://dotnet.microsoft.com/download)
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/)
- [Docker](https://www.docker.com/products/docker-desktop)
## Setup
1. Clone the repository
2. Navigate to the solution folder:
```bash
cd pubsub-raw-payload
```
3. Start Kafka using Docker Compose:
```bash
docker-compose up -d
```
## Running the Applications
1. Start the Dapr Subscriber:
```bash
dapr run --app-id subscriber \
--app-port 5001 \
--dapr-http-port 3501 \
--resources-path ./components \
-- dotnet run --project src/Subscriber/Subscriber.csproj
```
2. In a new terminal, start the Kafka Publisher:
```bash
dotnet run --project src/Publisher/Publisher.csproj
```
## Subscription Configuration
### Programmatic Subscription
The subscriber uses programmatic subscription configured in code:
```csharp
app.MapGet("/dapr/subscribe", () =>
{
var subscriptions = new[]
{
new
{
pubsubname = "pubsub",
topic = "messages",
route = "/messages",
metadata = new Dictionary<string, string>
{
{ "isRawPayload", "true" }
}
}
};
return Results.Ok(subscriptions);
});
```
### Declarative Subscription
Alternatively, create a `subscription.yaml` in your components directory:
```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: message-subscription
spec:
topic: messages
routes:
default: /messages
pubsubname: pubsub
metadata:
isRawPayload: "true"
```
When using declarative subscriptions:
1. Remove the `/dapr/subscribe` endpoint from your subscriber application
2. Place the `subscription.yaml` file in your components directory
3. The subscription will be automatically loaded when you start your application
## Testing
To publish a message:
```bash
curl -X POST http://localhost:5000/publish
```
The subscriber will display received messages in its console output.
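Assuming the message goes through, the subscriber's console output should look roughly like this (the GUID and timestamps will differ):
```
Raw message received: {"id":"<guid>","content":"Hello at <utc-time>","timestamp":"<utc-time>"}
Received message: <guid>
Content: Hello at <utc-time>
Timestamp: <utc-time>
```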
## Stopping the Applications
1. Stop the running applications using Ctrl+C in each terminal
2. Stop Kafka:
```bash
docker-compose down
```
## Important Notes
1. The `isRawPayload` metadata entry is required so that Dapr delivers the raw JSON instead of expecting a CloudEvents envelope
2. The publisher uses the Confluent.Kafka client directly to publish messages to Kafka
3. The subscriber uses Dapr's pub/sub building block to consume messages
4. Make sure your Kafka broker is running before starting the applications
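If messages are not flowing, first confirm that the broker containers are healthy:
```bash
docker-compose ps
# the kafka and zookeeper services should both be in a running state
```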

@@ -0,0 +1,16 @@
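# Kafka pub/sub component; the Dapr sidecar loads this file from the folder passed via --resources-path (./components in this sample)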
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: localhost:9092
- name: consumerGroup
value: dapr
- name: clientId
value: dapr-pubsub-sample
- name: authRequired
value: false

@@ -0,0 +1,13 @@
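# Declarative subscription, commented out by default.
# To use it, uncomment the lines below and remove the programmatic /dapr/subscribe endpoint from the Subscriber app (see README).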
# apiVersion: dapr.io/v2alpha1
# kind: Subscription
# metadata:
# name: message-subscription
# spec:
# topic: messages
# routes:
# default: /messages
# pubsubname: pubsub
# metadata:
# isRawPayload: "true"
# scopes:
# - subscriber

@@ -0,0 +1,21 @@
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.1
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
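      # kafka:29092 is advertised to clients inside the Compose network; localhost:9092 is for apps running on the host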
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

@@ -0,0 +1,43 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.2.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{827E0CD3-B72D-47B6-A68D-7590B98EB39B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Publisher", "src\Publisher\Publisher.csproj", "{B53A3F93-644F-077D-263C-2A9461829575}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared", "src\Shared\Shared.csproj", "{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriber", "src\Subscriber\Subscriber.csproj", "{B93D2770-CD58-5609-5939-2FC86CCE9651}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{B53A3F93-644F-077D-263C-2A9461829575}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B53A3F93-644F-077D-263C-2A9461829575}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B53A3F93-644F-077D-263C-2A9461829575}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B53A3F93-644F-077D-263C-2A9461829575}.Release|Any CPU.Build.0 = Release|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Debug|Any CPU.Build.0 = Debug|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Release|Any CPU.ActiveCfg = Release|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Release|Any CPU.Build.0 = Release|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{B53A3F93-644F-077D-263C-2A9461829575} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{B93D2770-CD58-5609-5939-2FC86CCE9651} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3945FF2B-1CF3-4CB9-835C-A0E05C800F0F}
EndGlobalSection
EndGlobal

@@ -0,0 +1,59 @@
using Confluent.Kafka;
using System.Text.Json;
using Shared;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
// Kafka producer config
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "kafka-producer-sample"
};
// Create producer instance
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
app.MapGet("/", () => "Publisher API");
app.MapPost("/publish", async (HttpContext context) =>
{
var message = new Message(
Guid.NewGuid().ToString(),
$"Hello at {DateTime.UtcNow}",
DateTime.UtcNow
);
try
{
// Serialize the message to JSON
var jsonMessage = JsonSerializer.Serialize(message);
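    // The payload is plain JSON with no CloudEvents envelope, which is why the Dapr subscriber must set isRawPayload to true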
// Create the Kafka message
var kafkaMessage = new Message<string, string>
{
Key = message.Id, // Using the message ID as the key
Value = jsonMessage
};
// Publish to Kafka
var deliveryResult = await producer.ProduceAsync(
"messages", // topic name
kafkaMessage
);
Console.WriteLine($"Delivered message to: {deliveryResult.TopicPartitionOffset}");
return Results.Ok(message);
}
catch (ProduceException<string, string> ex)
{
Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
return Results.StatusCode(500);
}
});
app.Run();
// Cleanup: the 'using var' declaration above disposes the producer once app.Run() returns,
// so no separate ProcessExit handler is needed.

@@ -0,0 +1,13 @@
{
"profiles": {
"Publisher": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}

@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.8.0" />
</ItemGroup>
</Project>

@@ -0,0 +1,9 @@
namespace Shared;
using System.Text.Json.Serialization;
public record Message(
[property: JsonPropertyName("id")] string Id,
[property: JsonPropertyName("content")] string Content,
[property: JsonPropertyName("timestamp")] DateTime Timestamp
);

@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
</Project>

@@ -0,0 +1,52 @@
using System.Text.Json;
using Shared;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.MapGet("/", () => "Subscriber API");
app.MapGet("/dapr/subscribe", () =>
{
var subscriptions = new[]
{
new
{
pubsubname = "pubsub",
topic = "messages",
route = "/messages",
metadata = new Dictionary<string, string>
{
{ "isRawPayload", "true" }
}
}
};
return Results.Ok(subscriptions);
});
app.MapPost("/messages", async (HttpContext context) =>
{
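    // Because the subscription sets isRawPayload=true, the request body is the exact JSON the Kafka producer sent, not a CloudEvents envelope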
using var reader = new StreamReader(context.Request.Body);
var json = await reader.ReadToEndAsync();
Console.WriteLine($"Raw message received: {json}"); // Debug log
try
{
var message = JsonSerializer.Deserialize<Message>(json);
if (message != null)
{
Console.WriteLine($"Received message: {message.Id}");
Console.WriteLine($"Content: {message.Content}");
Console.WriteLine($"Timestamp: {message.Timestamp}");
}
}
catch (JsonException ex)
{
Console.WriteLine($"Error deserializing message: {ex.Message}");
return Results.BadRequest("Invalid message format");
}
return Results.Ok();
});
app.Run();

@@ -0,0 +1,13 @@
{
"profiles": {
"Subscriber": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5001",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}

@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapr.AspNetCore" Version="1.14" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
</ItemGroup>
</Project>