最近embulk-input-gcs、embulk-output-bigqueryというEmbulkのプラグインを作ったんですが、Embulk本体のコードよりGoogleのAPI/Google APIs Client Library for Javaの利用でハマりました。
AWSのようにSDK落としてくればサクッと開発できると思ってたんですがそんなに甘くなかった。。
GCP(Google Cloud Platform)界隈は認証方法やライブラリが多数ある割に、どれを使えばいいかというようなindexが公式情報含めあまりネット上にあまりないのでライブラリの選択でそもそもハマります。
未だに全容が把握できていないんですが、自分用メモ兼embulk-output-bigqueryへのプルリクエスト頂くことも期待してまとめておきます。
なおJava以外の言語向けライブラリの状況は分かりません。
そもそもどのライブラリを使うべきかどうやって探すのよ?
ネットで見つけたサンプルコード/スニペットを元にググり始めると収集がつかなくなるので、一旦はGoogle Cloud Platformのコンソールの左メニューにある「ソースコード」> 「デベロッパーツール」からリンクされているものから見ていくのがいいと思います。
サンプルコードに関してはGoogle Cloud PlatformのGithubアカウントで公開されているプロジェクトのコードを丹念に漁っていくと見つけられたりします。
kubernetes関連なんかも混じっている上に全部で250プロジェクト近くあるので探すの自体が大変ですが、対象言語や対象プロダクトで絞り込んで行けばなんとかなります。コードリーディングのスキル向上だと思って頑張りましょう。
認証方法をどうするのかがどのライブラリを使うかを決める最大のポイントだと思われるので、認証方法をどうするのかを先に考えたほうが良いかもしれません。
あとネット上のソースコードを参照する場合はAndroidなどのクライアントアプリケーション向けのコードなのかサーバサイドアプリケーション向けのコードなのかで、使っているライブラリやimportしているパッケージが変わってくるので、そこは意識しながら読んだ方がいいです。
あとはJavaDoc読もう。
認証系ライブラリ
恐らくここがキモで、複数ある認証方法の中からどれを選択するのかで特にGCS関連で使えるライブラリが決まってきます。
と言いつつ結局全部でいくつ認証方法があるのかは分かりません。 それぞれ使うべきクラスが変わってくると思います。
- サービスアカウントのP12(PKCS12)形式の秘密鍵による認証
- GoogleCredentialクラス
- サービスアカウントのJSONキーによる認証
- GoogleClientSecretsクラス
自分は今回1つ目の方法を選択したのでGoogleCredentialクラスを使っています。
後者についてはGoogle Cloud PlatformのGithubアカウントでサンプルコードが公開されています。
Google Cloud Storage(GCS)
ライブラリの選択について
Javaの場合、Google APIs Client LibrariesとGoogle Cloud Storage Java Client Libraryという名前の似た2つのライブラリが存在します。GCPのマネージメントコンソールからリンクされているのは前者です。後者は恐らくOAuthによる認証に対応しておらずACLによる制御のみだと思います。
Google Cloud Client Librariesというものもリンクされていますが、これはJava版が存在していません。
embulk-input-gcs/embulk-output-bigqueryではGoogle APIs Client Librariesを使っています。
認証方法
上の認証のところで書いたGoogleCredentialクラスとGoogle APIs Client Libraries for Javaを使ってGCSの認証を行うコードは以下のようになります。
private final HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
private final JsonFactory jsonFactory = new JacksonFactory();
private hoge() throws IOException, GeneralSecurityException
{
GoogleCredential credentials = new GoogleCredential.Builder().setTransport(httpTransport) // (1)
.setJsonFactory(jsonFactory)
.setServiceAccountId(<サービスアカウント メールアドレス>)
.setServiceAccountScopes( // (2)
ImmutableList.of(
StorageScopes.DEVSTORAGE_READ_ONLY
BigqueryScopes.BIGQUERY
)
)
.setServiceAccountPrivateKeyFromP12File(new File(<P12秘密鍵のフルパス>))
.build();
Storage client = new Storage.Builder(httpTransport, jsonFactory, credentials)
.setApplicationName(<アプリケーション名>) // (3)
.build();
return client;
}
(1) まずGoogleCredentialクラスを使って後続の処理でも使用するcredentialsを生成します。 (2) GoogleCredential.Builder().setServiceAccountScopes()にはアプリケーションが使用する権限を渡していきます。
- https://www.googleapis.com/auth/devstorage.read_write のようなURIで指定する方法
- StorageScopesクラスで定義されている定数を使ってStorageScopes.DEVSTORAGE_READ_ONLYのように記述する方法
があります。
ここでBigqueryScopesクラスで定義されているBigqueryScopes.BIGQUERYも追加すればそのままGoogle BigQueryの認証も可能です。
上のコードではGCSの読み取り権限(StorageScopes.DEVSTORAGE_READ_ONLY)とBigQuery上のデータの管理/閲覧権限(BigqueryScopes.BIGQUERY)を与えています。
(3) Storage.Builder.setApplicationName()には任意のアプリケーション名を指定します。これはGCPのコンソールで確認できる値ではなく何でも構いません。
ただし値はnullでなければ何でも構わないけど指定しないとAPIの認証がエラーになる謎パラメータです。 自分はユーザが設定ファイルで指定していなければ「Embulk BigQuery plugin」を指定するようにしました。
恐らくどのアプリからどれだけアクセスがあったかというような統計を取るためのものではないかと思います。コンソールのどこから確認できるのかは知らない…
(4) 実はこれだけだと認証情報不正等で認証が失敗してもExceptionが発生せず認証失敗を検知できませんでした。 embulk-output-bigqueryではプロジェクトの一覧を最大1件取得するというコードを足しています。
この3行がない場合そのまま後続処理へ進んでしまい、バケットへアクセスする段階で400 Bad Request{“error” : “invalid_grant”}
になります。もしくは後続処理の中で考慮するか。
でももうちょっとスマートな方法がある気がする。
GCSへのファイルのアップロードについて
自分がリリース直前丸1日ハマってembulk-output-gcs作者のhakoberaさんに教えてもらったんですが、ファイルアップロードの際に1行足りないとアップロードが激遅になります。
1GBのファイルのアップロードが1h掛かっても終わらず、ベンチマークのやり方間違えたか回線障害かと思いました。
下から1行目のinsertObject.setDisableGZipContent(true);がない場合、HTTP contentのgzip圧縮が常時OFFとなるようで、ネットワーク転送量が激増します。
File file = new File(<ローカルファイルのパス>);
stream = new FileInputStream(file);
StorageObject objectMetadata = new StorageObject().setName(<GCSにアップロード後のpath>);
InputStreamContent content = new InputStreamContent(getContentType(), stream);
Storage.Objects.Insert insertObject = storageClient.objects().insert(bucket, objectMetadata, content);
insertObject.setDisableGZipContent(true);
StorageObject response = insertObject.execute();
Google BigQuery
Google Cloud PlatformのGithubアカウントで公開されているサンプルコードが参考になります。
com.google.api.services.bigquery.*
に登録されているパッケージでジョブの準備/実行/結果取得等一通りの処理はできると思いますが、com.google.appengine.tools.mapreduce.impl.BigQueryConstants
なんていうものもあります。
これはBigQuery関連の各種limit値が登録されているものですが自分は使いませんでした。
1つのジョブに登録できるファイルの最大容量はBigQueryのドキュメントでは最大1TB(非圧縮状態)ですがこのパッケージでの定義はバッファを見てか500GBになっていたりしますし、パラメータ数自体も少なく信用していいのか分からなかったので。
ジョブのステータス/実行結果の取得
ジョブのステータス/実行結果取得も少し注意が必要です。
下記のコードのjob.getStatus().getState()
で取得できるのはジョブの実行ステータス(PENDING/RUNNING/DONE)であって、ジョブが正常に実行できたか失敗したかの結果ではありません。
実行が成功したかどうかはjob.getStatus().getErrorResult()
の戻り値がnullであるかどうかで判断する必要があります。ネット上で公開されている多くのコードでもそこまで見ていないケースが多いので注意が必要です。
実行に失敗した場合はjob.getStatus().getErrorResult().getMessage()
でエラーとなった理由が取得できます。
Not found:Table your-project-id:demodatasets.demotable
ジョブが成功した場合の結果詳細についてはjob.getStatistics().getLoad()
で取得できます。
{"inputFileBytes":"298","inputFiles":"2","outputBytes":"480","outputRows":"8"}
Job job = bigQueryClient.jobs().get(<プロジェクトID>, jobRef.getJobId()).execute();
if (job.getStatus().getErrorResult() != null) {
log.warn(String.format("Job failed. job id:[%s] reason:[%s] status:[FAILED]", jobRef.getJobId(), job.getStatus().getErrorResult().getMessage()));
}
String jobStatus = job.getStatus().getState();
if (jobStatus.equals("DONE")) {
JobStatistics statistics = job.getStatistics();
log.info(String.format("Job statistics [%s]", statistics.getLoad()));
}
GetQueryResultsResponseというクラスもありますが、これは文字通りBigQueryにSQLのクエリを発行した際の実行結果を取得するためのものです。
BigQueryのデータ可視化のためにGoogleスプレッドシート/GAS(Google Apps Script)を使ったことある人なら、言語は違えどgetQueryResults()とかgetRows()というメソッド名に見覚えがあるかもしれません。
APIのレスポンスの確認
Google APIs Explorerで実際にリクエストを投げた結果の(ダミーではなく)APIレスポンスの確認ができます。
BigQueryであれば実際のJobの実行結果の確認もできるので、APIでどういった項目/値が返ってくるのか確認しつつ開発を進めるといいと思います。画面右上のOAuth2.0のチェックをONにするのをお忘れずに。
自分は開発終わってからこれ知りましたが。
最後に一言
Google APIs Client Library for Java使った後だとGASのスクリプトがスラスラかけて楽。