【Python】multiprocessing.Pool() の使い方【並列処理】

Python の multiprocessing.Pool() を使用して、並列処理するコード例を書きました。

Python マニュアルを見たところ、プロセスプールを使って自作関数を動かす方法は、8つもありました。

  • pool.applyアプライ()
  • pool.apply_asyncアプライ エシンク()
  • pool.mapマップ()
  • pool.map_asyncマップ エシンク()
  • pool.imapイマップ()
  • pool.imap_unorderedイマップ アンオーダード()
  • pool.starmapスターマップ()
  • pool.starmap_asyncスターマップ エシンク()

(読み方は、YouTube で海外の方が読んでいた時の発音をいろいろと聞いてみて、カタカナにしたものです。もちろん、ほかにも読み方はあると思います。ピリオド . はドットと読んでいました。プール ドット マップ みたいな感じで読んでいました。)

これらの『説明』と『コード例』と『実行結果』です。

環境は Windows 10 Pro (64 bit) で、Python 3.8.6 (64 bit) を使用しました。

自分は最初、multiprocessing モジュールの使い方がわからなくて、意図した通りに『動かない』ということが、よくありました。

そこで、実際に全部のメソッドを使ってみることにしました。

そうして、ようやく意図した通りに動く『書き方』を見つけたり、『8つのメソッドの違い』が分かるようになってきました。

マルチプロセス処理は、決算分析システムの高速化に、とても役立ちました。

自分は .mapマップ().starmapスターマップ() を良く使っています。

説明

Python マニュアルの場所と、簡単な説明を書きました。

multiprocessing.pool.Pool()

マルチモーティプロセッシング ドット プール ドット プール です。

プロセスを作るクラスです。ワーカープロセスのプールを作ってくれました。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

プロセスの数(並列数)processes を決めて使います。

プロセスの数は、processes=60 が上限のようでした。

processes=61 にしたら

ValueError: need at most 63 handles, got a sequence of length 63

というエラーが出ました。

あと .Pool() では、with ぶんを使うことができました。

with のブロックを抜けると、並列処理に使ったプロセスを、自動的に終了してくれました。

特に理由がない限り、自分は with を使用しています。

multiprocessing のインポート方法ですが、以下のどれでも動きました。

import multiprocessing
if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        pool.map(jisaku_func_wrapper, src_datas)

from multiprocessing import Pool
if __name__ == '__main__':
    with Pool(processes=3) as pool:
        pool.map(jisaku_func_wrapper, src_datas)

from multiprocessing import pool
if __name__ == '__main__':
    with pool.Pool(processes=3) as p:
        p.map(jisaku_func_wrapper, src_datas)

pool.apply()

アプライ メソッドです。

apply(func[, args[, kwds]])

1つのプロセスを使って、完了するまで待ちます(メインプロセスの進行をブロックします)。

並列処理にはなりませんでした。

たぶん、あまり使うことはないと思います。

pool.apply_async()

アプライ エシンク メソッドです。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

for 文で1つずつ、次々に並列で動かすことができました。

結果を受け取るときは、戻り値の AsyncResult が持っている .get(timeout) メソッドを使って受け取ります。

class multiprocessing.pool.AsyncResult

get([timeout])

もし、timeout の秒数を超えても結果が受け取れなかったときは、.get(timeout) のところで multiprocessing.context.TimeoutError が発生しました。

この例外は、multiprocessing.TimeoutErrormultiprocessing.context.TimeoutError でキャッチすることができました。

Python マニュアルの『使用例(Pool を使用する例)』では、multiprocessing.TimeoutError を使って例外をキャッチしていました。

pool.map()

マップ メソッドです。

map(func, iterable[, chunksize])

自分は良く使っています。

自作関数が並列で動きました。

すべてのプロセスが完了したら、メインプロセスが進みました。

結果は、すべての引数の分を、まとめて受け取ることができました。

また、引数のアンパック機能を内蔵した .starmapスターマップ() という便利なバージョンもありました。

こちらも良く使っています。

ラッパー関数を書かなくて済んだので、コードがすっきりとしました。

pool.map_async()

マップ エシンク メソッドです。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

自作関数が並列でスタートして、メインプロセスもすぐに動き始めました。

結果を受け取るときは、戻り値の multiprocessing.pool.MapResult が持っている .get(timeout) メソッドを使って受け取ります。

結果は、すべての引数の分を、まとめて受け取ることができました。

また、引数のアンパック機能を内蔵した .starmap_asyncスターマップ エシンク() という便利なバージョンもありました。

ラッパー関数を書かなくて済んだので、コードがすっきりとしました。

pool.imap()

イマップ メソッドです。

imap(func, iterable[, chunksize])

自作関数が並列でスタートして、メインプロセスもすぐに動き始めました。

結果は、引数リストと同じ順番で返してくれました。

.map() との違いです。

.map() だと、すべてのプロセスが完了しないと、結果が受け取れませんでした。

ですが、.imap() はちがいました。

.imap() なら、引数リストの後ろのほうが終わっていなくても、結果を返し始めてくれました。

pool.imap_unordered()

イマップ アンオーダード メソッドです。

imap_unordered(func, iterable[, chunksize])

自作関数が並列でスタートして、メインプロセスもすぐに動き始めました。

.map() との違いです。

.map() だと、すべてのプロセスが完了しないと、結果が受け取れませんでした。

ですが、.imap_unordered() はちがいました。

.imap_unordered() なら、完了したものから、どんどん結果を返してくれました。

pool.starmap()

スターマップ メソッドです。

starmap(func, iterable[, chunksize])

自分は良く使っています。

これは、引数のアンパック機能を内蔵した .map() でした。

引数を自動的に『アンパック』して、『複数の引数』として、自作関数に渡してくれました。

引数をアンパックするためのラッパー関数が、必要ありません。

コードがすっきりとして、とても便利でした。

『スター』とは、たぶん、アンパックするときに使う『演算子 * (スター、アスタリスク)』のことです。

個別に引数を与えることができない場合、関数呼び出しを * 演算子を使って書き、リストやタプルから引数をアンパックします:

(Python) 引数リストのアンパック

そういえば、Python マニュアルの中でも、アンパック用の関数名に starスター が使われていました。

def calculatestar(args):
    return calculate(*args)

(Python) 使用例(Pool を使用する例)

こういう名前の付け方もあったんですね。

pool.starmap_async()

スターマップ エシンク メソッドです。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

これは、引数のアンパック機能を内蔵した .map_async() でした。

引数を自動的に『アンパック』して、『複数の引数』として、自作関数に渡してくれました。

結果を受け取るときは、戻り値の multiprocessing.pool.MapResult が持っている .get(timeout) メソッドを使って受け取ります。

if __name__ == '__main__': main()

main 関数の呼び出しについてです。
main 関数を呼び出すときは、if __name__ == '__main__': の中で呼び出すと、うまく動きました。

Python マニュアルの場所です。

(Python) multiprocessing はじめに(子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法)

(Python) multiprocessing メインモジュールの安全なインポート

もし、if __name__ == '__main__': を付けなかったら、どうなるのか?

そのときは、プロセスが main 関数を呼び出してしまって、無限ループのような状態になってしまいました。

ところで、__name__ の中には、何が入っているのか?

子プロセスのときは、'__mp_main__' という文字列が入っていました。

これは、if 文に書いた '__main__' とはちがっているので、main 関数の実行がスキップされる、という仕組みでした。

コード例

multiprocessing.Pool() で並列処理するコード例です。

用意した自作関数 (jisaku_func) は、number の秒数だけ待機する関数です。

引数リスト (src_datas) は、3回分を用意しました。

"""
multiprocessing.Pool() で使える 8 種類の
メソッドをお試しする Python コード例です。
"""

import multiprocessing
import os # デバッグ用
import time # デバッグ用
import threading # デバッグ用

def main():
    """メイン関数です。"""
    print('start\n')

    # 自作関数 jisaku_func を用意します。
    # (最後のほうに書きました)

    # 引数リスト src_datas を用意します。
    src_datas = [
        [3, 'c'], # 最初(さいしょ)の引数
        [2, 'b'],
        [1, 'a'], # 最後(さいご)の引数
    ]

    all_datas = {}

    # ■ (1/8) アプライ (並列処理にはなりませんでした)
    print('(1/8) pool.apply()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        datas = []
        for src_data in src_datas:
            data = pool.apply(
                func=jisaku_func, args=src_data)
            print('(プロセスが完了したら進みます)')
            # 自作関数が『別プロセス』で実行された
            # だけで、並列処理にはなりませんでした。
            datas.append(data)
            print(f'戻り値の型: {type(data)}')
    all_datas['(1/8) pool.apply()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (2/8) アプライ エシンク (エィスィンク)
    print('\n(2/8) pool.apply_async()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        apply_results = []
        for src_data in src_datas:
            apply_result = pool.apply_async(
                func=jisaku_func, args=src_data)
            # (裏では既にプロセスが動いています)
            apply_results.append(apply_result)
            print(f'戻り値の型: {type(apply_result)}')
        datas = []
        for apply_result in apply_results:
            # 戻り値を取得します(プロセスの完了を待ちます)。
            data = apply_result.get(timeout=None)
            datas.append(data)
    all_datas['(2/8) pool.apply_async()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (3/8) マップ
    print('\n(3/8) pool.map()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        datas = pool.map(
            func=jisaku_func_wrapper, iterable=src_datas)
        print('(すべてのプロセスが完了したら進みます)')
        print(f'戻り値の型: {type(datas)}')
    all_datas['(3/8) pool.map()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (4/8) マップ エシンク (エィスィンク)
    print('\n(4/8) pool.map_async()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        map_result = pool.map_async(
            func=jisaku_func_wrapper, iterable=src_datas)
        # (裏では既にプロセスが動いています)
        print(f'戻り値の型: {type(map_result)}')
        # 戻り値を取得します(プロセスの完了を待ちます)。
        datas = map_result.get(timeout=None)
    all_datas['(4/8) pool.map_async()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (5/8) イマップ (アイマップ) .map() の遅延評価版
    print('\n(5/8) pool.imap()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        imap_iterator = pool.imap(
            func=jisaku_func_wrapper, iterable=src_datas)
        # (裏では既にプロセスが動いています)
        print(f'戻り値の型: {type(imap_iterator)}')
        datas = []
        for data in imap_iterator:
            datas.append(data)
            print(data)
            print('最初から順に進みます(後ろが終わってなくても)。')
    all_datas['(5/8) pool.imap()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (6/8) イマップ (アイマップ) アンオーダード
    print('\n(6/8) pool.imap_unordered()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        imap_unordered_iterator = pool.imap_unordered(
            func=jisaku_func_wrapper, iterable=src_datas)
        # (裏では既にプロセスが動いています)
        print(f'戻り値の型: {type(imap_unordered_iterator)}')
        datas = []
        for data in imap_unordered_iterator:
            datas.append(data)
            print('どれかプロセスが終わるたびに進みます。')
    all_datas['(6/8) pool.imap_unordered()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (7/8) スターマップ
    print('\n(7/8) pool.starmap()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        datas = pool.starmap(
            func=jisaku_func, iterable=src_datas)
        print('(すべてのプロセスが完了したら進みます)')
        print(f'戻り値の型: {type(datas)}')
    all_datas['(7/8) pool.starmap()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    # ■ (8/8) スターマップ エシンク (エィスィンク)
    print('\n(8/8) pool.starmap_async()')
    t = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        map_result = pool.starmap_async(
            func=jisaku_func, iterable=src_datas)
        # (裏では既にプロセスが動いています)
        print(f'戻り値の型: {type(map_result)}')
        # 戻り値を取得します(プロセスの完了を待ちます)。
        datas = map_result.get(timeout=None)
    all_datas['(8/8) pool.starmap_async()'] = datas
    elapsed_time = time.time() - t
    print(f'経過時間 {elapsed_time:.3f} 秒')


    print('\n結果を表示します。')
    for (method_name, datas) in all_datas.items():
        print(f'\n{method_name}')
        print('(number, text, ppid, pid, tid)')
        for data in datas:
            print(data)

    print('\n(メイン関数を実行していたプロセスの ID)')
    print(f'pid: {os.getpid()}')

    print('\nend')
    return


def jisaku_func_wrapper(args):
    """
    引数をアンパックして渡す関数です。
    スターマップ .starmap() か
    スターマップ エシンク .starmap_async() を
    使えば、この関数を使わなくても、自動的に引数を
    アンパックして渡してくれました。
    『スター』とは、たぶん、アンパックするときに使う
    『演算子 * (スター、アスタリスク)』のことです。
    ところで、"_wrapper" という関数名ですが、
    Python マニュアルの中では、
    def calculatestar(args):
        return calculate(*args)
    のように "star" が使われていました。
    """
    return jisaku_func(*args)


def jisaku_func(number, text):
    """自作関数です。並列で実行します。"""
    print(f'実行中 jisaku_func({number}, {text})')

    # (デバッグ) 親プロセス ID (PPID) を取得してみます。
    ppid = os.getppid()

    # (デバッグ) プロセス ID (PID) を取得してみます。
    pid = os.getpid()

    # (デバッグ) スレッド ID (TID) を取得してみます。
    # tid = threading.get_ident() # Python 3.3 から使用可能
    tid = threading.get_native_id() # Python 3.8 から使用可能

    # (デバッグ) ディスク I/O 待ちや HTTP 通信待ちなど。
    time.sleep(number)

    print(f'完了 jisaku_func({number}, {text})')

    # 結果をタプル tuple に入れて返します。
    return (number, text, ppid, pid, tid)


if __name__ == '__main__':
    main()

実行結果

マルチプロセス処理の実行結果です。

メインプロセスが動き始める『タイミング』や、『戻り値の型』に、違いが出ていました。

start

(1/8) pool.apply()
実行中 jisaku_func(3, c)
完了 jisaku_func(3, c)
(プロセスが完了したら進みます)
戻り値の型: <class 'tuple'>
実行中 jisaku_func(2, b)
完了 jisaku_func(2, b)
(プロセスが完了したら進みます)
戻り値の型: <class 'tuple'>
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
(プロセスが完了したら進みます)
戻り値の型: <class 'tuple'>
経過時間 6.120 秒

(2/8) pool.apply_async()
戻り値の型: <class 'multiprocessing.pool.ApplyResult'>
戻り値の型: <class 'multiprocessing.pool.ApplyResult'>
戻り値の型: <class 'multiprocessing.pool.ApplyResult'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
経過時間 3.089 秒

(3/8) pool.map()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(すべてのプロセスが完了したら進みます)
戻り値の型: <class 'list'>
経過時間 3.108 秒

(4/8) pool.map_async()
戻り値の型: <class 'multiprocessing.pool.MapResult'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
経過時間 3.090 秒

(5/8) pool.imap()
戻り値の型: <class 'multiprocessing.pool.IMapIterator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(3, 'c', 15240, 16156, 10752)
最初から順に進みます(後ろのプロセスが終わってなくても)。
(2, 'b', 15240, 6608, 10532)
最初から順に進みます(後ろのプロセスが終わってなくても)。
(1, 'a', 15240, 13716, 2836)
最初から順に進みます(後ろのプロセスが終わってなくても)。
経過時間 3.083 秒

(6/8) pool.imap_unordered()
戻り値の型: <class 'multiprocessing.pool.IMapUnorderedIterator'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
どれかプロセスが終わるたびに進みます。
完了 jisaku_func(2, b)
どれかプロセスが終わるたびに進みます。
完了 jisaku_func(3, c)
どれかプロセスが終わるたびに進みます。
経過時間 3.096 秒

(7/8) pool.starmap()
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
(すべてのプロセスが完了したら進みます)
戻り値の型: <class 'list'>
経過時間 3.111 秒

(8/8) pool.starmap_async()
戻り値の型: <class 'multiprocessing.pool.MapResult'>
実行中 jisaku_func(3, c)
実行中 jisaku_func(2, b)
実行中 jisaku_func(1, a)
完了 jisaku_func(1, a)
完了 jisaku_func(2, b)
完了 jisaku_func(3, c)
経過時間 3.096 秒

結果を表示します。

(1/8) pool.apply()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 12128, 3636)
(2, 'b', 15240, 12748, 15892)
(1, 'a', 15240, 1372, 13412)

(2/8) pool.apply_async()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 8700, 17576)
(2, 'b', 15240, 13796, 15152)
(1, 'a', 15240, 6372, 11796)

(3/8) pool.map()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 11632, 18268)
(2, 'b', 15240, 7480, 17828)
(1, 'a', 15240, 11456, 17608)

(4/8) pool.map_async()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 10864, 15132)
(2, 'b', 15240, 6248, 7136)
(1, 'a', 15240, 10112, 12928)

(5/8) pool.imap()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 16156, 10752)
(2, 'b', 15240, 6608, 10532)
(1, 'a', 15240, 13716, 2836)

(6/8) pool.imap_unordered()
(number, text, ppid, pid, tid)
(1, 'a', 15240, 15228, 13220)
(2, 'b', 15240, 12516, 14116)
(3, 'c', 15240, 16328, 14756)

(7/8) pool.starmap()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 15992, 13052)
(2, 'b', 15240, 12732, 14664)
(1, 'a', 15240, 12284, 13000)

(8/8) pool.starmap_async()
(number, text, ppid, pid, tid)
(3, 'c', 15240, 13972, 3804)
(2, 'b', 15240, 16748, 3788)
(1, 'a', 15240, 13920, 7736)

(メイン関数を実行していたプロセスの ID)
pid: 15240

end

Ctrl + C の KeyboardInterrupt は使用できなかった

Ctrl + Cコントロール シー を押して KeyboardInterruptキーボードインターラプト を発生させたら、メインプロセスの処理が進まなくなりました。

繰り返し Ctrl + C を押しても、メインプロセスの中断ができませんでした。

これは、apply, apply_async, map, map_async, imap, imap_unordered, starmap, starmap_async の、どのメソッドを使った時でも、メインプロセスが進まなくなって、中断もできなくなりました。

並列処理中に Ctrl + C でプログラムを中断したいときは、.Pool() を『使わない』方法だとうまくできました。

実際に自分が使っているのは、multiprocessing.Process() で並列処理する方法です。

ほかにも色々と試したところ、concurrent.futuresコンカレント ドット フューチャースThreadPoolExecutorスレッド プール エクセキューター()ProcessPoolExecutorプロセス プール エクセキューター() で、並列処理を中断することができました。

【Python】concurrent.futures の使い方【並列処理】

あとは、外部ライブラリの joblib でも、Ctrl + C でうまく中断することができました。

こちらは、joblib のドキュメントに Ctrl + C での中断機能があるむねが書かれていました。

Interruption of multiprocesses jobs with ‘Ctrl-C’

(joblib) Parallel reference documentation – Notes

並列処理をするときって、たいてい2時間とか3時間とか、長い時間実行するので、自分は Ctrl + C での中断をよく使っているんですよね。

『ひらめいた!よし、直して再実行!』みたいな感じです。

そんな時です。

自前のデータベースの破損防止のために、データベースを閉じてから終了したい、というわけです。

実際のところ、SQLite DB のファイルが壊れたことって、ほとんどなかったです。

けれども一応、念のために、閉じてから終了したいんですよね。

そういうわけなので、新規にコードを書くときは、こういった Ctrl + C に対応したモジュールを使用していこうと考えています。

multiprocessing.Pool()を使用している記事

【Python】SQLite で日本語を全文検索するコード例【N-Gram, FTS4/FTS5】 の記事にも、concurrent.futures の ProcessPoolExecutor() を使用した Python コードがあります。

『add_texts()』という自作関数のところで使用しました。

ProcessPoolExecutor() と、multiprocessing.Pool() の2種類の方法を、切りかえて使えるように書いてみました。

コードの書き方は、ほとんど同じでした。

メモリ不足を回避する

multiprocessing.Pool() の『.map().imap().imap_unordered()』などで、1度にたくさんのデータを処理しようとしたら、メモリ不足になりました。

メモリ不足の原因と対策方法は、【Python】concurrent.futures の使い方【並列処理】 の記事に書きました。

タイトルとURLをコピーしました