使用python 脚本推送json数据到kafka 中间件中
很多事实,我们使用python 脚本,将需要处理的JSON 数据推送到 kafka 中间件中的案例 代码如下:from confluent_kafka import Consumerfrom confluent_kafka import Producerfrom confluent_kafka import KafkaError#创建kafka 连接信息producer_conf = {}prod
·
很多事实,我们使用python 脚本,将需要处理的JSON 数据推送到 kafka 中间件中的案例 代码如下:
安装librdkafka
For RedHat and RPM-based distros, add this YUM repo and then do sudo yum install librdkafka-devel python-devel: http://docs.confluent.io/current/installation.html#rpm-packages-via-yum
from confluent_kafka import Consumer
from confluent_kafka import Producer
from confluent_kafka import KafkaError
#创建kafka 连接信息
producer_conf = {}
producer_conf['bootstrap.servers'] = 'localhost:9092'
producer = Producer(**producer_conf)
file_obj = open('1.json','r')
all_the_text = file_obj.readline()
file_obj.close()
#定义TOPIC信息
kafka_topic = "message_top"
try:
producer.produce('topic',all_the_text,callback=self.del_callback)
producer.poll(0)
except Exception as e:
print('异常'+str(e))
producer.flush()
def del_callback(self,err,msg):
if err:
print(err)
producer.flush()
这一句必须加 否则推送不过去

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)