package com.zy.bms.config.mqtt; import com.alibaba.fastjson.JSON; import com.zy.bms.common.Constant; import com.zy.bms.common.exception.ApiRuntimeException; import com.zy.bms.model.*; import com.zy.bms.redis.DeviceManager; import com.zy.bms.service.*; import com.zy.bms.utils.*; import com.zy.bms.utils.CodeGenerator; import com.zy.bms.utils.CoordTransformUtil; import com.zy.bms.utils.GaoDeApiUtil; import com.zy.bms.websocket.WebSocketServer; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * MQTT 消息返回处理类 * * @author yang xiao kun * create on 2021/1/19 */ @Component public class MqttCallbackHandler { @Resource private DeviceService deviceService; @Resource private SetBaseService setBaseService; @Resource private SetToDeviceService setToDeviceService; @Resource private DeviceLocationService deviceLocationService; @Resource private GaoDeApiUtil gaoDeApiService; @Resource private DeviceManager deviceRedisService; @Resource private WebSocketServer webSocketServer; @Resource private CallRecordsService callRecordsService; /** * 处理消息 * * @param topic 主题 * @param payload 消息内容 */ void handle(String topic, String payload) { System.out.println(payload); System.out.println("-------------------------------------------------------------------------"); if (topic.contains("/")) { webSocketServer.massMessage(topic.split("/")[1], payload); } //消息返回JSON转Object MqttMsgDto msgObject = JSON.parseObject(payload, MqttMsgDto.class); //根据报文标识代码 M 处理消息 switch (msgObject.getM()) { case Constant.M_CODE_REGISTER://设备端注册报文 registerDevice(msgObject); break; case Constant.M_CODE_UPLOAD_LOCATION:// 设备位置信息上报 LocationHandler(msgObject); break; case Constant.M_CODE_UPLOAD_CALL_RECORD://设备上传通话记录 CallRecordsHandler(msgObject); break; } } /** * 注册设备 */ private void registerDevice(MqttMsgDto obj) { String num = obj.getString("num"); //查询数据库中是否有此设备 Device device = deviceService.getByNum(num); //没有该设备,保存至数据库 if (device == null) { device = createDevice(num); } //发送消息 setToDeviceService.returnCodeToDevice(device.getNum(), device.getClientId(), device.getPassword()); } /** * 处理上传位置 */ private void LocationHandler(MqttMsgDto msg) { DeviceLocation location = MqttMsgToLocation(msg); // 逆地理位置解析 location.setSite(gaoDeApiService.regeo(location.getLonGcj(), location.getLatGcj())); // 保存位置信息--历史记录 deviceLocationService.save(location); // 将最新位置信息保存至redis中 deviceRedisService.saveLocation(location); } /** * 设备上传通话记录 */ private void CallRecordsHandler(MqttMsgDto obj) { CallRecords callRecords = new CallRecords(); callRecords.setDeviceId(obj.getR().split("/")[1]); callRecords.setCallType(obj.getInt("type")); callRecords.setKeyNum(obj.getString("key")); callRecords.setPhoneNum(obj.getString("phnoenum")); callRecords.setTalkTime(obj.getInt("talktime")); callRecords.setEndTime(obj.getString("endtime")); callRecordsService.save(callRecords); } /** * 创建设备信息,保存设备基础信息以及设置信息 * * @param num 设备随机码 */ private Device createDevice(String num) { Device device = new Device(); device.setNum(num); String md5 = MD5Util.MD5Encode(num); if (md5 == null) { md5 = CodeGenerator.generateUUID(); } device.setClientId(md5); device.setPassword(md5.substring(10)); device.setGroupId(1); deviceService.save(device); setBaseService.initSetBase(device.getClientId()); return device; } /** * 解析信息转为位置对象 */ private DeviceLocation MqttMsgToLocation(MqttMsgDto obj) { try { DeviceLocation location = new DeviceLocation(); location.setDeviceId(obj.getDeviceId()); location.setBatteryNum(obj.getInt("batterynum")); location.setSignalNum(obj.getInt("signalnum")); location.setMode(obj.getInt("mode")); location.setLon(obj.getString("lon")); location.setLon(obj.getString("lat")); location.setSpeed(obj.getString("speed")); location.setNum(obj.getInt("num")); location.setUploadTime(obj.getDate("createtime")); Object cellInfo = obj.getObject("cellInfo"); //该数据项可能为空 if (cellInfo != null) location.setCellInfo(cellInfo.toString()); //转换后的坐标 String[] gcj = CoordTransformUtil.wgs84toGcj02(location.getLon(), location.getLat()); location.setLonGcj(gcj[0]); location.setLatGcj(gcj[1]); return location; } catch (Exception e) { e.printStackTrace(); throw new ApiRuntimeException("处理设备上传位置定位信息出错"); } } }