MqttProducerCfg.java 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package com.zy.omp.config.mqtt;
  2. import com.zy.omp.utils.CodeGenerator;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.integration.annotation.ServiceActivator;
  7. import org.springframework.integration.channel.DirectChannel;
  8. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  9. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  10. import org.springframework.messaging.MessageChannel;
  11. import org.springframework.messaging.MessageHandler;
  12. /**
  13. * MQTT 生产者配置
  14. *
  15. * @author yang xiao kun
  16. * create on 2021/1/19
  17. */
  18. @Configuration
  19. public class MqttProducerCfg {
  20. @Autowired
  21. private MqttPahoClientFactory mqttClientFactory;
  22. /**
  23. * MQTT信息通道(生产者)
  24. */
  25. @Bean(name = "mqttOutboundChannel")
  26. public MessageChannel mqttOutboundChannel() {
  27. return new DirectChannel();
  28. }
  29. /**
  30. * MQTT消息处理器(生产者)
  31. */
  32. @Bean
  33. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  34. public MessageHandler mqttOutbound() {
  35. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(CodeGenerator.shortUuid() + "_Producer", mqttClientFactory);
  36. // 设置异步发送,默认是false(发送时阻塞)
  37. messageHandler.setAsync(true);
  38. // 设置默认的服务质量
  39. messageHandler.setDefaultQos(0);
  40. messageHandler.setDefaultRetained(false);
  41. return messageHandler;
  42. }
  43. }