Delete DaprHTTPClient for actors. (#1056)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2024-06-25 13:22:58 -07:00 committed by GitHub
parent 48364b1f65
commit d9cfba7e8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 32 additions and 400 deletions

View File

@ -315,7 +315,7 @@ public class ActorRuntime implements Closeable {
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(ManagedChannel channel) {
return new DaprGrpcClient(channel);
return new DaprClientImpl(channel);
}
/**

View File

@ -36,7 +36,7 @@ import java.util.concurrent.ExecutionException;
/**
* A DaprClient over HTTP for Actor's runtime.
*/
class DaprGrpcClient implements DaprClient {
class DaprClientImpl implements DaprClient {
/**
* Use to handle internal serialization.
@ -60,7 +60,7 @@ class DaprGrpcClient implements DaprClient {
*
* @param channel channel (client needs to close channel after use).
*/
DaprGrpcClient(ManagedChannel channel) {
DaprClientImpl(ManagedChannel channel) {
this(DaprGrpc.newStub(channel));
}
@ -69,7 +69,7 @@ class DaprGrpcClient implements DaprClient {
*
* @param daprStubClient Dapr's GRPC client.
*/
DaprGrpcClient(DaprGrpc.DaprStub daprStubClient) {
DaprClientImpl(DaprGrpc.DaprStub daprStubClient) {
this.client = daprStubClient;
}

View File

@ -1,205 +0,0 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.client.DaprHttp;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
/**
* A DaprClient over HTTP for Actor's runtime.
*/
class DaprHttpClient implements DaprClient {
/**
* Internal serializer for state.
*/
private static final ActorObjectSerializer INTERNAL_SERIALIZER = new ActorObjectSerializer();
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* The HTTP client to be used.
*
* @see DaprHttp
*/
private final DaprHttp client;
/**
* Internal constructor.
*
* @param client Dapr's http client.
*/
DaprHttpClient(DaprHttp client) {
this.client = client;
}
/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "actors", actorType, actorId, "state", keyName };
Mono<DaprHttp.Response> responseMono =
this.client.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, null, "", null, null);
return responseMono.map(r -> {
if ((r.getStatusCode() != 200) && (r.getStatusCode() != 204)) {
throw new IllegalStateException(
String.format("Error getting actor state: %s/%s/%s", actorType, actorId, keyName));
}
return r.getBody();
});
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> saveStateTransactionally(
String actorType,
String actorId,
List<ActorStateOperation> operations) {
// Constructing the JSON via a stream API to avoid creating transient objects to be instantiated.
byte[] payload = null;
try (ByteArrayOutputStream writer = new ByteArrayOutputStream()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
// Start array
generator.writeStartArray();
for (ActorStateOperation stateOperation : operations) {
// Start operation object.
generator.writeStartObject();
generator.writeStringField("operation", stateOperation.getOperationType());
// Start request object.
generator.writeObjectFieldStart("request");
generator.writeStringField("key", stateOperation.getKey());
Object value = stateOperation.getValue();
if (value != null) {
if (value instanceof String) {
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
generator.writeFieldName("value");
generator.writeRawValue((String) value);
} else if (value instanceof byte[]) {
// Custom serializer uses byte[].
// DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too.
generator.writeBinaryField("value", (byte[]) value);
} else {
return Mono.error(() -> {
throw new IllegalArgumentException("Actor state value must be String or byte[]");
});
}
}
// End request object.
generator.writeEndObject();
// End operation object.
generator.writeEndObject();
}
// End array
generator.writeEndArray();
generator.close();
writer.flush();
payload = writer.toByteArray();
} catch (IOException e) {
return Mono.error(e);
}
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "actors", actorType, actorId, "state" };
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), pathSegments, null, payload, null, null).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerReminder(
String actorType,
String actorId,
String reminderName,
ActorReminderParams reminderParams) {
String[] pathSegments = new String[] {
DaprHttp.API_VERSION,
"actors",
actorType,
actorId,
"reminders",
reminderName
};
return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(reminderParams))
.flatMap(data ->
this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), pathSegments, null, data, null, null)
).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterReminder(String actorType, String actorId, String reminderName) {
String[] pathSegments = new String[] {
DaprHttp.API_VERSION,
"actors",
actorType,
actorId,
"reminders",
reminderName
};
return this.client.invokeApi(DaprHttp.HttpMethods.DELETE.name(), pathSegments, null, null, null).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> registerTimer(
String actorType,
String actorId,
String timerName,
ActorTimerParams timerParams) {
return Mono.fromCallable(() -> INTERNAL_SERIALIZER.serialize(timerParams))
.flatMap(data -> {
String[] pathSegments = new String[] {
DaprHttp.API_VERSION,
"actors",
actorType,
actorId,
"timers",
timerName
};
return this.client.invokeApi(DaprHttp.HttpMethods.PUT.name(), pathSegments, null, data, null, null);
}).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterTimer(String actorType, String actorId, String timerName) {
String[] pathSegments = new String[] { DaprHttp.API_VERSION, "actors", actorType, actorId, "timers", timerName };
return this.client.invokeApi(DaprHttp.HttpMethods.DELETE.name(), pathSegments, null, null, null).then();
}
}

View File

@ -68,7 +68,7 @@ public class DaprGrpcClientTest {
private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient();
private DaprGrpcClient client;
private DaprClientImpl client;
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
@ -87,7 +87,7 @@ public class DaprGrpcClientTest {
InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a HelloWorldClient using the in-process channel;
client = new DaprGrpcClient(DaprGrpc.newStub(channel));
client = new DaprClientImpl(DaprGrpc.newStub(channel));
}
@Test

View File

@ -1,178 +0,0 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.actors.runtime;
import io.dapr.client.DaprHttp;
import io.dapr.client.DaprHttpProxy;
import io.dapr.config.Properties;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import okhttp3.mock.RuleAnswer;
import okio.Buffer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import static io.dapr.actors.TestUtils.formatIpAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
public class DaprHttpClientTest {
private DaprHttpClient DaprHttpClient;
private OkHttpClient okHttpClient;
private MockInterceptor mockInterceptor;
private String sidecarIp;
private final String EXPECTED_RESULT = "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
@BeforeEach
public void setUp() throws Exception {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
}
@Test
public void getActorState() {
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/state/order")
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<byte[]> mono = DaprHttpClient.getState("DemoActor", "1", "order");
assertEquals(new String(mono.block()), EXPECTED_RESULT);
}
@Test
public void getActorStateIPv6() {
String prevSidecarIp = sidecarIp;
System.setProperty(Properties.SIDECAR_IP.getName(), "2001:db8:3333:4444:5555:6666:7777:8888");
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/state/order")
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
System.setProperty(Properties.SIDECAR_IP.getName(), prevSidecarIp);
Mono<byte[]> mono = DaprHttpClient.getState("DemoActor", "1", "order");
assertEquals(new String(mono.block()), EXPECTED_RESULT);
}
@Test
public void saveActorStateTransactionally() {
mockInterceptor.addRule()
.put("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/state")
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
List<ActorStateOperation> ops = Collections.singletonList(new ActorStateOperation("UPSERT", "key", "value"));
Mono<Void> mono = DaprHttpClient.saveStateTransactionally("DemoActor", "1", ops);
assertNull(mono.block());
}
@Test
public void registerActorReminder() {
mockInterceptor.addRule()
.put("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/reminders/reminder")
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono =
DaprHttpClient.registerReminder(
"DemoActor",
"1",
"reminder",
new ActorReminderParams("".getBytes(), Duration.ofSeconds(1), Duration.ofSeconds(2)));
assertNull(mono.block());
}
@Test
public void unregisterActorReminder() {
mockInterceptor.addRule()
.delete("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/reminders/reminder")
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono = DaprHttpClient.unregisterReminder("DemoActor", "1", "reminder");
assertNull(mono.block());
}
@Test
public void registerActorTimer() {
String data = "hello world";
mockInterceptor.addRule()
.put("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/timers/timer")
.answer(new RuleAnswer() {
@Override
public Response.Builder respond(Request request) {
String expectedBody = "{\"dueTime\":\"0h0m5s0ms\"," +
"\"period\":\"0h0m10s0ms\"," +
"\"callback\":\"mycallback\"," +
"\"data\":\""+ Base64.getEncoder().encodeToString(data.getBytes()) +"\"}";
String body = "";
try {
Buffer buffer = new Buffer();
request.body().writeTo(buffer);
body = buffer.readString(Charset.defaultCharset());
} catch (IOException e) {
fail();
}
assertEquals(expectedBody, body);
return new Response.Builder()
.code(200)
.body(ResponseBody.create("{}", MediaType.get("application/json")));
}
});
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono =
DaprHttpClient.registerTimer(
"DemoActor",
"1",
"timer",
new ActorTimerParams(
"mycallback",
data.getBytes(),
Duration.ofSeconds(5),
Duration.ofSeconds(10)));
assertNull(mono.block());
}
@Test
public void unregisterActorTimer() {
mockInterceptor.addRule()
.delete("http://" + sidecarIp + ":3000/v1.0/actors/DemoActor/1/timers/timer")
.respond(EXPECTED_RESULT);
DaprHttp daprHttp = new DaprHttpProxy(sidecarIp, 3000, okHttpClient);
DaprHttpClient = new DaprHttpClient(daprHttp);
Mono<Void> mono = DaprHttpClient.unregisterTimer("DemoActor", "1", "timer");
assertNull(mono.block());
}
}

View File

@ -12,7 +12,7 @@ limitations under the License.
*/
package io.dapr.actors.runtime;
import io.dapr.client.DaprHttp;
import io.grpc.ManagedChannel;
/**
* Exposes useful methods for IT in DaprClientHttp.
@ -20,10 +20,10 @@ import io.dapr.client.DaprHttp;
public class DaprClientHttpUtils {
public static void unregisterActorReminder(
DaprHttp client,
ManagedChannel channel,
String actorType,
String actorId,
String reminderName) throws Exception {
new DaprHttpClient(client).unregisterReminder(actorType, actorId, reminderName).block();
String reminderName) {
new DaprClientImpl(channel).unregisterReminder(actorType, actorId, reminderName).block();
}
}

View File

@ -17,10 +17,12 @@ import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.actors.runtime.DaprClientHttpUtils;
import io.dapr.client.DaprHttp;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.config.Properties;
import io.dapr.it.BaseIT;
import io.dapr.it.actors.app.MyActorService;
import io.dapr.utils.Version;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -54,14 +56,15 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
@AfterEach
public void cleanUpTestCase() {
// Delete the reminder in case the test failed, otherwise it may interfere with future tests since it is persisted.
DaprHttp client = new DaprHttpBuilder().build();
System.out.println("Invoking during cleanup");
var channel = buildManagedChannel();
try {
DaprClientHttpUtils.unregisterActorReminder(client, ACTOR_TYPE, ACTOR_ID, REMINDER_NAME);
} catch(Exception e) {
System.out.println("Invoking during cleanup");
DaprClientHttpUtils.unregisterActorReminder(channel, ACTOR_TYPE, ACTOR_ID, REMINDER_NAME);
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.shutdown();
}
}
/**
@ -225,4 +228,16 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
}
}
}
private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}
return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}
}