盆暗の学習記録

データサイエンス ,エンジニアリング,ビジネスについて日々学んだことの備忘録としていく予定です。初心者であり独学なので内容には誤りが含まれる可能性が大いにあります。

Pythonにおける並列処理・非同期処理の概要とコード例(tqdmを使う例も)

1つのプロセスだけで実行するには遅い処理や、Web APIの処理待ちなどがあるとき、並列処理や非同期処理で高速化したくなりますよね。

Pythonだと標準パッケージだけでもthreadingとかmultiprocessingとかasyncioなど複数の選択肢があり、各パッケージでもコードの書き方の選択肢が豊富にあって迷いがちなので、それぞれの

  • 簡単な概要(使い分けの判断材料のため)
  • シンプルなコード例(map()関数のようにiterableを関数に通す方法だけ紹介)

をまとめてメモしておきます。

(※ Python 3.11環境を想定して書いています)

スレッドベースの並列処理(threading)

Pythonの標準パッケージ threading を使った並行処理です。

概要

この方法の長短としては

長所:

  • 短く書ける
  • Jupyter内で実行できる(※後述するasyncioはできないため)
  • I/Oバウンドに対して有効

短所:

  • CPUバウンドの対処にはならない
  • スレッドセーフでない(マルチスレッドに対応していない)ライブラリがある(pandasなど)

かなと思います。

PythonではGIL(Global Interpreter Lock)のために、マルチスレッドにしても同時に1つの処理しかできません(公式ドキュメントより)。なのでCPUバウンド(CPUの計算待ち)への対処にはならないのですが、I/Oバウンド(ストレージへの読み書きやAPIのレスポンス待ちなど、CPU起因じゃない待ち時間)への対処としては使える方法かなと思います。

※GILの制約を減らすための開発が行われており、sub-interpretersを使うことでthreadingでも並列処理をする(複数コアを使って同時に複数の処理をする)ことが可能なようです。まだ開発段階でありPython 3.13以降を待つ必要があるとは思いますが、将来的には上記問題も緩和されていくと思われます。

gihyo.jp

コード例

例として、以下の関数と入力値を使います。

import time

# 関数の例
def f(x: int) -> float:
    time.sleep(0.1)
    return x / 10


# 入力値の例
inputs: list[int] = list(range(10))

ThreadPoolExecutor

threadingパッケージの高水準インターフェースである concurrent.futures.ThreadPoolExecutor を使うと、次のように書くだけで実装できます。

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as p:
    outputs = list(p.map(f, inputs))

なお、同時実行数を制御したい場合は ThreadPoolExecutor(max_workers=10) のように指定できます。

tqdmを使う場合:thread_map

進捗を表示するtqdmパッケージを使う場合、さらに短く書くことができます。ソースコードを見る限り、thread_map()の内部でwith文を実行しているようです。

from tqdm.contrib import concurrent
outputs = concurrent.thread_map(f, inputs)

プロセスベースの並列処理(multiprocessing)

Pythonの標準パッケージ multiprocessing を使った並列処理です。

概要

この方法の長短は

長所:

  • 短く書ける
  • Jupyter内で実行できる(※asyncはできないため)
  • CPUバウンドに対して有効
  • I/Oバウンドに対しても使える

短所:

  • 同時実行数がCPU数に依存する(→ vCPUの数が小さいEC2インスタンスなどでI/Oバウンドの処理を高速化したいときはthreadingやasyncioのほうがよさそう)

かなと思います。

コード例

以下の例ではthreadingの例と同様の関数f()と入力値inputsを使います。

multiprocessing.Pool

multiprocessingパッケージで一番簡潔に実装する方法は Pool を使って関数とiterableオブジェクトを渡して、 map 関数のように処理することかなと思います。

from multiprocessing import Pool

with Pool() as p:
    outputs = list(p.map(f, inputs))

なお、同時実行数は Pool(processes=4) のように指定します(デフォルトはNoneであり、その場合は os.cpu_count() で表示されるCPU数が使われるようです)。

tqdmで処理の進捗を表示したい場合はimap(遅延評価版)を使う方法があるみたいです。

from tqdm import tqdm

with Pool() as p:
    imap = p.imap(f, inputs)
    outputs = list(tqdm(imap, total=len(inputs)))

ProcessPoolExecutor

concurrent.futures パッケージでは ThreadPoolExecutor と ProcessPoolExecutor が同じインターフェースを持つように実装されており、こちらでも同様にmapを呼び出す書き方ができます。

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as p:
    outputs = list(p.map(f, inputs))

tqdmを使う場合:process_map

tqdmを使って進捗を表示したい場合は process_map を使うのが良いかと思います。

from tqdm.contrib import concurrent
outputs = concurrent.process_map(f, inputs)

非同期処理(asyncio)

Python標準パッケージ asyncio を使った並行処理です。

概要

この方法の長短は以下のようなものがあるかなと思います。

長所:

  • I/Oバウンドに対して有効

短所:

  • CPUバウンドの対処にはならない
  • Jupyter内で実行できない
  • コード量がやや増える

「Jupyter内で実行できない」について

Jupyterで実行すると

RuntimeError: This event loop is already running

というエラーがでて動きません。

対策としてnest-asyncioを使えば実行できるようになります。 とはいえ、対策の手間がふえるのでデータ分析の場面でasyncioを使うのはやや非効率な方法に感じます。

コード例

例として以下の関数(コルーチン)を使います

import time

# 関数の例
async def f(x: int) -> float:
    time.sleep(0.1)
    return x / 10

# 入力値の例
inputs: list[int] = list(range(10))

asyncio.gather

gatherとrunを使う場合。

import asyncio

async def execute(inputs):
    coroutines = [f(x) for x in inputs]
    outputs = await asyncio.gather(*coroutines)
    return outputs

outputs = asyncio.run(execute(inputs))

tqdmを使う場合:tqdm_asyncio.gather

tqdm_asyncioというクラスにgather()という関数があるので、こちらに置き換えればよいようです(tqdm documentation)。

from tqdm.asyncio import tqdm_asyncio

async def execute(inputs):
    coroutines = [f(x) for x in inputs]
    outputs = await tqdm_asyncio.gather(*coroutines)
    return outputs


outputs = asyncio.run(execute(inputs))

同時実行数をコントロールしたい場合

asyncio.Semaphoreセマフォ)を使う方法がいちばん楽なのかなと思います。 f()async with文を挟む変更を加えることで並行処理数をコントロールできます。

semaphore = asyncio.Semaphore(5)  # 最大5つの並行処理を許可

async def f(x: int) -> float:
    async with semaphore:
        return x / 10

outputs = asyncio.run(execute(inputs))

ただし、Semaphoreはスレッドセーフではないようです・・・(同期プリミティブ — Python 3.11.8 ドキュメント)。

おわりに

あらためて思ったんですがasyncってわかりにくいし記述が複雑になるし、めんどくさくないですか?僕はいまだに苦手意識が強いです。

複雑なことをやりたいならasyncioは良さそうですが、手軽に並行処理したいだけならthreadingで足りるように思います。

しかしthreadingはスレッドセーフとか考える必要があるので、CPU数が十分ある環境ならばmultiprocessingが手軽で安全のように思います。

thread_map()process_map()だったら2行くらいで済むので、ほんとにお手軽だなと思いました。

from tqdm.contrib import concurrent
outputs = concurrent.process_map(f, inputs)