diff --git a/pyproject.toml b/pyproject.toml index 5cf90d03..7fde638e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,10 @@ test = [ "pytest-xdist == 3.8.0", "pytest-timeout == 2.4.0", "flake8 == 7.3.0", + "cocotb==2.0.1", + "cocotb-bus==0.3.0", + "cocotbext-apb==0.7.4", + "za-cocotblib >= 0.0.1, < 0.1.0", "switchboard-hw == 0.3.0" ] diff --git a/tests/adapters/umi2apb/__init__.py b/tests/adapters/umi2apb/__init__.py new file mode 100644 index 00000000..95d79faa --- /dev/null +++ b/tests/adapters/umi2apb/__init__.py @@ -0,0 +1,6 @@ +# Import tests so cocotb can discover them +from adapters.umi2apb.test_basic_WR import test_basic_WR # noqa: F401 +from adapters.umi2apb.test_full_throughput import test_full_throughput # noqa: F401 +from adapters.umi2apb.test_random_stimulus import test_random_stimulus # noqa: F401 +from adapters.umi2apb.test_posted_write import test_posted_write # noqa: F401 +from adapters.umi2apb.test_backpressure import test_backpressure # noqa: F401 diff --git a/tests/adapters/umi2apb/env.py b/tests/adapters/umi2apb/env.py new file mode 100644 index 00000000..687b40a9 --- /dev/null +++ b/tests/adapters/umi2apb/env.py @@ -0,0 +1,100 @@ +# Owns the driver, monitor, and scoreboard for UMI to APB adapter tests, +# and provides common functionality for the tests. + +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles + +from cocotb_bus.scoreboard import Scoreboard +from cocotbext.apb import ApbBus, ApbSlave, MemoryRegion + +from cocotblib.umi.drivers.sumi_driver import SumiDriver +from cocotblib.umi.monitors.sumi_monitor import SumiMonitor +from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from cocotblib.common import drive_reset + + +# Creates the umi2apb test environment +class UMI2APBEnv: + def __init__(self, dut, clk_period_ns=10, mem_size=2**16): + self.dut = dut + self.clk_period_ns = clk_period_ns + self.mem_size = mem_size + + self.data_width = int(dut.RW.value) # default of 64 + self.addr_width = int(dut.AW.value) # default of 64 + self.data_size = self.data_width // 8 + + self.expected_responses = [] + + self.clk = dut.apb_pclk + self.nreset = dut.apb_nreset + + self._build() + + def _build(self): + dut = self.dut + + # Instantiates UMI driver + self.sumi_driver = SumiDriver( + entity=dut, + name="udev_req", + clock=self.clk, + bus_separator="_" + ) + + # Instantiates APB slave and memory region + apb_bus = ApbBus.from_prefix(dut, "apb") + self.apb_slave = ApbSlave(apb_bus, self.clk, self.nreset) + self.region = MemoryRegion(self.mem_size) + self.apb_slave.target = self.region + + # Creates UMI monitor (for responses) + self.sumi_monitor = SumiMonitor( + entity=dut, + name="udev_resp", + clock=self.clk, + bus_separator="_" + ) + + # Creates scoreboard + self.scoreboard = Scoreboard(dut, fail_immediately=True) + self.scoreboard.add_interface(monitor=self.sumi_monitor, expected_output=self.expected_responses) + + # Prerequisites for starting tests + async def start(self): + Clock(self.clk, self.clk_period_ns, unit="ns").start() + await drive_reset(self.nreset, self.clk_period_ns) + self.dut.udev_resp_ready.value = 1 + + # Waits for umi responses + async def wait_for_responses(self, max_cycles): + cycles = 0 + while self.expected_responses: + await ClockCycles(self.clk, 1) + cycles += 1 + if cycles > max_cycles: + raise TimeoutError( + f"Timeout waiting for responses " + f"({len(self.expected_responses)} remaining)" + ) + + +# Creates an ideal umi write response +def create_expected_write_response(write_txn, data_size, addr_width=64): + req_da = int(write_txn.da.value) if hasattr(write_txn.da, "value") else int(write_txn.da) + req_sa = int(write_txn.sa.value) if hasattr(write_txn.sa, "value") else int(write_txn.sa) + + req_size = int(write_txn.cmd.size) + req_len = int(write_txn.cmd.len) + + return SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_WRITE), + size=req_size, + len=req_len + ), + da=req_sa, + sa=req_da, + data=bytearray(data_size), # Expect no data in write response + addr_width=addr_width + ) diff --git a/tests/adapters/umi2apb/test_backpressure.py b/tests/adapters/umi2apb/test_backpressure.py new file mode 100644 index 00000000..972c23f2 --- /dev/null +++ b/tests/adapters/umi2apb/test_backpressure.py @@ -0,0 +1,79 @@ +import math +import cocotb + +from cocotb.handle import SimHandleBase +from cocotb.triggers import ClockCycles + +from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from adapters.umi2apb.env import UMI2APBEnv, create_expected_write_response + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_backpressure(dut: SimHandleBase): + """ + Test response backpressure handling: + 1. Disable response ready + 2. Send transactions + 3. Verify responses are held and not lost + 4. Enable ready and verify all responses arrive correctly + """ + + env = UMI2APBEnv(dut) + await env.start() + + umi_size = int(math.log2(env.data_size)) + + print("=== Backpressure Test ===") + + # Disable response ready + dut.udev_resp_ready.value = 0 + + # Send a write transaction + test_addr = 0x100 + test_data = 0xDEADBEEF + + write_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=umi_size, + len=0, + ), + da=test_addr, + sa=0x0, + data=test_data.to_bytes(env.data_size, byteorder="little"), + ) + + env.expected_responses.append( + create_expected_write_response( + write_txn, + data_size=env.data_size, + addr_width=env.addr_width, + ) + ) + + env.sumi_driver.append(write_txn) + print(f"Sent write: addr=0x{test_addr:x}, data=0x{test_data:08x}") + + await ClockCycles(env.clk, 20) + + # Verify response is being held + assert dut.udev_resp_valid.value == 1, "Response should be valid" + assert len(env.expected_responses) == 1, "Response should not have been consumed yet" + print("Response held with backpressure") + + # enable response ready + dut.udev_resp_ready.value = 1 + print("Re-enabled udev_resp_ready") + + await env.wait_for_responses(max_cycles=10) + + # Verify mem was written correctly + mem_data = await env.region.read(test_addr, env.data_size) + actual_data = int.from_bytes(mem_data, byteorder="little") + assert actual_data == test_data, ( + f"Write data mismatch: expected 0x{test_data:x}, got 0x{actual_data:x}" + ) + print(f"Memory verified: 0x{actual_data:08x}") + + print("\n=== Backpressure Test PASSED ===") + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_basic_WR.py b/tests/adapters/umi2apb/test_basic_WR.py new file mode 100644 index 00000000..72d947e8 --- /dev/null +++ b/tests/adapters/umi2apb/test_basic_WR.py @@ -0,0 +1,100 @@ +import math +import cocotb + +from cocotb.handle import SimHandleBase + +from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from adapters.umi2apb.env import UMI2APBEnv, create_expected_write_response + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_basic_WR(dut: SimHandleBase): + """ + Basic sanity test: + 1. Single aligned UMI write + 2. Verify APB memory contents + 3. Single UMI read + 4. Verify response payload + """ + + # Grab shared test environment + env = UMI2APBEnv(dut) + await env.start() + + umi_size = int(math.log2(env.data_size)) + test_addr = 0x100 + test_data = 0xDEADBEEF + + print("=== Basic Write Test ===") + + # WRITE transaction + write_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=umi_size, + len=0, + ), + da=test_addr, + sa=0x0, + data=test_data.to_bytes(env.data_size, byteorder="little"), + ) + + env.expected_responses.append( + create_expected_write_response( + write_txn, + data_size=env.data_size, + addr_width=env.addr_width, + ) + ) + + env.sumi_driver.append(write_txn) + + # Wait for write response + await env.wait_for_responses(max_cycles=100) + + # Verify APB memory contents + mem_data = await env.region.read(test_addr, env.data_size) + assert int.from_bytes(mem_data, byteorder="little") == test_data, ( + f"Write failed: expected 0x{test_data:x}, " + f"got 0x{int.from_bytes(mem_data, 'little'):x}" + ) + + print(f" Data written to memory: 0x{test_data:08x}") + print(" UMI write response verified by scoreboard") + + print("\n=== Basic Read Test ===") + + # READ transaction + read_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_READ), + size=umi_size, + len=0, + ), + da=test_addr, + sa=0x0, + data=bytearray(env.data_size), + ) + + expected_read_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_READ), + size=umi_size, + len=0, + ), + da=0x0, + sa=test_addr, + data=test_data.to_bytes(env.data_size, byteorder="little"), + addr_width=env.addr_width, + ) + + env.expected_responses.append(expected_read_resp) + env.sumi_driver.append(read_txn) + + # Wait for read response + await env.wait_for_responses(max_cycles=100) + + print("Read response verified by scoreboard") + + # Check scoreboard results + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_full_throughput.py b/tests/adapters/umi2apb/test_full_throughput.py new file mode 100644 index 00000000..c0f11d92 --- /dev/null +++ b/tests/adapters/umi2apb/test_full_throughput.py @@ -0,0 +1,90 @@ +import math +import cocotb + +from adapters.umi2apb.env import UMI2APBEnv +from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_full_throughput(dut): + """ + Back-to-back full-throughput tests alternating read/write transactions. + + - New request arrives in same cycle response becomes valid + """ + + env = UMI2APBEnv(dut) + await env.start() + + data_size = env.data_size + addr_width = env.addr_width + umi_size = int(math.log2(data_size)) + + num_transactions = 100 + + print("=== Back-to-Back Full Throughput Test ===") + + for i in range(num_transactions): + txn_size = i % (umi_size + 1) + txn_bytes = 1 << txn_size + addr = i * data_size + + is_read = (i % 2) == 0 + + if is_read: + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_READ), + size=txn_size, + len=0, + ), + da=addr, + sa=0x0, + data=bytearray(txn_bytes), + ) + + expected_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_READ), + size=txn_size, + len=0, + ), + da=0x0, + sa=addr, + data=bytearray(txn_bytes), + addr_width=addr_width, + ) + + else: + data = bytes([i & 0xFF] * txn_bytes) + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=txn_size, + len=0, + ), + da=addr, + sa=0x0, + data=data, + ) + + expected_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_WRITE), + size=txn_size, + len=0, + ), + da=0x0, + sa=addr, + data=bytearray(txn_bytes), + addr_width=addr_width, + ) + + env.expected_responses.append(expected_resp) + env.sumi_driver.append(txn) + + # Wait for all responses + await env.wait_for_responses(max_cycles=num_transactions * 50) + + print(f" All {num_transactions} back-to-back transactions completed successfully!") + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_posted_write.py b/tests/adapters/umi2apb/test_posted_write.py new file mode 100644 index 00000000..1cc3b1af --- /dev/null +++ b/tests/adapters/umi2apb/test_posted_write.py @@ -0,0 +1,66 @@ +import math +import cocotb + +from cocotb.handle import SimHandleBase +from cocotb.triggers import ClockCycles + +from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from adapters.umi2apb.env import UMI2APBEnv + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_posted_write(dut: SimHandleBase): + """ + Test posted writes (no UMI response expected): + 1. Send multiple writes to different addresses + 2. Verify APB memory contents + """ + + env = UMI2APBEnv(dut) + await env.start() + + umi_size = int(math.log2(env.data_size)) + + print("=== Posted Write Test ===") + + # Test data: address -> value + test_data = { + 0x100: 0xDEADBEEF, + 0x200: 0xCAFEBABE, + 0x300: 0x12345678, + 0x400: 0xABCD1234, + } + + # Send writes + for addr, data in test_data.items(): + posted_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_POSTED), + size=umi_size, + len=0, + ), + da=addr, + sa=0x0, + data=data.to_bytes(env.data_size, byteorder="little"), + ) + env.sumi_driver.append(posted_txn) + print(f" Sent posted write: addr=0x{addr:x}, data=0x{data:08x}") + + # Wait for transactions to complete + await ClockCycles(env.clk, 50) + + # Verify APB memory contents directly + print("\n=== Verifying Memory Contents ===") + for addr, expected_data in test_data.items(): + mem_data = await env.region.read(addr, env.data_size) + actual_data = int.from_bytes(mem_data, byteorder="little") + assert actual_data == expected_data, ( + f"Posted write failed at 0x{addr:x}: " + f"expected 0x{expected_data:x}, got 0x{actual_data:x}" + ) + print(f" Verified addr=0x{addr:x}: 0x{actual_data:08x}") + + # Note: If any unexpected responses arrive, the scoreboard will + # raise error + + print("\n=== Posted Write Test PASSED ===") diff --git a/tests/adapters/umi2apb/test_random_stimulus.py b/tests/adapters/umi2apb/test_random_stimulus.py new file mode 100644 index 00000000..1ca13b03 --- /dev/null +++ b/tests/adapters/umi2apb/test_random_stimulus.py @@ -0,0 +1,114 @@ +import math +import cocotb +from random import randint, randbytes +from cocotb.triggers import ClockCycles + +from adapters.umi2apb.env import UMI2APBEnv, create_expected_write_response +from cocotblib.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_random_stimulus(dut): + """ + Randomized read/write stimulus. + + - Aligned addresses + - Full-width accesses + - Memory model checked at the end + """ + # Grab shared test environment + env = UMI2APBEnv(dut) + await env.start() + + data_size = env.data_size + addr_width = env.addr_width + umi_size = int(math.log2(data_size)) + + mem_size = 2**16 + num_random_transactions = 512 + read_probability = 0.5 + + print(f"=== Randomized Test: {num_random_transactions} transactions ===") + + # Ideal memory model for writes/reads + memory_model = {} + + for i in range(num_random_transactions): + txn_bytes = env.data_size + max_addr = (mem_size - txn_bytes) // txn_bytes + + # Randomized address and command type + addr = randint(0, max_addr) * txn_bytes + is_read = randint(0, 99) < (read_probability * 100) + + if is_read: + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_READ), + size=umi_size, + len=0, + ), + da=addr, + sa=0x0, + data=bytearray(txn_bytes), + ) + + expected_data = memory_model.get(addr, bytearray(txn_bytes)) + expected_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_READ), + size=umi_size, + len=0, + ), + da=0x0, + sa=addr, + data=expected_data, + addr_width=addr_width, + ) + + env.expected_responses.append(expected_resp) + + else: + data = randbytes(txn_bytes) + memory_model[addr] = data + + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=umi_size, + len=0, + ), + da=addr, + sa=0x0, + data=data, + ) + + env.expected_responses.append( + create_expected_write_response(txn, txn_bytes, addr_width) + ) + + await env.sumi_driver.send(txn) + + if (i + 1) % 100 == 0: + print(f" Sent {i+1}/{num_random_transactions} transactions...") + await ClockCycles(env.clk, 1) + + await env.wait_for_responses(max_cycles=num_random_transactions * 50) + + # Memory verification + num_verified = 0 + for addr, expected_data in memory_model.items(): + mem_data = await env.region.read(addr, data_size) + assert mem_data == expected_data, ( + f"Memory mismatch at 0x{addr:x}: " + f"expected {expected_data.hex()}, got {mem_data.hex()}" + ) + num_verified += 1 + + print("\n=== Test Statistics ===") + print(f" Total transactions: {num_random_transactions}") + print(f" Unique addresses written: {len(memory_model)}") + print(f" Memory locations verified: {num_verified}") + print(" All transactions completed") + + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_run.py b/tests/adapters/umi2apb/test_run.py new file mode 100644 index 00000000..15bbbc1c --- /dev/null +++ b/tests/adapters/umi2apb/test_run.py @@ -0,0 +1,30 @@ +# tests/adapters/umi2apb/run.py + +import pytest +from siliconcompiler import Sim + +from umi.adapters import UMI2APB +from cocotblib.common import run_cocotb + + +def run_umi2apb(simulator="verilator", waves=True): + project = Sim(UMI2APB()) + project.add_fileset("rtl") + + tests_failed = run_cocotb( + project=project, + test_module_name="adapters.umi2apb", + simulator_name=simulator, + timescale=("1ns", "1ps"), + build_args=["--report-unoptflat"] if simulator == "verilator" else [], + output_dir_name=f"umi2apb_{simulator}", + waves=waves, + ) + + assert tests_failed == 0 + + +@pytest.mark.sim +@pytest.mark.parametrize("simulator", ["verilator"]) +def test_umi2apb(simulator): + run_umi2apb(simulator)