mirror of https://github.com/dapr/java-sdk.git
Merge branch 'master' into javi-use-name-to-register-activity
This commit is contained in:
commit
a882849784
|
|
@ -14,7 +14,10 @@ limitations under the License.
|
|||
package io.dapr.it.testcontainers.jobs;
|
||||
|
||||
import io.dapr.client.DaprPreviewClient;
|
||||
import io.dapr.client.domain.ConstantFailurePolicy;
|
||||
import io.dapr.client.domain.DeleteJobRequest;
|
||||
import io.dapr.client.domain.DropFailurePolicy;
|
||||
import io.dapr.client.domain.FailurePolicyType;
|
||||
import io.dapr.client.domain.GetJobRequest;
|
||||
import io.dapr.client.domain.GetJobResponse;
|
||||
import io.dapr.client.domain.JobSchedule;
|
||||
|
|
@ -25,6 +28,7 @@ import io.dapr.testcontainers.DaprLogLevel;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.runner.notification.Failure;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||
|
|
@ -34,6 +38,7 @@ import org.testcontainers.containers.Network;
|
|||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
|
@ -97,6 +102,9 @@ public class DaprJobsIT {
|
|||
|
||||
GetJobResponse getJobResponse =
|
||||
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
|
||||
|
||||
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
|
||||
|
||||
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
|
||||
assertEquals("Job", getJobResponse.getName());
|
||||
}
|
||||
|
|
@ -112,6 +120,9 @@ public class DaprJobsIT {
|
|||
|
||||
GetJobResponse getJobResponse =
|
||||
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
|
||||
|
||||
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
|
||||
|
||||
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
|
||||
assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression());
|
||||
assertEquals("Job", getJobResponse.getName());
|
||||
|
|
@ -134,6 +145,9 @@ public class DaprJobsIT {
|
|||
|
||||
GetJobResponse getJobResponse =
|
||||
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
|
||||
|
||||
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
|
||||
|
||||
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
|
||||
assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression());
|
||||
assertEquals("Job", getJobResponse.getName());
|
||||
|
|
@ -143,6 +157,57 @@ public class DaprJobsIT {
|
|||
getJobResponse.getTtl().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobScheduleCreationWithDropFailurePolicy() {
|
||||
Instant currentTime = Instant.now();
|
||||
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
|
||||
.withZone(ZoneOffset.UTC);
|
||||
|
||||
String cronExpression = "2 * 3 * * FRI";
|
||||
|
||||
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
|
||||
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
|
||||
.setData("Job data".getBytes())
|
||||
.setRepeat(3)
|
||||
.setFailurePolicy(new DropFailurePolicy())
|
||||
.setSchedule(JobSchedule.fromString(cronExpression))).block();
|
||||
|
||||
GetJobResponse getJobResponse =
|
||||
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
|
||||
|
||||
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
|
||||
|
||||
assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJobScheduleCreationWithConstantFailurePolicy() {
|
||||
Instant currentTime = Instant.now();
|
||||
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
|
||||
.withZone(ZoneOffset.UTC);
|
||||
|
||||
String cronExpression = "2 * 3 * * FRI";
|
||||
|
||||
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
|
||||
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
|
||||
.setData("Job data".getBytes())
|
||||
.setRepeat(3)
|
||||
.setFailurePolicy(new ConstantFailurePolicy(3)
|
||||
.setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS)))
|
||||
.setSchedule(JobSchedule.fromString(cronExpression))).block();
|
||||
|
||||
GetJobResponse getJobResponse =
|
||||
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
|
||||
|
||||
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
|
||||
|
||||
ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy();
|
||||
assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType());
|
||||
assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries());
|
||||
assertEquals(Duration.of(10, ChronoUnit.SECONDS).getNano(),
|
||||
jobFailurePolicyConstant.getDurationBetweenRetries().getNano());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteJobRequest() {
|
||||
Instant currentTime = Instant.now();
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import io.dapr.client.domain.BulkPublishResponseFailedEntry;
|
|||
import io.dapr.client.domain.CloudEvent;
|
||||
import io.dapr.client.domain.ComponentMetadata;
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.client.domain.ConstantFailurePolicy;
|
||||
import io.dapr.client.domain.ConversationInput;
|
||||
import io.dapr.client.domain.ConversationOutput;
|
||||
import io.dapr.client.domain.ConversationRequest;
|
||||
|
|
@ -34,7 +35,10 @@ import io.dapr.client.domain.ConversationResponse;
|
|||
import io.dapr.client.domain.DaprMetadata;
|
||||
import io.dapr.client.domain.DeleteJobRequest;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.DropFailurePolicy;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.FailurePolicy;
|
||||
import io.dapr.client.domain.FailurePolicyType;
|
||||
import io.dapr.client.domain.GetBulkSecretRequest;
|
||||
import io.dapr.client.domain.GetBulkStateRequest;
|
||||
import io.dapr.client.domain.GetConfigurationRequest;
|
||||
|
|
@ -105,6 +109,7 @@ import java.time.Duration;
|
|||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
|
@ -1336,6 +1341,10 @@ public class DaprClientImpl extends AbstractDaprClient {
|
|||
scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime()));
|
||||
}
|
||||
|
||||
if (scheduleJobRequest.getFailurePolicy() != null) {
|
||||
scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
|
||||
}
|
||||
|
||||
scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite());
|
||||
|
||||
Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
|
||||
|
|
@ -1380,6 +1389,10 @@ public class DaprClientImpl extends AbstractDaprClient {
|
|||
getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime()));
|
||||
}
|
||||
|
||||
if (job.hasFailurePolicy()) {
|
||||
getJobResponse.setFailurePolicy(getJobFailurePolicy(job.getFailurePolicy()));
|
||||
}
|
||||
|
||||
return getJobResponse
|
||||
.setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null)
|
||||
.setData(job.hasData() ? job.getData().getValue().toByteArray() : null)
|
||||
|
|
@ -1390,6 +1403,53 @@ public class DaprClientImpl extends AbstractDaprClient {
|
|||
}
|
||||
}
|
||||
|
||||
private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailurePolicy) {
|
||||
if (jobFailurePolicy.hasDrop()) {
|
||||
return new DropFailurePolicy();
|
||||
}
|
||||
|
||||
CommonProtos.JobFailurePolicyConstant jobFailurePolicyConstant = jobFailurePolicy.getConstant();
|
||||
if (jobFailurePolicyConstant.hasInterval() && jobFailurePolicyConstant.hasMaxRetries()) {
|
||||
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries())
|
||||
.setDurationBetweenRetries(Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
|
||||
ChronoUnit.NANOS));
|
||||
}
|
||||
|
||||
if (jobFailurePolicyConstant.hasMaxRetries()) {
|
||||
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries());
|
||||
}
|
||||
|
||||
return new ConstantFailurePolicy(
|
||||
Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
|
||||
ChronoUnit.NANOS));
|
||||
}
|
||||
|
||||
private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) {
|
||||
CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder();
|
||||
|
||||
if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) {
|
||||
jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build());
|
||||
return jobFailurePolicyBuilder.build();
|
||||
}
|
||||
|
||||
CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder =
|
||||
CommonProtos.JobFailurePolicyConstant.newBuilder();
|
||||
ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy;
|
||||
|
||||
if (jobConstantFailurePolicy.getMaxRetries() != null) {
|
||||
constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries());
|
||||
}
|
||||
|
||||
if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) {
|
||||
constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder()
|
||||
.setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build());
|
||||
}
|
||||
|
||||
jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build());
|
||||
|
||||
return jobFailurePolicyBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Copyright 2021 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* A failure policy that applies a constant retry interval for job retries.
|
||||
* This implementation of {@link FailurePolicy} retries a job a fixed number of times
|
||||
* with a constant delay between each retry attempt.
|
||||
*/
|
||||
public class ConstantFailurePolicy implements FailurePolicy {
|
||||
|
||||
private Integer maxRetries;
|
||||
private Duration durationBetweenRetries;
|
||||
|
||||
/**
|
||||
* Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries.
|
||||
*
|
||||
* @param maxRetries the maximum number of retries
|
||||
*/
|
||||
public ConstantFailurePolicy(Integer maxRetries) {
|
||||
this.maxRetries = maxRetries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a {@code JobConstantFailurePolicy} with the specified duration between retries.
|
||||
*
|
||||
* @param durationBetweenRetries the duration to wait between retries
|
||||
*/
|
||||
public ConstantFailurePolicy(Duration durationBetweenRetries) {
|
||||
this.durationBetweenRetries = durationBetweenRetries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the duration to wait between retry attempts.
|
||||
*
|
||||
* @param durationBetweenRetries the duration between retries
|
||||
* @return a {@code JobFailurePolicyConstant}.
|
||||
*/
|
||||
public ConstantFailurePolicy setDurationBetweenRetries(Duration durationBetweenRetries) {
|
||||
this.durationBetweenRetries = durationBetweenRetries;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum number of retries allowed.
|
||||
*
|
||||
* @param maxRetries the number of retries
|
||||
* @return a {@code JobFailurePolicyConstant}.
|
||||
*/
|
||||
public ConstantFailurePolicy setMaxRetries(int maxRetries) {
|
||||
this.maxRetries = maxRetries;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured duration between retry attempts.
|
||||
*
|
||||
* @return the duration between retries
|
||||
*/
|
||||
public Duration getDurationBetweenRetries() {
|
||||
return this.durationBetweenRetries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured maximum number of retries.
|
||||
*
|
||||
* @return the maximum number of retries
|
||||
*/
|
||||
public Integer getMaxRetries() {
|
||||
return this.maxRetries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the type of failure policy.
|
||||
*
|
||||
* @return {@link FailurePolicyType#CONSTANT}
|
||||
*/
|
||||
@Override
|
||||
public FailurePolicyType getFailurePolicyType() {
|
||||
return FailurePolicyType.CONSTANT;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Copyright 2021 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
/**
|
||||
* A failure policy that drops the job upon failure without retrying.
|
||||
* This implementation of {@link FailurePolicy} immediately discards failed jobs
|
||||
* instead of retrying them.
|
||||
*/
|
||||
public class DropFailurePolicy implements FailurePolicy {
|
||||
|
||||
/**
|
||||
* Returns the type of failure policy.
|
||||
*
|
||||
* @return {@link FailurePolicyType#DROP}
|
||||
*/
|
||||
@Override
|
||||
public FailurePolicyType getFailurePolicyType() {
|
||||
return FailurePolicyType.DROP;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Copyright 2025 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
/**
|
||||
* Set a failure policy for the job.
|
||||
*/
|
||||
public interface FailurePolicy {
|
||||
FailurePolicyType getFailurePolicyType();
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Copyright 2025 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
public enum FailurePolicyType {
|
||||
DROP,
|
||||
|
||||
CONSTANT
|
||||
}
|
||||
|
|
@ -25,6 +25,7 @@ public class GetJobResponse {
|
|||
private Instant dueTime;
|
||||
private Integer repeats;
|
||||
private Instant ttl;
|
||||
private FailurePolicy failurePolicy;
|
||||
|
||||
/**
|
||||
* Constructor to create GetJobResponse.
|
||||
|
|
@ -110,6 +111,19 @@ public class GetJobResponse {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the failure policy for the scheduled job.
|
||||
* This defines how the job should behave in case of failure, such as retrying with a delay
|
||||
* or dropping the job entirely.
|
||||
*
|
||||
* @param failurePolicy the {@link FailurePolicy} to apply to the job
|
||||
* @return this {@code ScheduleJobRequest} instance for method chaining
|
||||
*/
|
||||
public GetJobResponse setFailurePolicy(FailurePolicy failurePolicy) {
|
||||
this.failurePolicy = failurePolicy;
|
||||
return this;
|
||||
}
|
||||
|
||||
// Getters
|
||||
|
||||
/**
|
||||
|
|
@ -165,4 +179,13 @@ public class GetJobResponse {
|
|||
public Instant getTtl() {
|
||||
return ttl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the failure policy.
|
||||
*
|
||||
* @return FailurePolicy.
|
||||
*/
|
||||
public FailurePolicy getFailurePolicy() {
|
||||
return failurePolicy;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ public class ScheduleJobRequest {
|
|||
private Instant dueTime;
|
||||
private Integer repeats;
|
||||
private Instant ttl;
|
||||
private FailurePolicy failurePolicy;
|
||||
private boolean overwrite;
|
||||
|
||||
/**
|
||||
|
|
@ -111,7 +112,18 @@ public class ScheduleJobRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
// Getters
|
||||
/**
|
||||
* Sets the failure policy for the scheduled job.
|
||||
* This defines how the job should behave in case of failure, such as retrying with a delay
|
||||
* or dropping the job entirely.
|
||||
*
|
||||
* @param failurePolicy the {@link FailurePolicy} to apply to the job
|
||||
* @return this {@code ScheduleJobRequest} instance for method chaining
|
||||
*/
|
||||
public ScheduleJobRequest setFailurePolicy(FailurePolicy failurePolicy) {
|
||||
this.failurePolicy = failurePolicy;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the name of the job.
|
||||
|
|
@ -168,6 +180,15 @@ public class ScheduleJobRequest {
|
|||
}
|
||||
|
||||
/**
|
||||
* Gets the failure policy.
|
||||
*
|
||||
* @return FailurePolicy.
|
||||
*/
|
||||
public FailurePolicy getFailurePolicy() {
|
||||
return failurePolicy;
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets the overwrite flag.
|
||||
*
|
||||
* @return The overwrite flag.
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import io.dapr.client.domain.CloudEvent;
|
|||
import io.dapr.client.domain.DeleteJobRequest;
|
||||
import io.dapr.client.domain.GetJobRequest;
|
||||
import io.dapr.client.domain.GetJobResponse;
|
||||
import io.dapr.client.domain.ConstantFailurePolicy;
|
||||
import io.dapr.client.domain.DropFailurePolicy;
|
||||
import io.dapr.client.domain.JobSchedule;
|
||||
import io.dapr.client.domain.ConversationInput;
|
||||
import io.dapr.client.domain.ConversationRequest;
|
||||
|
|
@ -38,6 +40,7 @@ import io.dapr.client.domain.query.Query;
|
|||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import io.dapr.v1.CommonProtos;
|
||||
import io.dapr.v1.DaprAppCallbackProtos;
|
||||
import io.dapr.v1.DaprGrpc;
|
||||
import io.dapr.v1.DaprProtos;
|
||||
|
|
@ -56,6 +59,7 @@ import reactor.core.publisher.Mono;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
|
|
@ -817,6 +821,125 @@ public class DaprPreviewClientGrpcTest {
|
|||
assertEquals("Name in the request cannot be null or empty", exception.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleJobShouldHavePolicyWhenPolicyIsSet() {
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onCompleted(); // Simulate successful response
|
||||
return null;
|
||||
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
|
||||
|
||||
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
|
||||
JobSchedule.fromString("* * * * * *"))
|
||||
.setFailurePolicy(new DropFailurePolicy());
|
||||
|
||||
previewClient.scheduleJob(expectedScheduleJobRequest).block();
|
||||
|
||||
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
|
||||
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
|
||||
|
||||
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
|
||||
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
|
||||
DaprProtos.Job job = actualScheduleJobRequest.getJob();
|
||||
assertEquals("testJob", job.getName());
|
||||
assertFalse(job.hasData());
|
||||
assertEquals( "* * * * * *", job.getSchedule());
|
||||
assertEquals(0, job.getRepeats());
|
||||
assertFalse(job.hasTtl());
|
||||
Assertions.assertTrue(job.hasFailurePolicy());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleJobShouldHaveConstantPolicyWithMaxRetriesWhenConstantPolicyIsSetWithMaxRetries() {
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onCompleted(); // Simulate successful response
|
||||
return null;
|
||||
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
|
||||
|
||||
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
|
||||
JobSchedule.fromString("* * * * * *"))
|
||||
.setFailurePolicy(new ConstantFailurePolicy(2));
|
||||
|
||||
previewClient.scheduleJob(expectedScheduleJobRequest).block();
|
||||
|
||||
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
|
||||
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
|
||||
|
||||
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
|
||||
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
|
||||
DaprProtos.Job job = actualScheduleJobRequest.getJob();
|
||||
assertEquals("testJob", job.getName());
|
||||
assertFalse(job.hasData());
|
||||
assertEquals( "* * * * * *", job.getSchedule());
|
||||
assertEquals(0, job.getRepeats());
|
||||
assertFalse(job.hasTtl());
|
||||
Assertions.assertTrue(job.hasFailurePolicy());
|
||||
assertEquals(2, job.getFailurePolicy().getConstant().getMaxRetries());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleJobShouldHaveConstantPolicyWithIntervalWhenConstantPolicyIsSetWithInterval() {
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onCompleted(); // Simulate successful response
|
||||
return null;
|
||||
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
|
||||
|
||||
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
|
||||
JobSchedule.fromString("* * * * * *"))
|
||||
.setFailurePolicy(new ConstantFailurePolicy(Duration.of(2, ChronoUnit.SECONDS)));
|
||||
|
||||
previewClient.scheduleJob(expectedScheduleJobRequest).block();
|
||||
|
||||
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
|
||||
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
|
||||
|
||||
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
|
||||
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
|
||||
DaprProtos.Job job = actualScheduleJobRequest.getJob();
|
||||
assertEquals("testJob", job.getName());
|
||||
assertFalse(job.hasData());
|
||||
assertEquals( "* * * * * *", job.getSchedule());
|
||||
assertEquals(0, job.getRepeats());
|
||||
assertFalse(job.hasTtl());
|
||||
Assertions.assertTrue(job.hasFailurePolicy());
|
||||
assertEquals(Duration.of(2, ChronoUnit.SECONDS).getNano(),
|
||||
job.getFailurePolicy().getConstant().getInterval().getNanos());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleJobShouldHaveBothRetiresAndIntervalWhenConstantPolicyIsSetWithRetriesAndInterval() {
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onCompleted(); // Simulate successful response
|
||||
return null;
|
||||
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
|
||||
|
||||
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
|
||||
JobSchedule.fromString("* * * * * *"))
|
||||
.setFailurePolicy(new ConstantFailurePolicy(Duration.of(2, ChronoUnit.SECONDS))
|
||||
.setMaxRetries(10));
|
||||
|
||||
previewClient.scheduleJob(expectedScheduleJobRequest).block();
|
||||
|
||||
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
|
||||
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
|
||||
|
||||
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
|
||||
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
|
||||
DaprProtos.Job job = actualScheduleJobRequest.getJob();
|
||||
assertEquals("testJob", job.getName());
|
||||
assertFalse(job.hasData());
|
||||
assertEquals( "* * * * * *", job.getSchedule());
|
||||
assertEquals(0, job.getRepeats());
|
||||
assertFalse(job.hasTtl());
|
||||
Assertions.assertTrue(job.hasFailurePolicy());
|
||||
assertEquals(Duration.of(2, ChronoUnit.SECONDS).getNano(),
|
||||
job.getFailurePolicy().getConstant().getInterval().getNanos());
|
||||
assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleJobShouldThrowWhenNameAlreadyExists() {
|
||||
AtomicInteger callCount = new AtomicInteger(0);
|
||||
|
|
@ -891,10 +1014,6 @@ public class DaprPreviewClientGrpcTest {
|
|||
assertEquals("testJob", secondActualRequest.getJob().getName());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() {
|
||||
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
|
||||
|
|
@ -993,6 +1112,154 @@ public class DaprPreviewClientGrpcTest {
|
|||
assertEquals(job.getDueTime(), datetime);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getJobShouldReturnResponseWithDropFailurePolicySet() {
|
||||
GetJobRequest getJobRequest = new GetJobRequest("testJob");
|
||||
|
||||
String datetime = OffsetDateTime.now().toString();
|
||||
DaprProtos.Job job = DaprProtos.Job.newBuilder()
|
||||
.setName("testJob")
|
||||
.setDueTime(datetime)
|
||||
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
|
||||
.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build()).build())
|
||||
.build();
|
||||
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
|
||||
.setJob(job)
|
||||
.build());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
|
||||
|
||||
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
|
||||
|
||||
GetJobResponse response = resultMono.block();
|
||||
assertNotNull(response);
|
||||
assertEquals("testJob", response.getName());
|
||||
assertNull(response.getData());
|
||||
assertNull(response.getSchedule());
|
||||
assertNull(response.getRepeats());
|
||||
assertNull(response.getTtl());
|
||||
assertEquals(job.getDueTime(), datetime);
|
||||
assertTrue(job.hasFailurePolicy());
|
||||
assertTrue(job.getFailurePolicy().hasDrop());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getJobShouldReturnResponseWithConstantFailurePolicyAndMaxRetriesSet() {
|
||||
GetJobRequest getJobRequest = new GetJobRequest("testJob");
|
||||
|
||||
String datetime = OffsetDateTime.now().toString();
|
||||
DaprProtos.Job job = DaprProtos.Job.newBuilder()
|
||||
.setName("testJob")
|
||||
.setDueTime(datetime)
|
||||
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
|
||||
.setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder().setMaxRetries(2).build()).build())
|
||||
.build();
|
||||
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
|
||||
.setJob(job)
|
||||
.build());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
|
||||
|
||||
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
|
||||
|
||||
GetJobResponse response = resultMono.block();
|
||||
assertNotNull(response);
|
||||
assertEquals("testJob", response.getName());
|
||||
assertNull(response.getData());
|
||||
assertNull(response.getSchedule());
|
||||
assertNull(response.getRepeats());
|
||||
assertNull(response.getTtl());
|
||||
assertEquals(job.getDueTime(), datetime);
|
||||
assertTrue(job.hasFailurePolicy());
|
||||
assertTrue(job.getFailurePolicy().hasConstant());
|
||||
assertEquals(2, job.getFailurePolicy().getConstant().getMaxRetries());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getJobShouldReturnResponseWithConstantFailurePolicyAndIntervalSet() {
|
||||
GetJobRequest getJobRequest = new GetJobRequest("testJob");
|
||||
|
||||
String datetime = OffsetDateTime.now().toString();
|
||||
DaprProtos.Job job = DaprProtos.Job.newBuilder()
|
||||
.setName("testJob")
|
||||
.setDueTime(datetime)
|
||||
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
|
||||
.setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder()
|
||||
.setInterval(com.google.protobuf.Duration.newBuilder().setNanos(5).build()).build()).build())
|
||||
.build();
|
||||
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
|
||||
.setJob(job)
|
||||
.build());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
|
||||
|
||||
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
|
||||
|
||||
GetJobResponse response = resultMono.block();
|
||||
assertNotNull(response);
|
||||
assertEquals("testJob", response.getName());
|
||||
assertNull(response.getData());
|
||||
assertNull(response.getSchedule());
|
||||
assertNull(response.getRepeats());
|
||||
assertNull(response.getTtl());
|
||||
assertEquals(job.getDueTime(), datetime);
|
||||
assertTrue(job.hasFailurePolicy());
|
||||
assertTrue(job.getFailurePolicy().hasConstant());
|
||||
assertEquals(5, job.getFailurePolicy().getConstant().getInterval().getNanos());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getJobShouldReturnResponseWithConstantFailurePolicyIntervalAndMaxRetriesSet() {
|
||||
GetJobRequest getJobRequest = new GetJobRequest("testJob");
|
||||
|
||||
String datetime = OffsetDateTime.now().toString();
|
||||
DaprProtos.Job job = DaprProtos.Job.newBuilder()
|
||||
.setName("testJob")
|
||||
.setDueTime(datetime)
|
||||
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
|
||||
.setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder()
|
||||
.setMaxRetries(10)
|
||||
.setInterval(com.google.protobuf.Duration.newBuilder().setNanos(5).build()).build()).build())
|
||||
.build();
|
||||
|
||||
doAnswer(invocation -> {
|
||||
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
|
||||
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
|
||||
.setJob(job)
|
||||
.build());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
|
||||
|
||||
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
|
||||
|
||||
GetJobResponse response = resultMono.block();
|
||||
assertNotNull(response);
|
||||
assertEquals("testJob", response.getName());
|
||||
assertNull(response.getData());
|
||||
assertNull(response.getSchedule());
|
||||
assertNull(response.getRepeats());
|
||||
assertNull(response.getTtl());
|
||||
assertEquals(job.getDueTime(), datetime);
|
||||
assertTrue(job.hasFailurePolicy());
|
||||
assertTrue(job.getFailurePolicy().hasConstant());
|
||||
assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries());
|
||||
assertEquals(5, job.getFailurePolicy().getConstant().getInterval().getNanos());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void getJobShouldThrowWhenRequestIsNull() {
|
||||
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> {
|
||||
|
|
|
|||
Loading…
Reference in New Issue