123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package com.zy.omp.config.mqtt;
- import com.zy.omp.common.Constant;
- import com.zy.omp.pojo.dto.MqttMsgDto;
- import com.zy.omp.common.exception.ApiRuntimeException;
- import com.zy.omp.model.*;
- import com.zy.omp.service.*;
- import com.zy.omp.utils.*;
- import com.zy.omp.utils.CoordTransformUtil;
- import com.zy.omp.utils.GaoDeApiUtil;
- import com.zy.omp.websocket.WebSocketServer;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.math.BigDecimal;
- import java.time.LocalDateTime;
- /**
- * MQTT 消息返回处理类
- *
- * @author yang xiao kun
- * create on 2021/1/19
- */
- @Component
- public class MqttCallbackHandler {
- @Resource
- private DeviceService deviceService;
- @Resource
- private DeviceLpService deviceLpService;
- @Resource
- private SetBaseService setBaseService;
- @Resource
- private SetService setService;
- @Resource
- private LocationService deviceLocationService;
- @Resource
- private WebSocketServer webSocketServer;
- @Resource
- private CallRecordsService callRecordsService;
- @Resource
- private MqttLogService mqttLogService;
- @Resource
- private RabbitMQApi rabbitMQApi;
- /**
- * 处理消息
- *
- * @param topic 主题
- * @param payload 消息内容
- */
- void handle(String topic, String payload) {
- //存储日志-接收
- mqttLogService.saveLog(topic, payload, 0);
- //消息返回JSON转Object
- MqttMsgDto msgDto = MqttMsgDto.parse(payload);
- //路牌项目
- if (topic.startsWith("$regdtx2") || topic.startsWith("$dtxlp")) {
- switch (msgDto.getM()) {
- //设备端注册报文
- case Constant.M_CODE_REGISTER: {
- registerHandler_LP(msgDto);
- break;
- }
- // 设备上传定位信息
- case Constant.M_CODE_UPLOAD_LOCATION_INFO_LP: {
- uploadLocationInfoLPHandler(topic, msgDto);
- break;
- }
- // 设备上传基本信息
- case Constant.M_CODE_UPLOAD_INFO_LP: {
- uploadDeviceInfoLPHandler(topic, msgDto);
- break;
- }
- }
- }
- //老人机项目
- else {
- switch (msgDto.getM()) {
- //设备端注册报文
- case Constant.M_CODE_REGISTER: {
- registerHandler_OMP(msgDto);
- break;
- }
- // 设备位置信息上报
- case Constant.M_CODE_UPLOAD_LOCATION: {
- uploadLocationHandler(msgDto);
- break;
- }
- //设备上传通话记录
- case Constant.M_CODE_UPLOAD_CALL_RECORD: {
- uploadCallRecordsHandler(msgDto);
- break;
- }
- }
- //转发消息至网页,忽略注册的报文
- if (!topic.equals(Constant.TOPIC_REGISTER_SERVER)) {
- webSocketServer.massMessage(msgDto.getDeviceId(), payload);
- }
- }
- }
- /**
- * 注册设备
- * 老人机
- */
- private void registerHandler_OMP(MqttMsgDto obj) {
- String num = obj.getString("regnum");
- //查询数据库中是否有此设备
- Device device = deviceService.getByNum(num);
- //没有该设备,保存至数据库
- if (device == null) {
- device = new Device();
- device.setNum(num);
- device.setClientId(Constant.OLD_PHONE_DEVICE_ID_PREFIX + num);
- device.setPassword(MD5Util.MD5Encode(num));
- deviceService.save(device);
- SetBase setBase = new SetBase();
- setBase.setDeviceId(device.getClientId());
- setBaseService.save(setBase);
- }
- //mqtt服务器注册用户
- rabbitMQApi.registerMqtt(device.getClientId(), device.getPassword());
- //发送消息
- setService.returnCode_OMP(device);
- }
- /**
- * 注册设备
- * 路牌
- */
- private void registerHandler_LP(MqttMsgDto obj) {
- String num = obj.getString("regnum");
- //查询数据库中是否有此设备
- DeviceLp deviceLp = deviceLpService.getByNum(num);
- //没有该设备,保存至数据库
- if (deviceLp == null) {
- deviceLp = new DeviceLp();
- deviceLp.setNum(num);
- deviceLp.setClientId(Constant.LP_DEVICE_ID_PREFIX + num);
- deviceLp.setPassword(MD5Util.encodeCut(num));
- deviceLp.setGroup(obj.getString("group"));
- deviceLpService.save(deviceLp);
- }
- //mqtt服务器注册用户
- rabbitMQApi.registerMqtt(deviceLp.getClientId(), deviceLp.getPassword());
- //发送消息
- setService.returnCode_LP(deviceLp);
- }
- /**
- * 处理上传位置
- * 老人机
- */
- private void uploadLocationHandler(MqttMsgDto msg) {
- try {
- Location location = new Location();
- location.setDeviceId(msg.getDeviceId());
- location.setBatteryNum(msg.getInt("batterynum"));
- location.setSignalNum(msg.getInt("signalnum"));
- location.setMode(msg.getInt("mode"));
- location.setLon(msg.getString("lon"));
- location.setLat(msg.getString("lat"));
- location.setSpeed(msg.getString("speed"));
- location.setNum(msg.getInt("num"));
- location.setUploadTime(msg.getDate("createtime"));
- //转换后的坐标
- String[] gcj = CoordTransformUtil.wgs84toGcj02(location.getLon(), location.getLat());
- location.setLonGcj(gcj[0]);
- location.setLatGcj(gcj[1]);
- // 逆地理位置解析
- location.setSite(GaoDeApiUtil.regeo(location.getLonGcj(), location.getLatGcj()));
- // 保存位置信息
- deviceLocationService.saveLocation(location);
- } catch (Exception e) {
- e.printStackTrace();
- throw new ApiRuntimeException("处理设备上传位置定位信息出错");
- }
- }
- /**
- * 处理设备定位信息上传
- * 路牌
- */
- private void uploadLocationInfoLPHandler(String topic, MqttMsgDto msg) {
- try {
- String num = topic.split("IMEI")[1];
- DeviceLp deviceLp = new DeviceLp();
- deviceLp.setMode(msg.getInt("mode"));
- deviceLp.setLng(msg.getString("lng"));
- deviceLp.setLat(msg.getString("lat"));
- //转换后的坐标
- String[] gcj = CoordTransformUtil.wgs84toGcj02(deviceLp.getLng(), deviceLp.getLat());
- deviceLp.setLngGcj(gcj[0]);
- deviceLp.setLatGcj(gcj[1]);
- // 逆地理位置解析
- deviceLp.setSite(GaoDeApiUtil.regeo(deviceLp.getLngGcj(), deviceLp.getLatGcj()));
- // 保存基本信息
- deviceLpService.updateByNum(deviceLp, num);
- } catch (Exception e) {
- e.printStackTrace();
- throw new ApiRuntimeException("处理路牌设备上传定位信息出错");
- }
- }
- /**
- * 处理设备基本信息上传
- * 路牌
- */
- private void uploadDeviceInfoLPHandler(String topic, MqttMsgDto msg) {
- try {
- String num = topic.split("IMEI")[1];
- setService.getDeviceLocation_LP(num);
- //如果远程设备的信息与数据库的信息不一致,则发送消息更新远程信息
- DeviceLp dbDevice = deviceLpService.getByNum(num);
- System.out.println(dbDevice.getWakeInt());
- Double dbWakeInt = dbDevice.getWakeInt() == null ? null : dbDevice.getWakeInt().setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
- Double wakeInt = msg.getDouble("sleept") == null ? null : new BigDecimal(msg.getDouble("sleept")).setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
- if (dbWakeInt == null || !dbWakeInt.equals(wakeInt) || !dbDevice.getThresh().equals(msg.getInt("thresh"))) {
- setService.updateDevice_LP(num, dbDevice.getWakeInt(), dbDevice.getThresh());
- }
- DeviceLp deviceLp = new DeviceLp();
- deviceLp.setDataType(msg.getString("datetype"));
- deviceLp.setBattery(msg.getString("batterynum"));
- deviceLp.setS4g(msg.getString("s4g"));
- deviceLp.setX(msg.getString("x"));
- deviceLp.setY(msg.getString("y"));
- deviceLp.setZ(msg.getString("z"));
- deviceLp.setAnglex(msg.getString("anglex"));
- deviceLp.setAngley(msg.getString("angley"));
- deviceLp.setAnglez(msg.getString("anglez"));
- deviceLp.setTemp(msg.getString("temp"));
- // 保存基本信息
- deviceLpService.updateByNum(deviceLp, num);
- } catch (Exception e) {
- e.printStackTrace();
- throw new ApiRuntimeException("处理路牌设备上传基本信息出错");
- }
- }
- /**
- * 设备上传通话记录
- */
- private void uploadCallRecordsHandler(MqttMsgDto obj) {
- CallRecords callRecords = new CallRecords();
- callRecords.setDeviceId(obj.getDeviceId());
- callRecords.setCallType(obj.getInt("type"));
- callRecords.setKeyNum(obj.getString("key"));
- callRecords.setPhoneNum(obj.getString("phnoenum"));
- callRecords.setTalkTime(obj.getInt("talktime"));
- callRecords.setEndTime(obj.getString("endtime"));
- callRecordsService.save(callRecords);
- }
- }
|