Adding Integration Test for Pub Sub over HTTP (#167)

* Adding Integration Test for Pub Sub over HTTP

* Update PubSubIT.java

* Remove unneeded thread sleep

* Serializer is now optional in client builder.

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Juan Jose Herrera 2020-01-29 13:46:58 -06:00 committed by GitHub
parent 45fe471c48
commit ebea096bef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 174 additions and 0 deletions

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.pubsub.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Verb;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import static io.dapr.it.Retry.callWithRetry;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PubSubIT extends BaseIT {
//Number of messages to be sent: 10
private static final int NUM_MESSAGES = 10;
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
@Test
public void testPubSub() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
final DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
DaprClient client = new DaprClientBuilder().build();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
client.publishEvent(TOPIC_NAME, message).block();
System.out.println("Published message: " + message);
}
//Publishing a single byte: Example of non-string based content published
client.publishEvent(
TOPIC_NAME,
new byte[] { 1 },
Collections.singletonMap("content-type", "application/octet-stream")).block();
System.out.println("Published one byte.");
Thread.sleep(3000);
callWithRetry(() -> {
System.out.println("Checking results ...");
final List<String> messages = client.invokeService(Verb.GET, daprRun.getAppName(), "messages", null, List.class).block();
assertEquals(11, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.get(i).startsWith("This is message "));
}
byte[] result=new byte[] { 1 };
assertEquals(result.length, messages.get(10).getBytes().length);
assertEquals(result[0], messages.get(10).getBytes()[0]);
}, 2000);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.pubsub.http;
import io.dapr.client.domain.CloudEvent;
import io.dapr.serializer.DefaultObjectSerializer;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* SpringBoot Controller to handle input binding.
*/
@RestController
public class SubscriberController {
private static final List<String> messagesReceived = new ArrayList();
/**
* Dapr's default serializer/deserializer.
*/
private static final DefaultObjectSerializer SERIALIZER = new DefaultObjectSerializer ();
@GetMapping("/dapr/subscribe")
public byte[] daprConfig() throws Exception {
return SERIALIZER.serialize(new String[] { "testingtopic" });
}
@GetMapping(path = "/messages")
public List<String> getMessages() {
return messagesReceived;
}
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
return Mono.fromRunnable(() -> {
try {
// Dapr's event is compliant to CloudEvent.
CloudEvent envelope = CloudEvent.deserialize(body);
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
messagesReceived.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.pubsub.http;
import io.dapr.it.binding.http.InputBindingService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for subscriber.
*/
@SpringBootApplication(scanBasePackages = {"io.dapr.it.pubsub.http"})
public class SubscriberService {
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running. Init Elapsed";
public static void main(String[] args) throws Exception {
int port = Integer.parseInt(args[0]);
System.out.printf("Service starting on port %d ...\n", port);
// Start Dapr's callback endpoint.
start(port);
}
/**
* Starts Dapr's callback in a given port.
*
* @param port Port to listen to.
*/
private static void start(int port) {
SpringApplication app = new SpringApplication(SubscriberService.class);
app.run(String.format("--server.port=%d", port));
}
}