要将代码修改为在处理单个 Excel 表格时使用多线程,同时确保翻译的顺序是从最长的文本到最短的文本,我们可以对 process_excel_file_with_pandas
函数进行调整。我们将使用 ThreadPoolExecutor
来处理每个单元格的翻译。以下是修改后的代码:
import os
import ujson
import tkinter as tk
from tkinter import filedialog, messagebox
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from tqdm import tqdm
def load_and_sort_json(file_path):
"""加载 JSON 文件并按键的长度排序(最长的键排在前面)"""
with open(file_path, 'r', encoding='utf-8') as f:
data = ujson.load(f)
# 按键的长度从大到小排序
sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True))
return sorted_data
def chunk_sorted_data(sorted_data, chunk_size=1000):
"""将排序后的数据分块"""
chunks = []
chunk = {}
for key, value in sorted_data.items():
chunk[key] = value
if len(chunk) >= chunk_size:
chunks.append(chunk)
chunk = {}
# 添加剩余的块
if chunk:
chunks.append(chunk)
return chunks
def replace_with_dict(text, translations):
"""使用字典查找替换文本"""
for original, translated in translations.items():
text = text.replace(original, translated)
return text
def process_cell(cell_value, translations_chunks):
"""处理单个单元格的翻译"""
if pd.notna(cell_value): # 检查单元格是否为空
for chunk in translations_chunks:
cell_value = replace_with_dict(str(cell_value), chunk)
return cell_value
def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue):
"""使用 pandas 处理单个 Excel 文件"""
try:
# 读取 Excel 文件的第二列
df = pd.read_excel(file_path, usecols=[1])
total_cells = len(df)
progress_queue.put(("sheet", "Sheet1", total_cells)) # 发送工作表进度信息
# 使用多线程处理每个单元格
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(process_cell, df.iloc[idx, 0], translations_chunks): idx for idx in range(total_cells)}
for future in tqdm(futures.keys(), desc="处理单元格", leave=False):
idx = futures[future]
df.iloc[idx, 0] = future.result() # 更新单元格值
progress_queue.put(("cell", 1)) # 发送单元格进度信息
# 保存修改后的 Excel 文件
df.to_excel(file_path, index=False, header=False)
progress_queue.put(("done",)) # 发送完成信息
return True
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
progress_queue.put(("error", str(e))) # 发送错误信息
return False
def process_folder_with_threads(folder_path, translations_chunks, progress_queue):
"""使用多线程处理文件夹中的所有 Excel 文件"""
success_count = 0
total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')])
if total_files == 0:
messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)")
return
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for file_name in os.listdir(folder_path):
if file_name.endswith('.xlsx'):
file_path = os.path.join(folder_path, file_name)
future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue)
futures.append(future)
for future in futures:
if future.result():
success_count += 1
progress_queue.put(("file", 1)) # 发送文件进度信息
progress_queue.put(("finish", success_count, total_files)) # 发送完成信息
def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar):
"""更新进度条"""
while True:
msg = progress_queue.get()
if msg[0] == "file":
file_pbar.update(msg[1]) # 更新文件进度条
elif msg[0] == "sheet":
sheet_pbar.reset(total=msg[2]) # 重置工作表进度条
sheet_pbar.set_description(f"处理工作表: {msg[1]}")
elif msg[0] == "cell":
sheet_pbar.update(msg[1]) # 更新单元格进度条
elif msg[0] == "done":
file_pbar.update(1) # 更新文件进度条
elif msg[0] == "finish":
messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。")
break
elif msg[0] == "error":
messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")
def select_files_and_process():
"""选择 JSON 文件和 Excel 文件夹并开始处理"""
# 选择 JSON 文件
json_path = filedialog.askopenfilename(
title="选择 JSON 文件",
filetypes=[("JSON 文件", "*.json")]
)
if not json_path:
messagebox.showwarning("警告", "未选择 JSON 文件")
return
# 加载并排序 JSON 文件
try:
translations = load_and_sort_json(json_path)
except Exception as e:
messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}")
return
# 将排序后的数据分块
translations_chunks = chunk_sorted_data(translations)
# 选择 Excel 文件夹
folder_path = filedialog.askdirectory(
title="选择包含 Excel 文件的文件夹"
)
if not folder_path:
messagebox.showwarning("警告", "未选择文件夹")
return
# 创建进度条和消息队列
progress_queue = queue.Queue()
file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件")
sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)
# 启动进度条更新线程
progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
progress_thread.start()
# 启动文件处理线程
process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue))
process_thread.start()
def main():
"""主函数"""
root = tk.Tk()
root.title("Excel 翻译工具")
root.geometry("400x200")
# 创建按钮
btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
btn_select.pack(pady=40)
# 运行主循环
root.mainloop()
if __name__ == "__main__":
main()
主要修改点:
- 单元格处理:将单元格的处理逻辑提取到
process_cell
函数中,并在 process_excel_file_with_pandas
中使用 ThreadPoolExecutor
来并行处理每个单元格。
- 进度更新:在处理单元格时,使用
tqdm
来显示单元格处理的进度。
- 确保顺序:由于我们在处理单元格时仍然按照从长到短的顺序进行替换,因此不会出现短文本先被替换的问题。
这样修改后,程序在处理单个 Excel 表格时将能够利用多线程提高效率,同时保持翻译的正确顺序。