Support distributed lock API (#764)

* support distributed lock API

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* add Distributed Lock API Example

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* fix bash

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* fix bash

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* update dapr proto version

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* revert update dapr proto version

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* changed to json serializable

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* fix checkstyle

Signed-off-by: crazyhzm <crazyhzm@gmail.com>

* Fix compilation error due to Mono version change.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: crazyhzm <crazyhzm@gmail.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
huazhongming 2023-09-22 12:54:59 +08:00 committed by GitHub
parent b442ba46ad
commit 86893a0742
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1615 additions and 736 deletions

View File

@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: lockstore
spec:
type: lock.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""

View File

@ -0,0 +1,75 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.lock.grpc;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import reactor.core.publisher.Mono;
/**
* DistributedLockGrpcClient.
*/
public class DistributedLockGrpcClient {
private static final String LOCK_STORE_NAME = "lockstore";
/**
* Executes various methods to check the different apis.
*
* @param args arguments
* @throws Exception throws Exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
tryLock(client);
unlock(client);
}
}
/**
* Trying to get lock.
*
* @param client DaprPreviewClient object
*/
public static void tryLock(DaprPreviewClient client) {
System.out.println("*******trying to get a free distributed lock********");
try {
LockRequest lockRequest = new LockRequest(LOCK_STORE_NAME, "resouce1", "owner1", 5);
Mono<Boolean> result = client.tryLock(lockRequest);
System.out.println("Lock result -> " + (Boolean.TRUE.equals(result.block()) ? "SUCCESS" : "FAIL"));
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
/**
* Unlock a lock.
*
* @param client DaprPreviewClient object
*/
public static void unlock(DaprPreviewClient client) {
System.out.println("*******unlock a distributed lock********");
try {
UnlockRequest unlockRequest = new UnlockRequest(LOCK_STORE_NAME, "resouce1", "owner1");
Mono<UnlockResponseStatus> result = client.unlock(unlockRequest);
System.out.println("Unlock result ->" + result.block().name());
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
}

View File

@ -0,0 +1,87 @@
## Distributed Lock API Example
This example provides the different capabilities provided by Dapr Java SDK for Distributed Lock. For further information about Distributed Lock APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/distributed-lock/)
**This API is available in Preview Mode**.
### Using the Distributed Lock API
The java SDK exposes several methods for this -
* `client.tryLock(...)` for getting a free distributed lock
* `client.unlock(...)` for unlocking a lock
## Pre-requisites
* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
### Checking out the code
Clone this repository:
```sh
git clone https://github.com/dapr/java-sdk.git
cd java-sdk
```
Then build the Maven project:
```sh
# make sure you are in the `java-sdk` directory.
mvn install
```
<!-- END_STEP -->
### Running the example
Get into the examples' directory:
```sh
cd examples
```
Use the following command to run this example-
<!-- STEP
name: Run DistributedLockGrpcClient example
expected_stdout_lines:
- "== APP == Using preview client..."
- "== APP == *******trying to get a free distributed lock********"
- "== APP == Lock result -> SUCCESS"
- "== APP == *******unlock a distributed lock********"
- "== APP == Unlock result -> SUCCESS"
background: true
sleep: 5
-->
```bash
dapr run --components-path ./components/lock --app-id lockgrpc --log-level debug -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.lock.grpc.DistributedLockGrpcClient
```
<!-- END_STEP -->
### Sample output
```
== APP == Using preview client...
== APP == *******trying to get a free distributed lock********
== APP == Lock result -> SUCCESS
== APP == *******unlock a distributed lock********
== APP == Unlock result -> SUCCESS
```
### Cleanup
To stop the app, run (or press CTRL+C):
<!-- STEP
name: Cleanup
-->
```bash
dapr stop --app-id lockgrpc
```
<!-- END_STEP -->

View File

@ -0,0 +1,75 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.lock.http;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import reactor.core.publisher.Mono;
/**
* DistributedLockGrpcClient.
*/
public class DistributedLockHttpClient {
private static final String LOCK_STORE_NAME = "lockstore";
/**
* Executes various methods to check the different apis.
*
* @param args arguments
* @throws Exception throws Exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
tryLock(client);
unlock(client);
}
}
/**
* Trying to get lock.
*
* @param client DaprPreviewClient object
*/
public static void tryLock(DaprPreviewClient client) {
System.out.println("*******trying to get a free distributed lock********");
try {
LockRequest lockRequest = new LockRequest(LOCK_STORE_NAME, "resouce1", "owner1", 5);
Mono<Boolean> result = client.tryLock(lockRequest);
System.out.println("Lock result -> " + (Boolean.TRUE.equals(result.block()) ? "SUCCESS" : "FAIL"));
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
/**
* Unlock a lock.
*
* @param client DaprPreviewClient object
*/
public static void unlock(DaprPreviewClient client) {
System.out.println("*******unlock a distributed lock********");
try {
UnlockRequest unlockRequest = new UnlockRequest(LOCK_STORE_NAME, "resouce1", "owner1");
Mono<UnlockResponseStatus> result = client.unlock(unlockRequest);
System.out.println("Unlock result ->" + result.block().name());
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
}

View File

@ -0,0 +1,87 @@
## Distributed Lock API Example
This example provides the different capabilities provided by Dapr Java SDK for Distributed Lock. For further information about Distributed Lock APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/distributed-lock/)
**This API is available in Preview Mode**.
### Using the Distributed Lock API
The java SDK exposes several methods for this -
* `client.tryLock(...)` for getting a free distributed lock
* `client.unlock(...)` for unlocking a lock
## Pre-requisites
* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
### Checking out the code
Clone this repository:
```sh
git clone https://github.com/dapr/java-sdk.git
cd java-sdk
```
Then build the Maven project:
```sh
# make sure you are in the `java-sdk` directory.
mvn install
```
<!-- END_STEP -->
### Running the example
Get into the examples' directory:
```sh
cd examples
```
Use the following command to run this example-
<!-- STEP
name: Run DistributedLockHttpClient example
expected_stdout_lines:
- "== APP == Using preview client..."
- "== APP == *******trying to get a free distributed lock********"
- "== APP == Lock result -> SUCCESS"
- "== APP == *******unlock a distributed lock********"
- "== APP == Unlock result -> SUCCESS"
background: true
sleep: 5
-->
```bash
dapr run --components-path ./components/lock --app-id lockhttp --log-level debug -- java -Ddapr.api.protocol=HTTP -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.lock.http.DistributedLockHttpClient
```
<!-- END_STEP -->
### Sample output
```
== APP == Using preview client...
== APP == *******trying to get a free distributed lock********
== APP == Lock result -> SUCCESS
== APP == *******unlock a distributed lock********
== APP == Unlock result -> SUCCESS
```
### Cleanup
To stop the app, run (or press CTRL+C):
<!-- STEP
name: Cleanup
-->
```bash
dapr stop --app-id lockhttp
```
<!-- END_STEP -->

View File

@ -28,6 +28,7 @@ import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
@ -37,6 +38,8 @@ import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.domain.query.Query;
@ -622,6 +625,25 @@ abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
return this.unsubscribeConfiguration(request);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Boolean> tryLock(String storeName, String resourceId, String lockOwner, Integer expiryInSeconds) {
LockRequest lockRequest = new LockRequest(storeName, resourceId, lockOwner, expiryInSeconds);
return this.tryLock(lockRequest);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<UnlockResponseStatus> unlock(String storeName, String resourceId, String lockOwner) {
UnlockRequest request = new UnlockRequest(storeName, resourceId, lockOwner);
return this.unlock(request);
}
private List<String> filterEmptyKeys(String... keys) {
return Arrays.stream(keys)
.filter(key -> !key.trim().isEmpty())

View File

@ -32,6 +32,7 @@ import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
@ -42,6 +43,8 @@ import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
@ -785,6 +788,97 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Boolean> tryLock(LockRequest request) {
try {
final String stateStoreName = request.getStoreName();
final String resourceId = request.getResourceId();
final String lockOwner = request.getLockOwner();
final Integer expiryInSeconds = request.getExpiryInSeconds();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (resourceId == null || resourceId.isEmpty()) {
throw new IllegalArgumentException("ResourceId cannot be null or empty.");
}
if (lockOwner == null || lockOwner.isEmpty()) {
throw new IllegalArgumentException("LockOwner cannot be null or empty.");
}
if (expiryInSeconds < 0) {
throw new IllegalArgumentException("ExpiryInSeconds cannot be negative.");
}
DaprProtos.TryLockRequest.Builder builder = DaprProtos.TryLockRequest.newBuilder()
.setStoreName(stateStoreName)
.setResourceId(resourceId)
.setLockOwner(lockOwner)
.setExpiryInSeconds(expiryInSeconds);
DaprProtos.TryLockRequest tryLockRequest = builder.build();
return Mono.deferContextual(
context -> this.<DaprProtos.TryLockResponse>createMono(
it -> intercept(context, asyncStub).tryLockAlpha1(tryLockRequest, it)
)
).flatMap(response -> {
try {
return Mono.just(response.getSuccess());
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
});
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public Mono<UnlockResponseStatus> unlock(UnlockRequest request) {
try {
final String stateStoreName = request.getStoreName();
final String resourceId = request.getResourceId();
final String lockOwner = request.getLockOwner();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (resourceId == null || resourceId.isEmpty()) {
throw new IllegalArgumentException("ResourceId cannot be null or empty.");
}
if (lockOwner == null || lockOwner.isEmpty()) {
throw new IllegalArgumentException("LockOwner cannot be null or empty.");
}
DaprProtos.UnlockRequest.Builder builder = DaprProtos.UnlockRequest.newBuilder()
.setStoreName(stateStoreName)
.setResourceId(resourceId)
.setLockOwner(lockOwner);
DaprProtos.UnlockRequest unlockRequest = builder.build();
return Mono.deferContextual(
context -> this.<DaprProtos.UnlockResponse>createMono(
it -> intercept(context, asyncStub).unlockAlpha1(unlockRequest, it)
)
).flatMap(response -> {
try {
return Mono.just(UnlockResponseStatus.valueOf(response.getStatus().getNumber()));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
});
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**
* {@inheritDoc}
*/

View File

@ -28,6 +28,7 @@ import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.Metadata;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryStateItem;
@ -40,6 +41,8 @@ import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.TransactionalStateRequest;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.config.Properties;
@ -685,6 +688,99 @@ public class DaprClientHttp extends AbstractDaprClient {
.map(m -> (Map<String, Map<String, String>>) m);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Boolean> tryLock(LockRequest request) {
try {
final String stateStoreName = request.getStoreName();
final String resourceId = request.getResourceId();
final String lockOwner = request.getLockOwner();
final Integer expiryInSeconds = request.getExpiryInSeconds();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (resourceId == null || resourceId.isEmpty()) {
throw new IllegalArgumentException("ResourceId cannot be null or empty.");
}
if (lockOwner == null || lockOwner.isEmpty()) {
throw new IllegalArgumentException("LockOwner cannot be null or empty.");
}
if (expiryInSeconds < 0) {
throw new IllegalArgumentException("ExpiryInSeconds cannot be negative.");
}
byte[] requestBody = INTERNAL_SERIALIZER.serialize(request);
String[] pathSegments = new String[]{DaprHttp.ALPHA_1_API_VERSION, "lock", stateStoreName};
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, null, requestBody, null, context)
).flatMap(response -> {
try {
Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class);
if (m == null) {
return Mono.just(Boolean.FALSE);
}
return Mono.just((Boolean) m.get("success"));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
});
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public Mono<UnlockResponseStatus> unlock(UnlockRequest request) {
try {
final String stateStoreName = request.getStoreName();
final String resourceId = request.getResourceId();
final String lockOwner = request.getLockOwner();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (resourceId == null || resourceId.isEmpty()) {
throw new IllegalArgumentException("ResourceId cannot be null or empty.");
}
if (lockOwner == null || lockOwner.isEmpty()) {
throw new IllegalArgumentException("LockOwner cannot be null or empty.");
}
byte[] requestBody = INTERNAL_SERIALIZER.serialize(request);
String[] pathSegments = new String[]{DaprHttp.ALPHA_1_API_VERSION, "unlock", stateStoreName};
return Mono.deferContextual(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, null, requestBody, null, context)
).flatMap(response -> {
try {
Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class);
if (m == null) {
return Mono.just(UnlockResponseStatus.INTERNAL_ERROR);
}
Integer statusCode = (Integer) m.get("status");
return Mono.just(UnlockResponseStatus.valueOf(statusCode));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
});
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**
* {@inheritDoc}
*/

View File

@ -1,224 +1,259 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
/**
* Generic client interface for preview or alpha APIs in Dapr, regardless of GRPC or HTTP.
*
* @see io.dapr.client.DaprClientBuilder for information on how to make instance for this interface.
*/
public interface DaprPreviewClient extends AutoCloseable {
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param metadata Optional metadata passed to the state store.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query,
Map<String, String> metadata, Class<T> clazz);
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param metadata Optional metadata passed to the state store.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query,
Map<String, String> metadata, TypeRef<T> type);
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, Class<T> clazz);
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, TypeRef<T> type);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param metadata Optional metadata passed to the state store.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query,
Map<String, String> metadata, Class<T> clazz);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param metadata Optional metadata passed to the state store.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query,
Map<String, String> metadata, TypeRef<T> type);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query, Class<T> clazz);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query, TypeRef<T> type);
/**
* Query for states using a query request.
*
* @param request Query request object.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Class<T> clazz);
/**
* Query for states using a query request.
*
* @param request Query request object.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, TypeRef<T> type);
/**
* Publish multiple events to Dapr in a single request.
*
* @param request {@link BulkPublishRequest} object.
* @return A Mono of {@link BulkPublishResponse} object.
* @param <T> The type of events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> request);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the {@link List} of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
List<T> events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the varargs of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
T... events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the {@link List} of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @param requestMetadata the metadata to be set at the request level for the {@link BulkPublishRequest}.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String,String> requestMetadata, List<T> events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the varargs of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @param requestMetadata the metadata to be set at the request level for the {@link BulkPublishRequest}.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String,String> requestMetadata, T... events);
}
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
/**
* Generic client interface for preview or alpha APIs in Dapr, regardless of GRPC or HTTP.
*
* @see io.dapr.client.DaprClientBuilder for information on how to make instance for this interface.
*/
public interface DaprPreviewClient extends AutoCloseable {
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param metadata Optional metadata passed to the state store.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query,
Map<String, String> metadata, Class<T> clazz);
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param metadata Optional metadata passed to the state store.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query,
Map<String, String> metadata, TypeRef<T> type);
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, Class<T> clazz);
/**
* Query for states using a query string.
*
* @param storeName Name of the state store to query.
* @param query String value of the query.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, TypeRef<T> type);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param metadata Optional metadata passed to the state store.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query,
Map<String, String> metadata, Class<T> clazz);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param metadata Optional metadata passed to the state store.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query,
Map<String, String> metadata, TypeRef<T> type);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query, Class<T> clazz);
/**
* Query for states using a query domain object.
*
* @param storeName Name of the state store to query.
* @param query Query value domain object.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query, TypeRef<T> type);
/**
* Query for states using a query request.
*
* @param request Query request object.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Class<T> clazz);
/**
* Query for states using a query request.
*
* @param request Query request object.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, TypeRef<T> type);
/**
* Publish multiple events to Dapr in a single request.
*
* @param request {@link BulkPublishRequest} object.
* @return A Mono of {@link BulkPublishResponse} object.
* @param <T> The type of events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> request);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the {@link List} of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
List<T> events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the varargs of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
T... events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the {@link List} of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @param requestMetadata the metadata to be set at the request level for the {@link BulkPublishRequest}.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String,String> requestMetadata, List<T> events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the varargs of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @param requestMetadata the metadata to be set at the request level for the {@link BulkPublishRequest}.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String,String> requestMetadata, T... events);
/**
* Tries to get a lock with an expiry.
* @param storeName Name of the store
* @param resourceId Lock key
* @param lockOwner The identifier of lock owner
* @param expiryInSeconds The time before expiry
* @return Whether the lock is successful
*/
Mono<Boolean> tryLock(String storeName, String resourceId, String lockOwner, Integer expiryInSeconds);
/**
* Tries to get a lock with an expiry.
* @param request The request to lock
* @return Whether the lock is successful
*/
Mono<Boolean> tryLock(LockRequest request);
/**
* Unlocks a lock.
* @param storeName Name of the store
* @param resourceId Lock key
* @param lockOwner The identifier of lock owner
* @return Unlock result
*/
Mono<UnlockResponseStatus> unlock(String storeName, String resourceId, String lockOwner);
/**
* Unlocks a lock.
* @param request The request to unlock
* @return Unlock result
*/
Mono<UnlockResponseStatus> unlock(UnlockRequest request);
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
/**
* A request to lock.
*/
public class LockRequest implements Serializable {
/**
* The lock store name,e.g. `redis`.
*/
@JsonIgnore
private final String storeName;
/**
* Required. resourceId is the lock key. e.g. `order_id_111`
* It stands for "which resource I want to protect"
*/
private final String resourceId;
/**
* lockOwner indicate the identifier of lock owner.
* You can generate a uuid as lock_owner.For example,in golang:
* req.LockOwner = uuid.New().String()
* This field is per request,not per process,so it is different for each request,
* which aims to prevent multi-thread in the same process trying the same lock concurrently.
* The reason why we don't make it automatically generated is:
* 1. If it is automatically generated,there must be a 'my_lock_owner_id' field in the response.
* This name is so weird that we think it is inappropriate to put it into the api spec
* 2. If we change the field 'my_lock_owner_id' in the response to 'lock_owner',
* which means the current lock owner of this lock,
* we find that in some lock services users can't get the current lock owner.
* Actually users don't need it at all.
* 3. When reentrant lock is needed,the existing lock_owner is required to identify client
* and check "whether this client can reenter this lock".
* So this field in the request shouldn't be removed.
*/
private final String lockOwner;
/**
* The time before expiry.The time unit is second.
*/
private final Integer expiryInSeconds;
/**
* Constructor of LockRequest.
* @param storeName Name of the store
* @param resourceId Lock key
* @param lockOwner The identifier of lock owner
* @param expiryInSeconds The time before expiry
*/
public LockRequest(String storeName, String resourceId, String lockOwner, Integer expiryInSeconds) {
this.storeName = storeName;
this.resourceId = resourceId;
this.lockOwner = lockOwner;
this.expiryInSeconds = expiryInSeconds;
}
public String getStoreName() {
return storeName;
}
public String getResourceId() {
return resourceId;
}
public String getLockOwner() {
return lockOwner;
}
public Integer getExpiryInSeconds() {
return expiryInSeconds;
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
/**
* A request to unlock.
*/
public class UnlockRequest implements Serializable {
/**
* The lock store name,e.g. `redis`.
*/
@JsonIgnore
private final String storeName;
/**
* Required. resourceId is the lock key. e.g. `order_id_111`
* It stands for "which resource I want to protect"
*/
private final String resourceId;
/**
* lockOwner indicate the identifier of lock owner.
* You can generate a uuid as lock_owner.For example,in golang:
* req.LockOwner = uuid.New().String()
* This field is per request,not per process,so it is different for each request,
* which aims to prevent multi-thread in the same process trying the same lock concurrently.
* The reason why we don't make it automatically generated is:
* 1. If it is automatically generated,there must be a 'my_lock_owner_id' field in the response.
* This name is so weird that we think it is inappropriate to put it into the api spec
* 2. If we change the field 'my_lock_owner_id' in the response to 'lock_owner',
* which means the current lock owner of this lock,
* we find that in some lock services users can't get the current lock owner.
* Actually users don't need it at all.
* 3. When reentrant lock is needed,the existing lock_owner is required to identify client
* and check "whether this client can reenter this lock".
* So this field in the request shouldn't be removed.
*/
private final String lockOwner;
/**
* Constructor for UnlockRequest.
* @param storeName Name of the store
* @param resourceId Lock key
* @param lockOwner The identifier of lock owner.
*/
public UnlockRequest(String storeName, String resourceId, String lockOwner) {
this.storeName = storeName;
this.resourceId = resourceId;
this.lockOwner = lockOwner;
}
public String getStoreName() {
return storeName;
}
public String getResourceId() {
return resourceId;
}
public String getLockOwner() {
return lockOwner;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
public enum UnlockResponseStatus {
/**
* The unlock operation succeeded.
*/
SUCCESS(0),
/**
* The target you want to unlock does not exist.
*/
LOCK_UNEXIST(1),
/**
* The desired target does not match.
*/
LOCK_BELONG_TO_OTHERS(2),
/**
* Internal error.
*/
INTERNAL_ERROR(3);
private final Integer code;
UnlockResponseStatus(Integer code) {
this.code = code;
}
public Integer getCode() {
return code;
}
/**
* Convert the status code to a UnlockResponseStatus object.
* @param code status code
* @return UnlockResponseStatus
*/
public static UnlockResponseStatus valueOf(int code) {
for (UnlockResponseStatus status : UnlockResponseStatus.values()) {
if (status.getCode().equals(code)) {
return status;
}
}
return INTERNAL_ERROR;
}
}

View File

@ -29,7 +29,6 @@ import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
@ -49,10 +48,7 @@ import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -66,23 +62,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static io.dapr.utils.TestUtils.findFreePort;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
public class DaprClientGrpcTest {
@ -2392,6 +2375,7 @@ public class DaprClientGrpcTest {
result.block();
}
private <T> DaprProtos.GetStateResponse buildFutureGetStateEnvelop(T value, String etag) throws IOException {
return buildGetStateResponse(value, etag);
}

View File

@ -1,392 +1,443 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class DaprPreviewClientGrpcTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String QUERY_STORE_NAME = "testQueryStore";
private static final String PUBSUB_NAME = "testPubsub";
private static final String TOPIC_NAME = "testTopic";
private GrpcChannelFacade channel;
private DaprGrpc.DaprStub daprStub;
private DaprPreviewClient previewClient;
@Before
public void setup() throws IOException {
channel = mock(GrpcChannelFacade.class);
daprStub = mock(DaprGrpc.DaprStub.class);
when(daprStub.withInterceptors(any())).thenReturn(daprStub);
previewClient = new DaprClientGrpc(
channel, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
doNothing().when(channel).close();
}
@After
public void tearDown() throws Exception {
previewClient.close();
verify(channel).close();
verifyNoMoreInteractions(channel);
}
@Test
public void publishEventsExceptionThrownTest() {
doAnswer((Answer<Void>) invocation -> {
throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument");
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
assertThrowsDaprException(
StatusRuntimeException.class,
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: bad bad argument",
() -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.EMPTY_LIST)).block());
}
@Test
public void publishEventsCallbackExceptionThrownTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"));
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
assertThrowsDaprException(
ExecutionException.class,
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: bad bad argument",
() -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.EMPTY_LIST)).block());
}
@Test(expected = IllegalArgumentException.class)
public void publishEventsContentTypeMismatchException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "testEntry"
, "application/octet-stream", null);
BulkPublishRequest<String> wrongReq = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
previewClient.publishEvents(wrongReq).block();
}
@Test
public void publishEventsSerializeException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
previewClient = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer());
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any());
BulkPublishEntry<Map<String, String>> entry = new BulkPublishEntry<>("1", new HashMap<>(),
"application/json", null);
BulkPublishRequest<Map<String, String>> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
when(mockSerializer.serialize(any())).thenThrow(IOException.class);
Mono<BulkPublishResponse<Map<String, String>>> result = previewClient.publishEvents(req);
assertThrowsDaprException(
IOException.class,
"UNKNOWN",
"UNKNOWN: ",
() -> result.block());
}
@Test
public void publishEventsTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "test",
"text/plain", null);
BulkPublishRequest<String> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(req);
BulkPublishResponse res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entry in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsWithoutMetaTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME,
"text/plain", Collections.singletonList("test"));
BulkPublishResponse<String> res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entries in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsWithRequestMetaTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME,
"text/plain", new HashMap<String, String>(){{
put("ttlInSeconds", "123");
}}, Collections.singletonList("test"));
BulkPublishResponse<String> res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entry in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsObjectTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> {
DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0);
if (!"application/json".equals(bulkPublishRequest.getEntries(0).getContentType())) {
return false;
}
if (!"{\"id\":1,\"value\":\"Event\"}".equals(new String(entry.getEvent().toByteArray())) &&
!"{\"value\":\"Event\",\"id\":1}".equals(new String(entry.getEvent().toByteArray()))) {
return false;
}
return true;
}), any());
DaprClientGrpcTest.MyObject event = new DaprClientGrpcTest.MyObject(1, "Event");
BulkPublishEntry<DaprClientGrpcTest.MyObject> entry = new BulkPublishEntry<>("1", event,
"application/json", null);
BulkPublishRequest<DaprClientGrpcTest.MyObject> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
BulkPublishResponse<DaprClientGrpcTest.MyObject> result = previewClient.publishEvents(req).block();
Assert.assertNotNull(result);
Assert.assertEquals("expected no entries to be failed", 0, result.getFailedEntries().size());
}
@Test
public void publishEventsContentTypeOverrideTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> {
DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0);
if (!"application/json".equals(entry.getContentType())) {
return false;
}
if (!"\"hello\"".equals(new String(entry.getEvent().toByteArray()))) {
return false;
}
return true;
}), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "hello",
"", null);
BulkPublishRequest<String> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
BulkPublishResponse<String> result = previewClient.publishEvents(req).block();
Assert.assertNotNull(result);
Assert.assertEquals("expected no entries to be failed", 0, result.getFailedEntries().size());
}
@Test
public void queryStateExceptionsTest() {
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("", "query", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("storeName", "", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("storeName", (Query) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("storeName", (String) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState(new QueryStateRequest("storeName"), String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState(null, String.class).block();
});
}
@Test
public void queryState() throws JsonProcessingException {
List<QueryStateItem<?>> resp = new ArrayList<>();
resp.add(new QueryStateItem<Object>("1", (Object)"testData", "6f54ad94-dfb9-46f0-a371-e42d550adb7d"));
DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, "");
doAnswer((Answer<Void>) invocation -> {
DaprProtos.QueryStateRequest req = invocation.getArgument(0);
assertEquals(QUERY_STORE_NAME, req.getStoreName());
assertEquals("query", req.getQuery());
assertEquals(0, req.getMetadataCount());
StreamObserver<DaprProtos.QueryStateResponse> observer = (StreamObserver<DaprProtos.QueryStateResponse>)
invocation.getArguments()[1];
observer.onNext(responseEnvelope);
observer.onCompleted();
return null;
}).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any());
QueryStateResponse<String> response = previewClient.queryState(QUERY_STORE_NAME, "query", String.class).block();
assertNotNull(response);
assertEquals("result size must be 1", 1, response.getResults().size());
assertEquals("result must be same", "1", response.getResults().get(0).getKey());
assertEquals("result must be same", "testData", response.getResults().get(0).getValue());
assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag());
}
@Test
public void queryStateMetadataError() throws JsonProcessingException {
List<QueryStateItem<?>> resp = new ArrayList<>();
resp.add(new QueryStateItem<Object>("1", null, "error data"));
DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, "");
doAnswer((Answer<Void>) invocation -> {
DaprProtos.QueryStateRequest req = invocation.getArgument(0);
assertEquals(QUERY_STORE_NAME, req.getStoreName());
assertEquals("query", req.getQuery());
assertEquals(1, req.getMetadataCount());
assertEquals(1, req.getMetadataCount());
StreamObserver<DaprProtos.QueryStateResponse> observer = (StreamObserver<DaprProtos.QueryStateResponse>)
invocation.getArguments()[1];
observer.onNext(responseEnvelope);
observer.onCompleted();
return null;
}).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any());
QueryStateResponse<String> response = previewClient.queryState(QUERY_STORE_NAME, "query",
new HashMap<String, String>(){{ put("key", "error"); }}, String.class).block();
assertNotNull(response);
assertEquals("result size must be 1", 1, response.getResults().size());
assertEquals("result must be same", "1", response.getResults().get(0).getKey());
assertEquals("result must be same", "error data", response.getResults().get(0).getError());
}
private DaprProtos.QueryStateResponse buildQueryStateResponse(List<QueryStateItem<?>> resp,String token)
throws JsonProcessingException {
List<DaprProtos.QueryStateItem> items = new ArrayList<>();
for (QueryStateItem<?> item: resp) {
items.add(buildQueryStateItem(item));
}
return DaprProtos.QueryStateResponse.newBuilder()
.addAllResults(items)
.setToken(token)
.build();
}
private DaprProtos.QueryStateItem buildQueryStateItem(QueryStateItem<?> item) throws JsonProcessingException {
DaprProtos.QueryStateItem.Builder it = DaprProtos.QueryStateItem.newBuilder().setKey(item.getKey());
if (item.getValue() != null) {
it.setData(ByteString.copyFrom(MAPPER.writeValueAsBytes(item.getValue())));
}
if (item.getEtag() != null) {
it.setEtag(item.getEtag());
}
if (item.getError() != null) {
it.setError(item.getError());
}
return it.build();
}
private static StatusRuntimeException newStatusRuntimeException(String status, String message) {
return new StatusRuntimeException(Status.fromCode(Status.Code.valueOf(status)).withDescription(message));
}
}
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class DaprPreviewClientGrpcTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String QUERY_STORE_NAME = "testQueryStore";
private static final String PUBSUB_NAME = "testPubsub";
private static final String TOPIC_NAME = "testTopic";
private static final String LOCK_STORE_NAME = "MyLockStore";
private GrpcChannelFacade channel;
private DaprGrpc.DaprStub daprStub;
private DaprPreviewClient previewClient;
@Before
public void setup() throws IOException {
channel = mock(GrpcChannelFacade.class);
daprStub = mock(DaprGrpc.DaprStub.class);
when(daprStub.withInterceptors(any())).thenReturn(daprStub);
previewClient = new DaprClientGrpc(
channel, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
doNothing().when(channel).close();
}
@After
public void tearDown() throws Exception {
previewClient.close();
verify(channel).close();
verifyNoMoreInteractions(channel);
}
@Test
public void publishEventsExceptionThrownTest() {
doAnswer((Answer<Void>) invocation -> {
throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument");
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
assertThrowsDaprException(
StatusRuntimeException.class,
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: bad bad argument",
() -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.EMPTY_LIST)).block());
}
@Test
public void publishEventsCallbackExceptionThrownTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"));
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
assertThrowsDaprException(
ExecutionException.class,
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: bad bad argument",
() -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.EMPTY_LIST)).block());
}
@Test(expected = IllegalArgumentException.class)
public void publishEventsContentTypeMismatchException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "testEntry"
, "application/octet-stream", null);
BulkPublishRequest<String> wrongReq = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
previewClient.publishEvents(wrongReq).block();
}
@Test
public void publishEventsSerializeException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
previewClient = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer());
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any());
BulkPublishEntry<Map<String, String>> entry = new BulkPublishEntry<>("1", new HashMap<>(),
"application/json", null);
BulkPublishRequest<Map<String, String>> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
when(mockSerializer.serialize(any())).thenThrow(IOException.class);
Mono<BulkPublishResponse<Map<String, String>>> result = previewClient.publishEvents(req);
assertThrowsDaprException(
IOException.class,
"UNKNOWN",
"UNKNOWN: ",
() -> result.block());
}
@Test
public void publishEventsTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "test",
"text/plain", null);
BulkPublishRequest<String> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(req);
BulkPublishResponse res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entry in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsWithoutMetaTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME,
"text/plain", Collections.singletonList("test"));
BulkPublishResponse<String> res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entries in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsWithRequestMetaTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME,
"text/plain", new HashMap<String, String>(){{
put("ttlInSeconds", "123");
}}, Collections.singletonList("test"));
BulkPublishResponse<String> res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entry in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsObjectTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> {
DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0);
if (!"application/json".equals(bulkPublishRequest.getEntries(0).getContentType())) {
return false;
}
if (!"{\"id\":1,\"value\":\"Event\"}".equals(new String(entry.getEvent().toByteArray())) &&
!"{\"value\":\"Event\",\"id\":1}".equals(new String(entry.getEvent().toByteArray()))) {
return false;
}
return true;
}), any());
DaprClientGrpcTest.MyObject event = new DaprClientGrpcTest.MyObject(1, "Event");
BulkPublishEntry<DaprClientGrpcTest.MyObject> entry = new BulkPublishEntry<>("1", event,
"application/json", null);
BulkPublishRequest<DaprClientGrpcTest.MyObject> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
BulkPublishResponse<DaprClientGrpcTest.MyObject> result = previewClient.publishEvents(req).block();
Assert.assertNotNull(result);
Assert.assertEquals("expected no entries to be failed", 0, result.getFailedEntries().size());
}
@Test
public void publishEventsContentTypeOverrideTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> {
DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0);
if (!"application/json".equals(entry.getContentType())) {
return false;
}
if (!"\"hello\"".equals(new String(entry.getEvent().toByteArray()))) {
return false;
}
return true;
}), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "hello",
"", null);
BulkPublishRequest<String> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
BulkPublishResponse<String> result = previewClient.publishEvents(req).block();
Assert.assertNotNull(result);
Assert.assertEquals("expected no entries to be failed", 0, result.getFailedEntries().size());
}
@Test
public void queryStateExceptionsTest() {
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("", "query", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("storeName", "", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("storeName", (Query) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState("storeName", (String) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState(new QueryStateRequest("storeName"), String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.queryState(null, String.class).block();
});
}
@Test
public void queryState() throws JsonProcessingException {
List<QueryStateItem<?>> resp = new ArrayList<>();
resp.add(new QueryStateItem<Object>("1", (Object)"testData", "6f54ad94-dfb9-46f0-a371-e42d550adb7d"));
DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, "");
doAnswer((Answer<Void>) invocation -> {
DaprProtos.QueryStateRequest req = invocation.getArgument(0);
assertEquals(QUERY_STORE_NAME, req.getStoreName());
assertEquals("query", req.getQuery());
assertEquals(0, req.getMetadataCount());
StreamObserver<DaprProtos.QueryStateResponse> observer = (StreamObserver<DaprProtos.QueryStateResponse>)
invocation.getArguments()[1];
observer.onNext(responseEnvelope);
observer.onCompleted();
return null;
}).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any());
QueryStateResponse<String> response = previewClient.queryState(QUERY_STORE_NAME, "query", String.class).block();
assertNotNull(response);
assertEquals("result size must be 1", 1, response.getResults().size());
assertEquals("result must be same", "1", response.getResults().get(0).getKey());
assertEquals("result must be same", "testData", response.getResults().get(0).getValue());
assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag());
}
@Test
public void queryStateMetadataError() throws JsonProcessingException {
List<QueryStateItem<?>> resp = new ArrayList<>();
resp.add(new QueryStateItem<Object>("1", null, "error data"));
DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, "");
doAnswer((Answer<Void>) invocation -> {
DaprProtos.QueryStateRequest req = invocation.getArgument(0);
assertEquals(QUERY_STORE_NAME, req.getStoreName());
assertEquals("query", req.getQuery());
assertEquals(1, req.getMetadataCount());
assertEquals(1, req.getMetadataCount());
StreamObserver<DaprProtos.QueryStateResponse> observer = (StreamObserver<DaprProtos.QueryStateResponse>)
invocation.getArguments()[1];
observer.onNext(responseEnvelope);
observer.onCompleted();
return null;
}).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any());
QueryStateResponse<String> response = previewClient.queryState(QUERY_STORE_NAME, "query",
new HashMap<String, String>(){{ put("key", "error"); }}, String.class).block();
assertNotNull(response);
assertEquals("result size must be 1", 1, response.getResults().size());
assertEquals("result must be same", "1", response.getResults().get(0).getKey());
assertEquals("result must be same", "error data", response.getResults().get(0).getError());
}
@Test
public void tryLock() {
DaprProtos.TryLockResponse.Builder builder = DaprProtos.TryLockResponse.newBuilder()
.setSuccess(true);
DaprProtos.TryLockResponse response = builder.build();
doAnswer((Answer<Void>) invocation -> {
DaprProtos.TryLockRequest req = invocation.getArgument(0);
assertEquals(LOCK_STORE_NAME, req.getStoreName());
assertEquals("1", req.getResourceId());
assertEquals("owner", req.getLockOwner());
assertEquals(10, req.getExpiryInSeconds());
StreamObserver<DaprProtos.TryLockResponse> observer =
(StreamObserver<DaprProtos.TryLockResponse>) invocation.getArguments()[1];
observer.onNext(response);
observer.onCompleted();
return null;
}).when(daprStub).tryLockAlpha1(any(DaprProtos.TryLockRequest.class), any());
Boolean result = previewClient.tryLock("MyLockStore", "1", "owner", 10).block();
assertEquals(Boolean.TRUE, result);
}
@Test
public void unLock() {
DaprProtos.UnlockResponse.Builder builder = DaprProtos.UnlockResponse.newBuilder()
.setStatus(DaprProtos.UnlockResponse.Status.SUCCESS);
DaprProtos.UnlockResponse response = builder.build();
doAnswer((Answer<Void>) invocation -> {
DaprProtos.UnlockRequest req = invocation.getArgument(0);
assertEquals(LOCK_STORE_NAME, req.getStoreName());
assertEquals("1", req.getResourceId());
assertEquals("owner", req.getLockOwner());
StreamObserver<DaprProtos.UnlockResponse> observer =
(StreamObserver<DaprProtos.UnlockResponse>) invocation.getArguments()[1];
observer.onNext(response);
observer.onCompleted();
return null;
}).when(daprStub).unlockAlpha1(any(DaprProtos.UnlockRequest.class), any());
UnlockResponseStatus result = previewClient.unlock("MyLockStore", "1", "owner").block();
assertEquals(UnlockResponseStatus.SUCCESS, result);
}
private DaprProtos.QueryStateResponse buildQueryStateResponse(List<QueryStateItem<?>> resp,String token)
throws JsonProcessingException {
List<DaprProtos.QueryStateItem> items = new ArrayList<>();
for (QueryStateItem<?> item: resp) {
items.add(buildQueryStateItem(item));
}
return DaprProtos.QueryStateResponse.newBuilder()
.addAllResults(items)
.setToken(token)
.build();
}
private DaprProtos.QueryStateItem buildQueryStateItem(QueryStateItem<?> item) throws JsonProcessingException {
DaprProtos.QueryStateItem.Builder it = DaprProtos.QueryStateItem.newBuilder().setKey(item.getKey());
if (item.getValue() != null) {
it.setData(ByteString.copyFrom(MAPPER.writeValueAsBytes(item.getValue())));
}
if (item.getEtag() != null) {
it.setEtag(item.getEtag());
}
if (item.getError() != null) {
it.setError(item.getError());
}
return it.build();
}
private static StatusRuntimeException newStatusRuntimeException(String status, String message) {
return new StatusRuntimeException(Status.fromCode(Status.Code.valueOf(status)).withDescription(message));
}
}

View File

@ -1,101 +1,131 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.config.Properties;
import io.dapr.utils.TypeRef;
import okhttp3.OkHttpClient;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class DaprPreviewClientHttpTest {
private DaprPreviewClient daprPreviewClientHttp;
private DaprHttp daprHttp;
private OkHttpClient okHttpClient;
private MockInterceptor mockInterceptor;
@Before
public void setUp() {
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprPreviewClientHttp = new DaprClientHttp(daprHttp);
}
@Test
public void queryStateExceptionsTest() {
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("", "query", TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("", "query", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", "", TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", "", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (Query) null, TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (Query) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (String) null, TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (String) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState(null, TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState(new QueryStateRequest("storeName"), TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState(null, String.class).block();
});
}
@Test
public void queryStateTest() {
mockInterceptor.addRule()
.post()
.path("/v1.0-alpha1/state/testStore/query")
.respond("{\"results\": [{\"key\": \"1\",\"data\": \"testData\","
+ "\"etag\": \"6f54ad94-dfb9-46f0-a371-e42d550adb7d\"}]}");
QueryStateResponse<String> response = daprPreviewClientHttp.queryState("testStore", "query", String.class).block();
assertNotNull(response);
assertEquals("result size must be 1", 1, response.getResults().size());
assertEquals("result must be same", "1", response.getResults().get(0).getKey());
assertEquals("result must be same", "testData", response.getResults().get(0).getValue());
assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag());
}
}
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.config.Properties;
import io.dapr.utils.TypeRef;
import okhttp3.OkHttpClient;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class DaprPreviewClientHttpTest {
private static final String LOCK_STORE_NAME = "MyLockStore";
private DaprPreviewClient daprPreviewClientHttp;
private DaprHttp daprHttp;
private OkHttpClient okHttpClient;
private MockInterceptor mockInterceptor;
@Before
public void setUp() {
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprPreviewClientHttp = new DaprClientHttp(daprHttp);
}
@Test
public void queryStateExceptionsTest() {
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("", "query", TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("", "query", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", "", TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", "", String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (Query) null, TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (Query) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (String) null, TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState("storeName", (String) null, String.class).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState(null, TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState(new QueryStateRequest("storeName"), TypeRef.BOOLEAN).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.queryState(null, String.class).block();
});
}
@Test
public void queryStateTest() {
mockInterceptor.addRule()
.post()
.path("/v1.0-alpha1/state/testStore/query")
.respond("{\"results\": [{\"key\": \"1\",\"data\": \"testData\","
+ "\"etag\": \"6f54ad94-dfb9-46f0-a371-e42d550adb7d\"}]}");
QueryStateResponse<String> response = daprPreviewClientHttp.queryState("testStore", "query", String.class).block();
assertNotNull(response);
assertEquals("result size must be 1", 1, response.getResults().size());
assertEquals("result must be same", "1", response.getResults().get(0).getKey());
assertEquals("result must be same", "testData", response.getResults().get(0).getValue());
assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag());
}
@Test
public void tryLock() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0-alpha1/lock/MyLockStore")
.respond("{ \"success\": true}");
LockRequest lockRequest = new LockRequest(LOCK_STORE_NAME,"1","owner",10);
Mono<Boolean> mono = daprPreviewClientHttp.tryLock(lockRequest);
assertEquals(Boolean.TRUE, mono.block());
}
@Test
public void unLock() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0-alpha1/unlock/MyLockStore")
.respond("{ \"status\": 0}");
UnlockRequest unLockRequest = new UnlockRequest(LOCK_STORE_NAME,"1","owner");
Mono<UnlockResponseStatus> mono = daprPreviewClientHttp.unlock(unLockRequest);
assertEquals(UnlockResponseStatus.SUCCESS, mono.block());
}
}