MonapartyのCounterblockモジュールを作成する

今回は実際にカスタムモジュールを作成していく前に、カスタムモジュールの作成に必要な、counterblockプラグインアーキテクチャを理解する事から始めます。

基本的には公式のドキュメントに沿って、ビルトインモジュールのコードを参考にしつつまとめました。

正直、解説するほど深くは理解していないのですが、調べたことは何かしら残しておかないと忘れるので。

各種プロセッサ

counterblockの起動時やブロック生成時などをトリガーに特定の処理を実行するため、いくつかのプロセッサが用意されています。

それぞれcounterblock.lib.processorからインポートして利用し、以下の構文に従います。

from counterblock.lib.processor import <processor_name>

@<processor_name>.subscribe(enabled=<bool>, priority=<int>)
def my_function(param1, param2):
    bla = do_foo()

subscribeの引数を指定しない場合、デフォルトはenabled = truepriority= 0です。

priorityについては、同じプロセッサから呼ばれる複数の関数の実行順を決定するものです。メッセージのカテゴリーごとに処理する関数を分けてpriorityでその実行準を制御するような使い方のようです。

MessageProcessor

MessageProcessorは、counterblockのメッセージ毎に実行されます。

ちなみにトランザクション毎ではなくメッセージ毎です。例えば一つのアセット送信トランザクションの場合、debitscreditssendsの3つのメッセージを受け取ったことになります。

@MessageProcessor.subscribe()
def process_message(msg, msg_data):

定義する関数はmsgmsg_dataの二つの値を受け取ることが出来ます。

msgはCounterparty APIget_messagesの戻り値と同じフォーマットでメッセージの中身が入っていて、msg_datamsgbindingsの中身が入っています。

ビルトインのモジュールを眺めた感じだと、counterblock側で独自に保持しているデータは新しいメッセージを受け取る度にここでMongoDBに保存したり既存データの更新をしているようです。

なお、定義した関数からABORT_THIS_MESSAGE_PROCESSINGをreturnすると、現在処理されているメッセージの他の優先度が低いMessageProcessorを実行されないようにすることができます。

BlockProcessor

BlockProcessorは、ブロック生成毎にすべてのMessageProcessor関数が完了した後に1回実行されます。

@BlockProcessor.subscribe()
def process_block():

MempoolMessageProcessor

MempoolMessageProcessorは、承認前のトランザクションがMempoolに入った時点で実行されます。MessageProcessorと同じくメッセージ毎に実行されますが、同じトランザクションがMempoolに入った時点とブロックに取り込まれた時点のメッセージ数は異なります。

@MempoolMessageProcessor.subscribe()
def process_mempool_message(msg, msg_data):

StartUpProcessor

StartUpProcessorは、counterblockの起動時に1回実行されます。 ここでデータベースにインデックスを作成したりなどの初期化処理を実行します。

ただし、初期化処理といってもcounterblockを再起動するたびに呼ばれるので、テーブルを初期化したりすると同期済ブロックのMessageProcessorで実行されたはずのデータは消えます。実行しても問題無いかしっかり判定した上で実行するか、その手の初期化処理はRollbackProcessorで定義します。

@StartUpProcessor.subscribe()
def process_start_up():

CaughtUpProcessor

CaughtUpProcessorは、counterblockが最新のcounterpartydブロックまで追いついた段階で実行されます。 しばらく眺めていた感じだと起動してから一度しか実行されないようです。

@CaughtUpProcessor.subscribe()
def process_caught_up():

ビルトインのモジュールでは、ここからstart_taskで非同期で繰り返し実行するような処理を開始しているようです。 例えば、定期的に拡張アセット情報の取得が必要なアセットについて、データを取得しに行ったりなど。

RollbackProcessor

RollbackProcessorは、reorgが発生した時など、ブロックチェーンデータベースがロールバックされるたびに実行されます。MessageProcessorでMongoDBに保存しているデータは、不整合が起きないようここでロールバックします。

定義する関数はmax_block_indexを受け取ることが出来ます。ビルトインのモジュールでは、この値があればそこまでロールバックし、なければ全データをロールバックするような作りになっていました。

reparseした場合まずはじめにmax_block_index無しでRollbackProcessorが実行されるので、データベースの初期化処理などはここに定義しておきます。reparseすればデータベースの中身は勝手に消えるかと思いましたが消えないようです。特に初期データが不要であればdrop()でコレクションの削除だけ実行しておきます。

@RollbackProcessor.subscribe()
def process_rollback(max_block_index):

処理の流れ

起動時にStartUpProcessorが実行され、ブロックが追いつくとCaughtUpProcessorが実行されます。

未同期の場合、この間で過去のすべてのメッセージに対してMessageProcessorが実行され、MongoDBに格納されたデータなどが最新になります。 二度目以降の起動である程度同期が終わっている場合も、そこから現在までのメッセージに対してMessageProcessorが動きます。

CaughtUpProcessor以降はトランザクションがあればまずはMempoolに入るのでMempoolMessageProcessorがメッセージの数だけ実行され、ブロックが生成されるとMessageProcessorがメッセージの数だけ実行されます。

ブロック毎にMessageProcessorがすべて完了するとBlockProcessorが実行されます。

なお、reparseするとまずはじめにRollbackProcessorが実行され、ここでMongoDBの初期化などが実行されます。その後StartUpProcessorが呼ばれます。

なお、良く分かりませんが、StartUpProcessorが実行されてからCaughtUpProcessorが実行されるまでの間でWARNING:database: Pruning to blockのログが流れたあたりで毎回RollbackProcessorが実行されていました。これはmax_block_index付きです。

その他利用できる機能

JSON RPC APIの追加

以下のような構文でcounterblockAPIメソッドを追加出来ます。 引数は無くてもいいですし、必要であれば任意の変数名を設定出来ます。

from counterblock.lib.processor import API

@API.add_method
def custom_api(param):

APIメソッドを追加すると、以下のようにJSON RPC APIから呼び出せるようになります。

curl -s -X POST --data '{"jsonrpc":"2.0","id":1,"method":"custom_api", "params":{"param":"testparams"}}' http://localhost:4100

start_task

start_taskを利用すると、別スレッドで実行するタスクを開始出来ます。delayを設定しなければ即時実行します。

以下ではrun_my_task関数を1分後に実行します。

from counterblock.lib.processor import start_task

start_task(run_my_task, delay=60)

ビルトインのモジュールでは、delay無しで即時実行した関数内で、delay有りで再起で呼び出して、定期実行するような使い方がされていました。

config.state

counterblock.libからconfigをインポートすると、config.stateに格納された内部状態にアクセス出来ます。

from counterblock.lib import config

内容は以下のようなオブジェクトです。

{
    'cw_last_message_seq': 174,
    'cur_block': {'block_index': 0},
    'my_latest_block': {
        '_id': ObjectId('5a6621decc5d12000700a7dc'),
        'block_hash': '800309c298589241559b5fbe008140087e11f05ac6103e811aa9c52c6ccd07fb',
        'block_time': datetime.datetime(2018, 1, 22, 17, 39, 25),
        'block_index': 1224710
    },
    'cp_caught_up': True,
    'caught_up_started_events': True,
    'cp_backend_block_index': 1224710,
    'caught_up': True,
    'last_message_index': 70983,
    'cp_latest_block_index': 1224710
}

modules.conf

以下にあるmodules.confを編集すると、作成したカスタムモジュールを有効にしたり、ビルトインモジュールを無効にしたり出来ます。

federatednode/config/counterblock/modules.conf

カスタムモジュールの読み込み

作成したカスタムモジュールをロードするには、まずcounterblockコンテナの任意のディレクトリにモジュールを保存します。

ホストからコンテナにcpコマンドなどでファイルをコピーします。

sudo docker cp hostdir/my_module.py federatednode_counterblock_1:/counterblock/counterblock/lib/modules/my_module.py

ファイルを保存したら、modules.confを編集します。

[LoadModule]の下にビルトインモジュールのパスが並んでいるので、その下にでもカスタムモジュールのパスをcounterblockbase-dirに対する相対パスで指定します。

[LoadModule]
lib/modules/assets = True
lib/modules/counterwallet = True
lib/modules/dex = True
lib/modules/transaction_stats = True
lib/modules/betting = True
lib/modules/my_module = True

上記設定ではmy_module.pymy_moduleフォルダの__init__.pyを探しにいくようです。

デフォルト動作の変更

modules.confではデフォルト機能の有効、無効や優先順位の変更も出来ます。各プロセッサから呼ばれる関数単位で設定出来ます。

有効、無効はbool値、優先順位はint値で設定し、タプルで渡すことも出来ます。

[MessageProcessor]
handle_exceptional = True
handle_invalid = True
parse_insert = True
handle_reorg = True
parse_issuance = 10, True
parse_balance_change = True
parse_trade_book = True
parse_broadcast = True

一連の作業の一部だけ止めるようなことが出来るので、影響をちゃんと考えないとおいそれとは無効には出来なそうです。ドキュメントには良く分からないなら触るなと書いてありました。


これで基本的な作りは理解出来た気がするので、次回は実際に動くものを作ってみます。