diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 174f69be4..7e9d48f72 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,11 +22,11 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: 13.0.x - DAPR_CLI_VER: 1.0.0-rc.3 - DAPR_RUNTIME_VER: 1.0.0-rc.2 + DAPR_CLI_VER: 1.0.0-rc.4 + DAPR_RUNTIME_VER: 1.0.0-rc.3 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh DAPR_CLI_REF: - DAPR_REF: 5a15b3e0f093d2d0938b12f144c7047474a290fe + DAPR_REF: OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} GPG_KEY: ${{ secrets.GPG_KEY }} diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java index 504b6876e..b751fbb2f 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java @@ -12,6 +12,7 @@ import io.dapr.it.AppRun; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import io.dapr.it.actors.app.MyActorService; +import io.dapr.it.state.GRPCStateClientIT; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.After; import org.junit.Before; @@ -19,9 +20,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.UUID; +import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.actors.MyActorTestUtils.countMethodCalls; import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs; import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls; @@ -36,6 +40,8 @@ public class ActorReminderRecoveryIT extends BaseIT { private ImmutablePair runs; + private DaprRun clientRun; + @Before public void init() throws Exception { runs = startSplitDaprAndApp( @@ -45,6 +51,11 @@ public class ActorReminderRecoveryIT extends BaseIT { true, 60000); + // Run that will stay up for integration tests. + // appId must not contain the appId from the other run, otherwise ITs will not run properly. + clientRun = startDaprApp("ActorReminderRecoveryTestClient", 5000); + clientRun.use(); + Thread.sleep(3000); ActorId actorId = new ActorId(UUID.randomUUID().toString()); @@ -77,8 +88,12 @@ public class ActorReminderRecoveryIT extends BaseIT { logger.debug("Pausing 7 seconds to allow reminder to fire"); Thread.sleep(7000); - List logs = fetchMethodCallLogs(proxy); - validateMethodCalls(logs, METHOD_NAME, 3); + final List logs = new ArrayList<>(); + callWithRetry(() -> { + logs.clear(); + logs.addAll(fetchMethodCallLogs(proxy)); + validateMethodCalls(logs, METHOD_NAME, 3); + }, 5000); // Restarts runtime only. logger.info("Stopping Dapr sidecar"); @@ -87,14 +102,27 @@ public class ActorReminderRecoveryIT extends BaseIT { runs.right.start(); logger.info("Dapr sidecar started"); - logger.debug("Pausing 7 seconds to allow sidecar to be healthy"); + logger.info("Pausing 7 seconds to allow sidecar to be healthy"); Thread.sleep(7000); - List newLogs = fetchMethodCallLogs(proxy); - logger.debug("Pausing 10 seconds to allow reminder to fire a few times"); - Thread.sleep(10000); - List newLogs2 = fetchMethodCallLogs(proxy); - logger.debug("Check if there has been additional calls"); - validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 3); + callWithRetry(() -> { + logger.info("Fetching logs for " + METHOD_NAME); + List newLogs = fetchMethodCallLogs(proxy); + validateMethodCalls(newLogs, METHOD_NAME, 1); + + logger.info("Pausing 10 seconds to allow reminder to fire a few times"); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + logger.error("Sleep interrupted"); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + logger.info("Fetching more logs for " + METHOD_NAME); + List newLogs2 = fetchMethodCallLogs(proxy); + logger.info("Check if there has been additional calls"); + validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 3); + }, 60000); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java index 09f8ffea4..2f370775e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java @@ -17,11 +17,14 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.UUID; +import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs; import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; public class ActorTimerRecoveryIT extends BaseIT { @@ -60,8 +63,12 @@ public class ActorTimerRecoveryIT extends BaseIT { logger.debug("Pausing 7 seconds to allow timer to fire"); Thread.sleep(7000); - List logs = fetchMethodCallLogs(proxy); - validateMethodCalls(logs, METHOD_NAME, 3); + final List logs = new ArrayList<>(); + callWithRetry(() -> { + logs.clear(); + logs.addAll(fetchMethodCallLogs(proxy)); + validateMethodCalls(logs, METHOD_NAME, 3); + }, 5000); // Restarts app only. runs.left.stop(); @@ -69,8 +76,12 @@ public class ActorTimerRecoveryIT extends BaseIT { logger.debug("Pausing 10 seconds to allow timer to fire"); Thread.sleep(10000); - List newLogs = fetchMethodCallLogs(proxy); - validateMethodCalls(newLogs, METHOD_NAME, 3); + final List newLogs = new ArrayList<>(); + callWithRetry(() -> { + newLogs.clear(); + newLogs.addAll(fetchMethodCallLogs(proxy)); + validateMethodCalls(newLogs, METHOD_NAME, 3); + }, 5000); // Check that the restart actually happened by confirming the old logs are not in the new logs. for (MethodEntryTracker oldLog: logs) { diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 38123eb5e..d072ab143 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -133,8 +133,7 @@ public class PubSubIT extends BaseIT { client.publishEvent( PUBSUB_NAME, TOPIC_NAME, - new byte[]{1}, - Collections.singletonMap("content-type", "application/octet-stream")).block(); + new byte[]{1}).block(); System.out.println("Published one byte."); Thread.sleep(3000); diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java index 7b7f68fbc..433af7044 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java @@ -47,7 +47,8 @@ public class SubscriberController { return Mono.fromRunnable(() -> { try { String message = envelope.getData() == null ? "" : envelope.getData().toString(); - System.out.println("Testing topic Subscriber got message: " + message); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); messagesReceivedTestingTopic.add(envelope.getData()); } catch (Exception e) { throw new RuntimeException(e); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 628d69580..e0976ef02 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -146,10 +146,6 @@ public class DaprClientGrpc extends AbstractDaprClient { Map metadata = request.getMetadata(); if (metadata != null) { envelopeBuilder.putAllMetadata(metadata); - String contentType = metadata.get(io.dapr.client.domain.Metadata.CONTENT_TYPE); - if (contentType != null) { - envelopeBuilder.setDataContentType(contentType); - } } return this.createMono(it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it))