multiprocessingを使ったファイル読み込みの並列化

はじめに

今回はmultiprocessingを用いたファイル読み込みの並列化についてです。 直列で読み込むのに時間がかかるときに使えます。

ダミーデータ作成

適当に1000行のcsvを10万ファイル用意

import numpy as np
import pandas as pd
from tqdm import tqdm

for i in tqdm(range(100000)):
    fname = f"data/dummy_{i}.csv"
    df = pd.DataFrame({
        "id": range(1000),
        "value": np.random.rand(1000)
    })
    df.to_csv(fname, index=None)

直列で読み込む

import time
import pandas as pd
from tqdm import tqdm
from glob import glob

# ファイルリスト
fname_list = glob("data/*")

# 読み込み処理
start = time.time()
df = []
for fname in tqdm(fname_list):
    df_tmp = pd.read_csv(fname)
    df.append(df_tmp)
df = pd.concat(df, ignore_index=True)
print(time.time()-start)
"""出力
347.80504083633423 → 5分47秒
"""

並列で読み込む

並列数を4にしてみる。ちなみに1にすれば実質直列と同等なので基本これベースに実装したほうがいいかも。

import time
import pandas as pd
from tqdm import tqdm
from glob import glob
from multiprocessing.dummy import Pool

# ファイルリスト
fname_list = glob("data/*")

# 読み込む関数定義
def read_func(fname):
    df_tmp = pd.read_csv(fname)
    return df_tmp

# 読み込み処理
start = time.time()
with Pool(4) as p:
    imap = p.imap(read_func, fname_list)
    df = list(tqdm(imap, total=len(fname_list)))
df = pd.concat(df, ignore_index=True)
print(time.time()-start)
"""出力
216.96178102493286 → 3分36秒
"""

1/4の時間とはいかないまでも、ある程度の処理速度向上が見られる(コンピュータスペックによる)

最後に

たくさんファイルあると読み込みにストレスかかるので並列化しましょうー

参考