From a15b53a7f1aca7624a5da56372185e4ce2a10db6 Mon Sep 17 00:00:00 2001 From: Mike Walters Date: Thu, 2 Jan 2025 13:19:41 +0000 Subject: [PATCH] Remove `synchronize` helper function --- .../interface/gateware_phy/receiver.py | 9 +-- luna/gateware/usb/usb2/endpoints/status.py | 7 +- luna/gateware/usb/usb3/physical/lfps.py | 8 ++- luna/gateware/utils/__init__.py | 3 +- luna/gateware/utils/cdc.py | 65 +------------------ tests/test_cdc.py | 45 +------------ 6 files changed, 18 insertions(+), 119 deletions(-) diff --git a/luna/gateware/interface/gateware_phy/receiver.py b/luna/gateware/interface/gateware_phy/receiver.py index 9dd72e2c1..45296373a 100644 --- a/luna/gateware/interface/gateware_phy/receiver.py +++ b/luna/gateware/interface/gateware_phy/receiver.py @@ -33,11 +33,10 @@ # from amaranth import Elaboratable, Module, Signal, Cat, Const, ClockSignal +from amaranth.lib.cdc import FFSynchronizer from amaranth.lib.fifo import AsyncFIFOBuffered from amaranth.hdl.xfrm import ResetInserter -from ...utils.cdc import synchronize - class RxClockDataRecovery(Elaboratable): """RX Clock Data Recovery module. @@ -102,8 +101,10 @@ def elaborate(self, platform): # Synchronize the USB signals at our I/O boundary. # Despite the assumptions made in ValentyUSB, this line rate recovery FSM # isn't enough to properly synchronize these inputs. We'll explicitly synchronize. - sync_dp = synchronize(m, self._usbp, o_domain="usb_io") - sync_dn = synchronize(m, self._usbn, o_domain="usb_io") + sync_dp = Signal() + sync_dn = Signal() + m.submodules.dp_cdc = FFSynchronizer(self._usbp, sync_dp, o_domain="usb_io") + m.submodules.dn_cdc = FFSynchronizer(self._usbn, sync_dn, o_domain="usb_io") ####################################################################### # Line State Recovery State Machine diff --git a/luna/gateware/usb/usb2/endpoints/status.py b/luna/gateware/usb/usb2/endpoints/status.py index b4d634009..795fa6b2d 100644 --- a/luna/gateware/usb/usb2/endpoints/status.py +++ b/luna/gateware/usb/usb2/endpoints/status.py @@ -10,10 +10,10 @@ repeatedly poll a device for status. """ -from amaranth import Elaboratable, Module, Signal, Array +from amaranth import Elaboratable, Module, Signal, Array +from amaranth.lib.cdc import FFSynchronizer from ..endpoint import EndpointInterface -from ....utils.cdc import synchronize class USBSignalInEndpoint(Elaboratable): @@ -76,7 +76,8 @@ def elaborate(self, platform): if self._signal_domain == "usb": target_signal = self.signal else: - target_signal = synchronize(m, self.signal, o_domain="usb") + target_signal = Signal() + m.submodules.target_signal_cdc = FFSynchronizer(self.signal, target_signal, o_domain="usb") # Store a latched version of our signal, captured before we start a transmission. diff --git a/luna/gateware/usb/usb3/physical/lfps.py b/luna/gateware/usb/usb3/physical/lfps.py index b8cff422c..ad60dce9b 100644 --- a/luna/gateware/usb/usb3/physical/lfps.py +++ b/luna/gateware/usb/usb3/physical/lfps.py @@ -28,9 +28,10 @@ from math import ceil -from amaranth import * +from amaranth import * +from amaranth.lib.cdc import FFSynchronizer -from ....utils import synchronize, rising_edge_detected +from ....utils import rising_edge_detected __all__ = ['LFPSTransceiver'] @@ -106,7 +107,8 @@ def elaborate(self, platform): m = Module() # Create an in-domain version of our square-wave-detector signal. - present = synchronize(m, self.signaling_received, o_domain="ss") + present = Signal() + m.submodules.present_cdc = FFSynchronizer(self.signaling_received, present, o_domain="ss") # Figure out how large of a counter we're going to need... burst_cycles_min = ceil(self._clock_frequency * self._pattern.burst.t_min) diff --git a/luna/gateware/utils/__init__.py b/luna/gateware/utils/__init__.py index 1150dc54a..1bf432c53 100644 --- a/luna/gateware/utils/__init__.py +++ b/luna/gateware/utils/__init__.py @@ -4,11 +4,10 @@ """ Simple utility constructs for LUNA. """ from amaranth import Module, Signal, Cat -from .cdc import synchronize __all__ = [ 'rising_edge_detected', 'falling_edge_detected', 'any_edge_detected', - 'past_value_of', 'synchronize' + 'past_value_of' ] diff --git a/luna/gateware/utils/cdc.py b/luna/gateware/utils/cdc.py index afdabd2f5..eb9cdd0bf 100644 --- a/luna/gateware/utils/cdc.py +++ b/luna/gateware/utils/cdc.py @@ -7,70 +7,7 @@ """ Helpers for clock domain crossings. """ -from amaranth import Record, Signal -from amaranth.lib.cdc import FFSynchronizer -from amaranth.lib.io import Pin -from amaranth.hdl.rec import DIR_FANOUT - - -def synchronize(m, signal, *, output=None, o_domain='sync', stages=2): - """ Convenience function. Synchronizes a signal, or equivalent collection. - - Parameters: - input -- The signal to be synchronized. - output -- The signal to output the result of the synchronization - to, or None to have one created for you. - domain -- The name of the domain to be synchronized to. - stages -- The depth (in FFs) of the synchronization chain. - Longer incurs more delay. Must be >= 2 to avoid metastability. - - Returns: - record -- The post-synchronization signal. Will be equivalent to the - `output` record if provided, or a new, created signal otherwise. - """ - - # Quick function to create a synchronizer with our domain and stages. - def create_synchronizer(signal, output): - return FFSynchronizer(signal, output, o_domain=o_domain, stages=stages) - - if output is None: - if isinstance(signal, Signal): - output = Signal.like(signal) - else: - output = Record.like(signal) - - # If the object knows how to synchronize itself, let it. - if hasattr(signal, '_synchronize_'): - signal._synchronize_(m, output, o_domain=o_domain, stages=stages) - return output - - # Trivial case: if this element doesn't have a layout, - # we can just synchronize it directly. - if not hasattr(signal, 'layout'): - m.submodules += create_synchronizer(signal, output) - return output - - # Otherwise, we'll need to make sure we only synchronize - # elements with non-output directions. - for name, layout, direction in signal.layout: - # Skip any output elements, as they're already - # in our clock domain, and we don't want to drive them. - if (direction == DIR_FANOUT): - m.d.comb += signal[name].eq(output[name]) - continue - elif hasattr(signal[name], 'o') and ~hasattr(signal[name], 'i'): - m.d.comb += signal[name].o.eq(output[name]) - continue - - # If this is a record itself, we'll need to recurse. - if isinstance(signal[name], (Record, Pin)): - synchronize(m, signal[name], output=output[name], - o_domain=o_domain, stages=stages) - continue - - m.submodules += create_synchronizer(signal[name], output[name]) - - return output +from amaranth import Signal def stretch_strobe_signal(m, strobe, *, to_cycles, output=None, domain=None, allow_delay=False): diff --git a/tests/test_cdc.py b/tests/test_cdc.py index 628b792c9..49b682012 100644 --- a/tests/test_cdc.py +++ b/tests/test_cdc.py @@ -7,49 +7,8 @@ from luna.gateware.test import LunaGatewareTestCase, sync_test_case from unittest import TestCase -from amaranth import Module, Record, Signal -from amaranth.lib.io import Pin -from amaranth.hdl.rec import DIR_FANIN, DIR_FANOUT -from luna.gateware.utils.cdc import synchronize, stretch_strobe_signal - -class SynchronizedTest(TestCase): - - def test_signal(self): - m = Module() - synchronize(m, Signal()) - - def test_directional_record(self): - m = Module() - - record = Record([ - ('sig_in', 1, DIR_FANIN), - ('sig_out', 1, DIR_FANOUT) - ]) - synchronize(m, record) - - def test_nested_record(self): - m = Module() - - record = Record([ - ('sig_in', 1, DIR_FANIN), - ('sig_out', 1, DIR_FANOUT), - ('nested', [ - ('subsig_in', 1, DIR_FANIN), - ('subsig_out', 1, DIR_FANOUT), - ]) - ]) - synchronize(m, record) - - def test_pins(self): - m = Module() - pins = { - "sig_in": Pin(1, "i"), - "sig_out": Pin(1, "o"), - } - record = Record([ - (f_name, f.layout) for (f_name, f) in pins.items() - ], fields=pins) - synchronize(m, record) +from amaranth import Module, Signal +from luna.gateware.utils.cdc import stretch_strobe_signal class StrobeStretcherTest(LunaGatewareTestCase):