Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ jobs:
run: archery docker push ubuntu-ruby

macos:
name: ARM64 macOS 14 GLib & Ruby
name: ARM64 macOS GLib & Ruby
runs-on: macos-latest
if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
timeout-minutes: 60
Expand Down
22 changes: 20 additions & 2 deletions ruby/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ end

packages.each do |package|
namespace package do
package_dir = File.join(base_dir, package)

desc "Run test for #{package}"
task :test do
cd(File.join(base_dir, package)) do
cd(package_dir) do
if ENV["USE_BUNDLER"]
sh("bundle", "exec", "rake", "test")
else
Expand All @@ -46,9 +48,22 @@ packages.each do |package|
end
end

desc "Run benchmark for #{package}"
task :benchmark do
cd(package_dir) do
if File.directory?("benchmark")
if ENV["USE_BUNDLER"]
sh("bundle", "exec", "rake", "benchmark")
else
ruby("-S", "rake", "benchmark")
end
end
end
end

desc "Install #{package}"
task :install do
cd(File.join(base_dir, package)) do
cd(package_dir) do
if ENV["USE_BUNDLER"]
sh("bundle", "exec", "rake", "install")
else
Expand All @@ -70,6 +85,9 @@ end
# Aggregate tasks: each one depends on the per-package task of the same
# name ("<package>:<task>") for every entry of sorted_packages, in order.
{
  test: "Run test for all packages",
  benchmark: "Run benchmark for all packages",
  install: "Install all packages",
}.each do |task_name, description|
  desc description
  task task_name => sorted_packages.map {|package| "#{package}:#{task_name}"}
end

Expand Down
8 changes: 6 additions & 2 deletions ruby/red-arrow-format/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ source "https://rubygems.org/"

gemspec

gem "rake"
gem "red-arrow", path: "../red-arrow"
gem "test-unit"

group :development do
gem "benchmark-driver"
gem "rake"
gem "test-unit"
end
22 changes: 22 additions & 0 deletions ruby/red-arrow-format/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@ task :test do
end
end

# Every benchmark/*.yaml file becomes a "benchmark:<basename>" task, in
# sorted file name order.
benchmark_yamls = Dir.glob("benchmark/*.yaml").sort
benchmark_tasks = benchmark_yamls.map do |yaml_path|
  "benchmark:#{File.basename(yaml_path, ".*")}"
end

namespace :benchmark do
  benchmark_yamls.each do |yaml_path|
    benchmark_name = File.basename(yaml_path, ".*")
    # Run benchmark-driver with the same ruby that runs rake; "-v" prints
    # the ruby version first.
    driver_command = [
      RbConfig.ruby,
      "-v",
      "-S",
      "benchmark-driver",
      File.expand_path(yaml_path),
    ]

    desc "Run #{benchmark_name} benchmark"
    task benchmark_name do
      # Wrap the echoed command and its output in a Markdown console fence.
      # sh's own echo is turned off (verbose: false) because the command is
      # printed here.
      puts("```console")
      puts("$ #{driver_command.join(" ")}")
      sh(*driver_command, verbose: false)
      puts("```")
    end
  end
end

desc "Run all benchmarks"
task :benchmark => benchmark_tasks

namespace :flat_buffers do
desc "Generate FlatBuffers code"
task :generate do
Expand Down
53 changes: 53 additions & 0 deletions ruby/red-arrow-format/benchmark/file-reader.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Compares three readers on the same in-memory Arrow file data:
# Arrow::Table.load, Arrow::RecordBatchFileReader and ArrowFormat::FileReader.
#
# The prelude builds the shared input once: a 100-column, 10000-row table
# (even-indexed columns are UInt32, odd-indexed columns are binary values of
# 0-9 random bytes) generated from a fixed seed and saved into `buffer` with
# format :arrow_file. It finishes with GC.start and GC.disable, presumably so
# that garbage collection does not run during the measured snippets.
prelude: |
Warning[:experimental] = false

require "arrow"
require "arrow-format"

seed = 29
random = Random.new(seed)

n_columns = 100
n_rows = 10000
max_uint32 = 2 ** 32 - 1
arrays = n_columns.times.collect do |i|
if i.even?
Arrow::UInt32Array.new(n_rows.times.collect {random.rand(max_uint32)})
else
Arrow::BinaryArray.new(n_rows.times.collect {random.bytes(random.rand(10))})
end
end
table = Arrow::Table.new(arrays.collect.with_index {|array, i| [i, array]})
buffer = Arrow::ResizableBuffer.new(4096)
table.save(buffer, format: :arrow_file)

GC.start
GC.disable
# One entry per measured snippet; every snippet reads `buffer` from the
# prelude. The reader loops have empty bodies, so only iteration is measured.
benchmark:
"Arrow::Table.load": |
Arrow::Table.load(buffer, format: :arrow_file)
"Arrow::RecordBatchFileReader": |
Arrow::BufferInputStream.open(buffer) do |input|
Arrow::RecordBatchFileReader.new(input).each do
end
end
"ArrowFormat::FileReader": |
ArrowFormat::FileReader.new(buffer.data.to_s).each do
end
53 changes: 53 additions & 0 deletions ruby/red-arrow-format/benchmark/streaming-reader.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Compares three readers on the same in-memory Arrow streaming data:
# Arrow::Table.load, Arrow::RecordBatchStreamReader and
# ArrowFormat::StreamingReader.
#
# The prelude builds the shared input once: a 100-column, 10000-row table
# (even-indexed columns are UInt32, odd-indexed columns are binary values of
# 0-9 random bytes) generated from a fixed seed and saved into `buffer` with
# format :arrow_streaming. It finishes with GC.start and GC.disable,
# presumably so that garbage collection does not run during the measured
# snippets.
prelude: |
Warning[:experimental] = false

require "arrow"
require "arrow-format"

seed = 29
random = Random.new(seed)

n_columns = 100
n_rows = 10000
max_uint32 = 2 ** 32 - 1
arrays = n_columns.times.collect do |i|
if i.even?
Arrow::UInt32Array.new(n_rows.times.collect {random.rand(max_uint32)})
else
Arrow::BinaryArray.new(n_rows.times.collect {random.bytes(random.rand(10))})
end
end
table = Arrow::Table.new(arrays.collect.with_index {|array, i| [i, array]})
buffer = Arrow::ResizableBuffer.new(4096)
table.save(buffer, format: :arrow_streaming)

GC.start
GC.disable
# One entry per measured snippet; every snippet reads `buffer` from the
# prelude. The reader loops have empty bodies, so only iteration is measured.
benchmark:
"Arrow::Table.load": |
Arrow::Table.load(buffer, format: :arrow_streaming)
"Arrow::RecordBatchStreamReader": |
Arrow::BufferInputStream.open(buffer) do |input|
Arrow::RecordBatchStreamReader.new(input).each do
end
end
"ArrowFormat::StreamingReader": |
ArrowFormat::StreamingReader.new(buffer.data.to_s).each do
end
27 changes: 22 additions & 5 deletions ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,17 @@ class StreamingReader
include Enumerable

def initialize(input)
@input = input
case input
when File
@input = IO::Buffer.map(input, nil, 0, IO::Buffer::READONLY)
@offset = 0
when String
@input = IO::Buffer.for(input)
@offset = 0
else
@input = input
end

@on_read = nil
@pull_reader = StreamingPullReader.new do |record_batch|
@on_read.call(record_batch) if @on_read
Expand Down Expand Up @@ -53,11 +63,18 @@ def consume
next_size = @pull_reader.next_required_size
return false if next_size.zero?

next_chunk = @input.read(next_size, @buffer)
return false if next_chunk.nil?
if @input.is_a?(IO::Buffer)
next_chunk = @input.slice(@offset, next_size)
@offset += next_size
@pull_reader.consume(next_chunk)
true
else
next_chunk = @input.read(next_size, @buffer)
return false if next_chunk.nil?

@pull_reader.consume(IO::Buffer.for(next_chunk))
true
@pull_reader.consume(IO::Buffer.for(next_chunk))
true
end
end

def ensure_schema
Expand Down
82 changes: 77 additions & 5 deletions ruby/red-arrow-format/test/test-reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def roundtrip(data)
else
table = data
end
path = File.join(tmp_dir, "data.#{file_extension}")
table.save(path)
File.open(path, "rb") do |input|
open_input(table, tmp_dir) do |input|
reader = reader_class.new(input)
case data
when Arrow::Array
Expand Down Expand Up @@ -677,8 +675,42 @@ def test_dictionary
end
end

class TestFileReader < Test::Unit::TestCase
# Input provider for reader tests: hands the reader an opened File.
module FileInput
  # Saves +table+ as "data.<file_extension>" under +tmp_dir+, then opens
  # that file in binary read mode and passes the File to the given block.
  #
  # The including class must define +file_extension+.
  def open_input(table, tmp_dir, &block)
    file_name = "data.#{file_extension}"
    saved_path = File.join(tmp_dir, file_name)
    table.save(saved_path)
    File.open(saved_path, "rb", &block)
  end
end

# Input provider for reader tests: hands the reader the read end of a pipe,
# i.e. a non-seekable IO.
module PipeInput
  # Serializes +table+ into an in-memory buffer using +format+ (which the
  # including class must define), streams the bytes through a pipe from a
  # background thread and yields the read end of the pipe.
  #
  # Returns the block's value. An exception raised by the block propagates
  # to the caller.
  def open_input(table, tmp_dir)
    buffer = Arrow::ResizableBuffer.new(4096)
    table.save(buffer, format: format)
    data = buffer.data.to_s
    IO.pipe do |input, output|
      write_thread = Thread.new do
        # A broken pipe is expected when the reader stops early; it is
        # handled at join time, so don't report it on stderr.
        Thread.current.report_on_exception = false
        begin
          output.write(data)
        ensure
          # Close the write end so that the reader sees EOF after the last
          # byte instead of blocking forever.
          output.close
        end
      end
      begin
        yield(input)
      ensure
        # Closing the read end first wakes up a writer that is still blocked
        # on a full pipe (the block returned or raised before consuming
        # everything); otherwise join would never return.
        input.close
        begin
          write_thread.join
        rescue Errno::EPIPE
          # The reader did not consume all data. That is not an error here.
        end
      end
    end
  end
end

# Input provider for reader tests: hands the reader the serialized data as
# a String.
module StringInput
  # Serializes +table+ into an in-memory buffer using +format+ (which the
  # including class must define) and yields the buffer's bytes as a String.
  # +tmp_dir+ is accepted but not used.
  def open_input(table, tmp_dir)
    serialized = Arrow::ResizableBuffer.new(4096).tap do |buffer|
      table.save(buffer, format: format)
    end
    yield(serialized.data.to_s)
  end
end

class TestFileReaderFileInput < Test::Unit::TestCase
include ReaderTests
include FileInput

def file_extension
"arrow"
Expand All @@ -689,8 +721,22 @@ def reader_class
end
end

class TestStreamingReader < Test::Unit::TestCase
# Runs the shared ReaderTests against ArrowFormat::FileReader, with the
# serialized table handed over as a String (StringInput#open_input).
class TestFileReaderStringInput < Test::Unit::TestCase
  include ReaderTests
  include StringInput

  # Reader implementation under test.
  def reader_class = ArrowFormat::FileReader

  # Serialization format that StringInput#open_input passes to table.save.
  def format = :arrow_file
end

class TestStreamingReaderFileInupt < Test::Unit::TestCase
include ReaderTests
include FileInput

def file_extension
"arrows"
Expand All @@ -700,3 +746,29 @@ def reader_class
ArrowFormat::StreamingReader
end
end

# Runs the shared ReaderTests against ArrowFormat::StreamingReader, with the
# serialized table delivered through a pipe (PipeInput#open_input).
#
# The class name previously misspelled "Input" as "Inupt".
class TestStreamingReaderPipeInput < Test::Unit::TestCase
  include ReaderTests
  include PipeInput

  # Serialization format that PipeInput#open_input passes to table.save.
  def format
    :arrow_streaming
  end

  # Reader implementation under test.
  def reader_class
    ArrowFormat::StreamingReader
  end
end

# Runs the shared ReaderTests against ArrowFormat::StreamingReader, with the
# serialized table handed over as a String (StringInput#open_input).
#
# The class name previously misspelled "Input" as "Inupt".
class TestStreamingReaderStringInput < Test::Unit::TestCase
  include ReaderTests
  include StringInput

  # Serialization format that StringInput#open_input passes to table.save.
  def format
    :arrow_streaming
  end

  # Reader implementation under test.
  def reader_class
    ArrowFormat::StreamingReader
  end
end
Loading