Merge pull request #253 from mortent/ImproveEventSystem

Clean up the event hierarchy
This commit is contained in:
Kubernetes Prow Robot 2020-10-18 14:08:13 -07:00 committed by GitHub
commit 5748e7c5ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 87 additions and 200 deletions

View File

@ -14,7 +14,6 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/print/list"
)
@ -54,18 +53,10 @@ func (ef *formatter) FormatApplyEvent(ae event.ApplyEvent, as *list.ApplyStats,
return nil
}
func (ef *formatter) FormatStatusEvent(se pollevent.Event, _ list.Collector) error {
switch se.EventType {
case pollevent.ResourceUpdateEvent:
func (ef *formatter) FormatStatusEvent(se event.StatusEvent, _ list.Collector) error {
if se.Type == event.StatusEventResourceUpdate {
id := se.Resource.Identifier
ef.printResourceStatus(id, se)
case pollevent.ErrorEvent:
id := se.Resource.Identifier
gk := id.GroupKind
ef.print("%s error: %s\n", resourceIDToString(gk, id.Name),
se.Error.Error())
case pollevent.CompletedEvent:
ef.print("all resources has reached the Current status")
}
return nil
}
@ -110,7 +101,7 @@ func (ef *formatter) FormatErrorEvent(_ event.ErrorEvent) error {
return nil
}
func (ef *formatter) printResourceStatus(id object.ObjMetadata, se pollevent.Event) {
func (ef *formatter) printResourceStatus(id object.ObjMetadata, se event.StatusEvent) {
ef.print("%s is %s: %s", resourceIDToString(id.GroupKind, id.Name),
se.Resource.Status.String(), se.Resource.Message)
}

View File

@ -65,7 +65,7 @@ func TestFormatter_FormatApplyEvent(t *testing.T) {
ServersideApplied: 1,
},
statusCollector: &fakeCollector{
m: map[object.ObjMetadata]pollevent.Event{
m: map[object.ObjMetadata]event.StatusEvent{
object.ObjMetadata{ //nolint:gofmt
GroupKind: schema.GroupKind{
Group: "apps",
@ -103,14 +103,14 @@ deployment.apps/my-dep is Current: Resource is Current
func TestFormatter_FormatStatusEvent(t *testing.T) {
testCases := map[string]struct {
previewStrategy common.DryRunStrategy
event pollevent.Event
event event.StatusEvent
statusCollector list.Collector
expected string
}{
"resource update with Current status": {
previewStrategy: common.DryRunNone,
event: pollevent.Event{
EventType: pollevent.ResourceUpdateEvent,
event: event.StatusEvent{
Type: event.StatusEventResourceUpdate,
Resource: &pollevent.ResourceStatus{
Identifier: object.ObjMetadata{
GroupKind: schema.GroupKind{
@ -126,31 +126,6 @@ func TestFormatter_FormatStatusEvent(t *testing.T) {
},
expected: "deployment.apps/bar is Current: Resource is Current",
},
"status event with error": {
previewStrategy: common.DryRunNone,
event: pollevent.Event{
EventType: pollevent.ErrorEvent,
Resource: &pollevent.ResourceStatus{
Identifier: object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "foo",
Name: "bar",
},
},
Error: fmt.Errorf("this is a test error"),
},
expected: "deployment.apps/bar error: this is a test error",
},
"status event with completed type": {
previewStrategy: common.DryRunNone,
event: pollevent.Event{
EventType: pollevent.CompletedEvent,
},
expected: "all resources has reached the Current status",
},
}
for tn, tc := range testCases {
@ -280,9 +255,9 @@ func createObject(group, kind, namespace, name string) runtime.Object {
}
type fakeCollector struct {
m map[object.ObjMetadata]pollevent.Event
m map[object.ObjMetadata]event.StatusEvent
}
func (f *fakeCollector) LatestStatus() map[object.ObjMetadata]pollevent.Event {
func (f *fakeCollector) LatestStatus() map[object.ObjMetadata]event.StatusEvent {
return f.m
}

View File

@ -13,7 +13,6 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/print/list"
)
@ -63,27 +62,15 @@ func (jf *formatter) FormatApplyEvent(ae event.ApplyEvent, as *list.ApplyStats,
return nil
}
func (jf *formatter) FormatStatusEvent(se pollevent.Event, _ list.Collector) error {
switch se.EventType {
case pollevent.ResourceUpdateEvent:
func (jf *formatter) FormatStatusEvent(se event.StatusEvent, _ list.Collector) error {
if se.Type == event.StatusEventResourceUpdate {
id := se.Resource.Identifier
return jf.printResourceStatus(id, se)
case pollevent.ErrorEvent:
id := se.Resource.Identifier
return jf.printEvent("status", "error", map[string]interface{}{
"group": id.GroupKind.Group,
"kind": id.GroupKind.Kind,
"namespace": id.Namespace,
"name": id.Name,
"error": se.Error.Error(),
})
case pollevent.CompletedEvent:
return jf.printEvent("status", "completed", map[string]interface{}{})
}
return nil
}
func (jf *formatter) printResourceStatus(id object.ObjMetadata, se pollevent.Event) error {
func (jf *formatter) printResourceStatus(id object.ObjMetadata, se event.StatusEvent) error {
return jf.printEvent("status", "resourceStatus",
map[string]interface{}{
"group": id.GroupKind.Group,

View File

@ -99,7 +99,7 @@ func TestFormatter_FormatApplyEvent(t *testing.T) {
ServersideApplied: 1,
},
statusCollector: &fakeCollector{
m: map[object.ObjMetadata]pollevent.Event{
m: map[object.ObjMetadata]event.StatusEvent{
object.ObjMetadata{ //nolint:gofmt
GroupKind: schema.GroupKind{
Group: "apps",
@ -163,14 +163,14 @@ func TestFormatter_FormatApplyEvent(t *testing.T) {
func TestFormatter_FormatStatusEvent(t *testing.T) {
testCases := map[string]struct {
previewStrategy common.DryRunStrategy
event pollevent.Event
event event.StatusEvent
statusCollector list.Collector
expected map[string]interface{}
}{
"resource update with Current status": {
previewStrategy: common.DryRunNone,
event: pollevent.Event{
EventType: pollevent.ResourceUpdateEvent,
event: event.StatusEvent{
Type: event.StatusEventResourceUpdate,
Resource: &pollevent.ResourceStatus{
Identifier: object.ObjMetadata{
GroupKind: schema.GroupKind{
@ -196,44 +196,6 @@ func TestFormatter_FormatStatusEvent(t *testing.T) {
"type": "status",
},
},
"status event with error": {
previewStrategy: common.DryRunNone,
event: pollevent.Event{
EventType: pollevent.ErrorEvent,
Resource: &pollevent.ResourceStatus{
Identifier: object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Namespace: "foo",
Name: "bar",
},
},
Error: fmt.Errorf("this is a test error"),
},
expected: map[string]interface{}{
"error": "this is a test error",
"eventType": "error",
"group": "apps",
"kind": "Deployment",
"name": "bar",
"namespace": "foo",
"timestamp": "",
"type": "status",
},
},
"status event with completed type": {
previewStrategy: common.DryRunNone,
event: pollevent.Event{
EventType: pollevent.CompletedEvent,
},
expected: map[string]interface{}{
"eventType": "completed",
"timestamp": "",
"type": "status",
},
},
}
for tn, tc := range testCases {
@ -438,9 +400,9 @@ func createObject(group, kind, namespace, name string) runtime.Object {
}
type fakeCollector struct {
m map[object.ObjMetadata]pollevent.Event
m map[object.ObjMetadata]event.StatusEvent
}
func (f *fakeCollector) LatestStatus() map[object.ObjMetadata]pollevent.Event {
func (f *fakeCollector) LatestStatus() map[object.ObjMetadata]event.StatusEvent {
return f.m
}

View File

@ -182,12 +182,8 @@ func (r *ResourceStateCollector) processEvent(ev event.Event) error {
// processStatusEvent handles events pertaining to a status
// update for a resource.
func (r *ResourceStateCollector) processStatusEvent(e pe.Event) {
if e.EventType == pe.ErrorEvent {
r.processErrorEvent(e.Error)
return
}
if e.EventType == pe.ResourceUpdateEvent {
func (r *ResourceStateCollector) processStatusEvent(e event.StatusEvent) {
if e.Type == event.StatusEventResourceUpdate {
resource := e.Resource
previous := r.resourceInfos[resource.Identifier]
previous.resourceStatus = e.Resource
@ -218,11 +214,6 @@ func (r *ResourceStateCollector) processPruneEvent(e event.PruneEvent) {
}
}
// processErrorEvent handles events for errors.
func (r *ResourceStateCollector) processErrorEvent(err error) {
r.err = err
}
// toIdentifier extracts the identifying information from an
// object.
func toIdentifier(o runtime.Object) object.ObjMetadata {

View File

@ -81,7 +81,7 @@ type expectedEvent struct {
eventType event.Type
applyEventType event.ApplyEventType
statusEventType pollevent.EventType
statusEventType event.StatusEventType
pruneEventType event.PruneEventType
deleteEventType event.DeleteEventType
}
@ -185,19 +185,19 @@ func TestApplier(t *testing.T) {
},
{
eventType: event.StatusType,
statusEventType: pollevent.ResourceUpdateEvent,
statusEventType: event.StatusEventResourceUpdate,
},
{
eventType: event.StatusType,
statusEventType: pollevent.ResourceUpdateEvent,
statusEventType: event.StatusEventResourceUpdate,
},
{
eventType: event.StatusType,
statusEventType: pollevent.ResourceUpdateEvent,
statusEventType: event.StatusEventResourceUpdate,
},
{
eventType: event.StatusType,
statusEventType: pollevent.CompletedEvent,
statusEventType: event.StatusEventCompleted,
},
},
},
@ -260,7 +260,7 @@ func TestApplier(t *testing.T) {
case event.ApplyType:
assert.Equal(t, expected.applyEventType.String(), e.ApplyEvent.Type.String())
case event.StatusType:
assert.Equal(t, expected.statusEventType.String(), e.StatusEvent.EventType.String())
assert.Equal(t, expected.statusEventType.String(), e.StatusEvent.Type.String())
case event.PruneType:
assert.Equal(t, expected.pruneEventType.String(), e.PruneEvent.Type.String())
case event.DeleteType:

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=ApplyEventOperation"; DO NOT EDIT.
package event

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=ApplyEventType"; DO NOT EDIT.
package event

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=DeleteEventOperation"; DO NOT EDIT.
package event

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=DeleteEventType"; DO NOT EDIT.
package event

View File

@ -5,7 +5,7 @@ package event
import (
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
@ -43,7 +43,7 @@ type Event struct {
// StatusEvents contains information about the status of one of
// the applied resources.
StatusEvent event.Event
StatusEvent StatusEvent
// PruneEvent contains information about objects that have been
// pruned.
@ -99,6 +99,19 @@ type ApplyEvent struct {
Object runtime.Object
}
//go:generate stringer -type=StatusEventType
type StatusEventType int
const (
StatusEventResourceUpdate StatusEventType = iota
StatusEventCompleted
)
type StatusEvent struct {
Type StatusEventType
Resource *pollevent.ResourceStatus
}
//go:generate stringer -type=PruneEventType
type PruneEventType int

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=PruneEventOperation"; DO NOT EDIT.
package event

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=PruneEventType"; DO NOT EDIT.
package event

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=ResourceAction"; DO NOT EDIT.
package event

View File

@ -0,0 +1,24 @@
// Code generated by "stringer -type=StatusEventType"; DO NOT EDIT.
package event
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[StatusEventResourceUpdate-0]
_ = x[StatusEventCompleted-1]
}
const _StatusEventType_name = "StatusEventResourceUpdateStatusEventCompleted"
var _StatusEventType_index = [...]uint8{0, 25, 45}
func (i StatusEventType) String() string {
if i < 0 || i >= StatusEventType(len(_StatusEventType_index)-1) {
return "StatusEventType(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _StatusEventType_name[_StatusEventType_index[i]:_StatusEventType_index[i+1]]
}

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=Type"; DO NOT EDIT.
package event

View File

@ -30,7 +30,6 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/task"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
@ -115,8 +114,8 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
&task.SendEventTask{
Event: event.Event{
Type: event.StatusType,
StatusEvent: pollevent.Event{
EventType: pollevent.CompletedEvent,
StatusEvent: event.StatusEvent{
Type: event.StatusEventCompleted,
},
},
},
@ -150,8 +149,8 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
&task.SendEventTask{
Event: event.Event{
Type: event.StatusType,
StatusEvent: pollevent.Event{
EventType: pollevent.CompletedEvent,
StatusEvent: event.StatusEvent{
Type: event.StatusEventCompleted,
},
},
},

View File

@ -181,8 +181,11 @@ func (b *baseRunner) run(ctx context.Context, taskQueue chan Task,
if o.emitStatusEvents {
// Forward all normal events to the eventChannel
eventChannel <- event.Event{
Type: event.StatusType,
StatusEvent: statusEvent,
Type: event.StatusType,
StatusEvent: event.StatusEvent{
Type: event.StatusEventResourceUpdate,
Resource: statusEvent.Resource,
},
}
}

View File

@ -202,12 +202,6 @@ func (r *statusPollerRunner) Run() {
for {
select {
case <-r.ctx.Done():
// If the context has been cancelled, just send an CompletedEvent.
// Then return from this function, which will stop the ticker
// and close the event channel.
r.eventChannel <- event.Event{
EventType: event.CompletedEvent,
}
return
case <-ticker.C:
// First sync and then compute status for all resources.

View File

@ -52,7 +52,6 @@ func TestStatusPollerRunner(t *testing.T) {
expectedEventTypes: []event.EventType{
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.CompletedEvent,
},
},
"multiple resources": {
@ -93,7 +92,6 @@ func TestStatusPollerRunner(t *testing.T) {
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.ResourceUpdateEvent,
event.CompletedEvent,
},
},
}
@ -131,7 +129,7 @@ func TestStatusPollerRunner(t *testing.T) {
var eventTypes []event.EventType
for ch := range eventChannel {
eventTypes = append(eventTypes, ch.EventType)
if len(eventTypes) == len(tc.expectedEventTypes)-1 {
if len(eventTypes) == len(tc.expectedEventTypes) {
cancel()
}
}
@ -165,19 +163,11 @@ func TestNewStatusPollerRunnerCancellation(t *testing.T) {
eventChannel := engine.Poll(ctx, identifiers, options)
var lastEvent event.Event
for {
select {
case e, more := <-eventChannel:
case <-eventChannel:
timer.Stop()
if more {
lastEvent = e
} else {
if want, got := event.CompletedEvent, lastEvent.EventType; got != want {
t.Errorf("Expected e to have type %s, but got %s", want, got)
}
return
}
return
case <-timer.C:
t.Errorf("expected runner to time out, but it didn't")
return

View File

@ -18,9 +18,6 @@ type EventType int
const (
// ResourceUpdateEvent describes events related to a change in the status of one of the polled resources.
ResourceUpdateEvent EventType = iota
// CompletedEvent signals that all resources have been reconciled and the engine has completed its work. The
// event channel will be closed after this event.
CompletedEvent
// ErrorEvent signals that the engine has encountered an error that it can not recover from. The engine
// is shutting down and the event channel will be closed after this event.
ErrorEvent

View File

@ -1,6 +1,3 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Code generated by "stringer -type=EventType"; DO NOT EDIT.
package event
@ -12,13 +9,12 @@ func _() {
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[ResourceUpdateEvent-0]
_ = x[CompletedEvent-1]
_ = x[ErrorEvent-2]
_ = x[ErrorEvent-1]
}
const _EventType_name = "ResourceUpdateEventCompletedEventErrorEvent"
const _EventType_name = "ResourceUpdateEventErrorEvent"
var _EventType_index = [...]uint8{0, 19, 33, 43}
var _EventType_index = [...]uint8{0, 19, 29}
func (i EventType) String() string {
if i < 0 || i >= EventType(len(_EventType_index)-1) {

View File

@ -9,13 +9,12 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
type Formatter interface {
FormatApplyEvent(ae event.ApplyEvent, as *ApplyStats, c Collector) error
FormatStatusEvent(se pollevent.Event, sc Collector) error
FormatStatusEvent(se event.StatusEvent, sc Collector) error
FormatPruneEvent(pe event.PruneEvent, ps *PruneStats) error
FormatDeleteEvent(de event.DeleteEvent, ds *DeleteStats) error
FormatErrorEvent(ee event.ErrorEvent) error
@ -82,18 +81,18 @@ func (d *DeleteStats) incSkipped() {
}
type Collector interface {
LatestStatus() map[object.ObjMetadata]pollevent.Event
LatestStatus() map[object.ObjMetadata]event.StatusEvent
}
type StatusCollector struct {
latestStatus map[object.ObjMetadata]pollevent.Event
latestStatus map[object.ObjMetadata]event.StatusEvent
}
func (sc *StatusCollector) updateStatus(id object.ObjMetadata, se pollevent.Event) {
func (sc *StatusCollector) updateStatus(id object.ObjMetadata, se event.StatusEvent) {
sc.latestStatus[id] = se
}
func (sc *StatusCollector) LatestStatus() map[object.ObjMetadata]pollevent.Event {
func (sc *StatusCollector) LatestStatus() map[object.ObjMetadata]event.StatusEvent {
return sc.latestStatus
}
@ -104,7 +103,7 @@ func (sc *StatusCollector) LatestStatus() map[object.ObjMetadata]pollevent.Event
func (b *BaseListPrinter) Print(ch <-chan event.Event, previewStrategy common.DryRunStrategy) error {
applyStats := &ApplyStats{}
statusCollector := &StatusCollector{
latestStatus: make(map[object.ObjMetadata]pollevent.Event),
latestStatus: make(map[object.ObjMetadata]event.StatusEvent),
}
printStatus := false
pruneStats := &PruneStats{}
@ -126,23 +125,13 @@ func (b *BaseListPrinter) Print(ch <-chan event.Event, previewStrategy common.Dr
return err
}
case event.StatusType:
switch se := e.StatusEvent; se.EventType {
case pollevent.ResourceUpdateEvent:
if se := e.StatusEvent; se.Type == event.StatusEventResourceUpdate {
statusCollector.updateStatus(e.StatusEvent.Resource.Identifier, e.StatusEvent)
if printStatus {
if err := formatter.FormatStatusEvent(e.StatusEvent, statusCollector); err != nil {
return err
}
}
case pollevent.ErrorEvent:
if err := formatter.FormatStatusEvent(e.StatusEvent, statusCollector); err != nil {
return err
}
case pollevent.CompletedEvent:
printStatus = false
if err := formatter.FormatStatusEvent(e.StatusEvent, statusCollector); err != nil {
return err
}
}
case event.PruneType:
if e.PruneEvent.Type == event.PruneEventResourceUpdate {