修改以下代码,让其处理单独一个xlsx文件的时候,也进行多线程的方法,同时还要保证效率和消息的正常更新 import os import ujson import tkinter as tk from tkinter import filedialog, messagebox import pandas as pd from concurrent.futures import ThreadPoolExecutor import queue import threading from tqdm import tqdm
def load_and_sort_json(file_path): """加载 JSON 文件并按键的长度排序(最长的键排在前面)""" with open(file_path, 'r', encoding='utf-8') as f: data = ujson.load(f) # 按键的长度从大到小排序 sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True)) return sorted_data
def chunk_sorted_data(sorted_data, chunk_size=1000): """将排序后的数据分块""" chunks = [] chunk = {} for key, value in sorted_data.items(): chunk[key] = value if len(chunk) >= chunk_size: chunks.append(chunk) chunk = {} # 添加剩余的块 if chunk: chunks.append(chunk) return chunks
def replace_with_dict(text, translations): """使用字典查找替换文本""" for original, translated in translations.items(): text = text.replace(original, translated) return text
def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue): """使用 pandas 处理单个 Excel 文件""" try: # 读取 Excel 文件的第二列 df = pd.read_excel(file_path, usecols=[1]) total_cells = len(df) progress_queue.put(("sheet", "Sheet1", total_cells)) # 发送工作表进度信息
# 遍历第二列的每个单元格
for idx, cell_value in enumerate(df.iloc[:, 0]):
if pd.notna(cell_value): # 检查单元格是否为空
# 按从长到短的顺序依次替换
for chunk in translations_chunks:
cell_value = replace_with_dict(str(cell_value), chunk)
df.iloc[idx, 0] = cell_value # 更新单元格值
progress_queue.put(("cell", 1)) # 发送单元格进度信息
# 保存修改后的 Excel 文件
df.to_excel(file_path, index=False, header=False)
progress_queue.put(("done",)) # 发送完成信息
return True
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
progress_queue.put(("error", str(e))) # 发送错误信息
return False
def process_folder_with_threads(folder_path, translations_chunks, progress_queue): """使用多线程处理文件夹中的所有 Excel 文件""" success_count = 0 total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]) if total_files == 0: messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)") return
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for file_name in os.listdir(folder_path):
if file_name.endswith('.xlsx'):
file_path = os.path.join(folder_path, file_name)
future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue)
futures.append(future)
for future in futures:
if future.result():
success_count += 1
progress_queue.put(("file", 1)) # 发送文件进度信息
progress_queue.put(("finish", success_count, total_files)) # 发送完成信息
def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar): """更新进度条""" while True: msg = progress_queue.get() if msg[0] == "file": file_pbar.update(msg[1]) # 更新文件进度条 elif msg[0] == "sheet": sheet_pbar.reset(total=msg[2]) # 重置工作表进度条 sheet_pbar.set_description(f"处理工作表: {msg[1]}") elif msg[0] == "cell": sheet_pbar.update(msg[1]) # 更新单元格进度条 elif msg[0] == "done": file_pbar.update(1) # 更新文件进度条 elif msg[0] == "finish": messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。") break elif msg[0] == "error": messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")
def select_files_and_process(): """选择 JSON 文件和 Excel 文件夹并开始处理""" # 选择 JSON 文件 json_path = filedialog.askopenfilename( title="选择 JSON 文件", filetypes=[("JSON 文件", "*.json")] ) if not json_path: messagebox.showwarning("警告", "未选择 JSON 文件") return
# 加载并排序 JSON 文件
try:
translations = load_and_sort_json(json_path)
except Exception as e:
messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}")
return
# 将排序后的数据分块
translations_chunks = chunk_sorted_data(translations)
# 选择 Excel 文件夹
folder_path = filedialog.askdirectory(
title="选择包含 Excel 文件的文件夹"
)
if not folder_path:
messagebox.showwarning("警告", "未选择文件夹")
return
# 创建进度条和消息队列
progress_queue = queue.Queue()
file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件")
sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)
# 启动进度条更新线程
progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
progress_thread.start()
# 启动文件处理线程
process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue))
process_thread.start()
def main(): """主函数""" root = tk.Tk() root.title("Excel 翻译工具") root.geometry("400x200")
# 创建按钮
btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
btn_select.pack(pady=40)
# 运行主循环
root.mainloop()
if name == "main": main()
为了让代码能够处理单独一个 .xlsx
文件时也能使用多线程的方法,同时保持效率和消息的正常更新,我们需要对现有的代码进行一些修改。以下是修改后的代码:
import os
import ujson
import tkinter as tk
from tkinter import filedialog, messagebox
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from tqdm import tqdm
def load_and_sort_json(file_path):
"""加载 JSON 文件并按键的长度排序(最长的键排在前面)"""
with open(file_path, 'r', encoding='utf-8') as f:
data = ujson.load(f)
# 按键的长度从大到小排序
sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True))
return sorted_data
def chunk_sorted_data(sorted_data, chunk_size=1000):
"""将排序后的数据分块"""
chunks = []
chunk = {}
for key, value in sorted_data.items():
chunk[key] = value
if len(chunk) >= chunk_size:
chunks.append(chunk)
chunk = {}
# 添加剩余的块
if chunk:
chunks.append(chunk)
return chunks
def replace_with_dict(text, translations):
"""使用字典查找替换文本"""
for original, translated in translations.items():
text = text.replace(original, translated)
return text
def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue):
"""使用 pandas 处理单个 Excel 文件"""
try:
# 读取 Excel 文件的第二列
df = pd.read_excel(file_path, usecols=[1])
total_cells = len(df)
progress_queue.put(("sheet", "Sheet1", total_cells)) # 发送工作表进度信息
# 遍历第二列的每个单元格
for idx, cell_value in enumerate(df.iloc[:, 0]):
if pd.notna(cell_value): # 检查单元格是否为空
# 按从长到短的顺序依次替换
for chunk in translations_chunks:
cell_value = replace_with_dict(str(cell_value), chunk)
df.iloc[idx, 0] = cell_value # 更新单元格值
progress_queue.put(("cell", 1)) # 发送单元格进度信息
# 保存修改后的 Excel 文件
df.to_excel(file_path, index=False, header=False)
progress_queue.put(("done",)) # 发送完成信息
return True
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
progress_queue.put(("error", str(e))) # 发送错误信息
return False
def process_single_file(file_path, translations_chunks, progress_queue):
"""处理单个 Excel 文件"""
return process_excel_file_with_pandas(file_path, translations_chunks, progress_queue)
def process_folder_with_threads(folder_path, translations_chunks, progress_queue):
"""使用多线程处理文件夹中的所有 Excel 文件"""
success_count = 0
total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')])
if total_files == 0:
messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)")
return
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for file_name in os.listdir(folder_path):
if file_name.endswith('.xlsx'):
file_path = os.path.join(folder_path, file_name)
future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue)
futures.append(future)
for future in futures:
if future.result():
success_count += 1
progress_queue.put(("file", 1)) # 发送文件进度信息
progress_queue.put(("finish", success_count, total_files)) # 发送完成信息
def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar):
"""更新进度条"""
while True:
msg = progress_queue.get()
if msg[0] == "file":
file_pbar.update(msg[1]) # 更新文件进度条
elif msg[0] == "sheet":
sheet_pbar.reset(total=msg[2]) # 重置工作表进度条
sheet_pbar.set_description(f"处理工作表: {msg[1]}")
elif msg[0] == "cell":
sheet_pbar.update(msg[1]) # 更新单元格进度条
elif msg[0] == "done":
file_pbar.update(1) # 更新文件进度条
elif msg[0] == "finish":
messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。")
break
elif msg[0] == "error":
messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")
def select_files_and_process():
"""选择 JSON 文件和 Excel 文件夹并开始处理"""
# 选择 JSON 文件
json_path = filedialog.askopenfilename(
title="选择 JSON 文件",
filetypes=[("JSON 文件", "*.json")]
)
if not json_path:
messagebox.showwarning("警告", "未选择 JSON 文件")
return
# 加载并排序 JSON 文件
try:
translations = load_and_sort_json(json_path)
except Exception as e:
messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}")
return
# 将排序后的数据分块
translations_chunks = chunk_sorted_data(translations)
# 选择 Excel 文件或文件夹
file_path = filedialog.askopenfilename(
title="选择 Excel 文件",
filetypes=[("Excel 文件", "*.xlsx")]
)
if file_path:
# 处理单个文件
progress_queue = queue.Queue()
file_pbar = tqdm(total=1, desc="处理文件")
sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)
# 启动进度条更新线程
progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
progress_thread.start()
# 处理单个文件
process_single_file(file_path, translations_chunks, progress_queue)
else:
# 选择文件夹
folder_path = filedialog.askdirectory(
title="选择包含 Excel 文件的文件夹"
)
if not folder_path:
messagebox.showwarning("警告", "未选择文件夹")
return
# 创建进度条和消息队列
progress_queue = queue.Queue()
file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件")
sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)
# 启动进度条更新线程
progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
progress_thread.start()
# 启动文件处理线程
process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue))
process_thread.start()
def main():
"""主函数"""
root = tk.Tk()
root.title("Excel 翻译工具")
root.geometry("400x200")
# 创建按钮
btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
btn_select.pack(pady=40)
# 运行主循环
root.mainloop()
if __name__ == "__main__":
main()
select_files_and_process
函数中,添加了选择单个 Excel 文件的功能。如果用户选择了一个文件,则调用 process_single_file
函数来处理该文件。这样,用户可以选择单个 .xlsx
文件进行处理,同时也可以选择一个文件夹来处理多个文件。