concurrent.futures
で並列処理する Python コード例を書きました。
ThreadPoolExecutor()
のスレッドプールを使った方法と、ProcessPoolExecutor()
のプロセスプールを使った方法です。
それぞれの『説明』と『コード例』です。
環境は Windows 10 Pro (64 bit) で、Python 3.8.6 (64 bit) を使用しました。
(ところで、Executor の発音は、Oxford Learner’s Dictionaries だと『イグゼッキュター』に聞こえました。ですが、YouTube で海外の方の Python 解説動画を見たら、『エクセキューター』みたいに読んでいました。読み方は色々あるみたいでした。)
説明
スレッドプールとプロセスプールの説明と Python マニュアルの場所です。
スレッドとプロセスの使い分け
ThreadPoolExecutor()
と ProcessPoolExecutor()
の使い分けです。
それぞれの用途については、Python マニュアルのコード例が参考になりました。
(Python) ThreadPoolExecutor の例
(Python) ProcessPoolExecutor の例
これらを見て、『通信』がメインなら『スレッド』で、『計算』がメインなら『プロセス』が良いのかなって思いました。
とはいっても、結局実際に試してみるのが簡単でした。
並列実行したときの『メモリ消費量』とか、『実行時間』とか、『同時実行数の上限』とか。
自分は ProcessPoolExecutor()
をメインに使っています(とりあえず高速でしたので)。
スレッドとプロセスの切り替えは、ThreadPoolExecutor()
と ProcessPoolExecutor()
を書きかえるだけでできました。
べつに、しばらく使ってみて、あとから変えても問題なかったです(自分が使ってみた範囲では)。
(2022年2月追記)ほかにも、自分はまだ本格的には試していませんが、subprocess.run()
で外部プログラム (.exe) を実行する場合に、『スレッド』が効率的に使える可能性がありました。
subprocess.run()
の使い方を書きました。
⇒ 【Python】subprocess.run() の使い方【使用例】
外部プログラムの子プロセスは、Python の世界から独立して動作するので、『ThreadPoolExecutor』でも同時並行的に処理を進めることができるのではないか?と思うのです。
Python の FAQ にも、似たような記述がありましたし。
C 拡張をうまく使うことも役立ちます。C 拡張を時間のかかるタスクの実行に使えば、その拡張は実行が C コードで行われている間 GIL を解放でき、その間に他のスレッドで作業が進められます。 zlib や hashlib など、すでにこれを行なっている標準ライブラリモジュールもあります。
(Python) グローバルインタプリタロック (Global Interpreter Lock) を取り除くことはできないのですか?(ライブラリと拡張 FAQ)
上記と同じような効果を期待して、スレッドには子プロセスの起動だけお願いして、子プロセスの完了を待つ間に、別の子プロセスをどんどん起動していってもらう感じです。
しかしながら、CPU のコア数やメモリに余裕があるなら、まずは『ProcessPoolExecutor』で並列化するというのも、妥当な判断だと思います。(追記終わり)
(2022年4月27日追記)実際に実験しました。
ThreadPoolExecutor()
で subprocess.run()
を効率的に実行することができました。
⇒ 【Python】マルチスレッド処理で高速化できた例【subprocess】
(追記終わり)
ThreadPoolExecutor()
スレッド プール エクセキューターです。
これは、『スレッド用』の Executor オブジェクトを返してくれました。
Executor には .map()
メソッドがありましたので、そこに自作関数と引数リストを渡しました。
これで、『マルチスレッド』で実行することができました。
(Python) class concurrent.futures.Executor
(Python) map(func, *iterables, timeout=None, chunksize=1)
同時実行数についてです。
ThreadPoolExecutor()
は、100 スレッドでも 1000 スレッドでも、同時に実行することができました。
試しに、1000 回分の time.sleep(1)
を 1000 スレッドで実行してみたら、約8秒で完了しました (CPU: Intel Core-i5 9500)。
1秒とまではいかなくても、かなり効率的になるんですね。
『待ち』が発生する処理を『まとめてスタート』したことで、全体的な実行時間を短縮することができました。
ProcessPoolExecutor()
との違いです。
プロセスプールでも、『まとめてスタート』することはできました。
ですが、同時に実行できるプロセスの数に、上限がありました。
なので、『待ち』のある処理を『多数』行うときは、ThreadPoolExecutor()
のほうが便利でした。
ProcessPoolExecutor()
プロセス プール エクセキューターです。
これは、『プロセス用』の Executor オブジェクトを返してくれました。
Executor には .map()
メソッドがありましたので、そこに自作関数と引数リストを渡しました。
これで、『マルチプロセス』で実行することができました。
(Python) class concurrent.futures.Executor
(Python) map(func, *iterables, timeout=None, chunksize=1)
同時実行数についてです。
Python マニュアルによると、Windows の場合は、max_workers=61
以下にしないと、ValueError
が出るとのことでした。
On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised.
ですが、実際に試してみたところ、61 ではだめでした。
さらに1つ少ない max_workers=60
以下にしないと、ValueError
が発生しました。
一応、ValueError
が出たあとも、ワーカープロセスは動いていました。
ですが、そのあと generator から結果を取り出すところで、メインプロセスが進まなくなりました。
そういうわけで、自分の環境 (Windows 10) では、上限は 60 プロセスでした。
ということは、もし、64 コア 128 スレッドの CPU 『ライゼン スレッドリッパー (AMD) Ryzen™ Threadripper™ 3990X』を使うことができたとしても、『フルパワー』を発揮するためには『工夫が必要』、ということなんでしょうかね…?
initializer と initargs の用途と使い方
initializer
と initargs
の使い道です。
イニシャライザは、たとえば ProcessPoolExecutor
で、『各子プロセスのロギング設定』をするときに使用しました。initargs
は、『ログのファイルパス』や『Queue
』を渡すために使用しました。
initializer
と initargs
の使い方(使用例)を書きました。
⇒ ProcessPoolExecutor でログを記録するコード例(プロセスごとに分ける)
⇒ ProcessPoolExecutor でログを記録するコード例(まとめて記録)
もちろん、ほかにも使い道はあると思います。
if __name__ == '__main__': main()
main 関数の呼び出しについてです。
main 関数を呼び出すときは、if __name__ == '__main__':
の中で呼び出すと、うまく動きました。
Python マニュアルの場所です。
ページは multiprocessing のものですが、concurrent.futures の場合にも、あてはまりました。
(Python) multiprocessing
はじめに(子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法)
(Python) multiprocessing
メインモジュールの安全なインポート
もし、if __name__ == '__main__':
を付けなかったら、どうなるのか?
そのときは、子プロセスが main 関数を呼び出してしまって、エラーが発生して止まりました。
(エラー例)concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
ところで、__name__
の中には、何が入っているのか?
子プロセスのときは、'__mp_main__'
という文字列が入っていました。
これは、if 文に書いた '__main__'
とはちがっているので、main 関数の実行がスキップされる、という仕組みでした。
ThreadPoolExecutor() のコード例
ThreadPoolExecutor()
で並列処理するコード例です。
"""(スレッドプール版)
concurrent.futures の ThreadPoolExecutor で
並列実行する Python コード例です。
"""
from concurrent.futures import ThreadPoolExecutor
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用
def main():
"""メイン関数です。"""
print('start\n')
t = time.time()
# 自作関数 jisaku_func を用意します。
# (最後のほうに書きました)
# 引数リスト src_datas を用意します。
src_datas = [
[3, 'c'],
[2, 'b'],
[1, 'a'],
]
# スレッドプールを作ります。
# プロセスプールとちがって、スレッドの数は
# max_workers=1000 スレッドでも実行できました。
datas = []
print('ThreadPoolExecutor().map()')
with ThreadPoolExecutor(max_workers=3) as executor:
# 『関数』と『引数リスト』を渡して、実行します。
results = executor.map(
jisaku_func_wrapper, src_datas, timeout=None)
# (裏では既にスレッドが動いています)
# timeout の設定は、with 文の中にいる間だけ機能しました。
# 戻り値は『ジェネレーター』になっていました。
# (例) results: <class 'generator'>
print(f'results: {type(results)}')
# 『ジェネレーター』から、結果を取り出します。
# 並列処理は、結果を with 文の中で取り出すようにした
# ときだけ、Ctrl+C でうまく中断することができました。
for data in results:
datas.append(data)
# 結果を表示します。
print('(number, text, ppid, pid, tid)')
for data in datas:
print(data)
print('\n(メイン関数を実行していたプロセスの ID)')
print(f'pid: {os.getpid()}')
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
print('\nend')
return
def jisaku_func_wrapper(args):
"""引数をアンパックして渡す関数です。"""
return jisaku_func(*args)
def jisaku_func(number, text):
"""自作関数です。並列で実行します。"""
print(f'実行中 jisaku_func({number}, {text})')
# (デバッグ) 親プロセス ID (PPID) を取得してみます。
ppid = os.getppid()
# (デバッグ) プロセス ID (PID) を取得してみます。
pid = os.getpid()
# (デバッグ) スレッド ID (TID) を取得してみます。
# tid = threading.get_ident() # Python 3.3 から使用可能
tid = threading.get_native_id() # Python 3.8 から使用可能
# (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
time.sleep(number)
print(f'完了 jisaku_func({number}, {text})')
# 結果をタプル tuple に入れて返します。
return (number, text, ppid, pid, tid)
if __name__ == '__main__':
main()
実行結果
スレッドプールの実行結果です。
start
ThreadPoolExecutor().map()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
results: <class 'generator'>
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(number, text, ppid, pid, tid)
(3, 'c', 14368, 18156, 18276)
(2, 'b', 14368, 18156, 5224)
(1, 'a', 14368, 18156, 14776)
(メイン関数を実行していたプロセスの ID)
pid: 18156
経過時間 3.009 秒
end
ProcessPoolExecutor() のコード例
ProcessPoolExecutor()
で並列処理するコード例です。
"""(プロセスプール版)
concurrent.futures の ProcessPoolExecutor で
並列実行する Python コード例です。
"""
from concurrent.futures import ProcessPoolExecutor
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用
def main():
"""メイン関数です。"""
print('start\n')
t = time.time()
# 自作関数 jisaku_func を用意します。
# (最後のほうに書きました)
# 引数リスト src_datas を用意します。
src_datas = [
[3, 'c'],
[2, 'b'],
[1, 'a'],
]
# プロセスプールを作ります。
# 自分の環境では、max_workers=60 プロセスまで OK でした。
# max_workers=61 の時のエラーメッセージは
# "ValueError: need at most 63 handles, got a sequence of length 63"
# でした。max_workers=62 以降のエラーメッセージは
# "ValueError: max_workers must be <= 61" でした。
datas = []
print('ProcessPoolExecutor().map()')
with ProcessPoolExecutor(max_workers=3) as executor:
# 『関数』と『引数リスト』を渡して、実行します。
results = executor.map(
jisaku_func_wrapper, src_datas, timeout=None)
# (裏では既にプロセスが動いています)
# timeout の設定は、with 文の中にいる間だけ機能しました。
# 戻り値は『ジェネレーター』になっていました。
# (例) results: <class 'generator'>
print(f'results: {type(results)}')
# 『ジェネレーター』から、結果を取り出します。
# 並列処理は、結果を with 文の中で取り出すようにした
# ときだけ、Ctrl+C でうまく中断することができました。
# あと、max_workers が多すぎて ValueError になったときは、
# for 文の開始直後から、メインプロセスが進まなくなりました。
for data in results:
datas.append(data)
# 結果を表示します。
print('(number, text, ppid, pid, tid)')
for data in datas:
print(data)
print('\n(メイン関数を実行していたプロセスの ID)')
print(f'pid: {os.getpid()}')
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
print('\nend')
return
def jisaku_func_wrapper(args):
"""引数をアンパックして渡す関数です。"""
return jisaku_func(*args)
def jisaku_func(number, text):
"""自作関数です。並列で実行します。"""
print(f'実行中 jisaku_func({number}, {text})')
# (デバッグ) 親プロセス ID (PPID) を取得してみます。
ppid = os.getppid()
# (デバッグ) プロセス ID (PID) を取得してみます。
pid = os.getpid()
# (デバッグ) スレッド ID (TID) を取得してみます。
# tid = threading.get_ident() # Python 3.3 から使用可能
tid = threading.get_native_id() # Python 3.8 から使用可能
# (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
time.sleep(number)
print(f'完了 jisaku_func({number}, {text})')
# 結果をタプル tuple に入れて返します。
return (number, text, ppid, pid, tid)
if __name__ == '__main__':
main()
実行結果
プロセスプールの実行結果です。
start
ProcessPoolExecutor().map()
results: <class 'generator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(number, text, ppid, pid, tid)
(3, 'c', 10124, 17664, 15200)
(2, 'b', 10124, 4828, 19412)
(1, 'a', 10124, 13280, 13960)
(メイン関数を実行していたプロセスの ID)
pid: 10124
経過時間 3.114 秒
end
Pause キーでの一時停止はできた
マルチプロセス処理を実行中に、キーボードの Pause キーを押したら、プログラムの実行を一時停止することができました。
プログラムを一時停止するにはPAUSEキーを押す【Python】
そして、矢印キーなど、何かキーを押したら、再びプログラムが動き始めました。
一時停止の Pause キーや、再開するときの矢印キーは、コマンドプロンプトのウィンドウをクリックするなどして、アクティブにしてから押すと反応しました。
Pause キーを押しても、特にメッセージなどは表示されませんでした。
ですが、タスクマネージャーの CPU 使用率を見たら0%近くまで下がっていたので、意図した通り、プログラムが一時停止していたことがわかりました。
Pause キーは便利なので、自分はよく使っています。
Ctrl + C での中断はできた
自分が試した限りでは、並列処理中でも、Ctrl + C
でプログラムを中断することができました。
ThreadPoolExecutor()
でも ProcessPoolExecutor()
でも、メインプロセスが止まることなく、中断することができました。
ただし、KeyboardInterrupt
と with
ブロックについてです。
実験してみて分かった挙動がありました。
ワーカーでの KeyboardInterrupt キャッチについて
ThreadPoolExecutor()
だと、ワーカースレッドに渡した自作関数の中では、KeyboardInterrupt
の例外をキャッチすることができませんでした。
どうやら、ワーカースレッドは、Ctrl + C
を押しても止まらずに、自作関数を最後まで実行してくれるようでした。
そして、ワーカースレッドの実行が完了したあとに、メインプロセスで KeyboardInterrupt
をキャッチすることができました。
一方で、ProcessPoolExecutor()
だと、ワーカープロセスの中でも KeyboardInterrupt
をキャッチすることができました。
なので、スレッドよりも中断が早かったです。
Ctrl + C
を押したら、ほぼすぐにワーカープロセスが反応して、KeyboardInterrupt
をキャッチすることができました。
そして、ワーカープロセスが完了したあとに、メインプロセスでも KeyboardInterrupt
をキャッチすることができました。
with ブロックの中で結果を取り出す
Ctrl + C
でプログラムを中断する場合です。
結果を取り出すタイミングで、挙動が変わりました。
executor.map()
の戻り値の generator から、結果を取り出すときです。
with
ブロックの中で for
文にかけたら、すみやかに並列処理を中断することができました。
一方で、with
ブロックを抜けてから for
文にかけるようにしたら、『ワーカースレッド』や『ワーカープロセス』へのタスク投入が止まらなかったです。
Ctrl + C
を押しても、引数リストの最後まで実行されました。
そのあとでようやく、メインプロセスで KeyboardInterrupt
をキャッチすることができました。
たとえ、KeyboardInterrupt
をキャッチできる『ワーカープロセス』の場合でも、だめでした。
引数リストの最後まで止まりませんでした。
KeyboardInterrupt
で、いったんはプロセスが終了したのですが、すぐに次のタスクが実行されて、結局、並列処理を止めることができませんでした。
引数リストの最後に達するまで、どんどんプロセスが復活しました。
そういうわけで、with
ブロックの中では、結果を取り出すところまでやるのが、たぶん正解なんじゃないかと思いました。
with
を抜けた後で結果を取り出す構造だと、Ctrl + C
を押したあとで、プログラムが進まなくなったりしました。
ところで、with
は Executor
をシャットダウンするとのことでした。
with
文はExecutor
をシャットダウンします (wait
をTrue
にセットしてExecutor.shutdown()
が呼ばれたかのように待ちます)。
なので、想像ですが、挙動の違いはこの辺りが関係していたのかもしれません。
タイムアウト
executor.map(func, iterables, timeout)
のタイムアウトについても、with
ブロックの中で結果を取り出したときに、うまく機能しました。
例外は concurrent.futures._base.TimeoutError
が発生しました。
一方で、with
を抜けた後で結果を取り出す構造だと、タイムアウトが発生しませんでした。
こちらも、with
を抜けたときのシャットダウンが関係していたのかもしれません。
with
文はExecutor
をシャットダウンします (wait
をTrue
にセットしてExecutor.shutdown()
が呼ばれたかのように待ちます)。
ログを記録するコード例
ProcessPoolExecutor
で並列処理しているときにログを記録するコード例を書きました。
⇒ ProcessPoolExecutor でログを記録するコード例(プロセスごとに分ける)
⇒ ProcessPoolExecutor でログを記録するコード例(まとめて記録)
プロセスの優先度を変更する
マルチプロセス処理を実行しているときは、プロセスの優先度を通常よりも下げておくと、ほかの作業が快適になりました。
実行中のPythonプログラムの優先度を下げる方法【psutil】
CPUが100%の状態でも、すぐにCPUの時間を分けてくれました。
そして、CPUが空けば、再び全力を発揮してくれました。
なので、優先度を下げたからといって、処理速度が極端に落ちるといったこともありませんでした。
自分はよく、Python プログラムの開始地点(エントリーポイント)の近くで、プロセスの優先度を『低 (idle, psutil.IDLE_PRIORITY_CLASS)』に下げる、といった処理を書いています。
これで、『計算しつつ、Web を見る』といったことが、より快適にできるようになりました。
メモリ不足を回避する
concurrent.futures の ProcessPoolExecutor() の executor.map() で、1度にたくさんのデータを処理しようとしたら、メモリ不足になりました。
原因は、『executor.map() が、すべての処理を完了するまでのあいだ、1つ1つの実行結果の戻り値を、保持し続けるため。』のようでした。
たとえ、戻り値のジェネレーターから結果をどんどん受け取って、del 文で削除していっても、メモリの使用量は減りませんでした。
メモリの使用量は、executor.map() がすべての処理を完了したあたりで、減りました。
具体的には、executor.map() の戻り値のジェネレーターから、最後の結果を受け取ったあたりで減ったように見えました。
(メモリの使用量はタスクマネージャーで見ました)
Python マニュアルを見ても、そういった動作の説明は見つかりませんでしたが、とりあえずそういうものなんだと、自分は理解しました。
メモリ不足の回避方法です。
自分は、たとえば『1万件ずつ executor.map() に渡す』といった方法で、メモリ不足を回避しています。
具体的には、ProcessPoolExecutor の with ブロックの中で、『1万件だけ executor.map() に渡して、戻り値のジェネレーターからすべての結果を受け取ったら、再び1万件だけ executor.map() に渡す』といった感じです。
これで、メモリ不足に陥ることなく、たくさんのデータを、マルチプロセスで処理することができました。
結果を非同期で受け取れるからと言って、結果を受け取ればその分のメモリが解放される、というわけではなかったんですね。
対策としては、データを小分けにして実行するだけでしたが、そのぶん、Python コードはどうしても少し複雑になりました。
『データを小分けにするやり方』です。
自分は、ProcessPoolExecutor の with ブロックの中に、while 文を用意しました。
そして、while 文の中で、『組み込み関数の next() を使って1万件に達するまでデータを読み込む ⇒ executor.map() を呼び出す。それを繰り返す。そして、next() で StopIteration の例外が発生したら、最後の executor.map() を呼び出して、完了したら while 文を break する。』みたいな感じで、小分けの繰り返し処理を実現しました。
データがリストなら、スライス表記で効率的に分割できたんですけれどもね。
巨大なデータを扱うときって、たいてい、データをイテレーターやジェネレーターで扱っていて、効率的に分割することがむずかしかったりしました。
なので、while 文と next 関数を使用した愚直な方法が、役に立ちました。
ほかにも、分割実行をするための良い方法は、あると思います。
自分は、大きなデータを扱うときだけ、小分けの分割処理を書いて対応しています。
同様のメモリ不足は、multiprocessing.Pool() の .imap() とか、.imap_unordered() でも起こりました。
対策も同じで、小分けの分割処理にすることで、回避することができました。
以上です。