问题描述
这是question的续集。我可以把"普通"的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka
和spring-cloud-stream-binder-kafka-streams
两者,spring-cloud-stream-binder-kafka
用于简单的使用/生产和spring-cloud-stream-binder-kafka-streams
高级流处理。
功能模型似乎只有streams
绑定器支持,如果我尝试混合这两种方法-基于简单用法的注释和用于流的功能,流绑定不会注册。
spring.cloud:
stream:
function:
definition: processStream
bindings:
processStream-in-0:
destination: my-topic
simple-binding-in:
destination: another-topic
public interface SimpleBinding {
String INPUT = "simple-binding-in";
@Input(INPUT)
SubscribableChannel simpleIn();
}
@Component
public class SimpleListener {
@StreamListener(SimpleBinding.INPUT)
public void listen(@Payload SomeDto payload) {
}
}
@Configuration
public class FunctionalStream {
@Bean
public Consumer<KStream<String>> processStream() {
return eventStream -> eventStream.map()
}
}
@EnableBinding(SimpleBinding.class)
出现在配置类上。是否更喜欢/支持按说明混合使用两者,或者我是否应该使用streams-binder
,即使是简单的消息消费?
推荐答案
对于kafka绑定器,您可以也绝对应该使用功能模型,而完全忘记StreamListener。这样,它将与您的KStream功能模型保持一致。
spring.cloud:
stream:
function:
definition: processStream
bindings:
processStream-in-0:
destination: my-topic
listen-in-0:
destination: another-topic
@Component
public class SimpleListener {
@Bean
public Consumer<SomeDto> listen() {
return payload -> ...
}
}
@Configuration
public class FunctionalStream {
@Bean
public Consumer<KStream<String>> processStream() {
return eventStream -> eventStream.map()
}
}