pom.xml里引入
(这个项目的jar包不是很新,引入后jar包版本有高有低,不是缺少某个类,就是类名、方法名不一致,所以下面的依赖都显式指定了版本)

		<!-- mqtt -->
		<!-- Spring Integration modules (stream, mqtt, core), all pinned to the same 5.4.1 release. -->
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
			<version>5.4.1</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
			<version>5.4.1</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
			<version>5.4.1</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-messaging -->
		<!-- NOTE(review): spring-messaging is pinned to 5.2.11.RELEASE while spring-core below is 5.3.1.
		     Mixing Spring Framework minor versions can cause missing class or method errors at runtime;
		     confirm this combination is intentional for the host project. -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-messaging</artifactId>
			<version>5.2.11.RELEASE</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.4.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
		<dependency>
			<groupId>org.reactivestreams</groupId>
			<artifactId>reactive-streams</artifactId>
			<version>1.0.3</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>5.3.1</version>
		</dependency>

		<!-- Jackson modules (databind, core, annotations), all pinned to the same 2.12.0 release. -->
		<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.12.0</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.12.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-annotations</artifactId>
			<version>2.12.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
		<!-- Eclipse Paho MQTT v3 client. The accompanying text states that a version other than 1.2.0
		     produced errors in this project, so keep this pin unless the other versions are re-verified. -->
		<dependency>
			<groupId>org.eclipse.paho</groupId>
			<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
			<version>1.2.0</version>
		</dependency>
		<!--mqtt end-->

org.eclipse.paho.client.mqttv3 的版本要和上面一样使用 1.2.0,否则会报错。

maven找不到的jar,可以下载到本地,用命令安装

mvn install:install-file -Dfile=jar包路径 -DgroupId=pom里填的对应jar的groupid -DartifactId=对应jar的artifactId -Dversion=对应jar的version -Dpackaging=jar

resources文件夹下的配置文件(.yml文件)里添加配置

#mqtt
mqtt:
  host: tcp://服务器地址:端口号 #服务器链接。不加密用 tcp://服务器地址:端口号,加密用 ssl://服务器地址:端口号
  clientinid: 3401000050 #客户端ID
  topic: topic_default #默认topic名
  qoslevel: 0  #QOS服务等级 0,1,2
  username: test #登陆服务器(broker)的用户名
  password: abc123 #登陆服务器(broker)的密码
  async: true
  keepaliveinterval: 2  #发送心跳包间隔,比断连时间短即可

注意:yml 按缩进划分层次,mqtt: 前面不要有空格;所有配置项只能用空格缩进,不可以用 tab 缩进。

代码目录结构
java部分结构

/**
 * Spring Integration configuration that wires an outbound (publish) flow and an inbound
 * (subscribe) flow to an MQTT broker through the Eclipse Paho client.
 *
 * <p>All connection settings are injected from the {@code mqtt.*} properties of the
 * application configuration file.
 */
@Configuration
@IntegrationComponentScan
public class MqttConfig {
    /**
     * Suffix appended to the configured client ID for the subscribing connection.
     * The outbound handler and the inbound adapter each open their own broker connection,
     * and an MQTT broker allows only one live connection per client ID, so the two
     * connections must not share the same ID.
     */
    private static final String INBOUND_CLIENT_ID_SUFFIX = "_inbound";

    /** Value handed to the inbound adapter's completion timeout setting. */
    private static final int COMPLETION_TIMEOUT = 5000;

    // @Value copies the property configured in the yml file into the annotated field.
    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.host}")
    private String hostUrl;

    @Value("${mqtt.clientinid}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    @Value("${mqtt.qoslevel}")
    private int qosLevel;

    @Value("${mqtt.async}")
    private boolean async;

    @Value("${mqtt.keepaliveinterval}")
    private int keepAliveInterval;

    /** Application-level handler that receives every message arriving on the input channel. */
    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;

    /**
     * Builds the Paho connection options from the injected properties.
     *
     * <p>User name and password are only set when they are not blank, so a broker that
     * accepts anonymous connections can be used by leaving them empty.
     *
     * @return connection options holding the server URI, keep-alive interval and,
     *         when configured, the credentials
     */
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        if (StringUtils.isNotBlank(username)) {
            mqttConnectOptions.setUserName(username);
        }
        if (StringUtils.isNotBlank(password)) {
            mqttConnectOptions.setPassword(password.toCharArray());
        }
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
        return mqttConnectOptions;
    }

    /**
     * Creates the client factory shared by the outbound handler and the inbound adapter.
     *
     * @return a Paho client factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Outbound side: publishes every message sent to {@code mqttOutboundChannel}.
     *
     * <p>The configured default topic and QoS are set as the handler's defaults.
     *
     * @return the message handler that publishes to the broker
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(async);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultQos(qosLevel);
        return messageHandler;
    }

    /**
     * Channel that carries messages to be published (outbound side).
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Channel that carries messages received from the broker (inbound side).
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound side: subscribes to the broker and forwards received messages to
     * {@code mqttInputChannel}.
     *
     * <p>The adapter connects with its own client ID (configured ID plus
     * {@value #INBOUND_CLIENT_ID_SUFFIX}) so it does not collide with the outbound
     * connection. It subscribes to the configured default topic with the configured QoS;
     * replace {@code defaultTopic} below with other topic filters if the application
     * needs to listen elsewhere.
     *
     * @param mqttInputChannel channel that receives the inbound messages; the parameter name
     *                         matches the bean name (presumably relied on to select between
     *                         the two {@link MessageChannel} beans — keep it unchanged)
     * @param factory          client factory used to open the subscribing connection
     * @return the message-driven adapter
     */
    @Bean
    public MessageProducer channelInbound(MessageChannel mqttInputChannel, MqttPahoClientFactory factory) {
        String inboundClientId = clientId + INBOUND_CLIENT_ID_SUFFIX;
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(inboundClientId, factory, defaultTopic);
        adapter.setCompletionTimeout(COMPLETION_TIMEOUT);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(qosLevel);
        adapter.setOutputChannel(mqttInputChannel);
        return adapter;
    }

    /**
     * Consumer of {@code mqttInputChannel}: hands every received message to
     * {@link MqttReceiveHandle#handle}.
     *
     * @return the handler bound to the input channel
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return mqttReceiveHandle::handle;
    }
}

MqttGateway

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway for publishing to MQTT: every call is turned into a message and
 * sent to {@code mqttOutboundChannel}.
 *
 * <p>The first argument of each method is the message payload; the remaining arguments
 * are copied into the corresponding {@link MqttHeaders} message headers.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a payload without setting any MQTT header.
     *
     * @param data the information to send
     */
    void sendToMqtt(String data);

    /**
     * Publishes a payload to the given topic.
     *
     * @param payload the information to send
     * @param topic   the topic the receiving clients subscribe to
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * Publishes a payload to the given topic with an explicit quality of service.
     *
     * @param payload  the information to send
     * @param topic    the topic the receiving clients subscribe to
     * @param Qoslevel the MQTT QoS level: 0, 1 or 2
     */
    void sendToMqtt(
            String payload,
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int Qoslevel);

    /**
     * Publishes a payload to the given topic, optionally flagged as retained.
     *
     * @param payload  the information to send
     * @param topic    the topic the receiving clients subscribe to
     * @param retained value of the MQTT retained flag; in MQTT a retained message is kept by
     *                 the broker for its topic and delivered to clients that subscribe later
     */
    void sendToMqtt(
            String payload,
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.RETAINED) boolean retained);
}

使用,发送消息

import com.ruoyi.project.mqtt.MqttGateway;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Example sender that publishes messages through {@link MqttGateway}.
 *
 * <p>The component is registered under the explicit bean name {@code "AhApi"} so that a
 * scheduled task can look it up and run {@link #remote()}.
 */
@Component("AhApi")
public class AhApi {
    @Resource
    private MqttGateway mqttGateway;

    /**
     * Prints the topic and payload, then publishes the payload to the topic.
     *
     * @param topic    topic to publish to
     * @param payload  message content
     * @param retained value of the MQTT retained flag for this message
     * @throws Exception declared for callers; nothing in this method throws a checked exception itself
     */
    public void commonmethod(String topic, String payload, boolean retained) throws Exception {
        System.out.println("topic:" + topic);
        System.out.println("payload:" + payload);
        mqttGateway.sendToMqtt(payload, topic, retained);
        System.out.println("send finish");
    }

    /**
     * Sample entry point: publishes a fixed payload to a fixed topic as a retained message.
     *
     * @throws Exception propagated from {@link #commonmethod(String, String, boolean)}
     */
    public void remote() throws Exception {
        String data = "data here";
        String topic = "topic";
        // The original passed an undefined variable named "payload"; the local holding the message is "data".
        commonmethod(topic, data, true);
    }
}

MqttReceiveHandle(订阅类)

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
 * Receives the messages delivered from the MQTT input channel and logs them.
 */
@Component
public class MqttReceiveHandle {
    // One logger per class rather than per instance.
    private static final Logger logger = LoggerFactory.getLogger(MqttReceiveHandle.class);

    /**
     * Logs the topic, QoS and payload of a received message.
     *
     * @param message the received message; topic and QoS are read from the
     *                {@link MqttHeaders#RECEIVED_TOPIC} and {@link MqttHeaders#RECEIVED_QOS} headers
     */
    public void handle(Message<?> message) {
        Object topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        Object qos = message.getHeaders().get(MqttHeaders.RECEIVED_QOS);
        logger.info("主题:{},QOS:{},消息接收到的数据:{}", topic, qos, message.getPayload());
    }
}

订阅部分:channelInbound 这个适配器 bean 会随 Spring 容器启动自动开始监听,不需要手动调用,所以项目启动后就可以收到消息。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 AI 帮你完成复杂任务”。

更多推荐