mirror of https://github.com/dapr/java-sdk.git
Bump from reactor 2.3.5.RELEASE to 2.7.8 (#830)
* Bump from reactor 2.3.5.RELEASE to 2.7.8 Signed-off-by: Sergio <champel@gmail.com> * Simplification Signed-off-by: Sergio <champel@gmail.com> --------- Signed-off-by: Sergio <champel@gmail.com>
This commit is contained in:
parent
7d78d1880f
commit
7649ae44b0
|
@ -89,7 +89,7 @@ public class OpenTelemetryConfig {
|
|||
* Converts current OpenTelemetry's context into Reactor's context.
|
||||
* @return Reactor's context.
|
||||
*/
|
||||
public static reactor.util.context.Context getReactorContext() {
|
||||
public static reactor.util.context.ContextView getReactorContext() {
|
||||
return getReactorContext(Context.current());
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ public class BulkPublisher {
|
|||
System.out.println("Going to publish message : " + message);
|
||||
}
|
||||
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
|
||||
.subscriberContext(getReactorContext()).block();
|
||||
.contextWrite(getReactorContext()).block();
|
||||
System.out.println("Published the set of messages in a single call to Dapr");
|
||||
if (res != null) {
|
||||
if (res.getFailedEntries().size() > 0) {
|
||||
|
|
|
@ -63,7 +63,7 @@ public class PublisherWithTracing {
|
|||
client.publishEvent(
|
||||
PUBSUB_NAME,
|
||||
TOPIC_NAME,
|
||||
message).subscriberContext(getReactorContext()).block();
|
||||
message).contextWrite(getReactorContext()).block();
|
||||
System.out.println("Published message: " + message);
|
||||
|
||||
try {
|
||||
|
|
|
@ -67,7 +67,7 @@ public class InvokeClient {
|
|||
InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep")
|
||||
.setHttpExtension(HttpExtension.POST);
|
||||
return client.invokeMethod(sleepRequest, TypeRef.get(Void.class));
|
||||
}).subscriberContext(getReactorContext()).block();
|
||||
}).contextWrite(getReactorContext()).block();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public class TracingDemoMiddleServiceController {
|
|||
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo")
|
||||
.setBody(body)
|
||||
.setHttpExtension(HttpExtension.POST);
|
||||
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context));
|
||||
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -71,7 +71,7 @@ public class TracingDemoMiddleServiceController {
|
|||
public Mono<Void> sleep(@RequestAttribute(name = "opentelemetry-context") Context context) {
|
||||
InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep")
|
||||
.setHttpExtension(HttpExtension.POST);
|
||||
return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then();
|
||||
return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import io.grpc.MethodDescriptor;
|
|||
import io.grpc.stub.StreamObserver;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.MonoSink;
|
||||
import reactor.util.context.Context;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -65,7 +65,7 @@ class DaprGrpcClient implements DaprClient {
|
|||
.setMethod(methodName)
|
||||
.setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload))
|
||||
.build();
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<DaprProtos.InvokeActorResponse>createMono(
|
||||
it -> intercept(context, client).invokeActor(req, it)
|
||||
)
|
||||
|
@ -109,7 +109,7 @@ class DaprGrpcClient implements DaprClient {
|
|||
* @param client GRPC client for Dapr.
|
||||
* @return Client after adding interceptors.
|
||||
*/
|
||||
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
|
||||
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
|
||||
return GrpcWrapper.intercept(context, client);
|
||||
}
|
||||
|
||||
|
|
|
@ -131,7 +131,7 @@
|
|||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<version>3.3.11.RELEASE</version>
|
||||
<version>3.5.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -101,7 +101,7 @@ public class MethodInvokeIT extends BaseIT {
|
|||
.block(Duration.ofMillis(10))).getMessage();
|
||||
long delay = System.currentTimeMillis() - started;
|
||||
assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out.
|
||||
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
|
||||
assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ public class MethodInvokeIT extends BaseIT {
|
|||
}).getMessage();
|
||||
long delay = System.currentTimeMillis() - started;
|
||||
assertTrue(delay <= 200); // 200 ms is a reasonable delay if the request timed out.
|
||||
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
|
||||
assertEquals("Timeout on blocking read for 10000000 NANOSECONDS", message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ public class TracingIT extends BaseIT {
|
|||
try (Scope scope = span.makeCurrent()) {
|
||||
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
|
||||
client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST)
|
||||
.subscriberContext(getReactorContext())
|
||||
.contextWrite(getReactorContext())
|
||||
.block();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ public class TracingIT extends BaseIT {
|
|||
try (DaprClient client = new DaprClientBuilder().build()) {
|
||||
try (Scope scope = span.makeCurrent()) {
|
||||
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
|
||||
.subscriberContext(getReactorContext())
|
||||
.contextWrite(getReactorContext())
|
||||
.block();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<version>3.3.11.RELEASE</version>
|
||||
<version>3.5.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
|
|
|
@ -67,7 +67,7 @@ import reactor.core.publisher.Flux;
|
|||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.MonoSink;
|
||||
import reactor.util.context.Context;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -181,7 +181,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
envelopeBuilder.putAllMetadata(metadata);
|
||||
}
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context ->
|
||||
this.<Empty>createMono(
|
||||
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
|
||||
|
@ -254,7 +254,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
for (BulkPublishEntry<T> entry: request.getEntries()) {
|
||||
entryMap.put(entry.getEntryId(), entry);
|
||||
}
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context ->
|
||||
this.<DaprProtos.BulkPublishResponse>createMono(
|
||||
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
|
||||
|
@ -298,7 +298,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
|
||||
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<CommonProtos.InvokeResponse>createMono(
|
||||
it -> intercept(context, asyncStub).invokeService(envelope, it)
|
||||
)
|
||||
|
@ -345,7 +345,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
}
|
||||
DaprProtos.InvokeBindingRequest envelope = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
|
||||
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
|
||||
)
|
||||
|
@ -392,7 +392,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
|
||||
DaprProtos.GetStateRequest envelope = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context ->
|
||||
this.<DaprProtos.GetStateResponse>createMono(
|
||||
it -> intercept(context, asyncStub).getState(envelope, it)
|
||||
|
@ -441,7 +441,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
|
||||
DaprProtos.GetBulkStateRequest envelope = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<DaprProtos.GetBulkStateResponse>createMono(it -> intercept(context, asyncStub)
|
||||
.getBulkState(envelope, it)
|
||||
)
|
||||
|
@ -525,7 +525,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
}
|
||||
DaprProtos.ExecuteStateTransactionRequest req = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
|
||||
).then();
|
||||
} catch (Exception e) {
|
||||
|
@ -551,7 +551,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
}
|
||||
DaprProtos.SaveStateRequest req = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).saveState(req, it))
|
||||
).then();
|
||||
} catch (Exception ex) {
|
||||
|
@ -635,7 +635,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
|
||||
DaprProtos.DeleteStateRequest req = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<Empty>createMono(it -> intercept(context, asyncStub).deleteState(req, it))
|
||||
).then();
|
||||
} catch (Exception ex) {
|
||||
|
@ -713,7 +713,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
}
|
||||
DaprProtos.GetSecretRequest req = requestBuilder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<DaprProtos.GetSecretResponse>createMono(it -> intercept(context, asyncStub).getSecret(req, it))
|
||||
).map(DaprProtos.GetSecretResponse::getDataMap);
|
||||
}
|
||||
|
@ -738,7 +738,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
|
||||
DaprProtos.GetBulkSecretRequest envelope = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context ->
|
||||
this.<DaprProtos.GetBulkSecretResponse>createMono(
|
||||
it -> intercept(context, asyncStub).getBulkSecret(envelope, it)
|
||||
|
@ -791,7 +791,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
|
||||
DaprProtos.QueryStateRequest envelope = builder.build();
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<DaprProtos.QueryStateResponse>createMono(
|
||||
it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it)
|
||||
)
|
||||
|
@ -855,7 +855,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
*/
|
||||
@Override
|
||||
public Mono<Void> shutdown() {
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.<Empty>createMono(
|
||||
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
|
||||
).then();
|
||||
|
@ -889,7 +889,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
}
|
||||
|
||||
private Mono<Map<String, ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context ->
|
||||
this.<DaprProtos.GetConfigurationResponse>createMono(
|
||||
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
|
||||
|
@ -1034,7 +1034,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
* @param client GRPC client for Dapr.
|
||||
* @return Client after adding interceptors.
|
||||
*/
|
||||
private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) {
|
||||
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
|
||||
return GrpcWrapper.intercept(context, client);
|
||||
}
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
|
||||
|
||||
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
|
||||
)
|
||||
|
@ -237,7 +237,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
} else {
|
||||
headers.put(Metadata.CONTENT_TYPE, objectSerializer.getContentType());
|
||||
}
|
||||
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
|
||||
Mono<DaprHttp.Response> response = Mono.deferContextual(
|
||||
context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]),
|
||||
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
|
||||
);
|
||||
|
@ -309,7 +309,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "bindings", name };
|
||||
|
||||
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
|
||||
Mono<DaprHttp.Response> response = Mono.deferContextual(
|
||||
context -> this.client.invokeApi(
|
||||
httpMethod, pathSegments, null, payload, null, context)
|
||||
);
|
||||
|
@ -349,7 +349,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk" };
|
||||
|
||||
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context)
|
||||
).flatMap(s -> {
|
||||
|
@ -394,7 +394,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context)
|
||||
).flatMap(s -> {
|
||||
|
@ -452,7 +452,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "transaction" };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedOperationBody, null, context
|
||||
)
|
||||
|
@ -500,7 +500,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedStateBody, null, context)
|
||||
).then();
|
||||
|
@ -543,7 +543,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context)
|
||||
).then();
|
||||
|
@ -631,7 +631,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context)
|
||||
).flatMap(response -> {
|
||||
|
@ -667,7 +667,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk" };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context)
|
||||
).flatMap(response -> {
|
||||
|
@ -709,17 +709,17 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
} else {
|
||||
throw new IllegalArgumentException("Both query and queryString fields are not set.");
|
||||
}
|
||||
return Mono.subscriberContext().flatMap(
|
||||
context -> this.client
|
||||
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
|
||||
queryArgs, serializedRequest, null, context)
|
||||
).flatMap(response -> {
|
||||
try {
|
||||
return Mono.justOrEmpty(buildQueryStateResponse(response, type));
|
||||
} catch (Exception e) {
|
||||
return DaprException.wrapMono(e);
|
||||
}
|
||||
});
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
|
||||
queryArgs, serializedRequest, null, context)
|
||||
).flatMap(response -> {
|
||||
try {
|
||||
return Mono.justOrEmpty(buildQueryStateResponse(response, type));
|
||||
} catch (Exception e) {
|
||||
return DaprException.wrapMono(e);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
return DaprException.wrapMono(e);
|
||||
}
|
||||
|
@ -739,14 +739,14 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
@Override
|
||||
public Mono<Void> shutdown() {
|
||||
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
|
||||
null, null, context))
|
||||
.then();
|
||||
}
|
||||
|
||||
private <T> QueryStateResponse<T> buildQueryStateResponse(DaprHttp.Response response,
|
||||
TypeRef<T> type) throws IOException {
|
||||
TypeRef<T> type) throws IOException {
|
||||
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
|
||||
if (!root.has("results")) {
|
||||
return new QueryStateResponse<>(Collections.emptyList(), null);
|
||||
|
@ -810,36 +810,36 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
queryParams.putAll(queryArgs);
|
||||
|
||||
String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName };
|
||||
return Mono.subscriberContext().flatMap(
|
||||
context -> this.client
|
||||
.invokeApi(
|
||||
DaprHttp.HttpMethods.GET.name(),
|
||||
pathSegments, queryParams,
|
||||
(String) null, null, context)
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(
|
||||
DaprHttp.HttpMethods.GET.name(),
|
||||
pathSegments, queryParams,
|
||||
(String) null, null, context)
|
||||
).map(
|
||||
response -> {
|
||||
try {
|
||||
Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class);
|
||||
Set<String> set = m.keySet();
|
||||
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
|
||||
Iterator<String> itr = set.iterator();
|
||||
Map<String, ConfigurationItem> result = new HashMap<>();
|
||||
while (itr.hasNext()) {
|
||||
String key = itr.next();
|
||||
String value = root.get(key).path("value").asText();
|
||||
String version = root.get(key).path("version").asText();
|
||||
result.put(key, new ConfigurationItem(
|
||||
key,
|
||||
value,
|
||||
version,
|
||||
new HashMap<>()
|
||||
));
|
||||
}
|
||||
return Collections.unmodifiableMap(result);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
response -> {
|
||||
try {
|
||||
Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class);
|
||||
Set<String> set = m.keySet();
|
||||
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
|
||||
Iterator<String> itr = set.iterator();
|
||||
Map<String, ConfigurationItem> result = new HashMap<>();
|
||||
while (itr.hasNext()) {
|
||||
String key = itr.next();
|
||||
String value = root.get(key).path("value").asText();
|
||||
String version = root.get(key).path("version").asText();
|
||||
result.put(key, new ConfigurationItem(
|
||||
key,
|
||||
value,
|
||||
version,
|
||||
new HashMap<>()
|
||||
));
|
||||
}
|
||||
return Collections.unmodifiableMap(result);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (Exception ex) {
|
||||
return DaprException.wrapMono(ex);
|
||||
|
@ -871,12 +871,12 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
|
||||
String[] pathSegments =
|
||||
new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" };
|
||||
SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap(
|
||||
context -> this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.GET.name(),
|
||||
pathSegments, queryParams,
|
||||
(String) null, null, context
|
||||
)
|
||||
SubscribeConfigurationResponse res = Mono.deferContextual(
|
||||
context -> this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.GET.name(),
|
||||
pathSegments, queryParams,
|
||||
(String) null, null, context
|
||||
)
|
||||
).map(response -> {
|
||||
try {
|
||||
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
|
||||
|
@ -913,7 +913,7 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
String[] pathSegments = new String[]
|
||||
{ DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" };
|
||||
|
||||
return Mono.subscriberContext().flatMap(
|
||||
return Mono.deferContextual(
|
||||
context -> this.client
|
||||
.invokeApi(
|
||||
DaprHttp.HttpMethods.GET.name(),
|
||||
|
|
|
@ -29,7 +29,7 @@ import okhttp3.RequestBody;
|
|||
import okhttp3.ResponseBody;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.context.Context;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -183,7 +183,7 @@ public class DaprHttp implements AutoCloseable {
|
|||
String[] pathSegments,
|
||||
Map<String, List<String>> urlParameters,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
ContextView context) {
|
||||
return this.invokeApi(method, pathSegments, urlParameters, (byte[]) null, headers, context);
|
||||
}
|
||||
|
||||
|
@ -204,7 +204,7 @@ public class DaprHttp implements AutoCloseable {
|
|||
Map<String, List<String>> urlParameters,
|
||||
String content,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
ContextView context) {
|
||||
|
||||
return this.invokeApi(
|
||||
method, pathSegments, urlParameters, content == null
|
||||
|
@ -224,12 +224,12 @@ public class DaprHttp implements AutoCloseable {
|
|||
* @return Asynchronous response
|
||||
*/
|
||||
public Mono<Response> invokeApi(
|
||||
String method,
|
||||
String[] pathSegments,
|
||||
Map<String, List<String>> urlParameters,
|
||||
byte[] content,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
String method,
|
||||
String[] pathSegments,
|
||||
Map<String, List<String>> urlParameters,
|
||||
byte[] content,
|
||||
Map<String, String> headers,
|
||||
ContextView context) {
|
||||
// fromCallable() is needed so the invocation does not happen early, causing a hot mono.
|
||||
return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context))
|
||||
.flatMap(f -> Mono.fromFuture(f));
|
||||
|
@ -256,10 +256,10 @@ public class DaprHttp implements AutoCloseable {
|
|||
* @return CompletableFuture for Response.
|
||||
*/
|
||||
private CompletableFuture<Response> doInvokeApi(String method,
|
||||
String[] pathSegments,
|
||||
Map<String, List<String>> urlParameters,
|
||||
byte[] content, Map<String, String> headers,
|
||||
Context context) {
|
||||
String[] pathSegments,
|
||||
Map<String, List<String>> urlParameters,
|
||||
byte[] content, Map<String, String> headers,
|
||||
ContextView context) {
|
||||
final String requestId = UUID.randomUUID().toString();
|
||||
RequestBody body;
|
||||
|
||||
|
@ -282,8 +282,8 @@ public class DaprHttp implements AutoCloseable {
|
|||
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
|
||||
.forEach(urlParameter ->
|
||||
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
|
||||
.forEach(urlParameterValue ->
|
||||
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
|
||||
.forEach(urlParameterValue ->
|
||||
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
|
||||
|
||||
Request.Builder requestBuilder = new Request.Builder()
|
||||
.url(urlBuilder.build())
|
||||
|
@ -305,7 +305,6 @@ public class DaprHttp implements AutoCloseable {
|
|||
if (daprApiToken != null) {
|
||||
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
|
||||
}
|
||||
|
||||
requestBuilder.addHeader(Headers.DAPR_USER_AGENT, Version.getSdkVersion());
|
||||
|
||||
if (headers != null) {
|
||||
|
|
|
@ -23,9 +23,9 @@ import io.grpc.ForwardingClientCall;
|
|||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import reactor.util.context.Context;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -61,7 +61,7 @@ public final class GrpcWrapper {
|
|||
* @param client GRPC client for Dapr.
|
||||
* @return Client after adding interceptors.
|
||||
*/
|
||||
public static DaprGrpc.DaprStub intercept(final Context context, DaprGrpc.DaprStub client) {
|
||||
public static DaprGrpc.DaprStub intercept(final ContextView context, DaprGrpc.DaprStub client) {
|
||||
ClientInterceptor interceptor = new ClientInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
@ -189,7 +190,7 @@ public class DaprClientGrpcTelemetryTest {
|
|||
.setBody("request")
|
||||
.setHttpExtension(HttpExtension.NONE);
|
||||
Mono<Void> result = this.client.invokeMethod(req, TypeRef.get(Void.class))
|
||||
.subscriberContext(it -> it.putAll(contextCopy == null ? Context.empty() : contextCopy));
|
||||
.contextWrite(it -> it.putAll(contextCopy == null ? (ContextView) Context.empty() : contextCopy));
|
||||
result.block();
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
|
||||
import static io.dapr.utils.TestUtils.findFreePort;
|
||||
|
@ -422,7 +423,7 @@ public class DaprClientHttpTest {
|
|||
.setBody("request")
|
||||
.setHttpExtension(HttpExtension.POST);
|
||||
Mono<Void> result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class))
|
||||
.subscriberContext(it -> it.putAll(context));
|
||||
.contextWrite(it -> it.putAll((ContextView) context));
|
||||
result.block();
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ limitations under the License.
|
|||
package io.dapr.client;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.context.Context;
|
||||
import reactor.util.context.ContextView;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -45,7 +45,7 @@ public class DaprHttpStub extends DaprHttp {
|
|||
String[] pathSegments,
|
||||
Map<String, List<String>> urlParameters,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
ContextView context) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ public class DaprHttpStub extends DaprHttp {
|
|||
Map<String, List<String>> urlParameters,
|
||||
String content,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
ContextView context) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -71,7 +71,7 @@ public class DaprHttpStub extends DaprHttp {
|
|||
Map<String, List<String>> urlParameters,
|
||||
byte[] content,
|
||||
Map<String, String> headers,
|
||||
Context context) {
|
||||
ContextView context) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue