如何优雅地关闭被动的Kafka-Consumer并提交最后处理的记录?

问题描述我苦苦寻找此功能的过程在令人作呕的日志问题Severallastoffsetsarentgettingcommitedwithreactivekafka中得到了充分的描述,它显示了我多次尝试不同的失败。如何订阅ReactiveKafkaConsumerTemplate<S

发布:2022-10-16 标签:spring-kafka


我们可以在春靴中使用多个卡夫卡模板吗?

问题描述在我的SpringBootKafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro?K

发布:2022-10-16 标签:apache-kafkaspring-kafkakafka-producer-api


Spring-Kafka使用Spring Boot版本2.3.7进行批量错误处理

问题描述我正在尝试执行SpringKafka批处理错误处理。首先,我有几个问题。监听器错误处理程序和容器错误处理程序有什么区别,这两个类别有哪些错误?您能帮助一些样本更好地了解这一点吗?这是我们的设计:每隔一定时间间隔轮询批量消费消息基于键推送到本地缓存(应用缓存)(避免重复事件

发布:2022-10-16 标签:spring-bootspring-kafka


如何使用Spring Kafka实现有状态消息监听器?

问题描述我希望使用SpringKafkaAPI实现有状态监听器。提供以下信息:ConCurrentKafkaListenerContainerFactory,并发设置为"n"Spring@Service类上的@KafkaListener批注方法然后创建"n"个KafkaMessa

发布:2022-10-16 标签:javaspringapache-kafkaspring-kafka


春云Kafka StreamsUncaughtExceptionHandler

问题描述我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestionprovidedbyArtemBilan以将StreamsUncaughtExceptionHandle

发布:2022-10-16 标签:javaapache-kafkaspring-kafkaapache-kafka-streamsspring-cloud-stream


如何确定春季卡夫卡设置的并发性?

问题描述我正在使用@KafkaListener注释编写一个Kafka使用者,我知道有一种方法可以使用ConCurentKafkaListenerContainerFactory中的方法增加来自不同分区的并发Kafka使用者的数量e.g.factory.setConcurrency

发布:2022-10-16 标签:apache-kafkakafka-consumer-apispring-kafka


Spring Kafka消费者客户端-ID配置

问题描述我有两个Kafka侦听器组件,每个侦听不同的主题,并期待不同的有效负载。我的问题是,我可以对两者使用相同的客户端ID,还是必须不同?如果客户ID必须不同,我想了解一个可以有效使用客户ID的用例。推荐答案根据文档:发出请求时传递给服务器的id字符串。这样做的目的是允许在服务

发布:2022-10-16 标签:spring-kafka


为什么当我用AutoFlush将自定义值设置为False时,Async Producer不等待linger.ms或Batch.Size填满它们?

问题描述我使用的是Spring-Kafka2.2.8,并编写了一个简单的异步生成器,设置如下:linger.ms:300000,batch.size:33554431,max.block.ms:60000.现在我正在通过调用下面的构造函数创建一个KafkaTemplate,并将A

发布:2022-10-16 标签:spring-kafka


单记录卡夫卡消费者和卡夫卡批量消费者的基本区别是什么?

问题描述我正在使用Spring-Kafka2.2.8,试图了解单记录消费者和批量消费者之间的主要区别。据我所知,从一个主题中读取消息/字节对于单个记录使用者和批处理使用者来说没有什么不同。唯一的区别是如何提交偏移量。并因此进行错误处理。我的理解正确吗?请确认。推荐答案使用基于记录

发布:2022-10-16 标签:spring-kafka


当并发设置为1以上时,如何暂停特定的Kafka消费者线程?

问题描述我使用的是Spring-kafka2.2.8,并将并发设置为2,如下所示,并尝试了解如何在满足特定条件时暂停使用者线程/实例。@KafkaListener(id="myConsumerId",topics="myTopic",concurrency=2)publicvoi

发布:2022-10-16 标签:spring-kafka