【Python】Arelle をマルチプロセスで実行するコード例【XBRL】

XBRL 読み込みライブラリの Arelleアレル を、マルチプロセスで実行するコード例を書きました。

Arelle を複数同時に実行できるようにしたことで、全体の処理時間を、大幅に短縮することができました。

CPU もフルパワーを発揮していました。

Arelle は、『XBRL 報告書インスタンスとそれに関係するタクソノミ』までたどってくれたり、『細かいところの名前空間接頭辞』まで解釈してくれたりして、とても便利でした。

ですが、そのぶん、どうしても処理に時間がかかっていました。

簡単な自作パーサーと比べると、どうしても遅かったです。

そこで、Arelle をマルチプロセスで実行するコード例を書きました。

その結果、数万ファイルの XBRL を読み込むのにかかっていた時間が、CPU のコア数の分だけ短縮されて、とても快適になりました。

プログラムを改良したときの XBRL の再解析が、とても早く終わるようになりました。

良かったです。

方法

Arelle を複数並列で(マルチプロセスで)実行する方法です。

ポイントは、Arelle のキャッシュフォルダでした。

Python のワーカープロセスごとに(プロセスごとに)、Arelle のキャッシュフォルダを別々に用意してから、XBRL を読み込むようにしました。

Arelle のキャッシュフォルダの設定方法です。

Arelle は、Cntlr.py の中でコマンドライン引数ひきすうのリスト sys.argv をチェックしていたので、それを利用してみました。

キャッシュフォルダを指定するコマンドライン引数は、'--xdgConfigHome=フォルダパス' でした。

(Arelle) xdgConfigHome (Command Line Operation)

ワーカープロセスの中で、sys.argv に '--xdgConfigHome=フォルダパス' を追加してから、Arelle の Cntlr.Cntlr() を実行します。

これで、Arelle が、プロセスごとに別々のフォルダを使ってくれました。

マルチプロセス処理も成功しました。

べつに、いちいちキャッシュフォルダを分けなくても動くとは思うのですが(未検証)、複数のプロセスが同時に『書き込む/読み込む』可能性を考えると、やっぱり分けておいたほうがいいのかな、と思いました。

まあ、sys.argv のリストに、あとから引数を追加するのは少しイマイチな気がしましたが、実際にしばらく使ってみて、特に不具合はありませんでした。

なので、このアプローチを採用しました。

もちろん、もっと良いやり方があるかもしれません。

ほかにも『キャッシュフォルダ』の指定方法はありました。

例えば、Arelle ライブラリの config フォルダの中に、拡張子なしのテキストファイル XDG_CONFIG_HOME を用意して、そのなかにフォルダパスを書く方法です。

ですが、XDG_CONFIG_HOME だと、プロセスごとにフォルダを変えることが困難でした。

あとは、Arelle の Cntlr.py を少し改造して、キャッシュフォルダの引数を追加する方法も考えました。

けれども、そこまでするくらいなら、コマンドライン引数のほうが簡単でした。

コード例

Arelle をマルチプロセスで実行する Python コード例です。

concurrent.futuresProcessPoolExecutor を使用しました。

自分は試していませんが、multiprocessing.Pool でも書けると思います。

コード例の中の具体的な『ファイルパス』と『フォルダパス』は、自身の環境に合わせて変更します。

app_main.py

メインモジュールのコードです。

""" F:\project\kabu\app_main.py
Arelle を使用して、XBRL の zip からデータを取得します。
マルチプロセス処理に対応したコード例です。
"""

from pathlib import Path
import logging
import datetime
from concurrent.futures import ProcessPoolExecutor
import app_xbrl # 追加の自作モジュールです。

# ルートロガーを取得します。
lg = logging.getLogger()


def main():
    """メイン関数です。"""
    # ワーカープロセスの数を決めます。
    # max_workers = 1 # シングルプロセス処理
    max_workers = 5 # マルチプロセス処理

    # EDINET XBRL のファイルパスを決めます。
    src_files = [
        r'F:\project\download\S100JKNH_1.zip',
        r'F:\project\download\S1008JYI_1.zip',
        r'F:\project\download\S100B922_1.zip',
        r'F:\project\download\S100DZ6E_1.zip',
        r'F:\project\download\S100GUI8_1.zip',
    ]

    # データの保存フォルダを決めます。
    # フォルダが無ければ作ります。
    data_dir = Path(r'F:\project\data')
    data_dir.mkdir(exist_ok=True)

    # Arelle のキャッシュを保存するフォルダを決めます。
    # フォルダが無ければ作ります。
    # あとでこのフォルダの中に、プロセスの数だけ
    # Arelle のキャッシュフォルダを作っていきます。
    arelle_temp_dir = Path(r'F:\project\arelle_temp')
    arelle_temp_dir.mkdir(exist_ok=True)

    # ログの保存フォルダを決めます。
    # フォルダが無ければ作ります。
    log_dir = Path(r'F:\project\log')
    log_dir.mkdir(exist_ok=True)

    # メインプロセスのルートロガーに、
    # ロギングレベルとハンドラを設定します。
    lg.setLevel(logging.DEBUG)
    sh = logging.StreamHandler()
    sh.setLevel(logging.DEBUG)
    lg.addHandler(sh)
    log_file = log_dir.joinpath('log_main.txt')
    fh = logging.FileHandler(filename=log_file, mode='w', encoding='utf-8')
    fh.setLevel(logging.INFO)
    lg.addHandler(fh)

    start_time = datetime.datetime.now()
    try:
        # メインの処理を実行します。
        proc(max_workers, src_files, data_dir, arelle_temp_dir, log_dir)
    except KeyboardInterrupt as e:
        lg.debug(f'main: {e.__class__.__name__}')
    finally:
        elapsed_time = datetime.datetime.now() - start_time
        lg.debug(f'(経過時間) {str(elapsed_time).rsplit(".")[0]}')

        # ログファイルを閉じます。
        fh.close()

        # サイズがゼロだったら、ログファイルを削除します。
        if log_file.stat().st_size == 0:
            log_file.unlink()
    return


def proc(max_workers, src_files, data_dir, arelle_temp_dir, log_dir):
    """メインの処理を実行する関数です。"""
    # 設定を表示します。
    lg.debug(f'(src_files) {len(src_files)} 個')
    lg.debug(f'(data_dir) {data_dir}')
    lg.debug(f'(arelle_temp_dir) {arelle_temp_dir}')
    lg.debug(f'(log_dir) {log_dir}\n')

    if max_workers == 1:
        lg.debug(f'(シングルプロセス) max_workers={max_workers}')

        # 実行します。
        app_xbrl.xbrl_proc(0, max_workers, src_files, data_dir, arelle_temp_dir)

    elif max_workers >= 2:
        lg.debug(f'(マルチプロセス) max_workers={max_workers}')

        def make_args():
            """引数リスト (args) を作る関数です。"""
            args = []
            s = 0 # s は start の略です。
            for n in range(max_workers):
                lg.debug(f'(n_worker) {n+1}')

                # できるだけ均等になるように、
                # max_workers の数だけ、src_files を分割します。
                # (ほかにも、分割のやり方はいろいろありました。)
                t = (len(src_files) + n) // max_workers

                # 演算子の // は、切り捨て除算です。
                # 割り算の商だけを返してくれました。
                lg.debug(f'({len(src_files)} + {n}) // {max_workers} = {t}')
                lg.debug(f'src_files[{s}:{s+t}]')

                # プロセスごとにログファイルパスを指定します。
                log_mp_file = log_dir.joinpath(f'log_mp_{n+1}.txt')
                lg.debug(f'(log_mp_file) {log_mp_file}\n')

                # 分割したリスト src_files[s:s+t] と、
                # その他の引数を追加します。
                args.append((
                    n + 1, max_workers, src_files[s:s+t], data_dir,
                    arelle_temp_dir, log_mp_file,
                ))

                # 分割する時の開始位置 s を更新します。
                s += t
            # (デバッグ) 分割したリストの中身の数は、
            # 元のリストの長さと同じはず。
            assert sum(len(x[2]) for x in args) == len(src_files)
            return args

        lg.debug('引数リストを作ります。')
        args = make_args()

        # プロセスプールを作ります。
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # 実行します。
            results = executor.map(app_xbrl.mp_xbrl_proc, args)
            lg.debug(f'(results) {type(results)}')

            # 処理が終わるまで待ちます。そのために、
            # for 文でも next 関数でも tuple 関数でも
            # list 関数でも set 関数でも何でもいいので、
            # ジェネレータの results をたどります。
            for result in results:
                pass

            # (デバッグ)
            # try:
            #     lg.debug(next(results))
            #     lg.debug(next(results))
            #     lg.debug(next(results))
            #     lg.debug(next(results))
            #     lg.debug(next(results))
            #     lg.debug(next(results))
            # except StopIteration as e:
            #     lg.debug(e.__class__.__name__)
            # lg.debug(tuple(results))
            # lg.debug(list(results))
            # lg.debug(set(results))
    return


if __name__ == '__main__':
    main()

ところで、max_workers=1 のときのシングルプロセス処理は、別に分けなくても OK でした。

ですが、いろいろな検証をするときに、『普通のシングルプロセスとして実行したいな』ということが、よくありました。

なので、このような書き方をしました。

app_xbrl.py

マルチプロセスで実行する部分のコードです。

Arelle のキャッシュフォルダが、プロセスごとに別々になるようにしました。

""" F:\project\kabu\app_xbrl.py
追加の自作モジュールです。
XBRL 報告書インスタンス (.xbrl) からデータを取得します。
結果は SQLite DB に保存しました。
"""

from pathlib import Path
from os.path import basename
import os
import zipfile
import re
import sqlite3
import json
from decimal import Decimal
import datetime
import logging
from traceback import format_exc, format_exception_only
import time

# Arelle ライブラリのフォルダを
# sys.path のリストに追加してから、
# Arelle の Cntlr をインポートします。
import sys
sys.path.append(r'F:\project\kabu\Arelle')
from Arelle.arelle import Cntlr

# 名前付きロガーを取得します。
lg = logging.getLogger(__name__)


def mp_xbrl_proc(args):
    """引数をスター * でアンパックするための関数です。"""
    return _mp_xbrl_proc(*args)


def _mp_xbrl_proc(n_worker, max_workers, src_files, data_dir, arelle_temp_dir, log_file):
    """
    マルチプロセス処理で実行します。
    プロセスごとに、ロガーの設定をしてから実行します。
    この関数は、各ワーカープロセスで、1 回だけ実行します。
    """
    # もし、初回の実行でなければ、何もせずに終了します。
    # (ロガーに追加したハンドラの数を利用して判定しました)
    if len(lg.handlers) != 0:
        print(f'len(lg.handlers): {len(lg.handlers)}')
        print('executor.map() を 2 回以上実行した可能性があります。')
        print('何もせずに終了します。')
        return

    # ワーカープロセスの名前付きロガーに、
    # ロギングレベルとハンドラを設定します。
    lg.setLevel(logging.DEBUG)
    sh = logging.StreamHandler()
    sh.setLevel(logging.DEBUG)
    lg.addHandler(sh)
    fh = logging.FileHandler(filename=log_file, mode='w', encoding='utf-8')
    fh.setLevel(logging.INFO)
    lg.addHandler(fh)

    try:
        xbrl_proc(n_worker, max_workers, src_files, data_dir, arelle_temp_dir)
    except KeyboardInterrupt as e:
        lg.debug(f'{n_worker}: {e.__class__.__name__}')
    except Exception as e:
        lg.error(f'{n_worker}: {format_exc()}')
    finally:
        # ログファイルを閉じます。
        fh.close()

        # サイズがゼロだったら、ログファイルを削除します。
        if log_file.stat().st_size == 0:
            log_file.unlink()
    return


def xbrl_proc(n_worker, max_workers, src_files, data_dir, arelle_temp_dir):
    """
    XBRL 報告書インスタンス (.xbrl) を、Arelle でパースします。
    (Web 接続が発生します)
    ctrl.modelManager.load(xbrl_file) を実行したときに、
    Arelle が Web からタクソノミを取得していました。
    """
    # ログ表示を見やすくするために、
    # n_worker の秒数だけ待機させました。
    # 通常は必要ありません。
    # (コード例の実行結果を見やすくするためだけに使用しました)
    time.sleep(n_worker)

    lg.debug(f'[開始] n_worker {n_worker} (PID {os.getpid()})')

    # プロセスごとに、Arelle のキャッシュフォルダを指定します。
    # フォルダが無ければ作ります。
    arelle_dir = arelle_temp_dir.joinpath(f'arelle_{n_worker}')
    arelle_dir.mkdir(exist_ok=True)
    lg.debug(f'(arelle_dir) {arelle_dir}')

    # Arelle にキャッシュフォルダを指示するための
    # コマンドライン引数を作ります。
    cmd_arelle_dir = '--xdgConfigHome=' + str(arelle_dir)
    lg.debug(f'(cmd_arelle_dir) {cmd_arelle_dir}')

    # Python のコマンドラインの引数リストに、
    # Arelle のキャッシュフォルダパスを追加します。
    # これで、Cntlr.Cntlr() を実行したときに、
    # 指定したフォルダが使用されました。
    sys.argv.append(cmd_arelle_dir)
    lg.debug(f'(sys.argv) {sys.argv}')

    # Arelle のコントローラーを作ります。
    ctrl = Cntlr.Cntlr(logFileName='logToPrint')
    try:
        # --xdgConfigHome の引数が反映されたことを確認します。
        if str(arelle_dir) == str(Path(ctrl.webCache.cacheDir).parent.parent):
            # OK
            lg.debug(f'(ctrl.webCache.cacheDir) {ctrl.webCache.cacheDir}\n')
        else:
            # 失敗 (プログラムを終了します)
            lg.error('Arelle の設定 --xdgConfigHome に失敗しています。')
            lg.error(f'(ctrl.webCache.cacheDir) {ctrl.webCache.cacheDir}\n')
            return

        # すべての zip ファイルをたどっていきます。
        for src_file in src_files:
            xbrl_paths = []
            try:
                # zip ファイルを開きます。
                with zipfile.ZipFile(file=src_file, mode='r') as z:
                    # zip の中のファイルを、すべてたどります。
                    for filename in z.namelist():
                        # XBRL 報告書インスタンス (.xbrl) なら取得します。
                        if re.search(r'\.xbrl$', filename):
                            xbrl_paths.append(filename)
            except zipfile.BadZipFile as e:
                lg.error(f'{basename(src_file)} ' +
                    format_exception_only(type(e), e)[-1].rstrip())

            # XBRL 報告書インスタンス (.xbrl) を処理していきます。
            for xbrl_path in xbrl_paths:
                # zip パスに XBRL パスをくっつけて、
                # Arelle に渡すパスを作成します。
                xbrl_file = '{src_file}\\{xbrl_path}'.format(
                    src_file=src_file,
                    xbrl_path=xbrl_path.replace('/', '\\'),
                )

                # Arelle で XBRL を読み込みます。
                model_xbrl = ctrl.modelManager.load(xbrl_file)
                try:
                    # fact を取得します。
                    facts = get_fact_datas(model_xbrl, basename(xbrl_path))
                finally:
                    # modelXbrl を閉じます。
                    ctrl.modelManager.close()

                # 結果を SQLite DB に保存します。
                db_file = data_dir.joinpath(f'{Path(src_file).stem}.db')
                save_to_sqlite(db_file, facts)

            # 進捗を表示します。
            lg.debug(f'{n_worker}: {src_file}')
    finally:
        # Arelle のコントローラーを閉じます。
        ctrl.close()
    lg.debug(f'[終了] n_worker {n_worker} (PID {os.getpid()})')

    # 結果は SQLite DB に保存したので、
    # 戻り値は設定しません (戻り値は None になります)。
    return


def get_fact_datas(model_xbrl, xbrl_basename):
    """fact を取得する関数です。"""
    # すべての fact からデータを取得していきます。
    facts = []
    for fact in model_xbrl.facts:
        # 日本語ラベルを取得します。
        label_ja = fact.concept.label(preferredLabel=None, lang='ja', linkroleHint=None)

        # タグの値を取得します。(勘定科目の金額や文章など)
        # (説明) Decimal 型は、そのままでは SQLite DB に
        # 保存できなかったので、文字列に変換しました。
        if isinstance(fact.xValue, Decimal):
            # Decimal型は、浮動小数点数の桁数を保持した状態で、
            # 文字列にすることができました。
            x_value = str(fact.xValue)
        elif isinstance(fact.xValue, datetime.datetime):
            # 日付型も文字列にしました。
            # マイクロ秒は使われていなかったので、無視しました。
            x_value = fact.xValue.strftime('%Y-%m-%d %H:%M:%S')
        else:
            # その他の場合 (NoneType, int, bool, str) は、
            # そのまま取得しました。
            x_value = fact.xValue

        # 単位を取得します。
        if fact.unit is None:
            unit = None
        else:
            # .unit の.value とは、複雑な分数形式の単位などを、
            # Arelle が人の見やすい形式に整えた文字列でした。
            # (例) 'JPY / shares'
            unit = fact.unit.value

        # 開始日、終了日、時点(期末日) の日付を取得します。
        if fact.context.startDatetime:
            # 開始日
            start_date = fact.context.startDatetime.strftime('%Y-%m-%d')
        else:
            start_date = None

        if fact.context.endDatetime:
            # 終了日 ※ (注意)
            # ・1 日分だけ加算された日付になっていました。
            # ・また、開始日が無い時でも『instance 時点(期末日)』の
            #   日付が入っていました。
            end_date = fact.context.endDatetime.strftime('%Y-%m-%d')
        else:
            end_date = None

        if fact.context.instantDatetime:
            # 時点 (期末日) ※ (注意)
            # ・1 日分だけ加算された日付になっていました。
            instant_date = fact.context.instantDatetime.strftime('%Y-%m-%d')
        else:
            instant_date = None

        # 『fact.context.propertyView 属性』の中から、
        # 調整無しの終了日と時点(期末日)を取得します。
        end_date_pv = None
        instant_date_pv = None
        for item in fact.context.propertyView:
            if item:
                if item[0] == 'endDate':
                    end_date_pv = item[1]
                elif item[0] == 'instant':
                    instant_date_pv = item[1]

        # シナリオ (scenario) を取得します。
        scenario_datas = []
        for (dimension, dim_value) in fact.context.scenDimValues.items():
            scenario_datas.append({
                'ja': (
                    dimension.label(preferredLabel=None, lang='ja', linkroleHint=None),
                    dim_value.member.label(preferredLabel=None, lang='ja', linkroleHint=None)),
                'en': (
                    dimension.label(preferredLabel=None, lang='en', linkroleHint=None),
                    dim_value.member.label(preferredLabel=None, lang='en', linkroleHint=None)),
                'id': (
                    dimension.id,
                    dim_value.member.id),
            })

        # SQLite DB に保存するために、
        # シナリオのリストを json に変換します。
        if scenario_datas:
            # json をコンパクトにするために、
            # ensure_ascii と separators の設定を使用しました。
            scenario_json = json.dumps(
                scenario_datas, ensure_ascii=False, separators=(',', ':'))
        else:
            scenario_json = None

        # リストに追加します。
        # (実際にはもっとたくさんの項目を取得して活用しています)
        facts.append([
            xbrl_basename, # (例) 'jpcrp030000-asr-001_E00004-000_2020-05-31_01_2020-08-28.xbrl'
            fact.prefix, # (例) 'jppfs_cor'
            label_ja, # (例) '売上高'
            fact.localName, # (例) 'NetSales'

            type(fact.xValue).__name__, # (例) 'Decimal', 'DateTime', 'NoneType', 'str'
            x_value, # (例) '58179890000', '1379.56', '2019-05-31 00:00:00'
            unit, # (例) 'JPY', 'shares', 'JPY / shares', 'pure'

            start_date, # (例) '2019-06-01'
            end_date, # (例) '2020-06-01'
            instant_date, # (例) '2020-06-01'

            end_date_pv, # (例) '2020-05-31'
            instant_date_pv, # (例) '2020-05-31'

            fact.contextID, # (例) 'CurrentYearDuration'
            scenario_json, # 元のリストは、json.loads() で復元することができました。
        ])
    return facts


def save_to_sqlite(db_file, facts):
    """SQLite DB に結果を保存します。"""
    conn = sqlite3.connect(db_file)
    c = conn.cursor()
    try:
        # テーブル xbrl が無ければ作ります。
        # enddatepv と instantdatepv の末尾の pv は、
        # propertyView の略です。
        c.execute('''CREATE TABLE IF NOT EXISTS xbrl(
            xbrl TEXT,
            prefix TEXT,
            ja TEXT,
            localname TEXT,

            xtype TEXT,
            xvalue TEXT,
            unit TEXT,

            startdate TEXT,
            enddate TEXT,
            instantdate TEXT,

            enddatepv TEXT,
            instantdatepv TEXT,

            contextid TEXT,
            scenario TEXT
            )''')

        # データをまとめて追加します。
        c.executemany(
            'INSERT INTO xbrl VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
            facts)

        # コミットします。
        conn.commit()
    except Exception as e:
        conn.rollback()
        lg.error(f'{e.__class__.__name__} {basename(db_file)} rollback')
        lg.error(format_exc())
    finally:
        c.close()
        conn.close()
    return

実行結果

コード例の実行結果です。

シングルプロセス

シングルプロセスで実行したときのコマンドプロンプトの表示です。

プログラム全体の実行時間は、22秒でした。

(src_files) 5 個
(data_dir) F:\project\data
(arelle_temp_dir) F:\project\arelle_temp
(log_dir) F:\project\log

(シングルプロセス) max_workers=1
[開始] n_worker 0 (PID 4704)
(arelle_dir) F:\project\arelle_temp\arelle_0
(cmd_arelle_dir) --xdgConfigHome=F:\project\arelle_temp\arelle_0
(sys.argv) ['F:\\project\\kabu\\app_main.py', '--xdgConfigHome=F:\\project\\arelle_temp\\arelle_0']
(ctrl.webCache.cacheDir) F:\project\arelle_temp\arelle_0\arelle\cache

0: F:\project\download\S100JKNH_1.zip
0: F:\project\download\S1008JYI_1.zip
0: F:\project\download\S100B922_1.zip
0: F:\project\download\S100DZ6E_1.zip
0: F:\project\download\S100GUI8_1.zip
[終了] n_worker 0 (PID 4704)
(経過時間) 0:00:22

マルチプロセス

マルチプロセスで実行したときのコマンドプロンプトの表示です。

プログラム全体の実行時間は、10秒でした。

今回は zip ファイルを5個しか読み込んでいないのと、time.sleep(n_worker) を使用したために、2倍程度の高速化にとどまっていました。

ですが、数千、数万ファイルを読み込んだときは、期待した通り、CPU のコア数に応じた速さになってくれました。

成功です。

(src_files) 5 個
(data_dir) F:\project\data
(arelle_temp_dir) F:\project\arelle_temp
(log_dir) F:\project\log

(マルチプロセス) max_workers=5
引数リストを作ります。
(n_worker) 1
(5 + 0) // 5 = 1
src_files[0:1]
(log_mp_file) F:\project\log\log_mp_1.txt

(n_worker) 2
(5 + 1) // 5 = 1
src_files[1:2]
(log_mp_file) F:\project\log\log_mp_2.txt

(n_worker) 3
(5 + 2) // 5 = 1
src_files[2:3]
(log_mp_file) F:\project\log\log_mp_3.txt

(n_worker) 4
(5 + 3) // 5 = 1
src_files[3:4]
(log_mp_file) F:\project\log\log_mp_4.txt

(n_worker) 5
(5 + 4) // 5 = 1
src_files[4:5]
(log_mp_file) F:\project\log\log_mp_5.txt

(results) <class 'generator'>
[開始] n_worker 1 (PID 2624)
(arelle_dir) F:\project\arelle_temp\arelle_1
(cmd_arelle_dir) --xdgConfigHome=F:\project\arelle_temp\arelle_1
(sys.argv) ['F:\\project\\kabu\\app_main.py', '--xdgConfigHome=F:\\project\\arelle_temp\\arelle_1']
(ctrl.webCache.cacheDir) F:\project\arelle_temp\arelle_1\arelle\cache

[開始] n_worker 2 (PID 11688)
(arelle_dir) F:\project\arelle_temp\arelle_2
(cmd_arelle_dir) --xdgConfigHome=F:\project\arelle_temp\arelle_2
(sys.argv) ['F:\\project\\kabu\\app_main.py', '--xdgConfigHome=F:\\project\\arelle_temp\\arelle_2']
(ctrl.webCache.cacheDir) F:\project\arelle_temp\arelle_2\arelle\cache

[開始] n_worker 3 (PID 8900)
(arelle_dir) F:\project\arelle_temp\arelle_3
(cmd_arelle_dir) --xdgConfigHome=F:\project\arelle_temp\arelle_3
(sys.argv) ['F:\\project\\kabu\\app_main.py', '--xdgConfigHome=F:\\project\\arelle_temp\\arelle_3']
(ctrl.webCache.cacheDir) F:\project\arelle_temp\arelle_3\arelle\cache

[開始] n_worker 4 (PID 13844)
(arelle_dir) F:\project\arelle_temp\arelle_4
(cmd_arelle_dir) --xdgConfigHome=F:\project\arelle_temp\arelle_4
(sys.argv) ['F:\\project\\kabu\\app_main.py', '--xdgConfigHome=F:\\project\\arelle_temp\\arelle_4']
(ctrl.webCache.cacheDir) F:\project\arelle_temp\arelle_4\arelle\cache

[開始] n_worker 5 (PID 8932)
(arelle_dir) F:\project\arelle_temp\arelle_5
(cmd_arelle_dir) --xdgConfigHome=F:\project\arelle_temp\arelle_5
(sys.argv) ['F:\\project\\kabu\\app_main.py', '--xdgConfigHome=F:\\project\\arelle_temp\\arelle_5']
(ctrl.webCache.cacheDir) F:\project\arelle_temp\arelle_5\arelle\cache

1: F:\project\download\S100JKNH_1.zip
[終了] n_worker 1 (PID 2624)
2: F:\project\download\S1008JYI_1.zip
[終了] n_worker 2 (PID 11688)
3: F:\project\download\S100B922_1.zip
[終了] n_worker 3 (PID 8900)
4: F:\project\download\S100DZ6E_1.zip
[終了] n_worker 4 (PID 13844)
5: F:\project\download\S100GUI8_1.zip
[終了] n_worker 5 (PID 8932)
(経過時間) 0:00:10

結果を保存した SQLite DB のうちの1つです。

DB Browser for SQLite で内容を表示してみました。

ログファイルについては、以下のように、プロセスごとに独立して作成されました。

マルチプロセスでのロギングについて

余談です。『マルチプロセス』でのロギングについてです。

マルチスレッドではなくて、『マルチプロセス』でのロギングについてです。

『1つのファイル』に、全部のプロセスからログを書き込む場合は、コードが少し複雑になるようでした。

(Python) 複数のプロセスからの単一ファイルへのログ記録(Logging クックブック)

ですが、プロセスごとにログファイルを用意して、別々に書き込むぶんには、簡単にできました。

普通のシングルプロセスのときと同じように、ルートロガーでも名前付きロガーでもいいので、子プロセスの中でロガーを取得します。

あとは、ファイルハンドラを設定して、lg.debug()lg.info() で、ログを記録するだけでした。

『ロガーは、プロセス間で独立しているのか?』

自分が調べた限りでは、子プロセスのロガーは、メインプロセスのロガーとは独立していました。

子プロセスのルートロガーの lg.info() が、メインプロセスのファイルハンドラに記録されたりはしなかったです。

そして、子プロセスのルートロガーの lg.info() が、べつの子プロセスのファイルハンドラに記録されたりもしなかったです。

組み込み関数の id() で取得できるオブジェクト ID も、子プロセスのルートロガーと他のプロセスのルートロガーとでは、値が異なっていました。

子プロセスで、名前付きロガーを使用した場合でも、同様でした。

ストリームハンドラやファイルハンドラの設定についても、ほかのプロセスのハンドラの設定に、影響を与えている様子はありませんでした。

そもそも、プロセスが異なるので、『たぶんそうなんじゃないかな?』とは思っていました。

ですが、Python マニュアルを見ても、そのあたりのはっきりとした説明が、なかなか見つかりませんでした。

※ 自分が確認したかった『ロガーは、プロセスごとに、設定が分離している。』に近い内容の説明は、(Python) より手の込んだ multiprocessing の例(Logging クックブック)にありました。

結局、自分でいろいろ実験してみた結果、独立していると判断しました。

『もしかしたら、logging モジュールが、裏でなんらかのプロセスかん通信つうしんをしていて、ログの内容やロギング設定を伝搬でんぱんさせているんじゃないか?』

と、そんな気がかりもありました。

ですが、自分が実験した限りでは、そんなことはなかったです。

プロセス間については、自分でログの内容やロギング設定をやり取りする仕組みを書かない限り、ロガーは、プロセス間で独立して動いているように見えました。

そういうわけで、たぶんですが、Python のロギングシステムは、プロセス間で独立しています。

なので、プロセスごとに、別々のログファイルに記録する分には、シングルプロセスのときと同じコードで記録することができました。

コードも簡単になったので、特に必要がない限り、自分はよく、プロセスごとにログファイルを分けています。

ログの内容も、各プロセスの内容が混ざらずに済んだので、自分は見やすかったです。

どうしてもログファイルをまとめたいときは、メインプロセスの最後で、ログファイルを全部連結する処理を追加するのが簡単でした。

以上です

Arelle の関連記事です。

Arelle のインストール方法【XBRL 読み込みライブラリ】

XBRL から『勘定科目の金額や文章』を取得するコード例【Arelle】

XBRL から『勘定科目』と『リンクベース』の内容を取得するコード例【Arelle】

⇒ 【EDINET XBRL】『連結経営指標等』の Fact だけを取得する方法【Arelle】

マルチプロセス処理の関連記事です。

【Python】concurrent.futures の使い方【並列処理】

【Python】multiprocessing.Pool() の使い方【並列処理】

タイトルとURLをコピーしました