Flink流处理引擎系统学习(九)
前言今天主要是过下Flink的DataStream API入门,编码基本套路、数据源等等。昨天部门经理把项目的demo权限放开了,看了下,基本的大流程已经实现了。使用的是1.14.3版本(现在1.14.4),代码最后git时间是3.28。不得不说,demo基本把要改造的大流程写好了,还是强啊。所以说还是要保持好学习能力,是不是要走领导岗位就看个人了(不过不得不说当了一定级别的领导菜有更多的时间st
前言
今天主要是过下Flink的DataStream API入门,编码基本套路、数据源等等。
昨天部门经理把项目的demo权限放开了,看了下,基本的大流程已经实现了。使用的是1.14.3版本(现在1.14.4),代码最后git时间是3.28。不得不说,demo基本把要改造的大流程写好了,还是强啊。所以说还是要保持好学习能力,是不是要走领导岗位就看个人了(不过不得不说当了一定级别的领导菜有更多的时间study up。这个级别不能低也不能高)。
一、Flink编码套路
运行模型
数据源
自定义数据源示例
转换
DataSink
二、迭代运算
1.简单理解迭代运算
2.流式迭代运算
3.延迟控制
理解梳理:
吞吐针对整体而言,延迟针对个体而言。
三、Flink调试
1、调试手段
PS:
显式创建本地环境不推荐这样写,getExecutionEnvironment方法里会判断环境,并创建环境。
2、数据模拟
实际上1.14.4里DataStreamUtil.collect()相关方法已过时,替代的是对keyedStream的reinterpretAsKeyedStream
四、流式迭代demo
业务需求:输入一组数据,分别进行减1运算,直到等于0为止
package spendreport.stream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author zhengwen
**/
public class IterativeStreamJob {
public static void main(String[] args) throws Exception {
//输入一组数据,分别进行减1运算,直到等于0为止
final StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.generateSequence(0, 10);
//基于输入流构建IterativeStream(迭代头)
IterativeStream<Long> itStream = input.iterate();
//定义迭代逻辑(map、fun等)
DataStream<Long> minusOne = itStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
//业务目的(进行减1运算)
return value - 1;
}
});
//定义反馈流逻辑(从迭代过的流中过滤出符合条件的元素组成的部分流反馈给迭代头互补性重复计算逻辑)
DataStream<Long> greaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
//调用IterativeStream的closeWith方法可以关闭一个迭代(迭代尾)
itStream.closeWith(greaterThanZero);
//定义终止迭代的逻辑(符合条件的元素将被分发给下游进行下一次迭代)
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
//打印
//lessThanZero.print();
//使用调试结果收集工具
KeyedStream<Long, Object> ls = DataStreamUtils.reinterpretAsKeyedStream(lessThanZero,
new KeySelector<Long, Object>() {
@Override
public Object getKey(Long value) throws Exception {
return value;
}
});
//打印
ls.print();
env.execute("IterativeStreamJob");
}
}
总结
流式处理的迭代写法有点不好理解,跟我们平常的迭代方法区别有点大吧。其实用流水作业想就好理解多了,后面也许有高级的简单写法。
好,就写到这里,up!!!

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)