1つのプロセスだけで実行するには遅い処理や、Web APIの処理待ちなどがあるとき、並列処理や非同期処理で高速化したくなりますよね。
Pythonだと標準パッケージだけでもthreadingとかmultiprocessingとかasyncioなど複数の選択肢があり、各パッケージでもコードの書き方の選択肢が豊富にあって迷いがちなので、それぞれの
- 簡単な概要(使い分けの判断材料のため)
- シンプルなコード例(
map()
関数のようにiterableを関数に通す方法だけ紹介)
をまとめてメモしておきます。
(※ Python 3.11環境を想定して書いています)
スレッドベースの並列処理(threading)
Pythonの標準パッケージ threading を使った並行処理です。
概要
この方法の長短としては
長所:
- 短く書ける
- Jupyter内で実行できる(※後述するasyncioはできないため)
- I/Oバウンドに対して有効
短所:
- CPUバウンドの対処にはならない
- スレッドセーフでない(マルチスレッドに対応していない)ライブラリがある(pandasなど)
かなと思います。
PythonではGIL(Global Interpreter Lock)のために、マルチスレッドにしても同時に1つの処理しかできません(公式ドキュメントより)。なのでCPUバウンド(CPUの計算待ち)への対処にはならないのですが、I/Oバウンド(ストレージへの読み書きやAPIのレスポンス待ちなど、CPU起因じゃない待ち時間)への対処としては使える方法かなと思います。
※GILの制約を減らすための開発が行われており、sub-interpretersを使うことでthreadingでも並列処理をする(複数コアを使って同時に複数の処理をする)ことが可能なようです。まだ開発段階でありPython 3.13以降を待つ必要があるとは思いますが、将来的には上記問題も緩和されていくと思われます。
コード例
例として、以下の関数と入力値を使います。
import time # 関数の例 def f(x: int) -> float: time.sleep(0.1) return x / 10 # 入力値の例 inputs: list[int] = list(range(10))
ThreadPoolExecutor
threadingパッケージの高水準インターフェースである concurrent.futures.ThreadPoolExecutor を使うと、次のように書くだけで実装できます。
from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor() as p: outputs = list(p.map(f, inputs))
なお、同時実行数を制御したい場合は ThreadPoolExecutor(max_workers=10)
のように指定できます。
tqdmを使う場合:thread_map
進捗を表示するtqdmパッケージを使う場合、さらに短く書くことができます。ソースコードを見る限り、thread_map()
の内部でwith文を実行しているようです。
from tqdm.contrib import concurrent outputs = concurrent.thread_map(f, inputs)
プロセスベースの並列処理(multiprocessing)
Pythonの標準パッケージ multiprocessing を使った並列処理です。
概要
この方法の長短は
長所:
- 短く書ける
- Jupyter内で実行できる(※asyncはできないため)
- CPUバウンドに対して有効
- I/Oバウンドに対しても使える
短所:
- 同時実行数がCPU数に依存する(→ vCPUの数が小さいEC2インスタンスなどでI/Oバウンドの処理を高速化したいときはthreadingやasyncioのほうがよさそう)
かなと思います。
コード例
以下の例ではthreadingの例と同様の関数f()
と入力値inputs
を使います。
multiprocessing.Pool
multiprocessing
パッケージで一番簡潔に実装する方法は Pool
を使って関数とiterableオブジェクトを渡して、 map
関数のように処理することかなと思います。
from multiprocessing import Pool with Pool() as p: outputs = list(p.map(f, inputs))
なお、同時実行数は Pool(processes=4)
のように指定します(デフォルトはNoneであり、その場合は os.cpu_count()
で表示されるCPU数が使われるようです)。
tqdmで処理の進捗を表示したい場合はimap(遅延評価版)を使う方法があるみたいです。
from tqdm import tqdm with Pool() as p: imap = p.imap(f, inputs) outputs = list(tqdm(imap, total=len(inputs)))
ProcessPoolExecutor
concurrent.futures パッケージでは ThreadPoolExecutor と ProcessPoolExecutor が同じインターフェースを持つように実装されており、こちらでも同様にmapを呼び出す書き方ができます。
from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor() as p: outputs = list(p.map(f, inputs))
tqdmを使う場合:process_map
tqdmを使って進捗を表示したい場合は process_map
を使うのが良いかと思います。
from tqdm.contrib import concurrent outputs = concurrent.process_map(f, inputs)
非同期処理(asyncio)
Python標準パッケージ asyncio を使った並行処理です。
概要
この方法の長短は以下のようなものがあるかなと思います。
長所:
- I/Oバウンドに対して有効
短所:
- CPUバウンドの対処にはならない
- Jupyter内で実行できない
- コード量がやや増える
「Jupyter内で実行できない」について
Jupyterで実行すると
RuntimeError: This event loop is already running
というエラーがでて動きません。
対策としてnest-asyncioを使えば実行できるようになります。 とはいえ、対策の手間がふえるのでデータ分析の場面でasyncioを使うのはやや非効率な方法に感じます。
コード例
例として以下の関数(コルーチン)を使います
import time # 関数の例 async def f(x: int) -> float: time.sleep(0.1) return x / 10 # 入力値の例 inputs: list[int] = list(range(10))
asyncio.gather
gatherとrunを使う場合。
import asyncio async def execute(inputs): coroutines = [f(x) for x in inputs] outputs = await asyncio.gather(*coroutines) return outputs outputs = asyncio.run(execute(inputs))
tqdmを使う場合:tqdm_asyncio.gather
tqdm_asyncio
というクラスにgather()
という関数があるので、こちらに置き換えればよいようです(tqdm documentation)。
from tqdm.asyncio import tqdm_asyncio async def execute(inputs): coroutines = [f(x) for x in inputs] outputs = await tqdm_asyncio.gather(*coroutines) return outputs outputs = asyncio.run(execute(inputs))
同時実行数をコントロールしたい場合
asyncio.Semaphore
(セマフォ)を使う方法がいちばん楽なのかなと思います。
f()
にasync with
文を挟む変更を加えることで並行処理数をコントロールできます。
semaphore = asyncio.Semaphore(5) # 最大5つの並行処理を許可 async def f(x: int) -> float: async with semaphore: return x / 10 outputs = asyncio.run(execute(inputs))
ただし、Semaphore
はスレッドセーフではないようです・・・(同期プリミティブ — Python 3.11.8 ドキュメント)。
おわりに
あらためて思ったんですがasyncってわかりにくいし記述が複雑になるし、めんどくさくないですか?僕はいまだに苦手意識が強いです。
複雑なことをやりたいならasyncioは良さそうですが、手軽に並行処理したいだけならthreadingで足りるように思います。
しかしthreadingはスレッドセーフとか考える必要があるので、CPU数が十分ある環境ならばmultiprocessingが手軽で安全のように思います。
thread_map()
やprocess_map()
だったら2行くらいで済むので、ほんとにお手軽だなと思いました。
from tqdm.contrib import concurrent outputs = concurrent.process_map(f, inputs)