【Python】concurrent.futures の使い方【並列処理】

concurrent.futuresコンカレント ドット フューチャース並列処理する Python コード例を書きました。

ThreadPoolExecutorスレッド プール エクセキューター() のスレッドプールを使った方法と、ProcessPoolExecutorプロセス プール エクセキューター() のプロセスプールを使った方法です。

それぞれの『説明』と『コード例』です。

環境は Windows 10 Pro (64 bit) で、Python 3.8.6 (64 bit) を使用しました。

(ところで、Executor の発音は、Oxford Learner’s Dictionaries だと『イグゼッキュター』に聞こえました。ですが、YouTube で海外の方の Python 解説動画を見たら、『エクセキューター』みたいに読んでいました。読み方は色々あるみたいでした。)

multiprocessingマルチプロセッシング モジュールを使用した並列処理のコード例は、【Python】multiprocessing.Pool() の使い方【並列処理】に書きました。

説明

スレッドプールとプロセスプールの説明と Python マニュアルの場所です。

スレッドとプロセスの使い分け

ThreadPoolExecutor()ProcessPoolExecutor() の使い分けです。

これは、実際に試してみて決めるのが簡単だと思いました。

並列実行したときの『メモリ消費量』とか、『実行時間』とか、『同時実行数の上限』とか。

自分は、そういったところを見て、どちらにするかを決めました。

スレッドとプロセスの切り替えは、ThreadPoolExecutor()ProcessPoolExecutor() を書きかえるだけでできました。

べつに、しばらく使ってみて、あとから変えても問題なかったです(自分が使ってみた範囲では)。

それぞれの用途については、Python マニュアルのコード例が参考になりました。

(Python) ThreadPoolExecutor の例

(Python) ProcessPoolExecutor の例

これらを見て、『通信』がメインなら『スレッド』で、『計算』がメインなら『プロセス』が良いのかなって思いました。

ThreadPoolExecutor()

スレッド プール エクセキューターです。

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

これは、『スレッド用』の Executor オブジェクトを返してくれました。

Executor には .mapマップ() メソッドがありましたので、そこに自作関数と引数ひきすうリストを渡しました。

これで、『マルチスレッド』で実行することができました。

class concurrent.futures.Executor

map(func, *iterables, timeout=None, chunksize=1)

同時実行数についてです。

ThreadスレッドPoolExecutor() は、100 スレッドでも 1000 スレッドでも、同時に実行することができました。

試しに、1000 回分の time.sleep(1) を 1000 スレッドで実行してみたら、約8秒で完了しました (CPU: Intel Core-i5 9500)。

1秒とまではいかなくても、かなり効率的になるんですね。

『待ち』が発生する処理を『まとめてスタート』したことで、全体的な実行時間を短縮することができました。

ProcessプロセスPoolExecutor() との違いです。

プロセスプールでも、『まとめてスタート』することはできました。

ですが、同時に実行できるプロセスの数に、上限がありました。

なので、『待ち』のある処理を『多数』行うときは、ThreadPoolExecutor() のほうが便利でした。

ProcessPoolExecutor()

プロセス プール エクセキューターです。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

これは、『プロセス用』の Executor オブジェクトを返してくれました。

Executor には .mapマップ() メソッドがありましたので、そこに自作関数と引数ひきすうリストを渡しました。

これで、『マルチプロセス』で実行することができました。

class concurrent.futures.Executor

map(func, *iterables, timeout=None, chunksize=1)

同時実行数についてです。

Python マニュアルによると、Windows の場合は、max_workers=61 以下にしないと、ValueError が出るとのことでした。

On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised.

(Python) class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

ですが、実際に試してみたところ、61 ではだめでした。

さらに1つ少ない max_workers=60 以下にしないと、ValueError が発生しました。

一応、ValueError が出たあとも、ワーカープロセスは動いていました。

ですが、そのあと generatorジェネレーター から結果を取り出すところで、メインプロセスが進まなくなりました。

そういうわけで、自分の環境 (Windows 10) では、上限は 60 プロセスでした。

ということは、もし、64 コア 128 スレッドの CPU 『ライゼン スレッドリッパー (AMD) Ryzen™ Threadripper™ 3990X』を使うことができたとしても、『フルパワー』を発揮するためには『工夫が必要』、ということなんでしょうかね…?

if __name__ == '__main__': main()

main 関数の呼び出しについてです。
main 関数を呼び出すときは、if __name__ == '__main__': の中で呼び出すと、うまく動きました。

Python マニュアルの場所です。

ページは multiprocessing のものですが、concurrent.futures の場合にも、あてはまりました。

(Python) multiprocessing はじめに(子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法)

(Python) multiprocessing メインモジュールの安全なインポート

もし、if __name__ == '__main__': を付けなかったら、どうなるのか?

そのときは、プロセスが main 関数を呼び出してしまって、エラーが発生して止まりました。

(エラー例)concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

ところで、__name__ の中には、何が入っているのか?

子プロセスのときは、'__mp_main__' という文字列が入っていました。

これは、if 文に書いた '__main__' とはちがっているので、main 関数の実行がスキップされる、という仕組みでした。

ThreadPoolExecutor() のコード例

ThreadPoolExecutorスレッド プール エクセキューター() で並列処理するコード例です。

"""(スレッドプール版)
concurrent.futures の ThreadPoolExecutor で
並列実行する Python コード例です。
"""

from concurrent.futures import ThreadPoolExecutor
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用

def main():
    """メイン関数です。"""
    print('start\n')
    t = time.time()

    # 自作関数 jisaku_func を用意します。
    # (最後のほうに書きました)

    # 引数リスト src_datas を用意します。
    src_datas = [
        [3, 'c'], # 最初(さいしょ)の引数
        [2, 'b'],
        [1, 'a'], # 最後(さいご)の引数
    ]

    # スレッドプールを作ります。
    # プロセスプールとちがって、スレッドの数は
    # max_workers=1000 スレッドでも実行できました。
    datas = []
    print('ThreadPoolExecutor().map()')
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 『関数』と『引数リスト』を渡して、実行します。
        results = executor.map(
            jisaku_func_wrapper, src_datas, timeout=None)
        # (裏では既にスレッドが動いています)
        # timeout の設定は、with 文の中にいる間だけ機能しました。

        # 戻り値は『ジェネレーター』になっていました。
        # (例) results: <class 'generator'>
        print(f'results: {type(results)}')

        # 『ジェネレーター』から、結果を取り出します。
        # 並列処理は、結果を with 文の中で取り出すようにした
        # ときだけ、Ctrl+C でうまく中断することができました。
        for data in results:
            datas.append(data)

    # 結果を表示します。
    print('(number, text, ppid, pid, tid)')
    for data in datas:
        print(data)

    print('\n(メイン関数を実行していたプロセスの ID)')
    print(f'pid: {os.getpid()}')

    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')
    print('\nend')
    return


def jisaku_func_wrapper(args):
    """引数をアンパックして渡す関数です。"""
    return jisaku_func(*args)


def jisaku_func(number, text):
    """自作関数です。並列で実行します。"""
    print(f'実行中 jisaku_func({number}, {text})')

    # (デバッグ) 親プロセス ID (PPID) を取得してみます。
    ppid = os.getppid()

    # (デバッグ) プロセス ID (PID) を取得してみます。
    pid = os.getpid()

    # (デバッグ) スレッド ID (TID) を取得してみます。
    # tid = threading.get_ident() # Python 3.3 から使用可能
    tid = threading.get_native_id() # Python 3.8 から使用可能

    # (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
    time.sleep(number)

    print(f'完了 jisaku_func({number}, {text})')

    # 結果をタプル tuple に入れて返します。
    return (number, text, ppid, pid, tid)


if __name__ == '__main__':
    main()

実行結果

スレッドプールの実行結果です。

start

ThreadPoolExecutor().map()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
results: <class 'generator'>
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(number, text, ppid, pid, tid)
(3, 'c', 14368, 18156, 18276)
(2, 'b', 14368, 18156, 5224)
(1, 'a', 14368, 18156, 14776)

(メイン関数を実行していたプロセスの ID)
pid: 18156
経過時間 3.009 秒

end

ProcessPoolExecutor() のコード例

ProcessPoolExecutorプロセス プール エクセキューター() で並列処理するコード例です。

"""(プロセスプール版)
concurrent.futures の ProcessPoolExecutor で
並列実行する Python コード例です。
"""

from concurrent.futures import ProcessPoolExecutor
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用

def main():
    """メイン関数です。"""
    print('start\n')
    t = time.time()

    # 自作関数 jisaku_func を用意します。
    # (最後のほうに書きました)

    # 引数リスト src_datas を用意します。
    src_datas = [
        [3, 'c'], # 最初(さいしょ)の引数
        [2, 'b'],
        [1, 'a'], # 最後(さいご)の引数
    ]

    # プロセスプールを作ります。
    # 自分の環境では、max_workers=60 プロセスまで OK でした。
    # max_workers=61 の時のエラーメッセージは
    # "ValueError: need at most 63 handles, got a sequence of length 63"
    # でした。max_workers=62 以降のエラーメッセージは
    # "ValueError: max_workers must be <= 61" でした。
    datas = []
    print('ProcessPoolExecutor().map()')
    with ProcessPoolExecutor(max_workers=3) as executor:
        # 『関数』と『引数リスト』を渡して、実行します。
        results = executor.map(
            jisaku_func_wrapper, src_datas, timeout=None)
        # (裏では既にプロセスが動いています)
        # timeout の設定は、with 文の中にいる間だけ機能しました。

        # 戻り値は『ジェネレーター』になっていました。
        # (例) results: <class 'generator'>
        print(f'results: {type(results)}')

        # 『ジェネレーター』から、結果を取り出します。
        # 並列処理は、結果を with 文の中で取り出すようにした
        # ときだけ、Ctrl+C でうまく中断することができました。
        # あと、max_workers が多すぎて ValueError になったときは、
        # for 文の開始直後から、メインプロセスが進まなくなりました。
        for data in results:
            datas.append(data)

    # 結果を表示します。
    print('(number, text, ppid, pid, tid)')
    for data in datas:
        print(data)

    print('\n(メイン関数を実行していたプロセスの ID)')
    print(f'pid: {os.getpid()}')

    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')
    print('\nend')
    return


def jisaku_func_wrapper(args):
    """引数をアンパックして渡す関数です。"""
    return jisaku_func(*args)


def jisaku_func(number, text):
    """自作関数です。並列で実行します。"""
    print(f'実行中 jisaku_func({number}, {text})')

    # (デバッグ) 親プロセス ID (PPID) を取得してみます。
    ppid = os.getppid()

    # (デバッグ) プロセス ID (PID) を取得してみます。
    pid = os.getpid()

    # (デバッグ) スレッド ID (TID) を取得してみます。
    # tid = threading.get_ident() # Python 3.3 から使用可能
    tid = threading.get_native_id() # Python 3.8 から使用可能

    # (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
    time.sleep(number)

    print(f'完了 jisaku_func({number}, {text})')

    # 結果をタプル tuple に入れて返します。
    return (number, text, ppid, pid, tid)


if __name__ == '__main__':
    main()

実行結果

プロセスプールの実行結果です。

start

ProcessPoolExecutor().map()
results: <class 'generator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(number, text, ppid, pid, tid)
(3, 'c', 10124, 17664, 15200)
(2, 'b', 10124, 4828, 19412)
(1, 'a', 10124, 13280, 13960)

(メイン関数を実行していたプロセスの ID)
pid: 10124
経過時間 3.114 秒

end

Ctrl + C での中断はできた

自分が試した限りでは、並列処理中でも、Ctrl + Cコントロール シー でプログラムを中断することができました。

ThreadPoolExecutor() でも ProcessPoolExecutor() でも、メインプロセスが止まることなく、中断することができました。

ただし、KeyboardInterruptキーボードインターラプトwith ブロックについてです。

実験してみて分かった挙動がありました。

ワーカーでの KeyboardInterrupt キャッチについて

ThreadスレッドPoolExecutor() だと、ワーカースレッドに渡した自作関数の中では、KeyboardInterrupt の例外をキャッチすることができませんでした。

どうやら、ワーカースレッドは、Ctrl + C を押しても止まらずに、自作関数を最後まで実行してくれるようでした。

そして、ワーカースレッドの実行が完了したあとに、メインプロセスで KeyboardInterrupt をキャッチすることができました。

一方で、ProcessプロセスPoolExecutor() だと、ワーカープロセスの中でも KeyboardInterrupt をキャッチすることができました。

なので、スレッドよりも中断が早かったです。

Ctrl + C を押したら、ほぼすぐにワーカープロセスが反応して、KeyboardInterrupt をキャッチすることができました。

そして、ワーカープロセスが完了したあとに、メインプロセスでも KeyboardInterrupt をキャッチすることができました。

with ブロックの中で結果を取り出す

Ctrl + C でプログラムを中断する場合です。

結果を取り出すタイミングで、挙動が変わりました。

executor.map()もどgeneratorジェネレーター から、結果を取り出すときです。

with ブロックの中で for 文にかけたら、すみやかに並列処理を中断することができました。

一方で、with ブロックを抜けてから for 文にかけるようにしたら、『ワーカースレッド』や『ワーカープロセス』へのタスク投入が止まらなかったです。

Ctrl + C を押しても、引数リストの最後まで実行されました。

そのあとでようやく、メインプロセスで KeyboardInterrupt をキャッチすることができました。

たとえ、KeyboardInterrupt をキャッチできる『ワーカープロセス』の場合でも、だめでした。

引数リストの最後まで止まりませんでした。

KeyboardInterrupt で、いったんはプロセスが終了したのですが、すぐに次のタスクが実行されて、結局、並列処理を止めることができませんでした。

引数リストの最後に達するまで、どんどんプロセスが復活しました。

そういうわけで、with ブロックの中では、結果を取り出すところまでやるのが、たぶん正解なんじゃないかと思いました。

with を抜けた後で結果を取り出す構造だと、Ctrl + C を押したあとで、プログラムが進まなくなったりしました。

ところで、withExecutor をシャットダウンするとのことでした。

with 文は Executor をシャットダウンします (waitTrue にセットして Executor.shutdown() が呼ばれたかのように待ちます)。

(Python) shutdown(wait=True, *, cancel_futures=False)

なので、想像ですが、挙動の違いはこの辺りが関係していたのかもしれません。

タイムアウト

executor.map(func, iterables, timeout)タイムアウトについても、with ブロックの中で結果を取り出したときに、うまく機能しました。

例外は concurrent.futures._base.TimeoutError が発生しました。

一方で、with を抜けた後で結果を取り出す構造だと、タイムアウトが発生しませんでした。

こちらも、with を抜けたときのシャットダウンが関係していたのかもしれません。

with 文は Executor をシャットダウンします (waitTrue にセットして Executor.shutdown() が呼ばれたかのように待ちます)。

(Python) shutdown(wait=True, *, cancel_futures=False)

ProcessPoolExecutor を使用している記事

【Python】SQLite で日本語を全文検索するコード例【N-Gram, FTS4/FTS5】 の記事にも、concurrent.futures の ProcessPoolExecutor() を使用した Python コードがあります。

『add_texts()』という自作関数のところで使用しました。

ProcessPoolExecutor() と、multiprocessing.Pool() の2種類の方法を、切りかえて使えるように書いてみました。

コードの書き方は、ほとんど同じでした。

メモリ不足を回避する

concurrent.futures の ProcessPoolExecutor() の executor.map() で、1度にたくさんのデータを処理しようとしたら、メモリ不足になりました。

原因は、『executor.map() が、すべての処理を完了するまでのあいだ、1つ1つの実行結果のもどを、保持し続けるため。』のようでした。

たとえ、戻り値のジェネレーターから結果をどんどん受け取って、del 文で削除していっても、メモリの使用量は減りませんでした。

メモリの使用量は、executor.map() がすべての処理を完了したあたりで、減りました。

具体的には、executor.map() の戻り値のジェネレーターから、最後の結果を受け取ったあたりで減ったように見えました。

(メモリの使用量はタスクマネージャーで見ました)

Python マニュアルを見ても、そういった動作の説明は見つかりませんでしたが、とりあえずそういうものなんだと、自分は理解しました。

メモリ不足の回避方法です。

自分は、たとえば『1万件ずつ executor.map() に渡す』といった方法で、メモリ不足を回避しています。

具体的には、ProcessPoolExecutor の with ブロックの中で、『1万件だけ executor.map() に渡して、戻り値のジェネレーターからすべての結果を受け取ったら、再び1万件だけ executor.map() に渡す』といった感じです。

これで、メモリ不足に陥ることなく、たくさんのデータを、マルチプロセスで処理することができました。

結果を非同期ひどうきで受け取れるからと言って、結果を受け取ればその分のメモリが解放される、というわけではなかったんですね。

対策としては、データを小分けにして実行するだけでしたが、そのぶん、Python コードはどうしても少し複雑になりました。

『データを小分けにするやり方』です。

自分は、ProcessPoolExecutor の with ブロックの中に、while 文を用意しました。

そして、while 文の中で、『組み込み関数の next() を使って1万件に達するまでデータを読み込む ⇒ executor.map() を呼び出す。それを繰り返す。そして、next() で StopIteration の例外が発生したら、最後の executor.map() を呼び出して、完了したら while 文を break する。』みたいな感じで、小分けの繰り返し処理を実現しました。

データがリストなら、スライス表記で効率的に分割できたんですけれどもね。

巨大なデータを扱うときって、たいてい、データをイテレーターやジェネレーターで扱っていて、効率的に分割することがむずかしかったりしました。

なので、while 文と next 関数を使用した愚直な方法が、役に立ちました。

ほかにも、分割実行をするための良い方法は、あると思います。

自分は、大きなデータを扱うときだけ、小分けの分割処理を書いて対応しています。

同様のメモリ不足は、multiprocessing.Pool() の .imap() とか、.imap_unordered() でも起こりました。

対策も同じで、小分けの分割処理にすることで、回避することができました。

以上です。

タイトルとURLをコピーしました