このエントリーをはてなブックマークに追加

RiakをPythonから扱う

次は使ってみる。公式ドキュメントそのままなので単なる自分用メモです。

準備

% sudo portinstall devel/protobuf  # protobufが必要なので。
% pip install riak

基本

set

import riak
import datetime

# Riakに接続
client = riak.RiakClient()
# bucket選択
bucket = client.bucket('test')

# key, valueのデータ作成
# "riak_developer_1"がkey、dataがvalue
person = bucket.new('riak_developer_1', data={
    'name': 'John Smith',
    'age': 10,
    'company': u'日本語',
    'createdat': datetime.datetime.now().isoformat()
    })
# Riakに保存
person.store()

保存はUTF-8は大丈夫だけど、datetimeはそのままではできなかった。isoformat()して保存すればあとで使える。

APIの選択

RiakのAPIにはHTTPとProtocol Bufferの二つがあるけど、pythonはその両方共同じAPIを使う。明示的に指定したい場合は、RiakClientの引数に指定する。

#  ProtocolBufferを使う場合
client = riak.RiakClient(port=8087, transport_class=riak.RiakPbcTransport)
#  HTTPSを使う場合
client = riak.RiakClient(port=8091, transport_class=riak.RiakHttpsTransport)

port番号は、app.configで指定しているものを使うこと。ProtocolBufferとHTTPSでは違うポート番号のはず。

なお、デフォルトはHTTPで、その場合なにも指定する必要ない。

get

# Riakに接続
client = riak.RiakClient()
# bucket選択
bucket = client.bucket('test')

# personはRiakObject
person = bucket.get('riak_developer_1')
print(person.get_data())
# -> {u'age': 10, u'company': u'\u65e5\u672c\u8a9e',
      u'createdat':u'2013-01-29T10:46:16.969318', u'name': u'John Smith'}

バイナリ

# Riakに接続
client = riak.RiakClient(port='8092')

# bucket選択
bucket = client.bucket('test')
pict_binary = open("r_rudi-icon.png", 'rb').read()

# new_binaryでバイナリを保存
pict = bucket.new_binary('riak_developer_binary',
                  data=pict_binary,
                  content_type='image/png')
# Riakに保存
pict.store()

# 取得する時は get_binaryを使う
binary = bucket.get_binary('riak_developer_binary')

Map/Reduceで取得

Map/Reduce

機能

  • Mapフェイズはdata localityを保ちつつ並列に実行される
  • Reduceフェイズはjobが登録されたノード上で並列に実行される
  • JavaScriptとErlangで使える

コード

import riak

# Riakに接続
client = riak.RiakClient(port='8092')

# データ追加
bucket = client.bucket('mr_test')
person = bucket.new('mr_1', data={'name': 'John Smith', 'age': 10})
person.store()
person = bucket.new('mr_2', data={'name': 'Riak', 'age': 30})
person.store()
person = bucket.new('mr_3', data={'name': 'r_rudi', 'age': 40})
person.store()

query = client.add('mr_test')
# Map実行 (条件は data.age > 20)
query.map("""
function(v) {
    var data = JSON.parse(v.values[0].data);
    if(data.age > 20) {
        return [[v.key, data]];
    }
    return [];
}""")

for result in query.run():
    print "%s - %s" % (result[0], result[1])

# 結果
# mr_2 - {u'age': 30, u'name': u'Riak'}
# mr_3 - {u'age': 40, u'name': u'r_rudi'}

reduceも追加してみる。

# Map実行
query.map("""
function(v) {
    var data = JSON.parse(v.values[0].data);
    if(data.age > 20) {
        return [[v.key, data]];
    }
    return [];
}""")
# Reduce実行 (各ageに10を足す)
query.reduce("""
function(values) {
    var ret = [];
    for(var i in values){
         values[i][1].age += 10;
         ret.push(values[i]);
    }
    return ret;
}""")

for result in query.run():
    print "%s - %s" % (result[0], result[1])

# 結果
# mr_2 - {u'age': 40, u'name': u'Riak'}
# mr_3 - {u'age': 50, u'name': u'r_rudi'}

Key Filter

Key FilterとはMap/Reduceの前処理として、キーを評価するQueryを全bucketに対して投げるもの。これによって指定した条件に合致するキーのみを取り出せる。取り出した後にMap/Reduceをかけると効率的、というわけ。

bucket = client.bucket('invoices')

# データを突っ込む
ary = []
ary.append(bucket.new('basho-20101215', data={
    'name': 'basho',
    'price': 1000,
    'createdat': datetime.datetime.now().isoformat()
    }))
ary.append(bucket.new('google-20110103', data={
    'name': 'google',
    'price': 99999,
    'createdat': datetime.datetime.now().isoformat()
    }))
ary.append(bucket.new('yahoo-20090613', data={
    'name': 'yahoo',
    'price': 3333,
    'createdat': datetime.datetime.now().isoformat()
    }))
for a in ary:
    a.store()

# ここから本番。
query = client.add("invoices")
query.add_key_filter("tokenize", "-", 1)  # -で区切った1つめが
query.add_key_filter("eq", "yahoo")  # yahooに等しいKey

# Map/Reduce
query.map("""function(v) {
    var data = JSON.parse(v.values[0].data);
        return [[v.key, data]];
        }""")

# Map/Reduceの結果を表示
for result in query.run():
    print "%s - %s" % (result[0], result[1])