Embulk output plugin to load/insert data into Google BigQuery using direct insert
Loads data into Google BigQuery as batch jobs, which suits large amounts of data: https://developers.google.com/bigquery/loading-data-into-bigquery
- Plugin type: output
- Resume supported: no
- Cleanup supported: no
- Dynamic table creating: yes
| gem version | Embulk version | 
|---|---|
| 0.7.0 and higher | v0.11.0 and higher | 
| 0.6.9 and lower | v0.9.X and lower | 
Not implemented:
- insert data over streaming inserts
  - for continuous real-time insertions
  - Please use another product, such as fluent-plugin-bigquery
  - https://developers.google.com/bigquery/streaming-data-into-bigquery#usecases
 
Current version of this plugin supports Google API with Service Account Authentication, but does not support OAuth flow for installed applications.
| name | type | required? | default | description | 
|---|---|---|---|---|
| mode | string | optional | "append" | See Mode | 
| auth_method | string | optional | "application_default" | See Authentication | 
| json_keyfile | string | optional | | keyfile path or content | 
| project | string | required unless service_account's json_keyfile is given | | project_id | 
| destination_project | string | optional | project value | A destination project to which the data will be loaded. Use this if you want to separate a billing project (the project value) and a destination project (the destination_project value). | 
| dataset | string | required | | dataset | 
| location | string | optional | nil | geographic location of dataset. See Location | 
| table | string | required | | table name, or table name with a partition decorator such as table_name$20160929 | 
| auto_create_dataset | boolean | optional | false | automatically create dataset | 
| auto_create_table | boolean | optional | true | false is available only for append_direct mode. Other modes require true. See Dynamic Table Creating and Time Partitioning | 
| schema_file | string | optional | | /path/to/schema.json | 
| template_table | string | optional | | template table name. See Dynamic Table Creating | 
| job_status_max_polling_time | int | optional | 3600 sec | Max job status polling time | 
| job_status_polling_interval | int | optional | 10 sec | Job status polling interval | 
| is_skip_job_result_check | boolean | optional | false | Skip waiting for the load job to finish. Available for append or delete_in_advance mode | 
| with_rehearsal | boolean | optional | false | Load rehearsal_counts records as a rehearsal. The rehearsal loads into a temporary REHEARSAL table, which is deleted afterwards. You may use this option to investigate data errors as early as possible | 
| rehearsal_counts | integer | optional | 1000 | Specify number of records to load in a rehearsal | 
| abort_on_error | boolean | optional | true if max_bad_records is 0, otherwise false | Raise an error if the number of input rows and the number of output rows do not match | 
| column_options | hash | optional | | See Column Options | 
| default_timezone | string | optional | UTC | |
| default_timestamp_format | string | optional | %Y-%m-%d %H:%M:%S.%6N | |
| payload_column | string | optional | nil | See Formatter Performance Issue | 
| payload_column_index | integer | optional | nil | See Formatter Performance Issue | 
| gcs_bucket | string | optional | nil | See GCS Bucket | 
| auto_create_gcs_bucket | boolean | optional | false | See GCS Bucket | 
| progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled by nil (default). NOTE: This option may be removed in the future because a filter plugin can achieve the same goal | 
| description | string | optional | nil | description of table | 
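
As an illustration of how some of the options above combine, here is a minimal sketch that separates the billing project from the destination project and runs a small rehearsal before the real load. The project names are hypothetical placeholders, and the rehearsal size is an arbitrary choice, not a recommendation.

```yaml
out:
  type: bigquery
  mode: append
  auth_method: application_default
  project: billing-project-000           # hypothetical: the billing project
  destination_project: data-project-000  # hypothetical: the project the data is loaded into
  dataset: your_dataset_name
  table: your_table_name
  with_rehearsal: true    # load a sample into a temporary rehearsal table first
  rehearsal_counts: 100   # number of records in the rehearsal (default is 1000)
```

If destination_project is omitted, the table above says it defaults to the project value.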
Client or request options
| name | type | required? | default | description | 
|---|---|---|---|---|
| open_timeout_sec | integer | optional | 300 | Seconds to wait for the connection to open | 
| timeout_sec | integer | optional | 300 | Seconds to wait for one block to be read (google-api-ruby-client < v0.11.0) | 
| send_timeout_sec | integer | optional | 300 | Seconds to wait to send a request (google-api-ruby-client >= v0.11.0) | 
| read_timeout_sec | integer | optional | 300 | Seconds to wait to read a response (google-api-ruby-client >= v0.11.0) | 
| retries | integer | optional | 5 | Number of retries | 
| application_name | string | optional | "Embulk BigQuery plugin" | User-Agent | 
| sdk_log_level | string | optional | nil (WARN) | Log level of google api client library | 
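
A minimal sketch of tuning the client options above for a slow or unreliable network. The values are arbitrary examples chosen for illustration, and the sketch assumes a google-api-ruby-client version that uses send_timeout_sec and read_timeout_sec rather than timeout_sec.

```yaml
out:
  type: bigquery
  dataset: your_dataset_name
  table: your_table_name
  open_timeout_sec: 60    # give up opening a connection after 60 seconds
  send_timeout_sec: 600   # allow up to 10 minutes to send a request
  read_timeout_sec: 600   # allow up to 10 minutes to read a response
  retries: 10             # retry more often than the default of 5
```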
Options for intermediate local files
| name | type | required? | default | description | 
|---|---|---|---|---|
| path_prefix | string | optional | | Path prefix of local files such as "/tmp/prefix_". By default, a random prefix is generated with tempfile | 
| sequence_format | string | optional | .%d.%d | Sequence format for pid, thread id | 
| file_ext | string | optional | | The file extension of local files such as ".csv.gz" or ".json.gz". By default, it is generated automatically from source_format and compression | 
| skip_file_generation | boolean | optional | | Load already generated local files into BigQuery if available. Specify correct path_prefix and file_ext. | 
| delete_from_local_when_job_end | boolean | optional | true | If set to true, delete the generated local files when the job ends | 
| compression | string | optional | "NONE" | Compression of local files (GZIP or NONE) | 
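
One possible use of these options is to keep the intermediate files for inspection after a run. The sketch below is an assumption about how the options fit together rather than a documented recipe, and the path prefix is a hypothetical placeholder.

```yaml
out:
  type: bigquery
  dataset: your_dataset_name
  table: your_table_name
  source_format: CSV
  compression: GZIP
  path_prefix: /tmp/embulk_bq_debug_     # hypothetical prefix; random by default
  file_ext: .csv.gz                      # matches source_format and compression
  delete_from_local_when_job_end: false  # keep the generated files after the job
  # On a later run, skip_file_generation: true with the same path_prefix and
  # file_ext would load the already generated files instead of recreating them.
```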
Options for intermediate tables on BigQuery
| name | type | required? | default | description | 
|---|---|---|---|---|
| temporary_table_expiration | integer | optional | | Temporary table's expiration time in seconds | 
source_format is also used to determine formatter (csv or jsonl).
The following options are the same as those of the bq command-line tool or the BigQuery job properties.
| name | type | required? | default | description | 
|---|---|---|---|---|
| source_format | string | required | "CSV" | File type (NEWLINE_DELIMITED_JSON or CSV) | 
| max_bad_records | int | optional | 0 | |
| field_delimiter | char | optional | "," | |
| encoding | string | optional | "UTF-8" | UTF-8 or ISO-8859-1 | 
| ignore_unknown_values | boolean | optional | false | |
| allow_quoted_newlines | boolean | optional | false | Set true if data contains newline characters. It may cause slow processing | 
| time_partitioning | hash | optional | {"type":"DAY"} if the table parameter has a partition decorator, otherwise nil | See Time Partitioning | 
| time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. | 
| time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. | 
| time_partitioning.field | string | optional | nil | DATE or TIMESTAMP column used for partitioning | 
| range_partitioning | hash | optional | nil | See Range Partitioning | 
| range_partitioning.field | string | required | nil | INT64 column used for partitioning | 
| range_partitioning.range | hash | required | nil | Defines the ranges for range partitioning | 
| range_partitioning.range.start | int | required | nil | The start of range partitioning, inclusive. | 
| range_partitioning.range.end | int | required | nil | The end of range partitioning, exclusive. | 
| range_partitioning.range.interval | int | required | nil | The width of each interval. | 
| clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so it must be used with the time_partitioning option. See clustered tables | 
| clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. | 
| schema_update_options | array | optional | nil | (Experimental) List of ALLOW_FIELD_ADDITION or ALLOW_FIELD_RELAXATION or both. See jobs#configuration.load.schemaUpdateOptions. NOTE for the current status: schema_update_options does not work for copy jobs, that is, it is not effective for most modes such as append, replace and replace_backup. delete_in_advance deletes the origin table, so it does not need to update the schema. Only append_direct can utilize schema update. | 
out:
  type: bigquery
  mode: append
  auth_method: service_account
  json_keyfile: /path/to/json_keyfile.json
  project: your-project-000
  dataset: your_dataset_name
  table: your_table_name
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON

Location
The geographic location of the dataset. Required except for US and EU.
When you use gcs_bucket, the GCS bucket should be in the same region as the dataset.
See also Dataset Locations | BigQuery | Google Cloud
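
A minimal sketch of a configuration for a dataset outside US and EU, assuming the dataset lives in asia-northeast1 and that a bucket in the same region already exists. The bucket name is a hypothetical placeholder.

```yaml
out:
  type: bigquery
  dataset: your_dataset_name
  table: your_table_name
  location: asia-northeast1                    # must match the dataset's location
  gcs_bucket: your-bucket-in-asia-northeast1   # hypothetical bucket in the same region
```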

Mode
5 modes are provided.

append
- Load to temporary table (Create and WRITE_APPEND in parallel)
- Copy temporary table to destination table (or partition). (WRITE_APPEND)

append_direct
- Insert data into existing table (or partition) directly. (WRITE_APPEND in parallel)

This is not transactional, i.e., if it fails, the target table could have some rows inserted.

replace
- Load to temporary table (Create and WRITE_APPEND in parallel)
- Copy temporary table to destination table (or partition). (WRITE_TRUNCATE)

is_skip_job_result_check must be false in replace mode.
NOTE: BigQuery does not support replacing (actually, copying into) a non-partitioned table with a partitioned table atomically. You must first delete the non-partitioned table; otherwise, you get the error "Incompatible table partitioning specification when copying to the column partitioned table".

replace_backup
- Load to temporary table (Create and WRITE_APPEND in parallel)
- Copy destination table (or partition) to backup table (or partition). (dataset_old, table_old)
- Copy temporary table to destination table (or partition). (WRITE_TRUNCATE)

is_skip_job_result_check must be false in replace_backup mode.

delete_in_advance
- Delete destination table (or partition), if it exists.
- Load to destination table (or partition).
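
As an illustration of the last pair of steps (delete, then load), which correspond to delete_in_advance mode, here is a minimal sketch that reloads a single partition. The table name and partition date are placeholders, and whether this fits a given workflow is an assumption to verify against your own data.

```yaml
out:
  type: bigquery
  mode: delete_in_advance
  dataset: your_dataset_name
  table: table_name$20160929   # only this partition is deleted, then reloaded
```

Because the delete happens before the load, a failed load leaves the partition empty or partially loaded, so this mode trades the safety of a temporary table for fewer copy jobs.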

Authentication
There are four authentication methods:
- service_account (or json_key for backward compatibility)
- authorized_user
- compute_engine
- application_default

service_account
Use GCP service account credentials. You first need to create a service account, download its JSON key, and deploy the key with embulk.
out:
  type: bigquery
  auth_method: service_account
  json_keyfile: /path/to/json_keyfile.json
You can also embed the contents of json_keyfile in config.yml.
out:
  type: bigquery
  auth_method: service_account
  json_keyfile:
    content: |
      {
          "private_key_id": "123456789",
          "private_key": "-----BEGIN PRIVATE KEY-----\nABCDEF",
          "client_email": "..."
      }

authorized_user
Use Google user credentials.
You can get your credentials at ~/.config/gcloud/application_default_credentials.json by running gcloud auth application-default login.
out:
  type: bigquery
  auth_method: authorized_user
  json_keyfile: /path/to/credentials.json
You can also embed the contents of json_keyfile in config.yml.
out:
  type: bigquery
  auth_method: authorized_user
  json_keyfile:
    content: |
      {
        "client_id":"xxxxxxxxxxx.apps.googleusercontent.com",
        "client_secret":"xxxxxxxxxxx",
        "refresh_token":"xxxxxxxxxxx",
        "type":"authorized_user"
      }

compute_engine
On the other hand, you don't need to explicitly create a service account for embulk when you run embulk on Google Compute Engine. In this third authentication method, you need to add the API scope "https://www.googleapis.com/auth/bigquery" to the scope list of your Compute Engine VM instance; then you can configure embulk like this.
out:
  type: bigquery
  auth_method: compute_engine

application_default
Use Application Default Credentials (ADC). ADC is a strategy to locate Google Cloud Service Account credentials.
- ADC checks to see if the environment variable GOOGLE_APPLICATION_CREDENTIALS is set. If the variable is set, ADC uses the service account file that the variable points to.
- ADC checks to see if ~/.config/gcloud/application_default_credentials.json exists. This file is created by running gcloud auth application-default login.
- ADC uses the default service account for credentials if the application is running on Compute Engine, App Engine, Kubernetes Engine, Cloud Functions or Cloud Run.
See https://cloud.google.com/docs/authentication/production for details.
out:
  type: bigquery
  auth_method: application_default
The table option accepts Time#strftime format to construct table ids.
Table ids are formatted at runtime using the local time of the embulk server.
For example, with the configuration below, data is inserted into tables table_20150503, table_20150504 and so on.
out:
  type: bigquery
  table: table_%Y%m%d

Dynamic Table Creating
There are 3 ways to set the schema.
Set the file path of schema.json in schema_file:
out:
  type: bigquery
  auto_create_table: true
  table: table_%Y%m%d
  schema_file: /path/to/schema.json
Alternatively, the plugin will try to read the schema from an existing table and use it as a schema template.
out:
  type: bigquery
  auto_create_table: true
  table: table_%Y%m%d
  template_table: existing_table_name
Otherwise, the plugin will try to guess the BigQuery schema from the Embulk schema. It is also configurable with column_options. See Column Options.

Column Options
Column options are used to aid guessing BigQuery schema, or to define conversion of values:
- column_options: advanced: an array of options for columns
  - name: column name
  - type: BigQuery type such as BOOLEAN, INTEGER, FLOAT, STRING, TIMESTAMP, DATETIME, DATE, and RECORD. See below for the supported conversions from each Embulk type.
    - boolean:   BOOLEAN, STRING (default: BOOLEAN)
    - long:      BOOLEAN, INTEGER, FLOAT, STRING, TIMESTAMP (default: INTEGER)
    - double:    INTEGER, FLOAT, STRING, TIMESTAMP (default: FLOAT)
    - string:    BOOLEAN, INTEGER, FLOAT, STRING, TIME, TIMESTAMP, DATETIME, DATE, RECORD (default: STRING)
    - timestamp: INTEGER, FLOAT, STRING, TIME, TIMESTAMP, DATETIME, DATE (default: TIMESTAMP)
    - json:      STRING, RECORD (default: STRING)
  - mode: BigQuery mode such as NULLABLE, REQUIRED, and REPEATED (string, default: NULLABLE)
  - fields: Describes the nested schema fields if the type property is set to RECORD. Please note that this is required for a RECORD column.
  - description: description (string, default is None).
  - timestamp_format: timestamp format to convert into/from timestamp (string, default is default_timestamp_format)
  - timezone: timezone to convert into/from timestamp, date (string, default is default_timezone).
- default_timestamp_format: default timestamp format for column_options (string, default is "%Y-%m-%d %H:%M:%S.%6N")
- default_timezone: default timezone for column_options (string, default is "UTC")
Example)
out:
  type: bigquery
  auto_create_table: true
  column_options:
    - {name: date, type: STRING, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
    - name: json_column
      type: RECORD
      fields:
        - {name: key1, type: STRING}
        - {name: key2, type: STRING}
NOTE: Type conversion is done in this jruby plugin, and could be slow. See Formatter Performance Issue to improve the performance.
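
A further sketch of column_options, using only conversions listed in the table of supported types. It assumes a hypothetical Embulk schema in which id is a long, created_at is a timestamp, and score is a double.

```yaml
out:
  type: bigquery
  auto_create_table: true
  column_options:
    # long -> INTEGER, marked as REQUIRED instead of the default NULLABLE
    - {name: id, type: INTEGER, mode: REQUIRED}
    # timestamp -> DATE, with the date computed in the given timezone
    - {name: created_at, type: DATE, timezone: "Asia/Tokyo"}
    # double -> STRING
    - {name: score, type: STRING}
```

Columns without an entry fall back to the default type for their Embulk type, for example TIMESTAMP for a timestamp column.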

Formatter Performance Issue
embulk-output-bigquery supports formatting records into CSV or JSON (and also formatting timestamp columns). However, this plugin is written in JRuby, and JRuby plugins are generally slower than Java plugins.
Therefore, it is recommended to format records with filter plugins written in Java such as embulk-filter-to_json as:
filters:
  - type: to_json
    column: {name: payload, type: string}
    default_format: "%Y-%m-%d %H:%M:%S.%6N"
out:
  type: bigquery
  payload_column_index: 0 # or, payload_column: payload
Furthermore, if your files are originally jsonl or csv files, you can even skip a parser with embulk-parser-none as:
in:
  type: file
  path_prefix: example/example.jsonl
  parser:
    type: none
    column_name: payload
out:
  type: bigquery
  payload_column_index: 0 # or, payload_column: payload

GCS Bucket
This is useful to reduce the number of consumed jobs, which is limited to 100,000 jobs per project per day.
By default, this plugin loads local files into BigQuery in parallel, and so consumes a number of jobs, for example 24 jobs on a 24 CPU core machine (this depends on embulk parameters such as min_output_tasks and max_threads).
BigQuery supports loading multiple files from GCS with one job. Therefore, uploading local files to GCS in parallel and then loading from GCS into BigQuery reduces the number of consumed jobs to 1.
The gcs_bucket option enables this strategy. You may also use auto_create_gcs_bucket to create the specified GCS bucket automatically.
out:
  type: bigquery
  gcs_bucket: bucket_name
  auto_create_gcs_bucket: true
ToDo: Use https://cloud.google.com/storage/docs/streaming if google-api-ruby-client supports streaming transfers into GCS.

Time Partitioning
From 0.4.0, embulk-output-bigquery supports loading into partitioned tables. See also Creating and Updating Date-Partitioned Tables.
To load into a partition, specify table parameter with a partition decorator as:
out:
  type: bigquery
  table: table_name$20160929
You may configure the time_partitioning parameter together as:
out:
  type: bigquery
  table: table_name$20160929
  time_partitioning:
    type: DAY
    expiration_ms: 259200000
You can also create a column-based partitioned table as:
out:
  type: bigquery
  mode: replace
  table: table_name
  time_partitioning:
    type: DAY
    field: timestamp
Note that the time_partitioning.field should be a top-level DATE or TIMESTAMP column.
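
The options table states that clustering must be used together with time_partitioning. A minimal sketch combining the two, assuming hypothetical columns customer_id and country exist in the loaded data:

```yaml
out:
  type: bigquery
  mode: replace
  table: table_name
  time_partitioning:
    type: DAY
    field: timestamp
  clustering:
    fields:          # order determines the sort order of the data
      - customer_id  # hypothetical column
      - country      # hypothetical column
```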
Use the Tables: patch API to update the schema of a partitioned table; embulk-output-bigquery itself does not support it, though.
Note that only adding a new column and relaxing non-necessary columns to be NULLABLE are supported now. Deleting columns and renaming columns are not supported.
MEMO: jobs#configuration.load.schemaUpdateOptions is available to update the schema of the destination table as a side effect of the load job, but it is not available for copy jobs.
Thus, it is not suitable for the idempotent modes of embulk-output-bigquery (append, replace, and replace_backup), sigh.
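
According to the options table, the experimental schema_update_options parameter is effective only in append_direct mode, because that mode loads directly into the destination table without a copy job. A minimal sketch under that assumption:

```yaml
out:
  type: bigquery
  mode: append_direct          # loads directly, so the load job can update the schema
  dataset: your_dataset_name
  table: table_name
  schema_update_options:
    - ALLOW_FIELD_ADDITION     # allow new columns to be added
    - ALLOW_FIELD_RELAXATION   # allow REQUIRED columns to become NULLABLE
```

Keep in mind that append_direct is not transactional, so a failed load may leave some rows inserted.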

Range Partitioning
See also Creating and Updating Range-Partitioned Tables.
To load into a partition, specify range_partitioning and table parameter with a partition decorator as:
out:
  type: bigquery
  table: table_name$1
  range_partitioning:
    field: customer_id
    range:
      start: 1
      end: 99999
      interval: 1
To run the example, prepare a json_keyfile at example/your-project-000.json, then
$ embulk bundle install --path vendor/bundle
$ embulk run -X page_size=1 -b . -l trace example/example.yml
To run the tests, place your embulk with a .jar extension:
$ curl -o embulk.jar --create-dirs -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod a+x embulk.jar
Investigate JRUBY_VERSION and Bundler::VERSION included in the embulk.jar:
$ echo JRUBY_VERSION | ./embulk.jar irb
2019-08-10 00:59:11.866 +0900: Embulk v0.9.17
Switch to inspect mode.
JRUBY_VERSION
"X.X.X.X"
$ echo "require 'bundler'; Bundler::VERSION" | ./embulk.jar irb
2019-08-10 01:59:10.460 +0900: Embulk v0.9.17
Switch to inspect mode.
require 'bundler'; Bundler::VERSION
"Y.Y.Y"
Install the same version of jruby (change X.X.X.X to the version shown above) and bundler:
$ rbenv install jruby-X.X.X.X
$ rbenv local jruby-X.X.X.X
$ gem install bundler -v Y.Y.Y
Install dependencies (NOTE: Use bundler included in the embulk.jar, otherwise, gem 'embulk' is not found):
$ ./embulk.jar bundle install --path vendor/bundle
Run tests with env RUBYOPT="-r ./embulk.jar":
$ bundle exec env RUBYOPT="-r ./embulk.jar" rake test
To run tests which actually connect to BigQuery, such as test/test_bigquery_client.rb, prepare a json_keyfile at example/your-project-000.json, then
$ bundle exec env RUBYOPT="-r ./embulk.jar" ruby test/test_bigquery_client.rb
$ bundle exec env RUBYOPT="-r ./embulk.jar" ruby test/test_example.rb
To release the gem, change the version in the gemspec and update CHANGELOG.md. Then,
$ bundle exec rake release