Multi-threaded Excel Translation Tool Update

Answered using OpenAI o1 mini by OpenAI o1

Question:

import os

import ujson

import tkinter as tk

from tkinter import filedialog, messagebox

import pandas as pd

from concurrent.futures import ThreadPoolExecutor

import queue

import threading

from tqdm import tqdm

def load_and_sort_json(file_path):

"""加载 JSON 文件并按键的长度排序(最长的键排在前面)"""

with open(file_path, 'r', encoding='utf-8') as f:

data = ujson.load(f)

按键的长度从大到小排序

sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True))

return sorted_data

def chunk_sorted_data(sorted_data, chunk_size=1000):

"""将排序后的数据分块"""

chunks = []

chunk = {}

for key, value in sorted_data.items():

chunk[key] = value

if len(chunk) >= chunk_size:

chunks.append(chunk)

chunk = {}

添加剩余的块

if chunk:

chunks.append(chunk)

return chunks

def replace_with_dict(text, translations):

"""使用字典查找替换文本"""

for original, translated in translations.items():

text = text.replace(original, translated)

return text

def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue):

"""使用 pandas 处理单个 Excel 文件"""

try:

读取 Excel 文件的第二列

df = pd.read_excel(file_path, usecols=[1])

total_cells = len(df)

progress_queue.put(("sheet", "Sheet1", total_cells)) # 发送工作表进度信息

# 遍历第二列的每个单元格
for idx, cell_value in enumerate(df.iloc[:, 0]):
    if pd.notna(cell_value):  # 检查单元格是否为空
        # 按从长到短的顺序依次替换
        for chunk in translations_chunks:
            cell_value = replace_with_dict(str(cell_value), chunk)
        df.iloc[idx, 0] = cell_value  # 更新单元格值
    progress_queue.put(("cell", 1))  # 发送单元格进度信息

# 保存修改后的 Excel 文件
df.to_excel(file_path, index=False, header=False)
progress_queue.put(("done",))  # 发送完成信息
return True

except Exception as e: print(f"处理文件 {file_path} 时出错: {e}") progress_queue.put(("error", str(e))) # 发送错误信息 return False def process_folder_with_threads(folder_path, translations_chunks, progress_queue):

"""使用多线程处理文件夹中的所有 Excel 文件"""

success_count = 0

total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')])

if total_files == 0:

messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)")

return

with ThreadPoolExecutor(max_workers=4) as executor: futures = [] for file_name in os.listdir(folder_path): if file_name.endswith('.xlsx'): file_path = os.path.join(folder_path, file_name) future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue) futures.append(future)

for future in futures:
    if future.result():
        success_count += 1
    progress_queue.put(("file", 1))  # 发送文件进度信息

progress_queue.put(("finish", success_count, total_files)) # 发送完成信息 def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar):

"""更新进度条"""

while True:

msg = progress_queue.get()

if msg[0] == "file":

file_pbar.update(msg[1]) # 更新文件进度条

elif msg[0] == "sheet":

sheet_pbar.reset(total=msg[2]) # 重置工作表进度条

sheet_pbar.set_description(f"处理工作表: {msg[1]}")

elif msg[0] == "cell":

sheet_pbar.update(msg[1]) # 更新单元格进度条

elif msg[0] == "done":

file_pbar.update(1) # 更新文件进度条

elif msg[0] == "finish":

messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。")

break

elif msg[0] == "error":

messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")

def select_files_and_process():

"""选择 JSON 文件和 Excel 文件夹并开始处理"""

选择 JSON 文件

json_path = filedialog.askopenfilename(

title="选择 JSON 文件",

filetypes=[("JSON 文件", "*.json")]

)

if not json_path:

messagebox.showwarning("警告", "未选择 JSON 文件")

return

加载并排序 JSON 文件

try: translations = load_and_sort_json(json_path) except Exception as e: messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}") return

将排序后的数据分块

translations_chunks = chunk_sorted_data(translations)

选择 Excel 文件夹

folder_path = filedialog.askdirectory( title="选择包含 Excel 文件的文件夹" ) if not folder_path: messagebox.showwarning("警告", "未选择文件夹") return

创建进度条和消息队列

progress_queue = queue.Queue() file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件") sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False) cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)

启动进度条更新线程

progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar)) progress_thread.start()

启动文件处理线程

process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue)) process_thread.start() def main():

"""主函数"""

root = tk.Tk()

root.title("Excel 翻译工具")

root.geometry("400x200")

创建按钮

btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3) btn_select.pack(pady=40)

运行主循环

root.mainloop() if name == "main":

main() 修改这个代码,让其处理单个表格的时候使用多线程,消息正常更新,需要保证先用最长的文本进行翻译,以免短文本先被替换,保证效率,不要出现改成多线程处理单个表格后,速度更慢的情况

Answer:

要将代码修改为在处理单个 Excel 表格时使用多线程,同时确保翻译的顺序是从最长的文本到最短的文本,我们可以对 process_excel_file_with_pandas 函数进行调整。我们将使用 ThreadPoolExecutor 来处理每个单元格的翻译。以下是修改后的代码:

import os
import ujson
import tkinter as tk
from tkinter import filedialog, messagebox
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from tqdm import tqdm

def load_and_sort_json(file_path):
    """加载 JSON 文件并按键的长度排序(最长的键排在前面)"""
    with open(file_path, 'r', encoding='utf-8') as f:
        data = ujson.load(f)
    # 按键的长度从大到小排序
    sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True))
    return sorted_data

def chunk_sorted_data(sorted_data, chunk_size=1000):
    """将排序后的数据分块"""
    chunks = []
    chunk = {}
    for key, value in sorted_data.items():
        chunk[key] = value
        if len(chunk) >= chunk_size:
            chunks.append(chunk)
            chunk = {}
    # 添加剩余的块
    if chunk:
        chunks.append(chunk)
    return chunks

def replace_with_dict(text, translations):
    """使用字典查找替换文本"""
    for original, translated in translations.items():
        text = text.replace(original, translated)
    return text

def process_cell(cell_value, translations_chunks):
    """处理单个单元格的翻译"""
    if pd.notna(cell_value):  # 检查单元格是否为空
        for chunk in translations_chunks:
            cell_value = replace_with_dict(str(cell_value), chunk)
    return cell_value

def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue):
    """使用 pandas 处理单个 Excel 文件"""
    try:
        # 读取 Excel 文件的第二列
        df = pd.read_excel(file_path, usecols=[1])
        total_cells = len(df)
        progress_queue.put(("sheet", "Sheet1", total_cells))  # 发送工作表进度信息

        # 使用多线程处理每个单元格
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {executor.submit(process_cell, df.iloc[idx, 0], translations_chunks): idx for idx in range(total_cells)}
            for future in tqdm(futures.keys(), desc="处理单元格", leave=False):
                idx = futures[future]
                df.iloc[idx, 0] = future.result()  # 更新单元格值
                progress_queue.put(("cell", 1))  # 发送单元格进度信息

        # 保存修改后的 Excel 文件
        df.to_excel(file_path, index=False, header=False)
        progress_queue.put(("done",))  # 发送完成信息
        return True
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        progress_queue.put(("error", str(e)))  # 发送错误信息
        return False

def process_folder_with_threads(folder_path, translations_chunks, progress_queue):
    """使用多线程处理文件夹中的所有 Excel 文件"""
    success_count = 0
    total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')])
    if total_files == 0:
        messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)")
        return

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for file_name in os.listdir(folder_path):
            if file_name.endswith('.xlsx'):
                file_path = os.path.join(folder_path, file_name)
                future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue)
                futures.append(future)

        for future in futures:
            if future.result():
                success_count += 1
            progress_queue.put(("file", 1))  # 发送文件进度信息

    progress_queue.put(("finish", success_count, total_files))  # 发送完成信息

def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar):
    """更新进度条"""
    while True:
        msg = progress_queue.get()
        if msg[0] == "file":
            file_pbar.update(msg[1])  # 更新文件进度条
        elif msg[0] == "sheet":
            sheet_pbar.reset(total=msg[2])  # 重置工作表进度条
            sheet_pbar.set_description(f"处理工作表: {msg[1]}")
        elif msg[0] == "cell":
            sheet_pbar.update(msg[1])  # 更新单元格进度条
        elif msg[0] == "done":
            file_pbar.update(1)  # 更新文件进度条
        elif msg[0] == "finish":
            messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。")
            break
        elif msg[0] == "error":
            messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")

def select_files_and_process():
    """选择 JSON 文件和 Excel 文件夹并开始处理"""
    # 选择 JSON 文件
    json_path = filedialog.askopenfilename(
        title="选择 JSON 文件",
        filetypes=[("JSON 文件", "*.json")]
    )
    if not json_path:
        messagebox.showwarning("警告", "未选择 JSON 文件")
        return

    # 加载并排序 JSON 文件
    try:
        translations = load_and_sort_json(json_path)
    except Exception as e:
        messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}")
        return

    # 将排序后的数据分块
    translations_chunks = chunk_sorted_data(translations)

    # 选择 Excel 文件夹
    folder_path = filedialog.askdirectory(
        title="选择包含 Excel 文件的文件夹"
    )
    if not folder_path:
        messagebox.showwarning("警告", "未选择文件夹")
        return

    # 创建进度条和消息队列
    progress_queue = queue.Queue()
    file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件")
    sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
    cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)

    # 启动进度条更新线程
    progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
    progress_thread.start()

    # 启动文件处理线程
    process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue))
    process_thread.start()

def main():
    """主函数"""
    root = tk.Tk()
    root.title("Excel 翻译工具")
    root.geometry("400x200")

    # 创建按钮
    btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
    btn_select.pack(pady=40)

    # 运行主循环
    root.mainloop()

if __name__ == "__main__":
    main()

主要修改点:

  1. 单元格处理:将单元格的处理逻辑提取到 process_cell 函数中,并在 process_excel_file_with_pandas 中使用 ThreadPoolExecutor 来并行处理每个单元格。
  2. 进度更新:在处理单元格时,使用 tqdm 来显示单元格处理的进度。
  3. 确保顺序:由于我们在处理单元格时仍然按照从长到短的顺序进行替换,因此不会出现短文本先被替换的问题。

这样修改后,程序在处理单个 Excel 表格时将能够利用多线程提高效率,同时保持翻译的正确顺序。