123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- package com.zy.bms.config.mqtt;
- import com.alibaba.fastjson.JSON;
- import com.zy.bms.common.Constant;
- import com.zy.bms.common.exception.ApiRuntimeException;
- import com.zy.bms.model.*;
- import com.zy.bms.redis.DeviceManager;
- import com.zy.bms.service.*;
- import com.zy.bms.utils.*;
- import com.zy.bms.utils.CodeGenerator;
- import com.zy.bms.utils.CoordTransformUtil;
- import com.zy.bms.utils.GaoDeApiUtil;
- import com.zy.bms.websocket.WebSocketServer;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- /**
- * MQTT 消息返回处理类
- *
- * @author yang xiao kun
- * create on 2021/1/19
- */
- @Component
- public class MqttCallbackHandler {
- @Resource
- private DeviceService deviceService;
- @Resource
- private SetBaseService setBaseService;
- @Resource
- private SetToDeviceService setToDeviceService;
- @Resource
- private DeviceLocationService deviceLocationService;
- @Resource
- private GaoDeApiUtil gaoDeApiService;
- @Resource
- private DeviceManager deviceRedisService;
- @Resource
- private WebSocketServer webSocketServer;
- @Resource
- private CallRecordsService callRecordsService;
- /**
- * 处理消息
- *
- * @param topic 主题
- * @param payload 消息内容
- */
- void handle(String topic, String payload) {
- System.out.println(payload);
- System.out.println("-------------------------------------------------------------------------");
- if (topic.contains("/")) {
- webSocketServer.massMessage(topic.split("/")[1], payload);
- }
- //消息返回JSON转Object
- MqttMsgDto msgObject = JSON.parseObject(payload, MqttMsgDto.class);
- //根据报文标识代码 M 处理消息
- switch (msgObject.getM()) {
- case Constant.M_CODE_REGISTER://设备端注册报文
- registerDevice(msgObject);
- break;
- case Constant.M_CODE_UPLOAD_LOCATION:// 设备位置信息上报
- LocationHandler(msgObject);
- break;
- case Constant.M_CODE_UPLOAD_CALL_RECORD://设备上传通话记录
- CallRecordsHandler(msgObject);
- break;
- }
- }
- /**
- * 注册设备
- */
- private void registerDevice(MqttMsgDto obj) {
- String num = obj.getString("num");
- //查询数据库中是否有此设备
- Device device = deviceService.getByNum(num);
- //没有该设备,保存至数据库
- if (device == null) {
- device = createDevice(num);
- }
- //发送消息
- setToDeviceService.returnCodeToDevice(device.getNum(), device.getClientId(), device.getPassword());
- }
- /**
- * 处理上传位置
- */
- private void LocationHandler(MqttMsgDto msg) {
- DeviceLocation location = MqttMsgToLocation(msg);
- // 逆地理位置解析
- location.setSite(gaoDeApiService.regeo(location.getLonGcj(), location.getLatGcj()));
- // 保存位置信息--历史记录
- deviceLocationService.save(location);
- // 将最新位置信息保存至redis中
- deviceRedisService.saveLocation(location);
- }
- /**
- * 设备上传通话记录
- */
- private void CallRecordsHandler(MqttMsgDto obj) {
- CallRecords callRecords = new CallRecords();
- callRecords.setDeviceId(obj.getR().split("/")[1]);
- callRecords.setCallType(obj.getInt("type"));
- callRecords.setKeyNum(obj.getString("key"));
- callRecords.setPhoneNum(obj.getString("phnoenum"));
- callRecords.setTalkTime(obj.getInt("talktime"));
- callRecords.setEndTime(obj.getString("endtime"));
- callRecordsService.save(callRecords);
- }
- /**
- * 创建设备信息,保存设备基础信息以及设置信息
- *
- * @param num 设备随机码
- */
- private Device createDevice(String num) {
- Device device = new Device();
- device.setNum(num);
- String md5 = MD5Util.MD5Encode(num);
- if (md5 == null) {
- md5 = CodeGenerator.generateUUID();
- }
- device.setClientId(md5);
- device.setPassword(md5.substring(10));
- device.setGroupId(1);
- deviceService.save(device);
- setBaseService.initSetBase(device.getClientId());
- return device;
- }
- /**
- * 解析信息转为位置对象
- */
- private DeviceLocation MqttMsgToLocation(MqttMsgDto obj) {
- try {
- DeviceLocation location = new DeviceLocation();
- location.setDeviceId(obj.getDeviceId());
- location.setBatteryNum(obj.getInt("batterynum"));
- location.setSignalNum(obj.getInt("signalnum"));
- location.setMode(obj.getInt("mode"));
- location.setLon(obj.getString("lon"));
- location.setLon(obj.getString("lat"));
- location.setSpeed(obj.getString("speed"));
- location.setNum(obj.getInt("num"));
- location.setUploadTime(obj.getDate("createtime"));
- Object cellInfo = obj.getObject("cellInfo");
- //该数据项可能为空
- if (cellInfo != null) location.setCellInfo(cellInfo.toString());
- //转换后的坐标
- String[] gcj = CoordTransformUtil.wgs84toGcj02(location.getLon(), location.getLat());
- location.setLonGcj(gcj[0]);
- location.setLatGcj(gcj[1]);
- return location;
- } catch (Exception e) {
- e.printStackTrace();
- throw new ApiRuntimeException("处理设备上传位置定位信息出错");
- }
- }
- }
|