Migrate PubSub removing flaky test

Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
This commit is contained in:
Matheus Cruz 2025-06-10 10:44:40 -03:00
parent 3a8fd611da
commit b40a5b23c7
3 changed files with 931 additions and 0 deletions

View File

@ -0,0 +1,637 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers.pubsub.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.Metadata;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.config.Properties;
import io.dapr.it.pubsub.http.PubSubIT;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.spring.boot.autoconfigure.client.DaprClientAutoConfiguration;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
import io.dapr.utils.TypeRef;
import org.assertj.core.api.SoftAssertions;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprException;
import static io.dapr.it.TestUtils.assertThrowsDaprExceptionWithReason;
import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
classes = {
TestPubSubApplication.class
}
)
@Testcontainers
@Tag("testcontainers")
public class DaprPubSubIT {
private static final Network DAPR_NETWORK = Network.newNetwork();
private static final Random RANDOM = new Random();
private static final int PORT = RANDOM.nextInt(1000) + 8000;
private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String PUBSUB_APP_ID = "pubsub-dapr-app";
private static final String PUBSUB_NAME = "pubsub";
// topics
private static final String TOPIC_BULK = "testingbulktopic";
private static final String TOPIC_NAME = "testingtopic";
private static final String ANOTHER_TOPIC_NAME = "anothertopic";
private static final String TYPED_TOPIC_NAME = "typedtestingtopic";
private static final String BINARY_TOPIC_NAME = "binarytopic";
private static final String TTL_TOPIC_NAME = "ttltopic";
private static final String LONG_TOPIC_NAME = "testinglongvalues";
private static final int NUM_MESSAGES = 10;
// typeRefs
private static final TypeRef<List<CloudEvent>> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() {
};
private static final TypeRef<List<CloudEvent<PubSubIT.ConvertToLong>>> CLOUD_EVENT_LONG_LIST_TYPE_REF =
new TypeRef<>() {
};
private static final TypeRef<List<CloudEvent<PubSubIT.MyObject>>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF =
new TypeRef<>() {
};
@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
.withAppName(PUBSUB_APP_ID)
.withNetwork(DAPR_NETWORK)
.withDaprLogLevel(DaprLogLevel.DEBUG)
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withAppChannelAddress("host.testcontainers.internal")
.withAppPort(PORT);
/**
* Expose the Dapr ports to the host.
*
* @param registry the dynamic property registry
*/
@DynamicPropertySource
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
registry.add("server.port", () -> PORT);
}
@BeforeEach
public void setUp() {
org.testcontainers.Testcontainers.exposeHostPorts(PORT);
}
@Test
@DisplayName("Should receive INVALID_ARGUMENT when the specified Pub/Sub name does not exist")
public void shouldReceiveInvalidArgument() throws Exception {
Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER);
try (DaprClient client = createDaprClientBuilder().build()) {
assertThrowsDaprExceptionWithReason(
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: pubsub unknown pubsub is not found",
"DAPR_PUBSUB_NOT_FOUND",
() -> client.publishEvent("unknown pubsub", "mytopic", "payload").block());
}
}
@Test
@DisplayName("Should receive INVALID_ARGUMENT using bulk publish when the specified Pub/Sub name does not exist")
public void shouldReceiveInvalidArgumentWithBulkPublish() throws Exception {
try (DaprPreviewClient client = createDaprClientBuilder().buildPreviewClient()) {
assertThrowsDaprException(
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: pubsub unknown pubsub is not found",
() -> client.publishEvents("unknown pubsub", "mytopic", "text/plain", "message").block());
}
}
@Test
@DisplayName("Should publish some payload types successfully")
public void shouldPublishSomePayloadTypesWithNoError() throws Exception {
DaprObjectSerializer serializer = createJacksonObjectSerializer();
try (
DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build();
DaprPreviewClient previewClient = createDaprClientBuilder().withObjectSerializer(serializer)
.buildPreviewClient()
) {
publishBulkStringsAsserting(previewClient);
publishMyObjectAsserting(previewClient);
publishByteAsserting(previewClient);
publishCloudEventAsserting(previewClient);
Thread.sleep(10000);
callWithRetry(() -> validatePublishedMessages(client), 2000);
}
}
@Test
@DisplayName("Should publish various payload types to different topics")
public void testPubSub() throws Exception {
DaprObjectSerializer serializer = createJacksonObjectSerializer();
// Send a batch of messages on one topic
try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) {
sendBulkMessagesAsText(client, TOPIC_NAME);
sendBulkMessagesAsText(client, ANOTHER_TOPIC_NAME);
//Publishing an object.
PubSubIT.MyObject object = new PubSubIT.MyObject();
object.setId("123");
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block();
System.out.println("Published one object.");
client.publishEvent(PUBSUB_NAME, TYPED_TOPIC_NAME, object).block();
System.out.println("Published another object.");
//Publishing a single byte: Example of non-string based content published
publishOneByteSync(client, TOPIC_NAME);
CloudEvent<String> cloudEvent = new CloudEvent<>();
cloudEvent.setId("1234");
cloudEvent.setData("message from cloudevent");
cloudEvent.setSource("test");
cloudEvent.setSpecversion("1");
cloudEvent.setType("myevent");
cloudEvent.setDatacontenttype("text/plain");
//Publishing a cloud event.
client.publishEvent(new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
.setContentType("application/cloudevents+json")).block();
System.out.println("Published one cloud event.");
{
CloudEvent<String> cloudEventV2 = new CloudEvent<>();
cloudEventV2.setId("2222");
cloudEventV2.setData("message from cloudevent v2");
cloudEventV2.setSource("test");
cloudEventV2.setSpecversion("1");
cloudEventV2.setType("myevent.v2");
cloudEventV2.setDatacontenttype("text/plain");
client.publishEvent(
new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2)
.setContentType("application/cloudevents+json")).block();
System.out.println("Published one cloud event for v2.");
}
{
CloudEvent<String> cloudEventV3 = new CloudEvent<>();
cloudEventV3.setId("3333");
cloudEventV3.setData("message from cloudevent v3");
cloudEventV3.setSource("test");
cloudEventV3.setSpecversion("1");
cloudEventV3.setType("myevent.v3");
cloudEventV3.setDatacontenttype("text/plain");
client.publishEvent(
new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3)
.setContentType("application/cloudevents+json")).block();
System.out.println("Published one cloud event for v3.");
}
Thread.sleep(2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME);
List<CloudEvent> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/testingtopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF
).block();
assertThat(messages)
.hasSize(13)
.extracting(CloudEvent::getData)
.filteredOn(Objects::nonNull)
.contains(
"AQ==",
"message from cloudevent"
);
for (int i = 0; i < NUM_MESSAGES; i++) {
String expectedMessage = String.format("This is message #%d on topic %s", i, TOPIC_NAME);
assertThat(messages)
.extracting(CloudEvent::getData)
.filteredOn(Objects::nonNull)
.anyMatch(expectedMessage::equals);
}
assertThat(messages)
.extracting(CloudEvent::getData)
.filteredOn(LinkedHashMap.class::isInstance)
.map(data -> (String) ((LinkedHashMap<?, ?>) data).get("id"))
.contains("123");
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME + " V2");
List<CloudEvent> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/testingtopicV2",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF
).block();
assertThat(messages)
.hasSize(1);
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME + " V3");
List<CloudEvent> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/testingtopicV3",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF
).block();
assertThat(messages)
.hasSize(1);
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TYPED_TOPIC_NAME);
List<CloudEvent<PubSubIT.MyObject>> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/typedtestingtopic",
null,
HttpExtension.GET,
CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF
).block();
assertThat(messages)
.extracting(CloudEvent::getData)
.filteredOn(Objects::nonNull)
.filteredOn(PubSubIT.MyObject.class::isInstance)
.map(PubSubIT.MyObject::getId)
.contains("123");
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME);
List<CloudEvent> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/anothertopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF
).block();
assertThat(messages)
.hasSize(10);
for (int i = 0; i < NUM_MESSAGES; i++) {
String expectedMessage = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME);
assertThat(messages)
.extracting(CloudEvent::getData)
.filteredOn(Objects::nonNull)
.anyMatch(expectedMessage::equals);
}
}, 2000);
}
}
@Test
@DisplayName("Should publish binary payload type successfully")
public void shouldPublishBinary() throws Exception {
DaprObjectSerializer serializer = createBinaryObjectSerializer();
try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) {
publishOneByteSync(client, BINARY_TOPIC_NAME);
}
Thread.sleep(3000);
try (DaprClient client = createDaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + BINARY_TOPIC_NAME);
final List<CloudEvent> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/binarytopic",
null,
HttpExtension.GET, CLOUD_EVENT_LIST_TYPE_REF).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(messages.size()).isEqualTo(1);
softly.assertThat(messages.get(0).getData()).isNull();
softly.assertThat(messages.get(0).getBinaryData()).isEqualTo(new byte[] {1});
});
}, 2000);
}
}
private static void publishOneByteSync(DaprClient client, String topicName) {
client.publishEvent(
PUBSUB_NAME,
topicName,
new byte[] {1}).block();
}
private static void sendBulkMessagesAsText(DaprClient client, String topicName) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, topicName);
client.publishEvent(PUBSUB_NAME, topicName, message).block();
}
}
private void publishMyObjectAsserting(DaprPreviewClient previewClient) {
PubSubIT.MyObject object = new PubSubIT.MyObject();
object.setId("123");
BulkPublishResponse<PubSubIT.MyObject> response = previewClient.publishEvents(
PUBSUB_NAME,
TOPIC_BULK,
"application/json",
Collections.singletonList(object)
).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(response).isNotNull();
softly.assertThat(response.getFailedEntries().size()).isZero();
});
}
private void publishBulkStringsAsserting(DaprPreviewClient previewClient) {
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
messages.add(String.format("This is message #%d on topic %s", i, TOPIC_BULK));
}
BulkPublishResponse<String> response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(response).isNotNull();
softly.assertThat(response.getFailedEntries().size()).isZero();
});
}
private void publishByteAsserting(DaprPreviewClient previewClient) {
BulkPublishResponse<byte[]> response = previewClient.publishEvents(
PUBSUB_NAME,
TOPIC_BULK,
"",
Collections.singletonList(new byte[] {1})
).block();
SoftAssertions.assertSoftly(softly -> {
assertThat(response).isNotNull();
softly.assertThat(response.getFailedEntries().size()).isZero();
});
}
private void publishCloudEventAsserting(DaprPreviewClient previewClient) {
CloudEvent<String> cloudEvent = new CloudEvent<>();
cloudEvent.setId("1234");
cloudEvent.setData("message from cloudevent");
cloudEvent.setSource("test");
cloudEvent.setSpecversion("1");
cloudEvent.setType("myevent");
cloudEvent.setDatacontenttype("text/plain");
BulkPublishRequest<CloudEvent<String>> req = new BulkPublishRequest<>(
PUBSUB_NAME,
TOPIC_BULK,
Collections.singletonList(
new BulkPublishEntry<>("1", cloudEvent, "application/cloudevents+json", null)
)
);
BulkPublishResponse<CloudEvent<String>> response = previewClient.publishEvents(req).block();
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(response).isNotNull();
softly.assertThat(response.getFailedEntries().size()).isZero();
});
}
private void validatePublishedMessages(DaprClient client) {
List<CloudEvent> cloudEventMessages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/redis/testingbulktopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF
).block();
assertThat(cloudEventMessages)
.as("expected non-null list of cloud events")
.isNotNull()
.hasSize(13);
for (int i = 0; i < NUM_MESSAGES; i++) {
String expectedMessage = String.format("This is message #%d on topic %s", i, TOPIC_BULK);
assertThat(cloudEventMessages)
.as("expected text payload to match for message %d", i)
.anySatisfy(event -> assertThat(event.getData()).isEqualTo(expectedMessage));
}
assertThat(cloudEventMessages)
.filteredOn(event -> event.getData() instanceof LinkedHashMap)
.map(event -> (LinkedHashMap<?, ?>) event.getData())
.anySatisfy(map -> assertThat(map.get("id")).isEqualTo("123"));
assertThat(cloudEventMessages)
.map(CloudEvent::getData)
.anySatisfy(data -> assertThat(data).isEqualTo("AQ=="));
assertThat(cloudEventMessages)
.map(CloudEvent::getData)
.anySatisfy(data -> assertThat(data).isEqualTo("message from cloudevent"));
}
@Test
@DisplayName("Should publish with TTL")
public void testPubSubTTLMetadata() throws Exception {
// Send a batch of messages on one topic, all to be expired in 1 second.
try (DaprClient client = createDaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, TTL_TOPIC_NAME);
//Publishing messages
client.publishEvent(
PUBSUB_NAME,
TTL_TOPIC_NAME,
message,
Map.of(Metadata.TTL_IN_SECONDS, "1"))
.block();
System.out.printf("Published message: '%s' to topic '%s' pubsub_name '%s'%n", message, TOPIC_NAME, PUBSUB_NAME);
}
}
// Sleeps for two seconds to let them expire.
Thread.sleep(2000);
try (DaprClient client = createDaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + TTL_TOPIC_NAME);
final List
messages = client.invokeMethod(PUBSUB_APP_ID, "messages/" + TTL_TOPIC_NAME, null, HttpExtension.GET, List.class).block();
assertThat(messages).hasSize(0);
}, 2000);
}
}
@Test
@DisplayName("Should publish long values")
public void testLongValues() throws Exception {
Random random = new Random(590518626939830271L);
Set<PubSubIT.ConvertToLong> values = new HashSet<>();
values.add(new PubSubIT.ConvertToLong().setVal(590518626939830271L));
PubSubIT.ConvertToLong val;
for (int i = 0; i < NUM_MESSAGES - 1; i++) {
do {
val = new PubSubIT.ConvertToLong().setVal(random.nextLong());
} while (values.contains(val));
values.add(val);
}
Iterator<PubSubIT.ConvertToLong> valuesIt = values.iterator();
try (DaprClient client = createDaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
PubSubIT.ConvertToLong value = valuesIt.next();
System.out.println("The long value sent " + value.getValue());
//Publishing messages
client.publishEvent(
PUBSUB_NAME,
LONG_TOPIC_NAME,
value,
Map.of(Metadata.TTL_IN_SECONDS, "30")).block();
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
}
Set<PubSubIT.ConvertToLong> actual = new HashSet<>();
try (DaprClient client = createDaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + LONG_TOPIC_NAME);
final List<CloudEvent<PubSubIT.ConvertToLong>> messages = client.invokeMethod(
PUBSUB_APP_ID,
"messages/testinglongvalues",
null,
HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block();
assertNotNull(messages);
for (CloudEvent<PubSubIT.ConvertToLong> message : messages) {
actual.add(message.getData());
}
assertThat(values).isEqualTo(actual);
}, 2000);
}
}
private static DaprClientBuilder createDaprClientBuilder() {
return new DaprClientBuilder()
.withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getHttpPort())
.withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getGrpcPort());
}
private DaprObjectSerializer createJacksonObjectSerializer() {
return new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType()));
}
@Override
public String getContentType() {
return "application/json";
}
};
}
private @NotNull DaprObjectSerializer createBinaryObjectSerializer() {
return new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) {
return (byte[]) o;
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) {
return (T) data;
}
@Override
public String getContentType() {
return "application/octet-stream";
}
};
}
}

View File

@ -0,0 +1,271 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers.pubsub.http;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.it.pubsub.http.PubSubIT;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
/**
* SpringBoot Controller to handle input binding.
*/
@RestController
public class SubscriberController {
private final Map<String, List<CloudEvent<?>>> messagesByTopic = Collections.synchronizedMap(new HashMap<>());
@GetMapping(path = "/messages/{topic}")
public List<CloudEvent<?>> getMessagesByTopic(@PathVariable("topic") String topic) {
return messagesByTopic.getOrDefault(topic, Collections.emptyList());
}
private static final List<CloudEvent> messagesReceivedBulkPublishTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV2 = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV3 = new ArrayList();
private static final List<BulkSubscribeAppResponse> responsesReceivedTestingTopicBulkSub = new ArrayList<>();
@GetMapping(path = "/messages/redis/testingbulktopic")
public List<CloudEvent> getMessagesReceivedBulkTopic() {
return messagesReceivedBulkPublishTopic;
}
@GetMapping(path = "/messages/testingtopic")
public List<CloudEvent> getMessagesReceivedTestingTopic() {
return messagesReceivedTestingTopic;
}
@GetMapping(path = "/messages/testingtopicV2")
public List<CloudEvent> getMessagesReceivedTestingTopicV2() {
return messagesReceivedTestingTopicV2;
}
@GetMapping(path = "/messages/testingtopicV3")
public List<CloudEvent> getMessagesReceivedTestingTopicV3() {
return messagesReceivedTestingTopicV3;
}
@GetMapping(path = "/messages/topicBulkSub")
public List<BulkSubscribeAppResponse> getMessagesReceivedTestingTopicBulkSub() {
System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size());
return responsesReceivedTestingTopicBulkSub;
}
@Topic(name = "testingtopic", pubsubName = "pubsub")
@PostMapping("/route1")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingbulktopic", pubsubName = "pubsub")
@PostMapping("/route1_redis")
public Mono<Void> handleBulkTopicMessage(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedBulkPublishTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "pubsub",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2))
@PostMapping(path = "/route1_v2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopicV2.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "pubsub",
rule = @Rule(match = "event.type == 'myevent.v3'", priority = 1))
@PostMapping(path = "/route1_v3")
public Mono<Void> handleMessageV3(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopicV3.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "typedtestingtopic", pubsubName = "pubsub")
@PostMapping(path = "/route1b")
public Mono<Void> handleMessageTyped(@RequestBody(required = false) CloudEvent<PubSubIT.MyObject> envelope) {
return Mono.fromRunnable(() -> {
try {
String id = envelope.getData() == null ? "" : envelope.getData().getId();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing typed topic Subscriber got message with ID: " + id + "; Content-type: " + contentType);
messagesByTopic.compute("typedtestingtopic", merge(envelope));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "binarytopic", pubsubName = "pubsub")
@PostMapping(path = "/route2")
public Mono<Void> handleBinaryMessage(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesByTopic.compute("binarytopic", merge(envelope));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "#{'another'.concat('topic')}", pubsubName = "${pubsubName:pubsub}")
@PostMapping(path = "/route3")
public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
System.out.println("Another topic Subscriber got message: " + message);
messagesByTopic.compute("anothertopic", merge(envelope));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@PostMapping(path = "/route4")
public Mono<Void> handleMessageTTLTopic(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
System.out.println("TTL topic Subscriber got message: " + message);
messagesByTopic.compute("ttltopic", merge(envelope));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testinglongvalues", pubsubName = "pubsub")
@PostMapping(path = "/testinglongvalues")
public Mono<Void> handleMessageLongValues(@RequestBody(required = false) CloudEvent<PubSubIT.ConvertToLong> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
Long message = cloudEvent.getData().getValue();
System.out.println("Subscriber got: " + message);
messagesByTopic.compute("testinglongvalues", merge(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
/**
* Receive messages using the bulk subscribe API.
* The maxBulkSubCount and maxBulkSubAwaitDurationMs are adjusted to ensure
* that all the test messages arrive in a single batch.
*
* @param bulkMessage incoming bulk of messages from the message bus.
* @return status for each message received.
*/
@BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 100)
@Topic(name = "topicBulkSub", pubsubName = "pubsub")
@PostMapping(path = "/routeBulkSub")
public Mono<BulkSubscribeAppResponse> handleMessageBulk(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
System.out.println("bulkMessage: " + bulkMessage.getEntries().size());
if (bulkMessage.getEntries().size() == 0) {
BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>());
responsesReceivedTestingTopicBulkSub.add(response);
System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size());
return response;
}
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<>();
for (BulkSubscribeMessageEntry<?> entry: bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber got entry ID: %s\n", entry.getEntryId());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries);
responsesReceivedTestingTopicBulkSub.add(response);
System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size());
return response;
});
}
private BiFunction<String, List<CloudEvent<?>>, List<CloudEvent<?>>> merge(final CloudEvent<?> item) {
return (key, value) -> {
final List<CloudEvent<?>> list = value == null ? new ArrayList<>() : value;
list.add(item);
return list;
};
}
@GetMapping(path = "/health")
public void health() {
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers.pubsub.http;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestPubSubApplication {
public static void main(String[] args) {
SpringApplication.run(TestPubSubApplication.class, args);
}
}