Python の multiprocessing.Pool()
を使用して、並列処理するコード例を書きました。
Python マニュアルを見たところ、プロセスプールを使って自作関数を動かす方法は、8つもありました。
pool.apply()
pool.apply_async()
pool.map()
pool.map_async()
pool.imap()
pool.imap_unordered()
pool.starmap()
pool.starmap_async()
(読み方は、YouTube で海外の方が読んでいた時の発音をいろいろと聞いてみて、カタカナにしたものです。もちろん、ほかにも読み方はあると思います。ピリオド .
はドットと読んでいました。プール ドット マップ みたいな感じで読んでいました。)
これらの『説明』と『コード例』と『実行結果』です。
環境は Windows 10 Pro (64 bit) で、Python 3.8.6 (64 bit) を使用しました。
自分は最初、multiprocessing
モジュールの使い方がわからなくて、意図した通りに『動かない』ということが、よくありました。
そこで、実際に全部のメソッドを使ってみることにしました。
そうして、ようやく意図した通りに動く『書き方』を見つけたり、『8つのメソッドの違い』が分かるようになってきました。
マルチプロセス処理は、決算分析システムの高速化に、とても役立ちました。
自分は .map()
と .starmap()
を良く使っています。
説明
Python マニュアルの場所と、簡単な説明を書きました。
multiprocessing.pool.Pool()
マルチプロセッシング ドット プール ドット プール です。
プロセスを作るクラスです。ワーカープロセスのプールを作ってくれました。
プロセスの数(並列数)processes
を決めて使います。
プロセスの数は、processes=60
が上限のようでした。
processes=61
にしたら
ValueError: need at most 63 handles, got a sequence of length 63
というエラーが出ました。
あと .Pool()
では、with
文を使うことができました。
with
のブロックを抜けると、並列処理に使ったプロセスを、自動的に終了してくれました。
特に理由がない限り、自分は with
を使用しています。
multiprocessing
のインポート方法ですが、以下のどれでも動きました。
import multiprocessing
if __name__ == '__main__':
with multiprocessing.Pool(processes=3) as pool:
pool.map(jisaku_func_wrapper, src_datas)
from multiprocessing import Pool
if __name__ == '__main__':
with Pool(processes=3) as pool:
pool.map(jisaku_func_wrapper, src_datas)
from multiprocessing import pool
if __name__ == '__main__':
with pool.Pool(processes=3) as p:
p.map(jisaku_func_wrapper, src_datas)
pool.apply()
アプライ メソッドです。
(Python) apply(func[, args[, kwds]])
1つのプロセスを使って、完了するまで待ちます(メインプロセスの進行をブロックします)。
並列処理にはなりませんでした。
たぶん、あまり使うことはないと思います。
pool.apply_async()
アプライ エシンク メソッドです。
(Python) apply_async(func[, args[, kwds[, callback[, error_callback]]]])
for
文で1つずつ、次々に並列で動かすことができました。
結果を受け取るときは、戻り値の AsyncResult
が持っている .get(timeout)
メソッドを使って受け取ります。
(Python) class multiprocessing.pool.AsyncResult
もし、timeout
の秒数を超えても結果が受け取れなかったときは、.get(timeout)
のところで multiprocessing.context.TimeoutError
が発生しました。
この例外は、multiprocessing.TimeoutError
や multiprocessing.context.TimeoutError
でキャッチすることができました。
Python マニュアルの『(Python) 使用例(Pool
を使用する例)』では、multiprocessing.TimeoutError
を使って例外をキャッチしていました。
pool.map()
マップ メソッドです。
(Python) map(func, iterable[, chunksize])
自分は良く使っています。
自作関数が並列で動きました。
すべてのプロセスが完了したら、メインプロセスが進みました。
結果は、すべての引数の分を、まとめて受け取ることができました。
また、引数のアンパック機能を内蔵した .starmap()
という便利なバージョンもありました。
こちらも良く使っています。
ラッパー関数を書かなくて済んだので、コードがすっきりとしました。
pool.map_async()
マップ エシンク メソッドです。
(Python) map_async(func, iterable[, chunksize[, callback[, error_callback]]])
自作関数が並列でスタートして、メインプロセスもすぐに動き始めました。
結果を受け取るときは、戻り値の multiprocessing.pool.MapResult
が持っている .get(timeout)
メソッドを使って受け取ります。
結果は、すべての引数の分を、まとめて受け取ることができました。
また、引数のアンパック機能を内蔵した .starmap_async()
という便利なバージョンもありました。
ラッパー関数を書かなくて済んだので、コードがすっきりとしました。
pool.imap()
イマップ メソッドです。
(Python) imap(func, iterable[, chunksize])
自作関数が並列でスタートして、メインプロセスもすぐに動き始めました。
結果は、引数リストと同じ順番で返してくれました。
.map()
との違いです。
.map()
だと、すべてのプロセスが完了しないと、結果が受け取れませんでした。
ですが、.imap()
はちがいました。
.imap()
なら、引数リストの後ろのほうが終わっていなくても、結果を返し始めてくれました。
pool.imap_unordered()
イマップ アンオーダード メソッドです。
(Python) imap_unordered(func, iterable[, chunksize])
自作関数が並列でスタートして、メインプロセスもすぐに動き始めました。
.map()
との違いです。
.map()
だと、すべてのプロセスが完了しないと、結果が受け取れませんでした。
ですが、.imap_unordered()
はちがいました。
.imap_unordered()
なら、完了したものから、どんどん結果を返してくれました。
pool.starmap()
スターマップ メソッドです。
(Python) starmap(func, iterable[, chunksize])
自分は良く使っています。
これは、引数のアンパック機能を内蔵した .map()
でした。
引数を自動的に『アンパック』して、『複数の引数』として、自作関数に渡してくれました。
引数をアンパックするためのラッパー関数が、必要ありません。
コードがすっきりとして、とても便利でした。
『スター』とは、たぶん、アンパックするときに使う『演算子 * (スター、アスタリスク)』のことです。
個別に引数を与えることができない場合、関数呼び出しを
*
演算子を使って書き、リストやタプルから引数をアンパックします:
そういえば、Python マニュアルの中でも、アンパック用の関数名に star が使われていました。
def calculatestar(args): return calculate(*args)
こういう名前の付け方もあったんですね。
pool.starmap_async()
スターマップ エシンク メソッドです。
(Python) starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
これは、引数のアンパック機能を内蔵した .map_async()
でした。
引数を自動的に『アンパック』して、『複数の引数』として、自作関数に渡してくれました。
結果を受け取るときは、戻り値の multiprocessing.pool.MapResult
が持っている .get(timeout)
メソッドを使って受け取ります。
initializer と initargs の用途と使い方
multiprocessing.pool.Pool()
の initializer
と initargs
の使い道です。
イニシャライザは、たとえば『各子プロセスのロギング設定』をするときに使用しました。initargs
は、『ログのファイルパス』や『Queue
』を渡すために使用しました。
initializer
と initargs
の使い方(使用例)を書きました。
⇒ multiprocessing.Pool でログを記録するコード例(プロセスごとに分ける)
⇒ multiprocessing.Pool でログを記録するコード例(まとめて記録)
もちろん、ほかにも使い道はあると思います。
if __name__ == '__main__': main()
main 関数の呼び出しについてです。
main 関数を呼び出すときは、if __name__ == '__main__':
の中で呼び出すと、うまく動きました。
Python マニュアルの場所です。
(Python) multiprocessing
はじめに(子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法)
(Python) multiprocessing
メインモジュールの安全なインポート
もし、if __name__ == '__main__':
を付けなかったら、どうなるのか?
そのときは、子プロセスが main 関数を呼び出してしまって、無限ループのような状態になってしまいました。
ところで、__name__
の中には、何が入っているのか?
子プロセスのときは、'__mp_main__'
という文字列が入っていました。
これは、if 文に書いた '__main__'
とはちがっているので、main 関数の実行がスキップされる、という仕組みでした。
コード例
multiprocessing.Pool() で並列処理するコード例です。
用意した自作関数 (jisaku_func) は、number
の秒数だけ待機する関数です。
引数リスト (src_datas) は、3回分を用意しました。
"""
multiprocessing.Pool() で使える 8 種類の
メソッドをお試しする Python コード例です。
"""
import multiprocessing
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用
def main():
"""メイン関数です。"""
print('start\n')
# 自作関数 jisaku_func を用意します。
# (最後のほうに書きました)
# 引数リスト src_datas を用意します。
src_datas = [
[3, 'c'],
[2, 'b'],
[1, 'a'],
]
all_datas = {}
# ■ (1/8) アプライ (並列処理にはなりませんでした)
print('(1/8) pool.apply()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
datas = []
for src_data in src_datas:
data = pool.apply(
func=jisaku_func, args=src_data)
print('(プロセスが完了したら進みます)')
# 自作関数が『別プロセス』で実行された
# だけで、並列処理にはなりませんでした。
datas.append(data)
print(f'戻り値の型: {type(data)}')
all_datas['(1/8) pool.apply()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (2/8) アプライ エシンク (エィスィンク)
print('\n(2/8) pool.apply_async()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
apply_results = []
for src_data in src_datas:
apply_result = pool.apply_async(
func=jisaku_func, args=src_data)
# (裏では既にプロセスが動いています)
apply_results.append(apply_result)
print(f'戻り値の型: {type(apply_result)}')
datas = []
for apply_result in apply_results:
# 戻り値を取得します(プロセスの完了を待ちます)。
data = apply_result.get(timeout=None)
datas.append(data)
all_datas['(2/8) pool.apply_async()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (3/8) マップ
print('\n(3/8) pool.map()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
datas = pool.map(
func=jisaku_func_wrapper, iterable=src_datas)
print('(すべてのプロセスが完了したら進みます)')
print(f'戻り値の型: {type(datas)}')
all_datas['(3/8) pool.map()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (4/8) マップ エシンク (エィスィンク)
print('\n(4/8) pool.map_async()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
map_result = pool.map_async(
func=jisaku_func_wrapper, iterable=src_datas)
# (裏では既にプロセスが動いています)
print(f'戻り値の型: {type(map_result)}')
# 戻り値を取得します(プロセスの完了を待ちます)。
datas = map_result.get(timeout=None)
all_datas['(4/8) pool.map_async()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (5/8) イマップ (アイマップ) .map() の遅延評価版
print('\n(5/8) pool.imap()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
imap_iterator = pool.imap(
func=jisaku_func_wrapper, iterable=src_datas)
# (裏では既にプロセスが動いています)
print(f'戻り値の型: {type(imap_iterator)}')
datas = []
for data in imap_iterator:
datas.append(data)
print(data)
print('最初から順に進みます(後ろが終わってなくても)。')
all_datas['(5/8) pool.imap()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (6/8) イマップ (アイマップ) アンオーダード
print('\n(6/8) pool.imap_unordered()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
imap_unordered_iterator = pool.imap_unordered(
func=jisaku_func_wrapper, iterable=src_datas)
# (裏では既にプロセスが動いています)
print(f'戻り値の型: {type(imap_unordered_iterator)}')
datas = []
for data in imap_unordered_iterator:
datas.append(data)
print('どれかプロセスが終わるたびに進みます。')
all_datas['(6/8) pool.imap_unordered()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (7/8) スターマップ
print('\n(7/8) pool.starmap()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
datas = pool.starmap(
func=jisaku_func, iterable=src_datas)
print('(すべてのプロセスが完了したら進みます)')
print(f'戻り値の型: {type(datas)}')
all_datas['(7/8) pool.starmap()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
# ■ (8/8) スターマップ エシンク (エィスィンク)
print('\n(8/8) pool.starmap_async()')
t = time.time()
with multiprocessing.Pool(processes=3) as pool:
map_result = pool.starmap_async(
func=jisaku_func, iterable=src_datas)
# (裏では既にプロセスが動いています)
print(f'戻り値の型: {type(map_result)}')
# 戻り値を取得します(プロセスの完了を待ちます)。
datas = map_result.get(timeout=None)
all_datas['(8/8) pool.starmap_async()'] = datas
elapsed_time = time.time() - t
print(f'経過時間 {elapsed_time:.3f} 秒')
print('\n結果を表示します。')
for (method_name, datas) in all_datas.items():
print(f'\n{method_name}')
print('(number, text, ppid, pid, tid)')
for data in datas:
print(data)
print('\n(メイン関数を実行していたプロセスの ID)')
print(f'pid: {os.getpid()}')
print('\nend')
return
def jisaku_func_wrapper(args):
"""
引数をアンパックして渡す関数です。
スターマップ .starmap() か
スターマップ エシンク .starmap_async() を
使えば、この関数を使わなくても、自動的に引数を
アンパックして渡してくれました。
『スター』とは、たぶん、アンパックするときに使う
『演算子 * (スター、アスタリスク)』のことです。
ところで、"_wrapper" という関数名ですが、
Python マニュアルの中では、
def calculatestar(args):
return calculate(*args)
のように "star" が使われていました。
"""
return jisaku_func(*args)
def jisaku_func(number, text):
"""自作関数です。並列で実行します。"""
print(f'実行中 jisaku_func({number}, {text})')
# (デバッグ) 親プロセス ID (PPID) を取得してみます。
ppid = os.getppid()
# (デバッグ) プロセス ID (PID) を取得してみます。
pid = os.getpid()
# (デバッグ) スレッド ID (TID) を取得してみます。
# tid = threading.get_ident() # Python 3.3 から使用可能
tid = threading.get_native_id() # Python 3.8 から使用可能
# (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
time.sleep(number)
print(f'完了 jisaku_func({number}, {text})')
# 結果をタプル tuple に入れて返します。
return (number, text, ppid, pid, tid)
if __name__ == '__main__':
main()
実行結果
マルチプロセス処理の実行結果です。
メインプロセスが動き始める『タイミング』や、『戻り値の型』に、違いが出ていました。
start
(1/8) pool.apply()
実行中 jisaku_func(3, c)
完了 jisaku_func(3, c)
(プロセスが完了したら進みます)
戻り値の型: <class 'tuple'>
実行中 jisaku_func(2, b)
完了 jisaku_func(2, b)
(プロセスが完了したら進みます)
戻り値の型: <class 'tuple'>
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
(プロセスが完了したら進みます)
戻り値の型: <class 'tuple'>
経過時間 6.120 秒
(2/8) pool.apply_async()
戻り値の型: <class 'multiprocessing.pool.ApplyResult'>
戻り値の型: <class 'multiprocessing.pool.ApplyResult'>
戻り値の型: <class 'multiprocessing.pool.ApplyResult'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
経過時間 3.089 秒
(3/8) pool.map()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(すべてのプロセスが完了したら進みます)
戻り値の型: <class 'list'>
経過時間 3.108 秒
(4/8) pool.map_async()
戻り値の型: <class 'multiprocessing.pool.MapResult'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
経過時間 3.090 秒
(5/8) pool.imap()
戻り値の型: <class 'multiprocessing.pool.IMapIterator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(3, 'c', 15240, 16156, 10752)
最初から順に進みます(後ろのプロセスが終わってなくても)。
(2, 'b', 15240, 6608, 10532)
最初から順に進みます(後ろのプロセスが終わってなくても)。
(1, 'a', 15240, 13716, 2836)
最初から順に進みます(後ろのプロセスが終わってなくても)。
経過時間 3.083 秒
(6/8) pool.imap_unordered()
戻り値の型: <class 'multiprocessing.pool.IMapUnorderedIterator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
どれかプロセスが終わるたびに進みます。
完了 jisaku_func(2, b)
どれかプロセスが終わるたびに進みます。
完了 jisaku_func(3, c)
どれかプロセスが終わるたびに進みます。
経過時間 3.096 秒
(7/8) pool.starmap()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(すべてのプロセスが完了したら進みます)
戻り値の型: <class 'list'>
経過時間 3.111 秒
(8/8) pool.starmap_async()
戻り値の型: <class 'multiprocessing.pool.MapResult'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
経過時間 3.096 秒
結果を表示します。
(1/8) pool.apply()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 12128, 3636)
(2, 'b', 15240, 12748, 15892)
(1, 'a', 15240, 1372, 13412)
(2/8) pool.apply_async()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 8700, 17576)
(2, 'b', 15240, 13796, 15152)
(1, 'a', 15240, 6372, 11796)
(3/8) pool.map()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 11632, 18268)
(2, 'b', 15240, 7480, 17828)
(1, 'a', 15240, 11456, 17608)
(4/8) pool.map_async()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 10864, 15132)
(2, 'b', 15240, 6248, 7136)
(1, 'a', 15240, 10112, 12928)
(5/8) pool.imap()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 16156, 10752)
(2, 'b', 15240, 6608, 10532)
(1, 'a', 15240, 13716, 2836)
(6/8) pool.imap_unordered()
(number, text, ppid, pid, tid)
(1, 'a', 15240, 15228, 13220)
(2, 'b', 15240, 12516, 14116)
(3, 'c', 15240, 16328, 14756)
(7/8) pool.starmap()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 15992, 13052)
(2, 'b', 15240, 12732, 14664)
(1, 'a', 15240, 12284, 13000)
(8/8) pool.starmap_async()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 13972, 3804)
(2, 'b', 15240, 16748, 3788)
(1, 'a', 15240, 13920, 7736)
(メイン関数を実行していたプロセスの ID)
pid: 15240
end
Pause キーでの一時停止はできた
マルチプロセス処理を実行中に、キーボードの Pause キーを押したら、プログラムの実行を一時停止することができました。
プログラムを一時停止するにはPAUSEキーを押す【Python】
そして、矢印キーなど、何かキーを押したら、再びプログラムが動き始めました。
一時停止の Pause キーや、再開するときの矢印キーは、コマンドプロンプトのウィンドウをクリックするなどして、アクティブにしてから押すと反応しました。
Pause キーを押しても、特にメッセージなどは表示されませんでした。
ですが、タスクマネージャーの CPU 使用率を見たら0%近くまで下がっていたので、意図した通り、プログラムが一時停止していたことがわかりました。
Pause キーは便利なので、自分はよく使っています。
Ctrl + C の KeyboardInterrupt は使用できなかった
Ctrl + C
を押して KeyboardInterrupt
を発生させたら、メインプロセスの処理が進まなくなりました。
繰り返し Ctrl + C
を押しても、メインプロセスの中断ができませんでした。
これは、apply
, apply_async
, map
, map_async
, imap
, imap_unordered
, starmap
, starmap_async
の、どのメソッドを使った時でも、メインプロセスが進まなくなって、中断もできなくなりました。
並列処理中に Ctrl + C
でプログラムを中断したいときは、.Pool()
を『使わない』方法だとうまくできました。
実際に自分が使っているのは、multiprocessing.Process()
で並列処理する方法です。
ほかにも色々と試したところ、concurrent.futures
の ThreadPoolExecutor()
と ProcessPoolExecutor()
で、並列処理を中断することができました。
【Python】concurrent.futures の使い方【並列処理】
あとは、外部ライブラリの joblib
でも、Ctrl + C
でうまく中断することができました。
こちらは、joblib
のドキュメントに Ctrl + C
での中断機能がある旨が書かれていました。
Interruption of multiprocesses jobs with ‘Ctrl-C’
並列処理をするときって、たいてい2時間とか3時間とか、長い時間実行するので、自分は Ctrl + C
での中断をよく使っているんですよね。
『ひらめいた!よし、直して再実行!』みたいな感じです。
そんな時です。
自前のデータベースの破損防止のために、データベースを閉じてから終了したい、というわけです。
実際のところ、SQLite DB のファイルが壊れたことって、ほとんどなかったです。
けれども一応、念のために、閉じてから終了したいんですよね。
そういうわけなので、新規にコードを書くときは、こういった Ctrl + C
に対応したモジュールを使用していこうと考えています。
ログを記録するコード例
multiprocessing.Pool
で並列処理しているときにログを記録するコード例を書きました。
⇒ multiprocessing.Pool でログを記録するコード例(プロセスごとに分ける)
⇒ multiprocessing.Pool でログを記録するコード例(まとめて記録)
プロセスの優先度を変更する
マルチプロセス処理を実行しているときは、プロセスの優先度を通常よりも下げておくと、ほかの作業が快適になりました。
実行中のPythonプログラムの優先度を下げる方法【psutil】
CPUが100%の状態でも、すぐにCPUの時間を分けてくれました。
そして、CPUが空けば、再び全力を発揮してくれました。
なので、優先度を下げたからといって、処理速度が極端に落ちるといったこともありませんでした。
自分はよく、Python プログラムの開始地点(エントリーポイント)の近くで、プロセスの優先度を『低 (idle, psutil.IDLE_PRIORITY_CLASS)』に下げる、といった処理を書いています。
これで、『計算しつつ、Web を見る』といったことが、より快適にできるようになりました。
メモリ不足を回避する
multiprocessing.Pool()
の『.map()
、.imap()
、.imap_unordered()
』などで、1度にたくさんのデータを処理しようとしたら、メモリ不足になりました。
メモリ不足の原因と対策方法は、【Python】concurrent.futures の使い方【並列処理】 の記事に書きました。
以上です。