流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。

批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。

Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型:

  • Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟
  • 如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量
  • 同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量 
     

一、流处理系统

流处理Streaming
• StreamExecutionEnvironment
• DataStreaming

需求 通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合 统计,并且把时间窗口内计算的结果打印出来

代码:


  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.socketTextStream("192.168.221.131", 9001)

    import org.apache.flink.api.scala._
    val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
      .map((_,1))   //每一个单词转换为tuple2的形式(单词,1)
      .keyBy(tup=>tup._1)  //官方推荐使用keyselector选择器选择数据
      .timeWindow(Time.seconds(2))  //时间窗口为2秒,表示每隔2秒钟计算一次接收到的数
      .sum(1)

    wordCount.print().setParallelism(1)
    //执行程序
    env.execute("SocketWindowWord")
  }

在服务器上输出nc - l 9001,如图所示,然后启动项目

数据hello you  hello me hello you

idea控制台可以看到如下效果

二、批处理系统

批处理Streaming
• ExecutionEnvironment

需求:统计指定文件中单词出现的总次数

代码:

 def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputPath="hdfs://192.168.221.131:9000/hello.txt"

    val outPath="hdfs://192.168.221.131:9000/out082001"
    val text = env.readTextFile(inputPath)

    import org.apache.flink.api.scala._
    val  wordCount= text.flatMap(_.split(" "))
    .map((_,1)).groupBy(0).sum(1).setParallelism(1)

    wordCount.writeAsCsv(outPath,"\n","") //指定行分割符和字段分割符
    env.execute("BatchWordCount")
  }

结果:

 

三、异常处理

3.1 flink编译报错

错误信息:Error:(18, 33) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]

解决方法:

推荐的做法是在代码中引入以下包:

import org.apache.flink.streaming.api.scala._

但是没有解决问题,最后发现是Scala版本问题。自己的idea的scala版本和pom.xml中的不一致!!!

我的版本为 2.11.12

然后看下pom.xml文件,flink对应的scala版本是2.12系列,所以报错。flink对应的scala版本是2.11系列即可解决问题,如下图所示

3.2 启动异常

异常信息:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
 

解决方法:在pom文件引入

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
</dependency>

即可解决问题

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐