// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
using System.Reflection;
|
|
using System.Text;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
using TestApplication.Shared;
|
|
|
|
namespace TestApplication.RabbitMq;
|
|
|
|
internal static class Program
|
|
{
|
|
#if RABBITMQ_7_0_0_OR_GREATER
|
|
/// <summary>
/// Publishes a single message to the "hello" queue and consumes it again through an
/// <see cref="AsyncEventingBasicConsumer"/>.
/// </summary>
/// <param name="args">Command line arguments; the second one, when present, is used as the broker port.</param>
/// <returns>A task that completes once the message was received or the 5 second wait elapsed.</returns>
public static async Task Main(string[] args)
{
    const string queueName = "hello";
    const string message = "Hello World!";

    ConsoleHelper.WriteSplashScreen(args);

    var factory = new ConnectionFactory { HostName = "localhost", Port = int.Parse(GetRabbitMqPort(args)) };
    await using var connection = await factory.CreateConnectionAsync();
    await using var channel = await connection.CreateChannelAsync();

    Console.WriteLine(channel.GetType().FullName);

    await channel.QueueDeclareAsync(
        queue: queueName,
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    ReadOnlyMemory<byte> body = Encoding.UTF8.GetBytes(message);

    await channel.BasicPublishAsync(
        exchange: string.Empty,
        routingKey: queueName,
        body: body,
        mandatory: false);
    Console.WriteLine($"Sent: {message}");

    var consumer = new AsyncEventingBasicConsumer(channel);

    // Completed by the delivery callback. Continuations run asynchronously so that
    // the rest of Main does not execute inline inside the consumer callback.
    var received = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    consumer.ReceivedAsync += (model, ea) =>
    {
        var receivedBody = ea.Body.ToArray();
        var receivedMessage = Encoding.UTF8.GetString(receivedBody);
        Console.WriteLine($"Received: {receivedMessage}");
        received.TrySetResult(true);
        return Task.CompletedTask;
    };

    await channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer);

    // Wait without blocking a thread, and report a missing delivery instead of
    // silently finishing with a success exit code.
    var finished = await Task.WhenAny(received.Task, Task.Delay(TimeSpan.FromSeconds(5)));
    if (finished != received.Task)
    {
        Console.WriteLine("Timed-out waiting for callback to complete.");
        Environment.ExitCode = 1;
    }
}
|
|
|
|
/// <summary>
/// Picks the broker port from the command line.
/// </summary>
/// <param name="args">Command line arguments.</param>
/// <returns>The second argument when at least two are supplied; otherwise "5672".</returns>
private static string GetRabbitMqPort(string[] args)
{
    const string fallbackPort = "5672";

    return args.Length > 1 ? args[1] : fallbackPort;
}
|
|
#else
|
|
// Name used both for the declared/consumed queue and as the routing key when publishing.
private const string RoutingKey = "hello";

// Upper bound for waiting on consumer callbacks and on the reflective channel close.
private static readonly TimeSpan DefaultWaitTimeout = TimeSpan.FromSeconds(10);

// Running counter appended to every test message; advanced by GetTestMessage.
private static int _messageNumber;
|
|
|
|
// Based on the tutorials at https://www.rabbitmq.com/tutorials/
|
|
|
|
/// <summary>
/// Runs the synchronous-dispatcher scenario and, only when it succeeds, the
/// asynchronous-dispatcher scenario.
/// </summary>
/// <param name="args">Command line arguments, forwarded to both scenarios.</param>
/// <returns>The first non-zero scenario result, or the result of the asynchronous scenario.</returns>
public static int Main(string[] args)
{
    ConsoleHelper.WriteSplashScreen(args);

    var syncExitCode = PublishAndConsumeWithSyncDispatcher(args);

    return syncExitCode == 0
        ? PublishAndConsumeAsyncWithAsyncDispatcher(args)
        : syncExitCode;
}
|
|
|
|
/// <summary>
/// Exercises publishing plus two ways of receiving on a connection with the default
/// (synchronous) consumer dispatcher: polling with BasicGet and a push-based
/// <see cref="TestSyncConsumer"/>.
/// </summary>
/// <param name="args">Command line arguments; used to resolve the broker port.</param>
/// <returns>0 when the consumer callback ran within <see cref="DefaultWaitTimeout"/>; otherwise 1.</returns>
private static int PublishAndConsumeWithSyncDispatcher(string[] args)
{
    const int maxGetAttempts = 5;

    var port = int.Parse(GetRabbitMqPort(args));
    var factory = new ConnectionFactory { HostName = "localhost", Port = port };
    using var connection = factory.CreateConnection();
    using var model = connection.CreateModel();

    model.QueueDeclare(
        queue: RoutingKey,
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null);

    // First message: published without basic properties and fetched by polling.
    Publish(model, GetTestMessage(), null);

    var attempt = 0;
    while (attempt < maxGetAttempts)
    {
        var polled = model.BasicGet(RoutingKey, true);
        if (polled is not null)
        {
            ProcessReceivedMessage(polled.Body);
            break;
        }

        Console.WriteLine($"Read nothing, retrying: {attempt}");
        attempt++;
    }

    // Second message: published with basic properties and delivered to a consumer.
    Publish(model, GetTestMessage(), model.CreateBasicProperties());

    var consumer = new TestSyncConsumer(model);

    using var delivered = new ManualResetEventSlim(false);

    consumer.Received += (_, delivery) =>
    {
        Console.WriteLine("[x] Handling BasicDeliver in TestSyncConsumer.");
        ProcessReceivedMessage(delivery.Body);
        delivered.Set();
    };

    var consumerTag = model.BasicConsume(RoutingKey, true, consumer);
    if (delivered.Wait(DefaultWaitTimeout))
    {
        model.BasicCancel(consumerTag);
        return 0;
    }

    Console.WriteLine("Timed-out waiting for callback to complete.");
    return 1;
}
|
|
|
|
/// <summary>
/// Publishes one message and receives it through a <see cref="TestAsyncConsumer"/> on a
/// connection created with <c>DispatchConsumersAsync = true</c>, then closes the channel
/// through <see cref="Close"/>.
/// </summary>
/// <param name="args">Command line arguments; used to resolve the broker port.</param>
/// <returns>0 when both the callback and the close finished in time; otherwise 1.</returns>
private static int PublishAndConsumeAsyncWithAsyncDispatcher(string[] args)
{
    var port = int.Parse(GetRabbitMqPort(args));
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        Port = port,
        DispatchConsumersAsync = true
    };
    using var connection = factory.CreateConnection();
    using var model = connection.CreateModel();

    Publish(model, GetTestMessage(), model.CreateBasicProperties());
    var consumer = new TestAsyncConsumer(model);

    using var handled = new ManualResetEventSlim(false);

    consumer.Received += async (_, delivery) =>
    {
        Console.WriteLine("[x] Handling BasicDeliver in TestAsyncConsumer.");
        ProcessReceivedMessage(delivery.Body);
        await Task.Yield();
        handled.Set();
    };

    var consumerTag = model.BasicConsume(RoutingKey, true, consumer);

    if (!handled.Wait(DefaultWaitTimeout))
    {
        Console.WriteLine("Timed-out waiting for callback to complete.");
        return 1;
    }

    model.BasicCancel(consumerTag);

    if (Close(model))
    {
        return 0;
    }

    Console.WriteLine("Timed-out waiting for consumer to close.");
    return 1;
}
|
|
|
|
/// <summary>
/// Closes the channel behind <paramref name="asyncConsumersModel"/> by reading its private
/// delegate field through reflection and invoking the public
/// <c>Close(ushort, string, bool)</c> method on that object with (200, "Goodbye", true).
/// </summary>
/// <param name="asyncConsumersModel">Model whose private delegate object should be closed.</param>
/// <returns>
/// <c>true</c> when the close finished within <see cref="DefaultWaitTimeout"/>;
/// <c>false</c> when it timed out or when the method could not be resolved or awaited.
/// </returns>
private static bool Close(IModel asyncConsumersModel)
{
    // The private field holding the wrapped model is named differently across client versions.
#if RABBITMQ_6_0_0_OR_GREATER
    const string delegateFieldName = "_delegate";
#else
    const string delegateFieldName = "m_delegate";
#endif
    var delegateField = asyncConsumersModel.GetType().GetField(delegateFieldName, BindingFlags.NonPublic | BindingFlags.Instance);
    var delegateFieldValue = delegateField?.GetValue(asyncConsumersModel);
    var closeMethod = delegateFieldValue?.GetType().GetMethod(
        "Close",
        BindingFlags.Instance | BindingFlags.Public,
        null,
        [typeof(ushort), typeof(string), typeof(bool)],
        null);

    // Fail visibly when the reflection lookup breaks, instead of hitting a
    // NullReferenceException later or pretending that the close succeeded.
    if (closeMethod is null)
    {
        Console.WriteLine($"Unable to resolve Close(ushort, string, bool) through field '{delegateFieldName}' of {asyncConsumersModel.GetType().FullName}.");
        return false;
    }

    object[] closeArguments = [(ushort)200, "Goodbye", true];
#if RABBITMQ_6_0_0_OR_GREATER
    // Here the reflected method is expected to hand back the Task to wait on.
    if (closeMethod.Invoke(delegateFieldValue, closeArguments) is not Task task)
    {
        Console.WriteLine("Reflected Close call did not return a Task to wait on.");
        return false;
    }
#else
    // Here the call is wrapped in a task so the same timeout can be applied.
    var task = Task.Run(() =>
    {
        closeMethod.Invoke(delegateFieldValue, closeArguments);
    });
#endif
    return task.Wait(DefaultWaitTimeout);
}
|
|
|
|
/// <summary>
/// Decodes a received message body as UTF-8 and writes it to the console.
/// </summary>
/// <param name="messageBody">Raw message payload; its type depends on the client version.</param>
#if RABBITMQ_6_0_0_OR_GREATER
private static void ProcessReceivedMessage(ReadOnlyMemory<byte> messageBody)
#else
private static void ProcessReceivedMessage(byte[] messageBody)
#endif
{
    var payload = messageBody.ToArray();
    var text = Encoding.UTF8.GetString(payload);

    Console.WriteLine($" [x] Received {text}");
}
|
|
|
|
/// <summary>
/// Publishes <paramref name="message"/> as UTF-8 bytes to the default exchange using
/// <see cref="RoutingKey"/>, then logs the sent text to the console.
/// </summary>
/// <param name="channel">Channel used for publishing.</param>
/// <param name="message">Text to send.</param>
/// <param name="basicProperties">Properties passed along with the message; may be <c>null</c>.</param>
private static void Publish(IModel channel, string message, IBasicProperties? basicProperties)
{
    var payload = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(
        exchange: string.Empty,
        routingKey: RoutingKey,
        basicProperties: basicProperties,
        body: payload);

    Console.WriteLine($" [x] Sent {message}");
}
|
|
|
|
/// <summary>
/// Builds the next test message and advances the message counter.
/// </summary>
/// <returns>"Hello World!" followed by the counter value from before the increment.</returns>
private static string GetTestMessage()
{
    var sequence = _messageNumber;
    _messageNumber = sequence + 1;

    return $"Hello World!{sequence}";
}
|
|
|
|
/// <summary>
/// Resolves the broker port from the command line arguments.
/// </summary>
/// <param name="args">Command line arguments.</param>
/// <returns>The second argument when present; otherwise "5672".</returns>
private static string GetRabbitMqPort(string[] args)
{
    if (args.Length <= 1)
    {
        return "5672";
    }

    return args[1];
}
|
|
|
|
// Custom consumer classes; their (simplified) implementation is based on
// EventingBasicConsumer/AsyncEventingBasicConsumer from the library.
|
|
/// <summary>
/// Consumer derived from <see cref="AsyncDefaultBasicConsumer"/> that forwards every
/// delivery to the <see cref="Received"/> event.
/// </summary>
private class TestAsyncConsumer : AsyncDefaultBasicConsumer
{
    public TestAsyncConsumer(IModel model)
        : base(model)
    {
    }

    /// <summary>
    /// Raised for every delivery handed to <see cref="HandleBasicDeliver"/>.
    /// </summary>
    public event AsyncEventHandler<BasicDeliverEventArgs>? Received;

    /// <summary>
    /// Wraps the delivery in a <see cref="BasicDeliverEventArgs"/> and passes it to
    /// <see cref="Received"/>.
    /// </summary>
    /// <returns>
    /// The task returned by the event invocation, or a completed task when nothing is
    /// subscribed, so the caller never receives <c>null</c>.
    /// </returns>
    public override Task HandleBasicDeliver(
        string consumerTag,
        ulong deliveryTag,
        bool redelivered,
        string exchange,
        string routingKey,
        IBasicProperties properties,
#if RABBITMQ_6_0_0_OR_GREATER
        ReadOnlyMemory<byte> body)
#else
        byte[] body)
#endif
    {
        var pending = Received?.Invoke(
            this,
            new BasicDeliverEventArgs(
                consumerTag,
                deliveryTag,
                redelivered,
                exchange,
                routingKey,
                properties,
                body));

        // Without subscribers the null-conditional call yields null; hand back a
        // completed task instead of a null Task.
        return pending ?? Task.CompletedTask;
    }
}
|
|
|
|
/// <summary>
/// Consumer derived from <see cref="DefaultBasicConsumer"/> that, after the base handling,
/// forwards every delivery to the <see cref="Received"/> event.
/// </summary>
private class TestSyncConsumer : DefaultBasicConsumer
{
    public TestSyncConsumer(IModel model)
        : base(model)
    {
    }

    /// <summary>
    /// Raised for every delivery handed to <see cref="HandleBasicDeliver"/>.
    /// </summary>
    public event EventHandler<BasicDeliverEventArgs>? Received;

    /// <summary>
    /// Runs the base implementation first and then, if anything is subscribed, raises
    /// <see cref="Received"/> with the delivery wrapped in a <see cref="BasicDeliverEventArgs"/>.
    /// </summary>
    public override void HandleBasicDeliver(
        string consumerTag,
        ulong deliveryTag,
        bool redelivered,
        string exchange,
        string routingKey,
        IBasicProperties properties,
#if RABBITMQ_6_0_0_OR_GREATER
        ReadOnlyMemory<byte> body)
#else
        byte[] body)
#endif
    {
        base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        var subscribers = Received;
        if (subscribers is null)
        {
            return;
        }

        var delivery = new BasicDeliverEventArgs(
            consumerTag,
            deliveryTag,
            redelivered,
            exchange,
            routingKey,
            properties,
            body);
        subscribers(this, delivery);
    }
}
|
|
#endif
|
|
}
|