基于APACHE-KAFKA-BINDER的春云流功能模型

人气:175 发布:2023-01-03 标签: apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka

问题描述

这是question的续集。我可以把"普通"的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafkaspring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-cloud-stream-binder-kafka-streams高级流处理。

功能模型似乎只有streams绑定器支持,如果我尝试混合这两种方法-基于简单用法的注释和用于流的功能,流绑定不会注册。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

@EnableBinding(SimpleBinding.class)出现在配置类上。是否更喜欢/支持按说明混合使用两者,或者我是否应该使用streams-binder,即使是简单的消息消费?

推荐答案

对于kafka绑定器,您可以也绝对应该使用功能模型,而完全忘记StreamListener。这样,它将与您的KStream功能模型保持一致。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            listen-in-0:
              destination: another-topic

@Component
public class SimpleListener {

    @Bean
    public Consumer<SomeDto> listen() {
        return payload -> ...
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

23