mirror of https://github.com/dapr/java-sdk.git
Refactor IT into new jar + use JVM sytem properties (#156)
* Refactor IT into new jar + use JVM sytem properties * Fixing ITs and silence flaky tests. * Fix flaky tests.
This commit is contained in:
parent
7f1d4f09c7
commit
a40afafca4
|
@ -40,16 +40,18 @@ jobs:
|
|||
dapr --version
|
||||
- name: Install Local kafka using docker-compose
|
||||
run: |
|
||||
docker-compose -f ./sdk/src/test/java/io/dapr/it/deploy/local-test-kafka.yml up -d
|
||||
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d
|
||||
docker ps
|
||||
- name: Clean up files
|
||||
run: mvn clean
|
||||
- name: Build sdk
|
||||
run: mvn compile
|
||||
- name: Unit-test
|
||||
run: mvn test
|
||||
- name: Integration-test
|
||||
run: mvn integration-test
|
||||
run: mvn compile -q
|
||||
- name: Unit tests
|
||||
run: mvn test -q
|
||||
- name: Install jars
|
||||
run: mvn install -q
|
||||
- name: Integration tests
|
||||
run: mvn -f sdk-tests/pom.xml verify -q
|
||||
- name: Upload test report for sdk
|
||||
uses: actions/upload-artifact@master
|
||||
with:
|
||||
|
@ -60,8 +62,6 @@ jobs:
|
|||
with:
|
||||
name: report-dapr-java-sdk-actors
|
||||
path: sdk-actors/target/jacoco-report/
|
||||
- name: Packaging jars
|
||||
run: mvn package
|
||||
- name: Get pom parent version
|
||||
run: |
|
||||
PARENT_VERSION=$(mvn -q -Dexec.executable=echo -Dexec.args='${project.version}' --non-recursive exec:exec)
|
||||
|
|
|
@ -39,4 +39,5 @@
|
|||
hs_err_pid*
|
||||
|
||||
# Some other generated folders/files
|
||||
components/
|
||||
**/components/redis.yaml
|
||||
**/components/redis_messagebus.yaml
|
||||
|
|
1
pom.xml
1
pom.xml
|
@ -119,6 +119,7 @@
|
|||
<goal>verify</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!--suppress UnresolvedMavenProperty -->
|
||||
<skip>${skipITs}</skip>
|
||||
</configuration>
|
||||
</execution>
|
||||
|
|
|
@ -28,7 +28,6 @@
|
|||
</repositories>
|
||||
|
||||
<properties>
|
||||
<skipITs>true</skipITs>
|
||||
<maven.deploy.skip>false</maven.deploy.skip>
|
||||
</properties>
|
||||
|
||||
|
@ -60,12 +59,6 @@
|
|||
<version>5.5.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stefanbirkner</groupId>
|
||||
<artifactId>system-rules</artifactId>
|
||||
<version>1.19.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
package io.dapr.actors.it;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
||||
import java.util.Optional;
|
||||
|
||||
public class BaseIT {
|
||||
|
||||
protected static DaprIntegrationTestingRunner daprIntegrationTestingRunner;
|
||||
|
||||
public static DaprIntegrationTestingRunner createDaprIntegrationTestingRunner(String successMessage, Class serviceClass, Boolean useAppPort, Boolean useGrpcPort, Boolean useHttpPort, int sleepTime, boolean isClient) {
|
||||
return new DaprIntegrationTestingRunner(successMessage, serviceClass, useAppPort, useGrpcPort, useHttpPort, sleepTime, isClient);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUp() {
|
||||
Optional.ofNullable(daprIntegrationTestingRunner).ifPresent(daprRunner -> daprRunner.destroyDapr());
|
||||
}
|
||||
|
||||
public static class MyData {
|
||||
|
||||
/// Gets or sets the value for PropertyA.
|
||||
private String propertyA;
|
||||
|
||||
/// Gets or sets the value for PropertyB.
|
||||
private String propertyB;
|
||||
|
||||
private MyData myData;
|
||||
|
||||
public String getPropertyB() {
|
||||
return propertyB;
|
||||
}
|
||||
|
||||
public void setPropertyB(String propertyB) {
|
||||
this.propertyB = propertyB;
|
||||
}
|
||||
|
||||
public String getPropertyA() {
|
||||
return propertyA;
|
||||
}
|
||||
|
||||
public void setPropertyA(String propertyA) {
|
||||
this.propertyA = propertyA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MyData{" +
|
||||
"propertyA='" + propertyA + '\'' +
|
||||
", propertyB='" + propertyB + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
public MyData getMyData() {
|
||||
return myData;
|
||||
}
|
||||
|
||||
public void setMyData(MyData myData) {
|
||||
this.myData = myData;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,168 +0,0 @@
|
|||
package io.dapr.actors.it;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class DaprIntegrationTestingRunner {
|
||||
|
||||
private static AtomicInteger appGeneratorId = new AtomicInteger();
|
||||
|
||||
@Rule
|
||||
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
||||
public DaprIntegrationTestingRunner.DaprFreePorts DAPR_FREEPORTS;
|
||||
|
||||
private Runtime rt = Runtime.getRuntime();
|
||||
private Process proc;
|
||||
|
||||
private String successMessage;
|
||||
private Class serviceClass;
|
||||
private Boolean useAppPort;
|
||||
private Boolean useGrpcPort;
|
||||
private Boolean useHttpPort;
|
||||
private int sleepTime;
|
||||
private String appName;
|
||||
private boolean appRanOK = Boolean.FALSE;
|
||||
private boolean isClient = false;
|
||||
|
||||
DaprIntegrationTestingRunner(String successMessage, Class serviceClass, Boolean useAppPort, Boolean useGrpcPort, Boolean useHttpPort, int sleepTime, boolean isClient) {
|
||||
this.successMessage = successMessage;
|
||||
this.serviceClass = serviceClass;
|
||||
this.useAppPort = useAppPort;
|
||||
this.useGrpcPort = useGrpcPort;
|
||||
this.useHttpPort = useHttpPort;
|
||||
this.sleepTime = sleepTime;
|
||||
this.isClient = isClient;
|
||||
this.generateAppName();
|
||||
try {
|
||||
DAPR_FREEPORTS = new DaprIntegrationTestingRunner.DaprFreePorts().initPorts();
|
||||
environmentVariables.set("DAPR_HTTP_PORT", String.valueOf(DAPR_FREEPORTS.getHttpPort()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public DaprFreePorts initializeDapr() throws Exception {
|
||||
String daprCommand=this.buildDaprCommand();
|
||||
System.out.println(daprCommand);
|
||||
proc= rt.exec(daprCommand);
|
||||
|
||||
final Thread stuffToDo = new Thread(() -> {
|
||||
try {
|
||||
try (InputStream stdin = proc.getInputStream()) {
|
||||
try(InputStreamReader isr = new InputStreamReader(stdin)) {
|
||||
try (BufferedReader br = new BufferedReader(isr)){
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
System.out.println(line);
|
||||
if (line.contains(successMessage)) {
|
||||
this.appRanOK = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
});
|
||||
stuffToDo.start();
|
||||
Thread.sleep(sleepTime);
|
||||
return DAPR_FREEPORTS;
|
||||
}
|
||||
|
||||
private static final String DAPR_RUN = "dapr run --app-id %s ";
|
||||
|
||||
/**
|
||||
* The args in -Dexec.args are the App name, and if needed the app's port.
|
||||
* The args are passed as a CSV due to conflict of parsing a space separated list in different OS
|
||||
*/
|
||||
private static final String DAPR_COMMAND = " -- mvn exec:java -Dexec.mainClass=%s -Dexec.classpathScope=test -Dexec.args=\"%s,%s\"";
|
||||
|
||||
private String buildDaprCommand(){
|
||||
StringBuilder stringBuilder= new StringBuilder(String.format(DAPR_RUN, this.appName))
|
||||
.append(this.useAppPort ? "--app-port " + this.DAPR_FREEPORTS.appPort : "")
|
||||
.append(this.useGrpcPort ? " --grpc-port " + this.DAPR_FREEPORTS.grpcPort : "")
|
||||
.append(this.useHttpPort ? " --port " + this.DAPR_FREEPORTS.httpPort : "")
|
||||
.append(String.format(DAPR_COMMAND, this.serviceClass.getCanonicalName(), this.appName, buildPortsParamCommands()));
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
private String buildPortsParamCommands() {
|
||||
StringBuilder ports = new StringBuilder();
|
||||
if (this.useAppPort) {
|
||||
ports.append(this.DAPR_FREEPORTS.appPort);
|
||||
}
|
||||
return ports.toString();
|
||||
}
|
||||
|
||||
private void generateAppName(){
|
||||
this.appName="DAPRapp" + appGeneratorId.incrementAndGet();
|
||||
}
|
||||
|
||||
public boolean isAppRanOK() {
|
||||
return appRanOK;
|
||||
}
|
||||
|
||||
public String getAppName() {
|
||||
return appName;
|
||||
}
|
||||
|
||||
private static Integer findRandomOpenPortOnAllLocalInterfaces() throws Exception {
|
||||
try (
|
||||
ServerSocket socket = new ServerSocket(0)
|
||||
) {
|
||||
return socket.getLocalPort();
|
||||
}
|
||||
}
|
||||
|
||||
public void destroyDapr() {
|
||||
Optional.ofNullable(rt).ifPresent( runtime -> {
|
||||
try {
|
||||
runtime.exec("dapr stop --app-id " + this.appName);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Optional.ofNullable(proc).ifPresent(process -> process.destroy());
|
||||
}
|
||||
|
||||
public static class DaprFreePorts {
|
||||
private int grpcPort;
|
||||
private int httpPort;
|
||||
private int appPort;
|
||||
|
||||
public DaprFreePorts initPorts() throws Exception {
|
||||
this.appPort = findRandomOpenPortOnAllLocalInterfaces();
|
||||
this.grpcPort = findRandomOpenPortOnAllLocalInterfaces();
|
||||
this.httpPort = findRandomOpenPortOnAllLocalInterfaces();
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getGrpcPort() {
|
||||
return grpcPort;
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return httpPort;
|
||||
}
|
||||
|
||||
public int getAppPort() {
|
||||
return appPort;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,105 +0,0 @@
|
|||
package io.dapr.actors.it.actors;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.client.ActorProxy;
|
||||
import io.dapr.actors.client.ActorProxyBuilder;
|
||||
import io.dapr.actors.it.BaseIT;
|
||||
import io.dapr.actors.it.DaprIntegrationTestingRunner;
|
||||
import io.dapr.actors.it.services.springboot.ActorService;
|
||||
import io.dapr.actors.it.services.springboot.EmptyService;
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.domain.Verb;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import org.junit.*;
|
||||
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class ActivationDeactiviationIT extends BaseIT {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(ActivationDeactiviationIT.class);
|
||||
|
||||
private static final AtomicInteger atomicInteger = new AtomicInteger(1);
|
||||
|
||||
private final String BASE_URL = "actors/%s/%s";
|
||||
|
||||
private final DefaultObjectSerializer serializer = new DefaultObjectSerializer();
|
||||
|
||||
private DaprIntegrationTestingRunner clientDaprIntegrationTestingRunner;
|
||||
|
||||
@Rule
|
||||
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
||||
|
||||
@After
|
||||
public void cleanUpTestCase() {
|
||||
Optional.ofNullable(clientDaprIntegrationTestingRunner).ifPresent(daprRunner -> daprRunner.destroyDapr());
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
daprIntegrationTestingRunner =
|
||||
createDaprIntegrationTestingRunner(
|
||||
"dapr initialized",
|
||||
ActorService.class,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
30000,
|
||||
false
|
||||
);
|
||||
daprIntegrationTestingRunner.initializeDapr();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatWhenInvokingMethodActorActivatesItselfAndDeactivesIteselfAfterElepsedTime() throws Exception {
|
||||
Thread.sleep(20000);
|
||||
assertTrue("Service App did not started sucessfully", daprIntegrationTestingRunner.isAppRanOK());
|
||||
clientDaprIntegrationTestingRunner =
|
||||
createDaprIntegrationTestingRunner(
|
||||
"BUILD SUCCESS",
|
||||
EmptyService.class,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
20000,
|
||||
true
|
||||
);
|
||||
clientDaprIntegrationTestingRunner.initializeDapr();
|
||||
environmentVariables.set("DAPR_HTTP_PORT", String.valueOf(clientDaprIntegrationTestingRunner.DAPR_FREEPORTS.getHttpPort()));
|
||||
final AtomicInteger atomicInteger = new AtomicInteger(1);
|
||||
String actorType = "DemoActorTest";
|
||||
DefaultObjectSerializer serializer = new DefaultObjectSerializer();
|
||||
logger.debug("Creating proxy builder");
|
||||
ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType, serializer);
|
||||
logger.debug("Creating actorId");
|
||||
ActorId actorId1 = new ActorId(Integer.toString(atomicInteger.getAndIncrement()));
|
||||
logger.debug("Building proxy");
|
||||
ActorProxy proxy = proxyBuilder.build(actorId1);
|
||||
|
||||
logger.debug("Invoking Say from Proxy");
|
||||
String sayResponse = proxy.invokeActorMethod("say", "message", String.class).block();
|
||||
logger.debug("asserting not null response: [" + sayResponse + "]");
|
||||
assertNotNull(sayResponse);
|
||||
|
||||
logger.debug("Retrieving active Actors");
|
||||
List<String> activeActors = proxy.invokeActorMethod("retrieveActiveActors", null, List.class).block();
|
||||
logger.debug("Active actors: [" + activeActors.toString() + "]");
|
||||
assertTrue("Expecting actorId:[" + actorId1.toString() + "]", activeActors.contains(actorId1.toString()));
|
||||
|
||||
logger.debug("Waitng for 15 seconds so actor deactives itself");
|
||||
Thread.sleep(15000);
|
||||
|
||||
ActorId actorId2 = new ActorId(Integer.toString(atomicInteger.getAndIncrement()));
|
||||
ActorProxy proxy2 = proxyBuilder.build(actorId2);
|
||||
List<String> activeActorsSecondtry = proxy2.invokeActorMethod("retrieveActiveActors", null, List.class).block();
|
||||
logger.debug("Active actors: [" + activeActorsSecondtry.toString() + "]");
|
||||
assertFalse("NOT Expecting actorId:[" + actorId1.toString() + "]", activeActorsSecondtry.contains(actorId1.toString()));
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
package io.dapr.actors.it.services.springboot;
|
||||
|
||||
import io.dapr.actors.client.ActorProxy;
|
||||
import io.dapr.actors.runtime.ActorRuntime;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.Options;
|
||||
|
||||
public class ActorService {
|
||||
|
||||
/**
|
||||
* Starts the service.
|
||||
* @param args Expects the port: -p PORT
|
||||
* @throws Exception If cannot start service.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
// If port string is not valid, it will throw an exception.
|
||||
long port = Long.parseLong(args[0].split(",")[1]);
|
||||
ActorRuntime.getInstance().registerActor(DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
||||
|
||||
DaprApplication.start(port);
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
package io.dapr.actors.it.services.springboot;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface DemoActor {
|
||||
String say(String something);
|
||||
|
||||
List<String> retrieveActiveActors();
|
||||
}
|
|
@ -1,14 +0,0 @@
|
|||
package io.dapr.actors.it.services.springboot;
|
||||
|
||||
/**
|
||||
* Use this class in order to run DAPR with any needed services, like states.
|
||||
*
|
||||
* To run manually, from repo root:
|
||||
* 1. mvn clean install
|
||||
* 2. dapr run --grpc-port 41707 --port 32851 -- mvn exec:java -Dexec.mainClass=io.dapr.it.services.EmptyService -Dexec.classpathScope="test" -Dexec.args="-p 44511 -grpcPort 41707 -httpPort 32851" -pl=sdk
|
||||
*/
|
||||
public class EmptyService {
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Hello from EmptyService");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: sample123
|
||||
spec:
|
||||
type: bindings.kafka
|
||||
metadata:
|
||||
# Kafka broker connection setting
|
||||
- name: brokers
|
||||
value: localhost:9092
|
||||
# consumer configuration: topic and consumer group
|
||||
- name: topics
|
||||
value: sample
|
||||
- name: consumerGroup
|
||||
value: group1
|
||||
# publisher configuration: topic
|
||||
- name: publishTopic
|
||||
value: sample
|
||||
- name: authRequired
|
||||
value: "false"
|
|
@ -0,0 +1,103 @@
|
|||
<project
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>io.dapr</groupId>
|
||||
<artifactId>dapr-sdk-tests</artifactId>
|
||||
<version>0.0.0-SNAPSHOT</version>
|
||||
<name>dapr-sdk-tests</name>
|
||||
<description>Tests for Dapr's Java SDK - not to be published as a jar.</description>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<snapshots>
|
||||
<enabled>false</enabled>
|
||||
</snapshots>
|
||||
<id>central</id>
|
||||
<name>libs-release</name>
|
||||
<url>https://repo.spring.io/libs-release</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>11</maven.compiler.source>
|
||||
<maven.compiler.target>11</maven.compiler.target>
|
||||
<maven.deploy.skip>true</maven.deploy.skip>
|
||||
<dapr.sdk.version>0.2.0-SNAPSHOT</dapr.sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.dapr</groupId>
|
||||
<artifactId>dapr-sdk</artifactId>
|
||||
<version>${dapr.sdk.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dapr</groupId>
|
||||
<artifactId>dapr-sdk-actors</artifactId>
|
||||
<version>${dapr.sdk.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
<version>5.5.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<version>2.2.2.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
<version>2.2.2.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.2.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>failsafe-maven-plugin</artifactId>
|
||||
<version>2.4.3-alpha-1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>integration-test</goal>
|
||||
<goal>verify</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!--suppress UnresolvedMavenProperty -->
|
||||
<skip>${skipITs}</skip>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import org.junit.AfterClass;
|
||||
|
||||
public abstract class BaseIT {
|
||||
|
||||
private static final Collection<DaprRun> DAPR_RUNS = new ArrayList<>();
|
||||
|
||||
protected static DaprRun startDaprApp(
|
||||
String testName,
|
||||
String successMessage,
|
||||
Class serviceClass,
|
||||
Boolean useAppPort,
|
||||
int maxWaitMilliseconds) throws Exception {
|
||||
return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds);
|
||||
}
|
||||
|
||||
protected static DaprRun startDaprApp(
|
||||
String testName,
|
||||
String successMessage,
|
||||
Class serviceClass,
|
||||
Boolean useAppPort,
|
||||
Boolean useDaprPorts,
|
||||
int maxWaitMilliseconds) throws Exception {
|
||||
DaprRun run = new DaprRun(
|
||||
testName,
|
||||
DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts),
|
||||
successMessage,
|
||||
serviceClass,
|
||||
maxWaitMilliseconds);
|
||||
DAPR_RUNS.add(run);
|
||||
run.start();
|
||||
run.use();
|
||||
return run;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUp() throws Exception {
|
||||
for (DaprRun app : DAPR_RUNS) {
|
||||
app.stop();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Command {
|
||||
|
||||
private static final int SUCCESS_WAIT_TIMEOUT_MINUTES = 5;
|
||||
|
||||
private final String successMessage;
|
||||
|
||||
private final String command;
|
||||
|
||||
private Process process;
|
||||
|
||||
public Command(String successMessage, String command) {
|
||||
this.successMessage = successMessage;
|
||||
this.command = command;
|
||||
}
|
||||
|
||||
public void run() throws InterruptedException, IOException {
|
||||
final AtomicBoolean success = new AtomicBoolean(false);
|
||||
final Semaphore finished = new Semaphore(0);
|
||||
this.process = Runtime.getRuntime().exec(command);
|
||||
|
||||
final Thread stdoutReader = new Thread(() -> {
|
||||
try {
|
||||
try (InputStream stdin = this.process.getInputStream()) {
|
||||
try (InputStreamReader isr = new InputStreamReader(stdin)) {
|
||||
try (BufferedReader br = new BufferedReader(isr)) {
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
System.out.println(line);
|
||||
if (line.contains(successMessage)) {
|
||||
success.set(true);
|
||||
finished.release();
|
||||
// Keep running
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!success.get()) {
|
||||
finished.release();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
});
|
||||
|
||||
stdoutReader.start();
|
||||
// Waits for success to happen within 1 minute.
|
||||
finished.tryAcquire(SUCCESS_WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
|
||||
if (!success.get()) {
|
||||
throw new RuntimeException("Could find success criteria for command: " + command);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class DaprPorts {
|
||||
|
||||
private final Integer grpcPort;
|
||||
|
||||
private final Integer httpPort;
|
||||
|
||||
private final Integer appPort;
|
||||
|
||||
private DaprPorts(Integer appPort, Integer httpPort, Integer grpcPort) {
|
||||
this.grpcPort = grpcPort;
|
||||
this.httpPort = httpPort;
|
||||
this.appPort = appPort;
|
||||
}
|
||||
|
||||
public static DaprPorts build(boolean appPort, boolean httpPort, boolean grpcPort) throws IOException {
|
||||
List<Integer> freePorts = findFreePorts(3);
|
||||
return new DaprPorts(
|
||||
appPort ? freePorts.get(0) : null,
|
||||
httpPort ? freePorts.get(1) : null,
|
||||
grpcPort ? freePorts.get(2) : null);
|
||||
}
|
||||
|
||||
public static DaprPorts build() throws IOException {
|
||||
return build(true, true, true);
|
||||
}
|
||||
|
||||
public Integer getGrpcPort() {
|
||||
return grpcPort;
|
||||
}
|
||||
|
||||
public Integer getHttpPort() {
|
||||
return httpPort;
|
||||
}
|
||||
|
||||
public Integer getAppPort() {
|
||||
return appPort;
|
||||
}
|
||||
|
||||
private static List<Integer> findFreePorts(int n) throws IOException {
|
||||
if (n <= 0) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
try (
|
||||
ServerSocket socket = new ServerSocket(0)
|
||||
) {
|
||||
socket.setReuseAddress(true);
|
||||
int port = socket.getLocalPort();
|
||||
List<Integer> output = findFreePorts(n - 1);
|
||||
output.add(port);
|
||||
return output;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static io.dapr.it.Retry.callWithRetry;
|
||||
|
||||
|
||||
public class DaprRun {
|
||||
|
||||
private static final String DAPR_RUN = "dapr run --app-id %s ";
|
||||
|
||||
// the arg in -Dexec.args is the app's port
|
||||
private static final String DAPR_COMMAND =
|
||||
" -- mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\"";
|
||||
|
||||
private final DaprPorts ports;
|
||||
|
||||
private final String appName;
|
||||
|
||||
private final int maxWaitMilliseconds;
|
||||
|
||||
private final AtomicBoolean started;
|
||||
|
||||
private final Command startCommand;
|
||||
|
||||
private final Command stopCommand;
|
||||
|
||||
DaprRun(
|
||||
String testName, DaprPorts ports, String successMessage, Class serviceClass, int maxWaitMilliseconds) {
|
||||
// The app name needs to be deterministic since we depend on it to kill previous runs.
|
||||
this.appName = String.format("%s_%s", testName, serviceClass.getSimpleName());
|
||||
this.startCommand =
|
||||
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports));
|
||||
this.stopCommand = new Command(
|
||||
"app stopped successfully",
|
||||
"dapr stop --app-id " + this.appName);
|
||||
this.ports = ports;
|
||||
this.maxWaitMilliseconds = maxWaitMilliseconds;
|
||||
this.started = new AtomicBoolean(false);
|
||||
}
|
||||
|
||||
public void start() throws InterruptedException, IOException {
|
||||
long start = System.currentTimeMillis();
|
||||
// First, try to stop previous run (if left running).
|
||||
this.stop();
|
||||
System.out.println("Starting dapr application ...");
|
||||
this.startCommand.run();
|
||||
this.started.set(true);
|
||||
|
||||
long timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
|
||||
if (this.ports.getAppPort() != null) {
|
||||
callWithRetry(() -> {
|
||||
System.out.println("Checking if app is listening on port ...");
|
||||
assertListeningOnPort(this.ports.getAppPort());
|
||||
}, timeLeft);
|
||||
}
|
||||
|
||||
if (this.ports.getHttpPort() != null) {
|
||||
timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
|
||||
callWithRetry(() -> {
|
||||
System.out.println("Checking if Dapr is listening on HTTP port ...");
|
||||
assertListeningOnPort(this.ports.getHttpPort());
|
||||
}, timeLeft);
|
||||
}
|
||||
|
||||
if (this.ports.getGrpcPort() != null) {
|
||||
timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
|
||||
callWithRetry(() -> {
|
||||
System.out.println("Checking if Dapr is listening on GRPC port ...");
|
||||
assertListeningOnPort(this.ports.getGrpcPort());
|
||||
}, timeLeft);
|
||||
}
|
||||
System.out.println("Dapr application started.");
|
||||
}
|
||||
|
||||
public void stop() throws InterruptedException, IOException {
|
||||
System.out.println("Stopping dapr application ...");
|
||||
try {
|
||||
this.stopCommand.run();
|
||||
} catch (RuntimeException e) {
|
||||
System.out.println("Could not stop app: " + this.appName);
|
||||
}
|
||||
|
||||
System.out.println("Dapr application stopped.");
|
||||
}
|
||||
|
||||
public void use() {
|
||||
if (this.ports.getHttpPort() != null) {
|
||||
System.getProperties().setProperty("dapr.http.port", String.valueOf(this.ports.getHttpPort()));
|
||||
}
|
||||
if (this.ports.getGrpcPort() != null) {
|
||||
System.getProperties().setProperty("dapr.grpc.port", String.valueOf(this.ports.getGrpcPort()));
|
||||
}
|
||||
System.getProperties().setProperty("dapr.grpc.enabled", Boolean.FALSE.toString());
|
||||
}
|
||||
|
||||
public void switchToGRPC() {
|
||||
System.getProperties().setProperty("dapr.grpc.enabled", Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
public int getGrpcPort() {
|
||||
return ports.getGrpcPort();
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return ports.getHttpPort();
|
||||
}
|
||||
|
||||
public int getAppPort() {
|
||||
return ports.getAppPort();
|
||||
}
|
||||
|
||||
public String getAppName() {
|
||||
return appName;
|
||||
}
|
||||
|
||||
private static String buildDaprCommand(String appName, Class serviceClass, DaprPorts ports) {
|
||||
StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName))
|
||||
.append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "")
|
||||
.append(ports.getHttpPort() != null ? " --port " + ports.getHttpPort() : "")
|
||||
.append(ports.getGrpcPort() != null ? " --grpc-port " + ports.getGrpcPort() : "")
|
||||
.append(String.format(DAPR_COMMAND, serviceClass.getCanonicalName(),
|
||||
ports.getAppPort() != null ? ports.getAppPort().toString() : ""));
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
private static void assertListeningOnPort(int port) {
|
||||
System.out.printf("Checking port %d ...\n", port);
|
||||
|
||||
java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(io.dapr.utils.Constants.DEFAULT_HOSTNAME, port);
|
||||
try (java.net.Socket socket = new java.net.Socket()) {
|
||||
socket.connect(socketAddress, 1000);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
System.out.printf("Confirmed listening on port %d.\n", port);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
public class Retry {
|
||||
|
||||
private Retry() {}
|
||||
|
||||
public static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException {
|
||||
long started = System.currentTimeMillis();
|
||||
while (true) {
|
||||
Throwable exception;
|
||||
try {
|
||||
function.run();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
exception = e;
|
||||
} catch (AssertionError e) {
|
||||
exception = e;
|
||||
}
|
||||
|
||||
if (System.currentTimeMillis() - started >= retryTimeoutMilliseconds) {
|
||||
throw new RuntimeException(exception);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.actors;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.client.ActorProxy;
|
||||
import io.dapr.actors.client.ActorProxyBuilder;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.actors.services.springboot.ActorService;
|
||||
import io.dapr.it.services.EmptyService;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static io.dapr.it.Retry.callWithRetry;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ActivationDeactivationIT extends BaseIT {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(ActivationDeactivationIT.class);
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
// The call below will fail if service cannot start successfully.
|
||||
startDaprApp(
|
||||
ActivationDeactivationIT.class.getSimpleName(),
|
||||
ActorService.SUCCESS_MESSAGE,
|
||||
ActorService.class,
|
||||
true,
|
||||
60000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void activateInvokeDeactivate() throws Exception {
|
||||
// The call below will fail if service cannot start successfully.
|
||||
startDaprApp(
|
||||
this.getClass().getSimpleName(),
|
||||
EmptyService.SUCCESS_MESSAGE,
|
||||
EmptyService.class,
|
||||
false,
|
||||
20000);
|
||||
// TODO: Figure out why this wait is needed to make the actor calls work. Where is the delay coming from?
|
||||
Thread.sleep(120000);
|
||||
|
||||
final AtomicInteger atomicInteger = new AtomicInteger(1);
|
||||
String actorType = "DemoActorTest";
|
||||
DefaultObjectSerializer serializer = new DefaultObjectSerializer();
|
||||
logger.debug("Creating proxy builder");
|
||||
ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType, serializer);
|
||||
logger.debug("Creating actorId");
|
||||
ActorId actorId1 = new ActorId(Integer.toString(atomicInteger.getAndIncrement()));
|
||||
logger.debug("Building proxy");
|
||||
ActorProxy proxy = proxyBuilder.build(actorId1);
|
||||
|
||||
callWithRetry(() -> {
|
||||
logger.debug("Invoking Say from Proxy");
|
||||
String sayResponse = proxy.invokeActorMethod("say", "message", String.class).block();
|
||||
logger.debug("asserting not null response: [" + sayResponse + "]");
|
||||
assertNotNull(sayResponse);
|
||||
}, 60000);
|
||||
|
||||
logger.debug("Retrieving active Actors");
|
||||
List<String> activeActors = proxy.invokeActorMethod("retrieveActiveActors", null, List.class).block();
|
||||
logger.debug("Active actors: [" + activeActors.toString() + "]");
|
||||
assertTrue("Expecting actorId:[" + actorId1.toString() + "]", activeActors.contains(actorId1.toString()));
|
||||
|
||||
ActorId actorId2 = new ActorId(Integer.toString(atomicInteger.getAndIncrement()));
|
||||
ActorProxy proxy2 = proxyBuilder.build(actorId2);
|
||||
callWithRetry(() -> {
|
||||
List<String> activeActorsSecondTry = proxy2.invokeActorMethod("retrieveActiveActors", null, List.class).block();
|
||||
logger.debug("Active actors: [" + activeActorsSecondTry.toString() + "]");
|
||||
assertFalse("NOT Expecting actorId:[" + actorId1.toString() + "]", activeActorsSecondTry.contains(actorId1.toString()));
|
||||
}, 15000);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.actors.services.springboot;
|
||||
|
||||
import io.dapr.actors.runtime.ActorRuntime;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
|
||||
public class ActorService {
|
||||
|
||||
public static final String SUCCESS_MESSAGE = "actors: established connection to placement service at localhost";
|
||||
|
||||
/**
|
||||
* Starts the service.
|
||||
*
|
||||
* @param args Expects the port as only argument.
|
||||
* @throws Exception If cannot start service.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
// If port string is not valid, it will throw an exception.
|
||||
long port = Long.parseLong(args[0]);
|
||||
ActorRuntime.getInstance().registerActor(
|
||||
DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
||||
|
||||
DaprApplication.start(port);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,9 @@
|
|||
package io.dapr.actors.it.services.springboot;
|
||||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.actors.services.springboot;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
@ -6,11 +11,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
/**
|
||||
* Dapr's HTTP callback implementation via SpringBoot.
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"io.dapr.actors.it.services.springboot"})
|
||||
@SpringBootApplication(scanBasePackages = {"io.dapr.it.actors.services.springboot"})
|
||||
public class DaprApplication {
|
||||
|
||||
/**
|
||||
* Starts Dapr's callback in a given port.
|
||||
*
|
||||
* @param port Port to listen to.
|
||||
*/
|
||||
public static void start(long port) {
|
|
@ -1,20 +1,14 @@
|
|||
package io.dapr.actors.it.services.springboot;
|
||||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.actors.services.springboot;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.actors.runtime.ActorManagerTest;
|
||||
import io.dapr.actors.runtime.ActorRuntime;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
/**
|
||||
* SpringBoot Controller to handle callback APIs for Dapr.
|
||||
*/
|
|
@ -0,0 +1,14 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.actors.services.springboot;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface DemoActor {
|
||||
String say(String something);
|
||||
|
||||
List<String> retrieveActiveActors();
|
||||
}
|
|
@ -1,4 +1,9 @@
|
|||
package io.dapr.actors.it.services.springboot;
|
||||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.actors.services.springboot;
|
||||
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.actors.runtime.AbstractActor;
|
||||
|
@ -32,8 +37,8 @@ public class DemoActorImpl extends AbstractActor implements DemoActor {
|
|||
|
||||
// Handles the request by printing message.
|
||||
System.out.println("Server say method for actor " +
|
||||
super.getId() + ": " +
|
||||
(something == null ? "" : something + " @ " + utcNowAsString));
|
||||
super.getId() + ": " +
|
||||
(something == null ? "" : something + " @ " + utcNowAsString));
|
||||
|
||||
// Now respond with current timestamp.
|
||||
return utcNowAsString;
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.binding.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.domain.Verb;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.DaprRun;
|
||||
import io.dapr.it.services.EmptyService;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
|
||||
import static io.dapr.it.Retry.callWithRetry;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Service for input and output binding example.
|
||||
*/
|
||||
public class BindingIT extends BaseIT {
|
||||
|
||||
public static class MyClass {
|
||||
public MyClass() {
|
||||
}
|
||||
|
||||
public String message;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void inputOutputBinding() throws Exception {
|
||||
System.out.println("Working Directory = " + System.getProperty("user.dir"));
|
||||
|
||||
DaprRun daprRunInputBinding = startDaprApp(
|
||||
this.getClass().getSimpleName(),
|
||||
InputBindingService.SUCCESS_MESSAGE,
|
||||
InputBindingService.class,
|
||||
true,
|
||||
60000);
|
||||
// At this point, it is guaranteed that the service aboce is running and all ports being listened to.
|
||||
// TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet?
|
||||
Thread.sleep(120000);
|
||||
|
||||
DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build();
|
||||
|
||||
final String BINDING_NAME = "sample123";
|
||||
|
||||
// This is an example of sending data in a user-defined object. The input binding will receive:
|
||||
// {"message":"hello"}
|
||||
MyClass myClass = new MyClass();
|
||||
myClass.message = "hello";
|
||||
|
||||
System.out.println("sending first message");
|
||||
client.invokeBinding(BINDING_NAME, myClass).block();
|
||||
|
||||
// This is an example of sending a plain string. The input binding will receive
|
||||
// cat
|
||||
final String m = "cat";
|
||||
System.out.println("sending " + m);
|
||||
client.invokeBinding(BINDING_NAME, m).block();
|
||||
|
||||
callWithRetry(() -> {
|
||||
System.out.println("Checking results ...");
|
||||
final List<String> messages = client.invokeService(Verb.GET, daprRunInputBinding.getAppName(), "messages", null, List.class).block();
|
||||
assertEquals(2, messages.size());
|
||||
MyClass resultClass = null;
|
||||
try {
|
||||
resultClass = new ObjectMapper().readValue(messages.get(0), MyClass.class);
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
fail("Error on decode message 1");
|
||||
}
|
||||
|
||||
try {
|
||||
assertEquals("cat", new ObjectMapper().readValue(messages.get(1), String.class));
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
fail("Error on decode message 2");
|
||||
}
|
||||
assertEquals("hello", resultClass.message);
|
||||
}, 8000);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.binding.http;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.PutMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SpringBoot Controller to handle input binding.
|
||||
*/
|
||||
@RestController
|
||||
public class InputBindingController {
|
||||
|
||||
private static final List<String> messagesReceived = new ArrayList();
|
||||
|
||||
@GetMapping("/dapr/config")
|
||||
public String daprConfig() throws Exception {
|
||||
return "{\"actorIdleTimeout\":\"5s\",\"actorScanInterval\":\"2s\",\"drainOngoingCallTimeout\":\"1s\",\"drainBalancedActors\":true,\"entities\":[]}";
|
||||
}
|
||||
|
||||
@PostMapping(path = "/sample123")
|
||||
@PutMapping(path = "/sample123")
|
||||
public void handleInputBinding(@RequestBody(required = false) String body) {
|
||||
messagesReceived.add(body);
|
||||
System.out.println("Received message through binding: " + (body == null ? "" : body));
|
||||
}
|
||||
|
||||
@GetMapping(path = "/messages")
|
||||
public List<String> getMessages() {
|
||||
return messagesReceived;
|
||||
}
|
||||
|
||||
@GetMapping(path = "/")
|
||||
public String hello() {
|
||||
return "hello";
|
||||
}
|
||||
|
||||
}
|
|
@ -3,27 +3,32 @@
|
|||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.services;
|
||||
package io.dapr.it.binding.http;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication(scanBasePackages = {"io.dapr.it.services"})
|
||||
public class InputBindingExample {
|
||||
@SpringBootApplication(scanBasePackages = {"io.dapr.it.binding.http"})
|
||||
public class InputBindingService {
|
||||
|
||||
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running. Init Elapsed";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// If port string is not valid, it will throw an exception.
|
||||
int port = Integer.parseInt(args[0]);
|
||||
|
||||
System.out.printf("Service starting on port %d ...\n", port);
|
||||
// Start Dapr's callback endpoint.
|
||||
InputBindingExample.start(port);
|
||||
start(port);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts Dapr's callback in a given port.
|
||||
*
|
||||
* @param port Port to listen to.
|
||||
*/
|
||||
public static void start(int port) {
|
||||
SpringApplication app = new SpringApplication(InputBindingExample.class);
|
||||
private static void start(int port) {
|
||||
SpringApplication app = new SpringApplication(InputBindingService.class);
|
||||
app.run(String.format("--server.port=%d", port));
|
||||
}
|
||||
|
|
@ -7,13 +7,16 @@ package io.dapr.it.services;
|
|||
|
||||
/**
|
||||
* Use this class in order to run DAPR with any needed services, like states.
|
||||
*
|
||||
* <p>
|
||||
* To run manually, from repo root:
|
||||
* 1. mvn clean install
|
||||
* 2. dapr run --grpc-port 41707 --port 32851 -- mvn exec:java -Dexec.mainClass=io.dapr.it.services.EmptyService -Dexec.classpathScope="test" -Dexec.args="-p 44511 -grpcPort 41707 -httpPort 32851" -pl=sdk
|
||||
*/
|
||||
public class EmptyService {
|
||||
public static void main(String[] args) {
|
||||
System.out.println("Hello from EmptyService");
|
||||
}
|
||||
|
||||
public static final String SUCCESS_MESSAGE = "Hello from " + EmptyService.class.getSimpleName();
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
System.out.println(SUCCESS_MESSAGE);
|
||||
}
|
||||
}
|
|
@ -6,11 +6,14 @@
|
|||
package io.dapr.it.state;
|
||||
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientTestBuilder;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.DaprClientGrpcAdapter;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.DaprRun;
|
||||
import io.dapr.it.services.EmptyService;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
@ -25,19 +28,23 @@ import static org.junit.Assert.*;
|
|||
*/
|
||||
public class GRPCStateClientIT extends BaseIT {
|
||||
|
||||
private static DaprClient daprClient;
|
||||
private static DaprRun daprRun;
|
||||
|
||||
private static DaprClient daprClient;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
daprIntegrationTestingRunner =
|
||||
createDaprIntegrationTestingRunner(
|
||||
"BUILD SUCCESS",
|
||||
daprRun = startDaprApp(
|
||||
GRPCStateClientIT.class.getSimpleName(),
|
||||
EmptyService.SUCCESS_MESSAGE,
|
||||
EmptyService.class,
|
||||
false,
|
||||
0
|
||||
);
|
||||
daprIntegrationTestingRunner.initializeDapr();
|
||||
daprClient = DaprClientTestBuilder.buildGrpcClient();
|
||||
5000
|
||||
);
|
||||
daprRun.switchToGRPC();
|
||||
daprClient = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build();
|
||||
|
||||
assertTrue(daprClient instanceof DaprClientGrpcAdapter);
|
||||
}
|
||||
|
||||
|
||||
|
@ -193,7 +200,7 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
//review that state value changes
|
||||
assertNotNull(myDataResponse.getEtag());
|
||||
//review that the etag changes after an update
|
||||
assertNotEquals(firstETag,myDataResponse.getEtag());
|
||||
assertNotEquals(firstETag, myDataResponse.getEtag());
|
||||
assertNotNull(myDataResponse.getKey());
|
||||
assertNotNull(myDataResponse.getValue());
|
||||
assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
|
||||
|
@ -237,7 +244,6 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
saveResponse.block();
|
||||
|
||||
|
||||
|
||||
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
|
||||
//retrive the data wihout any etag
|
||||
myDataResponse = response.block();
|
||||
|
@ -245,7 +251,7 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
//review that state value changes
|
||||
assertNotNull(myDataResponse.getEtag());
|
||||
//review that the etag changes after an update
|
||||
assertNotEquals(firstETag,myDataResponse.getEtag());
|
||||
assertNotEquals(firstETag, myDataResponse.getEtag());
|
||||
assertNotNull(myDataResponse.getKey());
|
||||
assertNotNull(myDataResponse.getValue());
|
||||
assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
|
||||
|
@ -296,7 +302,6 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
|
||||
|
||||
|
||||
|
||||
//Create dummy data to be store
|
||||
MyData data = new MyData();
|
||||
data.setPropertyA("data in property A");
|
||||
|
@ -339,7 +344,6 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null);
|
||||
|
||||
|
||||
|
||||
//create Dummy data
|
||||
MyData data = new MyData();
|
||||
data.setPropertyA("data in property A");
|
||||
|
@ -397,7 +401,6 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null);
|
||||
|
||||
|
||||
|
||||
//create Dummy data
|
||||
MyData data = new MyData();
|
||||
data.setPropertyA("data in property A");
|
||||
|
@ -447,14 +450,13 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
||||
}
|
||||
|
||||
@Test(timeout=13000)
|
||||
@Test(timeout = 13000)
|
||||
public void saveDeleteWithRetry() {
|
||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
||||
StateOptions.RetryPolicy retryPolicy= new StateOptions.RetryPolicy(Duration.ofSeconds(3),3, StateOptions.RetryPolicy.Pattern.LINEAR);
|
||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, StateOptions.RetryPolicy.Pattern.LINEAR);
|
||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
||||
|
||||
|
||||
|
||||
//Create dummy data to be store
|
||||
MyData data = new MyData();
|
||||
data.setPropertyA("data in property A");
|
||||
|
@ -484,25 +486,24 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
try {
|
||||
//delete action
|
||||
deleteResponse.block();
|
||||
}catch(RuntimeException ex){
|
||||
} catch (RuntimeException ex) {
|
||||
assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
||||
long elapsedTime = end -start;
|
||||
assertTrue(elapsedTime>9000 && elapsedTime<9200);
|
||||
long elapsedTime = end - start;
|
||||
assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
||||
|
||||
}
|
||||
|
||||
@Ignore("Ignored as an issue on DAPR")
|
||||
@Test(timeout=13000)
|
||||
@Test(timeout = 13000)
|
||||
public void saveUpdateWithRetry() {
|
||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
||||
StateOptions.RetryPolicy retryPolicy= new StateOptions.RetryPolicy(Duration.ofSeconds(4),3, StateOptions.RetryPolicy.Pattern.LINEAR);
|
||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, StateOptions.RetryPolicy.Pattern.LINEAR);
|
||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
||||
|
||||
|
||||
|
||||
//Create dummy data to be store
|
||||
MyData data = new MyData();
|
||||
data.setPropertyA("data in property A");
|
||||
|
@ -533,13 +534,13 @@ public class GRPCStateClientIT extends BaseIT {
|
|||
|
||||
try {
|
||||
saveResponse.block();
|
||||
}catch(RuntimeException ex){
|
||||
} catch (RuntimeException ex) {
|
||||
assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
||||
long elapsedTime = end -start;
|
||||
assertTrue(elapsedTime>9000 && elapsedTime<9200);
|
||||
long elapsedTime = end - start;
|
||||
assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
||||
|
||||
}
|
||||
|
|
@ -8,36 +8,25 @@ package io.dapr.it.state;
|
|||
import io.dapr.DaprGrpc;
|
||||
import io.dapr.DaprProtos;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.DaprIntegrationTestingRunner;
|
||||
import io.dapr.it.services.HelloWorldGrpcStateService;
|
||||
import io.dapr.it.DaprRun;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static io.dapr.it.DaprIntegrationTestingRunner.DAPR_FREEPORTS;
|
||||
|
||||
public class HelloWorldClientIT extends BaseIT {
|
||||
|
||||
private static DaprIntegrationTestingRunner daprIntegrationTestingRunner;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
daprIntegrationTestingRunner =
|
||||
createDaprIntegrationTestingRunner(
|
||||
"BUILD SUCCESS",
|
||||
@Test
|
||||
public void testHelloWorldState() throws Exception {
|
||||
DaprRun daprRun = startDaprApp(
|
||||
HelloWorldClientIT.class.getSimpleName(),
|
||||
HelloWorldGrpcStateService.SUCCESS_MESSAGE,
|
||||
HelloWorldGrpcStateService.class,
|
||||
false,
|
||||
2000
|
||||
);
|
||||
daprIntegrationTestingRunner.initializeDapr();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHelloWorldState(){
|
||||
);
|
||||
ManagedChannel channel =
|
||||
ManagedChannelBuilder.forAddress("localhost", DAPR_FREEPORTS.getGrpcPort()).usePlaintext().build();
|
||||
ManagedChannelBuilder.forAddress("localhost", daprRun.getGrpcPort()).usePlaintext().build();
|
||||
DaprGrpc.DaprBlockingStub client = DaprGrpc.newBlockingStub(channel);
|
||||
|
||||
String key = "mykey";
|
||||
|
@ -49,7 +38,7 @@ public class HelloWorldClientIT extends BaseIT {
|
|||
DaprProtos.GetStateResponseEnvelope response = client.getState(req);
|
||||
String value = response.getData().getValue().toStringUtf8();
|
||||
System.out.println("Got: " + value);
|
||||
Assert.assertEquals("Hello World",value);
|
||||
Assert.assertEquals("Hello World", value);
|
||||
}
|
||||
|
||||
// Then, delete it.
|
||||
|
@ -70,7 +59,7 @@ public class HelloWorldClientIT extends BaseIT {
|
|||
DaprProtos.GetStateResponseEnvelope response = client.getState(req);
|
||||
String value = response.getData().getValue().toStringUtf8();
|
||||
System.out.println("Got: " + value);
|
||||
Assert.assertEquals("",value);
|
||||
Assert.assertEquals("", value);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.state;
|
||||
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.dapr.DaprGrpc;
|
||||
import io.dapr.DaprGrpc.DaprBlockingStub;
|
||||
import io.dapr.DaprProtos.SaveStateEnvelope;
|
||||
import io.dapr.DaprProtos.StateRequest;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
|
||||
|
||||
/**
|
||||
* Simple example.
|
||||
* To run manually, from repo root:
|
||||
* 1. mvn clean install
|
||||
* 2. dapr run --grpc-port 50001 -- mvn exec:java -Dexec.mainClass=io.dapr.it.state.HelloWorldGrpcStateService -Dexec.classpathScope="test" -pl=sdk
|
||||
*/
|
||||
public class HelloWorldGrpcStateService {
|
||||
|
||||
public static final String SUCCESS_MESSAGE = "Hello from " + HelloWorldGrpcStateService.class.getSimpleName();
|
||||
|
||||
public static void main(String[] args) {
|
||||
String grpcPort = System.getenv("DAPR_GRPC_PORT");
|
||||
|
||||
// If port string is not valid, it will throw an exception.
|
||||
int grpcPortInt = Integer.parseInt(grpcPort);
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", grpcPortInt).usePlaintext().build();
|
||||
DaprBlockingStub client = DaprGrpc.newBlockingStub(channel);
|
||||
|
||||
String key = "mykey";
|
||||
// First, write key-value pair.
|
||||
|
||||
String value = "Hello World";
|
||||
StateRequest req = StateRequest
|
||||
.newBuilder()
|
||||
.setKey(key)
|
||||
.setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
|
||||
.build();
|
||||
SaveStateEnvelope state = SaveStateEnvelope.newBuilder()
|
||||
.addRequests(req)
|
||||
.build();
|
||||
client.saveState(state);
|
||||
System.out.println("Saved!");
|
||||
channel.shutdown();
|
||||
|
||||
System.out.println(SUCCESS_MESSAGE);
|
||||
}
|
||||
}
|
|
@ -7,11 +7,12 @@ package io.dapr.it.state;
|
|||
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.DaprRun;
|
||||
import io.dapr.it.services.EmptyService;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
|
@ -25,16 +26,17 @@ import java.time.Duration;
|
|||
*/
|
||||
public class HttpStateClientIT extends BaseIT {
|
||||
|
||||
private static DaprRun daprRun;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
daprIntegrationTestingRunner =
|
||||
createDaprIntegrationTestingRunner(
|
||||
"BUILD SUCCESS",
|
||||
daprRun = startDaprApp(
|
||||
HttpStateClientIT.class.getSimpleName(),
|
||||
EmptyService.SUCCESS_MESSAGE,
|
||||
EmptyService.class,
|
||||
false,
|
||||
0
|
||||
);
|
||||
daprIntegrationTestingRunner.initializeDapr();
|
||||
1000
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -192,7 +194,7 @@ public class HttpStateClientIT extends BaseIT {
|
|||
//review that state value changes
|
||||
Assert.assertNotNull(myDataResponse.getEtag());
|
||||
//review that the etag changes after an update
|
||||
Assert.assertNotEquals(firstETag,myDataResponse.getEtag());
|
||||
Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
|
||||
Assert.assertNotNull(myDataResponse.getKey());
|
||||
Assert.assertNotNull(myDataResponse.getValue());
|
||||
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
|
||||
|
@ -245,7 +247,7 @@ public class HttpStateClientIT extends BaseIT {
|
|||
//review that state value changes
|
||||
Assert.assertNotNull(myDataResponse.getEtag());
|
||||
//review that the etag changes after an update
|
||||
Assert.assertNotEquals(firstETag,myDataResponse.getEtag());
|
||||
Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
|
||||
Assert.assertNotNull(myDataResponse.getKey());
|
||||
Assert.assertNotNull(myDataResponse.getValue());
|
||||
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
|
||||
|
@ -446,10 +448,10 @@ public class HttpStateClientIT extends BaseIT {
|
|||
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
||||
}
|
||||
|
||||
@Test(timeout=13000)
|
||||
@Test(timeout = 13000)
|
||||
public void saveDeleteWithRetry() {
|
||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
||||
StateOptions.RetryPolicy retryPolicy= new StateOptions.RetryPolicy(Duration.ofSeconds(3),3, StateOptions.RetryPolicy.Pattern.LINEAR);
|
||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, StateOptions.RetryPolicy.Pattern.LINEAR);
|
||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
||||
|
||||
//create DAPR client
|
||||
|
@ -483,21 +485,21 @@ public class HttpStateClientIT extends BaseIT {
|
|||
try {
|
||||
//delete action
|
||||
deleteResponse.block();
|
||||
}catch(RuntimeException ex){
|
||||
} catch (RuntimeException ex) {
|
||||
Assert.assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
||||
long elapsedTime = end -start;
|
||||
Assert.assertTrue(elapsedTime>9000 && elapsedTime<9200);
|
||||
long elapsedTime = end - start;
|
||||
Assert.assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
||||
|
||||
}
|
||||
|
||||
@Ignore("Ignored as an issue on DAPR")
|
||||
@Test(timeout=13000)
|
||||
@Test(timeout = 13000)
|
||||
public void saveUpdateWithRetry() {
|
||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
||||
StateOptions.RetryPolicy retryPolicy= new StateOptions.RetryPolicy(Duration.ofSeconds(4),3, StateOptions.RetryPolicy.Pattern.EXPONENTIAL);
|
||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, StateOptions.RetryPolicy.Pattern.EXPONENTIAL);
|
||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
||||
|
||||
//create DAPR client
|
||||
|
@ -532,13 +534,13 @@ public class HttpStateClientIT extends BaseIT {
|
|||
|
||||
try {
|
||||
saveResponse.block();
|
||||
}catch(RuntimeException ex){
|
||||
} catch (RuntimeException ex) {
|
||||
Assert.assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
||||
long elapsedTime = end -start;
|
||||
Assert.assertTrue(elapsedTime>9000 && elapsedTime<9200);
|
||||
long elapsedTime = end - start;
|
||||
Assert.assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.state;
|
||||
|
||||
public class MyData {
|
||||
|
||||
/// Gets or sets the value for PropertyA.
|
||||
private String propertyA;
|
||||
|
||||
/// Gets or sets the value for PropertyB.
|
||||
private String propertyB;
|
||||
|
||||
private MyData myData;
|
||||
|
||||
public String getPropertyB() {
|
||||
return propertyB;
|
||||
}
|
||||
|
||||
public void setPropertyB(String propertyB) {
|
||||
this.propertyB = propertyB;
|
||||
}
|
||||
|
||||
public String getPropertyA() {
|
||||
return propertyA;
|
||||
}
|
||||
|
||||
public void setPropertyA(String propertyA) {
|
||||
this.propertyA = propertyA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MyData{" +
|
||||
"propertyA='" + propertyA + '\'' +
|
||||
", propertyB='" + propertyB + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
public MyData getMyData() {
|
||||
return myData;
|
||||
}
|
||||
|
||||
public void setMyData(MyData myData) {
|
||||
this.myData = myData;
|
||||
}
|
||||
}
|
|
@ -28,7 +28,6 @@
|
|||
</repositories>
|
||||
|
||||
<properties>
|
||||
<skipITs>false</skipITs>
|
||||
<maven.deploy.skip>false</maven.deploy.skip>
|
||||
</properties>
|
||||
|
||||
|
@ -80,12 +79,6 @@
|
|||
<version>1.3.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stefanbirkner</groupId>
|
||||
<artifactId>system-rules</artifactId>
|
||||
<version>1.19.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-engine</artifactId>
|
||||
|
@ -121,7 +114,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
|
|
|
@ -8,6 +8,7 @@ import io.dapr.DaprGrpc;
|
|||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.utils.Constants;
|
||||
import io.dapr.utils.Properties;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import okhttp3.OkHttpClient;
|
||||
|
@ -18,18 +19,6 @@ import okhttp3.OkHttpClient;
|
|||
*/
|
||||
public class DaprClientBuilder {
|
||||
|
||||
/**
|
||||
* HTTP port for Dapr after checking environment variable.
|
||||
*/
|
||||
private static final int HTTP_PORT = DaprClientBuilder.getEnvHttpPortOrDefault(
|
||||
Constants.ENV_DAPR_HTTP_PORT, Constants.DEFAULT_HTTP_PORT);
|
||||
|
||||
/**
|
||||
* GRPC port for Dapr after checking environment variable.
|
||||
*/
|
||||
private static final int GRPC_PORT = DaprClientBuilder.getEnvHttpPortOrDefault(
|
||||
Constants.ENV_DAPR_GRPC_PORT, Constants.DEFAULT_GRPC_PORT);
|
||||
|
||||
/**
|
||||
* Serializer used for request and response objects in DaprClient.
|
||||
*/
|
||||
|
@ -41,26 +30,9 @@ public class DaprClientBuilder {
|
|||
private final DaprObjectSerializer stateSerializer;
|
||||
|
||||
/**
|
||||
* Finds the port defined by env variable or sticks to default.
|
||||
* @param envName Name of env variable with the port.
|
||||
* @param defaultPort Default port if cannot find a valid port.
|
||||
*
|
||||
* @return Port from env variable or default.
|
||||
* Determine if this builder will create GRPC clients instead of HTTP clients.
|
||||
*/
|
||||
private static int getEnvHttpPortOrDefault(String envName, int defaultPort) {
|
||||
String envPort = System.getenv(envName);
|
||||
if (envPort == null || envPort.trim().isEmpty()) {
|
||||
return defaultPort;
|
||||
}
|
||||
|
||||
try {
|
||||
return Integer.parseInt(envPort.trim());
|
||||
} catch (NumberFormatException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
return defaultPort;
|
||||
}
|
||||
private final boolean useGRPC;
|
||||
|
||||
/**
|
||||
* Creates a constructor for DaprClient.
|
||||
|
@ -80,6 +52,7 @@ public class DaprClientBuilder {
|
|||
|
||||
this.objectSerializer = objectSerializer;
|
||||
this.stateSerializer = stateSerializer;
|
||||
this.useGRPC = Properties.USE_GRPC.get();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -89,6 +62,10 @@ public class DaprClientBuilder {
|
|||
* @throws java.lang.IllegalStateException if any required field is missing
|
||||
*/
|
||||
public DaprClient build() {
|
||||
if (this.useGRPC) {
|
||||
return buildDaprClientGrpc();
|
||||
}
|
||||
|
||||
return buildDaprClientHttp();
|
||||
}
|
||||
|
||||
|
@ -99,10 +76,11 @@ public class DaprClientBuilder {
|
|||
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
|
||||
*/
|
||||
private DaprClient buildDaprClientGrpc() {
|
||||
if (GRPC_PORT <= 0) {
|
||||
int port = Properties.GRPC_PORT.get();
|
||||
if (port <= 0) {
|
||||
throw new IllegalStateException("Invalid port.");
|
||||
}
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, GRPC_PORT).usePlaintext().build();
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, port).usePlaintext().build();
|
||||
return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel), this.objectSerializer, this.stateSerializer);
|
||||
}
|
||||
|
||||
|
@ -112,11 +90,12 @@ public class DaprClientBuilder {
|
|||
* @return DaprClient over HTTP.
|
||||
*/
|
||||
private DaprClient buildDaprClientHttp() {
|
||||
if (HTTP_PORT <= 0) {
|
||||
int port = Properties.HTTP_PORT.get();
|
||||
if (port <= 0) {
|
||||
throw new IllegalStateException("Invalid port.");
|
||||
}
|
||||
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
|
||||
DaprHttp daprHttp = new DaprHttp(HTTP_PORT, okHttpClient);
|
||||
DaprHttp daprHttp = new DaprHttp(port, okHttpClient);
|
||||
return new DaprClientHttpAdapter(daprHttp, this.objectSerializer, this.stateSerializer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.Map;
|
|||
* @see io.dapr.DaprGrpc
|
||||
* @see io.dapr.client.DaprClient
|
||||
*/
|
||||
class DaprClientGrpcAdapter implements DaprClient {
|
||||
public class DaprClientGrpcAdapter implements DaprClient {
|
||||
|
||||
/**
|
||||
* The GRPC client to be used
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
*/
|
||||
package io.dapr.client;
|
||||
|
||||
import io.dapr.utils.Constants;
|
||||
import io.dapr.utils.Properties;
|
||||
import okhttp3.OkHttpClient;
|
||||
|
||||
import java.time.Duration;
|
||||
|
@ -14,11 +14,6 @@ import java.time.Duration;
|
|||
*/
|
||||
public class DaprHttpBuilder {
|
||||
|
||||
/**
|
||||
* Default port for Dapr after checking environment variable.
|
||||
*/
|
||||
private static final int PORT = DaprHttpBuilder.getEnvPortOrDefault();
|
||||
|
||||
/**
|
||||
* Read timeout for http calls.
|
||||
*/
|
||||
|
@ -29,26 +24,6 @@ public class DaprHttpBuilder {
|
|||
*/
|
||||
private Duration readTimeout = DEFAULT_READ_TIMEOUT;
|
||||
|
||||
/**
|
||||
* Tries to get a valid port from environment variable or returns default.
|
||||
*
|
||||
* @return Port defined in env variable or default.
|
||||
*/
|
||||
private static int getEnvPortOrDefault() {
|
||||
String envPort = System.getenv(Constants.ENV_DAPR_HTTP_PORT);
|
||||
if (envPort == null || envPort.trim().isEmpty()) {
|
||||
return Constants.DEFAULT_HTTP_PORT;
|
||||
}
|
||||
|
||||
try {
|
||||
return Integer.parseInt(envPort.trim());
|
||||
} catch (NumberFormatException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
return Constants.DEFAULT_HTTP_PORT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the read timeout duration for the instance to be built.
|
||||
*
|
||||
|
@ -79,6 +54,6 @@ public class DaprHttpBuilder {
|
|||
OkHttpClient.Builder builder = new OkHttpClient.Builder();
|
||||
builder.readTimeout(DEFAULT_READ_TIMEOUT);
|
||||
OkHttpClient okHttpClient = builder.build();
|
||||
return new DaprHttp(PORT, okHttpClient);
|
||||
return new DaprHttp(Properties.HTTP_PORT.get(), okHttpClient);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,31 +19,6 @@ public final class Constants {
|
|||
*/
|
||||
public static final String DEFAULT_HOSTNAME = "localhost";
|
||||
|
||||
/**
|
||||
* Dapr's default http base url.
|
||||
*/
|
||||
public static final String DEFAULT_BASE_HTTP_URL = "http://" + DEFAULT_HOSTNAME;
|
||||
|
||||
/**
|
||||
* Dapr's default HTTP port.
|
||||
*/
|
||||
public static final int DEFAULT_HTTP_PORT = 3500;
|
||||
|
||||
/**
|
||||
* Dapr's default GRPC port.
|
||||
*/
|
||||
public static final int DEFAULT_GRPC_PORT = 50051;
|
||||
|
||||
/**
|
||||
* Environment variable used to set Dapr's HTTP port.
|
||||
*/
|
||||
public static final String ENV_DAPR_HTTP_PORT = "DAPR_HTTP_PORT";
|
||||
|
||||
/**
|
||||
* Environment variable used to set Dapr's GRPC port.
|
||||
*/
|
||||
public static final String ENV_DAPR_GRPC_PORT = "DAPR_GRPC_PORT";
|
||||
|
||||
/**
|
||||
* Header used for request id in Dapr.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.utils;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Global properties for Dapr's SDK, using Supplier so they are dynamically resolved.
|
||||
*/
|
||||
public class Properties {
|
||||
|
||||
/**
|
||||
* Dapr's default HTTP port.
|
||||
*/
|
||||
private static final Integer DEFAULT_HTTP_PORT = 3500;
|
||||
|
||||
/**
|
||||
* Dapr's default GRPC port.
|
||||
*/
|
||||
private static final Integer DEFAULT_GRPC_PORT = 50051;
|
||||
|
||||
/**
|
||||
* Dapr's default GRPC port.
|
||||
*/
|
||||
private static final Boolean DEFAULT_GRPC_ENABLED = false;
|
||||
|
||||
/**
|
||||
* HTTP port for Dapr after checking system property and environment variable.
|
||||
*/
|
||||
public static final Supplier<Integer> HTTP_PORT = () -> getIntOrDefault(
|
||||
"dapr.http.port", "DAPR_HTTP_PORT", DEFAULT_HTTP_PORT);
|
||||
|
||||
/**
|
||||
* GRPC port for Dapr after checking system property and environment variable.
|
||||
*/
|
||||
public static final Supplier<Integer> GRPC_PORT = () -> getIntOrDefault(
|
||||
"dapr.grpc.port", "DAPR_GRPC_PORT", DEFAULT_GRPC_PORT);
|
||||
|
||||
/**
|
||||
* Determines if Dapr client will use GRPC to talk to Dapr's side car.
|
||||
*/
|
||||
public static final Supplier<Boolean> USE_GRPC = () -> getBooleanOrDefault(
|
||||
"dapr.grpc.enabled", "DAPR_GRPC_ENABLED", DEFAULT_GRPC_ENABLED);
|
||||
|
||||
/**
|
||||
* Finds an integer defined by system property first, then env variable or sticks to default.
|
||||
* @param propName Name of the JVM's system property to override (1st).
|
||||
* @param envName Name of env variable (2nd).
|
||||
* @param defaultValue Default value if cannot find a valid config (last).
|
||||
*
|
||||
* @return Integer from system property (1st) or env variable (2nd) or default (last).
|
||||
*/
|
||||
public static Integer getIntOrDefault(String propName, String envName, Integer defaultValue) {
|
||||
return getValueOrDefault(propName, envName, defaultValue, s -> Integer.valueOf(s));
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds a boolean defined by system property first, then env variable or sticks to default.
|
||||
* @param propName Name of the JVM's system property to override (1st).
|
||||
* @param envName Name of env variable (2nd).
|
||||
* @param defaultValue Default value if cannot find a valid config (last).
|
||||
*
|
||||
* @return Boolean from system property (1st) or env variable (2nd) or default (last).
|
||||
*/
|
||||
public static Boolean getBooleanOrDefault(String propName, String envName, Boolean defaultValue) {
|
||||
return getValueOrDefault(propName, envName, defaultValue, s -> Boolean.valueOf(s));
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds a value defined by system property first, then env variable or sticks to default.
|
||||
* @param propName Name of the JVM's system property to override (1st).
|
||||
* @param envName Name of env variable (2nd).
|
||||
* @param defaultValue Default value if cannot find a valid config (last).
|
||||
*
|
||||
* @return Value from system property (1st) or env variable (2nd) or default (last).
|
||||
*/
|
||||
private static <T> T getValueOrDefault(String propName, String envName, T defaultValue, Function<String, T> parser) {
|
||||
String propValue = System.getProperty(propName);
|
||||
if (propValue != null && !propValue.trim().isEmpty()) {
|
||||
try {
|
||||
return parser.apply(propValue);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
// OK, we tried. Falling back to system environment variable.
|
||||
}
|
||||
}
|
||||
|
||||
String envValue = System.getenv(envName);
|
||||
if (envValue == null || envValue.trim().isEmpty()) {
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
try {
|
||||
return parser.apply(envValue);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
// OK, we tried. Falling back to default.
|
||||
}
|
||||
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,39 +1,21 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client;
|
||||
|
||||
import io.dapr.DaprGrpc;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.utils.Constants;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
|
||||
/**
|
||||
* Builder for DaprClient used in tests only.
|
||||
*/
|
||||
public class DaprClientTestBuilder {
|
||||
|
||||
/**
|
||||
* Builds a DaprClient.
|
||||
* @param client DaprHttp used for http calls (can be mocked or stubbed)
|
||||
* @return New instance of DaprClient.
|
||||
*/
|
||||
public static DaprClient buildHttpClient(DaprHttp client) {
|
||||
return new DaprClientHttpAdapter(client);
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a DaprGrpcClient.
|
||||
* @return New instance of DaprClient.
|
||||
*/
|
||||
public static DaprClient buildGrpcClient(){
|
||||
int gprcPort = Integer.parseInt(System.getenv(Constants.ENV_DAPR_GRPC_PORT));
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, gprcPort).usePlaintext().build();
|
||||
return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel),
|
||||
new DefaultObjectSerializer(),
|
||||
new DefaultObjectSerializer());
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client;
|
||||
|
||||
/**
|
||||
* Builder for DaprClient used in tests only.
|
||||
*/
|
||||
public class DaprClientTestBuilder {
|
||||
|
||||
/**
|
||||
* Builds a DaprClient.
|
||||
* @param client DaprHttp used for http calls (can be mocked or stubbed)
|
||||
* @return New instance of DaprClient.
|
||||
*/
|
||||
public static DaprClient buildHttpClient(DaprHttp client) {
|
||||
return new DaprClientHttpAdapter(client);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,84 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static io.dapr.it.DaprIntegrationTestingRunner.DAPR_FREEPORTS;
|
||||
|
||||
@Ignore
|
||||
public class BaseIT {
|
||||
|
||||
protected static DaprIntegrationTestingRunner daprIntegrationTestingRunner;
|
||||
|
||||
|
||||
@ClassRule
|
||||
public static final EnvironmentVariables environmentVariables = new EnvironmentVariables();
|
||||
|
||||
@BeforeClass
|
||||
public static void setEnvironmentVariables(){
|
||||
environmentVariables.set("DAPR_HTTP_PORT", String.valueOf(DAPR_FREEPORTS.getHttpPort()));
|
||||
environmentVariables.set("DAPR_GRPC_PORT", String.valueOf(DAPR_FREEPORTS.getGrpcPort()));
|
||||
}
|
||||
|
||||
public static DaprIntegrationTestingRunner createDaprIntegrationTestingRunner(String successMessage, Class serviceClass, Boolean useAppPort, int sleepTime) {
|
||||
return new DaprIntegrationTestingRunner(successMessage, serviceClass, useAppPort, sleepTime);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanUp() {
|
||||
Optional.ofNullable(daprIntegrationTestingRunner).ifPresent(daprRunner -> daprRunner.destroyDapr());
|
||||
}
|
||||
|
||||
public static class MyData {
|
||||
|
||||
/// Gets or sets the value for PropertyA.
|
||||
private String propertyA;
|
||||
|
||||
/// Gets or sets the value for PropertyB.
|
||||
private String propertyB;
|
||||
|
||||
private MyData myData;
|
||||
|
||||
public String getPropertyB() {
|
||||
return propertyB;
|
||||
}
|
||||
|
||||
public void setPropertyB(String propertyB) {
|
||||
this.propertyB = propertyB;
|
||||
}
|
||||
|
||||
public String getPropertyA() {
|
||||
return propertyA;
|
||||
}
|
||||
|
||||
public void setPropertyA(String propertyA) {
|
||||
this.propertyA = propertyA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MyData{" +
|
||||
"propertyA='" + propertyA + '\'' +
|
||||
", propertyB='" + propertyB + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
public MyData getMyData() {
|
||||
return myData;
|
||||
}
|
||||
|
||||
public void setMyData(MyData myData) {
|
||||
this.myData = myData;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,158 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
|
||||
public class DaprIntegrationTestingRunner {
|
||||
|
||||
public static DaprIntegrationTestingRunner.DaprFreePorts DAPR_FREEPORTS;
|
||||
|
||||
static {
|
||||
try {
|
||||
DAPR_FREEPORTS = new DaprIntegrationTestingRunner.DaprFreePorts().initPorts();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private Runtime rt = Runtime.getRuntime();
|
||||
private Process proc;
|
||||
|
||||
private String successMessage;
|
||||
private Class serviceClass;
|
||||
private Boolean useAppPort;
|
||||
private int sleepTime;
|
||||
private String appName;
|
||||
|
||||
DaprIntegrationTestingRunner(String successMessage, Class serviceClass, Boolean useAppPort, int sleepTime) {
|
||||
this.successMessage = successMessage;
|
||||
this.serviceClass = serviceClass;
|
||||
this.useAppPort = useAppPort;
|
||||
this.sleepTime = sleepTime;
|
||||
this.generateAppName();
|
||||
}
|
||||
|
||||
public DaprFreePorts initializeDapr() throws Exception {
|
||||
String daprCommand=this.buildDaprCommand();
|
||||
System.out.println(daprCommand);
|
||||
proc= rt.exec(daprCommand);
|
||||
|
||||
final Runnable stuffToDo = new Thread(() -> {
|
||||
try {
|
||||
try (InputStream stdin = proc.getInputStream()) {
|
||||
try(InputStreamReader isr = new InputStreamReader(stdin)) {
|
||||
try (BufferedReader br = new BufferedReader(isr)){
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
System.out.println(line);
|
||||
if (line.contains(successMessage)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
Assert.fail(ex.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
final Future future = executor.submit(stuffToDo);
|
||||
executor.shutdown(); // This does not cancel the already-scheduled task.
|
||||
future.get(1, TimeUnit.MINUTES);
|
||||
Thread.sleep(sleepTime);
|
||||
return DAPR_FREEPORTS;
|
||||
}
|
||||
|
||||
private static final String DAPR_RUN = "dapr run --app-id %s ";
|
||||
|
||||
// the arg in -Dexec.args is the app's port
|
||||
private static final String DAPR_COMMAND = " -- mvn exec:java -Dexec.mainClass=%s -Dexec.classpathScope=test -Dexec.args=\"%d\"";
|
||||
|
||||
private String buildDaprCommand(){
|
||||
StringBuilder stringBuilder= new StringBuilder(String.format(DAPR_RUN, this.getAppName()))
|
||||
.append(this.useAppPort ? "--app-port " + this.DAPR_FREEPORTS.appPort : "")
|
||||
.append(" --grpc-port ")
|
||||
.append(this.DAPR_FREEPORTS.grpcPort)
|
||||
.append(" --port ")
|
||||
.append(this.DAPR_FREEPORTS.httpPort)
|
||||
.append(String.format(DAPR_COMMAND, this.serviceClass.getCanonicalName(),this.DAPR_FREEPORTS.appPort));
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
private void generateAppName(){
|
||||
|
||||
this.appName=UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
private static Integer findRandomOpenPortOnAllLocalInterfaces() throws Exception {
|
||||
try (
|
||||
ServerSocket socket = new ServerSocket(0)
|
||||
) {
|
||||
return socket.getLocalPort();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public void destroyDapr() {
|
||||
Optional.ofNullable(rt).ifPresent( runtime -> {
|
||||
try {
|
||||
System.out.println("Start dapr Stop");
|
||||
runtime.exec("dapr stop --app-id " + this.getAppName());
|
||||
System.out.println("End Dapr Stop");
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Optional.ofNullable(proc).ifPresent(process -> {
|
||||
System.out.println("Start process Stop");
|
||||
process.destroyForcibly();
|
||||
System.out.println("End process Stop");
|
||||
});
|
||||
}
|
||||
|
||||
public String getAppName() {
|
||||
return appName;
|
||||
}
|
||||
|
||||
public static class DaprFreePorts
|
||||
{
|
||||
public DaprFreePorts initPorts() throws Exception {
|
||||
this.appPort= findRandomOpenPortOnAllLocalInterfaces();
|
||||
this.grpcPort= findRandomOpenPortOnAllLocalInterfaces();
|
||||
this.httpPort= findRandomOpenPortOnAllLocalInterfaces();
|
||||
return this;
|
||||
}
|
||||
|
||||
private int grpcPort;
|
||||
|
||||
public int getGrpcPort() {
|
||||
return grpcPort;
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return httpPort;
|
||||
}
|
||||
|
||||
public int getAppPort() {
|
||||
return appPort;
|
||||
}
|
||||
|
||||
private int httpPort;
|
||||
|
||||
private int appPort;
|
||||
}
|
||||
}
|
|
@ -1,96 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.binding.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.domain.Verb;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.services.InputBindingExample;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Service for output binding example.
|
||||
*/
|
||||
public class OutputBindingExample extends BaseIT {
|
||||
|
||||
private static DaprClient client;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
daprIntegrationTestingRunner =
|
||||
createDaprIntegrationTestingRunner(
|
||||
"dapr initialized. Status: Running. Init Elapsed",
|
||||
InputBindingExample.class,
|
||||
true,
|
||||
19000
|
||||
);
|
||||
daprIntegrationTestingRunner.initializeDapr();
|
||||
client = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build();
|
||||
}
|
||||
|
||||
public static class MyClass {
|
||||
public MyClass(){}
|
||||
public String message;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void BindingTest() {
|
||||
|
||||
|
||||
final String BINDING_NAME = "sample123";
|
||||
|
||||
// This is an example of sending data in a user-defined object. The input binding will receive:
|
||||
// {"message":"hello"}
|
||||
MyClass myClass = new MyClass();
|
||||
myClass.message = "hello";
|
||||
|
||||
System.out.println("sending first message");
|
||||
client.invokeBinding(BINDING_NAME, myClass).block();
|
||||
|
||||
// This is an example of sending a plain string. The input binding will receive
|
||||
// cat
|
||||
final String m = "cat";
|
||||
System.out.println("sending " + m);
|
||||
client.invokeBinding(BINDING_NAME, m).block();
|
||||
|
||||
try {
|
||||
Thread.sleep(8000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
|
||||
final List<String> messages = client.invokeService(Verb.GET, daprIntegrationTestingRunner.getAppName(), "messages", null, List.class).block();
|
||||
assertEquals(2,messages.size());
|
||||
MyClass resultClass = null;
|
||||
try {
|
||||
resultClass = new ObjectMapper().readValue(new String(Base64.getDecoder().decode(messages.get(0))), MyClass.class);
|
||||
}catch (Exception ex)
|
||||
{
|
||||
ex.printStackTrace();
|
||||
fail("Error on decode message 1");
|
||||
}
|
||||
|
||||
try {
|
||||
assertEquals("cat", new ObjectMapper().readValue(new String(Base64.getDecoder().decode(messages.get(1))), String.class));
|
||||
}catch (Exception ex){
|
||||
ex.printStackTrace();
|
||||
fail("Error on decode message 2");
|
||||
}
|
||||
assertEquals("hello",resultClass.message);
|
||||
}
|
||||
}
|
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.services;
|
||||
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.dapr.DaprGrpc;
|
||||
import io.dapr.DaprGrpc.DaprBlockingStub;
|
||||
import io.dapr.DaprProtos.SaveStateEnvelope;
|
||||
import io.dapr.DaprProtos.StateRequest;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import org.apache.commons.cli.*;
|
||||
|
||||
|
||||
/**
|
||||
* Simple example.
|
||||
* To run manually, from repo root:
|
||||
* 1. mvn clean install
|
||||
* 2. dapr run --grpc-port 50001 -- mvn exec:java -Dexec.mainClass=io.dapr.it.services.HelloWorldGrpcStateService -Dexec.classpathScope="test" -pl=sdk
|
||||
*/
|
||||
public class HelloWorldGrpcStateService {
|
||||
|
||||
public static void main(String[] args) throws ParseException {
|
||||
String grpcPort = System.getenv("DAPR_GRPC_PORT");
|
||||
|
||||
// If port string is not valid, it will throw an exception.
|
||||
int grpcPortInt = Integer.parseInt(grpcPort);
|
||||
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", grpcPortInt).usePlaintext().build();
|
||||
DaprBlockingStub client = DaprGrpc.newBlockingStub(channel);
|
||||
|
||||
String key = "mykey";
|
||||
// First, write key-value pair.
|
||||
|
||||
String value = "Hello World";
|
||||
StateRequest req = StateRequest
|
||||
.newBuilder()
|
||||
.setKey(key)
|
||||
.setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
|
||||
.build();
|
||||
SaveStateEnvelope state = SaveStateEnvelope.newBuilder()
|
||||
.addRequests(req)
|
||||
.build();
|
||||
client.saveState(state);
|
||||
System.out.println("Saved!");
|
||||
channel.shutdown();
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.services;
|
||||
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SpringBoot Controller to handle input binding.
|
||||
*/
|
||||
@RestController
|
||||
public class InputBindingController {
|
||||
|
||||
private static final List<byte[]> messagesReceived = new ArrayList();
|
||||
|
||||
@PostMapping(path = "/sample123")
|
||||
public Mono<Void> handleInputBinding(@RequestBody(required = false) byte[] body) {
|
||||
|
||||
return Mono.fromRunnable(() -> {
|
||||
messagesReceived.add(body);
|
||||
System.out.println("Received message through binding: " + (body == null ? "" : new String(body)));
|
||||
});
|
||||
}
|
||||
|
||||
@GetMapping(path = "/messages")
|
||||
public Mono<List<byte[]>> getMessages(){
|
||||
return Mono.just( messagesReceived);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue