concurrent.futures
で並列処理する Python コード例を書きました。
ThreadPoolExecutor()
のスレッドプールを使った方法と、ProcessPoolExecutor()
のプロセスプールを使った方法です。
それぞれの『説明』と『コード例』です。
環境は Windows 10 Pro (64 bit) で、Python 3.8.6 (64 bit) を使用しました。
(ところで、Executor の発音は、Oxford Learner’s Dictionaries だと『イグゼッキュター』に聞こえました。ですが、YouTube で海外の方の Python 解説動画を見たら、『エクセキューター』みたいに読んでいました。読み方は色々あるみたいでした。)
※ multiprocessing モジュールを使用した並列処理のコード例は、【Python】multiprocessing.Pool() の使い方【並列処理】に書きました。
説明
スレッドプールとプロセスプールの説明と Python マニュアルの場所です。
スレッドとプロセスの使い分け ⇒ 試して決める
ThreadPoolExecutor()
と ProcessPoolExecutor()
の使い分けです。
これは、実際に試してみて決めるのが簡単だと思いました。
並列実行したときの『メモリ消費量』とか、『実行時間』とか、『同時実行数の上限』とか。
自分は、そういったところを見て、どちらにするかを決めました。
スレッドとプロセスの切り替えは、ThreadPoolExecutor()
と ProcessPoolExecutor()
を書きかえるだけでできました。
べつに、しばらく使ってみて、あとから変えても問題なかったです(自分が使ってみた範囲では)。
それぞれの用途については、Python マニュアルのコード例が参考になりました。
(Python) ThreadPoolExecutor の例
(Python) ProcessPoolExecutor の例
これらを見て、『通信』がメインなら『スレッド』で、『計算』がメインなら『プロセス』が良いのかなって思いました。
ThreadPoolExecutor()
スレッド プール エクセキューターです。
これは、『スレッド用』の Executor オブジェクトを返してくれました。
Executor には .map()
メソッドがありましたので、そこに自作関数と引数リストを渡しました。
これで、『マルチスレッド』で実行することができました。
class concurrent.futures.Executor
map(func, *iterables, timeout=None, chunksize=1)
同時実行数についてです。
ThreadPoolExecutor()
は、100 スレッドでも 1000 スレッドでも、同時に実行することができました。
試しに、1000 回分の time.sleep(1)
を 1000 スレッドで実行してみたら、約8秒で完了しました (CPU: Intel Core-i5 9500)。
1秒とまではいかなくても、かなり効率的になるんですね。
『待ち』が発生する処理を『まとめてスタート』したことで、全体的な実行時間を短縮することができました。
ProcessPoolExecutor()
との違いです。
プロセスプールでも、『まとめてスタート』することはできました。
ですが、同時に実行できるプロセスの数に、上限がありました。
なので、『待ち』のある処理を『多数』行うときは、ThreadPoolExecutor()
のほうが便利でした。
ProcessPoolExecutor()
プロセス プール エクセキューターです。
これは、『プロセス用』の Executor オブジェクトを返してくれました。
Executor には .map()
メソッドがありましたので、そこに自作関数と引数リストを渡しました。
これで、『マルチプロセス』で実行することができました。
class concurrent.futures.Executor
map(func, *iterables, timeout=None, chunksize=1)
同時実行数についてです。
Python マニュアルによると、Windows の場合は、max_workers=61
以下にしないと、ValueError
が出るとのことでした。
On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised.
ですが、実際に試してみたところ、61 ではだめでした。
さらに1つ少ない max_workers=60
以下にしないと、ValueError
が発生しました。
一応、ValueError
が出たあとも、ワーカープロセスは動いていました。
ですが、そのあと generator から結果を取り出すところで、メインプロセスが進まなくなりました。
そういうわけで、自分の環境 (Windows 10) では、上限は 60 プロセスでした。
ということは、もし、64 コア 128 スレッドの CPU 『ライゼン スレッドリッパー (AMD) Ryzen™ Threadripper™ 3990X』を使うことができたとしても、『フルパワー』を発揮するためには『工夫が必要』、ということなんでしょうかね…?
ThreadPoolExecutor() のコード例
ThreadPoolExecutor()
で並列処理するコード例です。
"""(スレッドプール版)
concurrent.futures の ThreadPoolExecutor で
並列実行する Python コード例です。
"""
from concurrent.futures import ThreadPoolExecutor
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用
def main():
"""メイン関数です。"""
print('start\n')
t = time.time()
# 自作関数 jisaku_func を用意します。
# (最後のほうに書きました)
# 引数リスト src_datas を用意します。
src_datas = [
[3, 'c'], # 最初(さいしょ)の引数
[2, 'b'],
[1, 'a'], # 最後(さいご)の引数
]
# スレッドプールを作ります。
# プロセスプールとちがって、スレッドの数は
# max_workers=1000 スレッドでも実行できました。
datas = []
print('ThreadPoolExecutor().map()')
with ThreadPoolExecutor(max_workers=3) as executor:
# 『関数』と『引数リスト』を渡して、実行します。
results = executor.map(
jisaku_func_wrapper, src_datas, timeout=None)
# (裏では既にスレッドが動いています)
# timeout の設定は、with 文の中にいる間だけ機能しました。
# 戻り値は『ジェネレーター』になっていました。
# (例) results: <class 'generator'>
print(f'results: {type(results)}')
# 『ジェネレーター』から、結果を取り出します。
# 並列処理は、結果を with 文の中で取り出すようにした
# ときだけ、Ctrl+C でうまく中断することができました。
for data in results:
datas.append(data)
# 結果を表示します。
print('(number, text, ppid, pid, tid)')
for data in datas:
print(data)
print('\n(メイン関数を実行していたプロセスの ID)')
print(f'pid: {os.getpid()}')
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
print('\nend')
return
def jisaku_func_wrapper(args):
"""引数をアンパックして渡す関数です。"""
return jisaku_func(*args)
def jisaku_func(number, text):
"""自作関数です。並列で実行します。"""
print(f'実行中 jisaku_func({number}, {text})')
# (デバッグ) 親プロセス ID (PPID) を取得してみます。
ppid = os.getppid()
# (デバッグ) プロセス ID (PID) を取得してみます。
pid = os.getpid()
# (デバッグ) スレッド ID (TID) を取得してみます。
# tid = threading.get_ident() # Python 3.3 から使用可能
tid = threading.get_native_id() # Python 3.8 から使用可能
# (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
time.sleep(number)
print(f'完了 jisaku_func({number}, {text})')
# 結果をタプル tuple に入れて返します。
return (number, text, ppid, pid, tid)
if __name__ == '__main__':
main()
実行結果
スレッドプールの実行結果です。
start
ThreadPoolExecutor().map()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
results: <class 'generator'>
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(number, text, ppid, pid, tid)
(3, 'c', 14368, 18156, 18276)
(2, 'b', 14368, 18156, 5224)
(1, 'a', 14368, 18156, 14776)
(メイン関数を実行していたプロセスの ID)
pid: 18156
経過時間 3.009 秒
end
ProcessPoolExecutor() のコード例
ProcessPoolExecutor()
で並列処理するコード例です。
"""(プロセスプール版)
concurrent.futures の ProcessPoolExecutor で
並列実行する Python コード例です。
"""
from concurrent.futures import ProcessPoolExecutor
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用
def main():
"""メイン関数です。"""
print('start\n')
t = time.time()
# 自作関数 jisaku_func を用意します。
# (最後のほうに書きました)
# 引数リスト src_datas を用意します。
src_datas = [
[3, 'c'], # 最初(さいしょ)の引数
[2, 'b'],
[1, 'a'], # 最後(さいご)の引数
]
# プロセスプールを作ります。
# 自分の環境では、max_workers=60 プロセスまで OK でした。
# max_workers=61 の時のエラーメッセージは
# "ValueError: need at most 63 handles, got a sequence of length 63"
# でした。max_workers=62 以降のエラーメッセージは
# "ValueError: max_workers must be <= 61" でした。
datas = []
print('ProcessPoolExecutor().map()')
with ProcessPoolExecutor(max_workers=3) as executor:
# 『関数』と『引数リスト』を渡して、実行します。
results = executor.map(
jisaku_func_wrapper, src_datas, timeout=None)
# (裏では既にプロセスが動いています)
# timeout の設定は、with 文の中にいる間だけ機能しました。
# 戻り値は『ジェネレーター』になっていました。
# (例) results: <class 'generator'>
print(f'results: {type(results)}')
# 『ジェネレーター』から、結果を取り出します。
# 並列処理は、結果を with 文の中で取り出すようにした
# ときだけ、Ctrl+C でうまく中断することができました。
# あと、max_workers が多すぎて ValueError になったときは、
# for 文の開始直後から、メインプロセスが進まなくなりました。
for data in results:
datas.append(data)
# 結果を表示します。
print('(number, text, ppid, pid, tid)')
for data in datas:
print(data)
print('\n(メイン関数を実行していたプロセスの ID)')
print(f'pid: {os.getpid()}')
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
print('\nend')
return
def jisaku_func_wrapper(args):
"""引数をアンパックして渡す関数です。"""
return jisaku_func(*args)
def jisaku_func(number, text):
"""自作関数です。並列で実行します。"""
print(f'実行中 jisaku_func({number}, {text})')
# (デバッグ) 親プロセス ID (PPID) を取得してみます。
ppid = os.getppid()
# (デバッグ) プロセス ID (PID) を取得してみます。
pid = os.getpid()
# (デバッグ) スレッド ID (TID) を取得してみます。
# tid = threading.get_ident() # Python 3.3 から使用可能
tid = threading.get_native_id() # Python 3.8 から使用可能
# (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
time.sleep(number)
print(f'完了 jisaku_func({number}, {text})')
# 結果をタプル tuple に入れて返します。
return (number, text, ppid, pid, tid)
if __name__ == '__main__':
main()
実行結果
プロセスプールの実行結果です。
start
ProcessPoolExecutor().map()
results: <class 'generator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(number, text, ppid, pid, tid)
(3, 'c', 10124, 17664, 15200)
(2, 'b', 10124, 4828, 19412)
(1, 'a', 10124, 13280, 13960)
(メイン関数を実行していたプロセスの ID)
pid: 10124
経過時間 3.114 秒
end
Ctrl + C での中断はできた
自分が試した限りでは、並列処理中でも、Ctrl + C
でプログラムを中断することができました。
ThreadPoolExecutor()
でも ProcessPoolExecutor()
でも、メインプロセスが止まることなく、中断することができました。
ただし、KeyboardInterrupt
と with
ブロックについてです。
実験してみて分かった挙動がありました。
ワーカーでの KeyboardInterrupt キャッチについて
ThreadPoolExecutor()
だと、ワーカースレッドに渡した自作関数の中では、KeyboardInterrupt
の例外をキャッチすることができませんでした。
どうやら、ワーカースレッドは、Ctrl + C
を押しても止まらずに、自作関数を最後まで実行してくれるようでした。
そして、ワーカースレッドの実行が完了したあとに、メインプロセスで KeyboardInterrupt
をキャッチすることができました。
一方で、ProcessPoolExecutor()
だと、ワーカープロセスの中でも KeyboardInterrupt
をキャッチすることができました。
なので、スレッドよりも中断が早かったです。
Ctrl + C
を押したら、ほぼすぐにワーカープロセスが反応して、KeyboardInterrupt
をキャッチすることができました。
そして、ワーカープロセスが完了したあとに、メインプロセスでも KeyboardInterrupt
をキャッチすることができました。
with ブロックの中で結果を取り出す
Ctrl + C
でプログラムを中断する場合です。
結果を取り出すタイミングで、挙動が変わりました。
executor.map()
の戻り値の generator から、結果を取り出すときです。
with
ブロックの中で for
文にかけたら、すみやかに並列処理を中断することができました。
一方で、with
ブロックを抜けてから for
文にかけるようにしたら、『ワーカースレッド』や『ワーカープロセス』へのタスク投入が止まらなかったです。
Ctrl + C
を押しても、引数リストの最後まで実行されました。
そのあとでようやく、メインプロセスで KeyboardInterrupt
をキャッチすることができました。
たとえ、KeyboardInterrupt
をキャッチできる『ワーカープロセス』の場合でも、だめでした。
引数リストの最後まで止まりませんでした。
KeyboardInterrupt
で、いったんはプロセスが終了したのですが、すぐに次のタスクが実行されて、結局、並列処理を止めることができませんでした。
引数リストの最後に達するまで、どんどんプロセスが復活しました。
そういうわけで、with
ブロックの中では、結果を取り出すところまでやるのが、たぶん正解なんじゃないかと思いました。
with
を抜けた後で結果を取り出す構造だと、Ctrl + C
を押したあとで、プログラムが進まなくなったりしました。
ところで、with
は Executor
をシャットダウンするとのことでした。
with
文はExecutor
をシャットダウンします (wait
をTrue
にセットしてExecutor.shutdown()
が呼ばれたかのように待ちます)。
なので、想像ですが、挙動の違いはこの辺りが関係していたのかもしれません。
タイムアウト
executor.map(func, iterables, timeout)
のタイムアウトについても、with
ブロックの中で結果を取り出すときに、うまく機能しました。
例外は concurrent.futures._base.TimeoutError
が発生しました。
一方で、with
を抜けた後で結果を取り出す構造だと、タイムアウトが発生しませんでした。
こちらも、with
を抜けたときのシャットダウンが関係していたのかもしれません。
with
文はExecutor
をシャットダウンします (wait
をTrue
にセットしてExecutor.shutdown()
が呼ばれたかのように待ちます)。