Commit 5f0fb9ed authored by 何金镒's avatar 何金镒

dify 接口

parent 96dc50cb
......@@ -16,7 +16,7 @@
<properties>
<java.version>17</java.version>
<hutool.version>5.7.10</hutool.version>
<hutool.version>5.8.24</hutool.version>
<spring-ai-alibaba.version>1.0.0.2</spring-ai-alibaba.version>
<mysql.version>8.0.32</mysql.version>
<!-- Spring AI -->
......@@ -66,6 +66,11 @@
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
......@@ -117,6 +122,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework</groupId>-->
<!-- <artifactId>spring-webflux</artifactId>-->
<!-- <version>6.1.14</version>-->
<!-- </dependency>-->
</dependencies>
<build>
......
package com.ikonke.konkeaialibabamcp.aitools;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class SceneRecommendTools {
private final RedisTemplate<String, Object> redisTemplate;
public SceneRecommendTools(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
//删除场景,会定时删除Redis中的临时场景
//保存场景,将Redis中的临时场景删除即可
}
//package com.ikonke.konkeaialibabamcp.aitools;
//
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.data.redis.core.RedisTemplate;
//import org.springframework.stereotype.Service;
//
//@Slf4j
//@Service
//public class SceneRecommendTools {
//
// private final RedisTemplate<String, Object> redisTemplate;
//
// public SceneRecommendTools(RedisTemplate<String, Object> redisTemplate) {
// this.redisTemplate = redisTemplate;
// }
//
//
// //删除场景,会定时删除Redis中的临时场景
//
// //保存场景,将Redis中的临时场景删除即可
//}
......@@ -20,15 +20,15 @@ public class RedisConfig {
return template;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// container.setTaskExecutor(null); // 设置用于执行监听器方法的 Executor
// container.setErrorHandler(null); // 设置监听器方法执行过程中出现异常的处理器
// container.addMessageListener(null, null); // 手动设置监听器 & 监听的 topic 表达式
return container;
}
// @Bean
// public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
// RedisMessageListenerContainer container = new RedisMessageListenerContainer();
//
// container.setConnectionFactory(factory);
//
// // container.setTaskExecutor(null); // 设置用于执行监听器方法的 Executor
// // container.setErrorHandler(null); // 设置监听器方法执行过程中出现异常的处理器
// // container.addMessageListener(null, null); // 手动设置监听器 & 监听的 topic 表达式
// return container;
// }
}
package com.ikonke.konkeaialibabamcp.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisKeyExpireListener extends KeyExpirationEventMessageListener {
public RedisKeyExpireListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void doHandleMessage(Message message) {
// 过期的 key
byte[] body = message.getBody();
// 消息通道
byte[] channel = message.getChannel();
log.info("redis 过期 ------> message = {}, channel = {}", new String(body), new String(channel));
}
}
//package com.ikonke.konkeaialibabamcp.config;
//
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.data.redis.connection.Message;
//import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
//import org.springframework.data.redis.listener.RedisMessageListenerContainer;
//import org.springframework.stereotype.Component;
//
//@Slf4j
//@Component
//public class RedisKeyExpireListener extends KeyExpirationEventMessageListener {
//
// public RedisKeyExpireListener(RedisMessageListenerContainer listenerContainer) {
// super(listenerContainer);
// }
//
// @Override
// public void doHandleMessage(Message message) {
//
// // 过期的 key
// byte[] body = message.getBody();
//
// // 消息通道
// byte[] channel = message.getChannel();
//
// log.info("redis 过期 ------> message = {}, channel = {}", new String(body), new String(channel));
// }
//}
package com.ikonke.konkeaialibabamcp.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient(WebClient.Builder builder) {
return builder.baseUrl("https://example.com").build();
}
}
//package com.ikonke.konkeaialibabamcp.config;
//
//import org.springframework.context.annotation.Configuration;
//import org.springframework.web.reactive.config.CorsRegistry;
//
//import java.util.Arrays;
//import java.util.List;
//
//@Configuration
//public class WebConfig implements WebMvcConfigurer {
//
// static final List<String> ORIGIN_LIST = Arrays.asList(
// // 本地
// "http://localhost:8080",
// "http://127.0.0.1:8080",
// "http://localhost:8888",
// "http://127.0.0.1:8888",
// "http://localhost:8803",
// "http://127.0.0.1:8803"
// );
//
// @Override
// public void addCorsMappings(CorsRegistry registry) {
// // 配置全局跨域规则
// registry.addMapping("/**") // 允许所有路径的请求
// .allowedOrigins(ORIGIN_LIST.toArray(new String[0])) // 允许的源
// .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS") // 允许的HTTP方法
// .allowedHeaders("Content-Type", "Authorization") // 允许的请求头
// .allowCredentials(true); // 是否允许发送Cookie等凭证信息
// }
//}
......@@ -8,4 +8,7 @@ public class RedisKeys {
public static final String ADD_TEMPORARY_SCENE = "add_temporary_scene";
// 临时场景的存活时间 5分钟
public static final int TEMPORARY_SCENE_TIME = 60 * 5;//TimeUnit.SECONDS
public static final String KONKE_DIFY_CONVERSATION_ID = "konke:dify:conversationid:";
}
......@@ -4,13 +4,21 @@ import cn.hutool.cache.Cache;
import cn.hutool.cache.CacheUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ikonke.konkeaialibabamcp.constant.RedisKeys;
import com.ikonke.konkeaialibabamcp.controller.param.DifyStreamResponse;
import com.ikonke.konkeaialibabamcp.entity.mysql.CDCToken;
import com.ikonke.konkeaialibabamcp.service.mysqlservice.ICDCTokenService;
import com.ikonke.konkeaialibabamcp.utils.CcuUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.time.LocalDateTime;
......@@ -21,8 +29,13 @@ public class DifyController {
@Autowired
private ICDCTokenService tokenService;
@Autowired
private WebClient webClient;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final static String dify_api_key = "app-YHXQcZVzokkdsrQ3mTOFhO9x";
public static Cache<String, String> conversation_id_cache = CacheUtil.newLRUCache(1024);
@GetMapping("/chatMessages")
public String chatMessages(@RequestParam(name = "query") String query,
......@@ -32,7 +45,6 @@ public class DifyController {
log.info("开始对话 sn:{},ccuName:{}, query:{}",sn,ccuName,query);
//workflows/run
String url = "http://172.17.12.12:8088/v1/chat-messages";
String api_key = "app-YHXQcZVzokkdsrQ3mTOFhO9x";
JSONObject inputs = new JSONObject();
inputs.set("ccuName",ccuName);
......@@ -45,7 +57,12 @@ public class DifyController {
body.set("response_mode","blocking");//blocking 阻塞模式,streaming 流式模式
body.set("user",sn);
String conversationId = conversation_id_cache.get(sn);
String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
Object redis_conversationId =redisTemplate.opsForValue().get(redis_key);
String conversationId = null;
if(redis_conversationId!=null){
conversationId = redis_conversationId.toString();
}
CDCToken bySn = null;
if(StrUtil.isBlank(conversationId)){
bySn = tokenService.findBySn(sn);
......@@ -53,21 +70,24 @@ public class DifyController {
conversationId = bySn.getConversationId();
body.set("conversation_id",conversationId);//一个user一个会话
}
}else{
body.set("conversation_id",conversationId);
}
log.info("dify 对话 body:{}",body);
String body1 = HttpRequest.post(url)
.header("Authorization", "Bearer " + api_key)
.header("Authorization", "Bearer " + dify_api_key)
.header("Content-Type", "application/json")
.body(body.toString())
.timeout(1000000)
.execute().body();
System.out.println(body1);
JSONObject jsonObject = new JSONObject(body1);
if(StrUtil.isBlank(conversationId)){//conversationId为null表示第一次对话
conversationId = jsonObject.getStr("conversation_id");
conversation_id_cache.put(sn, conversationId);
redisTemplate.opsForValue().set(redis_key, conversationId);
if(bySn!=null){
bySn.setConversationId(conversationId);
tokenService.updateById(bySn);
......@@ -83,8 +103,166 @@ public class DifyController {
}
}
System.out.println(jsonObject.getStr("answer"));
return jsonObject.getStr("answer");
}
@GetMapping("/chatSteamMessages")
public Flux<String> chatSteamMessages(@RequestParam(name = "query") String query,
@RequestHeader("ccuName") String ccuName,
@RequestHeader("sn") String sn,
@RequestHeader("token") String token){
log.info("开始 流式 对话 sn:{},ccuName:{}, query:{}",sn,ccuName,query);
//workflows/run
String url = "http://172.17.12.12:8088/v1/chat-messages";
JSONObject inputs = new JSONObject();
inputs.set("ccuName",ccuName);
inputs.set("sn",sn);
inputs.set("token",token);
JSONObject body = new JSONObject();
body.set("inputs",inputs);
body.set("query",query);
body.set("response_mode","streaming");//blocking 阻塞模式,streaming 流式模式
body.set("user",sn);
String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
Object redis_conversationId =redisTemplate.opsForValue().get(redis_key);
String conversationId = null;
if(redis_conversationId!=null){
conversationId = redis_conversationId.toString();
}
CDCToken bySn = null;
if(StrUtil.isBlank(conversationId)){
bySn = tokenService.findBySn(sn);
if(bySn!=null && StrUtil.isNotBlank(bySn.getConversationId())){
conversationId = bySn.getConversationId();
body.set("conversation_id",conversationId);//一个user一个会话
}
}else{
body.set("conversation_id",conversationId);
}
log.info("dify 对话 body:{}",body);
String finalConversationId = conversationId;
return webClient.post()
.uri(url)
.headers(httpHeaders -> {
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
httpHeaders.setBearerAuth(dify_api_key);
})
.bodyValue(body)
.retrieve()
.bodyToFlux(DifyStreamResponse.class)//实体转换
.filter(this::shouldInclude) // 过滤掉不需要的数据【根据需求增加】
.map((DifyStreamResponse difyStreamResponse) -> convertToCustomResponseAsync(difyStreamResponse, sn, ccuName, token, finalConversationId)) // 异步转换【如果返回格式自定义则通过异步转换实现】
.onErrorResume(throwable -> {
log.info("异常输出:"+throwable.getMessage());
return null;
});
}
private String convertToCustomResponseAsync(DifyStreamResponse difyStreamResponse,String sn,String ccuName,String token,String conversationId) {
System.out.println("---->"+difyStreamResponse.getConversation_id());
if(StrUtil.isBlank(conversationId) && StrUtil.isNotBlank(difyStreamResponse.getConversation_id())){//conversationId为null表示第一次对话
conversationId = difyStreamResponse.getConversation_id();
String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
redisTemplate.opsForValue().set(redis_key, conversationId);
CDCToken bySn = tokenService.findBySn(sn);
if(bySn!=null){
bySn.setConversationId(conversationId);
tokenService.updateById(bySn);
}else{
bySn = new CDCToken();
bySn.setSn(sn);
bySn.setCcuId(CcuUtils.getCcuName(ccuName));
bySn.setAccessToken(token);
bySn.setCreateTime(LocalDateTime.now());
bySn.setState(CDCToken.STATE_ENABLED);
bySn.setConversationId(conversationId);
tokenService.save(bySn);
}
}
String answer = difyStreamResponse.getAnswer();
if(StrUtil.isBlank(answer)){
return "";
}else{
return answer;
}
}
private boolean shouldInclude(DifyStreamResponse streamResponse) {
// 示例:只要message节点的数据和message_end节点的数据
if (streamResponse.getEvent().equals("message")
|| streamResponse.getEvent().equals("message_end")) {
return true;
}
return false;
}
@GetMapping("/getMessages")
public JSONObject getMessages(@RequestParam(name = "limit") Integer limit,
@RequestHeader("ccuName") String ccuName,
@RequestHeader("sn") String sn,
@RequestHeader("token") String token) {
log.info("获取对话历史消息 sn:{},ccuName:{}, limit:{}", sn, ccuName, limit);
String redis_key = RedisKeys.KONKE_DIFY_CONVERSATION_ID+ sn;
Object redis_conversationId =redisTemplate.opsForValue().get(redis_key);
String conversationId = null;
if(redis_conversationId!=null){
conversationId = redis_conversationId.toString();
}
if(StrUtil.isBlank(conversationId)){
CDCToken bySn = tokenService.findBySn(sn);
if(bySn!=null && StrUtil.isNotBlank(bySn.getConversationId())){
conversationId = bySn.getConversationId();
}
}
if(StrUtil.isBlank(conversationId)){
return new JSONObject();
}
if(limit==null || limit <= 0){
limit = 20;
}
String url = "http://172.17.12.12:8088/v1/messages?user="+sn+"&conversation_id="+conversationId+"&limit="+limit;
log.info("dify 获取对话历史消息 url:{}",url);
String body1 = HttpRequest.get(url)
.header("Authorization", "Bearer " + dify_api_key)
.header("Content-Type", "application/json")
.timeout(1000000)
.execute().body();
JSONObject response = new JSONObject();
JSONArray response_data = new JSONArray();
JSONObject jsonObject = JSONUtil.parseObj(body1);
Boolean has_more = jsonObject.getBool("has_more");
if(jsonObject.containsKey("data") && jsonObject.get("data")!=null){
JSONArray data = jsonObject.getJSONArray("data");
for (int i = 0; i < data.size(); i++){
JSONObject item = data.getJSONObject(i);
String query = item.getStr("query");
String answer = item.getStr("answer");
JSONObject response_item = new JSONObject();
response_item.set("query",query);
response_item.set("answer",answer);
response_item.set("created_at",item.get("created_at"));
response_data.set(response_item);
}
}
response.set("has_more",has_more);
response.set("data",response_data);
return response;
}
}
package com.ikonke.konkeaialibabamcp.controller.param;
import lombok.Data;
@Data
public class DifyOutputsData {
private String answer;
}
package com.ikonke.konkeaialibabamcp.controller.param;
import lombok.Data;
import java.io.Serializable;
@Data
public class DifyStreamResponse implements Serializable {
/**
* 不同模式下的事件类型.
*/
private String event;
/**
* agent_thought id.
*/
private String id;
/**
* 任务ID.
*/
private String task_id;
/**
* 消息唯一ID.
*/
private String message_id;
/**
* LLM 返回文本块内容.
*/
private String answer;
/**
* 创建时间戳.
*/
private Long created_at;
/**
* 会话 ID.
*/
private String conversation_id;
private DifyStreamResponseData data;
}
package com.ikonke.konkeaialibabamcp.controller.param;
import lombok.Data;
@Data
public class DifyStreamResponseData {
private String id;
private String workflow_id;
private String status;
private Long created_at;
private Long finished_at;
private DifyOutputsData outputs;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment