diff --git a/pub_sub/java/.gitignore b/pub_sub/java/.gitignore new file mode 100644 index 00000000..c8319c2c --- /dev/null +++ b/pub_sub/java/.gitignore @@ -0,0 +1,39 @@ +## Maven +target/ +release.properties +.mvn + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +# IDE generated files and directories +*.iml +.idea/ +*.ipr +*.iws +.vs/ +.vscode/ +.code-workspace +.settings/ +.metadata + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# macOS +.DS_Store diff --git a/pub_sub/java/http/README.md b/pub_sub/java/http/README.md new file mode 100644 index 00000000..497e013d --- /dev/null +++ b/pub_sub/java/http/README.md @@ -0,0 +1,61 @@ +# Dapr pub/sub + +In this quickstart, there is a publisher microservice `checkout` and a subscriber microservice `order-processor` to demonstrate how Dapr enables a publish-subscribe pattern. `checkout` generates messages and publishes to a specific orders topic, and `order-processor` subscribers listen for messages of topic orders. + +Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) link for more information about Dapr and Pub-Sub. + +> **Note:** This example leverages HTTPClient only. If you are looking for the example using the Dapr Client SDK (recommended) [click here](../sdk). + +This quickstart includes one publisher: + +- Java client message generator `checkout` + +And one subscriber: + +- Java subscriber `order-processor` + +## Pre-requisites + +* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/). +* Java JDK 11 (or greater): [Oracle JDK](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) or [OpenJDK](https://jdk.java.net/11/). +* [Apache Maven](https://maven.apache.org/install.html) version 3.x. + +### Run Java message publisher app with Dapr + +1. Open a new terminal window and navigate to `checkout` directory: + +```bash +cd checkout +``` + +2. Install dependencies: + +```bash +mvn clean install +``` + +3. Run the Java publisher app with Dapr: + +```bash + dapr run --app-id checkout --components-path ../../../components -- java -jar target/CheckoutService-0.0.1-SNAPSHOT.jar +``` + +### Run Java message subscriber app with Dapr + +1. Open a new terminal window and navigate to `order-processor` directory: + +```bash +cd order-processor +``` + +2. Install dependencies: + +```bash +mvn clean install +``` + +3. Run the Java subscriber app with Dapr: + +```bash + dapr run --app-port 8080 --app-id order-processor --components-path ../../../components -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar +``` diff --git a/pub_sub/java/http/checkout/pom.xml b/pub_sub/java/http/checkout/pom.xml new file mode 100644 index 00000000..6eb404b4 --- /dev/null +++ b/pub_sub/java/http/checkout/pom.xml @@ -0,0 +1,53 @@ + + + 4.0.0 + + com.service + CheckoutService + 0.0.1-SNAPSHOT + Demo for Dapr pubsub component + + 11 + 11 + 1.6.1 + + + + org.json + json + 20211205 + + + org.slf4j + slf4j-api + ${slf4jVersion} + + + org.slf4j + slf4j-simple + ${slf4jVersion} + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + com.service.CheckoutServiceApplication + + + + + + + + + diff --git a/pub_sub/java/http/checkout/src/main/java/com/service/CheckoutServiceApplication.java b/pub_sub/java/http/checkout/src/main/java/com/service/CheckoutServiceApplication.java new file mode 100644 index 00000000..3633db63 --- /dev/null +++ b/pub_sub/java/http/checkout/src/main/java/com/service/CheckoutServiceApplication.java @@ -0,0 +1,48 @@ +package com.service; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +public class CheckoutServiceApplication { + private static final Logger logger = LoggerFactory.getLogger(CheckoutServiceApplication.class); + private static final HttpClient httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofSeconds(10)) + .build(); + + private static final String PUBSUB_NAME = "order_pub_sub"; + private static final String TOPIC = "orders"; + private static String DAPR_HOST = System.getenv().getOrDefault("DAPR_HOST", "http://localhost"); + private static String DAPR_HTTP_PORT = System.getenv().getOrDefault("DAPR_HTTP_PORT", "3500"); + + public static void main(String[] args) throws InterruptedException, IOException { + String uri = DAPR_HOST +":"+ DAPR_HTTP_PORT + "/v1.0/publish/"+PUBSUB_NAME+"/"+TOPIC; + while (true) { + Random random = new Random(); + int orderId = random.nextInt(1000 - 1) + 1; + JSONObject obj = new JSONObject(); + obj.put("orderId", orderId); + + // Publish an event/message using Dapr PubSub via HTTP Post + HttpRequest request = HttpRequest.newBuilder() + .POST(HttpRequest.BodyPublishers.ofString(obj.toString())) + .uri(URI.create(uri)) + .header("Content-Type", "application/json") + .build(); + + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + logger.info("Published data: {}", orderId); + TimeUnit.MILLISECONDS.sleep(3000); + } + } +} diff --git a/pub_sub/java/http/order-processor/pom.xml b/pub_sub/java/http/order-processor/pom.xml new file mode 100644 index 00000000..092146c5 --- /dev/null +++ b/pub_sub/java/http/order-processor/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.6.3 + + + com.service + OrderProcessingService + 0.0.1-SNAPSHOT + OrderProcessingService + Demo for Dapr pubsub component + + 11 + + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + true + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/pub_sub/java/http/order-processor/src/main/java/com/service/OrderProcessingServiceApplication.java b/pub_sub/java/http/order-processor/src/main/java/com/service/OrderProcessingServiceApplication.java new file mode 100644 index 00000000..733f414d --- /dev/null +++ b/pub_sub/java/http/order-processor/src/main/java/com/service/OrderProcessingServiceApplication.java @@ -0,0 +1,11 @@ +package com.service; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class OrderProcessingServiceApplication { + public static void main(String[] args) { + SpringApplication.run(OrderProcessingServiceApplication.class, args); + } +} diff --git a/pub_sub/java/http/order-processor/src/main/java/com/service/controller/OrderProcessingServiceController.java b/pub_sub/java/http/order-processor/src/main/java/com/service/controller/OrderProcessingServiceController.java new file mode 100644 index 00000000..9290f4ee --- /dev/null +++ b/pub_sub/java/http/order-processor/src/main/java/com/service/controller/OrderProcessingServiceController.java @@ -0,0 +1,47 @@ +package com.service.controller; + +import com.service.model.DaprSubscription; +import com.service.model.Order; +import com.service.model.SubscriptionData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class OrderProcessingServiceController { + private static final Logger logger = LoggerFactory.getLogger(OrderProcessingServiceController.class); + + /** + * Register Dapr pub/sub subscriptions. + * + * @return DaprSubscription Object containing pubsub name, topic and route for subscription. + */ + @GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE) + public DaprSubscription[] getSubscription() { + DaprSubscription daprSubscription = DaprSubscription.builder() + .pubSubName("order_pub_sub") + .topic("orders") + .route("orders") + .build(); + logger.info("Subscribed to Pubsubname {} and topic {}", "order_pub_sub", "orders"); + DaprSubscription[] arr = new DaprSubscription[]{daprSubscription}; + return arr; + } + + /** + * Dapr subscription in /dapr/subscribe sets up this route. + * + * @param body Request body + * @return ResponseEntity Returns ResponseEntity.ok() + */ + @PostMapping(path = "/orders", consumes = MediaType.ALL_VALUE) + public ResponseEntity processOrders(@RequestBody SubscriptionData body) { + logger.info("Subscriber received: "+ body.getData().getOrderId()); + return ResponseEntity.ok().build(); + } +} diff --git a/pub_sub/java/http/order-processor/src/main/java/com/service/model/DaprSubscription.java b/pub_sub/java/http/order-processor/src/main/java/com/service/model/DaprSubscription.java new file mode 100644 index 00000000..ff152931 --- /dev/null +++ b/pub_sub/java/http/order-processor/src/main/java/com/service/model/DaprSubscription.java @@ -0,0 +1,12 @@ +package com.service.model; + +import lombok.Builder; +import lombok.Getter; + +@Builder +@Getter +public class DaprSubscription { + private String pubSubName; + private String topic; + private String route; +} diff --git a/pub_sub/java/http/order-processor/src/main/java/com/service/model/Order.java b/pub_sub/java/http/order-processor/src/main/java/com/service/model/Order.java new file mode 100644 index 00000000..9999ab1b --- /dev/null +++ b/pub_sub/java/http/order-processor/src/main/java/com/service/model/Order.java @@ -0,0 +1,13 @@ +package com.service.model; + +import lombok.Getter; +import lombok.Setter; + +/** + * A class to represent the data object that is being passed to subscribe endpoint. + */ +@Getter +@Setter +public class Order { + private int orderId; +} diff --git a/pub_sub/java/http/order-processor/src/main/java/com/service/model/SubscriptionData.java b/pub_sub/java/http/order-processor/src/main/java/com/service/model/SubscriptionData.java new file mode 100644 index 00000000..49c8d28a --- /dev/null +++ b/pub_sub/java/http/order-processor/src/main/java/com/service/model/SubscriptionData.java @@ -0,0 +1,15 @@ +package com.service.model; + +import lombok.Getter; +import lombok.Setter; + +/** + * Helps in unpacking a cloud event which Checkout application generates. + * + * @param Type of data being passed to subscriber end point(orderProcessor app) + */ +@Getter +@Setter +public class SubscriptionData { + private T data; +} diff --git a/pub_sub/java/http/order-processor/src/main/resources/application.properties b/pub_sub/java/http/order-processor/src/main/resources/application.properties new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/pub_sub/java/http/order-processor/src/main/resources/application.properties @@ -0,0 +1 @@ +