PolledProcessor在Spring云数据流上的问题

人气:124 发布:2023-01-03 标签: spring-cloud-stream spring-cloud-dataflow

问题描述

我正在使用PolledProcessor实现一个Spring云数据流处理器。我遵循了这里的示例https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers。以下是我的代码。我将一个带有源管道的流部署到SCDF的这个处理器(源|轮询处理器),并让源发布了一些消息。我确认处理器每秒轮询来自SCDF Rabbitmq的消息,但result始终为false。我去了SCDF Rabbitmq控制台,我看到那些消息都在队列中。因此,处理器不会轮询消息,尽管它在代码中保持轮询。我还看到队列没有使用者。看起来SCDF没有将此处理器绑定到队列。知道为什么吗?

public interface PolledProcessor {
    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();
}

@SpringBootApplication
@EnableBinding(PolledProcessor.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(PollableMessageSource source, MessageChannel dest) {
        return args -> {
            while (true) {
                boolean result = source.poll(dest::send);
                Thread.sleep(1000);
            }
        };
    }
}

这是源和处理器之间的队列状态

推荐答案

我测试了一个没有问题的Spring Cloud Stream应用:

@SpringBootApplication
@EnableBinding(Polled.class)
public class So69383266Application {

    public static void main(String[] args) {
        SpringApplication.run(So69383266Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(PollableMessageSource source) {
        return args -> {
            while (true) {
                boolean result = source.poll(System.out::println);
                System.out.println(result);
                Thread.sleep(1000);
            }
        };
    }

}

interface Polled {

    @Input
    PollableMessageSource source();

}
false
GenericMessage [payload=byte[6], headers={...
true
false

我建议您在AmqpMessageSource.doReceive()中设置断点以查看发生了什么。

编辑

下面介绍如何检查来源是否正在使用正确的队列:

@Bean
public ApplicationRunner runner(PollableMessageSource source) {
    return args -> {
        while (true) {
            DirectFieldAccessor dfa = new DirectFieldAccessor(source);
            log.info(dfa.getPropertyValue("source.h.advised.targetSource.target.queue").toString());
            boolean result = source.poll(System.out::println);
            System.out.println(result);
            Thread.sleep(1000);
        }
    };
}

26