diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java new file mode 100644 index 000000000..8e916b1b7 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.pubsub.http; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.Verb; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.serializer.DefaultObjectSerializer; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static io.dapr.it.Retry.callWithRetry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PubSubIT extends BaseIT { + + //Number of messages to be sent: 10 + private static final int NUM_MESSAGES = 10; + + //The title of the topic to be used for publishing + private static final String TOPIC_NAME = "testingtopic"; + + @Test + public void testPubSub() throws Exception { + System.out.println("Working Directory = " + System.getProperty("user.dir")); + + final DaprRun daprRun = startDaprApp( + this.getClass().getSimpleName(), + SubscriberService.SUCCESS_MESSAGE, + SubscriberService.class, + true, + 60000); + // At this point, it is guaranteed that the service above is running and all ports being listened to. + + DaprClient client = new DaprClientBuilder().build(); + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d", i); + //Publishing messages + client.publishEvent(TOPIC_NAME, message).block(); + System.out.println("Published message: " + message); + } + + //Publishing a single byte: Example of non-string based content published + client.publishEvent( + TOPIC_NAME, + new byte[] { 1 }, + Collections.singletonMap("content-type", "application/octet-stream")).block(); + System.out.println("Published one byte."); + + Thread.sleep(3000); + + callWithRetry(() -> { + System.out.println("Checking results ..."); + final List messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages", null, List.class).block(); + assertEquals(11, messages.size()); + + for (int i = 0; i < NUM_MESSAGES; i++) { + + assertTrue(messages.get(i).startsWith("This is message ")); + + } + byte[] result=new byte[] { 1 }; + assertEquals(result.length, messages.get(10).getBytes().length); + assertEquals(result[0], messages.get(10).getBytes()[0]); + + }, 2000); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java new file mode 100644 index 000000000..507676879 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.pubsub.http; + +import io.dapr.client.domain.CloudEvent; +import io.dapr.serializer.DefaultObjectSerializer; +import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * SpringBoot Controller to handle input binding. + */ +@RestController +public class SubscriberController { + + private static final List messagesReceived = new ArrayList(); + + /** + * Dapr's default serializer/deserializer. + */ + private static final DefaultObjectSerializer SERIALIZER = new DefaultObjectSerializer (); + + @GetMapping("/dapr/subscribe") + public byte[] daprConfig() throws Exception { + return SERIALIZER.serialize(new String[] { "testingtopic" }); + } + + @GetMapping(path = "/messages") + public List getMessages() { + return messagesReceived; + } + + @PostMapping(path = "/testingtopic") + public Mono handleMessage(@RequestBody(required = false) byte[] body, + @RequestHeader Map headers) { + return Mono.fromRunnable(() -> { + try { + // Dapr's event is compliant to CloudEvent. + CloudEvent envelope = CloudEvent.deserialize(body); + + String message = envelope.getData() == null ? "" : envelope.getData(); + System.out.println("Subscriber got message: " + message); + messagesReceived.add(envelope.getData()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberService.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberService.java new file mode 100644 index 000000000..4426ec22e --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberService.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.pubsub.http; + +import io.dapr.it.binding.http.InputBindingService; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +/** + * Service for subscriber. + */ +@SpringBootApplication(scanBasePackages = {"io.dapr.it.pubsub.http"}) +public class SubscriberService { + + public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running. Init Elapsed"; + + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + + System.out.printf("Service starting on port %d ...\n", port); + + // Start Dapr's callback endpoint. + start(port); + } + + /** + * Starts Dapr's callback in a given port. + * + * @param port Port to listen to. + */ + private static void start(int port) { + SpringApplication app = new SpringApplication(SubscriberService.class); + app.run(String.format("--server.port=%d", port)); + } + +} \ No newline at end of file