Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/avatar.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ jobs:
- name: Test
run: |
avatar --list | grep -Ev '^=' > test-names.txt
split test-names.txt -n l/${{ matrix.shard }}
timeout 5m avatar --test-beds bumble.bumbles --tests $(split test-names.txt -n l/${{ matrix.shard }})
- name: Rootcanal Logs
run: cat rootcanal.log
182 changes: 182 additions & 0 deletions avatar/cases/l2cap_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import avatar
import logging

from avatar import BumblePandoraDevice
from avatar import PandoraDevice
from avatar import PandoraDevices
from avatar import pandora_snippet
from mobly import base_test
from mobly import test_runner
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from pandora import host_pb2
from typing import Any, Dict, Optional

CLASSIC_PSM = 0xFEFF
LE_SPSM = 0xF0


class L2capTest(base_test.BaseTestClass): # type: ignore[misc]
devices: Optional[PandoraDevices] = None

# pandora devices.
dut: PandoraDevice
ref: PandoraDevice

# BR/EDR & Low-Energy connections.
dut_ref: Dict[str, host_pb2.Connection] = {}
ref_dut: Dict[str, host_pb2.Connection] = {}

def setup_class(self) -> None:
self.devices = PandoraDevices(self)
self.dut, self.ref, *_ = self.devices

# Enable BR/EDR mode for Bumble devices.
for device in self.devices:
if isinstance(device, BumblePandoraDevice):
device.config.setdefault("classic_enabled", True)

def teardown_class(self) -> None:
if self.devices:
self.devices.stop_all()

@avatar.asynchronous
async def setup_test(self) -> None: # pytype: disable=wrong-arg-types
await asyncio.gather(self.dut.reset(), self.ref.reset())

# Connect REF to DUT in both BR/EDR and Low-Energy.
ref_dut_br, dut_ref_br = await pandora_snippet.connect(self.ref, self.dut)
ref_dut_le, dut_ref_le = await pandora_snippet.connect_le_dummy(self.ref, self.dut)

self.dut_ref = dict(basic=dut_ref_br, le_credit_based=dut_ref_le)
self.ref_dut = dict(basic=ref_dut_br, le_credit_based=ref_dut_le)

@avatar.parameterized(
(dict(basic=dict(psm=CLASSIC_PSM)),),
(dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),),
) # type: ignore[misc]
@avatar.asynchronous
async def test_connect(
self,
request: Dict[str, Any],
) -> None:
transport = next(iter(request.keys()))
ref_dut, dut_ref = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.ref_dut[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.dut_ref[transport], **request),
)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)

@avatar.parameterized(
(dict(basic=dict(psm=CLASSIC_PSM)),),
(dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),),
) # type: ignore[misc]
@avatar.asynchronous
async def test_wait_connection(
self,
request: Dict[str, Any],
) -> None:
transport = next(iter(request.keys()))
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)

@avatar.parameterized(
(dict(basic=dict(psm=CLASSIC_PSM)),),
(dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),),
) # type: ignore[misc]
@avatar.asynchronous
async def test_disconnect(
self,
request: Dict[str, Any],
) -> None:
transport = next(iter(request.keys()))
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)
assert ref_dut.channel and dut_ref.channel

_, dis = await asyncio.gather(
self.ref.aio.l2cap.WaitDisconnection(channel=ref_dut.channel),
self.dut.aio.l2cap.Disconnect(channel=dut_ref.channel),
)

assert_equal(dis.result_variant(), 'success')

@avatar.parameterized(
(dict(basic=dict(psm=CLASSIC_PSM)),),
(dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),),
) # type: ignore[misc]
@avatar.asynchronous
async def test_wait_disconnection(
self,
request: Dict[str, Any],
) -> None:
transport = next(iter(request.keys()))
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)
assert ref_dut.channel and dut_ref.channel

dis, _ = await asyncio.gather(
self.dut.aio.l2cap.WaitDisconnection(channel=dut_ref.channel),
self.ref.aio.l2cap.Disconnect(channel=ref_dut.channel),
)

assert_equal(dis.result_variant(), 'success')

@avatar.parameterized(
(dict(basic=dict(psm=CLASSIC_PSM)),),
(dict(le_credit_based=dict(spsm=LE_SPSM, mtu=2046, mps=2048, initial_credit=256)),),
) # type: ignore[misc]
@avatar.asynchronous
async def test_send(
self,
request: Dict[str, Any],
) -> None:
transport = next(iter(request.keys()))
dut_ref, ref_dut = await asyncio.gather(
self.ref.aio.l2cap.WaitConnection(connection=self.dut_ref[transport], **request),
self.dut.aio.l2cap.Connect(connection=self.ref_dut[transport], **request),
)
assert_is_not_none(ref_dut.channel)
assert_is_not_none(dut_ref.channel)
assert ref_dut.channel and dut_ref.channel

ref_source = self.ref.aio.l2cap.Receive(channel=dut_ref.channel)
_, recv = await asyncio.gather(
self.dut.aio.l2cap.Send(channel=ref_dut.channel, data=b"The quick brown fox jumps over the lazy dog"),
anext(aiter(ref_source)),
)

assert_equal(recv.data, b"The quick brown fox jumps over the lazy dog")


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
test_runner.main() # type: ignore
36 changes: 6 additions & 30 deletions avatar/cases/le_security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
from mobly.asserts import fail # type: ignore
from pandora.host_pb2 import PUBLIC
from pandora.host_pb2 import RANDOM
from pandora.host_pb2 import Connection
from pandora.host_pb2 import DataTypes
from pandora.host_pb2 import OwnAddressType
from pandora.security_pb2 import LE_LEVEL3
from pandora.security_pb2 import LEVEL2
from pandora.security_pb2 import PairingEventAnswer
Expand Down Expand Up @@ -203,38 +200,17 @@ async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
nonlocal ref_dut
nonlocal dut_ref

# Make LE connection task.
async def connect_le(
initiator: PandoraDevice,
acceptor: PandoraDevice,
initiator_addr_type: OwnAddressType,
acceptor_addr_type: OwnAddressType,
) -> Tuple[Connection, Connection]:
# Acceptor - Advertise
advertisement = acceptor.aio.host.Advertise(
legacy=True,
connectable=True,
own_address_type=acceptor_addr_type,
data=DataTypes(manufacturer_specific_data=b'pause cafe'),
)

# Initiator - Scan and fetch the address
scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type)
acceptor_scan = await anext(
(x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
) # pytype: disable=name-error
scan.cancel()

# Initiator - LE connect
return await pandora_snippet.connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type)

# Make LE connection.
if connect == 'incoming_connection':
# DUT is acceptor
ref_dut, dut_ref = await connect_le(self.ref, self.dut, ref_address_type, dut_address_type)
ref_dut, dut_ref = await pandora_snippet.connect_le_dummy(
self.ref, self.dut, ref_address_type, dut_address_type
)
else:
# DUT is initiator
dut_ref, ref_dut = await connect_le(self.dut, self.ref, dut_address_type, ref_address_type)
dut_ref, ref_dut = await pandora_snippet.connect_le_dummy(
self.dut, self.ref, dut_address_type, ref_address_type
)

# Pairing.

Expand Down
12 changes: 12 additions & 0 deletions avatar/pandora_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from dataclasses import dataclass
from pandora import host_grpc
from pandora import host_grpc_aio
from pandora import l2cap_grpc
from pandora import l2cap_grpc_aio
from pandora import security_grpc
from pandora import security_grpc_aio
from typing import Any, Dict, MutableMapping, Optional, Tuple, Union
Expand Down Expand Up @@ -152,6 +154,11 @@ def security_storage(self) -> security_grpc.SecurityStorage:
"""Returns the Pandora SecurityStorage gRPC interface."""
return security_grpc.SecurityStorage(self.channel)

@property
def l2cap(self) -> l2cap_grpc.L2CAP:
"""Returns the Pandora SecurityStorage gRPC interface."""
return l2cap_grpc.L2CAP(self.channel)

@dataclass
class Aio:
channel: grpc.aio.Channel
Expand All @@ -171,6 +178,11 @@ def security_storage(self) -> security_grpc_aio.SecurityStorage:
"""Returns the Pandora SecurityStorage gRPC interface."""
return security_grpc_aio.SecurityStorage(self.channel)

@property
def l2cap(self) -> l2cap_grpc_aio.L2CAP:
"""Returns the Pandora SecurityStorage gRPC interface."""
return l2cap_grpc_aio.L2CAP(self.channel)

@property
def aio(self) -> 'PandoraClient.Aio':
if not self._aio:
Expand Down
28 changes: 28 additions & 0 deletions avatar/pandora_snippet.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from pandora._utils import AioStream
from pandora.host_pb2 import RANDOM
from pandora.host_pb2 import AdvertiseResponse
from pandora.host_pb2 import Connection
from pandora.host_pb2 import DataTypes
from pandora.host_pb2 import OwnAddressType
from pandora.host_pb2 import ScanningResponse
from typing import Optional, Tuple
Expand Down Expand Up @@ -65,3 +67,29 @@ async def connect_le(
assert_is_not_none(init_res.connection)
assert init_res.connection
return init_res.connection, wait_res.connection


# Make LE connection task.
async def connect_le_dummy(
initiator: PandoraDevice,
acceptor: PandoraDevice,
initiator_addr_type: OwnAddressType = RANDOM,
acceptor_addr_type: OwnAddressType = RANDOM,
) -> Tuple[Connection, Connection]:
# Acceptor - Advertise
advertisement = acceptor.aio.host.Advertise(
legacy=True,
connectable=True,
own_address_type=acceptor_addr_type,
data=DataTypes(manufacturer_specific_data=b'pause cafe'),
)

# Initiator - Scan and fetch the address
scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type)
acceptor_scan = await anext(
(x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
) # pytype: disable=name-error
scan.cancel()

# Initiator - LE connect
return await connect_le(initiator, advertisement, acceptor_scan, initiator_addr_type)
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ classifiers = [
"License :: OSI Approved :: Apache Software License"
]
dependencies = [
"bt-test-interfaces>=0.0.4",
"bumble>=0.0.176",
"bumble@git+https://github.com/google/bumble@uael/l2cap_pandora",
"bt-test-interfaces==0.0.5",
"protobuf==4.24.2",
"grpcio==1.57",
"mobly==1.12.2",
Expand Down