AerospikeでUDFを作ってGoから呼び出す

www.aerospike.com

Aerospikeの話。
UDFを使う機会があったのでその基本的な操作方法とかをメモとして残しておく。

UDFとは何か?

User-Defined Functionsを略してUDF。
ユーザー定義関数でluaを使ってかく。
公式のドキュメントだとこのへんに書いてある。

AerospikeのUDFは2種類あってRecord UDFStream UDFがある。
Record UDFはkeyに対する処理でrecordをinsertしたりupdateしたり出来る。
Stream UDFはクエリの結果を分散して処理し集約してくれます。

今回はRecord UDFを使ってkeyに紐づくレコードの操作をしていきます。

準備

Aerospikeをまず始めに立てる必要があるのですが、公式がdocker imageを公開しているので、それを使います。

# デーモンとして起動しport3000をポートフォワーディングする
$ docker run -d --rm -p 3000:3000 aerospike/aerospike-server:3.14.0

後はコンテナIDを指定してコンテナに入ってaqlとかを叩けばshow namespacesとかクエリを発行出来るようになるはずです。

$ docker exec -it [コンテナID] bash -l 
$ aql
aql> show namespaces
+------------+
| namespaces |
+------------+
| "test"     |
+------------+
1 row in set (0.002 secs)
OK

UDFを準備する

Aerospike上で下記のluaのプログラムを用意します。
やってることはシンプルで引数として渡されたvalueが数値で100より下の場合にnumというbinにたいしてその値を入れます。すでにrecordが存在する場合はupdateを行って、無ければinsertしています。
今回は保存パスとして/root/udfs/save.luaにしたとします。

function saveIntValue(rec, value)
    if type(value) ~= "number" then
        local msg = string.format("%s<%s> not number", value, type(value))
        warn(msg)
        error(msg)
    end

    if value > 100 then
        local msg = string.format("large value %d", value)
        warn(msg)
        error(msg)
    end

    rec["num"] = value
    local rc = 0
    if aerospike:exists(rec) then
        rc = aerospike:update(rec)
    else
        rc = aerospike:create(rec)
    end

    if rc ~= nil and rc ~= 0 then
        local msg = string.format("save record failed. rc: %s", tostring(rc))
        warn(msg)
        error(msg)
    end
    return 0
end

UDFをAerospikeに登録する

登録はaql上からも行うことが出来て、ファイルのある絶対パス相対パスを指定して登録することが出来る。

aql> register module '/root/udfs/save.lua'
OK, 1 module added.

登録されたUDF一覧を見るときはSHOW MODULESで見ることが出来る。

UDFを呼び出してみる

aql

aqlでも呼び出すことは出来る。
詳しくは公式のdocを参照してください。

aql> execute save.saveIntValue(3) on test.hoge where PK = 1 
+--------------+
| saveIntValue |
+--------------+
| 0            |
+--------------+
1 row in set (0.001 secs)

# 100以上を指定するとerrorが返ってくる
aql> execute save.saveIntValue(300) on test.hoge where PK = 1
Error: (100) /opt/aerospike/usr/udf/lua/save.lua:11: large value 300

Go

たぶんaqlから呼び出すことって開発中とかしかなくて実際に使用する時はGoとかRubySDKを使うことが多いと思うので今回はGoから呼び出すことにする。

package main

import (
    "flag"
    "fmt"

    . "github.com/aerospike/aerospike-client-go"
)


func main() {
    var pk = flag.Int("pk", 0, "primary key")
    var val = flag.Int("val", 0, "set value")
    flag.Parse()
    if *pk == 0 {
        panic("please set -pk")
    }
    if *val == 0 {
        panic("please set -val")
    }

    client, err := NewClient("127.0.0.1", 3000)
    if err != nil {
        panic(err)
    }
    key, err := NewKey("test", "fuga", *pk)
    if err != nil {
        panic(err)
    }
    _, err = client.Execute(nil, key, "save", "saveIntValue", NewValue(*val))
    if err != nil {
        panic(err)
    }
    fmt.Println("SUCCESS")
}

pkやvalを実行時に指定したりAerospikeのクライアントを作ったりしてるけど重要なのは最後のほうの

_, err = client.Execute(nil, key, "save", "saveIntValue", NewValue(*val))

ここでUDFを呼び出している。
第1引数はWritePolicyで今回はnilをいれている。
第2引数はKeyをいれており、Goの場合はNewKey(namespace, set, pk)で生成したものを使う 第3引数で今回作成したUDFの名前(パッケージ名)を指定する
第4引数で関数名を指定する 第5以降は関数のrecより後の引数を渡す。今回はvalueなので1つだけ

UDF内でerrorになった時はGo側のerrorとして返ってくるので今回はそれをハンドリングしてpanicで処理をしている。

$ go build -o main
$ ./main -pk=3 -val=88
SUCCESS
$ ./main -pk=3 -val=1000
panic: /opt/aerospike/usr/udf/lua/save.lua:11: large value 1000

最後に

今使っている限りだとAerospikeはメモリ上にレコードを保持していけるのでメモリ使用量がどんどん増えることはあるけど、
CPUはあまり使ってないので、こういった処理をAerospikeに任せるというのも1つの手だなと思った。