MqttCallbackHandler.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package com.zy.omp.config.mqtt;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.zy.omp.common.Constant;
  4. import com.zy.omp.pojo.dto.MqttMsgDto;
  5. import com.zy.omp.model.*;
  6. import com.zy.omp.service.*;
  7. import com.zy.omp.utils.*;
  8. import com.zy.omp.utils.CoordTransformUtil;
  9. import com.zy.omp.utils.GaoDeApiUtil;
  10. import com.zy.omp.websocket.WebSocketServer;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.stereotype.Component;
  14. import javax.annotation.Resource;
  15. import java.time.LocalDateTime;
  16. /**
  17. * MQTT 消息返回处理类
  18. *
  19. * @author yang xiao kun
  20. * create on 2021/1/19
  21. */
  22. @Slf4j
  23. @Component
  24. public class MqttCallbackHandler {
  25. @Resource
  26. private DeviceService deviceService;
  27. @Resource
  28. private DeviceInfoStateService deviceInfoStateService;
  29. @Resource
  30. private DeviceInfoRecordService deviceInfoRecordService;
  31. @Resource
  32. private DeviceSetService deviceSetService;
  33. @Resource
  34. private SetService setService;
  35. @Resource
  36. private CallRecordsService callRecordsService;
  37. @Resource
  38. private MqttLogService mqttLogService;
  39. @Resource
  40. private RabbitMQApi rabbitMQApi;
  41. /**
  42. * 处理消息
  43. *
  44. * @param topic 主题
  45. * @param payload 消息内容
  46. */
  47. void handle(String topic, String payload) {
  48. //存储日志-接收
  49. mqttLogService.saveReceiveLog(topic, payload);
  50. //消息返回JSON转Object
  51. MqttMsgDto msgDto = MqttMsgDto.parse(payload);
  52. switch (msgDto.getM()) {
  53. //设备端注册报文
  54. case Constant.M_CODE_REGISTER: {
  55. doRegister(msgDto);
  56. break;
  57. }
  58. // 设备位置信息上报
  59. case Constant.M_CODE_UPLOAD_LOCATION: {
  60. doUploadLocation(msgDto);
  61. break;
  62. }
  63. //设备上传通话记录
  64. case Constant.M_CODE_UPLOAD_CALL_RECORD: {
  65. doUploadCallRecord(msgDto);
  66. break;
  67. }
  68. }
  69. }
  70. /**
  71. * 注册设备
  72. */
  73. private void doRegister(MqttMsgDto obj) {
  74. String openNum = obj.getString("regnum");
  75. //查询数据库中是否有此设备
  76. Device device = deviceService.getOne(new QueryWrapper<Device>().eq("openNum", openNum));
  77. //没有该设备,保存至数据库
  78. if (device == null) {
  79. device = new Device();
  80. device.setOpenNum(openNum);
  81. device.setClientId(Constant.DEVICE_ID_PREFIX + openNum);
  82. device.setPassword(MD5Util.MD5Encode(openNum));
  83. deviceService.save(device);
  84. DeviceSet deviceSet = new DeviceSet();
  85. deviceSet.setOpenNum(device.getOpenNum());
  86. deviceSetService.save(deviceSet);
  87. }
  88. //mqtt服务器注册用户
  89. rabbitMQApi.registerMqtt(device.getClientId(), device.getPassword());
  90. //发送消息
  91. setService.returnCode(device);
  92. }
  93. /**
  94. * 处理上传位置
  95. */
  96. private void doUploadLocation(MqttMsgDto msg) {
  97. try {
  98. DeviceInfoState state = new DeviceInfoState();
  99. state.setOpenNum(msg.getOpenNum());
  100. state.setBatteryNum(msg.getInt("batterynum"));
  101. state.setSignalNum(msg.getInt("signalnum"));
  102. state.setMode(msg.getInt("mode"));
  103. state.setLon(msg.getString("lon"));
  104. state.setLat(msg.getString("lat"));
  105. state.setSpeed(msg.getString("speed"));
  106. state.setNum(msg.getInt("num"));
  107. state.setUploadTime(msg.getDate("createtime"));
  108. //转换后的坐标
  109. String[] gcj = CoordTransformUtil.wgs84toGcj02(state.getLon(), state.getLat());
  110. state.setLonGcj(gcj[0]);
  111. state.setLatGcj(gcj[1]);
  112. // 逆地理位置解析
  113. state.setSite(GaoDeApiUtil.regeo(state.getLonGcj(), state.getLatGcj()));
  114. // 保存设备
  115. deviceInfoStateService.saveOrUpdate(state);
  116. DeviceInfoRecord record = BeanUtil.cast(state, DeviceInfoRecord.class);
  117. deviceInfoRecordService.save(record);
  118. } catch (Exception e) {
  119. log.error("老人机上传位置信息异常", e);
  120. }
  121. }
  122. /**
  123. * 设备上传通话记录
  124. */
  125. private void doUploadCallRecord(MqttMsgDto obj) {
  126. CallRecords callRecords = new CallRecords();
  127. callRecords.setOpenNum(obj.getOpenNum());
  128. callRecords.setCallType(obj.getInt("type"));
  129. callRecords.setKeyNum(obj.getString("key"));
  130. callRecords.setPhoneNum(obj.getString("phnoenum"));
  131. callRecords.setTalkTime(obj.getInt("talktime"));
  132. callRecords.setEndTime(obj.getString("endtime"));
  133. callRecordsService.save(callRecords);
  134. }
  135. }