-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_utils.py
More file actions
110 lines (92 loc) · 3.44 KB
/
stream_utils.py
File metadata and controls
110 lines (92 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from amaranth import Module, Signal
from amaranth.lib.wiring import Component, In, Out
from amaranth.lib import stream
from amaranth.utils import ceil_log2
def tree_and(m: Module, inputs: list[Signal]) -> Signal:
if len(inputs) == 1:
return inputs[0]
elif len(inputs) == 2:
result = Signal(1)
m.d.comb += result.eq(inputs[0] & inputs[1])
return result
else:
l = len(inputs)
left = tree_and(m, inputs[:l//2])
right = tree_and(m, inputs[l//2:])
return tree_and(m, [left, right])
def tree_or(m: Module, inputs: list[Signal]) -> Signal:
if len(inputs) == 1:
return inputs[0]
elif len(inputs) == 2:
result = Signal(1)
m.d.comb += result.eq(inputs[0] | inputs[1])
return result
else:
l = len(inputs)
left = tree_or(m, inputs[:l//2])
right = tree_or(m, inputs[l//2:])
return tree_or(m, [left, right])
def fanout_stream(m: Module, input: stream.Signature, outputs: list[stream.Signature]):
combined_ready = tree_and(m, [o.ready for o in outputs])
m.d.comb += input.ready.eq(combined_ready)
for o in outputs:
m.d.comb += [
o.valid.eq(input.valid & combined_ready),
o.payload.eq(input.payload),
]
class LimitForwarder(Component):
"""
Forwards a limited number of bytes from one stream to another, then stops.
Parameters
---------
width: width of the data stream.
max_count: maximum value of the counter.
TODO: Documentation for attributes
"""
def __init__(self, width: int, max_count: int):
count = ceil_log2(max_count)
super().__init__({
"inbound": In(stream.Signature(width)),
"outbound": Out(stream.Signature(width)),
"count": In(count),
"start": In(1),
"done": Out(1),
})
self._count = count
def elaborate(self, _platform):
m = Module()
countdown = Signal(self._count)
# No transfer by default:
m.d.comb += [
self.inbound.ready.eq(0),
self.outbound.valid.eq(0),
]
with m.FSM():
with m.State("idle"):
m.next = "idle"
m.d.comb += self.done.eq(1)
with m.If(self.start):
m.next = "run"
m.d.sync += countdown.eq(self.count)
with m.State("run"):
m.next = "run"
m.d.comb += self.done.eq(0)
with m.If(countdown == 0):
m.next = "idle"
with m.Else():
# We want to do this:
# connect(m, self.inbound, self.outbound)
# But if we do so in the testbench, we get
# raise DriverConflict("Combinationally driven signals cannot be overriden by testbenches")
# from
# ctx.set(dut.inbound.valid, 1)
# Yet this works:
m.d.comb += [
self.inbound.ready.eq(self.outbound.ready),
self.outbound.valid.eq(self.inbound.valid),
self.outbound.payload.eq(self.inbound.payload),
]
with m.If(self.outbound.ready & self.inbound.valid):
# Byte was transferred
m.d.sync += countdown.eq(countdown - 1)
return m