diff --git a/.gitignore b/.gitignore index f5463ef..ee019d5 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ dist *.swp *.log *.egg-info +*.cpython-*.so output/ replicas-wlce/ diff --git a/simt_emlite/cli/profile_download.py b/simt_emlite/cli/profile_download.py index 94e71ac..7ca86d7 100644 --- a/simt_emlite/cli/profile_download.py +++ b/simt_emlite/cli/profile_download.py @@ -25,12 +25,11 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -from emop_frame_protocol.emop_profile_log_1_record import EmopProfileLog1Record -from emop_frame_protocol.emop_profile_log_2_record import EmopProfileLog2Record from rich.console import Console # mypy: disable-error-code="import-untyped" from simt_emlite.mediator.mediator_client_exception import MediatorClientException +from simt_emlite.profile_logs.download_cache import DownloadCache from simt_emlite.profile_logs.downloader_config import DownloaderConfig from simt_emlite.profile_logs.profile_downloader import ProfileDownloader from simt_emlite.profile_logs.replicas.replica_missing_file_utils import ( @@ -170,11 +169,20 @@ def update_progress(msg: str): ) return True, None - log_1_records: Dict[datetime.datetime, EmopProfileLog1Record] = ( - downloader.download_profile_log_1_day(progress_callback=update_progress) + assert downloader.serial is not None + cache = DownloadCache(output_dir, downloader.serial, date) + if cache.has_cached_data: + log_progress( + f"Resuming from cached progress for [bold]{downloader.serial}[/bold] on [cyan]{date}[/cyan]", + meter=downloader.serial, + date=str(date), + ) + + log_1_records = downloader.download_profile_log_1_day( + progress_callback=update_progress, cache=cache ) - log_2_records: Dict[datetime.datetime, EmopProfileLog2Record] = ( - downloader.download_profile_log_2_day(progress_callback=update_progress) + log_2_records = downloader.download_profile_log_2_day( + progress_callback=update_progress, cache=cache ) # Create start and end datetime for the day (timezone-aware) @@ -185,8 +193,6 @@ def update_progress(msg: str): tzinfo=datetime.timezone.utc ) - assert downloader.serial is not None - # Create SMIP readings from the downloaded profile logs readings_a, readings_b = create_smip_readings( serial=downloader.serial, @@ -228,6 +234,9 @@ def update_progress(msg: str): meter=downloader.serial, ) + # Download succeeded - clean up cache + cache.delete() + identifier = ( downloader.name if downloader and downloader.name is not None diff --git a/simt_emlite/mediator/grpc/client.py b/simt_emlite/mediator/grpc/client.py index f119c86..f568f9b 100644 --- a/simt_emlite/mediator/grpc/client.py +++ b/simt_emlite/mediator/grpc/client.py @@ -51,6 +51,7 @@ def __init__( ) self.mediator_address = mediator_address or "0.0.0.0:50051" + self._cached_channel: grpc.Channel | None = None global logger self.log = logger.bind(mediator_address=self.mediator_address) @@ -60,70 +61,99 @@ def __init__( have_certs=self.have_certs, ) + @property + def _channel(self) -> grpc.Channel: + if self._cached_channel is None: + if self.mediator_address is None: + raise ValueError("mediator_address cannot be none") + if self.have_certs: + credentials = self._channel_credentials() + self._cached_channel = grpc.secure_channel( + self.mediator_address, + credentials, + options=(("grpc.ssl_target_name_override", "cepro-mediators"),), + ) + else: + self._cached_channel = grpc.insecure_channel(self.mediator_address) + return self._cached_channel + + def close(self) -> None: + if self._cached_channel is not None: + self._cached_channel.close() + self._cached_channel = None + + def __enter__(self) -> "EmliteMediatorGrpcClient": + return self + + def __exit__(self, *args: Any) -> None: + self.close() + + def __del__(self) -> None: + self.close() + def read_element(self, serial: str, object_id: ObjectIdEnum | int) -> Any: obis = self._object_id_int(object_id) obis_name = ( object_id.name if isinstance(object_id, ObjectIdEnum) else hex(object_id) ) - with self._get_channel() as channel: - stub = EmliteMediatorServiceStub(channel) # type: ignore[no-untyped-call] - try: - self.log.debug( - f"send request - reading element [{obis_name}]", meter_id=serial - ) - rsp_obj = stub.readElement( - ReadElementRequest(serial=serial, objectId=obis), - timeout=TIMEOUT_SECONDS, + stub = EmliteMediatorServiceStub(self._channel) # type: ignore[no-untyped-call] + try: + self.log.debug( + f"send request - reading element [{obis_name}]", meter_id=serial + ) + rsp_obj = stub.readElement( + ReadElementRequest(serial=serial, objectId=obis), + timeout=TIMEOUT_SECONDS, + ) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + self.log.warn( + "rpc timeout (deadline_exceeded)", + object_id=obis_name, + meter_id=serial, ) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + elif e.code() == grpc.StatusCode.INTERNAL: + details = str(e.details() or "") + if "EOFError" in details: self.log.warn( - "rpc timeout (deadline_exceeded)", + "EOFError from meter", object_id=obis_name, meter_id=serial, ) - elif e.code() == grpc.StatusCode.INTERNAL: - details = str(e.details() or "") - if "EOFError" in details: - self.log.warn( - "EOFError from meter", - object_id=obis_name, - meter_id=serial, - ) - raise EmliteEOFError(f"object_id={obis_name}, meter={serial}") - elif "failed to connect after retries" in details: - self.log.warn(e.details(), meter_id=serial) - raise EmliteConnectionFailure( - f"object_id={obis_name}, meter={serial}" - ) - else: - raise e - elif e.code() == grpc.StatusCode.UNAVAILABLE: - # we get a lot of these for instantaneous_voltage for reasons unknown - # they can be tolerated as we still get a number of successful calls as well - log_level: str = ( - "warning" - if object_id == ObjectIdEnum.instantaneous_voltage - or object_id - == ObjectIdEnum.three_phase_instantaneous_voltage_l1 - else "error" - ) - getattr(self.log, log_level)( - "readElement failed", - details=e.details(), - code=e.code(), - object_id=obis_name, - meter_id=serial, + raise EmliteEOFError(f"object_id={obis_name}, meter={serial}") + elif "failed to connect after retries" in details: + self.log.warn(e.details(), meter_id=serial) + raise EmliteConnectionFailure( + f"object_id={obis_name}, meter={serial}" ) else: - self.log.error( - "readElement failed", - details=e.details(), - code=e.code(), - object_id=obis_name, - meter_id=serial, - ) - raise e + raise e + elif e.code() == grpc.StatusCode.UNAVAILABLE: + # we get a lot of these for instantaneous_voltage for reasons unknown + # they can be tolerated as we still get a number of successful calls as well + log_level: str = ( + "warning" + if object_id == ObjectIdEnum.instantaneous_voltage + or object_id + == ObjectIdEnum.three_phase_instantaneous_voltage_l1 + else "error" + ) + getattr(self.log, log_level)( + "readElement failed", + details=e.details(), + code=e.code(), + object_id=obis_name, + meter_id=serial, + ) + else: + self.log.error( + "readElement failed", + details=e.details(), + code=e.code(), + object_id=obis_name, + meter_id=serial, + ) + raise e payload_bytes = rsp_obj.response self.log.debug( @@ -145,74 +175,72 @@ def write_element( obis_name = ( object_id.name if isinstance(object_id, ObjectIdEnum) else hex(object_id) ) - with self._get_channel() as channel: - stub = EmliteMediatorServiceStub(channel) # type: ignore[no-untyped-call] - try: - self.log.debug( - f"send request - write element [{obis_name}]", meter_id=serial - ) - stub.writeElement( - WriteElementRequest(serial=serial, objectId=obis, payload=payload), - timeout=TIMEOUT_SECONDS, + stub = EmliteMediatorServiceStub(self._channel) # type: ignore[no-untyped-call] + try: + self.log.debug( + f"send request - write element [{obis_name}]", meter_id=serial + ) + stub.writeElement( + WriteElementRequest(serial=serial, objectId=obis, payload=payload), + timeout=TIMEOUT_SECONDS, + ) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + self.log.warn( + "rpc timeout (deadline_exceeded)", + object_id=obis_name, + meter_id=serial, ) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: + elif e.code() == grpc.StatusCode.INTERNAL: + details = str(e.details() or "") + if "EOFError" in details: self.log.warn( - "rpc timeout (deadline_exceeded)", + "EOFError from meter", object_id=obis_name, meter_id=serial, ) - elif e.code() == grpc.StatusCode.INTERNAL: - details = str(e.details() or "") - if "EOFError" in details: - self.log.warn( - "EOFError from meter", - object_id=obis_name, - meter_id=serial, - ) - raise EmliteEOFError( - "object_id=" + obis_name + ", meter=" + serial - ) - elif "failed to connect after retries" in details: - self.log.warn(e.details(), meter_id=serial) - raise EmliteConnectionFailure( - "object_id=" + obis_name + ", meter=" + serial - ) - else: - raise e - else: - self.log.error( - "writeElement failed", - details=e.details(), - code=e.code(), - object_id=obis_name, - meter_id=serial, + raise EmliteEOFError( + "object_id=" + obis_name + ", meter=" + serial ) - raise e - - def send_message(self, serial: str, message: bytes) -> bytes: - with self._get_channel() as channel: - stub = EmliteMediatorServiceStub(channel) # type: ignore[no-untyped-call] - try: - self.log.debug("send request - message", meter_id=serial) - rsp_obj = stub.sendRawMessage( - SendRawMessageRequest(serial=serial, dataField=message), - timeout=TIMEOUT_SECONDS, - ) - except grpc.RpcError as e: - if ( - e.code() == grpc.StatusCode.INTERNAL - and "failed to connect after retries" in (e.details() or "") - ): + elif "failed to connect after retries" in details: self.log.warn(e.details(), meter_id=serial) - raise EmliteConnectionFailure(f"meter={serial}") + raise EmliteConnectionFailure( + "object_id=" + obis_name + ", meter=" + serial + ) + else: + raise e + else: self.log.error( - "sendRawMessage", + "writeElement failed", details=e.details(), code=e.code(), + object_id=obis_name, meter_id=serial, ) - raise e + raise e + + def send_message(self, serial: str, message: bytes) -> bytes: + stub = EmliteMediatorServiceStub(self._channel) # type: ignore[no-untyped-call] + try: + self.log.debug("send request - message", meter_id=serial) + rsp_obj = stub.sendRawMessage( + SendRawMessageRequest(serial=serial, dataField=message), + timeout=TIMEOUT_SECONDS, + ) + except grpc.RpcError as e: + if ( + e.code() == grpc.StatusCode.INTERNAL + and "failed to connect after retries" in (e.details() or "") + ): + self.log.warn(e.details(), meter_id=serial) + raise EmliteConnectionFailure(f"meter={serial}") + self.log.error( + "sendRawMessage", + details=e.details(), + code=e.code(), + meter_id=serial, + ) + raise e payload_bytes: bytes = rsp_obj.response self.log.debug( @@ -223,52 +251,33 @@ def send_message(self, serial: str, message: bytes) -> bytes: return payload_bytes def get_info(self, serial: str) -> str: - with self._get_channel() as channel: - stub = InfoServiceStub(channel) - try: - self.log.debug("send request - get_info", meter_id=serial) - rsp_obj = stub.GetInfo( - GetInfoRequest(serial=serial), timeout=TIMEOUT_SECONDS - ) - return rsp_obj.json_data - except grpc.RpcError as e: - # Tolerate NOT_FOUND by returning empty string or raising specific exception? - # User didn't specify, but `info_service.py` logs and aborts with NOT_FOUND. - # raising e allows caller to handle. - self.log.error( - "GetInfo failed", - details=e.details(), - code=e.code(), - meter_id=serial, - ) - raise e + stub = InfoServiceStub(self._channel) + try: + self.log.debug("send request - get_info", meter_id=serial) + rsp_obj = stub.GetInfo( + GetInfoRequest(serial=serial), timeout=TIMEOUT_SECONDS + ) + return rsp_obj.json_data + except grpc.RpcError as e: + self.log.error( + "GetInfo failed", + details=e.details(), + code=e.code(), + meter_id=serial, + ) + raise e def get_meters(self, esco: str | None = None) -> str: - with self._get_channel() as channel: - stub = InfoServiceStub(channel) - try: - self.log.debug("send request - get_meters", esco=esco) - rsp_obj = stub.GetMeters( - GetMetersRequest(esco=esco), timeout=TIMEOUT_SECONDS - ) - return rsp_obj.json_meters - except grpc.RpcError as e: - self.log.error("GetMeters failed", details=e.details(), code=e.code()) - raise e - - def _get_channel(self) -> grpc.Channel: - if self.mediator_address is None: - raise ValueError("mediator_address cannot be none") - - if self.have_certs: - credentials = self._channel_credentials() - return grpc.secure_channel( - self.mediator_address, - credentials, - options=(("grpc.ssl_target_name_override", "cepro-mediators"),), + stub = InfoServiceStub(self._channel) + try: + self.log.debug("send request - get_meters", esco=esco) + rsp_obj = stub.GetMeters( + GetMetersRequest(esco=esco), timeout=TIMEOUT_SECONDS ) - else: - return grpc.insecure_channel(self.mediator_address) + return rsp_obj.json_meters + except grpc.RpcError as e: + self.log.error("GetMeters failed", details=e.details(), code=e.code()) + raise e def _channel_credentials(self) -> grpc.ChannelCredentials: if not self.have_certs: diff --git a/simt_emlite/profile_logs/download_cache.py b/simt_emlite/profile_logs/download_cache.py new file mode 100644 index 0000000..1133cfd --- /dev/null +++ b/simt_emlite/profile_logs/download_cache.py @@ -0,0 +1,123 @@ +"""Cache for resumable profile log downloads. + +Saves completed chunks to a JSON file so that a failed download can +be resumed on the next run without re-downloading already-fetched chunks. +""" + +import datetime +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict + +from simt_emlite.util.logging import get_logger + +logger = get_logger(__name__, __file__) + + +@dataclass +class CachedLog1Record: + """Duck-type compatible with EmopProfileLog1Record for create_smip_readings().""" + + import_a: int + import_b: int + + +@dataclass +class CachedLog2Record: + """Duck-type compatible with EmopProfileLog2Record for create_smip_readings().""" + + active_export_a: int + active_export_b: int = 0 # 0 for single-element meters + + +class DownloadCache: + """Manages a per-serial per-date cache file for partial profile downloads. + + Cache file is a hidden JSON file in the output directory: + .{serial}-{YYYYMMDD}.download_cache.json + """ + + def __init__(self, output_dir: str, serial: str, date: datetime.date) -> None: + filename = f".{serial}-{date.strftime('%Y%m%d')}.download_cache.json" + self.cache_path = Path(output_dir) / filename + self._data: Dict[str, Any] = self._load() + + def _load(self) -> Dict[str, Any]: + if self.cache_path.exists(): + try: + with open(self.cache_path) as f: + data = json.load(f) + logger.info( + "Loaded download cache", + cache_file=str(self.cache_path), + log1_chunks=len(data.get("log1_chunks_done", [])), + log2_chunks=len(data.get("log2_chunks_done", [])), + ) + return data + except (json.JSONDecodeError, OSError) as e: + logger.warning( + "Failed to load cache file, starting fresh", + error=str(e), + ) + return { + "log1_chunks_done": [], + "log1_records": {}, + "log2_chunks_done": [], + "log2_records": {}, + } + + def _save(self) -> None: + with open(self.cache_path, "w") as f: + json.dump(self._data, f, indent=2) + + @property + def has_cached_data(self) -> bool: + return bool( + self._data.get("log1_chunks_done") or self._data.get("log2_chunks_done") + ) + + def has_log1_chunk(self, chunk_start_iso: str) -> bool: + return chunk_start_iso in self._data["log1_chunks_done"] + + def has_log2_chunk(self, chunk_start_iso: str) -> bool: + return chunk_start_iso in self._data["log2_chunks_done"] + + def save_log1_chunk( + self, chunk_start_iso: str, records: Dict[str, Dict[str, int]] + ) -> None: + self._data["log1_chunks_done"].append(chunk_start_iso) + self._data["log1_records"].update(records) + self._save() + + def save_log2_chunk( + self, chunk_start_iso: str, records: Dict[str, Dict[str, int]] + ) -> None: + self._data["log2_chunks_done"].append(chunk_start_iso) + self._data["log2_records"].update(records) + self._save() + + def get_log1_records(self) -> Dict[datetime.datetime, CachedLog1Record]: + result: Dict[datetime.datetime, CachedLog1Record] = {} + for ts_str, data in self._data["log1_records"].items(): + ts = datetime.datetime.fromisoformat(ts_str) + result[ts] = CachedLog1Record( + import_a=data["import_a"], + import_b=data["import_b"], + ) + return result + + def get_log2_records(self) -> Dict[datetime.datetime, CachedLog2Record]: + result: Dict[datetime.datetime, CachedLog2Record] = {} + for ts_str, data in self._data["log2_records"].items(): + ts = datetime.datetime.fromisoformat(ts_str) + result[ts] = CachedLog2Record( + active_export_a=data["active_export_a"], + active_export_b=data.get("active_export_b", 0), + ) + return result + + def delete(self) -> None: + if self.cache_path.exists(): + self.cache_path.unlink() + logger.info("Deleted download cache", cache_file=str(self.cache_path)) diff --git a/simt_emlite/profile_logs/profile_downloader.py b/simt_emlite/profile_logs/profile_downloader.py index a1beae1..661c56e 100644 --- a/simt_emlite/profile_logs/profile_downloader.py +++ b/simt_emlite/profile_logs/profile_downloader.py @@ -14,12 +14,13 @@ import json import logging from pathlib import Path -from typing import Callable, Dict, Optional, cast +from typing import Any, Callable, Dict, Optional, cast -from emop_frame_protocol.emop_profile_log_1_record import EmopProfileLog1Record -from emop_frame_protocol.emop_profile_log_2_record import EmopProfileLog2Record from simt_emlite.mediator.api_core import EmliteMediatorAPI +from simt_emlite.profile_logs.download_cache import ( + DownloadCache, +) # mypy: disable-error-code="import-untyped" from simt_emlite.smip.smip_file_finder import SMIPFileFinder @@ -182,11 +183,16 @@ def find_download_file(self) -> SMIPFileFinderResult: def download_profile_log_1_day( self, progress_callback: Optional[Callable[[str], None]] = None, - ) -> Dict[datetime.datetime, EmopProfileLog1Record]: + cache: Optional[DownloadCache] = None, + ) -> Dict[datetime.datetime, Any]: """Download profile log 1 data for a single day in chunks + Args: + progress_callback: Optional callback for progress updates + cache: Optional DownloadCache for resumable downloads + Returns: - Dict of timestamp to profile log 1 record + Dict of timestamp to profile log 1 record (EmopProfileLog1Record or CachedLog1Record) """ # Convert date to datetime for the day (ensure timezone-aware) @@ -206,10 +212,23 @@ def download_profile_log_1_day( # Download in 2-hour chunks (4 x 30-minute intervals per chunk) current_time = start_datetime chunk_size = datetime.timedelta(hours=2) - profile_records: Dict[datetime.datetime, EmopProfileLog1Record] = {} + profile_records: Dict[datetime.datetime, Any] = {} while current_time < end_datetime: chunk_end = min(current_time + chunk_size, end_datetime) + chunk_key = current_time.isoformat() + + # Check cache for this chunk + if cache and cache.has_log1_chunk(chunk_key): + msg = f"profile_log_1 chunk {current_time.strftime('%H:%M')} loaded from cache" + logger.info(msg, name=self.name, serial=self.serial) + if progress_callback: + progress_callback(msg) + for ts, record in cache.get_log1_records().items(): + if current_time <= ts < chunk_end: + profile_records[ts] = record + current_time = chunk_end + continue msg = f"Reading profile_log_1 chunk: {current_time.strftime('%H:%M')} to {chunk_end.strftime('%H:%M')}" logger.info( @@ -245,6 +264,17 @@ def download_profile_log_1_day( for record in response.records: profile_records[record.timestamp_datetime] = record + # Save chunk to cache + if cache: + chunk_records = { + r.timestamp_datetime.isoformat(): { + "import_a": r.import_a, + "import_b": r.import_b, + } + for r in response.records + } + cache.save_log1_chunk(chunk_key, chunk_records) + # Move to next chunk current_time = chunk_end @@ -255,15 +285,20 @@ def download_profile_log_1_day( def download_profile_log_2_day( self, progress_callback: Optional[Callable[[str], None]] = None, - ) -> Dict[datetime.datetime, EmopProfileLog2Record]: + cache: Optional[DownloadCache] = None, + ) -> Dict[datetime.datetime, Any]: """Download profile log 2 data for a single day in chunks. Profile log 2 returns different numbers of records depending on meter type: - Twin element meters (hardware C1.w): 2 records per call (2 x 30 min = 1 hour) - Single element meters: 3 records per call (3 x 30 min = 1.5 hours) + Args: + progress_callback: Optional callback for progress updates + cache: Optional DownloadCache for resumable downloads + Returns: - Dict of timestamp to profile log 2 record + Dict of timestamp to profile log 2 record (EmopProfileLog2Record or CachedLog2Record) """ # Convert date to datetime for the day (ensure timezone-aware) @@ -290,10 +325,23 @@ def download_profile_log_2_day( ) current_time = start_datetime - profile_records: Dict[datetime.datetime, EmopProfileLog2Record] = {} + profile_records: Dict[datetime.datetime, Any] = {} while current_time < end_datetime: chunk_end = min(current_time + chunk_size, end_datetime) + chunk_key = current_time.isoformat() + + # Check cache for this chunk + if cache and cache.has_log2_chunk(chunk_key): + msg = f"profile_log_2 chunk {current_time.strftime('%H:%M')} loaded from cache" + logger.info(msg) + if progress_callback: + progress_callback(msg) + for ts, record in cache.get_log2_records().items(): + if current_time <= ts < chunk_end: + profile_records[ts] = record + current_time = chunk_end + continue msg = f"Reading profile_log_2 chunk: {current_time.strftime('%H:%M')} to {chunk_end.strftime('%H:%M')}" logger.info(msg) @@ -322,6 +370,18 @@ def download_profile_log_2_day( for record in response.records: profile_records[record.timestamp_datetime] = record + # Save chunk to cache + if cache: + chunk_records = {} + for r in response.records: + record_data: Dict[str, int] = { + "active_export_a": r.active_export_a, + } + if self.is_twin_element: + record_data["active_export_b"] = r.active_export_b + chunk_records[r.timestamp_datetime.isoformat()] = record_data + cache.save_log2_chunk(chunk_key, chunk_records) + # Move to next chunk current_time = chunk_end