Python の multiprocessing で並列処理を実行するときに、ログを出力するコード例です。
Python 標準ライブラリの logging を使用して、ログを記録します。
プロセスごとに別々のロガーを取得して、プロセスごとに別々のログファイルに書き込みます。
その Python コード例を紹介します。
multiprocessing でログを記録するコード例
メインプロセスでは『ルートロガー』を取得して、子プロセスでは『名前付きのロガー』を取得します。
ログファイルは、プロセスごとに別々に作成して、別々に書き込みます。
6コアで6プロセスの並列処理を実行したら、7つのログファイルができました(メインプロセス×1つ+子プロセス×6つ)。
果たして、この方法がベストプラクティスなのかは分かりません。
自分は今のところ、以下のような Python コードに落ち着きました。
"""Python ファイル名 logging_mp_main.py"""
from pathlib import Path
from multiprocessing import Process
import logging
if __name__ == '__main__':
print('(自身をスクリプトとして実行した)')
logger = logging.getLogger()
elif __name__ == '__mp_main__':
print('(自身をマルチプロセス処理で実行した)')
logger = None # とりあえず変数だけ用意しておく
else:
# 他のPythonモジュールから呼び出したとき。または、
# 他のPythonモジュールからマルチプロセス処理で呼び出したとき。
# __name__ -> 'logging_main'
logger = logging.getLogger(__name__)
# Python ファイル名を表示
print(f'Python ファイル名 -> {Path(__file__).name}')
# __name__ 属性を表示
print(f'__name__ 属性 -> {__name__}')
def main():
"""メイン関数"""
# ロガーのロギングレベルを設定
logger.setLevel(logging.DEBUG)
# ストリームハンドラを取得 → ハンドラのロギングレベルを設定 → ロガーに追加
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG) # デバッグ レベル
logger.addHandler(sh)
# ログファイルのパスを決める
log_txt = Path(__file__).parent.joinpath(f'log-{logger.name}.txt')
# Path(__file__).parent -> Python ファイルと同じフォルダ
# .joinpath(f'log_{logger.name}.txt') -> ロガー名を入れたファイル名
# ファイルハンドラを取得 → ハンドラのロギングレベルを設定 → ロガーに追加
fh = logging.FileHandler(log_txt, encoding='utf-8')
fh.setLevel(logging.WARNING) # ワーニング/ウォーニング レベル
# fh.setLevel(logging.CRITICAL) # クリティカル レベル
logger.addHandler(fh)
# ロガーを使う
logger.debug(f'ロガー名 logger.name -> {logger.name}')
logger.debug(f'ログファイル名 fh.baseFilename -> {fh.baseFilename}')
logger.debug(f'{logger.name} 開始')
logger.warning(f'{logger.name} ワーニング (例) パスが255文字を超えています')
logger.error(f'{logger.name} エラー (例) ファイルが存在しません')
print('---')
# [マルチプロセス処理]
# === プロセスを1つ作る ===
p = Process(target=mp_func, args=(1,))
# === プロセスを開始する ===
p.start()
# === プロセスが終了するまで待つ ===
p.join()
print('---')
# ロガーを使う (続き)
logger.debug(f'{logger.name} 終了')
# ログのファイルサイズがゼロなら削除する (必要なら)
if log_txt.stat().st_size == 0:
# ファイルハンドラを閉じる
# (これでログファイルを削除できるようになった)
fh.close()
# 空っぽのログファイルを削除
log_txt.unlink()
return
def mp_func(process_id):
"""マルチプロセス処理"""
# ロガー名を決めて、ロガーを取得。
logger_name = f'{__name__}.{process_id}'
logger = logging.getLogger(logger_name)
# ロガーのロギングレベルを設定
logger.setLevel(logging.DEBUG)
# ストリームハンドラを取得 → ハンドラのロギングレベルを設定 → ロガーに追加
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG) # デバッグ レベル
logger.addHandler(sh)
# ログファイルのパスを決める
log_txt = Path(__file__).parent.joinpath(f'log-{logger.name}.txt')
# Path(__file__).parent -> Python ファイルと同じフォルダ
# .joinpath(f'log_{logger.name}.txt') -> ロガー名を入れたファイル名
# ファイルハンドラを取得 → ハンドラのロギングレベルを設定 → ロガーに追加
fh = logging.FileHandler(log_txt, encoding='utf-8')
fh.setLevel(logging.WARNING) # ワーニング/ウォーニング レベル
# fh.setLevel(logging.CRITICAL) # クリティカル レベル
logger.addHandler(fh)
# ロガーを使う
logger.debug(f'ロガー名 logger.name -> {logger.name}')
logger.debug(f'ログファイル名 fh.baseFilename -> {fh.baseFilename}')
logger.debug(f'{logger.name} マルチプロセス処理 開始')
logger.warning(f'{logger.name} ワーニング (例) 処理が大変です')
logger.error(f'{logger.name} エラー (例) まずいです')
logger.critical(f'{logger.name} クリティカル (例) もうだめです')
logger.debug(f'{logger.name} マルチプロセス処理 終了')
return
if __name__ == '__main__':
# マルチプロセス処理の時はスキップされる
main()
実行結果
コマンドプロンプトの表示
『print() 関数』と『ロガーのストリームハンドラ』が表示した内容です。
ここに『メインプロセス』と『子プロセス』の出力が、すべて表示されました。
● コマンドプロンプト
(自身をスクリプトとして実行した)
Python ファイル名 -> logging_mp_main.py
__name__ 属性 -> __main__
ロガー名 logger.name -> root
ログファイル名 fh.baseFilename -> ***\log-root.txt
root 開始
root ワーニング (例) パスが255文字を超えています
root エラー (例) ファイルが存在しません
---
(自身をマルチプロセス処理で実行した)
Python ファイル名 -> logging_mp_main.py
__name__ 属性 -> __mp_main__
ロガー名 logger.name -> __mp_main__.1
ログファイル名 fh.baseFilename -> ***\log-__mp_main__.1.txt
__mp_main__.1 マルチプロセス処理 開始
__mp_main__.1 ワーニング (例) 処理が大変です
__mp_main__.1 エラー (例) まずいです
__mp_main__.1 クリティカル (例) もうだめです
__mp_main__.1 マルチプロセス処理 終了
---
root 終了
メインプロセスのログファイル
メインプロセスの『ロガーのファイルハンドラ』が記録した内容です。
● ログファイル名 'log-root.txt'
root ワーニング (例) パスが255文字を超えています
root エラー (例) ファイルが存在しません
メインプロセスのファイルハンドラのレベル設定で、fh.setLevel(logging.WARNING) を指定しました。
なので、WARNING よりも軽微なログである logger.debug() が無視されました。
子プロセスのログファイル
子プロセスの『ロガーのファイルハンドラ』が記録した内容です。
● ログファイル名 'log-__mp_main__.1.txt'
__mp_main__.1 ワーニング (例) 処理が大変です
__mp_main__.1 エラー (例) まずいです
__mp_main__.1 クリティカル (例) もうだめです
子プロセスのファイルハンドラのレベル設定で、fh.setLevel(logging.WARNING) を指定しました。
なので、WARNING よりも軽微なログである logger.debug() が無視されました。
『サイズがゼロのログファイル』を自動で削除する
自分はよく、プロセスの終了時に『サイズがゼロのログファイル』を削除しています。
自分の決算分析のプログラムでは、正常に完了したらそれで OK だったので、ログファイルを毎回手動で削除していました。
ただ、だんだんそれが面倒になってきたので
『サイズがゼロで終わったものくらいは、自動で消えてほしい。』
そう思うようになりました。
それで、ファイルサイズを調べて消す処理を追加しました。
マルチプロセスで実行したら、毎回ログファイルがたくさんできて、大変だったんですよね。
ちょっとだけ快適になりました。
エラーが記録されたファイルだけが残るので、とても便利でした。
ファイルサイズを調べる以外にも、ログファイルをオープンして、指定した文字列があれば残すとか、いろいろできそうです。
Python マニュアルの場所
コード例で使用した機能のマニュアルの場所です。
インポート関連のモジュール属性
ダブルアンダースコアーネーム __name__
__name__
ダブルアンダースコアーファイル __file__
__file__
スコープの名前
ダブルアンダースコアーメイン '__main__'
__main__ — トップレベルのスクリプト環境
f'' エフストリング (f-string)
エフ f''
から始まる文字列
フォーマット済み文字列リテラル
multiprocessing マルチプロセッシング
プロセス クラス Process()
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
スタート メソッド .start()
start()
ジョイン メソッド .join()
join([timeout])
pathlib パスリブ
パス クラス Path()
class pathlib.Path(*pathsegments)
ペアレント プロパティ .parent
PurePath.parent
ジョインパス メソッド .joinpath()
PurePath.joinpath(*other)
スタット メソッド .stat()
Path.stat()
エスティーサイズ .st_size
st_size
アンリンク メソッド .unlink()
Path.unlink(missing_ok=False)
logging ロギング
ゲットロガー メソッド .getLogger()
logging.getLogger(name=None)
セットレベル メソッド .setLevel()
『ロガー』オブジェクトの setLevel(level)
『ハンドラ』オブジェクトの setLevel(level)
アッドハンドラ メソッド .addHandler()
addHandler(hdlr)
デバッグ メソッド .debug()
debug(msg, *args, **kwargs)
ウォーニング メソッド .warning()
warning(msg, *args, **kwargs)
エラー メソッド .error()
error(msg, *args, **kwargs)
クローズ メソッド .close()
close()
ストリームハンドラ クラス logging.StreamHandler()
class logging.StreamHandler(stream=None)
ファイルハンドラ クラス logging.FileHandler()
class logging.FileHandler(filename, mode=’a’, encoding=None, delay=False)