MqttCallbackHandler.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package com.zy.omp.config.mqtt;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.zy.omp.common.Constant;
  4. import com.zy.omp.pojo.dto.MqttMsgDto;
  5. import com.zy.omp.model.*;
  6. import com.zy.omp.service.*;
  7. import com.zy.omp.utils.*;
  8. import com.zy.omp.utils.CoordTransformUtil;
  9. import com.zy.omp.utils.GaoDeApiUtil;
  10. import com.zy.omp.websocket.WebSocketServer;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.stereotype.Component;
  14. import javax.annotation.Resource;
  15. import java.time.LocalDateTime;
  16. /**
  17. * MQTT 消息返回处理类
  18. *
  19. * @author yang xiao kun
  20. * create on 2021/1/19
  21. */
  22. @Slf4j
  23. @Component
  24. public class MqttCallbackHandler {
  25. @Resource
  26. private DeviceService deviceService;
  27. @Resource
  28. private DeviceInfoStateService deviceInfoStateService;
  29. @Resource
  30. private DeviceInfoRecordService deviceInfoRecordService;
  31. @Resource
  32. private DeviceSetService deviceSetService;
  33. @Resource
  34. private SetService setService;
  35. @Resource
  36. private WebSocketServer webSocketServer;
  37. @Resource
  38. private CallRecordsService callRecordsService;
  39. @Resource
  40. private MqttLogService mqttLogService;
  41. @Resource
  42. private RabbitMQApi rabbitMQApi;
  43. /**
  44. * 处理消息
  45. *
  46. * @param topic 主题
  47. * @param payload 消息内容
  48. */
  49. void handle(String topic, String payload) {
  50. //存储日志-接收
  51. mqttLogService.saveReceiveLog(topic, payload);
  52. //消息返回JSON转Object
  53. MqttMsgDto msgDto = MqttMsgDto.parse(payload);
  54. switch (msgDto.getM()) {
  55. //设备端注册报文
  56. case Constant.M_CODE_REGISTER: {
  57. doRegister(msgDto);
  58. break;
  59. }
  60. // 设备位置信息上报
  61. case Constant.M_CODE_UPLOAD_LOCATION: {
  62. doUploadLocation(msgDto);
  63. break;
  64. }
  65. //设备上传通话记录
  66. case Constant.M_CODE_UPLOAD_CALL_RECORD: {
  67. doUploadCallRecord(msgDto);
  68. break;
  69. }
  70. }
  71. //转发消息至网页,忽略注册的报文
  72. if (!topic.equals(Constant.TOPIC_REGISTER_SERVER)) {
  73. webSocketServer.massMessage(msgDto.getOpenNum(), payload);
  74. }
  75. }
  76. /**
  77. * 注册设备
  78. */
  79. private void doRegister(MqttMsgDto obj) {
  80. String openNum = obj.getString("regnum");
  81. //查询数据库中是否有此设备
  82. Device device = deviceService.getOne(new QueryWrapper<Device>().eq("openNum", openNum));
  83. //没有该设备,保存至数据库
  84. if (device == null) {
  85. device = new Device();
  86. device.setOpenNum(openNum);
  87. device.setClientId(Constant.DEVICE_ID_PREFIX + openNum);
  88. device.setPassword(MD5Util.MD5Encode(openNum));
  89. deviceService.save(device);
  90. DeviceSet deviceSet = new DeviceSet();
  91. deviceSet.setOpenNum(device.getOpenNum());
  92. deviceSetService.save(deviceSet);
  93. }
  94. //mqtt服务器注册用户
  95. rabbitMQApi.registerMqtt(device.getClientId(), device.getPassword());
  96. //发送消息
  97. setService.returnCode(device);
  98. }
  99. /**
  100. * 处理上传位置
  101. */
  102. private void doUploadLocation(MqttMsgDto msg) {
  103. try {
  104. DeviceInfoState state = new DeviceInfoState();
  105. state.setOpenNum(msg.getOpenNum());
  106. state.setBatteryNum(msg.getInt("batterynum"));
  107. state.setSignalNum(msg.getInt("signalnum"));
  108. state.setMode(msg.getInt("mode"));
  109. state.setLon(msg.getString("lon"));
  110. state.setLat(msg.getString("lat"));
  111. state.setSpeed(msg.getString("speed"));
  112. state.setNum(msg.getInt("num"));
  113. state.setUploadTime(msg.getDate("createtime"));
  114. //转换后的坐标
  115. String[] gcj = CoordTransformUtil.wgs84toGcj02(state.getLon(), state.getLat());
  116. state.setLonGcj(gcj[0]);
  117. state.setLatGcj(gcj[1]);
  118. // 逆地理位置解析
  119. state.setSite(GaoDeApiUtil.regeo(state.getLonGcj(), state.getLatGcj()));
  120. // 保存设备
  121. deviceInfoStateService.save(state);
  122. DeviceInfoRecord record = BeanUtil.cast(state, DeviceInfoRecord.class);
  123. deviceInfoRecordService.save(record);
  124. } catch (Exception e) {
  125. log.error("老人机上传位置信息异常", e);
  126. }
  127. }
  128. /**
  129. * 设备上传通话记录
  130. */
  131. private void doUploadCallRecord(MqttMsgDto obj) {
  132. CallRecords callRecords = new CallRecords();
  133. callRecords.setOpenNum(obj.getOpenNum());
  134. callRecords.setCallType(obj.getInt("type"));
  135. callRecords.setKeyNum(obj.getString("key"));
  136. callRecords.setPhoneNum(obj.getString("phnoenum"));
  137. callRecords.setTalkTime(obj.getInt("talktime"));
  138. callRecords.setEndTime(obj.getString("endtime"));
  139. callRecordsService.save(callRecords);
  140. }
  141. }