embulk-input-gcsプラグインを作りました

2015-03-17


並列データ転送ツールEmbulkのプラグインであるembulk-input-gcsを作りました。 EmbulkのインプットソースとしてGoogle Cloud Storage(GCS)上のバケット/ファイルを指定できるものです。

embulk-output-gcsの方が断然需要が高いと思いますが、自分はembulk-output-bigqueryを作っている最中にいろんな場所から同じデータでテストできて地味に便利でした。

単純にファイルをダウンロードするだけであればGoogle公式のCLIであるgsutil等を使えば事足りますが、Embulkを使うとファイルのパース等も行えるのがポイントです。

Embulkのgzipデコーダ、CSVパーサ、embulk-output-mysqlと組み合わせて、GCSにアップロードされたgzip圧縮されたCSVからデータを取り込み別環境のMySQLサーバ上のDBにデータを取り込むというようなこともできます。

と言いつつembulk-output-bigqueryを作ろうと思ったけど、いきなりEmbulkのコードとGCP/GCSのAPI/ライブラリ両方を相手にするのは重かったのでembulk-input-s3を大いに参考にさせてもらって、まずはウォーミングアップしたっていう事情もあります。

embulk-input-gcsの使い方

Google Cloud Storage(GCS)のAPIを有効にする必要があります。 手順は画面キャプチャ付きで別記事に書いたのでそちらを参考にして下さい。

ここではembulk exampleサブコマンドで生成できるsample_01.csv.gzというファイルを、GCSのmy-bucketという名前のbucketに事前にアップロードして動かしてみます。

このファイルをEmbulk標準のgzipデコーダで解凍、同じく標準のCSVパーサでパース後、Embulkを動作させているマシンの標準出力に出力してみます。

設定ファイルのyamlは以下の様になります。 application_nameは何でもいいです。API認証時に指定しないと認証エラーになるけど、どんな文字列でも認証が通ってしまう謎パラメータです。 恐らくどのアプリからどれだけリクエストが飛んだかというような統計情報を取るためのものではないかと思いますが、コンパネのどこで確認できるのか分かりません。 誰か知ってたら教えて…

in:
  type: gcs
  bucket: my-bucket
  path_prefix: sample
  service_account_email: ABCXYZ123ABCXYZ123.gserviceaccount.com
  application_name: MyAppName
  p12_keyfile_fullpath: /path/to/gcskey.p12
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

実行します。

$ embulk run /path/to/config.yml
2015-03-17 22:18:22,180 +0900: Embulk v0.5.12
2015-03-17 22:18:26.587 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL
2015-03-17 22:18:27.760 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-03-17 22:18:27.791 +0900 [INFO] (main): Committed.
2015-03-17 22:18:27.793 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"sample_01.csv.gz"},"out":{}}

APIの認証が通っていれば、CSVファイルの中身がパースされた状態で表示されると思います。 outputプラグインとしてはMySQL、PostgresSQLといったRDBMSやElasticsearch向け等が出てきているのでそこに取り込むことも可能です。

実装、ハマったポイント

Embulk0.4からJavaでプラグインが書けるようになっていてJavaで実装しています。 embulk-input-s3がJava実装だったってのもありますが。

Google APIs Client Librariesを使う場合、build.gradleのdependenciesブロックのcompileの記述変更の必要がありました。ここは古橋さん(@frsyuki)に教えてもらいました。 {exclude module: “guava-jdk5”}を指定しない場合ConfigDefault(“null”)という記述を使っているとJsonMappingExceptionが発生し、「Setting null to a @Config field is not allowed」と言われます。 その際gradlew cleanしてclasspathディレクトリを一旦削除した方が良いです。

またclasspath/guava*.jarを削除しただけだとgradlew gemする度に復活してしまうのでダメでした。

GcsFileInputPlugin.java

@Config("last_path")
@ConfigDefault("null") //←この指定をするとビルドが通らない
public Optional<String> getLastPath();
dependencies {
    compile "com.google.http-client:google-http-client-jackson2:1.19.0"
    //以下の記述だとConfigDefault("null")を指定できない
    compile "com.google.apis:google-api-services-storage:v1-rev27-1.19.1"
    //以下に変更
    compile ("com.google.apis:google-api-services-storage:v1-rev27-1.19.1") {exclude module: "guava-jdk5"}
}

考えたけど実装しなかった機能

gsutilのように正規表現でファイルを引っ掛けるオプション

ex. gsutil ls gs://my-bucket/*.csv

APIのリクエストにはあくまでprefixしか指定できないため、実装するとなると一旦bucket内の全ファイルを取得してプラグイン内で絞り込む必要がある。 前方一致はともかくそれ以外はbucket内のファイル数が多いと処理に影響出そう。

ファイルの最終更新日時で絞り込めるオプション

update_datetime_from、update_datetime_toみたいなの。 付けようかと思ったけど一旦スキップ。ファイルの作成日時での絞り込みはAPIでそんな項目は取得できない(たぶん)のでできないと思います。

最後に

JavaでGCP/GCS関連の開発をするための情報が少なくて結構苦労するので別途まとめます。 3/18追記 まとめました→JavaでGCS(Google Cloud Storage)とBigQuery周りの開発をする際のハマりどころ


Profile picture of sakama

Written by sakama Data engineer who lives and works in Tokyo for successful data analytics You can follow me on Twitter

© 2022, sakama.dev - Built with Gatsby

Author

Profile picture of sakama

Written by sakama Data engineer who lives and works in Tokyo for successful data analytics You can follow me on Twitter

Socials

github.com/sakama

Recent Posts

Categories

Tags