【Python】マルチスレッド処理で高速化できた例【subprocess】

Python

concurrent.futuresThreadPoolExecutor を使用した場合でも、subprocess.run で外部プログラムを複数同時に実行することができました。

ProcessPoolExecutor の場合とほぼ同じ速さで処理が完了しました。

その実験結果です。

Python のマルチスレッドとマルチプロセスの比較です。

QPDF というコマンドラインツールで、EDINET の PDF の破損チェックをしてみました。

以下の 3 パターンを試しました。

  • マルチスレッド ThreadPoolExecutor で実行。
  • マルチプロセス ProcessPoolExecutor で実行。
  • 普通の関数呼び出しで実行。

CPU は Intel Core i5-9500 (6 コア 6 スレッド) です。

1 ファイルあたり約 1 MB の PDF を 10 ファイル用意して、その破損チェックにかかった時間です。

(src_files) 10 ファイル
(完了に要した時間)
 11.466069 秒 (マルチスレッド max_workers=4)
 11.575382 秒 (マルチプロセス max_workers=4)
 39.334514 秒 (普通)

1 ファイルあたり約 1 MB の PDF を 102 ファイル用意して、その破損チェックにかかった時間です。

(src_files) 102 ファイル
(完了に要した時間)
 95.163630 秒 (マルチスレッド max_workers=4)
 95.102620 秒 (マルチプロセス max_workers=4)
350.989911 秒 (普通)

この結果から、マルチスレッド処理の場合でも、マルチプロセス処理と同じくらいの速さで実行できる場合がある、ということが確認できました。

マルチスレッドで実行した場合でも、CPU のコア数に応じた時間で完了できる場合ある。

subprocess.run で起動した外部プログラム (.exe) は、やはり Python の GIL の制約を受けないようですね。

(Python) global interpreter lock (GIL)(用語集)

C 拡張をうまく使うことも役立ちます。C 拡張を時間のかかるタスクの実行に使えば、その拡張は実行が C コードで行われている間 GIL を解放でき、その間に他のスレッドで作業が進められます。 zlibhashlib など、すでにこれを行なっている標準ライブラリモジュールもあります。

(Python) グローバルインタプリタロック (Global Interpreter Lock) を取り除くことはできないのですか?(ライブラリと拡張 FAQ)

まあ、「Python チュートリアル」などの説明からも「たぶんそうだろう」とは思っていました。

スレッド処理 (threading) とは、順序的な依存関係にない複数のタスクを分割するテクニックです。スレッドは、ユーザの入力を受け付けつつ、背後で別のタスクを動かすようなアプリケーションの応答性を高めます。同じような使用例として、I/O を別のスレッドの計算処理と並列して動作させるというものがあります。

(Python) マルチスレッディング(Python チュートリアル)

CPython implementation detail: CPython は Global Interpreter Lock のため、ある時点で Python コードを実行できるスレッドは1つに限られます (ただし、いくつかのパフォーマンスが強く求められるライブラリはこの制限を克服しています)。アプリケーションにマルチコアマシンの計算能力をより良く利用させたい場合は、 multiprocessing モジュールや concurrent.futures.ProcessPoolExecutor の利用をお勧めします。 ただし、I/Oバウンドなタスクを並行して複数走らせたい場合においては、 マルチスレッドは正しい選択肢です。

(Python) threading(スレッドベースの並列処理)(Python 標準ライブラリ)

このように、I/O 待ちの話題は載っていたのですが、ほかの事例での有効性が気になっていました。

そこで今回、外部プログラムについて確認してみた、というわけです。

結果は有効でした。

Python のマルチスレッド処理は、I/O 待ちのほかにも、外部プログラムの処理待ちにも有効でした。

Python コードと実行結果を紹介します。

Python コード

ThreadPoolExecutor で「外部プログラム (.exe)」を並列実行する Python コード例です。

比較のために、ProcessPoolExecutor で実行した場合と、普通に実行した場合も書きました。

OS は Windows 10 Pro (64 bit) で、Python のバージョンは 3.8.6 (64 bit) です。

メインモジュール

プログラムの起点きてんとなるモジュールです。

ここから自作モジュールの関数を呼び出して時間を計りました。

"""app_main.py メインモジュールです。"""
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging, logging.handlers, multiprocessing
import datetime, pathlib, collections, sys
import jisaku
logger = logging.getLogger(__name__) # NOTSET (0)

def main():
    print(f'(Python) {sys.version}')
    print(f'(QPDF) {jisaku.exe_file_version()}')
    root = logging.getLogger()
    root.setLevel(logging.INFO) # WARNING (30) ⇒ INFO (20)
    # (2023年3月19日追記です)
    # 以下のコード例で manager の shutdown() だけ
    # try 文を書いていないのは、単に書き忘れです。
    # manager の shutdown() も、try 文や with 文などを
    # 使用して閉じてあげるのが良いと思います。
    # (追記終わり)
    manager = multiprocessing.Manager()
    q = manager.Queue(-1)
    # type(q): <class 'multiprocessing.managers.AutoProxy[Queue]'>
    qh = logging.handlers.QueueHandler(q) # NOTSET (0)
    root.addHandler(qh)
    sh = logging.StreamHandler() # NOTSET (0)
    fmt = '%(processName)s %(threadName)s %(name)s: %(message)s'
    sh.setFormatter(logging.Formatter(fmt))
    listener = logging.handlers.QueueListener(q, sh)
    listener.start()
    try:
        max_workers = 4
        src_dir = pathlib.Path(r'F:\apps\pdf')
        src_files = [(x,) for x in src_dir.glob('*.pdf')]
        src_files.sort(key=lambda x: x[0])
        src_files = src_files[:10] # デバッグ

        logger.info(f'(src_files) {len(src_files)} ファイル')
        logger.info(f'(src_dir) {src_dir}')
        logger.info(f'(exe_file) {jisaku.exe_file}')
        logger.info(f'(max_workers) {max_workers}')

        logger.info(f'(root.level) {logging.getLevelName(root.level)}')
        logger.info(f'(logger.level) {logging.getLevelName(logger.level)}')
        logger.info(f'(qh.level) {logging.getLevelName(qh.level)}')
        logger.info(f'(sh.level) {logging.getLevelName(sh.level)}')

        logger.info('----------\n')
        et_thread = proc_thread(src_files, max_workers) # マルチスレッド
        logger.info('----------\n')
        et_process = proc_process(src_files, max_workers, q) # マルチプロセス
        logger.info('----------\n')
        et_single = proc_single(src_files) # 普通
        logger.info('----------\n')

        logger.info('(完了に要した時間)')
        logger.info(f'{et_thread.total_seconds():10.6f} 秒'
            f' (マルチスレッド max_workers={max_workers})')
        logger.info(f'{et_process.total_seconds():10.6f} 秒'
            f' (マルチプロセス max_workers={max_workers})')
        logger.info(f'{et_single.total_seconds():10.6f} 秒'
            ' (普通)')
        logger.info('(以上です)')
    finally:
        listener.stop()
    manager.shutdown()
    return

def proc_thread(src_files, max_workers):
    logger.info(f'(1/3 マルチスレッド処理) start (max_workers={max_workers})')
    t_start = datetime.datetime.now()

    datas = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for data in executor.map(jisaku.func_star, src_files):
            datas.append(data)

    elapsed_time = datetime.datetime.now() - t_start
    logger.info('(マルチスレッド処理) end')
    func_result(elapsed_time, datas)
    return elapsed_time

def proc_process(src_files, max_workers, q):
    logger.info(f'(2/3 マルチプロセス処理) start (max_workers={max_workers})')
    t_start = datetime.datetime.now()

    datas = []
    with ProcessPoolExecutor(
        max_workers=max_workers, initializer=jisaku.initializer, initargs=(q,),
        ) as executor:
        for data in executor.map(jisaku.func_star, src_files):
            datas.append(data)

    elapsed_time = datetime.datetime.now() - t_start
    logger.info('(マルチプロセス処理) end')
    func_result(elapsed_time, datas)
    return elapsed_time

def proc_single(src_files):
    logger.info('(3/3 普通の処理) start')
    t_start = datetime.datetime.now()

    datas = []
    for src_file in src_files:
        data = jisaku.func_star(src_file)
        datas.append(data)

    elapsed_time = datetime.datetime.now() - t_start
    logger.info('(普通の処理) end')
    func_result(elapsed_time, datas)
    return elapsed_time

def func_result(elapsed_time, datas):
    """時間と結果を表示します。"""
    logger.info(f'(経過時間) {elapsed_time} で完了しました。')
    c_dict = collections.Counter({0: 0, 2: 0, 3: 0})
    c_dict.update(datas)
    logger.info('(QPDF --check の結果)')
    logger.info(f'(exit status) 0: {c_dict[0]} (おそらく正常な PDF の数)')
    logger.info(f'(exit status) 2: {c_dict[2]} (エラーを検出した PDF の数)')
    logger.info(f'(exit status) 3: {c_dict[3]} (警告だけを検出した PDF の数)')
    return

if __name__ == '__main__':
    main()

※ 今回はロギングでキューリスナーを使用したので、もし print() 関数と logger.info() を交互に書いたとしても、その順番通りに表示されるとは限らないです。スレッドが異なるためです。ロガーのほうはキューリスナーのスレッドが画面に書き込んでいます。もちろん、listener.start() から listener.stop() までの間での話です。

自作モジュール

外部プログラムを起動するコードです。

subprocess.run() で QPDF (qpdf.exe) を実行します。

--check は PDF ファイルをチェックするオプションです。

"""jisaku.py 自作モジュールです。"""
import logging, logging.handlers, subprocess, datetime
logger = logging.getLogger(__name__) # NOTSET (0)
exe_file = r'F:\apps\tool\qpdf\bin\qpdf.exe'

def exe_file_version():
    """QPDF のバージョンを表示します。"""
    cp = subprocess.run((exe_file, '--version'), stdout=subprocess.PIPE)
    return cp.stdout.decode('utf-8', errors='backslashreplace')

def initializer(q):
    """(マルチプロセス用) 各子プロセスのロギング設定をします。"""
    root = logging.getLogger() # 各子プロセスのルートロガー
    root.setLevel(logging.DEBUG) # WARNING (30) ⇒ DEBUG (10)
    qh = logging.handlers.QueueHandler(q) # NOTSET (0)
    root.addHandler(qh)
    logger.info(f'(root.level) {logging.getLevelName(root.level)}')
    # logger.info(f'(logger.level) {logging.getLevelName(logger.level)}')
    # logger.info(f'(qh.level) {logging.getLevelName(qh.level)}')
    return

def func_star(args): # star とはアンパック * のスターです。
    return func(*args)

def func(src_file):
    logger.info(f'{src_file.name} run')
    t = datetime.datetime.now()
    cp = subprocess.run(
        (exe_file, '--check', src_file),
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        )
    et = datetime.datetime.now() - t
    logger.info(f'{src_file.name} {cp.returncode} {et.total_seconds():10.6f}')
    return cp.returncode

マルチプロセス処理のときのロガーについてです。

メインプロセスのルートロガーとかくプロセスのルートロガーは別物でした。

各子プロセスのルートロガーは、ロギングレベルが WARNING のままでしたし、ハンドラも取り付けられていない状態でした。

なので、各子プロセスでもルートロガーのロギングレベル設定とキューハンドラの追加を行いました。

一方で、マルチスレッドの場合は、メインプロセスのルートロガーや名前付きロガーがそのまま使用できました(実際に使用できました)。

複数スレッドからのロギングでは特別に何かをする必要はありません。

(Python) 複数のスレッドからのロギング(Logging クックブック)

実行結果

Python コードの実行結果です。

1 ファイルあたり約 1 MB の PDF を「10 ファイル」だけチェックした時の結果です。

マルチスレッドとマルチプロセスの時間を比較しました。

(Python) 3.8.6 (tags/v3.8.6:db45529, Sep 23 2020, 15:52:53) [MSC v.1927 64 bit (AMD64)]
(QPDF) qpdf version 10.6.3
Run qpdf --copyright to see copyright and license information.

MainProcess MainThread __main__: (src_files) 10 ファイル
MainProcess MainThread __main__: (src_dir) F:\apps\pdf
MainProcess MainThread __main__: (exe_file) F:\apps\tool\qpdf\bin\qpdf.exe
MainProcess MainThread __main__: (max_workers) 4
MainProcess MainThread __main__: (root.level) INFO
MainProcess MainThread __main__: (logger.level) NOTSET
MainProcess MainThread __main__: (qh.level) NOTSET
MainProcess MainThread __main__: (sh.level) NOTSET
MainProcess MainThread __main__: ----------

MainProcess MainThread __main__: (1/3 マルチスレッド処理) start (max_workers=4)
MainProcess ThreadPoolExecutor-0_0 jisaku: 2ERROR__.pdf run
MainProcess ThreadPoolExecutor-0_1 jisaku: 3WARNING.pdf run
MainProcess ThreadPoolExecutor-0_2 jisaku: S000B1DB.pdf run
MainProcess ThreadPoolExecutor-0_3 jisaku: S1007X5L.pdf run
MainProcess ThreadPoolExecutor-0_0 jisaku: 2ERROR__.pdf 2   0.890414
MainProcess ThreadPoolExecutor-0_0 jisaku: S1008UNI.pdf run
MainProcess ThreadPoolExecutor-0_0 jisaku: S1008UNI.pdf 0   3.155527
MainProcess ThreadPoolExecutor-0_0 jisaku: S1008ZIY.pdf run
MainProcess ThreadPoolExecutor-0_3 jisaku: S1007X5L.pdf 0   5.904879
MainProcess ThreadPoolExecutor-0_3 jisaku: S1009CEZ.pdf run
MainProcess ThreadPoolExecutor-0_2 jisaku: S000B1DB.pdf 0   6.139199
MainProcess ThreadPoolExecutor-0_2 jisaku: S1009E3I.pdf run
MainProcess ThreadPoolExecutor-0_0 jisaku: S1008ZIY.pdf 0   2.889925
MainProcess ThreadPoolExecutor-0_0 jisaku: S1009E60.pdf run
MainProcess ThreadPoolExecutor-0_1 jisaku: 3WARNING.pdf 3   6.998352
MainProcess ThreadPoolExecutor-0_1 jisaku: S1009IPD.pdf run
MainProcess ThreadPoolExecutor-0_3 jisaku: S1009CEZ.pdf 0   3.577264
MainProcess ThreadPoolExecutor-0_2 jisaku: S1009E3I.pdf 0   4.045927
MainProcess ThreadPoolExecutor-0_0 jisaku: S1009E60.pdf 0   3.514813
MainProcess ThreadPoolExecutor-0_1 jisaku: S1009IPD.pdf 0   4.467717
MainProcess MainThread __main__: (マルチスレッド処理) end
MainProcess MainThread __main__: (経過時間) 0:00:11.466069 で完了しました。
MainProcess MainThread __main__: (QPDF --check の結果)
MainProcess MainThread __main__: (exit status) 0: 8 (おそらく正常な PDF の数)
MainProcess MainThread __main__: (exit status) 2: 1 (エラーを検出した PDF の数)
MainProcess MainThread __main__: (exit status) 3: 1 (警告だけを検出した PDF の数)
MainProcess MainThread __main__: ----------

MainProcess MainThread __main__: (2/3 マルチプロセス処理) start (max_workers=4)
SpawnProcess-2 MainThread jisaku: (root.level) DEBUG
SpawnProcess-2 MainThread jisaku: 2ERROR__.pdf run
SpawnProcess-3 MainThread jisaku: (root.level) DEBUG
SpawnProcess-3 MainThread jisaku: 3WARNING.pdf run
SpawnProcess-4 MainThread jisaku: (root.level) DEBUG
SpawnProcess-4 MainThread jisaku: S000B1DB.pdf run
SpawnProcess-5 MainThread jisaku: (root.level) DEBUG
SpawnProcess-5 MainThread jisaku: S1007X5L.pdf run
SpawnProcess-2 MainThread jisaku: 2ERROR__.pdf 2   0.906037
SpawnProcess-2 MainThread jisaku: S1008UNI.pdf run
SpawnProcess-2 MainThread jisaku: S1008UNI.pdf 0   3.124264
SpawnProcess-2 MainThread jisaku: S1008ZIY.pdf run
SpawnProcess-5 MainThread jisaku: S1007X5L.pdf 0   5.889239
SpawnProcess-5 MainThread jisaku: S1009CEZ.pdf run
SpawnProcess-4 MainThread jisaku: S000B1DB.pdf 0   6.201686
SpawnProcess-4 MainThread jisaku: S1009E3I.pdf run
SpawnProcess-2 MainThread jisaku: S1008ZIY.pdf 0   2.874324
SpawnProcess-2 MainThread jisaku: S1009E60.pdf run
SpawnProcess-3 MainThread jisaku: 3WARNING.pdf 3   7.060847
SpawnProcess-3 MainThread jisaku: S1009IPD.pdf run
SpawnProcess-5 MainThread jisaku: S1009CEZ.pdf 0   3.530419
SpawnProcess-4 MainThread jisaku: S1009E3I.pdf 0   4.061548
SpawnProcess-2 MainThread jisaku: S1009E60.pdf 0   3.483581
SpawnProcess-3 MainThread jisaku: S1009IPD.pdf 0   4.389602
MainProcess MainThread __main__: (マルチプロセス処理) end
MainProcess MainThread __main__: (経過時間) 0:00:11.575382 で完了しました。
MainProcess MainThread __main__: (QPDF --check の結果)
MainProcess MainThread __main__: (exit status) 0: 8 (おそらく正常な PDF の数)
MainProcess MainThread __main__: (exit status) 2: 1 (エラーを検出した PDF の数)
MainProcess MainThread __main__: (exit status) 3: 1 (警告だけを検出した PDF の数)
MainProcess MainThread __main__: ----------

MainProcess MainThread __main__: (3/3 普通の処理) start
MainProcess MainThread jisaku: 2ERROR__.pdf run
MainProcess MainThread jisaku: 2ERROR__.pdf 2   0.859197
MainProcess MainThread jisaku: 3WARNING.pdf run
MainProcess MainThread jisaku: 3WARNING.pdf 3   6.639038
MainProcess MainThread jisaku: S000B1DB.pdf run
MainProcess MainThread jisaku: S000B1DB.pdf 0   5.826774
MainProcess MainThread jisaku: S1007X5L.pdf run
MainProcess MainThread jisaku: S1007X5L.pdf 0   5.561190
MainProcess MainThread jisaku: S1008UNI.pdf run
MainProcess MainThread jisaku: S1008UNI.pdf 0   2.983674
MainProcess MainThread jisaku: S1008ZIY.pdf run
MainProcess MainThread jisaku: S1008ZIY.pdf 0   2.733737
MainProcess MainThread jisaku: S1009CEZ.pdf run
MainProcess MainThread jisaku: S1009CEZ.pdf 0   3.374201
MainProcess MainThread jisaku: S1009E3I.pdf run
MainProcess MainThread jisaku: S1009E3I.pdf 0   3.858447
MainProcess MainThread jisaku: S1009E60.pdf run
MainProcess MainThread jisaku: S1009E60.pdf 0   3.280477
MainProcess MainThread jisaku: S1009IPD.pdf run
MainProcess MainThread jisaku: S1009IPD.pdf 0   4.217779
MainProcess MainThread __main__: (普通の処理) end
MainProcess MainThread __main__: (経過時間) 0:00:39.334514 で完了しました。
MainProcess MainThread __main__: (QPDF --check の結果)
MainProcess MainThread __main__: (exit status) 0: 8 (おそらく正常な PDF の数)
MainProcess MainThread __main__: (exit status) 2: 1 (エラーを検出した PDF の数)
MainProcess MainThread __main__: (exit status) 3: 1 (警告だけを検出した PDF の数)
MainProcess MainThread __main__: ----------

MainProcess MainThread __main__: (完了に要した時間)
MainProcess MainThread __main__:  11.466069 秒 (マルチスレッド max_workers=4)
MainProcess MainThread __main__:  11.575382 秒 (マルチプロセス max_workers=4)
MainProcess MainThread __main__:  39.334514 秒 (普通)
MainProcess MainThread __main__: (以上です)

QPDF の動作確認のために、「エラーの出る PDF」と「警告だけの出る PDF」も 1 つずつ用意して実行したのですが、ちゃんと検出できていました。

結果の解釈

マルチスレッドの場合でも、max_workers=4 の数だけ一斉に PDF のチェックが始まっていました。

想像するに、スレッドの中で外部プログラムを起動したら、その終了待ちの間、今度は別のスレッドが動き始めて、さらに外部プログラムを起動していった、という感じでしょうか。

Python の世界では平行処理でも、外部プログラムの方は複数同時に動いていくので、そこは並列処理になっているようでした。

結果的に、マルチスレッドはマルチプロセスとほぼ同じ時間で完了していた、と解釈しました。

外部プログラムを実行して結果を受け取るだけなら、マルチスレッド処理でも良さそうですね。

マルチスレッドが遅い(速くならない)というのは、場合による、という感じでした。

本当は CPU やメモリといった「リソースの消費具合」も確認したかったのですが、大変すぎたのでやめました。代わりにタスクマネージャーを見ていましたが、マルチスレッドとマルチプロセスでの差は分かりませんでした(今回の実験では見えない程度の差でした)。

Python コードの説明

Python マニュアルの場所です。

(Python) ThreadPoolExecutor

(Python) ProcessPoolExecutor

(Python) .map(func, *iterables, timeout=None, chunksize=1)

(Python) subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None, **other_popen_kwargs)

(Python) class collections.Counter([iterable-or-mapping])

個々の解説記事です。

【Python】PDF の破損をチェックするコード例【QPDF】

【Python】画像の破損をチェックするコード例【ImageMagick】

【Python】subprocess.run() の使い方【使用例】

【Python】ロガーでロギングするコード例【logging】

【Python】concurrent.futures の使い方【並列処理】

【Python】concurrent.futures でログを記録するコード例(まとめて記録)

【EDINET API】Python で XBRL を取得する方法【決算分析】

以上です。

スポンサーリンク
シェアする(押すとSNS投稿用の『編集ページ』に移動します)
フォローする(RSSフィードに移動します)
スポンサーリンク
シラベルノート
タイトルとURLをコピーしました