From e139b0d473acf03b7929fb68868fa5a78e021fab Mon Sep 17 00:00:00 2001 From: Maciej Klepaczewski Date: Thu, 21 Mar 2024 09:03:42 +0100 Subject: [PATCH] Preserve threadId on QueryEvent --- src/zalora/binlog-parser/parser/conversion/conversion.go | 1 + src/zalora/binlog-parser/parser/messages/message.go | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/zalora/binlog-parser/parser/conversion/conversion.go b/src/zalora/binlog-parser/parser/conversion/conversion.go index c3d3f82..75228b7 100644 --- a/src/zalora/binlog-parser/parser/conversion/conversion.go +++ b/src/zalora/binlog-parser/parser/conversion/conversion.go @@ -34,6 +34,7 @@ func ConvertQueryEventToMessage(binlogEventHeader replication.EventHeader, binlo message := messages.NewQueryMessage( header, messages.SqlQuery(binlogEvent.Query), + binlogEvent.SlaveProxyID, ) return messages.Message(message) diff --git a/src/zalora/binlog-parser/parser/messages/message.go b/src/zalora/binlog-parser/parser/messages/message.go index b5e204d..d3bb449 100644 --- a/src/zalora/binlog-parser/parser/messages/message.go +++ b/src/zalora/binlog-parser/parser/messages/message.go @@ -60,11 +60,12 @@ type SqlQuery string type QueryMessage struct { baseMessage - Query SqlQuery + Query SqlQuery + ThreadId uint32 } -func NewQueryMessage(header MessageHeader, query SqlQuery) QueryMessage { - return QueryMessage{baseMessage: baseMessage{Header: header, Type: MESSAGE_TYPE_QUERY}, Query: query} +func NewQueryMessage(header MessageHeader, query SqlQuery, threadId uint32) QueryMessage { + return QueryMessage{baseMessage: baseMessage{Header: header, Type: MESSAGE_TYPE_QUERY}, Query: query, ThreadId: threadId} } type UpdateMessage struct {