提交 49471a66 作者: 洪东保

多线程多活保证任务无重复消费

父级 e7bd4c9b
...@@ -8,6 +8,7 @@ import com.cmeeting.pojo.MeetingInfo; ...@@ -8,6 +8,7 @@ import com.cmeeting.pojo.MeetingInfo;
import com.cmeeting.pojo.UserId; import com.cmeeting.pojo.UserId;
import com.cmeeting.pojo.WeComUser; import com.cmeeting.pojo.WeComUser;
import com.cmeeting.service.*; import com.cmeeting.service.*;
import com.cmeeting.util.RedisUtils;
import com.cmeeting.vo.TencentMeetingVO; import com.cmeeting.vo.TencentMeetingVO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -43,6 +44,8 @@ public class CmeetingJob { ...@@ -43,6 +44,8 @@ public class CmeetingJob {
private UserIdMapper userIdMapper; private UserIdMapper userIdMapper;
@Value("${isDev}") @Value("${isDev}")
private Boolean isDev; private Boolean isDev;
@Resource
private RedisUtils redisUtils;
// @PostConstruct // @PostConstruct
public void weComUserInit(){ public void weComUserInit(){
...@@ -107,34 +110,43 @@ public class CmeetingJob { ...@@ -107,34 +110,43 @@ public class CmeetingJob {
if (isDev) { if (isDev) {
return; return;
} }
//查出企微id和腾会id的关联关系 if (redisUtils.setnx("Scheduled-All", "Scheduled-All", 18 * 60)){
List<UserId> userIdRelations = userIdMapper.selectList(null);
Map<String,String> widTidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getWid,UserId::getTid));
Map<String,String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid,UserId::getWid));
//查出企微的人员信息
List<WeComUser> weComUserList = weComService.list();
Map<String,WeComUser> weComUserMap = weComUserList.stream().collect(Collectors.toMap(WeComUser::getUserId, Function.identity()));
//智能体授权人员
List<UserDTO> accessUserIds = tencentMeetingService.getAccessUserIds(widTidRelations);
if (CollectionUtils.isEmpty(accessUserIds)) {
log.info("无生成纪要权限的人员");
return; return;
}else{
log.info("生成纪要权限人员:->{}",accessUserIds.toString());
} }
List<TencentMeetingVO.RecordFile> meetingFiles = tencentMeetingService.getMeetingFiles(accessUserIds,weComUserMap); try {
//查出企微id和腾会id的关联关系
List<UserId> userIdRelations = userIdMapper.selectList(null);
Map<String, String> widTidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getWid, UserId::getTid));
Map<String, String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid, UserId::getWid));
//查出企微的人员信息
List<WeComUser> weComUserList = weComService.list();
Map<String, WeComUser> weComUserMap = weComUserList.stream().collect(Collectors.toMap(WeComUser::getUserId, Function.identity()));
if (meetingFiles == null || meetingFiles.isEmpty()) { //智能体授权人员
log.info("没有录制文件需要处理"); List<UserDTO> accessUserIds = tencentMeetingService.getAccessUserIds(widTidRelations);
return; if (CollectionUtils.isEmpty(accessUserIds)) {
} log.info("无生成纪要权限的人员");
return;
} else {
log.info("生成纪要权限人员:->{}", accessUserIds.toString());
}
List<TencentMeetingVO.RecordFile> meetingFiles = tencentMeetingService.getMeetingFiles(accessUserIds, weComUserMap);
if (meetingFiles == null || meetingFiles.isEmpty()) {
log.info("没有录制文件需要处理");
return;
}
//获取模板授权的人员 //获取模板授权的人员
List<UserDTO.TemplateAuthorizedUserDTO> authorizedUsers = meetingRecordTemplateService.selectAuthorizedUsers(); List<UserDTO.TemplateAuthorizedUserDTO> authorizedUsers = meetingRecordTemplateService.selectAuthorizedUsers();
// 提交处理任务 // 提交处理任务
producer.submitBatchTasks(meetingFiles,authorizedUsers,tidWidRelations,Boolean.FALSE); producer.submitBatchTasks(meetingFiles, authorizedUsers, tidWidRelations, Boolean.FALSE);
} catch (Exception e){
e.printStackTrace();
} finally {
redisUtils.del("Scheduled-All");
}
} }
...@@ -146,6 +158,9 @@ public class CmeetingJob { ...@@ -146,6 +158,9 @@ public class CmeetingJob {
if (isDev) { if (isDev) {
return; return;
} }
if (redisUtils.setnx("Scheduled-retry", "Scheduled-retry", 28 * 60)){
return;
}
try { try {
log.info("-------生成纪要重试定时任务开始-------"); log.info("-------生成纪要重试定时任务开始-------");
log.info("当前时间: " + LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE)); log.info("当前时间: " + LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE));
...@@ -185,6 +200,8 @@ public class CmeetingJob { ...@@ -185,6 +200,8 @@ public class CmeetingJob {
log.info("-------生成纪要重试定时任务结束--------"); log.info("-------生成纪要重试定时任务结束--------");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally {
redisUtils.del("Scheduled-retry");
} }
} }
...@@ -196,6 +213,9 @@ public class CmeetingJob { ...@@ -196,6 +213,9 @@ public class CmeetingJob {
if (isDev) { if (isDev) {
return; return;
} }
if (redisUtils.setnx("Scheduled-email-retry", "Scheduled-email-retry", 28 * 60)){
return;
}
try { try {
log.info("-------邮件推送重试定时任务开始-------"); log.info("-------邮件推送重试定时任务开始-------");
log.info("当前时间: " + LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE)); log.info("当前时间: " + LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE));
...@@ -230,6 +250,8 @@ public class CmeetingJob { ...@@ -230,6 +250,8 @@ public class CmeetingJob {
log.info("-------邮件推送重试定时任务结束--------"); log.info("-------邮件推送重试定时任务结束--------");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} finally {
redisUtils.del("Scheduled-email-retry");
} }
} }
} }
...@@ -112,7 +112,7 @@ public class FileProcessTask { ...@@ -112,7 +112,7 @@ public class FileProcessTask {
public void process() { public void process() {
boolean isSuccess = false; boolean isSuccess = false;
String key = "meet_process" + meetingId + "_" + (subMeetingId == null ? "" : "subMeetingId"); String key = "meet_process" + meetingId + "_" + (subMeetingId == null ? "" : "subMeetingId");
if (!redisUtils.setnx(key, 1, 180)) { if (!redisUtils.setnx(key, 1, 240)) {
log.warn("key already exists in redis!, key: {}", key); log.warn("key already exists in redis!, key: {}", key);
return; return;
} }
...@@ -129,6 +129,10 @@ public class FileProcessTask { ...@@ -129,6 +129,10 @@ public class FileProcessTask {
MeetingInfo meetingInfo = meetingInfoMapper.selectOne(new LambdaQueryWrapper<MeetingInfo>() MeetingInfo meetingInfo = meetingInfoMapper.selectOne(new LambdaQueryWrapper<MeetingInfo>()
.eq(MeetingInfo::getMeetingId,meetingId) .eq(MeetingInfo::getMeetingId,meetingId)
.eq(subMeetingId != null, MeetingInfo::getSubMeetingId, subMeetingId)); .eq(subMeetingId != null, MeetingInfo::getSubMeetingId, subMeetingId));
if (meetingInfo.getIsGenerated()) {
log.warn("Generating is down, meetingId: {}, subMeetingId: {}", meetingInfo.getMeetingId(), meetingInfo.getSubMeetingId());
return;
}
if (!meetingInfo.getEmailPushAccess()) { if (!meetingInfo.getEmailPushAccess()) {
log.warn("会议主持人没有推送邮件权限, userId: {}", meetingInfo.getHostUid()); log.warn("会议主持人没有推送邮件权限, userId: {}", meetingInfo.getHostUid());
return; return;
...@@ -780,4 +784,8 @@ public class FileProcessTask { ...@@ -780,4 +784,8 @@ public class FileProcessTask {
this.permTenantId = permTenantId; this.permTenantId = permTenantId;
this.aesKey = aesKey; this.aesKey = aesKey;
} }
public String getId(){
return this.meetingId + (this.subMeetingId == null ? "" : this.subMeetingId);
}
} }
\ No newline at end of file
...@@ -123,7 +123,6 @@ public class FileProcessProducer { ...@@ -123,7 +123,6 @@ public class FileProcessProducer {
task.process(); task.process();
callbackHandler.onComplete(task); // 回调处理 callbackHandler.onComplete(task); // 回调处理
}); });
futures.add(future); futures.add(future);
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论