Merge branch 'master' into issue-1466-camelcase

This commit is contained in:
artur-ciocanu 2025-08-06 02:14:16 +03:00 committed by GitHub
commit 53ef5e9c84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 609 additions and 5 deletions

View File

@ -14,7 +14,10 @@ limitations under the License.
package io.dapr.it.testcontainers.jobs; package io.dapr.it.testcontainers.jobs;
import io.dapr.client.DaprPreviewClient; import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConstantFailurePolicy;
import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.DeleteJobRequest;
import io.dapr.client.domain.DropFailurePolicy;
import io.dapr.client.domain.FailurePolicyType;
import io.dapr.client.domain.GetJobRequest; import io.dapr.client.domain.GetJobRequest;
import io.dapr.client.domain.GetJobResponse; import io.dapr.client.domain.GetJobResponse;
import io.dapr.client.domain.JobSchedule; import io.dapr.client.domain.JobSchedule;
@ -25,6 +28,7 @@ import io.dapr.testcontainers.DaprLogLevel;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.runner.notification.Failure;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@ -34,6 +38,7 @@ import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -97,6 +102,9 @@ public class DaprJobsIT {
GetJobResponse getJobResponse = GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block(); daprPreviewClient.getJob(new GetJobRequest("Job")).block();
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
assertEquals("Job", getJobResponse.getName()); assertEquals("Job", getJobResponse.getName());
} }
@ -112,6 +120,9 @@ public class DaprJobsIT {
GetJobResponse getJobResponse = GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block(); daprPreviewClient.getJob(new GetJobRequest("Job")).block();
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression()); assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression());
assertEquals("Job", getJobResponse.getName()); assertEquals("Job", getJobResponse.getName());
@ -134,6 +145,9 @@ public class DaprJobsIT {
GetJobResponse getJobResponse = GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block(); daprPreviewClient.getJob(new GetJobRequest("Job")).block();
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString());
assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression()); assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression());
assertEquals("Job", getJobResponse.getName()); assertEquals("Job", getJobResponse.getName());
@ -143,6 +157,57 @@ public class DaprJobsIT {
getJobResponse.getTtl().toString()); getJobResponse.getTtl().toString());
} }
@Test
public void testJobScheduleCreationWithDropFailurePolicy() {
Instant currentTime = Instant.now();
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC);
String cronExpression = "2 * 3 * * FRI";
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
.setData("Job data".getBytes())
.setRepeat(3)
.setFailurePolicy(new DropFailurePolicy())
.setSchedule(JobSchedule.fromString(cronExpression))).block();
GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType());
}
@Test
public void testJobScheduleCreationWithConstantFailurePolicy() {
Instant currentTime = Instant.now();
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC);
String cronExpression = "2 * 3 * * FRI";
daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)
.setTtl(currentTime.plus(2, ChronoUnit.HOURS))
.setData("Job data".getBytes())
.setRepeat(3)
.setFailurePolicy(new ConstantFailurePolicy(3)
.setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS)))
.setSchedule(JobSchedule.fromString(cronExpression))).block();
GetJobResponse getJobResponse =
daprPreviewClient.getJob(new GetJobRequest("Job")).block();
daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block();
ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy();
assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType());
assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries());
assertEquals(Duration.of(10, ChronoUnit.SECONDS).getNano(),
jobFailurePolicyConstant.getDurationBetweenRetries().getNano());
}
@Test @Test
public void testDeleteJobRequest() { public void testDeleteJobRequest() {
Instant currentTime = Instant.now(); Instant currentTime = Instant.now();

View File

@ -27,6 +27,7 @@ import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.ComponentMetadata; import io.dapr.client.domain.ComponentMetadata;
import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.ConstantFailurePolicy;
import io.dapr.client.domain.ConversationInput; import io.dapr.client.domain.ConversationInput;
import io.dapr.client.domain.ConversationOutput; import io.dapr.client.domain.ConversationOutput;
import io.dapr.client.domain.ConversationRequest; import io.dapr.client.domain.ConversationRequest;
@ -34,7 +35,10 @@ import io.dapr.client.domain.ConversationResponse;
import io.dapr.client.domain.DaprMetadata; import io.dapr.client.domain.DaprMetadata;
import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.DeleteJobRequest;
import io.dapr.client.domain.DeleteStateRequest; import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.DropFailurePolicy;
import io.dapr.client.domain.ExecuteStateTransactionRequest; import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.FailurePolicy;
import io.dapr.client.domain.FailurePolicyType;
import io.dapr.client.domain.GetBulkSecretRequest; import io.dapr.client.domain.GetBulkSecretRequest;
import io.dapr.client.domain.GetBulkStateRequest; import io.dapr.client.domain.GetBulkStateRequest;
import io.dapr.client.domain.GetConfigurationRequest; import io.dapr.client.domain.GetConfigurationRequest;
@ -105,6 +109,7 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -1336,6 +1341,10 @@ public class DaprClientImpl extends AbstractDaprClient {
scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime())); scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime()));
} }
if (scheduleJobRequest.getFailurePolicy() != null) {
scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
}
scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite()); scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite());
Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono = Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
@ -1380,6 +1389,10 @@ public class DaprClientImpl extends AbstractDaprClient {
getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime())); getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime()));
} }
if (job.hasFailurePolicy()) {
getJobResponse.setFailurePolicy(getJobFailurePolicy(job.getFailurePolicy()));
}
return getJobResponse return getJobResponse
.setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null) .setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null)
.setData(job.hasData() ? job.getData().getValue().toByteArray() : null) .setData(job.hasData() ? job.getData().getValue().toByteArray() : null)
@ -1390,6 +1403,53 @@ public class DaprClientImpl extends AbstractDaprClient {
} }
} }
private FailurePolicy getJobFailurePolicy(CommonProtos.JobFailurePolicy jobFailurePolicy) {
if (jobFailurePolicy.hasDrop()) {
return new DropFailurePolicy();
}
CommonProtos.JobFailurePolicyConstant jobFailurePolicyConstant = jobFailurePolicy.getConstant();
if (jobFailurePolicyConstant.hasInterval() && jobFailurePolicyConstant.hasMaxRetries()) {
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries())
.setDurationBetweenRetries(Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
ChronoUnit.NANOS));
}
if (jobFailurePolicyConstant.hasMaxRetries()) {
return new ConstantFailurePolicy(jobFailurePolicyConstant.getMaxRetries());
}
return new ConstantFailurePolicy(
Duration.of(jobFailurePolicyConstant.getInterval().getNanos(),
ChronoUnit.NANOS));
}
private CommonProtos.JobFailurePolicy getJobFailurePolicy(FailurePolicy failurePolicy) {
CommonProtos.JobFailurePolicy.Builder jobFailurePolicyBuilder = CommonProtos.JobFailurePolicy.newBuilder();
if (failurePolicy.getFailurePolicyType() == FailurePolicyType.DROP) {
jobFailurePolicyBuilder.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build());
return jobFailurePolicyBuilder.build();
}
CommonProtos.JobFailurePolicyConstant.Builder constantPolicyBuilder =
CommonProtos.JobFailurePolicyConstant.newBuilder();
ConstantFailurePolicy jobConstantFailurePolicy = (ConstantFailurePolicy)failurePolicy;
if (jobConstantFailurePolicy.getMaxRetries() != null) {
constantPolicyBuilder.setMaxRetries(jobConstantFailurePolicy.getMaxRetries());
}
if (jobConstantFailurePolicy.getDurationBetweenRetries() != null) {
constantPolicyBuilder.setInterval(com.google.protobuf.Duration.newBuilder()
.setNanos(jobConstantFailurePolicy.getDurationBetweenRetries().getNano()).build());
}
jobFailurePolicyBuilder.setConstant(constantPolicyBuilder.build());
return jobFailurePolicyBuilder.build();
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */

View File

@ -0,0 +1,95 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import java.time.Duration;
/**
* A failure policy that applies a constant retry interval for job retries.
* This implementation of {@link FailurePolicy} retries a job a fixed number of times
* with a constant delay between each retry attempt.
*/
public class ConstantFailurePolicy implements FailurePolicy {
private Integer maxRetries;
private Duration durationBetweenRetries;
/**
* Constructs a {@code JobConstantFailurePolicy} with the specified maximum number of retries.
*
* @param maxRetries the maximum number of retries
*/
public ConstantFailurePolicy(Integer maxRetries) {
this.maxRetries = maxRetries;
}
/**
* Constructs a {@code JobConstantFailurePolicy} with the specified duration between retries.
*
* @param durationBetweenRetries the duration to wait between retries
*/
public ConstantFailurePolicy(Duration durationBetweenRetries) {
this.durationBetweenRetries = durationBetweenRetries;
}
/**
* Sets the duration to wait between retry attempts.
*
* @param durationBetweenRetries the duration between retries
* @return a {@code JobFailurePolicyConstant}.
*/
public ConstantFailurePolicy setDurationBetweenRetries(Duration durationBetweenRetries) {
this.durationBetweenRetries = durationBetweenRetries;
return this;
}
/**
* Sets the maximum number of retries allowed.
*
* @param maxRetries the number of retries
* @return a {@code JobFailurePolicyConstant}.
*/
public ConstantFailurePolicy setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}
/**
* Returns the configured duration between retry attempts.
*
* @return the duration between retries
*/
public Duration getDurationBetweenRetries() {
return this.durationBetweenRetries;
}
/**
* Returns the configured maximum number of retries.
*
* @return the maximum number of retries
*/
public Integer getMaxRetries() {
return this.maxRetries;
}
/**
* Returns the type of failure policy.
*
* @return {@link FailurePolicyType#CONSTANT}
*/
@Override
public FailurePolicyType getFailurePolicyType() {
return FailurePolicyType.CONSTANT;
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
/**
* A failure policy that drops the job upon failure without retrying.
* This implementation of {@link FailurePolicy} immediately discards failed jobs
* instead of retrying them.
*/
public class DropFailurePolicy implements FailurePolicy {
/**
* Returns the type of failure policy.
*
* @return {@link FailurePolicyType#DROP}
*/
@Override
public FailurePolicyType getFailurePolicyType() {
return FailurePolicyType.DROP;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
/**
* Set a failure policy for the job.
*/
public interface FailurePolicy {
FailurePolicyType getFailurePolicyType();
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
public enum FailurePolicyType {
DROP,
CONSTANT
}

View File

@ -25,6 +25,7 @@ public class GetJobResponse {
private Instant dueTime; private Instant dueTime;
private Integer repeats; private Integer repeats;
private Instant ttl; private Instant ttl;
private FailurePolicy failurePolicy;
/** /**
* Constructor to create GetJobResponse. * Constructor to create GetJobResponse.
@ -110,6 +111,19 @@ public class GetJobResponse {
return this; return this;
} }
/**
* Sets the failure policy for the scheduled job.
* This defines how the job should behave in case of failure, such as retrying with a delay
* or dropping the job entirely.
*
* @param failurePolicy the {@link FailurePolicy} to apply to the job
* @return this {@code ScheduleJobRequest} instance for method chaining
*/
public GetJobResponse setFailurePolicy(FailurePolicy failurePolicy) {
this.failurePolicy = failurePolicy;
return this;
}
// Getters // Getters
/** /**
@ -165,4 +179,13 @@ public class GetJobResponse {
public Instant getTtl() { public Instant getTtl() {
return ttl; return ttl;
} }
/**
* Gets the failure policy.
*
* @return FailurePolicy.
*/
public FailurePolicy getFailurePolicy() {
return failurePolicy;
}
} }

View File

@ -25,6 +25,7 @@ public class ScheduleJobRequest {
private Instant dueTime; private Instant dueTime;
private Integer repeats; private Integer repeats;
private Instant ttl; private Instant ttl;
private FailurePolicy failurePolicy;
private boolean overwrite; private boolean overwrite;
/** /**
@ -111,7 +112,18 @@ public class ScheduleJobRequest {
return this; return this;
} }
// Getters /**
* Sets the failure policy for the scheduled job.
* This defines how the job should behave in case of failure, such as retrying with a delay
* or dropping the job entirely.
*
* @param failurePolicy the {@link FailurePolicy} to apply to the job
* @return this {@code ScheduleJobRequest} instance for method chaining
*/
public ScheduleJobRequest setFailurePolicy(FailurePolicy failurePolicy) {
this.failurePolicy = failurePolicy;
return this;
}
/** /**
* Gets the name of the job. * Gets the name of the job.
@ -168,6 +180,15 @@ public class ScheduleJobRequest {
} }
/** /**
* Gets the failure policy.
*
* @return FailurePolicy.
*/
public FailurePolicy getFailurePolicy() {
return failurePolicy;
}
/*
* Gets the overwrite flag. * Gets the overwrite flag.
* *
* @return The overwrite flag. * @return The overwrite flag.

View File

@ -25,6 +25,8 @@ import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.DeleteJobRequest;
import io.dapr.client.domain.GetJobRequest; import io.dapr.client.domain.GetJobRequest;
import io.dapr.client.domain.GetJobResponse; import io.dapr.client.domain.GetJobResponse;
import io.dapr.client.domain.ConstantFailurePolicy;
import io.dapr.client.domain.DropFailurePolicy;
import io.dapr.client.domain.JobSchedule; import io.dapr.client.domain.JobSchedule;
import io.dapr.client.domain.ConversationInput; import io.dapr.client.domain.ConversationInput;
import io.dapr.client.domain.ConversationRequest; import io.dapr.client.domain.ConversationRequest;
@ -38,6 +40,7 @@ import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef; import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprAppCallbackProtos; import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos; import io.dapr.v1.DaprProtos;
@ -56,6 +59,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -817,6 +821,125 @@ public class DaprPreviewClientGrpcTest {
assertEquals("Name in the request cannot be null or empty", exception.getMessage()); assertEquals("Name in the request cannot be null or empty", exception.getMessage());
} }
@Test
public void scheduleJobShouldHavePolicyWhenPolicyIsSet() {
doAnswer(invocation -> {
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
observer.onCompleted(); // Simulate successful response
return null;
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
JobSchedule.fromString("* * * * * *"))
.setFailurePolicy(new DropFailurePolicy());
previewClient.scheduleJob(expectedScheduleJobRequest).block();
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
DaprProtos.Job job = actualScheduleJobRequest.getJob();
assertEquals("testJob", job.getName());
assertFalse(job.hasData());
assertEquals( "* * * * * *", job.getSchedule());
assertEquals(0, job.getRepeats());
assertFalse(job.hasTtl());
Assertions.assertTrue(job.hasFailurePolicy());
}
@Test
public void scheduleJobShouldHaveConstantPolicyWithMaxRetriesWhenConstantPolicyIsSetWithMaxRetries() {
doAnswer(invocation -> {
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
observer.onCompleted(); // Simulate successful response
return null;
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
JobSchedule.fromString("* * * * * *"))
.setFailurePolicy(new ConstantFailurePolicy(2));
previewClient.scheduleJob(expectedScheduleJobRequest).block();
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
DaprProtos.Job job = actualScheduleJobRequest.getJob();
assertEquals("testJob", job.getName());
assertFalse(job.hasData());
assertEquals( "* * * * * *", job.getSchedule());
assertEquals(0, job.getRepeats());
assertFalse(job.hasTtl());
Assertions.assertTrue(job.hasFailurePolicy());
assertEquals(2, job.getFailurePolicy().getConstant().getMaxRetries());
}
@Test
public void scheduleJobShouldHaveConstantPolicyWithIntervalWhenConstantPolicyIsSetWithInterval() {
doAnswer(invocation -> {
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
observer.onCompleted(); // Simulate successful response
return null;
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
JobSchedule.fromString("* * * * * *"))
.setFailurePolicy(new ConstantFailurePolicy(Duration.of(2, ChronoUnit.SECONDS)));
previewClient.scheduleJob(expectedScheduleJobRequest).block();
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
DaprProtos.Job job = actualScheduleJobRequest.getJob();
assertEquals("testJob", job.getName());
assertFalse(job.hasData());
assertEquals( "* * * * * *", job.getSchedule());
assertEquals(0, job.getRepeats());
assertFalse(job.hasTtl());
Assertions.assertTrue(job.hasFailurePolicy());
assertEquals(Duration.of(2, ChronoUnit.SECONDS).getNano(),
job.getFailurePolicy().getConstant().getInterval().getNanos());
}
@Test
public void scheduleJobShouldHaveBothRetiresAndIntervalWhenConstantPolicyIsSetWithRetriesAndInterval() {
doAnswer(invocation -> {
StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
observer.onCompleted(); // Simulate successful response
return null;
}).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());
ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
JobSchedule.fromString("* * * * * *"))
.setFailurePolicy(new ConstantFailurePolicy(Duration.of(2, ChronoUnit.SECONDS))
.setMaxRetries(10));
previewClient.scheduleJob(expectedScheduleJobRequest).block();
ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());
DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue();
DaprProtos.Job job = actualScheduleJobRequest.getJob();
assertEquals("testJob", job.getName());
assertFalse(job.hasData());
assertEquals( "* * * * * *", job.getSchedule());
assertEquals(0, job.getRepeats());
assertFalse(job.hasTtl());
Assertions.assertTrue(job.hasFailurePolicy());
assertEquals(Duration.of(2, ChronoUnit.SECONDS).getNano(),
job.getFailurePolicy().getConstant().getInterval().getNanos());
assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries());
}
@Test @Test
public void scheduleJobShouldThrowWhenNameAlreadyExists() { public void scheduleJobShouldThrowWhenNameAlreadyExists() {
AtomicInteger callCount = new AtomicInteger(0); AtomicInteger callCount = new AtomicInteger(0);
@ -891,10 +1014,6 @@ public class DaprPreviewClientGrpcTest {
assertEquals("testJob", secondActualRequest.getJob().getName()); assertEquals("testJob", secondActualRequest.getJob().getName());
} }
@Test @Test
public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() { public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() {
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
@ -993,6 +1112,154 @@ public class DaprPreviewClientGrpcTest {
assertEquals(job.getDueTime(), datetime); assertEquals(job.getDueTime(), datetime);
} }
@Test
public void getJobShouldReturnResponseWithDropFailurePolicySet() {
GetJobRequest getJobRequest = new GetJobRequest("testJob");
String datetime = OffsetDateTime.now().toString();
DaprProtos.Job job = DaprProtos.Job.newBuilder()
.setName("testJob")
.setDueTime(datetime)
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
.setDrop(CommonProtos.JobFailurePolicyDrop.newBuilder().build()).build())
.build();
doAnswer(invocation -> {
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
.setJob(job)
.build());
observer.onCompleted();
return null;
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
GetJobResponse response = resultMono.block();
assertNotNull(response);
assertEquals("testJob", response.getName());
assertNull(response.getData());
assertNull(response.getSchedule());
assertNull(response.getRepeats());
assertNull(response.getTtl());
assertEquals(job.getDueTime(), datetime);
assertTrue(job.hasFailurePolicy());
assertTrue(job.getFailurePolicy().hasDrop());
}
@Test
public void getJobShouldReturnResponseWithConstantFailurePolicyAndMaxRetriesSet() {
GetJobRequest getJobRequest = new GetJobRequest("testJob");
String datetime = OffsetDateTime.now().toString();
DaprProtos.Job job = DaprProtos.Job.newBuilder()
.setName("testJob")
.setDueTime(datetime)
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
.setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder().setMaxRetries(2).build()).build())
.build();
doAnswer(invocation -> {
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
.setJob(job)
.build());
observer.onCompleted();
return null;
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
GetJobResponse response = resultMono.block();
assertNotNull(response);
assertEquals("testJob", response.getName());
assertNull(response.getData());
assertNull(response.getSchedule());
assertNull(response.getRepeats());
assertNull(response.getTtl());
assertEquals(job.getDueTime(), datetime);
assertTrue(job.hasFailurePolicy());
assertTrue(job.getFailurePolicy().hasConstant());
assertEquals(2, job.getFailurePolicy().getConstant().getMaxRetries());
}
@Test
public void getJobShouldReturnResponseWithConstantFailurePolicyAndIntervalSet() {
GetJobRequest getJobRequest = new GetJobRequest("testJob");
String datetime = OffsetDateTime.now().toString();
DaprProtos.Job job = DaprProtos.Job.newBuilder()
.setName("testJob")
.setDueTime(datetime)
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
.setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder()
.setInterval(com.google.protobuf.Duration.newBuilder().setNanos(5).build()).build()).build())
.build();
doAnswer(invocation -> {
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
.setJob(job)
.build());
observer.onCompleted();
return null;
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
GetJobResponse response = resultMono.block();
assertNotNull(response);
assertEquals("testJob", response.getName());
assertNull(response.getData());
assertNull(response.getSchedule());
assertNull(response.getRepeats());
assertNull(response.getTtl());
assertEquals(job.getDueTime(), datetime);
assertTrue(job.hasFailurePolicy());
assertTrue(job.getFailurePolicy().hasConstant());
assertEquals(5, job.getFailurePolicy().getConstant().getInterval().getNanos());
}
@Test
public void getJobShouldReturnResponseWithConstantFailurePolicyIntervalAndMaxRetriesSet() {
GetJobRequest getJobRequest = new GetJobRequest("testJob");
String datetime = OffsetDateTime.now().toString();
DaprProtos.Job job = DaprProtos.Job.newBuilder()
.setName("testJob")
.setDueTime(datetime)
.setFailurePolicy(CommonProtos.JobFailurePolicy.newBuilder()
.setConstant(CommonProtos.JobFailurePolicyConstant.newBuilder()
.setMaxRetries(10)
.setInterval(com.google.protobuf.Duration.newBuilder().setNanos(5).build()).build()).build())
.build();
doAnswer(invocation -> {
StreamObserver<DaprProtos.GetJobResponse> observer = invocation.getArgument(1);
observer.onNext(DaprProtos.GetJobResponse.newBuilder()
.setJob(job)
.build());
observer.onCompleted();
return null;
}).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any());
Mono<GetJobResponse> resultMono = previewClient.getJob(getJobRequest);
GetJobResponse response = resultMono.block();
assertNotNull(response);
assertEquals("testJob", response.getName());
assertNull(response.getData());
assertNull(response.getSchedule());
assertNull(response.getRepeats());
assertNull(response.getTtl());
assertEquals(job.getDueTime(), datetime);
assertTrue(job.hasFailurePolicy());
assertTrue(job.getFailurePolicy().hasConstant());
assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries());
assertEquals(5, job.getFailurePolicy().getConstant().getInterval().getNanos());
}
@Test @Test
public void getJobShouldThrowWhenRequestIsNull() { public void getJobShouldThrowWhenRequestIsNull() {
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> {