Added ack messages once received in pubsub

This commit is contained in:
Amulya Varote 2022-01-25 12:09:46 -08:00 committed by Artur Souza
parent 13b29cb07f
commit 90b15a3268
5 changed files with 33 additions and 8 deletions

View File

@ -2,6 +2,8 @@ using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
using System.Net.Http;
using System.Net;
using Dapr;
using Dapr.Client;
@ -12,9 +14,10 @@ namespace CheckoutService.controller
{
[Topic("order_pub_sub", "orders")]
[HttpPost("checkout")]
public void getCheckout([FromBody] int orderId)
public HttpResponseMessage getCheckout([FromBody] int orderId)
{
Console.WriteLine("Subscriber received : " + orderId);
return new HttpResponseMessage(HttpStatusCode.OK);
}
}
}

View File

@ -1,9 +1,10 @@
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"context"
"github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/http"
@ -17,6 +18,7 @@ var sub = &common.Subscription{
func main() {
s := daprd.NewService(":6002")
http.HandleFunc("/checkout", handleRequest)
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
@ -28,4 +30,17 @@ func main() {
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("Subscriber received: %s", e.Data)
return false, nil
}
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
resp := make(map[string]string)
resp["message"] = "Status Ok"
jsonResp, err := json.Marshal(resp)
if err != nil {
log.Fatalf("Error happened in JSON marshal. Err: %s", err)
}
w.Write(jsonResp)
return
}

View File

@ -3,6 +3,8 @@ package com.service.CheckoutService.controller;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -18,10 +20,12 @@ public class CheckoutServiceController {
@Topic(name = "orders", pubsubName = "order_pub_sub")
@PostMapping(path = "/checkout")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
public Mono<ResponseEntity> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromSupplier(() -> {
try {
log.info("Subscriber received: " + cloudEvent.getData());
return new ResponseEntity<>("successful",
HttpStatus.OK);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -9,7 +9,7 @@ start().catch((e) => {
process.exit(1);
});
async function start(orderId) {
async function start(orderId, response) {
const PUBSUB_NAME = "order_pub_sub"
const TOPIC_NAME = "orders"
const server = new DaprServer(
@ -19,8 +19,10 @@ async function start(orderId) {
process.env.DAPR_HTTP_PORT,
CommunicationProtocolEnum.HTTP
);
await server.pubsub.subscribe(PUBSUB_NAME, TOPIC_NAME, async (orderId) => {
await server.pubsub.subscribe(PUBSUB_NAME, TOPIC_NAME, async (orderId) => {
console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
}, (response) => {
response.sendStatus(200);
});
await server.startServer();
}
}

View File

@ -12,5 +12,6 @@ logging.basicConfig(level = logging.INFO)
def mytopic(event: v1.Event) -> None:
data = json.loads(event.Data())
logging.info('Subscriber received: ' + str(data))
return '', 200
app.run(6002)