Commit 0894277f authored by 何金镒's avatar 何金镒

异常处理

parent 3c009632
...@@ -2,8 +2,6 @@ package com.ikonke.konkeaialibabamcp.controller; ...@@ -2,8 +2,6 @@ package com.ikonke.konkeaialibabamcp.controller;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONArray; import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
...@@ -18,19 +16,16 @@ import com.ikonke.konkeaialibabamcp.service.cdc.CdcHttpUtils; ...@@ -18,19 +16,16 @@ import com.ikonke.konkeaialibabamcp.service.cdc.CdcHttpUtils;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService; import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService;
import com.ikonke.konkeaialibabamcp.utils.CcuUtils; import com.ikonke.konkeaialibabamcp.utils.CcuUtils;
import com.ikonke.konkeaialibabamcp.utils.WebSocketUtil; import com.ikonke.konkeaialibabamcp.utils.WebSocketUtil;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
...@@ -155,9 +150,14 @@ public class DifyControllerV2 { ...@@ -155,9 +150,14 @@ public class DifyControllerV2 {
|| cleanedInput.equalsIgnoreCase("闭嘴") || cleanedInput.equalsIgnoreCase("不要说话")){ || cleanedInput.equalsIgnoreCase("闭嘴") || cleanedInput.equalsIgnoreCase("不要说话")){
res = String.format(res, "好的", true); res = String.format(res, "好的", true);
}else{ }else{
// 异步执行对话???? if (!webSocketUtil.getWebSocket(token.getSn())) {
eventPublisher.publishEvent(new DifyChatSteamEvent(this,token.getSn(), cleanedInput)); log.error("未找到sn:{}的WebSocket连接,不处理对话1", token.getSn());
res = String.format(res, "正在思考中", false); res = String.format(res, "网络异常,请检查网络", false);
}else{
// 异步执行对话
eventPublisher.publishEvent(new DifyChatSteamEvent(this,token.getSn(), cleanedInput));
res = String.format(res, "正在思考中", false);
}
} }
}else{ }else{
res = String.format(res, "未检查到输入", false); res = String.format(res, "未检查到输入", false);
...@@ -184,103 +184,106 @@ public class DifyControllerV2 { ...@@ -184,103 +184,106 @@ public class DifyControllerV2 {
@GetMapping("/chatSteamMessages") @GetMapping("/chatSteamMessages")
public Flux<String> chatSteamMessages(@RequestParam(name = "query") String query, public Flux<String> chatSteamMessages(@RequestParam(name = "query") String query,
@RequestHeader("sn") String sn) throws UnsupportedEncodingException { @RequestHeader("sn") String sn) {
log.info("【{}】:开始 流式 对话, query:{}",sn,query); try {
log.info("【{}】:开始 流式 对话, query:{}",sn,query);
QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("sn", sn);
List<CDCToken> list = tokenService.list(wrapper);
if(list.size() != 1){
return Flux.just("未绑定主机");
}
CDCToken token = list.get(0);
if(StrUtil.isBlank(token.getCcuId())){
return Flux.just("未绑定主机");
}
if(StrUtil.isBlank(token.getAccessToken())){
// 没有建发的token,有效期7天
String cdc_token = cdcHttpUtils.getCDCToken(token.getCcuId(), token.getCDCOwnerId());
token.setAccessToken(cdc_token);
token.setState(CDCToken.STATE_ENABLED);
token.setCreateTime(LocalDateTime.now());
tokenService.updateById(token);
}
if(StrUtil.isBlank(token.getAccessToken())){
log.error("未获取到建发的token,结束对话");
return Flux.just("未绑定社区账号");
}
UserEntity userEntity = cdcHttpUtils.getOwner(sn); QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("sn", sn);
List<CDCToken> list = tokenService.list(wrapper);
if(list.size() != 1){
return Flux.just("未绑定主机");
}
CDCToken token = list.get(0);
if(StrUtil.isBlank(token.getCcuId())){
return Flux.just("未绑定主机");
}
if(StrUtil.isBlank(token.getAccessToken())){
// 没有建发的token,有效期7天
String cdc_token = cdcHttpUtils.getCDCToken(token.getCcuId(), token.getCDCOwnerId());
token.setAccessToken(cdc_token);
token.setState(CDCToken.STATE_ENABLED);
token.setCreateTime(LocalDateTime.now());
tokenService.updateById(token);
}
if(StrUtil.isBlank(token.getAccessToken())){
log.error("未获取到建发的token,结束对话");
return Flux.just("未绑定社区账号");
}
//workflows/run UserEntity userEntity = cdcHttpUtils.getOwner(sn);
String url = "http://172.17.12.12:8088/v1/chat-messages";
JSONObject inputs = new JSONObject(); //workflows/run
inputs.set("Authorization",mcpAuthorization); String url = "http://172.17.12.12:8088/v1/chat-messages";
inputs.set("ccuName",token.getCcuId());
inputs.set("sn",sn); JSONObject inputs = new JSONObject();
inputs.set("token",token.getAccessToken()); inputs.set("Authorization",mcpAuthorization);
if(StrUtil.isNotBlank(userEntity.getCommunityCode())){ inputs.set("ccuName",token.getCcuId());
inputs.set("communityCode",userEntity.getCommunityCode()); inputs.set("sn",sn);
}else{ inputs.set("token",token.getAccessToken());
inputs.set("communityCode","communityCode"); if(StrUtil.isNotBlank(userEntity.getCommunityCode())){
} inputs.set("communityCode",userEntity.getCommunityCode());
inputs.set("communityId",userEntity.getCommunityId()); }else{
inputs.set("easId",userEntity.getEasId()); inputs.set("communityCode","communityCode");
inputs.set("roomId",userEntity.getRoomInfo().getRoomId());
// inputs.set("roomName",userEntity.getRoomInfo().getRoomName());
inputs.set("roomName",URLEncoder.encode(userEntity.getRoomInfo().getRoomName(), StandardCharsets.UTF_8.name()));
inputs.set("buildingId",userEntity.getRoomInfo().getBuildingId());
inputs.set("userPhone",userEntity.getPhoneNum());
// inputs.set("userName",userEntity.getPhoneNum());
inputs.set("userName",URLEncoder.encode(userEntity.getNickName(), StandardCharsets.UTF_8.name()));
inputs.set("today", DateUtil.today());
JSONObject body = new JSONObject();
body.set("inputs",inputs);
body.set("query",query);
body.set("response_mode","streaming");//blocking 阻塞模式,streaming 流式模式
body.set("user",sn);
String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
Object redis_conversationId =redisTemplate.opsForValue().get(redis_key);
String conversationId = null;
if(redis_conversationId!=null){
conversationId = redis_conversationId.toString();
}
CDCToken bySn = null;
if(StrUtil.isBlank(conversationId)){
bySn = tokenService.findBySn(sn);
if(bySn!=null && StrUtil.isNotBlank(bySn.getConversationId())){
conversationId = bySn.getConversationId();
body.set("conversation_id",conversationId);//一个user一个会话
} }
}else{ inputs.set("communityId",userEntity.getCommunityId());
body.set("conversation_id",conversationId); inputs.set("easId",userEntity.getEasId());
} inputs.set("roomId",userEntity.getRoomInfo().getRoomId());
inputs.set("roomName",URLEncoder.encode(userEntity.getRoomInfo().getRoomName(), StandardCharsets.UTF_8.name()));
inputs.set("buildingId",userEntity.getRoomInfo().getBuildingId());
inputs.set("userPhone",userEntity.getPhoneNum());
inputs.set("userName",URLEncoder.encode(userEntity.getNickName(), StandardCharsets.UTF_8.name()));
log.info("dify key:{} ,流式 对话 body:{}",dify_api_key,body); inputs.set("today", DateUtil.today());
JSONObject body = new JSONObject();
return webClient.post() body.set("inputs",inputs);
.uri(url) body.set("query",query);
.headers(httpHeaders -> { body.set("response_mode","streaming");//blocking 阻塞模式,streaming 流式模式
httpHeaders.setContentType(MediaType.APPLICATION_JSON); body.set("user",sn);
httpHeaders.setBearerAuth(dify_api_key);
})
.bodyValue(body) String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
.retrieve() Object redis_conversationId =redisTemplate.opsForValue().get(redis_key);
.bodyToFlux(DifyStreamResponse.class)//实体转换 String conversationId = null;
.timeout(Duration.ofMinutes(1))//设置超时时间1分钟 if(redis_conversationId!=null){
.filter(this::shouldInclude) // 过滤掉不需要的数据【根据需求增加】 conversationId = redis_conversationId.toString();
.map((DifyStreamResponse difyStreamResponse) -> convertToCustomResponseAsync(difyStreamResponse, sn, token.getCcuId(), token.getAccessToken(), redis_conversationId)) // 异步转换【如果返回格式自定义则通过异步转换实现】 }
.onErrorResume(throwable -> { CDCToken bySn = null;
log.info("异常输出:"+throwable.getMessage()); if(StrUtil.isBlank(conversationId)){
return null; bySn = tokenService.findBySn(sn);
}); if(bySn!=null && StrUtil.isNotBlank(bySn.getConversationId())){
conversationId = bySn.getConversationId();
body.set("conversation_id",conversationId);//一个user一个会话
}
}else{
body.set("conversation_id",conversationId);
}
log.info("dify key:{} ,流式 对话 body:{}",dify_api_key,body);
return webClient.post()
.uri(url)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
httpHeaders.setBearerAuth(dify_api_key);
})
.bodyValue(body)
.retrieve()
.bodyToFlux(DifyStreamResponse.class)//实体转换
.timeout(Duration.ofMinutes(1))//设置超时时间1分钟
.filter(this::shouldInclude) // 过滤掉不需要的数据【根据需求增加】
.map((DifyStreamResponse difyStreamResponse) -> convertToCustomResponseAsync(difyStreamResponse, sn, token.getCcuId(), token.getAccessToken(), redis_conversationId)) // 异步转换【如果返回格式自定义则通过异步转换实现】
.onErrorResume(throwable -> {
log.info("异常输出:"+throwable.getMessage());
return null;
});
}catch (Exception e){
e.printStackTrace();
}
return Flux.just("服务器异常,稍后重试.");
} }
private String convertToCustomResponseAsync(DifyStreamResponse difyStreamResponse,String sn,String ccuName,String token,Object redis_conversationId) { private String convertToCustomResponseAsync(DifyStreamResponse difyStreamResponse,String sn,String ccuName,String token,Object redis_conversationId) {
......
...@@ -57,6 +57,14 @@ public class DifyChatSteamEventListener { ...@@ -57,6 +57,14 @@ public class DifyChatSteamEventListener {
.execute().body(); .execute().body();
log.info("dify请求返回11--->:{}",ai_result); log.info("dify请求返回11--->:{}",ai_result);
if(ai_result.contains("服务器异常")){
JSONObject json = new JSONObject();
json.set("msg", ai_result);
json.set("type", "other");
webSocketUtil.sendToSingle(event.getSn(), json.toString());
}
// ai_result = ai_result.replaceAll("\n", "##").trim(); // ai_result = ai_result.replaceAll("\n", "##").trim();
// log.info("dify请求返回22--->:{}",ai_result); // log.info("dify请求返回22--->:{}",ai_result);
// //
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment