提交 0cc18764 作者: 洪东保

定时任务修改

父级 cdaad871
......@@ -170,7 +170,7 @@ public class RecordTemplateController {
* 模板测试效果
* @param file 用户自主上传的转录文件
* @param meetingInstId 历史会议主键id
* @param id 模板id
* @param content 模板提示词
* @return
*/
@PostMapping("/testGenerate")
......
......@@ -127,7 +127,6 @@ public class UnifiedController {
//取消预约会议
}
// //todo 待测试
// @GetMapping("/sameNameInsertTid")
// public void sameNameInsertTid(String corpid, String corpsecret) throws IOException {
// String weComToken = getWeComToken(corpid,corpsecret);
......
......@@ -43,17 +43,18 @@ public class CmeetingJob {
@Value("${isDev}")
private Boolean isDev;
// @PostConstruct
public void weComUserInit(){
// @PostConstruct
public void weComUserInit() {
weComUserSync();
}
// @PostConstruct
public void tencentUserInit(){
// @PostConstruct
public void tencentUserInit() {
TencentUserSync();
}
// @PostConstruct
public void userBindInit(){
// @PostConstruct
public void userBindInit() {
userBind();
}
......@@ -109,28 +110,28 @@ public class CmeetingJob {
log.info("-------关联企微腾会人员定时任务结束--------");
}
@Scheduled(fixedRate = 20 * 60 * 1000,initialDelay = 2 * 60 * 1000)
@Scheduled(fixedRate = 20 * 60 * 1000, initialDelay = 2 * 60 * 1000)
public void execute() {
if (isDev) {
return;
}
//查出企微id和腾会id的关联关系
List<UserId> userIdRelations = userIdMapper.selectList(null);
Map<String,String> widTidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getWid,UserId::getTid));
Map<String,String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid,UserId::getWid));
Map<String, String> widTidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getWid, UserId::getTid));
Map<String, String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid, UserId::getWid));
//查出企微的人员信息
List<WeComUser> weComUserList = weComService.list();
Map<String,WeComUser> weComUserMap = weComUserList.stream().collect(Collectors.toMap(WeComUser::getUserId, Function.identity()));
Map<String, WeComUser> weComUserMap = weComUserList.stream().collect(Collectors.toMap(WeComUser::getUserId, Function.identity()));
//智能体授权人员
List<UserDTO> accessUserIds = tencentMeetingService.getAccessUserIds(widTidRelations);
if (CollectionUtils.isEmpty(accessUserIds)) {
log.info("无生成纪要权限的人员");
return;
}else{
} else {
log.info("生成纪要权限人员:->{}", accessUserIds.stream().map(UserDTO::getWid).collect(Collectors.joining(",")));
}
List<TencentMeetingVO.RecordFile> meetingFiles = tencentMeetingService.getMeetingFiles(accessUserIds,weComUserMap);
List<TencentMeetingVO.RecordFile> meetingFiles = tencentMeetingService.getMeetingFiles(accessUserIds, weComUserMap);
if (meetingFiles == null || meetingFiles.isEmpty()) {
log.info("没有录制文件需要处理");
......@@ -141,14 +142,14 @@ public class CmeetingJob {
List<UserDTO.TemplateAuthorizedUserDTO> authorizedUsers = meetingRecordTemplateService.selectAuthorizedUsers();
// 提交处理任务
producer.submitBatchTasks(meetingFiles,authorizedUsers,tidWidRelations,Boolean.FALSE);
producer.submitBatchTasks(meetingFiles, authorizedUsers, tidWidRelations, Boolean.FALSE);
}
/**
* 定时扫描早于一小时之前的,所有未重试过的会议,重新生成纪要
*/
@Scheduled(fixedRate = 30 * 60 * 1000,initialDelay = 10 * 60 * 1000)
@Scheduled(fixedRate = 30 * 60 * 1000, initialDelay = 10 * 60 * 1000)
public void meetingMinutesRetry() {
if (isDev) {
return;
......@@ -160,9 +161,9 @@ public class CmeetingJob {
//查出所有早于一小时前的,生成失败且未重试过的会议
List<MeetingInfo> meetingInfoList =
meetingInfoService.list(new LambdaQueryWrapper<MeetingInfo>()
.eq(MeetingInfo::getIsGenerated,Boolean.FALSE)
.eq(MeetingInfo::getGenerateRetry,Boolean.FALSE)
.le(MeetingInfo::getSyncTime,LocalDateTime.now().minusHours(1))
.eq(MeetingInfo::getIsGenerated, Boolean.FALSE)
.eq(MeetingInfo::getGenerateRetry, Boolean.FALSE)
.le(MeetingInfo::getSyncTime, LocalDateTime.now().minusHours(1))
);
if (meetingInfoList == null || meetingInfoList.isEmpty()) {
......@@ -180,13 +181,13 @@ public class CmeetingJob {
//查出企微id和腾会id的关联关系
List<UserId> userIdRelations = userIdMapper.selectList(null);
Map<String,String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid,UserId::getWid));
Map<String, String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid, UserId::getWid));
//获取模板授权的人员
List<UserDTO.TemplateAuthorizedUserDTO> authorizedUsers = meetingRecordTemplateService.selectAuthorizedUsers();
// 提交处理任务
producer.submitBatchTasks(meetingFiles,authorizedUsers,tidWidRelations, Boolean.TRUE);
producer.submitBatchTasks(meetingFiles, authorizedUsers, tidWidRelations, Boolean.TRUE);
log.info("-------生成纪要重试定时任务结束--------");
} catch (Exception e) {
e.printStackTrace();
......@@ -196,7 +197,7 @@ public class CmeetingJob {
/**
* 定时扫描早于一小时之前的,所有邮件推送未重试过的会议,重新推送邮件
*/
@Scheduled(fixedRate = 30 * 60 * 1000,initialDelay = 15 * 60 * 1000)
@Scheduled(fixedRate = 30 * 60 * 1000, initialDelay = 15 * 60 * 1000)
public void emailPushRetry() {
if (isDev) {
return;
......@@ -208,11 +209,11 @@ public class CmeetingJob {
//查出所有早于一小时前的,邮件推送失败且未重试过的会议
List<MeetingInfo> meetingInfoList =
meetingInfoService.list(new LambdaQueryWrapper<MeetingInfo>()
.eq(MeetingInfo::getIsGenerated,Boolean.TRUE)
.eq(MeetingInfo::getEmailPushAccess,Boolean.TRUE)
.eq(MeetingInfo::getIsPushed,Boolean.FALSE)
.eq(MeetingInfo::getPushRetry,Boolean.FALSE)
.le(MeetingInfo::getSyncTime,LocalDateTime.now().minusHours(1))
.eq(MeetingInfo::getIsGenerated, Boolean.TRUE)
.eq(MeetingInfo::getEmailPushAccess, Boolean.TRUE)
.eq(MeetingInfo::getIsPushed, Boolean.FALSE)
.eq(MeetingInfo::getPushRetry, Boolean.FALSE)
.le(MeetingInfo::getSyncTime, LocalDateTime.now().minusHours(1))
);
if (meetingInfoList == null || meetingInfoList.isEmpty()) {
......@@ -229,9 +230,9 @@ public class CmeetingJob {
//查出企微id和腾会id的关联关系
List<UserId> userIdRelations = userIdMapper.selectList(null);
Map<String,String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid,UserId::getWid));
Map<String, String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid, UserId::getWid));
// 提交处理任务
producer.submitEmailPushTasks(meetingFiles,tidWidRelations);
producer.submitEmailPushTasks(meetingFiles, tidWidRelations);
log.info("-------邮件推送重试定时任务结束--------");
} catch (Exception e) {
e.printStackTrace();
......
......@@ -37,7 +37,7 @@ public class MeetingInfo implements Serializable {
* 会议主题
*/
private String subject;
/**
* 会议ID(字符串类型)
*/
......@@ -47,12 +47,12 @@ public class MeetingInfo implements Serializable {
* 子会议ID
*/
private String subMeetingId;
/**
* 会议号码
*/
private String meetingCode;
/**
* 主持人
*/
......@@ -67,14 +67,14 @@ public class MeetingInfo implements Serializable {
* 参会人员名单
*/
private String participantUsers;
/**
* 会议开始时间(时间戳)
*/
@DateTimeFormat(pattern = "yyy-MM-dd HH:mm:ss")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", locale = "zh", timezone = "GMT+8")
private LocalDateTime startTime;
/**
* 会议结束时间(时间戳)
*/
......@@ -89,6 +89,7 @@ public class MeetingInfo implements Serializable {
/**
* 推送邮件许可 为false不推送
*/
private Boolean emailGenerateAccess;
private Boolean emailPushAccess;
/**
* 是否推送邮件完成
......
......@@ -9,6 +9,7 @@ import com.cmeeting.mapper.primary.MeetingInfoMapper;
import com.cmeeting.mapper.primary.MeetingRecordTemplateMapper;
import com.cmeeting.pojo.UserId;
import com.cmeeting.util.MinioUtils;
import com.cmeeting.util.RedisUtils;
import com.cmeeting.util.UserAdminConfig;
import com.cmeeting.util.UserAdminTokenUtil;
import com.cmeeting.vo.TencentMeetingVO;
......@@ -29,22 +30,12 @@ import java.util.concurrent.Future;
@Service
@Slf4j
public class FileProcessProducer {
@Autowired
private ThreadPoolTaskExecutor fileProcessExecutor;
@Autowired
private FileProcessCallbackHandler callbackHandler;
@Value(value = "${tencent.appId}")
private String tencentAppId;
@Value(value = "${tencent.sdkId}")
private String tencentSdkId;
@Value(value = "${tencent.secretId}")
private String tencentSecretId;
@Value(value = "${tencent.secretKey}")
private String tencentSecretKey;
@Value(value = "${tencent.admin.userId}")
private String tencentAdminUserId;
@Value(value = "${llm.api-addr}")
private String llmApiAddr;
@Value(value = "${tencent.base-save-path}")
......@@ -60,6 +51,8 @@ public class FileProcessProducer {
@Resource
private MinioUtils minioUtils;
@Resource
private RedisUtils redisUtils;
@Resource
private EmailSender emailSender;
@Resource
private ProcessLogService processLogService;
......@@ -71,59 +64,56 @@ public class FileProcessProducer {
/**
* 批量提交生成纪要任务
* @param recordFiles 转录文件信息
*
* @param recordFiles 转录文件信息
* @param authorizedUsers 模板授权的人员
* @param tidWidRelations 腾会id企微id对应关系
* @param finalRetry 是否为最终重试
* @param finalRetry 是否为最终重试
* @param finalRetry
*/
public void submitBatchTasks(List<TencentMeetingVO.RecordFile> recordFiles, List<UserDTO.TemplateAuthorizedUserDTO> authorizedUsers, Map<String,String> tidWidRelations, Boolean finalRetry) {
public void submitBatchTasks(List<TencentMeetingVO.RecordFile> recordFiles, List<UserDTO.TemplateAuthorizedUserDTO> authorizedUsers, Map<String, String> tidWidRelations, Boolean finalRetry) {
List<Future<?>> futures = new ArrayList<>();
String adminToken = UserAdminTokenUtil.getUserAdminToken();
for (TencentMeetingVO.RecordFile recordFile : recordFiles) {
// 为每个URL创建任务
FileProcessTask task = new FileProcessTask(
recordFile.getRecordFileIdList(),
recordFile.getMeetingId(),
recordFile.getSubMeetingId(),
baseSavePath,
Collections.emptyMap(),
tencentAppId,
tencentSdkId,
tencentSecretId,
tencentSecretKey,
tencentAdminUserId,
meetingInfoMapper,
minioUtils,
emailSender,
meetingRecordTemplateMapper,
llmApiAddr,
finalRetry,
processLogService,
authorizedUsers,
tidWidRelations,
userAdminConfig,
adminToken,
applicationId,
fileDownloadPath,
permTenantId
recordFile.getRecordFileIdList(),
recordFile.getMeetingId(),
recordFile.getSubMeetingId(),
baseSavePath,
Collections.emptyMap(),
meetingInfoMapper,
minioUtils,
redisUtils,
emailSender,
meetingRecordTemplateMapper,
llmApiAddr,
finalRetry,
processLogService,
authorizedUsers,
tidWidRelations,
userAdminConfig,
adminToken,
applicationId,
fileDownloadPath,
permTenantId
);
// 提交任务到线程池
Future<?> future = fileProcessExecutor.submit(() -> {
task.process();
callbackHandler.onComplete(task); // 回调处理
});
futures.add(future);
}
// 可以添加一个监控线程来检查所有任务完成情况
monitorTaskCompletion(futures);
}
// 批量提交邮箱推送重试任务
public void submitEmailPushTasks(List<TencentMeetingVO.RecordFile> recordFiles,Map<String,String> tidWidRelations) {
public void submitEmailPushTasks(List<TencentMeetingVO.RecordFile> recordFiles, Map<String, String> tidWidRelations) {
List<Future<?>> futures = new ArrayList<>();
for (TencentMeetingVO.RecordFile recordFile : recordFiles) {
......@@ -151,7 +141,7 @@ public class FileProcessProducer {
// 可以添加一个监控线程来检查所有任务完成情况
monitorTaskCompletion(futures);
}
private void monitorTaskCompletion(List<Future<?>> futures) {
new Thread(() -> {
for (Future<?> future : futures) {
......
......@@ -302,7 +302,7 @@ public class RecordTemplatePermissionServiceImpl extends ServiceImpl<RecordTempl
* @return
*/
private String extractXmlFromMarkdown(String markdown) {
StringBuffer sb;
StringBuffer sb = null;
try {
int start = markdown.indexOf("<");
int end = markdown.lastIndexOf(">") + 1;
......@@ -313,7 +313,6 @@ public class RecordTemplatePermissionServiceImpl extends ServiceImpl<RecordTempl
sb.append("</root>");
} catch (Exception e) {
log.info("markdown转xml,markdown->{}",markdown);
throw new RuntimeException(e.getMessage());
}
return sb.toString();
}
......
......@@ -141,6 +141,15 @@ public class RedisUtils {
}
}
public boolean setnx(String key, Object value, long time) {
try {
return redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
*
......
......@@ -112,7 +112,7 @@ email:
smtp-host: ${EMAIL_SMTP_HOST}
push-switch: true #邮件推送总开关,高优先级
environment: test #test推给本公司人员,prod推给用户
test-receiver: duanxincheng@chatbot.cn #用于测试的收方邮箱
test-receiver: hongdongbao@chatbot.cn #用于测试的收方邮箱
llm:
api-addr: ${LLM_API_ADDR}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论