mirror of https://github.com/knative/func.git
Merge 8f60d4c559
into 9e117217a6
This commit is contained in:
commit
301157ca66
|
@ -45,7 +45,7 @@ and an 'extension' attribute for the value 'my-extension-value'.
|
|||
|
||||
func runSubscribe(cmd *cobra.Command) (err error) {
|
||||
var (
|
||||
cfg subscibeConfig
|
||||
cfg subscribeConfig
|
||||
f fn.Function
|
||||
)
|
||||
cfg = newSubscribeConfig(cmd)
|
||||
|
@ -61,66 +61,69 @@ func runSubscribe(cmd *cobra.Command) (err error) {
|
|||
}
|
||||
|
||||
// add subscription to function
|
||||
f.Deploy.Subscriptions = updateOrAddSubscription(f.Deploy.Subscriptions, cfg)
|
||||
f.Deploy.Subscriptions = addNewSubscriptions(f.Deploy.Subscriptions, cfg)
|
||||
|
||||
// pump it
|
||||
return f.Write()
|
||||
}
|
||||
|
||||
func extractFilterMap(filters []string) map[string]string {
|
||||
subscriptionFilters := make(map[string]string)
|
||||
func extractNewSubscriptions(filters []string, source string) (newSubscriptions []*subscriptionConfig) {
|
||||
for _, filter := range filters {
|
||||
kv := strings.Split(filter, "=")
|
||||
if len(kv) != 2 {
|
||||
fmt.Println("Invalid pair:", filter)
|
||||
continue
|
||||
}
|
||||
key := kv[0]
|
||||
value := kv[1]
|
||||
subscriptionFilters[key] = value
|
||||
filterKey := kv[0]
|
||||
filterValue := kv[1]
|
||||
newSubscriptions = append(newSubscriptions, &subscriptionConfig{
|
||||
Source: source,
|
||||
FilterKey: filterKey,
|
||||
FilterValue: filterValue,
|
||||
})
|
||||
}
|
||||
return subscriptionFilters
|
||||
return newSubscriptions
|
||||
}
|
||||
|
||||
type subscibeConfig struct {
|
||||
type subscriptionConfig struct {
|
||||
Source string
|
||||
FilterKey string
|
||||
FilterValue string
|
||||
}
|
||||
|
||||
type subscribeConfig struct {
|
||||
Filter []string
|
||||
Source string
|
||||
}
|
||||
|
||||
func updateOrAddSubscription(subscriptions []fn.KnativeSubscription, cfg subscibeConfig) []fn.KnativeSubscription {
|
||||
found := false
|
||||
newFilters := extractFilterMap(cfg.Filter)
|
||||
|
||||
// Iterate over subscriptions to find if one with the same source already exists
|
||||
for i, subscription := range subscriptions {
|
||||
if subscription.Source == cfg.Source {
|
||||
found = true
|
||||
|
||||
if subscription.Filters == nil {
|
||||
subscription.Filters = make(map[string]string)
|
||||
func addNewSubscriptions(subscriptions []fn.KnativeSubscription, cfg subscribeConfig) []fn.KnativeSubscription {
|
||||
newSubscriptions := extractNewSubscriptions(cfg.Filter, cfg.Source)
|
||||
for _, newSubscription := range newSubscriptions {
|
||||
isNew := true
|
||||
for _, subscription := range subscriptions {
|
||||
if subscription.Source == newSubscription.Source {
|
||||
for k, v := range subscription.Filters {
|
||||
if k == newSubscription.FilterKey && v == newSubscription.FilterValue {
|
||||
isNew = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update filters. Override if the key already exists.
|
||||
for newKey, newValue := range newFilters {
|
||||
subscription.Filters[newKey] = newValue
|
||||
}
|
||||
subscriptions[i] = subscription // Reassign the updated subscription
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If a subscription with the source was not found, add a new one
|
||||
if !found {
|
||||
subscriptions = append(subscriptions, fn.KnativeSubscription{
|
||||
Source: cfg.Source,
|
||||
Filters: newFilters,
|
||||
})
|
||||
if isNew {
|
||||
newFilter := make(map[string]string)
|
||||
newFilter[newSubscription.FilterKey] = newSubscription.FilterValue
|
||||
subscriptions = append(subscriptions, fn.KnativeSubscription{
|
||||
Source: cfg.Source,
|
||||
Filters: newFilter,
|
||||
})
|
||||
}
|
||||
}
|
||||
return subscriptions
|
||||
}
|
||||
|
||||
func newSubscribeConfig(cmd *cobra.Command) (c subscibeConfig) {
|
||||
c = subscibeConfig{
|
||||
func newSubscribeConfig(cmd *cobra.Command) (c subscribeConfig) {
|
||||
c = subscribeConfig{
|
||||
Filter: viper.GetStringSlice("filter"),
|
||||
Source: viper.GetString("source"),
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue