Operators
DataStream Transformations #
DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping, filtering, reducing).
Functions
Transformations accept user-defined functions as input to define their behavior.
Implementing Function Interfaces #
Different transformations in the Python DataStream API use different function interfaces. For example, MapFunction is used by the map transformation, and FilterFunction is used by the filter transformation.
[root@master pyflink]# cat t104.py
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction

env = StreamExecutionEnvironment.get_execution_environment()


# A map transformation implemented by subclassing MapFunction
class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1


data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
# Declare the output type explicitly so the results are not pickled
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
data_stream.print()    # original values
mapped_stream.print()  # values incremented by 1
env.execute()
Lambda Function #
As shown below, a transformation can also accept a lambda function to define its behavior:
[root@master pyflink]# cat t105.py
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
# The lambda replaces the MapFunction subclass; output_type is still declared
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
data_stream.print()    # original values
mapped_stream.print()  # values incremented by 1
env.execute()
[root@master pyflink]# python t105.py
2> 1
2> 5
3> 2
4> 3
1> 4
2> 2
2> 6
1> 5
3> 3
4> 4
Output Type #
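In the Python DataStream API, users can explicitly specify the output type information of a transformation. If it is not specified, the output type defaults to Types.PICKLED_BYTE_ARRAY() and the results are serialized with the pickle serializer.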
Convert DataStream into Table #