mirror of https://github.com/dapr/samples.git
163 lines
4.6 KiB
JavaScript
163 lines
4.6 KiB
JavaScript
// This app is called by Dapr each time a tweet is received. In demo1 this
// service just saved the tweets to the state store. Now we also call
// another service via Dapr, using direct invocation, to score the tweet. Once
// the tweet is processed, it is posted to a pub/sub service to be read by
// another service.
|
|
require("isomorphic-fetch");
|
|
require("es6-promise").polyfill();
|
|
const logger = require("./logger");
|
|
const express = require("express");
|
|
|
|
// express
// HTTP application; request bodies are parsed as JSON before reaching routes.
const app = express();
app.use(express.json());

// Port the HTTP server listens on.
const port = 3001;
|
|
|
|
// Dapr
// Port of the Dapr HTTP API; falls back to 3500 when DAPR_HTTP_PORT is unset.
const daprPort = process.env.DAPR_HTTP_PORT || "3500";

// Common prefix shared by every Dapr HTTP endpoint used below.
const daprBaseUrl = `http://localhost:${daprPort}/v1.0`;

// The Dapr endpoint for the state store component to store the tweets.
// tweet-store is the name of the state store component.
const stateEndpoint = `${daprBaseUrl}/state/tweet-store`;

// The Dapr endpoint for the Pub/Sub component used to communicate with other
// services in a loosely coupled way. tweet-pubsub is the name of the component
// and scored is the name of the topic to which other services will subscribe.
const pubEndpoint = `${daprBaseUrl}/publish/tweet-pubsub/scored`;

// The Dapr endpoint used to invoke the sentiment-score method on the processor
// service. We are able to invoke the service using its appId, processor.
const serviceEndpoint = `${daprBaseUrl}/invoke/processor/method/sentiment-score`;
|
|
|
|
// store state
/**
 * Saves a tweet record to the Dapr state store, keyed by its id.
 *
 * Failures are reported as plain `{ message }` objects (not Error instances)
 * so the rejection value can be sent as a JSON response body as-is.
 *
 * @param {object} obj - Tweet record; must have a truthy `id`. When present,
 *   `trace_parent` / `trace_state` are forwarded as the `traceparent` /
 *   `tracestate` request headers.
 * @returns {Promise<object>} Resolves with the same `obj` when the state store
 *   answers with an OK status; rejects with `{ message: string }` otherwise.
 */
const saveContent = function (obj) {
  if (!obj || !obj.id) {
    return Promise.reject({ message: "invalid content" });
  }

  // Only forward trace headers that actually exist: fetch stringifies header
  // values, so an undefined value would go out as the literal "undefined".
  const headers = { "Content-Type": "application/json" };
  if (obj.trace_parent) {
    headers.traceparent = obj.trace_parent;
  }
  if (obj.trace_state) {
    headers.tracestate = obj.trace_state;
  }

  // Starting from a resolved promise turns synchronous failures (for example
  // JSON.stringify throwing) into rejections instead of thrown exceptions.
  return Promise.resolve()
    .then(() =>
      fetch(stateEndpoint, {
        method: "POST",
        body: JSON.stringify([{ key: obj.id, value: obj }]),
        headers,
      })
    )
    .then(
      (response) => {
        if (!response.ok) {
          logger.debug(response.statusText);
          return Promise.reject({ message: "error saving content" });
        }
        return obj;
      },
      (error) => {
        logger.error(error);
        // An Error serialises to "{}" in JSON, so expose its text instead.
        return Promise.reject({
          message: error && error.message ? error.message : String(error),
        });
      }
    );
};
|
|
|
|
// tweets handler
/**
 * POST /tweets - receives one tweet, scores it, stores it and publishes it.
 *
 * Responds 400 when the body lacks the fields the record is built from
 * (`id_str`, `user`), 200 with an empty object once the tweet has been
 * scored, saved and published, and 500 with `{ message }` when any of those
 * steps fails.
 */
app.post("/tweets", (req, res) => {
  logger.debug("/tweets invoked...");

  const tweet = req.body;
  // A body without `user` would throw on tweet.user.* below, and one without
  // `id_str` can never be saved or published, so both are client errors.
  if (!tweet || !tweet.id_str || !tweet.user) {
    res.status(400).send({ error: "invalid content" });
    return;
  }

  const obj = {
    id: tweet.id_str,
    author: tweet.user.screen_name,
    author_pic: tweet.user.profile_image_url_https,
    content: tweet.full_text || tweet.text, // if extended then use it
    lang: tweet.lang,
    published: tweet.created_at,
    trace_state: req.get("tracestate"),
    trace_parent: req.get("traceparent"),
    sentiment: 0.5, // default to neutral sentiment
  };

  logger.debug(`obj: ${JSON.stringify(obj)}`);

  scoreSentiment(obj)
    .then(saveContent)
    .then(publishContent)
    .then((rez) => {
      logger.debug(`rez: ${JSON.stringify(rez)}`);
      res.status(200).send({});
    })
    .catch((error) => {
      // The rejection may be a `{ message }` object or a thrown Error, and
      // `message` itself may be an Error; reduce all of these to plain text
      // so the log line and the JSON body are both readable.
      const detail =
        error && error.message !== undefined ? error.message : error;
      const message = detail instanceof Error ? detail.message : String(detail);
      logger.error(message);
      res.status(500).send({ message });
    });
});
|
|
|
|
// score sentiment
/**
 * Scores a tweet by invoking the sentiment-score method through Dapr.
 *
 * Posts `{ lang, text }` to `serviceEndpoint` and copies the `score` field of
 * the JSON reply into `obj.sentiment`. When the reply carries no score the
 * existing `obj.sentiment` value is left untouched.
 *
 * Failures are reported as plain `{ message }` objects (not Error instances)
 * so the rejection value can be sent as a JSON response body as-is.
 *
 * @param {object} obj - Tweet record; `lang` and `content` are sent to the
 *   service. When present, `trace_parent` / `trace_state` are forwarded as
 *   the `traceparent` / `tracestate` request headers.
 * @returns {Promise<object>} Resolves with the same (mutated) `obj`; rejects
 *   with `{ message: string }` when the call fails or answers non-OK.
 */
const scoreSentiment = function (obj) {
  if (!obj) {
    return Promise.reject({ message: "invalid content" });
  }

  // Shared handler for transport and body-parsing failures.
  const toRejection = (error) => {
    logger.error(error);
    // An Error serialises to "{}" in JSON, so expose its text instead.
    return Promise.reject({
      message: error && error.message ? error.message : String(error),
    });
  };

  // Only forward trace headers that actually exist: fetch stringifies header
  // values, so an undefined value would go out as the literal "undefined".
  const headers = { "Content-Type": "application/json" };
  if (obj.trace_parent) {
    headers.traceparent = obj.trace_parent;
  }
  if (obj.trace_state) {
    headers.tracestate = obj.trace_state;
  }

  return Promise.resolve()
    .then(() =>
      fetch(serviceEndpoint, {
        method: "POST",
        body: JSON.stringify({ lang: obj.lang, text: obj.content }),
        headers,
      })
    )
    .then((response) => {
      if (!response.ok) {
        logger.debug(response.statusText);
        // Stop here: there is no body to read a score from.
        return Promise.reject({ message: "error invoking service" });
      }
      return response.json().then((result) => {
        logger.debug("_res: " + JSON.stringify(result));
        if (result && result.score !== undefined && result.score !== null) {
          obj.sentiment = result.score;
        }
        return obj;
      }, toRejection);
    }, toRejection);
};
|
|
|
|
// publish scored tweets
/**
 * Publishes a scored tweet record to the Dapr pub/sub endpoint.
 *
 * Failures are reported as plain `{ message }` objects (not Error instances)
 * so the rejection value can be sent as a JSON response body as-is.
 *
 * @param {object} obj - Tweet record; must have a truthy `id`. The whole
 *   object is sent as the JSON message. When present, `trace_parent` /
 *   `trace_state` are forwarded as the `traceparent` / `tracestate` headers.
 * @returns {Promise<object>} Resolves with the same `obj` when the publish
 *   call answers with an OK status; rejects with `{ message: string }`
 *   otherwise.
 */
const publishContent = function (obj) {
  if (!obj || !obj.id) {
    return Promise.reject({ message: "invalid content" });
  }

  // Only forward trace headers that actually exist: fetch stringifies header
  // values, so an undefined value would go out as the literal "undefined".
  const headers = { "Content-Type": "application/json" };
  if (obj.trace_parent) {
    headers.traceparent = obj.trace_parent;
  }
  if (obj.trace_state) {
    headers.tracestate = obj.trace_state;
  }

  // Starting from a resolved promise turns synchronous failures (for example
  // JSON.stringify throwing) into rejections instead of thrown exceptions.
  return Promise.resolve()
    .then(() =>
      fetch(pubEndpoint, {
        method: "POST",
        body: JSON.stringify(obj),
        headers,
      })
    )
    .then(
      (response) => {
        if (!response.ok) {
          logger.debug(response.statusText);
          return Promise.reject({ message: "error publishing content" });
        }
        return obj;
      },
      (error) => {
        logger.error(error);
        // An Error serialises to "{}" in JSON, so expose its text instead.
        return Promise.reject({
          message: error && error.message ? error.message : String(error),
        });
      }
    );
};
|
|
|
|
// Start the HTTP server and log the port once it is listening.
app.listen(port, function onListening() {
  logger.info(`Port: ${port}!`);
});