步骤:

        第一步:首先mongodb数据库得支持集群,单机下不支持

       第二步:在springboot项目引入mongodb 与kafka依赖

 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
</dependency>

     第三步:编写监听mongodb数据库集合实现类

   

@Component
public class mongodbChangeStreamListener   {
    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private KafkaMessageProducer kafkaMessageProducer;

    @Async
    @EventListener(ApplicationReadyEvent.class)
    public void listen(){
        try {
             //条件过滤
            Bson filter = Filters.and(
                    Filters.in("operationType","update","replace"),//可以指定监听数据库操作类型 
                    Filters.in("fullDocument.字段名", 条件值), //根据业务需求可以指定过滤条件
                    Filters.eq("fullDocument.字段名", 条件值)); 

            //可以设置需要返回的字段,不设置则返回全部的字段
            Bson project = Projections.include("fullDocument.字段","fullDocument.字段");
            List<Bson> bsons = 
            Arrays.asList(Aggregates.match(filter),Aggregates.project(project));
            ChangeStreamIterable<Document> changeStream = mongoTemplate.getCollection("集合名称")
                    .watch(bsons).fullDocument(FullDocument.UPDATE_LOOKUP);
            for (ChangeStreamDocument<Document> changeStreamDocument : changeStream) {
                //获取需要的内容
                Document fullDocument = changeStreamDocument.getFullDocument();
                  //然后拿到变更的数据 根据业务需求进行处理  可以同步到消息中间件如kafka
                   kafkaMessageProducer.push("topic",fullDocument) 
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
      
        }
    }

}

 第四步:kafka生产者

@Component
public class KafkaMessageProducer {

    @Autowired
    KafkaTemplate kafkaTemplate ;

    public void push(String topic,String msg) {
        try {
            kafkaTemplate.send(topic,msg);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐