From db7e16ef578cd51b03a5ff1ebda552d6e74b56ea Mon Sep 17 00:00:00 2001 From: Aleksey Vyskubov Date: Wed, 1 Sep 2021 13:25:21 +0300 Subject: [PATCH 1/6] Update kafka_source.h --- include/kspp/sources/kafka_source.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index afb9f62..e3518d9 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -23,6 +23,7 @@ namespace kspp { } void start(int64_t offset) override { + _thread = std::thread(&kafka_source_base::thread_f, this); _impl.start(offset); _started = true; } @@ -92,7 +93,6 @@ namespace kspp { : partition_source(nullptr, partition) , _started(false) , _exit(false) - , _thread(&kafka_source_base::thread_f, this) , _impl(config, topic, partition, consumer_group) , _key_codec(key_codec) , _val_codec(val_codec) From 15a87426544c990fafb6c2155e6d91bf203c60ca Mon Sep 17 00:00:00 2001 From: Aleksey Vyskubov Date: Thu, 2 Sep 2021 11:47:05 +0300 Subject: [PATCH 2/6] wait for kafka_source::thread_f() to finish wait for kafka_source_base::thread_f() to finish before closing and destroying kafka_source --- include/kspp/sources/kafka_source.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index e3518d9..62b72fa 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -93,6 +93,7 @@ namespace kspp { : partition_source(nullptr, partition) , _started(false) , _exit(false) + , _thread_f_finished(false) , _impl(config, topic, partition, consumer_group) , _key_codec(key_codec) , _val_codec(val_codec) @@ -163,12 +164,14 @@ namespace kspp { _commit_chain_size.set(_commit_chain.size()); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + _thread_f_finished = true; DLOG(INFO) << "exiting thread"; } size_t _max_incomming_queue_size=1000; bool _started; bool _exit; + bool _thread_f_finished; std::thread _thread; event_queue _incomming_msg; kafka_consumer _impl; @@ -212,6 +215,13 @@ namespace kspp { key_codec, val_codec) { } + + ~kafka_source() override + { + kafka_source_base::close(); + while (!kafka_source_base::_thread_f_finished) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } protected: std::shared_ptr> parse(const std::unique_ptr &ref) override { From 3923dfa6c5f72f692000cef17916257d5dc0de38 Mon Sep 17 00:00:00 2001 From: Aleksey Vyskubov Date: Thu, 2 Sep 2021 13:17:07 +0300 Subject: [PATCH 3/6] Update kafka_source.h check exit condition in consuming loop --- include/kspp/sources/kafka_source.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index 62b72fa..6c3bd1c 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -143,7 +143,7 @@ namespace kspp { while(!_exit) { //auto tick = kspp::milliseconds_since_epoch(); - while (auto p = _impl.consume()) { + while (auto p = _impl.consume() && !_exit) { auto decoded_msg = parse(p); if (decoded_msg) { _incomming_msg.push_back(decoded_msg); From 5f23c83ddecf7f4841dcde1fa6d021bcedd41391 Mon Sep 17 00:00:00 2001 From: Aleksey Vyskubov Date: Thu, 2 Sep 2021 14:06:30 +0300 Subject: [PATCH 4/6] Update kafka_source.h --- include/kspp/sources/kafka_source.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index 6c3bd1c..4c9e338 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -143,7 +143,7 @@ namespace kspp { while(!_exit) { //auto tick = kspp::milliseconds_since_epoch(); - while (auto p = _impl.consume() && !_exit) { + while ((auto p = _impl.consume()) && !_exit) { auto decoded_msg = parse(p); if (decoded_msg) { _incomming_msg.push_back(decoded_msg); From 7c1c0c8b99a2881c33a97b77f2ae03cc31381b37 Mon Sep 17 00:00:00 2001 From: Aleksey Vyskubov Date: Thu, 2 Sep 2021 15:24:57 +0300 Subject: [PATCH 5/6] Update kafka_source.h add specializations --- include/kspp/sources/kafka_source.h | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index 4c9e338..ae74bd7 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -143,7 +143,7 @@ namespace kspp { while(!_exit) { //auto tick = kspp::milliseconds_since_epoch(); - while ((auto p = _impl.consume()) && !_exit) { + while (auto p = _impl.consume()) { auto decoded_msg = parse(p); if (decoded_msg) { _incomming_msg.push_back(decoded_msg); @@ -218,8 +218,8 @@ namespace kspp { ~kafka_source() override { - kafka_source_base::close(); - while (!kafka_source_base::_thread_f_finished) + this->close(); + while (!this->_thread_f_finished) std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -305,6 +305,13 @@ namespace kspp { val_codec) { } + ~kafka_source() override + { + this->close(); + while (!this->_thread_f_finished) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + protected: std::shared_ptr> parse(const std::unique_ptr &ref) override { if (!ref) @@ -363,6 +370,13 @@ namespace kspp { nullptr) { } + ~kafka_source() override + { + this->close(); + while (!this->_thread_f_finished) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + protected: std::shared_ptr> parse(const std::unique_ptr &ref) override { if (!ref || ref->key_len() == 0) From b97dc5fc852be45faf547ff1e6c2aa7e7f8f5528 Mon Sep 17 00:00:00 2001 From: Aleksey Vyskubov Date: Thu, 2 Sep 2021 17:15:31 +0300 Subject: [PATCH 6/6] remove double call to close() --- include/kspp/sources/kafka_source.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/include/kspp/sources/kafka_source.h b/include/kspp/sources/kafka_source.h index ae74bd7..80e2867 100644 --- a/include/kspp/sources/kafka_source.h +++ b/include/kspp/sources/kafka_source.h @@ -218,7 +218,6 @@ namespace kspp { ~kafka_source() override { - this->close(); while (!this->_thread_f_finished) std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -307,7 +306,6 @@ namespace kspp { ~kafka_source() override { - this->close(); while (!this->_thread_f_finished) std::this_thread::sleep_for(std::chrono::milliseconds(100)); } @@ -372,7 +370,6 @@ namespace kspp { ~kafka_source() override { - this->close(); while (!this->_thread_f_finished) std::this_thread::sleep_for(std::chrono::milliseconds(100)); }