首页电脑使用kafka 消费重试 kafka消费者重试

kafka 消费重试 kafka消费者重试

圆圆2025-09-13 23:01:52次浏览条评论

使用 reactor kafka 消费指定范围消息后停止 consumer

本文介绍了如何使用Reactor Kafka从指定Topic的启动位置开始消费消息,直到达到该Topic Partition的最新Offset,并在消费完成后顺利地停止Consumer。通过结合seekToBeginning、endOffsets 和 takeUntil等Reactor Kafka的特性,可以实现精准的消息消费控制。

在某些场景下,我们需要消费Kafka Topic中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的初始消费位置到最新的 Offset,然后 Consumer 停止。

代码示例导入 org.apache.kafka.common.TopicPartition;导入 org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;导入 org.springframework.kafka.listener.ConsumerProperties;导入 org.springframework.kafka.support.Acknowledgment;导入 react.core.Disposable;导入 react.core.publisher.Flux;导入 react.core.publisher.Mono;导入 react.kafka.receiver.ReceiverOptions;导入 react.kafka.receiver.ReceiverPartition;导入 react.kafka.receiver.ReceiverRecord;导入 java.time.Duration;导入 java.util.Collections;导入 java.util.Map;公共类 KafkaConsumerExample { public Disposable consumerMessages(String topic, String groupId, String bootstrapServers) { TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition // 配置 Consumer 属性 Maplt;String, Objectgt;consumerProps = Map.of( quot;bootstrap.serversquot;, bootstrapServers, quot;group.idquot;, groupId, quot;key.deserializerquot;, org.apache.kafka.common.serialization.StringDeserializer.class, quot;value.deserializerquot;, org.apache.kafka.common.serialization.StringDeserializer.class, quot;auto.offset.resetquot;, quot;earliestquot; // 最早的 Offset 开始消费 ); // 创建 ReceiverOptions ReceiverOptionslt;String, Stringgt; receiveOptions = ReceiverOptions.lt;String, St

ringgt;create(consumerProps) .subscription(Collections.singleton(topic)) .addAssignListener(partitions -gt;partitions.forEach(ReceiverPartition::seekToBeginning)); // 创建ReactiveKafkaConsumerTemplate ReactiveKafkaConsumerTemplatelt;String,Stringgt;kafkaConsumer = new ReactiveKafkaConsumerTemplatelt;gt;(receiverOptions); // 消费消息并停止Consumer return kafkaConsumer .receive() .flatMap(record -gt;{//获取当前Partition的最新Offset Monolt;Maplt;TopicPartition,Longgt;gt;endOffsetsMono = kafkaConsumer.doOnConsumer(consumer -gt;consumer.endOffsets(Collections.singleton(topicPartition)));return endOffsetsMono.map(topicPartitionToLastOffset -gt; { long lastOffset = topicPartitionToLastOffset.get(topicPartition); return new RecordWithLastOffset(record, lastOffset); }); }) .takeUntil(recordWithLastOffset -gt; recordWithLastOffset.record.offset() gt;= (recordWithLastOffset.lastOffset - 1)) .subscribe(recordWithLastOffset -gt; { ReceiverRecordlt;String, Stringgt; record = recordWithLastOffset.record; 确认 acknowledgment = record.receiverOffset(); System.out.printf(quot;Rece

已获取消息:topic-partition=s offset=d key=s value=s\nquot;, acknowledgment.topicPartition(), acknowledgment.offset(), record.key(), record.value()); acknowledgment.acknowledge(); }); } private static class RecordWithLastOffset { private final ReceiverRecordlt;String, Stringgt; record; private final long lastOffset; public RecordWithLastOffset(ReceiverRecordlt;String, Stringgt; record, long lastOffset) { this.record = record; this.lastOffset = lastOffset; } } public static void main(String[] args) { String topic = quot;your-topic-namequot; String groupId = quot;your-group-idquot; String bootstrapServers = quot;localhost:9092quot; KafkaConsumerExample example = new KafkaConsumerExample(); Disposable Disposable = example.consumeMessages(topic, groupId, bootstrapServers); // 保持程序运行一段时间,以便消费消息 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } // 取消订阅,停止消费dispose(); }}登录后复制

解释代码 DeepSeek

幻方量化公司推出的开源大模型平台

7087 查看详细配置 Consumer 属性:设置 Kafka Consumer 的连接信息、序列化方式、GroupId 以及 Offset 重置策略。auto.offset.reset = 最早预测从 Topic 的初始化位置开始消费。创建 ReceiverOptions:使用配置的 Consumer 属性创建 ReceiverOptions,并通过订阅指定要消费的 Topic。addAssignListener 用于在 Partition 分配后,通过eekToBeginning Consumer 的 Offset创建ReactiveKafkaConsumerTemplate:使用ReceiverOptions创建ReactiveKafkaConsumerTemplate,用于消费Kafka消息。消费消息并停止Consumer:kafkaConsumer.receive():从Kafka Topic接收消息,返回一个Fluxlt;ReceiverRecordlt;String, Stringgt;gt;。flatMap: 对于接收每个到的消息,使用kafkaConsumer.doOnConsumer来获取当前TopicPartition的最新Offset。doOnConsumer允许你访问底层的KafkaConsumer对象,从而可以调用consumer.endOffsets方法。map:将ReceiverRecord并获取到的最新Offset封装到一个自定义的RecordWithLastOffset对象中。takeUntil:使用takeUntil操作符,当消费到最新Offset record.offset() gt;= (lastOffset - 1) 判断当前消息的Offset是否已经达到或超过了最新Offset的前一个位置。订阅:订阅Flux,处理接收到的消息。在订阅方法中,可以执行消息处理逻辑,并使用record.receiverOffset().acknowledge()提交Offset。取消订阅:使用disposable.dispose() 取消订阅,停止Consumer。

注意事项示例代码假设Topic只有一个Partition。如果Topic有多个Partition,需要根据实际情况进行调整。endOffsets 返回方法是一个Maplt;TopicPartition,Longgt;,其中Long值是每个Partition的最新Offset。

Offset 的提交方式有多种,示例代码中使用手动提交,即在 subscribe 方法中调用 record.receiverOffset().acknowledge() 提交 Offset。也可以使用自动提交,通过设置 Consumer 的 enable.auto.commit 属性来实现。在实际应用中,需要处理可能出现的异常情况,例如 Kafka 连接失败、消息处理失败等。

总结

通过 Reactor Kafka 的eekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精准的消息消费控制,并在完成消费后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。

以上就是使用 Reactor Kafka 消费指定范围消息后停止 Consumer 的内容详细,更多请关注乐哥常识网相关其他文章! apache edge ai kafka String封装自动地图对象数据分析大家都看: 在React前端处理Java传入Map类型API响应的实践指南解决Spring Boot与React应用在AWS部署中CORS错误的终极指南React Native中WritableArrayWritableMap报错及size()为0如何解决?React Native中ReadableArray无法传入Map?如何解决“提供了null非法类型”错误?Java框架与储备React框架的集成

使用 Reactor
java中读取文件报错Input not set java中读取文件
相关内容
发表评论

游客 回复需填写必要信息