Feat: update kafka sink (#174)

This commit is contained in:
ethfoo 2023-07-11 10:29:56 +08:00 committed by GitHub
parent 6d9e55055b
commit 2620bcfac9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 114 additions and 41 deletions

View File

@ -5,6 +5,7 @@
!!! example
=== "简单"
```yaml
sink:
type: franzKafka
@ -12,6 +13,19 @@
topic: "log-${fields.topic}"
```
=== "SASL认证"
```yaml
sink:
type: kafka
brokers: ["127.0.0.1:6400"]
topic: "demo"
sasl:
enabled: true
mechanism: SCRAM-SHA-512
username: ***
password: ***
```
## brokers
| `字段` | `类型` | `是否必填` | `默认值` | `含义` |
@ -48,6 +62,50 @@
```
可配置`topic: ${fields.topic}`同样也会发送到topic "loggie"。
## ifRenderTopicFailed
| `字段` | `类型` | `是否必填` | `默认值` | `含义` |
| ---------- | ----------- | ----------- | --------- | -------- |
| ifRenderTopicFailed | | 非必填 | | 如果使用动态的规则渲染topic比如`topic: ${fields.topic}`有可能渲染失败比如日志没有fields.topic字段下面配置指示失败后的动作。 |
| ifRenderTopicFailed.dropEvent | | 非必填 | true | 默认为丢弃 |
| ifRenderTopicFailed.ignoreError | | 非必填 | | 忽略错误日志,请注意,这里只是不打印错误日志 |
| ifRenderTopicFailed.defaultTopic | | 非必填 | | 发送至该默认的topic配置后dropEvent不生效 |
!!! example
=== "1"
```yaml
sink:
type: kafka
brokers: ["127.0.0.1:6400"]
topic: "log-${fields.topic}"
ifRenderTopicFailed:
dropEvent: true
```
=== "2"
```yaml
sink:
type: kafka
brokers: ["127.0.0.1:6400"]
topic: "log-${fields.topic}"
ifRenderTopicFailed:
ignoreError: true
defaultTopic: default
```
## ignoreUnknownTopicOrPartition
| `字段` | `类型` | `是否必填` | `默认值` | `含义` |
| ---------- | ----------- | ----------- | --------- | -------- |
| ignoreUnknownTopicOrPartition | | 非必填 | | 用于当发送的topic不存在时忽略Kafka返回UNKNOWN_TOPIC_OR_PARTITION的报错 |
- 这种情况一般发生在使用动态渲染的topic但是环境里的Kafka关闭了自动创建topic导致无法发送至渲染出来的topic。默认情况Loggie会一直不停的重试从而无法发送新的日志。
- 开启ignoreUnknownTopicOrPartition后会直接丢弃发送的日志避免影响其他正常包含已存在topic的日志发送。
- 请注意和上面`ifRenderTopicFailed`的区别,`ifRenderTopicFailed`是动态渲染不出来topic或者渲染的为空值而`ignoreUnknownTopicOrPartition`则是渲染成功但是topic在Kafka中实际不存在。
## balance
@ -122,9 +180,9 @@
| ---------- |--------| ----------- | ------ |-------------------------------------|
| security | string | 非必填 | | java格式的安全认证内容可以自动转化成为franz-go适配的格式 |
案例:
```
pipelines:
!!! example
```
pipelines:
- name: local
sources:
- type: file
@ -145,8 +203,7 @@ pipelines:
sasl.mechanism: "GSSAPI"
sasl.jaas.config: "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab=\"/shylock/kerberos/zork.keytab\" principal=\"zork@AXRZPT.COM\";"
sasl.kerberos.service.name: "kafka"
```
```
Kubernetes 挂载keytab二进制证书请参考[官方文档](https://kubernetes.io/zh-cn/docs/concepts/configuration/secret/)。

View File

@ -4,6 +4,7 @@
!!! example
=== "简单"
```yaml
sink:
type: kafka
@ -11,6 +12,20 @@
topic: "log-${fields.topic}"
```
=== "SASL认证"
```yaml
sink:
type: kafka
brokers: ["127.0.0.1:6400"]
topic: "demo"
sasl:
type: scram
userName: ***
password: ***
algorithm: sha512
```
## brokers
| `字段` | `类型` | `是否必填` | `默认值` | `含义` |
@ -87,9 +102,9 @@
| ---------- | ----------- | ----------- | --------- | -------- |
| ignoreUnknownTopicOrPartition | | 非必填 | | 用于当发送的topic不存在时忽略Kafka返回UNKNOWN_TOPIC_OR_PARTITION的报错 |
这种情况一般发生在使用动态渲染的topic但是环境里的Kafka关闭了自动创建topic导致无法发送至渲染出来的topic。默认情况Loggie会一直不停的重试从而无法发送新的日志。
开启ignoreUnknownTopicOrPartition后这种情况下会直接丢弃发送的日志避免影响其他正常包含已存在topic的日志发送。
请注意和上面`ifRenderTopicFailed`的区别,`ifRenderTopicFailed`是动态渲染不出来topic或者渲染的为空值而`ignoreUnknownTopicOrPartition`则是渲染成功但是topic在Kafka中实际不存在。
- 这种情况一般发生在使用动态渲染的topic但是环境里的Kafka关闭了自动创建topic导致无法发送至渲染出来的topic。默认情况Loggie会一直不停的重试从而无法发送新的日志。
- 开启ignoreUnknownTopicOrPartition后会直接丢弃发送的日志避免影响其他正常包含已存在topic的日志发送。
- 请注意和上面`ifRenderTopicFailed`的区别,`ifRenderTopicFailed`是动态渲染不出来topic或者渲染的为空值而`ignoreUnknownTopicOrPartition`则是渲染成功但是topic在Kafka中实际不存在。
## balance

View File

@ -82,7 +82,8 @@ markdown_extensions:
- name: mermaid
class: mermaid-experimental
format: !!python/name:pymdownx.superfences.fence_code_format
- pymdownx.tabbed
- pymdownx.tabbed:
alternate_style: true
- pymdownx.tasklist:
custom_checkbox: true
- pymdownx.tilde