Symfoware

Symfowareについての考察blog

Elasticsearch + Pythonでバルクインサート(helpers.bulk)

Elasticsearch、APIにバルクインサートが用意されているようです。
Elasticsearch + Pythonでどうやるのか調べてみました。


登録用データ



郵便番号データを使用します。
こちらから、全国一括のデータをダウンロードしました。
郵便番号データダウンロード

ここを参考に、zipファイルのまま読み込んで登録しています。
Pythonでzipファイルを解凍せずに中身のテキストファイルを読み込む


  1. # -*- coding:utf-8 -*-
  2. import zipfile
  3. import cStringIO
  4. from elasticsearch import Elasticsearch
  5. def read_all():
  6.     
  7.     """ 郵便番号辞書の読み込み """
  8.     with zipfile.ZipFile('ken_all.zip', 'r') as post:
  9.         
  10.         # KEN_ALL.CSVの内容を取得し、StringIOでくるむ
  11.         f = cStringIO.StringIO(post.read('KEN_ALL.CSV'))
  12.         
  13.         for line in f:
  14.             # ms932からunicodeオブジェクトに変換
  15.             ary = unicode(line, 'ms932').strip().split(',')
  16.             ary = [item.replace('"', '') for item in ary]
  17.             
  18.             zipcode = ary[2]
  19.             address = ary[6] + ary[7] + ary[8]
  20.             
  21.             yield (zipcode, address)
  22. def main():
  23.     
  24.     # コネクション確率
  25.     es = Elasticsearch(host='192.168.1.103', port=9200)
  26.     
  27.     for zipcode, address in read_all():
  28.         
  29.         doc = {'zipcode':zipcode, 'address':address}
  30.         es.index(index="test-index", doc_type='zip-code', body=doc)
  31.     
  32.     # ドキュメント全体を検索
  33.     res = es.search(index="test-index", body={"query": {"match_all": {}}})
  34.     print("Got %d Hits:" % res['hits']['total'])
  35. if __name__ == '__main__':
  36.     main()



indexを使用した登録だと90秒かかりました。





helpers.bulk



ドキュメントを探すのに苦労しました。
elasticsearch.helpers.bulk
How to use Bulk API to store the keywords in ES by using Python


試行錯誤して書いたプログラムはこちら。


  1. # -*- coding:utf-8 -*-
  2. import zipfile
  3. import cStringIO
  4. from elasticsearch import Elasticsearch
  5. from elasticsearch import helpers
  6. def read_all():
  7.     
  8.     """ 郵便番号辞書の読み込み """
  9.     with zipfile.ZipFile('ken_all.zip', 'r') as post:
  10.         
  11.         # KEN_ALL.CSVの内容を取得し、StringIOでくるむ
  12.         f = cStringIO.StringIO(post.read('KEN_ALL.CSV'))
  13.         
  14.         for line in f:
  15.             # ms932からunicodeオブジェクトに変換
  16.             ary = unicode(line, 'ms932').strip().split(',')
  17.             ary = [item.replace('"', '') for item in ary]
  18.             
  19.             zipcode = ary[2]
  20.             address = ary[6] + ary[7] + ary[8]
  21.             
  22.             yield (zipcode, address)
  23. def main():
  24.     
  25.     # コネクション確率
  26.     es = Elasticsearch(host='192.168.1.103', port=9200)
  27.     
  28.     # bulkに渡すリスト
  29.     actions = []
  30.     
  31.     for zipcode, address in read_all():
  32.         
  33.         doc = {'zipcode':zipcode, 'address':address}
  34.         
  35.         # index => _index、doc_type => _type、body => _sourceに読み替えてセット
  36.         actions.append({'_index':'test-index', '_type':'zip-code', '_source':doc})
  37.         
  38.         # 1000件たまったらbulk
  39.         if len(actions) > 1000
  40.             helpers.bulk(es, actions)
  41.             actions = []
  42.             
  43.     
  44.     # 残っているデータを登録
  45.     if len(actions) > 0:
  46.         helpers.bulk(es, actions)
  47.     
  48.     
  49.     # ドキュメント全体を検索
  50.     es.indices.refresh(index="test-index")
  51.     res = es.search(index="test-index", body={"query": {"match_all": {}}})
  52.     print("Got %d Hits:" % res['hits']['total'])
  53. if __name__ == '__main__':
  54.     main()





1000件ごとの一括登録に変えると9秒程度で処理完了です。
10倍高速になりました。





まとめ



通常のinsertとbulkに変えた時の時間です。
bulkは、何件ごとに一括登録するか件数を変化させて測定しています。
結果を見ると、1000件程度を一括登録するようにすれば、
満足な性能が得られるようでした。

タイプ123,721件の登録時間(秒)
index90.34
bulk:1022.40
bulk:10012.50
bulk:10009.19
bulk:100008.87
bulk:1000009.07






関連記事

テーマ:プログラミング - ジャンル:コンピュータ

  1. 2014/11/08(土) 18:36:35|
  2. Python
  3. | トラックバック:0
  4. | コメント:0
  5. | 編集
<<Elasticsearch 日本語データの分かち書きと検索(kuromoji使用) | ホーム | Elasticsearch 1.4にPythonから接続する>>

コメント

コメントの投稿


管理者にだけ表示を許可する

トラックバック

トラックバック URL
http://symfoware.blog68.fc2.com/tb.php/1542-c8308b10
この記事にトラックバックする(FC2ブログユーザー)