Commit 8555f0b0 authored by 何金镒's avatar 何金镒

mcp与WebSocket冲突

parent 6a28ae97
...@@ -66,10 +66,10 @@ ...@@ -66,10 +66,10 @@
<!-- <artifactId>spring-boot-starter-web</artifactId>--> <!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>--> <!-- </dependency>-->
<!-- <dependency>--> <dependency>
<!-- <groupId>org.springframework.boot</groupId>--> <groupId>org.springframework.boot</groupId>
<!-- <artifactId>spring-boot-starter-webflux</artifactId>--> <artifactId>spring-boot-starter-webflux</artifactId>
<!-- </dependency>--> </dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
...@@ -134,10 +134,6 @@ ...@@ -134,10 +134,6 @@
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
package com.ikonke.konkeaialibabamcp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
...@@ -79,7 +79,12 @@ public class CDCController { ...@@ -79,7 +79,12 @@ public class CDCController {
token.setSn(sn); token.setSn(sn);
tokenService.updateById(token); tokenService.updateById(token);
} }
cdc_token = token.getAccessToken();
if(StrUtil.isBlank(token.getAccessToken())){
cdc_token = cdcHttpUtils.getCDCToken(ccuId, ownerId);
token.setCreateTime(LocalDateTime.now());
tokenService.updateById(token);
}
}else if(list.size() > 1){ }else if(list.size() > 1){
log.error("获取CDCToken失败,CDCToken数量大于1"); log.error("获取CDCToken失败,CDCToken数量大于1");
return null; return null;
......
...@@ -58,92 +58,92 @@ public class DifyController { ...@@ -58,92 +58,92 @@ public class DifyController {
} }
} }
/** // /**
* 思必驰 demo // * 思必驰 demo
* https://www.duiopen.com/docs/ct_dsk_protocol // * https://www.duiopen.com/docs/ct_dsk_protocol
* // *
*/ // */
@RequestMapping(value = "/demo", method = RequestMethod.POST, consumes = "application/json") // @RequestMapping(value = "/demo", method = RequestMethod.POST, consumes = "application/json")
@ResponseBody // @ResponseBody
public JSONObject demo(@RequestBody JSONObject jsonObject) { // public JSONObject demo(@RequestBody JSONObject jsonObject) {
Long timestamp = System.currentTimeMillis(); // Long timestamp = System.currentTimeMillis();
log.info("思必驰 demo..请求参数:{}",jsonObject.toString()); // log.info("思必驰 demo..请求参数:{}",jsonObject.toString());
JSONObject result = jsonObject.getJSONObject("request"); // JSONObject result = jsonObject.getJSONObject("request");
JSONObject context = jsonObject.getJSONObject("context"); // JSONObject context = jsonObject.getJSONObject("context");
JSONObject device = context.getJSONObject("device"); // JSONObject device = context.getJSONObject("device");
String deviceName = device.getStr("deviceName"); // String deviceName = device.getStr("deviceName");
//
String type = result.get("type").toString(); // String type = result.get("type").toString();
String input; //请求文本 // String input; //请求文本
String res = "{'version': '1.0','response': {'speak': {'type': 'text','text': '%s','ssml': 'SSML markup text string to speak'}},'shouldEndSession': %b}"; //忽略了部分DSK协议字段,shouldEndSession字段决定是否结束本轮对话 // String res = "{'version': '1.0','response': {'speak': {'type': 'text','text': '%s','ssml': 'SSML markup text string to speak'}},'shouldEndSession': %b}"; //忽略了部分DSK协议字段,shouldEndSession字段决定是否结束本轮对话
//
//
JSONObject response = JSONUtil.parseObj(String.format(res, "error", true)); // 响应报文 // JSONObject response = JSONUtil.parseObj(String.format(res, "error", true)); // 响应报文
JSONArray slots; // JSONArray slots;
String ai_result = ""; // String ai_result = "";
try { // try {
slots = result.getJSONArray("inputs"); // slots = result.getJSONArray("inputs");
input = slots.getJSONObject(0).get("input").toString(); // input = slots.getJSONObject(0).get("input").toString();
//
// if (type.equals("start")) { //唤醒 //// if (type.equals("start")) { //唤醒
// res = String.format(res, "已唤醒", false); //// res = String.format(res, "已唤醒", false);
//// log.info("新一轮对话开始...........");
////
//// response = JSONUtil.parseObj(res);
//// } else { //继续对话
//// if (input.equals("helloworld") || input.equals("你好世界") || input.equals("世界你好")) { //根据关键词返回内容
//// res = String.format(res, String.format("这是第%d次helloworld", 2), false);
//// } else if (input.equals("再见") || type.equals("end")) { //结束对话,退出说法可以根据产品自行设定
//// res = String.format(res, "结束对话", true);
//// } else {
//// res = String.format(res, "我听不懂你在说什么", false);
//// }
//// response = JSONUtil.parseObj(res);
//// }
// if (type.equals("start")) {
// log.info("新一轮对话开始..........."); // log.info("新一轮对话开始...........");
// }
//
// //根据userId获取到对应的主机号等信息
// QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
// wrapper.eq("aiSpeechUserId", deviceName);
// List<CDCToken> list = tokenService.list(wrapper);
// if(list.size() == 1){
// CDCToken token = list.get(0);
//
// String url = difyChatSteamMessages + "/dify/chatSteamMessages?query="+input;
// log.info("开始 dify 请求:{}",url);
// ai_result = HttpRequest.get(url)
// .header("ccuName", token.getCcuId() )
// .header("token", token.getAccessToken())
// .header("sn",token.getSn())
// .timeout(600000)
// .execute().body();
//
// log.info("dify请求返回:{}",ai_result);
//
// String result_end = ai_result.replaceAll("(?m)^\\s*$[\r\n]*", "").replaceAll("[\r\n]+", "\n").replaceAll("[\r\n]", "").trim();
// log.info("dify请求返回2--->:{}",result_end);
// //
// res = String.format(res, result_end, false);
// response = JSONUtil.parseObj(res);
// }else if(list.size() > 1){
// log.error("获取CDCToken失败,CDCToken数量大于1");
// res = String.format(res, "绑定异常", false);
// response = JSONUtil.parseObj(res);
// }else{
// res = String.format(res, "未绑定主机绑定", false);
// response = JSONUtil.parseObj(res); // response = JSONUtil.parseObj(res);
// } else { //继续对话
// if (input.equals("helloworld") || input.equals("你好世界") || input.equals("世界你好")) { //根据关键词返回内容
// res = String.format(res, String.format("这是第%d次helloworld", 2), false);
// } else if (input.equals("再见") || type.equals("end")) { //结束对话,退出说法可以根据产品自行设定
// res = String.format(res, "结束对话", true);
// } else {
// res = String.format(res, "我听不懂你在说什么", false);
// } // }
// } catch (Exception e) {
// res = String.format(res, "服务器异常,稍后重试.", false);
// response = JSONUtil.parseObj(res); // response = JSONUtil.parseObj(res);
// e.printStackTrace();
// } finally {
// log.info("思必驰 demo..响应时间="+(System.currentTimeMillis()-timestamp));
// return response;
// }
// } // }
if (type.equals("start")) {
log.info("新一轮对话开始...........");
}
//根据userId获取到对应的主机号等信息
QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("aiSpeechUserId", deviceName);
List<CDCToken> list = tokenService.list(wrapper);
if(list.size() == 1){
CDCToken token = list.get(0);
String url = difyChatSteamMessages + "/dify/chatSteamMessages?query="+input;
log.info("开始 dify 请求:{}",url);
ai_result = HttpRequest.get(url)
.header("ccuName", token.getCcuId() )
.header("token", token.getAccessToken())
.header("sn",token.getSn())
.timeout(600000)
.execute().body();
log.info("dify请求返回:{}",ai_result);
String result_end = ai_result.replaceAll("(?m)^\\s*$[\r\n]*", "").replaceAll("[\r\n]+", "\n").replaceAll("[\r\n]", "").trim();
log.info("dify请求返回2--->:{}",result_end);
res = String.format(res, result_end, false);
response = JSONUtil.parseObj(res);
}else if(list.size() > 1){
log.error("获取CDCToken失败,CDCToken数量大于1");
res = String.format(res, "绑定异常", false);
response = JSONUtil.parseObj(res);
}else{
res = String.format(res, "未绑定主机绑定", false);
response = JSONUtil.parseObj(res);
}
} catch (Exception e) {
res = String.format(res, "服务器异常,稍后重试.", false);
response = JSONUtil.parseObj(res);
e.printStackTrace();
} finally {
log.info("思必驰 demo..响应时间="+(System.currentTimeMillis()-timestamp));
return response;
}
}
@GetMapping("/chatMessages") @GetMapping("/chatMessages")
public String chatMessages(@RequestParam(name = "query") String query, public String chatMessages(@RequestParam(name = "query") String query,
......
...@@ -78,7 +78,7 @@ public class DifyControllerV2 { ...@@ -78,7 +78,7 @@ public class DifyControllerV2 {
@ResponseBody @ResponseBody
public JSONObject bottomSkill(@RequestBody JSONObject jsonObject) { public JSONObject bottomSkill(@RequestBody JSONObject jsonObject) {
Long timestamp = System.currentTimeMillis(); Long timestamp = System.currentTimeMillis();
log.info("思必驰 demo..请求参数:{}",jsonObject.toString()); log.info("思必驰 bottomSkill..请求参数:{}",jsonObject.toString());
JSONObject result = jsonObject.getJSONObject("request"); JSONObject result = jsonObject.getJSONObject("request");
JSONObject context = jsonObject.getJSONObject("context"); JSONObject context = jsonObject.getJSONObject("context");
JSONObject device = context.getJSONObject("device"); JSONObject device = context.getJSONObject("device");
...@@ -123,7 +123,7 @@ public class DifyControllerV2 { ...@@ -123,7 +123,7 @@ public class DifyControllerV2 {
response = JSONUtil.parseObj(res); response = JSONUtil.parseObj(res);
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
log.info("思必驰 demo..响应时间="+(System.currentTimeMillis()-timestamp)); log.info("思必驰 bottomSkill..响应时间="+(System.currentTimeMillis()-timestamp));
return response; return response;
} }
} }
......
...@@ -5,7 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; ...@@ -5,7 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken; import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken;
import com.ikonke.konkeaialibabamcp.event.mode.DifyChatSteamEvent; import com.ikonke.konkeaialibabamcp.event.mode.DifyChatSteamEvent;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService; import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService;
import com.ikonke.konkeaialibabamcp.ws.MsgWebSocket; import com.ikonke.konkeaialibabamcp.utils.WebSocketUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -24,16 +24,18 @@ public class DifyChatSteamEventListener { ...@@ -24,16 +24,18 @@ public class DifyChatSteamEventListener {
@Autowired @Autowired
private ICDCTokenService tokenService; private ICDCTokenService tokenService;
@Autowired
private WebSocketUtil webSocketUtil;
@Async @Async
@EventListener @EventListener
public void handleUserRegisteredEvent(DifyChatSteamEvent event) { public void handleUserRegisteredEvent(DifyChatSteamEvent event) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
log.info("处理dify AI 聊天:sn:{},input:{}", event.getSn(), event.getInput()); log.info("处理dify 异步 AI 聊天:sn:{},input:{}", event.getSn(), event.getInput());
try { try {
if (!MsgWebSocket.getWebSocket(event.getSn())) { if (!webSocketUtil.getWebSocket(event.getSn())) {
log.error("未找到sn:{}的WebSocket连接,不处理对话", event.getSn()); log.error("未找到sn:{}的WebSocket连接,不处理对话", event.getSn());
return; return;
} }
...@@ -58,9 +60,9 @@ public class DifyChatSteamEventListener { ...@@ -58,9 +60,9 @@ public class DifyChatSteamEventListener {
// log.info("dify请求返回2--->:{}",result_end); // log.info("dify请求返回2--->:{}",result_end);
// 通过WebSocket发送消息给屏端 // 通过WebSocket发送消息给屏端
MsgWebSocket.sendToSingle(event.getSn(), ai_result); webSocketUtil.sendToSingle(event.getSn(), ai_result);
log.info("dify请求耗时--->:{}",System.currentTimeMillis() - startTime); log.info("dify异步请求耗时--->:{}",System.currentTimeMillis() - startTime);
} catch (Exception e) { } catch (Exception e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
e.printStackTrace(); e.printStackTrace();
......
package com.ikonke.konkeaialibabamcp.utils;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WebSocketUtil {
private final static String getWebSocketUrl = "http://127.0.0.1:9997/ws/getWebSocket?sn=";
private final static String sendToSingleUrl = "http://127.0.0.1:9997/ws/sendToSingle";
public boolean getWebSocket(String sn){
String url = getWebSocketUrl + sn;
String body = HttpRequest.get(url)
.execute().body();
return "true".equals(body);
}
public void sendToSingle(String sn,String message){
JSONObject param = new JSONObject();
param.set("sn", sn);
param.set("message", message);
HttpRequest.post(sendToSingleUrl)
.body(param.toString())
.execute().body();
}
}
package com.ikonke.konkeaialibabamcp.ws;
import lombok.Data;
@Data
public class MsgDTO {
private String sn;
private String msg;
private String msgType;
}
package com.ikonke.konkeaialibabamcp.ws;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
@ServerEndpoint("/web/socket/msg")
public class MsgWebSocket {
private static final ConcurrentHashMap<String,Session> sessions = new ConcurrentHashMap<>();
private static final AtomicInteger onlineCount = new AtomicInteger(0);
public static void sendToAll(String message) {
try {
log.info("===>APP群发:{}",message);
for (Session s : sessions.values()) {
if (s.isOpen()) {
// s.getAsyncRemote().sendText(message, result -> {
// if (!result.isOK()) {
// log.error("异步发送消息失败: {}", result.getException());
// }
// });
synchronized (s) {
s.getBasicRemote().sendText(message);
}
}
}
}catch (Exception e){
e.printStackTrace();
log.error("群发消息时出错", e);
}
}
public static void sendToSingle(String sn, String message) {
try {
Session session = sessions.get(sn);
if (session != null && session.isOpen()) {
synchronized (session) { // 添加同步锁
session.getBasicRemote().sendText(message);
}
}else{
log.warn("发送消息的连接不存在:sn={}",sn);
}
}catch (Exception e){
e.printStackTrace();
log.error("发送消息时出错", e);
}
}
public static boolean getWebSocket(String sn){
Session session = sessions.get(sn);
return session != null && session.isOpen();
}
/**
* 建立连接调用的方法
* /web/socket/msg?sn=00226DA86A12
* /web/socket/msg?sn=00226DA86A12&ccu=123
*/
@OnOpen
public void onOpen(Session session) {
String sn = null;
String ccu = null;
try {
log.info("===>APP新连接:session-{},online-count-{},url={}",session.getId(),onlineCount.get(),session.getRequestURI());
if (session.getRequestParameterMap().containsKey("sn")) {
sn = session.getRequestParameterMap().get("sn").get(0);
}
if (session.getRequestParameterMap().containsKey("ccu")) {
ccu = session.getRequestParameterMap().get("ccu").get(0);
}
if(getWebSocket(sn)){
log.error("连接被拒绝:sn已存在 - {}", sn);
session.getBasicRemote().sendText("SN already connected");
session.close();
return;
}
// 验证 sn 是否符合要求
if (!this.isValidSn(sn)) {
// 发送错误信息给客户端
session.getBasicRemote().sendText("Invalid SN parameter");
// 关闭连接
session.close();
log.warn("连接被拒绝:无效的SN - {}", sn);
}else{
// 加入Set中
sessions.put(sn,session);
// 在线数增加
onlineCount.getAndIncrement();
//校验登录权限
}
} catch (Exception e) {
log.error("建立连接时发生错误:SN={}",sn, e);
if (sn != null) {
sessions.remove(sn);
}
try {
if (session.isOpen()) {
session.close();
}
} catch (Exception closeException) {
log.error("关闭连接时发生错误", closeException);
}
}
}
/**
* 校验SN
*/
private boolean isValidSn(String sn) {
if(StrUtil.isBlank(sn)){
return false;
}
return true;
}
/**
* 客户端消息处理的方法
*/
@OnMessage
public void onMessage(Session sender,String message) throws Exception {
log.info("接收到消息:{}",message);
MsgDTO dto = JSONUtil.toBean(message, MsgDTO.class);
Session receiver = sessions.get(dto.getSn());
if (receiver != null) {
receiver.getBasicRemote().sendText(dto.getMsg());
}
}
/**
* 关闭连接调用的方法
*/
@OnClose
public void onClose(Session session) {
String sn = null;
try {
if (session.getRequestParameterMap().containsKey("sn")) {
sn = session.getRequestParameterMap().get("sn").get(0);
}
if (sn != null) {
// 从Set中删除
sessions.remove(sn);
// 在线数减少
int currentCount = onlineCount.decrementAndGet();
log.info("连接关闭: session-{}, SN={}, 当前在线数={}",
session.getId(), sn, currentCount);
} else {
log.warn("关闭未知连接: session-{}", session.getId());
}
} catch (Exception e) {
log.error("处理连接关闭时发生错误: session-{}", session.getId(), e);
}
}
/**
* 发生错误调用的方法
*/
@OnError
public void onError(Session session, Throwable throwable) {
String sn = null;
try {
log.error("WebSocket发生错误: session-{}", session.getId(), throwable);
// 获取SN信息
if (session.getRequestParameterMap().containsKey("sn")) {
sn = session.getRequestParameterMap().get("sn").get(0);
}
// 尝试发送错误信息给客户端(如果会话仍打开)
if (session.isOpen()) {
try {
session.getBasicRemote().sendText("Connection error occurred");
} catch (Exception sendException) {
log.error("发送错误信息失败: session-{}", session.getId(), sendException);
}
}
} catch (Exception e) {
log.error("处理错误时发生异常", e);
} finally {
// 确保资源清理和计数器更新
if (sn != null) {
sessions.remove(sn);
int currentCount = onlineCount.decrementAndGet();
log.info("因错误清理连接: SN={}, 当前在线数={}", sn, currentCount);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment