confluent kafka binding
Signed-off-by: myan <myan@redhat.com> add message implementation Signed-off-by: myan <myan@redhat.com> add ut test Signed-off-by: myan <myan@redhat.com> add integration test Signed-off-by: myan <myan@redhat.com> add integration test and samples Signed-off-by: Meng Yan <myan@redhat.com> offset Signed-off-by: Meng Yan <myan@redhat.com> remove the ctx Signed-off-by: myan <myan@redhat.com> review Signed-off-by: Meng Yan <myan@redhat.com> remove Signed-off-by: Meng Yan <myan@redhat.com> init consumer and producer on 1 client Signed-off-by: myan <myan@redhat.com> reply the reviews Signed-off-by: myan <myan@redhat.com> fix the ci Signed-off-by: myan <myan@redhat.com> ci fix Signed-off-by: Meng Yan <myan@redhat.com> add confluent test in github action Signed-off-by: Meng Yan <myan@redhat.com> add the mis-used test case Signed-off-by: Meng Yan <myan@redhat.com> remove the invalidated bootstrapserver Signed-off-by: Meng Yan <myan@redhat.com> log kafka error message Signed-off-by: myan <myan@redhat.com> review Signed-off-by: myan <myan@redhat.com> Update protocol/kafka_confluent/v2/option.go Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Signed-off-by: myan <myan@redhat.com> add the auto recover option Signed-off-by: myan <myan@redhat.com> kafka error handler Signed-off-by: myan <myan@redhat.com> remove the delievery chan Signed-off-by: myan <myan@redhat.com> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Update protocol/kafka_confluent/v2/option.go Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> reply review Signed-off-by: myan <myan@redhat.com> modify the git action Signed-off-by: Meng Yan <myan@redhat.com> 
reply review Signed-off-by: Meng Yan <myan@redhat.com> reply review Signed-off-by: Meng Yan <myan@redhat.com> handle race condition between sender and closer Signed-off-by: Meng Yan <myan@redhat.com> reply review Signed-off-by: Meng Yan <myan@redhat.com> reply review1 Signed-off-by: Meng Yan <myan@redhat.com> add defer close Signed-off-by: Meng Yan <myan@redhat.com> add comment Signed-off-by: Meng Yan <myan@redhat.com>
This commit is contained in:
parent
e118aba5b1
commit
5cd87eafb4
|
|
@ -27,6 +27,15 @@ jobs:
|
|||
- 9091:9091
|
||||
- 9092:9092
|
||||
|
||||
kafka_confluent:
|
||||
image: confluentinc/confluent-local:7.6.0
|
||||
ports:
|
||||
- "9192:9192"
|
||||
env:
|
||||
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29192,PLAINTEXT_HOST://localhost:9192'
|
||||
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:29193'
|
||||
KAFKA_LISTENERS: 'PLAINTEXT://localhost:29192,CONTROLLER://localhost:29193,PLAINTEXT_HOST://0.0.0.0:9192'
|
||||
|
||||
natss:
|
||||
image: nats-streaming:0.22.1
|
||||
ports:
|
||||
|
|
|
|||
|
|
@ -124,7 +124,8 @@ err := json.Unmarshal(bytes, &event)
|
|||
| AVRO Event Format | :x: | :x: |
|
||||
| [HTTP Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/http) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
| [JSON Event Format](event_data_structure.md#marshalunmarshal-event-to-json) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
| [Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
| [Sarama Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
| [Confluent Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka_confluent) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
| MQTT Protocol Binding | :x: | :x: |
|
||||
| [NATS Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/nats) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
| [STAN Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/stan) | :heavy_check_mark: | :heavy_check_mark: |
|
||||
|
|
|
|||
|
|
@ -0,0 +1,25 @@
|
|||
module github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2
|
||||
|
||||
go 1.18
|
||||
|
||||
replace github.com/cloudevents/sdk-go/v2 => ../../../v2
|
||||
|
||||
require (
|
||||
github.com/cloudevents/sdk-go/v2 v2.15.2
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
|
||||
github.com/stretchr/testify v1.8.4
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/google/go-cmp v0.5.9 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/json-iterator/go v1.1.11 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.uber.org/atomic v1.4.0 // indirect
|
||||
go.uber.org/multierr v1.1.0 // indirect
|
||||
go.uber.org/zap v1.10.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
|
||||
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
|
||||
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
|
||||
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
|
||||
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
|
||||
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
|
||||
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
|
||||
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
|
||||
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
|
||||
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
|
||||
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
|
||||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
|
||||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
|
||||
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
|
||||
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
|
||||
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/cloudevents/sdk-go/v2/binding"
|
||||
"github.com/cloudevents/sdk-go/v2/binding/format"
|
||||
"github.com/cloudevents/sdk-go/v2/binding/spec"
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
)
|
||||
|
||||
// CloudEvents attributes travel as Kafka headers prefixed with "ce-";
// the event content type travels in the standard "content-type" header.
const (
	prefix         = "ce-"
	contentTypeKey = "content-type"
)

// Extension-attribute names under which the Kafka coordinates of a consumed
// record (offset, partition, topic, message key) are exposed on the event.
// They are stored "ce-"-prefixed in Message.properties by NewMessage.
const (
	KafkaOffsetKey    = "kafkaoffset"
	KafkaPartitionKey = "kafkapartition"
	KafkaTopicKey     = "kafkatopic"
	KafkaMessageKey   = "kafkamessagekey"
)

// specs resolves CloudEvents spec versions and attributes by their
// "ce-"-prefixed header names.
var specs = spec.WithPrefix(prefix)

// Message represents a Kafka message.
// This message *can* be read several times safely
type Message struct {
	internal   *kafka.Message    // the wrapped consumer record
	properties map[string][]byte // lower-cased header name -> value, plus the kafka* coordinates above
	format     format.Format     // non-nil when the record is in structured mode
	version    spec.Version      // non-nil when the record is in binary mode
}

// Check if Message implements binding.Message
var (
	_ binding.Message               = (*Message)(nil)
	_ binding.MessageMetadataReader = (*Message)(nil)
)
|
||||
|
||||
// NewMessage returns a binding.Message that holds the provided kafka.Message.
|
||||
// The returned binding.Message *can* be read several times safely
|
||||
// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
|
||||
func NewMessage(msg *kafka.Message) *Message {
|
||||
if msg == nil {
|
||||
panic("the kafka.Message shouldn't be nil")
|
||||
}
|
||||
if msg.TopicPartition.Topic == nil {
|
||||
panic("the topic of kafka.Message shouldn't be nil")
|
||||
}
|
||||
if msg.TopicPartition.Partition < 0 || msg.TopicPartition.Offset < 0 {
|
||||
panic("the partition or offset of the kafka.Message must be non-negative")
|
||||
}
|
||||
|
||||
var contentType, contentVersion string
|
||||
properties := make(map[string][]byte, len(msg.Headers)+3)
|
||||
for _, header := range msg.Headers {
|
||||
k := strings.ToLower(string(header.Key))
|
||||
if k == strings.ToLower(contentTypeKey) {
|
||||
contentType = string(header.Value)
|
||||
}
|
||||
if k == specs.PrefixedSpecVersionName() {
|
||||
contentVersion = string(header.Value)
|
||||
}
|
||||
properties[k] = header.Value
|
||||
}
|
||||
|
||||
// add the kafka message key, topic, partition and partition key to the properties
|
||||
properties[prefix+KafkaOffsetKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Offset), 10))
|
||||
properties[prefix+KafkaPartitionKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Partition), 10))
|
||||
properties[prefix+KafkaTopicKey] = []byte(*msg.TopicPartition.Topic)
|
||||
if msg.Key != nil {
|
||||
properties[prefix+KafkaMessageKey] = msg.Key
|
||||
}
|
||||
|
||||
message := &Message{
|
||||
internal: msg,
|
||||
properties: properties,
|
||||
}
|
||||
if ft := format.Lookup(contentType); ft != nil {
|
||||
message.format = ft
|
||||
} else if v := specs.Version(contentVersion); v != nil {
|
||||
message.version = v
|
||||
}
|
||||
|
||||
return message
|
||||
}
|
||||
|
||||
func (m *Message) ReadEncoding() binding.Encoding {
|
||||
if m.version != nil {
|
||||
return binding.EncodingBinary
|
||||
}
|
||||
if m.format != nil {
|
||||
return binding.EncodingStructured
|
||||
}
|
||||
return binding.EncodingUnknown
|
||||
}
|
||||
|
||||
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
|
||||
if m.format != nil {
|
||||
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Value))
|
||||
}
|
||||
return binding.ErrNotStructured
|
||||
}
|
||||
|
||||
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
|
||||
if m.version == nil {
|
||||
return binding.ErrNotBinary
|
||||
}
|
||||
|
||||
var err error
|
||||
for k, v := range m.properties {
|
||||
if strings.HasPrefix(k, prefix) {
|
||||
attr := m.version.Attribute(k)
|
||||
if attr != nil {
|
||||
err = encoder.SetAttribute(attr, string(v))
|
||||
} else {
|
||||
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), string(v))
|
||||
}
|
||||
} else if k == strings.ToLower(contentTypeKey) {
|
||||
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v))
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if m.internal.Value != nil {
|
||||
err = encoder.SetData(bytes.NewBuffer(m.internal.Value))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Finish implements binding.Message.Finish. It is a no-op: the consumer
// offset lifecycle is managed by the kafka.Consumer, not per message.
func (m *Message) Finish(error) error {
	return nil
}
|
||||
|
||||
func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
|
||||
attr := m.version.AttributeFromKind(k)
|
||||
if attr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return attr, m.properties[attr.PrefixedName()]
|
||||
}
|
||||
|
||||
// GetExtension implements binding.MessageMetadataReader. It returns the raw
// header value stored under the "ce-"-prefixed extension name, or nil ([]byte)
// when the extension is absent.
func (m *Message) GetExtension(name string) interface{} {
	return m.properties[prefix+name]
}
|
||||
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
cloudevents "github.com/cloudevents/sdk-go/v2"
|
||||
"github.com/cloudevents/sdk-go/v2/binding"
|
||||
"github.com/cloudevents/sdk-go/v2/binding/format"
|
||||
"github.com/cloudevents/sdk-go/v2/test"
|
||||
)
|
||||
|
||||
// Shared fixtures for the message tests: one structured-mode and one
// binary-mode Kafka consumer record, both carrying test.FullEvent().
var (
	ctx            = context.Background()
	testEvent      = test.FullEvent()
	testTopic      = "test-topic"
	topicPartition = kafka.TopicPartition{
		Topic:     &testTopic,
		Partition: int32(0),
		Offset:    kafka.Offset(10),
	}
	// structuredConsumerMessage carries the whole event JSON-encoded in the
	// record value; only the content-type header marks the encoding.
	structuredConsumerMessage = &kafka.Message{
		TopicPartition: topicPartition,
		Value: func() []byte {
			b, _ := format.JSON.Marshal(&testEvent)
			return b
		}(),
		Headers: []kafka.Header{{
			Key:   "content-type",
			Value: []byte(cloudevents.ApplicationCloudEventsJSON),
		}},
	}
	// binaryConsumerMessage carries the data in the record value and the
	// CloudEvents attributes (plus one extension, "exta") as ce-* headers.
	binaryConsumerMessage = &kafka.Message{
		TopicPartition: topicPartition,
		Value:          []byte("hello world!"),
		Headers: mapToKafkaHeaders(map[string]string{
			"ce-type":            testEvent.Type(),
			"ce-source":          testEvent.Source(),
			"ce-id":              testEvent.ID(),
			"ce-time":            test.Timestamp.String(),
			"ce-specversion":     "1.0",
			"ce-dataschema":      test.Schema.String(),
			"ce-datacontenttype": "text/json",
			"ce-subject":         "receiverTopic",
			"exta":               "someext",
		}),
	}
)
|
||||
|
||||
// TestNewMessage checks that NewMessage detects the right encoding for
// structured, binary, unknown-content-type, and empty-value records, and that
// the detected mode can actually be read back through a kafkaMessageWriter.
func TestNewMessage(t *testing.T) {
	tests := []struct {
		name             string
		consumerMessage  *kafka.Message
		expectedEncoding binding.Encoding
	}{
		{
			name:             "Structured encoding",
			consumerMessage:  structuredConsumerMessage,
			expectedEncoding: binding.EncodingStructured,
		},
		{
			name:             "Binary encoding",
			consumerMessage:  binaryConsumerMessage,
			expectedEncoding: binding.EncodingBinary,
		},
		{
			// "application/json" is not a registered CloudEvents format and
			// there are no ce-* headers, so the encoding cannot be determined.
			name: "Unknown encoding",
			consumerMessage: &kafka.Message{
				TopicPartition: topicPartition,
				Value:          []byte("{}"),
				Headers: []kafka.Header{{
					Key:   "content-type",
					Value: []byte("application/json"),
				}},
			},
			expectedEncoding: binding.EncodingUnknown,
		},
		{
			// A nil value is valid in binary mode: the event simply has no data.
			name: "Binary encoding with empty value",
			consumerMessage: &kafka.Message{
				TopicPartition: topicPartition,
				Value:          nil,
				Headers: mapToKafkaHeaders(map[string]string{
					"ce-type":            testEvent.Type(),
					"ce-source":          testEvent.Source(),
					"ce-id":              testEvent.ID(),
					"ce-time":            test.Timestamp.String(),
					"ce-specversion":     "1.0",
					"ce-dataschema":      test.Schema.String(),
					"ce-datacontenttype": "text/json",
					"ce-subject":         "receiverTopic",
				}),
			},
			expectedEncoding: binding.EncodingBinary,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			msg := NewMessage(tt.consumerMessage)
			require.Equal(t, tt.expectedEncoding, msg.ReadEncoding())

			// Round-trip through the writer matching the detected encoding.
			var err error
			if tt.expectedEncoding == binding.EncodingStructured {
				err = msg.ReadStructured(ctx, (*kafkaMessageWriter)(tt.consumerMessage))
			}

			if tt.expectedEncoding == binding.EncodingBinary {
				err = msg.ReadBinary(ctx, (*kafkaMessageWriter)(tt.consumerMessage))
			}
			require.Nil(t, err)
		})
	}
}
|
||||
|
||||
func mapToKafkaHeaders(m map[string]string) []kafka.Header {
|
||||
res := make([]kafka.Header, len(m))
|
||||
i := 0
|
||||
for k, v := range m {
|
||||
res[i] = kafka.Header{Key: k, Value: []byte(v)}
|
||||
i++
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
)
|
||||
|
||||
// Option is the function signature required to be considered a kafka_confluent.Option.
// Options are applied by New via applyOptions; each one mutates the Protocol
// under construction and may reject invalid configuration with an error.
type Option func(*Protocol) error
|
||||
|
||||
// WithConfigMap sets the configMap to init the kafka client. This option is not required.
|
||||
func WithConfigMap(config *kafka.ConfigMap) Option {
|
||||
return func(p *Protocol) error {
|
||||
if config == nil {
|
||||
return errors.New("the kafka.ConfigMap option must not be nil")
|
||||
}
|
||||
p.kafkaConfigMap = config
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required.
|
||||
func WithSenderTopic(defaultTopic string) Option {
|
||||
return func(p *Protocol) error {
|
||||
if defaultTopic == "" {
|
||||
return errors.New("the producer topic option must not be nil")
|
||||
}
|
||||
p.producerDefaultTopic = defaultTopic
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
|
||||
func WithReceiverTopics(topics []string) Option {
|
||||
return func(p *Protocol) error {
|
||||
if topics == nil {
|
||||
return errors.New("the consumer topics option must not be nil")
|
||||
}
|
||||
p.consumerTopics = topics
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
|
||||
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
|
||||
return func(p *Protocol) error {
|
||||
if rebalanceCb == nil {
|
||||
return errors.New("the consumer group rebalance callback must not be nil")
|
||||
}
|
||||
p.consumerRebalanceCb = rebalanceCb
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required.
// The value is in milliseconds and is passed straight to kafka.Consumer.Poll;
// when unset, New defaults it to 100.
func WithPollTimeout(timeoutMs int) Option {
	return func(p *Protocol) error {
		p.consumerPollTimeout = timeoutMs
		return nil
	}
}
|
||||
|
||||
// WithSender set a kafka.Producer instance to init the client directly. This option is not required.
|
||||
func WithSender(producer *kafka.Producer) Option {
|
||||
return func(p *Protocol) error {
|
||||
if producer == nil {
|
||||
return errors.New("the producer option must not be nil")
|
||||
}
|
||||
p.producer = producer
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required.
// The handler is invoked from the OpenInbound poll loop before the loop
// decides whether the error is fatal.
func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
	return func(p *Protocol) error {
		p.consumerErrorHandler = handler
		return nil
	}
}
|
||||
|
||||
// WithReceiver set a kafka.Consumer instance to init the client directly. This option is not required.
// (The comment previously said "WithSender", which was a copy-paste error.)
func WithReceiver(consumer *kafka.Consumer) Option {
	return func(p *Protocol) error {
		if consumer == nil {
			return errors.New("the consumer option must not be nil")
		}
		p.consumer = consumer
		return nil
	}
}
|
||||
|
||||
// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required.
|
||||
type topicPartitionOffsetsType struct{}
|
||||
|
||||
var offsetKey = topicPartitionOffsetsType{}
|
||||
|
||||
// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required.
|
||||
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
|
||||
if len(topicPartitionOffsets) == 0 {
|
||||
panic("the topicPartitionOffsets cannot be empty")
|
||||
}
|
||||
for _, offset := range topicPartitionOffsets {
|
||||
if offset.Topic == nil || *(offset.Topic) == "" {
|
||||
panic("the kafka topic cannot be nil or empty")
|
||||
}
|
||||
if offset.Partition < 0 || offset.Offset < 0 {
|
||||
panic("the kafka partition/offset must be non-negative")
|
||||
}
|
||||
}
|
||||
return context.WithValue(ctx, offsetKey, topicPartitionOffsets)
|
||||
}
|
||||
|
||||
// TopicPartitionOffsetsFrom looks in the given context and returns []kafka.TopicPartition or nil if not set
|
||||
func TopicPartitionOffsetsFrom(ctx context.Context) []kafka.TopicPartition {
|
||||
c := ctx.Value(offsetKey)
|
||||
if c != nil {
|
||||
if s, ok := c.([]kafka.TopicPartition); ok {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Opaque key type used to store message key
|
||||
type messageKeyType struct{}
|
||||
|
||||
var keyForMessageKey = messageKeyType{}
|
||||
|
||||
// WithMessageKey returns back a new context with the given messageKey.
|
||||
func WithMessageKey(ctx context.Context, messageKey string) context.Context {
|
||||
return context.WithValue(ctx, keyForMessageKey, messageKey)
|
||||
}
|
||||
|
||||
// MessageKeyFrom looks in the given context and returns `messageKey` as a string if found and valid, otherwise "".
|
||||
func MessageKeyFrom(ctx context.Context) string {
|
||||
c := ctx.Value(keyForMessageKey)
|
||||
if c != nil {
|
||||
if s, ok := c.(string); ok {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
@ -0,0 +1,245 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/cloudevents/sdk-go/v2/binding"
|
||||
"github.com/cloudevents/sdk-go/v2/protocol"
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
|
||||
cecontext "github.com/cloudevents/sdk-go/v2/context"
|
||||
)
|
||||
|
||||
// Compile-time checks that Protocol satisfies every protocol interface it advertises.
var (
	_ protocol.Sender   = (*Protocol)(nil)
	_ protocol.Opener   = (*Protocol)(nil)
	_ protocol.Receiver = (*Protocol)(nil)
	_ protocol.Closer   = (*Protocol)(nil)
)

// Protocol is the confluent-kafka-go binding of the CloudEvents protocol
// interfaces. A single Protocol may hold a producer, a consumer, or both;
// New builds missing clients from kafkaConfigMap unless instances were
// injected via WithSender/WithReceiver.
type Protocol struct {
	kafkaConfigMap *kafka.ConfigMap // shared client config used by New to construct missing clients

	consumer             *kafka.Consumer
	consumerTopics       []string
	consumerRebalanceCb  kafka.RebalanceCb // optional
	consumerPollTimeout  int               // optional, milliseconds passed to consumer.Poll (New defaults it to 100)
	consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional
	consumerMux          sync.Mutex          // serializes OpenInbound calls
	consumerIncoming     chan *kafka.Message // poll loop -> Receive hand-off; closed when OpenInbound returns
	consumerCtx          context.Context
	consumerCancel       context.CancelFunc

	producer             *kafka.Producer
	producerDeliveryChan chan kafka.Event // optional; Send blocks on it for per-message delivery reports
	producerDefaultTopic string           // optional

	// closerMux guards the race between Send and Close (producer) and
	// between OpenInbound publishing consumerCancel and Close using it.
	closerMux sync.Mutex
}
|
||||
|
||||
// New creates a Protocol from the given options. At least one of a
// kafka.ConfigMap (WithConfigMap), a producer (WithSender), or a consumer
// (WithReceiver) must be supplied; a sender topic or receiver topics alone
// are rejected when no matching client can be built or was injected.
func New(opts ...Option) (*Protocol, error) {
	p := &Protocol{
		consumerPollTimeout: 100, // default poll timeout, in milliseconds
		consumerIncoming:    make(chan *kafka.Message),
	}
	if err := p.applyOptions(opts...); err != nil {
		return nil, err
	}

	if p.kafkaConfigMap != nil {
		// Build only the clients that were requested (topics set) and not
		// already injected directly.
		if p.consumerTopics != nil && p.consumer == nil {
			consumer, err := kafka.NewConsumer(p.kafkaConfigMap)
			if err != nil {
				return nil, err
			}
			p.consumer = consumer
		}
		if p.producerDefaultTopic != "" && p.producer == nil {
			producer, err := kafka.NewProducer(p.kafkaConfigMap)
			if err != nil {
				return nil, err
			}
			p.producer = producer
		}
		if p.producer == nil && p.consumer == nil {
			// A config map with neither role requested is useless.
			return nil, errors.New("at least receiver or sender topic must be set")
		}
	}
	// A sender topic with no way to obtain a producer is a misconfiguration.
	if p.producerDefaultTopic != "" && p.producer == nil {
		return nil, fmt.Errorf("at least configmap or producer must be set for the sender topic: %s", p.producerDefaultTopic)
	}

	// Receiver topics with no way to obtain a consumer is a misconfiguration.
	if len(p.consumerTopics) > 0 && p.consumer == nil {
		return nil, fmt.Errorf("at least configmap or consumer must be set for the receiver topics: %s", p.consumerTopics)
	}

	if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
		return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
	}
	if p.producer != nil {
		// Unbuffered: Send produces one message and then blocks on this
		// channel for that message's delivery report.
		p.producerDeliveryChan = make(chan kafka.Event)
	}
	return p, nil
}
|
||||
|
||||
func (p *Protocol) applyOptions(opts ...Option) error {
|
||||
for _, fn := range opts {
|
||||
if err := fn(p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send implements protocol.Sender. It encodes the CloudEvent carried by
// `in` into a kafka.Message, produces it, and blocks until the delivery
// report for that message arrives, returning any delivery error.
//
// The destination topic defaults to producerDefaultTopic and can be
// overridden per call via cecontext.WithTopic; the record key can be set
// via WithMessageKey.
func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) {
	if p.producer == nil {
		return errors.New("producer client must be set")
	}

	// Hold closerMux for the whole send so Close cannot tear the producer
	// down mid-flight (race between sender and closer).
	p.closerMux.Lock()
	defer p.closerMux.Unlock()
	if p.producer.IsClosed() {
		return errors.New("producer is closed")
	}

	// Named return `err` so Finish sees the final outcome.
	defer in.Finish(err)

	kafkaMsg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &p.producerDefaultTopic,
			Partition: kafka.PartitionAny,
		},
	}

	if topic := cecontext.TopicFrom(ctx); topic != "" {
		kafkaMsg.TopicPartition.Topic = &topic
	}

	if messageKey := MessageKeyFrom(ctx); messageKey != "" {
		kafkaMsg.Key = []byte(messageKey)
	}

	err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...)
	if err != nil {
		return err
	}

	err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
	if err != nil {
		return err
	}
	// Produce is asynchronous: wait for this message's delivery report.
	// Holding closerMux above guarantees at most one in-flight report.
	e := <-p.producerDeliveryChan
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return m.TopicPartition.Error
	}
	return nil
}
|
||||
|
||||
// OpenInbound implements protocol.Opener. It subscribes the consumer to
// consumerTopics and polls in a loop, forwarding each *kafka.Message to
// consumerIncoming (drained by Receive).
//
// The call blocks until ctx (or consumerCtx, which Close can cancel) is
// done, or a fatal kafka.ErrAllBrokersDown error occurs. On return the
// consumer is closed and consumerIncoming is closed, making Receive
// report io.EOF.
func (p *Protocol) OpenInbound(ctx context.Context) error {
	if p.consumer == nil {
		return errors.New("the consumer client must be set")
	}
	if p.consumerTopics == nil {
		return errors.New("the consumer topics must be set")
	}

	// Serialize concurrent OpenInbound calls.
	p.consumerMux.Lock()
	defer p.consumerMux.Unlock()
	logger := cecontext.LoggerFrom(ctx)

	// Query committed offsets for each partition
	if positions := TopicPartitionOffsetsFrom(ctx); positions != nil {
		if err := p.consumer.Assign(positions); err != nil {
			return err
		}
	}

	logger.Infof("Subscribing to topics: %v", p.consumerTopics)
	err := p.consumer.SubscribeTopics(p.consumerTopics, p.consumerRebalanceCb)
	if err != nil {
		return err
	}

	// Publish the cancel func under closerMux so Close can observe it.
	p.closerMux.Lock()
	p.consumerCtx, p.consumerCancel = context.WithCancel(ctx)
	defer p.consumerCancel()
	p.closerMux.Unlock()

	// On exit: close the consumer (unless already closed elsewhere), then
	// close the incoming channel to unblock Receive with io.EOF.
	defer func() {
		if !p.consumer.IsClosed() {
			logger.Infof("Closing consumer %v", p.consumerTopics)
			if err = p.consumer.Close(); err != nil {
				logger.Errorf("failed to close the consumer: %v", err)
			}
		}
		close(p.consumerIncoming)
	}()

	for {
		select {
		case <-p.consumerCtx.Done():
			return p.consumerCtx.Err()
		default:
			ev := p.consumer.Poll(p.consumerPollTimeout)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				p.consumerIncoming <- e
			case kafka.Error:
				// Errors should generally be considered informational, the client will try to automatically recover.
				// But in here, we choose to terminate the application if all brokers are down.
				logger.Infof("Error %v: %v", e.Code(), e)
				if p.consumerErrorHandler != nil {
					p.consumerErrorHandler(ctx, e)
				}
				if e.Code() == kafka.ErrAllBrokersDown {
					logger.Error("All broker connections are down")
					return e
				}
			}
		}
	}
}
|
||||
|
||||
// Receive implements Receiver.Receive
|
||||
func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
|
||||
select {
|
||||
case m, ok := <-p.consumerIncoming:
|
||||
if !ok {
|
||||
return nil, io.EOF
|
||||
}
|
||||
msg := NewMessage(m)
|
||||
return msg, nil
|
||||
case <-ctx.Done():
|
||||
return nil, io.EOF
|
||||
}
|
||||
}
|
||||
|
||||
// Close cleans up resources after use. Must be called to properly close underlying Kafka resources and avoid resource leaks
|
||||
func (p *Protocol) Close(ctx context.Context) error {
|
||||
p.closerMux.Lock()
|
||||
defer p.closerMux.Unlock()
|
||||
|
||||
if p.consumerCancel != nil {
|
||||
p.consumerCancel()
|
||||
}
|
||||
|
||||
if p.producer != nil && !p.producer.IsClosed() {
|
||||
p.producer.Close()
|
||||
close(p.producerDeliveryChan)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
Copyright 2024 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNewProtocol(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
options []Option
|
||||
errorMessage string
|
||||
}{
|
||||
{
|
||||
name: "invalidated parameters",
|
||||
options: nil,
|
||||
errorMessage: "at least one of the following to initialize the protocol must be set: config, producer, or consumer",
|
||||
},
|
||||
{
|
||||
name: "Insufficient parameters",
|
||||
options: []Option{
|
||||
WithConfigMap(&kafka.ConfigMap{
|
||||
"bootstrap.servers": "127.0.0.1:9092",
|
||||
})},
|
||||
errorMessage: "at least receiver or sender topic must be set",
|
||||
},
|
||||
{
|
||||
name: "Insufficient consumer parameters - group.id",
|
||||
options: []Option{
|
||||
WithConfigMap(&kafka.ConfigMap{
|
||||
"bootstrap.servers": "127.0.0.1:9092",
|
||||
}),
|
||||
WithReceiverTopics([]string{"topic1", "topic2"}),
|
||||
},
|
||||
errorMessage: "Required property group.id not set",
|
||||
},
|
||||
{
|
||||
name: "Insufficient consumer parameters - configmap or consumer",
|
||||
options: []Option{
|
||||
WithReceiverTopics([]string{"topic1", "topic2"}),
|
||||
},
|
||||
errorMessage: "at least configmap or consumer must be set for the receiver topics: [topic1 topic2]",
|
||||
},
|
||||
{
|
||||
name: "Insufficient producer parameters",
|
||||
options: []Option{
|
||||
WithSenderTopic("topic3"),
|
||||
},
|
||||
errorMessage: "at least configmap or producer must be set for the sender topic: topic3",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, err := New(tt.options...)
|
||||
if err != nil {
|
||||
assert.Equal(t, tt.errorMessage, err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/cloudevents/sdk-go/v2/binding"
|
||||
"github.com/cloudevents/sdk-go/v2/binding/format"
|
||||
"github.com/cloudevents/sdk-go/v2/binding/spec"
|
||||
"github.com/cloudevents/sdk-go/v2/types"
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
)
|
||||
|
||||
// kafkaMessageWriter extends kafka.Message so it can serve as the write
// target when encoding a binding.Message into a Kafka producer message.
type kafkaMessageWriter kafka.Message

// Compile-time checks that kafkaMessageWriter implements both the
// structured-mode and binary-mode writer interfaces.
var (
	_ binding.StructuredWriter = (*kafkaMessageWriter)(nil)
	_ binding.BinaryWriter     = (*kafkaMessageWriter)(nil)
)
|
||||
|
||||
// WriteProducerMessage fills the provided pubMessage with the message m.
|
||||
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
|
||||
func WriteProducerMessage(ctx context.Context, in binding.Message, kafkaMsg *kafka.Message,
|
||||
transformers ...binding.Transformer,
|
||||
) error {
|
||||
structuredWriter := (*kafkaMessageWriter)(kafkaMsg)
|
||||
binaryWriter := (*kafkaMessageWriter)(kafkaMsg)
|
||||
|
||||
_, err := binding.Write(
|
||||
ctx,
|
||||
in,
|
||||
structuredWriter,
|
||||
binaryWriter,
|
||||
transformers...,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *kafkaMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error {
|
||||
b.Headers = []kafka.Header{{
|
||||
Key: contentTypeKey,
|
||||
Value: []byte(f.MediaType()),
|
||||
}}
|
||||
|
||||
var buf bytes.Buffer
|
||||
_, err := io.Copy(&buf, event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.Value = buf.Bytes()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start begins the binary encoding of an event. It resets the header slice so
// headers from any previous use of the underlying message are discarded.
func (b *kafkaMessageWriter) Start(ctx context.Context) error {
	b.Headers = []kafka.Header{}
	return nil
}
|
||||
|
||||
// End completes the binary encoding; no finalization work is required.
func (b *kafkaMessageWriter) End(ctx context.Context) error {
	return nil
}
|
||||
|
||||
func (b *kafkaMessageWriter) SetData(reader io.Reader) error {
|
||||
buf, ok := reader.(*bytes.Buffer)
|
||||
if !ok {
|
||||
buf = new(bytes.Buffer)
|
||||
_, err := io.Copy(buf, reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b.Value = buf.Bytes()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *kafkaMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
|
||||
if attribute.Kind() == spec.DataContentType {
|
||||
if value == nil {
|
||||
b.removeProperty(contentTypeKey)
|
||||
return nil
|
||||
}
|
||||
b.addProperty(contentTypeKey, value)
|
||||
} else {
|
||||
key := prefix + attribute.Name()
|
||||
if value == nil {
|
||||
b.removeProperty(key)
|
||||
return nil
|
||||
}
|
||||
b.addProperty(key, value)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *kafkaMessageWriter) SetExtension(name string, value interface{}) error {
|
||||
if value == nil {
|
||||
b.removeProperty(prefix + name)
|
||||
}
|
||||
return b.addProperty(prefix+name, value)
|
||||
}
|
||||
|
||||
func (b *kafkaMessageWriter) removeProperty(key string) {
|
||||
for i, v := range b.Headers {
|
||||
if v.Key == key {
|
||||
b.Headers = append(b.Headers[:i], b.Headers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *kafkaMessageWriter) addProperty(key string, value interface{}) error {
|
||||
s, err := types.Format(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.Headers = append(b.Headers, kafka.Header{Key: key, Value: []byte(s)})
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/cloudevents/sdk-go/v2/binding"
|
||||
. "github.com/cloudevents/sdk-go/v2/binding/test"
|
||||
"github.com/cloudevents/sdk-go/v2/event"
|
||||
. "github.com/cloudevents/sdk-go/v2/test"
|
||||
)
|
||||
|
||||
// TestWriteProducerMessage round-trips events through WriteProducerMessage
// and NewMessage in both structured and binary mode, asserting the decoded
// event matches the input (plus the Kafka metadata extensions added in
// binary mode).
func TestWriteProducerMessage(t *testing.T) {
	tests := []struct {
		name             string
		context          context.Context
		messageFactory   func(e event.Event) binding.Message
		expectedEncoding binding.Encoding
	}{
		{
			name: "Structured to Structured",
			// NOTE(review): `ctx` is a package-level variable declared
			// elsewhere in this test package.
			context: ctx,
			messageFactory: func(e event.Event) binding.Message {
				return MustCreateMockStructuredMessage(t, e)
			},
			expectedEncoding: binding.EncodingStructured,
		},
		{
			name:             "Binary to Binary",
			context:          ctx,
			messageFactory:   MustCreateMockBinaryMessage,
			expectedEncoding: binding.EncodingBinary,
		},
	}
	EachEvent(t, Events(), func(t *testing.T, e event.Event) {
		for _, tt := range tests {
			t.Run(tt.name, func(t *testing.T) {
				ctx := tt.context
				topic := "test-topic"
				kafkaMessage := &kafka.Message{
					TopicPartition: kafka.TopicPartition{
						Topic:     &topic,
						Partition: int32(0),
						Offset:    kafka.Offset(10),
					},
				}

				eventIn := ConvertEventExtensionsToString(t, e.Clone())
				messageIn := tt.messageFactory(eventIn)

				// Encode the input message into the Kafka producer message.
				err := WriteProducerMessage(ctx, messageIn, kafkaMessage)
				require.NoError(t, err)

				// Re-wrap the produced message and verify the encoding survived.
				messageOut := NewMessage(kafkaMessage)
				require.Equal(t, tt.expectedEncoding, messageOut.ReadEncoding())

				if tt.expectedEncoding == binding.EncodingBinary {
					err = messageOut.ReadBinary(ctx, (*kafkaMessageWriter)(kafkaMessage))
				}
				require.NoError(t, err)

				eventOut, err := binding.ToEvent(ctx, messageOut)
				require.NoError(t, err)
				if tt.expectedEncoding == binding.EncodingBinary {
					// Binary consumption adds partition/offset/topic extensions,
					// so mirror them on the expected event before comparing.
					eventIn.SetExtension(KafkaPartitionKey, strconv.FormatInt(int64(kafkaMessage.TopicPartition.Partition), 10))
					eventIn.SetExtension(KafkaOffsetKey, strconv.FormatInt(int64(kafkaMessage.TopicPartition.Offset), 10))
					// NOTE(review): this passes a *string; presumably the
					// extension machinery stringifies it — confirm against
					// NewMessage's topic handling.
					eventIn.SetExtension(KafkaTopicKey, kafkaMessage.TopicPartition.Topic)
				}
				AssertEventEquals(t, eventIn, *eventOut)
			})
		}
	})
}
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
# Confluent Kafka samples
|
||||
|
||||
To run the samples, you need a running Kafka cluster.
|
||||
|
||||
To run a sample Kafka cluster using docker:
|
||||
|
||||
```
|
||||
docker run --rm --net=host confluentinc/confluent-local
|
||||
```
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
module github.com/cloudevents/sdk-go/samples/kafka_confluent
|
||||
|
||||
go 1.18
|
||||
|
||||
replace github.com/cloudevents/sdk-go/v2 => ../../v2
|
||||
|
||||
replace github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 => ../../protocol/kafka_confluent/v2
|
||||
|
||||
require (
|
||||
github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 v2.0.0-00010101000000-000000000000
|
||||
github.com/cloudevents/sdk-go/v2 v2.15.2
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/json-iterator/go v1.1.11 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
go.uber.org/atomic v1.4.0 // indirect
|
||||
go.uber.org/multierr v1.1.0 // indirect
|
||||
go.uber.org/zap v1.10.0 // indirect
|
||||
)
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
|
||||
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
|
||||
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
|
||||
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
|
||||
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
|
||||
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
|
||||
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
|
||||
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
|
||||
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
|
||||
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
|
||||
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
|
||||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
|
||||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
|
||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
|
||||
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
|
||||
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
|
||||
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
|
||||
cloudevents "github.com/cloudevents/sdk-go/v2"
|
||||
"github.com/cloudevents/sdk-go/v2/client"
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
)
|
||||
|
||||
var topic = "test-confluent-topic"
|
||||
|
||||
// main demonstrates consuming from a Kafka topic starting at explicit
// partition offsets via confluent.WithTopicPartitionOffsets.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	receiver, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
		"bootstrap.servers": "127.0.0.1:9092",
		"group.id":          "test-confluent-offset-id",
		// "auto.offset.reset": "earliest",
		"enable.auto.commit": "true",
	}), confluent.WithReceiverTopics([]string{topic}))
	if err != nil {
		log.Fatalf("failed to create kafka protocol, %v", err)
	}
	defer receiver.Close(ctx)

	// Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order
	c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Start consumption at offset 3 of partition 0 instead of the committed
	// group offset.
	offsetToStart := []kafka.TopicPartition{
		{Topic: &topic, Partition: 0, Offset: 3},
	}

	log.Printf("will listen consuming topic %s\n", topic)
	err = c.StartReceiver(confluent.WithTopicPartitionOffsets(ctx, offsetToStart), receive)
	if err != nil {
		log.Fatalf("failed to start receiver: %s", err)
	} else {
		log.Printf("receiver stopped\n")
	}
}
|
||||
|
||||
func receive(ctx context.Context, event cloudevents.Event) {
|
||||
ext := event.Extensions()
|
||||
|
||||
fmt.Printf("%s[%s:%s] \n", ext[confluent.KafkaTopicKey],
|
||||
ext[confluent.KafkaPartitionKey], ext[confluent.KafkaOffsetKey])
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
|
||||
cloudevents "github.com/cloudevents/sdk-go/v2"
|
||||
"github.com/cloudevents/sdk-go/v2/client"
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
)
|
||||
|
||||
const topic = "test-confluent-topic"
|
||||
|
||||
// main demonstrates a basic CloudEvents receiver backed by the confluent
// Kafka protocol, consuming from a single topic with auto-commit enabled.
func main() {
	ctx := context.Background()

	receiver, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
		"bootstrap.servers":  "127.0.0.1:9092",
		"group.id":           "test-confluent-group-id",
		"auto.offset.reset":  "earliest", // only validated when the consumer group offset has saved before
		"enable.auto.commit": "true",
	}), confluent.WithReceiverTopics([]string{topic}))

	if err != nil {
		log.Fatalf("failed to create receiver, %v", err)
	}
	defer receiver.Close(ctx)

	// Setting the 'client.WithPollGoroutines(1)' to make sure the events from kafka partition are processed in order
	c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	log.Printf("will listen consuming topic %s\n", topic)
	// StartReceiver blocks until ctx is cancelled or the protocol fails.
	err = c.StartReceiver(ctx, receive)
	if err != nil {
		log.Fatalf("failed to start receiver: %s", err)
	} else {
		log.Printf("receiver stopped\n")
	}
}
|
||||
|
||||
// receive prints each consumed CloudEvent to stdout.
func receive(ctx context.Context, event cloudevents.Event) {
	fmt.Printf("%s", event)
}
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
|
||||
cloudevents "github.com/cloudevents/sdk-go/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
count = 10
|
||||
topic = "test-confluent-topic"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
sender, err := confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
|
||||
"bootstrap.servers": "127.0.0.1:9092",
|
||||
}), confluent.WithSenderTopic(topic))
|
||||
|
||||
defer sender.Close(ctx)
|
||||
|
||||
c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create client, %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
e := cloudevents.NewEvent()
|
||||
e.SetType("com.cloudevents.sample.sent")
|
||||
e.SetSource("https://github.com/cloudevents/sdk-go/samples/kafka_confluent/sender")
|
||||
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
|
||||
"id": i,
|
||||
"message": "Hello, World!",
|
||||
})
|
||||
|
||||
if result := c.Send(
|
||||
// Set the producer message key
|
||||
confluent.WithMessageKey(ctx, e.ID()),
|
||||
e,
|
||||
); cloudevents.IsUndelivered(result) {
|
||||
log.Printf("failed to send: %v", result)
|
||||
} else {
|
||||
log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -18,19 +18,23 @@ replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol
|
|||
|
||||
replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2
|
||||
|
||||
replace github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 => ../../protocol/kafka_confluent/v2
|
||||
|
||||
require (
|
||||
github.com/Azure/go-amqp v0.17.0
|
||||
github.com/IBM/sarama v1.40.1
|
||||
github.com/cloudevents/sdk-go/protocol/amqp/v2 v2.5.0
|
||||
github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2 v2.0.0-00010101000000-000000000000
|
||||
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.5.0
|
||||
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000
|
||||
github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0
|
||||
github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.0.0-00010101000000-000000000000
|
||||
github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0
|
||||
github.com/cloudevents/sdk-go/v2 v2.14.0
|
||||
github.com/cloudevents/sdk-go/v2 v2.15.2
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
|
||||
github.com/eclipse/paho.golang v0.12.0
|
||||
github.com/google/go-cmp v0.6.0
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/nats-io/nats.go v1.31.0
|
||||
github.com/nats-io/stan.go v0.10.4
|
||||
github.com/stretchr/testify v1.8.4
|
||||
|
|
|
|||
|
|
@ -1,17 +1,29 @@
|
|||
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
|
||||
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
github.com/IBM/sarama v1.40.1 h1:lL01NNg/iBeigUbT+wpPysuTYW6roHo6kc1QrffRf0k=
|
||||
github.com/IBM/sarama v1.40.1/go.mod h1:+5OFwA5Du9I6QrznhaMHsuwWdWZNMjaBSIxEWEgKOYE=
|
||||
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
|
||||
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
|
||||
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
|
||||
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
|
||||
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
|
||||
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
|
||||
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
|
||||
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
|
||||
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
|
||||
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
|
||||
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM=
|
||||
|
|
@ -26,15 +38,17 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
|
|||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
|
||||
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
|
||||
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
|
||||
|
|
@ -80,6 +94,7 @@ github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/
|
|||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
|
||||
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
|
|
@ -90,11 +105,15 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
|
|||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
|
||||
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
|
||||
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
|
||||
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
|
||||
github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU=
|
||||
|
|
@ -109,6 +128,9 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
|||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw=
|
||||
github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
|
||||
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
|
||||
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
|
||||
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
|
|
@ -125,6 +147,7 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
|
|||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
|
|
@ -134,6 +157,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
|||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
|
|
@ -141,6 +165,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
|
|||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
|
||||
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
|
||||
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
|
||||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
|
||||
|
|
@ -216,6 +241,9 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
|
|||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
|
||||
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
|
||||
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
Copyright 2023 The CloudEvents Authors
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package kafka_confluent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
|
||||
cloudevents "github.com/cloudevents/sdk-go/v2"
|
||||
"github.com/cloudevents/sdk-go/v2/event"
|
||||
"github.com/cloudevents/sdk-go/v2/test"
|
||||
)
|
||||
|
||||
const (
	// TEST_GROUP_ID is the Kafka consumer group id shared by receiver
	// protocols created in this test file.
	TEST_GROUP_ID = "test_confluent_group_id"
	// BOOTSTRAP_SERVER is the broker address the integration test expects;
	// see the docker instructions above protocolFactory for starting one
	// locally on port 9192.
	BOOTSTRAP_SERVER = "localhost:9192"
)
|
||||
|
||||
// receiveEvent carries the outcome of the receiver goroutine back to the
// test goroutine: either a successfully received event or the error that
// aborted setup/receiving. Exactly one of the fields is meaningful.
type receiveEvent struct {
	event cloudevents.Event
	err   error
}
|
||||
|
||||
func TestSendEvent(t *testing.T) {
|
||||
test.EachEvent(t, test.Events(), func(t *testing.T, eventIn event.Event) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
topicName := "test-ce-confluent-" + uuid.New().String()
|
||||
// create the topic with kafka.AdminClient manually
|
||||
admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": BOOTSTRAP_SERVER})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = admin.CreateTopics(ctx, []kafka.TopicSpecification{{
|
||||
Topic: topicName,
|
||||
NumPartitions: 1,
|
||||
ReplicationFactor: 1}})
|
||||
require.NoError(t, err)
|
||||
|
||||
eventIn = test.ConvertEventExtensionsToString(t, eventIn)
|
||||
|
||||
// start a cloudevents receiver client go to receive the event
|
||||
eventChan := make(chan receiveEvent)
|
||||
|
||||
receiverReady := make(chan bool)
|
||||
go func() {
|
||||
p, err := protocolFactory("", []string{topicName})
|
||||
if err != nil {
|
||||
eventChan <- receiveEvent{err: err}
|
||||
return
|
||||
}
|
||||
defer p.Close(ctx)
|
||||
|
||||
client, err := cloudevents.NewClient(p)
|
||||
if err != nil {
|
||||
eventChan <- receiveEvent{err: err}
|
||||
}
|
||||
|
||||
receiverReady <- true
|
||||
err = client.StartReceiver(ctx, func(event cloudevents.Event) {
|
||||
eventChan <- receiveEvent{event: event}
|
||||
})
|
||||
if err != nil {
|
||||
eventChan <- receiveEvent{err: err}
|
||||
}
|
||||
}()
|
||||
|
||||
<-receiverReady
|
||||
|
||||
// start a cloudevents sender client go to send the event
|
||||
p, err := protocolFactory(topicName, nil)
|
||||
require.NoError(t, err)
|
||||
defer p.Close(ctx)
|
||||
|
||||
client, err := cloudevents.NewClient(p)
|
||||
require.NoError(t, err)
|
||||
res := client.Send(ctx, eventIn)
|
||||
require.NoError(t, res)
|
||||
|
||||
// check the received event
|
||||
receivedEvent := <-eventChan
|
||||
require.NoError(t, receivedEvent.err)
|
||||
eventOut := test.ConvertEventExtensionsToString(t, receivedEvent.event)
|
||||
|
||||
// test.AssertEventEquals(t, eventIn, receivedEvent.event)
|
||||
err = test.AllOf(
|
||||
test.HasExactlyAttributesEqualTo(eventIn.Context),
|
||||
test.HasData(eventIn.Data()),
|
||||
test.HasExtensionKeys([]string{confluent.KafkaPartitionKey, confluent.KafkaOffsetKey}),
|
||||
test.HasExtension(confluent.KafkaTopicKey, topicName),
|
||||
)(eventOut)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// To start a local environment for testing:
|
||||
// Option 1: Start it on port 9092
|
||||
//
|
||||
// docker run --rm --net=host -p 9092:9092 confluentinc/confluent-local
|
||||
//
|
||||
// Option 2: Start it on port 9192
|
||||
// docker run --rm \
|
||||
// --name broker \
|
||||
// --hostname broker \
|
||||
// -p 9192:9192 \
|
||||
// -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://broker:29192,PLAINTEXT_HOST://localhost:9192' \
|
||||
// -e KAFKA_CONTROLLER_QUORUM_VOTERS='1@broker:29193' \
|
||||
// -e KAFKA_LISTENERS='PLAINTEXT://broker:29192,CONTROLLER://broker:29193,PLAINTEXT_HOST://0.0.0.0:9192' \
|
||||
// confluentinc/confluent-local:latest
|
||||
func protocolFactory(sendTopic string, receiveTopic []string,
|
||||
) (*confluent.Protocol, error) {
|
||||
|
||||
var p *confluent.Protocol
|
||||
var err error
|
||||
if receiveTopic != nil {
|
||||
p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
|
||||
"bootstrap.servers": BOOTSTRAP_SERVER,
|
||||
"group.id": TEST_GROUP_ID,
|
||||
"auto.offset.reset": "earliest",
|
||||
"enable.auto.commit": "true",
|
||||
}), confluent.WithReceiverTopics(receiveTopic))
|
||||
}
|
||||
if sendTopic != "" {
|
||||
p, err = confluent.New(confluent.WithConfigMap(&kafka.ConfigMap{
|
||||
"bootstrap.servers": BOOTSTRAP_SERVER,
|
||||
}), confluent.WithSenderTopic(sendTopic))
|
||||
}
|
||||
return p, err
|
||||
}
|
||||
Loading…
Reference in New Issue