@@ -29,7 +29,6 @@ package kafka
2929import (
3030 "context"
3131 "encoding/json"
32- "fmt"
3332 "net/url"
3433 "strconv"
3534 "strings"
@@ -110,11 +109,9 @@ func (p *Plugin) Init(_ string) error {
110109 Balancer : & kafka.CRC32Balancer {}, // same behaviour as confluent-kafka client
111110 }
112111
113- if err != nil {
114- return fmt .Errorf ("failed to initialize a kafka producer with broker '%s'. Details: %w" , p .endpoint , err )
115- }
116112 p .envsInRunning = make (map [string ]* kafkapb.EnvInfo )
117- log .WithField ("call" , call ).Info ("Successfully created a kafka producer with broker '" + p .endpoint + "'" )
113+ log .WithField ("call" , call ).
114+ Info ("successfully created a kafka producer with broker '" + p .endpoint + "'" )
118115
119116 // Prepare and send active run list (expected to be empty during init)
120117 timestamp := uint64 (time .Now ().UnixMilli ())
@@ -124,7 +121,7 @@ func (p *Plugin) Init(_ string) error {
124121 }
125122 arlData , err := proto .Marshal (activeRunsList )
126123 if err != nil {
127- log .WithField ("call" , call ).Error ("Could not marshall an active runs list: " , err )
124+ log .WithField ("call" , call ).Error ("could not marshall an active runs list: " , err )
128125 }
129126 p .produceMessage (arlData , p .ActiveRunsListTopic (), "" , "Init" )
130127 return nil
@@ -161,7 +158,7 @@ func (p *Plugin) newEnvStateObject(varStack map[string]string, call string) *kaf
161158 return nil
162159 }
163160
164- var state string = "UNKNOWN"
161+ var state = "UNKNOWN"
165162 if strings .Contains (trigger , "enter_" ) {
166163 state = strings .TrimPrefix (trigger , "enter_" )
167164 } else if strings .Contains (trigger , "DESTROY" ) {
@@ -238,7 +235,7 @@ func (p *Plugin) GetRunningEnvList() []*kafkapb.EnvInfo {
238235func (p * Plugin ) produceMessage (message []byte , topic string , envId string , call string ) {
239236 log .WithField ("call" , call ).
240237 WithField ("partition" , envId ).
241- Debug ("Producing a new kafka message on topic " , topic )
238+ Debug ("producing a new kafka message on topic " , topic )
242239
243240 err := p .kafkaWriter .WriteMessages (context .Background (), kafka.Message {
244241 Topic : topic ,
@@ -262,7 +259,7 @@ func (p *Plugin) createUpdateCallback(varStack map[string]string, call string) f
262259 // Prepare and send new state notification
263260 log .WithField ("call" , call ).
264261 WithField ("partition" , envInfo .EnvironmentId ).
265- Debug ("Advertising the environment state (" + envInfo .State + ") and active runs list to Kafka" )
262+ Debug ("advertising the environment state (" + envInfo .State + ") and active runs list to Kafka" )
266263 newStateNotification := & kafkapb.NewStateNotification {
267264 EnvInfo : envInfo ,
268265 Timestamp : timestamp ,
@@ -271,7 +268,7 @@ func (p *Plugin) createUpdateCallback(varStack map[string]string, call string) f
271268 if err != nil {
272269 log .WithField ("call" , call ).
273270 WithField ("partition" , envInfo .EnvironmentId ).
274- Error ("Could not marshall a new state notification: " , err )
271+ Error ("could not marshall a new state notification: " , err )
275272 }
276273 p .produceMessage (nsnData , p .FSMTransitionTopic (envInfo .State ), envInfo .EnvironmentId , call )
277274
@@ -284,7 +281,7 @@ func (p *Plugin) createUpdateCallback(varStack map[string]string, call string) f
284281 if err != nil {
285282 log .WithField ("call" , call ).
286283 WithField ("partition" , envInfo .EnvironmentId ).
287- Error ("Could not marshall an active runs list: " , err )
284+ Error ("could not marshall an active runs list: " , err )
288285 }
289286 p .produceMessage (arlData , p .ActiveRunsListTopic (), envInfo .EnvironmentId , call )
290287 return
0 commit comments