samples/twitter-sentiment-processor/demos/py-demo/provider/provider.py

63 lines
1.5 KiB
Python

import os
import json
import logging
from dapr.clients import DaprClient
from dapr.ext.grpc import App, BindingRequest
APP_PORT = os.getenv("APP_PORT", "3001")
PUBSUB_NAME = os.getenv("PUBSUB_NAME", "processed")
TOPIC_NAME = os.getenv("TOPIC_NAME", "processed-tweets")
STORE_NAME = os.getenv("STORE_NAME", "tweet-store")
app = App()
@app.binding('tweets')
def binding(request: BindingRequest):
payload = request.text()
m = extract_tweets(json.loads(payload))
logging.info(m)
with DaprClient() as d:
tweet_data = json.dumps(m)
d.save_state(STORE_NAME, m['id'], tweet_data)
resp = d.invoke_method(
'tweet-processor',
'sentiment-score',
data=tweet_data)
m['sentiment'] = json.loads(resp.data)
d.publish_event(PUBSUB_NAME, TOPIC_NAME, json.dumps(m))
def extract_tweets(payload):
content = payload['text']
ext_text = payload.get('extended_tweet')
if ext_text:
content = ext_text['full_text']
user_info = payload['user']
return {
'id': payload['id_str'],
'author': user_info['screen_name'] or user_info['name'],
'author_pic': user_info['profile_image_url_https'],
'content': content,
'lang': payload['lang'],
'published': payload['created_at'],
}
def main():
app.run(APP_PORT)
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
main()