|
@@ -1,22 +1,14 @@
|
|
|
package com.zy.bms.service;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
import com.zy.bms.common.Constant;
|
|
|
import com.zy.bms.config.mqtt.MqttGateway;
|
|
|
import com.zy.bms.model.Device;
|
|
|
-import com.zy.bms.model.MqttLog;
|
|
|
import com.zy.bms.model.MqttMsgDto;
|
|
|
import com.zy.bms.common.io.wechat.*;
|
|
|
-import com.zy.bms.utils.HttpUtil;
|
|
|
-import org.springframework.http.HttpEntity;
|
|
|
-import org.springframework.http.HttpHeaders;
|
|
|
+import com.zy.bms.websocket.WebSocketServer;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.util.MultiValueMap;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
-import java.io.UnsupportedEncodingException;
|
|
|
-import java.net.URI;
|
|
|
-import java.net.URLEncoder;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -32,29 +24,25 @@ public class SetToDeviceService {
|
|
|
private MqttGateway mqttGateway;
|
|
|
@Resource
|
|
|
private MqttLogService mqttLogService;
|
|
|
+ @Resource
|
|
|
+ private WebSocketServer webSocketServer;
|
|
|
|
|
|
/**
|
|
|
* 发送消息
|
|
|
*
|
|
|
- * @param msg 消息
|
|
|
- * @param topic 主题
|
|
|
+ * @param payload 消息
|
|
|
+ * @param topic 主题
|
|
|
*/
|
|
|
- private void sendMsgToMqtt(String msg, String topic) {
|
|
|
- //存储记录
|
|
|
- MqttLog mqttLog = new MqttLog();
|
|
|
- mqttLog.setTopic(topic);
|
|
|
- mqttLog.setTag(1);
|
|
|
- //设备注册报文
|
|
|
- if (topic.equals(Constant.TOPIC_REGISTER_CLIENT)) {
|
|
|
- MqttMsgDto msgObject = JSON.parseObject(msg, MqttMsgDto.class);
|
|
|
- mqttLog.setDeviceId(msgObject.getString("regnum"));
|
|
|
- } else {
|
|
|
- mqttLog.setDeviceId(topic.split("/")[1]);
|
|
|
+ private void sendMsgToMqtt(String topic, String payload) {
|
|
|
+ // 发送消息
|
|
|
+ mqttGateway.sendMsgToMqtt(payload, topic);
|
|
|
+ // 存储消息日志
|
|
|
+ mqttLogService.saveLog(topic, payload, 1);
|
|
|
+ // 消息转发前端webSocket
|
|
|
+ if (!topic.equals(Constant.TOPIC_REGISTER_CLIENT)) {
|
|
|
+ String deviceId = topic.split("/")[1];
|
|
|
+ webSocketServer.massMessage(deviceId, payload);
|
|
|
}
|
|
|
- mqttLog.setContent(msg);
|
|
|
- mqttLogService.save(mqttLog);
|
|
|
- //发送消息
|
|
|
- mqttGateway.sendMsgToMqtt(msg, topic);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -69,7 +57,7 @@ public class SetToDeviceService {
|
|
|
msg.set("msgvol", io.getMsgVol())
|
|
|
.set("phonevol", io.getPhoneVol())
|
|
|
.set("msgtype", 1);
|
|
|
- sendMsgToMqtt(msg.toJson(), io.getTopic());
|
|
|
+ sendMsgToMqtt(io.getTopic(), msg.toJson());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -81,7 +69,7 @@ public class SetToDeviceService {
|
|
|
//发送模板消息给设备
|
|
|
MqttMsgDto msg = new MqttMsgDto(Constant.M_CODE_GPS_RATE, io.getDeviceId());
|
|
|
msg.set("GPS_rate", io.getGpsRate());
|
|
|
- sendMsgToMqtt(msg.toJson(), io.getTopic());
|
|
|
+ sendMsgToMqtt(io.getTopic(), msg.toJson());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -93,7 +81,7 @@ public class SetToDeviceService {
|
|
|
//发送模板消息给设备
|
|
|
MqttMsgDto msg = new MqttMsgDto(Constant.M_CODE_AUTO_ANSWER, io.getDeviceId());
|
|
|
msg.set("autoanswer", io.getAutoAnswer());
|
|
|
- sendMsgToMqtt(msg.toJson(), io.getTopic());
|
|
|
+ sendMsgToMqtt(io.getTopic(), msg.toJson());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -105,7 +93,7 @@ public class SetToDeviceService {
|
|
|
msg.set("newstype", io.getNewsType())
|
|
|
.set("newstime", io.getNewsTime())
|
|
|
.set("news", io.getNews());
|
|
|
- sendMsgToMqtt(msg.toJson(), io.getTopic());
|
|
|
+ sendMsgToMqtt(io.getTopic(), msg.toJson());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -120,7 +108,7 @@ public class SetToDeviceService {
|
|
|
.set("key1_phone", io.getKey1Phone())
|
|
|
.set("key2_name", io.getKey2Name())
|
|
|
.set("key2_phone", io.getKey2Phone());
|
|
|
- sendMsgToMqtt(msg.toJson(), io.getTopic());
|
|
|
+ sendMsgToMqtt(io.getTopic(), msg.toJson());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -130,7 +118,7 @@ public class SetToDeviceService {
|
|
|
//发送模板消息给设备
|
|
|
MqttMsgDto msg = new MqttMsgDto(Constant.M_CODE_CONTINUE_GPS, io.getDeviceId());
|
|
|
msg.set("highfreq", io.getHighFreq());
|
|
|
- sendMsgToMqtt(msg.toJson(), io.getTopic());
|
|
|
+ sendMsgToMqtt(io.getTopic(), msg.toJson());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -147,86 +135,6 @@ public class SetToDeviceService {
|
|
|
data.put("deviceid", device.getClientId());
|
|
|
data.put("password", device.getPassword());
|
|
|
msg.setData(data);
|
|
|
- registerMqtt(device.getClientId(),device.getPassword());
|
|
|
- sendMsgToMqtt(msg.toJson(), Constant.TOPIC_REGISTER_CLIENT);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 向 MQTT 服务器申请用户,供设备进行登录
|
|
|
- * <p>
|
|
|
- * 因为存在先后问题,需要先注册用户,在开通虚拟机权限,
|
|
|
- * 所以逻辑为
|
|
|
- * 发送注册用户请求
|
|
|
- * 尝试申请权限
|
|
|
- * 如果失败,则间隔一秒再次请求
|
|
|
- * 如果申请次数超过3次,则尝试重新注册
|
|
|
- * 重新注册后再次尝试申请
|
|
|
- * 如果申请五次都失败,则结束方法,记录日志
|
|
|
- *
|
|
|
- * @param clientId 设备登录ID
|
|
|
- * @param password 设备登录密码
|
|
|
- */
|
|
|
- public void registerMqtt(String clientId, String password) {
|
|
|
- addRabbitMqUser(clientId, password);
|
|
|
- int count = 0;
|
|
|
- while (!setPermission(clientId)) {
|
|
|
- try {
|
|
|
- Thread.sleep(1000);
|
|
|
- if (count == 2) {
|
|
|
- addRabbitMqUser(clientId, password);
|
|
|
- }
|
|
|
- if (count > 4) {
|
|
|
- break;
|
|
|
- }
|
|
|
- count++;
|
|
|
- } catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 注册 RabbitMQ 用户
|
|
|
- *
|
|
|
- * @param clientId 用户名
|
|
|
- * @param password 用户密码
|
|
|
- */
|
|
|
- public void addRabbitMqUser(String clientId, String password) {
|
|
|
- String url = "http://view.ailishi.org:15672/api/users/" + clientId;
|
|
|
- //请求头
|
|
|
- HttpHeaders headers = new HttpHeaders();
|
|
|
- headers.add("authorization", "Basic bHEyMDE5OkxpUXVhblJhYmJpdA==");
|
|
|
- //请求参数
|
|
|
- Map<String, String> params = new HashMap<>();
|
|
|
- params.put("username", clientId);
|
|
|
- params.put("password", password);
|
|
|
- params.put("tags", "");
|
|
|
- HttpUtil.putForJSONEntity(url, headers, JSON.toJSONString(params));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 申请虚拟机权限
|
|
|
- *
|
|
|
- * @param clientId 用户ID
|
|
|
- */
|
|
|
- public boolean setPermission(String clientId) {
|
|
|
- String url = "http://view.ailishi.org:15672/api/permissions/%2F/" + clientId;
|
|
|
- //请求头
|
|
|
- HttpHeaders headers = new HttpHeaders();
|
|
|
- headers.add("authorization", "Basic bHEyMDE5OkxpUXVhblJhYmJpdA==");
|
|
|
- //请求参数
|
|
|
- Map<String, String> params = new HashMap<>();
|
|
|
- params.put("username", clientId);
|
|
|
- params.put("vhost", "/");
|
|
|
- params.put("configure", ".*");
|
|
|
- params.put("write", ".*");
|
|
|
- params.put("read", ".*");
|
|
|
- try {
|
|
|
- HttpUtil.putForJSONEntity(url, headers, JSON.toJSONString(params));
|
|
|
- } catch (Exception e) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
+ sendMsgToMqtt(Constant.TOPIC_REGISTER_CLIENT, msg.toJson());
|
|
|
}
|
|
|
}
|