2020from awscrt .io import LogLevel
2121from awscrt .mqtt import Connection , Client , QoS
2222from awsiot .greengrass_discovery import DiscoveryClient , DiscoverResponse
23+ from awsiot import mqtt_connection_builder
2324
2425allowed_actions = ['both' , 'publish' , 'subscribe' ]
2526
2627parser = argparse .ArgumentParser ()
27- parser .add_argument ('-r' , '--ca_file ' , action = 'store' , required = True , dest = 'root_ca_path' , help = 'Root CA file path' )
28+ parser .add_argument ('-r' , '--root-ca ' , action = 'store' , dest = 'root_ca_path' , help = 'Root CA file path' )
2829parser .add_argument ('-c' , '--cert' , action = 'store' , required = True , dest = 'certificate_path' , help = 'Certificate file path' )
2930parser .add_argument ('-k' , '--key' , action = 'store' , required = True , dest = 'private_key_path' , help = 'Private key file path' )
30- parser .add_argument ('-n' , '--thing_name ' , action = 'store' , required = True , dest = 'thing_name' , help = 'Targeted thing name' )
31+ parser .add_argument ('-n' , '--thing-name ' , action = 'store' , required = True , dest = 'thing_name' , help = 'Targeted thing name' )
3132parser .add_argument ('-t' , '--topic' , action = 'store' , dest = 'topic' , default = 'sdk/test/Python' , help = 'Targeted topic' )
3233parser .add_argument ('-m' , '--mode' , action = 'store' , dest = 'mode' , default = 'both' ,
3334 help = 'Operation modes: %s' % str (allowed_actions ))
3435parser .add_argument ('-M' , '--message' , action = 'store' , dest = 'message' , default = 'Hello World!' ,
3536help = 'Message to publish' )
3637parser .add_argument ('--region' , action = 'store' , dest = 'region' , default = 'us-east-1' )
37- parser .add_argument ('--max_pub_ops ' , action = 'store' , dest = 'max_pub_ops' , default = 10 )
38- parser .add_argument ('--print_discover_resp_only ' , action = 'store_true' , dest = 'print_discover_resp_only' , default = False )
38+ parser .add_argument ('--max-pub-ops ' , action = 'store' , dest = 'max_pub_ops' , default = 10 )
39+ parser .add_argument ('--print-discover-resp-only ' , action = 'store_true' , dest = 'print_discover_resp_only' , default = False )
3940parser .add_argument ('-v' , '--verbose' , action = 'store' , dest = 'verbosity' , default = 'NoLogs' )
4041
4142args = parser .parse_args ()
5455 io .init_logging (LogLevel .Trace , 'stderr' )
5556
5657event_loop_group = io .EventLoopGroup (1 )
57- client_bootstrap = io .ClientBootstrap (event_loop_group )
58+ host_resolver = io .DefaultHostResolver (event_loop_group )
59+ client_bootstrap = io .ClientBootstrap (event_loop_group , host_resolver )
5860
5961tls_options = io .TlsContextOptions .create_client_with_mtls_from_path (args .certificate_path , args .private_key_path )
6062tls_options .override_default_trust_store_from_path (None , args .root_ca_path )
@@ -84,25 +86,17 @@ def on_connection_resumed(connection, error_code, session_present):
8486# Try IoT endpoints until we find one that works
8587def try_iot_endpoints ():
8688 for gg_group in discover_response .gg_groups :
87-
88- gg_core_tls_options = io .TlsContextOptions .create_client_with_mtls_from_path (args .certificate_path , args .private_key_path )
89- gg_core_tls_options .override_default_trust_store (bytes (gg_group .certificate_authorities [0 ], encoding = 'utf-8' ))
90- gg_core_tls_ctx = io .ClientTlsContext (gg_core_tls_options )
91- mqtt_client = Client (client_bootstrap , gg_core_tls_ctx )
92-
9389 for gg_core in gg_group .cores :
9490 for connectivity_info in gg_core .connectivity :
9591 try :
9692 print ('Trying core {} at host {} port {}' .format (gg_core .thing_arn , connectivity_info .host_address , connectivity_info .port ))
97- mqtt_connection = Connection (
98- mqtt_client ,
99- on_connection_interrupted = on_connection_interupted ,
100- on_connection_resumed = on_connection_resumed )
101- connect_future = mqtt_connection .connect (
102- client_id = args .thing_name ,
103- host_name = connectivity_info .host_address ,
104- port = connectivity_info .port ,
105- clean_session = False )
93+ mqtt_connection = mqtt_connection_builder .mtls_from_path (endpoint = connectivity_info .host_address , port = connectivity_info .port ,
94+ cert_filepath = args .certificate_path , pri_key_filepath = args .private_key_path , client_bootstrap = client_bootstrap ,
95+ ca_bytes = bytes (gg_group .certificate_authorities [0 ], encoding = 'utf-8' ),
96+ on_connection_interrupted = on_connection_interupted , on_connection_resumed = on_connection_resumed ,
97+ client_id = args .thing_name , clean_session = False , keep_alive_secs = 6 )
98+
99+ connect_future = mqtt_connection .connect ()
106100 connect_future .result ()
107101 print ('Connected!' )
108102 return mqtt_connection
@@ -117,9 +111,9 @@ def try_iot_endpoints():
117111
118112if args .mode == 'both' or args .mode == 'subscribe' :
119113
120- def on_publish (topic , message ):
114+ def on_publish (topic , payload ):
121115 print ('Publish received on topic {}' .format (topic ))
122- print (message )
116+ print (payload )
123117
124118 subscribe_future , _ = mqtt_connection .subscribe (args .topic , QoS .AT_MOST_ONCE , on_publish )
125119 subscribe_result = subscribe_future .result ()
0 commit comments