fix update bug (#1765)

This commit is contained in:
IronPan 2019-08-08 13:00:34 -07:00 committed by GitHub
parent 3c9b23216c
commit 957990c78e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 45 deletions

View File

@ -17,6 +17,7 @@ package storage
import (
"database/sql"
"fmt"
"github.com/pkg/errors"
sq "github.com/Masterminds/squirrel"
workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@ -77,7 +78,7 @@ type RunStore struct {
// total_size. The total_size does not reflect the page size, but it does reflect the number of runs
// matching the supplied filters and resource references.
func (s *RunStore) ListRuns(
filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error) {
filterContext *common.FilterContext, opts *list.Options) ([]*model.Run, int, string, error) {
errorF := func(err error) ([]*model.Run, int, string, error) {
return nil, 0, "", util.NewInternalServerError(err, "Failed to list runs: %v", err)
}
@ -143,7 +144,7 @@ func (s *RunStore) ListRuns(
}
func (s *RunStore) buildSelectRunsQuery(selectCount bool, opts *list.Options,
filterContext *common.FilterContext) (string, []interface{}, error) {
filterContext *common.FilterContext) (string, []interface{}, error) {
filteredSelectBuilder, err := list.FilterOnResourceReference("run_details", runColumns,
common.Run, selectCount, filterContext)
if err != nil {
@ -217,7 +218,7 @@ func (s *RunStore) scanRowsToRunDetails(rows *sql.Rows) ([]*model.RunDetail, err
var runs []*model.RunDetail
for rows.Next() {
var uuid, displayName, name, storageState, namespace, description, pipelineId, pipelineSpecManifest,
workflowSpecManifest, parameters, conditions, pipelineRuntimeManifest, workflowRuntimeManifest string
workflowSpecManifest, parameters, conditions, pipelineRuntimeManifest, workflowRuntimeManifest string
var createdAtInSec, scheduledAtInSec, finishedAtInSec int64
var metricsInString, resourceReferencesInString sql.NullString
err := rows.Scan(
@ -309,30 +310,30 @@ func (s *RunStore) CreateRun(r *model.RunDetail) (*model.RunDetail, error) {
if r.StorageState == "" {
r.StorageState = api.Run_STORAGESTATE_AVAILABLE.String()
} else if r.StorageState != api.Run_STORAGESTATE_AVAILABLE.String() &&
r.StorageState != api.Run_STORAGESTATE_ARCHIVED.String() {
r.StorageState != api.Run_STORAGESTATE_ARCHIVED.String() {
return nil, util.NewInvalidInputError("Invalid value for StorageState field: %q.", r.StorageState)
}
runSql, runArgs, err := sq.
Insert("run_details").
SetMap(sq.Eq{
"UUID": r.UUID,
"DisplayName": r.DisplayName,
"Name": r.Name,
"StorageState": r.StorageState,
"Namespace": r.Namespace,
"Description": r.Description,
"CreatedAtInSec": r.CreatedAtInSec,
"ScheduledAtInSec": r.ScheduledAtInSec,
"FinishedAtInSec": r.FinishedAtInSec,
"Conditions": r.Conditions,
"WorkflowRuntimeManifest": r.WorkflowRuntimeManifest,
"PipelineRuntimeManifest": r.PipelineRuntimeManifest,
"PipelineId": r.PipelineId,
"PipelineSpecManifest": r.PipelineSpecManifest,
"WorkflowSpecManifest": r.WorkflowSpecManifest,
"Parameters": r.Parameters,
}).ToSql()
SetMap(sq.Eq{
"UUID": r.UUID,
"DisplayName": r.DisplayName,
"Name": r.Name,
"StorageState": r.StorageState,
"Namespace": r.Namespace,
"Description": r.Description,
"CreatedAtInSec": r.CreatedAtInSec,
"ScheduledAtInSec": r.ScheduledAtInSec,
"FinishedAtInSec": r.FinishedAtInSec,
"Conditions": r.Conditions,
"WorkflowRuntimeManifest": r.WorkflowRuntimeManifest,
"PipelineRuntimeManifest": r.PipelineRuntimeManifest,
"PipelineId": r.PipelineId,
"PipelineSpecManifest": r.PipelineSpecManifest,
"WorkflowSpecManifest": r.WorkflowSpecManifest,
"Parameters": r.Parameters,
}).ToSql()
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to create query to store run to run table: '%v/%v",
r.Namespace, r.Name)
@ -382,7 +383,7 @@ func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int
var storedManifest string
if err := row.Scan(&storedManifest); err != nil {
tx.Rollback()
return util.NewInvalidInputError("Failed to update run %s. Row not found.", runID)
return util.NewInvalidInputError("Failed to update run %s. Row not found. Error: %s", runID, err.Error())
}
if err := s.metadataStore.RecordOutputArtifacts(runID, storedManifest, workflowRuntimeManifest); err != nil {
@ -393,10 +394,10 @@ func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int
sql, args, err := sq.
Update("run_details").
SetMap(sq.Eq{
"Conditions": condition,
"FinishedAtInSec": finishedAtInSec,
"WorkflowRuntimeManifest": workflowRuntimeManifest}).
SetMap(sq.Eq{
"Conditions": condition,
"FinishedAtInSec": finishedAtInSec,
"WorkflowRuntimeManifest": workflowRuntimeManifest}).
Where(sq.Eq{"UUID": runID}).
ToSql()
if err != nil {
@ -410,9 +411,15 @@ func (s *RunStore) UpdateRun(runID string, condition string, finishedAtInSec int
return util.NewInternalServerError(err,
"Failed to update run %s. error: '%v'", runID, err.Error())
}
if r, _ := result.RowsAffected(); r != 1 {
r, err := result.RowsAffected()
if err != nil {
tx.Rollback()
return util.NewInvalidInputError("Failed to update run %s. Row not found.", runID)
return util.NewInternalServerError(err,
"Failed to update run %s. error: '%v'", runID, err.Error())
}
if r > 1 {
tx.Rollback()
return util.NewInternalServerError(errors.New("Failed to update run"), "Failed to update run %s. More than 1 rows affected", runID)
}
if err := tx.Commit(); err != nil {
return util.NewInternalServerError(err, "failed to commit transaction")
@ -438,9 +445,9 @@ func (s *RunStore) CreateOrUpdateRun(runDetail *model.RunDetail) error {
func (s *RunStore) ArchiveRun(runId string) error {
sql, args, err := sq.
Update("run_details").
SetMap(sq.Eq{
"StorageState": api.Run_STORAGESTATE_ARCHIVED.String(),
}).
SetMap(sq.Eq{
"StorageState": api.Run_STORAGESTATE_ARCHIVED.String(),
}).
Where(sq.Eq{"UUID": runId}).
ToSql()
@ -461,9 +468,9 @@ func (s *RunStore) ArchiveRun(runId string) error {
func (s *RunStore) UnarchiveRun(runId string) error {
sql, args, err := sq.
Update("run_details").
SetMap(sq.Eq{
"StorageState": api.Run_STORAGESTATE_AVAILABLE.String(),
}).
SetMap(sq.Eq{
"StorageState": api.Run_STORAGESTATE_AVAILABLE.String(),
}).
Where(sq.Eq{"UUID": runId}).
ToSql()
@ -520,13 +527,13 @@ func (s *RunStore) ReportMetric(metric *model.RunMetric) (err error) {
}
sql, args, err := sq.
Insert("run_metrics").
SetMap(sq.Eq{
"RunUUID": metric.RunUUID,
"NodeID": metric.NodeID,
"Name": metric.Name,
"NumberValue": metric.NumberValue,
"Format": metric.Format,
"Payload": string(payloadBytes)}).ToSql()
SetMap(sq.Eq{
"RunUUID": metric.RunUUID,
"NodeID": metric.NodeID,
"Name": metric.Name,
"NumberValue": metric.NumberValue,
"Format": metric.Format,
"Payload": string(payloadBytes)}).ToSql()
if err != nil {
return util.NewInternalServerError(err,
"failed to create query for inserting metric: %+v", metric)
@ -562,10 +569,10 @@ func (s *RunStore) toRunMetadatas(models []model.ListableDataModel) []model.Run
// used to record artifact metadata.
func NewRunStore(db *DB, time util.TimeInterface, metadataStore *metadata.Store) *RunStore {
return &RunStore{
db: db,
db: db,
resourceReferenceStore: NewResourceReferenceStore(db),
time: time,
metadataStore: metadataStore,
time: time,
metadataStore: metadataStore,
}
}