MonapartyのCounterblockモジュールを作成する
今回は実際にカスタムモジュールを作成していく前に、カスタムモジュールの作成に必要な、counterblock
のプラグインアーキテクチャを理解する事から始めます。
基本的には公式のドキュメントに沿って、ビルトインモジュールのコードを参考にしつつまとめました。
正直、解説するほど深くは理解していないのですが、調べたことは何かしら残しておかないと忘れるので。
各種プロセッサ
counterblock
の起動時やブロック生成時などをトリガーに特定の処理を実行するため、いくつかのプロセッサが用意されています。
それぞれcounterblock.lib.processor
からインポートして利用し、以下の構文に従います。
from counterblock.lib.processor import <processor_name> @<processor_name>.subscribe(enabled=<bool>, priority=<int>) def my_function(param1, param2): bla = do_foo()
subscribe
の引数を指定しない場合、デフォルトはenabled = true
、priority= 0
です。
priority
については、同じプロセッサから呼ばれる複数の関数の実行順を決定するものです。メッセージのカテゴリーごとに処理する関数を分けてpriority
でその実行準を制御するような使い方のようです。
MessageProcessor
MessageProcessor
は、counterblock
のメッセージ毎に実行されます。
ちなみにトランザクション毎ではなくメッセージ毎です。例えば一つのアセット送信トランザクションの場合、debits
、credits
、sends
の3つのメッセージを受け取ったことになります。
@MessageProcessor.subscribe() def process_message(msg, msg_data):
定義する関数はmsg
とmsg_data
の二つの値を受け取ることが出来ます。
msg
はCounterparty APIのget_messages
の戻り値と同じフォーマットでメッセージの中身が入っていて、msg_data
はmsg
のbindings
の中身が入っています。
ビルトインのモジュールを眺めた感じだと、counterblock
側で独自に保持しているデータは新しいメッセージを受け取る度にここでMongoDBに保存したり既存データの更新をしているようです。
なお、定義した関数からABORT_THIS_MESSAGE_PROCESSING
をreturnすると、現在処理されているメッセージの他の優先度が低いMessageProcessor
を実行されないようにすることができます。
BlockProcessor
BlockProcessor
は、ブロック生成毎にすべてのMessageProcessor
関数が完了した後に1回実行されます。
@BlockProcessor.subscribe() def process_block():
MempoolMessageProcessor
MempoolMessageProcessor
は、承認前のトランザクションがMempoolに入った時点で実行されます。MessageProcessor
と同じくメッセージ毎に実行されますが、同じトランザクションがMempoolに入った時点とブロックに取り込まれた時点のメッセージ数は異なります。
@MempoolMessageProcessor.subscribe() def process_mempool_message(msg, msg_data):
StartUpProcessor
StartUpProcessor
は、counterblock
の起動時に1回実行されます。
ここでデータベースにインデックスを作成したりなどの初期化処理を実行します。
ただし、初期化処理といってもcounterblock
を再起動するたびに呼ばれるので、テーブルを初期化したりすると同期済ブロックのMessageProcessor
で実行されたはずのデータは消えます。実行しても問題無いかしっかり判定した上で実行するか、その手の初期化処理はRollbackProcessor
で定義します。
@StartUpProcessor.subscribe() def process_start_up():
CaughtUpProcessor
CaughtUpProcessor
は、counterblock
が最新のcounterpartyd
ブロックまで追いついた段階で実行されます。
しばらく眺めていた感じだと起動してから一度しか実行されないようです。
@CaughtUpProcessor.subscribe() def process_caught_up():
ビルトインのモジュールでは、ここからstart_task
で非同期で繰り返し実行するような処理を開始しているようです。
例えば、定期的に拡張アセット情報の取得が必要なアセットについて、データを取得しに行ったりなど。
RollbackProcessor
RollbackProcessor
は、reorgが発生した時など、ブロックチェーンデータベースがロールバックされるたびに実行されます。MessageProcessor
でMongoDBに保存しているデータは、不整合が起きないようここでロールバックします。
定義する関数はmax_block_index
を受け取ることが出来ます。ビルトインのモジュールでは、この値があればそこまでロールバックし、なければ全データをロールバックするような作りになっていました。
reparse
した場合まずはじめにmax_block_index
無しでRollbackProcessor
が実行されるので、データベースの初期化処理などはここに定義しておきます。reparse
すればデータベースの中身は勝手に消えるかと思いましたが消えないようです。特に初期データが不要であればdrop()
でコレクションの削除だけ実行しておきます。
@RollbackProcessor.subscribe() def process_rollback(max_block_index):
処理の流れ
起動時にStartUpProcessor
が実行され、ブロックが追いつくとCaughtUpProcessor
が実行されます。
未同期の場合、この間で過去のすべてのメッセージに対してMessageProcessor
が実行され、MongoDBに格納されたデータなどが最新になります。
二度目以降の起動である程度同期が終わっている場合も、そこから現在までのメッセージに対してMessageProcessor
が動きます。
CaughtUpProcessor
以降はトランザクションがあればまずはMempoolに入るのでMempoolMessageProcessor
がメッセージの数だけ実行され、ブロックが生成されるとMessageProcessor
がメッセージの数だけ実行されます。
ブロック毎にMessageProcessor
がすべて完了するとBlockProcessor
が実行されます。
なお、reparse
するとまずはじめにRollbackProcessor
が実行され、ここでMongoDBの初期化などが実行されます。その後StartUpProcessor
が呼ばれます。
なお、良く分かりませんが、StartUpProcessor
が実行されてからCaughtUpProcessor
が実行されるまでの間でWARNING:database: Pruning to block
のログが流れたあたりで毎回RollbackProcessor
が実行されていました。これはmax_block_index
付きです。
その他利用できる機能
JSON RPC APIの追加
以下のような構文でcounterblock
にAPIメソッドを追加出来ます。
引数は無くてもいいですし、必要であれば任意の変数名を設定出来ます。
from counterblock.lib.processor import API @API.add_method def custom_api(param):
APIメソッドを追加すると、以下のようにJSON RPC APIから呼び出せるようになります。
curl -s -X POST --data '{"jsonrpc":"2.0","id":1,"method":"custom_api", "params":{"param":"testparams"}}' http://localhost:4100
start_task
start_task
を利用すると、別スレッドで実行するタスクを開始出来ます。delay
を設定しなければ即時実行します。
以下ではrun_my_task
関数を1分後に実行します。
from counterblock.lib.processor import start_task start_task(run_my_task, delay=60)
ビルトインのモジュールでは、delay
無しで即時実行した関数内で、delay
有りで再起で呼び出して、定期実行するような使い方がされていました。
config.state
counterblock.lib
からconfig
をインポートすると、config.state
に格納された内部状態にアクセス出来ます。
from counterblock.lib import config
内容は以下のようなオブジェクトです。
{ 'cw_last_message_seq': 174, 'cur_block': {'block_index': 0}, 'my_latest_block': { '_id': ObjectId('5a6621decc5d12000700a7dc'), 'block_hash': '800309c298589241559b5fbe008140087e11f05ac6103e811aa9c52c6ccd07fb', 'block_time': datetime.datetime(2018, 1, 22, 17, 39, 25), 'block_index': 1224710 }, 'cp_caught_up': True, 'caught_up_started_events': True, 'cp_backend_block_index': 1224710, 'caught_up': True, 'last_message_index': 70983, 'cp_latest_block_index': 1224710 }
modules.conf
以下にあるmodules.conf
を編集すると、作成したカスタムモジュールを有効にしたり、ビルトインモジュールを無効にしたり出来ます。
federatednode/config/counterblock/modules.conf
カスタムモジュールの読み込み
作成したカスタムモジュールをロードするには、まずcounterblockコンテナの任意のディレクトリにモジュールを保存します。
ホストからコンテナにcp
コマンドなどでファイルをコピーします。
sudo docker cp hostdir/my_module.py federatednode_counterblock_1:/counterblock/counterblock/lib/modules/my_module.py
ファイルを保存したら、modules.conf
を編集します。
[LoadModule]
の下にビルトインモジュールのパスが並んでいるので、その下にでもカスタムモジュールのパスをcounterblock
のbase-dir
に対する相対パスで指定します。
[LoadModule] lib/modules/assets = True lib/modules/counterwallet = True lib/modules/dex = True lib/modules/transaction_stats = True lib/modules/betting = True lib/modules/my_module = True
上記設定ではmy_module.py
かmy_module
フォルダの__init__.py
を探しにいくようです。
デフォルト動作の変更
modules.conf
ではデフォルト機能の有効、無効や優先順位の変更も出来ます。各プロセッサから呼ばれる関数単位で設定出来ます。
有効、無効はbool値、優先順位はint値で設定し、タプルで渡すことも出来ます。
[MessageProcessor] handle_exceptional = True handle_invalid = True parse_insert = True handle_reorg = True parse_issuance = 10, True parse_balance_change = True parse_trade_book = True parse_broadcast = True
一連の作業の一部だけ止めるようなことが出来るので、影響をちゃんと考えないとおいそれとは無効には出来なそうです。ドキュメントには良く分からないなら触るなと書いてありました。
これで基本的な作りは理解出来た気がするので、次回は実際に動くものを作ってみます。