mirror of https://github.com/dapr/quickstarts.git
140 lines
4.0 KiB
Go
140 lines
4.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/mux"
|
|
)
|
|
|
|
// Package-level settings shared by main and the subscribe/unsubscribe helpers.
var (
	// DAPR_HOST and DAPR_HTTP_PORT form the base address of the Dapr HTTP
	// API. main fills them from the environment variables of the same name
	// and falls back to "http://localhost" and "3500" when they are unset.
	DAPR_HOST, DAPR_HTTP_PORT string

	// okHost and okPort record whether the corresponding environment
	// variable was present when main looked it up.
	okHost, okPort bool

	// appPort is the port this app's HTTP server listens on. main replaces
	// it with the value of APP_PORT when that variable is set.
	appPort = "6001"

	// DAPR_CONFIGURATION_STORE is the store name placed in the
	// /v1.0/configuration/... request paths.
	DAPR_CONFIGURATION_STORE = "configstore"

	// CONFIGURATION_ITEMS lists the keys main fetches at startup.
	CONFIGURATION_ITEMS = []string{"orderId1", "orderId2"}
)
|
|
|
|
func main() {
|
|
if DAPR_HOST, okHost = os.LookupEnv("DAPR_HOST"); !okHost {
|
|
DAPR_HOST = "http://localhost"
|
|
}
|
|
if DAPR_HTTP_PORT, okPort = os.LookupEnv("DAPR_HTTP_PORT"); !okPort {
|
|
DAPR_HTTP_PORT = "3500"
|
|
}
|
|
if value, ok := os.LookupEnv("APP_PORT"); ok {
|
|
appPort = value
|
|
}
|
|
|
|
// Get config items from the config store
|
|
for _, item := range CONFIGURATION_ITEMS {
|
|
getResponse, err := http.Get(DAPR_HOST + ":" + DAPR_HTTP_PORT + "/v1.0/configuration/" + DAPR_CONFIGURATION_STORE + "?key=" + item)
|
|
if err != nil {
|
|
fmt.Print("Could not get config item, err:" + err.Error())
|
|
os.Exit(1)
|
|
}
|
|
result, _ := io.ReadAll(getResponse.Body)
|
|
fmt.Println("Configuration for "+item+":", string(result))
|
|
}
|
|
|
|
var subscriptionId string
|
|
|
|
wg := new(sync.WaitGroup)
|
|
wg.Add(2)
|
|
// Start HTTP Server to receive config updates for 20 seconds and then shutdown and unsubscribe from config updates
|
|
go func() {
|
|
startServerToListen(&subscriptionId)
|
|
wg.Done()
|
|
}()
|
|
// Subscribe for config updates
|
|
go func() {
|
|
subscribeToConfigUpdates(&subscriptionId)
|
|
wg.Done()
|
|
}()
|
|
wg.Wait()
|
|
}
|
|
|
|
func subscribeToConfigUpdates(subscriptionId *string) {
|
|
// Add delay to allow app channel to be ready
|
|
time.Sleep(3 * time.Second)
|
|
|
|
subscription, err := http.Get(DAPR_HOST + ":" + DAPR_HTTP_PORT + "/v1.0/configuration/" + DAPR_CONFIGURATION_STORE + "/subscribe")
|
|
if err != nil {
|
|
fmt.Println("Error subscribing to config updates, err:" + err.Error())
|
|
os.Exit(1)
|
|
}
|
|
sub, err := io.ReadAll(subscription.Body)
|
|
if err != nil {
|
|
fmt.Print("Unable to read subscription id, err: " + err.Error())
|
|
os.Exit(1)
|
|
}
|
|
if !strings.Contains(string(sub), "errorCode") {
|
|
var subid map[string]interface{}
|
|
json.Unmarshal(sub, &subid)
|
|
fmt.Println("App subscribed to config changes with subscription id:", subid["id"])
|
|
*subscriptionId = subid["id"].(string)
|
|
} else {
|
|
fmt.Println("Error subscribing to config updates: ", string(sub))
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func startServerToListen(subscriptionId *string) {
|
|
r := mux.NewRouter()
|
|
httpServer := http.Server{
|
|
Addr: ":" + appPort,
|
|
Handler: r,
|
|
}
|
|
r.HandleFunc("/configuration/configstore/{configItem}", configUpdateHandler).Methods("POST")
|
|
fmt.Println("Current time before shutdown: " + time.Now().String())
|
|
// Unsubscribe to config updates and shutdown http server after 20 seconds
|
|
time.AfterFunc(20*time.Second, func() {
|
|
fmt.Println("Current time while shutdown: " + time.Now().String())
|
|
unsubscribeFromConfigUpdates(*subscriptionId)
|
|
fmt.Println("Shutting down HTTP server")
|
|
err := httpServer.Shutdown(context.Background())
|
|
if err != nil {
|
|
fmt.Println("Error shutting down HTTP server, err:" + err.Error())
|
|
}
|
|
})
|
|
|
|
// Start HTTP server
|
|
if err := httpServer.ListenAndServe(); err != nil {
|
|
log.Println("HTTP server error:", err)
|
|
}
|
|
}
|
|
|
|
func unsubscribeFromConfigUpdates(subscriptionId string) {
|
|
unsubscribe, err := http.Get(DAPR_HOST + ":" + DAPR_HTTP_PORT + "/v1.0/configuration/" + DAPR_CONFIGURATION_STORE + "/" + subscriptionId + "/unsubscribe")
|
|
if err != nil {
|
|
fmt.Println("Error unsubscribing from config updates, err:" + err.Error())
|
|
}
|
|
unsub, err := io.ReadAll(unsubscribe.Body)
|
|
if err != nil {
|
|
fmt.Print("Unable to read unsubscribe response, err: " + err.Error())
|
|
}
|
|
if strings.Contains(string(unsub), "true") {
|
|
fmt.Println("App unsubscribed from config changes")
|
|
return
|
|
} else {
|
|
fmt.Println("Error unsubscribing from config updates: ", string(unsub))
|
|
}
|
|
}
|
|
|
|
// configUpdateHandler handles a configuration update notification: it decodes
// the JSON request body and prints the value of its "items" field.
//
// A body that cannot be read or is not valid JSON is logged and answered with
// 400 Bad Request. Otherwise nothing is written to w, so the response is the
// default 200 OK.
func configUpdateHandler(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		log.Println("Unable to read configuration update, err:", err)
		http.Error(w, "unable to read request body", http.StatusBadRequest)
		return
	}

	var notification map[string]interface{}
	if err := json.Unmarshal(body, &notification); err != nil {
		log.Println("Unable to parse configuration update, err:", err)
		http.Error(w, "invalid JSON payload", http.StatusBadRequest)
		return
	}

	// The error is ignored on purpose: the value was just produced by
	// json.Unmarshal, so it only holds types json.Marshal can encode.
	update, _ := json.Marshal(notification["items"])
	fmt.Println("Configuration update", string(update))
}
|