Aerospikeの話。
UDFを使う機会があったのでその基本的な操作方法とかをメモとして残しておく。
UDFとは何か?
User-Defined Functionsを略してUDF。
ユーザー定義関数でluaを使ってかく。
公式のドキュメントだとこのへんに書いてある。
AerospikeのUDFは2種類あってRecord UDFとStream UDFがある。
Record UDFはkeyに対する処理でrecordをinsertしたりupdateしたり出来る。
Stream UDFはクエリの結果を分散して処理し集約してくれます。
今回はRecord UDFを使ってkeyに紐づくレコードの操作をしていきます。
準備
Aerospikeをまず始めに立てる必要があるのですが、公式がdocker imageを公開しているので、それを使います。
# デーモンとして起動しport3000をポートフォワーディングする $ docker run -d --rm -p 3000:3000 aerospike/aerospike-server:3.14.0
後はコンテナIDを指定してコンテナに入ってaql
とかを叩けばshow namespaces
とかクエリを発行出来るようになるはずです。
$ docker exec -it [コンテナID] bash -l $ aql aql> show namespaces +------------+ | namespaces | +------------+ | "test" | +------------+ 1 row in set (0.002 secs) OK
UDFを準備する
Aerospike上で下記のluaのプログラムを用意します。
やってることはシンプルで引数として渡されたvalue
が数値で100より下の場合にnum
というbinにたいしてその値を入れます。すでにrecordが存在する場合はupdateを行って、無ければinsertしています。
今回は保存パスとして/root/udfs/save.lua
にしたとします。
function saveIntValue(rec, value) if type(value) ~= "number" then local msg = string.format("%s<%s> not number", value, type(value)) warn(msg) error(msg) end if value > 100 then local msg = string.format("large value %d", value) warn(msg) error(msg) end rec["num"] = value local rc = 0 if aerospike:exists(rec) then rc = aerospike:update(rec) else rc = aerospike:create(rec) end if rc ~= nil and rc ~= 0 then local msg = string.format("save record failed. rc: %s", tostring(rc)) warn(msg) error(msg) end return 0 end
UDFをAerospikeに登録する
登録はaql上からも行うことが出来て、ファイルのある絶対パス、相対パスを指定して登録することが出来る。
aql> register module '/root/udfs/save.lua' OK, 1 module added.
登録されたUDF一覧を見るときはSHOW MODULES
で見ることが出来る。
UDFを呼び出してみる
aql
aqlでも呼び出すことは出来る。
詳しくは公式のdocを参照してください。
aql> execute save.saveIntValue(3) on test.hoge where PK = 1 +--------------+ | saveIntValue | +--------------+ | 0 | +--------------+ 1 row in set (0.001 secs) # 100以上を指定するとerrorが返ってくる aql> execute save.saveIntValue(300) on test.hoge where PK = 1 Error: (100) /opt/aerospike/usr/udf/lua/save.lua:11: large value 300
Go
たぶんaqlから呼び出すことって開発中とかしかなくて実際に使用する時はGoとかRubyのSDKを使うことが多いと思うので今回はGoから呼び出すことにする。
package main import ( "flag" "fmt" . "github.com/aerospike/aerospike-client-go" ) func main() { var pk = flag.Int("pk", 0, "primary key") var val = flag.Int("val", 0, "set value") flag.Parse() if *pk == 0 { panic("please set -pk") } if *val == 0 { panic("please set -val") } client, err := NewClient("127.0.0.1", 3000) if err != nil { panic(err) } key, err := NewKey("test", "fuga", *pk) if err != nil { panic(err) } _, err = client.Execute(nil, key, "save", "saveIntValue", NewValue(*val)) if err != nil { panic(err) } fmt.Println("SUCCESS") }
pkやvalを実行時に指定したりAerospikeのクライアントを作ったりしてるけど重要なのは最後のほうの
_, err = client.Execute(nil, key, "save", "saveIntValue", NewValue(*val))
ここでUDFを呼び出している。
第1引数はWritePolicyで今回はnilをいれている。
第2引数はKeyをいれており、Goの場合はNewKey(namespace, set, pk)で生成したものを使う
第3引数で今回作成したUDFの名前(パッケージ名)を指定する
第4引数で関数名を指定する
第5以降は関数のrecより後の引数を渡す。今回はvalueなので1つだけ
UDF内でerror
になった時はGo側のerrorとして返ってくるので今回はそれをハンドリングしてpanicで処理をしている。
$ go build -o main $ ./main -pk=3 -val=88 SUCCESS $ ./main -pk=3 -val=1000 panic: /opt/aerospike/usr/udf/lua/save.lua:11: large value 1000
最後に
今使っている限りだとAerospikeはメモリ上にレコードを保持していけるのでメモリ使用量がどんどん増えることはあるけど、
CPUはあまり使ってないので、こういった処理をAerospikeに任せるというのも1つの手だなと思った。