77
88import yaml
99from skywalking import config , agent
10- from vertx import EventBus
10+ from vertx import EventBus , eventbus
1111
1212from sourceplusplus import __version__
1313from .control .LiveInstrumentRemote import LiveInstrumentRemote
@@ -77,6 +77,16 @@ def __init__(self, args: dict = None):
7777 True
7878 )
7979
80+ # setup agent authentication (if needed)
81+ if probe_config ["spp" ].get ("authentication" ) is not None :
82+ client_id = probe_config ["spp" ]["authentication" ]["client_id" ]
83+ client_secret = probe_config ["spp" ]["authentication" ]["client_secret" ]
84+ tenant_id = probe_config ["spp" ]["authentication" ].get ("tenant_id" )
85+ if tenant_id is not None and tenant_id != "" :
86+ probe_config ["skywalking" ]["agent" ]["authentication" ] = f"{ client_id } :{ client_secret } :{ tenant_id } "
87+ else :
88+ probe_config ["skywalking" ]["agent" ]["authentication" ] = f"{ client_id } :{ client_secret } "
89+
8090 for key , val in args .items ():
8191 tmp_config = probe_config
8292 loc = key .split ("." )
@@ -95,6 +105,7 @@ def attach(self):
95105 config .init (
96106 collector_address = self .probe_config ["skywalking" ]["collector" ]["backend_service" ],
97107 service_name = self .probe_config ["skywalking" ]["agent" ]["service_name" ],
108+ authentication = self .probe_config ["skywalking" ]["agent" ]["authentication" ],
98109 log_reporter_active = True ,
99110 force_tls = self .probe_config ["spp" ]["ssl_enabled" ],
100111 log_reporter_formatted = self .probe_config ["skywalking" ]["plugin" ]["toolkit" ]["log" ]["transmit_formatted" ]
@@ -151,13 +162,30 @@ def __send_connected(self, eb: EventBus):
151162 "instanceId" : self .probe_config ["spp" ]["probe_id" ],
152163 "connectionTime" : round (time .time () * 1000 ),
153164 "meta" : probe_metadata
154- }, reply_handler = lambda msg : self .__register_remotes (eb , reply_address , msg ["body" ]))
165+ }, reply_handler = lambda msg : self .__register_remotes (eb , reply_address , headers , msg ["body" ]))
155166
156- def __register_remotes (self , eb , reply_address , status ):
167+ def __register_remotes (self , eb , reply_address , headers , status ):
157168 eb .unregister_handler (reply_address )
158- eb .register_handler (
169+ self .register_handler (
170+ eb = eb ,
159171 address = "spp.probe.command.live-instrument-remote:" + self .probe_config ["spp" ]["probe_id" ],
172+ headers = headers ,
160173 handler = lambda msg : self .instrument_remote .handle_instrument_command (
161174 LiveInstrumentCommand .from_json (json .dumps (msg ["body" ]))
162175 )
163176 )
177+
178+ # todo: needed since the EventBus class does not support registering handlers with headers.
179+ # platform should require connect message before all messages, then headers in register message are not needed
180+ # noinspection PyProtectedMember
181+ @staticmethod
182+ def register_handler (eb , address , headers , handler ):
183+ if not eb ._address_registered_at_server (address ):
184+ try :
185+ eb ._check_closed ()
186+ message = eventbus .create_message ("register" , address , headers )
187+ eb ._send_frame (message )
188+ except Exception as e :
189+ print ("Registration failed: " + str (e ))
190+ raise e
191+ eb ._register_local (address , handler , True )
0 commit comments