From cfc2989657401f74c38861f811e22474b6144342 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Dastidar <44349253+gdsoumya@users.noreply.github.com> Date: Wed, 4 Nov 2020 09:48:12 +0530 Subject: [PATCH] Added Path to handle duplicate workflow events (#2312) Signed-off-by: Soumya Ghosh Dastidar --- .../subscriber/pkg/gql/mutations.go | 12 +++++---- .../cluster-agents/subscriber/pkg/gql/util.go | 4 +-- litmus-portal/graphql-server/go.sum | 1 + .../graph/generated/generated.go | 7 +++++ .../graphql-server/graph/model/models_gen.go | 1 + .../graphql-server/graph/schema.graphqls | 1 + .../pkg/database/mongodb/init.go | 1 + .../pkg/database/mongodb/operations.go | 26 +++++++++++-------- .../pkg/graphql/mutations/mutation.go | 9 +++++-- 9 files changed, 42 insertions(+), 20 deletions(-) diff --git a/litmus-portal/cluster-agents/subscriber/pkg/gql/mutations.go b/litmus-portal/cluster-agents/subscriber/pkg/gql/mutations.go index b06d986e9..d0dd89d9c 100644 --- a/litmus-portal/cluster-agents/subscriber/pkg/gql/mutations.go +++ b/litmus-portal/cluster-agents/subscriber/pkg/gql/mutations.go @@ -74,7 +74,13 @@ func SendWorkflowUpdates(clusterData map[string]string, event chan types.Workflo eventMap[eventData.UID] = eventData // generate gql payload - payload, err := GenerateWorkflowPayload(clusterData["CID"], clusterData["KEY"], eventData) + payload, err := GenerateWorkflowPayload(clusterData["CID"], clusterData["KEY"], "false", eventData) + + if eventData.FinishedAt != "" { + payload, err = GenerateWorkflowPayload(clusterData["CID"], clusterData["KEY"], "true", eventData) + delete(eventMap, eventData.UID) + } + if err != nil { logrus.WithError(err).Print("ERROR PARSING WORKFLOW EVENT") } @@ -84,10 +90,6 @@ func SendWorkflowUpdates(clusterData map[string]string, event chan types.Workflo logrus.Print(err.Error()) } logrus.Print("RESPONSE ", body) - - if eventData.FinishedAt != "" { - delete(eventMap, eventData.UID) - } } } diff --git a/litmus-portal/cluster-agents/subscriber/pkg/gql/util.go b/litmus-portal/cluster-agents/subscriber/pkg/gql/util.go index 9797289fb..973570313 100644 --- a/litmus-portal/cluster-agents/subscriber/pkg/gql/util.go +++ b/litmus-portal/cluster-agents/subscriber/pkg/gql/util.go @@ -22,7 +22,7 @@ func MarshalGQLData(gqlData interface{}) (string, error) { } // generate gql mutation payload for workflow event -func GenerateWorkflowPayload(cid, accessKey string, wfEvent types.WorkflowEvent) ([]byte, error) { +func GenerateWorkflowPayload(cid, accessKey, completed string, wfEvent types.WorkflowEvent) ([]byte, error) { clusterID := `{cluster_id: \"` + cid + `\", access_key: \"` + accessKey + `\"}` for id, event := range wfEvent.Nodes { @@ -34,7 +34,7 @@ func GenerateWorkflowPayload(cid, accessKey string, wfEvent types.WorkflowEvent) if err != nil { return nil, err } - mutation := `{ workflow_id: \"` + wfEvent.WorkflowID + `\", workflow_run_id: \"` + wfEvent.UID + `\", workflow_name:\"` + wfEvent.Name + `\", cluster_id: ` + clusterID + `, execution_data:\"` + processed[1:len(processed)-1] + `\"}` + mutation := `{ workflow_id: \"` + wfEvent.WorkflowID + `\", workflow_run_id: \"` + wfEvent.UID + `\", completed: ` + completed + `, workflow_name:\"` + wfEvent.Name + `\", cluster_id: ` + clusterID + `, execution_data:\"` + processed[1:len(processed)-1] + `\"}` var payload = []byte(`{"query":"mutation { chaosWorkflowRun(workflowData:` + mutation + ` )}"}`) return payload, nil } diff --git a/litmus-portal/graphql-server/go.sum b/litmus-portal/graphql-server/go.sum index 812d4ea05..994b6761e 100644 --- a/litmus-portal/graphql-server/go.sum +++ b/litmus-portal/graphql-server/go.sum @@ -163,6 +163,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/litmuschaos/litmus v0.0.0-20201030044325-64ebcdc8ffcb h1:3DGMJMWqy8Yeyb7jkfTFE7MHMJ+2hDj9ov8vO8GtLjg= +github.com/litmuschaos/litmus v0.0.0-20201103140214-d61f522c8b8f h1:G7nRXRpmMmkkhIvEysz0i4UTZZwflRdoyWlJaGTmEuQ= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= diff --git a/litmus-portal/graphql-server/graph/generated/generated.go b/litmus-portal/graphql-server/graph/generated/generated.go index 09340f9b6..377166de9 100644 --- a/litmus-portal/graphql-server/graph/generated/generated.go +++ b/litmus-portal/graphql-server/graph/generated/generated.go @@ -2220,6 +2220,7 @@ input WorkflowRunInput { workflow_name: String! execution_data: String! cluster_id: ClusterIdentity! + completed: Boolean! } type PodLogResponse { @@ -11773,6 +11774,12 @@ func (ec *executionContext) unmarshalInputWorkflowRunInput(ctx context.Context, if err != nil { return it, err } + case "completed": + var err error + it.Completed, err = ec.unmarshalNBoolean2bool(ctx, v) + if err != nil { + return it, err + } } } diff --git a/litmus-portal/graphql-server/graph/model/models_gen.go b/litmus-portal/graphql-server/graph/model/models_gen.go index e706bed62..0f8a88b84 100644 --- a/litmus-portal/graphql-server/graph/model/models_gen.go +++ b/litmus-portal/graphql-server/graph/model/models_gen.go @@ -353,6 +353,7 @@ type WorkflowRunInput struct { WorkflowName string `json:"workflow_name"` ExecutionData string `json:"execution_data"` ClusterID *ClusterIdentity `json:"cluster_id"` + Completed bool `json:"completed"` } type WorkflowRuns struct { diff --git a/litmus-portal/graphql-server/graph/schema.graphqls b/litmus-portal/graphql-server/graph/schema.graphqls index 4b1a15ee6..63a2ea5ba 100644 --- a/litmus-portal/graphql-server/graph/schema.graphqls +++ b/litmus-portal/graphql-server/graph/schema.graphqls @@ -130,6 +130,7 @@ input WorkflowRunInput { workflow_name: String! execution_data: String! cluster_id: ClusterIdentity! + completed: Boolean! } type PodLogResponse { diff --git a/litmus-portal/graphql-server/pkg/database/mongodb/init.go b/litmus-portal/graphql-server/pkg/database/mongodb/init.go index a206c78fd..aad98f702 100644 --- a/litmus-portal/graphql-server/pkg/database/mongodb/init.go +++ b/litmus-portal/graphql-server/pkg/database/mongodb/init.go @@ -68,6 +68,7 @@ type WorkflowRun struct { WorkflowRunID string `bson:"workflow_run_id"` LastUpdated string `bson:"last_updated"` ExecutionData string `bson:"execution_data"` + Completed bool `bson:"completed"` } type WeightagesInput struct { diff --git a/litmus-portal/graphql-server/pkg/database/mongodb/operations.go b/litmus-portal/graphql-server/pkg/database/mongodb/operations.go index c891e97e7..95ee19839 100644 --- a/litmus-portal/graphql-server/pkg/database/mongodb/operations.go +++ b/litmus-portal/graphql-server/pkg/database/mongodb/operations.go @@ -2,12 +2,12 @@ package mongodb import ( "context" + "errors" "fmt" "log" "time" "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo/options" ) func InsertCluster(cluster Cluster) error { @@ -44,32 +44,36 @@ func UpdateCluster(query bson.D, update bson.D) error { return nil } -func UpsertWorkflowRun(workflow_id string, wfRun WorkflowRun) error { - opts := options.Update().SetUpsert(true) +func UpdateWorkflowRun(workflow_id string, wfRun WorkflowRun) (int, error) { ctx, _ := context.WithTimeout(backgroundContext, 10*time.Second) count, err := workflowCollection.CountDocuments(ctx, bson.M{"workflow_id": workflow_id, "workflow_runs.workflow_run_id": wfRun.WorkflowRunID}) if err != nil { - return err + return 0, err } + updateCount := 1 if count == 0 { filter := bson.M{"workflow_id": workflow_id} update := bson.M{"$push": bson.M{"workflow_runs": wfRun}} - _, err = workflowCollection.UpdateOne(ctx, filter, update, opts) + updateResp, err := workflowCollection.UpdateOne(ctx, filter, update) if err != nil { - return err + return 0, err + } + if updateResp.MatchedCount == 0 { + return 0, errors.New("workflow not found") } } else if count == 1 { - filter := bson.M{"workflow_id": workflow_id, "workflow_runs.workflow_run_id": wfRun.WorkflowRunID} - update := bson.M{"$set": bson.M{"workflow_runs.$.last_updated": wfRun.LastUpdated, "workflow_runs.$.execution_data": wfRun.ExecutionData}} - _, err = workflowCollection.UpdateOne(ctx, filter, update, opts) + filter := bson.M{"workflow_id": workflow_id, "workflow_runs.workflow_run_id": wfRun.WorkflowRunID, "workflow_runs.completed": false} + update := bson.M{"$set": bson.M{"workflow_runs.$.last_updated": wfRun.LastUpdated, "workflow_runs.$.execution_data": wfRun.ExecutionData, "workflow_runs.$.completed": wfRun.Completed}} + updateResp, err := workflowCollection.UpdateOne(ctx, filter, update) if err != nil { - return err + return 0, err } + updateCount = int(updateResp.MatchedCount) } - return nil + return updateCount, nil } func GetWorkflowsByProjectID(project_id string) ([]ChaosWorkFlowInput, error) { diff --git a/litmus-portal/graphql-server/pkg/graphql/mutations/mutation.go b/litmus-portal/graphql-server/pkg/graphql/mutations/mutation.go index bf8a322bc..a59265d02 100644 --- a/litmus-portal/graphql-server/pkg/graphql/mutations/mutation.go +++ b/litmus-portal/graphql-server/pkg/graphql/mutations/mutation.go @@ -122,17 +122,22 @@ func WorkFlowRunHandler(input model.WorkflowRunInput, r store.StateData) (string return "", err } - //err = database.UpsertWorkflowRun(database.WorkflowRun(newWorkflowRun)) - err = database.UpsertWorkflowRun(input.WorkflowID, database.WorkflowRun{ + //err = database.UpdateWorkflowRun(database.WorkflowRun(newWorkflowRun)) + count, err := database.UpdateWorkflowRun(input.WorkflowID, database.WorkflowRun{ WorkflowRunID: input.WorkflowRunID, LastUpdated: strconv.FormatInt(time.Now().Unix(), 10), ExecutionData: input.ExecutionData, + Completed: input.Completed, }) if err != nil { log.Print("ERROR", err) return "", err } + if count == 0 { + return "Workflow Run Discarded[Duplicate Event]", nil + } + subscriptions.SendWorkflowEvent(model.WorkflowRun{ ClusterID: cluster.ClusterID, ClusterName: cluster.ClusterName,