Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# encoding: utf-8
source 'https://rubygems.org'

gemspec
3 changes: 2 additions & 1 deletion aws-kclrb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ Gem::Specification.new do |spec|
spec.summary = 'Amazon Kinesis Client Library for Ruby'
spec.description = 'A ruby interface for the Amazon Kinesis Client Library MultiLangDaemon'
spec.author = 'Amazon Web Services'
spec.files = Dir['lib/**/*.rb']
spec.files = Dir['lib/**/*.rb']
spec.files += Dir['spec/**/*.rb']
spec.files += ['README.md', 'LICENSE.txt', 'VERSION', 'NOTICE.txt', '.yardopts', '.rspec']
spec.license = 'Amazon Software License'
spec.platform = Gem::Platform::RUBY
spec.homepage = 'http://github.com/aws/amazon-kinesis-client-ruby'
spec.require_paths = ['lib']

spec.add_dependency('aws-sdk', '~> 2.0')
spec.add_dependency('multi_json', '~> 1.0')
end
10 changes: 5 additions & 5 deletions lib/aws/kclrb.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#
# http://aws.amazon.com/asl/
#
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

require 'aws/kclrb/record_processor'
require 'aws/kclrb/kcl_process'
require_relative 'kclrb/record_processor'
require_relative 'kclrb/kcl_process'
46 changes: 23 additions & 23 deletions lib/aws/kclrb/checkpointer.rb
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
#
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#
# http://aws.amazon.com/asl/
#
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

require 'aws/kclrb/io_proxy'
require_relative 'io_proxy'

module Aws
module KCLrb
# Error class used for wrapping exception names passed through the
# Error class used for wrapping exception names passed through the
# input stream.
class CheckpointError < RuntimeError
# @!attribute [r] value
# @return [String] the name of the exception wrapped by this instance.
attr_reader :value
# @param value [String] The name of the exception that was received
# while checkpointing. For more details see

# @param value [String] The name of the exception that was received
# while checkpointing. For more details see
# {https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/exceptions KCL exceptions}.
# Any of these exception names could be returned by the {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java MultiLangDaemon}
# as a response to a checkpoint action.
def initialize(value)
@value = value
end

# @return [String] the name of the wrapped exception.
def to_s
@value.to_s
end
end

# @abstract
# A checkpointer class which allows you to make checkpoint requests.
#
# A checkpoint marks a point in a shard where you've successfully
# processed to. If this processor fails or loses its lease to that
# shard, another processor will be started either by this
# {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java MultiLangDaemon}
# or a different instance and resume at the most recent checkpoint
#
# A checkpoint marks a point in a shard where you've successfully
# processed to. If this processor fails or loses its lease to that
# shard, another processor will be started either by this
# {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java MultiLangDaemon}
# or a different instance and resume at the most recent checkpoint
# in this shard.
class Checkpointer

# Checkpoints at a particular sequence number you provide or if `nil`
# was passed, the checkpoint will be at the end of the most recently
# delivered list of records.
# delivered list of records.
#
# @param sequence_number [String, nil] The sequence number to checkpoint at
# @param sequence_number [String, nil] The sequence number to checkpoint at
# or `nil` if you want to checkpoint at the farthest record.
# @raise [CheckpointError] if the {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java MultiLangDaemon}
# returned a response indicating an error, or if the checkpointer
Expand All @@ -62,8 +62,8 @@ def checkpoint(sequence_number=nil)
fail NotImplementedError.new
end
end


# @api private
# Default implementation of the {Checkpointer} abstract class.
class CheckpointerImpl
Expand All @@ -72,7 +72,7 @@ class CheckpointerImpl
def initialize(io_proxy)
@io_proxy = io_proxy
end

# (see Checkpointer#checkpoint)
def checkpoint(sequence_number=nil)
@io_proxy.write_action('checkpoint', 'checkpoint' => sequence_number)
Expand All @@ -89,7 +89,7 @@ def checkpoint(sequence_number=nil)
# to the RecordProcessor indicating that the KCL (or KCLrb) is in
# an invalid state. See KCL documentation for description of this
# exception. Note that the documented guidance is that this exception
# is NOT retriable so the client code should exit (see
# is NOT retriable so the client code should exit (see
# https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/exceptions)
raise CheckpointError.new('InvalidStateException')
end
Expand Down
22 changes: 11 additions & 11 deletions lib/aws/kclrb/kcl_process.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
#
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#
# http://aws.amazon.com/asl/
#
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

require 'aws/kclrb/io_proxy'
require 'aws/kclrb/checkpointer'
require_relative 'io_proxy'
require_relative 'checkpointer'

module Aws
module KCLrb
# Error raised if the {KCLProcess} received an input action that it
# could not parse or it could not handle.
class MalformedAction < RuntimeError; end

# Entry point for a KCL application in Ruby.
#
# Implementers of KCL applications in Ruby should instantiate this
Expand All @@ -36,7 +36,7 @@ def initialize(processor, input=$stdin, output=$stdout, error=$stderr)
@io_proxy = IOProxy.new(input, output, error)
@checkpointer = CheckpointerImpl.new(@io_proxy)
end

# Starts this KCL processor's main loop.
def run
action = @io_proxy.read_action
Expand All @@ -45,9 +45,9 @@ def run
action = @io_proxy.read_action
end
end

private

# @api private
# Parses an input action and invokes the appropriate method of the
# record processor.
Expand Down Expand Up @@ -75,7 +75,7 @@ def process_action(action)
rescue KeyError => ke
raise MalformedAction.new("Action '#{action}': #{ke.message}")
end

# @api private
# Calls the specified method on the record processor, and handles
# any resulting exceptions by writing to the error stream.
Expand All @@ -89,7 +89,7 @@ def dispatch_to_processor(method, *args)
# of issue.
@io_proxy.write_error(processor_error)
end

end
end
end
14 changes: 7 additions & 7 deletions samples/sample_kcl.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#! /usr/bin/env ruby
#
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#
# http://aws.amazon.com/asl/
#
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

require 'aws/kclrb'
require_relative '../lib/aws/kclrb'
require 'base64'
require 'tmpdir'
require 'fileutils'
Expand Down Expand Up @@ -84,15 +84,15 @@ def shutdown(checkpointer, reason)

private
# Helper method that retries checkpointing once.
# @param checkpointer [Aws::KCLrb::Checkpointer] The checkpointer instance to use.
# @param sequence_number (see Aws::KCLrb::Checkpointer#checkpoint)
# @param checkpointer [Aws::KCLrb::Checkpointer] The checkpointer instance to use.
# @param sequence_number (see Aws::KCLrb::Checkpointer#checkpoint)
def checkpoint_helper(checkpointer, sequence_number=nil)
begin
checkpointer.checkpoint(sequence_number)
rescue Aws::KCLrb::CheckpointError => e
# Here, we simply retry once.
# More sophisticated retry logic is recommended.
checkpointer.checkpoint(last_seq) if last_seq
checkpointer.checkpoint(sequence_number) if sequence_number
end
end
end
Expand Down
16 changes: 8 additions & 8 deletions samples/sample_kcl_producer.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#! /usr/bin/env ruby
#
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#
# http://aws.amazon.com/asl/
#
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

require 'aws/kinesis'
require 'aws-sdk'
require 'multi_json'
require 'optparse'

Expand All @@ -39,8 +39,8 @@ def delete_stream_if_exists
begin
@kinesis.delete_stream(:stream_name => @stream_name)
puts "Deleted stream #{@stream_name}"
rescue AWS::Kinesis::Errors::ResourceNotFoundException
# nothing to do
rescue Aws::Kinesis::Errors::ResourceNotFoundException
# nothing to do
end
end

Expand All @@ -56,7 +56,7 @@ def create_stream_if_not_exists
fail "Stream #{@stream_name} has #{desc[:shards].size} shards, while requested number of shards is #{@shard_count}"
end
puts "Stream #{@stream_name} already exists with #{desc[:shards].size} shards"
rescue AWS::Kinesis::Errors::ResourceNotFoundException
rescue Aws::Kinesis::Errors::ResourceNotFoundException
puts "Creating stream #{@stream_name} with #{@shard_count || 2} shards"
@kinesis.create_stream(:stream_name => @stream_name,
:shard_count => @shard_count || 2)
Expand Down Expand Up @@ -142,7 +142,7 @@ def wait_for_stream_to_become_active
# See http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
kconfig = {}
kconfig[:region] = aws_region if aws_region
kinesis = AWS::Kinesis::Client.new(kconfig)
kinesis = Aws::Kinesis::Client.new(kconfig)

producer = SampleProducer.new(kinesis, stream_name, sleep_between_puts, shard_count)
producer.run(timeout)
Expand Down