※注意 この記事はembulk-output-bigquery v0.2.x以前に基づいて書かれています。 v0.3.0で大幅な変更が加わっているためv0.2.xで使っていたconfig.ymlはそのままでは使えません。 主な変更点は
- JavaプラグインからJRubyベースのプラグインに変わりました。それまでよりpull-requestは送りやすくなったと思うので早く送るん(ry
- formattersのブロックの指定が不要になりました。(FileOutputプラグインからOutputプラグインに変わったため)
- ローカルにGZIP圧縮されたファイルを吐くために必要だったencodersのブロックの指定が不要になりました。(理由は同上)。代わりにcompression: GZIPを指定してください。
その他の変更点についてはCHANGELOGを参照してください。
4/1追記 v0.1.2で動的テーブル作成に対応したので一部内容を変更しています
この前リリースしたEmbulkのGoogle BigQuery向けアウトプットプラグインembulk-output-bigqueryを使ってMySQL→BigQueryへデータをバルクロードする動作デモになります。
Embulk自体についてはオープンソースのバルクデータローダー「Embulk」登場。fluentdのバッチ版、トレジャーデータが支援 - Publickeyとか読んでみてください。
- embulk-input-randomでダミーデータの作成&embulk-input-mysqlでダミーデータをMySQLへインポート
- embulk-output-mysqlとembulk-output-bigqueryでMySQLからBigQueryへデータをロード(CSVフォーマット)
という流れになっています。
gemのインストール
embulk本体のインストールはEmbulkのQuick Start読んで下さい。
embulk gem install embulk-input-random
embulk gem install embulk-output-bigquery
embulk gem install embulk-input-mysql
embulk gem install embulk-output-mysql
MySQLのデータベース、テーブルの作成とデータのロード
embulk-input-randomプラグインを使って10000件ほど作成します。 embulk-output-mysqlも使ってそのままMySQLに流し込んじゃいましょう。 まずはMySQLのデータベース、ユーザ、テーブルの作成から。
mysql> CREATE DATABASE embulk DEFAULT CHARACTER SET utf8;
mysql> GRANT ALL ON embulk.* TO embulk_user@"%" IDENTIFIED BY 'embulk_pass';
mysql> USE embulk;
mysql> CREATE TABLE example (\
id bigint,\
number bigint,\
value1 varchar(60),\
value2 varchar(60),\
value3 varchar(60),\
value4 varchar(60),\
value5 varchar(60),\
value6 varchar(60),\
value7 varchar(60),\
value8 varchar(60),\
value9 varchar(60),\
value10 varchar(60),\
primary key(id)\
);
設定ファイル
# vi random_to_mysql.yml
in:
type: random
# 10000レコード生成、お好きな数字をどうぞ
rows: 10000
schema:
id: primary_key
number: integer
value1: string
value2: string
value3: string
value4: string
value5: string
value6: string
value7: string
value8: string
value9: string
value10: string
out:
type: mysql
host: localhost
database: embulk
user: embulk_user
password: embulk_pass
table: example
mode: insert
MySQLにデータをロード
embulk runで実行します。
# embulk run /path/to/random_to_mysql.yml
2015-03-19 20:04:11.957 +0900: Embulk v0.5.3
Random generation started.
2015-03-19 20:04:15.068 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS `example` (`id` BIGINT, `number` BIGINT, `value1` TEXT, `value2` TEXT, `value3` TEXT, `value4` TEXT, `value5` TEXT, `value6` TEXT, `value7` TEXT, `value8` TEXT, `value9` TEXT, `value10` TEXT)
2015-03-19 20:04:15.070 +0900 [INFO] (transaction): > 0.00 seconds
2015-03-19 20:04:15.118 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-03-19 20:04:15.159 +0900 [INFO] (task-0000): Prepared SQL: INSERT INTO `example` (`id`, `number`, `value1`, `value2`, `value3`, `value4`, `value5`, `value6`, `value7`, `value8`, `value9`, `value10`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Random generator input thread 0...
2015-03-19 20:04:18.588 +0900 [INFO] (task-0000): Loading 10,000 rows
2015-03-19 20:04:19.718 +0900 [INFO] (task-0000): > 1.13 seconds (loaded 10,000 rows in total)
2015-03-19 20:04:19.721 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
Random input finished. Commit reports = [{"rows":10000,"columns":12}]
2015-03-19 20:04:19.777 +0900 [INFO] (main): Committed.
2015-03-19 20:04:19.777 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
数秒で完了しました。
BigQueryへのデータロード
それではembulk-input-mysqlとembulk-output-bigqueryを使ってMySQL→BigQueryへのデータロードを行います。
embulk-output-bigqueryはschema.jsonのパスを指定しての動的テーブルに対応しているのでテーブルも実行時に作成してしまいます。なおテーブルが既に存在する場合は既存テーブルに追記されます。
- プロジェクトID: your-project-id
- データセット名: demodatasets
- テーブル名: demotable
- テーブルのスキーマ(schema.json):
[
{
"name": "id",
"type": "INTEGER"
},
{
"name": "number",
"type": "INTEGER"
},
{
"name": "value1",
"type": "STRING"
},
{
"name": "value2",
"type": "STRING"
},
{
"name": "value3",
"type": "STRING"
},
{
"name": "value4",
"type": "STRING"
},
{
"name": "value5",
"type": "STRING"
},
{
"name": "value6",
"type": "STRING"
},
{
"name": "value7",
"type": "STRING"
},
{
"name": "value8",
"type": "STRING"
},
{
"name": "value9",
"type": "STRING"
},
{
"name": "value10",
"type": "STRING"
}
]
設定ファイル
このデモそのまま動かしている場合は以下の設定で問題ないはずです。
他のフォーマットのデータを試していて[Too many errors encountered. Limit is: 0.]
等言われた場合はmax_bad_records: 100
等と指定するとひとまずロードはできると思います。
ただしデータは欠損します。
このオプションはbqコマンドラインツールでbq loadする際のオプションと同じようにエラーを何レコードまで許容するかのオプションです。 これを超えるエラーが見つかると全てロールバックされます。 デフォルトは0なので1レコードでもエラーになるとジョブ実行に失敗します。
field_delimiterも指定可能です。デフォルトは”,”
です。encodingもサポートしていますがBigQueryがサポートしているのはUTF-8(デフォルト)とISO-8859-1のみなので殆どの場合は指定する必要はないはずです。
skip_leading_rowsはサポートしていません。CSVフォーマッタープラグインのheader_line: false
を使って下さい。
# vi mysql_to_bigquery.yml
in:
type: mysql
user: embulk_user
password: embulk_pass
database: embulk
table: example
host: localhost
select: "*"
out:
type: bigquery
service_account_email: your-service-account.gserviceaccount.com
p12_keyfile_path: /path/to/private.p12
path_prefix: /var/tmp/sample
# 生成するファイルはCSV
source_format: CSV
# 一応拡張子は.csv.gz
file_ext: .csv.gz
# ジョブ実行完了時にローカルファイルを削除する
delete_from_local_when_job_end: 1
project: your-project-id
dataset: demodatasets
# 実行時にテーブルを作成する
auto_create_table: 1
schema_path: /path/to/schema.json
table: demotable
formatter:
type: csv
#ヘッダ行は不要
header_line: false
timezone: Asia/Tokyo
encoders:
- {type: gzip}
実行
# embulk run mysql_to_bigquery.yml
2015-04-01 20:28:16.039 +0900: Embulk v0.5.3
2015-04-01 20:28:18.563 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-04-01 20:28:20.746 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-04-01 20:28:20.847 +0900 [INFO] (task-0000): Writing file [/var/tmp/sample.000.00.csv.gz]
2015-04-01 20:28:20.877 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-04-01 20:28:20.909 +0900 [INFO] (task-0000): SQL: SELECT * FROM `example`
2015-04-01 20:28:20.981 +0900 [INFO] (task-0000): > 0.07 seconds
2015-04-01 20:28:21.649 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-04-01 20:28:21.918 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-04-01 20:28:22.283 +0900 [INFO] (task-0000): Fetched 2,000 rows.
2015-04-01 20:28:22.710 +0900 [INFO] (task-0000): Fetched 4,000 rows.
2015-04-01 20:28:22.983 +0900 [INFO] (task-0000): Fetched 8,000 rows.
2015-04-01 20:28:23.102 +0900 [INFO] (task-0000): Job preparing... project:your-project-id dataset:demodatasets table:demotable
2015-04-01 20:28:23.115 +0900 [INFO] (task-0000): table:[demotable] will be create if not exists
# アップロード開始
2015-04-01 20:28:23.127 +0900 [INFO] (task-0000): Upload start [/var/tmp/sample.000.00.csv.gz]
# アップロード正常終了
2015-04-01 20:28:26.047 +0900 [INFO] (task-0000): Upload completed [/var/tmp/sample.000.00.csv.gz]
2015-04-01 20:28:26.055 +0900 [INFO] (task-0000): Job executed. job id:[job_ABCDEFGHIJKLMN] file:[/var/tmp/sample.000.00.csv.gz]
# ジョブ実行ステータスのポーリング。PENDING→RUNNING→SUCCESS
2015-04-01 20:28:26.160 +0900 [INFO] (task-0000): Checking job status... job id:[job_ABCDEFGHIJKLMN] elapsed_time:105ms status:[PENDING]
2015-04-01 20:28:36.270 +0900 [INFO] (task-0000): Checking job status... job id:[job_ABCDEFGHIJKLMN] elapsed_time:10215ms status:[PENDING]
2015-04-01 20:28:46.439 +0900 [INFO] (task-0000): Checking job status... job id:[job_ABCDEFGHIJKLMN] elapsed_time:20384ms status:[RUNNING]
# 正常に完了した場合はStatisticsがINFOで表示される
# 失敗した場合はエラーとなった原因がWARNで表示される
2015-04-01 20:28:56.567 +0900 [INFO] (task-0000): Job statistics [{"inputFileBytes":"3420374","inputFiles":"1","outputBytes":"4660000","outputRows":"10000"}]
2015-04-01 20:28:56.568 +0900 [INFO] (task-0000): Job completed successfully. job id:[job_ABCDEFGHIJKLMN] elapsed_time:30513ms status:[SUCCESS]
# ローカルファイルの削除
2015-04-01 20:28:56.568 +0900 [INFO] (task-0000): Delete local file [/var/tmp/sample.000.00.csv.gz]
2015-04-01 20:28:56.569 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-04-01 20:28:56.596 +0900 [INFO] (main): Committed.
2015-04-01 20:28:56.597 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
1分ほどで実行が完了しました。 普段RDBMS等を使われている方は長いと思われるかもしれませんが、45秒ほどはBigQuery上でのジョブの実行です。 ローカルでの処理は数秒で完了しています。
実行結果の確認
bqコマンドで確認してみます。
正常に動作していればテーブル作成時の1件を含め10001件入っているはずです。
bq query "SELECT COUNT(*) FROM demodatasets.demotable"
Waiting on bqjob_abcdefghijklmn ... (0s) Current status: DONE
+-------+
| f0_ |
+-------+
| 10001 |
+-------+
未知のデータで試す場合はいきなりBigQueryにロードせずtype:file
としてローカルファイルに出力しbq loadで正常にロードできるか確認すると問題の切り分けができてハマらないと思います。
最後に
ね、簡単でしょ?