diff --git a/banzai/dbs.py b/banzai/dbs.py index 230d6f81..a27cb011 100755 --- a/banzai/dbs.py +++ b/banzai/dbs.py @@ -12,6 +12,7 @@ from dateutil.parser import parse import requests from sqlalchemy import create_engine, pool, func, make_url +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import sessionmaker from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, CHAR, JSON, UniqueConstraint, Float, Text from sqlalchemy.ext.declarative import declarative_base @@ -116,6 +117,25 @@ class ProcessedImage(Base): tries = Column(Integer, default=0) + +class StackFrame(Base): + __tablename__ = 'stack_frames' + id = Column(Integer, primary_key=True, autoincrement=True) + moluid = Column(String(100), nullable=False, index=True) + stack_num = Column(Integer, nullable=False) + frmtotal = Column(Integer, nullable=False) + camera = Column(String(50), nullable=False, index=True) + filepath = Column(String(255), nullable=True) + is_last = Column(Boolean, default=False) + status = Column(String(20), default='active', nullable=False) + dateobs = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.datetime.utcnow) + completed_at = Column(DateTime, nullable=True) + __table_args__ = ( + UniqueConstraint('moluid', 'stack_num', name='uq_stack_moluid_num'), + ) + + def parse_configdb(configdb_address): """ Parse the contents of the configdb. @@ -580,3 +600,56 @@ def replicate_instrument(instrument_record, db_address): add_or_update_record(db_session, Instrument, equivalence_criteria, record_attributes) db_session.commit() + + +def insert_stack_frame(db_address, moluid, stack_num, frmtotal, camera, filepath, is_last, dateobs): + """Insert a stack frame record into the database. Duplicate (moluid, stack_num) is a no-op.""" + try: + with get_session(db_address) as session: + session.add(StackFrame( + moluid=moluid, + stack_num=stack_num, + frmtotal=frmtotal, + camera=camera, + filepath=filepath, + is_last=is_last, + dateobs=dateobs, + )) + except IntegrityError: + pass + + +def get_stack_frames(db_address, moluid): + """Get all stack frame records for a given moluid.""" + with get_session(db_address) as session: + return session.query(StackFrame).filter( + StackFrame.moluid == moluid + ).all() + + +def mark_stack_complete(db_address, moluid, status='complete'): + """Mark all frames for a moluid as complete (or timeout).""" + now = datetime.datetime.utcnow() + with get_session(db_address) as session: + session.query(StackFrame).filter( + StackFrame.moluid == moluid + ).update({'status': status, 'completed_at': now}) + + +def update_stack_frame_filepath(db_address, moluid, stack_num, filepath): + """Set the reduced filepath on an existing stack frame record.""" + with get_session(db_address) as session: + session.query(StackFrame).filter( + StackFrame.moluid == moluid, + StackFrame.stack_num == stack_num, + ).update({'filepath': filepath}) + + +def cleanup_old_records(db_address, retention_days): + """Delete completed stack frame records older than retention_days.""" + cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=retention_days) + with get_session(db_address) as session: + session.query(StackFrame).filter( + StackFrame.status != 'active', + StackFrame.completed_at < cutoff, + ).delete() diff --git a/banzai/settings.py b/banzai/settings.py index 758d1d49..2cce2eed 100644 --- a/banzai/settings.py +++ b/banzai/settings.py @@ -163,3 +163,7 @@ LARGE_WORKER_QUEUE = os.getenv('CELERY_LARGE_TASK_QUEUE_NAME', 'celery_large') REFERENCE_CATALOG_URL = os.getenv('REFERENCE_CATALOG_URL', 'http://phot-catalog.lco.gtn/') + +SUBFRAME_TASK_QUEUE_NAME = os.getenv('SUBFRAME_TASK_QUEUE_NAME', 'subframe_tasks') +STACK_QUEUE_NAME = os.getenv('STACK_QUEUE_NAME', 'banzai_stack_queue') +REDIS_URL = os.getenv('REDIS_URL', 'redis://redis:6379/0') diff --git a/banzai/stacking.py b/banzai/stacking.py new file mode 100644 index 00000000..0064224a --- /dev/null +++ b/banzai/stacking.py @@ -0,0 +1,195 @@ +"""Smart stacking: worker, supervisor, and helper functions.""" +import datetime +import multiprocessing +import os +import signal +import time + +import redis as redis_lib + +from banzai import dbs +from banzai.logs import get_logger + +logger = get_logger() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +REQUIRED_MESSAGE_FIELDS = ('fits_file', 'last_frame', 'instrument_enqueue_timestamp') + + +def validate_message(body): + """Check that body contains fits_file, last_frame, instrument_enqueue_timestamp.""" + return all(field in body for field in REQUIRED_MESSAGE_FIELDS) + + +def check_stack_complete(frames, frmtotal): + """Return True if the stack is ready to finalize. + + A stack is complete when all received frames have been reduced and either + all expected frames are present or the instrument signalled is_last. + """ + all_reduced = all(f.filepath is not None for f in frames) + all_arrived = len(frames) == frmtotal + has_last = any(f.is_last for f in frames) + return all_reduced and (all_arrived or has_last) + + +# --------------------------------------------------------------------------- +# Notifications +# --------------------------------------------------------------------------- + +REDIS_KEY_PREFIX = 'stack:notify:' + + +def push_notification(redis_client, camera, moluid): + """Push a moluid notification onto the Redis list for a camera.""" + redis_client.lpush(f'{REDIS_KEY_PREFIX}{camera}', moluid) + + +def drain_notifications(redis_client, camera): + """Drain and return a deduplicated set of moluids from the Redis list for a camera.""" + key = f'{REDIS_KEY_PREFIX}{camera}' + drain_key = f'{key}:draining' + # Atomic rename so notifications pushed between read and delete aren't lost + try: + redis_client.rename(key, drain_key) + except redis_lib.exceptions.ResponseError: + return set() + raw = redis_client.lrange(drain_key, 0, -1) + redis_client.delete(drain_key) + return {item.decode() if isinstance(item, bytes) else item for item in raw} + + +# --------------------------------------------------------------------------- +# Worker +# --------------------------------------------------------------------------- + +def run_worker_loop(camera, db_address, redis_url, timeout_minutes=20, retention_days=30, poll_interval=5): + """Main loop: drain notifications, query DB, check completion, finalize.""" + redis_client = redis_lib.Redis.from_url(redis_url) + while True: + process_notifications(db_address, redis_client, camera) + check_timeout(db_address, camera, timeout_minutes) + dbs.cleanup_old_records(db_address, retention_days) + time.sleep(poll_interval) + + +def process_notifications(db_address, redis_client, camera): + """Drain, deduplicate, and process latest state for each moluid.""" + moluids = drain_notifications(redis_client, camera) + for moluid in moluids: + frames = dbs.get_stack_frames(db_address, moluid) + if not frames: + continue + frmtotal = frames[0].frmtotal + if check_stack_complete(frames, frmtotal): + finalize_stack(db_address, moluid, status='complete') + + +def finalize_stack(db_address, moluid, status='complete'): + """Mark stack complete and log mock stacking/JPEG/ingester operations.""" + dbs.mark_stack_complete(db_address, moluid, status=status) + logger.info(f'Mock stacking complete for {moluid}', extra_tags={'moluid': moluid}) + logger.info(f'Mock JPEG generation for {moluid}', extra_tags={'moluid': moluid}) + logger.info(f'Mock ingester upload for {moluid}', extra_tags={'moluid': moluid}) + + +def check_timeout(db_address, camera, timeout_minutes): + """Find stale active stacks and finalize them with status='timeout'.""" + cutoff = datetime.datetime.utcnow() - datetime.timedelta(minutes=timeout_minutes) + with dbs.get_session(db_address) as session: + stale_moluids = session.query(dbs.StackFrame.moluid).filter( + dbs.StackFrame.camera == camera, + dbs.StackFrame.status == 'active', + dbs.StackFrame.dateobs < cutoff, + ).distinct().all() + for (moluid,) in stale_moluids: + finalize_stack(db_address, moluid, status='timeout') + + +# --------------------------------------------------------------------------- +# Supervisor +# --------------------------------------------------------------------------- + +def discover_cameras(db_address, site_id): + """Query the Instrument table for cameras at a site.""" + with dbs.get_session(db_address) as session: + instruments = session.query(dbs.Instrument).filter( + dbs.Instrument.site == site_id + ).all() + return [inst.camera for inst in instruments] + + +class StackingSupervisor: + def __init__(self, site_id, db_address, redis_url, timeout_minutes=20, retention_days=30): + self.site_id = site_id + self.db_address = db_address + self.redis_url = redis_url + self.timeout_minutes = timeout_minutes + self.retention_days = retention_days + self.workers = {} + + def _worker_args(self, camera): + return (camera, self.db_address, self.redis_url, self.timeout_minutes, self.retention_days) + + def start(self): + """Discover cameras and spawn one worker process per camera.""" + cameras = discover_cameras(self.db_address, self.site_id) + for camera in cameras: + proc = multiprocessing.Process( + target=run_worker_loop, + args=self._worker_args(camera), + name=f'stacking-worker-{camera}', + ) + proc.start() + self.workers[camera] = proc + logger.info(f'Started stacking worker for camera {camera}') + + def monitor(self, check_interval=10): + """Check worker health and restart crashed workers.""" + while True: + for camera, proc in list(self.workers.items()): + if not proc.is_alive(): + logger.warning(f'Worker for {camera} died, restarting') + new_proc = multiprocessing.Process( + target=run_worker_loop, + args=self._worker_args(camera), + name=f'stacking-worker-{camera}', + ) + new_proc.start() + self.workers[camera] = new_proc + time.sleep(check_interval) + + def shutdown(self): + """Graceful shutdown of all workers.""" + for camera, proc in self.workers.items(): + proc.terminate() + proc.join(timeout=10) + logger.info(f'Stopped stacking worker for camera {camera}') + self.workers.clear() + + +def run_supervisor(): + """Entry point for the stacking supervisor.""" + site_id = os.environ['SITE_ID'] + db_address = os.environ['DB_ADDRESS'] + redis_url = os.environ.get('REDIS_URL', 'redis://redis:6379/0') + timeout_minutes = int(os.environ.get('STACK_TIMEOUT_MINUTES', '20')) + retention_days = int(os.environ.get('STACK_RETENTION_DAYS', '30')) + + supervisor = StackingSupervisor(site_id, db_address, redis_url, + timeout_minutes=timeout_minutes, + retention_days=retention_days) + + def handle_signal(signum, frame): + supervisor.shutdown() + raise SystemExit(0) + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGINT, handle_signal) + + supervisor.start() + supervisor.monitor() diff --git a/banzai/tests/site_e2e/test_site_e2e.py b/banzai/tests/site_e2e/test_site_e2e.py index fd570910..c64c92e8 100644 --- a/banzai/tests/site_e2e/test_site_e2e.py +++ b/banzai/tests/site_e2e/test_site_e2e.py @@ -7,6 +7,7 @@ import pytest import requests from sqlalchemy import create_engine, text +from astropy.io import fits from banzai import dbs from banzai.tests.site_e2e.utils import populate_publication @@ -162,7 +163,6 @@ def test_06_queue_raw_frame(self, site_deployment, auth_token): @pytest.mark.e2e_site_reduction def test_07_reduction_completes(self, site_deployment): """Verify reduction completed by checking for processed output file.""" - from astropy.io import fits raw_dir = DATA_DIR / 'raw' assert raw_dir.exists(), f"Raw directory not found: {raw_dir}" diff --git a/banzai/tests/test_smart_stacking.py b/banzai/tests/test_smart_stacking.py new file mode 100644 index 00000000..c9dd7a8f --- /dev/null +++ b/banzai/tests/test_smart_stacking.py @@ -0,0 +1,296 @@ +"""Unit tests for the smart stacking feature.""" +import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from sqlalchemy import text + +from banzai import dbs +from banzai.dbs import insert_stack_frame, get_stack_frames, mark_stack_complete, cleanup_old_records, update_stack_frame_filepath +from banzai.stacking import (validate_message, check_stack_complete, + push_notification, drain_notifications, REDIS_KEY_PREFIX, + process_notifications, finalize_stack, check_timeout, + discover_cameras, StackingSupervisor) + +pytestmark = pytest.mark.smart_stacking + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def db_address(tmp_path): + """Create a fresh SQLite DB per test with a site and two instruments.""" + addr = f'sqlite:///{tmp_path}/test.db' + dbs.create_db(addr) + with dbs.get_session(addr) as session: + session.add(dbs.Site(id='tst', timezone=0, latitude=0, longitude=0, elevation=0)) + session.add(dbs.Instrument(site='tst', camera='cam1', name='cam1', type='1m0-SciCam-Sinistro', nx=4096, ny=4096)) + session.add(dbs.Instrument(site='tst', camera='cam2', name='cam2', type='1m0-SciCam-Sinistro', nx=4096, ny=4096)) + return addr + + +@pytest.fixture +def mock_redis(): + """Return a MagicMock standing in for a Redis client.""" + r = MagicMock() + r.lpush = MagicMock() + r.lrange = MagicMock(return_value=[]) + r.delete = MagicMock() + return r + + +# --------------------------------------------------------------------------- +# Database operations +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestDBOperations: + + def test_insert_and_query(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + insert_stack_frame( + db_address, moluid='mol-001', stack_num=1, frmtotal=5, + camera='cam1', filepath='/data/frame1.fits', is_last=False, dateobs=dateobs, + ) + frames = get_stack_frames(db_address, moluid='mol-001') + assert len(frames) == 1 + frame = frames[0] + assert frame.moluid == 'mol-001' + assert frame.stack_num == 1 + assert frame.frmtotal == 5 + assert frame.camera == 'cam1' + assert frame.filepath == '/data/frame1.fits' + assert frame.is_last is False + assert frame.dateobs == dateobs + + def test_duplicate_is_noop(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + insert_stack_frame( + db_address, moluid='mol-dup', stack_num=1, frmtotal=3, + camera='cam1', filepath='/data/dup1.fits', is_last=False, dateobs=dateobs, + ) + insert_stack_frame( + db_address, moluid='mol-dup', stack_num=1, frmtotal=3, + camera='cam1', filepath='/data/dup2.fits', is_last=False, dateobs=dateobs, + ) + frames = get_stack_frames(db_address, 'mol-dup') + assert len(frames) == 1 + assert frames[0].filepath == '/data/dup1.fits' + + def test_update_stack_frame_filepath(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + insert_stack_frame( + db_address, moluid='mol-upd', stack_num=1, frmtotal=3, + camera='cam1', filepath=None, is_last=False, dateobs=dateobs, + ) + frames = get_stack_frames(db_address, 'mol-upd') + assert frames[0].filepath is None + + update_stack_frame_filepath(db_address, 'mol-upd', 1, '/data/reduced.fits') + frames = get_stack_frames(db_address, 'mol-upd') + assert frames[0].filepath == '/data/reduced.fits' + + +# --------------------------------------------------------------------------- +# Status transitions +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestStatusTransitions: + + def test_status_active_to_complete(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + for i in range(3): + insert_stack_frame( + db_address, moluid='mol-comp', stack_num=i + 1, frmtotal=3, + camera='cam1', filepath=f'/data/comp{i}.fits', is_last=(i == 2), dateobs=dateobs, + ) + mark_stack_complete(db_address, 'mol-comp', 'complete') + frames = get_stack_frames(db_address, 'mol-comp') + for f in frames: + assert f.status == 'complete' + assert f.completed_at is not None + + def test_status_active_to_timeout(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + for i in range(2): + insert_stack_frame( + db_address, moluid='mol-to', stack_num=i + 1, frmtotal=5, + camera='cam1', filepath=f'/data/to{i}.fits', is_last=False, dateobs=dateobs, + ) + mark_stack_complete(db_address, 'mol-to', 'timeout') + frames = get_stack_frames(db_address, 'mol-to') + for f in frames: + assert f.status == 'timeout' + assert f.completed_at is not None + + +# --------------------------------------------------------------------------- +# Timeout +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestTimeout: + + def test_timeout_finalizes_stale_stacks(self, db_address): + old_dateobs = datetime.datetime.utcnow() - datetime.timedelta(hours=2) + for i in range(3): + insert_stack_frame( + db_address, moluid='mol-stale', stack_num=i + 1, frmtotal=5, + camera='cam1', filepath=f'/data/stale{i}.fits', is_last=False, dateobs=old_dateobs, + ) + check_timeout(db_address, 'cam1', timeout_minutes=60) + frames = get_stack_frames(db_address, 'mol-stale') + for f in frames: + assert f.status == 'timeout' + + +# --------------------------------------------------------------------------- +# Redis notifications +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestRedisNotifications: + + def test_push_notification(self, mock_redis): + push_notification(mock_redis, 'cam1', 'mol-abc') + mock_redis.lpush.assert_called_once_with(f'{REDIS_KEY_PREFIX}cam1', 'mol-abc') + + def test_drain_for_camera(self, mock_redis): + mock_redis.lrange.return_value = [b'mol-a', b'mol-a', b'mol-b'] + result = drain_notifications(mock_redis, 'cam1') + assert result == {'mol-a', 'mol-b'} + mock_redis.rename.assert_called_once_with( + f'{REDIS_KEY_PREFIX}cam1', f'{REDIS_KEY_PREFIX}cam1:draining') + mock_redis.lrange.assert_called_once_with(f'{REDIS_KEY_PREFIX}cam1:draining', 0, -1) + mock_redis.delete.assert_called_once_with(f'{REDIS_KEY_PREFIX}cam1:draining') + + +# --------------------------------------------------------------------------- +# Multiple concurrent stacks +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestConcurrentStacks: + + def test_concurrent_stacks_same_camera(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + for i in range(3): + insert_stack_frame( + db_address, moluid='mol-A', stack_num=i + 1, frmtotal=3, + camera='cam1', filepath=f'/data/a{i}.fits', is_last=(i == 2), dateobs=dateobs, + ) + for i in range(2): + insert_stack_frame( + db_address, moluid='mol-B', stack_num=i + 1, frmtotal=5, + camera='cam1', filepath=f'/data/b{i}.fits', is_last=False, dateobs=dateobs, + ) + + frames_a = get_stack_frames(db_address, 'mol-A') + frames_b = get_stack_frames(db_address, 'mol-B') + assert len(frames_a) == 3 + assert len(frames_b) == 2 + assert check_stack_complete(frames_a, frmtotal=3) is True + assert check_stack_complete(frames_b, frmtotal=5) is False + + +# --------------------------------------------------------------------------- +# check_stack_complete +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestCheckStackComplete: + + @staticmethod + def _frame(filepath='/data/f.fits', is_last=False): + f = MagicMock() + f.filepath = filepath + f.is_last = is_last + return f + + def test_all_frames_arrived_and_reduced(self): + frames = [self._frame() for _ in range(3)] + assert check_stack_complete(frames, frmtotal=3) is True + + def test_partial_without_is_last(self): + frames = [self._frame() for _ in range(3)] + assert check_stack_complete(frames, frmtotal=5) is False + + def test_partial_with_is_last(self): + frames = [self._frame() for _ in range(2)] + [self._frame(is_last=True)] + assert check_stack_complete(frames, frmtotal=5) is True + + def test_is_last_waits_for_unreduced_frames(self): + frames = [self._frame(), self._frame(filepath=None, is_last=True)] + assert check_stack_complete(frames, frmtotal=5) is False + + def test_empty_frames(self): + assert check_stack_complete([], frmtotal=5) is False + + +# --------------------------------------------------------------------------- +# Retention / cleanup +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestRetention: + + def test_cleanup_old_records(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + for i in range(3): + insert_stack_frame( + db_address, moluid='mol-old', stack_num=i + 1, frmtotal=3, + camera='cam1', filepath=f'/data/old{i}.fits', is_last=(i == 2), dateobs=dateobs, + ) + mark_stack_complete(db_address, 'mol-old', 'complete') + + with dbs.get_session(db_address) as session: + session.execute( + text("UPDATE stack_frames SET completed_at = :old_date WHERE moluid = :mol"), + {'old_date': datetime.datetime.utcnow() - datetime.timedelta(days=30), 'mol': 'mol-old'}, + ) + + cleanup_old_records(db_address, retention_days=7) + frames = get_stack_frames(db_address, 'mol-old') + assert len(frames) == 0 + + def test_cleanup_preserves_recent(self, db_address): + dateobs = datetime.datetime(2024, 6, 15, 12, 0, 0) + for i in range(3): + insert_stack_frame( + db_address, moluid='mol-recent', stack_num=i + 1, frmtotal=3, + camera='cam1', filepath=f'/data/recent{i}.fits', is_last=(i == 2), dateobs=dateobs, + ) + mark_stack_complete(db_address, 'mol-recent', 'complete') + cleanup_old_records(db_address, retention_days=7) + frames = get_stack_frames(db_address, 'mol-recent') + assert len(frames) == 3 + + +# --------------------------------------------------------------------------- +# Supervisor +# --------------------------------------------------------------------------- + +@pytest.mark.smart_stacking +class TestSupervisor: + + def test_discover_cameras(self, db_address): + cameras = discover_cameras(db_address, 'tst') + assert 'cam1' in cameras + assert 'cam2' in cameras + assert len(cameras) == 2 + + @patch('banzai.stacking.discover_cameras', return_value=['cam1', 'cam2', 'cam3']) + @patch('banzai.stacking.multiprocessing.Process') + def test_supervisor_spawns_per_camera(self, mock_process_cls, mock_discover): + supervisor = StackingSupervisor( + site_id='tst', + db_address='sqlite:///fake.db', + redis_url='redis://localhost:6379', + ) + supervisor.start() + assert mock_process_cls.call_count == 3 + assert mock_process_cls.return_value.start.call_count == 3 diff --git a/banzai/utils/stage_utils.py b/banzai/utils/stage_utils.py index ccd17315..74ea4e0a 100644 --- a/banzai/utils/stage_utils.py +++ b/banzai/utils/stage_utils.py @@ -63,3 +63,4 @@ def run_pipeline_stages(image_paths: list, runtime_context: Context, calibration for image in images: image.write(runtime_context) + return images diff --git a/pyproject.toml b/pyproject.toml index b9eba032..6555b781 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,6 +144,7 @@ zip-safe = false banzai_create_local_db = "banzai.main:create_local_db" banzai_download_worker = "banzai.cache.download_worker:run_download_worker_daemon" banzai_cache_init = "banzai.cache.init:run_initialization" + banzai_stacking_supervisor = "banzai.stacking:run_supervisor" [tool.coverage.run] source = ["banzai"] diff --git a/pytest.ini b/pytest.ini index 686a0197..eb8f72bf 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,6 @@ [pytest] minversion = 3.5 -norecursedirs = build docs/_build .direnv site_e2e +norecursedirs = build docs/_build .direnv site_e2e smart_stacking_integration doctest_plus = enabled addopts = -p no:warnings log_cli = True @@ -63,3 +63,5 @@ markers = stacking stats thousands_qc + smart_stacking : Smart stacking unit tests + integration_smart_stacking : Smart stacking integration tests