diff --git a/docs/high-performance/message-queue/kafka-questions-01.md b/docs/high-performance/message-queue/kafka-questions-01.md index d6f4b0ac..648f9fe2 100644 --- a/docs/high-performance/message-queue/kafka-questions-01.md +++ b/docs/high-performance/message-queue/kafka-questions-01.md @@ -218,6 +218,163 @@ acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后 - 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样 - 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。 +# kafka的重试机制 +网上关于 Spring kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 Spring-kafka-2.9.3 源码重新梳理一下。 + +## 消费失败会怎么样 + +在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了? + +生产者代码: + + ```Java + for (int i = 0; i < 10; i++) { + kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i)) + } + ``` + +消费者消代码: + + ```Java + @KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple") + private void customer(String message) throws InterruptedException { + log.info("kafka customer:{}",message); + Integer n = Integer.parseInt(message); + if (n%5==0){ + throw new RuntimeException(); + } + } + ``` + +在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95 重试多次后会被跳过。 + +```Java +2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Skipping seek of: test-0@95 +2023-08-10 12:03:32.918 TRACE 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Seeking: test-0 to: 96 +2023-08-10 12:03:32.918 INFO 9700 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-apple-1, groupId=apple] Seeking to offset 96 for partition test-0 + +``` + +## 默认会重试多少次? + +默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔? +10 次。看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑: + +```Java + FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition); + this.retryListeners.forEach(rl -> + rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get())); + long nextBackOff = failedRecord.getBackOffExecution().nextBackOff(); + if (nextBackOff != BackOffExecution.STOP) { + this.backOffHandler.onNextBackOff(container, exception, nextBackOff); + return false; + } +``` + +其中 BackOffExecution.STOP 的值为 -1,nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP,既超过这个最大执行次数后才会停止重试。 + +```Java +public long nextBackOff() { + this.currentAttempts++; + if (this.currentAttempts <= getMaxAttempts()) { + return getInterval(); + } + else { + return STOP; + } +} +``` + +那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是: + +```Java +public DefaultErrorHandler() { + this(null, SeekUtils.DEFAULT_BACK_OFF); +} +``` + +SeekUtils.DEFAULT_BACK_OFF 定义的是: + +```Java +public static final int DEFAULT_MAX_FAILURES = 10; + +public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1); +``` + +DEFAULT_MAX_FAILURES 的值是10,currentAttempts从0到9,所以总共会执行10次,每次重试的时间间隔为0。 + +## 如何自定义重试次数,以及时间间隔 + +从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff 控制的,FixedBackOff 是 DefaultErrorHandler 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。 + +```Java +@Bean +public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + // 自定义重试时间间隔以及次数 + FixedBackOff fixedBackOff = new FixedBackOff(1000, 5); + factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff)); + factory.setConsumerFactory(consumerFactory); + return factory; +} +``` + +## 如何在重试失败后进行告警 + +自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。 + +```Java +@Slf4j +public class DelErrorHandler extends DefaultErrorHandler { + + public DelErrorHandler(FixedBackOff backOff) { + super(null,backOff); + } + + @Override + public void handleRemaining(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { + super.handleRemaining(thrownException, records, consumer, container); + log.info("重试多次失败"); + // 自定义操作 + } +} +``` +DefaultErrorHandler 只是默认的一个错误处理器,Spring kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。 + +## 重试失败后的数据如何再次处理 + +当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢? + +死信队列(Dead Letter Queue,简称DLQ)是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。 + +`@RetryableTopic` 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。 + +```Java +@RetryableTopic( + attempts = "5", + backoff = @Backoff(delay = 100, maxDelay = 1000) +) +@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple") +private void customer(String message) { + log.info("kafka customer:{}", message); + Integer n = Integer.parseInt(message); + if (n % 5 == 0) { + throw new RuntimeException(); + } + System.out.println(n); +} +``` + +这个例子在listen方法上使用@RetryableTopic注解,配置了: +- 重试5次 +- 重试间隔100毫秒,最大间隔1秒 + +重试完毕后,如果仍然失败,则会进入消息队列,此时就存在两个队列: +- test 原队列 +- test-dlt 原队列对应的死信队列 + +对于死信队列的处理,既可以用 `@DltHandler` 处理,也可以使用 `@KafkaListener` 重新消费。 + ### Reference - Kafka 官方文档:https://kafka.apache.org/documentation/