Commit 189b0678 authored by 何金镒's avatar 何金镒

思必驰的兜底技能5:WebSocket搭建

parent f81c13d4
......@@ -134,6 +134,10 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
......
package com.ikonke.konkeaialibabamcp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
......@@ -92,6 +92,7 @@ public class CDCController {
token.setAccessToken(cdc_token);
token.setState(CDCToken.STATE_ENABLED);
token.setCreateTime(LocalDateTime.now());
token.setAiSpeechUserId(sn);
tokenService.save(token);
}
......
......@@ -72,8 +72,6 @@ public class DifyController {
JSONObject context = jsonObject.getJSONObject("context");
JSONObject device = context.getJSONObject("device");
String deviceName = device.getStr("deviceName");
JSONObject user = context.getJSONObject("user");
String userId = user.getStr("userId");
String type = result.get("type").toString();
String input; //请求文本
......@@ -108,7 +106,7 @@ public class DifyController {
//根据userId获取到对应的主机号等信息
QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("aiSpeechUserId", userId);
wrapper.eq("aiSpeechUserId", deviceName);
List<CDCToken> list = tokenService.list(wrapper);
if(list.size() == 1){
CDCToken token = list.get(0);
......
......@@ -68,35 +68,6 @@ public class DifyControllerV2 {
}
@GetMapping("/synAiSpeech")
public JSONObject synAiSpeech(@RequestParam(name = "aiSpeechUserId") String aiSpeechUserId,
@RequestParam(name = "sn") String sn){
log.info("synAiSpeech..同步AiSpeechUserId..aiSpeechUserId:{},sn:{}",aiSpeechUserId,sn);
Long timestamp = System.currentTimeMillis();
JSONObject result = new JSONObject();
QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("sn", sn);
List<CDCToken> list = tokenService.list(wrapper);
if(list.size() != 1){
result.set("msg", "操作失败,先获取建发的token");
result.set("code", 300);
}
CDCToken token = list.get(0);
if(StrUtil.isBlank(token.getAiSpeechUserId())){
token.setAiSpeechUserId(aiSpeechUserId);
tokenService.updateById(token);
}else{
if(!token.getAiSpeechUserId().equals(aiSpeechUserId)){
token.setAiSpeechUserId(aiSpeechUserId);
tokenService.updateById(token);
}
}
result.set("msg", "操作成功");
result.set("code", 200);
log.info("操作成功,time={}",System.currentTimeMillis() - timestamp);
return result;
}
/**
* 思必驰 demo
......@@ -111,9 +82,7 @@ public class DifyControllerV2 {
JSONObject result = jsonObject.getJSONObject("request");
JSONObject context = jsonObject.getJSONObject("context");
JSONObject device = context.getJSONObject("device");
String deviceName = device.getStr("deviceName");
JSONObject user = context.getJSONObject("user");
String userId = user.getStr("userId");
String deviceName = device.getStr("deviceName");//这是屏端的sn
String type = result.get("type").toString();
String input; //请求文本
......@@ -127,7 +96,7 @@ public class DifyControllerV2 {
//根据userId获取到对应的主机号等信息
QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("aiSpeechUserId", userId);
wrapper.eq("aiSpeechUserId", deviceName);
List<CDCToken> list = tokenService.list(wrapper);
if(list.size() == 1){
CDCToken token = list.get(0);
......
......@@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken;
import com.ikonke.konkeaialibabamcp.event.mode.DifyChatSteamEvent;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService;
import com.ikonke.konkeaialibabamcp.ws.MsgWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -52,6 +53,8 @@ public class DifyChatSteamEventListener {
log.info("dify请求返回2--->:{}",result_end);
// 通过WebSocket发送消息给屏端
MsgWebSocket.sendToSingle(event.getSn(), result_end);
log.info("dify请求耗时--->:{}",System.currentTimeMillis() - startTime);
} catch (Exception e) {
Thread.currentThread().interrupt();
......
package com.ikonke.konkeaialibabamcp.ws;
import lombok.Data;
@Data
public class MsgDTO {
private String sn;
private String msg;
private String msgType;
}
package com.ikonke.konkeaialibabamcp.ws;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
@ServerEndpoint("/web/socket/msg")
public class MsgWebSocket {
private static final ConcurrentHashMap<String,Session> sessions = new ConcurrentHashMap<>();
private static final AtomicInteger onlineCount = new AtomicInteger(0);
public static void sendToAll(String message) {
try {
log.info("===>APP群发:{}",message);
for (Session s : sessions.values()) {
if (s.isOpen()) {
// s.getAsyncRemote().sendText(message, result -> {
// if (!result.isOK()) {
// log.error("异步发送消息失败: {}", result.getException());
// }
// });
synchronized (s) {
s.getBasicRemote().sendText(message);
}
}
}
}catch (Exception e){
e.printStackTrace();
log.error("群发消息时出错", e);
}
}
public static void sendToSingle(String sn, String message) {
try {
log.info("===>APP发送:sn={},message={}",sn,message);
Session session = sessions.get(sn);
if (session != null && session.isOpen()) {
synchronized (session) { // 添加同步锁
session.getBasicRemote().sendText(message);
}
}
}catch (Exception e){
e.printStackTrace();
log.error("发送消息时出错", e);
}
}
/**
* 建立连接调用的方法
* /web/socket/msg?sn=00226DA86A12
* /web/socket/msg?sn=00226DA86A12&ccu=123
*/
@OnOpen
public void onOpen(Session session) {
log.info("===>APP新连接:session-{},online-count-{},url={}",session.getId(),onlineCount.get(),session.getRequestURI());
String sn = null;
String ccu = null;
if (session.getRequestParameterMap().containsKey("sn")) {
sn = session.getRequestParameterMap().get("sn").get(0);
}
if (session.getRequestParameterMap().containsKey("ccu")) {
ccu = session.getRequestParameterMap().get("ccu").get(0);
}
// 验证 sn 是否符合要求
if (!this.isValidSn(sn)) {
try {
// 发送错误信息给客户端
session.getBasicRemote().sendText("Invalid SN parameter");
// 关闭连接
session.close();
log.warn("连接被拒绝:无效的SN - {}", sn);
} catch (Exception e) {
log.error("关闭无效连接时出错", e);
}
}else{
// 加入Set中
sessions.put(sn,session);
// 在线数增加
onlineCount.getAndIncrement();
//校验登录权限
}
}
/**
* 校验SN
*/
private boolean isValidSn(String sn) {
if(StrUtil.isBlank(sn)){
return false;
}
return true;
}
/**
* 客户端消息处理的方法
*/
@OnMessage
public void onMessage(Session sender,String message) throws Exception {
log.info("接收到消息:{}",message);
MsgDTO dto = JSONUtil.toBean(message, MsgDTO.class);
Session receiver = sessions.get(dto.getSn());
if (receiver != null) {
receiver.getBasicRemote().sendText(dto.getMsg());
}
}
/**
* 关闭连接调用的方法
*/
@OnClose
public void onClose(Session session) {
String sn = session.getRequestParameterMap().get("sn").get(0);
// 从Set中删除
sessions.remove(sn);
// 在线数减少
onlineCount.getAndDecrement();
log.info("session-{},down-line-count-{}",session.getId(),onlineCount.get());
}
/**
* 发生错误调用的方法
*/
@OnError
public void onError(Session session, Throwable throwable) throws Exception {
log.error("Web Stock Error", throwable);
session.getBasicRemote().sendText(throwable.getMessage());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment