diff --git a/mkidgen3/server/__init__.py b/mkidgen3/server/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/mkidgen3/server/aggregator.py b/mkidgen3/server/aggregator.py deleted file mode 100644 index 9a5ae38..0000000 --- a/mkidgen3/server/aggregator.py +++ /dev/null @@ -1,192 +0,0 @@ -from .captures import * -from zmq import * -from .pixelmap import PixelMap -from mkidgen3.mkidpynq import unpack_photons - -MAX_PHOTON_PER_CR_SEND = 5000 -INSTRUMENT_PHOTON_TYPE = (('time', 'u32'), ('x', 'u32'), ('y', 'u32'), ('phase', 'u16')) - -class Gen3TableSaver: - def __init__(self, file, size_hint=None): - self._file=file - - def grow(self, photons): - pass - - -class Gen3LiveImage: - def __init__(self, destination, exposure_time, photon_rate=True): - self.exposure_time = exposure_time - self.photon_rate = photon_rate - - def add_photons(self, photons): - pass - - -class PhotonAggregator: - """ A stub for combining photon capture data from multiple feedlines into a cohesive entity suitable for further - use. Photon CaptureJobs result in photon data being published by the various feedline readout servers under the - associated capture ID. To gather data from a full array we need to connect to each stream receive the data, - uncompress it, convert the timestamps to full unix times, convert the feedline channels to full readout channels ( - FL+FLchannel), and then, depending, do things like convert to a rate, array image, save send on, or so forth - - Captures can die or be killed - """ - def __init__(self, time_offset, jobs: List[CaptureJob] or None, pixel_map:PixelMap): - self._time_offset = time_offset - self._new_jobs = jobs - self._map = pixel_map - self._dead_jobs = [] - self._jobs = [] - self._hdf_saver = None - - def add_capture(self, jobs: List[CaptureJob]): - self._new_jobs.extend(jobs) - - def run(self): - poller = zmq.Poller() - poller.register(self._pipe[1], flags=zmq.POLLIN) - - getLogger(__name__).debug(f'Listening for data') - self._pipe[1].send(b'') - - while True: - - for job in list(self._new_jobs): - assert not job.datasink.is_alive() - poller.register(job.datasink.establish()) #TODO this isn't thread safe if the di - self._jobs.append(job) - self._new_jobs.remove(job) - - avail = dict(poller.poll()) - if self._pipe[1] in avail: - getLogger(__name__).debug(f'Received shutdown order, terminating {self}') - break - - nnew = 0 - photons_buf = np.recarray(len(avail)*MAX_PHOTON_PER_CR_SEND, dtype=INSTRUMENT_PHOTON_TYPE) - - for job in (j for j in list(self._jobs) if j.datasink.socket in avail): - raw_phot_data = job.datasink.receive() - - if raw_phot_data is None: - getLogger(__name__).debug(f'Photon data stream over for {job}') - poller.unregister(job.datasing.socket) - self._jobs.remove(job) - self._dead_jobs.append(job) - continue - - sl_out = slice(nnew, nnew+raw_phot_data.size) - photons_buf['time'][sl_out] = raw_phot_data['time'] + self._time_offset - photons_buf['phase'][sl_out] = raw_phot_data['phase'] - xy = self._map[job.fl_ndx, raw_phot_data['id']] #todo - photons_buf['x'][sl_out] = xy[0] - photons_buf['y'][sl_out] = xy[1] - nnew+=raw_phot_data.size - - if self._live_image is not None: - self._image_image.add_photons(photons_buf[:nnew]) - - if self._hdf_saver is not None: - self._hdf_saver.grow(photons_buf[:nnew]) - - if self._republisher: - self._republisher.send(photons_buf[:nnew]) - - for job in self._jobs: - job.datasink.destablish() - - -def make_image(map, photons, timestep_us=None, rate=False): - """ Build an image out of the current photon data, specify a timestep (in us) to make a timecube""" - - first, last = photons['time'][[0, -1]] - duration_s = (last - first) / 1e6 - if first >= last: - raise ValueError('Last photon time is not after the first!') - - if timestep_us: - timebins = (np.arange(first, last + timestep_us, timestep_us, dtype=type(timestep_us)),) - else: - timebins = tuple() - imbins = map.map_bins - - val = map.map[photons['id']] - val = (val,) if map.map.ndim == 1 else (val[0], val[1]) - coord = val + (photons['time'],) if timestep_us else val - - hist, _ = np.histogramdd(coord, bins=imbins + timebins, density=rate) - if rate: - hist *= photons.size - hist /= duration_s if timestep_us is None else 1e6 - - return hist - -from mkidgen3.mkidpynq import PHOTON_DTYPE -class PhotonAccumulator: - def __init__(self, max_ram=100, pixelmap: PixelMap = None, n_channels=None): - """ - A tool to gather photons together without running out of ram and ease access - to timeseries, images, and other helpful metrics. - - max_ram: how much space to allow for photon storage - pixelmap: and optional channel id to x or xy map, should be a nChan x 1 or 2 array positions - n_channels: optional shortcut if pixelmap not specified - """ - assert bool(n_channels) ^ bool(pixelmap), 'Must specify n_channels xor pixelmap' - self.ram_limit = max_ram # MiB - self.map = pixelmap if pixelmap else PixelMap(np.arange(n_channels, dtype=int), n_channels) - self._data = np.zeros(self.ram_limit * 1024 ** 2 // PHOTON_DTYPE.itemsize, dtype=PHOTON_DTYPE) - self.n = 0 - self.accumulations = 0 - - def __getitem__(self, n): - """Get photons for channel n""" - return self.data[self.data['id'] == n] - - def accumulate(self, buffer, n=None): - """Add some packed photons to the pot""" - self.accumulations += 1 - if n is None: - d = np.array(buffer) - n = buffer.size - else: - d = np.array(buffer[:n]) - - drop = max(n - (self._data.size - self.n), 0) - if drop: - getLogger(__name__).debug(f'Memory used up, dropping {drop} old photons.') - self._data[:self._data.size - drop] = self._data[drop:] - unpack_photons(buffer[:n], out=self._data, n=self.n) - self.n += n - drop - - @property - def data(self): - return self._data[:self.n] - - def image(self, timestep_us=None, rate=False): - """ Build an image out of the current photon data, specify a timestep (in us) to make a timecube""" - return make_image(self.map, self.data[:self.n], timestep_us=timestep_us, rate=rate) - - - -from mkidgen3.server import pixelmap -if __name__=='__main__': - - h5file = 'test.h5' - live_image = ('live.tcp', 'live.bmp') - repub = ('photons.tcp') - - #Need to manually spin up feedline servers - - N_FEEDLINES, N_RESONATORS = 2, 2048 - frs1 = FRSClient(url1) - frs2 = FRSClient(url2) - cfg = FeedlineConfig() - - servers = [frs1, frs2] - map = pixelmap.example_map(N_FEEDLINES, N_RESONATORS) - jobs = [CaptureJob(CaptureRequest(.5, 'photons', cfg, frs), - submit=(False, False)) for frs in servers] - - aggregator = PhotonAggregator(int(time.time()), jobs, map) diff --git a/mkidgen3/server/captures.py b/mkidgen3/server/captures.py deleted file mode 100644 index 101e205..0000000 --- a/mkidgen3/server/captures.py +++ /dev/null @@ -1,996 +0,0 @@ -import copy -import pickle -import threading -import time -import numpy as np -import zmq -from typing import Iterable -import blosc2 -from typing import List -from npy_append_array import NpyAppendArray -from logging import getLogger -from datetime import datetime -import os -from hashlib import md5 -from mkidgen3.server.misc import zpipe -from mkidgen3.util import format_bytes -from mkidgen3.funcs import convert_adc_raw_to_mv, raw_iq_to_unit, raw_phase_to_radian -from ..system_parameters import SYSTEM_BANDWIDTH -from mkidgen3.rfsocmemory import memfree_mib -from .feedline_config import FeedlineConfig, WaveformConfig, IFConfig -from ..mkidpynq import PHOTON_DTYPE -from ..system_parameters import N_CHANNELS, N_IQ_GROUPS, N_PHASE_GROUPS, N_POSTAGE_CHANNELS, \ - PHOTON_POSTAGE_WINDOW_LENGTH, MAXIMUM_DESIGN_COUNTRATE_PER_S -from functools import cached_property -from mkidgen3.mkidpynq import unpack_photons - -from typing import Tuple -PHOTON_DTYPE_PACKED = np.uint64 - - -class FRSClient: - def __init__(self, url: str, command_port: int = 10000, data_port: int = 10001, status_port: int = 10002): - """ - Args: - url: ip address of rfsoc for ex: rfsoc4x2b.physics.ucsb.edu - command_port: port for feedline server commands - data_port: port for capture data server - status_port: port for status server - """ - self.url = url - self.command_port = command_port - self.data_port = data_port - self.status_port = status_port - - @property - def command_url(self): - return f'tcp://{self.url}:{self.command_port}' - - @property - def data_url(self): - return f'tcp://{self.url}:{self.data_port}' - - @property - def status_url(self): - return f'tcp://{self.url}:{self.status_port}' - - def __str__(self): - return f'FRS@{self.url}:{self.command_port}' - - def bequiet(self): - ctx = zmq.Context().instance() - with ctx.socket(zmq.REQ) as s: - s.connect(self.command_url) - s.send_pyobj(('bequiet', {})) - return s.recv_pyobj() - - def order(self, *args): - ctx = zmq.Context().instance() - with ctx.socket(zmq.REQ) as s: - s.connect(self.command_url) - s.send_pyobj(args) - return s.recv_pyobj() - - -class CaptureRequest: - """ - A Capture Request object is generated by the client and submitted to the feedline readout server - """ - STATUS_ENDPOINT = 'inproc://cap_stat.xsub' - DATA_ENDPOINT = 'inproc://cap_data.xsub' - FINAL_STATUSES = ('finished', 'aborted', 'failed') - SUPPORTED_TAPS = ('postage', 'photon', 'adc', 'iq', 'filtphase', 'ddciq') - - @staticmethod - def validate_channels(tap: str, chan: list): - """ - Verify that the channel selection is compatible with the driver. - - Note that this is not done dynamically, no driver discovery is performed - - See mkidgen3.drivers.FilterPhase, .drivers.FilterIQ, and .trigger.PhotonPostageFilter - for details about groups and monitor channels. - - Args: - chan: A list of channels - tap: A capture location - - Returns: True|False - """ - tap = tap.lower() - if 'iq' in tap: - l = N_CHANNELS - m = N_CHANNELS - elif 'phase' in tap: - l = N_CHANNELS - m = N_CHANNELS - elif tap == 'postage': - l = N_POSTAGE_CHANNELS - m = N_CHANNELS - elif chan: - l = 0 # no channels for photon or adc capture - else: - return False - try: - assert len(chan) <= l - for c in chan: - assert 0 <= c < m - except AssertionError: - return False - return True - - def __init__(self, n: int, tap: str, feedline_config: FeedlineConfig, - feedline_server: FRSClient, channels: Iterable | int | None = None, file: str = None, - fail_saturation_frac: None | float | int = None, numpy_metric: None | str = None, - _compression_override: bool = None): - """ - - Args: - n: the number of samples to capture, for photons n is the buffer rotation interval request in ms - tap: the location to capture: 'adc'|'iq'|'phase'|'postage'|'photon' - feedline_config: the FL configuration required by the capture - feedline_server: the FRS to capture from - channels: the phase, IQ, or postage resonator channels to capture - file: an optional file to save the result to - fail_saturation_frac: Take a pre-capture of ADC data and abort if the maximum I or Q magnitude is greater - than the fraction*ADC_MAX_INT. Set to None, 0, or > 1 to disable. - numpy_metric: a numpy function to apply across samples on a channel by channel basis. Must have `out` and - `axis` as input parameters. Math is done to I and Q respectively (complex math not supported). - For example: 'mean' will return a mean of the values for each channel for IQ and phase data. - _compression_override: skips blosc compression on the server if True. False or None (default) results in - data being compressed before sending. - """ - - tap = tap.lower() - assert tap in CaptureRequest.SUPPORTED_TAPS - self.fail_saturation_frac = float(fail_saturation_frac) if fail_saturation_frac else None - self.nsamp = n # n is treated as the buffer time in ms for photons, and has limits enforced by the driver - self._last_status = None - if channels is not None: - try: - channels = tuple(sorted(set(channels))) - except TypeError: - channels = (channels,) - if not CaptureRequest.validate_channels(tap, channels): - raise ValueError('Invalid channels specification') - self.channels = channels - self.tap = tap # maybe add some error handling here - self.feedline_config = feedline_config - self.server = feedline_server - self._status_socket = None - self._data_socket = None - self._established = False - self._compression_override = _compression_override - if numpy_metric: - try: - self.numpy_metric = str(numpy_metric) - getattr(np, self.numpy_metric) - except AttributeError: - raise ValueError('Not a good metric') - else: - self.numpy_metric = None - self.data_endpoint = file or type(self).DATA_ENDPOINT - - def __hash__(self): - return int(md5(str((hash(self.feedline_config), self.tap, self.data_endpoint, - self.nsamp, self.server.command_url)).encode()).hexdigest(), 16) - - # def __enter__(self): - # self.establish() - # - # def __exit__(self, exc_type, exc_val, exc_tb): - # if isinstance(exc_type, zmq.error.ContextTerminated): - # self._data_socket=None - # self._status_socket=None - # else: - # self.destablish() - - def __del__(self): - try: - self.destablish() - except zmq.error.ContextTerminated: - pass - - def __str__(self): - return f'CapReq {self.server}:{self.tap} {str(hash(self))}' - - def submit(self, ctx: zmq.Context = None): - getLogger(__name__).info(f'Submitting {self}..') - ctx = ctx or zmq.Context().instance() - with ctx.socket(zmq.REQ) as s: - s.connect(self.server.command_url) - s.send_pyobj(('capture', self)) - getLogger(__name__).info(f'...submitted...') - resp = s.recv_pyobj() - getLogger(__name__).info(f'response: {resp}') - return resp - - @property - def type(self): - return 'engineering' if self.tap not in ('photon', 'postage') else self.tap - - @property - def id(self): - return str(hash(self)).encode() - - def establish(self, context: zmq.Context = None): - """ - Open up the outbound sockets for status and data. If data is going to a file, verify it ca be opened for writing and close it - """ - if self._status_socket is not None or self._data_socket is not None: - getLogger(__name__).warning(f'{self} already established') - return - context = context or zmq.Context.instance() - self._status_socket = context.socket(zmq.PUB) - self._status_socket.connect(self.STATUS_ENDPOINT) - if self.data_endpoint.startswith('file://'): - f = self.data_endpoint.lstrip('file://') - if not os.access('/' + os.path.dirname(f), os.W_OK): - raise IOError(f'Unable to write to {f}') - return - self._data_socket = context.socket(zmq.PUB) - self._data_socket.connect(self.data_endpoint) - self._send_status('established') - - def destablish(self): - if self._status_socket is not None: - log = True - getLogger(__name__).debug(f'de-establishing status...') - try: - # NB the context or socket's linger setting determines how long any pending messages have to get sent off - self._status_socket.close() - self._status_socket = None - except AttributeError: - pass - if self._data_socket is not None: - log = True - getLogger(__name__).debug(f'de-establishing data...') - try: - self._data_socket.close() - self._data_socket = None - except AttributeError: - pass - if log: - getLogger(__name__).debug(f'...de-established.') - - def fail(self, message, raise_exception=False): - if self.completed: - return - try: - if self.data_endpoint.startswith('file://'): - pass - else: - self._data_socket.send_multipart([self.id, b'']) - self._send_status('failed', message) - self.destablish() - except zmq.ZMQError as ez: - getLogger(__name__).warning(f'Failed to send fail message {self} due to {ez}') - if raise_exception: - raise ez - - def finish(self, raise_exception=True): - if self.completed: - return - try: - if self.data_endpoint.startswith('file://'): - pass - else: - self._data_socket.send_multipart([self.id, b'']) - self._send_status('finished', f'Finished at UTC {datetime.utcnow()}') - self.destablish() - except zmq.ZMQError as ez: - getLogger(__name__).warning(f'Failed to send finished message {self} due to {ez}') - if raise_exception: - raise ez - - def abort(self, message, raise_exception=False): - if self.completed: - return - try: - self._data_socket.send_multipart([self.id, b'']) - self._send_status('aborted', message) - self.destablish() - except zmq.ZMQError as ez: - getLogger(__name__).warning(f'Failed to send abort message {self} due to {ez}') - if raise_exception: - raise ez - - def send_data(self, data, status='', copy=False, compress=True): - """ - Send a (chunk) of data out. By default, data is published as a blosc2 compressed array with the - capture id as the subscription key. CaptureRequests created with a file destination will be saved as - uncompressed npy data, appended on successive calls, no data will be published. - - Data is shipped in the (compressed) format it is given. - - Do not call this function from multiple threads. - Call .establish() in the calling thread before calling this function or after calling destablish() - - This function is intended to be zero-copy, is as of 10/27/23 and should be maintained as such. - - Args: - data: a array of data to send out - status: (optional) a status update message to send out - copy: copy the array first, you probably don't want this. copying the array then freeing the - underlying buffer causes segfaults with the pycharm debugger. Be aware that the property - zmq.COPY_THRESHOLD will case messages to be copied even if copy is False if the message size - is less than the threshold. - compress: compress the data with blosc2, ignored for file destinations. - - Returns: None | zmq.MessageTracker None is returned if copying or the destination is a file. A - segfault may occur if the data is deallocated prior to the messagetracker being done. - """ - if not self._status_socket: - raise RuntimeError('Status socket is not established.') - if self.data_endpoint.startswith('file://'): - with NpyAppendArray('/' + self.data_endpoint.lstrip('file://'), delete_if_exists=False) as x: - x.append(data) - self._send_status('capturing', status) - return - elif not self._data_socket: - raise RuntimeError('Data socket is not established.') - - # getLogger(__name__).debug(f'MiB Free: {memfree_mib()}') - if self.numpy_metric is not None: - out = np.empty(data.shape[1:] if data.ndim >1 else (1,)) - try: - getattr(np, self.numpy_metric)(data, axis=0, out=out) - except TypeError as e: - raise RuntimeError(f'numpy metric: {self.numpy_metric} failed with {e}') - except Exception as e: - raise RuntimeError(f'Encountered unexpected error related to numpy metric {e}') - data = out - else: - data = np.array(data) if copy else data - - do_compression = compress if self._compression_override is None else not self._compression_override - - # from line_profiler import LineProfiler - # profile = LineProfiler() - # profile.enable_by_count() - times = [] - times.append(time.perf_counter()) - # getLogger(__name__).debug(f'MiB Free: {memfree_mib()}') - compressed = blosc2.compress(data) if do_compression else data - times.append(time.perf_counter()) - # getLogger(__name__).debug(f'MiB Free: {memfree_mib()}') - self._send_status('capturing', status) - times.append(time.perf_counter()) - tracker = self._data_socket.send_multipart([self.id, compressed], copy=False, track=not copy) - times.append(time.perf_counter()) - # profile.disable_by_count() - #getLogger(__name__).debug(profile.get_stats()) - times = np.diff(times) * 1000 - getLogger(__name__).debug(f'Compress {times[0]:.2f} ms, Status Up {times[1]:.2f}, Send {times[2]:.2f}') - cval = 100 * len(compressed) / data.nbytes if data.nbytes else 100 - getLogger(__name__).debug(f'Sending {format_bytes(len(compressed))}, ' - f'compressed to {cval:.1f} % from {format_bytes(data.nbytes)}.') - return tracker - - def _send_status(self, status, message=''): - if not self._status_socket: - raise RuntimeError('No status_socket connection available') - update = f'{status}:{message}' - getLogger(__name__).getChild('statusio').debug(f'Published status update for {self}: "{update}"') - self._last_status = (status, message) - self._status_socket.send_multipart([self.id, update.encode()]) - - @property - def completed(self): - return self._last_status is not None and self._last_status[0] in ('finished', 'aborted', 'failed') - - def set_status(self, status, message='', context: zmq.Context = None, destablish=False): - """ - get appropriate context and send current status message after connecting the status socket - - context is ignored if a status destination connection is extant - """ - if not self._status_socket: - context = context or zmq.Context().instance() - self._status_socket = context.socket(zmq.PUB) - self._status_socket.connect(self.STATUS_ENDPOINT) - self._send_status(status, message) - if destablish: - self.destablish() - - @property - def size_bytes(self): - if self.numpy_metric: - return self.capture_atom_bytes - else: - return self.nsamp*self.capture_atom_bytes - - @property - def nchan(self): - if self.channels: - return len(self.channels) - elif self.tap == 'postage': - return N_POSTAGE_CHANNELS - elif 'iq' in self.tap or 'phase' in self.tap: - return N_CHANNELS - else: - return 1 - - @property - def dwid(self) -> int: - """Data size of a capture sample in bytes""" - if self.numpy_metric: - return 2 * 8 ## numpy math returns float64 (8 bytes) * 2 for IQ - if 'iq' in self.tap or self.tap in ('adc', 'postage'): - return 4 - elif 'phase' in self.tap: - return 2 - else: - return PHOTON_DTYPE.itemsize - - @property - def capture_atom_bytes(self) -> int: - return self.dwid*self.nchan - - @property - def buffer_shape(self): - """ (nsamples, nchannels, 1|2) """ - n = int(np.ceil(self.nsamp * MAXIMUM_DESIGN_COUNTRATE_PER_S / 1000)) if self.tap == 'photon' else self.nsamp - - if self.tap == 'postage': - return n, PHOTON_POSTAGE_WINDOW_LENGTH + 1, 2 - elif 'iq' in self.tap or self.tap == 'adc': - if self.numpy_metric: - return self.nchan, 2 - else: - return n, self.nchan, 2 - elif self.tap == 'photon': - return (n,) - elif 'phase' in self.tap: - if self.numpy_metric: - return self.nchan - else: - return n, self.nchan - else: - raise RuntimeError('Unknown tap: {self.tap}') - - -class CaptureSink(threading.Thread): - def __init__(self, req_nfo: [CaptureRequest, tuple], start=True): - if isinstance(req_nfo, CaptureRequest): - id, buffer_shape, size_bytes, source_url, _compression_override = req_nfo.id, req_nfo.buffer_shape, req_nfo.size_bytes, req_nfo.server.data_url, req_nfo._compression_override - else: - id, buffer_shape, size_bytes, source_url, _compression_override = req_nfo - super(CaptureSink, self).__init__(name=f'cap_id={id.decode()}') - self._expected_buffer_shape = buffer_shape - self._expected_bytes = size_bytes - self.daemon = True - self.cap_id = id - self.data_source = source_url - self.result = None - self._pipe = None - self._buf = [] - self._compression_override = _compression_override - self.socket = None - if start: - self.start() - - def start(self): - assert not self._started.is_set() - self._pipe = zpipe(zmq.Context.instance()) - super(CaptureSink, self).start() - - def kill(self): - self._pipe[0].send(b'') - self._pipe[0].recv() - self._pipe[1].close() - self._pipe[0].close() - - def _accumulate_data(self, d): - self._buf.append(d) - - def _finish_accumulation(self): - self._buf = b''.join(self._buf) - - def _finalize_data(self): - raise NotImplementedError - - def establish(self, ctx: zmq.Context = None): - ctx = ctx or zmq.Context.instance() - self.socket = ctx.socket(zmq.SUB) - self.socket.setsockopt(zmq.SUBSCRIBE, self.cap_id) - self.socket.connect(self.data_source) - return self.socket - - def receive(self): - id, data = self.socket.recv_multipart(copy=False) - if not data: - self.socket.close() - return None - return blosc2.decompress(data) if not self._compression_override else data - - def destablish(self): - self.socket.close() - self.socket = None - - def run(self): - assert self.socket is None - try: - self.establish() - poller = zmq.Poller() - poller.register(self._pipe[1], flags=zmq.POLLIN) - poller.register(self.socket, flags=zmq.POLLIN) - - getLogger(__name__).debug(f'Listening for data for {self.cap_id}') - self._pipe[1].send(b'') - while True: - avail = dict(poller.poll()) - if self._pipe[1] in avail: - getLogger(__name__).debug(f'Received shutdown order, terminating data acq. of {self}') - break - data = self.receive() # does decompression - if not data: - getLogger(__name__).debug(f'Received null, capture data stream over') - break - getLogger(__name__).debug(f'Received {format_bytes(len(data))} of {format_bytes(self._expected_bytes)} for {self}') - self._accumulate_data(data) - self._finish_accumulation() - self._finalize_data() - if self.result: - getLogger(__name__).info(f'Capture data for {self.cap_id} processed into {self.result.data.shape} ' - f'{self.result.data.dtype}: {self.result}') - else: - getLogger(__name__).info(f"Capture data for {self.cap_id} processed 'None'") - - except zmq.ZMQError as e: - getLogger(__name__).warning(f'Shutting down {self} due to {e}') - except AttributeError: - raise - finally: - self.destablish() - self._pipe[1].send(b'') - - def data(self, timeout=None): - self.join(timeout=timeout) - return self.result - - def ready(self): - self._pipe[0].recv() # block until a byte arrives - - -class StreamCaptureSink(CaptureSink): - def _finalize_data(self): - # raw adc data is i0q0 i1q1 int16 - bytes_recv = len(self._buf) - if not bytes_recv: - getLogger(__name__).warning(f'No data received for {self.cap_id}. ' - f'Expected {format_bytes(self._expected_bytes)}.') - return - elif bytes_recv > self._expected_bytes: - getLogger(__name__).warning(f'Received more data than expected for {self.cap_id}. ' - f'Expected {format_bytes(self._expected_bytes)} got ' - f'{format_bytes(bytes_recv)}.') - elif bytes_recv < self._expected_bytes: - getLogger(__name__).warning(f'Finalizing incomplete capture data for {self.cap_id}. ' - f'Expected {format_bytes(self._expected_bytes)} got ' - f'{format_bytes(bytes_recv)}.') - - - samples_recv = int(len(self._buf) / 2 // np.prod(self._expected_buffer_shape[1:])) # / 2 for 2 bytes per I or Q - shape = (min(samples_recv, self._expected_buffer_shape[0]),) + self._expected_buffer_shape[1:] - - dtype_bytes = self._expected_bytes // np.prod(shape, dtype=int) - if dtype_bytes == 8: - dtype = np.float64 - else: - dtype = np.int16 - if dtype_bytes != 2: - getLogger(__name__).warning(f'Received data has {dtype_bytes} bytes per sample. ' - f'Data may be lost in cast to int16 (2 bytes).') - - # TODO this might technically be a memory leak if we captured more data - self.result = np.frombuffer(self._buf, count=np.prod(shape, dtype=int), dtype=dtype).reshape(shape).squeeze() - - -class ADCCaptureSink(StreamCaptureSink): - def _finalize_data(self): - super()._finalize_data() - if self.result is not None: - self.result = ADCCaptureData(self.result) - - -class IQCaptureSink(StreamCaptureSink): - def _finalize_data(self): - super()._finalize_data() - if self.result is not None: - self.result = IQCaptureData(self.result) - - -class PhaseCaptureSink(StreamCaptureSink): - def _finalize_data(self): - super()._finalize_data() - if self.result is not None: - self.result = PhaseCaptureData(self.result) - - -class SimplePhotonSink(CaptureSink): - def _accumulate_data(self, d): - self._buf.append(d) - if sum(map(len, self._buf)) / 1024 ** 2 > 100: # limit to 100MiB - self._buf.pop(0) - - def _finish_accumulation(self): - self._buf = b''.join(self._buf) - - def _finalize_data(self): - self.result = PhotonCaptureData(self._buf) - - -class PhotonCaptureSink(CaptureSink): - - def capture(self, hdf, xymap, feedline_source, fl_ids): - t = threading.Thread(target=self._main, args=(hdf, xymap, feedline_source, fl_ids)) - t.start() - - @staticmethod - def _main(hdf, xymap, feedline_source, fl_ids, term_source='inproc://PhotonCaptureSink.terminator.inproc'): - """ - - Args: - xymap: [nfeedline, npixel, 2] array - feedline_source: zmq.PUB socket with photonbufers published by feedline - term_source: a zmq socket of undecided type for detecting shutdown requests - - Returns: None - - """ - - fl_npix = 2048 - n_fl = 5 - MAX_NEW_PHOTONS = 5000 - DETECTOR_SHAPE = (128, 80) - fl_id_to_index = np.arange(n_fl, dtype=int) - - context = zmq.Context.instance() - term = context.socket(zmq.SUB) - term.setsockopt(zmq.SUBSCRIBE, id) - term.connect(term_source) - - data = context.socket(zmq.SUB) - data.setsockopt(zmq.SUBSCRIBE, fl_ids) - data.connect(feedline_source) - - poller = zmq.Poller() - poller.register(term, flags=zmq.POLLIN) - poller.register(data, flags=zmq.POLLIN) - - live_image = np.zeros(DETECTOR_SHAPE) - live_image_socket = None - live_image_by_fl = live_image.reshape(n_fl, fl_npix) - photons_rabuf = np.recarray(MAX_NEW_PHOTONS, - dtype=(('time', 'u32'), ('x', 'u32'), ('y', 'u32'), - ('phase', 'u16'))) - - while True: - avail = poller.poll() - if term in avail: - break - - frame = data.recv_multipart(copy=False) - fl_id = frame[0] - time_offset = frame[1] - d = blosc2.decompress(frame[2]) - frame_duration = None # todo time coverage of data - # buffer is nchan*nmax+1 32bit: 16bit time(base2) 16bit phase - # make array of to [nnmax+1, nchan, 2] uint16 - # nmax will always be <<2^12 number valid will be at [0,:,0] - # times need oring w offset - # photon data is d[1:d[0,i,0], i, :] - - nnew = d[0, :, 0].sum() - # if we wanted to save binary data then we could save this, the x,y list, and the time offset - # mean pixel count rate in this packet is simply [0,:,0]/dt - fl_ndx = fl_id_to_index[fl_id] - live_image_by_fl[fl_ndx, :] += d[0, :, 0] / frame_duration - - # if live_image_ready - live_image_socket.send_multipart([f'liveim', blosc2.compress(live_image)]) - - cphot = np.cumsum(d[0, :, 0], dtype=int) - for i in range(fl_npix): - sl_out = slice(cphot[i], cphot[i] + d[0, i, 0]) - sl_in = slice(1, d[0, i, 0]) - photons_rabuf['time'][sl_out] = d[sl_in, :, 0] - photons_rabuf['time'][sl_out] |= time_offset - photons_rabuf['phase'][sl_out] = d[sl_in, :, 1] - photons_rabuf['x'][sl_out] = xymap[fl_ndx, i, 0] - photons_rabuf['y'][sl_out] = xymap[fl_ndx, i, 1] - hdf.grow_by(photons_rabuf[:nnew]) - - term.close() - data.close() - hdf.close() - - -class PostageCaptureSink(CaptureSink): - def _finalize_data(self): - # get_postage returns: - # ids (nevents uint16 array or res cahnnels), - # events (nevents x 127 x2 of iq da - - if len(self._buf) != np.prod(self._expected_buffer_shape) * 4: - getLogger(__name__).warning(f'Finalizing incomplete capture data for {self.cap_id}') - self.result = PostageCaptureData(self._buf) - - -class StatusListener(threading.Thread): - def __init__(self, id, source, initial_state='Created', start=True): - super().__init__(name=f'StautsListner_{id.decode()}') - self.daemon = True - self.source = source - self._pipe = None - self.id = id - self._status_messages = [initial_state] - if start: - self.start() - - def start(self): - assert not self._started.is_set() - self._pipe = zpipe(zmq.Context.instance()) - super(StatusListener, self).start() - - def kill(self): - self._pipe[0].send(b'') - self._pipe[0].recv() - self._pipe[0].close() - self._pipe[1].close() - - @staticmethod - def is_final_status_message(update): - """return True iff message is a final status update""" - for r in CaptureRequest.FINAL_STATUSES: - if update.startswith(r): - return True - return False - - def run(self): - try: - ctx = zmq.Context().instance() - with ctx.socket(zmq.SUB) as sock: - sock.linger = 0 - sock.setsockopt(zmq.SUBSCRIBE, self.id) - sock.connect(self.source) - - poller = zmq.Poller() - poller.register(self._pipe[1], flags=zmq.POLLIN) - poller.register(sock, flags=zmq.POLLIN) - getLogger(__name__).debug(f'Listening for status updates to {self.id}') - self._pipe[1].send(b'') - while True: - avail = dict(poller.poll()) - if self._pipe[1] in avail: - getLogger(__name__).debug(f'Shutdown requested, terminating {self}') - break - elif sock not in avail: - time.sleep(.1) # play nice - continue - id, update = sock.recv_multipart() - assert id == self.id or not self.id - update = update.decode() - self._status_messages.append(update) - getLogger(__name__).debug(f'Status update for {id}: {update}') - if self.is_final_status_message(update): - break - except zmq.ZMQError as e: - getLogger(__name__).critical(f"{self} died due to {e}") - finally: - self._pipe[1].send(b'') - - def latest(self): - return self._status_messages[-1] - - def history(self): - return tuple(self._status_messages) - - def ready(self): - self._pipe[0].recv() # TODO make this line block - - -class ADCCaptureData: - """ - Formats raw ADC capture data to complex floats representing mV at the ADC input SMA. - """ - - def __init__(self, raw_data): - """ - ADC data captured by an ADC capture request - Args: - raw_data: raw data captured by adc capture - """ - self.raw = raw_data - - @property - def dtype(self): - return self.raw.dtype - - @property - def shape(self): - return self.raw.shape - - @cached_property - def data(self): - return convert_adc_raw_to_mv(self.raw[:, 0] + 1j * self.raw[:, 1]) - - -class IQCaptureData: - """ - Formats raw IQ capture data to complex floats between +/- 1. - """ - - def __init__(self, raw_data): - self.raw = raw_data - - @property - def dtype(self): - return self.raw.dtype - - @property - def shape(self): - return self.raw.shape - - @cached_property - def data(self): - return raw_iq_to_unit(self.raw[..., 0] + self.raw[..., 1] * 1j) - - -class PhaseCaptureData: - def __init__(self, data): - self.raw = data - - @property - def dtype(self): - return self.raw.dtype - - @property - def shape(self): - return self.raw.shape - - @cached_property - def data(self): - return raw_phase_to_radian(self.raw, scaled=False) - - -class PhotonCaptureData: - def __init__(self, buf): - self.raw = np.frombuffer(buf, dtype=PHOTON_DTYPE_PACKED) - self.photons = unpack_photons(self.raw) - - @property - def data(self): - return self.photons - - @property - def dtype(self): - return PHOTON_DTYPE - - @property - def shape(self): - return self.photons.size - - -class PostageCaptureData: - def __init__(self, buf) : - result = np.frombuffer(buf, count=len(buf) // 2, dtype=np.int16) - n = result.size//2//128 - shape = (n, 128, 2) - result = result.reshape(shape) - from ..funcs import postage_buffer_to_data - ids, events = postage_buffer_to_data(result, complex=True, scaled=True) - self.ids = ids # result[:, 0, 0].astype(np.uint16) - self.iqs = events # result[:, 1:, :] - self.raw = result - - @property - def dtype(self): - return np.int16 - - @property - def shape(self): - return self.iqs.shape - - @property - def data(self): - return self.iqs - - -class CaptureJob: - def __init__(self, request: CaptureRequest, submit=False): - """ - Args: - request: - submit: tuple of bools (with sink?, with status?) or True or False - """ - self.request = request - self.submitted = False - - self._status_listener = StatusListener(request.id, request.server.status_url, - initial_state='created', start=False) - - if request.tap == 'adc': - datasink = ADCCaptureSink - elif 'iq' in request.tap: - datasink = IQCaptureSink - elif 'phase' in request.tap: - datasink = PhaseCaptureSink - elif request.tap == 'photon': - datasink = SimplePhotonSink - elif request.tap == 'postage': - datasink = PostageCaptureSink - else: - raise ValueError(f'Unknown tap location {request.tap}') - - self.datasink = datasink(request, start=False) - - if submit: - self.submit() - elif isinstance(submit, tuple): - self.submit(**submit) - - def __del__(self): - self._kill_workers(kill_status_monitor=True, kill_data_sink=True) - - def status(self): - """ Return the last known status of the request """ - return self._status_listener.latest() - - def status_history(self) -> tuple[str]: - """Return a tuple of the status history of the job""" - return self._status_listener.history() - - def cancel(self, kill_status_monitor=False, kill_data_sink=True): - """ - - Args: - kill_status_monitor: Whether to terminate the status monitor - kill_data_sink: Whether to terminate the data saver - Returns: The parsed json submission result - - """ - self._kill_workers(kill_status_monitor=kill_status_monitor, kill_data_sink=kill_data_sink) - ctx = zmq.Context().instance() - with ctx.socket(zmq.REQ) as s: - s.connect(self.request.server.command_url) - s.send_pyobj(('abort', self.request.id)) - return s.recv_pyobj() - - def _kill_workers(self, kill_status_monitor=True, kill_data_sink=True): - if kill_status_monitor and self._status_listener.is_alive(): - try: - self.datasink.kill() - except zmq.ZMQError as e: - getLogger(__name__).warning(f'Caught {e} when telling data sink to terminate') - if self._status_listener.is_alive(): - getLogger(__name__).warning(f'Status listener did not instantly terminate') - if kill_data_sink and self.datasink.is_alive(): - try: - self.datasink.kill() - except zmq.ZMQError as e: - getLogger(__name__).warning(f'Caught {e} when telling data sink to terminate') - if self.datasink.is_alive(): - getLogger(__name__).warning(f'Data sink did not instantly terminate') - - def data(self, timeout=None): - return self.datasink.data(timeout=timeout) - - def submit(self, with_sink=False, with_status=True): - if with_status: - self._status_listener.start() - self._status_listener.ready() - - if with_sink and not self.request.data_endpoint.startswith('file://'): - self.datasink.start() - self.datasink.ready() - - try: - ret = self.request.submit() - self.submitted = True - return ret - except Exception as e: - self._status_listener.kill() - self.datasink.kill() - raise e diff --git a/mkidgen3/server/feedline_client.py b/mkidgen3/server/feedline_client.py deleted file mode 100644 index ed982c5..0000000 --- a/mkidgen3/server/feedline_client.py +++ /dev/null @@ -1,95 +0,0 @@ -import zmq -import logging -logging.basicConfig(level=logging.DEBUG) -from logging import getLogger -from mkidgen3.server.feedline_config import WaveformConfig -from mkidgen3.server.captures import CaptureRequest, CaptureJob, FeedlineConfig - - -""" -Setting up from scratch -1. Run sweeps (power and freq.) to find res and drive power -2. Process -3. Rerun 1&2 with fixed freq to finialize optimal drive power -4. Run IQ sweeps to find loop centers -5. Process -6. capture Optimal filter phase data -7. Process -8. capture phase data for thresholding -9. Process -10. ready to observe - -Observing - -IrrOps -Observing, feedline is acting weird, what do? -0. stop observing or cut out feedline -1. e.g. reset/power cycle/replace feedline -2. resume - - -""" - - - -ctx = zmq.Context.instance() -ctx.linger = 0 - -# cap command default 8888 -# cap data 8889 -# cap status 9000 - -feedline_server = 'tcp://localhost:8888' -capture_data_server = 'tcp://localhost:8889' -status_server = 'tcp://localhost:8890' - - -frequencies = [] -coefficients = [] -thresholds= [] -holdoffs = [] -fc = FeedlineConfig(bitstream=dict(bitstream=None, ignore_version=None), - rfdc_clk=dict(programming_key=None, clock_source=None), - rfdc=dict(dac_mts=True, adc_mts=False, adc_gains=None, dac_gains=None), - if_board=dict(lo=None, adc_atten=None, dac_atten=None), - waveform=dict(frequencies=None, amplitudes=None, phases=None, iq_ratios=None, phase_offsets=None, maximize_dynamic_range=True), - chan=dict(frequencies=frequencies), - ddc=dict(tones=None, loop_center=None, phase_offset=None, center_relative=None, quantize=None), - filter=dict(coefficients=coefficients), - trig=dict(thresholds=thresholds, holdoffs=holdoffs)) -cr = CaptureRequest(1337, 'iq', fc) -cj = CaptureJob(cr, feedline_server, capture_data_server, status_server, submit=False) -cj.submit() -x = cj.data(timeout=5) #should be almost -print(cj.status_history()) - -#Start the capture of photons from an MKID array, assume FeedlineReadoutServers are running on all the boards -feedline_server_urls = [] -data_server_urls = [] -status_server_urls = [] - -#Generate a suitable FeedlineConfig for each server -fc = FeedlineConfig() -buff_duration_ms = 100 -jobs = [] -for a,b,c in zip(feedline_server_urls, data_server_urls, status_server_urls): - cr = CaptureRequest(buff_duration_ms, 'photons', fc, feedline_server=a) - jobs.append(CaptureJob(cr, a, b, c, submit=True)) - - - - -cap_data_urls = [] -from zmq.devices import ThreadDevice - -data_server_internal = 'inproc://cap_data.xpub' -# Set up a proxy for routing all the capture requests -dtd = ThreadDevice(zmq.QUEUE, zmq.XSUB, zmq.XPUB) -dtd.setsockopt_in(zmq.LINGER, 0) -dtd.setsockopt_out(zmq.LINGER, 0) -for url in cap_data_urls: - dtd.connect_in(url) -dtd.bind_out(data_server_internal) -dtd.daemon = True -dtd.start() -getLogger(__name__).info(f'Relaying all capture data from {cap_data_urls} to {data_server_internal}') diff --git a/mkidgen3/server/feedline_server.py b/mkidgen3/server/feedline_server.py deleted file mode 100644 index 03d4a39..0000000 --- a/mkidgen3/server/feedline_server.py +++ /dev/null @@ -1,466 +0,0 @@ -import logging -import pickle -import time - -from logging import getLogger - -from mkidgen3.server.captures import CaptureRequest -from mkidgen3.server.feedline_config import RFDCConfig -from mkidgen3.server.fpga_objects import FeedlineHardware, DEFAULT_BIT_FILE -from mkidgen3.server.misc import zpipe -from mkidgen3.util import check_active_jupyter_notebook -import asyncio -import zmq -import threading -from mkidgen3.util import setup_logging -from datetime import datetime -import argparse - -COMMAND_LIST = ('reset', 'capture', 'bequiet', 'status') - - -class TapThread: - def __init__(self, target, cr:CaptureRequest): - context = zmq.Context().instance() - a, b = zpipe(context) - t = threading.Thread(target=target, name=f"TapThread: {cr.tap}:{cr.id}", args=(b, cr), kwargs=dict(context=context)) - self.thread = t - self.request = cr - self._pipe = a - self._other_pipe = b - t.start() - - def __repr__(self): - a = 'running' if self.thread.is_alive() else 'stopped' - return f'<{self.thread.name} ({a})>' - - def abort(self): - try: - # Abort the thread, not the request, the thread will handle the abort of the request if necessary! - getLogger(__name__).debug(f'Sending abort to worker: {self}') - self._pipe.send(b'abort') - except zmq.ZMQError: - getLogger(__name__).critical(f'Error sending abort to worker thread {self.thread}') - raise - - def __del__(self): - try: - self._pipe.close() - self._other_pipe.close() - except zmq.error.ContextTerminated: - pass - -class FeedlineReadoutServer: - def __init__(self, bitstream, clock_source='4.096GSPS_MTS_dualloop', if_port='dev/ifboard', ignore_version=False, - program_clock=True, mts=True, download=False): - self.hardware = FeedlineHardware(bitstream, clock_source=clock_source, if_port=if_port, - ignore_version=ignore_version, program_clock=program_clock, - rfdc=RFDCConfig(dac_mts=mts, adc_mts=False), download=download) - - self._tap_threads = {k: None for k in ('photon', 'postage', 'engineering')} - self._to_check = [] - self._checked = [] - self._cap_pipe=None - - def status(self): - """ - - Returns: Dictionary of status information - - """ - status = {'hardware': self.hardware.status(), - 'running_captures': self._running_captures(), - 'pending_captures': self._pending_captures()} - - return status - - def _running_captures(self): - return tuple([tt.request.id for tt in list(self._tap_threads.values())]) - - def _pending_captures(self): - return tuple([cr.id for cr in self._checked+self._to_check]) - - def _abort_all(self, join=False, reason='Abort all', raisezmqerror=True, also:tuple|list|CaptureRequest=tuple()): - toabort=self._checked + self._to_check - if isinstance(also, CaptureRequest): - also = [also] - toabort.extend(also) - for cr in toabort: - cr.establish() - cr.abort(reason) # signal that captures will never happen - cr.destablish() - self._checked, self._to_check = [], [] - # stop any running tap threads - for tt in (tt for tt in self._tap_threads.values() if tt is not None): - try: - tt.abort() - except zmq.ZMQError as e: - if raisezmqerror: - raise e - if join: - for tt in self._tap_threads.values(): - if tt: - tt.thread.join() - - def _abort_by_id(self, id): - aborted = False - running_by_id = {tt.request.id: tt for tt in self._tap_threads.values() if tt is not None} - if id in running_by_id: - tt = running_by_id[id] - aborted = True - getLogger(__name__).debug(f'Found request {id} being serviced in {tt}. Aborting servicer.') - tt.abort() - for cr in filter(lambda x: x.id == id, self._checked): - aborted = True - getLogger(__name__).debug(f'Found request {id} in list of checked pending CR. Aborted') - self._checked.pop(self._checked.index(cr)) - cr.abort('Abort by id') - for cr in filter(lambda x: x.id == id, self._to_check): - aborted = True - getLogger(__name__).debug(f'Found request {id} in list of pending CR to be checked. Aborted') - self._to_check.pop(self._to_check.index(cr)) - cr.abort('Abort by id') - - if not aborted: - getLogger(__name__).info(f'Capture request {id} is unknown and cannot be aborted.') - - def create_capture_handler(self, start=True, daemon=False, context: zmq.Context = None): - self._cap_pipe, self._cap_pipe_thread = zpipe(context or zmq.Context.instance()) - - thread = threading.Thread(name='FRS CR Handler', target=self._main, args=(self._cap_pipe_thread,), - kwargs={'context': context}, daemon=daemon) - if start: - thread.start() - - return thread - - def terminate_capture_handler(self): - if self._cap_pipe: - self._cap_pipe.send_pyobj(('exit', None)) - self._cap_pipe.close() - - def __del__(self): - try: - self._cap_pipe_thread.close() - except: - pass - - def abort_all(self): - if self._cap_pipe: - self._cap_pipe.send_pyobj(('abort', 'all')) - - def abort(self, id): - if self._cap_pipe: - self._cap_pipe.send_pyobj(('abort', id)) - - def capture(self, capture_request): - if self._cap_pipe: - self._cap_pipe.send_pyobj(('capture', capture_request)) - - def _cleanup_completed(self): - """ Return true iff the effective config requirements changed """ - # Check to see if any capture threads have finished - complete = [k for k, t in self._tap_threads.items() if t is not None and not t.thread.is_alive()] - # for each finished capture thread remove its settings from the requirements pot and cleanup - - if bool(complete): # need to check up to the size of the queue if anything finished - # if the effective settings didn't change on completion we don't need to recheck stuff - # ignore this optimization for now - self._to_check.extend(self._checked) - self._checked = [] - - effective_changed = False - for k in complete: - effective_changed |= self.hardware.derequire_config(self._tap_threads[k].request.id) - del self._tap_threads[k] - self._tap_threads[k] = None - - return effective_changed - - def _main(self, pipe: zmq.Socket, context: zmq.Context = None): - """ - Enqueue a list of capture requests for future handling. Invalid requests are dealt with immediately and not - enqueued. - - Args: - pipe: a pipe for receiving capture requests - context: zmq.Context - - Returns: None - - """ - context = context or zmq.Context().instance() - - #THIS LOOP SEEMS TO ACTUALLY be getting used ONLY to be deleted with the pynq.UIO reader deletion - #when the threaded interrupt manager starts stuff up and nixes pynqs reader - # (that apparently exists in this thread) - try: - aio_eloop = asyncio.get_running_loop() - except RuntimeError: - aio_eloop = asyncio.new_event_loop() - getLogger(__name__).warning('Creating but not starting a thread that really should be axed and ' - 'optimized away') - t=threading.Thread(daemon=True, target=aio_eloop.run_forever, name='plramcap_eloop') - - asyncio.set_event_loop(aio_eloop) - - getLogger(__name__).info('Main thread starting') - while True: - - effective_changed = self._cleanup_completed() - - running_by_id = {tt.request.id: tt for tt in self._tap_threads.values() if tt is not None} - - cr = None # CR is the capture request that will be ckicked off this iteration of the loop - # check for any incoming info: CapRequest, ABORT id|all, EXIT - cmd, data = '', '' - try: - cmd, data = pipe.recv_pyobj(zmq.NOBLOCK) - except zmq.ZMQError as e: - if e.errno != zmq.EAGAIN: - self._abort_all(reason='Keyboard interrupt') - if e.errno == zmq.ETERM: - break - else: - raise e # real error - - if cmd not in ('exit', 'abort', 'capture', ''): - getLogger(__name__).error(f'Received invalid command "{cmd}"') - cmd, data = '', '' - - if cmd == 'exit': - self._abort_all(join=True) - break - elif cmd == 'abort': - if data == 'all': - self._abort_all(join=True) - else: - self._abort_by_id(data) - elif cmd == 'capture': - self.hardware.config_manager.learn(data.feedline_config) - unknown = self.hardware.config_manager.unlearned_hashes(data.feedline_config) - if unknown: # We've never been sent the full config necessary - try: - data.establish() - #TODO this code seems to imply json "{'resp': 'ERROR', 'data': unknown}" - data.fail(f'ERROR: Full FeedlineConfig never sent: {unknown}') - except zmq.ZMQError as e: - getLogger(__name__).error(f'Unable to fail request with hashed config due to {e}. ' - f'Silently dropping request {data.id}') - elif (not self._to_check and self._tap_threads[data.type] is None and - self.hardware.config_compatible_with(data.feedline_config)): - cr = data # this can be run and nothing else, so it will be done below - else: - q = self._to_check if self._to_check else self._checked - try: - data.set_status('queued', f'Queued', destablish=True) - q.append(data) - except zmq.ZMQError as e: - getLogger(__name__).error(f'Unable to update status due to {e}. Silently dropping request' - f' {data.id}') - - # cant be run because there might be something more important (we check anyway), - # the tap is in use (we check when the tap finishes) - # settings aren't compatible (we will check when something finishes) - - if not cr: - try: - cr = self._to_check.pop(0) - except IndexError: - continue - - assert isinstance(cr, CaptureRequest) - - try: - if self._tap_threads[cr.type] is not None: - cr.set_status('queued', f'tap location in use by: {self._tap_threads[cr.type].request.id}') - self._checked.append(cr) - continue - else: - if not self.hardware.config_compatible_with(cr.feedline_config): - cr.set_status('queued', f'incompatible with one or more of: {running_by_id.keys()}') - self._checked.append(cr) - continue - except zmq.ZMQError as e: - getLogger(__name__).error(f'Unable to update status due to {e}. Silently aborting request {cr}.') - continue - - cr.destablish() # ensure nothing lingers from any status messages - - try: - self.hardware.apply_config(cr.id, cr.feedline_config) - except Exception as e: - self.hardware.derequire_config(id) # Not necessary as we are dying, but let's die in a clean house - getLogger(__name__).critical(f'Hardware settings failure: {e}. Aborting all requests and dying.') - self._abort_all(reason='Hardware settings failure', raisezmqerror=False, join=False, also=cr) - break - - self.start_tap_thread(cr) - - aio_eloop.close() - getLogger(__name__).info('Capture thread exiting') - - def start_tap_thread(self, cr): - """ - Start a thread to service a capture request - - Args: - cr: a capture request. - - Returns: - - """ - assert self._tap_threads.get(cr.type, None) is None, 'Only one TapThread per location may be created at a time' - cap_runners = {'engineering': self.hardware.plram_cap, 'photon': self.hardware.photon_cap, - 'postage': self.hardware.postage_cap} - target = cap_runners[cr.type] - - - cr.set_status('running', f'Started at UTC {datetime.utcnow()}', destablish=True) - - self._tap_threads[cr.type] = TapThread(target, cr) - - -def parse_cl(): - parser = argparse.ArgumentParser(description='Feedline Readout Server', add_help=True) - parser.add_argument('-p', '--port', dest='port', action='store', required=False, type=int, - help='Server port', default='8888') - parser.add_argument('--cap_port', dest='capture_port', action='store', required=False, type=int, - help='Capture Data Port', default='8889') - parser.add_argument('--sta_port', dest='status_port', action='store', required=False, type=int, - help='Capture Status Port', default='8890') - parser.add_argument('--clock', dest='clock', action='store', required=False, type=str, - help='Clock Source', default='default') - parser.add_argument('-b', '--bitstream', dest='bitstream', action='store', required=False, type=str, - help='bitstream file', - default=DEFAULT_BIT_FILE) - parser.add_argument('--if', dest='ifboard', action='store', required=False, type=str, - help='IF Board device', default='/dev/ifboard') - parser.add_argument('--iv', dest='ignore_fpga_driver_version', action='store_true', required=False, - help='Ignore FPGA driver version checks', default=False) - return parser.parse_args() - - -def start_zmq_devices(cap_addr, stat_addr): - from zmq.devices import ThreadDevice - - cap_addr_internal = 'inproc://cap_data.xsub' - stat_addr_internal = 'inproc://cap_stat.xsub' - # Set up a proxy for routing all the capture requests - dtd = ThreadDevice(zmq.QUEUE, zmq.XSUB, zmq.XPUB) - dtd.setsockopt_in(zmq.LINGER, 0) - dtd.setsockopt_out(zmq.LINGER, 0) - dtd.bind_in(cap_addr_internal) - dtd.bind_out(cap_addr) - dtd.daemon = True - dtd.start() - getLogger(__name__).info(f'Publishing capture data to {cap_addr} from relay @ {cap_addr_internal}') - - std = ThreadDevice(zmq.QUEUE, zmq.XSUB, zmq.XPUB) - std.setsockopt_in(zmq.LINGER, 0) - std.setsockopt_out(zmq.LINGER, 0) - std.bind_in(stat_addr_internal) - std.bind_out(stat_addr) - std.daemon = True - std.start() - getLogger(__name__).info(f'Publishing capture status information to {stat_addr} from relay @ {stat_addr_internal}') - - return dtd, std - - -if __name__ == '__main__': - - import os, time - os.environ['TZ'] = 'right/UTC' - time.tzset() - setup_logging('feedlinereadoutserver') - - if check_active_jupyter_notebook(): - raise RuntimeError('Jupyter notebooks are running, shut them down first.') - - args = parse_cl() - - context = zmq.Context.instance(io_threads=2) - context.linger = 1 - - fr = FeedlineReadoutServer(args.bitstream, clock_source=args.clock, if_port=args.ifboard, - ignore_version=args.ignore_fpga_driver_version, program_clock=True, mts=True, - download=False) - - # Set up proxies for routing all the capture data and status - cap_addr = f'tcp://*:{args.capture_port}' - stat_addr = f'tcp://*:{args.status_port}' - start_zmq_devices(cap_addr, stat_addr) - - # Set up a command port - command_port = args.port - cmd_addr = f"tcp://*:{command_port}" - socket = context.socket(zmq.REP) - socket.bind(cmd_addr) - - # Start up the main thread - thread = fr.create_capture_handler(context=context, start=True, daemon=False) - - getLogger(__name__).info(f'Accepting commands on {cmd_addr}') - - while True: - try: - cmd, arg = socket.recv_pyobj() - except zmq.ZMQError as e: - getLogger(__name__).error(f'Caught {e}, aborting and shutting down') - fr.terminate_capture_handler() - break - except KeyboardInterrupt: - getLogger(__name__).error(f'Keyboard Interrupt, aborting and shutting down') - fr.terminate_capture_handler() - break - except pickle.UnpicklingError: - socket.send_pyobj('ERROR: Ignoring unpicklable command') - getLogger(__name__).error(f'Ignoring unpicklable command') - continue - else: - if not thread.is_alive(): - getLogger(__name__).critical(f'Capture thread has died prematurely. All existing captures will ' - f'never complete. Exiting.') - socket.send_pyobj('ERROR') - break - - getLogger(__name__).debug(f'Received command "{cmd}" with args {arg}') - - if cmd == 'reset': - socket.send_pyobj('OK') - fr.terminate_capture_handler() - thread.join() - fr.hardware.reset() - thread = fr.create_capture_handler(context=zmq.Context.instance(), start=True, daemon=False) - - elif cmd == 'status': - try: - status = fr.status() # this might take a while and fail - except Exception as e: - status = {'hardware': str(e)} - status['id'] = f'FRS {args.fl_id} @ {args.port}/{args.cap_port}' - socket.send_pyobj(status) - - elif cmd == 'bequiet': - fr.abort_all() - try: - fr.hardware.bequiet(**arg) # This might take a while and fail - socket.send_pyobj('OK') - except Exception as e: - socket.send_pyobj(f'ERROR: {e}') - - elif cmd == 'capture': - fr.capture(arg) - socket.send_pyobj({'resp': 'OK', 'code': 0}) - - elif cmd == 'abort': - fr.abort(arg) - socket.send_pyobj({'resp': 'OK', 'code': 0}) - - else: - socket.send_pyobj({'resp': 'ERROR', 'code': 0}) - - thread.join() - socket.close() - context.term() diff --git a/mkidgen3/server/waveform.py b/mkidgen3/server/waveform.py index 5becd07..53c3380 100644 --- a/mkidgen3/server/waveform.py +++ b/mkidgen3/server/waveform.py @@ -1,61 +1,11 @@ -from mkidgen3.funcs import * -import logging +from mkidgen3.funcs import quantize_frequencies import numpy as np import numpy.typing as nt -import platform -import matplotlib.pyplot as plt from mkidgen3.system_parameters import (ADC_DAC_INTERFACE_WORD_LENGTH, DAC_RESOLUTION, DAC_SAMPLE_RATE, SYSTEM_BANDWIDTH, DAC_FREQ_RES, DAC_FREQ_MAX, DAC_FREQ_MIN) - -def _same(a, b): - """quick test for array vs None""" - if not isinstance(a, type(b)): - return False - if a is None and b is None: - return True - try: - return np.all(a == b) - except: - pass - try: - return a == b - except: - return False - - -class Waveform: - @property - def output_waveform(self): - """Subclasses shall implement _values """ - return self._values - - @property - def sample_rate(self): - """Subclasses shall implement _sample_rate """ - return self._sample_rate - - -class TabulatedWaveform(Waveform): - """ - Use this class if you want to pass existing tabulated values directly to the DAC LUT without any scaling or - computation - """ - - def __init__(self, tabulated_values=None, sample_rate=DAC_SAMPLE_RATE): - self._values = tabulated_values - self._sample_rate = sample_rate - - def __str__(self): - return f'TabulatedWaveform with sample rate {self._sample_rate}' - - def __ne__(self, other): - return not (self._sample_rate == other.sample_rate and - _same(self._values, other._values)) - - -class SimpleFreqlistWaveform(Waveform): +class SimpleWaveform(Waveform): def __init__( self, frequencies, @@ -107,220 +57,3 @@ def _values(self): ) ) return data * ((1 << 15) - 6) - - -class FreqlistWaveform(Waveform): - def __init__(self, frequencies: Iterable[int | float] = None, n_samples: int = DAC_LUT_SIZE, - sample_rate: int | float = DAC_SAMPLE_RATE, amplitudes: Iterable[int | float] = None, - phases: Iterable[int | float] = None, iq_ratios: Iterable[int | float] = None, - phase_offsets: Iterable[int | float] = None, seed: int = 2, dac_dynamic_range: float = 1.0, - optimize_phase: bool = True, compute: bool = False): - """ - Args: - frequencies: frequencies in the waveform [Hz] - n_samples: number of complex samples in the waveform - sample_rate: waveform sample rate [Hz] (should be the DAC sample rate) - amplitudes: amplitudes of each tone in the waveform. If None, all ones is assumed - phases: phases of each tone in the waveform in [0, 2*pi). If None, random phases are generated using seed - iq_ratios: ratios for IQ values used to help minimize image tones in band. Allowed values between 0 and 1 - If None, 50:50 ratio (all ones) is assumed - phase_offsets: phase offsets in [0, 2*np.pi) - seed: random seed to seed phase randomization process - dac_dynamic_range: how much of dac dynamic range to use. Allow values [0.0,1.0]. Default is 1.0 (all) - optimize_phase: check quantization error and re-generate waveform with new random phases if too large - compute: compute waveform - """ - self.freqs = np.asarray(frequencies) - assert (DAC_FREQ_MIN <= self.freqs).all() and (self.freqs <= DAC_FREQ_MAX).all(), (f"freqs must be in " - f"[{DAC_FREQ_MIN}, " - f"{DAC_FREQ_MAX}]") - self.n_samples = n_samples - self.amps = amplitudes if amplitudes is not None else np.ones_like(frequencies) - self._sample_rate = sample_rate - self._optimize_phase = optimize_phase - - if phases is None: - self.phases = np.random.default_rng(seed=seed).uniform(0, 2 * np.pi, size=self.freqs.size) - else: - self.phases = np.asarray(phases) - assert (0 <= self.phases).all and (self.phases < 2*np.pi).all, "phases must be between 0 and 2 pi" - self.dac_dynamic_range = dac_dynamic_range - - self.iq_ratios = np.asarray(iq_ratios) if iq_ratios is not None else np.ones_like(frequencies) - self.phase_offsets = np.asarray(phase_offsets) if phase_offsets is not None else np.zeros_like(frequencies) - assert (0 <= self.phase_offsets).all and (self.phase_offsets < 2*np.pi).all, ("phase offsets must be between 0 " - "and 2 pi") - self.quant_freqs = quantize_frequencies(self.freqs, rate=sample_rate, n_samples=n_samples) - - self._seed = seed - self.__values = None # cache - - self.quant_vals = None - self.quant_error = None - - if compute: - self.output_waveform - - def __ne__(self, other): - if self.quant_vals is not None and other.n_samples is not None: - return not _same(self.quant_vals, other.quant_vals) - - return not (self.dac_dynamic_range == other.dac_dynamic_range and - _same(self.iq_ratios, other.iq_ratios) and - self.n_samples == other.n_samples and - _same(self.phases, other.phases) and - _same(self.quant_freqs, other.quant_freqs) and - self._sample_rate == other._sample_rate and - _same(self.amps, other.amps) and - _same(self.phase_offsets, other.phase_offsets)) - - def __repr__(self): - return f'<{str(self)}>' - - def __str__(self): - preview_dict = {'freqs': self.freqs, 'amps': self.amps, 'phases': self.phases, - 'iq_ratios': self.iq_ratios, 'phase_offsets': self.phase_offsets, - 'quant_error': self.quant_error} - for key, value in preview_dict.items(): - if value is None or (value.size < 3): - preview_dict[key] = value - else: - preview_dict[key] = value[:3] - - return f'FreqlistWaveform: {preview_dict}' - - @property - def _values(self) -> nt.NDArray[np.complex128]: - """ - Return or calculate waveform values - Returns: Complex values where the real and imag part have been quantized to ints in accordance with specified - dac dynamic range - - """ - if self.quant_vals is None: - self.__values = self._compute_waveform() - - self.quant_vals, self.quant_error = quantize_to_int(self.__values, resolution=DAC_RESOLUTION, signed=True, - word_length=ADC_DAC_INTERFACE_WORD_LENGTH, - dyn_range=self.dac_dynamic_range, return_error=True) - if self._optimize_phase: - self._optimize_random_phase(max_quant_err=1 * predict_quantization_error(resolution=DAC_RESOLUTION), - max_attempts=3) - return self.quant_vals - - def _compute_waveform(self, phases: Iterable | None = None) -> nt.NDArray[np.complex64]: - """ - Compute the raw waveform with no scaling or casting. - Args: - phases: new phases to compute waveform with (useful for re-generating random phases) - - Returns: Raw waveform values. - - """ - iq = np.zeros(self.n_samples, dtype=np.complex64) - # generate each signal - t = 2 * np.pi * np.arange(iq.size) / self._sample_rate - - phases = self.phases if phases is None else phases - - if self.phase_offsets.any() or (self.iq_ratios != 1).any() or self.phases.ndim > 1: - logging.getLogger(__name__).debug( - f'Computing net waveform with {self.freqs.size} tones in a loop to apply IQ ratios and phase offsets.\n' - f'For 2048 tones this takes about 7 min.') - for i in range(self.freqs.size): - exp = self.amps[i] * np.exp(1j * (t * self.quant_freqs[i] + phases[i])) - scaled = np.sqrt(2) / np.sqrt(1 + self.iq_ratios[i] ** 2) - c1 = self.iq_ratios[i] * scaled * np.exp(1j * np.deg2rad(self.phase_offsets)[i]) - iq.real += c1.real * exp.real + c1.imag * exp.imag - iq.imag += scaled * exp.imag - - else: - logging.getLogger(__name__).debug( - f'Computing net waveform with {self.freqs.size} tones using IFFT.') - possible_tones = np.linspace(-DAC_SAMPLE_RATE/2, (DAC_SAMPLE_RATE/2)-DAC_FREQ_RES, DAC_LUT_SIZE) - tone_idxs = np.concatenate([np.where(possible_tones == freq) for freq in self.quant_freqs]).flatten() - fft = np.zeros(2**19, dtype=np.complex64) - for tone_number, tone_idx in enumerate(tone_idxs): - fft[tone_idx] = self.amps[tone_number]*np.exp(1j*self.phases[tone_number]) - iq = np.fft.ifft(np.fft.fftshift(fft)) - - return iq - - def _optimize_random_phase(self, - max_quant_err: float | int = 1 * predict_quantization_error(resolution=DAC_RESOLUTION), - max_attempts: int = 3) -> None: - """ - Regenerate random phases, waveform values, and quantized values if quantization error is too high. - Args: - max_quant_err: maximum absolute allowable quantization error defined as abs(expected value - achieved value) - max_attempts: maximum numer of times to regenerate random phases, waveform values, and quantized values - - Returns: None - - Waveform random phases, waveform values, quantized values, and quantization error are only updated if a - solution is found. - """ - - if max_quant_err is None: - max_quant_err = 3 * predict_quantization_error(resolution=DAC_RESOLUTION) - - if self.quant_error < max_quant_err: # already optimal - return - - quant_error = self.quant_error - cnt = 0 - while quant_error > max_quant_err: - logging.getLogger(__name__).warning( - f"Quantization error {quant_error} exceeded max quantization error {max_quant_err}. The freq comb's " - f"relative phases may have added up sub-optimally. Calculating with new random phases") - self._seed += 1 - self.phases = np.random.default_rng(seed=self._seed).uniform(0., 2. * np.pi, len(self.freqs)) - values = self._compute_waveform(phases=self.phases) - quant_vals, quant_error = quantize_to_int(values, resolution=DAC_RESOLUTION, signed=True, - word_length=ADC_DAC_INTERFACE_WORD_LENGTH, - return_error=True) - cnt += 1 - if cnt > max_attempts: - raise Exception("Process reach maximum attempts: Could not find solution below max quantization error.") - - self.__values = values - self.quant_vals, self.quant_error = quant_vals, quant_error - - -def WaveformFactory(n_uniform_tones=None, output_waveform=None, frequencies=None, - n_samples=DAC_LUT_SIZE, sample_rate=DAC_SAMPLE_RATE, amplitudes=None, phases=None, - iq_ratios=None, phase_offsets=None, seed=2, dac_dynamic_range=1.0, compute=False): - if output_waveform is not None: - return TabulatedWaveform(tabulated_values=output_waveform, sample_rate=sample_rate) - if n_uniform_tones is not None: - if n_uniform_tones not in (512, 1024, 2048): - raise ValueError('Requested number of power sweep tones not supported. Allowed values are 512, 1024, 2048.') - frequencies = uniform_freqs(n_uniform_tones, bandwidth=SYSTEM_BANDWIDTH) - if frequencies is None: - return None - frequencies = np.asarray(frequencies) - return FreqlistWaveform(frequencies=frequencies, n_samples=n_samples, sample_rate=sample_rate, - amplitudes=amplitudes, phases=phases, iq_ratios=iq_ratios, phase_offsets=phase_offsets, - seed=seed, dac_dynamic_range=dac_dynamic_range, compute=compute) - - -if __name__ == '__main__': - """ - Test code for debugging waveform generation. - """ - from mkidgen3.util import pseudo_random_tones - from mkidgen3.server.feedline_config import * - import matplotlib.pyplot as plt - - tone = 400e6 - exclude = np.array([tone - 4e6, tone - 3e6, tone - 2e6, tone - 1e6, tone, tone + 1e6, tone + 2e6, tone + 3e6, tone + 4e6]) - random_tones = pseudo_random_tones(n=2048, buffer=300e3, spread=True, exclude=exclude) - wvfm_tones = np.append(np.array([tone]), random_tones) - wvfm_cfg = WaveformConfig( - waveform=WaveformFactory(frequencies=wvfm_tones, seed=5, dac_dynamic_range=1 / 150, compute=True)) - wvfm_fft = np.abs(np.fft.fftshift(np.fft.fft(wvfm_cfg.waveform.output_waveform))) - freqs = np.linspace(-2.048e9, 2.048e9 - 7.8125e3, 2 ** 19) - plt.plot(freqs, wvfm_fft) - plt.xlim(395e6, 405e6) - plt.show() - print('hi')