Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
C
cmeeting
概览
概览
详情
活动
周期分析
版本库
存储库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
Issue Boards
Open sidebar
翟斌
cmeeting
Commits
51d673ff
提交
51d673ff
authored
7月 29, 2025
作者:
洪东保
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
定时任务debug
父级
31277edc
隐藏空白字符变更
内嵌
并排
正在显示
7 个修改的文件
包含
77 行增加
和
30 行删除
+77
-30
src/main/java/com/cmeeting/config/ThreadPoolConfig.java
+31
-0
src/main/java/com/cmeeting/email/EmailSender.java
+1
-1
src/main/java/com/cmeeting/job/CmeetingJob.java
+6
-9
src/main/java/com/cmeeting/job/EmailPushTask.java
+13
-3
src/main/java/com/cmeeting/job/FileProcessTask.java
+16
-11
src/main/java/com/cmeeting/service/FileProcessProducer.java
+6
-2
src/main/java/com/cmeeting/service/impl/TencentMeetingServiceImpl.java
+4
-4
没有找到文件。
src/main/java/com/cmeeting/config/ThreadPoolConfig.java
浏览文件 @
51d673ff
...
...
@@ -39,4 +39,34 @@ public class ThreadPoolConfig {
executor
.
initialize
();
return
executor
;
}
@Bean
(
"emailProcessExecutor"
)
public
ThreadPoolTaskExecutor
emailProcessExecutor
()
{
ThreadPoolTaskExecutor
executor
=
new
ThreadPoolTaskExecutor
();
// 核心线程数 (CPU密集型任务建议核心数+1)
executor
.
setCorePoolSize
(
1
);
// 固定核心线程数,避免动态获取CPU核心数
// 最大线程数
executor
.
setMaxPoolSize
(
5
);
// 队列容量
executor
.
setQueueCapacity
(
1000
);
// 线程名前缀
executor
.
setThreadNamePrefix
(
"email-process-"
);
// 明确设置所有必要属性
executor
.
setAllowCoreThreadTimeOut
(
false
);
// 核心线程不允许超时
executor
.
setWaitForTasksToCompleteOnShutdown
(
true
);
// 优雅关闭
executor
.
setAwaitTerminationSeconds
(
60
);
// 等待任务完成的最大时间
// 拒绝策略
executor
.
setRejectedExecutionHandler
(
new
ThreadPoolExecutor
.
CallerRunsPolicy
());
// 初始化前打印配置检查
log
.
info
(
"Initializing ThreadPool: core={}, max={}"
,
executor
.
getCorePoolSize
(),
executor
.
getMaxPoolSize
());
executor
.
initialize
();
return
executor
;
}
}
\ No newline at end of file
src/main/java/com/cmeeting/email/EmailSender.java
浏览文件 @
51d673ff
...
...
@@ -284,7 +284,7 @@ public class EmailSender {
.
buildRequest
()
.
post
();
log
.
error
(
"邮件已成功发送: meetingId->{}"
,
m
eetingId
);
log
.
info
(
"邮件已成功发送: meetingId->{}, subMeetingId->{}"
,
meetingId
,
subM
eetingId
);
isSent
=
true
;
}
catch
(
Exception
e
)
{
retryCount
.
getAndIncrement
();
...
...
src/main/java/com/cmeeting/job/CmeetingJob.java
浏览文件 @
51d673ff
...
...
@@ -102,8 +102,7 @@ public class CmeetingJob {
log
.
info
(
"-------关联企微腾会人员定时任务结束--------"
);
}
// @Scheduled(fixedRate = 20 * 60 * 1000,initialDelay = 2 * 60 * 1000)
@Scheduled
(
fixedRate
=
60
*
60
*
1000
,
initialDelay
=
1
*
60
*
1000
)
@Scheduled
(
fixedRate
=
20
*
60
*
1000
,
initialDelay
=
2
*
60
*
1000
)
public
void
execute
()
{
if
(
isDev
)
{
return
;
...
...
@@ -142,8 +141,7 @@ public class CmeetingJob {
/**
* 定时扫描早于一小时之前的,所有未重试过的会议,重新生成纪要
*/
// @Scheduled(fixedRate = 30 * 60 * 1000,initialDelay = 10 * 60 * 1000)
// @Scheduled(fixedRate = 30 * 60 * 1000)
@Scheduled
(
fixedRate
=
30
*
60
*
1000
,
initialDelay
=
10
*
60
*
1000
)
public
void
meetingMinutesRetry
()
{
if
(
isDev
)
{
return
;
...
...
@@ -153,12 +151,12 @@ public class CmeetingJob {
log
.
info
(
"当前时间: "
+
LocalDate
.
now
().
format
(
DateTimeFormatter
.
ISO_LOCAL_DATE
));
//查出所有早于一小时前的,生成失败且未重试过的会议
// 不能用status筛选,因为可能线程执行一般服务终止,status状态没变
List
<
MeetingInfo
>
meetingInfoList
=
meetingInfoService
.
list
(
new
LambdaQueryWrapper
<
MeetingInfo
>()
.
eq
(
MeetingInfo:
:
getEmailPushAccess
,
Boolean
.
TRUE
)
.
eq
(
MeetingInfo:
:
getIsGenerated
,
Boolean
.
FALSE
)
.
eq
(
MeetingInfo:
:
getStatus
,
MeetingState
.
GENERATE_ERROR
.
getCode
())
.
eq
(
MeetingInfo:
:
getGenerateRetry
,
Boolean
.
FALSE
)
.
eq
(
MeetingInfo:
:
getEmailPushAccess
,
Boolean
.
TRUE
)
.
le
(
MeetingInfo:
:
getSyncTime
,
LocalDateTime
.
now
().
minusHours
(
1
))
);
...
...
@@ -193,8 +191,7 @@ public class CmeetingJob {
/**
* 定时扫描早于一小时之前的,所有邮件推送未重试过的会议,重新推送邮件
*/
// @Scheduled(fixedRate = 30 * 60 * 1000,initialDelay = 15 * 60 * 1000)
// @Scheduled(fixedRate = 30 * 60 * 1000)
@Scheduled
(
fixedRate
=
30
*
60
*
1000
,
initialDelay
=
15
*
60
*
1000
)
public
void
emailPushRetry
()
{
if
(
isDev
)
{
return
;
...
...
@@ -206,8 +203,8 @@ public class CmeetingJob {
//查出所有早于一小时前的,邮件推送失败且未重试过的会议
List
<
MeetingInfo
>
meetingInfoList
=
meetingInfoService
.
list
(
new
LambdaQueryWrapper
<
MeetingInfo
>()
.
eq
(
MeetingInfo:
:
getIsGenerated
,
Boolean
.
TRUE
)
.
eq
(
MeetingInfo:
:
getEmailPushAccess
,
Boolean
.
TRUE
)
.
eq
(
MeetingInfo:
:
getIsGenerated
,
Boolean
.
TRUE
)
.
eq
(
MeetingInfo:
:
getIsPushed
,
Boolean
.
FALSE
)
.
eq
(
MeetingInfo:
:
getPushRetry
,
Boolean
.
FALSE
)
.
le
(
MeetingInfo:
:
getSyncTime
,
LocalDateTime
.
now
().
minusHours
(
1
))
...
...
src/main/java/com/cmeeting/job/EmailPushTask.java
浏览文件 @
51d673ff
...
...
@@ -4,12 +4,14 @@ import cn.hutool.core.io.FileUtil;
import
cn.hutool.core.util.IdUtil
;
import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
;
import
com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
;
import
com.cmeeting.constant.MeetingState
;
import
com.cmeeting.email.EmailSender
;
import
com.cmeeting.mapper.primary.MeetingInfoMapper
;
import
com.cmeeting.mapper.primary.MeetingRecordTemplateMapper
;
import
com.cmeeting.pojo.MeetingInfo
;
import
com.cmeeting.pojo.MeetingRecordTemplate
;
import
com.cmeeting.util.MinioUtils
;
import
com.cmeeting.util.RedisUtils
;
import
com.cmeeting.vo.EmailPush
;
import
com.deepoove.poi.XWPFTemplate
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
...
...
@@ -46,12 +48,18 @@ public class EmailPushTask {
private
MeetingInfoMapper
meetingInfoMapper
;
private
MinioUtils
minioUtils
;
private
RedisUtils
redisUtils
;
private
EmailSender
emailSender
;
private
MeetingRecordTemplateMapper
meetingRecordTemplateMapper
;
private
Map
<
String
,
String
>
tidWidRelations
;
// 实际处理逻辑
public
void
process
()
{
String
key
=
"meet_process"
+
meetingId
+
"_"
+
(
subMeetingId
==
null
?
""
:
"subMeetingId"
);
if
(!
redisUtils
.
setnx
(
key
,
1
,
120
))
{
log
.
warn
(
"key already exists in redis!, key: {}"
,
key
);
return
;
}
Boolean
isSuccess
=
Boolean
.
FALSE
;
AtomicInteger
retryCount
=
new
AtomicInteger
(
0
);
MeetingInfo
meetingInfo
=
meetingInfoMapper
.
selectOne
(
new
LambdaQueryWrapper
<
MeetingInfo
>()
...
...
@@ -142,9 +150,10 @@ public class EmailPushTask {
.
eq
(
MeetingInfo:
:
getMeetingId
,
meetingId
)
.
eq
(
subMeetingId
!=
null
,
MeetingInfo:
:
getSubMeetingId
,
subMeetingId
)
.
set
(
MeetingInfo:
:
getIsPushed
,
isSuccess
)
.
set
(
MeetingInfo:
:
getStatus
,
isSuccess
?
MeetingState
.
PUSH_SUCCESS
.
getCode
()
:
MeetingState
.
PUSH_ERROR
.
getCode
())
.
set
(
MeetingInfo:
:
getPushRetry
,
Boolean
.
TRUE
)
);
redisUtils
.
del
(
key
);
}
...
...
@@ -185,14 +194,15 @@ public class EmailPushTask {
public
EmailPushTask
(
String
meetingId
,
String
subMeetingId
,
String
savePath
,
Map
<
String
,
Object
>
metadata
,
MeetingInfoMapper
meetingInfoMapper
,
MinioUtils
minioUtils
,
EmailSender
emailSender
,
MeetingRecordTemplateMapper
meetingRecordTemplateMapper
,
Map
<
String
,
String
>
tidWidRelations
)
{
MeetingInfoMapper
meetingInfoMapper
,
MinioUtils
minioUtils
,
RedisUtils
redisUtils
,
EmailSender
emailSender
,
MeetingRecordTemplateMapper
meetingRecordTemplateMapper
,
Map
<
String
,
String
>
tidWidRelations
)
{
this
.
savePath
=
savePath
;
this
.
metadata
=
metadata
;
this
.
meetingId
=
meetingId
;
this
.
subMeetingId
=
subMeetingId
;
this
.
meetingInfoMapper
=
meetingInfoMapper
;
this
.
minioUtils
=
minioUtils
;
this
.
redisUtils
=
redisUtils
;
this
.
emailSender
=
emailSender
;
this
.
meetingRecordTemplateMapper
=
meetingRecordTemplateMapper
;
this
.
tidWidRelations
=
tidWidRelations
;
...
...
src/main/java/com/cmeeting/job/FileProcessTask.java
浏览文件 @
51d673ff
...
...
@@ -105,11 +105,13 @@ public class FileProcessTask {
// 实际处理逻辑
public
void
process
()
{
boolean
isSuccess
=
false
;
String
key
=
"meet_process"
+
meetingId
+
"_"
+
(
subMeetingId
!
=
null
?
""
:
"subMeetingId"
);
if
(!
redisUtils
.
setnx
(
key
,
1
,
20
*
6
0
))
{
String
key
=
"meet_process"
+
meetingId
+
"_"
+
(
subMeetingId
=
=
null
?
""
:
"subMeetingId"
);
if
(!
redisUtils
.
setnx
(
key
,
1
,
18
0
))
{
log
.
warn
(
"key already exists in redis!, key: {}"
,
key
);
return
;
}
log
.
info
(
"线程开始------------>"
);
long
l
=
System
.
currentTimeMillis
();
Integer
status
=
null
;
while
(
retryCount
<=
MAX_RETRY
&&
!
isSuccess
)
{
Client
client
=
new
Client
.
Builder
()
...
...
@@ -122,7 +124,7 @@ public class FileProcessTask {
.
eq
(
MeetingInfo:
:
getMeetingId
,
meetingId
)
.
eq
(
subMeetingId
!=
null
,
MeetingInfo:
:
getSubMeetingId
,
subMeetingId
));
if
(!
meetingInfo
.
getEmailPushAccess
())
{
log
.
warn
(
"会议主持人没有推送邮件权限
"
);
log
.
warn
(
"会议主持人没有推送邮件权限
, userId: {}"
,
meetingInfo
.
getHostUid
()
);
return
;
}
String
meetingDate
=
meetingInfo
.
getStartTime
().
toLocalDate
().
format
(
DateTimeFormatter
.
ISO_LOCAL_DATE
);
...
...
@@ -210,6 +212,7 @@ public class FileProcessTask {
//每场会议可能会分段录制,查出每个文件的转录记录后拼接
StringBuffer
recordTextBuffer
=
new
StringBuffer
();
for
(
String
recordFileId
:
recordFileIdList
)
{
log
.
info
(
"下载转录文件, recordFileId: {}"
,
recordFileId
);
//查询录制转写详情
RecordsApi
.
ApiV1AddressesRecordFileIdGetRequest
addressRequest
=
new
RecordsApi
.
ApiV1AddressesRecordFileIdGetRequest
.
Builder
(
recordFileId
)
...
...
@@ -353,11 +356,16 @@ public class FileProcessTask {
new
LambdaUpdateWrapper
<
MeetingInfo
>()
.
eq
(
MeetingInfo:
:
getMeetingId
,
meetingId
)
.
eq
(
subMeetingId
!=
null
,
MeetingInfo:
:
getSubMeetingId
,
subMeetingId
)
.
set
(
MeetingInfo:
:
getGenerateRetry
,
Boolean
.
TRUE
)
.
set
(
MeetingInfo:
:
getGenerateRetry
,
Boolean
.
TRUE
));
}
else
{
meetingInfoMapper
.
update
(
null
,
new
LambdaUpdateWrapper
<
MeetingInfo
>()
.
eq
(
MeetingInfo:
:
getMeetingId
,
meetingId
)
.
eq
(
subMeetingId
!=
null
,
MeetingInfo:
:
getSubMeetingId
,
subMeetingId
)
.
set
(
MeetingInfo:
:
getStatus
,
status
!=
null
?
status
:
MeetingState
.
GENERATE_ERROR
.
getCode
())
);
}
}
else
{
}
else
{
// 指数退避
try
{
Thread
.
sleep
((
long
)
Math
.
pow
(
2
,
retryCount
)
*
1000
);
...
...
@@ -366,13 +374,10 @@ public class FileProcessTask {
throw
new
RuntimeException
(
"重试失败"
,
ie
);
}
}
}
finally
{
if
(
isSuccess
)
{
redisUtils
.
del
(
key
);
}
}
}
redisUtils
.
del
(
key
);
log
.
info
(
"线程结束, 耗时: {} ms"
,
System
.
currentTimeMillis
()
-
l
);
}
private
byte
[]
downloadFile
(
String
url
)
{
...
...
@@ -567,7 +572,7 @@ public class FileProcessTask {
.
set
(
MeetingInfo:
:
getTemplateId
,
meetingRecordTemplate
.
getId
())
.
set
(
MeetingInfo:
:
getTransDocId
,
meetingInfo
.
getTransDocId
())
.
set
(
MeetingInfo:
:
getSubject
,
meetingInfo
.
getSubject
())
.
set
(
MeetingInfo:
:
getStatus
,
success
?
MeetingState
.
NOTE_GENERATED
.
getCode
()
:
MeetingState
.
GENERATE_ERROR
.
getCode
())
.
set
(
MeetingInfo:
:
getStatus
,
success
?
MeetingState
.
NOTE_GENERATED
.
getCode
()
:
MeetingState
.
GENERATE_ERROR
.
getCode
())
);
}
meetingInfo
.
setRecordContent
(
recordContentPath
);
...
...
src/main/java/com/cmeeting/service/FileProcessProducer.java
浏览文件 @
51d673ff
...
...
@@ -31,8 +31,10 @@ import java.util.concurrent.Future;
@Slf4j
public
class
FileProcessProducer
{
@
Autowired
@
Resource
(
name
=
"fileProcessExecutor"
)
private
ThreadPoolTaskExecutor
fileProcessExecutor
;
@Resource
(
name
=
"emailProcessExecutor"
)
private
ThreadPoolTaskExecutor
emailProcessExecutor
;
@Value
(
"${aec.key}"
)
public
String
aesKey
;
@Autowired
...
...
@@ -84,6 +86,7 @@ public class FileProcessProducer {
public
void
submitBatchTasks
(
List
<
TencentMeetingVO
.
RecordFile
>
recordFiles
,
List
<
UserDTO
.
TemplateAuthorizedUserDTO
>
authorizedUsers
,
Map
<
String
,
String
>
tidWidRelations
,
Boolean
finalRetry
)
{
List
<
Future
<?>>
futures
=
new
ArrayList
<>();
String
adminToken
=
UserAdminTokenUtil
.
getUserAdminToken
();
log
.
info
(
"待处理会议数量: {}"
,
recordFiles
.
size
());
for
(
TencentMeetingVO
.
RecordFile
recordFile
:
recordFiles
)
{
// 为每个URL创建任务
FileProcessTask
task
=
new
FileProcessTask
(
...
...
@@ -141,13 +144,14 @@ public class FileProcessProducer {
Collections
.
emptyMap
(),
meetingInfoMapper
,
minioUtils
,
redisUtils
,
emailSender
,
meetingRecordTemplateMapper
,
tidWidRelations
);
// 提交任务到线程池
Future
<?>
future
=
file
ProcessExecutor
.
submit
(()
->
{
Future
<?>
future
=
email
ProcessExecutor
.
submit
(()
->
{
task
.
process
();
});
...
...
src/main/java/com/cmeeting/service/impl/TencentMeetingServiceImpl.java
浏览文件 @
51d673ff
...
...
@@ -183,7 +183,7 @@ public class TencentMeetingServiceImpl extends ServiceImpl<TecentMeetingMapper,T
V1HistoryMeetingsUseridGet200Response
historyMeetingResponseData
=
historyMeetingResponse
.
getData
();
List
<
V1HistoryMeetingsUseridGet200ResponseMeetingInfoListInner
>
historyMeetingInfoList
=
historyMeetingResponseData
.
getMeetingInfoList
();
if
(
CollectionUtils
.
isEmpty
(
historyMeetingInfoList
)){
log
.
error
(
"会议未结束,获取子会议id信息失败
"
);
log
.
error
(
"会议未结束,获取子会议id信息失败
, meetId: {}, subId: {}"
,
meetingId
,
subMeetingId
);
continue
;
}
V1HistoryMeetingsUseridGet200ResponseMeetingInfoListInner
historyMeeting
=
historyMeetingInfoList
.
get
(
0
);
...
...
@@ -194,8 +194,8 @@ public class TencentMeetingServiceImpl extends ServiceImpl<TecentMeetingMapper,T
//如果数据库中已有相同会议id的记录,跳过同步
String
finalSubMeetingId
=
subMeetingId
;
if
(
!
meetingIds
.
stream
().
any
Match
(
item
->
item
.
getMeetingId
().
equals
(
meetingId
)
&&
Objects
.
equals
(
item
.
getSubMeetingId
(),
finalSubMeetingId
))){
log
.
info
(
"【会议检索】新的会议meetingId->{}
"
,
meeting
.
getMeetingId
()
);
if
(
meetingIds
.
stream
().
none
Match
(
item
->
item
.
getMeetingId
().
equals
(
meetingId
)
&&
Objects
.
equals
(
item
.
getSubMeetingId
(),
finalSubMeetingId
))){
log
.
info
(
"【会议检索】新的会议meetingId->{}
, subId: {} "
,
meeting
.
getMeetingId
(),
finalSubMeetingId
);
List
<
CorpRecordsVO
.
RecordFile
>
recordFiles
=
meeting
.
getRecordFiles
();
//按转录文件时间升序,便于后续的内容拼接
List
<
String
>
recordFileIdList
=
recordFiles
.
stream
().
sorted
(
Comparator
.
comparingLong
(
CorpRecordsVO
.
RecordFile
::
getRecordStartTime
))
...
...
@@ -262,7 +262,7 @@ public class TencentMeetingServiceImpl extends ServiceImpl<TecentMeetingMapper,T
hostId
=
host
.
get
().
getUserid
();
hostName
=
new
String
(
Base64
.
getDecoder
().
decode
(
host
.
get
().
getUserName
()));
}
else
{
log
.
error
(
"未找到主持人,默认没有生成纪要权限
"
);
log
.
error
(
"未找到主持人,默认没有生成纪要权限
, meetId: {}, subId: {}"
,
meetingId
,
subMeetingId
);
// processLogService.log(meeting.getMeetingId(),subMeetingId,"未找到主持人,默认没有生成纪要权限");
continue
;
}
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论