package com.zy.omp.config.mqtt; import com.zy.omp.common.Constant; import com.zy.omp.pojo.dto.MqttMsgDto; import com.zy.omp.common.exception.ApiRuntimeException; import com.zy.omp.model.*; import com.zy.omp.service.*; import com.zy.omp.utils.*; import com.zy.omp.utils.CoordTransformUtil; import com.zy.omp.utils.GaoDeApiUtil; import com.zy.omp.websocket.WebSocketServer; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.math.BigDecimal; import java.time.LocalDateTime; /** * MQTT 消息返回处理类 * * @author yang xiao kun * create on 2021/1/19 */ @Component public class MqttCallbackHandler { @Resource private DeviceService deviceService; @Resource private DeviceLpService deviceLpService; @Resource private SetBaseService setBaseService; @Resource private SetService setService; @Resource private LocationService deviceLocationService; @Resource private WebSocketServer webSocketServer; @Resource private CallRecordsService callRecordsService; @Resource private MqttLogService mqttLogService; @Resource private RabbitMQApi rabbitMQApi; /** * 处理消息 * * @param topic 主题 * @param payload 消息内容 */ void handle(String topic, String payload) { //存储日志-接收 mqttLogService.saveLog(topic, payload, 0); //消息返回JSON转Object MqttMsgDto msgDto = MqttMsgDto.parse(payload); //路牌项目 if (topic.startsWith("$regdtx2") || topic.startsWith("$dtxlp")) { switch (msgDto.getM()) { //设备端注册报文 case Constant.M_CODE_REGISTER: { registerHandler_LP(msgDto); break; } // 设备上传定位信息 case Constant.M_CODE_UPLOAD_LOCATION_INFO_LP: { uploadLocationInfoLPHandler(topic, msgDto); break; } // 设备上传基本信息 case Constant.M_CODE_UPLOAD_INFO_LP: { uploadDeviceInfoLPHandler(topic, msgDto); break; } } } //老人机项目 else { switch (msgDto.getM()) { //设备端注册报文 case Constant.M_CODE_REGISTER: { registerHandler_OMP(msgDto); break; } // 设备位置信息上报 case Constant.M_CODE_UPLOAD_LOCATION: { uploadLocationHandler(msgDto); break; } //设备上传通话记录 case Constant.M_CODE_UPLOAD_CALL_RECORD: { uploadCallRecordsHandler(msgDto); break; } } //转发消息至网页,忽略注册的报文 if (!topic.equals(Constant.TOPIC_REGISTER_SERVER)) { webSocketServer.massMessage(msgDto.getDeviceId(), payload); } } } /** * 注册设备 * 老人机 */ private void registerHandler_OMP(MqttMsgDto obj) { String num = obj.getString("regnum"); //查询数据库中是否有此设备 Device device = deviceService.getByNum(num); //没有该设备,保存至数据库 if (device == null) { device = new Device(); device.setNum(num); device.setClientId(Constant.OLD_PHONE_DEVICE_ID_PREFIX + num); device.setPassword(MD5Util.MD5Encode(num)); deviceService.save(device); SetBase setBase = new SetBase(); setBase.setDeviceId(device.getClientId()); setBaseService.save(setBase); } //mqtt服务器注册用户 rabbitMQApi.registerMqtt(device.getClientId(), device.getPassword()); //发送消息 setService.returnCode_OMP(device); } /** * 注册设备 * 路牌 */ private void registerHandler_LP(MqttMsgDto obj) { String num = obj.getString("regnum"); //查询数据库中是否有此设备 DeviceLp deviceLp = deviceLpService.getByNum(num); //没有该设备,保存至数据库 if (deviceLp == null) { deviceLp = new DeviceLp(); deviceLp.setNum(num); deviceLp.setClientId(Constant.LP_DEVICE_ID_PREFIX + num); deviceLp.setPassword(MD5Util.encodeCut(num)); deviceLp.setGroup(obj.getString("group")); deviceLpService.save(deviceLp); } //mqtt服务器注册用户 rabbitMQApi.registerMqtt(deviceLp.getClientId(), deviceLp.getPassword()); //发送消息 setService.returnCode_LP(deviceLp); } /** * 处理上传位置 * 老人机 */ private void uploadLocationHandler(MqttMsgDto msg) { try { Location location = new Location(); location.setDeviceId(msg.getDeviceId()); location.setBatteryNum(msg.getInt("batterynum")); location.setSignalNum(msg.getInt("signalnum")); location.setMode(msg.getInt("mode")); location.setLon(msg.getString("lon")); location.setLat(msg.getString("lat")); location.setSpeed(msg.getString("speed")); location.setNum(msg.getInt("num")); location.setUploadTime(msg.getDate("createtime")); //转换后的坐标 String[] gcj = CoordTransformUtil.wgs84toGcj02(location.getLon(), location.getLat()); location.setLonGcj(gcj[0]); location.setLatGcj(gcj[1]); // 逆地理位置解析 location.setSite(GaoDeApiUtil.regeo(location.getLonGcj(), location.getLatGcj())); // 保存位置信息 deviceLocationService.saveLocation(location); } catch (Exception e) { e.printStackTrace(); throw new ApiRuntimeException("处理设备上传位置定位信息出错"); } } /** * 处理设备定位信息上传 * 路牌 */ private void uploadLocationInfoLPHandler(String topic, MqttMsgDto msg) { try { String num = topic.split("IMEI")[1]; DeviceLp deviceLp = new DeviceLp(); deviceLp.setMode(msg.getInt("mode")); deviceLp.setLon(msg.getString("lng")); deviceLp.setLat(msg.getString("lat")); //转换后的坐标 String[] gcj = CoordTransformUtil.wgs84toGcj02(deviceLp.getLon(), deviceLp.getLat()); deviceLp.setLonGcj(gcj[0]); deviceLp.setLatGcj(gcj[1]); // 逆地理位置解析 deviceLp.setSite(GaoDeApiUtil.regeo(deviceLp.getLonGcj(), deviceLp.getLatGcj())); // 保存基本信息 deviceLpService.updateByNum(deviceLp, num); } catch (Exception e) { e.printStackTrace(); throw new ApiRuntimeException("处理路牌设备上传定位信息出错"); } } /** * 处理设备基本信息上传 * 路牌 */ private void uploadDeviceInfoLPHandler(String topic, MqttMsgDto msg) { try { String num = topic.split("IMEI")[1]; setService.getDeviceLocation_LP(num); //如果远程设备的信息与数据库的信息不一致,则发送消息更新远程信息 DeviceLp dbDevice = deviceLpService.getByNum(num); Double dbWakeInt = dbDevice.getWakeInt() == null ? null : dbDevice.getWakeInt().setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue(); Double wakeInt = msg.getDouble("sleept") == null ? null : BigDecimal.valueOf(msg.getDouble("sleept")).setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue(); if (dbWakeInt == null || !dbWakeInt.equals(wakeInt) || !dbDevice.getThresh().equals(msg.getInt("thresh"))) { setService.updateDevice_LP(num, dbDevice.getWakeInt(), dbDevice.getThresh()); } DeviceLp deviceLp = new DeviceLp(); deviceLp.setDataType(msg.getString("datetype")); deviceLp.setBattery(msg.getString("batterynum")); deviceLp.setS4g(msg.getString("s4g")); deviceLp.setX(msg.getString("x")); deviceLp.setY(msg.getString("y")); deviceLp.setZ(msg.getString("z")); deviceLp.setAnglex(msg.getString("anglex")); deviceLp.setAngley(msg.getString("angley")); deviceLp.setAnglez(msg.getString("anglez")); deviceLp.setTemp(msg.getString("temp")); deviceLp.setUpdateTime(LocalDateTime.now()); // 保存基本信息 deviceLpService.updateByNum(deviceLp, num); } catch (Exception e) { e.printStackTrace(); throw new ApiRuntimeException("处理路牌设备上传基本信息出错"); } } /** * 设备上传通话记录 */ private void uploadCallRecordsHandler(MqttMsgDto obj) { CallRecords callRecords = new CallRecords(); callRecords.setDeviceId(obj.getDeviceId()); callRecords.setCallType(obj.getInt("type")); callRecords.setKeyNum(obj.getString("key")); callRecords.setPhoneNum(obj.getString("phnoenum")); callRecords.setTalkTime(obj.getInt("talktime")); callRecords.setEndTime(obj.getString("endtime")); callRecordsService.save(callRecords); } }