Kafka Streams 是 Kafka 提供的用于构建流处理应用程序的 Java 库。它提供了丰富的转换算子(transformations),允许开发者对输入的 Kafka 流数据进行各种处理,如过滤、映射、聚合、连接等,以生成新的输出流。以下是对 Kafka Streams 中常见转换算子的详细说明:

**1. 无状态转换(Stateless Transformations)

**1.1 **map()flatMap()
  • map(): 对流中的每个元素应用给定函数,生成一个新的元素。函数接受一个输入值并返回一个输出值,转换过程中不涉及任何状态的存储。

    KStream<String, String> input = ...;
    KStream<String, Integer> mapped = input.map((key, value) -> new KeyValue<>(key, value.length()));
    
  • flatMap(): 类似于 map(),但允许一个输入元素映射到零个、一个或多个输出元素。返回值是一个 Iterable 或者一个流。

    KStream<String, String> input = ...;
    KStream<String, String> flatMapped = input.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
    
**1.2 **filter()
  • filter(): 根据提供的谓词函数(predicate function)过滤流中的元素。只有当谓词函数返回 true 时,元素才会被保留下来。

    KStream<String, Integer> numbers = ...;
    KStream<String, Integer> evenNumbers = numbers.filter((key, value) -> value % 2 == 0);
    
**1.3 **selectKey()
  • selectKey(): 更改流中元素的键。提供一个函数,该函数接收当前的键值对,并返回新的键。

    KStream<String, User> users = ...;
    KStream<Integer, User> userIds = users.selectKey((key, user) -> user.getId());
    
**1.4 **branch()
  • branch(): 将一个流根据多个谓词函数拆分为多个子流。每个谓词对应一个输出流,如果一个元素满足某个谓词,则会被发送到对应的子流中。

    KStream<String, String> input = ...;
    List<KStream<String, String>> streams = input.branch(
        (key, value) -> value.startsWith("A"),
        (key, value) -> value.endsWith("Z"),
        (key, value) -> true); // 默认分支,捕获未匹配前两个条件的元素
    

**2. 有状态转换(Stateful Transformations)

**2.1 **aggregate(), reduce(), count(), sum(), min(), max()
  • aggregate(): 对流中的元素应用一个累加器函数,将结果累积到一个状态中。状态的初始值由提供的 Initializer 函数确定。每次有新元素到达时,累加器函数都会被调用,用新元素更新状态。最终状态可以通过 Materialized 参数指定如何存储和查询。

    KGroupedStream<String, Integer> grouped = ...;
    KTable<String, Integer> sumPerKey = grouped.aggregate(
        () -> 0,
        (key, value, aggregate) -> aggregate + value,
        Materialized.as("sum-store"));
    
  • reduce(): 类似于 aggregate(),但累加器函数只需要接受当前状态和新元素,不需要单独的初始化函数。

    KGroupedStream<String, Integer> grouped = ...;
    KTable<String, Integer> sumPerKey = grouped.reduce(
        Integer::sum,
        Materialized.as("sum-store"));
    
  • count(), sum(), min(), max(): 这些方法是 aggregate()reduce() 的简化版本,直接提供常见的聚合操作,如计数、求和、求最小值和最大值。

**2.2 **windowedBy()
  • windowedBy(): 用于定义时间窗口(如滑动窗口、会话窗口等),对窗口内的数据进行有状态转换。配合其他有状态转换算子(如 aggregate()reduce()),可以实现基于时间窗口的聚合。

    KStream<String, Integer> input = ...;
    TimeWindows windowDef = TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofSeconds(30));
    KTable<Windowed<String>, Long> countsPerWindow = input
        .groupByKey()
        .windowedBy(windowDef)
        .count();
    
**2.3 **join(), leftJoin(), outerJoin()
  • join(): 对两个流(通常是键相同的 KStream 或 KTable)进行关联(join)操作,基于键将它们的值合并。可以是基于时间窗口的 join。

    KStream<String, String> leftStream = ...;
    KStream<String, String> rightStream = ...;
    KStream<String, String> joinedStream = leftStream.join(
        rightStream,
        (leftValue, rightValue) -> leftValue + ", " + rightValue,
        JoinWindows.of(Duration.ofSeconds(30)));
    

    相应的还有 leftJoin()(左外连接)和 outerJoin()(全外连接),处理没有匹配项的情况。

**3. 其他算子

  • peek(): 用于调试,对流中的每个元素应用一个函数,允许在不改变流的情况下观察流中的数据。

    KStream<String, String> stream = ...;
    stream.peek((key, value) -> System.out.println("Key: " + key + ", Value: " + value));
    
  • transform()transformValues(): 提供自定义的处理器(Processor)实现更复杂的转换逻辑,可以访问 ProcessorContext 以获取时间戳、状态存储等。

    KStream<String, String> stream = ...;
    stream.transformValues(new MyCustomTransformer(), "my-state-store");
    

以上就是 Kafka Streams 中常见的转换算子详解。实际应用中,可以根据业务需求组合使用这些算子,构建复杂的流处理逻辑。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐