RiakをPythonから扱う¶
次は使ってみる。公式ドキュメントそのままなので単なる自分用メモです。
準備¶
% sudo portinstall devel/protobuf # protobufが必要なので。
% pip install riak
ドキュメント¶
基本¶
set¶
import riak
import datetime
# Riakに接続
client = riak.RiakClient()
# bucket選択
bucket = client.bucket('test')
# key, valueのデータ作成
# "riak_developer_1"がkey、dataがvalue
person = bucket.new('riak_developer_1', data={
'name': 'John Smith',
'age': 10,
'company': u'日本語',
'createdat': datetime.datetime.now().isoformat()
})
# Riakに保存
person.store()
保存はUTF-8は大丈夫だけど、datetimeはそのままではできなかった。 isoformat()して保存すればあとで使える。
APIの選択¶
RiakのAPIにはHTTPとProtocol Bufferの二つがあるけど、pythonはその両方共 同じAPIを使う。明示的に指定したい場合は、RiakClientの引数に指定する。
# ProtocolBufferを使う場合
client = riak.RiakClient(port=8087, transport_class=riak.RiakPbcTransport)
# HTTPSを使う場合
client = riak.RiakClient(port=8091, transport_class=riak.RiakHttpsTransport)
port番号は、app.configで指定しているものを使うこと。ProtocolBufferと HTTPSでは違うポート番号のはず。
なお、デフォルトはHTTPで、その場合なにも指定する必要ない。
get¶
# Riakに接続
client = riak.RiakClient()
# bucket選択
bucket = client.bucket('test')
# personはRiakObject
person = bucket.get('riak_developer_1')
print(person.get_data())
# -> {u'age': 10, u'company': u'\u65e5\u672c\u8a9e',
u'createdat':u'2013-01-29T10:46:16.969318', u'name': u'John Smith'}
バイナリ¶
# Riakに接続
client = riak.RiakClient(port='8092')
# bucket選択
bucket = client.bucket('test')
pict_binary = open("r_rudi-icon.png", 'rb').read()
# new_binaryでバイナリを保存
pict = bucket.new_binary('riak_developer_binary',
data=pict_binary,
content_type='image/png')
# Riakに保存
pict.store()
# 取得する時は get_binaryを使う
binary = bucket.get_binary('riak_developer_binary')
Map/Reduceで取得¶
機能¶
Mapフェイズはdata localityを保ちつつ並列に実行される
Reduceフェイズはjobが登録されたノード上で並列に実行される
JavaScriptとErlangで使える
コード¶
import riak
# Riakに接続
client = riak.RiakClient(port='8092')
# データ追加
bucket = client.bucket('mr_test')
person = bucket.new('mr_1', data={'name': 'John Smith', 'age': 10})
person.store()
person = bucket.new('mr_2', data={'name': 'Riak', 'age': 30})
person.store()
person = bucket.new('mr_3', data={'name': 'r_rudi', 'age': 40})
person.store()
query = client.add('mr_test')
# Map実行 (条件は data.age > 20)
query.map("""
function(v) {
var data = JSON.parse(v.values[0].data);
if(data.age > 20) {
return [[v.key, data]];
}
return [];
}""")
for result in query.run():
print "%s - %s" % (result[0], result[1])
# 結果
# mr_2 - {u'age': 30, u'name': u'Riak'}
# mr_3 - {u'age': 40, u'name': u'r_rudi'}
reduceも追加してみる。
# Map実行
query.map("""
function(v) {
var data = JSON.parse(v.values[0].data);
if(data.age > 20) {
return [[v.key, data]];
}
return [];
}""")
# Reduce実行 (各ageに10を足す)
query.reduce("""
function(values) {
var ret = [];
for(var i in values){
values[i][1].age += 10;
ret.push(values[i]);
}
return ret;
}""")
for result in query.run():
print "%s - %s" % (result[0], result[1])
# 結果
# mr_2 - {u'age': 40, u'name': u'Riak'}
# mr_3 - {u'age': 50, u'name': u'r_rudi'}
Linkを使って関連するオブジェクトを取得¶
Link というのは、Riakのobject同士を関連付ける機能です。
リンク生成¶
import riak
import uuid
import time
# Riakに接続
client = riak.RiakClient(port='8092')
user_bucket = client.bucket('user')
status_bucket = client.bucket('status')
# ユーザー追加
person = user_bucket.new('rudi', data={
'name': 'rudi',
'age': 30
})
person.store()
# ステータス追加。keyはuuidで一意なkeyを生成
new_status = status_bucket.new(uuid.uuid1().hex, data={
'message': 'Message----',
'created': time.time(),
'is_public': True,
})
# status -> user のリンクを追加
new_status.add_link(person)
new_status.store()
# user -> status のリンクを追加
person.add_link(new_status)
person.store()
こういうリンクを作っておいて、
# Riakに接続
client = riak.RiakClient(port='8092')
user_bucket = client.bucket('user')
status_bucket = client.bucket('status')
person = user_bucket.get('rudi')
for status_link in person.get_links():
status = status_link.get()
print status.get_data()['message']
# -> 'Message---' と表示
という感じでlinkをたどれる。linkはいくらでも追加できるので、 statusを作るたびにuser -> status のリンクを追加していけば、 userのstatus変更をまとめて取得とかできる、というわけ。
Key Filter¶
Key Filter とは Map/Reduceの前処理として、キーを評価するQueryを全bucketに対して投げるも の。これによって指定した条件に合致するキーのみを取り出せる。取り出した 後にMap/Reduceをかけると効率的、というわけ。
bucket = client.bucket('invoices')
# データを突っ込む
ary = []
ary.append(bucket.new('basho-20101215', data={
'name': 'basho',
'price': 1000,
'createdat': datetime.datetime.now().isoformat()
}))
ary.append(bucket.new('google-20110103', data={
'name': 'google',
'price': 99999,
'createdat': datetime.datetime.now().isoformat()
}))
ary.append(bucket.new('yahoo-20090613', data={
'name': 'yahoo',
'price': 3333,
'createdat': datetime.datetime.now().isoformat()
}))
for a in ary:
a.store()
# ここから本番。
query = client.add("invoices")
query.add_key_filter("tokenize", "-", 1) # -で区切った1つめが
query.add_key_filter("eq", "yahoo") # yahooに等しいKey
# Map/Reduce
query.map("""function(v) {
var data = JSON.parse(v.values[0].data);
return [[v.key, data]];
}""")
# Map/Reduceの結果を表示
for result in query.run():
print "%s - %s" % (result[0], result[1])
Search¶
検索もあるんだけど、どうも手元で動かなかったのでまた今度書く。
Comments
comments powered by Disqus