MqttCallbackHandler.java 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package com.zy.omp.config.mqtt;
  import com.zy.omp.common.Constant;
  import com.zy.omp.pojo.dto.MqttMsgDto;
  import com.zy.omp.common.exception.ApiRuntimeException;
  import com.zy.omp.model.*;
  import com.zy.omp.service.*;
  import com.zy.omp.utils.*;
  import com.zy.omp.utils.CoordTransformUtil;
  import com.zy.omp.utils.GaoDeApiUtil;
  import com.zy.omp.websocket.WebSocketServer;
  import org.springframework.stereotype.Component;
  import javax.annotation.Resource;
  import java.math.BigDecimal;
  import java.math.RoundingMode;
  import java.time.LocalDateTime;
  import java.util.logging.Level;
  import java.util.logging.Logger;
  15. /**
  16. * MQTT 消息返回处理类
  17. *
  18. * @author yang xiao kun
  19. * create on 2021/1/19
  20. */
  21. @Component
  22. public class MqttCallbackHandler {
  23. @Resource
  24. private DeviceService deviceService;
  25. @Resource
  26. private DeviceLpService deviceLpService;
  27. @Resource
  28. private SetBaseService setBaseService;
  29. @Resource
  30. private SetService setService;
  31. @Resource
  32. private LocationService deviceLocationService;
  33. @Resource
  34. private WebSocketServer webSocketServer;
  35. @Resource
  36. private CallRecordsService callRecordsService;
  37. @Resource
  38. private MqttLogService mqttLogService;
  39. @Resource
  40. private RabbitMQApi rabbitMQApi;
  41. /**
  42. * 处理消息
  43. *
  44. * @param topic 主题
  45. * @param payload 消息内容
  46. */
  47. void handle(String topic, String payload) {
  48. //存储日志-接收
  49. mqttLogService.saveLog(topic, payload, 0);
  50. //消息返回JSON转Object
  51. MqttMsgDto msgDto = MqttMsgDto.parse(payload);
  52. //路牌项目
  53. if (topic.startsWith("$regdtx2") || topic.startsWith("$dtxlp")) {
  54. switch (msgDto.getM()) {
  55. //设备端注册报文
  56. case Constant.M_CODE_REGISTER: {
  57. registerHandler_LP(msgDto);
  58. break;
  59. }
  60. // 设备上传定位信息
  61. case Constant.M_CODE_UPLOAD_LOCATION_INFO_LP: {
  62. uploadLocationInfoLPHandler(topic, msgDto);
  63. break;
  64. }
  65. // 设备上传基本信息
  66. case Constant.M_CODE_UPLOAD_INFO_LP: {
  67. uploadDeviceInfoLPHandler(topic, msgDto);
  68. break;
  69. }
  70. }
  71. }
  72. //老人机项目
  73. else {
  74. switch (msgDto.getM()) {
  75. //设备端注册报文
  76. case Constant.M_CODE_REGISTER: {
  77. registerHandler_OMP(msgDto);
  78. break;
  79. }
  80. // 设备位置信息上报
  81. case Constant.M_CODE_UPLOAD_LOCATION: {
  82. uploadLocationHandler(msgDto);
  83. break;
  84. }
  85. //设备上传通话记录
  86. case Constant.M_CODE_UPLOAD_CALL_RECORD: {
  87. uploadCallRecordsHandler(msgDto);
  88. break;
  89. }
  90. }
  91. //转发消息至网页,忽略注册的报文
  92. if (!topic.equals(Constant.TOPIC_REGISTER_SERVER)) {
  93. webSocketServer.massMessage(msgDto.getDeviceId(), payload);
  94. }
  95. }
  96. }
  97. /**
  98. * 注册设备
  99. * 老人机
  100. */
  101. private void registerHandler_OMP(MqttMsgDto obj) {
  102. String num = obj.getString("regnum");
  103. //查询数据库中是否有此设备
  104. Device device = deviceService.getByNum(num);
  105. //没有该设备,保存至数据库
  106. if (device == null) {
  107. device = new Device();
  108. device.setNum(num);
  109. device.setClientId(Constant.OLD_PHONE_DEVICE_ID_PREFIX + num);
  110. device.setPassword(MD5Util.MD5Encode(num));
  111. deviceService.save(device);
  112. SetBase setBase = new SetBase();
  113. setBase.setDeviceId(device.getClientId());
  114. setBaseService.save(setBase);
  115. }
  116. //mqtt服务器注册用户
  117. rabbitMQApi.registerMqtt(device.getClientId(), device.getPassword());
  118. //发送消息
  119. setService.returnCode_OMP(device);
  120. }
  121. /**
  122. * 注册设备
  123. * 路牌
  124. */
  125. private void registerHandler_LP(MqttMsgDto obj) {
  126. String num = obj.getString("regnum");
  127. //查询数据库中是否有此设备
  128. DeviceLp deviceLp = deviceLpService.getByNum(num);
  129. //没有该设备,保存至数据库
  130. if (deviceLp == null) {
  131. deviceLp = new DeviceLp();
  132. deviceLp.setNum(num);
  133. deviceLp.setClientId(Constant.LP_DEVICE_ID_PREFIX + num);
  134. deviceLp.setPassword(MD5Util.encodeCut(num));
  135. deviceLp.setGroup(obj.getString("group"));
  136. deviceLpService.save(deviceLp);
  137. }
  138. //mqtt服务器注册用户
  139. rabbitMQApi.registerMqtt(deviceLp.getClientId(), deviceLp.getPassword());
  140. //发送消息
  141. setService.returnCode_LP(deviceLp);
  142. }
  143. /**
  144. * 处理上传位置
  145. * 老人机
  146. */
  147. private void uploadLocationHandler(MqttMsgDto msg) {
  148. try {
  149. Location location = new Location();
  150. location.setDeviceId(msg.getDeviceId());
  151. location.setBatteryNum(msg.getInt("batterynum"));
  152. location.setSignalNum(msg.getInt("signalnum"));
  153. location.setMode(msg.getInt("mode"));
  154. location.setLon(msg.getString("lon"));
  155. location.setLat(msg.getString("lat"));
  156. location.setSpeed(msg.getString("speed"));
  157. location.setNum(msg.getInt("num"));
  158. location.setUploadTime(msg.getDate("createtime"));
  159. //转换后的坐标
  160. String[] gcj = CoordTransformUtil.wgs84toGcj02(location.getLon(), location.getLat());
  161. location.setLonGcj(gcj[0]);
  162. location.setLatGcj(gcj[1]);
  163. // 逆地理位置解析
  164. location.setSite(GaoDeApiUtil.regeo(location.getLonGcj(), location.getLatGcj()));
  165. // 保存位置信息
  166. deviceLocationService.saveLocation(location);
  167. } catch (Exception e) {
  168. e.printStackTrace();
  169. throw new ApiRuntimeException("处理设备上传位置定位信息出错");
  170. }
  171. }
  172. /**
  173. * 处理设备定位信息上传
  174. * 路牌
  175. */
  176. private void uploadLocationInfoLPHandler(String topic, MqttMsgDto msg) {
  177. try {
  178. String num = topic.split("IMEI")[1];
  179. DeviceLp deviceLp = new DeviceLp();
  180. deviceLp.setMode(msg.getInt("mode"));
  181. deviceLp.setLng(msg.getString("lng"));
  182. deviceLp.setLat(msg.getString("lat"));
  183. //转换后的坐标
  184. String[] gcj = CoordTransformUtil.wgs84toGcj02(deviceLp.getLng(), deviceLp.getLat());
  185. deviceLp.setLngGcj(gcj[0]);
  186. deviceLp.setLatGcj(gcj[1]);
  187. // 逆地理位置解析
  188. deviceLp.setSite(GaoDeApiUtil.regeo(deviceLp.getLngGcj(), deviceLp.getLatGcj()));
  189. // 保存基本信息
  190. deviceLpService.updateByNum(deviceLp, num);
  191. } catch (Exception e) {
  192. e.printStackTrace();
  193. throw new ApiRuntimeException("处理路牌设备上传定位信息出错");
  194. }
  195. }
  196. /**
  197. * 处理设备基本信息上传
  198. * 路牌
  199. */
  200. private void uploadDeviceInfoLPHandler(String topic, MqttMsgDto msg) {
  201. try {
  202. String num = topic.split("IMEI")[1];
  203. setService.getDeviceLocation_LP(num);
  204. //如果远程设备的信息与数据库的信息不一致,则发送消息更新远程信息
  205. DeviceLp dbDevice = deviceLpService.getByNum(num);
  206. System.out.println(dbDevice.getWakeInt());
  207. Double dbWakeInt = dbDevice.getWakeInt() == null ? null : dbDevice.getWakeInt().setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
  208. Double wakeInt = msg.getDouble("sleept") == null ? null : new BigDecimal(msg.getDouble("sleept")).setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
  209. if (dbWakeInt == null || !dbWakeInt.equals(wakeInt) || !dbDevice.getThresh().equals(msg.getInt("thresh"))) {
  210. setService.updateDevice_LP(num, dbDevice.getWakeInt(), dbDevice.getThresh());
  211. }
  212. DeviceLp deviceLp = new DeviceLp();
  213. deviceLp.setDataType(msg.getString("datetype"));
  214. deviceLp.setBattery(msg.getString("batterynum"));
  215. deviceLp.setS4g(msg.getString("s4g"));
  216. deviceLp.setX(msg.getString("x"));
  217. deviceLp.setY(msg.getString("y"));
  218. deviceLp.setZ(msg.getString("z"));
  219. deviceLp.setAnglex(msg.getString("anglex"));
  220. deviceLp.setAngley(msg.getString("angley"));
  221. deviceLp.setAnglez(msg.getString("anglez"));
  222. deviceLp.setTemp(msg.getString("temp"));
  223. // 保存基本信息
  224. deviceLpService.updateByNum(deviceLp, num);
  225. } catch (Exception e) {
  226. e.printStackTrace();
  227. throw new ApiRuntimeException("处理路牌设备上传基本信息出错");
  228. }
  229. }
  230. /**
  231. * 设备上传通话记录
  232. */
  233. private void uploadCallRecordsHandler(MqttMsgDto obj) {
  234. CallRecords callRecords = new CallRecords();
  235. callRecords.setDeviceId(obj.getDeviceId());
  236. callRecords.setCallType(obj.getInt("type"));
  237. callRecords.setKeyNum(obj.getString("key"));
  238. callRecords.setPhoneNum(obj.getString("phnoenum"));
  239. callRecords.setTalkTime(obj.getInt("talktime"));
  240. callRecords.setEndTime(obj.getString("endtime"));
  241. callRecordsService.save(callRecords);
  242. }
  243. }