mirror of https://github.com/knative/pkg.git
parent
0605de9264
commit
3da93ebb24
|
@ -30,9 +30,9 @@ import (
|
|||
scheme "k8s.io/client-go/kubernetes/scheme"
|
||||
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
record "k8s.io/client-go/tools/record"
|
||||
apiextensionsclient "knative.dev/pkg/client/injection/apiextensions/client"
|
||||
client "knative.dev/pkg/client/injection/apiextensions/client"
|
||||
customresourcedefinition "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1beta1/customresourcedefinition"
|
||||
client "knative.dev/pkg/client/injection/kube/client"
|
||||
kubeclient "knative.dev/pkg/client/injection/kube/client"
|
||||
controller "knative.dev/pkg/controller"
|
||||
logging "knative.dev/pkg/logging"
|
||||
)
|
||||
|
@ -56,29 +56,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF
|
|||
|
||||
customresourcedefinitionInformer := customresourcedefinition.Get(ctx)
|
||||
|
||||
recorder := controller.GetEventRecorder(ctx)
|
||||
if recorder == nil {
|
||||
// Create event broadcaster
|
||||
logger.Debug("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
watches := []watch.Interface{
|
||||
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
|
||||
eventBroadcaster.StartRecordingToSink(
|
||||
&v1.EventSinkImpl{Interface: client.Get(ctx).CoreV1().Events("")}),
|
||||
}
|
||||
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: defaultControllerAgentName})
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
for _, w := range watches {
|
||||
w.Stop()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
rec := &reconcilerImpl{
|
||||
Client: apiextensionsclient.Get(ctx),
|
||||
Client: client.Get(ctx),
|
||||
Lister: customresourcedefinitionInformer.Lister(),
|
||||
Recorder: recorder,
|
||||
reconciler: r,
|
||||
finalizerName: defaultFinalizerName,
|
||||
}
|
||||
|
@ -87,6 +67,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF
|
|||
queueName := fmt.Sprintf("%s.%s", strings.ReplaceAll(t.PkgPath(), "/", "-"), t.Name())
|
||||
|
||||
impl := controller.NewImpl(rec, logger, queueName)
|
||||
agentName := defaultControllerAgentName
|
||||
|
||||
// Pass impl to the options. Save any optional results.
|
||||
for _, fn := range optionsFns {
|
||||
|
@ -97,11 +78,41 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF
|
|||
if opts.FinalizerName != "" {
|
||||
rec.finalizerName = opts.FinalizerName
|
||||
}
|
||||
if opts.AgentName != "" {
|
||||
agentName = opts.AgentName
|
||||
}
|
||||
}
|
||||
|
||||
rec.Recorder = createRecorder(ctx, agentName)
|
||||
|
||||
return impl
|
||||
}
|
||||
|
||||
func createRecorder(ctx context.Context, agentName string) record.EventRecorder {
|
||||
logger := logging.FromContext(ctx)
|
||||
|
||||
recorder := controller.GetEventRecorder(ctx)
|
||||
if recorder == nil {
|
||||
// Create event broadcaster
|
||||
logger.Debug("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
watches := []watch.Interface{
|
||||
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
|
||||
eventBroadcaster.StartRecordingToSink(
|
||||
&v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}),
|
||||
}
|
||||
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName})
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
for _, w := range watches {
|
||||
w.Stop()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return recorder
|
||||
}
|
||||
|
||||
func init() {
|
||||
clientsetscheme.AddToScheme(scheme.Scheme)
|
||||
}
|
||||
|
|
|
@ -54,29 +54,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF
|
|||
|
||||
namespaceInformer := namespace.Get(ctx)
|
||||
|
||||
recorder := controller.GetEventRecorder(ctx)
|
||||
if recorder == nil {
|
||||
// Create event broadcaster
|
||||
logger.Debug("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
watches := []watch.Interface{
|
||||
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
|
||||
eventBroadcaster.StartRecordingToSink(
|
||||
&v1.EventSinkImpl{Interface: client.Get(ctx).CoreV1().Events("")}),
|
||||
}
|
||||
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: defaultControllerAgentName})
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
for _, w := range watches {
|
||||
w.Stop()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
rec := &reconcilerImpl{
|
||||
Client: client.Get(ctx),
|
||||
Lister: namespaceInformer.Lister(),
|
||||
Recorder: recorder,
|
||||
reconciler: r,
|
||||
finalizerName: defaultFinalizerName,
|
||||
}
|
||||
|
@ -85,6 +65,7 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF
|
|||
queueName := fmt.Sprintf("%s.%s", strings.ReplaceAll(t.PkgPath(), "/", "-"), t.Name())
|
||||
|
||||
impl := controller.NewImpl(rec, logger, queueName)
|
||||
agentName := defaultControllerAgentName
|
||||
|
||||
// Pass impl to the options. Save any optional results.
|
||||
for _, fn := range optionsFns {
|
||||
|
@ -95,11 +76,41 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF
|
|||
if opts.FinalizerName != "" {
|
||||
rec.finalizerName = opts.FinalizerName
|
||||
}
|
||||
if opts.AgentName != "" {
|
||||
agentName = opts.AgentName
|
||||
}
|
||||
}
|
||||
|
||||
rec.Recorder = createRecorder(ctx, agentName)
|
||||
|
||||
return impl
|
||||
}
|
||||
|
||||
func createRecorder(ctx context.Context, agentName string) record.EventRecorder {
|
||||
logger := logging.FromContext(ctx)
|
||||
|
||||
recorder := controller.GetEventRecorder(ctx)
|
||||
if recorder == nil {
|
||||
// Create event broadcaster
|
||||
logger.Debug("Creating event broadcaster")
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
watches := []watch.Interface{
|
||||
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
|
||||
eventBroadcaster.StartRecordingToSink(
|
||||
&v1.EventSinkImpl{Interface: client.Get(ctx).CoreV1().Events("")}),
|
||||
}
|
||||
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName})
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
for _, w := range watches {
|
||||
w.Stop()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return recorder
|
||||
}
|
||||
|
||||
func init() {
|
||||
scheme.AddToScheme(scheme.Scheme)
|
||||
}
|
||||
|
|
|
@ -193,13 +193,13 @@ func extractCommentTags(t *types.Type) map[string]map[string]string {
|
|||
return ExtractCommentTags("+", comments)
|
||||
}
|
||||
|
||||
func extractReconcilerClassTag(tags map[string]map[string]string) (classname string, has bool) {
|
||||
vals, has := tags["genreconciler"]
|
||||
if !has {
|
||||
return
|
||||
func extractReconcilerClassTag(tags map[string]map[string]string) (string, bool) {
|
||||
vals, ok := tags["genreconciler"]
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
classname, _ = vals["class"]
|
||||
return
|
||||
classname, has := vals["class"]
|
||||
return classname, has
|
||||
}
|
||||
|
||||
func isNonNamespaced(tags map[string]map[string]string) bool {
|
||||
|
|
Loading…
Reference in New Issue