MonapartyのAPIを拡張する

今回は実際にcounterblockのカスタムモジュールを作成してみます。

xchain.ioAPIの中からBurnsを題材にし、それに対応したJSON RPC APIを実装します。

APIの定義確認

Endpoint

xchain.ioAPIではアドレスとブロックナンバーで検索出来ますが、今回はアドレスでの検索に絞ります。

Method Endpoint Returns
GET /api/burns/{address} Returns list of 'Burn' transactions
GET /api/burns/{block} Returns list of 'Burn' transactions

Paging

ページングにも対応していて、触ってみたところ最大値は500のようです。

Method Endpoint
GET endpoint/{page}/{limit}

Return Values

burnedearnedStringになっていますが、これは恐らく内部的にsatoshi単位で持っているものをdivisibleに応じて変換して返すためだと思われます。feeなどもすべて文字列でした。

Value Type Description
data Array Broadcasts data
block_index Integer Block number containing the transaction
burned String The amount of Bitcoin (BTC) burned
earned String The amount of Counterparty (XCP) earned
source String Source address where broadcast originated
status String Status of the transaction
timestamp Integer A UNIX timestamp of when the transaction was processed by the network
tx_hash String Transaction Hash
tx_index Integer Transaction Index
total Integer Total number of burns

Example Response

{
    "data": [{
            "block_index": 283809,
            "burned": "1.00000000",
            "earned": "1000.09090909",
            "source": "1EU6VM7zkA9qDw8ReFKHRpSSHJvbuXYNhq",
            "status": "valid",
            "timestamp": 1492254524,
            "tx_hash": "ad6609edbdb3b951627302f65df06636f2535680d69d2ee98f59af05cedf0d94",
            "tx_index": 3069
        }
    ],
    "total": 7
}

データベースの確認

テーブル

sqlite3をインストールして、counterpartyのデータベースに対して.tableコマンドでテーブル一覧を確認してみます。

sudo sqlite3 /var/lib/docker/volumes/federatednode_counterparty-data/_data/monaparty.db
sqlite> .table
addresses                contracts                orders
assets                   credits                  postqueue
balances                 debits                   rps
bet_expirations          destructions             rps_expirations
bet_match_expirations    dividends                rps_match_expirations
bet_match_resolutions    executions               rps_matches
bet_matches              issuances                rpsresolves
bets                     mempool                  sends
blocks                   messages                 storage
broadcasts               nonces                   suicides
btcpays                  order_expirations        transactions
burns                    order_match_expirations  undolog
cancels                  order_matches            undolog_block

今回ターゲットになるテーブルはburnsになるかと思われます。

テーブルの構造

次にburnsテーブルに対して.schemaコマンドで構造を確認します。(一部省略)

sqlite> .schema burns

CREATE TABLE burns(
                      tx_index INTEGER PRIMARY KEY,
                      tx_hash TEXT UNIQUE,
                      block_index INTEGER,
                      source TEXT,
                      burned INTEGER,
                      earned INTEGER,
                      status TEXT);

これだけではtimestampが足りないので、block_indexからtimestampを取ってこれそうなblocksテーブルについても構造を確認します。

sqlite> .schema blocks

CREATE TABLE blocks(
                      block_index INTEGER UNIQUE,
                      block_hash TEXT UNIQUE,
                      block_time INTEGER,
                      previous_block_hash TEXT UNIQUE,
                      difficulty INTEGER,
                      ledger_hash TEXT,
                      txlist_hash TEXT,
                      messages_hash TEXT);

実装

とりあえず最終的なjsonをそのまま返すAPIを作成します。

実際にはcounterblockは直接公開せずにNginxからNode.jsあたりに流してそこから呼ぶような感じになると思いますので、もう少し汎用的なAPIにして呼び出し元で成形するほうが良いのかもしれません。このあたりはThe手探りです。

@API.add_method

関数に@API.add_methodデコレータを付けることで、JSON RPC APIで呼べるようになります。

util.call_jsonrpc_api

モジュール内からCounterparty APIを呼ぶにはutil.call_jsonrpc_apiを使います。

メソッドはsqlを指定し、queryに生のSQLを入れたオブジェクトを渡すと、counterpartyのデータベースに対してSQLを直接実行出来ます。これは、Counterblock APIproxy_to_counterpartydからでは呼べないAPIです。

blockchain.normalize_quantity

burnedearnedノーマライズした上で文字列で返したいので、blockchain.normalize_quantityノーマライズしてから小数点以下8桁付きの文字列に変換します。

コード

my_api.py

from counterblock.lib import util ,blockchain
from counterblock.lib.processor import API

@API.add_method
def get_burns_from_address(address, offset=0, limit=500):
    
    if limit > 500:
        limit = 500
    elif limit < 0:
        limit = 0

    data_sql = "select burns.*, blocks.block_time as timestamp"
    data_sql += " from burns"
    data_sql += " inner join blocks"
    data_sql += " on burns.block_index = blocks.block_index"
    data_sql += " and burns.source = '" + address + "'"
    data_sql += " order by block_index DESC"
    data_sql += " limit " + str(limit) + " offset " + str(offset)

    data_body = util.call_jsonrpc_api("sql", {"query": data_sql}, abort_on_error=True)["result"]

    for x in data_body:
        x["burned"] = "{:.8f}".format(blockchain.normalize_quantity(x["burned"], True))
        x["earned"] = "{:.8f}".format(blockchain.normalize_quantity(x["earned"], True))

    total_sql = "select count(tx_index) as total"
    total_sql += " from burns"
    total_sql += " where source = '" + address + "'"

    total_count = util.call_jsonrpc_api("sql", {"query": total_sql}, abort_on_error=True)["result"][0]["total"]

    return {"data": data_body, "total": total_count}

modules.confの設定

作成したmy_api.pycounterblockコンテナ内にコピーします。 一旦母艦から~/hostdirに放り込んだものを、counterblock/lib配下に作成したcustom_modulesディレクトリにコピーしました。

この場合のcustom_modulesディレクトリの名称、位置や、モジュールのファイル名は、別の場所でも別の名前でも問題ありません。

sudo docker cp hostdir/my_api.py federatednode_counterblock_1:/counterblock/counterblock/lib/custom_modules/my_api.py

modules.confに先ほどコピーしたファイルの位置を追記します。

nano federatednode/config/counterblock/modules.conf

[LoadModule]
lib/modules/assets = True
lib/modules/counterwallet = True
lib/modules/dex = True
lib/modules/transaction_stats = True
lib/modules/betting = True
lib/custom_modules/my_api = True

動作確認

counterblockを再起動して作成したモジュールを有効にします。 これはmodules.confを更新した場合だけではなく、後からmy_api.pyを更新した場合にも再読み込みが必要です。

再起動が終わったらAPIにアクセスしてみます。

fednode restart counterblock

curl -s -X POST --data '{"jsonrpc":"2.0","id":1,"method":"get_burns_from_address","params":{"address":"MCwt89zvuPHaCvHLmY1fvgfoQKot1BApd5","offset":0,"limit":100}}' http://localhost:4100

{"id": 1, "jsonrpc": "2.0", "result": {"total": 1, "data": [{"timestamp": 1511081192, "source": "MCwt89zvuPHaCvHLmY1fvgfoQKot1BApd5", "tx_index": 195, "block_index": 1166003, "status": "valid", "tx_hash": "9f6fd3b04e0f2a54b99d4227aaac660c8dc291df66b74274e87153bfb4394a72", "earned": "1499.88840000", "burned": "1.00000000"}]}}

それっぽいレスポンスが返ってきました。

MonapartyのCounterblockモジュールを作成する

今回は実際にカスタムモジュールを作成していく前に、カスタムモジュールの作成に必要な、counterblockプラグインアーキテクチャを理解する事から始めます。

基本的には公式のドキュメントに沿って、ビルトインモジュールのコードを参考にしつつまとめました。

正直、解説するほど深くは理解していないのですが、調べたことは何かしら残しておかないと忘れるので。

各種プロセッサ

counterblockの起動時やブロック生成時などをトリガーに特定の処理を実行するため、いくつかのプロセッサが用意されています。

それぞれcounterblock.lib.processorからインポートして利用し、以下の構文に従います。

from counterblock.lib.processor import <processor_name>

@<processor_name>.subscribe(enabled=<bool>, priority=<int>)
def my_function(param1, param2):
    bla = do_foo()

subscribeの引数を指定しない場合、デフォルトはenabled = truepriority= 0です。

priorityについては、同じプロセッサから呼ばれる複数の関数の実行順を決定するものです。メッセージのカテゴリーごとに処理する関数を分けてpriorityでその実行準を制御するような使い方のようです。

MessageProcessor

MessageProcessorは、counterblockのメッセージ毎に実行されます。

ちなみにトランザクション毎ではなくメッセージ毎です。例えば一つのアセット送信トランザクションの場合、debitscreditssendsの3つのメッセージを受け取ったことになります。

@MessageProcessor.subscribe()
def process_message(msg, msg_data):

定義する関数はmsgmsg_dataの二つの値を受け取ることが出来ます。

msgはCounterparty APIget_messagesの戻り値と同じフォーマットでメッセージの中身が入っていて、msg_datamsgbindingsの中身が入っています。

ビルトインのモジュールを眺めた感じだと、counterblock側で独自に保持しているデータは新しいメッセージを受け取る度にここでMongoDBに保存したり既存データの更新をしているようです。

なお、定義した関数からABORT_THIS_MESSAGE_PROCESSINGをreturnすると、現在処理されているメッセージの他の優先度が低いMessageProcessorを実行されないようにすることができます。

BlockProcessor

BlockProcessorは、ブロック生成毎にすべてのMessageProcessor関数が完了した後に1回実行されます。

@BlockProcessor.subscribe()
def process_block():

MempoolMessageProcessor

MempoolMessageProcessorは、承認前のトランザクションがMempoolに入った時点で実行されます。MessageProcessorと同じくメッセージ毎に実行されますが、同じトランザクションがMempoolに入った時点とブロックに取り込まれた時点のメッセージ数は異なります。

@MempoolMessageProcessor.subscribe()
def process_mempool_message(msg, msg_data):

StartUpProcessor

StartUpProcessorは、counterblockの起動時に1回実行されます。 ここでデータベースにインデックスを作成したりなどの初期化処理を実行します。

ただし、初期化処理といってもcounterblockを再起動するたびに呼ばれるので、テーブルを初期化したりすると同期済ブロックのMessageProcessorで実行されたはずのデータは消えます。実行しても問題無いかしっかり判定した上で実行するか、その手の初期化処理はRollbackProcessorで定義します。

@StartUpProcessor.subscribe()
def process_start_up():

CaughtUpProcessor

CaughtUpProcessorは、counterblockが最新のcounterpartydブロックまで追いついた段階で実行されます。 しばらく眺めていた感じだと起動してから一度しか実行されないようです。

@CaughtUpProcessor.subscribe()
def process_caught_up():

ビルトインのモジュールでは、ここからstart_taskで非同期で繰り返し実行するような処理を開始しているようです。 例えば、定期的に拡張アセット情報の取得が必要なアセットについて、データを取得しに行ったりなど。

RollbackProcessor

RollbackProcessorは、reorgが発生した時など、ブロックチェーンデータベースがロールバックされるたびに実行されます。MessageProcessorでMongoDBに保存しているデータは、不整合が起きないようここでロールバックします。

定義する関数はmax_block_indexを受け取ることが出来ます。ビルトインのモジュールでは、この値があればそこまでロールバックし、なければ全データをロールバックするような作りになっていました。

reparseした場合まずはじめにmax_block_index無しでRollbackProcessorが実行されるので、データベースの初期化処理などはここに定義しておきます。reparseすればデータベースの中身は勝手に消えるかと思いましたが消えないようです。特に初期データが不要であればdrop()でコレクションの削除だけ実行しておきます。

@RollbackProcessor.subscribe()
def process_rollback(max_block_index):

処理の流れ

起動時にStartUpProcessorが実行され、ブロックが追いつくとCaughtUpProcessorが実行されます。

未同期の場合、この間で過去のすべてのメッセージに対してMessageProcessorが実行され、MongoDBに格納されたデータなどが最新になります。 二度目以降の起動である程度同期が終わっている場合も、そこから現在までのメッセージに対してMessageProcessorが動きます。

CaughtUpProcessor以降はトランザクションがあればまずはMempoolに入るのでMempoolMessageProcessorがメッセージの数だけ実行され、ブロックが生成されるとMessageProcessorがメッセージの数だけ実行されます。

ブロック毎にMessageProcessorがすべて完了するとBlockProcessorが実行されます。

なお、reparseするとまずはじめにRollbackProcessorが実行され、ここでMongoDBの初期化などが実行されます。その後StartUpProcessorが呼ばれます。

なお、良く分かりませんが、StartUpProcessorが実行されてからCaughtUpProcessorが実行されるまでの間でWARNING:database: Pruning to blockのログが流れたあたりで毎回RollbackProcessorが実行されていました。これはmax_block_index付きです。

その他利用できる機能

JSON RPC APIの追加

以下のような構文でcounterblockAPIメソッドを追加出来ます。 引数は無くてもいいですし、必要であれば任意の変数名を設定出来ます。

from counterblock.lib.processor import API

@API.add_method
def custom_api(param):

APIメソッドを追加すると、以下のようにJSON RPC APIから呼び出せるようになります。

curl -s -X POST --data '{"jsonrpc":"2.0","id":1,"method":"custom_api", "params":{"param":"testparams"}}' http://localhost:4100

start_task

start_taskを利用すると、別スレッドで実行するタスクを開始出来ます。delayを設定しなければ即時実行します。

以下ではrun_my_task関数を1分後に実行します。

from counterblock.lib.processor import start_task

start_task(run_my_task, delay=60)

ビルトインのモジュールでは、delay無しで即時実行した関数内で、delay有りで再起で呼び出して、定期実行するような使い方がされていました。

config.state

counterblock.libからconfigをインポートすると、config.stateに格納された内部状態にアクセス出来ます。

from counterblock.lib import config

内容は以下のようなオブジェクトです。

{
    'cw_last_message_seq': 174,
    'cur_block': {'block_index': 0},
    'my_latest_block': {
        '_id': ObjectId('5a6621decc5d12000700a7dc'),
        'block_hash': '800309c298589241559b5fbe008140087e11f05ac6103e811aa9c52c6ccd07fb',
        'block_time': datetime.datetime(2018, 1, 22, 17, 39, 25),
        'block_index': 1224710
    },
    'cp_caught_up': True,
    'caught_up_started_events': True,
    'cp_backend_block_index': 1224710,
    'caught_up': True,
    'last_message_index': 70983,
    'cp_latest_block_index': 1224710
}

modules.conf

以下にあるmodules.confを編集すると、作成したカスタムモジュールを有効にしたり、ビルトインモジュールを無効にしたり出来ます。

federatednode/config/counterblock/modules.conf

カスタムモジュールの読み込み

作成したカスタムモジュールをロードするには、まずcounterblockコンテナの任意のディレクトリにモジュールを保存します。

ホストからコンテナにcpコマンドなどでファイルをコピーします。

sudo docker cp hostdir/my_module.py federatednode_counterblock_1:/counterblock/counterblock/lib/modules/my_module.py

ファイルを保存したら、modules.confを編集します。

[LoadModule]の下にビルトインモジュールのパスが並んでいるので、その下にでもカスタムモジュールのパスをcounterblockbase-dirに対する相対パスで指定します。

[LoadModule]
lib/modules/assets = True
lib/modules/counterwallet = True
lib/modules/dex = True
lib/modules/transaction_stats = True
lib/modules/betting = True
lib/modules/my_module = True

上記設定ではmy_module.pymy_moduleフォルダの__init__.pyを探しにいくようです。

デフォルト動作の変更

modules.confではデフォルト機能の有効、無効や優先順位の変更も出来ます。各プロセッサから呼ばれる関数単位で設定出来ます。

有効、無効はbool値、優先順位はint値で設定し、タプルで渡すことも出来ます。

[MessageProcessor]
handle_exceptional = True
handle_invalid = True
parse_insert = True
handle_reorg = True
parse_issuance = 10, True
parse_balance_change = True
parse_trade_book = True
parse_broadcast = True

一連の作業の一部だけ止めるようなことが出来るので、影響をちゃんと考えないとおいそれとは無効には出来なそうです。ドキュメントには良く分からないなら触るなと書いてありました。


これで基本的な作りは理解出来た気がするので、次回は実際に動くものを作ってみます。

MonapartyのFederated Nodeをインストールする

MonapartyのFederated Nodeをインストールしてみたので手順を残しておきます。

特にハマるところもなく、基本的には以下Counterpartyのドキュメント通りですんなりインストール出来たのでこれと言って特筆する点はありませんが。。

Running a Node | Counterparty

環境はまっさらなUbuntu Server(16.04.3 LTS)です。

事前準備

システムアップデートと依存ライブラリのインストールを行います。 この辺のライブラリは元から入ってた気もしますが、実際どうだったかは忘れました。

sudo apt-get update && sudo apt-get upgrade
sudo apt-get -y install git curl coreutils

次にDockerをインストールします。

sudo -i 
curl -fsSL https://get.docker.com/ | sh
curl -L https://github.com/docker/compose/releases/download/1.16.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
exit

インストール

ドキュメントはCounterpartyのものなので、リポジトリをMonapartyのものに変更します。

git clone https://github.com/monaparty/federatednode.git
cd federatednode
sudo ln -sf `pwd`/fednode.py /usr/local/bin/fednode

インストールする際には、希望する構成に合わせてCONFIGを変更します。

fednode install <CONFIG> <BRANCH>

CONFIGをbaseにした場合はcounterparty-servermonacoindのみで、counterblockにするとcounterblock関連が追加されます。 counterwalletなども含んだすべてのサービスをインストールする場合はfullを選択します。

どうやらcounterblock経由で諸々触るのがセオリーのようなので、CONFIGはcounterblockでインストールしました。

fednode install counterblock develop

セキュリティの強化

Ubuntuの場合、セキュリティの設定を一発でやってくれるスクリプトが用意されています。

内容はnospoofunattended-upgradessysctlfail2banpsadchkrootkitrkhunterlogwatchapparmorauditdiwatchなどなどをまとめて設定できます。 詳細はここ

cd extras/host_security
sudo ./run.py

管理

サービスの状態確認

インストールが完了すると自動的に起動するのでpsコマンドで各サービスの状態を確認します。 StateがUpになっているのが分かります。

fednode ps

                Name                              Command               State                                               Ports
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
federatednode_counterblock-testnet_1   start.sh                         Up      0.0.0.0:14100->14100/tcp, 0.0.0.0:14101->14101/tcp, 0.0.0.0:14102->14102/tcp, 4100/tcp,
                                                                                4101/tcp, 4102/tcp
federatednode_counterblock_1           start.sh                         Up      14100/tcp, 14101/tcp, 14102/tcp, 0.0.0.0:4100->4100/tcp, 4101/tcp, 4102/tcp
federatednode_counterparty-testnet_1   start.sh                         Up      0.0.0.0:14000->14000/tcp, 4000/tcp
federatednode_counterparty_1           start.sh                         Up      14000/tcp, 0.0.0.0:4000->4000/tcp
federatednode_monacoin-testnet_1       start.sh                         Up      0.0.0.0:19402->19402/tcp, 19403/tcp, 9401/tcp, 9402/tcp
federatednode_monacoin_1               start.sh                         Up      19402/tcp, 19403/tcp, 9401/tcp, 0.0.0.0:9402->9402/tcp
federatednode_mongodb_1                docker-entrypoint.sh mongod      Up      127.0.0.1:27017->27017/tcp
federatednode_redis_1                  docker-entrypoint.sh redis ...   Up      6379/tcp

データの保存場所

各データは以下にfederatednode_bitcoin-datafederatednode_counterparty-dataなどのディレクトリがあり、そこに保存されていました。

/var/lib/docker/volumes

ログの確認

tailコマンドやlogsコマンドで各サービスのログを確認出来ます。 サービスを指定しなければ、すべてのサービスのログが流れてきます。

fednode tail <service>
fednode logs <service>

monacoinのログを確認するには以下のようにサービスを指定します。

fednode tail monacoin

Attaching to federatednode_monacoin_1
monacoin_1              | 2018-01-17 12:27:44 UpdateTip: new best=9df4d83ebc67172601b543fdef232b2818289f216215876f5494fac0c5a1e753 height=3053 version=0x00000002 log2_work=33.168825 tx=3060 date='2014-01-01 00:46:37' progress=0.000617 cache=0.6MiB(3058tx)

コンテナの起動、停止

confファイルを更新した場合など、サービスの再起動が必要になります。 サービス名を指定しなければ全サービスに対して実行します。

fednode stop <service>
fednode start <service>
fednode restart <service>

シェルコマンドの実行

execコマンドで各サービスのコマンドを実行出来ます。

fednode exec <service> <CMD>

monacoin-cliを実行するには以下のようにします。

fednode exec monacoin monacoin-cli getinfo

{
  "version": 130202,
  "protocolversion": 70015,
  "walletversion": 130000,
  "balance": 0.00000000,
  "blocks": 8328,
  "timeoffset": 0,
  "connections": 2,
  "proxy": "",
  "difficulty": 1.690091809366619,
  "testnet": false,
  "keypoololdest": 1516190275,
  "keypoolsize": 100,
  "paytxfee": 0.00000000,
  "relayfee": 0.00100000,
  "errors": "Upgrade to Monacoin 0.14.2 https://monacoin.org/"
}

ちなみに各コンテナのシェルに入るにはshellコマンドで入り、exitで抜けます。

fednode shell <service>

APIの実行

同期が終わったらCounterblock APIも試してみます。

curl -s -X POST --data '{"jsonrpc":"2.0","id":1,"method":"get_chain_block_height"}' http://localhost:4100

{"result": 1222864, "id": 1, "jsonrpc": "2.0"}

とりあえず問題なく動きました。

次はCounterblockのカスタムモジュールについて色々と触ってみます。

一応ゴールとしてはブロックエクスプローラ的なものでも作って公開したいと思っているんですが、分からない事が多すぎて到達出来るかは非常に怪しい感じです。。

soon...

生焼けMonapartyのサルベージ手順メモ

もともとのCounterpartyのバグがそのまま残ったもので、Burnのトランザクションのfeeが少なすぎてUnconfirmedから抜け出せないといったことが確率で発生するという話のようです。

以下の手順でサルベージ出来たので手順を残しておきます。 ※何か問題が起きても責任取れませんので自己責任でお願いします。

MonapartyのWalletで該当アドレスのAddress ActionsからShow Private Keyを選択します。

f:id:tadajam:20171119211606p:plain

開いたダイヤログのShow Private Keyから秘密鍵を表示してコピーします。

f:id:tadajam:20171119211654p:plain

Electrum-monaのメニューから、ウォレット秘密鍵スイープを選択します。

f:id:tadajam:20171119211744p:plain

先ほどコピーした秘密鍵を貼り付けてスイープをクリックします。

f:id:tadajam:20171119211838p:plain

発信でブロードキャストします。 この手順でMonaparty側は空になります。

f:id:tadajam:20171119213738p:plain

これでサルベージ完了なので、そのまま新しいMonapartyのアドレスに送金してから再度焼きました。

deeplearn.jsを使ってKaggleのタイタニックをやってみる

唐突ですが、Numeraiをナンピンし無限に損をし続けるにも限界があります。

そう、BTCがもう無いのです。そして心も折れました。

そんな沼底でGBYTEと共に耐え忍ぶ以外の方法を模索し、たどり着いた結論は損切りではなく「データサイエンティストになってNMRを貰おう」でした。

まぁ実際にはそこまで機械学習の沼は浅くないんでしょうが、前から興味があったので良い機会ということで手を出してみました。

ついでに裁量取引ではひどい目にばかり合うので、自動売買のBOTとか作ってみたいという思いもあり。

ということでここから暗号通貨は一切関係無しです。

deeplearn.js

まずは初めの一歩ということで、googleがらみの機械学習ライブラリdeeplearn.jsを少し触ってみました。

deeplearn.jsはブラウザからWebGLを介してGPUを使った機械学習が出来る優れもので、TensorFlowを触るより手軽なんじゃないかと考えて手を出しました。

しかし、機械学習初心者が初めに触るには少しハードルが高く、結局は機械学習の作法について理解するために、最も情報の多いTensorFlowを一から勉強する羽目になりました。

TensorGraphSessionあたりの概念はほぼ同じようなので無駄にはなりませんが、ライブラリでやれることもまだ少ないようなので、これから機械学習に手を出そうという人は素直にTensorFlow触ったほうがいいと思います。

Kaggle

今回は機械学習Hello worldこと、KaggleのTitanicを試してみます。

Kaggleは機械学習のアイデアのコンペをやっているサイトで、チュートリアル的なコンペがいくつかあり、そのうちの1つTitanicをやってみます。

Titanic: Machine Learning from Disaster | Kaggle

タイタニック号の乗員名簿からその生死を予測するというもので、必要なデータは以下にあるtrain.csvtest.csvです。

Titanic: Machine Learning from Disaster | Kaggle

データの前処理

データの前処理は非常に重要です。 むしろこちらが本番なのではないかという気すらします。

初めはこの辺りをあまり考えず、とりあえず数値になっていればいいだろうという考えでやってみましたが、全くうまくいきませんでした。

不要なデータの削除

以下は有用性が低い、もしくは扱いづらいデータです。

  • PassengerId
  • Name
  • Ticket
  • Cabin

使えないデータは列ごと削除します。 Cabinはすごく有用だと思われますが、欠損が多いので除外します。

ただ、マジな人々はTicketからCabinを割り出したり、Nameにある敬称からAgeを割り出したり色々使いどころはあるようです。

欠損値補完

以下はデータに欠損があります。

  • Age
  • Fare
  • Cabin
  • Embarked

平均値や中央値で補完するケースや、欠損のある列を除外するケース、欠損のある行を除外するケース、また機械学習で欠損している部分を推定するケースなど様々なようです。

今回は面倒なのでCabinは捨て、他は平均値で補完します。

標準化

以下はスケールの異なる数値データです。

  • Age
  • SibSp
  • Parch
  • Fare

スケールの異なるデータ間で数値の大小が極端に大きい場合、特定の項目の影響が大きくなりすぎるというようなことが起こるようで、それを避けるために各次元のスケールを合わせます。

一般的にはZスコアが利用されるそうです。

(x - 平均) / 標準偏差

データセットによっては、時系列データなど標準化すべきではないものもあります。

ダミー変数化

以下は数値データではありません。 Pclassは数値ですが、社会階級High、Middle、Lowを表しています。

  • Pclass
  • Sex
  • Embarked

こういったデータを扱う場合は、次元を拡張しmaleの場合[1, 0]、femaleの場合[0, 1]のように別の次元で表現します。

交差検証

過学習が起きていない事、汎化性能が高いことを検証するために、訓練データとテストデータを分け、訓練には訓練データを使い、精度の検証には訓練に使っていないテストデータを使います。

train.csvの前処理が終わったら、訓練データtrainXtrainYとテストデータtestXtestYに分割しておきます。 Xは前処理で作成した各種データの配列で、Yは答えとなるSurvivedの配列です。

deeplearn.jsのインストール

npmでインストールします。

npm install --save deeplearn

必要なものをライブラリからインポートしておきます。

import {
  Array1D,
  Array2D,
  NDArrayMathGPU,
  Scalar,
  Session,
  SGDOptimizer,
  InCPUMemoryShuffledInputProviderBuilder,
  CostReduction,
  Graph,
  Tensor,
  NDArray
} from 'deeplearn';

グラフの構築

Graphを使ってモデルを定義します。

訓練データやテストデータの入れ物になるxtgraph.placeholdershapeのみ決めて定義します。

訓練により最適化されていく変数となるw0b0w1b1graph.variableで初期値と共に定義します。

用意した変数に対してgraph.add(足し算)、graph.matmul(内積)などのメソッドを使って数式を組み、graph.relugraph.sigmoidなどの活性化関数を通して次の層へ出力します。

その他Graphクラスのメソッドについては、以下公式のAPI Referenceに記載されています。

Graph | deeplearn

今回実装したモデルはロジスティック回帰に隠れ層を追加した多層パーセプトロンで、隠れ層の数はいくつか増やしてみたりしたものの結果がいまいちだったので1層だけです。ニューロンの数の増減もあまり良い結果を生みませんでした。

なお、sigmoidに渡すときにreshapeしてあげないとshapeが合わない的なエラーが出て、結構ハマりました。

最後にgraph.meanSquaredCostで損失関数に二乗誤差を指定しています。

const graph: Graph = new Graph();
const x: Tensor = graph.placeholder("x", [12]);
const t: Tensor = graph.placeholder('t', []);

//入力層 - 隠れ層
const w0: Tensor = graph.variable("w0", Array2D.randNormal([12, 12]));
const b0: Tensor = graph.variable("b0", Scalar.randNormal([]));
const h0: Tensor = graph.relu(graph.add(graph.matmul(x, w0), b0));

//隠れ層 - 出力層
const w1: Tensor = graph.variable("w1", Array2D.randNormal([12, 1]));
const b1: Tensor = graph.variable("b1", Scalar.randNormal([]));
const y: Tensor = graph.sigmoid(graph.reshape(graph.add(graph.matmul(h0, w1), b1), []));

const cost: Tensor = graph.meanSquaredCost(y, t);

ちなみにとことんconstで定義している理由は謎です。公式に倣いました。

訓練

訓練(学習)はmath.scopeの中で行います。

訓練データをNDArrayとして構築する際にtrackで追跡することで、scopeの最後で自動的にクリーンアップされるようになります。

InCPUMemoryShuffledInputProviderBuilderで事前にデータをシャッフルします。これ結構重要らしいです。

NUM_BATCHESを増やして訓練を繰り返すほど学習が進んでいきますが、数を倍にしたところであまり結果に差異はありませんでした。

LEARNING_RATEを小さくすると訓練に時間がかかるようになるので、最適なNUM_BATCHESの数も変わりますが、色々試してもあまり顕著に良くなる組み合わせが見つかりませんでした。

オプティマイザはSGD(確率的勾配降下法)しかないようで、それ以外を使いたければ自力で実装するしかなさそうです。

ループの中でsession.trainを実行し、出力される値が徐々に小さくなっていけば訓練が進んでいるということになります。 全く動かない、安定しない、NaNになるというような場合は何かおかしいのでグラフの構築、もしくはデータの前処理から見直したほうが良いです。

const math: NDArrayMathGPU = new NDArrayMathGPU();
const session: Session = new Session(graph, math);

math.scope((keep, track) => {
  const xs: Array1D[] = trainX.map(x => track(Array1D.new(x)));
  const ys: Scalar[] = trainY.map(x => track(Scalar.new(x)));

  const shuffledInputProviderBuilder =
      new InCPUMemoryShuffledInputProviderBuilder([xs, ys]);
  const [xProvider, yProvider] =
      shuffledInputProviderBuilder.getInputProviders();

  const NUM_BATCHES = 500;
  const BATCH_SIZE = xs.length;
  const LEARNING_RATE = 1;
  const optimizer = new SGDOptimizer(LEARNING_RATE);
  for (let i = 0; i < NUM_BATCHES; i++) {
    const costValue = session.train(
        cost,
        [{tensor: x, data: xProvider}, {tensor: t, data: yProvider}],
        BATCH_SIZE, optimizer, CostReduction.MEAN
      );
    console.log("Average cost: " + costValue.get());
  }

推定

事前に作成したテストデータtestXを利用し、session.evalSurvivedを推定し、testYと比較して精度を確認します。

これは訓練のループの中で訓練ごとの精度を確認することも出来ますし、ループを抜けてから最後に実行することも可能です。

for(let i = 0; i < testX.length; i++) {
  const result: NDArray = session.eval(y, [{tensor: x, data: track(Array1D.new(testX[i]))}]);
  let r = result.getValues()[0] > 0.5 ? 1 : 0;
  console.log(r === testY[i]);
}

提出

精度に問題なければtest.csvに同じように前処理を実行し、上記と同じ手順でSurvivedを推定します。

PassengerIdと出力されたSurvivedcsvを作成し、以下ページでアップロードします。

https://www.kaggle.com/c/titanic/submit

結果、Scoreは0.78947でした。

f:id:tadajam:20170922032123p:plain

良いんだか悪いんだか良く分かりませんが、とりあえずベースラインは超えたので良しとします。

そして次は素直にTensorFlow使います。

NEM-libraryを触りながらRxJSを学ぶ

正直言うと今までObservable良く分かんねぇなぁと思いながら使ってました。良く分からないけど、なんかこれオシャレじゃない?的な。

日本語の記事も少ないですし、抽象的な記事が多いのでいまいち理解が進まない。

以下の記事を書く際に参考にしたnem-library-examplesが結構良いサンプルだったので、少し理解が進んだような気がしていましたが、まだ9割ぐらいは雰囲気スクリプターからは抜け出せていませんでした。 tadajam.hateblo.jp

その時参考にしたページ github.com

この辺り詳しい人が誰か教えてくれないかなぁなどと思っていたところ、神が現れました。

NEMの守護神みなりんさん経由で、以下記事を読んでくれたNEM-Libraryの中の人ことAleixさんから連絡があり、もっとイケている書き方を教えてもらいました。 tadajam.hateblo.jp

神々への感謝の気持ちを忘れないために、そしてどこかの誰かの参考になればと記事を残しておきます。

修正前

記事にもともと載せていたソースが以下です。

一つ目のAPIの結果を受けて二つ目のAPIをコールする流れですが、せっかくのRxJSの利点を生かせていません。このどんどんネストしていく感じ、嫌ですね。

一つ目のAPIを呼ぶところでsubscribeして、その結果の配列をforEachでぶん回して二つ目のAPIをそれぞれの要素に対して実行しています。

accountHttp.getMosaicOwnedByAddress(address)
  .subscribe(mosaics => {
    let mosaicHttp: MosaicHttp = new MosaicHttp();
    
    mosaics.filter(x => x.mosaicId.namespaceId !== "nem")
      .forEach(x => {
        mosaicHttp.getMosaicDefinition(x.mosaicId)
          .subscribe(mosaicDefinition => {
            console.log(x.mosaicId, mosaicDefinition);
            console.log(x.mosaicId, x.quantity / (10 ** mosaicDefinition.properties.divisibility));
          });
      });
  });

修正後

getMosaicOwnedByAddressで流れてくるMosaic配列を、flatMapでバラしてそのまま流します。 流れてきたMosaicfilterをかけてxemを除去します。 xem以外のMosaicを使ってgetMosaicDefinitionを呼び、MosaicMosaicDefinitionを一つのオブジェクトにして流します。 ここをmapで流すとsubscribeの中でさらにsubscribeを呼ぶことになるので、flatMapで流します。

accountHttp.getMosaicOwnedByAddress(address)
    .flatMap(_ => _)
    .filter(mosaic => mosaic.mosaicId.namespaceId !== "nem")
    .flatMap(mosaic => {
      return mosaicHttp.getMosaicDefinition(mosaic.mosaicId)
        .map(mosaicDefinition => <any>{
          mosaicOwnedByTheUser: mosaic,
          mosaicDefinition: mosaicDefinition
        })
    })
    .subscribe(mosaicInformation => {
      console.log(mosaicInformation.mosaicDefinition.id, mosaicInformation.mosaicDefinition);
      console.log(mosaicInformation.mosaicDefinition.id, mosaicInformation.mosaicOwnedByTheUser.quantity / (10 ** mosaicInformation.mosaicDefinition.properties.divisibility));
    });

flatMapをうまく使うことで、二つのAPIコールを一つの流れで処理出来ました。

なんだかすごく(3割ぐらい?w)RxJSが分かった気分になってきました。

Thank you Aleix!

NEM Libraryを使ってみる(3) ウォレットの管理

NEM Libraryにはウォレットを管理するクラスも用意されています。

Walletの生成

秘密鍵からアドレスを生成する場合はWalletクラスを継承したSimpleWalletクラスを利用します。

let password: Password = new Password("passwordstr");
let privateKey: string = "privatekeyxxxxxxxxxxxxxxxxxxxxxxxx";

let simpleWallet: SimpleWallet = SimpleWallet.createWithPrivateKey("nextem test", password, privateKey);

生成されたSimpleWalletは以下のようなオブジェクトになり、秘密鍵は暗号化されています。

{
    "name":"nextem test",
    "network":104,"
    address": {
        "value":"ND6VJMWYX7CZRPLYXX566DP2Z6O7ZKYG745EV2AX",
        "networkType":104
    },
    "creationDate":"2017-08-26T11:29:37.729",
    "schema":1,
    "encryptedPrivateKey": {
        "encryptedKey":"encryptedxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        "iv":"ivxxxxxxxxxxxxxxxxxxxxxxxxx"
    }
}

ちなみに秘密鍵をランダム生成するには、SimpleWalletクラスのcreateメソッドを利用します。

let simpleWallet: SimpleWallet = SimpleWallet.create("random wallet", password);

Walletの利用

公開鍵や秘密鍵を利用するには、openメソッドでAccountクラスを作成して利用します。 秘密鍵はprivateになっているので、秘密鍵を直接操作するのではなく、戻ってくるAccountクラスの各メソッドを利用してトランザクションへの署名や、メッセージの暗号化、復号化を実施します。

let account: Account = simpleWallet.open(password);
account.signTransaction(transaction)

バックアップ

WalletのバックアップはwriteWLTFileメソッドを利用します。 stringが返ってくるので適当に保存します。

let mimetype = 'application/octet-stream';
let url = window.URL.createObjectURL(new Blob([simpleWallet.writeWLTFile()], { 'type': mimetype }));
let a = document.createElement('a');

a.target = '_blank';
a.download = simpleWallet.name + ".wlt";
a.href = url;
a.click();

バックアップからの復元

wltファイルからWalletを復元するには、wltファイルの文字列を抽出した上でreadFromWLTメソッドを利用します。

なお、NanoWalletから出力したwltファイルは形式が異なるようで読み込めませんでした。

let simpleWallet = SimpleWallet.readFromWLT("wltstrxxxxxxxxxxxxxxxxxxxxxxxxxxxx");

このあたり自前で実装するのも面倒なので、WEBサイトなどに導入する際には地味に便利なのではないでしょうか。


※筆者のモチベーション向上のため、以下NEMアドレスへxemなりシットトークンなりの寄付を受け付けています。

NDY4RH-UZ3CZO-Z53O5H-NEXTEM-7UF5X3-MMDGH4-IMAD