import os import ujson import tkinter as tk from tkinter import filedialog, messagebox import pandas as pd from concurrent.futures import ThreadPoolExecutor import queue import threading from tqdm import tqdm
def load_and_sort_json(file_path): """加载 JSON 文件并按键的长度排序(最长的键排在前面)""" with open(file_path, 'r', encoding='utf-8') as f: data = ujson.load(f) # 按键的长度从大到小排序 sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True)) return sorted_data
def chunk_sorted_data(sorted_data, chunk_size=1000): """将排序后的数据分块""" chunks = [] chunk = {} for key, value in sorted_data.items(): chunk[key] = value if len(chunk) >= chunk_size: chunks.append(chunk) chunk = {} # 添加剩余的块 if chunk: chunks.append(chunk) return chunks
def replace_with_dict(text, translations): """使用字典查找替换文本""" for original, translated in translations.items(): text = text.replace(original, translated) return text
def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue): """使用 pandas 处理单个 Excel 文件""" try: # 读取 Excel 文件的第二列 df = pd.read_excel(file_path, usecols=[1]) total_cells = len(df) progress_queue.put(("sheet", "Sheet1", total_cells)) # 发送工作表进度信息
# 遍历第二列的每个单元格
for idx, cell_value in enumerate(df.iloc[:, 0]):
if pd.notna(cell_value): # 检查单元格是否为空
# 按从长到短的顺序依次替换
for chunk in translations_chunks:
cell_value = replace_with_dict(str(cell_value), chunk)
df.iloc[idx, 0] = cell_value # 更新单元格值
progress_queue.put(("cell", 1)) # 发送单元格进度信息
# 保存修改后的 Excel 文件
df.to_excel(file_path, index=False, header=False)
progress_queue.put(("done",)) # 发送完成信息
return True
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
progress_queue.put(("error", str(e))) # 发送错误信息
return False
def process_folder_with_threads(folder_path, translations_chunks, progress_queue): """使用多线程处理文件夹中的所有 Excel 文件""" success_count = 0 total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]) if total_files == 0: messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)") return
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for file_name in os.listdir(folder_path):
if file_name.endswith('.xlsx'):
file_path = os.path.join(folder_path, file_name)
future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue)
futures.append(future)
for future in futures:
if future.result():
success_count += 1
progress_queue.put(("file", 1)) # 发送文件进度信息
progress_queue.put(("finish", success_count, total_files)) # 发送完成信息
def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar): """更新进度条""" while True: msg = progress_queue.get() if msg[0] == "file": file_pbar.update(msg[1]) # 更新文件进度条 elif msg[0] == "sheet": sheet_pbar.reset(total=msg[2]) # 重置工作表进度条 sheet_pbar.set_description(f"处理工作表: {msg[1]}") elif msg[0] == "cell": sheet_pbar.update(msg[1]) # 更新单元格进度条 elif msg[0] == "done": file_pbar.update(1) # 更新文件进度条 elif msg[0] == "finish": messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。") break elif msg[0] == "error": messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")
def select_files_and_process(): """选择 JSON 文件和 Excel 文件夹并开始处理""" # 选择 JSON 文件 json_path = filedialog.askopenfilename( title="选择 JSON 文件", filetypes=[("JSON 文件", "*.json")] ) if not json_path: messagebox.showwarning("警告", "未选择 JSON 文件") return
# 加载并排序 JSON 文件
try:
translations = load_and_sort_json(json_path)
except Exception as e:
messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}")
return
# 将排序后的数据分块
translations_chunks = chunk_sorted_data(translations)
# 选择 Excel 文件夹
folder_path = filedialog.askdirectory(
title="选择包含 Excel 文件的文件夹"
)
if not folder_path:
messagebox.showwarning("警告", "未选择文件夹")
return
# 创建进度条和消息队列
progress_queue = queue.Queue()
file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件")
sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)
# 启动进度条更新线程
progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
progress_thread.start()
# 启动文件处理线程
process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue))
process_thread.start()
def main(): """主函数""" root = tk.Tk() root.title("Excel 翻译工具") root.geometry("400x200")
# 创建按钮
btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
btn_select.pack(pady=40)
# 运行主循环
root.mainloop()
if name == "main": main() json文件和xlsx表格文件都非常大(60MB),有没有办法让这个代码的效率更高
处理大型 JSON 和 Excel 文件时,效率是一个重要的考虑因素。以下是一些可以提高代码效率的建议:
ujson
是一个快速的 JSON 解析库,但如果你还没有尝试过,可以考虑使用 orjson
,它通常比 ujson
更快。
pandas
的 read_excel
和 to_excel
的优化usecols
参数,这很好。xlsxwriter
:如果你使用 pandas
的 to_excel
,可以指定 engine='xlsxwriter'
,这通常会更快。re.sub
来一次性替换多个字符串。dask
处理大数据dask
是一个并行计算库,可以处理比内存大的数据集。你可以使用 dask.dataframe
来处理 Excel 文件。
根据你的 CPU 核心数,增加 ThreadPoolExecutor
的 max_workers
参数。
在更新进度条时,尽量减少更新频率,避免频繁的 UI 更新导致性能下降。
以下是一些优化后的代码示例:
import os
import orjson # 使用 orjson 替代 ujson
import tkinter as tk
from tkinter import filedialog, messagebox
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from tqdm import tqdm
import re
def load_and_sort_json(file_path):
"""加载 JSON 文件并按键的长度排序(最长的键排在前面)"""
with open(file_path, 'rb') as f: # 使用二进制模式
data = orjson.loads(f.read())
sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True))
return sorted_data
def replace_with_dict(text, translations):
"""使用字典查找替换文本"""
# 使用正则表达式进行替换
pattern = re.compile('|'.join(re.escape(key) for key in translations.keys()))
return pattern.sub(lambda match: translations[match.group(0)], text)
def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue):
"""使用 pandas 处理单个 Excel 文件"""
try:
df = pd.read_excel(file_path, usecols=[1], engine='openpyxl') # 使用 openpyxl 引擎
total_cells = len(df)
progress_queue.put(("sheet", "Sheet1", total_cells))
for idx, cell_value in enumerate(df.iloc[:, 0]):
if pd.notna(cell_value):
for chunk in translations_chunks:
cell_value = replace_with_dict(str(cell_value), chunk)
df.iloc[idx, 0] = cell_value
progress_queue.put(("cell", 1))
df.to_excel(file_path, index=False, header=False, engine='xlsxwriter') # 使用 xlsxwriter 引擎
progress_queue.put(("done",))
return True
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
progress_queue.put(("error", str(e)))
return False
# 其他函数保持不变...
def main():
"""主函数"""
root = tk.Tk()
root.title("Excel 翻译工具")
root.geometry("400x200")
# 创建按钮
btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
btn_select.pack(pady=40)
# 运行主循环
root.mainloop()
if __name__ == "__main__":
main()
如果数据集非常大,考虑使用 Dask:
import dask.dataframe as dd
def process_excel_file_with_dask(file_path, translations_chunks, progress_queue):
"""使用 Dask 处理单个 Excel 文件"""
try:
df = dd.read_excel(file_path, usecols=[1])
total_cells = df.shape[0].compute()
progress_queue.put(("sheet", "Sheet1", total_cells))
def replace_row(row):
for chunk in translations_chunks:
row = replace_with_dict(str(row), chunk)
return row
df[0] = df[0].map(replace_row, meta=('x', 'object'))
df.to_excel(file_path, index=False, header=False, engine='xlsxwriter')
progress_queue.put(("done",))
return True
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
progress_queue.put(("error", str(e)))
return False
通过以上优化,你的代码在处理大型 JSON 和 Excel 文件时应该会更高效。根据你的具体需求和环境,选择合适的优化策略。