Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ test = [
"pytest-xdist == 3.8.0",
"pytest-timeout == 2.4.0",
"flake8 == 7.3.0",
"cocotb==2.0.1",
"cocotb-bus==0.3.0",
"cocotbext-apb==0.7.4",
"za-cocotblib >= 0.0.1, < 0.1.0",
"switchboard-hw == 0.3.0"
]

Expand Down
6 changes: 6 additions & 0 deletions tests/adapters/umi2apb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Import tests so cocotb can discover them
from adapters.umi2apb.test_basic_WR import test_basic_WR # noqa: F401
from adapters.umi2apb.test_full_throughput import test_full_throughput # noqa: F401
from adapters.umi2apb.test_random_stimulus import test_random_stimulus # noqa: F401
from adapters.umi2apb.test_posted_write import test_posted_write # noqa: F401
from adapters.umi2apb.test_backpressure import test_backpressure # noqa: F401
100 changes: 100 additions & 0 deletions tests/adapters/umi2apb/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Owns the driver, monitor, and scoreboard for UMI to APB adapter tests,
# and provides common functionality for the tests.

from cocotb.clock import Clock
from cocotb.triggers import ClockCycles

from cocotb_bus.scoreboard import Scoreboard
from cocotbext.apb import ApbBus, ApbSlave, MemoryRegion

from cocotblib.umi.drivers.sumi_driver import SumiDriver
from cocotblib.umi.monitors.sumi_monitor import SumiMonitor
from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd
from cocotblib.common import drive_reset


# Creates the umi2apb test environment
class UMI2APBEnv:
def __init__(self, dut, clk_period_ns=10, mem_size=2**16):
self.dut = dut
self.clk_period_ns = clk_period_ns
self.mem_size = mem_size

self.data_width = int(dut.RW.value) # default of 64
self.addr_width = int(dut.AW.value) # default of 64
self.data_size = self.data_width // 8

self.expected_responses = []

self.clk = dut.apb_pclk
self.nreset = dut.apb_nreset

self._build()

def _build(self):
dut = self.dut

# Instantiates UMI driver
self.sumi_driver = SumiDriver(
entity=dut,
name="udev_req",
clock=self.clk,
bus_separator="_"
)

# Instantiates APB slave and memory region
apb_bus = ApbBus.from_prefix(dut, "apb")
self.apb_slave = ApbSlave(apb_bus, self.clk, self.nreset)
self.region = MemoryRegion(self.mem_size)
self.apb_slave.target = self.region

# Creates UMI monitor (for responses)
self.sumi_monitor = SumiMonitor(
entity=dut,
name="udev_resp",
clock=self.clk,
bus_separator="_"
)

# Creates scoreboard
self.scoreboard = Scoreboard(dut, fail_immediately=True)
self.scoreboard.add_interface(monitor=self.sumi_monitor, expected_output=self.expected_responses)

# Prerequisites for starting tests
async def start(self):
Clock(self.clk, self.clk_period_ns, unit="ns").start()
await drive_reset(self.nreset, self.clk_period_ns)
self.dut.udev_resp_ready.value = 1

# Waits for umi responses
async def wait_for_responses(self, max_cycles):
cycles = 0
while self.expected_responses:
await ClockCycles(self.clk, 1)
cycles += 1
if cycles > max_cycles:
raise TimeoutError(
f"Timeout waiting for responses "
f"({len(self.expected_responses)} remaining)"
)


# Creates an ideal umi write response
def create_expected_write_response(write_txn, data_size, addr_width=64):
req_da = int(write_txn.da.value) if hasattr(write_txn.da, "value") else int(write_txn.da)
req_sa = int(write_txn.sa.value) if hasattr(write_txn.sa, "value") else int(write_txn.sa)

req_size = int(write_txn.cmd.size)
req_len = int(write_txn.cmd.len)

return SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_RESP_WRITE),
size=req_size,
len=req_len
),
da=req_sa,
sa=req_da,
data=bytearray(data_size), # Expect no data in write response
addr_width=addr_width
)
79 changes: 79 additions & 0 deletions tests/adapters/umi2apb/test_backpressure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import math
import cocotb

from cocotb.handle import SimHandleBase
from cocotb.triggers import ClockCycles

from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd
from adapters.umi2apb.env import UMI2APBEnv, create_expected_write_response


@cocotb.test(timeout_time=50, timeout_unit="ms")
async def test_backpressure(dut: SimHandleBase):
"""
Test response backpressure handling:
1. Disable response ready
2. Send transactions
3. Verify responses are held and not lost
4. Enable ready and verify all responses arrive correctly
"""

env = UMI2APBEnv(dut)
await env.start()

umi_size = int(math.log2(env.data_size))

print("=== Backpressure Test ===")

# Disable response ready
dut.udev_resp_ready.value = 0

# Send a write transaction
test_addr = 0x100
test_data = 0xDEADBEEF

write_txn = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_REQ_WRITE),
size=umi_size,
len=0,
),
da=test_addr,
sa=0x0,
data=test_data.to_bytes(env.data_size, byteorder="little"),
)

env.expected_responses.append(
create_expected_write_response(
write_txn,
data_size=env.data_size,
addr_width=env.addr_width,
)
)

env.sumi_driver.append(write_txn)
print(f"Sent write: addr=0x{test_addr:x}, data=0x{test_data:08x}")

await ClockCycles(env.clk, 20)

# Verify response is being held
assert dut.udev_resp_valid.value == 1, "Response should be valid"
assert len(env.expected_responses) == 1, "Response should not have been consumed yet"
print("Response held with backpressure")

# enable response ready
dut.udev_resp_ready.value = 1
print("Re-enabled udev_resp_ready")

await env.wait_for_responses(max_cycles=10)

# Verify mem was written correctly
mem_data = await env.region.read(test_addr, env.data_size)
actual_data = int.from_bytes(mem_data, byteorder="little")
assert actual_data == test_data, (
f"Write data mismatch: expected 0x{test_data:x}, got 0x{actual_data:x}"
)
print(f"Memory verified: 0x{actual_data:08x}")

print("\n=== Backpressure Test PASSED ===")
raise env.scoreboard.result
100 changes: 100 additions & 0 deletions tests/adapters/umi2apb/test_basic_WR.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import math
import cocotb

from cocotb.handle import SimHandleBase

from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd
from adapters.umi2apb.env import UMI2APBEnv, create_expected_write_response


@cocotb.test(timeout_time=50, timeout_unit="ms")
async def test_basic_WR(dut: SimHandleBase):
"""
Basic sanity test:
1. Single aligned UMI write
2. Verify APB memory contents
3. Single UMI read
4. Verify response payload
"""

# Grab shared test environment
env = UMI2APBEnv(dut)
await env.start()

umi_size = int(math.log2(env.data_size))
test_addr = 0x100
test_data = 0xDEADBEEF

print("=== Basic Write Test ===")

# WRITE transaction
write_txn = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_REQ_WRITE),
size=umi_size,
len=0,
),
da=test_addr,
sa=0x0,
data=test_data.to_bytes(env.data_size, byteorder="little"),
)

env.expected_responses.append(
create_expected_write_response(
write_txn,
data_size=env.data_size,
addr_width=env.addr_width,
)
)

env.sumi_driver.append(write_txn)

# Wait for write response
await env.wait_for_responses(max_cycles=100)

# Verify APB memory contents
mem_data = await env.region.read(test_addr, env.data_size)
assert int.from_bytes(mem_data, byteorder="little") == test_data, (
f"Write failed: expected 0x{test_data:x}, "
f"got 0x{int.from_bytes(mem_data, 'little'):x}"
)

print(f" Data written to memory: 0x{test_data:08x}")
print(" UMI write response verified by scoreboard")

print("\n=== Basic Read Test ===")

# READ transaction
read_txn = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_REQ_READ),
size=umi_size,
len=0,
),
da=test_addr,
sa=0x0,
data=bytearray(env.data_size),
)

expected_read_resp = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_RESP_READ),
size=umi_size,
len=0,
),
da=0x0,
sa=test_addr,
data=test_data.to_bytes(env.data_size, byteorder="little"),
addr_width=env.addr_width,
)

env.expected_responses.append(expected_read_resp)
env.sumi_driver.append(read_txn)

# Wait for read response
await env.wait_for_responses(max_cycles=100)

print("Read response verified by scoreboard")

# Check scoreboard results
raise env.scoreboard.result
90 changes: 90 additions & 0 deletions tests/adapters/umi2apb/test_full_throughput.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import math
import cocotb

from adapters.umi2apb.env import UMI2APBEnv
from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd


@cocotb.test(timeout_time=50, timeout_unit="ms")
async def test_full_throughput(dut):
"""
Back-to-back full-throughput tests alternating read/write transactions.

- New request arrives in same cycle response becomes valid
"""

env = UMI2APBEnv(dut)
await env.start()

data_size = env.data_size
addr_width = env.addr_width
umi_size = int(math.log2(data_size))

num_transactions = 100

print("=== Back-to-Back Full Throughput Test ===")

for i in range(num_transactions):
txn_size = i % (umi_size + 1)
txn_bytes = 1 << txn_size
addr = i * data_size

is_read = (i % 2) == 0

if is_read:
txn = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_REQ_READ),
size=txn_size,
len=0,
),
da=addr,
sa=0x0,
data=bytearray(txn_bytes),
)

expected_resp = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_RESP_READ),
size=txn_size,
len=0,
),
da=0x0,
sa=addr,
data=bytearray(txn_bytes),
addr_width=addr_width,
)

else:
data = bytes([i & 0xFF] * txn_bytes)
txn = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_REQ_WRITE),
size=txn_size,
len=0,
),
da=addr,
sa=0x0,
data=data,
)

expected_resp = SumiTransaction(
cmd=SumiCmd.from_fields(
cmd_type=int(SumiCmdType.UMI_RESP_WRITE),
size=txn_size,
len=0,
),
da=0x0,
sa=addr,
data=bytearray(txn_bytes),
addr_width=addr_width,
)

env.expected_responses.append(expected_resp)
env.sumi_driver.append(txn)

# Wait for all responses
await env.wait_for_responses(max_cycles=num_transactions * 50)

print(f" All {num_transactions} back-to-back transactions completed successfully!")
raise env.scoreboard.result
Loading
Loading