很多事实,我们使用python 脚本,将需要处理的JSON 数据推送到 kafka 中间件中的案例 代码如下:

安装librdkafka

For RedHat and RPM-based distros, add this YUM repo and then do sudo yum install librdkafka-devel python-devel: http://docs.confluent.io/current/installation.html#rpm-packages-via-yum

from confluent_kafka import Consumer
from confluent_kafka import Producer
from confluent_kafka import KafkaError
#创建kafka 连接信息
producer_conf = {}
producer_conf['bootstrap.servers'] = 'localhost:9092'
producer = Producer(**producer_conf)

file_obj = open('1.json','r')
all_the_text = file_obj.readline()
file_obj.close()
#定义TOPIC信息
kafka_topic = "message_top"
try:
    producer.produce('topic',all_the_text,callback=self.del_callback)
    producer.poll(0)
            
except Exception as e:
    print('异常'+str(e))

producer.flush()

def del_callback(self,err,msg):
    if err:
        print(err)

producer.flush()  

这一句必须加 否则推送不过去

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐