【Flink streaming development: writing Kafka data into HBase (Scala)】
This post uses the HBaseSinkFunction, which takes an HBaseConfiguration and an HBaseMutationConverter as parameters.
I. Project requirements
Read the data from a Kafka topic with Flink and write it into HBase.
Then query the first five rows of the shipping_user column in the HBase shell.
- The JSON format of the records in the Kafka topic is shown below
- Written in Scala (version 2.12), on Java 1.8
- Flink version 1.14, using only the DataStream API; no Table/SQL involved
- Dependencies are managed with Maven
{
"order_id": 458,
"order_sn": "2024110659360903",
"customer_id": 6644,
"shipping_user": "刘鹏",
"province": "上海市",
"city": "上海市",
"address": "上海市上海市南京西路12664101号恒隆广场2期大堂22层",
"order_source": 1,
"payment_method": 2,
"order_money": 6857.71,
"district_money": 4114.63,
"shipping_money": 37.26,
"payment_money": 2780.34,
"shipping_comp_name": "顺丰",
"shipping_sn": "8335790790612",
"create_time": "20241101211934",
"shipping_time": "NULL",
"pay_time": "NULL",
"receive_time": "NULL",
"order_status": "已下单",
"order_point": 278,
"invoice_title": "维旺明网络有限公司",
"modified_time": "2024-11-01 13:19:34"
}
II. Importing the dependencies
1. Reading from Kafka requires the Kafka connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
2. Writing to HBase requires the HBase connector:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
3. Core dependencies for Flink DataStream development:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
4. Import logging dependencies, to make Flink's logs easier to inspect
The Flink dependencies pull in their own logging jars, which I personally find awkward to use. To use the logging packages imported here, first remove Flink's bundled logging dependencies from the project's library list (or exclude them in the POM, as sketched after the dependency list below).
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j2-impl</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.12</version>
</dependency>
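If you would rather not delete jars from the IDE's library list by hand, the same effect can be achieved by excluding the transitive logging artifacts in the POM. A sketch for one of the Flink artifacts, assuming the conflicting artifact turns out to be log4j-slf4j-impl (check mvn dependency:tree for the real coordinates, and repeat the exclusion for the other Flink dependencies as needed):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.14.0</version>
    <exclusions>
        <!-- Hypothetical coordinates: confirm the actual transitive
             logging artifacts with `mvn dependency:tree` first. -->
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
    </exclusions>
</dependency>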
After importing the dependencies, create a log4j2.properties file under src/main/resources (the file does not exist by default):
# Log4j
status = error
name = PropertiesConfig
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{yyyy-MM-dd HH:mm:ss}] %-5p [%t] %c: %m%n
rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT
logger.spark.name = org.apache.spark
logger.spark.level = info
logger.flink.name = org.apache.flink
logger.flink.level = info
III. Writing the job
1. Create the Flink streaming environment
final val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1) // parallelism 1: the job runs slower, which makes it easier to watch the data being processed
streamEnv.getConfig.setAutoWatermarkInterval(1000) // generate watermarks every 1000 ms
2. Read the data from the Kafka topic
val kafka_source = KafkaSource.builder()
  .setBootstrapServers("bigdata1:9092")
  .setTopics("my_order")
  .setGroupId("consumer")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()
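Note that OffsetsInitializer.earliest() replays the topic from the beginning on every fresh run. If the job should instead resume from where the consumer group left off, a common alternative (not used in this article) is to start from the committed offsets and fall back to earliest when none exist yet; it needs one extra import, org.apache.kafka.clients.consumer.OffsetResetStrategy:
// Resume from the group's committed offsets; fall back to earliest
// if the group has never committed any.
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))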
3. Parse the Kafka records into a case class
// A case class mirroring the JSON records read from the topic
case class order_master(
  order_id: Int,
  order_sn: String,
  customer_id: Int,
  shipping_user: String,
  province: String,
  city: String,
  address: String,
  order_source: Int,
  payment_method: Int,
  order_money: Double,
  district_money: Double,
  shipping_money: Double,
  payment_money: Double,
  shipping_comp_name: String,
  shipping_sn: String,
  create_time: String,
  shipping_time: String,
  pay_time: String,
  receive_time: String,
  order_status: String,
  order_point: Int,
  invoice_title: String,
  modified_time: String
)
Parsing the Kafka records, with the matching watermark strategy. Note that create_time arrives in the compact yyyyMMddHHmmss format (e.g. 20241101211934) while modified_time is already yyyy-MM-dd HH:mm:ss, so create_time is reparsed and reformatted before it is stored.
streamEnv.fromSource(kafka_source, WatermarkStrategy.noWatermarks(), "kafka_source")
  .filter(data => data.contains("order_id"))
  .map(data => {
    val jSONObject = JSON.parseObject(data)
    val order_id = jSONObject.getIntValue("order_id")
    val order_sn = jSONObject.getString("order_sn")
    val customer_id = jSONObject.getIntValue("customer_id")
    val shipping_user = jSONObject.getString("shipping_user")
    val province = jSONObject.getString("province")
    val city = jSONObject.getString("city")
    val address = jSONObject.getString("address")
    val order_source = jSONObject.getIntValue("order_source")
    val payment_method = jSONObject.getIntValue("payment_method")
    val order_money = jSONObject.getDoubleValue("order_money")
    val district_money = jSONObject.getDoubleValue("district_money")
    val shipping_money = jSONObject.getDoubleValue("shipping_money")
    val payment_money = jSONObject.getDoubleValue("payment_money")
    val shipping_comp_name = jSONObject.getString("shipping_comp_name")
    val shipping_sn = jSONObject.getString("shipping_sn")
    val create_time = jSONObject.getString("create_time")
    val shipping_time = jSONObject.getString("shipping_time")
    val pay_time = jSONObject.getString("pay_time")
    val receive_time = jSONObject.getString("receive_time")
    val order_status = jSONObject.getString("order_status")
    val order_point = jSONObject.getIntValue("order_point")
    val invoice_title = jSONObject.getString("invoice_title")
    val modified_time = jSONObject.getString("modified_time")
    // create_time arrives as yyyyMMddHHmmss; convert it to yyyy-MM-dd HH:mm:ss
    val create_dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
    val time = create_dateFormat.parse(create_time)
    val date_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    order_master(order_id, order_sn, customer_id, shipping_user, province, city, address, order_source, payment_method,
      order_money, district_money, shipping_money, payment_money, shipping_comp_name, shipping_sn, date_format.format(time),
      shipping_time, pay_time, receive_time, order_status, order_point, invoice_title, modified_time)
  })
  // Use the larger of create_time and modified_time as the event timestamp;
  // the date parsing needs exception handling to avoid NullPointerExceptions
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forBoundedOutOfOrderness[order_master](Duration.ofSeconds(5)) // tolerate 5 seconds of lateness
      .withTimestampAssigner(
        new SerializableTimestampAssigner[order_master] {
          override def extractTimestamp(element: order_master, recordTimestamp: Long): Long =
            try {
              val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
              val create = if (StringUtils.isNullOrWhitespaceOnly(element.create_time)) 0L else dateFormat.parse(element.create_time).getTime
              val modified = if (StringUtils.isNullOrWhitespaceOnly(element.modified_time)) create else dateFormat.parse(element.modified_time).getTime
              scala.math.max(create, modified)
            } catch {
              case e: Exception =>
                println(e.getMessage)
                0L
            }
        }
      )
  )
4. Write the data into HBase
This uses HBaseSinkFunction, which takes an HBaseConfiguration and an HBaseMutationConverter.
val hbase_conf = HBaseConfiguration.create()
hbase_conf.set("hbase.zookeeper.quorum", "bigdata1:2181,bigdata2:2181,bigdata3:2181")
val myHBaseMutation = new HBaseMutationConverter[order_master] {
  private val cf = Bytes.toBytes("cf")
  override def open(): Unit = {
    println("HBaseMutationConverter is opening.")
  }
  override def convertToMutation(record: order_master): Mutation = {
    // the 4-byte binary encoding of order_id is used as the row key
    val put = new Put(Bytes.toBytes(record.order_id))
    put.addColumn(cf, Bytes.toBytes("order_sn"), Bytes.toBytes(record.order_sn))
    put.addColumn(cf, Bytes.toBytes("customer_id"), Bytes.toBytes(record.customer_id))
    put.addColumn(cf, Bytes.toBytes("shipping_user"), Bytes.toBytes(record.shipping_user))
    put.addColumn(cf, Bytes.toBytes("province"), Bytes.toBytes(record.province))
    put.addColumn(cf, Bytes.toBytes("city"), Bytes.toBytes(record.city))
    put.addColumn(cf, Bytes.toBytes("address"), Bytes.toBytes(record.address))
    put.addColumn(cf, Bytes.toBytes("order_source"), Bytes.toBytes(record.order_source))
    put.addColumn(cf, Bytes.toBytes("payment_method"), Bytes.toBytes(record.payment_method))
    put.addColumn(cf, Bytes.toBytes("order_money"), Bytes.toBytes(record.order_money))
    put.addColumn(cf, Bytes.toBytes("district_money"), Bytes.toBytes(record.district_money))
    put.addColumn(cf, Bytes.toBytes("shipping_money"), Bytes.toBytes(record.shipping_money))
    put.addColumn(cf, Bytes.toBytes("payment_money"), Bytes.toBytes(record.payment_money))
    put.addColumn(cf, Bytes.toBytes("shipping_comp_name"), Bytes.toBytes(record.shipping_comp_name))
    put.addColumn(cf, Bytes.toBytes("shipping_sn"), Bytes.toBytes(record.shipping_sn))
    put.addColumn(cf, Bytes.toBytes("create_time"), Bytes.toBytes(record.create_time))
    put.addColumn(cf, Bytes.toBytes("shipping_time"), Bytes.toBytes(record.shipping_time))
    put.addColumn(cf, Bytes.toBytes("pay_time"), Bytes.toBytes(record.pay_time))
    put.addColumn(cf, Bytes.toBytes("receive_time"), Bytes.toBytes(record.receive_time))
    put.addColumn(cf, Bytes.toBytes("order_status"), Bytes.toBytes(record.order_status))
    put.addColumn(cf, Bytes.toBytes("order_point"), Bytes.toBytes(record.order_point))
    put.addColumn(cf, Bytes.toBytes("invoice_title"), Bytes.toBytes(record.invoice_title))
    put.addColumn(cf, Bytes.toBytes("modified_time"), Bytes.toBytes(record.modified_time))
    put
  }
}
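One caveat about the row key: Bytes.toBytes on an Int produces a 4-byte binary key, which is awkward to read or type in the HBase shell. Purely as a variation of my own (not part of the converter above), a zero-padded string key stays human-readable while keeping the lexicographic row order aligned with the numeric order:
// Hypothetical alternative row key: zero-padded string instead of binary Int
val put = new Put(Bytes.toBytes(f"${record.order_id}%010d"))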
// "order_info_1"是需要填写的表名,order_master 是需要填写的传入数据的类型
// 后面三个数字分别是缓存的最大字节大小,缓存的最大转变数量,间隔时间
val hbase_sink = new HBaseSinkFunction[order_master]("order_info", hbase_conf, myHBaseMutation, 100000000, 1000, 5000)
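HBaseSinkFunction writes into an existing table; it does not create one. Assuming the namespace and table names used here, the table can be created up front in the HBase shell (skip create_namespace if shtd_result already exists):
create_namespace 'shtd_result'
create 'shtd_result:order_info', 'cf'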
5. The complete code
import com.alibaba.fastjson2.JSON
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.hbase.sink.{HBaseMutationConverter, HBaseSinkFunction}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.StringUtils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.util.Bytes
import java.time.Duration
object test {
  final val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  streamEnv.setParallelism(1)
  streamEnv.getConfig.setAutoWatermarkInterval(1000)

  case class order_master(
    order_id: Int,
    order_sn: String,
    customer_id: Int,
    shipping_user: String,
    province: String,
    city: String,
    address: String,
    order_source: Int,
    payment_method: Int,
    order_money: Double,
    district_money: Double,
    shipping_money: Double,
    payment_money: Double,
    shipping_comp_name: String,
    shipping_sn: String,
    create_time: String,
    shipping_time: String,
    pay_time: String,
    receive_time: String,
    order_status: String,
    order_point: Int,
    invoice_title: String,
    modified_time: String
  )

  def main(args: Array[String]): Unit = {
    val kafka_source = KafkaSource.builder()
      .setBootstrapServers("bigdata1:9092")
      .setTopics("my_order")
      .setGroupId("consumer")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()
    val hbase_conf = HBaseConfiguration.create()
    hbase_conf.set("hbase.zookeeper.quorum", "bigdata1:2181,bigdata2:2181,bigdata3:2181")
    val myHBaseMutation = new HBaseMutationConverter[order_master] {
      private val cf = Bytes.toBytes("cf")
      override def open(): Unit = {
        println("HBaseMutationConverter is opening.")
      }
      override def convertToMutation(record: order_master): Mutation = {
        val put = new Put(Bytes.toBytes(record.order_id))
        put.addColumn(cf, Bytes.toBytes("order_sn"), Bytes.toBytes(record.order_sn))
        put.addColumn(cf, Bytes.toBytes("customer_id"), Bytes.toBytes(record.customer_id))
        put.addColumn(cf, Bytes.toBytes("shipping_user"), Bytes.toBytes(record.shipping_user))
        put.addColumn(cf, Bytes.toBytes("province"), Bytes.toBytes(record.province))
        put.addColumn(cf, Bytes.toBytes("city"), Bytes.toBytes(record.city))
        put.addColumn(cf, Bytes.toBytes("address"), Bytes.toBytes(record.address))
        put.addColumn(cf, Bytes.toBytes("order_source"), Bytes.toBytes(record.order_source))
        put.addColumn(cf, Bytes.toBytes("payment_method"), Bytes.toBytes(record.payment_method))
        put.addColumn(cf, Bytes.toBytes("order_money"), Bytes.toBytes(record.order_money))
        put.addColumn(cf, Bytes.toBytes("district_money"), Bytes.toBytes(record.district_money))
        put.addColumn(cf, Bytes.toBytes("shipping_money"), Bytes.toBytes(record.shipping_money))
        put.addColumn(cf, Bytes.toBytes("payment_money"), Bytes.toBytes(record.payment_money))
        put.addColumn(cf, Bytes.toBytes("shipping_comp_name"), Bytes.toBytes(record.shipping_comp_name))
        put.addColumn(cf, Bytes.toBytes("shipping_sn"), Bytes.toBytes(record.shipping_sn))
        put.addColumn(cf, Bytes.toBytes("create_time"), Bytes.toBytes(record.create_time))
        put.addColumn(cf, Bytes.toBytes("shipping_time"), Bytes.toBytes(record.shipping_time))
        put.addColumn(cf, Bytes.toBytes("pay_time"), Bytes.toBytes(record.pay_time))
        put.addColumn(cf, Bytes.toBytes("receive_time"), Bytes.toBytes(record.receive_time))
        put.addColumn(cf, Bytes.toBytes("order_status"), Bytes.toBytes(record.order_status))
        put.addColumn(cf, Bytes.toBytes("order_point"), Bytes.toBytes(record.order_point))
        put.addColumn(cf, Bytes.toBytes("invoice_title"), Bytes.toBytes(record.invoice_title))
        put.addColumn(cf, Bytes.toBytes("modified_time"), Bytes.toBytes(record.modified_time))
        put
      }
    }
    val hbase_sink = new HBaseSinkFunction[order_master]("shtd_result:order_info", hbase_conf, myHBaseMutation, 100000000, 1000, 5000)
    streamEnv.fromSource(kafka_source, WatermarkStrategy.noWatermarks(), "kafka_source")
      .filter(data => data.contains("order_id"))
      .map(data => {
        val jSONObject = JSON.parseObject(data)
        val order_id = jSONObject.getIntValue("order_id")
        val order_sn = jSONObject.getString("order_sn")
        val customer_id = jSONObject.getIntValue("customer_id")
        val shipping_user = jSONObject.getString("shipping_user")
        val province = jSONObject.getString("province")
        val city = jSONObject.getString("city")
        val address = jSONObject.getString("address")
        val order_source = jSONObject.getIntValue("order_source")
        val payment_method = jSONObject.getIntValue("payment_method")
        val order_money = jSONObject.getDoubleValue("order_money")
        val district_money = jSONObject.getDoubleValue("district_money")
        val shipping_money = jSONObject.getDoubleValue("shipping_money")
        val payment_money = jSONObject.getDoubleValue("payment_money")
        val shipping_comp_name = jSONObject.getString("shipping_comp_name")
        val shipping_sn = jSONObject.getString("shipping_sn")
        val create_time = jSONObject.getString("create_time")
        val shipping_time = jSONObject.getString("shipping_time")
        val pay_time = jSONObject.getString("pay_time")
        val receive_time = jSONObject.getString("receive_time")
        val order_status = jSONObject.getString("order_status")
        val order_point = jSONObject.getIntValue("order_point")
        val invoice_title = jSONObject.getString("invoice_title")
        val modified_time = jSONObject.getString("modified_time")
        // create_time arrives as yyyyMMddHHmmss; convert it to yyyy-MM-dd HH:mm:ss
        val create_dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
        val time = create_dateFormat.parse(create_time)
        val date_format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        order_master(order_id, order_sn, customer_id, shipping_user, province, city, address, order_source, payment_method,
          order_money, district_money, shipping_money, payment_money, shipping_comp_name, shipping_sn, date_format.format(time),
          shipping_time, pay_time, receive_time, order_status, order_point, invoice_title, modified_time)
      })
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness[order_master](Duration.ofSeconds(5))
          .withTimestampAssigner(
            new SerializableTimestampAssigner[order_master] {
              override def extractTimestamp(element: order_master, recordTimestamp: Long): Long = try {
                val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                val create = if (StringUtils.isNullOrWhitespaceOnly(element.create_time)) 0L else dateFormat.parse(element.create_time).getTime
                val modified = if (StringUtils.isNullOrWhitespaceOnly(element.modified_time)) create else dateFormat.parse(element.modified_time).getTime
                scala.math.max(create, modified)
              } catch {
                case e: Exception =>
                  println(e.getMessage)
                  0L
              }
            }
          )
      )
      .filter(data => data.order_status.contains("已下单")) // keep only orders whose status is "已下单" (placed)
      .addSink(hbase_sink)
    streamEnv.execute()
  }
}
6. Querying the result in the HBase shell
scan 'shtd_result:order_info', {COLUMNS => 'cf:shipping_user',FORMATTER => 'toString', LIMIT => 5}
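A quick sanity check that records actually arrived (the FORMATTER => 'toString' option above is what renders the UTF-8 column values readably):
count 'shtd_result:order_info'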
