Fix PubSub publish + Update IT to use RC3 bits. (#471)

* Update IT to use RC3 bits.

* Fixing publish method not to handle content-type metadata in GRPC.

* Fix flaky actor ITs.
This commit is contained in:
Artur Souza 2021-02-01 18:19:50 -08:00 committed by GitHub
parent 3918db1758
commit 277f9958a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 23 deletions

View File

@ -22,11 +22,11 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: 13.0.x
DAPR_CLI_VER: 1.0.0-rc.3
DAPR_RUNTIME_VER: 1.0.0-rc.2
DAPR_CLI_VER: 1.0.0-rc.4
DAPR_RUNTIME_VER: 1.0.0-rc.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF:
DAPR_REF: 5a15b3e0f093d2d0938b12f144c7047474a290fe
DAPR_REF:
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}

View File

@ -12,6 +12,7 @@ import io.dapr.it.AppRun;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.app.MyActorService;
import io.dapr.it.state.GRPCStateClientIT;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.After;
import org.junit.Before;
@ -19,9 +20,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.actors.MyActorTestUtils.countMethodCalls;
import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs;
import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls;
@ -36,6 +40,8 @@ public class ActorReminderRecoveryIT extends BaseIT {
private ImmutablePair<AppRun, DaprRun> runs;
private DaprRun clientRun;
@Before
public void init() throws Exception {
runs = startSplitDaprAndApp(
@ -45,6 +51,11 @@ public class ActorReminderRecoveryIT extends BaseIT {
true,
60000);
// Run that will stay up for integration tests.
// appId must not contain the appId from the other run, otherwise ITs will not run properly.
clientRun = startDaprApp("ActorReminderRecoveryTestClient", 5000);
clientRun.use();
Thread.sleep(3000);
ActorId actorId = new ActorId(UUID.randomUUID().toString());
@ -77,8 +88,12 @@ public class ActorReminderRecoveryIT extends BaseIT {
logger.debug("Pausing 7 seconds to allow reminder to fire");
Thread.sleep(7000);
List<MethodEntryTracker> logs = fetchMethodCallLogs(proxy);
validateMethodCalls(logs, METHOD_NAME, 3);
final List<MethodEntryTracker> logs = new ArrayList<>();
callWithRetry(() -> {
logs.clear();
logs.addAll(fetchMethodCallLogs(proxy));
validateMethodCalls(logs, METHOD_NAME, 3);
}, 5000);
// Restarts runtime only.
logger.info("Stopping Dapr sidecar");
@ -87,14 +102,27 @@ public class ActorReminderRecoveryIT extends BaseIT {
runs.right.start();
logger.info("Dapr sidecar started");
logger.debug("Pausing 7 seconds to allow sidecar to be healthy");
logger.info("Pausing 7 seconds to allow sidecar to be healthy");
Thread.sleep(7000);
List<MethodEntryTracker> newLogs = fetchMethodCallLogs(proxy);
logger.debug("Pausing 10 seconds to allow reminder to fire a few times");
Thread.sleep(10000);
List<MethodEntryTracker> newLogs2 = fetchMethodCallLogs(proxy);
logger.debug("Check if there has been additional calls");
validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 3);
callWithRetry(() -> {
logger.info("Fetching logs for " + METHOD_NAME);
List<MethodEntryTracker> newLogs = fetchMethodCallLogs(proxy);
validateMethodCalls(newLogs, METHOD_NAME, 1);
logger.info("Pausing 10 seconds to allow reminder to fire a few times");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
logger.error("Sleep interrupted");
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
logger.info("Fetching more logs for " + METHOD_NAME);
List<MethodEntryTracker> newLogs2 = fetchMethodCallLogs(proxy);
logger.info("Check if there has been additional calls");
validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 3);
}, 60000);
}
}

View File

@ -17,11 +17,14 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs;
import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
public class ActorTimerRecoveryIT extends BaseIT {
@ -60,8 +63,12 @@ public class ActorTimerRecoveryIT extends BaseIT {
logger.debug("Pausing 7 seconds to allow timer to fire");
Thread.sleep(7000);
List<MethodEntryTracker> logs = fetchMethodCallLogs(proxy);
validateMethodCalls(logs, METHOD_NAME, 3);
final List<MethodEntryTracker> logs = new ArrayList<>();
callWithRetry(() -> {
logs.clear();
logs.addAll(fetchMethodCallLogs(proxy));
validateMethodCalls(logs, METHOD_NAME, 3);
}, 5000);
// Restarts app only.
runs.left.stop();
@ -69,8 +76,12 @@ public class ActorTimerRecoveryIT extends BaseIT {
logger.debug("Pausing 10 seconds to allow timer to fire");
Thread.sleep(10000);
List<MethodEntryTracker> newLogs = fetchMethodCallLogs(proxy);
validateMethodCalls(newLogs, METHOD_NAME, 3);
final List<MethodEntryTracker> newLogs = new ArrayList<>();
callWithRetry(() -> {
newLogs.clear();
newLogs.addAll(fetchMethodCallLogs(proxy));
validateMethodCalls(newLogs, METHOD_NAME, 3);
}, 5000);
// Check that the restart actually happened by confirming the old logs are not in the new logs.
for (MethodEntryTracker oldLog: logs) {

View File

@ -133,8 +133,7 @@ public class PubSubIT extends BaseIT {
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
new byte[]{1},
Collections.singletonMap("content-type", "application/octet-stream")).block();
new byte[]{1}).block();
System.out.println("Published one byte.");
Thread.sleep(3000);

View File

@ -47,7 +47,8 @@ public class SubscriberController {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
System.out.println("Testing topic Subscriber got message: " + message);
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopic.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);

View File

@ -146,10 +146,6 @@ public class DaprClientGrpc extends AbstractDaprClient {
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
String contentType = metadata.get(io.dapr.client.domain.Metadata.CONTENT_TYPE);
if (contentType != null) {
envelopeBuilder.setDataContentType(contentType);
}
}
return this.<Empty>createMono(it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it))