litmus/litmus-portal/graphql-server/pkg/chaos-workflow/handler/handler.go

874 lines
26 KiB
Go

package handler
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"github.com/jinzhu/copier"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"github.com/google/uuid"
"github.com/litmuschaos/litmus/litmus-portal/graphql-server/graph/model"
types "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/chaos-workflow"
"github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/chaos-workflow/ops"
"github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/cluster"
store "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/data-store"
dbOperationsCluster "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/database/mongodb/cluster"
dbOperationsProject "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/database/mongodb/project"
dbOperationsWorkflow "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/database/mongodb/workflow"
dbSchemaWorkflow "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/database/mongodb/workflow"
dbOperationsWorkflowTemplate "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/database/mongodb/workflowtemplate"
dbSchemaWorkflowTemplate "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/database/mongodb/workflowtemplate"
gitOpsHandler "github.com/litmuschaos/litmus/litmus-portal/graphql-server/pkg/gitops/handler"
)
func CreateChaosWorkflow(ctx context.Context, input *model.ChaosWorkFlowInput, r *store.StateData) (*model.ChaosWorkFlowResponse, error) {
input, wfType, err := ops.ProcessWorkflow(input)
if err != nil {
log.Print("Error processing workflow: ", err)
return nil, err
}
// GitOps Update
err = gitOpsHandler.UpsertWorkflowToGit(ctx, input)
if err != nil {
log.Print("Error performing git push: ", err)
return nil, err
}
err = ops.ProcessWorkflowCreation(input, wfType, r)
if err != nil {
log.Print("Error executing workflow: ", err)
return nil, err
}
return &model.ChaosWorkFlowResponse{
WorkflowID: *input.WorkflowID,
CronSyntax: input.CronSyntax,
WorkflowName: input.WorkflowName,
WorkflowDescription: input.WorkflowDescription,
IsCustomWorkflow: input.IsCustomWorkflow,
}, nil
}
func DeleteWorkflow(ctx context.Context, workflow_id *string, workflowRunID *string, r *store.StateData) (bool, error) {
query := bson.D{{"workflow_id", workflow_id}}
workflow, err := dbOperationsWorkflow.GetWorkflow(query)
if err != nil {
return false, err
}
if *workflow_id != "" && *workflowRunID != "" {
for _, workflow_run := range workflow.WorkflowRuns {
if workflow_run.WorkflowRunID == *workflowRunID {
bool_true := true
workflow_run.IsRemoved = &bool_true
}
}
err = ops.ProcessWorkflowRunDelete(query, workflowRunID, workflow, r)
if err != nil {
return false, err
}
return true, nil
} else if *workflow_id != "" && *workflowRunID == "" {
wf := model.ChaosWorkFlowInput{
ProjectID: workflow.ProjectID,
WorkflowID: &workflow.WorkflowID,
WorkflowName: workflow.WorkflowName,
}
// gitOps delete
err = gitOpsHandler.DeleteWorkflowFromGit(ctx, &wf)
if err != nil {
log.Print("Error performing git push: ", err)
return false, err
}
err = ops.ProcessWorkflowDelete(query, workflow, r)
if err != nil {
return false, err
}
return true, nil
}
return false, err
}
func UpdateWorkflow(ctx context.Context, input *model.ChaosWorkFlowInput, r *store.StateData) (*model.ChaosWorkFlowResponse, error) {
input, wfType, err := ops.ProcessWorkflow(input)
if err != nil {
log.Print("Error processing workflow update: ", err)
return nil, err
}
// GitOps Update
err = gitOpsHandler.UpsertWorkflowToGit(ctx, input)
if err != nil {
log.Print("Error performing git push: ", err)
return nil, err
}
err = ops.ProcessWorkflowUpdate(input, wfType, r)
if err != nil {
log.Print("Error executing workflow update: ", err)
return nil, err
}
return &model.ChaosWorkFlowResponse{
WorkflowID: *input.WorkflowID,
CronSyntax: input.CronSyntax,
WorkflowName: input.WorkflowName,
WorkflowDescription: input.WorkflowDescription,
IsCustomWorkflow: input.IsCustomWorkflow,
}, nil
}
// QueryWorkflowRuns sends all the workflow runs for a project from the DB
func QueryWorkflowRuns(input model.GetWorkflowRunsInput) (*model.GetWorkflowsOutput, error) {
var pipeline mongo.Pipeline
// Match with projectID
matchProjectIdStage := bson.D{
{"$match", bson.D{
{"project_id", input.ProjectID},
}},
}
pipeline = append(pipeline, matchProjectIdStage)
includeAllFromWorkflow := bson.D{
{"workflow_id", 1},
{"workflow_name", 1},
{"workflow_manifest", 1},
{"cronSyntax", 1},
{"workflow_description", 1},
{"weightages", 1},
{"isCustomWorkflow", 1},
{"updated_at", 1},
{"created_at", 1},
{"project_id", 1},
{"cluster_id", 1},
{"cluster_name", 1},
{"cluster_type", 1},
{"isRemoved", 1},
}
// Filter the available workflows where isRemoved is false
matchWfRunIsRemovedStage := bson.D{
{"$project", append(includeAllFromWorkflow,
bson.E{Key: "workflow_runs", Value: bson.D{
{"$filter", bson.D{
{"input", "$workflow_runs"},
{"as", "wfRun"},
{"cond", bson.D{
{"$eq", bson.A{"$$wfRun.isRemoved", false}},
}},
}},
}},
)},
}
pipeline = append(pipeline, matchWfRunIsRemovedStage)
// Match the pipelineIds from the input array
if len(input.WorkflowRunIds) != 0 {
matchWfRunIdStage := bson.D{
{"$project", append(includeAllFromWorkflow,
bson.E{Key: "workflow_runs", Value: bson.D{
{"$filter", bson.D{
{"input", "$workflow_runs"},
{"as", "wfRun"},
{"cond", bson.D{
{"$in", bson.A{"$$wfRun.workflow_run_id", input.WorkflowRunIds}},
}},
}},
}},
)},
}
pipeline = append(pipeline, matchWfRunIdStage)
}
// Filtering based on multiple parameters
if input.Filter != nil {
// Filtering based on workflow name
if input.Filter.WorkflowName != nil && *input.Filter.WorkflowName != "" {
matchWfNameStage := bson.D{
{"$match", bson.D{
{"workflow_name", bson.D{
{"$regex", input.Filter.WorkflowName},
}},
}},
}
pipeline = append(pipeline, matchWfNameStage)
}
// Filtering based on cluster name
if input.Filter.ClusterName != nil && *input.Filter.ClusterName != "All" && *input.Filter.ClusterName != "" {
matchClusterStage := bson.D{
{"$match", bson.D{
{"cluster_name", input.Filter.ClusterName},
}},
}
pipeline = append(pipeline, matchClusterStage)
}
// Filtering based on phase
if input.Filter.WorkflowStatus != nil && *input.Filter.WorkflowStatus != "All" && *input.Filter.WorkflowStatus != "" {
filterWfRunPhaseStage := bson.D{
{"$project", append(includeAllFromWorkflow,
bson.E{Key: "workflow_runs", Value: bson.D{
{"$filter", bson.D{
{"input", "$workflow_runs"},
{"as", "wfRun"},
{"cond", bson.D{
{"$eq", bson.A{"$$wfRun.phase", string(*input.Filter.WorkflowStatus)}},
}},
}},
}},
)},
}
pipeline = append(pipeline, filterWfRunPhaseStage)
}
// Filtering based on date range
if input.Filter.DateRange != nil {
endDate := string(time.Now().Unix())
if input.Filter.DateRange.EndDate != nil {
endDate = *input.Filter.DateRange.EndDate
}
filterWfRunDateStage := bson.D{
{"$project", append(includeAllFromWorkflow,
bson.E{Key: "workflow_runs", Value: bson.D{
{"$filter", bson.D{
{"input", "$workflow_runs"},
{"as", "wfRun"},
{"cond", bson.D{
{"$and", bson.A{
bson.D{{"$lte", bson.A{"$$wfRun.last_updated", endDate}}},
bson.D{{"$gte", bson.A{"$$wfRun.last_updated", input.Filter.DateRange.StartDate}}},
}},
}},
}},
}},
)},
}
pipeline = append(pipeline, filterWfRunDateStage)
}
}
// Flatten out the workflow runs
unwindStage := bson.D{
{"$unwind", bson.D{
{"path", "$workflow_runs"},
}},
}
pipeline = append(pipeline, unwindStage)
var sortStage bson.D
switch {
case input.Sort != nil && input.Sort.Field == model.WorkflowRunSortingFieldTime:
// Sorting based on LastUpdated time
if input.Sort.Descending != nil && *input.Sort.Descending {
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_runs.last_updated", -1},
}},
}
} else {
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_runs.last_updated", 1},
}},
}
}
case input.Sort != nil && input.Sort.Field == model.WorkflowRunSortingFieldName:
// Sorting based on WorkflowName time
if input.Sort.Descending != nil && *input.Sort.Descending {
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_name", -1},
}},
}
} else {
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_name", 1},
}},
}
}
default:
// Default sorting: sorts it by LastUpdated time in descending order
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_runs.last_updated", -1},
}},
}
}
// Pagination
paginatedWorkflows := bson.A{
sortStage,
}
if input.Pagination != nil {
paginationSkipStage := bson.D{
{"$skip", input.Pagination.Page * input.Pagination.Limit},
}
paginationLimitStage := bson.D{
{"$limit", input.Pagination.Limit},
}
paginatedWorkflows = append(paginatedWorkflows, paginationSkipStage, paginationLimitStage)
}
// Add two stages where we first count the number of filtered workflow and then paginate the results
facetStage := bson.D{
{"$facet", bson.D{
{"total_filtered_workflow_runs", bson.A{
bson.D{{"$count", "count"}},
}},
{"flattened_workflow_runs", paginatedWorkflows},
}},
}
pipeline = append(pipeline, facetStage)
// Call aggregation on pipeline
workflowsCursor, err := dbOperationsWorkflow.GetAggregateWorkflows(pipeline)
var result []*model.WorkflowRun
var workflows []dbSchemaWorkflow.AggregatedWorkflowRuns
if err = workflowsCursor.All(context.Background(), &workflows); err != nil || len(workflows) == 0 {
fmt.Println(err)
return &model.GetWorkflowsOutput{
TotalNoOfWorkflowRuns: 0,
WorkflowRuns: result,
}, nil
}
for _, workflow := range workflows[0].FlattenedWorkflowRuns {
workflowRun := workflow.WorkflowRuns
newWorkflowRun := model.WorkflowRun{
WorkflowName: workflow.WorkflowName,
WorkflowID: workflow.WorkflowID,
WorkflowRunID: workflowRun.WorkflowRunID,
LastUpdated: workflowRun.LastUpdated,
ProjectID: workflow.ProjectID,
ClusterID: workflow.ClusterID,
Phase: workflowRun.Phase,
ResiliencyScore: workflowRun.ResiliencyScore,
ExperimentsPassed: workflowRun.ExperimentsPassed,
TotalExperiments: workflowRun.TotalExperiments,
ExecutionData: workflowRun.ExecutionData,
ClusterName: workflow.ClusterName,
ClusterType: &workflow.ClusterType,
IsRemoved: workflowRun.IsRemoved,
}
result = append(result, &newWorkflowRun)
}
totalFilteredWorkflowRunsCounter := 0
if len(workflows) > 0 && len(workflows[0].TotalFilteredWorkflowRuns) > 0 {
totalFilteredWorkflowRunsCounter = workflows[0].TotalFilteredWorkflowRuns[0].Count
}
output := model.GetWorkflowsOutput{
TotalNoOfWorkflowRuns: totalFilteredWorkflowRunsCounter,
WorkflowRuns: result,
}
return &output, nil
}
// QueryListWorkflow returns all the workflows present in the given project
func QueryListWorkflow(workflowInput model.ListWorkflowsInput) (*model.ListWorkflowsOutput, error) {
var pipeline mongo.Pipeline
// Match with projectID
matchProjectIdStage := bson.D{
{"$match", bson.D{
{"project_id", workflowInput.ProjectID},
}},
}
pipeline = append(pipeline, matchProjectIdStage)
// Match the pipelineIds from the input array
if len(workflowInput.WorkflowIds) != 0 {
matchWfIdStage := bson.D{
{"$match", bson.D{
{"workflow_id", bson.D{
{"$in", workflowInput.WorkflowIds},
}},
}},
}
pipeline = append(pipeline, matchWfIdStage)
}
// Filtering out the workflows that are deleted/removed
matchWfIsRemovedStage := bson.D{
{"$match", bson.D{
{"isRemoved", bson.D{
{"$eq", false},
}},
}},
}
pipeline = append(pipeline, matchWfIsRemovedStage)
// Filtering based on multiple parameters
if workflowInput.Filter != nil {
// Filtering based on workflow name
if workflowInput.Filter.WorkflowName != nil && *workflowInput.Filter.WorkflowName != "" {
matchWfNameStage := bson.D{
{"$match", bson.D{
{"workflow_name", bson.D{
{"$regex", workflowInput.Filter.WorkflowName},
}},
}},
}
pipeline = append(pipeline, matchWfNameStage)
}
// Filtering based on cluster name
if workflowInput.Filter.ClusterName != nil && *workflowInput.Filter.ClusterName != "All" && *workflowInput.Filter.ClusterName != "" {
matchClusterStage := bson.D{
{"$match", bson.D{
{"cluster_name", workflowInput.Filter.ClusterName},
}},
}
pipeline = append(pipeline, matchClusterStage)
}
}
var sortStage bson.D
switch {
case workflowInput.Sort != nil && workflowInput.Sort.Field == model.WorkflowSortingFieldName:
// Sorting based on WorkflowName time
if workflowInput.Sort.Descending != nil && *workflowInput.Sort.Descending {
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_name", -1},
}},
}
} else {
sortStage = bson.D{
{"$sort", bson.D{
{"workflow_name", 1},
}},
}
}
default:
// Default sorting: sorts it by LastUpdated time in descending order
sortStage = bson.D{
{"$sort", bson.D{
{"updated_at", -1},
}},
}
}
// Pagination
paginatedWorkflows := bson.A{
sortStage,
}
if workflowInput.Pagination != nil {
paginationSkipStage := bson.D{
{"$skip", workflowInput.Pagination.Page * workflowInput.Pagination.Limit},
}
paginationLimitStage := bson.D{
{"$limit", workflowInput.Pagination.Limit},
}
paginatedWorkflows = append(paginatedWorkflows, paginationSkipStage, paginationLimitStage)
}
// Add two stages where we first count the number of filtered workflow and then paginate the results
facetStage := bson.D{
{"$facet", bson.D{
{"total_filtered_workflows", bson.A{
bson.D{{"$count", "count"}},
}},
{"scheduled_workflows", paginatedWorkflows},
}},
}
pipeline = append(pipeline, facetStage)
// Call aggregation on pipeline
workflowsCursor, err := dbOperationsWorkflow.GetAggregateWorkflows(pipeline)
if err != nil {
return nil, err
}
var result []*model.Workflow
var workflows []dbSchemaWorkflow.AggregatedWorkflows
if err = workflowsCursor.All(context.Background(), &workflows); err != nil || len(workflows) == 0 {
return &model.ListWorkflowsOutput{
TotalNoOfWorkflows: 0,
Workflows: result,
}, nil
}
for _, workflow := range workflows[0].ScheduledWorkflows {
cluster, err := dbOperationsCluster.GetCluster(workflow.ClusterID)
if err != nil {
return nil, err
}
var Weightages []*model.Weightages
copier.Copy(&Weightages, &workflow.Weightages)
var WorkflowRuns []*model.WorkflowRuns
copier.Copy(&WorkflowRuns, &workflow.WorkflowRuns)
newChaosWorkflows := model.Workflow{
WorkflowID: workflow.WorkflowID,
WorkflowManifest: workflow.WorkflowManifest,
WorkflowName: workflow.WorkflowName,
CronSyntax: workflow.CronSyntax,
WorkflowDescription: workflow.WorkflowDescription,
Weightages: Weightages,
IsCustomWorkflow: workflow.IsCustomWorkflow,
UpdatedAt: workflow.UpdatedAt,
CreatedAt: workflow.CreatedAt,
ProjectID: workflow.ProjectID,
IsRemoved: workflow.IsRemoved,
ClusterName: cluster.ClusterName,
ClusterID: cluster.ClusterID,
ClusterType: cluster.ClusterType,
WorkflowRuns: WorkflowRuns,
}
result = append(result, &newChaosWorkflows)
}
totalFilteredWorkflowsCounter := 0
if len(workflows) > 0 && len(workflows[0].TotalFilteredWorkflows) > 0 {
totalFilteredWorkflowsCounter = workflows[0].TotalFilteredWorkflows[0].Count
}
output := model.ListWorkflowsOutput{
TotalNoOfWorkflows: totalFilteredWorkflowsCounter,
Workflows: result,
}
return &output, nil
}
// WorkFlowRunHandler Updates or Inserts a new Workflow Run into the DB
func WorkFlowRunHandler(input model.WorkflowRunInput, r store.StateData) (string, error) {
cluster, err := cluster.VerifyCluster(*input.ClusterID)
if err != nil {
log.Println("ERROR", err)
return "", err
}
// Parse and store execution data
var executionData types.ExecutionData
err = json.Unmarshal([]byte(input.ExecutionData), &executionData)
if err != nil {
log.Println("Can not parse Execution Data of workflow run with id: ", input.WorkflowRunID)
return "", err
}
// Resiliency Score will be calculated only if workflow execution is completed
if input.Completed {
executionData = ops.ResiliencyScoreCalculator(executionData, input.WorkflowID)
}
count := 0
isRemoved := false
count, err = dbOperationsWorkflow.UpdateWorkflowRun(input.WorkflowID, dbSchemaWorkflow.ChaosWorkflowRun{
WorkflowRunID: input.WorkflowRunID,
LastUpdated: strconv.FormatInt(time.Now().Unix(), 10),
Phase: executionData.Phase,
ResiliencyScore: &executionData.ResiliencyScore,
ExperimentsPassed: &executionData.ExperimentsPassed,
TotalExperiments: &executionData.TotalExperiments,
ExecutionData: input.ExecutionData,
Completed: input.Completed,
IsRemoved: &isRemoved,
})
if err != nil {
log.Print("ERROR", err)
return "", err
}
if count == 0 {
return "Workflow Run Discarded[Duplicate Event]", nil
}
ops.SendWorkflowEvent(model.WorkflowRun{
ClusterID: cluster.ClusterID,
ClusterName: cluster.ClusterName,
ProjectID: cluster.ProjectID,
LastUpdated: strconv.FormatInt(time.Now().Unix(), 10),
WorkflowRunID: input.WorkflowRunID,
WorkflowName: input.WorkflowName,
Phase: executionData.Phase,
ResiliencyScore: &executionData.ResiliencyScore,
ExperimentsPassed: &executionData.ExperimentsPassed,
TotalExperiments: &executionData.TotalExperiments,
ExecutionData: input.ExecutionData,
WorkflowID: input.WorkflowID,
IsRemoved: &isRemoved,
}, &r)
return "Workflow Run Accepted", nil
}
// LogsHandler receives logs from the workflow-agent and publishes to frontend clients
func LogsHandler(podLog model.PodLog, r store.StateData) (string, error) {
_, err := cluster.VerifyCluster(*podLog.ClusterID)
if err != nil {
log.Print("ERROR", err)
return "", err
}
if reqChan, ok := r.WorkflowLog[podLog.RequestID]; ok {
resp := model.PodLogResponse{
PodName: podLog.PodName,
WorkflowRunID: podLog.WorkflowRunID,
PodType: podLog.PodType,
Log: podLog.Log,
}
reqChan <- &resp
close(reqChan)
return "LOGS SENT SUCCESSFULLY", nil
}
return "LOG REQUEST CANCELLED", nil
}
// GetLogs query is used to fetch the logs from the cluster
func GetLogs(reqID string, pod model.PodLogRequest, r store.StateData) {
data, err := json.Marshal(pod)
if err != nil {
log.Print("ERROR WHILE MARSHALLING POD DETAILS")
}
reqType := "logs"
externalData := string(data)
payload := model.ClusterAction{
ProjectID: reqID,
Action: &model.ActionPayload{
RequestType: reqType,
ExternalData: &externalData,
},
}
if clusterChan, ok := r.ConnectedCluster[pod.ClusterID]; ok {
clusterChan <- &payload
} else if reqChan, ok := r.WorkflowLog[reqID]; ok {
resp := model.PodLogResponse{
PodName: pod.PodName,
WorkflowRunID: pod.WorkflowRunID,
PodType: pod.PodType,
Log: "CLUSTER ERROR : CLUSTER NOT CONNECTED",
}
reqChan <- &resp
close(reqChan)
}
}
// ReRunWorkflow sends workflow run request(single run workflow only) to agent on workflow re-run request
func ReRunWorkflow(workflowID string) (string, error) {
query := bson.D{{"workflow_id", workflowID}, {"isRemoved", false}}
workflows, err := dbOperationsWorkflow.GetWorkflows(query)
if err != nil {
log.Print("Could not get workflow :", err)
return "could not get workflow", err
}
if len(workflows) == 0 {
return "", errors.New("no such workflow found")
}
resKind := gjson.Get(workflows[0].WorkflowManifest, "kind").String()
if strings.ToLower(resKind) == "cronworkflow" { // no op
return "", errors.New("cronworkflows cannot be re-run")
}
workflows[0].WorkflowManifest, err = sjson.Set(workflows[0].WorkflowManifest, "metadata.name", workflows[0].WorkflowName+"-"+strconv.FormatInt(time.Now().Unix(), 10))
if err != nil {
log.Print("Failed to updated workflow name [re-run] :", err)
return "", errors.New("Failed to updated workflow name " + err.Error())
}
ops.SendWorkflowToSubscriber(&model.ChaosWorkFlowInput{
WorkflowManifest: workflows[0].WorkflowManifest,
ProjectID: workflows[0].ProjectID,
ClusterID: workflows[0].ClusterID,
}, nil, "create", store.Store)
return "Request for re-run acknowledged, workflowID: " + workflowID, nil
}
// KubeObjHandler receives Kubernetes Object data from subscriber
func KubeObjHandler(kubeData model.KubeObjectData, r store.StateData) (string, error) {
_, err := cluster.VerifyCluster(*kubeData.ClusterID)
if err != nil {
log.Print("Error", err)
return "", err
}
if reqChan, ok := r.KubeObjectData[kubeData.RequestID]; ok {
resp := model.KubeObjectResponse{
ClusterID: kubeData.ClusterID.ClusterID,
KubeObj: kubeData.KubeObj,
}
reqChan <- &resp
close(reqChan)
return "KubeData sent successfully", nil
}
return "KubeData sent successfully", nil
}
func GetKubeObjData(reqID string, kubeObject model.KubeObjectRequest, r store.StateData) {
reqType := kubeObject.ObjectType
data, err := json.Marshal(kubeObject)
if err != nil {
log.Print("ERROR WHILE MARSHALLING POD DETAILS")
}
externalData := string(data)
payload := model.ClusterAction{
ProjectID: reqID,
Action: &model.ActionPayload{
RequestType: reqType,
ExternalData: &externalData,
},
}
if clusterChan, ok := r.ConnectedCluster[kubeObject.ClusterID]; ok {
clusterChan <- &payload
} else if reqChan, ok := r.KubeObjectData[reqID]; ok {
resp := model.KubeObjectResponse{
ClusterID: kubeObject.ClusterID,
KubeObj: "Data not available",
}
reqChan <- &resp
close(reqChan)
}
}
// SaveWorkflowTemplate is used to save the workflow manifest as a template
func SaveWorkflowTemplate(ctx context.Context, templateInput *model.TemplateInput) (*model.ManifestTemplate, error) {
IsExist, err := IsTemplateAvailable(ctx, templateInput.TemplateName, templateInput.ProjectID)
if err != nil {
return nil, err
}
if IsExist == true {
return nil, errors.New("Template already exists")
}
projectData, err := dbOperationsProject.GetProject(ctx, bson.D{{"_id", templateInput.ProjectID}})
if err != nil {
return nil, err
}
uuid := uuid.New()
template := &dbSchemaWorkflowTemplate.ManifestTemplate{
TemplateID: uuid.String(),
TemplateName: templateInput.TemplateName,
TemplateDescription: templateInput.TemplateDescription,
ProjectID: templateInput.ProjectID,
Manifest: templateInput.Manifest,
ProjectName: projectData.Name,
CreatedAt: strconv.FormatInt(time.Now().Unix(), 10),
IsRemoved: false,
IsCustomWorkflow: templateInput.IsCustomWorkflow,
}
err = dbOperationsWorkflowTemplate.CreateWorkflowTemplate(ctx, template)
if err != nil {
log.Print("Error", err)
}
return template.GetManifestTemplateOutput(), nil
}
// ListWorkflowTemplate is used to list all the workflow templates available in the project
func ListWorkflowTemplate(ctx context.Context, projectID string) ([]*model.ManifestTemplate, error) {
templates, err := dbSchemaWorkflowTemplate.GetTemplatesByProjectID(ctx, projectID)
if err != nil {
return nil, err
}
var templateList []*model.ManifestTemplate
for _, template := range templates {
templateList = append(templateList, template.GetManifestTemplateOutput())
}
return templateList, err
}
// QueryTemplateWorkflowID is used to fetch the workflow template with template id
func QueryTemplateWorkflowByID(ctx context.Context, templateID string) (*model.ManifestTemplate, error) {
template, err := dbSchemaWorkflowTemplate.GetTemplateByTemplateID(ctx, templateID)
if err != nil {
return nil, err
}
return template.GetManifestTemplateOutput(), err
}
// DeleteWorkflowTemplate is used to delete the workflow template (update the is_removed field as true)
func DeleteWorkflowTemplate(ctx context.Context, templateID string) (bool, error) {
query := bson.D{{"template_id", templateID}}
update := bson.D{{"$set", bson.D{{"is_removed", true}}}}
err := dbOperationsWorkflowTemplate.UpdateTemplateManifest(ctx, query, update)
if err != nil {
log.Print("Err", err)
return false, err
}
return true, err
}
// IsTemplateAvailable is used to check if a template name already exists in the database
func IsTemplateAvailable(ctx context.Context, templateName string, projectID string) (bool, error) {
templates, err := dbOperationsWorkflowTemplate.GetTemplatesByProjectID(ctx, projectID)
if err != nil {
return true, err
}
for _, n := range templates {
if n.TemplateName == templateName {
return true, nil
}
}
return false, nil
}
func SyncWorkflowRun(ctx context.Context, workflow_id string, workflowRunID string, r *store.StateData) (bool, error) {
query := bson.D{{"workflow_id", workflow_id}}
workflow, err := dbOperationsWorkflow.GetWorkflow(query)
if err != nil {
return false, err
}
for _, workflow_run := range workflow.WorkflowRuns {
if workflow.IsRemoved == true {
return false, errors.New("workflow has been removed")
}
if workflow_run.WorkflowRunID == workflowRunID && !workflow_run.Completed && workflow.IsRemoved == false {
err = ops.ProcessWorkflowRunSync(workflow_id, &workflowRunID, workflow, r)
if err != nil {
return false, err
}
}
}
return true, nil
}