並列データ転送ツールEmbulkのプラグインであるembulk-input-gcsを作りました。 EmbulkのインプットソースとしてGoogle Cloud Storage(GCS)上のバケット/ファイルを指定できるものです。
embulk-output-gcsの方が断然需要が高いと思いますが、自分はembulk-output-bigqueryを作っている最中にいろんな場所から同じデータでテストできて地味に便利でした。
単純にファイルをダウンロードするだけであればGoogle公式のCLIであるgsutil等を使えば事足りますが、Embulkを使うとファイルのパース等も行えるのがポイントです。
Embulkのgzipデコーダ、CSVパーサ、embulk-output-mysqlと組み合わせて、GCSにアップロードされたgzip圧縮されたCSVからデータを取り込み別環境のMySQLサーバ上のDBにデータを取り込むというようなこともできます。
と言いつつembulk-output-bigqueryを作ろうと思ったけど、いきなりEmbulkのコードとGCP/GCSのAPI/ライブラリ両方を相手にするのは重かったのでembulk-input-s3を大いに参考にさせてもらって、まずはウォーミングアップしたっていう事情もあります。
embulk-input-gcsの使い方
Google Cloud Storage(GCS)のAPIを有効にする必要があります。 手順は画面キャプチャ付きで別記事に書いたのでそちらを参考にして下さい。
ここではembulk exampleサブコマンドで生成できるsample_01.csv.gzというファイルを、GCSのmy-bucketという名前のbucketに事前にアップロードして動かしてみます。
このファイルをEmbulk標準のgzipデコーダで解凍、同じく標準のCSVパーサでパース後、Embulkを動作させているマシンの標準出力に出力してみます。
設定ファイルのyamlは以下の様になります。 application_nameは何でもいいです。API認証時に指定しないと認証エラーになるけど、どんな文字列でも認証が通ってしまう謎パラメータです。 恐らくどのアプリからどれだけリクエストが飛んだかというような統計情報を取るためのものではないかと思いますが、コンパネのどこで確認できるのか分かりません。 誰か知ってたら教えて…
in:
type: gcs
bucket: my-bucket
path_prefix: sample
service_account_email: ABCXYZ123ABCXYZ123.gserviceaccount.com
application_name: MyAppName
p12_keyfile_fullpath: /path/to/gcskey.p12
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
header_line: true
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out: {type: stdout}
実行します。
$ embulk run /path/to/config.yml
2015-03-17 22:18:22,180 +0900: Embulk v0.5.12
2015-03-17 22:18:26.587 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL
2015-03-17 22:18:27.760 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-03-17 22:18:27.791 +0900 [INFO] (main): Committed.
2015-03-17 22:18:27.793 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"sample_01.csv.gz"},"out":{}}
APIの認証が通っていれば、CSVファイルの中身がパースされた状態で表示されると思います。 outputプラグインとしてはMySQL、PostgresSQLといったRDBMSやElasticsearch向け等が出てきているのでそこに取り込むことも可能です。
実装、ハマったポイント
Embulk0.4からJavaでプラグインが書けるようになっていてJavaで実装しています。 embulk-input-s3がJava実装だったってのもありますが。
Google APIs Client Librariesを使う場合、build.gradleのdependenciesブロックのcompileの記述変更の必要がありました。ここは古橋さん(@frsyuki)に教えてもらいました。
{exclude module: “guava-jdk5”}
を指定しない場合ConfigDefault(“null”)
という記述を使っているとJsonMappingException
が発生し、「Setting null to a @Config field is not allowed
」と言われます。
その際gradlew clean
してclasspathディレクトリを一旦削除した方が良いです。
またclasspath/guava*.jarを削除しただけだとgradlew gem
する度に復活してしまうのでダメでした。
GcsFileInputPlugin.java
@Config("last_path")
@ConfigDefault("null") //←この指定をするとビルドが通らない
public Optional<String> getLastPath();
dependencies {
compile "com.google.http-client:google-http-client-jackson2:1.19.0"
//以下の記述だとConfigDefault("null")を指定できない
compile "com.google.apis:google-api-services-storage:v1-rev27-1.19.1"
//以下に変更
compile ("com.google.apis:google-api-services-storage:v1-rev27-1.19.1") {exclude module: "guava-jdk5"}
}
考えたけど実装しなかった機能
gsutilのように正規表現でファイルを引っ掛けるオプション
ex. gsutil ls gs://my-bucket/*.csv
APIのリクエストにはあくまでprefixしか指定できないため、実装するとなると一旦bucket内の全ファイルを取得してプラグイン内で絞り込む必要がある。 前方一致はともかくそれ以外はbucket内のファイル数が多いと処理に影響出そう。
ファイルの最終更新日時で絞り込めるオプション
update_datetime_from、update_datetime_toみたいなの。 付けようかと思ったけど一旦スキップ。ファイルの作成日時での絞り込みはAPIでそんな項目は取得できない(たぶん)のでできないと思います。
最後に
JavaでGCP/GCS関連の開発をするための情報が少なくて結構苦労するので別途まとめます。 3/18追記 まとめました→JavaでGCS(Google Cloud Storage)とBigQuery周りの開発をする際のハマりどころ