MqttCallbackHandler.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package com.zy.bms.config.mqtt;
  2. import com.alibaba.fastjson.JSON;
  3. import com.zy.bms.common.Constant;
  4. import com.zy.bms.common.exception.ApiRuntimeException;
  5. import com.zy.bms.model.*;
  6. import com.zy.bms.redis.DeviceManager;
  7. import com.zy.bms.service.*;
  8. import com.zy.bms.utils.*;
  9. import com.zy.bms.utils.CodeGenerator;
  10. import com.zy.bms.utils.CoordTransformUtil;
  11. import com.zy.bms.utils.GaoDeApiUtil;
  12. import com.zy.bms.websocket.WebSocketServer;
  13. import org.springframework.stereotype.Component;
  14. import javax.annotation.Resource;
  15. /**
  16. * MQTT 消息返回处理类
  17. *
  18. * @author yang xiao kun
  19. * create on 2021/1/19
  20. */
  21. @Component
  22. public class MqttCallbackHandler {
  23. @Resource
  24. private DeviceService deviceService;
  25. @Resource
  26. private SetBaseService setBaseService;
  27. @Resource
  28. private SetToDeviceService setToDeviceService;
  29. @Resource
  30. private DeviceLocationService deviceLocationService;
  31. @Resource
  32. private GaoDeApiUtil gaoDeApiService;
  33. @Resource
  34. private DeviceManager deviceRedisService;
  35. @Resource
  36. private WebSocketServer webSocketServer;
  37. @Resource
  38. private CallRecordsService callRecordsService;
  39. /**
  40. * 处理消息
  41. *
  42. * @param topic 主题
  43. * @param payload 消息内容
  44. */
  45. void handle(String topic, String payload) {
  46. System.out.println(payload);
  47. System.out.println("-------------------------------------------------------------------------");
  48. if (topic.contains("/")) {
  49. webSocketServer.massMessage(topic.split("/")[1], payload);
  50. }
  51. //消息返回JSON转Object
  52. MqttMsgDto msgObject = JSON.parseObject(payload, MqttMsgDto.class);
  53. //根据报文标识代码 M 处理消息
  54. switch (msgObject.getM()) {
  55. case Constant.M_CODE_REGISTER://设备端注册报文
  56. registerDevice(msgObject);
  57. break;
  58. case Constant.M_CODE_UPLOAD_LOCATION:// 设备位置信息上报
  59. LocationHandler(msgObject);
  60. break;
  61. case Constant.M_CODE_UPLOAD_CALL_RECORD://设备上传通话记录
  62. CallRecordsHandler(msgObject);
  63. break;
  64. }
  65. }
  66. /**
  67. * 注册设备
  68. */
  69. private void registerDevice(MqttMsgDto obj) {
  70. String num = obj.getString("num");
  71. //查询数据库中是否有此设备
  72. Device device = deviceService.getByNum(num);
  73. //没有该设备,保存至数据库
  74. if (device == null) {
  75. device = createDevice(num);
  76. }
  77. //发送消息
  78. setToDeviceService.returnCodeToDevice(device.getNum(), device.getClientId(), device.getPassword());
  79. }
  80. /**
  81. * 处理上传位置
  82. */
  83. private void LocationHandler(MqttMsgDto msg) {
  84. DeviceLocation location = MqttMsgToLocation(msg);
  85. // 逆地理位置解析
  86. location.setSite(gaoDeApiService.regeo(location.getLonGcj(), location.getLatGcj()));
  87. // 保存位置信息--历史记录
  88. deviceLocationService.save(location);
  89. // 将最新位置信息保存至redis中
  90. deviceRedisService.saveLocation(location);
  91. }
  92. /**
  93. * 设备上传通话记录
  94. */
  95. private void CallRecordsHandler(MqttMsgDto obj) {
  96. CallRecords callRecords = new CallRecords();
  97. callRecords.setDeviceId(obj.getR().split("/")[1]);
  98. callRecords.setCallType(obj.getInt("type"));
  99. callRecords.setKeyNum(obj.getString("key"));
  100. callRecords.setPhoneNum(obj.getString("phnoenum"));
  101. callRecords.setTalkTime(obj.getInt("talktime"));
  102. callRecords.setEndTime(obj.getString("endtime"));
  103. callRecordsService.save(callRecords);
  104. }
  105. /**
  106. * 创建设备信息,保存设备基础信息以及设置信息
  107. *
  108. * @param num 设备随机码
  109. */
  110. private Device createDevice(String num) {
  111. Device device = new Device();
  112. device.setNum(num);
  113. String md5 = MD5Util.MD5Encode(num);
  114. if (md5 == null) {
  115. md5 = CodeGenerator.generateUUID();
  116. }
  117. device.setClientId(md5);
  118. device.setPassword(md5.substring(10));
  119. device.setGroupId(1);
  120. deviceService.save(device);
  121. setBaseService.initSetBase(device.getClientId());
  122. return device;
  123. }
  124. /**
  125. * 解析信息转为位置对象
  126. */
  127. private DeviceLocation MqttMsgToLocation(MqttMsgDto obj) {
  128. try {
  129. DeviceLocation location = new DeviceLocation();
  130. location.setDeviceId(obj.getDeviceId());
  131. location.setBatteryNum(obj.getInt("batterynum"));
  132. location.setSignalNum(obj.getInt("signalnum"));
  133. location.setMode(obj.getInt("mode"));
  134. location.setLon(obj.getString("lon"));
  135. location.setLon(obj.getString("lat"));
  136. location.setSpeed(obj.getString("speed"));
  137. location.setNum(obj.getInt("num"));
  138. location.setUploadTime(obj.getDate("createtime"));
  139. Object cellInfo = obj.getObject("cellInfo");
  140. //该数据项可能为空
  141. if (cellInfo != null) location.setCellInfo(cellInfo.toString());
  142. //转换后的坐标
  143. String[] gcj = CoordTransformUtil.wgs84toGcj02(location.getLon(), location.getLat());
  144. location.setLonGcj(gcj[0]);
  145. location.setLatGcj(gcj[1]);
  146. return location;
  147. } catch (Exception e) {
  148. e.printStackTrace();
  149. throw new ApiRuntimeException("处理设备上传位置定位信息出错");
  150. }
  151. }
  152. }