Workflow Implementation - Continues... (#880)

* Add ElementType.Type to ActorType (#812)

Signed-off-by: LionTao <taojiachun980831@163.com>

Signed-off-by: LionTao <taojiachun980831@163.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Bump codecov/codecov-action from 3.1.0 to 3.1.1 (#788)

Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 3.1.0 to 3.1.1.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/master/CHANGELOG.md)
- [Commits](https://github.com/codecov/codecov-action/compare/v3.1.0...v3.1.1)

---
updated-dependencies:
- dependency-name: codecov/codecov-action
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Update springboot to latest minor.patch version. (#826)

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Use runtime 1.10.0-rc.X and CLI 1.10.0-rc.X (#827)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Upgrade the version to 1.9.0-SNAPSHOT (#829)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Generate updated javadocs for 1.8.0 (#836)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Update Dapr runtime and CLI to 1.10. (#837)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Inject autoconfiguration in the Spring Boot 3 style (#831)

* Bump from spring boot 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <champel@gmail.com>
(cherry picked from commit 9152c91bc1f08ecf2dd3bccf8159fd5d0500e351)

* Ensure old versions of spring boot are still compatible

Signed-off-by: Sergio <champel@gmail.com>

---------

Signed-off-by: champel <champel@gmail.com>
Signed-off-by: Sergio <champel@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Bump from reactor 2.3.5.RELEASE to 2.7.8 (#830)

* Bump from reactor 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <champel@gmail.com>

* Simplification

Signed-off-by: Sergio <champel@gmail.com>

---------

Signed-off-by: Sergio <champel@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Test multiple reminder state types + improve timer tests. (#855)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Convert Config API to Stable endpoints. (#846)

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Add PubSub subscriber examples over gPRC (#833)

* add grpc subscriber

Signed-off-by: MregXN <mregxn@gmail.com>

* modify README.md

Signed-off-by: MregXN <mregxn@gmail.com>

* modify README.md in examples

Signed-off-by: MregXN <mregxn@gmail.com>

* Modify DaprApplication to support examples where protocol is not specified.

Signed-off-by: MregXN <mregxn@gmail.com>

* modify formatter to pass checkstyle

Signed-off-by: MregXN <mregxn@gmail.com>

* Update springboot to latest minor.patch version. (#826)

Signed-off-by: MregXN <mregxn@gmail.com>

* Use runtime 1.10.0-rc.X and CLI 1.10.0-rc.X (#827)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>

* Upgrade the version to 1.9.0-SNAPSHOT (#829)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>

* Generate updated javadocs for 1.8.0 (#836)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>

* Update Dapr runtime and CLI to 1.10. (#837)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>

* Inject autoconfiguration in the Spring Boot 3 style (#831)

* Bump from spring boot 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <champel@gmail.com>
(cherry picked from commit 9152c91bc1f08ecf2dd3bccf8159fd5d0500e351)

* Ensure old versions of spring boot are still compatible

Signed-off-by: Sergio <champel@gmail.com>

---------

Signed-off-by: champel <champel@gmail.com>
Signed-off-by: Sergio <champel@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>

* Bump from reactor 2.3.5.RELEASE to 2.7.8 (#830)

* Bump from reactor 2.3.5.RELEASE to 2.7.8

Signed-off-by: Sergio <champel@gmail.com>

* Simplification

Signed-off-by: Sergio <champel@gmail.com>

---------

Signed-off-by: Sergio <champel@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>

* rerun checks

Signed-off-by: MregXN <mregxn@gmail.com>

* modify the way of grpc server starts

Signed-off-by: MregXN <mregxn@gmail.com>

* modify README

Signed-off-by: MregXN <mregxn@gmail.com>

* Update pom.xml

Signed-off-by: MregXN <46479059+MregXN@users.noreply.github.com>

---------

Signed-off-by: MregXN <mregxn@gmail.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: champel <champel@gmail.com>
Signed-off-by: Sergio <champel@gmail.com>
Signed-off-by: MregXN <46479059+MregXN@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: champel <champel@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* auto validate actors (#863)

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Bump codecov/codecov-action from 3.1.1 to 3.1.4 (#862)

Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 3.1.1 to 3.1.4.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/codecov/codecov-action/compare/v3.1.1...v3.1.4)

---
updated-dependencies:
- dependency-name: codecov/codecov-action
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Fix 787 (#832)

* prepare before testing

* Update tests

* fix checkstyle

---------

Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Upgrade to 1.11 RCs. (#867)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Init for workflows

Signed-off-by: Bill DeRusha <billderusha@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Updating some javadocs and Years.

Signed-off-by: Hannah Kennedy <hakenned@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Add missing Header

Signed-off-by: Hannah Kennedy <hakenned@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* respond to PR feedback

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Update workflow example README

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Address PR feedback

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fixup deprecated pom.xml variable

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Updates based on PR feedback

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Update pom files per feedback

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* GetInstanceState implementation (#1)

* addiny getInstanceMetadata, waitForInstanceStart and waitForInstanceCompletion implementation
---------

Co-authored-by: aymanmahmoud_microsoft <aymanmahmoud@microsoft.com>
Signed-off-by: Aymand Mahmoud <aymanmahmoud@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Management API

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* remove try/catch

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* implementing getIsReplaying() method for Authoring API (#7)

Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Signed-off-by: Julio Rezende <jsilvarezend@microsoft.com>

* Implementing getCurrentInstant() authoring method (#5)

Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Signed-off-by: Julio Rezende <jsilvarezend@microsoft.com>

* Activity Implementation (#3)

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fixing issue with getIsReplaying() call (#8)

Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Signed-off-by: Julio Rezende <jsilvarezend@microsoft.com>

* Generate updated javadocs for 1.9.0 (#878)

* Generate updated javadocs for 1.9.0

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update _index.md

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Add .sdkmanrc config file and JDK installation instructions (#873)

* Add .sdkmanrc file with installation instructions

Signed-off-by: Emanuel Alves <emanuel.j.b.alves@gmail.com>

* Update README.md

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Emanuel Alves <emanuel.j.b.alves@gmail.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Add unit testing example

Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* implementing getIsReplaying() method for Authoring API (#7)

Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>

Signed-off-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fix parent pom

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Send Event Implementation (#10)

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Implementing allOf, anyOf, createTimer methods (#11)

Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>

* Support remote endpoint. (#877)

* Support remote endpoint.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Use GRPC_ENDPOINT and HTTP_ENDPOINT in integration tests.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix happy path for waiting for sidecar test.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Add callSubWorkflow Implementation

Co-authored-by: Aymand Mahmoud <aymanmahmoud@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Signed-off-by: Aymand Mahmoud <aymanmahmoud@microsoft.com>

* rename DemoSubWorkflow

Co-authored-by: Aymand Mahmoud <aymanmahmoud@microsoft.com>
Signed-off-by: Aymand Mahmoud <aymanmahmoud@microsoft.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* continueAsNew Implementation (#13)

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* remove duplicate class

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* add missing mockito test dependency

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* use new workflow client implementation

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* moved implementations to new workflow and context

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* relocate duplicate implemantation

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* remove duplicate test and increase test coverage

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* Implement retry and timeout policy for gRPC client. (#889)

* Implement retry and timeout policy for gRPC client.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix invoke actor after aborted flow.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* renamed getIsReplaying

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* rollback changes on client

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* move workflow runtime state package

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* rename workflow instance state to status

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* remove unnecessary else

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* removed unknown state

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* updated comment

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* updated workflow failure details

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fix style issues

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* rollback merge change

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fixed pom files

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* rollback actors pom changes on autoformat

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fixe actors pom

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fix styling on actors pom

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* fix pom spacing

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* move test to match the package

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* add missing dependencies

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* increased test coverage

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* moved workflow runtime package

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* add exception for missing case

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* add null check for metadata

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* add runtime exception error messages

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* update try catch scope

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* update activity definition to an interface

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* update comments

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* removed redundant method

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

* PR updates

Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>

---------

Signed-off-by: LionTao <taojiachun980831@163.com>
Signed-off-by: Mahmut Canga <cangamahmut@gmail.com>
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: champel <champel@gmail.com>
Signed-off-by: Sergio <champel@gmail.com>
Signed-off-by: MregXN <mregxn@gmail.com>
Signed-off-by: MregXN <46479059+MregXN@users.noreply.github.com>
Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Signed-off-by: Bill DeRusha <billderusha@microsoft.com>
Signed-off-by: Hannah Kennedy <hakenned@microsoft.com>
Signed-off-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Signed-off-by: Aymand Mahmoud <aymanmahmoud@microsoft.com>
Signed-off-by: Julio Rezende <jsilvarezend@microsoft.com>
Signed-off-by: Emanuel Alves <emanuel.j.b.alves@gmail.com>
Signed-off-by: Mahmut Canga <macromania@users.noreply.github.com>
Co-authored-by: LionTao <taojiachun980831@163.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: champel <champel@gmail.com>
Co-authored-by: MregXN <46479059+MregXN@users.noreply.github.com>
Co-authored-by: MatejNedic <matejnedic1@gmail.com>
Co-authored-by: Bill DeRusha <billderusha@microsoft.com>
Co-authored-by: Hannah Kennedy <hakenned@microsoft.com>
Co-authored-by: Bill DeRusha <444835+bderusha@users.noreply.github.com>
Co-authored-by: Aymalla <Aymalla@outlook.com>
Co-authored-by: aymanmahmoud_microsoft <aymanmahmoud@microsoft.com>
Co-authored-by: swetakumari <swetakumari@microsoft.com>
Co-authored-by: julio <107879411+julioalex-rezende@users.noreply.github.com>
Co-authored-by: Julio Rezende <jsilvarezend@microsoft.com>
Co-authored-by: Emanuel Alves <emanuel.j.b.alves@gmail.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Mahmut Canga 2023-09-13 07:12:38 +01:00 committed by GitHub
parent 85ffe8ed03
commit 056aed4d98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 2290 additions and 99 deletions

View File

@ -0,0 +1,34 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows;
public class DemoActivityInput {
private String message;
public DemoActivityInput() {
}
public DemoActivityInput(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows;
public class DemoActivityOutput {
private String originalMessage;
private String newMessage;
public DemoActivityOutput() {
}
public DemoActivityOutput(String originalMessage, String newMessage) {
this.originalMessage = originalMessage;
this.newMessage = newMessage;
}
public String getOriginalMessage() {
return originalMessage;
}
public void setOriginalMessage(String originalMessage) {
this.originalMessage = originalMessage;
}
public String getNewMessage() {
return newMessage;
}
public void setNewMessage(String newMessage) {
this.newMessage = newMessage;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
/**
* Implementation of the DemoWorkflow for the server side.
*/
public class DemoSubWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
var logger = ctx.getLogger();
logger.info("Child-Workflow> Started: " + ctx.getName());
logger.info("Child-Workflow> Instance ID: " + ctx.getInstanceId());
logger.info("Child-Workflow> Current Time: " + ctx.getCurrentInstant());
var input = ctx.getInput(String.class);
logger.info("Child-Workflow> Input: " + input);
logger.info("Child-Workflow> Completed");
ctx.complete("result: " + input);
};
}
}

View File

@ -13,31 +13,114 @@ limitations under the License.
package io.dapr.examples.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
/**
* Implementation of the DemoWorkflow for the server side.
*/
public class DemoWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
ctx.getLogger().info("Instance ID: " + ctx.getInstanceId());
ctx.getLogger().info("Waiting for event: 'myEvent'...");
ctx.getLogger().info("Current Orchestration Time: " + ctx.getCurrentInstant());
ctx.getLogger().info("Waiting for event: 'TimedOutEvent'...");
try {
ctx.waitForExternalEvent("myEvent", Duration.ofSeconds(10)).await();
ctx.getLogger().info("Received!");
ctx.waitForExternalEvent("TimedOutEvent", Duration.ofSeconds(10)).await();
} catch (TaskCanceledException e) {
ctx.getLogger().warn("Timed out");
ctx.getLogger().warn(e.getMessage());
}
ctx.complete("finished");
ctx.getLogger().info("Waiting for event: 'TestEvent'...");
try {
ctx.waitForExternalEvent("TestEvent", Duration.ofSeconds(10)).await();
ctx.getLogger().info("Received TestEvent");
} catch (TaskCanceledException e) {
ctx.getLogger().warn("Timed out");
ctx.getLogger().warn(e.getMessage());
}
ctx.getLogger().info("Parallel Execution - Waiting for all tasks to finish...");
try {
Task<String> t1 = ctx.waitForExternalEvent("event1", Duration.ofSeconds(5), String.class);
Task<String> t2 = ctx.waitForExternalEvent("event2", Duration.ofSeconds(5), String.class);
Task<String> t3 = ctx.waitForExternalEvent("event3", Duration.ofSeconds(5), String.class);
List<String> results = ctx.allOf(Arrays.asList(t1, t2, t3)).await();
results.forEach(t -> ctx.getLogger().info("finished task: " + t));
ctx.getLogger().info("All tasks finished!");
} catch (CompositeTaskFailedException e) {
ctx.getLogger().warn(e.getMessage());
List<Exception> exceptions = e.getExceptions();
exceptions.forEach(ex -> ctx.getLogger().warn(ex.getMessage()));
}
ctx.getLogger().info("Parallel Execution - Waiting for any task to finish...");
try {
Task<String> e1 = ctx.waitForExternalEvent("e1", Duration.ofSeconds(5), String.class);
Task<String> e2 = ctx.waitForExternalEvent("e2", Duration.ofSeconds(5), String.class);
Task<String> e3 = ctx.waitForExternalEvent("e3", Duration.ofSeconds(5), String.class);
Task<Void> timeoutTask = ctx.createTimer(Duration.ofSeconds(1));
Task<?> winner = ctx.anyOf(Arrays.asList(e1, e2, e3, timeoutTask)).await();
if (winner == timeoutTask) {
ctx.getLogger().info("All tasks timed out!");
} else {
ctx.getLogger().info("One of the tasks finished!");
}
} catch (TaskCanceledException e) {
ctx.getLogger().warn("Timed out");
ctx.getLogger().warn(e.getMessage());
}
ctx.getLogger().info("Calling Activity...");
var input = new DemoActivityInput("Hello Activity!");
var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
ctx.getLogger().info("Activity returned: " + output);
ctx.getLogger().info("Activity returned: " + output.getNewMessage());
ctx.getLogger().info("Activity returned: " + output.getOriginalMessage());
boolean shouldComplete = true;
ctx.getLogger().info("Waiting for event: 'RestartEvent'...");
try {
ctx.waitForExternalEvent("RestartEvent", Duration.ofSeconds(10)).await();
ctx.getLogger().info("Received RestartEvent");
ctx.getLogger().info("Restarting Workflow by calling continueAsNew...");
ctx.continueAsNew("TestInputRestart", false);
shouldComplete = false;
} catch (TaskCanceledException e) {
ctx.getLogger().warn("Restart Timed out");
ctx.getLogger().warn(e.getMessage());
}
if (shouldComplete) {
ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow...");
var childWorkflowInput = "Hello ChildWorkflow!";
var childWorkflowOutput =
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, String.class).await();
ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput);
ctx.getLogger().info("Workflow finished");
ctx.complete("finished");
return;
}
ctx.getLogger().info("Workflow restarted");
};
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import io.dapr.workflows.runtime.WorkflowActivity;
import io.dapr.workflows.runtime.WorkflowActivityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class DemoWorkflowActivity implements WorkflowActivity {
@Override
public DemoActivityOutput run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
logger.info("Starting Activity: " + ctx.getName());
var message = ctx.getInput(DemoActivityInput.class).getMessage();
var newMessage = message + " World!, from Activity";
logger.info("Message Received from input: " + message);
logger.info("Sending message to output: " + newMessage);
logger.info("Sleeping for 5 seconds to simulate long running operation...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("Activity finished");
var output = new DemoActivityOutput(message, newMessage);
logger.info("Activity returned: " + output);
return output;
}
}

View File

@ -14,8 +14,11 @@ limitations under the License.
package io.dapr.examples.workflows;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* For setup instructions, see the README.
@ -24,6 +27,7 @@ public class DemoWorkflowClient {
/**
* The main method.
*
* @param args Input arguments (unused).
* @throws InterruptedException If program has been interrupted.
*/
@ -31,14 +35,67 @@ public class DemoWorkflowClient {
DaprWorkflowClient client = new DaprWorkflowClient();
try (client) {
System.out.println("*****");
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
String separatorStr = "*******";
System.out.println(separatorStr);
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);
System.out.println("Sleep and allow this workflow instance to timeout...");
TimeUnit.SECONDS.sleep(10);
System.out.println(separatorStr);
System.out.println("**GetInstanceMetadata:Running Workflow**");
WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);
System.out.printf("Result: %s%n", workflowMetadata);
System.out.println("*****");
System.out.println(separatorStr);
System.out.println("**WaitForInstanceStart**");
try {
WorkflowInstanceStatus waitForInstanceStartResult =
client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
System.out.printf("Result: %s%n", waitForInstanceStartResult);
} catch (TimeoutException ex) {
System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
}
System.out.println(separatorStr);
System.out.println("**SendExternalMessage**");
client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
System.out.println(separatorStr);
System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);
System.out.println(separatorStr);
System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
client.raiseEvent(instanceId, "e2", "event 2 Payload");
System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);
System.out.println(separatorStr);
System.out.println("**WaitForInstanceCompletion**");
try {
WorkflowInstanceStatus waitForInstanceCompletionResult =
client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
} catch (TimeoutException ex) {
System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
}
System.out.println(separatorStr);
System.out.println("**purgeInstance**");
boolean purgeResult = client.purgeInstance(instanceId);
System.out.printf("purgeResult: %s%n", purgeResult);
System.out.println(separatorStr);
System.out.println("**raiseEvent**");
String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId);
client.raiseEvent(eventInstanceId, "TestException", null);
System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId);
System.out.println(separatorStr);
String instanceToTerminateId = "terminateMe";
client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId);
System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);
@ -46,7 +103,20 @@ public class DemoWorkflowClient {
TimeUnit.SECONDS.sleep(5);
System.out.println("Terminate this workflow instance manually before the timeout is reached");
client.terminateWorkflow(instanceToTerminateId, null);
System.out.println("*****");
System.out.println(separatorStr);
String restartingInstanceId = "restarting";
client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId);
System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId);
System.out.println("Sleeping 30 seconds to restart the workflow");
TimeUnit.SECONDS.sleep(30);
System.out.println("**SendExternalMessage: RestartEvent**");
client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
TimeUnit.SECONDS.sleep(30);
client.terminateWorkflow(restartingInstanceId, null);
}
System.out.println("Exiting DemoWorkflowClient.");

View File

@ -30,6 +30,7 @@ public class DemoWorkflowWorker {
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
builder.registerActivity(DemoWorkflowActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {

16
pom.xml
View File

@ -25,7 +25,12 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.deploy.skip>true</maven.deploy.skip>
<!--
manually declare durabletask-client's jackson dependencies for workflows sdk
which conflict with dapr-sdk's jackson dependencies
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
-->
<jackson.version>2.12.3</jackson.version>
<gpg.skip>true</gpg.skip>
<spotbugs.version>4.0.0-RC1</spotbugs.version>
<spotbugs.fail>true</spotbugs.fail>
@ -270,7 +275,8 @@
<failOnWarnings>false</failOnWarnings>
<failOnError>true</failOnError>
<goal>site</goal>
<excludePackageNames>io.dapr.examples:io.dapr.springboot:io.dapr.examples.*:io.dapr.springboot.*</excludePackageNames>
<excludePackageNames>io.dapr.examples:io.dapr.springboot:io.dapr.examples.*:io.dapr.springboot.*
</excludePackageNames>
</configuration>
</plugin>
</plugins>
@ -293,9 +299,9 @@
</developers>
<scm>
<url>https://github.com/dapr/java-sdk</url>
<connection>scm:git:https://github.com/dapr/java-sdk.git</connection>
<tag>HEAD</tag>
<url>https://github.com/dapr/java-sdk</url>
<connection>scm:git:https://github.com/dapr/java-sdk.git</connection>
<tag>HEAD</tag>
</scm>
<modules>

View File

@ -1,7 +1,7 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@ -37,12 +37,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.gmazzo</groupId>
<artifactId>okhttp-mock</artifactId>
<version>1.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.gmazzo</groupId>
<artifactId>okhttp-mock</artifactId>
<version>1.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
@ -129,7 +129,7 @@
<rule>
<element>BUNDLE</element>
<limits>
<limit>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>80%</minimum>

View File

@ -1,7 +1,7 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@ -18,8 +18,6 @@
<properties>
<maven.deploy.skip>false</maven.deploy.skip>
<grpc.version>1.42.1</grpc.version>
<jackson.version>2.12.3</jackson.version>
</properties>
<dependencies>
@ -38,11 +36,23 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>5.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft</groupId>
<artifactId>durabletask-client</artifactId>
@ -135,7 +145,7 @@
<rule>
<element>BUNDLE</element>
<limits>
<limit>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>80%</minimum>

View File

@ -13,13 +13,20 @@ limitations under the License.
package io.dapr.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.NOPLogger;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
public class DaprWorkflowContextImpl implements WorkflowContext {
private final TaskOrchestrationContext innerContext;
@ -39,7 +46,7 @@ public class DaprWorkflowContextImpl implements WorkflowContext {
* Constructor for DaprWorkflowContextImpl.
*
* @param context TaskOrchestrationContext
* @param logger Logger
* @param logger Logger
* @throws IllegalArgumentException if context or logger is null
*/
public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
@ -78,6 +85,13 @@ public class DaprWorkflowContextImpl implements WorkflowContext {
return this.innerContext.getInstanceId();
}
/**
* {@inheritDoc}
*/
public Instant getCurrentInstant() {
return this.innerContext.getCurrentInstant();
}
/**
* {@inheritDoc}
*/
@ -88,7 +102,106 @@ public class DaprWorkflowContextImpl implements WorkflowContext {
/**
* {@inheritDoc}
*/
public Task<Void> waitForExternalEvent(String eventName, Duration timeout) {
return this.innerContext.waitForExternalEvent(eventName, timeout);
@Override
public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V> dataType)
throws TaskCanceledException {
return this.innerContext.waitForExternalEvent(name, timeout, dataType);
}
/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received or is canceled when {@code timeout} expires.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
*
* @param name the case-insensitive name of the event to wait for
* @param timeout the amount of time to wait before canceling the returned {@code Task}
* @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires
* @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received
*/
@Override
public <V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException {
return this.innerContext.waitForExternalEvent(name, timeout, Void.class);
}
/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
*
* @param name the case-insensitive name of the event to wait for
* @return a new {@link Task} that completes when the external event is received
*/
@Override
public <V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException {
return this.innerContext.waitForExternalEvent(name, null, Void.class);
}
@Override
public boolean isReplaying() {
return this.innerContext.getIsReplaying();
}
/**
* {@inheritDoc}
*/
public <V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType) {
return this.innerContext.callActivity(name, input, options, returnType);
}
/**
* {@inheritDoc}
*/
public <V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException {
return this.innerContext.allOf(tasks);
}
/**
* {@inheritDoc}
*/
public Task<Task<?>> anyOf(List<Task<?>> tasks) {
return this.innerContext.anyOf(tasks);
}
/**
* {@inheritDoc}
*/
public Task<Void> createTimer(Duration duration) {
return this.innerContext.createTimer(duration);
}
/**
* {@inheritDoc}
*/
public <T> T getInput(Class<T> targetType) {
return this.innerContext.getInput(targetType);
}
/**
* {@inheritDoc}
*/
@Override
public <V> Task<V> callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
@Nullable TaskOptions options, Class<V> returnType) {
return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType);
}
/**
* {@inheritDoc}
*/
@Override
public void continueAsNew(Object input) {
this.innerContext.continueAsNew(input);
}
/**
* {@inheritDoc}
*/
@Override
public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
this.innerContext.continueAsNew(input, preserveUnprocessedEvents);
}
}

View File

@ -13,10 +13,20 @@ limitations under the License.
package io.dapr.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import com.microsoft.durabletask.TaskOptions;
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
/**
* Context object used by workflow implementations to perform actions such as scheduling activities,
@ -48,6 +58,13 @@ public interface WorkflowContext {
*/
String getInstanceId();
/**
* Gets the current orchestration time in UTC.
*
* @return the current orchestration time in UTC
*/
Instant getCurrentInstant();
/**
* Completes the current workflow.
*
@ -56,13 +73,445 @@ public interface WorkflowContext {
void complete(Object output);
/**
* Waits for an event to be raised with name and returns the event data.
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received or is canceled when {@code timeout} expires.
*
* @param eventName The name of the event to wait for. Event names are case-insensitive.
* External event names can be reused any number of times; they are not
* required to be unique.
* @param timeout The amount of time to wait before cancelling the external event task.
* @return An asynchronous durabletask.Task to await.
* <p>If the current orchestration is not yet waiting for an event named {@code name}, then the event will be saved in
* the orchestration instance state and dispatched immediately when this method is called. This event saving occurs
* even if the current orchestrator cancels the wait operation before the event is received.
*
* <p>Orchestrators can wait for the same event name multiple times, so waiting for multiple events with the same name
* is allowed. Each external event received by an orchestrator will complete just one task returned by this method.
*
* @param name the case-insensitive name of the event to wait for
* @param timeout the amount of time to wait before canceling the returned {@code Task}
* @param dataType the expected class type of the event data payload
* @param <V> the expected type of the event data payload
* @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires
* @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received
*/
Task<Void> waitForExternalEvent(String eventName, Duration timeout);
<V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V> dataType) throws TaskCanceledException;
/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received or is canceled when {@code timeout} expires.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
*
* @param name the case-insensitive name of the event to wait for
* @param timeout the amount of time to wait before canceling the returned {@code Task}
* @param <V> the expected type of the event data payload
* @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires
* @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received
*/
<V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException;
/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
*
* @param name the case-insensitive name of the event to wait for
* @param <V> the expected type of the event data payload
* @return a new {@link Task} that completes when the external event is received
*/
<V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException;
/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* received.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
*
* @param name the case-insensitive name of the event to wait for
* @param dataType the expected class type of the event data payload
* @param <V> the expected type of the event data payload
* @return a new {@link Task} that completes when the external event is received
*/
default <V> Task<V> waitForExternalEvent(String name, Class<V> dataType) {
try {
return this.waitForExternalEvent(name, null, dataType);
} catch (TaskCanceledException e) {
// This should never happen because of the max duration
throw new RuntimeException("An unexpected exception was throw while waiting for an external event.", e);
}
}
/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. If the activity completes successfully, the returned {@code Task}'s
* value will be the activity's output. If the activity fails, the returned {@code Task} will complete exceptionally
* with a {@link TaskFailedException}.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
* @param options additional options that control the execution and processing of the activity
* @param returnType the expected class type of the activity output
* @param <V> the expected type of the activity output
* @return a new {@link Task} that completes when the activity completes or fails
*/
<V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType);
/**
* Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity
* completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @return a new {@link Task} that completes when the activity completes or fails
* @see #callActivity(String, Object, TaskOptions, Class)
*/
default Task<Void> callActivity(String name) {
return this.callActivity(name, null, null, Void.class);
}
/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a
* complete description.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
* @return a new {@link Task} that completes when the activity completes or fails
*/
default Task<Void> callActivity(String name, Object input) {
return this.callActivity(name, input, null, Void.class);
}
/**
* Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity
* completes. If the activity completes successfully, the returned {@code Task}'s value will be the activity's
* output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @param returnType the expected class type of the activity output
* @param <V> the expected type of the activity output
* @return a new {@link Task} that completes when the activity completes or fails
*/
default <V> Task<V> callActivity(String name, Class<V> returnType) {
return this.callActivity(name, null, null, returnType);
}
/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes.If the activity completes successfully, the returned {@code Task}'s
* value will be the activity's output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a
* complete description.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
* @param returnType the expected class type of the activity output
* @param <V> the expected type of the activity output
* @return a new {@link Task} that completes when the activity completes or fails
*/
default <V> Task<V> callActivity(String name, Object input, Class<V> returnType) {
return this.callActivity(name, input, null, returnType);
}
/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a
* complete description.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
* @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the activity completes or fails
*/
default Task<Void> callActivity(String name, Object input, TaskOptions options) {
return this.callActivity(name, input, options, Void.class);
}
/**
* Gets a value indicating whether the workflow is currently replaying a previous execution.
*
* <p>Workflow functions are "replayed" after being unloaded from memory to reconstruct local variable state.
* During a replay, previously executed tasks will be completed automatically with previously seen values
* that are stored in the workflow history. Once the workflow reaches the point where it's no longer
* replaying existing history, this method will return {@code false}.
*
* <p>You can use this method if you have logic that needs to run only when <em>not</em> replaying. For example,
* certain types of application logging may become too noisy when duplicated as part of replay. The
* application code could check to see whether the function is being replayed and then issue the log statements
* when this value is {@code false}.
*
* @return {@code true} if the workflow is replaying, otherwise {@code false}
*/
boolean isReplaying();
/**
* Returns a new {@code Task} that is completed when all the given {@code Task}s complete. If any of the given
* {@code Task}s complete with an exception, the returned {@code Task} will also complete with an
* {@link CompositeTaskFailedException} containing details of the first encountered failure.
* The value of the returned {@code Task} is an ordered list of the return values of the given tasks.
* If no tasks are provided, returns a {@code Task} completed with value
* {@code null}.
*
* <p>This method is useful for awaiting the completion of a set of independent tasks before continuing to the next
* step in the orchestration, as in the following example:
* <pre>{@code
* Task<String> t1 = ctx.callActivity("MyActivity", String.class);
* Task<String> t2 = ctx.callActivity("MyActivity", String.class);
* Task<String> t3 = ctx.callActivity("MyActivity", String.class);
*
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
* }</pre>
*
* <p>Exceptions in any of the given tasks results in an unchecked {@link CompositeTaskFailedException}.
* This exception can be inspected to obtain failure details of individual {@link Task}s.
* <pre>{@code
* try {
* List<String> orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();
* } catch (CompositeTaskFailedException e) {
* List<Exception> exceptions = e.getExceptions()
* }
* }</pre>
*
* @param tasks the list of {@code Task} objects
* @param <V> the return type of the {@code Task} objects
* @return the values of the completed {@code Task} objects in the same order as the source list
* @throws CompositeTaskFailedException if the specified {@code timeout} value expires before the event is received
*/
<V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException;
/**
* Returns a new {@code Task} that is completed when any of the tasks in {@code tasks} completes.
* See {@link #anyOf(Task[])} for more detailed information.
*
* @param tasks the list of {@code Task} objects
* @return a new {@code Task} that is completed when any of the given {@code Task}s complete
* @see #anyOf(Task[])
*/
Task<Task<?>> anyOf(List<Task<?>> tasks);
/**
* Returns a new {@code Task} that is completed when any of the given {@code Task}s complete. The value of the
* new {@code Task} is a reference to the completed {@code Task} object. If no tasks are provided, returns a
* {@code Task} that never completes.
*
* <p>This method is useful for waiting on multiple concurrent tasks and performing a task-specific operation when the
* first task completes, as in the following example:
* <pre>{@code
* Task<Void> event1 = ctx.waitForExternalEvent("Event1");
* Task<Void> event2 = ctx.waitForExternalEvent("Event2");
* Task<Void> event3 = ctx.waitForExternalEvent("Event3");
*
* Task<?> winner = ctx.anyOf(event1, event2, event3).await();
* if (winner == event1) {
* // ...
* } else if (winner == event2) {
* // ...
* } else if (winner == event3) {
* // ...
* }
* }</pre>
* The {@code anyOf} method can also be used for implementing long-running timeouts, as in the following example:
* <pre>{@code
* Task<Void> activityTask = ctx.callActivity("SlowActivity");
* Task<Void> timeoutTask = ctx.createTimer(Duration.ofMinutes(30));
*
* Task<?> winner = ctx.anyOf(activityTask, timeoutTask).await();
* if (winner == activityTask) {
* // completion case
* } else {
* // timeout case
* }
* }</pre>
*
* @param tasks the list of {@code Task} objects
* @return a new {@code Task} that is completed when any of the given {@code Task}s complete
*/
default Task<Task<?>> anyOf(Task<?>... tasks) {
return this.anyOf(Arrays.asList(tasks));
}
/**
* Creates a durable timer that expires after the specified delay.
*
* <p>Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
* internally-managed durable timers. The orchestration code doesn't need to be aware of this behavior. However,
* it may be visible in framework logs and the stored history state.
*
* @param duration the amount of time before the timer should expire
* @return a new {@code Task} that completes after the specified delay
*/
Task<Void> createTimer(Duration duration);
/**
* Creates a durable timer that expires after the specified timestamp with specific zone.
*
* <p>Specifying a long delay (for example, a delay of a few days or more) may result in the creation of multiple,
* internally-managed timers. The workflow code doesn't need to be aware of this behavior. However,
* it may be visible in framework logs and the stored history state.
*
* @param zonedDateTime timestamp with specific zone when the timer should expire
* @return a new {@code Task} that completes after the specified delay
*/
default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
throw new UnsupportedOperationException("This method is not implemented.");
}
/**
* Gets the deserialized input of the current task orchestration.
*
* @param targetType the {@link Class} object associated with {@code V}
* @param <V> the expected type of the workflow input
* @return the deserialized input as an object of type {@code V} or {@code null} if no input was provided.
*/
<V> V getInput(Class<V> targetType);
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @return a new {@link Task} that completes when the sub-workflow completes or fails
* @see #callSubWorkflow(String, Object, String, TaskOptions, Class)
*/
default Task<Void> callSubWorkflow(String name) {
return this.callSubWorkflow(name, null);
}
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default Task<Void> callSubWorkflow(String name, Object input) {
return this.callSubWorkflow(name, input, null);
}
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param returnType the expected class type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default <V> Task<V> callSubWorkflow(String name, Object input, Class<V> returnType) {
return this.callSubWorkflow(name, input, null, returnType);
}
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param instanceID the unique ID of the sub-workflow
* @param returnType the expected class type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default <V> Task<V> callSubWorkflow(String name, Object input, String instanceID, Class<V> returnType) {
return this.callSubWorkflow(name, input, instanceID, null, returnType);
}
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes.
*
* <p>See {@link #callSubWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param instanceID the unique ID of the sub-workflow
* @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
default Task<Void> callSubWorkflow(String name, Object input, String instanceID, TaskOptions options) {
return this.callSubWorkflow(name, input, instanceID, options, Void.class);
}
/**
* Asynchronously invokes another workflow as a sub-workflow and returns a {@link Task} that completes
* when the sub-workflow completes. If the sub-workflow completes successfully, the returned
* {@code Task}'s value will be the activity's output. If the sub-workflow fails, the returned {@code Task}
* will complete exceptionally with a {@link TaskFailedException}.
*
* <p>A sub-workflow has its own instance ID, history, and status that is independent of the parent workflow
* that started it. There are many advantages to breaking down large orchestrations into sub-workflows:
* <ul>
* <li>
* Splitting large orchestrations into a series of smaller sub-workflows can make code more maintainable.
* </li>
* <li>
* Distributing orchestration logic across multiple compute nodes concurrently is useful if
* orchestration logic otherwise needs to coordinate a lot of tasks.
* </li>
* <li>
* Memory usage and CPU overhead can be reduced by keeping the history of parent orchestrations smaller.
* </li>
* </ul>
* The disadvantage is that there is overhead associated with starting a sub-workflow and processing its
* output. This is typically only an issue for very small orchestrations.
*
* <p>Because sub-workflows are independent of their parents, terminating a parent orchestration does not affect
* any sub-workflows. sub-workflows must be terminated independently using their unique instance ID,
* which is specified using the {@code instanceID} parameter
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the sub-workflow
* @param instanceID the unique ID of the sub-workflow
* @param options additional options that control the execution and processing of the activity
* @param returnType the expected class type of the sub-workflow output
* @param <V> the expected type of the sub-workflow output
* @return a new {@link Task} that completes when the sub-workflow completes or fails
*/
<V> Task<V> callSubWorkflow(String name,
@Nullable Object input,
@Nullable String instanceID,
@Nullable TaskOptions options,
Class<V> returnType);
/**
* Restarts the orchestration with a new input and clears its history. See {@link #continueAsNew(Object, boolean)}
* for a full description.
*
* @param input the serializable input data to re-initialize the instance with
*/
default void continueAsNew(Object input) {
this.continueAsNew(input, true);
}
/**
* Restarts the orchestration with a new input and clears its history.
*
* <p>This method is primarily designed for eternal orchestrations, which are orchestrations that
* may not ever complete. It works by restarting the orchestration, providing it with a new input,
* and truncating the existing orchestration history. It allows an orchestration to continue
* running indefinitely without having its history grow unbounded. The benefits of periodically
* truncating history include decreased memory usage, decreased storage volumes, and shorter orchestrator
* replays when rebuilding state.
*
* <p>The results of any incomplete tasks will be discarded when an orchestrator calls {@code continueAsNew}.
* For example, if a timer is scheduled and then {@code continueAsNew} is called before the timer fires, the timer
* event will be discarded. The only exception to this is external events. By default, if an external event is
* received by an orchestration but not yet processed, the event is saved in the orchestration state unit it is
* received by a call to {@link #waitForExternalEvent}. These events will remain in memory
* even after an orchestrator restarts using {@code continueAsNew}. This behavior can be disabled by specifying
* {@code false} for the {@code preserveUnprocessedEvents} parameter value.
*
* <p>Orchestrator implementations should complete immediately after calling the{@code continueAsNew} method.
*
* @param input the serializable input data to re-initialize the instance with
* @param preserveUnprocessedEvents {@code true} to push unprocessed external events into the new orchestration
* history, otherwise {@code false}
*/
void continueAsNew(Object input, boolean preserveUnprocessedEvents);
}

View File

@ -15,13 +15,21 @@ package io.dapr.workflows.client;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.PurgeResult;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.grpc.ManagedChannel;
import javax.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Defines client operations for managing Dapr Workflow instances.
*/
public class DaprWorkflowClient implements AutoCloseable {
private DurableTaskClient innerClient;
@ -48,7 +56,6 @@ public class DaprWorkflowClient implements AutoCloseable {
*
* @param innerClient DurableTaskGrpcClient with GRPC Channel set up.
* @param grpcChannel ManagedChannel for instance variable setting.
*
*/
private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) {
this.innerClient = innerClient;
@ -70,9 +77,9 @@ public class DaprWorkflowClient implements AutoCloseable {
/**
* Schedules a new workflow using DurableTask client.
*
* @param <T> any Workflow type
* @param <T> any Workflow type
* @param clazz Class extending Workflow to start an instance of.
* @return A String with the randomly-generated instance ID for new Workflow instance.
* @return the randomly-generated instance ID for new Workflow instance.
*/
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName());
@ -81,10 +88,10 @@ public class DaprWorkflowClient implements AutoCloseable {
/**
* Schedules a new workflow using DurableTask client.
*
* @param <T> any Workflow type
* @param <T> any Workflow type
* @param clazz Class extending Workflow to start an instance of.
* @param input the input to pass to the scheduled orchestration instance. Must be serializable.
* @return A String with the randomly-generated instance ID for new Workflow instance.
* @return the randomly-generated instance ID for new Workflow instance.
*/
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input);
@ -93,11 +100,11 @@ public class DaprWorkflowClient implements AutoCloseable {
/**
* Schedules a new workflow using DurableTask client.
*
* @param <T> any Workflow type
* @param clazz Class extending Workflow to start an instance of.
* @param input the input to pass to the scheduled orchestration instance. Must be serializable.
* @param <T> any Workflow type
* @param clazz Class extending Workflow to start an instance of.
* @param input the input to pass to the scheduled orchestration instance. Must be serializable.
* @param instanceId the unique ID of the orchestration instance to schedule
* @return A String with the <code>instanceId</code> parameter value.
* @return the <code>instanceId</code> parameter value.
*/
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input, String instanceId) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId);
@ -107,15 +114,117 @@ public class DaprWorkflowClient implements AutoCloseable {
* Terminates the workflow associated with the provided instance id.
*
* @param workflowInstanceId Workflow instance id to terminate.
* @param output the optional output to set for the terminated orchestration instance.
* @param output the optional output to set for the terminated orchestration instance.
*/
public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) {
this.innerClient.terminate(workflowInstanceId, output);
}
/**
* Closes the inner DurableTask client and shutdown the GRPC channel.
* Fetches workflow instance metadata from the configured durable store.
*
* @param instanceId the unique ID of the workflow instance to fetch
* @param getInputsAndOutputs <code>true</code> to fetch the workflow instance's
* inputs, outputs, and custom status, or <code>false</code> to omit them
* @return a metadata record that describes the workflow instance and it execution status, or a default instance
*/
@Nullable
public WorkflowInstanceStatus getInstanceState(String instanceId, boolean getInputsAndOutputs) {
OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs);
if (metadata == null) {
return null;
}
return new WorkflowInstanceStatus(metadata);
}
/**
* Waits for an workflow to start running and returns an
* {@link WorkflowInstanceStatus} object that contains metadata about the started
* instance and optionally its input, output, and custom status payloads.
*
* <p>A "started" workflow instance is any instance not in the Pending state.
*
* <p>If an workflow instance is already running when this method is called,
* the method will return immediately.
*
* @param instanceId the unique ID of the workflow instance to wait for
* @param timeout the amount of time to wait for the workflow instance to start
* @param getInputsAndOutputs true to fetch the workflow instance's
* inputs, outputs, and custom status, or false to omit them
* @return the workflow instance metadata or null if no such instance is found
* @throws TimeoutException when the workflow instance is not started within the specified amount of time
*/
@Nullable
public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration timeout, boolean getInputsAndOutputs)
throws TimeoutException {
OrchestrationMetadata metadata = this.innerClient.waitForInstanceStart(instanceId, timeout, getInputsAndOutputs);
return metadata == null ? null : new WorkflowInstanceStatus(metadata);
}
/**
* Waits for an workflow to complete and returns an {@link WorkflowInstanceStatus} object that contains
* metadata about the completed instance.
*
* <p>A "completed" workflow instance is any instance in one of the terminal states. For example, the
* Completed, Failed, or Terminated states.
*
* <p>Workflows are long-running and could take hours, days, or months before completing.
* Workflows can also be eternal, in which case they'll never complete unless terminated.
* In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are used.
* If an workflow instance is already complete when this method is called, the method will return immediately.
*
* @param instanceId the unique ID of the workflow instance to wait for
* @param timeout the amount of time to wait for the workflow instance to complete
* @param getInputsAndOutputs true to fetch the workflow instance's inputs, outputs, and custom
* status, or false to omit them
* @return the workflow instance metadata or null if no such instance is found
* @throws TimeoutException when the workflow instance is not completed within the specified amount of time
*/
@Nullable
public WorkflowInstanceStatus waitForInstanceCompletion(String instanceId, Duration timeout,
boolean getInputsAndOutputs) throws TimeoutException {
OrchestrationMetadata metadata =
this.innerClient.waitForInstanceCompletion(instanceId, timeout, getInputsAndOutputs);
return metadata == null ? null : new WorkflowInstanceStatus(metadata);
}
/**
* Sends an event notification message to awaiting workflow instance.
*
* @param workflowInstanceId The ID of the workflow instance that will handle the event.
* @param eventName The name of the event. Event names are case-insensitive.
* @param eventPayload The serializable data payload to include with the event.
*/
public void raiseEvent(String workflowInstanceId, String eventName, Object eventPayload) {
this.innerClient.raiseEvent(workflowInstanceId, eventName, eventPayload);
}
/**
* Purges workflow instance state from the workflow state store.
*
* @param workflowInstanceId The unique ID of the workflow instance to purge.
* @return Return true if the workflow state was found and purged successfully otherwise false.
*/
public boolean purgeInstance(String workflowInstanceId) {
PurgeResult result = this.innerClient.purgeInstance(workflowInstanceId);
if (result != null) {
return result.getDeletedInstanceCount() > 0;
}
return false;
}
public void createTaskHub(boolean recreateIfExists) {
this.innerClient.createTaskHub(recreateIfExists);
}
public void deleteTaskHub() {
this.innerClient.deleteTaskHub();
}
/**
* Closes the inner DurableTask client and shutdown the GRPC channel.
*/
public void close() throws InterruptedException {
try {

View File

@ -0,0 +1,65 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.client;
import com.microsoft.durabletask.FailureDetails;
/**
* Represents a workflow failure details.
*/
public class WorkflowFailureDetails {
private final FailureDetails workflowFailureDetails;
/**
* Class constructor.
*
* @param failureDetails failure Details
*/
public WorkflowFailureDetails(FailureDetails failureDetails) {
this.workflowFailureDetails = failureDetails;
}
/**
* Gets the error type, which is the namespace-qualified exception type name.
*
* @return the error type, which is the namespace-qualified exception type name
*/
public String getErrorType() {
return workflowFailureDetails.getErrorType();
}
/**
* Gets the error message.
*
* @return the error message
*/
public String getErrorMessage() {
return workflowFailureDetails.getErrorMessage();
}
/**
* Gets the stack trace.
*
* @return the stack trace
*/
public String getStackTrace() {
return workflowFailureDetails.getStackTrace();
}
@Override
public String toString() {
return workflowFailureDetails.toString();
}
}

View File

@ -0,0 +1,204 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.client;
import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.runtime.WorkflowRuntimeStatus;
import javax.annotation.Nullable;
import java.time.Instant;
/**
* Represents a snapshot of a workflow instance's current state, including
* metadata.
*/
public class WorkflowInstanceStatus {
private final OrchestrationMetadata orchestrationMetadata;
@Nullable
private final WorkflowFailureDetails failureDetails;
/**
* Class constructor.
*
* @param orchestrationMetadata Durable task orchestration metadata
*/
public WorkflowInstanceStatus(OrchestrationMetadata orchestrationMetadata) {
if (orchestrationMetadata == null) {
throw new IllegalArgumentException("OrchestrationMetadata cannot be null");
}
this.orchestrationMetadata = orchestrationMetadata;
FailureDetails details = orchestrationMetadata.getFailureDetails();
if (details != null) {
this.failureDetails = new WorkflowFailureDetails(details);
} else {
this.failureDetails = null;
}
}
/**
* Gets the name of the workflow.
*
* @return the name of the workflow
*/
public String getName() {
return orchestrationMetadata.getName();
}
/**
* Gets the unique ID of the workflow instance.
*
* @return the unique ID of the workflow instance
*/
public String getInstanceId() {
return orchestrationMetadata.getInstanceId();
}
/**
* Gets the current runtime status of the workflow instance at the time this
* object was fetched.
*
* @return the current runtime status of the workflow instance at the time this object was fetched
*/
public WorkflowRuntimeStatus getRuntimeStatus() {
return WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(orchestrationMetadata.getRuntimeStatus());
}
/**
* Gets the workflow instance's creation time in UTC.
*
* @return the workflow instance's creation time in UTC
*/
public Instant getCreatedAt() {
return orchestrationMetadata.getCreatedAt();
}
/**
* Gets the workflow instance's last updated time in UTC.
*
* @return the workflow instance's last updated time in UTC
*/
public Instant getLastUpdatedAt() {
return orchestrationMetadata.getLastUpdatedAt();
}
/**
* Gets the workflow instance's serialized input, if any, as a string value.
*
* @return the workflow instance's serialized input or {@code null}
*/
public String getSerializedInput() {
return orchestrationMetadata.getSerializedInput();
}
/**
* Gets the workflow instance's serialized output, if any, as a string value.
*
* @return the workflow instance's serialized output or {@code null}
*/
public String getSerializedOutput() {
return orchestrationMetadata.getSerializedOutput();
}
/**
* Gets the failure details, if any, for the failed workflow instance.
*
* <p>This method returns data only if the workflow is in the
* {@link OrchestrationRuntimeStatus#FAILED} state,
* and only if this instance metadata was fetched with the option to include
* output data.
*
* @return the failure details of the failed workflow instance or {@code null}
*/
@Nullable
public WorkflowFailureDetails getFailureDetails() {
return this.failureDetails;
}
/**
* Gets a value indicating whether the workflow instance was running at the time
* this object was fetched.
*
* @return {@code true} if the workflow existed and was in a running state otherwise {@code false}
*/
public boolean isRunning() {
return orchestrationMetadata.isRunning();
}
/**
* Gets a value indicating whether the workflow instance was completed at the
* time this object was fetched.
*
* <p>A workflow instance is considered completed when its runtime status value is
* {@link WorkflowRuntimeStatus#COMPLETED},
* {@link WorkflowRuntimeStatus#FAILED}, or
* {@link WorkflowRuntimeStatus#TERMINATED}.
*
* @return {@code true} if the workflow was in a terminal state; otherwise
* {@code false}
*/
public boolean isCompleted() {
return orchestrationMetadata.isCompleted();
}
/**
* Deserializes the workflow's input into an object of the specified type.
*
* <p>Deserialization is performed using the DataConverter that was
* configured on the DurableTaskClient object that created this workflow
* metadata object.
*
* @param type the class associated with the type to deserialize the input data
* into
* @param <T> the type to deserialize the input data into
* @return the deserialized input value
* @throws IllegalStateException if the metadata was fetched without the option
* to read inputs and outputs
*/
public <T> T readInputAs(Class<T> type) {
return orchestrationMetadata.readInputAs(type);
}
/**
* Deserializes the workflow's output into an object of the specified type.
*
* <p>Deserialization is performed using the DataConverter that was
* configured on the DurableTaskClient
* object that created this workflow metadata object.
*
* @param type the class associated with the type to deserialize the output data
* into
* @param <T> the type to deserialize the output data into
* @return the deserialized input value
* @throws IllegalStateException if the metadata was fetched without the option
* to read inputs and outputs
*/
public <T> T readOutputAs(Class<T> type) {
return orchestrationMetadata.readOutputAs(type);
}
/**
* Generates a user-friendly string representation of the current metadata
* object.
*
* @return a user-friendly string representation of the current metadata object
*/
public String toString() {
return orchestrationMetadata.toString();
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivity;
import com.microsoft.durabletask.TaskActivityFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
/**
* Wrapper for Durable Task Framework task activity factory.
*/
public class ActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
private final Constructor<T> activityConstructor;
private final String name;
/**
* Constructor for ActivityWrapper.
*
* @param clazz Class of the activity to wrap.
*/
public ActivityWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();
try {
this.activityConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for activity class '%s'.", this.name), e
);
}
}
@Override
public String getName() {
return name;
}
@Override
public TaskActivity create() {
return ctx -> {
Object result;
T activity;
try {
activity = this.activityConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of activity class '%s'", this.name), e
);
}
result = activity.run(new WorkflowActivityContext(ctx));
return result;
};
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.runtime;
/**
* Common interface for task activity implementations.
*
* <p>Activities are the basic unit of work in a durable task orchestration. Activities are the tasks that are
* orchestrated in the business process. For example, you might create an orchestrator to process an order. The tasks
* ay involve checking the inventory, charging the customer, and creating a shipment. Each task would be a separate
* activity. These activities may be executed serially, in parallel, or some combination of both.
*
* <p>Unlike task orchestrators, activities aren't restricted in the type of work you can do in them. Activity functions
* are frequently used to make network calls or run CPU intensive operations. An activity can also return data back to
* the orchestrator function. The Durable Task runtime guarantees that each called activity function will be executed
* <strong>at least once</strong> during an orchestration's execution.
*
* <p>Because activities only guarantee at least once execution, it's recommended that activity logic be implemented as
* idempotent whenever possible.
*
* <p>Activities are scheduled by orchestrators using one of the {@link io.dapr.workflows.WorkflowContext#callActivity}
* method overloads.
*/
public interface WorkflowActivity {
/**
* Executes the activity logic and returns a value which will be serialized and
* returned to the calling orchestrator.
*
* @param ctx provides information about the current activity execution, like the activity's name and the input
* data provided to it by the orchestrator.
* @return any serializable value to be returned to the calling orchestrator.
*/
Object run(WorkflowActivityContext ctx);
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivityContext;
/**
* Wrapper for Durable Task Framework {@link TaskActivityContext}.
*/
public class WorkflowActivityContext implements TaskActivityContext {
private final TaskActivityContext innerContext;
/**
* Constructor for WorkflowActivityContext.
*
* @param context TaskActivityContext
* @throws IllegalArgumentException if context is null
*/
public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
}
this.innerContext = context;
}
/**
* Gets the name of the current activity.
*
* @return the name of the current activity
*/
public String getName() {
return this.innerContext.getName();
}
/**
* Gets the input of the current activity.
*
* @param <T> the type of the input
* @param targetType targetType of the input
* @return the input of the current activity
*/
public <T> T getInput(Class<T> targetType) {
return this.innerContext.getInput(targetType);
}
}

View File

@ -44,7 +44,7 @@ public class WorkflowRuntimeBuilder {
/**
* Registers a Workflow object.
*
* @param <T> any Workflow type
* @param <T> any Workflow type
* @param clazz the class being registered
* @return the WorkflowRuntimeBuilder
*/
@ -55,4 +55,16 @@ public class WorkflowRuntimeBuilder {
return this;
}
/**
* Registers an Activity object.
*
* @param clazz the class being registered
* @param <T> any WorkflowActivity type
*/
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
}

View File

@ -0,0 +1,139 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.util.List;
import java.util.stream.Collectors;
/**
* Enum describing the runtime status of a workflow.
*/
public enum WorkflowRuntimeStatus {
/**
* The workflow started running.
*/
RUNNING,
/**
* The workflow completed normally.
*/
COMPLETED,
/**
* The workflow is continued as new.
*/
CONTINUED_AS_NEW,
/**
* The workflow completed with an unhandled exception.
*/
FAILED,
/**
* The workflow was abruptly cancelled via a management API call.
*/
CANCELED,
/**
* The workflow was abruptly terminated via a management API call.
*/
TERMINATED,
/**
* The workflow was scheduled but hasn't started running.
*/
PENDING,
/**
* The workflow was suspended.
*/
SUSPENDED;
/**
* Convert runtime OrchestrationRuntimeStatus to WorkflowRuntimeStatus.
*
* @param status The OrchestrationRuntimeStatus to convert to WorkflowRuntimeStatus.
* @return The runtime status of the workflow.
*/
public static WorkflowRuntimeStatus fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus status) {
if (status == null) {
throw new IllegalArgumentException("status cannot be null");
}
switch (status) {
case RUNNING:
return WorkflowRuntimeStatus.RUNNING;
case COMPLETED:
return WorkflowRuntimeStatus.COMPLETED;
case CONTINUED_AS_NEW:
return WorkflowRuntimeStatus.CONTINUED_AS_NEW;
case FAILED:
return WorkflowRuntimeStatus.FAILED;
case CANCELED:
return WorkflowRuntimeStatus.CANCELED;
case TERMINATED:
return WorkflowRuntimeStatus.TERMINATED;
case PENDING:
return WorkflowRuntimeStatus.PENDING;
case SUSPENDED:
return WorkflowRuntimeStatus.SUSPENDED;
default:
throw new RuntimeException(String.format("Unknown status value: %s", status));
}
}
/**
* Convert runtime WorkflowRuntimeStatus to OrchestrationRuntimeStatus.
*
* @param status The OrchestrationRuntimeStatus to convert to WorkflowRuntimeStatus.
* @return The runtime status of the Orchestration.
*/
public static OrchestrationRuntimeStatus toOrchestrationRuntimeStatus(WorkflowRuntimeStatus status) {
switch (status) {
case RUNNING:
return OrchestrationRuntimeStatus.RUNNING;
case COMPLETED:
return OrchestrationRuntimeStatus.COMPLETED;
case CONTINUED_AS_NEW:
return OrchestrationRuntimeStatus.CONTINUED_AS_NEW;
case FAILED:
return OrchestrationRuntimeStatus.FAILED;
case CANCELED:
return OrchestrationRuntimeStatus.CANCELED;
case TERMINATED:
return OrchestrationRuntimeStatus.TERMINATED;
case PENDING:
return OrchestrationRuntimeStatus.PENDING;
case SUSPENDED:
return OrchestrationRuntimeStatus.SUSPENDED;
default:
throw new RuntimeException(String.format("Unknown status value: %s", status));
}
}
/**
* Convert runtime WorkflowRuntimeStatus to OrchestrationRuntimeStatus.
*
* @param statuses The list of orchestrationRuntimeStatus to convert to a list of WorkflowRuntimeStatuses.
* @return The list runtime status of the Orchestration.
*/
public static List<OrchestrationRuntimeStatus> toOrchestrationRuntimeStatus(List<WorkflowRuntimeStatus> statuses) {
return statuses.stream()
.map(x -> toOrchestrationRuntimeStatus(x))
.collect(Collectors.toList());
}
}

View File

@ -13,17 +13,24 @@ limitations under the License.
package io.dapr.workflows;
import com.microsoft.durabletask.RetryPolicy;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.DaprWorkflowContextImpl;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DaprWorkflowContextImplTest {
private DaprWorkflowContextImpl context;
@ -35,13 +42,6 @@ public class DaprWorkflowContextImplTest {
context = new DaprWorkflowContextImpl(mockInnerContext);
}
@Test
public void nullConstructorTest() {
assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(mockInnerContext, null); });
assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(null, mock(Logger.class)); });
assertThrows(IllegalArgumentException.class, () -> { new DaprWorkflowContextImpl(null, null); });
}
@Test
public void getNameTest() {
context.getName();
@ -55,19 +55,46 @@ public class DaprWorkflowContextImplTest {
}
@Test
public void waitForExternalEventTest() {
doReturn(mock(Task.class))
.when(mockInnerContext).waitForExternalEvent(any(String.class), any(Duration.class));
DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext);
public void getCurrentInstantTest() {
context.getCurrentInstant();
verify(mockInnerContext, times(1)).getCurrentInstant();
}
@Test
public void waitForExternalEventWithEventAndDurationTest() {
String expectedEvent = "TestEvent";
Duration expectedDuration = Duration.ofSeconds(1);
testContext.waitForExternalEvent(expectedEvent, expectedDuration).await();
verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration);
context.waitForExternalEvent(expectedEvent, expectedDuration);
verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration, Void.class);
}
@Test
public void waitForExternalEventTest() {
String expectedEvent = "TestEvent";
Duration expectedDuration = Duration.ofSeconds(1);
context.waitForExternalEvent(expectedEvent, expectedDuration, String.class);
verify(mockInnerContext, times(1)).waitForExternalEvent(expectedEvent, expectedDuration, String.class);
}
@Test
public void callActivityTest() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
context.callActivity(expectedName, expectedInput, String.class);
verify(mockInnerContext, times(1)).callActivity(expectedName, expectedInput, null, String.class);
}
@Test(expected = IllegalArgumentException.class)
public void DaprWorkflowContextWithEmptyInnerContext() {
context = new DaprWorkflowContextImpl(mockInnerContext, null);
}
@Test(expected = IllegalArgumentException.class)
public void DaprWorkflowContextWithEmptyLogger() {
context = new DaprWorkflowContextImpl(null, null);
}
@ -77,10 +104,16 @@ public class DaprWorkflowContextImplTest {
verify(mockInnerContext, times(1)).complete(null);
}
@Test
public void getIsReplayingTest() {
context.isReplaying();
verify(mockInnerContext, times(1)).getIsReplaying();
}
@Test
public void getLoggerReplayingTest() {
Logger mockLogger = mock(Logger.class);
when(mockInnerContext.getIsReplaying()).thenReturn(true);
when(context.isReplaying()).thenReturn(true);
DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext, mockLogger);
String expectedArg = "test print";
@ -92,7 +125,7 @@ public class DaprWorkflowContextImplTest {
@Test
public void getLoggerFirstTimeTest() {
Logger mockLogger = mock(Logger.class);
when(mockInnerContext.getIsReplaying()).thenReturn(false);
when(context.isReplaying()).thenReturn(false);
DaprWorkflowContextImpl testContext = new DaprWorkflowContextImpl(mockInnerContext, mockLogger);
String expectedArg = "test print";
@ -100,4 +133,74 @@ public class DaprWorkflowContextImplTest {
verify(mockLogger, times(1)).info(expectedArg);
}
@Test
public void continueAsNewTest() {
String expectedInput = "TestInput";
context.continueAsNew(expectedInput);
verify(mockInnerContext, times(1)).continueAsNew(expectedInput);
}
@Test
public void allOfTest() {
Task<Void> t1 = mockInnerContext.callActivity("task1");
Task<Void> t2 = mockInnerContext.callActivity("task2");
List<Task<Void>> taskList = Arrays.asList(t1, t2);
context.allOf(taskList);
verify(mockInnerContext, times(1)).allOf(taskList);
}
@Test
public void anyOfTest() {
Task<Void> t1 = mockInnerContext.callActivity("task1");
Task<Void> t2 = mockInnerContext.callActivity("task2");
Task<Void> t3 = mockInnerContext.callActivity("task3");
List<Task<?>> taskList = Arrays.asList(t1, t2);
context.anyOf(taskList);
verify(mockInnerContext, times(1)).anyOf(taskList);
context.anyOf(t1, t2, t3);
verify(mockInnerContext, times(1)).anyOf(Arrays.asList(t1, t2, t3));
}
@Test
public void createTimerTest() {
context.createTimer(Duration.ofSeconds(10));
verify(mockInnerContext, times(1)).createTimer(Duration.ofSeconds(10));
}
@Test(expected = UnsupportedOperationException.class)
public void createTimerWithZonedDateTimeThrowsTest() {
context.createTimer(ZonedDateTime.now());
}
@Test
public void callSubWorkflowWithName() {
String expectedName = "TestActivity";
context.callSubWorkflow(expectedName);
verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, null, null, null, null);
}
@Test
public void callSubWorkflowWithOptions() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
TaskOptions expectedOptions = new TaskOptions(new RetryPolicy(1, Duration.ofSeconds(10)));
context.callSubWorkflow(expectedName, expectedInput, expectedInstanceId, expectedOptions, String.class);
verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, expectedInstanceId,
expectedOptions, String.class);
}
@Test
public void callSubWorkflow() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
context.callSubWorkflow(expectedName, expectedInput, String.class);
verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, null, null, String.class);
}
}

View File

@ -14,7 +14,10 @@ limitations under the License.
package io.dapr.workflows.client;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
import io.grpc.ManagedChannel;
import org.junit.Before;
@ -22,11 +25,17 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DaprWorkflowClientTest {
private static Constructor<DaprWorkflowClient> constructor;
@ -34,18 +43,21 @@ public class DaprWorkflowClientTest {
private DurableTaskClient mockInnerClient;
private ManagedChannel mockGrpcChannel;
public class TestWorkflow extends Workflow {
public static class TestWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> { };
return WorkflowContext::getInstanceId;
}
}
@BeforeClass
public static void beforeAll() {
constructor =
constructor =
Constructor.class.cast(Arrays.stream(DaprWorkflowClient.class.getDeclaredConstructors())
.filter(c -> c.getParameters().length == 2).peek(c -> c.setAccessible(true)).findFirst().get());
.filter(c -> c.getParameters().length == 2).map(c -> {
c.setAccessible(true);
return c;
}).findFirst().get());
}
@Before
@ -102,19 +114,104 @@ public class DaprWorkflowClientTest {
verify(mockInnerClient, times(1)).terminate(expectedArgument, null);
}
@Test
public void getInstanceMetadata() {
// Arrange
String instanceId = "TestWorkflowInstanceId";
OrchestrationMetadata expectedMetadata = mock(OrchestrationMetadata.class);
when(expectedMetadata.getInstanceId()).thenReturn(instanceId);
when(expectedMetadata.getName()).thenReturn("WorkflowName");
when(expectedMetadata.getRuntimeStatus()).thenReturn(OrchestrationRuntimeStatus.RUNNING);
when(mockInnerClient.getInstanceMetadata(instanceId, true)).thenReturn(expectedMetadata);
// Act
WorkflowInstanceStatus metadata = client.getInstanceState(instanceId, true);
// Assert
verify(mockInnerClient, times(1)).getInstanceMetadata(instanceId, true);
assertNotEquals(metadata, null);
assertEquals(metadata.getInstanceId(), expectedMetadata.getInstanceId());
assertEquals(metadata.getName(), expectedMetadata.getName());
assertEquals(metadata.isRunning(), expectedMetadata.isRunning());
assertEquals(metadata.isCompleted(), expectedMetadata.isCompleted());
}
@Test
public void waitForInstanceStart() throws TimeoutException {
// Arrange
String instanceId = "TestWorkflowInstanceId";
Duration timeout = Duration.ofSeconds(10);
OrchestrationMetadata expectedMetadata = mock(OrchestrationMetadata.class);
when(expectedMetadata.getInstanceId()).thenReturn(instanceId);
when(mockInnerClient.waitForInstanceStart(instanceId, timeout, true)).thenReturn(expectedMetadata);
// Act
WorkflowInstanceStatus result = client.waitForInstanceStart(instanceId, timeout, true);
// Assert
verify(mockInnerClient, times(1)).waitForInstanceStart(instanceId, timeout, true);
assertNotEquals(result, null);
assertEquals(result.getInstanceId(), expectedMetadata.getInstanceId());
}
@Test
public void waitForInstanceCompletion() throws TimeoutException {
// Arrange
String instanceId = "TestWorkflowInstanceId";
Duration timeout = Duration.ofSeconds(10);
OrchestrationMetadata expectedMetadata = mock(OrchestrationMetadata.class);
when(expectedMetadata.getInstanceId()).thenReturn(instanceId);
when(mockInnerClient.waitForInstanceCompletion(instanceId, timeout, true)).thenReturn(expectedMetadata);
// Act
WorkflowInstanceStatus result = client.waitForInstanceCompletion(instanceId, timeout, true);
// Assert
verify(mockInnerClient, times(1)).waitForInstanceCompletion(instanceId, timeout, true);
assertNotEquals(result, null);
assertEquals(result.getInstanceId(), expectedMetadata.getInstanceId());
}
@Test
public void raiseEvent() {
String expectedInstanceId = "TestWorkflowInstanceId";
String expectedEventName = "TestEventName";
Object expectedEventPayload = new Object();
client.raiseEvent(expectedInstanceId, expectedEventName, expectedEventPayload);
verify(mockInnerClient, times(1)).raiseEvent(expectedInstanceId,
expectedEventName, expectedEventPayload);
}
@Test
public void purgeInstance() {
String expectedArgument = "TestWorkflowInstanceId";
client.purgeInstance(expectedArgument);
verify(mockInnerClient, times(1)).purgeInstance(expectedArgument);
}
@Test
public void createTaskHub() {
boolean expectedArgument = true;
client.createTaskHub(expectedArgument);
verify(mockInnerClient, times(1)).createTaskHub(expectedArgument);
}
@Test
public void deleteTaskHub() {
client.deleteTaskHub();
verify(mockInnerClient, times(1)).deleteTaskHub();
}
@Test
public void close() throws InterruptedException {
client.close();
verify(mockInnerClient, times(1)).close();
verify(mockGrpcChannel, times(1)).shutdown();
}
@Test
public void closeWithInnerClientRuntimeException() throws InterruptedException {
doThrow(RuntimeException.class).when(mockInnerClient).close();
assertThrows(RuntimeException.class, () -> { client.close(); });
verify(mockInnerClient, times(1)).close();
verify(mockGrpcChannel, times(1)).shutdown();
}
}

View File

@ -0,0 +1,232 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.client;
import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.runtime.WorkflowRuntimeStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.time.Instant;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class WorkflowInstanceStatusTest {
private OrchestrationMetadata mockOrchestrationMetadata;
private WorkflowInstanceStatus workflowMetadata;
@Before
public void setUp() throws Exception {
mockOrchestrationMetadata = mock(OrchestrationMetadata.class);
workflowMetadata = new WorkflowInstanceStatus(mockOrchestrationMetadata);
}
@Test
public void getInstanceId() {
// Arrange
String expected = "instanceId";
when(mockOrchestrationMetadata.getInstanceId()).thenReturn(expected);
// Act
String result = workflowMetadata.getInstanceId();
// Assert
verify(mockOrchestrationMetadata, times(1)).getInstanceId();
Assert.assertEquals(result, expected);
}
@Test
public void getName() {
// Arrange
String expected = "WorkflowName";
when(mockOrchestrationMetadata.getName()).thenReturn(expected);
// Act
String result = workflowMetadata.getName();
// Assert
verify(mockOrchestrationMetadata, times(1)).getName();
Assert.assertEquals(result, expected);
}
@Test
public void getCreatedAt() {
// Arrange
Instant expected = Instant.now();
when(mockOrchestrationMetadata.getCreatedAt()).thenReturn(expected);
// Act
Instant result = workflowMetadata.getCreatedAt();
// Assert
verify(mockOrchestrationMetadata, times(1)).getCreatedAt();
Assert.assertEquals(result, expected);
}
@Test
public void getLastUpdatedAt() {
// Arrange
Instant expected = Instant.now();
when(mockOrchestrationMetadata.getLastUpdatedAt()).thenReturn(expected);
// Act
Instant result = workflowMetadata.getLastUpdatedAt();
// Assert
verify(mockOrchestrationMetadata, times(1)).getLastUpdatedAt();
Assert.assertEquals(result, expected);
}
@Test
public void getFailureDetails() {
// Arrange
FailureDetails mockFailureDetails = mock(FailureDetails.class);
when(mockFailureDetails.getErrorType()).thenReturn("errorType");
when(mockFailureDetails.getErrorMessage()).thenReturn("errorMessage");
when(mockFailureDetails.getStackTrace()).thenReturn("stackTrace");
OrchestrationMetadata orchestrationMetadata = mock(OrchestrationMetadata.class);
when(orchestrationMetadata.getFailureDetails()).thenReturn(mockFailureDetails);
// Act
WorkflowInstanceStatus metadata = new WorkflowInstanceStatus(orchestrationMetadata);
WorkflowFailureDetails result = metadata.getFailureDetails();
// Assert
verify(orchestrationMetadata, times(1)).getFailureDetails();
Assert.assertEquals(result.getErrorType(), mockFailureDetails.getErrorType());
Assert.assertEquals(result.getErrorMessage(), mockFailureDetails.getErrorMessage());
Assert.assertEquals(result.getStackTrace(), mockFailureDetails.getStackTrace());
}
@Test
public void getRuntimeStatus() {
// Arrange
WorkflowRuntimeStatus expected = WorkflowRuntimeStatus.RUNNING;
when(mockOrchestrationMetadata.getRuntimeStatus()).thenReturn(OrchestrationRuntimeStatus.RUNNING);
// Act
WorkflowRuntimeStatus result = workflowMetadata.getRuntimeStatus();
// Assert
verify(mockOrchestrationMetadata, times(1)).getRuntimeStatus();
Assert.assertEquals(result, expected);
}
@Test
public void isRunning() {
// Arrange
boolean expected = true;
when(mockOrchestrationMetadata.isRunning()).thenReturn(expected);
// Act
boolean result = workflowMetadata.isRunning();
// Assert
verify(mockOrchestrationMetadata, times(1)).isRunning();
Assert.assertEquals(result, expected);
}
@Test
public void isCompleted() {
// Arrange
boolean expected = true;
when(mockOrchestrationMetadata.isCompleted()).thenReturn(expected);
// Act
boolean result = workflowMetadata.isCompleted();
// Assert
verify(mockOrchestrationMetadata, times(1)).isCompleted();
Assert.assertEquals(result, expected);
}
@Test
public void getSerializedInput() {
// Arrange
String expected = "{input: \"test\"}";
when(mockOrchestrationMetadata.getSerializedInput()).thenReturn(expected);
// Act
String result = workflowMetadata.getSerializedInput();
// Assert
verify(mockOrchestrationMetadata, times(1)).getSerializedInput();
Assert.assertEquals(result, expected);
}
@Test
public void getSerializedOutput() {
// Arrange
String expected = "{output: \"test\"}";
when(mockOrchestrationMetadata.getSerializedOutput()).thenReturn(expected);
// Act
String result = workflowMetadata.getSerializedOutput();
// Assert
verify(mockOrchestrationMetadata, times(1)).getSerializedOutput();
Assert.assertEquals(result, expected);
}
@Test
public void readInputAs() {
// Arrange
String expected = "[{property: \"test input\"}}]";
when(mockOrchestrationMetadata.readInputAs(String.class)).thenReturn(expected);
// Act
String result = workflowMetadata.readInputAs(String.class);
// Assert
verify(mockOrchestrationMetadata, times(1)).readInputAs(String.class);
Assert.assertEquals(result, expected);
}
@Test
public void readOutputAs() {
// Arrange
String expected = "[{property: \"test output\"}}]";
when(mockOrchestrationMetadata.readOutputAs(String.class)).thenReturn(expected);
// Act
String result = workflowMetadata.readOutputAs(String.class);
// Assert
verify(mockOrchestrationMetadata, times(1)).readOutputAs(String.class);
Assert.assertEquals(result, expected);
}
@Test
public void testToString() {
// Arrange
String expected = "string value";
when(mockOrchestrationMetadata.toString()).thenReturn(expected);
// Act
String result = workflowMetadata.toString();
// Assert
//verify(mockOrchestrationMetadata, times(1)).toString();
Assert.assertEquals(result, expected);
}
}

View File

@ -0,0 +1,43 @@
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivityContext;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ActivityWrapperTest {
public static class TestActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
String activityContextName = ctx.getName();
return ctx.getInput(String.class) + " world! from " + activityContextName;
}
}
@Test
public void getName() throws NoSuchMethodException {
ActivityWrapper<ActivityWrapperTest.TestActivity> wrapper = new ActivityWrapper<>(
ActivityWrapperTest.TestActivity.class);
Assert.assertEquals(
"io.dapr.workflows.runtime.ActivityWrapperTest.TestActivity",
wrapper.getName()
);
}
@Test
public void createWithClass() throws NoSuchMethodException {
TaskActivityContext mockContext = mock(TaskActivityContext.class);
ActivityWrapper<ActivityWrapperTest.TestActivity> wrapper = new ActivityWrapper<>(
ActivityWrapperTest.TestActivity.class);
when(mockContext.getInput(String.class)).thenReturn("Hello");
when(mockContext.getName()).thenReturn("TestActivityContext");
Object result = wrapper.create().run(mockContext);
verify(mockContext, times(1)).getInput(String.class);
Assert.assertEquals("Hello world! from TestActivityContext", result);
}
}

View File

@ -21,15 +21,18 @@ import io.dapr.workflows.WorkflowStub;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class OrchestratorWrapperTest {
public static class TestWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return WorkflowContext::getInstanceId;
}
public static class TestWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return WorkflowContext::getInstanceId;
}
}
@Test
public void getName() {
@ -44,7 +47,7 @@ public class OrchestratorWrapperTest {
public void createWithClass() {
TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class);
OrchestratorWrapper<TestWorkflow> wrapper = new OrchestratorWrapper<>(TestWorkflow.class);
when( mockContext.getInstanceId() ).thenReturn("uuid");
when(mockContext.getInstanceId()).thenReturn("uuid");
wrapper.create().run(mockContext);
verify(mockContext, times(1)).getInstanceId();
}

View File

@ -11,7 +11,15 @@ public class WorkflowRuntimeBuilderTest {
public static class TestWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> { };
return ctx -> {
};
}
}
public static class TestActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
return null;
}
}
@ -20,5 +28,13 @@ public class WorkflowRuntimeBuilderTest {
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class));
}
@Test
public void registerValidWorkflowActivityClass() {
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(TestActivity.class));
}
@Test
public void buildTest() {
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().build());
}
}

View File

@ -0,0 +1,96 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class WorkflowRuntimeStatusTest {
@Before
public void setUp() throws Exception {
}
@Test
public void fromOrchestrationRuntimeStatus() {
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.RUNNING),
WorkflowRuntimeStatus.RUNNING);
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.COMPLETED),
WorkflowRuntimeStatus.COMPLETED);
Assert.assertEquals(
WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.CONTINUED_AS_NEW),
WorkflowRuntimeStatus.CONTINUED_AS_NEW);
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.FAILED),
WorkflowRuntimeStatus.FAILED);
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.CANCELED),
WorkflowRuntimeStatus.CANCELED);
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.TERMINATED),
WorkflowRuntimeStatus.TERMINATED);
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.PENDING),
WorkflowRuntimeStatus.PENDING);
Assert.assertEquals(WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(OrchestrationRuntimeStatus.SUSPENDED),
WorkflowRuntimeStatus.SUSPENDED);
}
@Test
public void toOrchestrationRuntimeStatus() {
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.RUNNING),
OrchestrationRuntimeStatus.RUNNING);
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.COMPLETED),
OrchestrationRuntimeStatus.COMPLETED);
Assert.assertEquals(
WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.CONTINUED_AS_NEW),
OrchestrationRuntimeStatus.CONTINUED_AS_NEW);
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.FAILED),
OrchestrationRuntimeStatus.FAILED);
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.CANCELED),
OrchestrationRuntimeStatus.CANCELED);
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.TERMINATED),
OrchestrationRuntimeStatus.TERMINATED);
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.PENDING),
OrchestrationRuntimeStatus.PENDING);
Assert.assertEquals(WorkflowRuntimeStatus.toOrchestrationRuntimeStatus(WorkflowRuntimeStatus.SUSPENDED),
OrchestrationRuntimeStatus.SUSPENDED);
}
@Test
public void fromOrchestrationRuntimeStatusThrowsIllegalArgumentException() {
try {
WorkflowRuntimeStatus.fromOrchestrationRuntimeStatus(null);
Assert.fail("Expected exception not thrown");
} catch (IllegalArgumentException e) {
Assert.assertEquals("status cannot be null", e.getMessage());
}
}
}

View File

@ -28,7 +28,7 @@ public class QueryTest {
orFilter.addClause(new EqFilter<>("v2", true));
orFilter.addClause(new InFilter<>("v3", 1.3, 1.5));
filter.addClause(orFilter);
filter.addClause((Filter<?>) orFilter);
// Add Filter
q.setFilter(filter);
@ -110,7 +110,7 @@ public class QueryTest {
orFilter.addClause(new EqFilter<>("v2", true));
// invalid OR filter
filter.addClause(orFilter);
filter.addClause((Filter<?>) orFilter);
// Add Filter
q.setFilter(filter);