Added Path to handle duplicate workflow events (#2312)

Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>
This commit is contained in:
Soumya Ghosh Dastidar 2020-11-04 09:48:12 +05:30 committed by GitHub
parent d61f522c8b
commit cfc2989657
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 42 additions and 20 deletions

View File

@ -74,7 +74,13 @@ func SendWorkflowUpdates(clusterData map[string]string, event chan types.Workflo
eventMap[eventData.UID] = eventData
// generate gql payload
payload, err := GenerateWorkflowPayload(clusterData["CID"], clusterData["KEY"], eventData)
payload, err := GenerateWorkflowPayload(clusterData["CID"], clusterData["KEY"], "false", eventData)
if eventData.FinishedAt != "" {
payload, err = GenerateWorkflowPayload(clusterData["CID"], clusterData["KEY"], "true", eventData)
delete(eventMap, eventData.UID)
}
if err != nil {
logrus.WithError(err).Print("ERROR PARSING WORKFLOW EVENT")
}
@ -84,10 +90,6 @@ func SendWorkflowUpdates(clusterData map[string]string, event chan types.Workflo
logrus.Print(err.Error())
}
logrus.Print("RESPONSE ", body)
if eventData.FinishedAt != "" {
delete(eventMap, eventData.UID)
}
}
}

View File

@ -22,7 +22,7 @@ func MarshalGQLData(gqlData interface{}) (string, error) {
}
// generate gql mutation payload for workflow event
func GenerateWorkflowPayload(cid, accessKey string, wfEvent types.WorkflowEvent) ([]byte, error) {
func GenerateWorkflowPayload(cid, accessKey, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
clusterID := `{cluster_id: \"` + cid + `\", access_key: \"` + accessKey + `\"}`
for id, event := range wfEvent.Nodes {
@ -34,7 +34,7 @@ func GenerateWorkflowPayload(cid, accessKey string, wfEvent types.WorkflowEvent)
if err != nil {
return nil, err
}
mutation := `{ workflow_id: \"` + wfEvent.WorkflowID + `\", workflow_run_id: \"` + wfEvent.UID + `\", workflow_name:\"` + wfEvent.Name + `\", cluster_id: ` + clusterID + `, execution_data:\"` + processed[1:len(processed)-1] + `\"}`
mutation := `{ workflow_id: \"` + wfEvent.WorkflowID + `\", workflow_run_id: \"` + wfEvent.UID + `\", completed: ` + completed + `, workflow_name:\"` + wfEvent.Name + `\", cluster_id: ` + clusterID + `, execution_data:\"` + processed[1:len(processed)-1] + `\"}`
var payload = []byte(`{"query":"mutation { chaosWorkflowRun(workflowData:` + mutation + ` )}"}`)
return payload, nil
}

View File

@ -163,6 +163,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/litmuschaos/litmus v0.0.0-20201030044325-64ebcdc8ffcb h1:3DGMJMWqy8Yeyb7jkfTFE7MHMJ+2hDj9ov8vO8GtLjg=
github.com/litmuschaos/litmus v0.0.0-20201103140214-d61f522c8b8f h1:G7nRXRpmMmkkhIvEysz0i4UTZZwflRdoyWlJaGTmEuQ=
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=

View File

@ -2220,6 +2220,7 @@ input WorkflowRunInput {
workflow_name: String!
execution_data: String!
cluster_id: ClusterIdentity!
completed: Boolean!
}
type PodLogResponse {
@ -11773,6 +11774,12 @@ func (ec *executionContext) unmarshalInputWorkflowRunInput(ctx context.Context,
if err != nil {
return it, err
}
case "completed":
var err error
it.Completed, err = ec.unmarshalNBoolean2bool(ctx, v)
if err != nil {
return it, err
}
}
}

View File

@ -353,6 +353,7 @@ type WorkflowRunInput struct {
WorkflowName string `json:"workflow_name"`
ExecutionData string `json:"execution_data"`
ClusterID *ClusterIdentity `json:"cluster_id"`
Completed bool `json:"completed"`
}
type WorkflowRuns struct {

View File

@ -130,6 +130,7 @@ input WorkflowRunInput {
workflow_name: String!
execution_data: String!
cluster_id: ClusterIdentity!
completed: Boolean!
}
type PodLogResponse {

View File

@ -68,6 +68,7 @@ type WorkflowRun struct {
WorkflowRunID string `bson:"workflow_run_id"`
LastUpdated string `bson:"last_updated"`
ExecutionData string `bson:"execution_data"`
Completed bool `bson:"completed"`
}
type WeightagesInput struct {

View File

@ -2,12 +2,12 @@ package mongodb
import (
"context"
"errors"
"fmt"
"log"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)
func InsertCluster(cluster Cluster) error {
@ -44,32 +44,36 @@ func UpdateCluster(query bson.D, update bson.D) error {
return nil
}
func UpsertWorkflowRun(workflow_id string, wfRun WorkflowRun) error {
opts := options.Update().SetUpsert(true)
func UpdateWorkflowRun(workflow_id string, wfRun WorkflowRun) (int, error) {
ctx, _ := context.WithTimeout(backgroundContext, 10*time.Second)
count, err := workflowCollection.CountDocuments(ctx, bson.M{"workflow_id": workflow_id, "workflow_runs.workflow_run_id": wfRun.WorkflowRunID})
if err != nil {
return err
return 0, err
}
updateCount := 1
if count == 0 {
filter := bson.M{"workflow_id": workflow_id}
update := bson.M{"$push": bson.M{"workflow_runs": wfRun}}
_, err = workflowCollection.UpdateOne(ctx, filter, update, opts)
updateResp, err := workflowCollection.UpdateOne(ctx, filter, update)
if err != nil {
return err
return 0, err
}
if updateResp.MatchedCount == 0 {
return 0, errors.New("workflow not found")
}
} else if count == 1 {
filter := bson.M{"workflow_id": workflow_id, "workflow_runs.workflow_run_id": wfRun.WorkflowRunID}
update := bson.M{"$set": bson.M{"workflow_runs.$.last_updated": wfRun.LastUpdated, "workflow_runs.$.execution_data": wfRun.ExecutionData}}
_, err = workflowCollection.UpdateOne(ctx, filter, update, opts)
filter := bson.M{"workflow_id": workflow_id, "workflow_runs.workflow_run_id": wfRun.WorkflowRunID, "workflow_runs.completed": false}
update := bson.M{"$set": bson.M{"workflow_runs.$.last_updated": wfRun.LastUpdated, "workflow_runs.$.execution_data": wfRun.ExecutionData, "workflow_runs.$.completed": wfRun.Completed}}
updateResp, err := workflowCollection.UpdateOne(ctx, filter, update)
if err != nil {
return err
return 0, err
}
updateCount = int(updateResp.MatchedCount)
}
return nil
return updateCount, nil
}
func GetWorkflowsByProjectID(project_id string) ([]ChaosWorkFlowInput, error) {

View File

@ -122,17 +122,22 @@ func WorkFlowRunHandler(input model.WorkflowRunInput, r store.StateData) (string
return "", err
}
//err = database.UpsertWorkflowRun(database.WorkflowRun(newWorkflowRun))
err = database.UpsertWorkflowRun(input.WorkflowID, database.WorkflowRun{
//err = database.UpdateWorkflowRun(database.WorkflowRun(newWorkflowRun))
count, err := database.UpdateWorkflowRun(input.WorkflowID, database.WorkflowRun{
WorkflowRunID: input.WorkflowRunID,
LastUpdated: strconv.FormatInt(time.Now().Unix(), 10),
ExecutionData: input.ExecutionData,
Completed: input.Completed,
})
if err != nil {
log.Print("ERROR", err)
return "", err
}
if count == 0 {
return "Workflow Run Discarded[Duplicate Event]", nil
}
subscriptions.SendWorkflowEvent(model.WorkflowRun{
ClusterID: cluster.ClusterID,
ClusterName: cluster.ClusterName,