mirror of https://github.com/dapr/java-sdk.git
Addressing comments regarding where to build the query parameters forthe HTTP Client calls. (#84)
* Addressing comments regarding where to build the query parameters for the HTTP Client calls. * Making map immutable while also preventing a NPE
This commit is contained in:
parent
9703dabaae
commit
c56b08913b
|
@ -70,7 +70,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
byte[] serializedEvent = objectSerializer.serialize(event);
|
||||
StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic);
|
||||
return this.client.invokeAPI(
|
||||
DaprHttp.HttpMethods.POST.name(), url.toString(), serializedEvent, metadata).then();
|
||||
DaprHttp.HttpMethods.POST.name(), url.toString(), null, serializedEvent, metadata).then();
|
||||
} catch (Exception ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
}
|
||||
String path = String.format("%s/%s/method/%s", Constants.INVOKE_PATH, appId, method);
|
||||
byte[] serializedRequestBody = objectSerializer.serialize(request);
|
||||
Mono<DaprHttp.Response> response = this.client.invokeAPI(httMethod, path, serializedRequestBody, metadata);
|
||||
Mono<DaprHttp.Response> response = this.client.invokeAPI(httMethod, path, null, serializedRequestBody, metadata);
|
||||
return response.flatMap(r -> {
|
||||
try {
|
||||
return Mono.just(objectSerializer.deserialize(r.getBody(), clazz));
|
||||
|
@ -157,6 +157,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
.invokeAPI(
|
||||
DaprHttp.HttpMethods.POST.name(),
|
||||
url.toString(),
|
||||
null,
|
||||
objectSerializer.serialize(jsonMap),
|
||||
null)
|
||||
.then();
|
||||
|
@ -181,10 +182,10 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
|
||||
StringBuilder url = new StringBuilder(Constants.STATE_PATH)
|
||||
.append("/")
|
||||
.append(state.getKey())
|
||||
.append(getOptionsAsQueryParameter(stateOptions));
|
||||
.append(state.getKey());
|
||||
Map<String, String> urlParameters = stateOptions.getStateOptionsAsMap();
|
||||
return this.client
|
||||
.invokeAPI(DaprHttp.HttpMethods.GET.name(), url.toString(), headers)
|
||||
.invokeAPI(DaprHttp.HttpMethods.GET.name(), url.toString(), urlParameters, headers)
|
||||
.flatMap(s -> {
|
||||
try {
|
||||
return Mono.just(buildStateKeyValue(s, state.getKey(), clazz));
|
||||
|
@ -212,10 +213,11 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
if (etag != null && !etag.trim().isEmpty()) {
|
||||
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
|
||||
}
|
||||
String url = Constants.STATE_PATH + getOptionsAsQueryParameter(options);;
|
||||
String url = Constants.STATE_PATH;
|
||||
Map<String, String> urlParameter = options.getStateOptionsAsMap();
|
||||
byte[] serializedStateBody = objectSerializer.serialize(states);
|
||||
return this.client.invokeAPI(
|
||||
DaprHttp.HttpMethods.POST.name(), url, serializedStateBody, headers).then();
|
||||
DaprHttp.HttpMethods.POST.name(), url, urlParameter, serializedStateBody, headers).then();
|
||||
} catch (Exception ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
|
@ -246,8 +248,9 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) {
|
||||
headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag());
|
||||
}
|
||||
String url = Constants.STATE_PATH + "/" + state.getKey() + getOptionsAsQueryParameter(options);
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.DELETE.name(), url, headers).then();
|
||||
String url = Constants.STATE_PATH + "/" + state.getKey();
|
||||
Map<String, String> urlParameters = options.getStateOptionsAsMap();
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.DELETE.name(), url, urlParameters, headers).then();
|
||||
} catch (Exception ex) {
|
||||
return Mono.error(ex);
|
||||
}
|
||||
|
@ -259,7 +262,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload) {
|
||||
String url = String.format(Constants.ACTOR_METHOD_RELATIVE_URL_FORMAT, actorType, actorId, methodName);
|
||||
Mono<DaprHttp.Response> responseMono = this.client.invokeAPI(DaprHttp.HttpMethods.POST.name(), url, jsonPayload, null);
|
||||
Mono<DaprHttp.Response> responseMono = this.client.invokeAPI(DaprHttp.HttpMethods.POST.name(), url, null, jsonPayload, null);
|
||||
return responseMono.flatMap(f -> {
|
||||
try {
|
||||
return Mono.just(objectSerializer.deserialize(f.getBody(), String.class));
|
||||
|
@ -275,7 +278,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<String> getActorState(String actorType, String actorId, String keyName) {
|
||||
String url = String.format(Constants.ACTOR_STATE_KEY_RELATIVE_URL_FORMAT, actorType, actorId, keyName);
|
||||
Mono<DaprHttp.Response> responseMono = this.client.invokeAPI(DaprHttp.HttpMethods.GET.name(), url, "", null);
|
||||
Mono<DaprHttp.Response> responseMono = this.client.invokeAPI(DaprHttp.HttpMethods.GET.name(), url, null, "", null);
|
||||
return responseMono.flatMap(f -> {
|
||||
try {
|
||||
return Mono.just(objectSerializer.deserialize(f.getBody(), String.class));
|
||||
|
@ -291,7 +294,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<Void> saveActorStateTransactionally(String actorType, String actorId, String data) {
|
||||
String url = String.format(Constants.ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId);
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.PUT.name(), url, data, null).then();
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.PUT.name(), url, null, data, null).then();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -300,7 +303,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, String data) {
|
||||
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.PUT.name(), url, data, null).then();
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.PUT.name(), url, null, data, null).then();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -309,7 +312,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName) {
|
||||
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.DELETE.name(), url, null).then();
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.DELETE.name(), url, null, null).then();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -318,7 +321,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, String data) {
|
||||
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.PUT.name(), url, data, null).then();
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.PUT.name(), url,null, data, null).then();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -327,53 +330,7 @@ public class DaprClientHttpAdapter implements DaprClient {
|
|||
@Override
|
||||
public Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName) {
|
||||
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.DELETE.name(), url, null).then();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the string with params for a given URL.
|
||||
*
|
||||
* TODO: Move this logic down the stack to use okhttp's builder instead:
|
||||
* https://square.github.io/okhttp/4.x/okhttp/okhttp3/-http-url/-builder/add-query-parameter/
|
||||
* @param options State options to be converted.
|
||||
* @return String with query params.
|
||||
* @throws IllegalAccessException Cannot extract params.
|
||||
*/
|
||||
private String getOptionsAsQueryParameter(StateOptions options)
|
||||
throws IllegalAccessException {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Map<String, Object> mapOptions = transformStateOptionsToMap(options);
|
||||
if (mapOptions != null && !mapOptions.isEmpty()) {
|
||||
sb.append("?");
|
||||
for (Map.Entry<String, Object> option : mapOptions.entrySet()) {
|
||||
sb.append(option.getKey()).append("=").append(option.getValue()).append("&");
|
||||
}
|
||||
sb.deleteCharAt(sb.length()-1);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts state options to map.
|
||||
*
|
||||
* TODO: Move this logic to StateOptions.
|
||||
* @param options Instance to have is methods converted into map.
|
||||
* @return Map for the state options.
|
||||
* @throws IllegalAccessException Cannot extract params.
|
||||
*/
|
||||
private Map<String, Object> transformStateOptionsToMap(StateOptions options)
|
||||
throws IllegalAccessException {
|
||||
Map<String, Object> mapOptions = null;
|
||||
if (options != null) {
|
||||
mapOptions = new HashMap<>();
|
||||
for (Field field : options.getClass().getFields()) {
|
||||
Object fieldValue = field.get(options);
|
||||
if (fieldValue != null) {
|
||||
mapOptions.put(field.getName(), fieldValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
return mapOptions;
|
||||
return this.client.invokeAPI(DaprHttp.HttpMethods.DELETE.name(), url, null, null).then();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -9,11 +9,7 @@ import io.dapr.exceptions.DaprError;
|
|||
import io.dapr.exceptions.DaprException;
|
||||
import io.dapr.utils.Constants;
|
||||
import io.dapr.utils.ObjectSerializer;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.RequestBody;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.*;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -112,8 +108,8 @@ class DaprHttp {
|
|||
* @param urlString url as String.
|
||||
* @return Asynchronous text
|
||||
*/
|
||||
public Mono<Response> invokeAPI(String method, String urlString, Map<String, String> headers) {
|
||||
return this.invokeAPI(method, urlString, (byte[]) null, headers);
|
||||
public Mono<Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, Map<String, String> headers) {
|
||||
return this.invokeAPI(method, urlString, urlParameters, (byte[]) null, headers);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,8 +120,8 @@ class DaprHttp {
|
|||
* @param content payload to be posted.
|
||||
* @return Asynchronous text
|
||||
*/
|
||||
public Mono<Response> invokeAPI(String method, String urlString, String content, Map<String, String> headers) {
|
||||
return this.invokeAPI(method, urlString, content == null ? EMPTY_BYTES : content.getBytes(StandardCharsets.UTF_8), headers);
|
||||
public Mono<Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, String content, Map<String, String> headers) {
|
||||
return this.invokeAPI(method, urlString, urlParameters, content == null ? EMPTY_BYTES : content.getBytes(StandardCharsets.UTF_8), headers);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -136,7 +132,7 @@ class DaprHttp {
|
|||
* @param content payload to be posted.
|
||||
* @return Asynchronous text
|
||||
*/
|
||||
public Mono<Response> invokeAPI(String method, String urlString, byte[] content, Map<String, String> headers) {
|
||||
public Mono<Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, byte[] content, Map<String, String> headers) {
|
||||
return Mono.fromFuture(CompletableFuture.supplyAsync(
|
||||
() -> {
|
||||
try {
|
||||
|
@ -151,9 +147,13 @@ class DaprHttp {
|
|||
} else {
|
||||
body = RequestBody.Companion.create(content, mediaType);
|
||||
}
|
||||
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
|
||||
urlBuilder.host(this.baseUrl).addPathSegment(urlString);
|
||||
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
|
||||
.forEach(urlParameter -> urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameter.getValue()));
|
||||
|
||||
Request.Builder requestBuilder = new Request.Builder()
|
||||
.url(new URL(this.baseUrl + urlString))
|
||||
.url(urlBuilder.build())
|
||||
.addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId);
|
||||
if (HttpMethods.GET.name().equals(method)) {
|
||||
requestBuilder.get();
|
||||
|
|
|
@ -4,7 +4,13 @@
|
|||
*/
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import io.dapr.utils.DurationUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class StateOptions {
|
||||
private final Consistency consistency;
|
||||
|
@ -29,6 +35,31 @@ public class StateOptions {
|
|||
return retryPolicy;
|
||||
}
|
||||
|
||||
public Map<String, String> getStateOptionsAsMap() {
|
||||
Map<String, String> mapOptions = null;
|
||||
if (this != null) {
|
||||
mapOptions = new HashMap<>();
|
||||
if (this.getConsistency() != null) {
|
||||
mapOptions.put("consistency", this.getConsistency().getValue());
|
||||
}
|
||||
if (this.getConcurrency() != null) {
|
||||
mapOptions.put("concurrency", this.getConcurrency().getValue());
|
||||
}
|
||||
if (this.getRetryPolicy() != null) {
|
||||
if (this.getRetryPolicy().getInterval() != null) {
|
||||
mapOptions.put("retryInterval", DurationUtils.ConvertDurationToDaprFormat(this.getRetryPolicy().getInterval()));
|
||||
}
|
||||
if (this.getRetryPolicy().getThreshold() != null) {
|
||||
mapOptions.put("retryThreshold", this.getRetryPolicy().getThreshold().toString());
|
||||
}
|
||||
if (this.getRetryPolicy().getPattern() != null) {
|
||||
mapOptions.put("retryPattern", this.getRetryPolicy().getPattern().getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(Optional.ofNullable(mapOptions).orElse(Collections.EMPTY_MAP));
|
||||
}
|
||||
|
||||
public static enum Consistency {
|
||||
EVENTUAL("eventual"),
|
||||
STRONG("strong");
|
||||
|
@ -76,11 +107,11 @@ public class StateOptions {
|
|||
}
|
||||
|
||||
private final Duration interval;
|
||||
private final String threshold;
|
||||
private final Integer threshold;
|
||||
private final Pattern pattern;
|
||||
|
||||
|
||||
public RetryPolicy(Duration interval, String threshold, Pattern pattern) {
|
||||
public RetryPolicy(Duration interval, Integer threshold, Pattern pattern) {
|
||||
this.interval = interval;
|
||||
this.threshold = threshold;
|
||||
this.pattern = pattern;
|
||||
|
@ -90,12 +121,13 @@ public class StateOptions {
|
|||
return interval;
|
||||
}
|
||||
|
||||
public String getThreshold() {
|
||||
public Integer getThreshold() {
|
||||
return threshold;
|
||||
}
|
||||
|
||||
public Pattern getPattern() {
|
||||
return pattern;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -511,7 +511,7 @@ public class DaprClientGrpcAdapterTest {
|
|||
}
|
||||
|
||||
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency,
|
||||
Duration interval, String threshold, StateOptions.RetryPolicy.Pattern pattern) {
|
||||
Duration interval, Integer threshold, StateOptions.RetryPolicy.Pattern pattern) {
|
||||
|
||||
StateOptions.RetryPolicy retryPolicy = null;
|
||||
if (interval != null || threshold != null || pattern != null) {
|
||||
|
|
|
@ -32,7 +32,7 @@ public class DaprHttpStub extends DaprHttp {
|
|||
* @return
|
||||
*/
|
||||
@Override
|
||||
public Mono<DaprHttp.Response> invokeAPI(String method, String urlString, Map<String, String> headers) {
|
||||
public Mono<DaprHttp.Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, Map<String, String> headers) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -40,7 +40,7 @@ public class DaprHttpStub extends DaprHttp {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<DaprHttp.Response> invokeAPI(String method, String urlString, String content, Map<String, String> headers) {
|
||||
public Mono<DaprHttp.Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, String content, Map<String, String> headers) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -48,7 +48,7 @@ public class DaprHttpStub extends DaprHttp {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<DaprHttp.Response> invokeAPI(String method, String urlString, byte[] content, Map<String, String> headers) {
|
||||
public Mono<DaprHttp.Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, byte[] content, Map<String, String> headers) {
|
||||
return Mono.empty();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public class DaprHttpTest {
|
|||
|
||||
DaprHttp daprHttp = new DaprHttp("http://localhost",3500,okHttpClient);
|
||||
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("POST","v1.0/state",null);
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("POST","v1.0/state",null, null);
|
||||
DaprHttp.Response response = mono.block();
|
||||
String body = serializer.deserialize(response.getBody(), String.class);
|
||||
assertEquals(EXPECTED_RESULT,body);
|
||||
|
@ -60,7 +60,7 @@ public class DaprHttpTest {
|
|||
|
||||
DaprHttp daprHttp = new DaprHttp("http://localhost",3500,okHttpClient);
|
||||
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("DELETE","v1.0/state",null);
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("DELETE","v1.0/state", null, null);
|
||||
DaprHttp.Response response = mono.block();
|
||||
String body = serializer.deserialize(response.getBody(), String.class);
|
||||
assertEquals(EXPECTED_RESULT,body);
|
||||
|
@ -76,7 +76,7 @@ public class DaprHttpTest {
|
|||
|
||||
DaprHttp daprHttp = new DaprHttp("http://localhost",3500,okHttpClient);
|
||||
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("GET","v1.0/get",null);
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("GET","v1.0/get",null, null);
|
||||
DaprHttp.Response response = mono.block();
|
||||
String body = serializer.deserialize(response.getBody(), String.class);
|
||||
assertEquals(EXPECTED_RESULT,body);
|
||||
|
@ -95,7 +95,7 @@ public class DaprHttpTest {
|
|||
.respond(EXPECTED_RESULT);
|
||||
DaprHttp daprHttp = new DaprHttp("http://localhost",3500,okHttpClient);
|
||||
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("GET","v1.0/get",headers);
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("GET","v1.0/get",headers, null);
|
||||
DaprHttp.Response response = mono.block();
|
||||
String body = serializer.deserialize(response.getBody(), String.class);
|
||||
assertEquals(EXPECTED_RESULT,body);
|
||||
|
@ -116,7 +116,7 @@ public class DaprHttpTest {
|
|||
|
||||
DaprHttp daprHttp = new DaprHttp("http://localhost",3500,okHttpClient);
|
||||
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("GET","v1.0/get",headers);
|
||||
Mono<DaprHttp.Response> mono = daprHttp.invokeAPI("GET","v1.0/get", headers, null);
|
||||
DaprHttp.Response response = mono.block();
|
||||
String body = serializer.deserialize(response.getBody(), String.class);
|
||||
assertEquals(EXPECTED_RESULT,body);
|
||||
|
|
|
@ -113,6 +113,7 @@ public class DaprRuntimeTest {
|
|||
when(daprHttp.invokeAPI(
|
||||
eq("POST"),
|
||||
eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME),
|
||||
eq(null),
|
||||
eq(message.data),
|
||||
eq(null)))
|
||||
.thenAnswer(invocationOnMock -> this.daprRuntime.handleInvocation(
|
||||
|
@ -198,6 +199,7 @@ public class DaprRuntimeTest {
|
|||
when(daprHttp.invokeAPI(
|
||||
eq("POST"),
|
||||
eq(Constants.INVOKE_PATH + "/" + APP_ID + "/method/" + METHOD_NAME),
|
||||
eq(null),
|
||||
eq(message.data),
|
||||
any()))
|
||||
.thenAnswer(x ->
|
||||
|
|
Loading…
Reference in New Issue