问题描述
我有一个Kafka流--比如博客和Kafka表--比如那些博客相关的评论。Kafka流中的key可以映射到Kafka表中的多个值,即一个博客可以有多条评论。我想连接这两个对象,并创建一个带有注释ID数组的新对象。但是当我连接时,流只包含最后一个注释id。有没有任何文档或示例代码可以为我指明如何实现这一点的正确方向?基本上,有没有文档详细说明如何使用Kafka流和Kafka表进行一对多关系连接?
KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
(blogId, blog) -> blog.getBlogId(),
(blog, comment) -> new EnrichedBlog(blog, comment));
因此,我需要有一个注释ID数组,而不是注释。
推荐答案
我找不到签名与您的代码示例中的签名匹配的Join方法,但我认为问题在于:
KTables被解释为更改日志,也就是说,具有相同键的每条下一条消息都被解释为对记录的更新,而不是新记录。这就是为什么您只能看到给定关键字(博客ID)的最后一条"评论"消息,先前的值将被覆盖。 要克服这个问题,首先需要更改填充KTable的方式。您可以做的是将您的评论主题作为KStream添加到您的拓扑中,然后执行一个聚合,该聚合只是构建一个共享相同博客id的评论的数组或列表。该聚合返回一个KTable,您可以使用该KTable连接您的博客KStream。这里有一个如何构建列表值KTable的草图:
builder.stream("yourCommentTopic") // where key is blog id
.groupByKey()
.aggregate(() -> new ArrayList(),
(key, value, agg) -> new KeyValue<>(key, agg.add(value)),
yourListSerde);
在聚合中使用列表比使用数组更容易,因此如果需要,我建议您将其转换为下游的数组。您还需要为您的列表提供一个serde实现,即上面示例中的"youListSerde"。