提交 37b0eea6 作者: 吕轶伟

lyw:重要故障excel文件处理

父级 22229d3a
No preview for this file type
import pandas as pd
from openpyxl import load_workbook
from openpyxl.drawing.image import Image as OpenpyxlImage
from xlrd import open_workbook
from datetime import datetime, timedelta
#from ..tools.ClaudeAPI import claude37_respond
import sys
import os
# Absolute path of the directory that contains this file
current_dir = os.path.dirname(os.path.abspath(__file__))
# Parent of that directory, treated as the project root
project_dir = os.path.dirname(current_dir)
# Append the project root to sys.path so the `tools.*` imports below resolve
sys.path.append(project_dir)
from tools.ClaudeAPI import claude37_img_respond,claude3_respond,claude37_respond_stream
from tools.fileUtil import extract_filename
ocrPrompt='你是个excel的图片信息抽取助手,你需要将图片的表格信息抽取出来,图片可能包含多个单元格,尽量保持表格原格式'
#excel内容转文字
def _column_letters(col):
    """Convert a 1-based column number to spreadsheet letters (1 -> A, 27 -> AA)."""
    letters = ""
    while col > 0:
        col, remainder = divmod(col - 1, 26)
        letters = chr(65 + remainder) + letters
    return letters


def _xls_cell_text(cell_value):
    """Render one .xls cell value; a fraction of a day becomes HH:MM:SS.

    xlrd returns every number as a float, so a value strictly between 0 and 1
    is treated as a time of day. This is a heuristic: a genuine decimal such
    as 0.5 is rendered as 12:00:00 as well.
    """
    try:
        numeric_value = float(cell_value)
    except (TypeError, ValueError):
        # Not a number: emit the value unchanged.
        return f'{cell_value}'
    if 0 < numeric_value < 1:
        time_part = (datetime.min + timedelta(days=numeric_value)).time()
        return time_part.strftime("%H:%M:%S")
    return f'{cell_value}'


def _ocr_sheet_images(ws, file_name, image_folder):
    """Save every image of an openpyxl sheet to image_folder and OCR it."""
    results = []
    for idx, image in enumerate(ws._images, 1):
        if not isinstance(image, OpenpyxlImage):
            continue
        marker = getattr(image.anchor, '_from', None)
        if marker is not None:
            # The anchor stores 0-based indices; convert to 1-based.
            col = marker.col + 1
            row = marker.row + 1
            cell_ref = f"{_column_letters(col)}{row}"
        else:
            col, row, cell_ref = None, None, "未知位置"
        img_name = f"{file_name}_img{idx}_{cell_ref}.{image.format.lower()}"
        img_path = os.path.join(image_folder, img_name)
        with open(img_path, 'wb') as img_file:
            img_file.write(image._data())
        ocrResult = claude37_img_respond(img_path, ocrPrompt)
        results.append(f'Row {row}, Column {col}: {ocrResult}' if ocrResult is not None else '')
    return results


def _xlsx_sheet_lines(ws, file_name, image_folder):
    """Return the text lines for one openpyxl worksheet (cells, then image OCR)."""
    lines = []
    # NOTE(review): this check is case-sensitive, so a default title such as
    # "Sheet1" is still emitted - confirm whether that is intended.
    if 'sheet' not in ws.title:
        lines.append(f"sheet页名称:{ws.title}"+"\n")
    merged_ranges = ws.merged_cells.ranges
    image_output = _ocr_sheet_images(ws, file_name, image_folder)
    for row_id, row in enumerate(ws.iter_rows(), 1):
        row_output = []
        for cell_id, cell in enumerate(row, 1):
            cell_value = None
            # Every cell of a merged range reports the range's top-left value.
            for merged_range in merged_ranges:
                if cell.coordinate in merged_range:
                    cell_value = ws.cell(merged_range.min_row, merged_range.min_col).value
                    break
            if cell_value is None:
                cell_value = cell.value
            row_output.append(f'Row {row_id}, Column {cell_id}: {str(cell_value)}' if cell_value is not None else '')
        lines.append('\t'.join(row_output))
    lines.append('\n'.join(image_output))
    return lines


def _xls_sheet_lines(ws):
    """Return the text lines for one xlrd worksheet; blank cells are skipped."""
    lines = []
    if 'sheet' not in ws.name:
        lines.append(f"sheet页名称: {ws.name}"+"\n")
    merged_ranges = ws.merged_cells
    for row_idx in range(ws.nrows):
        for col_idx in range(ws.ncols):
            cell_value = ''
            # xlrd ranges are (row_lo, row_hi, col_lo, col_hi), upper bounds exclusive.
            for row_lo, row_hi, col_lo, col_hi in merged_ranges:
                if row_lo <= row_idx < row_hi and col_lo <= col_idx < col_hi:
                    cell_value = ws.cell_value(row_lo, col_lo)
                    break
            if not cell_value:
                cell_value = ws.cell_value(row_idx, col_idx)
            if cell_value:
                lines.append(f'Row {row_idx+1}, Column {col_idx+1}: {_xls_cell_text(cell_value)}')
        # Empty line separates spreadsheet rows.
        lines.append('')
    return lines


def excel_to_text_with_merged_cells(file_path, sheet_name=None, output_file=None,image_folder=None):
    """
    Read an Excel file and return its content as "Row r, Column c: value" text.

    Cells inside a merged range report the value of the range's top-left cell.
    For .xlsx files, embedded images are saved to disk and their OCR text is
    appended after the sheet's cells.

    Args:
        file_path: path of the workbook; ".xlsx" is read with openpyxl,
            anything else with xlrd.
        sheet_name: optional worksheet name; all sheets are read when omitted.
        output_file: optional path to write the text to; when omitted the
            text is printed instead.
        image_folder: folder that receives extracted images (.xlsx only);
            defaults to the folder containing file_path.

    Returns:
        The generated text, starting with a "文件名:<name>" line.
    """
    file_name = extract_filename(file_path)
    # The file name goes on its own first line for both formats.
    output_lines = [f"文件名:{file_name}"]
    if file_path.lower().endswith('.xlsx'):
        if image_folder is None:
            image_folder = os.path.dirname(os.path.abspath(file_path))
        wb = load_workbook(filename=file_path)
        sheets = [wb[sheet_name]] if sheet_name else wb.worksheets
        for ws in sheets:
            output_lines.extend(_xlsx_sheet_lines(ws, file_name, image_folder))
    else:
        try:
            # xlrd only populates Sheet.merged_cells with formatting_info=True.
            wb = open_workbook(filename=file_path, formatting_info=True)
        except NotImplementedError:
            # Formats without formatting support: read without merge information.
            wb = open_workbook(filename=file_path)
        sheets = [wb.sheet_by_name(sheet_name)] if sheet_name else wb.sheets()
        for ws in sheets:
            output_lines.extend(_xls_sheet_lines(ws))
    result = "\n".join(output_lines)
    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(result)
        print(f"结果已保存到: {output_file}")
    else:
        print(result)
    return result
#大模型处理excel内容为markdown
# Markdown sample appended to the prompt; the model is asked to follow its heading layout.
_EXCEL_MD_SAMPLE = """
# 5重要故障报告书2sheet-重要故障报告书(打砂升降平台)
## 长时间故障修理记录表
### 设备名称: 打砂升降平台
### 设备编号:B481-09-05-04
### 2024年5月20日
### 故障状况:5#喷砂房3#提升电机故障
### 修理履历:
| 时间 | 推断原因、处理内容及结果 |
| ----- | ---------------------------------------------------------------------------------------------------------------------------------- |
| 10:37 | 维修工张桂建接到电话,5#喷砂房打砂平台不下降 |
| 11:07 | 恢复生产。 |
### 修理人:张桂建 周斌
### 初始状况记录:
### 故障原因判定:3#提升电机故障
### 对策及再发防止建议:加强设备点检,做好设备定期保全
### 工程师评价:
### 维修班长评价
## 重要故障报告书
### 2024 年 05月20日
### 设备编号:B481-09-05-04
### 设备名称:打砂升降平台
### 生产工段:
### 故障发生时间:10:37
### 修理完成时间:11:07
### 设备停止时间: 30分钟
### 故障停止时间:
### 修理工时:30分钟
### 故障状况:5#喷砂房打砂平台不下降,3#提升电机不动作
### 原因分析:3#提升电机故障
### 故障现象:
### 处置:先松开3#电机刹车,将平台放下,暂停设备使用,恢复生产,待空闲时间进行电机更换
### 设备修理经过:
| 项目 |时刻|
| --------------- | -- |
### 故障现象:
1.磨损
2.松弛
### 故障原因:
1.不具备基本条件
### 再发防止对策
| 序号 | 对策项目 | 实施预定 | 完成时间 | 责任部门 |
| ---- | -------------------------- | -------- | -------- | -------------- |
### 迅速修理对策
| 序号 | 对策项目 | 实施预定 | 完成时间 | 责任部门 |
| ---- | ------------------------------------------------------------------------------------------ | -------- | -------- | -------------- |
###效果确认:
###今后推进方向:
###保存要求:
## 再、多发故障防止对策表
### 表码:
### 设备编号:
### 设备名称:
### 工序名:
### 使用班组:
### 加工品名:
### 开始年月日:
### 完成预定日:
### 完成日:
### 故障内容:
### 日期:
### 处置内容(对策内容):
### 故障问题现状:
### 要因和对策立案及对策实施内容(图示):
### 保存要求:
## 长时间故障修理记录表
### 表码:
### 设备使用部门:
### 台线名称:
### 设备名称:
### 设备编号:
### 设备类别:
### 故障状况(含电脑显示器报警内容、报警故障代码等):
### 故障发生时间:
### 修理完成时间:
### 故障停止时间:
### 修理履历:
| 时间 | 推断原因、处理内容及结果 | 修理人 | 见图 数值记录 |
| ----- | -------------------------------------------------------------------------------------------------------------------------------- | ----- | ----------- |
| 2024-07-22 12:25:00 | 维修工张桂建接到电话,5#喷砂房打砂平台不下降 | 贾瑞章 | |
### 故障原因判断综述:
### 对策及再发防止:
### 审阅者签字:
### 保存要求:
"""


# Ask the LLM to turn the extracted Excel text into Markdown
def excel_text_to_md(outPutPath, excel_text):
    """Convert extracted Excel text to Markdown via the LLM and save it.

    The prompt is the task description, two sample input rows, the extracted
    text and a Markdown sample. The streamed answer is echoed to stdout while
    it arrives and then written to outPutPath as UTF-8.

    Args:
        outPutPath: path of the Markdown file to write.
        excel_text: text produced by excel_to_text_with_merged_cells.

    Returns:
        The full Markdown response, or None when the service reported an
        error (in which case no file is written).
    """
    # Each segment goes on its own line; the original glued the sample rows
    # and the instructions together with no separator.
    prompt = "\n".join([
        '你是一个处理excel内容的助手,需要将人工处理之后的表格内容处理成我要求的markdown格式,表格内容案例如下:',
        'Row 1, Column 2: 长时间故障修理记录表',
        'Row 2, Column 2: 设备名称:打砂升降平台',
        '其中Row代表行号,Column代表列号',
        '需要处理的表格内容如下:',
        f'{excel_text}',
        'markdown输出必须注意标题格式跟样例一样,如果抽取不到信息markdown标题不要输出,一级标题是文档名称,二级标题是sheet页名称,markdown样例如下:',
    ]) + _EXCEL_MD_SAMPLE
    chunks = []
    for text_chunk in claude37_respond_stream(prompt):
        print(text_chunk, end="", flush=True)  # echo chunks as they arrive
        chunks.append(text_chunk)
    full_response = "".join(chunks)
    # The stream reports a non-200 status by yielding "Error: <status>";
    # that must not be saved as if it were the converted document.
    if full_response.startswith("Error: ") and full_response[len("Error: "):].isdigit():
        print(f"\nLLM request failed ({full_response}); nothing written to {outPutPath}")
        return None
    with open(outPutPath, 'w', encoding='utf-8') as f:
        f.write(full_response)
    print(f"结果已保存到: {outPutPath}")
    return full_response
# Usage example
# Calling the functions
#excel_path = r'E:\工作文件\2025-06\需处理文件清单及计划表\5重要故障报告书\5重要故障报告书2sheet-重要故障报告书(打砂升降平台).xls' # may be an .xls or .xlsx file
#file_name=r'\B线-中漆自动喷-小车轮子磨损更换-袁伟-2025.5.18'
# Folder scanned by processExcelFolder(); generated images and .md files are written here too
folder_path=r'E:\工作文件\2025-06\OneDrive_1_2025-6-11\\'
#excel_path=image_folder+file_name+'.xls'
#outPutPath =image_folder+file_name+'.md'
# NOTE(review): pdf_path is not referenced anywhere in the visible code
pdf_path = 'output.pdf'
#excel_text_to_md(outPutPath, excel_to_text_with_merged_cells(excel_path, output_file='output.txt', image_folder=image_folder))
#遍历本地文件夹转成md文件
def processExcelFolder(folder=None):
    """Convert every Excel workbook directly inside a folder to Markdown.

    Args:
        folder: folder to scan; defaults to the module-level folder_path.
            Extracted images and the generated .md files go into it as well.

    Returns:
        The list of Markdown paths that were generated.
    """
    target = folder_path if folder is None else folder
    generated = []
    for item in os.listdir(target):
        full_path = os.path.join(target, item)
        if not os.path.isfile(full_path):
            continue
        # Match real extensions only, case-insensitively ("xls" alone is not enough).
        if not item.lower().endswith((".xlsx", ".xls")):
            continue
        # "~$name.xlsx" files are lock files Excel creates while a workbook is open.
        if item.startswith("~$"):
            continue
        md_path = os.path.join(target, extract_filename(full_path) + '.md')
        print(md_path)
        excel_text_to_md(md_path, excel_to_text_with_merged_cells(full_path, image_folder=target))
        generated.append(md_path)
    return generated
# NOTE(review): this runs whenever the module is imported; consider guarding it
# with `if __name__ == "__main__":`.
processExcelFolder()
\ No newline at end of file
import json
import requests
from io import BytesIO
from PIL import Image
import base64
def image_to_base64(image):
    """Encode a PIL image as a base64 string of its JPEG bytes.

    Args:
        image: a PIL image.

    Returns:
        ASCII base64 text of the JPEG-encoded image.
    """
    # JPEG has no alpha channel or palette: Pillow refuses to save e.g. RGBA or
    # P images as JPEG, so anything outside the JPEG-writable modes is
    # converted to RGB first.
    if image.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        image = image.convert("RGB")
    buffer = BytesIO()
    image.save(buffer, format="JPEG")
    return base64.b64encode(buffer.getvalue()).decode("ascii")
def base64_to_image(base64_str):
    """Decode a base64 string and open the resulting bytes as a PIL image."""
    raw_bytes = base64.b64decode(base64_str)
    stream = BytesIO(raw_bytes)
    return Image.open(stream)
def claude3_respond(prompt, timeout=300):
    """Send one user prompt to the Claude 3.5 Sonnet endpoint and return the reply.

    The bearer token is read from the CLAUDE3_API_TOKEN environment variable
    so that the credential is not kept in source control.

    Args:
        prompt: text of the user message.
        timeout: seconds to wait for the HTTP response.

    Returns:
        The reply text, or None when the request fails or the response does
        not have the expected shape.

    Raises:
        RuntimeError: if CLAUDE3_API_TOKEN is not set.
    """
    import os

    token = os.environ.get("CLAUDE3_API_TOKEN")
    if not token:
        raise RuntimeError("environment variable CLAUDE3_API_TOKEN is not set")
    url = "https://bedrock.chatbot.cn/llm/sse-invoke"
    payload = {
        "model": "anthropic.claude-3-5-sonnet-20240620-v1:0",
        "stream": False,
        "messages": [{"role": "user", "content": prompt}]
    }
    res = requests.post(url=url, data=json.dumps(payload), headers={
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json"}, timeout=timeout)
    if res.status_code != 200:
        print(res)
        return None
    try:
        return res.json()['choices'][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # Report the unexpected shape instead of printing an empty line.
        print(f"unexpected response format: {e!r}")
        return None
def claude37_img_respond(img_path, prompt, timeout=300):
    """Send an image plus a text prompt to Claude 3.7 Sonnet and return the reply.

    The bearer token is read from the CLAUDE37_API_TOKEN environment variable
    so that the credential is not kept in source control.

    Args:
        img_path: path of a local image file.
        prompt: text instruction sent together with the image.
        timeout: seconds to wait for the HTTP response.

    Returns:
        The reply text, or "" when the request fails or the response does not
        have the expected shape.

    Raises:
        RuntimeError: if CLAUDE37_API_TOKEN is not set.
    """
    import mimetypes
    import os

    token = os.environ.get("CLAUDE37_API_TOKEN")
    if not token:
        raise RuntimeError("environment variable CLAUDE37_API_TOKEN is not set")
    url = "https://bedrock.chatbot.cn/llm/sse-invoke"
    with open(img_path, "rb") as f:
        base64_str = base64.b64encode(f.read()).decode("ascii")
    # Declare the real image type (e.g. image/jpeg); fall back to the
    # previously hard-coded PNG when the extension is unknown.
    media_type = mimetypes.guess_type(img_path)[0]
    if not media_type or not media_type.startswith("image/"):
        media_type = "image/png"
    content = [{"source": {"type": "base64", "media_type": media_type, "data": base64_str}, "type": "image"},
               {"type": "text", "text": prompt}]
    payload = {
        "model": "arn:aws:bedrock:us-east-1:730335234231:inference-profile/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "stream": False,
        "messages": [{"role": "user", "content": content}]
    }
    res = requests.post(url=url, data=json.dumps(payload), headers={
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json"}, timeout=timeout)
    if res.status_code != 200:
        print(res)
        return ""
    try:
        return res.json()['choices'][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print(f"unexpected response format: {e!r}")
        return ""
def claude37_respond(prompt, timeout=300):
    """Send one user prompt to Claude 3.7 Sonnet and return the reply.

    The bearer token is read from the CLAUDE37_API_TOKEN environment variable
    so that the credential is not kept in source control.

    Args:
        prompt: text of the user message.
        timeout: seconds to wait for the HTTP response.

    Returns:
        The reply text, or None when the request fails or the response does
        not have the expected shape.

    Raises:
        RuntimeError: if CLAUDE37_API_TOKEN is not set.
    """
    import os

    token = os.environ.get("CLAUDE37_API_TOKEN")
    if not token:
        raise RuntimeError("environment variable CLAUDE37_API_TOKEN is not set")
    url = "https://bedrock.chatbot.cn/llm/sse-invoke"
    payload = {
        "model": "arn:aws:bedrock:us-east-1:730335234231:inference-profile/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "stream": False,
        "messages": [{"role": "user", "content": prompt}]
    }
    res = requests.post(url=url, data=json.dumps(payload), headers={
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json"}, timeout=timeout)
    if res.status_code != 200:
        # Print the response itself; the original printed `ret`, which is always None here.
        print(res)
        return None
    try:
        return res.json()['choices'][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print(f"unexpected response format: {e!r}")
        return None
def _sse_event_texts(event):
    """Yield the delta text carried by the data lines of one SSE event block."""
    for line in event.split('\n'):
        if not line.startswith('data:'):
            continue
        raw = line[len('data:'):].strip()
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            # Non-JSON payloads (for example an end-of-stream marker) carry no text.
            continue
        if not isinstance(parsed, dict):
            continue
        choices = parsed.get("choices")
        if not choices:
            continue
        text = choices[0].get("delta", {}).get("content")
        if text:
            yield text


def claude37_respond_stream(prompt, timeout=300):
    """Stream the Claude 3.7 Sonnet reply for one user prompt.

    The bearer token is read from the CLAUDE37_API_TOKEN environment variable
    so that the credential is not kept in source control.

    Args:
        prompt: text of the user message.
        timeout: seconds to wait for the connection and between received bytes.

    Yields:
        Text fragments in arrival order. On a non-200 status a single
        "Error: <status>" string is yielded and the stream ends.

    Raises:
        RuntimeError: if CLAUDE37_API_TOKEN is not set.
    """
    import os

    token = os.environ.get("CLAUDE37_API_TOKEN")
    if not token:
        raise RuntimeError("environment variable CLAUDE37_API_TOKEN is not set")
    url = "https://bedrock.chatbot.cn/llm/sse-invoke"
    payload = {
        "model": "arn:aws:bedrock:us-east-1:730335234231:inference-profile/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "stream": True,
        "messages": [{"role": "user", "content": prompt}]
    }
    # stream=True is required; without it requests downloads the whole body
    # before iter_content yields anything.
    with requests.post(url=url, data=json.dumps(payload), headers={
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json"}, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            yield f"Error: {response.status_code}"
            return
        # With an explicit encoding, iter_content decodes incrementally, so a
        # multi-byte character split across two chunks is not corrupted.
        if response.encoding is None:
            response.encoding = "utf-8"
        buffer = ""
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
            if not chunk:
                continue
            # Normalise CRLF so both line-ending styles end an event on "\n\n".
            buffer = (buffer + chunk).replace('\r\n', '\n')
            while '\n\n' in buffer:
                event, buffer = buffer.split('\n\n', 1)
                yield from _sse_event_texts(event)
        # Handle a final event that was not followed by a blank line.
        if buffer.strip():
            yield from _sse_event_texts(buffer.replace('\r', ''))
import json
from time import sleep
import requests
def process(file_path, poll_interval=2, timeout=None):
    """Upload a document to the doc2md service and wait for the task to finish.

    Args:
        file_path: path of the document to upload.
        poll_interval: seconds to sleep between status polls.
        timeout: optional maximum number of seconds to wait for completion;
            None waits indefinitely.

    Returns:
        The parsed JSON of the status response whose status is "DONE" or "FAIL".

    Raises:
        TimeoutError: if timeout is given and the task does not finish in time.
    """
    from time import monotonic

    url = "http://work.chatbot.cn:38890/chatbot/v2/layout/doc2md"
    # Close the upload handle as soon as the request has been sent.
    with open(file_path, "rb") as upload:
        response = requests.request("POST", url, files={"file": upload})
    taskid = json.loads(response.text)['data']['taskid']
    sleep(1)
    status_url = "http://work.chatbot.cn:38890/chatbot/taskstatus"
    body = json.dumps({"taskid": taskid})
    headers = {"Content-Type": "application/json"}
    deadline = None if timeout is None else monotonic() + timeout
    while True:
        response = requests.request("POST", status_url, data=body, headers=headers)
        retJson = json.loads(response.text)
        # Use every poll result, including the first one.
        if retJson['data']['status'] in ("DONE", "FAIL"):
            return retJson
        if deadline is not None and monotonic() >= deadline:
            raise TimeoutError(f"task {taskid} did not finish within {timeout} seconds")
        sleep(poll_interval)
\ No newline at end of file
import re
import os
def extract_filename(filepath):
    """Return the last path component of filepath without its final extension.

    Both "/" and "\\" are treated as separators. An empty path, or one that
    ends with a separator, yields "" instead of raising.
    """
    # Everything after the last slash or backslash.
    last_component = re.split(r'[\\/]', filepath)[-1]
    # Drop the final ".ext", if any.
    return re.sub(r'\.[^.]*$', '', last_component)
\ No newline at end of file
import threading
import time
from queue import Queue
from cn.chatbot.module.ClaudeAPI import claude37_img_respond
import cn.chatbot.test.ocr_img as OCR
class BatchOcr():
    """Run OCR over a list of image paths with a small pool of threads."""

    def __init__(self, data, thread_num=4, ocr_func=None):
        """
        Args:
            data: iterable of image paths (or URLs) to process.
            thread_num: number of worker threads.
            ocr_func: optional callable(img_path) -> result; defaults to
                OCR.extract_img_data.
        """
        self.dataset = data
        self.thread_num = thread_num
        # Kept only for backward compatibility; completion is tracked by
        # joining the threads, not by this semaphore.
        self.seamp = threading.BoundedSemaphore(self.thread_num)
        self.ocr_func = ocr_func

    def worker(self, in_queue: Queue, out_queue: Queue):
        """Take paths from in_queue until it is empty and put results on out_queue."""
        from queue import Empty

        ocr = self.ocr_func or OCR.extract_img_data
        while True:
            try:
                # Non-blocking get: checking empty() and then calling get()
                # is racy when several workers share the queue.
                img_path = in_queue.get_nowait()
            except Empty:
                return
            try:
                res = ocr(img_path)
            except Exception as exc:
                # One bad image must not kill the thread; it is simply
                # missing from the result.
                print("ocr failed for " + str(img_path) + ": " + repr(exc))
                continue
            out_queue.put({"img_path": img_path, "res": res})

    def count_worker(self, in_queue: Queue, out_queue: Queue, stop_event=None):
        """Print progress about once a second until the input queue is drained."""
        processed = 0
        while not in_queue.empty():
            if (out_queue.qsize() > processed):
                processed = out_queue.qsize()
                print("processed ->" + str(processed))
            if stop_event is None:
                time.sleep(1)
            elif stop_event.wait(1):
                break

    def run(self):
        """Process the whole dataset.

        Returns:
            dict mapping each successfully processed image path to its OCR result.
        """
        from queue import Empty

        in_queue = Queue()
        for example in self.dataset:
            in_queue.put(example)
        print('load in_queue_size', in_queue.qsize())
        out_queue = Queue()
        stop_event = threading.Event()
        workers = []
        for _ in range(self.thread_num):
            t = threading.Thread(target=self.worker, args=(in_queue, out_queue), daemon=True)
            workers.append(t)
            t.start()
        reporter = threading.Thread(target=self.count_worker,
                                    args=(in_queue, out_queue, stop_event), daemon=True)
        reporter.start()
        for t in workers:
            t.join()
        # All work is done; wake the reporter rather than waiting out its sleep.
        stop_event.set()
        reporter.join()
        ret = dict()
        while True:
            try:
                rr = out_queue.get_nowait()
            except Empty:
                break
            ret[rr["img_path"]] = rr["res"]
        return ret
import re
from cn.chatbot.module.TextInClient import TextInClient
import cn.chatbot.test.ocr_img as OCR
from cn.chatbot.test.BatchOcr import BatchOcr
def process_img_and_ocr(md:str):
    """Append OCR text after every Markdown image tag in md.

    Each distinct image path is OCR'd once through BatchOcr. Every
    "![alt](path)" tag whose image produced non-empty text is followed by a
    blank line and that text; other tags are left unchanged.

    Args:
        md: Markdown text.

    Returns:
        The Markdown with OCR results inserted.
    """
    image_tag = re.compile(r'(!\[.*?\]\((.*?)\))')
    matches = image_tag.findall(md)
    print("total imgs:" + str(len(matches)))
    if not matches:
        return md
    # Unique paths in first-seen order, so an image referenced twice is OCR'd once.
    img_list = list(dict.fromkeys(path for _tag, path in matches))
    ocr_results = BatchOcr(img_list).run()

    def _append_ocr(match):
        text = ocr_results.get(match.group(2))
        if not isinstance(text, str) or not text:
            return match.group(0)
        return match.group(1) + "\n\n" + text

    # One substitution pass: inserted OCR text is never rescanned for tags.
    return image_tag.sub(_append_ocr, md)
# Module-level client; the conversion loop at the bottom of this script calls its process_pdf().
textInClient = TextInClient()
import os
def read_path_file(path):
    """Recursively collect the paths of all files under path whose name ends with ".pdf"."""
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(path)
        for name in names
        if name.endswith(".pdf")
    ]
in_path = "D:\\works\\layout\\data\\HK"
out_path = "D:\\works\\layout\\data\\HK_MD"
# Create the output folder up front so the first write cannot fail on a missing directory.
os.makedirs(out_path, exist_ok=True)
file_list = read_path_file(in_path)
for file in file_list:
    print("processing ---> " + file)
    md = textInClient.process_pdf(file)
    md = process_img_and_ocr(md)
    # Replace only the final extension; str.replace(".pdf", ".md") would also
    # rewrite a ".pdf" that appears in the middle of the name.
    filename = os.path.splitext(os.path.basename(file))[0] + ".md"
    # NOTE(review): PDFs with the same base name in different subfolders
    # overwrite each other here.
    with open(os.path.join(out_path, filename), "w", encoding="utf-8") as f:
        f.write(md)
import os
from urllib.request import urlretrieve
from cn.chatbot.module.ClaudeAPI import claude37_img_respond
def extract_img_data(img_path):
    """Download an image and ask the vision model to return its data as a Markdown table.

    Args:
        img_path: URL of the image to download.

    Returns:
        Whatever claude37_img_respond returns for the downloaded image.
    """
    import tempfile
    from urllib.parse import urlparse

    tmp_dir = './tmp/'
    os.makedirs(tmp_dir, exist_ok=True)
    # Unique temp file per call: this runs on several threads at once, and
    # two URLs can share the same base name.
    suffix = os.path.splitext(urlparse(img_path).path)[1]
    fd, local_file_path = tempfile.mkstemp(suffix=suffix, dir=tmp_dir)
    os.close(fd)
    try:
        # Download and save the image
        urlretrieve(img_path, local_file_path)
        prompt = ("你是一个图片识别机器人,基于我提供给你的图片,识别里面的数据,用表格数据,用markdown格式输出,无需其它解释.\n"
                  "\n要求:\n"
                  "(1)若图片中有图表,则用表格恢复完整的数据\n"
                  "(2)请恢复完整,全面的数据\n"
                  "(3)请你直接给出抽取到的结果即可,无需解释,无需解释说,你要开始抽取之类的,直接输出表格"
                  "(4)输出开始示例:|表头1|表头2|\n|----|----|\n|数据1|数据2|")
        return claude37_img_respond(local_file_path, prompt)
    finally:
        # The download is only needed for the OCR call.
        try:
            os.remove(local_file_path)
        except OSError:
            pass
import argparse
import os
import signal
import subprocess
import threading
from multiprocessing import Queue, SimpleQueue
from time import sleep
import flask
from flask import request, json
from flask import jsonify
from cn.chatbot.LayoutEngine import LayoutEngine
import requests
import logging
from cn.chatbot.module.RedisClient import RedisClient
from cn.chatbot.module.MinioClient import MinioClient
import cn.chatbot.module.tool as T
server = flask.Flask(__name__)
# Upload/working directory, relative to root_path
tmp_path = "./tmp"
# Tasks handed from the HTTP handlers to the worker threads
in_queue = Queue()
root_path = os.path.dirname(__file__)
# requests has no global default timeout. The original assignment
# `requests.exceptions.Timeout = 20` only overwrote the exception class with
# an int; the intended value is kept as a constant to pass as `timeout=`.
REQUEST_TIMEOUT = 20
# Both clients are created in the __main__ block once the arguments are parsed
minio_client = None
redis_client = None
def get_file_suffix(filename:str):
    """Return the normalised (lower-case) supported extension of filename.

    Args:
        filename: file name or path; matching is case-insensitive.

    Returns:
        One of ".docx", ".doc", ".xls", ".xlsx", ".ppt", ".pptx", ".pdf",
        ".txt", ".md", or None when the extension is not supported.
    """
    lowered = filename.lower()
    # The name is already lower-cased, so no upper-case variants are needed.
    for suffix in (".docx", ".doc", ".xls", ".xlsx", ".ppt", ".pptx", ".pdf", ".txt", ".md"):
        if lowered.endswith(suffix):
            return suffix
    return None
# 执行系统命令
def kill_command(p):
    """Send SIGTERM to the whole process group of the subprocess p.

    Does nothing if the process has already exited, which can happen when a
    timeout fires just as the command finishes.
    """
    try:
        os.killpg(os.getpgid(p.pid), signal.SIGTERM)
    except ProcessLookupError:
        pass
def _terminate_process_group(p, grace=5):
    """Stop p's process group: SIGTERM first, SIGKILL if it is still alive after grace seconds."""
    for sig in (signal.SIGTERM, signal.SIGKILL):
        try:
            os.killpg(os.getpgid(p.pid), sig)
        except ProcessLookupError:
            return
        try:
            p.communicate(timeout=grace)
            return
        except subprocess.TimeoutExpired:
            continue


def execute_command(command,timeout):
    """Run a shell command with a time limit.

    Args:
        command: command line, run through the shell in its own session.
        timeout: seconds after which the command's process group is terminated.

    Returns:
        True if the command exited with status 0; False if it failed, timed
        out or could not be waited for.
    """
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, start_new_session=True)
    try:
        _stdout, stderr = p.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        logging.error("command timed out after %s seconds: %s", timeout, command)
        _terminate_process_group(p)
        return False
    except Exception:
        logging.exception("command could not be completed: %s", command)
        return False
    if p.returncode != 0:
        # Log stderr so a failed conversion can be diagnosed.
        logging.error("command failed with exit code %s: %s\n%s",
                      p.returncode, command, stderr.decode("utf-8", errors="replace"))
        return False
    return True
def cast_to_callback(task, taskStatus, timeout=20):
    """POST the task status as JSON to the task's callback URL, if it has one.

    Delivery is best effort: failures are logged and never raised.

    Args:
        task: task dict; its "callback" entry is the URL (may be missing or empty).
        taskStatus: JSON-serialisable status payload.
        timeout: seconds to wait for the callback endpoint, so that an
            unresponsive endpoint cannot block the calling worker forever.
    """
    callback = task.get("callback")
    if not callback:
        return
    headers = {'Content-Type': 'application/json'}
    try:
        logging.log(logging.INFO, "callback->" + callback)
        logging.log(logging.INFO, "callback data -> " + str(taskStatus))
        requests.post(callback, json=taskStatus, headers=headers, timeout=timeout)
    except Exception:
        # logging.exception keeps the traceback.
        logging.exception('callback error')
def clean_up(task):
    """Delete the files in the task's working directory whose name contains the task id.

    Cleanup is best effort: an entry that cannot be removed (a directory, for
    example) is logged and skipped so the remaining files are still deleted.

    Args:
        task: task dict with "file" (path of the uploaded file) and "taskid".
    """
    # An empty dirname means the current directory.
    file_dir = os.path.dirname(task["file"]) or "."
    taskid = task["taskid"]
    try:
        names = os.listdir(file_dir)
    except OSError:
        logging.exception("cannot list %s for cleanup", file_dir)
        return
    for filename in names:
        if taskid not in filename:
            continue
        target = os.path.join(file_dir, filename)
        try:
            os.remove(target)
        except OSError as exc:
            logging.warning("could not remove %s: %s", target, exc)
def process_pdf(layoutEngine, task):
    """Convert the task's PDF to Markdown with the layout engine.

    Args:
        layoutEngine: object providing process_pdf(title, pdf_path, md_path).
        task: task dict with "file" (PDF path) and "filename" (original name).

    Returns:
        (success, md_file_path), where md_file_path is the PDF path with its
        extension replaced by ".md".
    """
    pdf_file_path = task["file"]
    # Swap only the extension; replace("pdf", ".md") produced "name..md" and
    # rewrote any "pdf" elsewhere in the path.
    md_file_path = os.path.splitext(pdf_file_path)[0] + ".md"
    org_filename = task["filename"]
    if org_filename.lower().endswith(".pdf"):
        org_filename = org_filename[:-len(".pdf")]
    success = layoutEngine.process_pdf(org_filename, pdf_file_path, md_file_path)
    return success, md_file_path
def process_ppt(layoutEngine, task, minio_client):
    """Convert a .ppt file to .pptx with LibreOffice, then to Markdown.

    Args:
        layoutEngine: object providing process_pptx(title, pptx_path, md_path, minio_client).
        task: task dict with "file" (path) and "filename" (original name).
        minio_client: passed through to the layout engine.

    Returns:
        (success, md_file_path, None), or (False, None, None) when the
        LibreOffice conversion fails.
    """
    import shlex

    file_path = task["file"]
    file_root, file_suffix = os.path.splitext(file_path)
    file_dir = os.path.dirname(file_path)
    # Quote both paths so spaces or shell metacharacters cannot break the command.
    cmd = "soffice --convert-to pptx " + shlex.quote(file_path) + " --outdir " + shlex.quote(file_dir)
    logging.log(logging.INFO,"exec cmd:\n " + cmd)
    if not execute_command(cmd, 120):
        return False, None, None
    # Replace only the trailing extension, compared case-insensitively.
    md_file_path = file_root + ".md"
    pptx_file_path = file_root + ".pptx"
    name_root, name_ext = os.path.splitext(task["filename"])
    org_filename = name_root if name_ext.lower() == file_suffix.lower() else task["filename"]
    success = layoutEngine.process_pptx(org_filename, pptx_file_path, md_file_path, minio_client)
    return success, md_file_path, None
def process_pptx(layoutEngine, task, minio_client):
    """Convert the task's .pptx file to Markdown with the layout engine.

    Args:
        layoutEngine: object providing process_pptx(title, pptx_path, md_path, minio_client).
        task: task dict with "file" (path) and "filename" (original name).
        minio_client: passed through to the layout engine.

    Returns:
        (success, md_file_path, None).
    """
    file_path = task["file"]
    file_root, file_suffix = os.path.splitext(file_path)
    # Replace only the trailing extension; str.replace would touch every occurrence.
    md_file_path = file_root + ".md"
    name_root, name_ext = os.path.splitext(task["filename"])
    org_filename = name_root if name_ext.lower() == file_suffix.lower() else task["filename"]
    success = layoutEngine.process_pptx(org_filename, file_path, md_file_path, minio_client)
    return success, md_file_path, None
def process_docx(layoutEngine, task, minio_client):
    """Convert the task's .docx file to Markdown with the layout engine.

    Args:
        layoutEngine: object providing process_docx(title, docx_path, md_path,
            minio_client) and returning (success, md_file_path).
        task: task dict with "file" (path) and "filename" (original name).
        minio_client: passed through to the layout engine.

    Returns:
        The (success, md_file_path) pair returned by the layout engine.
    """
    file_path = task["file"]
    file_root, file_suffix = os.path.splitext(file_path)
    # Replace only the trailing extension; str.replace would touch every occurrence.
    name_root, name_ext = os.path.splitext(task["filename"])
    org_filename = name_root if name_ext.lower() == file_suffix.lower() else task["filename"]
    md_file_path = file_root + ".md"
    success, md_file_path = layoutEngine.process_docx(org_filename, file_path, md_file_path, minio_client)
    return success, md_file_path
def gen_pdf_file(task):
    """Render the task's document to PDF with LibreOffice.

    Args:
        task: task dict with "file" (path of the document).

    Returns:
        Path of the generated PDF (same folder and base name as the input),
        or None when the conversion fails.
    """
    import shlex

    file_path = task["file"]
    file_dir = os.path.dirname(file_path)
    # shlex.quote also copes with paths that contain a single quote.
    cmd = "soffice --convert-to pdf " + shlex.quote(file_path) + " --outdir " + shlex.quote(file_dir)
    logging.log(logging.INFO,"exec cmd:\n " + cmd)
    # Replace only the trailing extension; str.replace would touch every occurrence.
    pdf_file_path = os.path.splitext(file_path)[0] + ".pdf"
    if execute_command(cmd, 120):
        return pdf_file_path
    return None
def process_xlsx(layoutEngine, task):
    """Convert the task's .xlsx file to Markdown with the layout engine.

    Args:
        layoutEngine: object providing process_xlsx(title, xlsx_path, md_path)
            and returning (success, md_file_path).
        task: task dict with "file" (path) and "filename" (original name).

    Returns:
        The (success, md_file_path) pair returned by the layout engine.
    """
    file_path = task["file"]
    file_root, file_suffix = os.path.splitext(file_path)
    # Replace only the trailing extension; str.replace would touch every occurrence.
    name_root, name_ext = os.path.splitext(task["filename"])
    org_filename = name_root if name_ext.lower() == file_suffix.lower() else task["filename"]
    md_file_path = file_root + ".md"
    success, md_file_path = layoutEngine.process_xlsx(org_filename, file_path, md_file_path)
    return success, md_file_path
def process_xls(layoutEngine, task):
    """Convert an .xls file to .xlsx with LibreOffice, then to Markdown.

    Args:
        layoutEngine: object providing process_xlsx(title, xlsx_path, md_path)
            and returning (success, md_file_path).
        task: task dict with "file" (path) and "filename" (original name).

    Returns:
        The (success, md_file_path) pair from the layout engine, or
        (False, "") when the LibreOffice conversion fails.
    """
    import shlex

    file_path = task["file"]
    file_root, file_suffix = os.path.splitext(file_path)
    file_dir = os.path.dirname(file_path)
    # Replace only the trailing extension, compared case-insensitively.
    name_root, name_ext = os.path.splitext(task["filename"])
    org_filename = name_root if name_ext.lower() == file_suffix.lower() else task["filename"]
    # shlex.quote also copes with paths that contain a single quote.
    cmd = "soffice --convert-to xlsx " + shlex.quote(file_path) + " --outdir " + shlex.quote(file_dir)
    logging.log(logging.INFO,"exec cmd:\n " + cmd)
    if not execute_command(cmd, 120):
        return False, ""
    md_file_path = file_root + ".md"
    xlsx_file_path = file_root + ".xlsx"
    success, md_file_path = layoutEngine.process_xlsx(org_filename, xlsx_file_path, md_file_path)
    return success, md_file_path
def process_doc(layoutEngine, task, minio_client):
    """Convert a .doc file to .docx with LibreOffice, then to Markdown.

    Args:
        layoutEngine: object providing process_docx(title, docx_path, md_path,
            minio_client) and returning (success, md_file_path).
        task: task dict with "file" (path) and "filename" (original name).
        minio_client: passed through to the layout engine.

    Returns:
        The (success, md_file_path) pair from the layout engine, or
        (False, "") when the LibreOffice conversion fails.
    """
    import shlex

    file_path = task["file"]
    file_root, file_suffix = os.path.splitext(file_path)
    file_dir = os.path.dirname(file_path)
    # Replace only the trailing extension, compared case-insensitively.
    name_root, name_ext = os.path.splitext(task["filename"])
    org_filename = name_root if name_ext.lower() == file_suffix.lower() else task["filename"]
    # shlex.quote also copes with paths that contain a single quote.
    cmd = "soffice --convert-to docx " + shlex.quote(file_path) + " --outdir " + shlex.quote(file_dir)
    logging.log(logging.INFO,"exec cmd:\n " + cmd)
    if not execute_command(cmd, 120):
        return False, ""
    md_file_path = file_root + ".md"
    docx_file_path = file_root + ".docx"
    success, md_file_path = layoutEngine.process_docx(org_filename, docx_file_path, md_file_path, minio_client)
    return success, md_file_path
def worker(in_queue: Queue, layoutEngine:LayoutEngine):
    """Consume conversion tasks from in_queue forever.

    For each task: mark it PROCESSING in redis, convert the file according to
    its suffix, upload the results to MinIO, store the final status (DONE or
    FAIL) in redis, notify the callback and remove the task's temporary files.

    Args:
        in_queue: queue of task dicts with "taskid", "file", "filename" and "callback".
        layoutEngine: engine passed to the process_* converters.
    """
    from queue import Empty  # raised by Queue.get when the timeout expires

    def _report_failure(task, taskid):
        """Record the FAIL status, notify the callback and clean up."""
        taskStatus = {"taskid": taskid, "status": "FAIL"}
        logging.log(logging.INFO, "taskStatus --> " + json.dumps(taskStatus))
        redis_client.put(taskid, json.dumps(taskStatus))
        cast_to_callback(task, taskStatus)
        clean_up(task)

    while True:
        try:
            # Blocking get doubles as the idle wait; an empty queue is normal
            # and must not trigger the 10-second error back-off.
            task = in_queue.get(timeout=1)
        except Empty:
            continue
        except Exception:
            logging.exception("worker could not read from the task queue")
            sleep(10)
            continue
        if not task:
            sleep(1)
            continue
        # Reset per task so a failure never reports the previous task's id.
        taskid = None
        try:
            taskid = task["taskid"]
            taskStatus = {"taskid": taskid, "status": "PROCESSING", "progress": "50%"}
            redis_client.put(taskid, json.dumps(taskStatus))
            file_path = task["file"]
            file_suffix = get_file_suffix(file_path)
            pdf_file_path = None
            success = False
            md_file_path = ""
            # NOTE(review): ".txt" and ".md" are accepted at upload time but
            # have no converter, so they end up as FAIL.
            if file_suffix == ".doc":
                success, md_file_path = process_doc(layoutEngine, task, minio_client)
            elif file_suffix == ".docx":
                success, md_file_path = process_docx(layoutEngine, task, minio_client)
            elif file_suffix == ".xls":
                success, md_file_path = process_xls(layoutEngine, task)
            elif file_suffix == ".xlsx":
                success, md_file_path = process_xlsx(layoutEngine, task)
            elif file_suffix == ".pdf":
                success, md_file_path = process_pdf(layoutEngine, task)
                pdf_file_path = file_path
            elif file_suffix == ".ppt":
                success, md_file_path, pdf_file_path = process_ppt(layoutEngine, task, minio_client)
            elif file_suffix == ".pptx":
                success, md_file_path, pdf_file_path = process_pptx(layoutEngine, task, minio_client)
            # Word documents also get a PDF rendition; skip that costly step
            # when the Markdown conversion already failed.
            if success and file_suffix in (".doc", ".docx"):
                pdf_file_path = gen_pdf_file(task)
            if not success:
                _report_failure(task, taskid)
                sleep(10)
                continue
            # Upload the generated files
            pdf_minio_file_path = "none"
            md_minio_file_path = "none"
            if md_file_path:
                md_minio_file_path = minio_client.upload_file(md_file_path)
            if pdf_file_path:
                pdf_minio_file_path = minio_client.upload_file(pdf_file_path)
            data = {"pdf_file_path": pdf_minio_file_path, "markdown_file_path": md_minio_file_path,
                    "md_file_path": md_minio_file_path, "pdf_marked_path": "none"}
            taskStatus = {"taskid": taskid, "status": "DONE", "filepath": data}
            redis_client.put(taskid, json.dumps(taskStatus))
            logging.log(logging.INFO, "taskStatus --> " + json.dumps(taskStatus))
            cast_to_callback(task, taskStatus)
            clean_up(task)
        except Exception:
            logging.exception("worker error while processing task %s", taskid)
            if taskid is not None:
                try:
                    _report_failure(task, taskid)
                except Exception:
                    # Reporting is best effort; never let it kill the worker.
                    logging.exception("could not report failure of task %s", taskid)
            sleep(10)
def _parse_bool_flag(value):
    """Parse a command-line boolean; argparse's type=bool treats any non-empty string as True."""
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("0", "false", "f", "no", "n", "off", ""):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got %r" % (value,))


def get_args_parser():
    """Build the command-line parser for the layout server.

    Returns:
        An argparse.ArgumentParser with the server, MinIO and redis options.
    """
    parser = argparse.ArgumentParser('set layout server', add_help=True)
    parser.add_argument('--server_host', type=str, default="0.0.0.0",
                        help='server host, default: 0.0.0.0')
    parser.add_argument('--server_port',type=int, required=True,
                        help='server port')
    parser.add_argument('--worker_num', type=int, default=1,
                        help='number of worker threads, default: 1')
    parser.add_argument('--minio_endpoint', type=str,required=True,
                        help='minio endpoint, eg: 192.168.10.145:8000')
    parser.add_argument('--file_url_prefix', type=str,
                        help='minio 文件相对地址(/bucket/time/xxxx.png)的前缀', default=None)
    parser.add_argument('--bucket_name', type=str, default='layout',
                        help='bucket_name ')
    parser.add_argument('--minio_username', type=str,default=None,
                        help='minio username, you should provide username/password or access_key/secret_key')
    parser.add_argument('--minio_password', type=str,default=None,
                        help='minio password ')
    parser.add_argument('--minio_access_key', type=str,default=None,
                        help='minio access_key, you should provide username/password or access_key/secret_key')
    parser.add_argument('--minio_secret_key', type=str,default=None,
                        help='minio secret_key')
    # type=bool would turn "--minio_enable_secure False" into True.
    parser.add_argument('--minio_enable_secure', type=_parse_bool_flag,default=False,
                        help='minio secure (true/false), default False')
    parser.add_argument('--redis_host', type=str,required=True,
                        help='redis host url, eg: 192.168.10.1')
    parser.add_argument('--redis_port', type=int,required=True,
                        help='redis port')
    parser.add_argument('--redis_password', type=str,required=True,
                        help='redis password')
    parser.add_argument('--redis_db', type=int,default=0,
                        help='redis db index, default: 0')
    return parser
@server.route('/chatbot/layout/doc2md', methods=['post'])
def doc2md():
    """Accept a multipart upload and queue it for conversion to Markdown.

    Form fields: "file" (required), "callback" (optional URL notified with the
    final status) and "filename" (optional name overriding the uploaded one).

    Returns:
        JSON {'code': 0, 'data': {'taskid': ...}} on success, or
        {'code': -1, 'message': ...} when no file was uploaded or the file
        type is not supported.
    """
    file = request.files.get('file')
    if file is None or file.filename == '':
        return jsonify({'code': -1, "message": "失败,未上传文件!"})
    # NOTE(review): the id is derived from the file name; whether two uploads
    # of the same name can collide depends on T.generate_md5 - confirm.
    taskid = T.generate_md5(file.filename)
    file_suffix = get_file_suffix(file.filename)
    if file_suffix is None:
        return jsonify({'code': -1, "message": "失败,不支持的文件类型!"})
    newfilename = taskid + file_suffix
    upload_dir = os.path.join(root_path, tmp_path)
    # Make sure the working directory exists before saving into it.
    os.makedirs(upload_dir, exist_ok=True)
    filepath = os.path.join(upload_dir, newfilename)
    file.save(filepath)
    callback = request.values.get("callback")
    orginal_filename = request.values.get("filename", file.filename)
    data = {"filename": orginal_filename, "file": filepath, "taskid": taskid, "callback": callback}
    taskStatus = {"taskid": taskid, "status": "IN_QUEUE"}
    redis_client.put(taskid, json.dumps(taskStatus))
    in_queue.put(data)
    return jsonify({'code': 0, "message": "成功", "data": {"taskid": taskid}})
@server.route('/chatbot/layout/doc2mdstream', methods=['post'])
def doc2mdstream():
    """Accept a raw request body as the file and queue it for conversion.

    The file name is taken from the "filename" request header; an optional
    "callback" URL may be supplied as a request value.

    Returns:
        JSON {'code': 0, 'data': {'taskid': ...}} on success, or
        {'code': -1, 'message': ...} when the header is missing or the file
        type is not supported.
    """
    import shutil

    file_stream = request.stream
    if 'filename' not in request.headers:
        return jsonify({'code': -1, "message": "失败,未获取到文件名!"})
    header_filename = request.headers.get('filename')
    taskid = T.generate_md5(header_filename)
    file_suffix = get_file_suffix(header_filename)
    if file_suffix is None:
        return jsonify({'code': -1, "message": "失败,不支持的文件类型!"})
    newfilename = taskid + file_suffix
    upload_dir = os.path.join(root_path, tmp_path)
    os.makedirs(upload_dir, exist_ok=True)
    filepath = os.path.join(upload_dir, newfilename)
    # Copy the request body to disk. The original called .save() on the
    # freshly opened output file, which raises AttributeError and never
    # writes the upload.
    with open(filepath, "wb") as out_file:
        shutil.copyfileobj(file_stream, out_file)
    callback = request.values.get("callback")
    data = {"filename": newfilename, "file": filepath, "taskid": taskid, "callback": callback}
    taskStatus = {"taskid": taskid, "status": "IN_QUEUE"}
    redis_client.put(taskid, json.dumps(taskStatus))
    in_queue.put(data)
    return jsonify({'code': 0, "message": "成功", "data": {"taskid": taskid}})
@server.route('/chatbot/layout/callback', methods=['post'])
def callback():
    """Print the JSON body of the request and acknowledge it with "ok"."""
    payload = request.get_json()
    print(payload)
    return "ok"
@server.route('/chatbot/layout/taskstatus', methods=['post'])
def taskstatus():
    """Look up the status stored in redis for the "taskid" given in the JSON body.

    Returns:
        JSON with code 0 and the stored status under "data", or code -1 when
        nothing is stored for that id.
    """
    requested_id = request.get_json().get("taskid")
    stored_status = redis_client.get(requested_id)
    if not stored_status:
        return jsonify({'code': -1, "message": " "})
    return jsonify({'code': 0, "message": "成功!", "data": json.loads(stored_status)})
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    parser = get_args_parser()
    args = parser.parse_args()
    minio_client = MinioClient(args.minio_endpoint,bucket_name=args.bucket_name, username=args.minio_username, password=args.minio_password,
                               access_key= args.minio_access_key, secret_key= args.minio_secret_key, secure=args.minio_enable_secure, url_prefix=args.file_url_prefix)
    if not minio_client.connect():
        logging.log(logging.ERROR,"minio connect failed, please check configuration!")
        # Exit with a non-zero status so supervisors see the failed start;
        # a bare exit() reports success.
        raise SystemExit(1)
    logging.log(logging.INFO,"minio connect success !")
    redis_client = RedisClient(args.redis_host, password=args.redis_password, port=args.redis_port, db=args.redis_db)
    if not redis_client.test_connect():
        logging.log(logging.ERROR,"redis connect failed, please check configuration!")
        raise SystemExit(1)
    logging.log(logging.INFO,"redis connect success !")
    layoutEngine = LayoutEngine()
    # The upload handlers save into this directory; create it before serving.
    os.makedirs(os.path.join(root_path, tmp_path), exist_ok=True)
    for ind in range(args.worker_num):
        t = threading.Thread(target=worker, args=(in_queue, layoutEngine),daemon=True)
        t.start()
    logging.log(logging.INFO,"created "+ str(args.worker_num) + " layout workers !")
    # Host 0.0.0.0 makes the server reachable on every network interface.
    server.run(debug=False, port=args.server_port, host=args.server_host)
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论