I. Project Requirements

Read the data from a Kafka topic with Flink and write it into HBase.
Then query the first five rows of the shipping_user column in hbase shell.

  • Written in Scala (version 2.12), running on Java 1.8.
  • Flink version 1.14, using only the DataStream API; no SQL is involved.
  • Project dependencies are managed with Maven.
  • A sample of the JSON messages in the Kafka topic is shown below:
{
    "order_id": 458,
    "order_sn": "2024110659360903",
    "customer_id": 6644,
    "shipping_user": "刘鹏",
    "province": "上海市",
    "city": "上海市",
    "address": "上海市上海市南京西路12664101号恒隆广场2期大堂22层",
    "order_source": 1,
    "payment_method": 2,
    "order_money": 6857.71,
    "district_money": 4114.63,
    "shipping_money": 37.26,
    "payment_money": 2780.34,
    "shipping_comp_name": "顺丰",
    "shipping_sn": "8335790790612",
    "create_time": "20241101211934",
    "shipping_time": "NULL",
    "pay_time": "NULL",
    "receive_time": "NULL",
    "order_status": "已下单",
    "order_point": 278,
    "invoice_title": "维旺明网络有限公司",
    "modified_time": "2024-11-01 13:19:34"
}

II. Import the Dependencies

1. Reading from Kafka requires the Kafka connector

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>

2. Writing to HBase requires the HBase connector

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>

3. Core dependencies for Flink DataStream development

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
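
Since every Flink artifact above repeats the same version and Scala suffix, these can be centralized with Maven properties (a sketch; the property names here are my own choice, not from the original POM):

    <properties>
        <flink.version>1.14.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>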

4. Logging dependencies, to make Flink's logs easier to follow

Flink's dependencies pull in their own bundled logging packages, which I personally find less convenient. To use the logging artifacts imported here, first remove Flink's bundled logging dependencies from the project's libraries, so the two bindings do not conflict (one way to do this in Maven is sketched after the dependency list below).

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.20.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j2-impl</artifactId>
            <version>2.20.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.20.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.12</version>
        </dependency>
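
If removing the bundled jars from the IDE's library list is inconvenient, the same effect can be had in the POM itself with an <exclusions> block (a sketch: run mvn dependency:tree first to see which Flink artifact actually drags in the conflicting logging jars, and adjust the coordinates accordingly):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.14.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>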

After the dependencies are in place, create a log4j2.properties file under resources (you need to create this file yourself):

# Log4j
status = error

name = PropertiesConfig

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{yyyy-MM-dd HH:mm:ss}] %-5p [%t] %c: %m%n

rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT

logger.flink.name = org.apache.flink
logger.flink.level = info


III. Writing the Code

1. Create the Flink streaming environment

  final val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  streamEnv.setParallelism(1) // parallelism 1 keeps the job slow enough to watch each record being processed
  streamEnv.getConfig.setAutoWatermarkInterval(1000) // generate watermarks every 1000 ms

2. Read the data from the Kafka topic

    val kafka_source = KafkaSource.builder()
      .setBootstrapServers("bigdata1:9092")
      .setTopics("my_order")
      .setGroupId("consumer")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
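
Before wiring the source into the job, it is worth confirming that the topic actually holds data. One quick check, assuming Kafka's command-line scripts are available on the broker host, is the console consumer:

    kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic my_order --from-beginning --max-messages 1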

3. Parse the Kafka records into a case class

  // A case class mirroring the JSON records read from the topic
  case class order_master(
                           order_id: Int,
                           order_sn: String,
                           customer_id: Int,
                           shipping_user: String,
                           province: String,
                           city: String,
                           address: String,
                           order_source: Int,
                           payment_method: Int,
                           order_money: Double,
                           district_money: Double,
                           shipping_money: Double,
                           payment_money: Double,
                           shipping_comp_name: String,
                           shipping_sn: String,
                           create_time: String,
                           shipping_time: String,
                           pay_time: String,
                           receive_time: String,
                           order_status: String,
                           order_point: Int,
                           invoice_title: String,
                           modified_time: String
                         )

Parsing the Kafka data, with the corresponding watermark strategy:

streamEnv.fromSource(kafka_source, WatermarkStrategy.noWatermarks(), "kafka_source")
      .filter(data => data.contains("order_id"))
      .map(data => {
        val jSONObject = JSON.parseObject(data)
        val order_id = jSONObject.getIntValue("order_id")
        val order_sn = jSONObject.getString("order_sn")
        val customer_id = jSONObject.getIntValue("customer_id")
        val shipping_user = jSONObject.getString("shipping_user")
        val province = jSONObject.getString("province")
        val city = jSONObject.getString("city")
        val address = jSONObject.getString("address")
        val order_source = jSONObject.getIntValue("order_source")
        val payment_method = jSONObject.getIntValue("payment_method")
        val order_money = jSONObject.getDoubleValue("order_money")
        val district_money = jSONObject.getDoubleValue("district_money")
        val shipping_money = jSONObject.getDoubleValue("shipping_money")
        val payment_money = jSONObject.getDoubleValue("payment_money")
        val shipping_comp_name = jSONObject.getString("shipping_comp_name")
        val shipping_sn = jSONObject.getString("shipping_sn")
        val create_time = jSONObject.getString("create_time")
        val shipping_time = jSONObject.getString("shipping_time")
        val pay_time = jSONObject.getString("pay_time")
        val receive_time = jSONObject.getString("receive_time")
        val order_status = jSONObject.getString("order_status")
        val order_point = jSONObject.getIntValue("order_point")
        val invoice_title = jSONObject.getString("invoice_title")
        val modified_time = jSONObject.getString("modified_time")
        // create_time arrives in the compact "yyyyMMddHHmmss" form; normalize it to "yyyy-MM-dd HH:mm:ss"
        val create_dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
        val time = create_dateFormat.parse(create_time)
        val date_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        order_master(order_id, order_sn, customer_id, shipping_user, province, city, address, order_source, payment_method,
          order_money, district_money, shipping_money, payment_money, shipping_comp_name, shipping_sn, date_format.format(time),
          shipping_time, pay_time, receive_time, order_status, order_point, invoice_title, modified_time)
      })
      // Use the larger of create_time and modified_time as the event timestamp; the date parsing needs exception handling to avoid null-pointer errors on the "NULL" strings
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // allow events to arrive up to 5 seconds late
          .withTimestampAssigner(
            new SerializableTimestampAssigner[order_master] {
              override def extractTimestamp(element: order_master, recordTimestamp: Long): Long = 
              try {
                val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                val create = if (StringUtils.isNullOrWhitespaceOnly(element.create_time)) 0L else dateFormat.parse(element.create_time).getTime
                val modified = if (StringUtils.isNullOrWhitespaceOnly(element.modified_time)) create else dateFormat.parse(element.modified_time).getTime
                scala.math.max(create, modified)
              } catch {
                case e: Exception => println(e.getMessage)
                  0L
              }
            }
          )
      )
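
Note that create_time arrives in the compact yyyyMMddHHmmss form (e.g. "20241101211934") while modified_time is already yyyy-MM-dd HH:mm:ss, which is why the map above re-formats it before constructing the case class. A minimal standalone sketch of that conversion:

    import java.text.SimpleDateFormat

    object TimeFormatCheck {
      def main(args: Array[String]): Unit = {
        val source = new SimpleDateFormat("yyyyMMddHHmmss")      // pattern of create_time in the topic
        val target = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") // pattern written to HBase
        // sample value taken from the JSON message above
        println(target.format(source.parse("20241101211934")))   // prints: 2024-11-01 21:19:34
      }
    }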

4. Write the data into HBase

Here the data is written with HBaseSinkFunction, which takes an HBaseConfiguration and an HBaseMutationConverter.

    val hbase_conf = HBaseConfiguration.create()
    hbase_conf.set("zookeeper.quorum", "bigdata1:2181,bigdata2:2181,bigdata3:2181")

    val myHBaseMutation = new HBaseMutationConverter[order_master] {
      private val cf = Bytes.toBytes("cf")

      override def open(): Unit = {
        println("HBaseMutationConverter is opening.")
      }

      override def convertToMutation(record: order_master): Mutation = {
        val put = new Put(Bytes.toBytes(record.order_id))
        put.addColumn(cf, Bytes.toBytes("order_sn"), Bytes.toBytes(record.order_sn))
        put.addColumn(cf, Bytes.toBytes("customer_id"), Bytes.toBytes(record.customer_id))
        put.addColumn(cf, Bytes.toBytes("shipping_user"), Bytes.toBytes(record.shipping_user))
        put.addColumn(cf, Bytes.toBytes("province"), Bytes.toBytes(record.province))
        put.addColumn(cf, Bytes.toBytes("city"), Bytes.toBytes(record.city))
        put.addColumn(cf, Bytes.toBytes("address"), Bytes.toBytes(record.address))
        put.addColumn(cf, Bytes.toBytes("order_source"), Bytes.toBytes(record.order_source))
        put.addColumn(cf, Bytes.toBytes("payment_method"), Bytes.toBytes(record.payment_method))
        put.addColumn(cf, Bytes.toBytes("order_money"), Bytes.toBytes(record.order_money))
        put.addColumn(cf, Bytes.toBytes("district_money"), Bytes.toBytes(record.district_money))
        put.addColumn(cf, Bytes.toBytes("shipping_money"), Bytes.toBytes(record.shipping_money))
        put.addColumn(cf, Bytes.toBytes("payment_money"), Bytes.toBytes(record.payment_money))
        put.addColumn(cf, Bytes.toBytes("shipping_comp_name"), Bytes.toBytes(record.shipping_comp_name))
        put.addColumn(cf, Bytes.toBytes("shipping_sn"), Bytes.toBytes(record.shipping_sn))
        put.addColumn(cf, Bytes.toBytes("create_time"), Bytes.toBytes(record.create_time))
        put.addColumn(cf, Bytes.toBytes("shipping_time"), Bytes.toBytes(record.shipping_time))
        put.addColumn(cf, Bytes.toBytes("pay_time"), Bytes.toBytes(record.pay_time))
        put.addColumn(cf, Bytes.toBytes("receive_time"), Bytes.toBytes(record.receive_time))
        put.addColumn(cf, Bytes.toBytes("invoice_title"), Bytes.toBytes(record.invoice_title))
        put.addColumn(cf, Bytes.toBytes("modified_time"), Bytes.toBytes(record.modified_time))
        put
      }
    }
    // "order_info_1"是需要填写的表名,order_master 是需要填写的传入数据的类型
    // 后面三个数字分别是缓存的最大字节大小,缓存的最大转变数量,间隔时间
	val hbase_sink = new HBaseSinkFunction[order_master]("order_info", hbase_conf, myHBaseMutation, 100000000, 1000, 5000)
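
HBaseSinkFunction does not create the target table, so it must exist with the cf column family before the job starts. Assuming the shtd_result namespace used in the final query, it can be created in hbase shell:

    create_namespace 'shtd_result'
    create 'shtd_result:order_info', 'cf'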

5. Full code

import com.alibaba.fastjson2.JSON
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.hbase.sink.{HBaseMutationConverter, HBaseSinkFunction}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.StringUtils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.util.Bytes

import java.time.Duration

object test{
  final val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  streamEnv.setParallelism(1)
  streamEnv.getConfig.setAutoWatermarkInterval(1000)

  case class order_master(
                           order_id: Int,
                           order_sn: String,
                           customer_id: Int,
                           shipping_user: String,
                           province: String,
                           city: String,
                           address: String,
                           order_source: Int,
                           payment_method: Int,
                           order_money: Double,
                           district_money: Double,
                           shipping_money: Double,
                           payment_money: Double,
                           shipping_comp_name: String,
                           shipping_sn: String,
                           create_time: String,
                           shipping_time: String,
                           pay_time: String,
                           receive_time: String,
                           order_status: String,
                           order_point: Int,
                           invoice_title: String,
                           modified_time: String
                         )

  def main(args: Array[String]): Unit = {
    val kafka_source = KafkaSource.builder()
      .setBootstrapServers("bigdata1:9092")
      .setTopics("my_order")
      .setGroupId("consumer")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    val hbase_conf = HBaseConfiguration.create()
    hbase_conf.set("zookeeper.quorum", "bigdata1:2181,bigdata2:2181,bigdata3:2181")

    val myHBaseMutation = new HBaseMutationConverter[order_master] {
      private val cf = Bytes.toBytes("cf")

      override def open(): Unit = {
        println("HBaseMutationConverter is opening.")
      }

      override def convertToMutation(record: order_master): Mutation = {
        val put = new Put(Bytes.toBytes(record.order_id))
        put.addColumn(cf, Bytes.toBytes("order_sn"), Bytes.toBytes(record.order_sn))
        put.addColumn(cf, Bytes.toBytes("customer_id"), Bytes.toBytes(record.customer_id))
        put.addColumn(cf, Bytes.toBytes("shipping_user"), Bytes.toBytes(record.shipping_user))
        put.addColumn(cf, Bytes.toBytes("province"), Bytes.toBytes(record.province))
        put.addColumn(cf, Bytes.toBytes("city"), Bytes.toBytes(record.city))
        put.addColumn(cf, Bytes.toBytes("address"), Bytes.toBytes(record.address))
        put.addColumn(cf, Bytes.toBytes("order_source"), Bytes.toBytes(record.order_source))
        put.addColumn(cf, Bytes.toBytes("payment_method"), Bytes.toBytes(record.payment_method))
        put.addColumn(cf, Bytes.toBytes("order_money"), Bytes.toBytes(record.order_money))
        put.addColumn(cf, Bytes.toBytes("district_money"), Bytes.toBytes(record.district_money))
        put.addColumn(cf, Bytes.toBytes("shipping_money"), Bytes.toBytes(record.shipping_money))
        put.addColumn(cf, Bytes.toBytes("payment_money"), Bytes.toBytes(record.payment_money))
        put.addColumn(cf, Bytes.toBytes("shipping_comp_name"), Bytes.toBytes(record.shipping_comp_name))
        put.addColumn(cf, Bytes.toBytes("shipping_sn"), Bytes.toBytes(record.shipping_sn))
        put.addColumn(cf, Bytes.toBytes("create_time"), Bytes.toBytes(record.create_time))
        put.addColumn(cf, Bytes.toBytes("shipping_time"), Bytes.toBytes(record.shipping_time))
        put.addColumn(cf, Bytes.toBytes("pay_time"), Bytes.toBytes(record.pay_time))
        put.addColumn(cf, Bytes.toBytes("receive_time"), Bytes.toBytes(record.receive_time))
        put.addColumn(cf, Bytes.toBytes("invoice_title"), Bytes.toBytes(record.invoice_title))
        put.addColumn(cf, Bytes.toBytes("modified_time"), Bytes.toBytes(record.modified_time))
        put
      }
    }

    val hbase_sink = new HBaseSinkFunction[order_master]("shtd_result:order_info", hbase_conf, myHBaseMutation, 100000000, 1000, 5000)

    streamEnv.fromSource(kafka_source, WatermarkStrategy.noWatermarks(), "kafka_source")
      .filter(data => data.contains("order_id"))
      .map(data => {
        val jSONObject = JSON.parseObject(data)
        val order_id = jSONObject.getIntValue("order_id")
        val order_sn = jSONObject.getString("order_sn")
        val customer_id = jSONObject.getIntValue("customer_id")
        val shipping_user = jSONObject.getString("shipping_user")
        val province = jSONObject.getString("province")
        val city = jSONObject.getString("city")
        val address = jSONObject.getString("address")
        val order_source = jSONObject.getIntValue("order_source")
        val payment_method = jSONObject.getIntValue("payment_method")
        val order_money = jSONObject.getDoubleValue("order_money")
        val district_money = jSONObject.getDoubleValue("district_money")
        val shipping_money = jSONObject.getDoubleValue("shipping_money")
        val payment_money = jSONObject.getDoubleValue("payment_money")
        val shipping_comp_name = jSONObject.getString("shipping_comp_name")
        val shipping_sn = jSONObject.getString("shipping_sn")
        val create_time = jSONObject.getString("create_time")
        val shipping_time = jSONObject.getString("shipping_time")
        val pay_time = jSONObject.getString("pay_time")
        val receive_time = jSONObject.getString("receive_time")
        val order_status = jSONObject.getString("order_status")
        val order_point = jSONObject.getIntValue("order_point")
        val invoice_title = jSONObject.getString("invoice_title")
        val modified_time = jSONObject.getString("modified_time")
        // create_time arrives in the compact "yyyyMMddHHmmss" form; normalize it to "yyyy-MM-dd HH:mm:ss"
        val create_dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
        val time = create_dateFormat.parse(create_time)
        val date_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        order_master(order_id, order_sn, customer_id, shipping_user, province, city, address, order_source, payment_method,
          order_money, district_money, shipping_money, payment_money, shipping_comp_name, shipping_sn, date_format.format(time),
          shipping_time, pay_time, receive_time, order_status, order_point, invoice_title, modified_time)
      })
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
          .withTimestampAssigner(
            new SerializableTimestampAssigner[order_master] {
              override def extractTimestamp(element: order_master, recordTimestamp: Long): Long = try {
                val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                val create = if (StringUtils.isNullOrWhitespaceOnly(element.create_time)) 0L else dateFormat.parse(element.create_time).getTime
                val modified = if (StringUtils.isNullOrWhitespaceOnly(element.modified_time)) create else dateFormat.parse(element.modified_time).getTime
                scala.math.max(create, modified)
              } catch {
                case e: Exception => println(e.getMessage)
                  0L
              }
            }
          )
      )
      .filter(data => data.order_status.contains("已下单")) // keep only orders whose status is "已下单" (order placed)
      .addSink(hbase_sink)

    streamEnv.execute
  }
}

6. Query the result in hbase shell

scan 'shtd_result:order_info', {COLUMNS => 'cf:shipping_user', FORMATTER => 'toString', LIMIT => 5}

LIMIT => 5 returns the first five rows, and FORMATTER => 'toString' renders the stored UTF-8 bytes (such as the Chinese names) as readable strings.
