1+ # Test BLE GAP pairing with bonding (persistent pairing) using aioble
2+
3+ import sys
4+
5+ # ruff: noqa: E402
6+ sys .path .append ("" )
7+
8+ from micropython import const
9+ import machine
10+ import time
11+ import os
12+
13+ import asyncio
14+ import aioble
15+ import aioble .security
16+ import bluetooth
17+
18+ TIMEOUT_MS = 5000
19+
20+ SERVICE_UUID = bluetooth .UUID ("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A" )
21+ CHAR_UUID = bluetooth .UUID ("00000000-1111-2222-3333-444444444444" )
22+
23+ _FLAG_READ = const (0x0002 )
24+ _FLAG_READ_ENCRYPTED = const (0x0200 )
25+
26+
27+ # For aioble, we need to directly use the low-level bluetooth API for encrypted characteristics
28+ class EncryptedCharacteristic (aioble .Characteristic ):
29+ def __init__ (self , service , uuid , ** kwargs ):
30+ super ().__init__ (service , uuid , read = True , ** kwargs )
31+ # Override flags to add encryption requirement
32+ self .flags |= _FLAG_READ_ENCRYPTED
33+
34+
35+ # Acting in peripheral role.
36+ async def instance0_task ():
37+ # Clean up any existing secrets from previous tests
38+ try :
39+ os .remove ("ble_secrets.json" )
40+ except :
41+ pass
42+
43+ # Load secrets (will be empty initially but enables bond storage)
44+ aioble .security .load_secrets ()
45+
46+ service = aioble .Service (SERVICE_UUID )
47+ characteristic = EncryptedCharacteristic (service , CHAR_UUID )
48+ aioble .register_services (service )
49+
50+ multitest .globals (BDADDR = aioble .config ("mac" ))
51+ multitest .next ()
52+
53+ # Write initial characteristic value.
54+ characteristic .write ("bonded_data" )
55+
56+ # Wait for central to connect to us.
57+ print ("advertise" )
58+ connection = await aioble .advertise (
59+ 20_000 , adv_data = b"\x02 \x01 \x06 \x04 \xff MPY" , timeout_ms = TIMEOUT_MS
60+ )
61+ print ("connected" )
62+
63+ # Wait for pairing to complete
64+ print ("wait_for_bonding" )
65+ start_time = time .ticks_ms ()
66+ while not connection .encrypted and time .ticks_diff (time .ticks_ms (), start_time ) < TIMEOUT_MS :
67+ await asyncio .sleep_ms (100 )
68+
69+ # Give additional time for bonding to complete after encryption
70+ await asyncio .sleep_ms (500 )
71+
72+ if connection .encrypted :
73+ print ("bonded encrypted=1 authenticated={} bonded={}" .format (
74+ 1 if connection .authenticated else 0 ,
75+ 1 if connection .bonded else 0
76+ ))
77+ else :
78+ print ("bonding_timeout" )
79+
80+ # Wait for the central to disconnect.
81+ await connection .disconnected (timeout_ms = TIMEOUT_MS )
82+ print ("disconnected" )
83+
84+
85+ def instance0 ():
86+ try :
87+ asyncio .run (instance0_task ())
88+ finally :
89+ aioble .stop ()
90+
91+
92+ # Acting in central role.
93+ async def instance1_task ():
94+ multitest .next ()
95+
96+ # Clean up any existing secrets from previous tests
97+ try :
98+ os .remove ("ble_secrets.json" )
99+ except :
100+ pass
101+
102+ # Load secrets (will be empty initially but enables bond storage)
103+ aioble .security .load_secrets ()
104+
105+ # Connect to peripheral.
106+ print ("connect" )
107+ device = aioble .Device (* BDADDR )
108+ connection = await device .connect (timeout_ms = TIMEOUT_MS )
109+
110+ # Discover characteristics (before pairing).
111+ service = await connection .service (SERVICE_UUID )
112+ print ("service" , service .uuid )
113+ characteristic = await service .characteristic (CHAR_UUID )
114+ print ("characteristic" , characteristic .uuid )
115+
116+ # Pair with bonding enabled.
117+ print ("bond" )
118+ await connection .pair (
119+ bond = True , # Enable bonding
120+ le_secure = True ,
121+ mitm = False ,
122+ timeout_ms = TIMEOUT_MS
123+ )
124+
125+ # Give additional time for bonding to complete after encryption
126+ await asyncio .sleep_ms (500 )
127+
128+ print ("bonded encrypted={} authenticated={} bonded={}" .format (
129+ 1 if connection .encrypted else 0 ,
130+ 1 if connection .authenticated else 0 ,
131+ 1 if connection .bonded else 0
132+ ))
133+
134+ # Read the peripheral's characteristic, should be encrypted.
135+ print ("read_encrypted" )
136+ data = await characteristic .read (timeout_ms = TIMEOUT_MS )
137+ print ("read" , data )
138+
139+ # Check if secrets were saved
140+ try :
141+ os .stat ("ble_secrets.json" )
142+ print ("secrets_exist" , "yes" )
143+ except :
144+ print ("secrets_exist" , "no" )
145+
146+ # Disconnect from peripheral.
147+ print ("disconnect" )
148+ await connection .disconnect (timeout_ms = TIMEOUT_MS )
149+ print ("disconnected" )
150+
151+
152+ def instance1 ():
153+ try :
154+ asyncio .run (instance1_task ())
155+ finally :
156+ aioble .stop ()
0 commit comments