Commit f81c13d4 authored by 何金镒's avatar 何金镒

思必驰的兜底技能4:重新梳理逻辑

parent 25e14727
...@@ -66,10 +66,10 @@ ...@@ -66,10 +66,10 @@
<!-- <artifactId>spring-boot-starter-web</artifactId>--> <!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>--> <!-- </dependency>-->
<dependency> <!-- <dependency>-->
<groupId>org.springframework.boot</groupId> <!-- <groupId>org.springframework.boot</groupId>-->
<artifactId>spring-boot-starter-webflux</artifactId> <!-- <artifactId>spring-boot-starter-webflux</artifactId>-->
</dependency> <!-- </dependency>-->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
......
package com.ikonke.konkeaialibabamcp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 开启异步支持
public class AsyncConfig {
// 配置线程池
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
// 最大线程数
executor.setMaxPoolSize(10);
// 队列容量
executor.setQueueCapacity(25);
// 线程名称前缀
executor.setThreadNamePrefix("Async-");
// 当达到最大线程数时如何处理新任务
executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
...@@ -13,4 +13,6 @@ public class RedisKeys { ...@@ -13,4 +13,6 @@ public class RedisKeys {
public static final String KONKE_DIFY_CONVERSATION_ID = "konke:dify:conversationid:"; public static final String KONKE_DIFY_CONVERSATION_ID = "konke:dify:conversationid:";
public static final String KONKE_DIFY_API_KEY = "konke:dify:api:key"; public static final String KONKE_DIFY_API_KEY = "konke:dify:api:key";
public static final String KONKE_DIFY_LAST_STEAM_CHAT_TASK_ID = "konke:dify:last:taskid:";
} }
...@@ -59,18 +59,40 @@ public class CDCController { ...@@ -59,18 +59,40 @@ public class CDCController {
List<CDCToken> list = tokenService.list(wrapper); List<CDCToken> list = tokenService.list(wrapper);
if(list.size() == 1){ if(list.size() == 1){
CDCToken token = list.get(0); CDCToken token = list.get(0);
// 判断CDCToken是否过期
LocalDateTime localDateTime = token.getCreateTime().plusDays(6);
if(localDateTime.isBefore(LocalDateTime.now())){
log.info("getCDCToken..CDCToken已过期..sn:{}",sn);
token.setState(CDCToken.STATE_DISABLED);
tokenService.updateById(token);
cdc_token = cdcHttpUtils.getCDCToken(ccuId, ownerId);
token.setAccessToken(cdc_token);
token.setState(CDCToken.STATE_ENABLED);
token.setCreateTime(LocalDateTime.now());
tokenService.updateById(token);
}
if(StrUtil.isNotBlank(token.getSn()) && !token.getSn().equalsIgnoreCase(sn)){ if(StrUtil.isNotBlank(token.getSn()) && !token.getSn().equalsIgnoreCase(sn)){
//更新sn //更新sn-sn不存在或者更换了sn
token.setSn(sn); token.setSn(sn);
tokenService.save(token); tokenService.updateById(token);
} }
cdc_token = token.getAccessToken(); cdc_token = token.getAccessToken();
}else if(list.size() > 1){ }else if(list.size() > 1){
log.error("获取CDCToken失败,CDCToken数量大于1"); log.error("获取CDCToken失败,CDCToken数量大于1");
return null; return null;
}else{ }else{
//去建发获取CDCToken //去建发获取CDCToken,有效期7天
cdc_token = cdcHttpUtils.getCDCToken(ccuId, ownerId); cdc_token = cdcHttpUtils.getCDCToken(ccuId, ownerId);
CDCToken token = new CDCToken();
token.setSn(sn);
token.setCcuId(ccuId);
token.setAccessToken(cdc_token);
token.setState(CDCToken.STATE_ENABLED);
token.setCreateTime(LocalDateTime.now());
tokenService.save(token);
} }
JSONObject result = new JSONObject(); JSONObject result = new JSONObject();
...@@ -109,7 +131,6 @@ public class CDCController { ...@@ -109,7 +131,6 @@ public class CDCController {
bySn.setAccessToken(token); bySn.setAccessToken(token);
bySn.setRefreshToken(bySn.getAccessToken()); bySn.setRefreshToken(bySn.getAccessToken());
bySn.setState(CDCToken.STATE_ENABLED); bySn.setState(CDCToken.STATE_ENABLED);
bySn.setCreateTime(LocalDateTime.now());
bySn.setCcuId(CcuUtils.getCcuName(ccuName)); bySn.setCcuId(CcuUtils.getCcuName(ccuName));
tokenService.updateById(bySn); tokenService.updateById(bySn);
} }
......
...@@ -114,6 +114,7 @@ public class DifyController { ...@@ -114,6 +114,7 @@ public class DifyController {
CDCToken token = list.get(0); CDCToken token = list.get(0);
String url = difyChatSteamMessages + "/dify/chatSteamMessages?query="+input; String url = difyChatSteamMessages + "/dify/chatSteamMessages?query="+input;
log.info("开始 dify 请求:{}",url);
ai_result = HttpRequest.get(url) ai_result = HttpRequest.get(url)
.header("ccuName", token.getCcuId() ) .header("ccuName", token.getCcuId() )
.header("token", token.getAccessToken()) .header("token", token.getAccessToken())
...@@ -274,6 +275,29 @@ public class DifyController { ...@@ -274,6 +275,29 @@ public class DifyController {
}); });
} }
/**
* 停止响应
* 仅支持流式模式。
*/
@GetMapping("/stopChatSteamMessages")
public String stopChatSteamMessages(@RequestHeader("sn") String sn){
log.info("结束 流式 对话, sn = {}",sn);
String redis_task_key = RedisKeys.KONKE_DIFY_LAST_STEAM_CHAT_TASK_ID+ sn;
Object redis_task_id =redisTemplate.opsForValue().get(redis_task_key);
if(redis_task_id!=null){
String url = "http://172.17.12.12:8088/chat-messages/"+redis_task_id+"/stop";
JSONObject body = new JSONObject();
body.set("user",sn);
String body1 = HttpRequest.post(url)
.header("Authorization", "Bearer " + dify_api_key)
.header("Content-Type", "application/json")
.body(body.toString())
.timeout(1000000)
.execute().body();
return body1;
}
return "{\"result\": \"error\"}";
}
@GetMapping("/chatSteamMessages") @GetMapping("/chatSteamMessages")
public Flux<String> chatSteamMessages(@RequestParam(name = "query") String query, public Flux<String> chatSteamMessages(@RequestParam(name = "query") String query,
@RequestHeader("ccuName") String ccuName, @RequestHeader("ccuName") String ccuName,
...@@ -334,8 +358,12 @@ public class DifyController { ...@@ -334,8 +358,12 @@ public class DifyController {
private String convertToCustomResponseAsync(DifyStreamResponse difyStreamResponse,String sn,String ccuName,String token,Object redis_conversationId) { private String convertToCustomResponseAsync(DifyStreamResponse difyStreamResponse,String sn,String ccuName,String token,Object redis_conversationId) {
if(redis_conversationId == null && StrUtil.isNotBlank(difyStreamResponse.getConversation_id())){//conversationId为null表示第一次对话 if(redis_conversationId == null && StrUtil.isNotBlank(difyStreamResponse.getConversation_id())){//conversationId为null表示第一次对话
String conversationId = difyStreamResponse.getConversation_id(); // 保存最后一次对话的task_id,后续可暂停对话
String redis_task_key = RedisKeys.KONKE_DIFY_LAST_STEAM_CHAT_TASK_ID+ sn;
redisTemplate.opsForValue().set(redis_task_key, difyStreamResponse.getTask_id());
// 保存conversationId
String conversationId = difyStreamResponse.getConversation_id();
String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn; String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
redisTemplate.opsForValue().set(redis_key, conversationId); redisTemplate.opsForValue().set(redis_key, conversationId);
......
package com.ikonke.konkeaialibabamcp.event.listener;
import cn.hutool.http.HttpRequest;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken;
import com.ikonke.konkeaialibabamcp.event.mode.DifyChatSteamEvent;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class DifyChatSteamEventListener {
@Value("${cdc.difyChatSteamMessages}")
private String difyChatSteamMessages;
@Autowired
private ICDCTokenService tokenService;
@Async
@EventListener
public void handleUserRegisteredEvent(DifyChatSteamEvent event) {
long startTime = System.currentTimeMillis();
log.info("处理dify AI 聊天:sn:{},input:{}", event.getSn(), event.getInput());
try {
QueryWrapper<CDCToken> wrapper = new QueryWrapper<>();
wrapper.eq("sn", event.getSn());
List<CDCToken> list = tokenService.list(wrapper);
CDCToken token = list.get(0);
String url = difyChatSteamMessages + "/dify/v2/chatSteamMessages?query="+event.getInput();
log.info("开始 dify 请求:{}",url);
String ai_result = HttpRequest.get(url)
.header("ccuName", token.getCcuId() )
.header("token", token.getAccessToken())
.header("sn",token.getSn())
.timeout(600000)
.execute().body();
log.info("dify请求返回:{}",ai_result);
String result_end = ai_result.replaceAll("(?m)^\\s*$[\r\n]*", "").replaceAll("[\r\n]+", "\n").replaceAll("[\r\n]", "").trim();
log.info("dify请求返回2--->:{}",result_end);
// 通过WebSocket发送消息给屏端
log.info("dify请求耗时--->:{}",System.currentTimeMillis() - startTime);
} catch (Exception e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}
package com.ikonke.konkeaialibabamcp.event.mode;
import org.springframework.context.ApplicationEvent;
public class DifyChatSteamEvent extends ApplicationEvent {
private final String sn;
private final String input;
public DifyChatSteamEvent(Object source,String sn, String input) {
super(source);
this.sn = sn;
this.input = input;
}
public String getSn() {
return sn;
}
public String getInput() {
return input;
}
}
...@@ -5,6 +5,7 @@ import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken; ...@@ -5,6 +5,7 @@ import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
@Mapper @Mapper
public interface CDCTokenMapper extends BaseMapper<CDCToken> { public interface CDCTokenMapper extends BaseMapper<CDCToken> {
...@@ -14,4 +15,7 @@ public interface CDCTokenMapper extends BaseMapper<CDCToken> { ...@@ -14,4 +15,7 @@ public interface CDCTokenMapper extends BaseMapper<CDCToken> {
@Select("SELECT * FROM cdc_token WHERE sn = #{sn} and `state` = #{state}") @Select("SELECT * FROM cdc_token WHERE sn = #{sn} and `state` = #{state}")
CDCToken findBySnAndState(@Param("sn") String sn, @Param("state") Integer state); CDCToken findBySnAndState(@Param("sn") String sn, @Param("state") Integer state);
@Update("UPDATE cdc_token SET accessToken = NULL WHERE DATE(createTime) < DATE_SUB(CURDATE(), INTERVAL #{num} DAY);")
int examineCDCToken(@Param("num") int num);
} }
...@@ -49,7 +49,8 @@ public class CdcHttpUtils { ...@@ -49,7 +49,8 @@ public class CdcHttpUtils {
.execute().body(); .execute().body();
JSONObject jsonObject = JSONUtil.parseObj(body1); JSONObject jsonObject = JSONUtil.parseObj(body1);
if(jsonObject.getInt("code") == 200){ if(jsonObject.getInt("code") == 200){
return jsonObject.getStr("token"); JSONObject jsonObject_data = jsonObject.getJSONObject("data");
return jsonObject_data.getStr("access_token");
} }
return null; return null;
} }
......
...@@ -7,4 +7,6 @@ public interface ICDCTokenService extends IService<CDCToken> { ...@@ -7,4 +7,6 @@ public interface ICDCTokenService extends IService<CDCToken> {
CDCToken findBySn(String sn); CDCToken findBySn(String sn);
CDCToken findBySn(String sn,Integer state); CDCToken findBySn(String sn,Integer state);
int examineCDCToken(int num);
} }
...@@ -16,9 +16,6 @@ import java.util.List; ...@@ -16,9 +16,6 @@ import java.util.List;
@Service @Service
public class ICDCTokenServiceImpl extends ServiceImpl<CDCTokenMapper, CDCToken> implements ICDCTokenService { public class ICDCTokenServiceImpl extends ServiceImpl<CDCTokenMapper, CDCToken> implements ICDCTokenService {
@Autowired
private CDCTokenMapper cdcTokenMapper;
@Override @Override
public CDCToken findBySn(String sn) { public CDCToken findBySn(String sn) {
return baseMapper.findBySn(sn); return baseMapper.findBySn(sn);
...@@ -29,5 +26,9 @@ public class ICDCTokenServiceImpl extends ServiceImpl<CDCTokenMapper, CDCToken> ...@@ -29,5 +26,9 @@ public class ICDCTokenServiceImpl extends ServiceImpl<CDCTokenMapper, CDCToken>
return baseMapper.findBySnAndState(sn, state); return baseMapper.findBySnAndState(sn, state);
} }
@Override
public int examineCDCToken(int num){
return baseMapper.examineCDCToken(num);
}
} }
package com.ikonke.konkeaialibabamcp.timedtasks; package com.ikonke.konkeaialibabamcp.timedtasks;
import com.ikonke.konkeaialibabamcp.entity.mysql.TemporaryScene; import com.ikonke.konkeaialibabamcp.entity.mysql.TemporaryScene;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ITemporarySceneService; import com.ikonke.konkeaialibabamcp.service.mysqlservice.ITemporarySceneService;
import com.ikonke.konkeaialibabamcp.utils.KonkeIotUtils; import com.ikonke.konkeaialibabamcp.utils.KonkeIotUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -21,9 +22,11 @@ public class TemporarySceneTask { ...@@ -21,9 +22,11 @@ public class TemporarySceneTask {
private ITemporarySceneService temporarySceneService; private ITemporarySceneService temporarySceneService;
@Autowired @Autowired
private KonkeIotUtils konkeIotServer; private KonkeIotUtils konkeIotServer;
@Autowired
private ICDCTokenService cdcTokenService;
/** /**
* 每5分钟再执行一次该定时器。 * 每5分钟、300秒 再执行一次该定时器。
*/ */
@Scheduled(fixedRate = 300000) @Scheduled(fixedRate = 300000)
public void task() { public void task() {
...@@ -40,4 +43,15 @@ public class TemporarySceneTask { ...@@ -40,4 +43,15 @@ public class TemporarySceneTask {
} }
/**
* 每 43200 秒 、 720 分钟 、 12 小时 执行一次该定时器。
*/
@Scheduled(fixedRate = 43200000)
public void examineCDCToken() {
log.info("开始定时任务:检查建发的token是否过期");
int i = cdcTokenService.examineCDCToken(6);
log.info("结束定时任务:检查建发的token是否过期"+i);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment