From 4ee6be0d8ee570846469d97a5395ea7cfc1ff74b Mon Sep 17 00:00:00 2001 From: Siri Varma Vegiraju Date: Tue, 5 Aug 2025 11:00:57 -0700 Subject: [PATCH] Add Failure Policy for Jobs SDK (#1448) * Update CONTRIBUTING.md Signed-off-by: Siri Varma Vegiraju * Add failrue policy Signed-off-by: sirivarma * Add tests Signed-off-by: sirivarma * Add Tests Signed-off-by: sirivarma * Upgrading to 1.15.7 (#1458) * upgrading to 1.15.7 Signed-off-by: salaboy * using DAPR VERSION Signed-off-by: salaboy --------- Signed-off-by: salaboy Signed-off-by: siri-varma * Rename classes Signed-off-by: siri-varma * add rc Signed-off-by: sirivarma * fix checkstyle Signed-off-by: sirivarma * Fix things Signed-off-by: sirivarma * Test latest Signed-off-by: sirivarma * fix checkstyle Signed-off-by: sirivarma * Address comments Signed-off-by: sirivarma * Address comments Signed-off-by: sirivarma --------- Signed-off-by: Siri Varma Vegiraju Signed-off-by: sirivarma Signed-off-by: salaboy Signed-off-by: siri-varma Co-authored-by: artur-ciocanu Co-authored-by: salaboy --- .../it/testcontainers/jobs/DaprJobsIT.java | 65 +++++ .../java/io/dapr/client/DaprClientImpl.java | 60 ++++ .../client/domain/ConstantFailurePolicy.java | 95 ++++++ .../dapr/client/domain/DropFailurePolicy.java | 32 ++ .../io/dapr/client/domain/FailurePolicy.java | 21 ++ .../dapr/client/domain/FailurePolicyType.java | 20 ++ .../io/dapr/client/domain/GetJobResponse.java | 23 ++ .../client/domain/ScheduleJobRequest.java | 23 +- .../client/DaprPreviewClientGrpcTest.java | 275 +++++++++++++++++- 9 files changed, 609 insertions(+), 5 deletions(-) create mode 100644 sdk/src/main/java/io/dapr/client/domain/ConstantFailurePolicy.java create mode 100644 sdk/src/main/java/io/dapr/client/domain/DropFailurePolicy.java create mode 100644 sdk/src/main/java/io/dapr/client/domain/FailurePolicy.java create mode 100644 sdk/src/main/java/io/dapr/client/domain/FailurePolicyType.java diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java index 529744b18..ac2b4a71b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java @@ -14,7 +14,10 @@ limitations under the License. package io.dapr.it.testcontainers.jobs; import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.ConstantFailurePolicy; import io.dapr.client.domain.DeleteJobRequest; +import io.dapr.client.domain.DropFailurePolicy; +import io.dapr.client.domain.FailurePolicyType; import io.dapr.client.domain.GetJobRequest; import io.dapr.client.domain.GetJobResponse; import io.dapr.client.domain.JobSchedule; @@ -25,6 +28,7 @@ import io.dapr.testcontainers.DaprLogLevel; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.runner.notification.Failure; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -34,6 +38,7 @@ import org.testcontainers.containers.Network; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -97,6 +102,9 @@ public class DaprJobsIT { GetJobResponse getJobResponse = daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + + daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block(); + assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals("Job", getJobResponse.getName()); } @@ -112,6 +120,9 @@ public class DaprJobsIT { GetJobResponse getJobResponse = daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + + daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block(); + assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression()); assertEquals("Job", getJobResponse.getName()); @@ -134,6 +145,9 @@ public class DaprJobsIT { GetJobResponse getJobResponse = daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + + daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block(); + assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression()); assertEquals("Job", getJobResponse.getName()); @@ -143,6 +157,57 @@ public class DaprJobsIT { getJobResponse.getTtl().toString()); } + @Test + public void testJobScheduleCreationWithDropFailurePolicy() { + Instant currentTime = Instant.now(); + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + String cronExpression = "2 * 3 * * FRI"; + + daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) + .setData("Job data".getBytes()) + .setRepeat(3) + .setFailurePolicy(new DropFailurePolicy()) + .setSchedule(JobSchedule.fromString(cronExpression))).block(); + + GetJobResponse getJobResponse = + daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + + daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block(); + + assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType()); + } + + @Test + public void testJobScheduleCreationWithConstantFailurePolicy() { + Instant currentTime = Instant.now(); + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + String cronExpression = "2 * 3 * * FRI"; + + daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) + .setData("Job data".getBytes()) + .setRepeat(3) + .setFailurePolicy(new ConstantFailurePolicy(3) + .setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS))) + .setSchedule(JobSchedule.fromString(cronExpression))).block(); + + GetJobResponse getJobResponse = + daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + + daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block(); + + ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy(); + assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType()); + assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries()); + assertEquals(Duration.of(10, ChronoUnit.SECONDS).getNano(), + jobFailurePolicyConstant.getDurationBetweenRetries().getNano()); + } + @Test public void testDeleteJobRequest() { Instant currentTime = Instant.now(); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 5b78a59cb..335e56b85 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -27,6 +27,7 @@ import io.dapr.client.domain.BulkPublishResponseFailedEntry; import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.ComponentMetadata; import io.dapr.client.domain.ConfigurationItem; +import io.dapr.client.domain.ConstantFailurePolicy; import io.dapr.client.domain.ConversationInput; import io.dapr.client.domain.ConversationOutput; import io.dapr.client.domain.ConversationRequest; @@ -34,7 +35,10 @@ import io.dapr.client.domain.ConversationResponse; import io.dapr.client.domain.DaprMetadata; import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.DeleteStateRequest; +import io.dapr.client.domain.DropFailurePolicy; import io.dapr.client.domain.ExecuteStateTransactionRequest; +import io.dapr.client.domain.FailurePolicy; +import io.dapr.client.domain.FailurePolicyType; import io.dapr.client.domain.GetBulkSecretRequest; import io.dapr.client.domain.GetBulkStateRequest; import io.dapr.client.domain.GetConfigurationRequest; @@ -105,6 +109,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1336,6 +1341,10 @@ public class DaprClientImpl extends AbstractDaprClient { scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime())); } + if (scheduleJobRequest.getFailurePolicy() != null) { + scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy())); + } + scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite()); Mono scheduleJobResponseMono = @@ -1380,6 +1389,10 @@ public class DaprClientImpl extends AbstractDaprClient { getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime())); } + if (job.hasFailurePolicy()) { + getJobResponse.setFailurePolicy(getJobFailurePolicy(job.getFailurePolicy())); + } + return getJobResponse .setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null) .setData(job.hasData() ? job.getData().getValue().toByteArray() : null) @@ -1390,6 +1403,53 @@ public class DaprClientImpl extends AbstractDaprClient { } } + private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailurePolicy) { + if (jobFailurePolicy.hasDrop()) { + return new DropFailurePolicy(); + } + + CommonProtos.JobFailurePolicyConstant jobFailurePolicyConstant = jobFailurePolicy.getConstant(); + if (jobFailurePolicyConstant.hasInterval() && jobFailurePolicyConstant.hasMaxRetries()) { + return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries()) + .setDurationBetweenRetries(Duration.of(jobFailurePolicyConstant.getInterval().getNanos(), + ChronoUnit.NANOS)); + } + + if (jobFailurePolicyConstant.hasMaxRetries()) { + return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries()); + } + + return new ConstantFailurePolicy( + Duration.of(jobFailurePolicyConstant.getInterval().getNanos(), + ChronoUnit.NANOS)); + } + + private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) { + CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder(); + + if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) { + jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build()); + return jobFailurePolicyBuilder.build(); + } + + CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder = + CommonProtos.JobFailurePolicyConstant.newBuilder(); + ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy; + + if (jobConstantFailurePolicy.getMaxRetries() != null) { + constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries()); + } + + if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) { + constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder() + .setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build()); + } + + jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build()); + + return jobFailurePolicyBuilder.build(); + } + /** * {@inheritDoc} */ diff --git a/sdk/src/main/java/io/dapr/client/domain/ConstantFailurePolicy.java b/sdk/src/main/java/io/dapr/client/domain/ConstantFailurePolicy.java new file mode 100644 index 000000000..a891b40e8 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/ConstantFailurePolicy.java @@ -0,0 +1,95 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import java.time.Duration; + +/** + * A failure policy that applies a constant retry interval for job retries. + * This implementation of {@link FailurePolicy} retries a job a fixed number of times + * with a constant delay between each retry attempt. + */ +public class ConstantFailurePolicy implements FailurePolicy { + + private Integer maxRetries; + private Duration durationBetweenRetries; + + /** + * Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries. + * + * @param maxRetries the maximum number of retries + */ + public ConstantFailurePolicy(Integer maxRetries) { + this.maxRetries = maxRetries; + } + + /** + * Constructs a {@code JobConstantFailurePolicy} with the specified duration between retries. + * + * @param durationBetweenRetries the duration to wait between retries + */ + public ConstantFailurePolicy(Duration durationBetweenRetries) { + this.durationBetweenRetries = durationBetweenRetries; + } + + /** + * Sets the duration to wait between retry attempts. + * + * @param durationBetweenRetries the duration between retries + * @return a {@code JobFailurePolicyConstant}. + */ + public ConstantFailurePolicy setDurationBetweenRetries(Duration durationBetweenRetries) { + this.durationBetweenRetries = durationBetweenRetries; + return this; + } + + /** + * Sets the maximum number of retries allowed. + * + * @param maxRetries the number of retries + * @return a {@code JobFailurePolicyConstant}. + */ + public ConstantFailurePolicy setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Returns the configured duration between retry attempts. + * + * @return the duration between retries + */ + public Duration getDurationBetweenRetries() { + return this.durationBetweenRetries; + } + + /** + * Returns the configured maximum number of retries. + * + * @return the maximum number of retries + */ + public Integer getMaxRetries() { + return this.maxRetries; + } + + /** + * Returns the type of failure policy. + * + * @return {@link FailurePolicyType#CONSTANT} + */ + @Override + public FailurePolicyType getFailurePolicyType() { + return FailurePolicyType.CONSTANT; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/DropFailurePolicy.java b/sdk/src/main/java/io/dapr/client/domain/DropFailurePolicy.java new file mode 100644 index 000000000..4ac602c9d --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/DropFailurePolicy.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +/** + * A failure policy that drops the job upon failure without retrying. + * This implementation of {@link FailurePolicy} immediately discards failed jobs + * instead of retrying them. + */ +public class DropFailurePolicy implements FailurePolicy { + + /** + * Returns the type of failure policy. + * + * @return {@link FailurePolicyType#DROP} + */ + @Override + public FailurePolicyType getFailurePolicyType() { + return FailurePolicyType.DROP; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/FailurePolicy.java b/sdk/src/main/java/io/dapr/client/domain/FailurePolicy.java new file mode 100644 index 000000000..5e2f2a3d9 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/FailurePolicy.java @@ -0,0 +1,21 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +/** + * Set a failure policy for the job. + */ +public interface FailurePolicy { + FailurePolicyType getFailurePolicyType(); +} diff --git a/sdk/src/main/java/io/dapr/client/domain/FailurePolicyType.java b/sdk/src/main/java/io/dapr/client/domain/FailurePolicyType.java new file mode 100644 index 000000000..00071f039 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/FailurePolicyType.java @@ -0,0 +1,20 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +public enum FailurePolicyType { + DROP, + + CONSTANT +} diff --git a/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java b/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java index f79c0409a..ab0c9c582 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java @@ -25,6 +25,7 @@ public class GetJobResponse { private Instant dueTime; private Integer repeats; private Instant ttl; + private FailurePolicy failurePolicy; /** * Constructor to create GetJobResponse. @@ -110,6 +111,19 @@ public class GetJobResponse { return this; } + /** + * Sets the failure policy for the scheduled job. + * This defines how the job should behave in case of failure, such as retrying with a delay + * or dropping the job entirely. + * + * @param failurePolicy the {@link FailurePolicy} to apply to the job + * @return this {@code ScheduleJobRequest} instance for method chaining + */ + public GetJobResponse setFailurePolicy(FailurePolicy failurePolicy) { + this.failurePolicy = failurePolicy; + return this; + } + // Getters /** @@ -165,4 +179,13 @@ public class GetJobResponse { public Instant getTtl() { return ttl; } + + /** + * Gets the failure policy. + * + * @return FailurePolicy. + */ + public FailurePolicy getFailurePolicy() { + return failurePolicy; + } } diff --git a/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java b/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java index 6113bf649..c191a86b1 100644 --- a/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java @@ -25,6 +25,7 @@ public class ScheduleJobRequest { private Instant dueTime; private Integer repeats; private Instant ttl; + private FailurePolicy failurePolicy; private boolean overwrite; /** @@ -111,7 +112,18 @@ public class ScheduleJobRequest { return this; } - // Getters + /** + * Sets the failure policy for the scheduled job. + * This defines how the job should behave in case of failure, such as retrying with a delay + * or dropping the job entirely. + * + * @param failurePolicy the {@link FailurePolicy} to apply to the job + * @return this {@code ScheduleJobRequest} instance for method chaining + */ + public ScheduleJobRequest setFailurePolicy(FailurePolicy failurePolicy) { + this.failurePolicy = failurePolicy; + return this; + } /** * Gets the name of the job. @@ -168,6 +180,15 @@ public class ScheduleJobRequest { } /** + * Gets the failure policy. + * + * @return FailurePolicy. + */ + public FailurePolicy getFailurePolicy() { + return failurePolicy; + } + + /* * Gets the overwrite flag. * * @return The overwrite flag. diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index b2922bf97..2db935855 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -25,6 +25,8 @@ import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.GetJobRequest; import io.dapr.client.domain.GetJobResponse; +import io.dapr.client.domain.ConstantFailurePolicy; +import io.dapr.client.domain.DropFailurePolicy; import io.dapr.client.domain.JobSchedule; import io.dapr.client.domain.ConversationInput; import io.dapr.client.domain.ConversationRequest; @@ -38,6 +40,7 @@ import io.dapr.client.domain.query.Query; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.TypeRef; +import io.dapr.v1.CommonProtos; import io.dapr.v1.DaprAppCallbackProtos; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; @@ -56,6 +59,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -817,6 +821,125 @@ public class DaprPreviewClientGrpcTest { assertEquals("Name in the request cannot be null or empty", exception.getMessage()); } + @Test + public void scheduleJobShouldHavePolicyWhenPolicyIsSet() { + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob", + JobSchedule.fromString("* * * * * *")) + .setFailurePolicy(new DropFailurePolicy()); + + previewClient.scheduleJob(expectedScheduleJobRequest).block(); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue(); + DaprProtos.Job job = actualScheduleJobRequest.getJob(); + assertEquals("testJob", job.getName()); + assertFalse(job.hasData()); + assertEquals( "* * * * * *", job.getSchedule()); + assertEquals(0, job.getRepeats()); + assertFalse(job.hasTtl()); + Assertions.assertTrue(job.hasFailurePolicy()); + } + + @Test + public void scheduleJobShouldHaveConstantPolicyWithMaxRetriesWhenConstantPolicyIsSetWithMaxRetries() { + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob", + JobSchedule.fromString("* * * * * *")) + .setFailurePolicy(new ConstantFailurePolicy(2)); + + previewClient.scheduleJob(expectedScheduleJobRequest).block(); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue(); + DaprProtos.Job job = actualScheduleJobRequest.getJob(); + assertEquals("testJob", job.getName()); + assertFalse(job.hasData()); + assertEquals( "* * * * * *", job.getSchedule()); + assertEquals(0, job.getRepeats()); + assertFalse(job.hasTtl()); + Assertions.assertTrue(job.hasFailurePolicy()); + assertEquals(2, job.getFailurePolicy().getConstant().getMaxRetries()); + } + + @Test + public void scheduleJobShouldHaveConstantPolicyWithIntervalWhenConstantPolicyIsSetWithInterval() { + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob", + JobSchedule.fromString("* * * * * *")) + .setFailurePolicy(new ConstantFailurePolicy(Duration.of(2, ChronoUnit.SECONDS))); + + previewClient.scheduleJob(expectedScheduleJobRequest).block(); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue(); + DaprProtos.Job job = actualScheduleJobRequest.getJob(); + assertEquals("testJob", job.getName()); + assertFalse(job.hasData()); + assertEquals( "* * * * * *", job.getSchedule()); + assertEquals(0, job.getRepeats()); + assertFalse(job.hasTtl()); + Assertions.assertTrue(job.hasFailurePolicy()); + assertEquals(Duration.of(2, ChronoUnit.SECONDS).getNano(), + job.getFailurePolicy().getConstant().getInterval().getNanos()); + } + + @Test + public void scheduleJobShouldHaveBothRetiresAndIntervalWhenConstantPolicyIsSetWithRetriesAndInterval() { + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob", + JobSchedule.fromString("* * * * * *")) + .setFailurePolicy(new ConstantFailurePolicy(Duration.of(2, ChronoUnit.SECONDS)) + .setMaxRetries(10)); + + previewClient.scheduleJob(expectedScheduleJobRequest).block(); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue(); + DaprProtos.Job job = actualScheduleJobRequest.getJob(); + assertEquals("testJob", job.getName()); + assertFalse(job.hasData()); + assertEquals( "* * * * * *", job.getSchedule()); + assertEquals(0, job.getRepeats()); + assertFalse(job.hasTtl()); + Assertions.assertTrue(job.hasFailurePolicy()); + assertEquals(Duration.of(2, ChronoUnit.SECONDS).getNano(), + job.getFailurePolicy().getConstant().getInterval().getNanos()); + assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries()); + } + @Test public void scheduleJobShouldThrowWhenNameAlreadyExists() { AtomicInteger callCount = new AtomicInteger(0); @@ -891,10 +1014,6 @@ public class DaprPreviewClientGrpcTest { assertEquals("testJob", secondActualRequest.getJob().getName()); } - - - - @Test public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() { DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") @@ -993,6 +1112,154 @@ public class DaprPreviewClientGrpcTest { assertEquals(job.getDueTime(), datetime); } + @Test + public void getJobShouldReturnResponseWithDropFailurePolicySet() { + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + String datetime = OffsetDateTime.now().toString(); + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setDueTime(datetime) + .setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder() + .setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build()).build()) + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertNull(response.getData()); + assertNull(response.getSchedule()); + assertNull(response.getRepeats()); + assertNull(response.getTtl()); + assertEquals(job.getDueTime(), datetime); + assertTrue(job.hasFailurePolicy()); + assertTrue(job.getFailurePolicy().hasDrop()); + } + + @Test + public void getJobShouldReturnResponseWithConstantFailurePolicyAndMaxRetriesSet() { + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + String datetime = OffsetDateTime.now().toString(); + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setDueTime(datetime) + .setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder() + .setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder().setMaxRetries(2).build()).build()) + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertNull(response.getData()); + assertNull(response.getSchedule()); + assertNull(response.getRepeats()); + assertNull(response.getTtl()); + assertEquals(job.getDueTime(), datetime); + assertTrue(job.hasFailurePolicy()); + assertTrue(job.getFailurePolicy().hasConstant()); + assertEquals(2, job.getFailurePolicy().getConstant().getMaxRetries()); + } + + @Test + public void getJobShouldReturnResponseWithConstantFailurePolicyAndIntervalSet() { + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + String datetime = OffsetDateTime.now().toString(); + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setDueTime(datetime) + .setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder() + .setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder() + .setInterval(com.google.protobuf.Duration.newBuilder().setNanos(5).build()).build()).build()) + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertNull(response.getData()); + assertNull(response.getSchedule()); + assertNull(response.getRepeats()); + assertNull(response.getTtl()); + assertEquals(job.getDueTime(), datetime); + assertTrue(job.hasFailurePolicy()); + assertTrue(job.getFailurePolicy().hasConstant()); + assertEquals(5, job.getFailurePolicy().getConstant().getInterval().getNanos()); + } + + @Test + public void getJobShouldReturnResponseWithConstantFailurePolicyIntervalAndMaxRetriesSet() { + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + String datetime = OffsetDateTime.now().toString(); + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setDueTime(datetime) + .setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder() + .setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder() + .setMaxRetries(10) + .setInterval(com.google.protobuf.Duration.newBuilder().setNanos(5).build()).build()).build()) + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertNull(response.getData()); + assertNull(response.getSchedule()); + assertNull(response.getRepeats()); + assertNull(response.getTtl()); + assertEquals(job.getDueTime(), datetime); + assertTrue(job.hasFailurePolicy()); + assertTrue(job.getFailurePolicy().hasConstant()); + assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries()); + assertEquals(5, job.getFailurePolicy().getConstant().getInterval().getNanos()); + } + + @Test public void getJobShouldThrowWhenRequestIsNull() { IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> {