package com.zy.omp.config.mqtt; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.zy.omp.common.Constant; import com.zy.omp.pojo.dto.MqttMsgDto; import com.zy.omp.model.*; import com.zy.omp.service.*; import com.zy.omp.utils.*; import com.zy.omp.utils.CoordTransformUtil; import com.zy.omp.utils.GaoDeApiUtil; import com.zy.omp.websocket.WebSocketServer; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; /** * MQTT 消息返回处理类 * * @author yang xiao kun * create on 2021/1/19 */ @Slf4j @Component public class MqttCallbackHandler { @Resource private DeviceService deviceService; @Resource private DeviceInfoStateService deviceInfoStateService; @Resource private DeviceInfoRecordService deviceInfoRecordService; @Resource private DeviceSetService deviceSetService; @Resource private SetService setService; @Resource private WebSocketServer webSocketServer; @Resource private CallRecordsService callRecordsService; @Resource private MqttLogService mqttLogService; @Resource private RabbitMQApi rabbitMQApi; /** * 处理消息 * * @param topic 主题 * @param payload 消息内容 */ void handle(String topic, String payload) { //存储日志-接收 mqttLogService.saveReceiveLog(topic, payload); //消息返回JSON转Object MqttMsgDto msgDto = MqttMsgDto.parse(payload); switch (msgDto.getM()) { //设备端注册报文 case Constant.M_CODE_REGISTER: { doRegister(msgDto); break; } // 设备位置信息上报 case Constant.M_CODE_UPLOAD_LOCATION: { doUploadLocation(msgDto); break; } //设备上传通话记录 case Constant.M_CODE_UPLOAD_CALL_RECORD: { doUploadCallRecord(msgDto); break; } } //转发消息至网页,忽略注册的报文 if (!topic.equals(Constant.TOPIC_REGISTER_SERVER)) { webSocketServer.massMessage(msgDto.getOpenNum(), payload); } } /** * 注册设备 */ private void doRegister(MqttMsgDto obj) { String openNum = obj.getString("regnum"); //查询数据库中是否有此设备 Device device = deviceService.getOne(new QueryWrapper().eq("openNum", openNum)); //没有该设备,保存至数据库 if (device == null) { device = new Device(); device.setOpenNum(openNum); device.setClientId(Constant.DEVICE_ID_PREFIX + openNum); device.setPassword(MD5Util.MD5Encode(openNum)); deviceService.save(device); DeviceSet deviceSet = new DeviceSet(); deviceSet.setOpenNum(device.getOpenNum()); deviceSetService.save(deviceSet); } //mqtt服务器注册用户 rabbitMQApi.registerMqtt(device.getClientId(), device.getPassword()); //发送消息 setService.returnCode(device); } /** * 处理上传位置 */ private void doUploadLocation(MqttMsgDto msg) { try { DeviceInfoState state = new DeviceInfoState(); state.setOpenNum(msg.getOpenNum()); state.setBatteryNum(msg.getInt("batterynum")); state.setSignalNum(msg.getInt("signalnum")); state.setMode(msg.getInt("mode")); state.setLon(msg.getString("lon")); state.setLat(msg.getString("lat")); state.setSpeed(msg.getString("speed")); state.setNum(msg.getInt("num")); state.setUploadTime(msg.getDate("createtime")); //转换后的坐标 String[] gcj = CoordTransformUtil.wgs84toGcj02(state.getLon(), state.getLat()); state.setLonGcj(gcj[0]); state.setLatGcj(gcj[1]); // 逆地理位置解析 state.setSite(GaoDeApiUtil.regeo(state.getLonGcj(), state.getLatGcj())); // 保存设备 deviceInfoStateService.save(state); DeviceInfoRecord record = BeanUtil.cast(state, DeviceInfoRecord.class); deviceInfoRecordService.save(record); } catch (Exception e) { log.error("老人机上传位置信息异常", e); } } /** * 设备上传通话记录 */ private void doUploadCallRecord(MqttMsgDto obj) { CallRecords callRecords = new CallRecords(); callRecords.setOpenNum(obj.getOpenNum()); callRecords.setCallType(obj.getInt("type")); callRecords.setKeyNum(obj.getString("key")); callRecords.setPhoneNum(obj.getString("phnoenum")); callRecords.setTalkTime(obj.getInt("talktime")); callRecords.setEndTime(obj.getString("endtime")); callRecordsService.save(callRecords); } }