XBRLファイルからデータを読み込む詳細なXBRLパーサーのコード例です。
詳細なXBRLパーサーの作り方で作っていたPythonプログラムのコードになります。
ある程度動きましたので、現時点のコードを掲載します。
コード量は、約2200行でした。
Pythonコードの解説
準備や動作の大まかなところの説明です。
タクソノミのzipファイルを使います
このパーサーは、EDINETのタクソノミとTDnetのタクソノミのzipファイルを使います。
ウェブ接続なしでXBRLが読み込めるように、完全ローカル動作を目指しました。
タクソノミをまとめたzipファイルは、金融庁のEDINETと東京証券取引所のTDnetで、それぞれ公開されています。
zipはローカルフォルダに保存しておきます。これは手作業になります。
XBRLパーサーを使うときに、タクソノミのzipを保存したフォルダを指定して、XBRLの読み込みに利用します。
EDINETに関しては、完全ローカル動作の試みはうまくいきました。
TDnetに関しては、どうもタクソノミのzipに不足があるようでした。
TDnetの古いXBRLに関しては、Web接続でタクソノミファイルを適宜(てきぎ)取得する仕組みが必要と思われます。
XBRLのインスタンスファイルを探し出します
このパーサーは、XBRLのzipや展開したフォルダの中から、XBRLのインスタンスファイルを探し出します。
決算書のXBRLでは、インスタンスファイルがたいてい2個以上ありました。
PublicDoc、AuditDoc、Summary、Attachment など、いろいろなフォルダがありましたが、1つのフォルダに2個以上のインスタンスファイルがあったりもしました。
このパーサーは、それらのインスタンスファイルをすべて探し出して、読み込みます。
ファイルの判定は、中身を読み込んで、どんなタグがあるかで判定しました。
データをXBRLインスタンスとDTSの2つにまとめます
このパーサーは、XBRLインスタンスのデータと発見可能なタクソノミ集合(DTS)のデータにまとめます。
XBRLインスタンスとは、金額や本文が載っているファイルです。
発見可能なタクソノミ集合 (DTS: Discoverable Taxonomy Set) とは、スキーマファイルや各種リンクベースファイル(表示、定義、計算、名称、ジェネリックラベル、参照リンク)などを集めたものです。
このパーサーでXBRLのzipを読み込むと、
- 決算書のxbrlとdts
- 監査報告書のxbrlとdts
のような辞書が返ります。
Python辞書の中に、XBRLとDTSのペアを、XBRLインスタンスの数だけ作ります。
あとは、XBRLインスタンスとDTSの2つのデータを参照しながら、自身がほしいと思う形式の表に組み立てます。
XBRLの読み込み例
詳細なXBRLパーサーを使った読み込み例です。
メイン関数
タクソノミフォルダを読み込んでから、XBRLのzipファイルを読み込んでいます。
XBRL_ZIPというPythonクラスが、zipから読み込むためのクラスです。
読み込み結果のスクリーンショットも掲載します。
def main():
"""詳細なXBRLパーサーを使うメイン関数"""
# XBRLファイルを指定
xbrl_file = r'*****\081220180613463090.zip'
# タクソノミの保存フォルダを指定
taxonomy_dirs = (
r'*****\tdnet_taxonomies',
r'*****\edinet_taxonomies',
)
# タクソノミを読み込む
taxonomy = Taxonomy(taxonomy_dirs)
# タクソノミを使ってXBRLファイル(zip)を読み込む
xbrl_datas = XBRL_ZIP(taxonomy, xbrl_file)
return
読み込み結果
読み込み結果のスクリーンショットです(Visual Studio Codeのウォッチ式)。
決算短信のXBRLを読み込みました。
XBRLのインスタンスごとに、「xbrl」と「dts」の2つ変数ができています。
XBRLパーサーのコード例
タクソノミを使って、XBRLファイルを読み込みます。
コード量は約2200行でした。
実行順序は、Taxonomyクラスを使ってから
⇒ XBRL_ZIPクラス or XBRL_FOLDERクラス
⇒ _XBRL_INSTANCEクラス or _Inline_XBRL_INSTANCEクラス
⇒ _XBRL_DISCOVERABLE_TAXONOMY_SETクラス です。
XBRL_ZIPとXBRL_FOLDERは、_XBRL_BASEクラスを継承しています。
また、_XBRL_INSTANCEと_Inline_XBRL_INSTANCEは、_XBRL_INSTANCE_BASEクラスを継承しています。
共通部分をまとめたクラスを作って、それを継承することで、コードが短くなるようにしました。
※ コードをクリックしてからキーボードの矢印キー ← → で横スクロールできます。
"""zipやフォルダから、XBRLファイルを読み込むプログラムです。"""
from os.path import join as os_join
from os.path import basename as os_basename
from os.path import split as os_split
from os import sep as os_sep
from os.path import splitext as os_splitext
from os.path import isfile as os_isfile
from os.path import isdir as os_isdir
from os import walk as os_walk
from os import makedirs as os_makedirs
from os.path import dirname as os_dirname
from urllib.parse import urlparse
from urllib.parse import urljoin
from collections import OrderedDict
import datetime
datetime_datetime = datetime.datetime
from zipfile import ZipFile
from bz2 import compress as bz2_compress
from bz2 import decompress as bz2_decompress
from re import compile as re_compile
from re import sub as re_sub
from operator import itemgetter
import lxml.etree
fromstring = lxml.etree.fromstring
tostring = lxml.etree.tostring
XMLParser = lxml.etree.XMLParser
XMLSyntaxError = lxml.etree.XMLSyntaxError
lxml_etree_Element = lxml.etree._Element
# ロガー設定
import logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
# logger.propagate = False
class XBRLParseError(Exception):
"""XBRLの解析中にエラーが発生したことを知らせる例外クラス"""
# Exceptionクラスを継承しただけの簡単な例外クラスです
# 使い方
# raise XBRLParseError('エラーメッセージ')
pass
class TaxonomyNotFoundError(Exception):
"""参照したタクソノミが見つからなかったことを知らせる例外クラス"""
pass
class _DATA:
"""データ用クラス"""
# 空のクラスです。
# いろいろなデータをまとめるために使っています。
# インスタンスを生成してから使います。
# インスタンスに属性をつけて代入すると、インスタンス変数が追加されます。
# (補足)
# クラス名に属性をつけて代入すると、クラス変数が追加されます。
# クラス変数は、すべてのインスタンスに自動で追加されます。
# クラス変数は使わない方針で作りました。
pass
def _read_xml_data(data):
"""パースエラーが発生したら、エラー回復モードで再試行する関数。"""
try:
# 最初の読み込み
z = fromstring(data, XMLParser(recover=False, huge_tree=True))
except XMLSyntaxError:
# このエラーは読み込めるかもしれないので、
# 後続のエラー回復モードで再試行する。
pass
except Exception:
# おそらく、エラー回復モードでは対応できないエラー。
# (raise文で例外を再送出する)
raise
else:
# 正常終了
return z
# エラー回復モードで再試行
return fromstring(data, XMLParser(recover=True, huge_tree=True))
def _get_abs_path(current, target):
"""現在のファイルパスを使って、相対パスを絶対パスに変換する関数。"""
if current:
if target.startswith('http'):
# httpで始まるパスは絶対パス
return target
else:
# 相対パスを絶対パスに変換する
return urljoin(current, target)
return target
class Taxonomy:
"""タクソノミのファイル群を管理するクラス"""
def __init__(self, taxonomy_dirs):
# タクソノミのファイル群を保存したタクソノミフォルダ
self.taxonomy_dirs = taxonomy_dirs
# タクソノミのファイルデータを入れる辞書
self.files = OrderedDict()
# タクソノミのアーカイブ情報を入れる辞書
self.archive_infos = OrderedDict()
# タクソノミフォルダを読み込む
self.archive_infos[-1] = ('zip', 'folder', 'file', 'infolist')
n_archive_info = 0
for taxonomy_dir in self.taxonomy_dirs:
n_archive_info += self._get_files(taxonomy_dir, self.files, self.archive_infos, n_archive_info)
return
@staticmethod
def _get_files(taxonomy_dir, files, archive_infos, n_archive_info):
"""タクソノミフォルダを読み込む関数"""
# フォルダの階層をたどって、ファイルをすべて列挙する。
for w in os_walk(taxonomy_dir):
# w: [0]root(使う) [1]dirs(つかわない) [2]filenames(使う)
# フォルダの中にあるファイルを列挙する
for filename in w[2]:
# zipファイルでなければスキップ
if not filename.lower().endswith('.zip'):
continue
# ファイルパスを生成
zip_file = os_join(w[0], filename)
# 拡張子を除いたzipファイル名を取得
# (辞書のキーからzipファイル名の部分を除去するために使用)
name_without_ext = os_splitext(filename)[0]
# zipファイルを開く
with ZipFile(zip_file, 'r') as z:
# 集合を使ってフォルダ数とファイル数をカウントする
folder_set = set()
file_set = set()
for i in z.infolist():
# (1/4) フォルダとファイルを集合に追加して数をカウント
# zip内での名前をフォルダパスとファイル名に分割
(t_dirpath, t_name) = os_split(i.filename)
# 区切り文字で分割して、サブフォルダパスを作り、集合に追加する。
t_sub_folder = ''
for t_part in t_dirpath.split('/'):
t_sub_folder += t_part + '/'
# フォルダパスを集合に追加
folder_set.add(t_sub_folder)
# ファイル名が空文字列でなければ集合に追加
if t_name:
file_set.add(i.filename)
# (2/4) 不要なフォルダとファイルをスキップ
# フォルダ(ディレクトリ)をスキップ
if i.filename[-1] == '/':
continue
# 圧縮前のサイズが 0 のファイルをスキップ
if not i.file_size:
logger.debug('skipped 圧縮前のサイズが 0 のファイル %s' % i.filename)
continue
# エントリーポイントのファイルをスキップ
if RE_TAXONOMY_ENTRY_POINT_XSD_MATCH(os_basename(i.filename)):
logger.debug('skipped %s' % os_basename(i.filename))
continue
# (3/4) ファイルを読み込んで辞書に追加
# ファイルパスをキーにする
key = i.filename
# キーからzipファイル名の階層を削除する
key = re_sub(r'^%s[\\/]' % name_without_ext, '', key, count=1)
# データ用クラスのインスタンスを作る
d = _DATA()
# アーカイブ名に割り当てた連番を取得
d.archive = n_archive_info
# データの更新日をdatetime型に変換して取得
# アスタリスク(*)はアンパックを表す記号です
d.date_time = datetime_datetime(*(i.date_time))
# キーの存在確認
if key in files:
# 更新日を比較して、新しいほうを使う。
if d.date_time > files[key].date_time:
# 新しいほうで上書き
pass
else:
# スキップ
continue
else:
# 新規追加
pass
# zipからデータを読み込む(展開する)
bin_data = z.read(i.filename)
if bin_data is None:
raise XBRLParseError('xmlの読み込み結果が無効 %s' % i.filename)
# 読み込んだデータを bz2方式 で圧縮する
d.data = bz2_compress(bin_data, compresslevel=1)
if d.data is None:
raise XBRLParseError('xmlの圧縮結果が無効 %s' % i.filename)
# 辞書に追加する
files[key] = d
# (4/4) アーカイブの情報を辞書に追加
# 連番をキーにして、アーカイブの情報を辞書に追加。
archive_infos[n_archive_info] = (filename, len(folder_set), len(file_set), len(z.infolist()))
n_archive_info += 1
logger.debug('{n} {filename} n_folder:{n_folder} n_file:{n_file} i:{infolist} files:{files}'.format(
n=n_archive_info,
filename=filename,
n_folder=len(folder_set),
n_file=len(file_set),
infolist=len(z.infolist()),
files=len(files),
))
return n_archive_info
def _get_infos(self):
"""タクソノミの読み込み結果をリストに変換する関数 (デバッグ用)"""
datas = []
datas.append(['key (filepath)', 'date_time', 'archive (number)'])
for x in self.files.items():
datas.append([x[0], x[1].date_time, x[1].archive])
return datas
def _get_data(self, current_file, target_file):
"""タクソノミのデータを取得する関数"""
# 絶対パスに変換する
abs_path = urljoin(current_file, target_file)
# 絶対パスから辞書のキーを取得
key = self._get_key(abs_path)
# 辞書からデータを取得
if key in self.files:
v = self.files[key].data
else:
raise TaxonomyNotFoundError('タクソノミにファイルがありません %s %s' % (key, target_file))
# データがバイト列(パース前)なら、圧縮を解いてパースする。
# (タクソノミのファイルは全てxmlであると仮定しています)
if isinstance(v, bytes):
# 圧縮を解く
v = bz2_decompress(v)
# データをパースする
data = _read_xml_data(v)
if data is None:
raise XBRLParseError('xmlの読み込み結果が無効 %s %s' % (current_file, target_file))
# 辞書の内容を上書きする
self.files[key].data = data
return data
else:
return v
@staticmethod
def _get_key(abs_path):
"""絶対パスから辞書のキーを取得する関数"""
# urlの文字列を6つの構成要素に分割する
u = urlparse(abs_path)
# タクソノミのキーを取得する
# (先頭のスラッシュは削除します)
return u.path.lstrip('/')
def _update(self, abs_path, data):
"""タクソノミのデータを更新する関数"""
if data is None:
raise XBRLParseError('xmlの読み込み結果が無効 %s' % abs_path)
# 絶対パスから辞書のキーを取得
key = self._get_key(abs_path)
if key in self.files:
# データを更新
self.files[key].data = data
return
############################################################
# 名前空間を定義します
############################################################
# EDINET & TDnet のマニフェストを表す名前空間
NS_MANIFEST = 'http://disclosure.edinet-fsa.go.jp/2013/manifest'
# XMLを表す名前空間
NS_XML = 'http://www.w3.org/XML/1998/namespace'
# XBRLインスタンスを表す名前空間
NS_XBRLI = 'http://www.xbrl.org/2003/instance'
# XHTMLを表す名前空間
NS_XHTML = "http://www.w3.org/1999/xhtml"
# インラインXBRLを表す名前空間
NS_IX = 'http://www.xbrl.org/2008/inlineXBRL'
# XBRLでファイルの参照を表す名前空間
NS_LINK = 'http://www.xbrl.org/2003/linkbase'
# XMLスキーマのインスタンスを表す名前空間
NS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
# explicitMemberタグで使われている名前空間
NS_XBRLDI = 'http://xbrl.org/2006/xbrldi'
# エックスリンクを表す名前空間
NS_XLINK ='http://www.w3.org/1999/xlink'
# XMLスキーマ定義言語 (XSD: XML Schema Definition Language) を表す名前空間
NS_XSD = 'http://www.w3.org/2001/XMLSchema'
# ジェネリックラベルリンクで使われている名前空間
NS_GEN = 'http://xbrl.org/2008/generic'
NS_GEN_LABEL = 'http://xbrl.org/2008/label'
############################################################
# タグ名と属性名を定義します
############################################################
TAG_MANIFEST = '{%s}manifest' % NS_MANIFEST
XPATH_MANIFEST_INSTANCE = './/{%s}instance' % NS_MANIFEST
XPATH_MANIFEST_TOC_COMPOSITION_TITLE = './/{%s}tocComposition/{%s}title' % (NS_MANIFEST, NS_MANIFEST)
XPATH_MANIFEST_IXBRL = './/{%s}ixbrl' % NS_MANIFEST
TAG_XBRLI = '{%s}xbrl' % NS_XBRLI
TAG_XHTML = '{%s}html' % NS_XHTML
TAG_SCHEMA = '{%s}schema' % NS_XSD
TAG_LINKBASE = '{%s}linkbase' % NS_LINK
XPATH_SCHEMA_REF = './/{%s}schemaRef' % NS_LINK
XPATH_CONTEXT = './/{%s}context' % NS_XBRLI
XPATH_ENTITY = './/{%s}entity' % NS_XBRLI
XPATH_IDENTIFIER = './/{%s}identifier' % NS_XBRLI
XPATH_PERIOD = './/{%s}period' % NS_XBRLI
XPATH_START_DATE = './/{%s}startDate' % NS_XBRLI
XPATH_END_DATE = './/{%s}endDate' % NS_XBRLI
XPATH_INSTANT = './/{%s}instant' % NS_XBRLI
XPATH_SCENARIO = './/{%s}scenario' % NS_XBRLI
XPATH_EXPLICIT_MEMBER = './/{%s}explicitMember' % NS_XBRLDI
XPATH_UNIT = './/{%s}unit' % NS_XBRLI
TAG_MEASURE = '{%s}measure' % NS_XBRLI
XPATH_MEASURE = './/%s' % TAG_MEASURE
TAG_DIVIDE = '{%s}divide' % NS_XBRLI
TAG_UNIT_NUMERATOR = '{%s}unitNumerator' % NS_XBRLI
TAG_UNIT_DENOMINATOR = '{%s}unitDenominator' % NS_XBRLI
XPATH_INCLUDE = './/{%s}include' % NS_XSD
XPATH_IMPORT = './/{%s}import' % NS_XSD
XPATH_LINKBASE_REF = './/{%s}linkbaseRef' % NS_LINK
XPATH_LINKBASE = './/{%s}linkbase' % NS_LINK
XPATH_ROLE_TYPE = './/{%s}roleType' % NS_LINK
XPATH_ELEMENT = './/{%s}element' % NS_XSD
XPATH_ARCROLE_TYPE = './/{%s}arcroleType' % NS_LINK
XPATH_TYPE_EXTENDED = './/{%s}*[@{%s}type="extended"]' % (NS_LINK, NS_XLINK)
XPATH_GEN_TYPE_EXTENDED = './/{%s}*[@{%s}type="extended"]' % (NS_GEN, NS_XLINK)
XPATH_TYPE_LOCATOR = './/{%s}loc[@{%s}type="locator"]' % (NS_LINK, NS_XLINK)
XPATH_TYPE_RESOURCE = './/{%s}*[@{%s}type="resource"]' % (NS_LINK, NS_XLINK)
XPATH_GEN_TYPE_RESOURCE = './/{%s}*[@{%s}type="resource"]' % (NS_GEN_LABEL, NS_XLINK)
XPATH_TYPE_ARC = './/{%s}*[@{%s}type="arc"]' % (NS_LINK, NS_XLINK)
XPATH_GEN_TYPE_ARC = './/{%s}*[@{%s}type="arc"]' % (NS_GEN, NS_XLINK)
TAG_LABEL_LINK = '{%s}labelLink' % NS_LINK
TAG_GEN_LINK = '{%s}link' % NS_GEN
TAG_PRESENTATION_LINK = '{%s}presentationLink' % NS_LINK
TAG_CALCULATION_LINK = '{%s}calculationLink' % NS_LINK
TAG_DEFINITION_LINK = '{%s}definitionLink' % NS_LINK
TAG_REFERENCE_LINK = '{%s}referenceLink' % NS_LINK
XPATH_ROLE_REF = './/{%s}roleRef' % NS_LINK
XPATH_ARCROLE_REF = './/{%s}arcroleRef' % NS_LINK
ATTRIB_NIL = '{%s}nil' % NS_XSI
ATTRIB_HREF = '{%s}href' % NS_XLINK
ATTRIB_ROLE = '{%s}role' % NS_XLINK
ATTRIB_LABEL = '{%s}label' % NS_XLINK
ATTRIB_ARCROLE = '{%s}arcrole' % NS_XLINK
ATTRIB_FROM = '{%s}from' % NS_XLINK
ATTRIB_TO = '{%s}to' % NS_XLINK
ATTRIB_LANG = '{%s}lang' % NS_XML
ATTRIB_BALANCE = '{%s}balance' % NS_XBRLI
ATTRIB_PERIOD_TYPE = '{%s}periodType' % NS_XBRLI
############################################################
# ロールの文字列を定義します
############################################################
# 計算リンクを表すロール
ROLE_CALCULATION_LINKBASE_REF = 'http://www.xbrl.org/2003/role/calculationLinkbaseRef'
# 定義リンクを表すロール
ROLE_DEFINITION_LINKBASE_REF = 'http://www.xbrl.org/2003/role/definitionLinkbaseRef'
# 表示リンクを表すロール
ROLE_PRESENTATION_LINKBASE_REF = 'http://www.xbrl.org/2003/role/presentationLinkbaseRef'
# 名称リンクを表すロール
ROLE_LABEL_LINKBASE_REF = 'http://www.xbrl.org/2003/role/labelLinkbaseRef'
# リンクロール (計算リンク、定義リンク、表示リンク、名称リンク 用)
ROLE_2003_LINK = 'http://www.xbrl.org/2003/role/link'
# リンクロール (ジェネリックラベルリンク 用)
ROLE_2008_LINK = 'http://www.xbrl.org/2008/role/link'
# ラベルロール (計算リンク、定義リンク、表示リンク、名称リンク 用)
ROLE_2003_LABEL = 'http://www.xbrl.org/2003/role/label'
# ラベルロール (ジェネリックラベルリンク 用)
ROLE_2008_LABEL = 'http://www.xbrl.org/2008/role/label'
# 科目を表すタグ名などの親子関係を表すロール
ARCROLE_PARENT_CHILD = 'http://www.xbrl.org/2003/arcrole/parent-child'
# 汎化から特化の関係を表すロール
# ARCROLE_GENERAL_SPECIAL = 'http://www.xbrl.org/2003/arcrole/general-special'
# 総和をとることを表すロール
# ARCROLE_SUMMATION_ITEM = 'http://www.xbrl.org/2003/arcrole/summation-item'
# 科目を表すタグ名などを、人が読みやすい名称に結び付けることを表すロール。
ARCROLE_CONCEPT_LABEL = 'http://www.xbrl.org/2003/arcrole/concept-label'
# ジェネリックラベルリンクでロケーターとリソースを結びつけるときに使っているロール
ARCROLE_ELEMENT_LABEL = 'http://xbrl.org/arcrole/2008/element-label'
############################################################
# ファイル名を表す正規表現を定義します
############################################################
# XBRLインスタンスで良く使われている拡張子(正規表現)
RE_XBRL_INSTANCE_MATCH = re_compile(r'^.*?\.(?:xbrl|xml|html?)$').match
############################################################
# スキップするファイル名を定義します
############################################################
# タクソノミのzipでスキップするファイル名パターンを定義
RE_TAXONOMY_ENTRY_POINT_XSD_MATCH = re_compile(r'^entryPoint.*?\.xsd$').match
# スキップする拡張子を集合(set)で定義
SKIP_EXTS_SET = {'.png', '.gif', '.jpg', '.jpeg', '.csv'}
# スキップするファイル名を集合(set)で定義
SKIP_FILENAMES_SET = {'index.txt'}
# 未対応 (MITAIOU) なのでスキップするファイル名のパターンを定義
RE_MITAIOU_SKIP_FILENAME_MATCH = re_compile(r'(?:^ifrs.*?\.xbrl$|^.*?-information\.xml$)').match
class FILE_PRIORITY:
"""XBRLを読み込むときのファイルの優先度を保持するクラス"""
def __init__(self, priority, filetype, path):
"""クラスのインスタンスを作るときに実行される関数"""
self.priority = priority
self.filetype = filetype
self.path = path
# ファイルタイプの略称を定義
FILETYPE_MANIFEST = 'm'
FILETYPE_XBRL = 'x'
FILETYPE_INLINE_XBRL = 'ix'
class _XBRL_BASE:
"""XBRLのファイル群を読み込む基底クラス"""
# 「ZIPから読み込むクラス XBRL_ZIP」と
# 「フォルダから読み込むクラス XBRL_FOLDER」の
# 共通部分をまとめました。
def __init__(self, taxonomy, path, accession_number=None):
"""クラスのインスタンスを作るときに実行される関数"""
# タクソノミのファイル群を管理するクラスのインスタンス
self.taxonomy = taxonomy
# XBRLインスタンスが入っている「zip」または「フォルダ」のパス
self.path = path
# 「zip」または「フォルダ」の中にあるファイルデータを取得して辞書に追加する
self.files = self._get_file_datas(self.path)
# ファイルデータの辞書からXBRLインスタンスを見つけてリストに追加する
self.xbrl_paths = self._find_xbrl_paths(self.files)
# XBRLインスタンスを読み込んで辞書に追加する
self.xbrls = OrderedDict()
for (xbrl_type, xbrl_path) in self.xbrl_paths:
try:
# XBRLインスタンスを読み込む
if xbrl_type == FILETYPE_XBRL:
self.xbrls[xbrl_path] = self._read_xbrl_instance_and_dts(xbrl_type, xbrl_path)
elif xbrl_type == FILETYPE_INLINE_XBRL:
# インラインXBRLのファイルは複数あるので、最初のファイル[0]をキーにする。
self.xbrls[xbrl_path[0]] = self._read_xbrl_instance_and_dts(xbrl_type, xbrl_path)
except TaxonomyNotFoundError:
# 進捗を表示する(例外の再送出はしない)
logger.debug('%s %s %s' % (os_basename(xbrl_path), xbrl_type, accession_number))
# 次のXBRLインスタンスの読み込みに移る。
except Exception:
# 進捗を表示して例外を再送出する
logger.debug('%s %s %s' % (os_basename(xbrl_path), xbrl_type, accession_number))
raise
# XBRLインスタンスの読み込み完了
# 使い終わった変数を削除
del self.taxonomy
# pickle化のさまたげになるオブジェクトを削除(Noneを設定する)
# 削除対象のオブジェクト: lxml.etree._Element
# 「zip」または「フォルダ」のファイルデータ辞書から削除
for file in self.files.keys():
if isinstance(self.files[file], lxml_etree_Element):
self.files[file] = None
# XBRLインスタンスの辞書から削除
for xbrl in self.xbrls.values():
for file in xbrl.dts.xbrl_files.keys():
if isinstance(xbrl.dts.xbrl_files[file], lxml_etree_Element):
xbrl.dts.xbrl_files[file] = None
return
def _get_file_datas(self, path):
"""「zip」または「フォルダ」の中にあるファイルデータを取得して辞書に追加する"""
# ここはzipとフォルダで処理が異なります。
# そのため、各派生クラスに同じ名前の関数を実装しました。
# 基底クラスのここは実行しません。
return {}
@staticmethod
def _is_skip_target(path):
"""スキップするファイル"""
# ファイルパスからファイル名を取得
filename = os_basename(path)
# 拡張子を取得して小文字に変換する
ext = os_splitext(path)[1].lower()
if ext in SKIP_EXTS_SET:
# スキップ対象の拡張子に含まれている
return True
if filename in SKIP_FILENAMES_SET:
# スキップ対象のファイル名に含まれている
return True
if RE_MITAIOU_SKIP_FILENAME_MATCH(filename):
# 未対応 (MITAIOU) なのでスキップ対象としているファイル名のパターンに一致する
return True
return False
def _find_xbrl_paths(self, src_files):
"""ファイルデータの辞書からXBRLインスタンスを見つけてリストに追加する"""
# ファイルをフォルダごとに分ける
folders = OrderedDict()
for (file, data) in src_files.items():
folder = os_dirname(file)
if folder not in folders:
folders[folder] = OrderedDict()
folders[folder][file] = data
del (file, data)
# フォルダごとに起点となるファイルを見つけてリストに追加する
paths = []
for (folder, files) in folders.items():
# (1/3) フォルダ内のファイルの種類をすべて判定する
ts = []
for (file, data) in files.items():
if not RE_XBRL_INSTANCE_MATCH(file):
continue
if not isinstance(data, bytes):
continue
# 辞書のバイナリデータをパース結果で上書きする
src_files[file] = _read_xml_data(data)
# 読み込みの優先度を取得
ts.append(self._get_priority_to_read(src_files[file], file))
# フォルダに起点となるファイルがなければスキップ
if not ts:
continue
# (2/3) 判定結果をソートして、優先度の高い順に並べる。
ts.sort(key=lambda x: x.priority, reverse=False)
# (3/3) フォルダの内容に応じて、起点となるXBRLインスタンスを取得する。
if ts[0].filetype == FILETYPE_MANIFEST:
# マニフェストがあれば、そこから XBRL または InlineXBRL のリストを取得する。
for t in ts:
if t.filetype == FILETYPE_MANIFEST:
# マニフェストを読み込む
manifest = self._read_manifest(src_files[t.path], current_file=t.path)
# 辞書のデータを読み込み結果で上書きする
src_files[t.path] = manifest
# XBRLインスタンスが現在のフォルダにあれば、そちらを読み込む。
# 無ければ、InlineXBRLのタプルを取得する。
for (preferred_filename, instance) in manifest.instances.items():
# スキップ対象ならスキップ
if self._is_skip_target(preferred_filename):
continue
if preferred_filename in src_files:
# XBRLが同じフォルダにあるので、それを取得する。
paths.append((FILETYPE_XBRL, preferred_filename))
else:
# InlineXBRLしかないので、それらのタプルを取得。
# (存在するものだけを取得します)
ixbrls = []
for ixbrl in instance.ixbrls:
if ixbrl in src_files:
ixbrls.append(ixbrl)
else:
logger.debug('Error 存在しないInlineXBRL %s' % ixbrl)
paths.append((FILETYPE_INLINE_XBRL, tuple(ixbrls)))
elif ts[0].filetype == FILETYPE_XBRL:
# マニフェストが無くてXBRLがあるなら、それらをすべて取得する。
paths.extend(tuple((FILETYPE_XBRL, t.path) for t in ts if t.filetype == FILETYPE_XBRL))
elif ts[0].filetype == FILETYPE_INLINE_XBRL:
# マニフェストが無くて、XBRLも無くて、InlineXBRLがあるなら、それらをタプルにして取得する。
paths.append((FILETYPE_INLINE_XBRL, tuple(t.path for t in ts if t.filetype == FILETYPE_INLINE_XBRL)))
# logger.debug('InlineXBRLしかない %s' % folder)
return paths
@staticmethod
def _get_priority_to_read(data, file):
"""読み込みの起点となるファイルの優先度を取得する関数"""
# 0 が最も高い優先度(priority)とした。9 は最も低い優先度。
if data.tag == TAG_MANIFEST:
# 一番上の階層のタグが
# <manifest xmlns="http://disclosure.edinet-fsa.go.jp/2013/manifest">
# なので、マニフェストである。
return FILE_PRIORITY(priority=0, filetype=FILETYPE_MANIFEST, path=file)
elif data.tag == TAG_XBRLI:
# 一番上の階層のタグが
# <{http://www.xbrl.org/2003/instance}xbrl>
# なので、普通のXBRLインスタンスである。
return FILE_PRIORITY(priority=1, filetype=FILETYPE_XBRL, path=file)
elif data.tag == TAG_XHTML:
# 一番上の階層のタグが
# <html>
if NS_IX in data.nsmap.values():
# インラインXBRLの名前空間に
# xmlns:ix="http://www.xbrl.org/2008/inlineXBRL"
# があったので、インラインXBRLである。
return FILE_PRIORITY(priority=2, filetype=FILETYPE_INLINE_XBRL, path=file)
# XBRLファイルではなかった
return FILE_PRIORITY(priority=9, filetype=None, path=file)
@staticmethod
def _read_manifest(data, current_file):
"""manifestファイルを読み込む関数"""
d = _DATA()
d.title = OrderedDict()
d.instances = OrderedDict()
# タイトルを取得
for t in data.findall(XPATH_MANIFEST_TOC_COMPOSITION_TITLE):
d.title[t.get(ATTRIB_LANG)] = t.text
# XBRLインスタンスを取得
for t in data.findall(XPATH_MANIFEST_INSTANCE):
abs_path = _get_abs_path(current=current_file, target=t.attrib['preferredFilename'])
d.instances[abs_path] = _DATA()
d.instances[abs_path].attrib = dict(t.attrib)
d.instances[abs_path].ixbrls = tuple(_get_abs_path(current_file, x.text) for x in t.findall(XPATH_MANIFEST_IXBRL))
return d
def _read_xbrl_instance_and_dts(self, xbrl_type, xbrl_path):
"""「XBRLインスタンス(xbrli)」と「発見可能なタクソノミ集合(dts)」を読み込む関数"""
# データ用クラスのインスタンスを作る
data = _DATA()
# XBRLの種類に応じて、読み込みクラスを使い分ける。
if xbrl_type == FILETYPE_XBRL:
# (普通のXBRL) XBRLを読み込む
data.xbrl = _XBRL_INSTANCE(xbrl_type, xbrl_path, self.files)
current_xbrl_path = xbrl_path
elif xbrl_type == FILETYPE_INLINE_XBRL:
# (InlineXBRL) InlineXBRLのファイル群を読み込む
data.xbrl = _Inline_XBRL_INSTANCE(xbrl_type, xbrl_path, self.files)
current_xbrl_path = xbrl_path[0]
else:
raise XBRLParseError('未対応のXBRL形式 %s' % str(xbrl_type))
# 辞書にデータを上書きする
self.files[xbrl_path] = data.xbrl
# タクソノミ発見の起点となるリンク集合を作成
xbrl_file_paths = set()
for abs_path in data.xbrl.schema_refs.keys():
xbrl_file_paths.add(abs_path)
for abs_path in data.xbrl.linkbase_refs.keys():
# (linkbaseRefでは、abs_path[1]のid部分は無い。)
xbrl_file_paths.add(abs_path)
for abs_path in data.xbrl.role_refs.keys():
# abs_path: [0]abs_path [1]id
xbrl_file_paths.add(abs_path[0])
for abs_path in data.xbrl.arcrole_refs.keys():
# abs_path: [0]abs_path [1]id
xbrl_file_paths.add(abs_path[0])
# 発見可能なタクソノミ集合 (DTS: Discoverable Taxonomy Set) を取得する
data.dts = self._get_discoverable_taxonomy_set(tuple(xbrl_file_paths), xbrl_path=current_xbrl_path)
return data
def _get_discoverable_taxonomy_set(self, file_paths, xbrl_path):
"""発見可能なタクソノミ集合 (DTS: Discoverable Taxonomy Set) を取得する関数"""
# タクソノミを探すときの起点にするリスト
start_files = []
for file_path in file_paths:
# 参照しているファイルのデータを取得
data = self._get_xml_data(abs_path=file_path, current_file=xbrl_path)
# タクソノミを探す起点に追加
start_files.append((file_path, data))
return _XBRL_DISCOVERABLE_TAXONOMY_SET(
self.files,
self._get_xml_data,
self.taxonomy,
start_files,
)
def _get_xml_data(self, abs_path, current_file):
"""ファイルパスからデータを取得する関数"""
# XBRLフォルダ(zip)にファイルがあれば取得
if abs_path in self.files:
data = self.files[abs_path]
# パースしていなければパースする
if isinstance(data, bytes):
data = _read_xml_data(data)
if data is None:
raise XBRLParseError('xmlの読み込み結果がNone %s %s' % (abs_path, current_file))
# 辞書のバイナリデータをパース結果で上書きする
self.files[abs_path] = data
else:
# タクソノミフォルダにファイルがあれば取得 & パース
data = self.taxonomy._get_data(current_file, target_file=abs_path)
if data is None:
raise XBRLParseError('ファイルデータがNone %s %s' % (abs_path, current_file))
return data
class XBRL_ZIP(_XBRL_BASE):
"""XBRLをzipから読み込むクラス"""
def _get_file_datas(self, path):
"""zipの中をたどって全てのファイルデータを取得する関数"""
files = OrderedDict()
# zipを読み込む
with ZipFile(path, 'r') as z:
# ファイル名のリストを全てたどる
for filename in z.namelist():
# 不要なファイルをスキップする
if self._is_skip_target(filename):
# logger.debug('skipped %s' % filename)
continue
# zipファイルの中での名前をキーにして、辞書にバイナリデータを追加。
files[filename] = z.read(filename)
if files[filename] is None:
raise XBRLParseError('xmlの読み込み結果が無効 %s' % path)
return files
class XBRL_FOLDER(_XBRL_BASE):
"""XBRLをフォルダから読み込むクラス"""
def _get_file_datas(self, path):
"""フォルダの階層をたどって全てのファイルデータを取得する関数"""
files = OrderedDict()
# フォルダの階層をたどって、ファイルをすべて列挙する。
for w in os_walk(path):
# w: [0]root [1]dirs [2]filenames
# フォルダの中にあるファイルを列挙する
for filename in w[2]:
# 不要なファイルをスキップする
if self._is_skip_target(filename):
# logger.debug('skipped %s' % filename)
continue
# ファイルの絶対パスを取得
abs_path = os_join(w[0], filename)
# ファイルの絶対パスをキーにして、辞書にバイナリデータを追加。
with open(abs_path, 'rb') as f:
files[abs_path] = f.read()
if files[abs_path] is None:
raise XBRLParseError('ファイルの読み込み結果が無効 %s' % abs_path)
return files
def get_comment(file_data):
"""XMLの最初に書かれているコメントを取得する関数"""
# コメントには、XBRLを生成したソフトウェアの情報などが載っている場合があります。
comment = file_data.getprevious()
if comment is None:
return None
else:
# コメント要素 <!-- comments --> の内部の
# 両端には空白などがあるので、除去します。
return comment.text.strip()
class _XBRL_INSTANCE_BASE:
"""XBRLインスタンスを読み込む基底クラス(xbrl, ixbrl)"""
# XBRLインスタンスの読み込の共通部分をまとめました
def __init__(self, path, xbrl_files):
"""クラスのインスタンスを作るときに実行される関数"""
# ここはXBRLとインラインXBRLで処理が異なります。
# そのため、各派生クラスに同じ名前の関数を実装しました。
# 基底クラスのここは実行しません。
return
@staticmethod
def get_link_tags(files, xbrl, xpath_tag, xbrl_path, xbrl_files):
"""schemaRefタグ または linkbaseRefタグ を取得する関数"""
for tag in xbrl.findall(xpath_tag):
# [0]ファイルパスと[1]id属性に分割
href = tag.attrib[ATTRIB_HREF].split('#')
# データ用クラスのインスタンスを作る
data = _DATA()
# 属性を辞書に変換して追加
data.attrib = dict(tag.attrib)
len_href = len(href)
if len_href == 1:
# 辞書からキーにする値を取得 & 辞書から削除
raw_path = data.attrib.pop(ATTRIB_HREF)
else:
raw_path = href[0]
if raw_path in xbrl_files:
# XBRLフォルダ(zip)にパスがあれば何もしない
abs_path = raw_path
else:
# XBRLインスタンスのファイルパスを使って、相対パスを絶対パスに変換。
abs_path = _get_abs_path(current=xbrl_path, target=raw_path)
# パスを追加
data.abs_path = abs_path
data.base_path = xbrl_path
data.raw_path = raw_path
# 辞書に追加
if len_href == 1:
assert data.abs_path not in files
files[data.abs_path] = data
else:
assert (data.abs_path, href[1]) not in files
files[(data.abs_path, href[1])] = data
return
def get_context_tags(self, contexts, xbrl):
"""contextタグを取得する関数"""
# contextタグ取得
for t in xbrl.findall(XPATH_CONTEXT):
# id属性の値を取得
t_id = t.get('id')
# id属性は重複しないはず
assert t_id not in contexts
# id属性をキーにして辞書を作成
contexts[t_id] = OrderedDict()
# entityタグ取得
entity = OrderedDict()
for (n, tag) in enumerate(t.findall(XPATH_ENTITY), start=1):
# entityは通常1つ
assert n == 1
# identifierタグ取得
entity.update(self.get_identifier_tags(tag))
contexts[t_id]['entity'] = entity
# periodタグ取得
period = OrderedDict()
for (n, tag) in enumerate(t.findall(XPATH_PERIOD), start=1):
# periodは通常1つ
assert n == 1
# startDate, endDate, instantタグ取得
period.update(self.get_date_tags(tag))
contexts[t_id]['period'] = period
# scenarioタグ取得
scenario = OrderedDict()
for (n, tag) in enumerate(t.findall(XPATH_SCENARIO), start=1):
# scenarioは通常1つ
assert n == 1
# explicitMemberタグ取得
scenario.update(self.get_explicit_member_tags(tag))
contexts[t_id]['scenario'] = scenario
return
@staticmethod
def get_identifier_tags(t):
"""identifierタグを取得する関数"""
d = OrderedDict()
for (n, tag) in enumerate(t.findall(XPATH_IDENTIFIER), start=1):
# identifierは通常1つ
assert n == 1
# タグの属性を追加
d.update(tag.attrib)
# タグのテキストを追加
d['text'] = tag.text
return d
@staticmethod
def get_date_tags(t):
"""日付タグを取得する関数"""
datas = OrderedDict()
t_start_date = t.find(XPATH_START_DATE)
if t_start_date is None:
# 期末日を追加
datas['instant'] = t.find(XPATH_INSTANT).text
else:
# 開始日を追加
datas['start_date'] = t_start_date.text
# 終了日を追加
datas['end_date'] = t.find(XPATH_END_DATE).text
return datas
@staticmethod
def get_explicit_member_tags(t):
"""explicitMemberタグを取得する関数"""
d = OrderedDict()
for t_explicit_member in t.findall(XPATH_EXPLICIT_MEMBER):
key = t_explicit_member.get('dimension')
assert key not in d
d[key] = dict(t_explicit_member.attrib)
d[key]['text'] = t_explicit_member.text
return d
def get_unit_tags(self, d, xbrl):
"""unitタグを取得する関数"""
# unitタグ取得
for t in xbrl.findall(XPATH_UNIT):
# id属性の値を取得
t_id = t.get('id')
# idは重複しないはず
assert t_id not in d
# 直下のタグを取得
for tag in t.findall('./{*}*'):
if tag.tag == TAG_DIVIDE:
# divideタグ取得
d[t_id] = self.get_divide_tags(tag)
elif tag.tag == TAG_MEASURE:
# measureタグ取得
text = tag.text.split(':')
if len(text) == 1:
d[t_id] = text[0]
else:
d[t_id] = text[1]
else:
raise XBRLParseError('不正なタグ名 %s' % tag.tag)
return
def get_divide_tags(self, t):
"""divideタグを取得する関数"""
units = []
for tag in t.findall('./{*}*'):
if tag.tag == TAG_UNIT_NUMERATOR:
units.append(self.get_measure_tags(tag, '*'))
elif tag.tag == TAG_UNIT_DENOMINATOR:
units.append(self.get_measure_tags(tag, '/'))
else:
raise XBRLParseError('不正なunitタグ %s' % tag.tag)
# 単位は存在するはず
assert units
# 先頭の不要な乗算記号を削除 or 除算される1を追加
if units[0][0] == '*':
units[0] = units[0][1:]
elif units[0][0] == '/':
units[0] = '1%s' % units[0][1:]
# 単位を連結
return ''.join(units)
@staticmethod
def get_measure_tags(t, operator):
"""measureタグを取得する関数"""
texts = []
for tag in t.findall(XPATH_MEASURE):
text = tag.text.split(':')
if len(text) == 1:
texts.append('%s%s' % (operator, text[0]))
else:
texts.append('%s%s' % (operator, text[1]))
return ''.join(texts)
@staticmethod
def get_data_tags(datas, xbrl):
"""インスタンス値のタグを取得する関数"""
# ここはXBRLとインラインXBRLで処理が異なります。
# そのため、各派生クラスに同じ名前の関数を実装しました。
# 基底クラスのここは実行しません。
return
class _XBRL_INSTANCE(_XBRL_INSTANCE_BASE):
"""XBRLインスタンスを読み込むクラス(xbrl)"""
def __init__(self, xbrl_type, path, xbrl_files):
"""クラスのインスタンスを作るときに実行される関数"""
self.xbrl_type = xbrl_type
self.path = path
xbrl = xbrl_files[path]
# コメントを取得
self.comments = get_comment(xbrl)
# 名前空間接頭辞の辞書を作成
self.ns_prefixes = {x[1]: x[0] for x in xbrl.nsmap.items()}
# スキーマファイルパスを取得
self.schema_refs = OrderedDict()
self.get_link_tags(self.schema_refs, xbrl, XPATH_SCHEMA_REF, path, xbrl_files)
if not self.schema_refs:
raise XBRLParseError('スキーマファイルが辞書にありません %s' % path)
self.linkbase_refs = OrderedDict()
self.get_link_tags(self.linkbase_refs, xbrl, XPATH_LINKBASE_REF, path, xbrl_files)
self.role_refs= OrderedDict()
self.get_link_tags(self.role_refs, xbrl, XPATH_ROLE_REF, path, xbrl_files)
self.arcrole_refs = OrderedDict()
self.get_link_tags(self.arcrole_refs, xbrl, XPATH_ARCROLE_REF, path, xbrl_files)
# コンテキストタグ(日付情報)を取得
self.contexts = OrderedDict()
self.get_context_tags(self.contexts, xbrl)
# ユニットタグ(単位情報)を取得
self.units = OrderedDict()
self.get_unit_tags(self.units, xbrl)
# インスタンス値のタグ(文書情報・財務諸表データ)を取得
self.datas = OrderedDict()
self.get_data_tags(self.datas, xbrl)
# (未実装) フットノートリンク (footnoteLink) の読み込み
return
@staticmethod
def get_data_tags(datas, xbrl):
"""インスタンス値のタグを取得する関数"""
# タグ検索では、contextRef属性を持ったタグを「インスタンス値のタグ」とみなしました。
# XBRL 2.1 の仕様書にある「The @contextRef attribute」の説明で、
# 「すべてのアイテムはコンテキストを持たないといけない。(All items MUST have a context.)」
# と書かれていたので、そうしました。
for t in xbrl.findall('.//*[@contextRef]'):
# タグ名と属性のタプルをキーにして辞書を作成
key = (t.tag, t.get('contextRef'), t.get('id'), t.get('unitRef'))
data = OrderedDict()
# 属性の内容を追加
data.update(t.attrib)
# テキストも追加
if t.text:
data['text'] = t.text
else:
# テキストが無くても何らかのタグが入っている可能性があるので、取得を試みる。
data['text'] = ''
# 直下のタグをすべて取得
for t_inner in t.findall('./*'):
data['text'] += tostring(t_inner, encoding='unicode', method='xml')
logger.debug('info テキストの中にタグがある %s' % t_inner.tag)
# キーの重複があれば知らせる
if key in datas:
if data == datas[key]:
# キーも値も同じである。
# (問題にはならないのでスキップ)
continue
else:
# キーが同じで値が異なっている。
# (値を区別できないので知らせる)
logger.debug('インスタンス値の属性が重複しています %s' % str(key))
# 辞書に追加
datas[key] = data
return
class _Inline_XBRL_INSTANCE(_XBRL_INSTANCE_BASE):
"""インラインXBRLインスタンスを読み込むクラス(ixbrl)"""
def __init__(self, xbrl_type, paths, xbrl_files):
"""クラスのインスタンスを作るときに実行される関数"""
self.xbrl_type = xbrl_type
self.paths = paths
self.comments = OrderedDict()
self.ns_prefixes = OrderedDict()
self.schema_refs = OrderedDict()
self.linkbase_refs = OrderedDict()
self.role_refs = OrderedDict()
self.arcrole_refs = OrderedDict()
self.contexts = OrderedDict()
self.units = OrderedDict()
self.datas = OrderedDict()
for path in paths:
# 辞書からInlineXBRLのデータを取得
xbrl = xbrl_files[path]
# コメントを取得
self.comments[path] = get_comment(xbrl)
# 名前空間接頭辞の辞書を作成
self.ns_prefixes.update({x[1]: x[0] for x in xbrl.nsmap.items()})
# スキーマファイルパスを取得
self.get_link_tags(self.schema_refs, xbrl, XPATH_SCHEMA_REF, path, xbrl_files)
if not self.schema_refs:
logger.debug('Error スキーマファイルが辞書にありません %s' % path)
self.get_link_tags(self.linkbase_refs, xbrl, XPATH_LINKBASE_REF, path, xbrl_files)
self.get_link_tags(self.role_refs, xbrl, XPATH_ROLE_REF, path, xbrl_files)
self.get_link_tags(self.arcrole_refs, xbrl, XPATH_ARCROLE_REF, path, xbrl_files)
# コンテキストタグ(日付情報)を取得
self.get_context_tags(self.contexts, xbrl)
# ユニットタグ(単位情報)を取得
self.get_unit_tags(self.units, xbrl)
# インスタンス値のタグ(文書情報・財務諸表データ)を取得
self.get_data_tags(self.datas, xbrl)
return
@staticmethod
def get_data_tags(datas, xbrl):
"""インスタンス値のタグを取得する関数"""
xbrl_nsmap = xbrl.nsmap
for t in xbrl.findall('.//*[@contextRef]'):
# name属性の名前空間接頭辞を名前空間に置換する
t_name = t.get('name')
if t_name:
(ns_prefix, name) = t_name.split(':', maxsplit=1)
t_name = '{%s}%s' % (xbrl_nsmap[ns_prefix], name)
else:
raise XBRLParseError('name属性が無い %s' % str(t.attrib))
# タグ名と属性のタプルをキーにして辞書を作成
key = (t_name, t.get('contextRef'), t.get('id'), t.get('unitRef'))
data = OrderedDict()
# 属性の内容を追加
data.update(t.attrib)
# テキストも追加
if t.text:
data['text'] = t.text
else:
# テキストが無くてもタグが入っている可能性があるので、取得を試みる。
data['text'] = ''
# 直下のタグをすべて取得
for t_inner in t.findall('./*'):
# method='xml' だと日本語が数値文字参照になったので method='html' を使用
data['text'] += tostring(t_inner, encoding='unicode', method='html')
# キーの重複があれば知らせる
if key in datas:
if data == datas[key]:
# キーも値も同じである。
# (問題にはならないのでスキップ)
continue
else:
# キーが同じで値が異なっている。
# (値を区別できないので知らせる)
logger.debug('インスタンス値の属性が重複している %s' % str(key))
# 辞書に追加
datas[key] = data
return
# keys
# [0]tag_role [1]href_from [2]label_from [3]label_to [4]loc_href [5]arc_role
# [6]arc_priority [7]arc_order [8]preferred_label [9]resources_role
get_keys_1458 = itemgetter(1, 4, 5, 8)
# 使用場所: _XBRL_DISCOVERABLE_TAXONOMY_SET() -> get_linkbase_data() -> add_p_arc_and_exlink()
# keys:
# [0]href_from [1]label_from [2]label_to [3]href_to [4]arc_role [5]arc_priority
# [6]arc_order [7]preferred_label ...
get_keys_0347 = itemgetter(0, 3, 4, 7)
# 使用場所: _XBRL_DISCOVERABLE_TAXONOMY_SET() -> apply_prohibited_arcs() -> is_target()
class _XBRL_DISCOVERABLE_TAXONOMY_SET:
"""発見可能なタクソノミ集合 (DTS: Discoverable Taxonomy Set) を作るクラス"""
def __init__(
self,
xbrl_files, _XBRL_BASE_get_xml_data,
taxonomy, start_files,
max_recursion_depth=100,
):
"""クラスのインスタンスを作るときに実行される関数"""
self.xbrl_files = xbrl_files
# 基底クラス _XBRL_BASE の _get_xml_data()
self._get_xml_data = _XBRL_BASE_get_xml_data
# スキーマ
self.import_schema_files = OrderedDict()
self.include_schema_files = OrderedDict()
self.linkbase_files = OrderedDict()
self.elements = OrderedDict()
self.name_id = OrderedDict()
self.role_types = OrderedDict()
self.arcrole_types = OrderedDict()
# リンクベース
self.role_refs = OrderedDict()
self.arcrole_refs = OrderedDict()
self.extended_links = OrderedDict()
self.prohibited_arcs = OrderedDict()
# ファイルを再帰的に読み込む
self.read_files_recursively(
taxonomy, start_files, max_recursion_depth, self.xbrl_files)
# prohibited arc (禁止アーク) を適用する
self.extended_links = self.apply_prohibited_arcs(self.extended_links, self.prohibited_arcs)
return
def read_files_recursively(
self, taxonomy, start_files, max_recursion_depth, xbrl_files):
"""ファイルを再帰的に読み込む関数"""
taxonomy_update = taxonomy._update
# 最初に読み込むファイルを設定
# start_files: [
# (file_path1, file_data1),
# (file_path2, file_data2),
# ...,
# ]
t = start_files
# 読み込み済みのファイルパス集合
t_set = set()
for _n in range(max_recursion_depth):
# 進捗表示
# logger.debug('n: %d schema: %d linkbase: %d' %
# (n, len(self.schema_files), len(self.linkbase_files)))
if not t:
# ファイルをたどり尽くしたので終了
# logger.debug('break n: %d' % n)
break
# 次に読み込むファイルのリストを設定
files = t
# 設定したら新しいリストを作成
t = []
for (file_path, file_data) in files:
# 読み込み済みならスキップする (1/2)
if file_path in t_set:
# logger.debug('skipped %s' % file_path)
continue
# データを取得する
if isinstance(file_data, _DATA):
# 読み込みが完了しているデータなので、そのまま受け取る。
data = file_data
else:
# ファイルの内容を取得する
if file_data.tag == TAG_SCHEMA:
data = self.get_schema_data(file_data, file_path, xbrl_files)
elif file_data.tag == TAG_LINKBASE:
data = self.get_linkbase_data(file_data, file_path, xbrl_files)
else:
raise XBRLParseError('未対応のファイル %s' % file_path)
# 取得する内容がなかった or 読み込み異常 の場合は
# 読み込み済み集合に追加してからスキップ
if data is None:
t_set.add(file_path)
continue
# キャッシュを更新する
if file_path in xbrl_files:
# XBRLのファイル辞書を更新
xbrl_files[file_path] = data
else:
# タクソノミの辞書を更新
taxonomy_update(abs_path=file_path, data=data)
# ファイルの読み込み結果が空でなければ辞書に追加する
if data is not None:
# 辞書に追加
if data.file_type == 0:
# スキーマ
self.import_schema_files.update(data.import_schema_files)
self.include_schema_files.update(data.include_schema_files)
self.linkbase_files.update(data.linkbase_files)
self.elements.update(data.elements)
self.name_id.update(data.name_id)
self.role_types.update(data.role_types)
self.arcrole_types.update(data.arcrole_types)
elif data.file_type == 1:
# リンクベース
self.role_refs.update(data.role_refs)
self.arcrole_refs.update(data.arcrole_refs)
self.add_extended_links(self.extended_links, data.extended_links)
self.add_prohibited_arcs_data(self.prohibited_arcs, data.prohibited_arcs)
# ファイルからリンクされている内容を取得する
# (関数の中で t に要素を追加します)
self.add_linked_data(data, file_path, t, t_set)
# 読み込み済みのファイルパス集合に追加
t_set.add(file_path)
else:
raise XBRLParseError('ファイルの再帰的な読み込み処理で、breakが発生しませんでした。')
return
def get_schema_data(self, file_data, file_path, xbrl_files):
"""スキーマファイルの内容を取得する関数"""
# データ用クラスのインスタンスを作る
data = _DATA()
# ファイルパスを追加
data.path = file_path
# ファイルタイプを追加 (0: スキーマファイル、1: リンクベースファイル)
data.file_type = 0
# コメントを取得
data.comment = get_comment(file_data)
# importタグを取得する
data.import_schema_files = self.get_link_tags(
file_data, XPATH_IMPORT, 'schemaLocation',
xbrl_files, file_path,
)
# includeタグを取得する
data.include_schema_files = self.get_link_tags(
file_data, XPATH_INCLUDE, 'schemaLocation',
xbrl_files, file_path,
)
# linkbaseRefタグを取得する
data.linkbase_files = self.get_link_tags(
file_data, XPATH_LINKBASE_REF, ATTRIB_HREF,
xbrl_files, file_path,
)
# elementタグを取得する
elements = OrderedDict()
name_id = OrderedDict()
for tag in file_data.findall(XPATH_ELEMENT):
if 'id' in tag.attrib:
# タグの属性を辞書に変換してから追加
elements[(file_path, tag.attrib['id'])] = dict(tag.attrib)
# name属性をキーにしてid属性を追加
name_id[tag.attrib['name']] = tag.attrib['id']
else:
# logger.debug('skipped id属性が無い %s %s' % (tag.tag, tag.attrib))
pass
data.elements = elements
data.name_id = name_id
# roleTypeタグを取得する
role_types = OrderedDict()
for tag in file_data.findall(XPATH_ROLE_TYPE):
# id属性をキーにして、タグの子要素のタプルを追加
if tag.attrib['id'] in role_types:
raise XBRLParseError('キー重複 %s' % tag.attrib['id'])
role_types[tag.attrib['id']] = tuple((x.tag, x.text) for x in tag.findall('*'))
data.role_types = role_types
# arcroleTypeタグを取得する
arcrole_types = OrderedDict()
for tag in file_data.findall(XPATH_ARCROLE_TYPE):
# タグの子要素をタプルにして辞書に追加
if tag.attrib['id'] in arcrole_types:
raise XBRLParseError('キー重複 %s' % tag.attrib['id'])
arcrole_types[tag.attrib['id']] = tuple((x.tag, x.text) for x in tag.findall('*'))
data.arcrole_types = arcrole_types
# (未実装) スキーマファイルに linkbaseRefタグではなくて、linkbaseタグがある場合。
# 仕様上は、スキーマファイルにlinkbaseタグを書いてもよいことになっていた。
if file_data.find(XPATH_LINKBASE):
logger.debug('未実装 スキーマファイルに linkbaseタグ が存在している %s' % file_path)
return data
@staticmethod
def get_link_tags(file_data, xpath_tag, link_attrib, xbrl_files, file_path):
"""importタグ または linkbaseRefタグ を取得する関数"""
files = OrderedDict()
for tag in file_data.findall(xpath_tag):
# データ用クラスのインスタンスを作る
data = _DATA()
# 属性を辞書に変換して追加
data.attrib = dict(tag.attrib)
# 辞書からキーにする値を取得 & 辞書から削除
raw_path = data.attrib.pop(link_attrib)
if raw_path in xbrl_files:
# XBRLフォルダ(zip)にパスがあれば何もしない
abs_path = raw_path
else:
# ファイルパスを使って相対パスを絶対パスに変換
abs_path = _get_abs_path(current=file_path, target=raw_path)
# パスを追加
data.abs_path = abs_path
data.base_path = file_path
data.raw_path = raw_path
# 辞書に追加
files[data.abs_path] = data
return files
def get_linkbase_data(self, file_data, file_path, xbrl_files):
"""リンクベースファイルの内容を取得する関数"""
# データ用クラスのインスタンスを作る
data = _DATA()
# ファイルパスを追加
data.path = file_path
# ファイルタイプを追加 (0: スキーマファイル、1: リンクベースファイル)
data.file_type = 1
# コメントを取得
data.comment = get_comment(file_data)
# roleRefタグを取得する
role_refs = OrderedDict()
for tag in file_data.findall(XPATH_ROLE_REF):
# タグの属性とファイルパスを取得
(href_id, t_data) = self.get_attrib_and_paths(tag, xbrl_files, file_path)
# roleURI属性をキーにして、href属性を辞書に追加。
if tag.attrib['roleURI'] in role_refs:
raise XBRLParseError('キー重複 %s' % tag.attrib['roleURI'])
role_refs[tag.attrib['roleURI']] = (t_data.abs_path, href_id)
data.role_refs = role_refs
# arcroleRefタグを取得する
arcrole_refs = OrderedDict()
for tag in file_data.findall(XPATH_ARCROLE_REF):
# タグの属性とファイルパスを取得
(href_id, t_data) = self.get_attrib_and_paths(tag, xbrl_files, file_path)
# arcroleURI属性をキーにして、href属性を辞書に追加。
if tag.attrib['arcroleURI'] in arcrole_refs:
raise XBRLParseError('キー重複 %s' % tag.attrib['arcroleURI'])
arcrole_refs[tag.attrib['arcroleURI']] = (t_data.abs_path, href_id)
data.arcrole_refs = arcrole_refs
# 拡張リンクタグの内容を取得する
# タグ検索に使うXPATHを選択する
extended_link_tags = file_data.findall(XPATH_TYPE_EXTENDED)
if len(extended_link_tags):
# 表示リンク、定義リンク、計算リンク、名称リンク、参照リンク。
xpath_type_resource_ex = XPATH_TYPE_RESOURCE
xpath_type_arc_ex = XPATH_TYPE_ARC
else:
extended_link_tags = file_data.findall(XPATH_GEN_TYPE_EXTENDED)
if len(extended_link_tags):
# ジェネリックラベルリンク
xpath_type_resource_ex = XPATH_GEN_TYPE_RESOURCE
xpath_type_arc_ex = XPATH_GEN_TYPE_ARC
else:
# XMLとしての内容は空だけど、ファイルだけは作って同梱しているケース。
logger.debug('拡張リンクタグが見つかりませんでした %s' % file_path)
return None
# 拡張リンクの内容を取得する
locators = OrderedDict()
label_id = OrderedDict()
resources = OrderedDict()
prohibited_arcs = OrderedDict()
arcs = OrderedDict()
extended_links = OrderedDict()
tags = _DATA()
tags.locators = locators
tags.label_id = label_id
tags.resources = resources
tags.arcs = arcs
data.tags = tags
data.prohibited_arcs = prohibited_arcs
data.extended_links = extended_links
for t in extended_link_tags:
tag_name = t.tag
tag_role = t.attrib[ATTRIB_ROLE]
# (1/4) ロケータータグ(xlink:type="locator")を取得する
if tag_name not in locators:
locators[tag_name] = OrderedDict()
if tag_role not in locators[tag_name]:
locators[tag_name][tag_role] = OrderedDict()
# 高速化のために、roleキーの中に入れた辞書を変数に入れて使う。
locators_name_role = locators[tag_name][tag_role]
if tag_name not in label_id:
label_id[tag_name] = OrderedDict()
if tag_role not in label_id[tag_name]:
label_id[tag_name][tag_role] = OrderedDict()
label_id_name_role = label_id[tag_name][tag_role]
for tag in t.findall(XPATH_TYPE_LOCATOR):
# タグの属性とファイルパスを取得
(href_id, t_data) = self.get_attrib_and_paths(tag, xbrl_files, file_path)
# 辞書の値にする変数を作成
value = dict(tag.attrib)
# 辞書のキーにする変数を作成
href = (t_data.abs_path, href_id)
label = value[ATTRIB_LABEL]
# 重複タグでなければ追加
if href in locators_name_role:
if label in locators_name_role[href]:
if value == locators_name_role[href][label]:
# 重複タグなのでスキップ
continue
else:
# キーが同じなのにデータが異なるのはおかしいので記録。
logger.debug('%s キー重複 %s %s %s' % ('locators', os_basename(file_path), str(href), label))
else:
locators_name_role[href][label] = value
else:
locators_name_role[href] = OrderedDict()
locators_name_role[href][label] = value
# label属性をキーにしてhref属性の絶対pathとidのタプルを追加
if label in label_id_name_role:
# 重複タグの可能性があるので記録。
logger.debug('%s キー重複 %s %s %s' % ('label_id', os_basename(file_path), label, str(href)))
label_id_name_role[label] = href
# (2/4) リソースタグ(xlink:type="resource")を取得する
if tag_name not in resources:
resources[tag_name] = OrderedDict()
if tag_role not in resources[tag_name]:
resources[tag_name][tag_role] = OrderedDict()
resources_name_role = resources[tag_name][tag_role]
# for文の中で文字列比較をしていると遅いので、比較結果を bool型 にしておく。
if tag_name == TAG_REFERENCE_LINK:
# 参照リンク
is_reference_link = True
else:
# それ以外
is_reference_link = False
for tag in t.findall(xpath_type_resource_ex):
# 辞書の値にする変数を作成
value = dict(tag.attrib)
# 辞書のキーにする変数を作成
label = value[ATTRIB_LABEL]
if is_reference_link:
value['text'] = tuple((x.tag, x.text) for x in tag.findall('*'))
else:
value['text'] = tag.text
if label in resources_name_role:
if value == resources_name_role[label]:
# 重複タグなのでスキップ
continue
else:
# キーが同じなのにデータが異なるのはおかしいので、記録する。
logger.debug('%s キー重複 %s %s' % ('resources', label, os_basename(file_path)))
else:
resources_name_role[label] = value
# label属性をキーにしてid属性を追加
id_resource = value.get('id')
if label in label_id_name_role:
logger.debug('%s キー重複 %s %s %s' % ('label_id', label, id_resource, os_basename(file_path)))
label_id_name_role[label] = id_resource
# (3/4) アークタグ(xlink:type="arc")を取得する
if tag_name not in arcs:
arcs[tag_name] = OrderedDict()
if tag_role not in arcs[tag_name]:
arcs[tag_name][tag_role] = OrderedDict()
arcs_name_role = arcs[tag_name][tag_role]
for tag in t.findall(xpath_type_arc_ex):
# 辞書の値にする変数を作成
value = dict(tag.attrib)
# 辞書のキーにする変数を作成
label_from = value[ATTRIB_FROM]
href_from = label_id_name_role[label_from]
label_to = value[ATTRIB_TO]
href_to = label_id_name_role[label_to]
arc_role = value[ATTRIB_ARCROLE]
arc_priority = value.get('priority', '0') # priority属性が無いときは 0 を設定する。整数限定(XBRLの仕様)
arc_order = value.get('order', '1') # order属性が無いときは 1 を設定する。小数OK(XBRLの仕様)
preferred_label = value.get('preferredLabel')
# 辞書のキーを作成
arc_key = (href_from, label_from, label_to, href_to, arc_role, arc_priority, arc_order, preferred_label)
# キーが重複していなければ、アークタグの内容を追加する。
if arc_key in arcs_name_role:
if value == arcs_name_role[arc_key]:
# 重複タグなのでスキップ
continue
else:
# キーが同じなのにデータが異なるのはおかしいので記録
logger.debug('キー重複 %s %s' % (str(value.get('use')), str(arc_key)))
else:
arcs_name_role[arc_key] = value
# (4/4) 禁止アークの辞書と拡張リンクのリストをつくる
# アークを使って、ロケーターとロケーターをつなげる。または、
# アークを使って、ロケーターとリソースをつなげる。
if tag_name not in prohibited_arcs:
prohibited_arcs[tag_name] = OrderedDict()
if tag_role not in prohibited_arcs[tag_name]:
prohibited_arcs[tag_name][tag_role] = OrderedDict()
# 高速化のために、roleキーの中に入れた辞書を変数に入れて使う。
prohibited_arcs_name_role = prohibited_arcs[tag_name][tag_role]
if tag_name not in extended_links:
extended_links[tag_name] = OrderedDict()
if tag_role not in extended_links[tag_name]:
extended_links[tag_name][tag_role] = []
# 高速化のために、roleキーの中に入れたリストを変数に入れて使う。
ex_name_role = extended_links[tag_name][tag_role]
ex_name_role_append = ex_name_role.append
# 重複を検出するための集合
keys_set = (set(), set())
# アイテムを「禁止アークの辞書」または「拡張リンクのリスト」に振り分ける関数
def add_p_arc_and_exlink():
"""重複チェックをしながら、アイテムを「禁止アークの辞書」または「拡張リンクのリスト」に振り分ける関数。"""
arc_use = arc_data.get('use')
if arc_use == 'prohibited':
# 重複チェック(先頭に拡張リンクのロール属性を追加して確認する)
if keys in keys_set[0]:
logger.debug('禁止アークのキーに重複 %s %s' % (os_basename(file_path), str(keys)))
else:
keys_set[0].add(keys)
# 辞書に追加
prohibited_arcs_name_role[get_keys_1458(keys)] = values
else:
# 重複チェック(先頭に拡張リンクのロール属性を追加して確認する)
if keys in keys_set[1]:
logger.debug('拡張リンクのキーに重複 %s %s' % (os_basename(file_path), str(keys)))
else:
keys_set[1].add(keys)
# タプルを作ってリストに追加
ex_name_role_append((keys[1:], values))
return
# タグ名で処理を分けて、キーと値を作成。
if (tag_name == TAG_LABEL_LINK) or (tag_name == TAG_GEN_LINK):
# 名称リンク or ジェネリックラベルリンク
for ((href_from, label_from, label_to, href_to, arc_role, arc_priority, arc_order, preferred_label), arc_data) in arcs_name_role.items():
# リソースにはファイルパスがないので補う
t_href_to = (file_path, label_to)
# リソースのタグからrole属性を取得
if label_to in resources_name_role:
# リソースのロール属性を取得
resources_role = resources_name_role[label_to][ATTRIB_ROLE]
# キーと値を作成
keys = (tag_role, href_from, label_from, label_to, t_href_to, arc_role, arc_priority, arc_order, preferred_label, resources_role)
values = {'from': locators_name_role[href_from], 'arc': arc_data, 'to': resources_name_role[label_to]}
else:
# 禁止アークの場合に該当。
# ロケーターにリソースと同じロール属性があるので、そこから取得する。
for (loc_href, loc_labels) in locators_name_role.items():
for loc_label in loc_labels.keys():
if loc_label == label_to:
# locatorのrole属性はoptionalなので、getメソッドで取得します。
resources_role = locators_name_role[loc_href][loc_label].get(ATTRIB_ROLE)
break
else:
# breakしなかったので続行
continue
# labelが見つかってbreakしたので、この階層のfor文もbreakする。
break
else:
raise XBRLParseError('to属性の先にリソースが見つかりませんでした %s %s' % (os_basename(file_path), str(label_to)))
# キーと値を作成
keys = (tag_role, href_from, label_from, label_to, loc_href, arc_role, arc_priority, arc_order, preferred_label, resources_role)
values = {'from': locators_name_role[href_from], 'arc': arc_data, 'to': locators_name_role[loc_href]}
# 禁止アークの辞書または拡張リンクのリストに追加
add_p_arc_and_exlink()
elif tag_name == TAG_REFERENCE_LINK:
# 参照リンク
for ((href_from, label_from, label_to, href_to, arc_role, arc_priority, arc_order, preferred_label), arc_data) in arcs_name_role.items():
# リソースにはファイルパスがないので補う
t_href_to = (file_path, label_to)
# リソースのタグからrole属性を取得
resources_role = resources_name_role[label_to][ATTRIB_ROLE]
# キーと値を作成
keys = (tag_role, href_from, label_from, label_to, t_href_to, arc_role, arc_priority, arc_order, preferred_label, resources_role)
values = {'from': locators_name_role[href_from], 'arc': arc_data, 'to': resources_name_role[label_to]}
# 禁止アークの辞書または拡張リンクのリストに追加
add_p_arc_and_exlink()
else:
# 表示リンク or 計算リンク or 定義リンク
for ((href_from, label_from, label_to, href_to, arc_role, arc_priority, arc_order, preferred_label), arc_data) in arcs_name_role.items():
# キーと値を作成
keys = (tag_role, href_from, label_from, label_to, href_to, arc_role, arc_priority, arc_order, preferred_label)
values = {'from': locators_name_role[href_from], 'arc': arc_data, 'to': locators_name_role[href_to]}
# 禁止アークの辞書または拡張リンクのリストに追加
add_p_arc_and_exlink()
return data
@staticmethod
def get_attrib_and_paths(tag, xbrl_files, file_path):
"""タグの属性とファイルパスを取得する関数"""
# [0]ファイルパスと[1]id属性に分割
(href_path, href_id) = tag.attrib[ATTRIB_HREF].split('#')
if href_path in xbrl_files:
# XBRLフォルダ(zip)にパスがあれば何もしない
abs_path = href_path
else:
# 絶対パスを取得
abs_path = _get_abs_path(current=file_path, target=href_path)
# データ用クラスのインスタンスを作る
data = _DATA()
# 属性を辞書に変換して追加
data.attrib = dict(tag.attrib)
# パスを追加
data.abs_path = abs_path
data.base_path = file_path
data.raw_path = href_path
return (href_id, data)
def add_linked_data(self, file_data, file_path, t, t_set):
"""リンクされたデータを取得する関数"""
if file_data.file_type == 0:
# ほかのファイルへのリンクを取得
for data in file_data.import_schema_files.values():
# ファイルのデータを読み込んで t に追加する
self.add_linked_data_to_list(data.abs_path, file_path, t, t_set)
for data in file_data.include_schema_files.values():
self.add_linked_data_to_list(data.abs_path, file_path, t, t_set)
for data in file_data.linkbase_files.values():
self.add_linked_data_to_list(data.abs_path, file_path, t, t_set)
elif file_data.file_type == 1:
# ほかのファイルへのリンクを取得
for href in file_data.role_refs.values():
# ファイルのデータを読み込んで t に追加する
self.add_linked_data_to_list(href[0], file_path, t, t_set)
for href in file_data.arcrole_refs.values():
self.add_linked_data_to_list(href[0], file_path, t, t_set)
# ロケーターのhrefからリンクを取得
for roles in file_data.tags.locators.values():
for hrefs in roles.values():
for href in hrefs.keys():
self.add_linked_data_to_list(href[0], file_path, t, t_set)
else:
raise XBRLParseError('不明なファイルタイプ file_data.file_type: %s' % file_data.file_type)
return
def add_linked_data_to_list(self, abs_path, file_path, t, t_set):
"""リンクされた「ファイルへのリンク」と「ファイルのデータ」を取得する関数"""
# 使用しないファイルをスキップする
if self.is_skip_target(abs_path):
# logger.debug('skipped %s' % abs_path)
return
# 読み込み済みならスキップする (2/2)
if abs_path in t_set:
return
# データを取得
# まず、XBRLのフォルダ or XBRLのzip から取得。
# 無ければ、タクソノミフォルダから取得。
data = self._get_xml_data(
abs_path=abs_path,
current_file=file_path,
)
# 再帰用リストに追加
# ファイルの絶対パスは、XMLの中の相対パスを絶対パスに変換するときに使います。
t.append((abs_path, data))
return
@staticmethod
def is_skip_target(path):
"""スキップするファイルパス"""
if path.startswith('http://www.xbrl.org/'):
# このパスに属するファイルをスキップしても、XBRLは読み込めました。
return True
return False
@staticmethod
def add_extended_links(d, extended_links):
"""拡張リンクのデータを追加する関数"""
for (tag_name, roles) in extended_links.items():
# タグ名のキーがなければ追加する
if tag_name not in d:
d[tag_name] = OrderedDict()
for (role, datas) in roles.items():
# role属性のキーがなければ追加する
if role not in d[tag_name]:
d[tag_name][role] = []
# リストに拡張リンクのデータを追加する
d[tag_name][role].extend(datas)
return
@staticmethod
def add_prohibited_arcs_data(d, prohibited_arcs):
"""禁止アーク(prohibited arcs)のデータを追加する関数"""
for (tag_name, roles) in prohibited_arcs.items():
# タグ名のキーが無ければ追加
if tag_name not in d:
d[tag_name] = OrderedDict()
for (role, arcs) in roles.items():
if arcs:
# role属性のキーが無ければ追加
if role not in d[tag_name]:
d[tag_name][role] = OrderedDict()
# アーク辞書のキーが無ければ追加
for (arc_key, arc_data) in arcs.items():
if arc_key not in d[tag_name][role]:
d[tag_name][role][arc_key] = arc_data
else:
continue
return
@staticmethod
def apply_prohibited_arcs(extended_links, prohibited_arcs):
"""prohibitedArc (禁止アーク) を適用する"""
# 禁止アーク適用後の新しい拡張リンク辞書
ex_links = OrderedDict()
# 禁止アーク適用回数の辞書を作成 & 最大数を取得 デバッグ
prohibited_dict = OrderedDict()
n_prohibited_max = 0
for t_roles in prohibited_arcs.values():
for t_values in t_roles.values():
n_prohibited_max += len(t_values)
for tt_keys in t_values.keys():
prohibited_dict[tt_keys] = 0
# 禁止アークの適用回数 デバッグ
n_prohibited = 0
def is_target():
"""禁止アークに該当するかを判断する関数"""
if tag_name in prohibited_arcs:
# role属性
if role in prohibited_arcs[tag_name]:
# 名称、ジェネリックラベル、参照、表示、定義、計算リンクで、必須の属性をタプルで取得。
t_keys = get_keys_0347(keys)
# 必須の属性で一致するキーがあるかを判定
if t_keys in prohibited_arcs[tag_name][role]:
# 禁止アークの属性辞書を取得
p_arc = prohibited_arcs[tag_name][role][t_keys]['arc']
# priority属性が禁止アーク以下である
if int(arc_priority) <= int(p_arc.get('priority', '0')):
# order属性が一致する
if float(arc_order) == float(p_arc.get('order', '1')):
prohibited_dict[t_keys] += 1
return True
# else:
# logger.debug('禁止アーク order属性が異なる %s %s' % (float(arc_order), float(p_arc.get('order', '1'))))
return False
for (tag_name, roles) in extended_links.items():
if tag_name not in ex_links:
ex_links[tag_name] = OrderedDict()
if (tag_name == TAG_LABEL_LINK) or (tag_name == TAG_GEN_LINK):
# 名称リンク or ジェネリックラベルリンク
for (role, datas) in roles.items():
if role not in ex_links[tag_name]:
ex_links[tag_name][role] = OrderedDict()
for (keys, values) in datas:
# keys: [0]href_from [1]label_from [2]label_to [3]href_to [4]arc_role [5]arc_priority
# [6]arc_order [7]preferred_label [8]resources_role
href_from = keys[0]
id_from = href_from[1]
href_to = keys[3]
id_to = href_to[1]
lang = values['to'][ATTRIB_LANG]
arc_role = keys[4]
arc_priority = keys[5]
arc_order = keys[6]
preferred_label = keys[7]
resources_role = keys[8]
# 禁止アークを適用
if is_target():
# 禁止アークに該当したのでスキップする
# logger.debug('禁止アーク適用 %s' % '|'.join((tag_name.split('}')[1], id_from, id_to)))
n_prohibited += 1
continue
if id_from not in ex_links[tag_name][role]:
ex_links[tag_name][role][id_from] = OrderedDict()
# keys[1]href_to は使わないのでスキップ
if resources_role not in ex_links[tag_name][role][id_from]:
ex_links[tag_name][role][id_from][resources_role] = OrderedDict()
if lang not in ex_links[tag_name][role][id_from][resources_role]:
ex_links[tag_name][role][id_from][resources_role][lang] = OrderedDict()
if arc_role not in ex_links[tag_name][role][id_from][resources_role][lang]:
ex_links[tag_name][role][id_from][resources_role][lang][arc_role] = OrderedDict()
if preferred_label not in ex_links[tag_name][role][id_from][resources_role][lang][arc_role]:
ex_links[tag_name][role][id_from][resources_role][lang][arc_role][preferred_label] = values
elif tag_name == TAG_REFERENCE_LINK:
# 参照リンク
for (role, datas) in roles.items():
if role not in ex_links[tag_name]:
ex_links[tag_name][role] = OrderedDict()
for (keys, values) in datas:
# keys: [0]href_from [1]label_from [2]label_to [3]href_to [4]arc_role [5]arc_priority
# [6]arc_order [7]preferred_label [8]resources_role
href_from = keys[0]
id_from = href_from[1]
href_to = keys[3]
id_to = href_to[1]
arc_role = keys[4]
arc_priority = keys[5]
arc_order = keys[6]
preferred_label = keys[7]
resources_role = keys[8]
# 禁止アークを適用
if is_target():
# 禁止アークに該当したのでスキップする
# logger.debug('禁止アーク適用 %s' % '|'.join((tag_name.split('}')[1], id_from, id_to)))
n_prohibited += 1
continue
if id_from not in ex_links[tag_name][role]:
ex_links[tag_name][role][id_from] = OrderedDict()
if id_to not in ex_links[tag_name][role][id_from]:
ex_links[tag_name][role][id_from][id_to] = OrderedDict()
if resources_role not in ex_links[tag_name][role][id_from][id_to]:
ex_links[tag_name][role][id_from][id_to][resources_role] = OrderedDict()
if arc_role not in ex_links[tag_name][role][id_from][id_to][resources_role]:
ex_links[tag_name][role][id_from][id_to][resources_role][arc_role] = OrderedDict()
if preferred_label not in ex_links[tag_name][role][id_from][id_to][resources_role][arc_role]:
ex_links[tag_name][role][id_from][id_to][resources_role][arc_role][preferred_label] = values
else:
# 表示リンク or 計算リンク or 定義リンク
for (role, datas) in roles.items():
if role not in ex_links[tag_name]:
ex_links[tag_name][role] = OrderedDict()
for (keys, values) in datas:
# keys: [0]href_from [1]label_from [2]label_to [3]href_to [4]arc_role [5]arc_priority
# [6]arc_order [7]preferred_label
href_from = keys[0]
id_from = href_from[1]
href_to = keys[3]
id_to = href_to[1]
arc_role = keys[4]
arc_priority = keys[5]
arc_order = keys[6]
preferred_label = keys[7]
# 禁止アークを適用
if is_target():
# 禁止アークに該当したのでスキップする
# logger.debug('禁止アーク適用 %s' % '|'.join((tag_name.split('}')[1], id_from, id_to)))
n_prohibited += 1
continue
if id_from not in ex_links[tag_name][role]:
ex_links[tag_name][role][id_from] = OrderedDict()
if id_to not in ex_links[tag_name][role][id_from]:
ex_links[tag_name][role][id_from][id_to] = OrderedDict()
if arc_role not in ex_links[tag_name][role][id_from][id_to]:
ex_links[tag_name][role][id_from][id_to][arc_role] = OrderedDict()
if preferred_label not in ex_links[tag_name][role][id_from][id_to][arc_role]:
ex_links[tag_name][role][id_from][id_to][arc_role][preferred_label] = values
# 禁止アークの適用回数チェック
if n_prohibited != n_prohibited_max:
logger.debug('禁止アークの適用回数(%d回)が禁止アークの本数(%d本)に一致しませんでした' %
(n_prohibited, n_prohibited_max))
return ex_links
以上です。