RubyのCassandraクライアントでバッチ処理。

大量データの読み込み, 書き込みのバッチ処理を効率よく行うため、Cassandraクライアントにはバッチ処理用の仕組みが用意されています。

環境


cassandra (0.12.1)

読み込み


大量データを処理するバッチ処理では、Cassandra内のデータを意識せずに読み込むと
メモリ上に全て展開されてしまって、メモリが足りなくなることがあります。

例えば1,000,000件のデータがあった場合、それを一度に読み込むと1,000,000件のデータがメモリに溜まってしまうので、メモリを圧迫してしまいます。

解決方法の一つとして、1,000,000件全てをメモリに格納するのではなく、分割取得して処理する方法があります。

Cassandraクライアントでは、get_rangeメソッドが分割取得に対応しています。
下記のトリガーで分割取得を行います。
1. メソッドにブロックが渡された場合
2. options[:key_count]が指定された場合(取得するロウキーの総数)
3. options[:batch_size]が指定された場合(1度に取得する件数)

分割取得の単位件数は、options[:batch_size]に指定された件数分取得します。
デフォルトは100件です。

使い方:
client = Cassandra.new('MyKeyspace', '127.0.0.1:9160')

# ブロック内の処理がbatch_sizeごとに呼び出される
client.get_range('ExampleCF', :batch_size => 1000) do |row|
# 処理
end


書き込み


CassandraのThriftClientは、1処理を行う毎にCassandraと通信を行います。
大量データの場合、データ件数分通信を行うため、ネットワークIOがボトルネックになる可能性があります。

解決方法としては、アプリケーション側で複数命令をキャッシュして、一度に発行する方法があります。

Cassandraクライアントでは、batchメソッドが複数命令のキャッシュに対応しています。

使い方:
client = Cassandra.new('MyKeyspace', '127.0.0.1:9160')
client.batch do
(1..count).each do |i|
client.insert(COLUMN_FAMILY, SimpleUUID::UUID.new.to_s, data)
end
end


ただし、batchメソッドには1つ問題があります。
batchメソッドでは、ブロック内で発行された命令をキャッシュして、
ブロック抜けた後にキャッシュ内にある命令の処理を行うのですが、
件数が膨大である場合、キャッシュデータでメモリを圧迫する上、
通信データが肥大してCassandra側でタイムアウトする場合があります。

解決方法としては、キャッシュデータ量を制限して、指定データに達した場合に都度送信する方法があります。

この方法はCassandraクライアントに組み込まれていないため、拡張します。

キャッシュデータがbatch_insertの第4引数であるbatch_sizeに指定した件数に達した場合、キャッシュデータをflushします。
注意点は、ループ処理が抜けた後に必ずflushする必要があることです。
flushしない場合、多くの場合キャッシュデータが完全に送信されません。

・メリット
通信回数が減るため、ネットワークIOが遅い環境で効果が出やすい。
メモリを圧迫しない。

拡張内容:
require 'rubygems'
require 'cassandra'
require 'benchmark'

class Cassandra
def batch_insert(column_family, key, hash, batch_size, options = {})
@batch ||= []

insert(column_family, key, hash, options)

if batch_size == @batch.size
flush(options)
end
end

def flush(options = {})
return if @batch.nil? || @batch.size == 0

compacted_map,seen_clevels = compact_mutations!
clevel = if options[:consistency] != nil
options[:consistency]
elsif seen_clevels.length > 1
raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
else
seen_clevels.first
end
_mutate(compacted_map,clevel)
ensure
@batch = nil
end
end


使い方:
client = Cassandra.new('MyKeyspace', '127.0.0.1:9160')
(1..count).each do |i|
client.batch_insert(COLUMN_FAMILY, SimpleUUID::UUID.new.to_s, data, 100)
end
client.flush


書き込みのベンチマーク



通常のinsert,Cassandra標準のbatch処理を使ったinsert,拡張したbatch_insertでベンチマークをとりました。

しかし、接続先が127.0.0.1であるため通信速度が速く、思ったより違いがでませんでした。。。
分散環境では違いが色濃くでると思うのですが。

ベンチマークのソースコード

・insert
 1,000件:0.486773
 5,000件:2.492276
10,000件:4.824156
50,000件:25.074361

・batch
 1,000件:0.186431
 5,000件:1.102207
10,000件:1.937598
50,000件:14.075151

・batch_insert
 1,000件:0.177827
 5,000件:0.851033
10,000件:1.687284
50,000件:12.067339
テーマ : ソフトウェア開発
ジャンル : コンピュータ

Tag:Ruby  Trackback:1 comment:0 page to top

RubyのCassandraクライアントで分散カウンタの削除。

Cassandraクライアントから分散カウンタの削除ができなくてハマったのでメモ。

環境


cassandra (0.12.1)

対策


Cassandraの0.8から分散カウンタが追加されていて、Cassandraクライアントにも"add"というメソッドが追加されています。
これは分散カウンタ用のメソッドで、cassandra-cliの"incr"に相当します。

しかし、Cassandraクライアントから分散カウンタのカラムをremoveしようとしても例外が発生します。
これは多分、Cassandraクライアントのremoveの挙動がrowkey自体の削除ではなく、rowkeyに関連するカラムが削除されてrowkeyが残る仕様になっているのが原因な気がします。
ソースから何かないか探してみると、それっぽいのを発見。

でも扱うメソッドが無いっぽいのでCassandraクライアントを拡張。

対象のソースを見ると、_addの処理と同様の場所に_remove_counterがあるのに、なぜ_remove_counterの実装は無いんだろう?と考えると、見落としてるだけの可能性が高いかも。

require 'cassandra'
class Cassandra
def remove_counter(column_family, key, *columns_and_options)
column_family, column, sub_column, options = extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

if @batch
mutation_map =
{
key => {
column_family => [ _delete_mutation(column_family, column, sub_column, options[:timestamp]|| Time.stamp) ]
}
}
@batch << [mutation_map, options[:consistency]]
else
args = {:column_family => column_family}
columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column}
column_path = CassandraThrift::ColumnPath.new(args.merge(columns))
_remove_counter(key, column_path, options[:consistency])
end
end
end

対象ソース:
cassandra-0.12.1/lib/cassandra/0.8/protocol.rb

8 def _remove_counter(key, column_path, consistency_level)
9 client.remove_counter(key, column_path, consistency_level)
10 end
11
12 def _add(column_family, key, column, sub_column, value, consistency)
13 if is_super(column_family)
14 column_parent = CassandraThrift::ColumnParent.new(:column_family => column_fam
ily, :super_column => column)
15 counter_column = CassandraThrift::CounterColumn.new(:name => sub_column, :valu
e => value)
16 else
17 column_parent = CassandraThrift::ColumnParent.new(:column_family => column_fam
ily)
18 counter_column = CassandraThrift::CounterColumn.new(:name => column, :value =>
value)
19 end
20 client.add(key, column_parent, counter_column, consistency)
21 end
テーマ : ソフトウェア開発
ジャンル : コンピュータ

Tag:Ruby  Trackback:1 comment:1 page to top

RubyのCassandraクライアントで指定したIPに接続できない怪現象。


rubyのcassandraクライアントから別サーバのcassandraに接続する場合に、指定したIPに接続されずローカルホストに接続してしまう怪現象が発生してハマったのでメモ。
(正直、Forkwellのパブリケーションに追加したい衝動に駆られて書きましたw)

環境


○サーバ1
・cassandra 1.0.8
  ip: 192.168.0.11
  keyspace: ExampleKeyspace
  column family: ExampleCF
  ※optionの指定なしで作成

○サーバ2
・ruby1.9.3
  ・cassandra-0.12.1
  ・thrift-0.7.0
  ・thrift_client-0.7.1

現象確認



まず、サーバ2からサーバ1のcassandraにcassandra-cliから接続できることを確認。

$ cassandra-cli -h192.168.0.11 -kExampleKeyspace
Connected to: "Test Cluster" on 192.168.0.11/9160
Welcome to Cassandra CLI version 1.0.8

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@ExampleKeyspace]

問題なし。

次は、Rubyから接続してみる。

こんなスクリプトを用意して、
require 'rubygems'
require 'cassandra'

client = Cassandra.new('ExampleKeyspace', '192.168.0.11:9160')
client.insert('ExampleCF', SimpleUUID::UUID.new.to_s, {})

スクリプトを実行。

$ ruby insert.rb
/usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift-0.7.0/lib/thrift/transport/socket.rb:53:in `rescue in open': CassandraThrift::Cassandra::Client::TransportException
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift-0.7.0/lib/thrift/transport/socket.rb:36:in `open'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift-0.7.0/lib/thrift/transport/framed_transport.rb:37:in `open'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift_client-0.7.1/lib/thrift_client/connection/socket.rb:11:in `connect!'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift_client-0.7.1/lib/thrift_client/abstract_thrift_client.rb:105:in `connect!'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift_client-0.7.1/lib/thrift_client/abstract_thrift_client.rb:144:in `handled_proxy'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/thrift_client-0.7.1/lib/thrift_client/abstract_thrift_client.rb:60:in `describe_keyspace'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/cassandra-0.12.1/lib/cassandra/cassandra.rb:177:in `schema'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/cassandra-0.12.1/lib/cassandra/columns.rb:31:in `column_family_property'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/cassandra-0.12.1/lib/cassandra/columns.rb:20:in `column_name_class_for_key'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/cassandra-0.12.1/lib/cassandra/columns.rb:12:in `column_name_class'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/cassandra-0.12.1/lib/cassandra/helpers.rb:21:in `extract_and_validate_params'
from /usr/local/ruby-1.9.3-p125/lib/ruby/gems/1.9.1/gems/cassandra-0.12.1/lib/cassandra/cassandra.rb:442:in `insert'
from test.rb:6:in `
'

エラー発生。

原因


ソースを追ってみると、cassandraクライアントのデフォルトの挙動は、インスタンス生成時に指定したサーバに書き込みに行くのではなく、
指定したサーバのクラスタリング情報を読み取り、リング上のIP1つに対して書き込みに行く模様。

サーバ1からcassandraのリング情報を取得してみる。
$ nodetool -hlocalhost ring
Address DC Rack Status State Load Owns Token
127.0.0.1 datacenter1 rack1 Up Normal 752.7 KB 100.00% 17515755121036504136650759168839277356

つまり、リング上のIPが127.0.0.1になっているから、サーバ2から127.0.0.1へ書き込みにいっているということっぽい。
サーバ2にはcassandraが立ち上がっていないため、ネットワークエラーになる。

対象ソース:
cassandra-0.12.1/lib/cassandra/cassandra.rb

1052 def all_nodes
1053 if @auto_discover_nodes && !@keyspace.eql?("system")
1054 temp_client = new_client
1055 begin
1056 ips = (temp_client.describe_ring(@keyspace).map {|range| range.endpoints}).flatten.uniq
1057 port = @servers.first.split(':').last
1058 ips.map{|ip| "#{ip}:#{port}" }
1059 ensure
1060 temp_client.disconnect!
1061 end
1062 else
1063 @servers
1064 end
1065 end


対策


解決策としては、ぱっと2つ思いつく。
1. リング情報にcassandraサーバのプライベートIPを設定する。
2. リング情報ではなく指定したサーバ情報にアクセスするように設定する。

対策1はクラスタリングへのアクセスをCassandraに任せる場合、対策2はサーバ2とCassandraの間に別のロードバランサーなどを挟む場合かな?
(2の場合Consistency LevelをQUORUMとかALLにしたらどうなるんだろう?試してない。)

解決策1:
cassandra.yamlの設定を変更。

175 # Address to bind to and tell other Cassandra nodes to connect to. You
176 # _must_ change this if you want multiple nodes to be able to
177 # communicate!
178 #
179 # Leaving it blank leaves it up to InetAddress.getLocalHost(). This
180 # will always do the Right Thing *if* the node is properly configured
181 # (hostname, name resolution, etc), and the Right Thing is to use the
182 # address associated with the hostname (it might not be).
183 #
184 # Setting this to 0.0.0.0 is always wrong.
185 listen_address: 192.168.0.11 # ローカルIPを指定

再起動してリング上のIPを確認。
$ nodetool -h localhost ring
Address DC Rack Status State Load Owns Token
192.168.0.11 datacenter1 rack1 Up Normal 765.67 KB 100.00% 17515755121036504136650759168839277356

解決策2:
cassandraクライアントのdisable_node_auto_discovery!メソッドを呼べば、
インスタンス生成時に指定したIPのcassandraサーバに直接接続するようになる。
require 'rubygems'
require 'cassandra'

client = Cassandra.new('ExampleKeyspace', '192.168.0.11:9160')
client.disable_node_auto_discovery! # ここ
client.insert('ExampleCF', SimpleUUID::UUID.new.to_s, {})

テーマ : ソフトウェア開発
ジャンル : コンピュータ

Tag:Ruby  Trackback:0 comment:0 page to top

[Java]サロゲートペア文字列の操作

目的


Vistaが世に出てから、サロゲートペア文字列の扱いに頭を悩ませているエンジニアの方々も多いと思うのですが、「java サロゲートペア substring」とかで検索しても、なぜかそれっぽい記事が出てきません。
毎回実装するたび忘れて再実装しちゃうので、作ったクラスをメモしときます。
(もしかして、汎用的なUtilityとかあるのか?)

ソース


public final class CodePointUtils {
private CodePointUtils() {
}

/**
* サロゲートペア文字を置換.
*
* @param str
* 文字列
* @param replacement
* 置換文字列
* @return 置換後文字列
*/
public static String replaceSurrogatePair(String str, String replacement) {
StringBuilder sb = new StringBuilder();
char[] ach = str.toCharArray();

int cp = 0;
int len = str.length();
for (int i = 0; i < len; i += Character.charCount(cp)) {
cp = Character.codePointAt(ach, i);
if (!Character.isSupplementaryCodePoint(cp)) {
sb.append(String.valueOf(Character.toChars(cp)));
} else {
sb.append(replacement);
}
}

return sb.toString();
}

/**
* 文字列を開始位置から終了位置まで取り出す.
*
* @param str 文字列
* @param beginIndex 開始位置
* @param endIndex 終了位置
* @return 文字列
*/
public static String substring(String str, int beginIndex, int endIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
if (endIndex > length(str)) {
throw new StringIndexOutOfBoundsException(endIndex);
}
if (beginIndex > endIndex) {
throw new StringIndexOutOfBoundsException(endIndex - beginIndex);
}

StringBuilder sb = new StringBuilder();

char[] ach = str.toCharArray();

int beginOffset = str.offsetByCodePoints(0, beginIndex);
int endOffset = str.offsetByCodePoints(0, endIndex);

int cp = 0;
for (int i = beginOffset; i < endOffset; i += Character.charCount(cp)) {
cp = Character.codePointAt(ach, i);
sb.append(String.valueOf(Character.toChars(cp)));
}

return sb.toString();
}

/**
* 文字数を取得.
*
* @param str 文字列
* @return 文字数
*/
public static int length(String str) {
return str.codePointCount(0, str.length());
}
}



参考


http://www.ibm.com/developerworks/jp/ysl/library/java/j-unicode_surrogate/
テーマ : ソフトウェア開発
ジャンル : コンピュータ

Tag:Java  Trackback:0 comment:2 page to top

ircd-hybridをssl通信で通信内容暗号化

■ 目的
ircは通信内容が平文で流れるので、パケット解析ツールなどを使えば簡単に盗聴されてしまう。
通信内容をsslで暗号化して、セキュリティ向上をはかる。
ircd-hybridでのssl設定がどうしても見つからなかったため、stoneで、規定ポートへのパケットをsslで暗号化してircに転送する設定を行う。
また、バックグラウンドで走らせるため、起動スクリプトも作成する。

■ 必要パッケージ
・openssl
・openssl-devel
・stone
・cvs(stoneがインストールされていない場合)

■ openssl/openssl-develのインストール

[root@example ~]# yum install openssl openssl-devel


■ stoneのインストール
# cvsにログイン(パスワードなし)
[root@example tmp]# cvs -d :pserver:anonymous@cvs.sourceforge.jp:/cvsroot/stone login

# チェックアウト
[root@example tmp]# cvs -d :pserver:anonymous@cvs.sourceforge.jp:/cvsroot/stone co stone

# コンパイル
[root@example tmp]# cd stone
[root@example stone]# make linux-ssl

# 実行ファイルをパスが通ってる場所にコピー
[root@example stone]# cp -v stone /usr/local/bin/

# stoneのテスト
# ircへの通信を6668経由で6667に接続するように設定。
# ircクライアントから6668に接続できるかテスト。
[root@example tmp]# stone localhost:6667 6668

# ssl証明書を作成
[root@example tmp]# cd /etc/pki/tls/certs/
[root@example certs]# make stone.pem

# stoneのテスト
# ircクライアントの通信を6668経由で暗号化して6667に接続するように設定。
# ircクライアントからssl経由で6668に接続できるかテスト。
[root@example certs]# stone localhost:6667 6668/ssl


■ 設定ファイルの作成
 # ircのssl通信用設定ファイルを格納するディレクトリを作成
[root@example ~]# mkdir /etc/ircd-ssl
# 設定ファイルを新規作成
[root@example ~]# vi /etc/ircd-ssl/ircd-ssl.conf
# ircdのポート番号
IRCD_PORT=6667
# sslとして動作させるポート番号
IRCD_SSL_PORT=6668


■ 起動スクリプトの作成
本来は/etc/init.d/functionsに定義されている関数を使用してプロセスの管理を行うべきかと思うが、今回はめんどいので割愛。
また、例外処理も入れてない。
# 起動スクリプトを新規作成
[root@example ~]# vi /etc/init.d/ircd-ssl

#!/bin/bash
PATH=${PATH}:/usr/local/bin
prog=ircd-ssl
desc="ircd-hybrid(SSL)"
pidf=/var/run/${prog}.pid

# 設定を読み込む
[ -f /etc/${prog}/${prog}.conf ] && . /etc/${prog}/${prog}.conf

function clean() {
[ -e $pidf ] && rm -f $pidf
}

function start() {
echo "$desc 開始"
stone localhost:$IRCD_PORT $IRCD_SSL_PORT/ssl &
echo $! > $pidf
}

function stop() {
echo "$desc 停止"
[ -e $pidf ] && kill -9 `cat $pidf`
clean
}

case $1 in
start)
start
;;
stop)
stop
;;
restart)
stop
start
;;
*)
echo "Usage: $prog {start|stop|restart}" >&2
;;
esac

# 実行権限追加
[root@example ~]# chmod +x /etc/init.d/ircd-ssl


■ 起動テスト

# ircdを起動する
[root@example ~]# /etc/init.d/ircd start

# ircd-sslを起動する
[root@example ~]# /etc/init.d/ircd-ssl start
# 起動確認
[root@example ~]# ps aux | grep 6668

# ircd-sslを停止する
[root@example ~]# /etc/init.d/ircd-ssl stop
# 停止確認
[root@example ~]# ps aux | grep 6668

# ircd-sslを再起動する
[root@example ~]# /etc/init.d/ircd-ssl restart
# 起動確認
[root@example ~]# ps aux | grep 6668


■ iptablesの設定
ircdのポートを閉じて、6668のポートを開放。
設定ファイル:/etc/sysconfig/iptables

# -A RH-Firewall-1-INPUT -m state --state NEW -m tcp -p tcp --dport 6667 -j ACCEPT
-A RH-Firewall-1-INPUT -m state --state NEW -m tcp -p tcp --dport 6668 -j ACCEPT
テーマ : UNIX/Linux
ジャンル : コンピュータ

Tag:irc  Trackback:0 comment:0 page to top

プロフィール

Author:nori
年齢:26
現在地:東京都
出身地:鹿児島県
twitter:nori_licht