上場企業の業績グラフを描く【Python】

スポンサーリンク

投資判断のために、売上・利益や資産の推移をグラフにする方法です。「EDINET XBRLを読み込むPythonライブラリ【有報対応】」で読み取った決算データを使って、業績の時系列をグラフにします。そのPythonコードを紹介します。

 

業績グラフを描くときに難しいのが、勘定科目の抽出ですね。特に売上高のタグです。業種や会計基準によって、実に様々なタグが使われていました。あと、連結決算の金額をプロットするのか、個別財務諸表の金額をプロットするのか、といったところも選ばないとなりません。

 

さらに、「事業別の売上高」というのがあります。「連結財務諸表注記」に載っている「セグメント情報」の金額です。これらも売上高としてタグ付けされていましたので、混ざらないように区別して、プロット用のデータフレームを作っていきます。そのあたりのコード例も載せています。

決算データフレームを用意する

使用する決算データですが、pandas.concat(df_list, axis='index')で単純連結したものを使います。ある企業の全決算が、1つのデータフレームに入っている状態ですね。これをピックル(pickle)形式で保存したもの使います。

 

このデータフレームには、tag・context・日付・連結といった情報列がありますので、これらを使って必要な行を抽出していきます。日本語ラベルとかはなくて大丈夫です。

作図プログラムの流れ

パンダス(pandas)で抽出して、マットプロットリブ(matplotlib)でグラフにします。パイソンファイルですが、以下の2つを作ります。

  • fig_proc.py(フィグ・プロック)メイン関数とグラフ化
  • fig_df.py(フィグ・ディーエフ)勘定科目抽出

インポートとメイン関数

メイン関数は、開始前後の時刻を記録して実行時間を計るようにしています。経過した時間の秒数は、時分秒の形式に変換して表示しています。プロック関数が実際の処理を行っているところです。

"""fig_proc.py"""

from os.path import join as os_join
from os.path import basename as os_basename
from os.path import splitext as os_splitext
from os import listdir as os_listdir
import datetime
from datetime import timedelta
from math import log10 as math_log10
from math import modf as math_modf
from traceback import format_exc
from pandas import concat as pd_concat
from matplotlib.font_manager import FontProperties
from matplotlib.pyplot import subplots as plt_subplots
from matplotlib.pyplot import close as plt_close
from fig_df import get_dataframes


def main():
    """メイン"""
    t1 = datetime.datetime.now()

    proc()

    elapsed = (datetime.datetime.now() - t1)
    hms = get_elapsed_time_tuple(elapsed.seconds)

    print('処理時間 %d day(s) %02d:%02d:%02d' % (elapsed.days, *hms))
    print('end')
    return


def get_elapsed_time_tuple(elapesed_seconds):
    """経過秒数から (hours, minutes, seconds) のタプルを取得"""
    (hours, r2) = divmod(elapesed_seconds, 3600)
    (minutes, seconds) = divmod(r2, 60)
    return (hours, minutes, seconds)

データファイル・保存フォルダ設定

プロック関数です。抽出や作図に先立って、決算データのファイルパスとか保存フォルダを設定しています。

 

プロックはプロセス(process)のことです。メイン関数に次ぐ、一番上の階層を指すための汎用的な名前ですね。そんなイメージで私は使ってます。

 

そして、データ抽出と作図の関数をマルチプロセスで実行します。multiprocessingモジュールなどで実現するのが簡単なのですが、ここでは自作関数で実現しています。

def proc():
    """proc"""
    # フォルダ/ファイル設定
    src_dir = r'(決算データフレームのフォルダ)'
    plot_dir = '(グラフ保存フォルダ)'

    # 決算データフレームのファイルパス取得
    # src_files = [os_join(src_dir, x) for x in os_listdir(src_dir)]
    src_files = [r"***\E00000.pickle"]

    datas = n_process(
        function=mp_func,
        n_run=1,
        datas=src_files,
        args=(plot_dir, ),
        )
    return

 

以下は、自作のマルチプロセス関数です。もともとは、プロセスごとにログを取ったりするために作ったものです。

 

決算データのファイルリストを、N分割して渡しています。プロセスごとに共通の引数(保存フォルダパス)は、そのままリスト化して引数リストにくっつけて渡しています。

 

シングルプロセス処理のときは、単純に引数を渡しています。アスタリスクを付けた「*args」は、タプルをアンパックして渡すという意味です。

def n_process(function, n_run: int, datas: list, args: tuple):
    """マルチプロセス処理"""
    if n_run == 1:
        print('シングルプロセス処理 n_run: %d' % n_run)
        result_datas = function(None, 0, datas, *args)

    else:
        print('マルチプロセス処理 n_run: %d' % n_run)
        def get_indexes(len_datas):
            """均等にN分割するインデックスを生成"""
            # 商(quotient)を計算
            quotient = int(len_datas / n_run)

            # 余り(remainder)を計算
            remainder = len_datas - n_run * quotient

            # 余りを 1 ずつ、無くなるまで振り分ける。
            numbers = [
                quotient + 1 if n < remainder else quotient for n in range(n_run)]
            indexes = []
            t_end = 0
            for number in numbers:
                t_start = t_end
                t_end += number
                indexes.append((t_start, t_end))
            return indexes

        from multiprocessing import Process, Queue

        jobs = []
        for (number, i) in enumerate(get_indexes(len(datas)), start=1):
            print('%04d %04d' % (i[0], i[1]))

            queue = Queue()
            jobs.append((
                Process(
                    target=function,
                    args=[queue, number, datas[i[0]:i[1]]] + list(args)
                    ),
                queue,
                ))

        for job in jobs:
            job[0].start()

        result_datas = []
        for job in jobs:
            result_datas.extend(job[1].get())
    return result_datas

matplotlib 設定(フォント・線の太さ・点の大きさ)

日本語フォントの設定とか、グラフの書式の設定です。設定値をまとめるのに便利なので、クラスにしています。

 

日本語フォントは Google Noto Fonts の「Noto Sans CJK JP」です。ゼロに斜線が無いので、極小サイズにしても数字がつぶれにくいです。見易いので良く使っています。

Google Noto Fonts
https://www.google.com/get/noto/

 

matplotlibで日本語をうまく表示する方法は色々あるようですが、私はフォントプロパティで設定する方法を採用しています。

 

x軸の余白ですが、そのままプロットすると両端の決算がy軸と重なってしまうんですね。それを離すためのタイムデルタ(timedelta)を用意しています。

class PlotParameters:
    """描画設定クラス"""
    def __init__(self):
        # フォントファイル
        font_file = r'***\NotoSansMonoCJKjp-Regular.otf'

        # 減らすと狭くなる
        (self.fig_left, self.fig_bottom) = (0.094, 0.142)

        # 増やすと狭くなる
        (self.fig_right, self.fig_top) = (0.908, 0.92)

        self.fig_wspace = 0.2
        self.fig_hspace = 0.2

        # x軸余白
        self.left_space_days = timedelta(days=60)
        self.right_space_days = timedelta(days=120)

        # フォントサイズ
        fp_size_ticks = 10
        fp_size_legend = 14

        # フォントプロパティ
        self.fp_ticks = FontProperties(
            style='normal', weight='regular',
            size=fp_size_ticks, fname=font_file)

        self.fp_legend = FontProperties(
            style='normal', weight='regular',
            size=fp_size_legend, fname=font_file)

        # ライン書式
        self.linewidth = 2
        self.linealpha = 0.5
        
        # マーカー書式
        self.marker = 'o'
        self.markersize = 6

        # グリッド書式
        self.grid_linewidth = 0.5
        self.grid_linealpha = 1.0
        self.grid_linestyle = 'dotted'
        self.grid_color = 'k'

        # 凡例書式
        self.loc_legend = 2
        self.alpha_legend = 1.0
        return

マルチプロセスのメイン部分

決算データフレームを読み込んでグラフを作成する部分です。分割されたファイルリストを順に読み込んで、グラフ化していきます。

 

マルチプロセスで実行した場合ですが、最後はキュー(Queue: _q)に何かデータを追加しておきます。何も入れずに関数が終了すると、終わったことがメインプロセスに伝わりませんでした。

 

今回は個々のプロセスがグラフを保存して完了なので、戻り値がありません。とりあえず、プロセスの連番を入れておきました。

def mp_func(_q, n_id, src_files, data_dir):
    """マルチプロセスでグラフ作成"""
    try:
        # 描画設定を取得
        pp = PlotParameters()

        for (n, src_file) in enumerate(src_files, start=1):
            filename = os_basename(src_file)

            # データファイル読み込み & 勘定科目抽出
            datas = get_dataframes(src_file)

            # 抽出結果を保存(デバッグ用)
            for n, (name, (df, _)) in enumerate(datas.items(), start=1):
                if not df.empty:
                    df.to_excel(os_join(data_dir, '%02d_%s_%s.xlsx' % (
                        n, name, os_splitext(filename)[0]),
                        ))

            # 作図
            plot_data(datas, pp, data_dir, filename)
    except:
        print(format_exc())

    if _q is None:
        # シングルプロセス
        return ['n_id: %d' % n_id]
    else:
        # マルチプロセス(キューに何か入れておく)
        _q.put(['n_id: %d' % n_id])
    return

決算データ読み込み

決算データフレームのピックルを読み込んで、勘定科目ごとにデータフレームの辞書を作っているところです。

 

タグ名の正規表現が非常に長いので、その他の部分の関数名を u とか GI に短縮しています。正規表現は、データフレームの df.str.contains() メソッドに渡すためのものです。

 

プロットしたい勘定科目を書き連ねて、辞書に追加しています。米国基準(US-GAAP)の売上高タグもあるといいですね。

"""fig_df.py"""

from collections import OrderedDict
from pandas import read_pickle as pd_read_pickle
from pandas import isnull as pd_isnull

def get_dataframes(file):
    """データファイルを読み込む"""
    df = pd_read_pickle(file)
    df.reset_index(drop=True, inplace=True)

    # start_column = '開始日'
    end_column = '終了日'
    instant_column = '期末日'

    datas = OrderedDict()
    u = datas.update
    GI = get_items

    u({'売上収益(IFRS)': GI(df, '^(?:RevenueIFRSSummaryOfBusinessResults)$', True, end_column)})
    u({'売上収益(IFRS 個別)': GI(df, '^(?:RevenueIFRSSummaryOfBusinessResults)$', False, end_column)})
    u({'売上高': GI(df, '^(?:NetSales|NetSalesSummaryOfBusinessResults)$', True, end_column)})
    u({'売上高(個別)': GI(df, '^(?:NetSales|NetSalesSummaryOfBusinessResults|OperatingRevenue1)$', False, end_column)})

    u({'営業利益': GI(df, '^(?:OperatingIncome)$', True, end_column)})
    u({'営業利益(個別)': GI(df, '^(?:OperatingIncome)$', False, end_column)})

    u({'経常利益': GI(df, '^(?:OrdinaryIncome|OrdinaryIncomeLossSummaryOfBusinessResults)$', True, end_column)})
    u({'経常利益(個別)': GI(df, '^(?:OrdinaryIncome|OrdinaryIncomeLossSummaryOfBusinessResults)$', False, end_column)})

    u({'税引前利益(IFRS)': GI(df, '^(?:ProfitLossBeforeTaxIFRSSummaryOfBusinessResults)$', True, end_column)})
    u({'税引前利益(IFRS 個別)': GI(df, '^(?:ProfitLossBeforeTaxIFRSSummaryOfBusinessResults)$', False, end_column)})

    u({'当期利益(IFRS)': GI(df, '^(?:ProfitLossAttributableToOwnersOfParentIFRSSummaryOfBusinessResults)$', True, end_column)})
    u({'当期利益(IFRS 個別)': GI(df, '^(?:ProfitLossAttributableToOwnersOfParentIFRSSummaryOfBusinessResults)$', False, end_column)})
    u({'純利益': GI(df, '^(?:NetIncome|NetIncomeLossSummaryOfBusinessResults)$', True, end_column)})
    u({'純利益(個別)': GI(df, '^(?:NetIncome|NetIncomeLossSummaryOfBusinessResults)$', False, end_column)})

    u({'包括利益(IFRS)': GI(df, '^(?:ComprehensiveIncomeAttributableToOwnersOfParentIFRSSummaryOfBusinessResults)$', True, end_column)})
    u({'包括利益': GI(df, '^(?:ComprehensiveIncome|ComprehensiveIncomeSummaryOfBusinessResults)$', True, end_column)})
    return datas

 

ところで、df.str.contains()に正規表現を渡すと、以下のような警告が出る場合があります。その正規表現だと、マッチしたグループが取り出せるようになっているから、その文字列を取り出す時は str.extract() を使ってね、というメッセージのようです。

UserWarning: This pattern has match groups. To actually get the groups, use str.extract.

 

これは、丸カッコを使ったパターンを渡すと出てくる警告です。
'^(NetSales|NetSalesSummaryOfBusinessResults|OperatingRevenue1)$'

 

警告を消すには、(?:…) の形式にします。ハテナとコロンを追加するだけです。
'^(?:NetSales|NetSalesSummaryOfBusinessResults|OperatingRevenue1)$'

 

これは、丸カッコの中を取り込まない、という書き方です。マッチしたときに (?:…) の中の文字列を取り出す機能がなくなるので、str.extract() の提案が出なくなるようです。

狙った勘定科目のデータを抽出

ある企業の全決算を連結したデータフレームから、狙った勘定科目を取り出すところです。

 

「メンバーを含まないもの」というのは、連結財務諸表注記のセグメント別に載ってるようなところの金額を含めない、ということです。今はそこまで細かいところは使わないので、うっかり取り込んでしまわないように除外しています。

 

データフレームの [] カッコ内のチルダ「~」は、論理を反転する演算子です。pandasマニュアルのクックブック(Cookbook)のページで、インバース・オペレーター(inverse operator)として紹介されています。

 

(?<!…) は、否定後読み(ひてい あとよみ)アサーションという正規表現です。Pythonマニュアルの正規表現のページに載っています。

'_NonConsolidated)Member'で終わるコンテキストは、単に「非連結(個別)」を表しているだけなので、これをメンバー判定から除外するために使っています。

 

pd_isnull(a['値']).sum() は、Trueが1、Falseが0として足し算されることを利用した、Trueの数のカウント方法です。

 

データフレームの groupby() メソッドは、例えば、開示情報を証券コードごとにまとめたい、といった時に使うようなメソッドですが、重複した部分の選択処理を書く時にも役立ちます。

グループ・バイで抽出したデータフレームのインデックス(.index)を使って、元のデータフレームから重複を削除していきます。

 

サマリに載ってるような勘定科目は、5年間は参照され続けるので、たくさんの重複が生じます。ここでは、コンテキストに Current を含むものを優先的に残すような方針で抽出しています。

 

ほかにも、CurrentでもPriorでもいいから、とにかく最新の決算書に載っていたデータを残す、というような方針も良さそうですね。いろいろなやり方があると思います。

 

print文がたくさんありますが、重複行が減っていく様子を見るのに使っていたものです。最終的には、消すかコメントアウトしちゃいます。

"""fig_df.py の続き"""

def get_items(df, reg_tag: str, consolidated: bool, date_column):
    """勘定科目を取得"""
    print('df %d %s %s' % (len(df.index), reg_tag, consolidated))

    # タグで抽出
    a = df[df['tag'].str.contains(reg_tag)]
    print('a %d' % len(a.index))

    # 累計を選択(四半期でないものを抽出)
    if (date_column == '開始日') or (date_column == '終了日'):
        a = a[~(a['context'].str.contains('Quarter'))]
        print('a %d' % len(a.index))

    # メンバーを含まないものを抽出
    a = a[~(a['context'].str.contains('^.*?(?<!_NonConsolidated)Member$'))]
    print('a %d' % len(a.index))

    # 連結を抽出
    if consolidated:
        a = a[(a['連結']==consolidated)|(pd_isnull(a['連結'])==True)]
    else:
        a = a[a['連結']==consolidated]
    print('a %d' % len(a.index))

    # 値が無いものを除外
    if pd_isnull(a['値']).sum() > 0:
        a.dropna(axis=0, subset=('値', ), inplace=True)
        print('a %d' % len(a.index))

    # 日付が重複している行の処理
    for (name, tdf) in a.groupby(date_column):
        if len(tdf.index) == 1:
            # 重複なし(スキップ)
            continue

        print('%s shape: %s' % (str(name), str(tdf.shape)))

        # 'Current'を含む行の数を取得
        bool_current_sum = tdf['context'].str.contains('Current').sum()

        if bool_current_sum == 1:
            # Currentを残す(Currentでないものを削除)
            a.drop(
                index=tdf[~(tdf['context'].str.contains('Current'))].index,
                inplace=True,
                )

        elif bool_current_sum > 1:
            # Currentを抽出
            t = tdf[tdf['context'].str.contains('Current')]

            # 提出日が最新のもの抽出
            t = t[t['提出日'] == t['提出日'].max()]

            if len(t.index) > 1:
                if t['tag'].str.contains('Summary').sum() > 0:
                    # Summaryでないものを抽出
                    t = t[~(t['tag'].str.contains('Summary'))]

            if len(t.index) != 1:
                # 2個以上の場合は一応知らせる
                print('注意 len(t.index) %d' % len(t.index))
                # データが残っていないと t.index[0] が失敗するので止める
                assert len(t.index) != 0

            # 合致しなかったものを削除
            a.drop(index=tdf[tdf.index != t.index[0]].index, inplace=True)

        else:
            # 提出日が最新のものを残す(最新でないものを削除)
            a.drop(
                index=tdf[tdf['提出日'] != tdf['提出日'].max()].index,
                inplace=True,
                )

        print('a %d' % len(a.index))
    return (a, date_column)

プロット & 書式設定

matplotlibで勘定科目のプロットと書式設定をしているところです。

辞書から勘定科目の時系列データフレームを取り出して、プロットしています。

日本語フォントは、フォントプロパティ(fontproperties)で指定しています。

def plot_data(datas, pp, data_dir, filename):
    """作図"""
    (fig, ax) = plt_subplots(
        nrows=1, ncols=1, sharex=False, sharey=False,
        squeeze=True, subplot_kw=None, gridspec_kw=None,
        )

    try:
        date_infos = []

        # 勘定科目の名前・データフレーム・日付列を取得
        for (name, (df, date_column)) in datas.items():
            if df.empty:
                continue

            # 日付列でソート(昇順)
            df.sort_values(by=date_column, ascending=True, inplace=True)

            # プロット
            ax.plot(
                df[date_column],
                df['値'],
                label=name,
                linewidth=pp.linewidth,
                alpha=pp.linealpha,
                marker=pp.marker,
                markersize=pp.markersize,
                )

            # x軸の刻みとラベル設定のための日付データフレーム作成
            df = df[['第N期', date_column]]
            df = df.rename(
                {date_column: '日付'}, axis='columns', copy=True)
            date_infos.append(df)

        # 日付データフレームを連結 & 重複削除 & ソート
        date_df = pd_concat(
            date_infos, axis=0, ignore_index=True, copy=True)
        date_df = date_df[~(date_df['日付'].duplicated())]
        date_df = date_df.sort_values(by='日付', ascending=True)

        if not date_df.empty:
            # 日付列のシリーズ(series)を取得
            date_s = date_df['日付']

            # x軸に刻みを設定
            ax.set_xticks(date_s.dt.to_pydatetime())

            # x軸の最小値と最大値を広げる
            ax.set_xlim(
                left=date_s.iloc[0] - pp.left_space_days,
                right=date_s.iloc[-1] + pp.right_space_days,
                )

            # x軸の刻みラベルを生成 & 設定
            x_tick_labels = date_df.apply(
                lambda x: '{qn}\n{d:%y}\n{d.month:2}\n{d.day:2}'.format(
                    qn='FY' if x['第N期'] == 0 else 'Q%d' % x['第N期'],
                    d=x['日付'],
                    ),
                axis=1,
                )
            ax.set_xticklabels(
                x_tick_labels.values, fontproperties=pp.fp_ticks)

            # y軸の刻みラベルを生成 & 設定
            ax.set_yticklabels(
                (format_num_jp(x) for x in ax.get_yticks()),
                fontproperties=pp.fp_ticks,
                )

            # 第2Y軸を追加
            axtw = ax.twinx()

            # 第2Y軸に刻み・ラベル・上限下限をコピー
            axtw.set_yticks(ax.get_yticks())
            axtw.set_yticklabels(
                ax.get_yticklabels(), fontproperties=pp.fp_ticks)
            axtw.set_ylim(*(ax.get_ylim()))

            # 四半期終了日ごとに縦のグリッド線を引く
            for (t_index, (t_qn, t_date)) in date_df.iterrows():
                if t_qn == 0:
                    ax.axvline(
                        x=t_date,
                        color=pp.grid_color,
                        linewidth=pp.grid_linewidth,
                        alpha=pp.grid_linealpha,
                        linestyle=pp.grid_linestyle,
                        )

            # 横のグリッド線も追加
            ax.grid(
                b=True, which='major', axis='y',
                color=pp.grid_color,
                linewidth=pp.grid_linewidth,
                alpha=pp.grid_linealpha,
                linestyle=pp.grid_linestyle,
                )

            # 凡例の書式設定
            set_legend_format(
                ax, pp.loc_legend, pp.fp_legend, pp.alpha_legend)

        # グラフの余白を設定 left, bottom, right, top, wspace, hspace
        fig.subplots_adjust(
            left=pp.fig_left, right=pp.fig_right,
            top=pp.fig_top, bottom=pp.fig_bottom,
            wspace=pp.fig_wspace, hspace=pp.fig_hspace,
            )
        
        # 保存
        fig.savefig(
            os_join(data_dir, '%s.png' % os_splitext(filename)[0]))
    finally:
        # 現在のfigureを閉じる
        plt_close()
    return

凡例の書式設定

基本的には ax.legend() の設定だけでOKです。私は透明度の設定を追加しています。

 

凡例(はんれい)テキストの透明度は、設定するためのメソッドが見つかりませんでした。ですが、変数としては用意されていたので(._alpha)、設定自体は可能でした。

def set_legend_format(ax, loc, fp, alpha):
    """凡例の書式設定"""
    ax_legend = ax.legend(
        loc=loc, frameon=True, fancybox=True, numpoints=1, prop=fp)

    # まわりを囲っているフレームの透明度
    ax_legend.get_frame().set_alpha(0.0)

    # 凡例テキストの透明度(データ系列ごとに設定します)
    for n_text in range(len(ax_legend.texts)):
        ax_legend.texts[n_text]._alpha = alpha
    return ax

金額を億兆単位にする

百万円単位の金額よりも、1億円・1兆円単位で表示したかったので、変換します。単純に変換すると、0.99999…億円 となる場合がありましたので、fomat_int_float_auto()も作りました。

def format_num_jp(x):
    """桁数に応じて単位を変える(日本用)"""
    if x == 0:
        return 0

    t = math_log10(abs(x))
    if t >= 16:
        return '%s京' % fomat_int_float_auto(x, 10000000000000000)
    if t >= 12:
        return '%s兆' % fomat_int_float_auto(x, 1000000000000)
    elif t >= 8:
        return '%s億' % fomat_int_float_auto(x, 100000000)
    elif t >= 4:
        return '%s万' % fomat_int_float_auto(x, 10000)
    else:
        return '%s' % fomat_int_float_auto(x, 1)

目盛の小数「0.99999…」を「1.0」にする

最初に、目盛りの値を1万・1億・1兆などで割ります。その結果の小数部分に応じて、小数点以下を1ケタに抑えたり、整数にしたりしています。

 

math.modf() は、(小数部分, 整数部分) のタプルを返す関数です。[0]で小数部分を取得しています。Pythonマニュアル math.modf(x)

def fomat_int_float_auto(x, denominator):
    """整数と小数を自動切り替える"""
    t = x / denominator
    if 0.05 < abs(math_modf(t)[0]):
        return '%.1f' % t
    else:
        return '%d' % t

 

これで、「1億、1.5億、2億、2.5億」といった表示が可能になりました。

なお、1億に満たない目盛りがあると、「0、5000万、1億、1.5億、2億、2.5億」といった表示になります。

万能というわけではなく、問題もあります。グラフの金額によっては、「1.5億、1.5億、1.5億、1.5億」のように同じ金額が並んでしまう場合があります。

これは、数値のフォーマット '%.1f' で小数第2位以下を消したり、'%d' で小数を消したりしているために発生します。

グラフの目盛領域に余裕があるなら、小数点以下の表示桁数を増やせば改善します。

グラフ画像

以下のようなグラフができあがります。売上高に関するグラフの例です。

業績グラフのプロット例

タイトルとURLをコピーしました