Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c rdunittest_fetcher.c\
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_telemetry.c \
rdkafka_telemetry_encode.c rdkafka_telemetry_decode.c \
Expand Down
262 changes: 208 additions & 54 deletions src/rdkafka_fetcher.c

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/rdkafka_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ rd_ts_t rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t *rktp,

void rd_kafka_broker_share_fetch_leave(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko_orig, rd_ts_t now);
void rd_kafka_broker_share_fetch(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko_orig, rd_ts_t now);
int64_t rd_kafka_op_get_offset(const rd_kafka_op_t *rko);
void rd_kafka_share_filter_forward(
rd_kafka_broker_t *rkb,
rd_kafka_toppar_t *rktp,
rd_kafka_q_t *temp_fetchq,
rd_kafka_q_t *temp_appq,
int32_t AcquiredRecordsArrayCnt,
const int64_t *FirstOffsets,
const int64_t *LastOffsets,
const int16_t *DeliveryCounts);


#endif /* _RDKAFKA_FETCHER_H_ */
8 changes: 8 additions & 0 deletions src/rdkafka_msgset.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ rd_kafka_msgset_parse(rd_kafka_buf_t *rkbuf,
rd_kafka_aborted_txns_t *aborted_txns,
const struct rd_kafka_toppar_ver *tver);

rd_kafka_resp_err_t
rd_kafka_share_msgset_parse(rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
rd_kafka_toppar_t *rktp,
rd_kafka_aborted_txns_t *aborted_txns,
const struct rd_kafka_toppar_ver *tver,
rd_kafka_q_t *par_rkq);

#if WITH_ZLIB
rd_kafka_resp_err_t rd_kafka_gzip_compress(rd_kafka_broker_t *rkb,
int comp_level,
Expand Down
27 changes: 27 additions & 0 deletions src/rdkafka_msgset_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,33 @@ rd_kafka_msgset_parse(rd_kafka_buf_t *rkbuf,
return err;
}

rd_kafka_resp_err_t
rd_kafka_share_msgset_parse(rd_kafka_buf_t *rkbuf,
rd_kafka_buf_t *request,
rd_kafka_toppar_t *rktp,
rd_kafka_aborted_txns_t *aborted_txns,
const struct rd_kafka_toppar_ver *tver,
rd_kafka_q_t *par_rkq) {
rd_kafka_msgset_reader_t msetr;
rd_kafka_resp_err_t err;

rd_kafka_msgset_reader_init(&msetr, rkbuf, rktp, tver, aborted_txns,
par_rkq);

/* Parse and handle the message set */
err = rd_kafka_msgset_reader_run(&msetr);

rd_atomic64_add(&rktp->rktp_c.rx_msgs, msetr.msetr_msgcnt);
rd_atomic64_add(&rktp->rktp_c.rx_msg_bytes, msetr.msetr_msg_bytes);

rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt,
(int64_t)msetr.msetr_msgcnt);
rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize,
(int64_t)msetr.msetr_msg_bytes);

return err;
}


/**
* @brief Offset comparator
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,
rkt->rkt_topic->str, rktp->rktp_partition, rktp,
&rktp->rktp_refcnt, func, line);

rktp->rktp_share_acknowledge = NULL;
rktp->rktp_share_acknowledgement_list = NULL;
rktp->rktp_share_acknowledge_count = 0;

return rd_kafka_toppar_keep(rktp);
Expand Down Expand Up @@ -339,7 +339,7 @@ void rd_kafka_toppar_destroy_final(rd_kafka_toppar_t *rktp) {
/* Clear queues */
rd_kafka_assert(rktp->rktp_rkt->rkt_rk,
rd_kafka_msgq_len(&rktp->rktp_xmit_msgq) == 0);
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_share_acknowledge == NULL);
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_share_acknowledgement_list == NULL);
rd_kafka_assert(rktp->rktp_rkt->rkt_rk, rktp->rktp_share_acknowledge_count == 0);
rd_kafka_dr_msgq(rktp->rktp_rkt, &rktp->rktp_msgq,
RD_KAFKA_RESP_ERR__DESTROY);
Expand Down
11 changes: 10 additions & 1 deletion src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ rd_kafka_fetch_pos_make(int64_t offset,
typedef TAILQ_HEAD(rd_kafka_toppar_tqhead_s,
rd_kafka_toppar_s) rd_kafka_toppar_tqhead_t;

typedef enum rd_kafka_share_acknowledgement_type {
RD_KAFKA_SHARE_ACK_GAP = 0, /* gap */
RD_KAFKA_SHARE_ACK_ACCEPT = 1 /* accept */
} rd_kafka_share_acknowledgement_type;

/**
* Topic + Partition combination
*/
Expand Down Expand Up @@ -184,6 +189,8 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
rd_kafka_q_t *rktp_fetchq; /* Queue of fetched messages
* from broker.
* Broker thread -> App */
rd_kafka_q_t *rktp_temp_fetchq; /* Temporary fetch queue
* used to filter acquired records */
rd_kafka_q_t *rktp_ops; /* * -> Main thread */

rd_atomic32_t rktp_msgs_inflight; /**< Current number of
Expand Down Expand Up @@ -491,8 +498,10 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
int64_t first_offset;
int64_t last_offset;
int16_t delivery_count;
} *rktp_share_acknowledge; /* NULL = not initialized */
rd_kafka_share_acknowledgement_type type;
};
size_t rktp_share_acknowledge_count; /* number of entries in rktp_share_acknowledge (0 when NULL) */
rd_list_t *rktp_share_acknowledgement_list; /*Type: rd_kafka_toppar_share_ack_entry_t */
};

/**
Expand Down
2 changes: 2 additions & 0 deletions src/rdunittest.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ extern int unittest_scram(void);
#endif
extern int unittest_assignors(void);
extern int unittest_map(void);
extern int unittest_fetcher_share_filter_forward(void);
#if WITH_CURL
extern int unittest_http(void);
#endif
Expand Down Expand Up @@ -480,6 +481,7 @@ int rd_unittest(void) {
#endif
{"telemetry", unittest_telemetry},
{"telemetry_decode", unittest_telemetry_decode},
{"fetcher_share_filter_forward", unittest_fetcher_share_filter_forward},
{"feature", unittest_feature},
{NULL}};
int i;
Expand Down
Loading