Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 70 additions & 38 deletions lib/sequel-bigquery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,40 @@ module Bigquery
DATABASE_SETUP = {}.freeze

class Database < Sequel::Database # rubocop:disable Metrics/ClassLength
# Thin wrapper around a Google::Cloud::Bigquery::Dataset that Sequel treats as
# its "connection" object. It forwards queries to the dataset, attaching the
# owning database's BigQuery session id so that statements can share one
# session (needed for multi-statement transactions).
class BigqueryDatasetConnection
  # @param db [Sequel::Bigquery::Database] the database that owns this connection
  # @param dataset [Google::Cloud::Bigquery::Dataset] the underlying dataset
  def initialize(db:, dataset:)
    @db = db
    @dataset = dataset
    @db.send(:log_each, :debug, 'BigqueryDatasetConnection#initialize')
  end

  # Runs +sql+ with the database's session id attached so the statement joins
  # the session. Set log_query: false when the SQL has already been logged by
  # the caller (avoids duplicate log lines).
  def query_with_session_support(sql, log_query: true)
    @db.send(:log_query, sql) if log_query
    @db.send(:log_each, :debug,
             "BigqueryDatasetConnection#query_with_session_support, using session_id: #{@db.bigquery_session_id.inspect}")
    @dataset.query(sql, session_id: @db.bigquery_session_id)
  end

  # Forwards to Google::Cloud::Bigquery::Dataset#query_job.
  # (Fixed misspelled `kawrgs` parameter name.)
  def query_job(*args, **kwargs)
    @dataset.query_job(*args, **kwargs)
  end

  # Forwards to the dataset's (private) job-success check; raises if +job+ failed.
  def ensure_job_succeeded!(job)
    @dataset.send(:ensure_job_succeeded!, job)
  end
end

set_adapter_scheme :bigquery

# Captures the adapter-specific options (Sequel passes the original connection
# opts under :orig_opts; raises KeyError if absent) before delegating to
# Sequel::Database#initialize via +super+.
# NOTE(review): @sql_buffer/@sql_buffering appear to be leftovers from the old
# transaction-buffering implementation removed elsewhere in this change —
# confirm nothing still reads them before deleting.
def initialize(*args, **kwargs)
  @bigquery_config = kwargs.fetch(:orig_opts)
  @sql_buffer = []
  @sql_buffering = false
  super
end

# Builds the "connection" object Sequel hands to #synchronize blocks: a
# BigqueryDatasetConnection wrapping the (possibly freshly created) BigQuery
# dataset. Positional args are ignored; all config comes from the instance.
def connect(*_args)
  log_each(:debug, '#connect')
  connection = BigqueryDatasetConnection.new(
    db: self,
    dataset: get_or_create_bigquery_dataset,
  )
  log_each(:debug, '#connect end')
  connection
end

Expand All @@ -51,12 +73,10 @@ def drop_datasets(*dataset_names_to_drop)
end
alias drop_dataset drop_datasets

def execute(sql, opts = OPTS) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def execute(sql, opts = OPTS) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/PerceivedComplexity
log_each(:debug, '#execute')
log_query(sql)

# require 'pry'; binding.pry if sql =~ /CREATE TABLE IF NOT EXISTS/i

sql = sql.gsub(/\sdefault \S+/i) do
warn_default_removal(sql)
''
Expand All @@ -67,22 +87,9 @@ def execute(sql, opts = OPTS) # rubocop:disable Metrics/MethodLength, Metrics/Ab
sql += ' where 1 = 1'
end

if /^begin/i.match?(sql)
warn_transaction
@sql_buffering = true
end

if @sql_buffering
@sql_buffer << sql
return [] unless /^commit/i.match?(sql)
warn("Warning: Will now execute entire buffered transaction:\n" + @sql_buffer.join("\n"))
end

sql_to_execute = @sql_buffer.any? ? @sql_buffer.join("\n") : sql

synchronize(opts[:server]) do |conn|
results = log_connection_yield(sql, conn) do
conn.query(sql_to_execute)
conn.query_with_session_support(sql, log_query: false)
end
log_each(:debug, results.awesome_inspect)
if block_given?
Expand All @@ -93,7 +100,7 @@ def execute(sql, opts = OPTS) # rubocop:disable Metrics/MethodLength, Metrics/Ab
rescue Google::Cloud::InvalidArgumentError, Google::Cloud::PermissionDeniedError => e
if e.message.include?('too many table update operations for this table')
warn('Triggered rate limit of table update operations for this table. For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas')
if retryable_query?(sql_to_execute)
if retryable_query?(sql)
warn('Detected retryable query - re-running query after a 1 second sleep')
sleep 1
retry
Expand All @@ -104,11 +111,14 @@ def execute(sql, opts = OPTS) # rubocop:disable Metrics/MethodLength, Metrics/Ab
raise_error(e)
rescue ArgumentError => e
raise_error(e)
end # rubocop:disable Style/MultilineBlockChain
.tap do
@sql_buffer = []
@sql_buffering = false
end
end
end

# Wraps Sequel::Database#transaction purely to emit trace output so that
# transaction boundaries are visible when debugging; all transaction semantics
# come from +super+. The +ensure+ guarantees the end marker is printed even
# when the block raises or rolls back.
def transaction(*)
  warn('#transaction start')
  super
ensure
  warn('#transaction end')
end

def supports_create_table_if_not_exists?
Expand All @@ -127,10 +137,27 @@ def type_literal_generic_float(_column)
:float64
end

# Lazily creates and memoizes a BigQuery session id. The session is created on
# first use (see #create_bigquery_session) and then attached to every query
# issued through the connection, so multi-statement transactions share one
# session. Removed a stale commented-out line left from development.
def bigquery_session_id
  @bigquery_session_id ||= synchronize(opts[:server]) do |conn|
    create_bigquery_session(conn)
  end
end

private

attr_reader :bigquery_config

# Unfortunately, google-cloud-bigquery doesn't provide a way to create a
# session by itself, so we have to create one by running a query. But
# Google::Cloud::Bigquery::Dataset#query doesn't support the create_session
# argument (only session_id), so we basically duplicate the functionality of
# #query here, using the lower-level #query_job to pass create_session.
def create_bigquery_session(conn)
  log_each(:debug, 'Creating BigQuery session for use in transactions')
  job = conn.query_job('select 1', create_session: true)
  job.wait_until_done!
  conn.ensure_job_succeeded!(job)
  session_id = job.session_id
  log_each(:debug, 'Session created')
  session_id
end

def google_cloud_bigquery_gem_config
bigquery_config.dup.tap do |config|
%i[
Expand All @@ -157,7 +184,7 @@ def bigquery_dataset_name
end

# Tells Sequel which method on the connection object executes SQL; routed
# through the session-aware wrapper so every statement runs inside the
# BigQuery session. (Removed a stray leftover `:query` expression line that
# preceded the intended return value.)
def connection_execute_method
  :query_with_session_support
end

def database_error_classes
Expand All @@ -169,17 +196,6 @@ def dataset_class_default
Dataset
end

# NOTE(review): looks like a debugging stub — it ignores _table_name and
# returns one [dataset_id, {}] pair per BigQuery dataset rather than the
# column schema Sequel's schema_parse_table contract suggests. Confirm whether
# this override is still needed before relying on it.
def schema_parse_table(_table_name, _opts)
  log_each(:debug, Paint['schema_parse_table', :red, :bold])
  # require 'pry'; binding.pry
  bigquery.datasets.map do |dataset|
    [
      dataset.dataset_id,
      {},
    ]
  end
end

def disconnect_error?(e, opts) # rubocop:disable Lint/UselessMethodDefinition
# super || (e.is_a?(::ODBC::Error) && /\A08S01/.match(e.message))
super
Expand Down Expand Up @@ -238,6 +254,22 @@ def single_statement_query?(sql)
# True when +sql+ begins with "alter table" (case-insensitive).
def alter_table_query?(sql)
  /\Aalter table /i.match?(sql)
end

# Appending a SELECT prevents an error due to these queries having no destination table:
#   google/cloud/bigquery/query_job.rb:1799:in `destination_table_dataset_id': undefined method `dataset_id' for nil:NilClass (NoMethodError)
# See https://github.com/googleapis/google-cloud-ruby/issues/9617

# SQL Sequel issues to open a transaction.
def begin_transaction_sql
  'BEGIN; SELECT 1'
end

# SQL Sequel issues to commit the current transaction.
def commit_transaction_sql
  'COMMIT; SELECT 1'
end

# SQL Sequel issues to roll back the current transaction.
def rollback_transaction_sql
  'ROLLBACK; SELECT 1'
end
end

class Dataset < Sequel::Dataset
Expand Down
45 changes: 43 additions & 2 deletions spec/sequel-bigquery_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def isolated_dataset_name(name)
end

it 'supports partitioning arguments' do
expect(dataset).to have_received(:query).with(expected_sql)
expect(dataset).to have_received(:query).with(expected_sql, session_id: anything)
end
end

Expand Down Expand Up @@ -199,7 +199,7 @@ def isolated_dataset_name(name)
end

it 'combines queries into one alter table statement' do
expect(dataset).to have_received(:query).with(expected_sql)
expect(dataset).to have_received(:query).with(expected_sql, session_id: anything)
end
end

Expand Down Expand Up @@ -249,4 +249,45 @@ def directly_create_table_and_trigger_rate_limit
end
end
end

# Exercises Sequel's block-form transactions end-to-end: both examples run
# multiple statements inside db.transaction, which depends on the
# session-based BEGIN/COMMIT/ROLLBACK support.
describe 'using a standard transaction across multiple queries' do
# Start each example from a clean dataset containing only a `books` table.
before do
recreate_dataset
db.execute('create table books (name string)')
end

it 'can commit' do
db.transaction do
db[:books].insert(name: 'The Name of the Wind')
end
expect(db[:books].all).to eq([{ name: 'The Name of the Wind' }])
end

it 'can rollback' do
db.transaction do
db[:books].insert(name: 'The Name of the Wind')
# Sequel::Rollback aborts the transaction without propagating an error.
raise Sequel::Rollback
end
expect(db[:books].all).to eq([])
end
end

# Smoke tests for Sequel::Model on top of the adapter.
describe 'using Sequel::Model' do
# Start each example from a clean dataset containing only a `books` table.
before do
recreate_dataset
db.execute('create table books (name string)')
end

let(:book_model_class) { Sequel::Model(db[:books]) }
let(:book) { book_model_class.new(name: 'The Name of the Wind') }

it 'can define and instantiate a Sequel model' do
expect(book.name).to eq('The Name of the Wind')
end

# NOTE(review): skipped (xit) — the save/load round-trip apparently does not
# pass yet; confirm the reason and track it before enabling.
xit 'can save and load a Sequel model' do
book.save
expect(book_model_class.all).to eq([book])
end
end
end