diff --git a/examples/consumer.c b/examples/consumer.c
index f621a9db35..8f33edd69c 100644
--- a/examples/consumer.c
+++ b/examples/consumer.c
@@ -149,6 +149,29 @@ int main(int argc, char **argv) {
return 1;
}
+ if (rd_kafka_conf_set(conf, "share.consumer", "true", errstr,
+ sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ fprintf(stderr, "%s\n", errstr);
+ rd_kafka_conf_destroy(conf);
+ return 1;
+ }
+
+
+ if (rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr,
+ sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ fprintf(stderr, "%s\n", errstr);
+ rd_kafka_conf_destroy(conf);
+ return 1;
+ }
+
+
+ if (rd_kafka_conf_set(conf, "debug", "cgrp", errstr,
+ sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ fprintf(stderr, "%s\n", errstr);
+ rd_kafka_conf_destroy(conf);
+ return 1;
+ }
+
/*
* Create consumer instance.
*
diff --git a/src/rdkafka.c b/src/rdkafka.c
index f3d5b7f3fc..1c2f7045cc 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2844,6 +2844,49 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
return NULL;
}
+rd_kafka_t *rd_kafka_share_consumer_new(
+ rd_kafka_conf_t *conf, char *errstr, size_t errstr_size) {
+ rd_kafka_t *rk;
+ char errstr_internal[512];
+ rd_kafka_conf_res_t res;
+
+ if (conf == NULL) {
+ rd_snprintf(errstr, errstr_size,
+ "rd_kafka_share_consumer_new(): "
+ "conf argument must not be NULL");
+ return NULL;
+ }
+
+ res = rd_kafka_conf_set(conf, "share.consumer", "true", errstr_internal,
+ sizeof(errstr_internal));
+ if (res != RD_KAFKA_CONF_OK) {
+ rd_snprintf(errstr, errstr_size,
+ "rd_kafka_share_consumer_new(): "
+ "Failed to set share.consumer=true: %s",
+ errstr_internal);
+ return NULL;
+ }
+
+
+ res = rd_kafka_conf_set(conf, "group.protocol", "consumer", errstr_internal,
+ sizeof(errstr_internal));
+ if (res != RD_KAFKA_CONF_OK) {
+ rd_snprintf(errstr, errstr_size,
+ "rd_kafka_share_consumer_new(): "
+ "Failed to set group.protocol=consumer: %s",
+ errstr_internal);
+ return NULL;
+ }
+
+ rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, errstr_size);
+ if (!rk) {
+ /* If rd_kafka_new() failed it will have set the last error
+ * and filled out errstr, so we don't need to do that here. */
+ return NULL;
+ }
+ return rk;
+}
+
/**
* Schedules a rebootstrap of the cluster immediately.
*
diff --git a/src/rdkafka.h b/src/rdkafka.h
index 3565b1c5a8..6abd1534ca 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -3057,6 +3057,9 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
size_t errstr_size);
+RD_EXPORT
+rd_kafka_t *rd_kafka_share_consumer_new(rd_kafka_conf_t *conf, char *errstr, size_t errstr_size);
+
/**
* @brief Destroy Kafka handle.
*
diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c
index 0d85cbde32..dffd56d7eb 100644
--- a/src/rdkafka_cgrp.c
+++ b/src/rdkafka_cgrp.c
@@ -3363,6 +3363,304 @@ void rd_kafka_cgrp_handle_ConsumerGroupHeartbeat(rd_kafka_t *rk,
}
}
+void rd_kafka_cgrp_handle_ShareGroupHeartbeat(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+ const int log_decode_errors = LOG_ERR;
+ int16_t error_code = 0;
+ int actions = 0;
+ rd_kafkap_str_t error_str = RD_KAFKAP_STR_INITIALIZER_EMPTY;
+ rd_kafkap_str_t member_id;
+ int32_t member_epoch;
+ int32_t heartbeat_interval_ms;
+ int8_t are_assignments_present;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY)
+ return;
+
+ rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
+
+ if (rd_kafka_cgrp_will_leave(rkcg))
+ err = RD_KAFKA_RESP_ERR__OUTDATED;
+ if (err)
+ goto err;
+
+ rd_kafka_buf_read_throttle_time(rkbuf);
+ rd_kafka_buf_read_i16(rkbuf, &error_code);
+ rd_kafka_buf_read_str(rkbuf, &error_str);
+
+ if (error_code) {
+ err = error_code;
+ goto err;
+ }
+
+ rd_kafka_buf_read_str(rkbuf, &member_id);
+ if (!RD_KAFKAP_STR_IS_NULL(&member_id)) {
+ rd_kafka_cgrp_set_member_id(rkcg, member_id.str);
+ }
+
+ rd_kafka_buf_read_i32(rkbuf, &member_epoch);
+ rkcg->rkcg_generation_id = member_epoch;
+
+ rd_kafka_buf_read_i32(rkbuf, &heartbeat_interval_ms);
+ if (heartbeat_interval_ms > 0) {
+ rkcg->rkcg_heartbeat_intvl_ms = heartbeat_interval_ms;
+ }
+
+ rd_kafka_buf_read_i8(rkbuf, &are_assignments_present);
+
+ if (are_assignments_present == 1) {
+ rd_kafka_topic_partition_list_t *assigned_topic_partitions;
+ const rd_kafka_topic_partition_field_t assignments_fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ assigned_topic_partitions = rd_kafka_buf_read_topic_partitions(
+ rkbuf, rd_true, rd_false /* Don't use Topic Name */, 0,
+ assignments_fields);
+
+ rd_kafka_dbg(
+ rk, CGRP, "HEARTBEAT",
+ "ShareGroupHeartbeat response received "
+ "assigned_topic_partitions size %d",
+ assigned_topic_partitions->cnt);
+
+ if (rd_kafka_is_dbg(rk, CGRP)) {
+ char assigned_topic_partitions_str[512] = "NULL";
+
+ if (assigned_topic_partitions) {
+ rd_kafka_topic_partition_list_str(
+ assigned_topic_partitions,
+ assigned_topic_partitions_str,
+ sizeof(assigned_topic_partitions_str), 0);
+ }
+
+ rd_kafka_dbg(
+ rk, CGRP, "HEARTBEAT",
+ "ShareGroupHeartbeat response received target "
+ "assignment \"%s\"",
+ assigned_topic_partitions_str);
+ }
+
+ if (assigned_topic_partitions) {
+ RD_IF_FREE(rkcg->rkcg_next_target_assignment,
+ rd_kafka_topic_partition_list_destroy);
+ rkcg->rkcg_next_target_assignment = NULL;
+ if (rd_kafka_cgrp_consumer_is_new_assignment_different(
+ rkcg, assigned_topic_partitions)) {
+ rkcg->rkcg_next_target_assignment =
+ assigned_topic_partitions;
+ } else {
+ rd_kafka_topic_partition_list_destroy(
+ assigned_topic_partitions);
+ assigned_topic_partitions = NULL;
+ }
+ }
+ }
+
+ if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY &&
+ (rkcg->rkcg_consumer_flags & RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK) &&
+ rkcg->rkcg_target_assignment) {
+ if (rkcg->rkcg_consumer_flags &
+ RD_KAFKA_CGRP_CONSUMER_F_SENDING_ACK) {
+ if (rkcg->rkcg_current_assignment)
+ rd_kafka_topic_partition_list_destroy(
+ rkcg->rkcg_current_assignment);
+ rkcg->rkcg_current_assignment =
+ rd_kafka_topic_partition_list_copy(
+ rkcg->rkcg_target_assignment);
+ rd_kafka_topic_partition_list_destroy(
+ rkcg->rkcg_target_assignment);
+ rkcg->rkcg_target_assignment = NULL;
+ rkcg->rkcg_consumer_flags &=
+ ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_ACK;
+
+ if (rd_kafka_is_dbg(rkcg->rkcg_rk, CGRP)) {
+ char rkcg_current_assignment_str[512] = "NULL";
+
+ rd_kafka_topic_partition_list_str(
+ rkcg->rkcg_current_assignment,
+ rkcg_current_assignment_str,
+ sizeof(rkcg_current_assignment_str), 0);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "HEARTBEAT",
+ "Target assignment acked, new "
+ "current assignment "
+ " \"%s\"",
+ rkcg_current_assignment_str);
+ }
+ } else if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) {
+ /* We've finished reconciliation but we weren't
+ * sending an ack, need to send a new HB with the ack.
+ */
+ rd_kafka_cgrp_consumer_expedite_next_heartbeat(
+ rkcg, "not subscribed anymore");
+ }
+ }
+
+
+ if (rkcg->rkcg_consumer_flags &
+ RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING &&
+ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) {
+ /* TODO: Check if this should be done only for the
+ * steady state?
+ */
+ rd_kafka_assignment_serve(rk);
+ rkcg->rkcg_consumer_flags &=
+ ~RD_KAFKA_CGRP_CONSUMER_F_SERVE_PENDING;
+ }
+
+ if (rkcg->rkcg_next_target_assignment) {
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) {
+ rd_kafka_cgrp_consumer_next_target_assignment_request_metadata(
+ rk, rkb);
+ } else {
+ /* Consumer left the group sending an HB request
+ * while this one was in-flight. */
+ rd_kafka_topic_partition_list_destroy(
+ rkcg->rkcg_next_target_assignment);
+ rkcg->rkcg_next_target_assignment = NULL;
+ }
+ }
+
+ if (rd_kafka_cgrp_consumer_subscription_preconditions_met(rkcg))
+ rd_kafka_cgrp_consumer_expedite_next_heartbeat(
+ rkcg, "send new subscription");
+
+ rkcg->rkcg_consumer_flags &=
+ ~RD_KAFKA_CGRP_CONSUMER_F_SENDING_NEW_SUBSCRIPTION &
+ ~RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
+ rd_kafka_cgrp_maybe_clear_heartbeat_failed_err(rkcg);
+ rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rkcg->rkcg_expedite_heartbeat_retries = 0;
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+
+ return;
+
+err_parse:
+ err = rkbuf->rkbuf_err;
+err:
+ rkcg->rkcg_last_heartbeat_err = err;
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+ switch (err) {
+ case RD_KAFKA_RESP_ERR__DESTROY:
+ /* quick cleanup */
+ return;
+
+ case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
+ "ShareGroupHeartbeat failed due to coordinator (%s) "
+ "loading in progress: %s: "
+ "retrying",
+ rkcg->rkcg_curr_coord
+ ? rd_kafka_broker_name(rkcg->rkcg_curr_coord)
+ : "none",
+ rd_kafka_err2str(err));
+ actions = RD_KAFKA_ERR_ACTION_RETRY;
+ break;
+
+ case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
+ case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
+ case RD_KAFKA_RESP_ERR__TRANSPORT:
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
+ "ShareGroupHeartbeat failed due to coordinator (%s) "
+ "no longer available: %s: "
+ "re-querying for coordinator",
+ rkcg->rkcg_curr_coord
+ ? rd_kafka_broker_name(rkcg->rkcg_curr_coord)
+ : "none",
+ rd_kafka_err2str(err));
+ /* Remain in joined state and keep querying for coordinator */
+ actions = RD_KAFKA_ERR_ACTION_REFRESH;
+ break;
+
+ case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
+ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
+ "ShareGroupHeartbeat failed due to: %s: "
+ "will rejoin the group",
+ rd_kafka_err2str(err));
+ rkcg->rkcg_consumer_flags |=
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN;
+ return;
+
+ case RD_KAFKA_RESP_ERR_INVALID_REQUEST:
+ case RD_KAFKA_RESP_ERR_GROUP_MAX_SIZE_REACHED:
+ case RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION:
+ case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
+ case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
+ actions = RD_KAFKA_ERR_ACTION_FATAL;
+ break;
+
+ default:
+ actions = rd_kafka_err_action(
+ rkb, err, request,
+
+ RD_KAFKA_ERR_ACTION_SPECIAL,
+ RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+
+ RD_KAFKA_ERR_ACTION_END);
+ break;
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
+ rd_kafka_set_fatal_error(
+ rkcg->rkcg_rk, err,
+ "ShareGroupHeartbeat fatal error: %s",
+ rd_kafka_err2str(err));
+ rd_kafka_cgrp_revoke_all_rejoin_maybe(
+ rkcg, rd_true, /*assignments lost*/
+ rd_true, /*initiating*/
+ "Fatal error in ShareGroupHeartbeat API response");
+ return;
+ }
+
+ if (!rkcg->rkcg_heartbeat_intvl_ms) {
+ /* When an error happens on first HB, it should be always
+ * retried, unless fatal, to avoid entering a tight loop
+ * and to use exponential backoff. */
+ actions |= RD_KAFKA_ERR_ACTION_RETRY;
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rkcg->rkcg_consumer_flags |=
+ RD_KAFKA_CGRP_CONSUMER_F_SEND_FULL_REQUEST;
+ rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
+ /* If coordinator changes, HB will be expedited. */
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) {
+ rd_ts_t min_error_interval =
+ RD_MAX(rkcg->rkcg_heartbeat_intvl_ms * 1000,
+ /* default group.consumer.heartbeat.interval.ms */
+ 5000000);
+ if (rkcg->rkcg_last_err != err ||
+ (rd_clock() >
+ rkcg->rkcg_ts_last_err + min_error_interval)) {
+ rd_kafka_cgrp_set_last_err(rkcg, err);
+ rd_kafka_consumer_err(
+ rkcg->rkcg_q, rd_kafka_broker_id(rkb), err, 0, NULL,
+ NULL, err,
+ "ShareGroupHeartbeat failed: %s%s%.*s",
+ rd_kafka_err2str(err),
+ RD_KAFKAP_STR_LEN(&error_str) ? ": " : "",
+ RD_KAFKAP_STR_PR(&error_str));
+ }
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
+ rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
+ !rd_kafka_cgrp_will_leave(rkcg) &&
+ rd_kafka_buf_retry(rkb, request)) {
+ /* Retry */
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+ }
+}
/**
* @brief Handle Heartbeat response.
@@ -6144,6 +6442,14 @@ void rd_kafka_cgrp_consumer_group_heartbeat(rd_kafka_cgrp_t *rkcg,
}
rkcg->rkcg_expedite_heartbeat_retries++;
+
+ if (RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk)) {
+ rd_kafka_ShareGroupHeartbeatRequest(rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
+ member_epoch, rkcg_client_rack, rkcg_subscription_topics, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_cgrp_handle_ShareGroupHeartbeat, NULL);
+ return;
+ }
+
rd_kafka_ConsumerGroupHeartbeatRequest(
rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
member_epoch, rkcg_group_instance_id, rkcg_client_rack,
@@ -6205,6 +6511,15 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) {
"member fenced - rejoining");
}
+ /* There should be no fencing, hence no rejoining - these asserts are to test only, we don't actually need them. */
+ rd_dassert(!(RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk) &&
+ (rkcg->rkcg_consumer_flags &
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN)));
+ rd_dassert(!(RD_KAFKA_IS_SHARE_CONSUMER(rkcg->rkcg_rk) &&
+ (rkcg->rkcg_consumer_flags &
+ RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE)));
+
+
switch (rkcg->rkcg_join_state) {
case RD_KAFKA_CGRP_JOIN_STATE_INIT:
rkcg->rkcg_consumer_flags &=
@@ -6294,6 +6609,7 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,
/* If member is leaving, new subscription
* will be applied after the leave
* ConsumerGroupHeartbeat */
+ /* MILIND: how is new subscription applied after heartbeat, check it. */
if (!rd_kafka_cgrp_will_leave(rkcg))
rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg);
} else {
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index 1f8bbf106b..f9b5de7037 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -1509,6 +1509,9 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"which indicates where this client is physically located. It "
"corresponds with the broker config `broker.rack`.",
.sdef = ""},
+ {_RK_GLOBAL | _RK_HIDDEN, "share.consumer", _RK_C_BOOL, _RK(is_share_consumer),
+ "tba description", 0, 1, 0},
+
/* Global producer properties */
{_RK_GLOBAL | _RK_PRODUCER | _RK_HIGH, "transactional.id", _RK_C_STR,
diff --git a/src/rdkafka_conf.h b/src/rdkafka_conf.h
index 92e5193eb7..fedfc2ecee 100644
--- a/src/rdkafka_conf.h
+++ b/src/rdkafka_conf.h
@@ -470,6 +470,8 @@ struct rd_kafka_conf_s {
rd_kafkap_str_t *client_rack;
+ int is_share_consumer; /**< Is this a share consumer? */
+
/*
* Producer configuration
*/
diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h
index d8370ff599..9e97bf9dca 100644
--- a/src/rdkafka_int.h
+++ b/src/rdkafka_int.h
@@ -94,6 +94,8 @@ typedef struct rd_kafka_lwtopic_s rd_kafka_lwtopic_t;
#define RD_KAFKA_OFFSET_IS_LOGICAL(OFF) ((OFF) < 0)
+#define RD_KAFKA_IS_SHARE_CONSUMER(rk) \
+ ((rk)->rk_type == RD_KAFKA_CONSUMER && (rk)->rk_conf.is_share_consumer)
/**
* @struct Represents a fetch position:
diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c
index 5dbbf9c9d4..db325b11c3 100644
--- a/src/rdkafka_op.c
+++ b/src/rdkafka_op.c
@@ -123,6 +123,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
[RD_KAFKA_OP_TERMINATE_TELEMETRY] =
"REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY",
[RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS",
+ [RD_KAFKA_OP_SHARE_FETCH] = "REPLY:SHARE_FETCH",
};
if (type & RD_KAFKA_OP_REPLY)
@@ -287,6 +288,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
sizeof(rko->rko_u.telemetry_broker),
[RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY,
[RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request),
+ [RD_KAFKA_OP_SHARE_FETCH] = sizeof(rko->rko_u.share_fetch),
};
size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK];
@@ -507,6 +509,10 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
rd_kafka_broker_destroy);
break;
+ case RD_KAFKA_OP_SHARE_FETCH:
+ /* TODO KIP-932: Add destruction code. */
+ break;
+
default:
break;
}
diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h
index e79309aa02..58a248aaf1 100644
--- a/src/rdkafka_op.h
+++ b/src/rdkafka_op.h
@@ -189,6 +189,8 @@ typedef enum {
RD_KAFKA_OP_ELECTLEADERS, /**< Admin:
* ElectLeaders
* u.admin_request */
+ RD_KAFKA_OP_SHARE_FETCH, /**< broker op: Issue share fetch request if
+ applicable. */
RD_KAFKA_OP__END
} rd_kafka_op_type_t;
@@ -724,6 +726,16 @@ struct rd_kafka_op_s {
void (*cb)(rd_kafka_t *rk, void *rkb);
} terminated;
+ struct {
+ /** Whether this broker should share-fetch nonzero
+ * messages. */
+ rd_bool_t should_fetch;
+
+ /** Absolute timeout left to complete this share-fetch.
+ */
+ rd_ts_t abs_timeout;
+ } share_fetch;
+
} rko_u;
};
diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h
index 02565ecb3b..b5b52ac146 100644
--- a/src/rdkafka_proto.h
+++ b/src/rdkafka_proto.h
@@ -175,6 +175,7 @@ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) {
[RD_KAFKAP_ConsumerGroupDescribe] = "ConsumerGroupDescribe",
[RD_KAFKAP_GetTelemetrySubscriptions] = "GetTelemetrySubscriptions",
[RD_KAFKAP_PushTelemetry] = "PushTelemetry",
+ [RD_KAFKAP_ShareGroupHeartbeat] = "ShareGroupHeartbeat",
};
static RD_TLS char ret[64];
diff --git a/src/rdkafka_protocol.h b/src/rdkafka_protocol.h
index 19190e1447..28c246dd9c 100644
--- a/src/rdkafka_protocol.h
+++ b/src/rdkafka_protocol.h
@@ -120,6 +120,7 @@
#define RD_KAFKAP_GetTelemetrySubscriptions 71
#define RD_KAFKAP_PushTelemetry 72
#define RD_KAFKAP_AssignReplicasToDirs 73
+#define RD_KAFKAP_ShareGroupHeartbeat 76
#define RD_KAFKAP__NUM 74
diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c
index 663a07eae3..94c01e5964 100644
--- a/src/rdkafka_request.c
+++ b/src/rdkafka_request.c
@@ -2442,7 +2442,101 @@ void rd_kafka_ConsumerGroupHeartbeatRequest(
rd_kafkap_str_destroy(subscribed_topic_regex_to_send);
}
+void rd_kafka_ShareGroupHeartbeatRequest(
+ rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *member_id,
+ int32_t member_epoch,
+ const rd_kafkap_str_t *rack_id,
+ const rd_kafka_topic_partition_list_t *subscribed_topics,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ size_t rkbuf_size = 0;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_ShareGroupHeartbeat, 1, 1, &features);
+
+ rd_rkb_dbg(rkb, CGRP, "SHAREHEARTBEAT",
+ "ShareGroupHeartbeat version %d for group \"%s\", member id "
+ "\"%s\", topic count = %d",
+ ApiVersion, group_id ? group_id->str : "NULL",
+ member_id ? member_id->str : "NULL",
+ subscribed_topics ? subscribed_topics->cnt : -1);
+
+ if (ApiVersion == -1) {
+ rd_kafka_cgrp_coord_dead(rkb->rkb_rk->rk_cgrp,
+ RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "ShareGroupHeartbeatRequest not "
+ "supported by broker");
+ return;
+ }
+
+ // debug log all the fields
+ if (rd_rkb_is_dbg(rkb, CGRP)) {
+ char subscribed_topics_str[512] = "NULL";
+ if (subscribed_topics) {
+ rd_kafka_topic_partition_list_str(
+ subscribed_topics, subscribed_topics_str,
+ sizeof(subscribed_topics_str), 0);
+ }
+ rd_rkb_dbg(rkb, CGRP, "SHAREHEARTBEAT",
+ "ShareGroupHeartbeat of group id \"%s\", "
+ "member id \"%s\", member epoch %d, rack id \"%s\""
+ ", subscribed topics \"%s\"",
+ group_id ? group_id->str : "NULL",
+ member_id ? member_id->str : "NULL", member_epoch,
+ rack_id ? rack_id->str : "NULL",
+ subscribed_topics_str);
+ }
+
+ if (group_id)
+ rkbuf_size += RD_KAFKAP_STR_SIZE(group_id);
+ if (member_id)
+ rkbuf_size += RD_KAFKAP_STR_SIZE(member_id);
+ rkbuf_size += 4; /* MemberEpoch */
+ if (rack_id)
+ rkbuf_size += RD_KAFKAP_STR_SIZE(rack_id);
+ if (subscribed_topics) {
+ rkbuf_size +=
+ ((subscribed_topics->cnt * (4 + 50)) + 4 /* array size */);
+ }
+
+ rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_ShareGroupHeartbeat, 1, rkbuf_size, rd_true);
+
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+ rd_kafka_buf_write_i32(rkbuf, member_epoch);
+ rd_kafka_buf_write_kstr(rkbuf, rack_id);
+ if (subscribed_topics) {
+ int topics_cnt = subscribed_topics->cnt;
+
+ /* write Topics */
+ rd_kafka_buf_write_arraycnt(rkbuf, topics_cnt);
+ while (--topics_cnt >= 0) {
+ if (rd_rkb_is_dbg(rkb, CGRP))
+ rd_rkb_dbg(
+ rkb, CGRP, "SHAREHEARTBEAT",
+ "ShareGroupHeartbeat subscribed "
+ "topic %s",
+ subscribed_topics->elems[topics_cnt].topic);
+ rd_kafka_buf_write_str(
+ rkbuf, subscribed_topics->elems[topics_cnt].topic,
+ -1);
+ }
+ } else {
+ rd_kafka_buf_write_arraycnt(rkbuf, -1);
+ }
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, features);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
/**
* @brief Construct and send ListGroupsRequest to \p rkb
diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h
index c508ffdaaf..12b2bcbd5f 100644
--- a/src/rdkafka_request.h
+++ b/src/rdkafka_request.h
@@ -400,6 +400,17 @@ void rd_kafka_ConsumerGroupHeartbeatRequest(
rd_kafka_resp_cb_t *resp_cb,
void *opaque);
+void rd_kafka_ShareGroupHeartbeatRequest(
+ rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ const rd_kafkap_str_t *member_id,
+ int32_t member_epoch,
+ const rd_kafkap_str_t *rack_id,
+ const rd_kafka_topic_partition_list_t *subscribed_topics,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque);
+
rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
const rd_list_t *topics,
rd_list_t *topic_ids,
diff --git a/tests/0154-share_consumer.c b/tests/0154-share_consumer.c
new file mode 100644
index 0000000000..d906eaf836
--- /dev/null
+++ b/tests/0154-share_consumer.c
@@ -0,0 +1,71 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2025, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+int main_0154_share_consumer(int argc, char **argv) {
+ char errstr[512];
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ rd_kafka_topic_partition_list_t *topics;
+ char *topic = "test-topic";// test_mk_topic_name("0154-share-consumer", 0);
+ char *group = "test-group-0";
+
+ test_create_topic_wait_exists(NULL, topic, 3, -1, 60 * 1000);
+ rd_sleep(5);
+
+ test_produce_msgs_easy(topic, 0, 0, 2);
+
+ TEST_SAY("Creating share consumer\n");
+ test_conf_init(&conf, NULL, 60);
+ rd_kafka_conf_set(conf, "share.consumer", "true", NULL, 0);
+ rd_kafka_conf_set(conf, "group.protocol", "consumer", NULL, 0);
+ rd_kafka_conf_set(conf, "group.id", group, NULL, 0);
+ rd_kafka_conf_set(conf, "debug", "cgrp,protocol,conf", NULL, 0);
+
+ // rk = rd_kafka_share_consumer_new(conf, errstr, sizeof(errstr));
+ rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
+ if (!rk) {
+ TEST_FAIL("Failed to create share consumer: %s\n", errstr);
+ }
+
+ topics = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
+ rd_kafka_subscribe(rk, topics);
+ rd_kafka_topic_partition_list_destroy(topics);
+
+ TEST_SAY("Share consumer created successfully\n");
+
+ rd_kafka_consumer_poll(rk, 65000);
+
+ TEST_SAY("Destroying consumer\n");
+
+ /* Clean up */
+ rd_kafka_destroy(rk);
+ return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 324281bd99..fd90f7aefb 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -144,6 +144,7 @@ set(
0151-purge-brokers.c
0152-rebootstrap.c
0153-memberid.c
+ 0154-share-consumer.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
diff --git a/tests/broker_version_tests.py b/tests/broker_version_tests.py
index c451e02471..1a0de874b1 100755
--- a/tests/broker_version_tests.py
+++ b/tests/broker_version_tests.py
@@ -31,7 +31,7 @@ def test_it(version, deploy=True, conf={}, rdkconf={}, tests=None,
"""
cluster = LibrdkafkaTestCluster(version, conf,
- num_brokers=int(conf.get('broker_cnt', 3)),
+ num_brokers=int(conf.get('broker_cnt', 1)),
debug=debug, scenario=scenario,
kraft=kraft)
@@ -175,7 +175,7 @@ def handle_report(report, version, suite):
'--brokers',
dest='broker_cnt',
type=int,
- default=3,
+ default=1,
help='Number of Kafka brokers')
parser.add_argument('--ssl', dest='ssl', action='store_true',
default=False,
diff --git a/tests/interactive_broker_version.py b/tests/interactive_broker_version.py
index acddc872fd..3f2ffe7aae 100755
--- a/tests/interactive_broker_version.py
+++ b/tests/interactive_broker_version.py
@@ -25,7 +25,7 @@ def version_as_number(version):
def test_version(version, cmd=None, deploy=True, conf={}, debug=False,
exec_cnt=1,
- root_path='tmp', broker_cnt=3, scenario='default',
+ root_path='tmp', broker_cnt=1, scenario='default',
kraft=False):
"""
@brief Create, deploy and start a Kafka cluster using Kafka \\p version
diff --git a/tests/test.c b/tests/test.c
index 42e525a9cc..d6a9029f7d 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -272,6 +272,7 @@ _TEST_DECL(0150_telemetry_mock);
_TEST_DECL(0151_purge_brokers_mock);
_TEST_DECL(0152_rebootstrap_local);
_TEST_DECL(0153_memberid);
+_TEST_DECL(0154_share_consumer);
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -540,6 +541,7 @@ struct test tests[] = {
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
_TEST(0152_rebootstrap_local, TEST_F_LOCAL),
_TEST(0153_memberid, 0, TEST_BRKVER(0, 4, 0, 0)),
+ _TEST(0154_share_consumer, 0, TEST_BRKVER(0, 4, 0, 0)),
/* Manual tests */
_TEST(8000_idle, TEST_F_MANUAL),
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index 812a2674d1..252f9db0ba 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -234,6 +234,7 @@
+