提交 03265052 作者: duanxincheng

压测生成纪要服务

父级 800bac69
...@@ -229,11 +229,12 @@ public class EmailSender { ...@@ -229,11 +229,12 @@ public class EmailSender {
ItemBody body = new ItemBody(); ItemBody body = new ItemBody();
body.contentType = BodyType.HTML; body.contentType = BodyType.HTML;
if(redisUtils.hasKey("EMAIL_CONTENT_TEMPLATE")){ if(redisUtils.hasKey("EMAIL_CONTENT_TEMPLATE")){
if(StringUtils.isEmpty(toUserCode)){ //todo 压测注释
isSent = false; // if(StringUtils.isEmpty(toUserCode)){
processLogService.log(meetingId,subMeetingId,"【邮件推送异常】:收件人工号不能为空"); // isSent = false;
continue; // processLogService.log(meetingId,subMeetingId,"【邮件推送异常】:收件人工号不能为空");
} // continue;
// }
String email_content_template = String.valueOf(redisUtils.get("EMAIL_CONTENT_TEMPLATE")); String email_content_template = String.valueOf(redisUtils.get("EMAIL_CONTENT_TEMPLATE"));
long expireTimestamp = ZonedDateTime.now().plusDays(2).toInstant().toEpochMilli(); long expireTimestamp = ZonedDateTime.now().plusDays(2).toInstant().toEpochMilli();
body.content = MessageFormat.format(email_content_template,meetingInstanceId,toUserCode,String.valueOf(expireTimestamp)); body.content = MessageFormat.format(email_content_template,meetingInstanceId,toUserCode,String.valueOf(expireTimestamp));
......
...@@ -60,10 +60,11 @@ public class EmailPushTask { ...@@ -60,10 +60,11 @@ public class EmailPushTask {
while (retryCount.intValue() <= MAX_RETRY && !isSuccess) { while (retryCount.intValue() <= MAX_RETRY && !isSuccess) {
String hostUid = meetingInfo.getHostUid(); String hostUid = meetingInfo.getHostUid();
if(!tidWidRelations.containsKey(hostUid)){ //压测注释
log.error("邮件推送重试失败: 主持人对应关系未配置。meetingId {}", meetingId); // if(!tidWidRelations.containsKey(hostUid)){
continue; // log.error("邮件推送重试失败: 主持人对应关系未配置。meetingId {}", meetingId);
} // continue;
// }
try { try {
String meetingName; String meetingName;
String xml; String xml;
......
...@@ -273,27 +273,28 @@ public class FileProcessTask { ...@@ -273,27 +273,28 @@ public class FileProcessTask {
List<EmailPush.Attachment> attachments = new ArrayList<>(); List<EmailPush.Attachment> attachments = new ArrayList<>();
String hostUid = meetingInfo.getHostUid(); String hostUid = meetingInfo.getHostUid();
String toUserCode = tidWidRelations.get(meetingInfo.getHostUid()); String toUserCode = tidWidRelations.get(meetingInfo.getHostUid());
if(!tidWidRelations.containsKey(hostUid)){ // if(!tidWidRelations.containsKey(hostUid)){
log.info("用户{}暂未关联企微信息,无法生成纪要文件",hostUid); // log.info("用户{}暂未关联企微信息,无法生成纪要文件",hostUid);
processLogService.log(meetingId,subMeetingId,"用户"+hostUid+"暂未关联企微信息,无法生成纪要文件"); // processLogService.log(meetingId,subMeetingId,"用户"+hostUid+"暂未关联企微信息,无法生成纪要文件");
continue; // continue;
} // }
for (MeetingRecordTemplate template : recordTemplateList) { for (MeetingRecordTemplate template : recordTemplateList) {
//判断本次纪要有模板生成权限 //判断本次纪要有模板生成权限
if(!authorizedUserMap.containsKey(template.getId())){ //todo 压测注释
log.info("模板{}暂未授权给任意对象",template.getName()); // if(!authorizedUserMap.containsKey(template.getId())){
processLogService.log(meetingId,subMeetingId,"模板"+template.getName()+"暂未授权给任意对象"); // log.info("模板{}暂未授权给任意对象",template.getName());
continue; // processLogService.log(meetingId,subMeetingId,"模板"+template.getName()+"暂未授权给任意对象");
} // continue;
List<String> authorizedUserIds = authorizedUserMap.get(template.getId()); // }
if(!authorizedUserIds.contains(tidWidRelations.get(hostUid))){ // List<String> authorizedUserIds = authorizedUserMap.get(template.getId());
log.info("用户{}暂无模板{}权限",hostUid,template.getName()); // if(!authorizedUserIds.contains(tidWidRelations.get(hostUid))){
processLogService.log(meetingId,subMeetingId,"用户"+hostUid+"暂无模板"+template.getName()+"权限"); // log.info("用户{}暂无模板{}权限",hostUid,template.getName());
continue; // processLogService.log(meetingId,subMeetingId,"用户"+hostUid+"暂无模板"+template.getName()+"权限");
}else{ // continue;
log.info("用户{}允许应用模板{}",hostUid,template.getName()); // }else{
processLogService.log(meetingId,subMeetingId,"用户"+hostUid+"允许应用模板"+template.getName()); // log.info("用户{}允许应用模板{}",hostUid,template.getName());
} // processLogService.log(meetingId,subMeetingId,"用户"+hostUid+"允许应用模板"+template.getName());
// }
String processedResult = processWithClaude(recordTextBuffer.toString(),meetingDate,participantNames,template.getPrompt()); String processedResult = processWithClaude(recordTextBuffer.toString(),meetingDate,participantNames,template.getPrompt());
String minutesPath = saveResult(processedResult, recordTextBuffer.toString().getBytes(StandardCharsets.UTF_8), meetingInfo,toUserCode, template); String minutesPath = saveResult(processedResult, recordTextBuffer.toString().getBytes(StandardCharsets.UTF_8), meetingInfo,toUserCode, template);
try(InputStream is = new FileInputStream(minutesPath)){ try(InputStream is = new FileInputStream(minutesPath)){
...@@ -306,16 +307,17 @@ public class FileProcessTask { ...@@ -306,16 +307,17 @@ public class FileProcessTask {
FileUtil.del(minutesPath); FileUtil.del(minutesPath);
} }
} }
if(CollectionUtils.isEmpty(attachments)){ //压测注释
log.info("用户{}暂无任何模板权限,纪要生成失败",hostUid); // if(CollectionUtils.isEmpty(attachments)){
isSuccess = false; // log.info("用户{}暂无任何模板权限,纪要生成失败",hostUid);
continue; // isSuccess = false;
} // continue;
if(!tidWidRelations.containsKey(meetingInfo.getHostUid())){ // }
log.error("邮件推送重试失败: 主持人对应关系未配置。meetingId {}", meetingId); // if(!tidWidRelations.containsKey(meetingInfo.getHostUid())){
processLogService.log(meetingId,subMeetingId,"邮件推送重试失败: 主持人对应关系未配置。meetingId "+meetingId); // log.error("邮件推送重试失败: 主持人对应关系未配置。meetingId {}", meetingId);
continue; // processLogService.log(meetingId,subMeetingId,"邮件推送重试失败: 主持人对应关系未配置。meetingId "+meetingId);
} // continue;
// }
EmailPush emailPushBuilder = EmailPush.builder() EmailPush emailPushBuilder = EmailPush.builder()
.toEmail(meetingInfo.getEmail()) .toEmail(meetingInfo.getEmail())
.meetingId(meetingId) .meetingId(meetingId)
...@@ -494,19 +496,20 @@ public class FileProcessTask { ...@@ -494,19 +496,20 @@ public class FileProcessTask {
otherParams.put("userId", toUserCode); otherParams.put("userId", toUserCode);
otherParams.put("tenantId", permTenantId); otherParams.put("tenantId", permTenantId);
otherParams.put("layout", "V1"); otherParams.put("layout", "V1");
String responseData = HttpClientKnowledgePlatformUtil.sendPostByFormDataFiles(userAdminConfig.getDocDomain() + KnowledgePlatformRouteConstant.DOC.SIMPLE_DOC_UPLOAD_URL, Arrays.asList(multipartFile), otherParams, null); //todo 压测注释
if (StringUtils.isNotBlank(responseData)) { // String responseData = HttpClientKnowledgePlatformUtil.sendPostByFormDataFiles(userAdminConfig.getDocDomain() + KnowledgePlatformRouteConstant.DOC.SIMPLE_DOC_UPLOAD_URL, Arrays.asList(multipartFile), otherParams, null);
R result = JSON.parseObject(responseData, R.class); // if (StringUtils.isNotBlank(responseData)) {
List<DocResultDto> docResultDtoList = JSON.parseObject(JSONObject.toJSONString(result.getData()), new TypeReference<List<DocResultDto>>() { // R result = JSON.parseObject(responseData, R.class);
}); // List<DocResultDto> docResultDtoList = JSON.parseObject(JSONObject.toJSONString(result.getData()), new TypeReference<List<DocResultDto>>() {
DocResultDto docResultDto = docResultDtoList.get(0); // });
String previewPath = docResultDto.getPreviewPath(); // DocResultDto docResultDto = docResultDtoList.get(0);
recordContentPath = previewPath.replaceAll(fileDownloadPath,""); // String previewPath = docResultDto.getPreviewPath();
meetingInfo.setTransDocId(docResultDto.getId()); // recordContentPath = previewPath.replaceAll(fileDownloadPath,"");
}else{ // meetingInfo.setTransDocId(docResultDto.getId());
processLogService.log(meetingId,subMeetingId,"填充会议纪要失败,上传转录文件到向量知识库失败"); // }else{
throw new RuntimeException("填充会议纪要失败"); // processLogService.log(meetingId,subMeetingId,"填充会议纪要失败,上传转录文件到向量知识库失败");
} // throw new RuntimeException("填充会议纪要失败");
// }
//去除内容中除了xml内容以外其他的信息,格式化xml //去除内容中除了xml内容以外其他的信息,格式化xml
String xml = extractXmlFromMarkdown(content); String xml = extractXmlFromMarkdown(content);
...@@ -548,14 +551,15 @@ public class FileProcessTask { ...@@ -548,14 +551,15 @@ public class FileProcessTask {
new LambdaUpdateWrapper<MeetingInfo>() new LambdaUpdateWrapper<MeetingInfo>()
.eq(MeetingInfo::getMeetingId,meetingId) .eq(MeetingInfo::getMeetingId,meetingId)
.eq(subMeetingId != null,MeetingInfo::getSubMeetingId,subMeetingId) .eq(subMeetingId != null,MeetingInfo::getSubMeetingId,subMeetingId)
.set(MeetingInfo::getRecordContent,recordContentPath) // .set(MeetingInfo::getRecordContent,recordContentPath)
.set(MeetingInfo::getRecordXml,recordXmlPath) .set(MeetingInfo::getRecordXml,recordXmlPath)
.set(MeetingInfo::getParticipantUsers,meetingInfo.getParticipantUsers()) .set(MeetingInfo::getParticipantUsers,meetingInfo.getParticipantUsers())
.set(MeetingInfo::getIsGenerated,Boolean.TRUE) .set(MeetingInfo::getIsGenerated,Boolean.TRUE)
.set(MeetingInfo::getTemplateId,meetingRecordTemplate.getId()) .set(MeetingInfo::getTemplateId,meetingRecordTemplate.getId())
.set(MeetingInfo::getTransDocId,meetingInfo.getTransDocId()) .set(MeetingInfo::getTransDocId,meetingInfo.getTransDocId())
); );
meetingInfo.setRecordContent(recordContentPath); //todo 压测注释
// meetingInfo.setRecordContent(recordContentPath);
meetingInfo.setRecordXml(recordXmlPath); meetingInfo.setRecordXml(recordXmlPath);
return savePath + meetingMinutesFileName + ".docx"; return savePath + meetingMinutesFileName + ".docx";
} }
......
...@@ -4,6 +4,9 @@ import com.cmeeting.job.FileProcessTask; ...@@ -4,6 +4,9 @@ import com.cmeeting.job.FileProcessTask;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
@Service @Service
@Slf4j @Slf4j
public class FileProcessCallbackHandler { public class FileProcessCallbackHandler {
...@@ -20,6 +23,8 @@ public class FileProcessCallbackHandler { ...@@ -20,6 +23,8 @@ public class FileProcessCallbackHandler {
// 所有任务完成回调 // 所有任务完成回调
public void onAllComplete() { public void onAllComplete() {
log.info("所有文件处理任务已完成"); log.info("所有文件处理任务已完成");
log.info("任务完成时间: {}", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
log.info("----------------------------------");
// 可以发送全局通知等 // 可以发送全局通知等
} }
} }
\ No newline at end of file
...@@ -370,6 +370,9 @@ public class TencentMeetingServiceImpl extends ServiceImpl<TecentMeetingMapper,T ...@@ -370,6 +370,9 @@ public class TencentMeetingServiceImpl extends ServiceImpl<TecentMeetingMapper,T
try { try {
String userid = meeting.getUserid(); String userid = meeting.getUserid();
log.info("【周期会议扫描】:查询用户的已结束会议列表...meetingCode->{},userId->{}",meeting.getMeetingCode(), userid); log.info("【周期会议扫描】:查询用户的已结束会议列表...meetingCode->{},userId->{}",meeting.getMeetingCode(), userid);
if(StringUtils.isEmpty(userid)){
continue;
}
//获取子会议id //获取子会议id
MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest historyMeetingRequest = MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest historyMeetingRequest =
new MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest.Builder(userid) new MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest.Builder(userid)
......
package controller; package com.cmeeting.stressTest.controller;
import org.springframework.web.bind.annotation.PostMapping; import com.cmeeting.stressTest.service.StressTestService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import service.StressTestService;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -14,8 +15,8 @@ public class StressTestController { ...@@ -14,8 +15,8 @@ public class StressTestController {
@Resource @Resource
private StressTestService service; private StressTestService service;
@PostMapping("/minutesGenerate") @GetMapping("/minutesGenerate")
public void minutesGenerate(){ public void minutesGenerate(@RequestParam(value = "stressPage") Integer stressPage){
service.minutesGenerate(); service.minutesGenerate(stressPage);
} }
} }
package service; package com.cmeeting.stressTest.service;
public interface StressTestService { public interface StressTestService {
void minutesGenerate(); void minutesGenerate(Integer stressPage);
} }
package service; package com.cmeeting.stressTest.service;
import com.cmeeting.dto.UserDTO;
import com.cmeeting.mapper.primary.MeetingInfoMapper; import com.cmeeting.mapper.primary.MeetingInfoMapper;
import com.cmeeting.mapper.primary.TecentMeetingMapper; import com.cmeeting.mapper.primary.TecentMeetingMapper;
import com.cmeeting.mapper.primary.UserIdMapper; import com.cmeeting.mapper.primary.UserIdMapper;
...@@ -8,8 +7,6 @@ import com.cmeeting.pojo.MeetingInfo; ...@@ -8,8 +7,6 @@ import com.cmeeting.pojo.MeetingInfo;
import com.cmeeting.pojo.TencentMeetingUser; import com.cmeeting.pojo.TencentMeetingUser;
import com.cmeeting.pojo.UserId; import com.cmeeting.pojo.UserId;
import com.cmeeting.service.FileProcessProducer; import com.cmeeting.service.FileProcessProducer;
import com.cmeeting.service.MeetingRecordTemplateService;
import com.cmeeting.service.TencentMeetingService;
import com.cmeeting.util.SignatureUtil; import com.cmeeting.util.SignatureUtil;
import com.cmeeting.vo.CorpRecordsVO; import com.cmeeting.vo.CorpRecordsVO;
import com.cmeeting.vo.TencentMeetingVO; import com.cmeeting.vo.TencentMeetingVO;
...@@ -39,6 +36,7 @@ import java.time.Instant; ...@@ -39,6 +36,7 @@ import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -50,11 +48,7 @@ public class StressTestServiceImpl implements StressTestService { ...@@ -50,11 +48,7 @@ public class StressTestServiceImpl implements StressTestService {
@Resource @Resource
private UserIdMapper userIdMapper; private UserIdMapper userIdMapper;
@Autowired @Autowired
private TencentMeetingService tencentMeetingService;
@Autowired
private FileProcessProducer producer; private FileProcessProducer producer;
@Autowired
private MeetingRecordTemplateService meetingRecordTemplateService;
@Value(value = "${tencent.appId}") @Value(value = "${tencent.appId}")
private String tencentAppId; private String tencentAppId;
@Value(value = "${tencent.sdkId}") @Value(value = "${tencent.sdkId}")
...@@ -75,19 +69,22 @@ public class StressTestServiceImpl implements StressTestService { ...@@ -75,19 +69,22 @@ public class StressTestServiceImpl implements StressTestService {
private TecentMeetingMapper tecentMeetingMapper; private TecentMeetingMapper tecentMeetingMapper;
@Override @Override
public void minutesGenerate() { public void minutesGenerate(Integer stressPage) {
//查出企微id和腾会id的关联关系 //查出企微id和腾会id的关联关系
List<UserId> userIdRelations = userIdMapper.selectList(null); List<UserId> userIdRelations = userIdMapper.selectList(null);
Map<String,String> widTidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getWid,UserId::getTid));
Map<String,String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid,UserId::getWid)); Map<String,String> tidWidRelations = userIdRelations.stream().collect(Collectors.toMap(UserId::getTid,UserId::getWid));
// List<TencentMeetingVO.RecordFile> meetingFiles = tencentMeetingService.getMeetingFiles(50); //入参是页数,一页20条转录文件
// List<TencentMeetingVO.RecordFile> meetingFiles = getMeetingFiles(stressPage);
// // 提交处理任务
// producer.submitBatchTasks(meetingFiles,authorizedUsers,tidWidRelations,Boolean.FALSE); //日志记录
log.info("----------------------------------");
log.info("生成纪要服务压测开始时间: {},压测会议量:{}",LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),stressPage * 20);
// 提交处理任务
producer.submitBatchTasks(meetingFiles,new ArrayList<>(),tidWidRelations,Boolean.FALSE);
} }
public List<TencentMeetingVO.RecordFile> getMeetingFiles(List<UserDTO> accessUserIds) { public List<TencentMeetingVO.RecordFile> getMeetingFiles(Integer stressPage) {
Client client = new Client.Builder() Client client = new Client.Builder()
.withAppId(tencentAppId).withSdkId(tencentSdkId) .withAppId(tencentAppId).withSdkId(tencentSdkId)
.withSecret(tencentSecretId,tencentSecretKey) .withSecret(tencentSecretId,tencentSecretKey)
...@@ -104,8 +101,7 @@ public class StressTestServiceImpl implements StressTestService { ...@@ -104,8 +101,7 @@ public class StressTestServiceImpl implements StressTestService {
AtomicInteger currentPage = new AtomicInteger(1); AtomicInteger currentPage = new AtomicInteger(1);
//获取总页数 //获取总页数
CorpRecordsVO firstData = fetchMeetingRecords(tencentAdminUserId, 1, startTime, endTime, 1, 20); Integer totalPage = stressPage;
Integer totalPage = firstData.getTotalPage();
//目前已存储的会议id //目前已存储的会议id
List<TencentMeetingVO.SimpleMeetingInfo> meetingIds = meetingInfoMapper.getAllMeetingIds(); List<TencentMeetingVO.SimpleMeetingInfo> meetingIds = meetingInfoMapper.getAllMeetingIds();
...@@ -138,6 +134,9 @@ public class StressTestServiceImpl implements StressTestService { ...@@ -138,6 +134,9 @@ public class StressTestServiceImpl implements StressTestService {
try { try {
String userid = meeting.getUserid(); String userid = meeting.getUserid();
log.info("【周期会议扫描】:查询用户的已结束会议列表...meetingCode->{},userId->{}",meeting.getMeetingCode(), userid); log.info("【周期会议扫描】:查询用户的已结束会议列表...meetingCode->{},userId->{}",meeting.getMeetingCode(), userid);
if(StringUtils.isEmpty(userid)){
continue;
}
//获取子会议id //获取子会议id
MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest historyMeetingRequest = MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest historyMeetingRequest =
new MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest.Builder(userid) new MeetingsApi.ApiV1HistoryMeetingsUseridGetRequest.Builder(userid)
...@@ -239,22 +238,6 @@ public class StressTestServiceImpl implements StressTestService { ...@@ -239,22 +238,6 @@ public class StressTestServiceImpl implements StressTestService {
} }
} }
String email;
//判断是否有权限生成纪要
boolean generateAccess = accessUserIds.stream().anyMatch(item -> item.getTid().equals(hostId));
if(!generateAccess){
log.error("【权限校验】主持人{}没有生成纪要权限,跳过生成",hostId);
// processLogService.log(meeting.getMeetingId(),subMeetingId,"【权限校验】主持人"+hostId+"没有生成纪要权限,跳过生成");
continue;
}
log.info("【权限校验】主持人{}允许生成纪要",hostId);
// processLogService.log(meeting.getMeetingId(),subMeetingId,"【权限校验】主持人"+hostId+"允许生成纪要");
UserDTO userDTO = accessUserIds.stream().filter(item -> item.getTid().equals(hostId)).findFirst().get();
email = userDTO.getEmail();
//会议基本信息保存 //会议基本信息保存
MeetingInfo meetingItem = MeetingInfo.builder().meetingId(meetingId).meetingCode(meeting.getMeetingCode()) MeetingInfo meetingItem = MeetingInfo.builder().meetingId(meetingId).meetingCode(meeting.getMeetingCode())
...@@ -268,7 +251,6 @@ public class StressTestServiceImpl implements StressTestService { ...@@ -268,7 +251,6 @@ public class StressTestServiceImpl implements StressTestService {
// .participantUsers(participants.stream() // .participantUsers(participants.stream()
// .map(item->new String(Base64.getDecoder().decode(item.getUserName()))).distinct().collect(Collectors.joining("、"))) // .map(item->new String(Base64.getDecoder().decode(item.getUserName()))).distinct().collect(Collectors.joining("、")))
.recordFileId(recordFileIdList.stream().collect(Collectors.joining(","))) .recordFileId(recordFileIdList.stream().collect(Collectors.joining(",")))
.email(email)
.build(); .build();
recordFileUrlList.add(recordFileItem); recordFileUrlList.add(recordFileItem);
meetingSaveList.add(meetingItem); meetingSaveList.add(meetingItem);
......
package com.cmeeting.config; package com.cmeeting.thread.config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
......
package com.cmeeting.thread.monitor;
import com.cmeeting.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/threadPool")
public class ThreadPoolController {
private final ThreadPoolMonitorService monitorService;
@Autowired
public ThreadPoolController(ThreadPoolMonitorService monitorService) {
this.monitorService = monitorService;
}
// 获取所有线程池状态(中文键名)
@GetMapping("/status")
public R getAllStatusInChinese() {
Map<String, ThreadPoolStatus> statusMap = new HashMap<>();
monitorService.getAllExecutors().forEach((name, executor) -> {
ThreadPoolStatus status = ThreadPoolStatus.fromExecutor(executor, name);
if (status != null) {
statusMap.put(name, status);
}
});
return R.ok(statusMap);
}
// 获取指定线程池状态(中文键名)
@GetMapping("/status/{executorName}")
public R getStatusInChinese(@PathVariable String executorName) {
ThreadPoolTaskExecutor executor = monitorService.getExecutor(executorName);
if (executor == null) {
return R.error();
}
ThreadPoolStatus status = ThreadPoolStatus.fromExecutor(executor, executorName);
return R.ok(status);
}
}
package com.cmeeting.thread.monitor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
@Service
@Slf4j
public class ThreadPoolMonitorService {
private final Map<String, ThreadPoolTaskExecutor> executors = new HashMap<>();
@Autowired
public ThreadPoolMonitorService(ThreadPoolTaskExecutor fileProcessExecutor) {
executors.put("fileProcessExecutor", fileProcessExecutor);
}
// 获取所有线程池
public Map<String, ThreadPoolTaskExecutor> getAllExecutors() {
return executors;
}
// 获取指定线程池
public ThreadPoolTaskExecutor getExecutor(String executorName) {
return executors.get(executorName);
}
}
package com.cmeeting.thread.monitor;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.text.DecimalFormat;
@Data
@AllArgsConstructor
@JsonSerialize(using = ThreadPoolStatusInfoSerializer.class)
public class ThreadPoolStatus {
private String poolName;
private Integer corePoolSize;
private Integer maxPoolSize;
private Integer currentPoolSize;
private Integer activeThreadCount;
private Integer queueSize;
private Integer queueCapacity;
private Long totalTasks;
private Long completedTasks;
private Long pendingTasks;
private String threadUtilization;
private String queueUtilization;
private String runningStatus;
private String healthStatus;
// 从原始ThreadPoolTaskExecutor状态构建
public static ThreadPoolStatus fromExecutor(
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor executor,
String poolName) {
java.util.concurrent.ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
if (threadPoolExecutor == null) {
return null;
}
DecimalFormat df = new DecimalFormat("0.00%");
String runningStatus = threadPoolExecutor.isShutdown() ?
(threadPoolExecutor.isTerminated() ? "已终止" : "关闭中") : "运行中";
int queueCapacity = 1000; // 从配置中获取或硬编码
double queueUtilization = (double) threadPoolExecutor.getQueue().size() / queueCapacity;
String healthStatus = threadPoolExecutor.getActiveCount() < executor.getMaxPoolSize() * 0.8 &&
queueUtilization < 0.7 ? "健康" : "需要关注";
return new ThreadPoolStatus(
poolName,
executor.getCorePoolSize(),
executor.getMaxPoolSize(),
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size(),
queueCapacity,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getTaskCount() - threadPoolExecutor.getCompletedTaskCount(),
df.format((double) threadPoolExecutor.getActiveCount() / executor.getMaxPoolSize()),
df.format(queueUtilization),
runningStatus,
healthStatus
);
}
}
package com.cmeeting.thread.monitor;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class ThreadPoolStatusInfoSerializer extends StdSerializer<ThreadPoolStatus> {
private static final Map<String, String> FIELD_MAPPING = new HashMap<>();
static {
FIELD_MAPPING.put("poolName", "线程池名称");
FIELD_MAPPING.put("corePoolSize", "核心线程数");
FIELD_MAPPING.put("maxPoolSize", "最大线程数");
FIELD_MAPPING.put("currentPoolSize", "当前线程数");
FIELD_MAPPING.put("activeThreadCount", "活跃线程数");
FIELD_MAPPING.put("queueSize", "队列任务数");
FIELD_MAPPING.put("queueCapacity", "队列容量");
FIELD_MAPPING.put("totalTasks", "总任务数");
FIELD_MAPPING.put("completedTasks", "已完成任务数");
FIELD_MAPPING.put("pendingTasks", "等待任务数");
FIELD_MAPPING.put("threadUtilization", "线程利用率");
FIELD_MAPPING.put("queueUtilization", "队列利用率");
FIELD_MAPPING.put("runningStatus", "运行状态");
FIELD_MAPPING.put("healthStatus", "健康状态");
}
public ThreadPoolStatusInfoSerializer() {
this(null);
}
protected ThreadPoolStatusInfoSerializer(Class<ThreadPoolStatus> t) {
super(t);
}
@Override
public void serialize(ThreadPoolStatus value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeStartObject();
gen.writeStringField(FIELD_MAPPING.get("poolName"), value.getPoolName());
gen.writeNumberField(FIELD_MAPPING.get("corePoolSize"), value.getCorePoolSize());
gen.writeNumberField(FIELD_MAPPING.get("maxPoolSize"), value.getMaxPoolSize());
gen.writeNumberField(FIELD_MAPPING.get("currentPoolSize"), value.getCurrentPoolSize());
gen.writeNumberField(FIELD_MAPPING.get("activeThreadCount"), value.getActiveThreadCount());
gen.writeNumberField(FIELD_MAPPING.get("queueSize"), value.getQueueSize());
gen.writeNumberField(FIELD_MAPPING.get("queueCapacity"), value.getQueueCapacity());
gen.writeNumberField(FIELD_MAPPING.get("totalTasks"), value.getTotalTasks());
gen.writeNumberField(FIELD_MAPPING.get("completedTasks"), value.getCompletedTasks());
gen.writeNumberField(FIELD_MAPPING.get("pendingTasks"), value.getPendingTasks());
gen.writeStringField(FIELD_MAPPING.get("threadUtilization"), value.getThreadUtilization());
gen.writeStringField(FIELD_MAPPING.get("queueUtilization"), value.getQueueUtilization());
gen.writeStringField(FIELD_MAPPING.get("runningStatus"), value.getRunningStatus());
gen.writeStringField(FIELD_MAPPING.get("healthStatus"), value.getHealthStatus());
gen.writeEndObject();
}
}
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论