diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 915127e22..a74e8e209 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -29,7 +29,7 @@ jobs: DAPR_RUNTIME_VER: 1.6.0-rc.2 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.6.0-rc.1/install/install.sh DAPR_CLI_REF: - DAPR_REF: 5a307f3deaa1b322f7945179adad0403de80eb7e + DAPR_REF: 4cf499448ef6ee87c83db6a11b84e83237e92665 steps: - uses: actions/checkout@v3 - name: Set up OpenJDK ${{ env.JDK_VER }} diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index 67f8cd106..ce686db14 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -40,7 +40,7 @@ jobs: DAPR_RUNTIME_VER: 1.6.0-rc.2 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.6.0-rc.1/install/install.sh DAPR_CLI_REF: - DAPR_REF: 5a307f3deaa1b322f7945179adad0403de80eb7e + DAPR_REF: 4cf499448ef6ee87c83db6a11b84e83237e92665 steps: - uses: actions/checkout@v3 - name: Set up OpenJDK ${{ env.JDK_VER }} diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 35e77ce5e..32491f68c 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -37,6 +37,10 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Random; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Objects; import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.TestUtils.assertThrowsDaprException; @@ -44,6 +48,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + @RunWith(Parameterized.class) public class PubSubIT extends BaseIT { @@ -51,6 +57,7 @@ public class PubSubIT extends BaseIT { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final TypeRef> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() {}; + private static final TypeRef>> CLOUD_EVENT_LONG_LIST_TYPE_REF = new TypeRef<>() {}; private static final TypeRef>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = new TypeRef<>() {}; //Number of messages to be sent: 10 @@ -66,6 +73,8 @@ public class PubSubIT extends BaseIT { // Topic to test binary data private static final String BINARY_TOPIC_NAME = "binarytopic"; + private static final String LONG_TOPIC_NAME = "testinglongvalues"; + /** * Parameters for this test. * Param #1: useGrpc. @@ -402,6 +411,73 @@ public class PubSubIT extends BaseIT { daprRun.stop(); } + @Test + public void testLongValues() throws Exception { + final DaprRun daprRun = closeLater(startDaprApp( + this.getClass().getSimpleName(), + SubscriberService.SUCCESS_MESSAGE, + SubscriberService.class, + true, + 60000)); + // At this point, it is guaranteed that the service above is running and all ports being listened to. + if (this.useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } + + ConvertToLong toLong = new ConvertToLong(); + HashSet expected = new HashSet<>(); + Random random = new Random(); + Long randomLong = 590518626939830271L; + random.setSeed(randomLong); + toLong.setValue(randomLong); + expected.add(toLong); + for (int i = 1; i < NUM_MESSAGES; i++) { + ConvertToLong value = new ConvertToLong(); + randomLong = random.nextLong(); + value.setValue(randomLong); + expected.add(value); + System.out.println("expected value is : " +value); + } + Iterator expectVal = expected.iterator(); + try (DaprClient client = new DaprClientBuilder().build()) { + while(expectVal.hasNext()) { + //Publishing messages + client.publishEvent( + PUBSUB_NAME, + LONG_TOPIC_NAME, + expectVal.next(), + Collections.singletonMap(Metadata.TTL_IN_SECONDS, "1")).block(); + + try { + Thread.sleep((long) (1000 * Math.random())); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + return; + } + } + } + + HashSet actual = new HashSet<>(); + try (DaprClient client = new DaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + LONG_TOPIC_NAME); + final List> messages = client.invokeMethod( + daprRun.getAppName(), + "messages/testinglongvalues", + null, + HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block(); + assertNotNull(messages); + for (int i = 0; i < NUM_MESSAGES; i++) { + actual.add(messages.get(i).getData()); + } + assertEquals(expected,actual); + }, 2000); + } + } + public static class MyObject { private String id; @@ -413,4 +489,30 @@ public class PubSubIT extends BaseIT { this.id = id; } } + + public static class ConvertToLong { + private Long value; + + public Long getValue() { + return value; + } + + public void setValue(Long value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConvertToLong that = (ConvertToLong) o; + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } + } diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java index 38bca7730..726e8990b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java @@ -115,6 +115,20 @@ public class SubscriberController { }); } + @Topic(name = "testinglongvalues", pubsubName = "messagebus") + @PostMapping(path = "/testinglongvalues") + public Mono handleMessageLongValues(@RequestBody(required = false) CloudEvent cloudEvent) { + return Mono.fromRunnable(() -> { + try { + Long message = cloudEvent.getData().getValue(); + System.out.println("Subscriber got: " + message); + messagesByTopic.compute("testinglongvalues", merge(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + private BiFunction>, List>> merge(final CloudEvent item) { return (key, value) -> { final List> list = value == null ? new ArrayList<>() : value;