@@ -125,7 +125,7 @@ type Writer struct {
125125 BatchBytes int64
126126
127127 // Time limit on how often incomplete message batches will be flushed to
128- // kafka.
128+ // kafka. This is ignored if SendPartialBatch is set to true
129129 //
130130 // The default is to flush at least every second.
131131 BatchTimeout time.Duration
@@ -158,6 +158,13 @@ type Writer struct {
158158 // Defaults to false.
159159 Async bool
160160
161+ // SendPartialBatch forces WriteMessages to send a partial batch instead of
162+ // blocking until a full batch is made. This is useful if you are already batching
163+ // messages before the producer, and want to flush everything sent to WriteMessages
164+ // without blocking while also still being synchronous. When set BatchTimeout does
165+ // nothing.
166+ SendPartialBatch bool
167+
161168 // An optional function called when the writer succeeds or fails the
162169 // delivery of messages to a kafka partition. When writing the messages
163170 // fails, the `err` parameter will be non-nil.
@@ -276,6 +283,13 @@ type WriterConfig struct {
276283 // The default is to use a kafka default value of 1048576.
277284 BatchBytes int
278285
286+ // SendPartialBatch forces WriteMessages to send a partial batch instead of
287+ // blocking until a full batch is made. This is useful if you are already batching
288+ // messages before the producer, and want to flush everything sent to WriteMessages
289+ // without blocking while also still being synchronous. When set BatchTimeout does
290+ // nothing.
291+ SendPartialBatch bool
292+
279293 // Time limit on how often incomplete message batches will be flushed to
280294 // kafka.
281295 //
@@ -487,22 +501,23 @@ func NewWriter(config WriterConfig) *Writer {
487501 }
488502
489503 w := & Writer {
490- Addr : TCP (config .Brokers ... ),
491- Topic : config .Topic ,
492- MaxAttempts : config .MaxAttempts ,
493- BatchSize : config .BatchSize ,
494- Balancer : config .Balancer ,
495- BatchBytes : int64 (config .BatchBytes ),
496- BatchTimeout : config .BatchTimeout ,
497- ReadTimeout : config .ReadTimeout ,
498- WriteTimeout : config .WriteTimeout ,
499- RequiredAcks : RequiredAcks (config .RequiredAcks ),
500- Async : config .Async ,
501- Logger : config .Logger ,
502- ErrorLogger : config .ErrorLogger ,
503- Transport : transport ,
504- transport : transport ,
505- writerStats : stats ,
504+ Addr : TCP (config .Brokers ... ),
505+ Topic : config .Topic ,
506+ MaxAttempts : config .MaxAttempts ,
507+ BatchSize : config .BatchSize ,
508+ Balancer : config .Balancer ,
509+ BatchBytes : int64 (config .BatchBytes ),
510+ BatchTimeout : config .BatchTimeout ,
511+ SendPartialBatch : config .SendPartialBatch ,
512+ ReadTimeout : config .ReadTimeout ,
513+ WriteTimeout : config .WriteTimeout ,
514+ RequiredAcks : RequiredAcks (config .RequiredAcks ),
515+ Async : config .Async ,
516+ Logger : config .Logger ,
517+ ErrorLogger : config .ErrorLogger ,
518+ Transport : transport ,
519+ transport : transport ,
520+ writerStats : stats ,
506521 }
507522
508523 if config .RequiredAcks == 0 {
@@ -1059,13 +1074,26 @@ func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*
10591074 batches [batch ] = append (batches [batch ], i )
10601075 }
10611076 }
1077+
1078+ // if we are sending partial batches and the current batch is not empty send
1079+ // the batch right away instead of lagging.
1080+ if ptw .w .SendPartialBatch && ! ptw .currBatch .empty () {
1081+ ptw .currBatch .trigger ()
1082+ ptw .queue .Put (ptw .currBatch )
1083+ ptw .currBatch = nil
1084+ }
1085+
10621086 return batches
10631087}
10641088
10651089// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
10661090func (ptw * partitionWriter ) newWriteBatch () * writeBatch {
10671091 batch := newWriteBatch (time .Now (), ptw .w .batchTimeout ())
1068- ptw .w .spawn (func () { ptw .awaitBatch (batch ) })
1092+ // if we are not sending partial batches we don't need to set this.
1093+ if ! ptw .w .SendPartialBatch {
1094+ ptw .w .spawn (func () { ptw .awaitBatch (batch ) })
1095+ }
1096+
10691097 return batch
10701098}
10711099
@@ -1239,6 +1267,11 @@ func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
12391267 return b .size >= maxSize || b .bytes >= maxBytes
12401268}
12411269
1270+ // empty returns if the batch has no data in it at all
1271+ func (b * writeBatch ) empty () bool {
1272+ return b == nil || b .size == 0
1273+ }
1274+
12421275func (b * writeBatch ) trigger () {
12431276 close (b .ready )
12441277}
0 commit comments