Optimizing JSON and Excel File Processing

Answered using OpenAI o1 mini by OpenAI o1

Question:

import os import ujson import tkinter as tk from tkinter import filedialog, messagebox import pandas as pd from concurrent.futures import ThreadPoolExecutor import queue import threading from tqdm import tqdm

def load_and_sort_json(file_path): """加载 JSON 文件并按键的长度排序(最长的键排在前面)""" with open(file_path, 'r', encoding='utf-8') as f: data = ujson.load(f) # 按键的长度从大到小排序 sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True)) return sorted_data

def chunk_sorted_data(sorted_data, chunk_size=1000): """将排序后的数据分块""" chunks = [] chunk = {} for key, value in sorted_data.items(): chunk[key] = value if len(chunk) >= chunk_size: chunks.append(chunk) chunk = {} # 添加剩余的块 if chunk: chunks.append(chunk) return chunks

def replace_with_dict(text, translations): """使用字典查找替换文本""" for original, translated in translations.items(): text = text.replace(original, translated) return text

def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue): """使用 pandas 处理单个 Excel 文件""" try: # 读取 Excel 文件的第二列 df = pd.read_excel(file_path, usecols=[1]) total_cells = len(df) progress_queue.put(("sheet", "Sheet1", total_cells)) # 发送工作表进度信息

    # 遍历第二列的每个单元格
    for idx, cell_value in enumerate(df.iloc[:, 0]):
        if pd.notna(cell_value):  # 检查单元格是否为空
            # 按从长到短的顺序依次替换
            for chunk in translations_chunks:
                cell_value = replace_with_dict(str(cell_value), chunk)
            df.iloc[idx, 0] = cell_value  # 更新单元格值
        progress_queue.put(("cell", 1))  # 发送单元格进度信息

    # 保存修改后的 Excel 文件
    df.to_excel(file_path, index=False, header=False)
    progress_queue.put(("done",))  # 发送完成信息
    return True
except Exception as e:
    print(f"处理文件 {file_path} 时出错: {e}")
    progress_queue.put(("error", str(e)))  # 发送错误信息
    return False

def process_folder_with_threads(folder_path, translations_chunks, progress_queue): """使用多线程处理文件夹中的所有 Excel 文件""" success_count = 0 total_files = len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]) if total_files == 0: messagebox.showwarning("警告", "选择的文件夹中没有 Excel 文件(.xlsx)") return

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for file_name in os.listdir(folder_path):
        if file_name.endswith('.xlsx'):
            file_path = os.path.join(folder_path, file_name)
            future = executor.submit(process_excel_file_with_pandas, file_path, translations_chunks, progress_queue)
            futures.append(future)

    for future in futures:
        if future.result():
            success_count += 1
        progress_queue.put(("file", 1))  # 发送文件进度信息

progress_queue.put(("finish", success_count, total_files))  # 发送完成信息

def update_progress_bar(progress_queue, file_pbar, sheet_pbar, cell_pbar): """更新进度条""" while True: msg = progress_queue.get() if msg[0] == "file": file_pbar.update(msg[1]) # 更新文件进度条 elif msg[0] == "sheet": sheet_pbar.reset(total=msg[2]) # 重置工作表进度条 sheet_pbar.set_description(f"处理工作表: {msg[1]}") elif msg[0] == "cell": sheet_pbar.update(msg[1]) # 更新单元格进度条 elif msg[0] == "done": file_pbar.update(1) # 更新文件进度条 elif msg[0] == "finish": messagebox.showinfo("完成", f"处理完成!成功处理 {msg[1]}/{msg[2]} 个文件。") break elif msg[0] == "error": messagebox.showerror("错误", f"处理文件时出错: {msg[1]}")

def select_files_and_process(): """选择 JSON 文件和 Excel 文件夹并开始处理""" # 选择 JSON 文件 json_path = filedialog.askopenfilename( title="选择 JSON 文件", filetypes=[("JSON 文件", "*.json")] ) if not json_path: messagebox.showwarning("警告", "未选择 JSON 文件") return

# 加载并排序 JSON 文件
try:
    translations = load_and_sort_json(json_path)
except Exception as e:
    messagebox.showerror("错误", f"加载 JSON 文件时出错: {e}")
    return

# 将排序后的数据分块
translations_chunks = chunk_sorted_data(translations)

# 选择 Excel 文件夹
folder_path = filedialog.askdirectory(
    title="选择包含 Excel 文件的文件夹"
)
if not folder_path:
    messagebox.showwarning("警告", "未选择文件夹")
    return

# 创建进度条和消息队列
progress_queue = queue.Queue()
file_pbar = tqdm(total=len([f for f in os.listdir(folder_path) if f.endswith('.xlsx')]), desc="处理文件")
sheet_pbar = tqdm(total=0, desc="处理工作表", leave=False)
cell_pbar = tqdm(total=0, desc="处理单元格", leave=False)

# 启动进度条更新线程
progress_thread = threading.Thread(target=update_progress_bar, args=(progress_queue, file_pbar, sheet_pbar, cell_pbar))
progress_thread.start()

# 启动文件处理线程
process_thread = threading.Thread(target=process_folder_with_threads, args=(folder_path, translations_chunks, progress_queue))
process_thread.start()

def main(): """主函数""" root = tk.Tk() root.title("Excel 翻译工具") root.geometry("400x200")

# 创建按钮
btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
btn_select.pack(pady=40)

# 运行主循环
root.mainloop()

if name == "main": main() json文件和xlsx表格文件都非常大(60MB),有没有办法让这个代码的效率更高

Answer:

处理大型 JSON 和 Excel 文件时,效率是一个重要的考虑因素。以下是一些可以提高代码效率的建议:

1. 使用更高效的 JSON 库

ujson 是一个快速的 JSON 解析库,但如果你还没有尝试过,可以考虑使用 orjson,它通常比 ujson 更快。

2. 使用 pandasread_excelto_excel 的优化

  • 读取时只读取必要的列:你已经在使用 usecols 参数,这很好。
  • 写入时使用 xlsxwriter:如果你使用 pandasto_excel,可以指定 engine='xlsxwriter',这通常会更快。

3. 减少字符串替换的次数

  • 使用正则表达式:如果你的替换规则比较复杂,可以考虑使用 re.sub 来一次性替换多个字符串。
  • 构建替换字典:在处理单元格时,构建一个替换字典,避免多次遍历。

4. 使用 dask 处理大数据

dask 是一个并行计算库,可以处理比内存大的数据集。你可以使用 dask.dataframe 来处理 Excel 文件。

5. 增加线程数

根据你的 CPU 核心数,增加 ThreadPoolExecutormax_workers 参数。

6. 进度条的优化

在更新进度条时,尽量减少更新频率,避免频繁的 UI 更新导致性能下降。

7. 代码示例

以下是一些优化后的代码示例:

import os
import orjson  # 使用 orjson 替代 ujson
import tkinter as tk
from tkinter import filedialog, messagebox
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import queue
import threading
from tqdm import tqdm
import re

def load_and_sort_json(file_path):
    """加载 JSON 文件并按键的长度排序(最长的键排在前面)"""
    with open(file_path, 'rb') as f:  # 使用二进制模式
        data = orjson.loads(f.read())
    sorted_data = dict(sorted(data.items(), key=lambda item: len(item[0]), reverse=True))
    return sorted_data

def replace_with_dict(text, translations):
    """使用字典查找替换文本"""
    # 使用正则表达式进行替换
    pattern = re.compile('|'.join(re.escape(key) for key in translations.keys()))
    return pattern.sub(lambda match: translations[match.group(0)], text)

def process_excel_file_with_pandas(file_path, translations_chunks, progress_queue):
    """使用 pandas 处理单个 Excel 文件"""
    try:
        df = pd.read_excel(file_path, usecols=[1], engine='openpyxl')  # 使用 openpyxl 引擎
        total_cells = len(df)
        progress_queue.put(("sheet", "Sheet1", total_cells))

        for idx, cell_value in enumerate(df.iloc[:, 0]):
            if pd.notna(cell_value):
                for chunk in translations_chunks:
                    cell_value = replace_with_dict(str(cell_value), chunk)
                df.iloc[idx, 0] = cell_value
            progress_queue.put(("cell", 1))

        df.to_excel(file_path, index=False, header=False, engine='xlsxwriter')  # 使用 xlsxwriter 引擎
        progress_queue.put(("done",))
        return True
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        progress_queue.put(("error", str(e)))
        return False

# 其他函数保持不变...

def main():
    """主函数"""
    root = tk.Tk()
    root.title("Excel 翻译工具")
    root.geometry("400x200")

    # 创建按钮
    btn_select = tk.Button(root, text="选择文件并处理", command=select_files_and_process, width=20, height=3)
    btn_select.pack(pady=40)

    # 运行主循环
    root.mainloop()

if __name__ == "__main__":
    main()

8. 使用 Dask(可选)

如果数据集非常大,考虑使用 Dask:

import dask.dataframe as dd

def process_excel_file_with_dask(file_path, translations_chunks, progress_queue):
    """使用 Dask 处理单个 Excel 文件"""
    try:
        df = dd.read_excel(file_path, usecols=[1])
        total_cells = df.shape[0].compute()
        progress_queue.put(("sheet", "Sheet1", total_cells))

        def replace_row(row):
            for chunk in translations_chunks:
                row = replace_with_dict(str(row), chunk)
            return row

        df[0] = df[0].map(replace_row, meta=('x', 'object'))
        df.to_excel(file_path, index=False, header=False, engine='xlsxwriter')
        progress_queue.put(("done",))
        return True
    except Exception as e:
        print(f"处理文件 {file_path} 时出错: {e}")
        progress_queue.put(("error", str(e)))
        return False

总结

通过以上优化,你的代码在处理大型 JSON 和 Excel 文件时应该会更高效。根据你的具体需求和环境,选择合适的优化策略。