diff --git a/.gitignore b/.gitignore index 36283a4..a72dce8 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,7 @@ result/ .cursor/ .claude/ + +# Temp iii config files generated by bbdev script mode +api/iii-config-*.yaml +api/data/ diff --git a/api/config.yaml b/api/config.yaml new file mode 100644 index 0000000..80926ff --- /dev/null +++ b/api/config.yaml @@ -0,0 +1,60 @@ +port: 49134 + +modules: + # ── Worker Module ────────────────────────────────────────────────────── + - class: modules::worker::WorkerModule + config: + port: 49134 + + # ── State Module ─────────────────────────────────────────────────────── + - class: modules::state::StateModule + config: + adapter: + class: modules::state::adapters::KvStore + config: + store_method: file_based + file_path: ./data/state_store.db + + # ── REST API Module ──────────────────────────────────────────────────── + - class: modules::api::RestApiModule + config: + port: 3111 + host: 0.0.0.0 + default_timeout: 86400000 + concurrency_request_limit: 1024 + cors: + allowed_origins: + - http://localhost:3000 + - http://localhost:5173 + allowed_methods: + - GET + - POST + - PUT + - DELETE + - OPTIONS + + # ── Queue Module ─────────────────────────────────────────────────────── + - class: modules::queue::QueueModule + config: + adapter: + class: modules::queue::BuiltinQueueAdapter + + # ── PubSub Module ────────────────────────────────────────────────────── + - class: modules::pubsub::PubSubModule + config: + adapter: + class: modules::pubsub::LocalAdapter + + # ── Observability Module ─────────────────────────────────────────────── + - class: modules::observability::OtelModule + config: + enabled: true + exporter: memory + + # ── Exec Module (Python) ─────────────────────────────────────────────── + - class: modules::shell::ExecModule + config: + watch: + - steps/**/*.py + exec: + - PYTHONUNBUFFERED=1 PYTHONPATH=. .venv/bin/motia dev --dir steps diff --git a/api/pyproject.toml b/api/pyproject.toml new file mode 100644 index 0000000..ea2c242 --- /dev/null +++ b/api/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "bbdev" +version = "0.1.0" +description = "Agent-friendly developer toolchain backend powered by Motia" +requires-python = ">=3.10" +dependencies = [ + "motia[otel]==1.0.0rc17", + "iii-sdk==0.2.0", + "pydantic>=2.0", +] + +[project.optional-dependencies] +dev = ["pytest>=8.0.0"] diff --git a/api/steps/compiler/01_build_api.step.py b/api/steps/compiler/01_build_api.step.py new file mode 100644 index 0000000..63d1040 --- /dev/null +++ b/api/steps/compiler/01_build_api.step.py @@ -0,0 +1,15 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "build-compiler-api", + "description": "build compiler", + "flows": ["compiler"], + "triggers": [api("POST", "/compiler/build")], + "enqueues": ["compiler.build"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + await ctx.enqueue({"topic": "compiler.build", "data": {**body, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/compiler/01_build_event.step.py b/api/steps/compiler/01_build_event.step.py new file mode 100644 index 0000000..a4969d6 --- /dev/null +++ b/api/steps/compiler/01_build_event.step.py @@ -0,0 +1,56 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "build-compiler", + "description": "build compiler", + "flows": ["compiler"], + "triggers": [queue("compiler.build")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + script_dir = f"{bbdir}/workflow/steps/compiler/scripts" + yaml_dir = f"{script_dir}/yaml" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"mkdir -p {bbdir}/compiler/build" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + stdout_prefix="compiler build", + stderr_prefix="compiler build", + ) + command = f"cd {bbdir}/compiler/build && ninja -j{os.cpu_count()}" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + stdout_prefix="compiler build", + stderr_prefix="compiler build", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # Continue routing + # ================================================================================== + return diff --git a/api/steps/firesim/01_buildbitstream_api.step.py b/api/steps/firesim/01_buildbitstream_api.step.py new file mode 100644 index 0000000..0b69bdc --- /dev/null +++ b/api/steps/firesim/01_buildbitstream_api.step.py @@ -0,0 +1,15 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "firesim-buildbitstream-api", + "description": "build bitstream", + "flows": ["firesim"], + "triggers": [api("POST", "/firesim/buildbitstream")], + "enqueues": ["firesim.buildbitstream"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + await ctx.enqueue({"topic": "firesim.buildbitstream", "data": {**body, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/firesim/01_buildbitstream_event.step.py b/api/steps/firesim/01_buildbitstream_event.step.py new file mode 100644 index 0000000..4550ee5 --- /dev/null +++ b/api/steps/firesim/01_buildbitstream_event.step.py @@ -0,0 +1,54 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "firesim-buildbitstream", + "description": "build bitstream", + "flows": ["firesim"], + "triggers": [queue("firesim.buildbitstream")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + script_dir = f"{bbdir}/workflow/steps/firesim/scripts" + yaml_dir = f"{script_dir}/yaml" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"firesim buildbitstream " + command += f" -a {yaml_dir}/config_hwdb.yaml" + command += f" -b {yaml_dir}/config_build.yaml" + command += f" -r {yaml_dir}/config_build_recipes.yaml" + command += f" -c {yaml_dir}/config_runtime.yaml" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + stdout_prefix="firesim buildbitstream", + stderr_prefix="firesim buildbitstream", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # Continue routing + # ================================================================================== + + return diff --git a/api/steps/firesim/02_infrasetup_api.step.py b/api/steps/firesim/02_infrasetup_api.step.py new file mode 100644 index 0000000..5beca75 --- /dev/null +++ b/api/steps/firesim/02_infrasetup_api.step.py @@ -0,0 +1,16 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "firesim-infrasetup-api", + "description": "infrasetup", + "flows": ["firesim"], + "triggers": [api("POST", "/firesim/infrasetup")], + "enqueues": ["firesim.infrasetup"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + data = {"jobs": body.get("jobs", 16)} + await ctx.enqueue({"topic": "firesim.infrasetup", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/firesim/02_infrasetup_event.step.py b/api/steps/firesim/02_infrasetup_event.step.py new file mode 100644 index 0000000..2d1f316 --- /dev/null +++ b/api/steps/firesim/02_infrasetup_event.step.py @@ -0,0 +1,54 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "firesim-infrasetup", + "description": "infrasetup", + "flows": ["firesim"], + "triggers": [queue("firesim.infrasetup")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + script_dir = f"{bbdir}/workflow/steps/firesim/scripts" + yaml_dir = f"{script_dir}/yaml" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"firesim infrasetup " + command += f" -a {yaml_dir}/config_hwdb.yaml" + command += f" -b {yaml_dir}/config_build.yaml" + command += f" -r {yaml_dir}/config_build_recipes.yaml" + command += f" -c {yaml_dir}/config_runtime.yaml" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + stdout_prefix="firesim infrasetup", + stderr_prefix="firesim infrasetup", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # Continue routing + # ================================================================================== + + return diff --git a/api/steps/firesim/03_runworkload_api.step.py b/api/steps/firesim/03_runworkload_api.step.py new file mode 100644 index 0000000..de1db06 --- /dev/null +++ b/api/steps/firesim/03_runworkload_api.step.py @@ -0,0 +1,16 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "firesim-runworkload-api", + "description": "run workload", + "flows": ["firesim"], + "triggers": [api("POST", "/firesim/runworkload")], + "enqueues": ["firesim.runworkload"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + data = {"jobs": body.get("jobs", 16)} + await ctx.enqueue({"topic": "firesim.runworkload", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/firesim/03_runworkload_event.step.py b/api/steps/firesim/03_runworkload_event.step.py new file mode 100644 index 0000000..e9a57ae --- /dev/null +++ b/api/steps/firesim/03_runworkload_event.step.py @@ -0,0 +1,54 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "firesim-runworkload", + "description": "run workload", + "flows": ["firesim"], + "triggers": [queue("firesim.runworkload")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + script_dir = f"{bbdir}/workflow/steps/firesim/scripts" + yaml_dir = f"{script_dir}/yaml" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"firesim runworkload " + command += f" -a {yaml_dir}/config_hwdb.yaml" + command += f" -b {yaml_dir}/config_build.yaml" + command += f" -r {yaml_dir}/config_build_recipes.yaml" + command += f" -c {yaml_dir}/config_runtime.yaml" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + stdout_prefix="firesim runworkload", + stderr_prefix="firesim runworkload", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # Continue routing + # ================================================================================== + + return diff --git a/api/steps/marshal/01_build_api.step.py b/api/steps/marshal/01_build_api.step.py new file mode 100644 index 0000000..ba9049a --- /dev/null +++ b/api/steps/marshal/01_build_api.step.py @@ -0,0 +1,15 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "marshal-build-api", + "description": "build marshal", + "flows": ["marshal"], + "triggers": [api("POST", "/marshal/build")], + "enqueues": ["marshal.build"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + await ctx.enqueue({"topic": "marshal.build", "data": {**body, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/marshal/01_build_event.step.py b/api/steps/marshal/01_build_event.step.py new file mode 100644 index 0000000..4698ce5 --- /dev/null +++ b/api/steps/marshal/01_build_event.step.py @@ -0,0 +1,70 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "marshal-build", + "description": "build marshal", + "flows": ["marshal"], + "triggers": [queue("marshal.build")], + "enqueues": ["marshal.complete", "marshal.error"], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + script_dir = f"{bbdir}/workflow/steps/marshal/scripts" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"./marshal -v build interactive.json && ./marshal -v install interactive.json" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + cwd=script_dir, + stdout_prefix="marshal build", + stderr_prefix="marshal build", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # Continue routing + # Routing to completion or error handling + # ================================================================================== + if result.returncode == 0: + await ctx.enqueue( + { + "topic": "marshal.complete", + "data": {**input_data, "task": "marshal", "result": success_result}, + } + ) + else: + await ctx.enqueue( + { + "topic": "marshal.error", + "data": { + **input_data, + "task": "marshal", + "result": failure_result, + "returncode": result.returncode, + }, + } + ) + + return diff --git a/api/steps/marshal/02_launch_api.step.py b/api/steps/marshal/02_launch_api.step.py new file mode 100644 index 0000000..4b52daf --- /dev/null +++ b/api/steps/marshal/02_launch_api.step.py @@ -0,0 +1,15 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "marshal-launch-api", + "description": "launch marshal", + "flows": ["marshal"], + "triggers": [api("POST", "/marshal/launch")], + "enqueues": ["marshal.launch"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + await ctx.enqueue({"topic": "marshal.launch", "data": {**body, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/marshal/02_launch_event.step.py b/api/steps/marshal/02_launch_event.step.py new file mode 100644 index 0000000..edb1141 --- /dev/null +++ b/api/steps/marshal/02_launch_event.step.py @@ -0,0 +1,50 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "marshal-launch", + "description": "launch marshal", + "flows": ["marshal"], + "triggers": [queue("marshal.launch")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + script_dir = f"{bbdir}/workflow/steps/marshal/scripts" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"./marshal -v launch interactive.json" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + cwd=script_dir, + stdout_prefix="marshal launch", + stderr_prefix="marshal launch", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # Continue routing + # Finish workflow + # ================================================================================== + return diff --git a/api/steps/palladium/01_verilog_api.step.py b/api/steps/palladium/01_verilog_api.step.py new file mode 100644 index 0000000..76c2500 --- /dev/null +++ b/api/steps/palladium/01_verilog_api.step.py @@ -0,0 +1,36 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +from utils.path import get_buckyball_path + +config = { + "name": "palladium-verilog-api", + "description": "generate verilog code", + "flows": ["palladium"], + "triggers": [api("POST", "/palladium/verilog")], + "enqueues": ["palladium.verilog"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + bbdir = get_buckyball_path() + body = request.body or {} + + # Get config name, must be provided + config_name = body.get("config") + if not config_name or config_name == "None": + return ApiResponse( + status=400, + body={ + "status": "error", + "message": "Configuration name is required. Please specify --config_name parameter.", + "example": './bbdev palladium --verilog "--config_name sims.palladium.BuckyballToyP2EConfig"', + }, + ) + + data = { + "config": config_name, + "balltype": body.get("balltype"), + "output_dir": body.get("output_dir", f"{bbdir}/arch/build/"), + } + await ctx.enqueue({"topic": "palladium.verilog", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/palladium/01_verilog_event_step.py b/api/steps/palladium/01_verilog_event.step.py similarity index 64% rename from api/steps/palladium/01_verilog_event_step.py rename to api/steps/palladium/01_verilog_event.step.py index 125b3d2..a8dfa4e 100644 --- a/api/steps/palladium/01_verilog_event_step.py +++ b/api/steps/palladium/01_verilog_event.step.py @@ -1,7 +1,8 @@ import os -import subprocess import sys +from motia import FlowContext, queue + # Add the utils directory to the Python path utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) if utils_path not in sys.path: @@ -9,48 +10,47 @@ from utils.path import get_buckyball_path from utils.stream_run import stream_run_logger -from utils.event_common import check_result +from utils.event_common import check_result, get_origin_trace_id config = { - "type": "event", - "name": "make verilog", + "name": "palladium-verilog", "description": "generate verilog code", - "subscribes": ["palladium.verilog"], - "emits": ["palladium.build"], "flows": ["palladium"], + "triggers": [queue("palladium.verilog")], + "enqueues": ["palladium.build"], } -async def handler(data, context): +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) bbdir = get_buckyball_path() # Use arch/build as the base directory for chipyard.Generator - base_build_dir = f"{data.get('output_dir', f'{bbdir}/arch/build')}/palladium" + base_build_dir = f"{input_data.get('output_dir', f'{bbdir}/arch/build')}/palladium" # Output directory for final Verilog files verilog_output_dir = f"{base_build_dir}/verilog" arch_dir = f"{bbdir}/arch" # Get config name, must be provided - config_name = data.get("config") + config_name = input_data.get("config") if not config_name or config_name == "None": - context.logger.error("Configuration name is required but not provided") + ctx.logger.error("Configuration name is required but not provided") success_result, failure_result = await check_result( - context, + ctx, 1, continue_run=False, extra_fields={ "task": "validation", "error": "Configuration name is required. Please specify --config_name parameter.", "example": './bbdev palladium --verilog "--config_name sims.palladium.BuckyballToyP2EConfig"', - }, - ) + }, trace_id=origin_tid) return failure_result - context.logger.info(f"Using configuration: {config_name}") + ctx.logger.info(f"Using configuration: {config_name}") # ================================================================================== # Step 1: Generate FIRRTL using chipyard.Generator # ================================================================================== - context.logger.info("Step 1: Generating FIRRTL with chipyard.Generator...") + ctx.logger.info("Step 1: Generating FIRRTL with chipyard.Generator...") os.system(f"mkdir -p {verilog_output_dir}") firrtl_command = ( f"cd {arch_dir} && " @@ -63,26 +63,25 @@ async def handler(data, context): result = stream_run_logger( cmd=firrtl_command, - logger=context.logger, + logger=ctx.logger, cwd=arch_dir, stdout_prefix="palladium firrtl", stderr_prefix="palladium firrtl", ) if result.returncode != 0: - context.logger.error(f"FIRRTL generation failed with code {result.returncode}") + ctx.logger.error(f"FIRRTL generation failed with code {result.returncode}") success_result, failure_result = await check_result( - context, + ctx, result.returncode, continue_run=False, - extra_fields={"task": "firrtl", "step": "generate"}, - ) + extra_fields={"task": "firrtl", "step": "generate"}, trace_id=origin_tid) return failure_result # ================================================================================== # Step 2: Convert FIRRTL to SystemVerilog using firtool # ================================================================================== - context.logger.info("Step 2: Converting FIRRTL to SystemVerilog with firtool...") + ctx.logger.info("Step 2: Converting FIRRTL to SystemVerilog with firtool...") # Extract the simple class name from the full config name # e.g., "sims.palladium.BuckyballToyP2EConfig" -> "BuckyballToyP2EConfig" @@ -91,18 +90,17 @@ async def handler(data, context): # Find the generated FIRRTL file (in base_build_dir, not verilog_output_dir) fir_file = f"{base_build_dir}/palladium.fpga.{config_class_name}.fir" if not os.path.exists(fir_file): - context.logger.error(f"FIRRTL file not found: {fir_file}") - context.logger.info(f"Looking for files in {base_build_dir}...") + ctx.logger.error(f"FIRRTL file not found: {fir_file}") + ctx.logger.info(f"Looking for files in {base_build_dir}...") # List files to help debug if os.path.exists(base_build_dir): files = os.listdir(base_build_dir) - context.logger.info(f"Files in build dir: {files}") + ctx.logger.info(f"Files in build dir: {files}") success_result, failure_result = await check_result( - context, + ctx, 1, continue_run=False, - extra_fields={"task": "firrtl", "step": "file_check"}, - ) + extra_fields={"task": "firrtl", "step": "file_check"}, trace_id=origin_tid) return failure_result verilog_command = ( @@ -118,57 +116,33 @@ async def handler(data, context): result = stream_run_logger( cmd=verilog_command, - logger=context.logger, + logger=ctx.logger, cwd=arch_dir, stdout_prefix="palladium verilog", stderr_prefix="palladium verilog", ) if result.returncode != 0: - context.logger.error(f"Verilog generation failed with code {result.returncode}") + ctx.logger.error(f"Verilog generation failed with code {result.returncode}") success_result, failure_result = await check_result( - context, + ctx, result.returncode, continue_run=False, - extra_fields={"task": "verilog", "step": "firtool"}, - ) + extra_fields={"task": "verilog", "step": "firtool"}, trace_id=origin_tid) return failure_result - # ================================================================================== - # Step 3: Verify and clean up - # ================================================================================== - # context.logger.info("Step 3: Verifying generated files...") - - # # Check if top-level Verilog was generated - # top_sv_dir = f"{verilog_output_dir}/VCU118FPGATestHarness.sv" - # top_sv_file = f"{top_sv_dir}/VCU118FPGATestHarness.sv" - - # if os.path.exists(top_sv_file): - # # Count generated files - # sv_files = [f for f in os.listdir(top_sv_dir) if f.endswith('.sv')] - # context.logger.info(f"Successfully generated {len(sv_files)} SystemVerilog files") - # context.logger.info(f"Top-level module: {top_sv_file}") - # else: - # context.logger.error(f"Top-level Verilog file not found: {top_sv_file}") - - # # Remove unwanted file - # topname_file = f"{arch_dir}/TestHarness.sv" - # if os.path.exists(topname_file): - # os.remove(topname_file) - # context.logger.info(f"Removed unwanted file: {topname_file}") - # ================================================================================== # Return result to API # ================================================================================== success_result, failure_result = await check_result( - context, + ctx, result.returncode, - continue_run=data.get("from_run_workflow", False), + continue_run=input_data.get("from_run_workflow", False), extra_fields={ "task": "verilog", "output_dir": verilog_output_dir, "top_module": "VCU118FPGATestHarness", - }, + }, trace_id=origin_tid, ) # ================================================================================== @@ -176,9 +150,9 @@ async def handler(data, context): # Routing to verilog or finish workflow # For run workflow, continue to verilog; for standalone clean, complete # ================================================================================== - if data.get("from_run_workflow"): - await context.emit( - {"topic": "palladium.build", "data": {**data, "task": "run"}} + if input_data.get("from_run_workflow"): + await ctx.enqueue( + {"topic": "palladium.build", "data": {**input_data, "task": "run"}} ) return diff --git a/api/steps/pegasus/01_verilog_api_step.py b/api/steps/pegasus/01_verilog_api.step.py similarity index 56% rename from api/steps/pegasus/01_verilog_api_step.py rename to api/steps/pegasus/01_verilog_api.step.py index d43ced3..5e75649 100644 --- a/api/steps/pegasus/01_verilog_api_step.py +++ b/api/steps/pegasus/01_verilog_api.step.py @@ -1,22 +1,20 @@ -import asyncio -from utils.event_common import wait_for_result -from utils.path import get_buckyball_path +from motia import ApiRequest, ApiResponse, FlowContext, api +from utils.path import get_buckyball_path +from utils.event_common import wait_for_result config = { - "type": "api", "name": "Pegasus Verilog", "description": "Generate SystemVerilog for Pegasus FPGA (PegasusHarness + ChipTop)", - "path": "/pegasus/verilog", - "method": "POST", - "emits": ["pegasus.verilog"], "flows": ["pegasus"], + "triggers": [api("POST", "/pegasus/verilog")], + "enqueues": ["pegasus.verilog"], } -async def handler(req, context): +async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse: bbdir = get_buckyball_path() - body = req.get("body") or {} + body = req.body or {} # Default config for Pegasus; allow override config_name = body.get("config", "sims.pegasus.PegasusConfig") @@ -26,10 +24,6 @@ async def handler(req, context): "output_dir": body.get("output_dir", f"{bbdir}/arch/build/pegasus/"), } - await context.emit({"topic": "pegasus.verilog", "data": data}) - - while True: - result = await wait_for_result(context) - if result is not None: - return result - await asyncio.sleep(1) + await ctx.enqueue({"topic": "pegasus.verilog", "data": {**data, "_trace_id": ctx.trace_id}}) + result = await wait_for_result(ctx) + return result diff --git a/api/steps/pegasus/01_verilog_event_step.py b/api/steps/pegasus/01_verilog_event.step.py similarity index 84% rename from api/steps/pegasus/01_verilog_event_step.py rename to api/steps/pegasus/01_verilog_event.step.py index 77ca869..82298fe 100644 --- a/api/steps/pegasus/01_verilog_event_step.py +++ b/api/steps/pegasus/01_verilog_event.step.py @@ -1,33 +1,34 @@ import os import sys +from motia import FlowContext, queue + utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) if utils_path not in sys.path: sys.path.insert(0, utils_path) from utils.path import get_buckyball_path from utils.stream_run import stream_run_logger -from utils.event_common import check_result - +from utils.event_common import check_result, get_origin_trace_id config = { - "type": "event", "name": "make pegasus verilog", "description": "Generate SystemVerilog from Chisel using ElaboratePegasus", - "subscribes": ["pegasus.verilog"], - "emits": [], "flows": ["pegasus"], + "triggers": [queue("pegasus.verilog")], + "enqueues": [], } -async def handler(data, context): +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) bbdir = get_buckyball_path() arch_dir = f"{bbdir}/arch" - build_dir = data.get("output_dir", f"{bbdir}/arch/build/pegasus/") + build_dir = input_data.get("output_dir", f"{bbdir}/arch/build/pegasus/") - config_name = data.get("config", "sims.pegasus.PegasusConfig") - context.logger.info(f"[pegasus] Elaborating config: {config_name}") - context.logger.info(f"[pegasus] Output directory: {build_dir}") + config_name = input_data.get("config", "sims.pegasus.PegasusConfig") + ctx.logger.info(f"[pegasus] Elaborating config: {config_name}") + ctx.logger.info(f"[pegasus] Output directory: {build_dir}") os.makedirs(build_dir, exist_ok=True) @@ -44,7 +45,7 @@ async def handler(data, context): result = stream_run_logger( cmd=command, - logger=context.logger, + logger=ctx.logger, cwd=arch_dir, stdout_prefix="pegasus verilog", stderr_prefix="pegasus verilog", @@ -103,10 +104,10 @@ def build_dpi_stub(src_path: str) -> str: wf.write(build_dpi_stub(src)) else: shutil.copy2(src, os.path.join(vivado_gen_dir, f)) - context.logger.info(f"[pegasus] Copied {len(sv_files)} files to {vivado_gen_dir}") + ctx.logger.info(f"[pegasus] Copied {len(sv_files)} files to {vivado_gen_dir}") success_result, failure_result = await check_result( - context, + ctx, result.returncode, continue_run=False, extra_fields={ @@ -115,6 +116,7 @@ def build_dpi_stub(src_path: str) -> str: "vivado_gen_dir": vivado_gen_dir, "top_module": "PegasusHarness", }, + trace_id=origin_tid, ) return diff --git a/api/steps/pegasus/02_buildbitstream_api.step.py b/api/steps/pegasus/02_buildbitstream_api.step.py new file mode 100644 index 0000000..bad553a --- /dev/null +++ b/api/steps/pegasus/02_buildbitstream_api.step.py @@ -0,0 +1,18 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +from utils.event_common import wait_for_result + +config = { + "name": "Pegasus Buildbitstream", + "description": "build pegasus bitstream", + "flows": ["pegasus"], + "triggers": [api("POST", "/pegasus/buildbitstream")], + "enqueues": ["pegasus.buildbitstream"], +} + + +async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = req.body or {} + await ctx.enqueue({"topic": "pegasus.buildbitstream", "data": {**body, "_trace_id": ctx.trace_id}}) + result = await wait_for_result(ctx) + return result diff --git a/api/steps/pegasus/02_buildbitstream_api_step.py b/api/steps/pegasus/02_buildbitstream_api_step.py deleted file mode 100644 index be05f47..0000000 --- a/api/steps/pegasus/02_buildbitstream_api_step.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio -from utils.event_common import wait_for_result - -config = { - "type": "api", - "name": "Pegasus Buildbitstream", - "description": "build pegasus bitstream", - "path": "/pegasus/buildbitstream", - "method": "POST", - "emits": ["pegasus.buildbitstream"], - "flows": ["pegasus"], -} - - -async def handler(req, context): - body = req.get("body") or {} - await context.emit({"topic": "pegasus.buildbitstream", "data": body}) - - while True: - result = await wait_for_result(context) - if result is not None: - return result - await asyncio.sleep(1) diff --git a/api/steps/pegasus/02_buildbitstream_event.step.py b/api/steps/pegasus/02_buildbitstream_event.step.py new file mode 100644 index 0000000..2002510 --- /dev/null +++ b/api/steps/pegasus/02_buildbitstream_event.step.py @@ -0,0 +1,83 @@ +import os +import sys + +from motia import FlowContext, queue + +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "Pegasus Buildbitstream", + "description": "build pegasus bitstream", + "flows": ["pegasus"], + "triggers": [queue("pegasus.buildbitstream")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + generated_dir = input_data.get("generated_dir", f"{bbdir}/pegasus/vivado/generated") + output_dir = input_data.get("output_dir", f"{bbdir}/pegasus/vivado/build") + top_module = input_data.get("top", "PegasusHarness") + + ctx.logger.info(f"[pegasus] Generated dir: {generated_dir}") + ctx.logger.info(f"[pegasus] Output dir: {output_dir}") + + os.makedirs(output_dir, exist_ok=True) + + if not os.path.isdir(generated_dir): + ctx.logger.error(f"[pegasus] generated dir not found: {generated_dir}") + success_result, failure_result = await check_result( + ctx, + 1, + continue_run=False, + extra_fields={"task": "buildbitstream", "error": "missing generated_dir"}, + trace_id=origin_tid, + ) + return failure_result + + has_rtl = any(name.endswith(".sv") or name.endswith(".v") for name in os.listdir(generated_dir)) + if not has_rtl: + ctx.logger.error(f"[pegasus] no verilog files found in: {generated_dir}") + success_result, failure_result = await check_result( + ctx, + 1, + continue_run=False, + extra_fields={"task": "buildbitstream", "error": "empty generated_dir"}, + trace_id=origin_tid, + ) + return failure_result + + bit_cmd = ( + f"bash {bbdir}/pegasus/vivado/build-bitstream.sh " + f"--source_dir {generated_dir} " + f"--output_dir {output_dir} " + f"--top {top_module}" + ) + result = stream_run_logger( + cmd=bit_cmd, + logger=ctx.logger, + cwd=bbdir, + stdout_prefix="pegasus bitstream", + stderr_prefix="pegasus bitstream", + ) + + success_result, failure_result = await check_result( + ctx, + result.returncode, + continue_run=False, + extra_fields={ + "task": "buildbitstream", + "output_dir": output_dir, + "bitstream": f"{output_dir}/{top_module}.bit", + }, + trace_id=origin_tid, + ) + return diff --git a/api/steps/pegasus/02_buildbitstream_event_step.py b/api/steps/pegasus/02_buildbitstream_event_step.py deleted file mode 100644 index cd11653..0000000 --- a/api/steps/pegasus/02_buildbitstream_event_step.py +++ /dev/null @@ -1,78 +0,0 @@ -import os -import sys - -utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) -if utils_path not in sys.path: - sys.path.insert(0, utils_path) - -from utils.path import get_buckyball_path -from utils.stream_run import stream_run_logger -from utils.event_common import check_result - -config = { - "type": "event", - "name": "Pegasus Buildbitstream", - "description": "build pegasus bitstream", - "subscribes": ["pegasus.buildbitstream"], - "emits": [], - "flows": ["pegasus"], -} - - -async def handler(data, context): - bbdir = get_buckyball_path() - generated_dir = data.get("generated_dir", f"{bbdir}/pegasus/vivado/generated") - output_dir = data.get("output_dir", f"{bbdir}/pegasus/vivado/build") - top_module = data.get("top", "PegasusHarness") - - context.logger.info(f"[pegasus] Generated dir: {generated_dir}") - context.logger.info(f"[pegasus] Output dir: {output_dir}") - - os.makedirs(output_dir, exist_ok=True) - - if not os.path.isdir(generated_dir): - context.logger.error(f"[pegasus] generated dir not found: {generated_dir}") - success_result, failure_result = await check_result( - context, - 1, - continue_run=False, - extra_fields={"task": "buildbitstream", "error": "missing generated_dir"}, - ) - return failure_result - - has_rtl = any(name.endswith(".sv") or name.endswith(".v") for name in os.listdir(generated_dir)) - if not has_rtl: - context.logger.error(f"[pegasus] no verilog files found in: {generated_dir}") - success_result, failure_result = await check_result( - context, - 1, - continue_run=False, - extra_fields={"task": "buildbitstream", "error": "empty generated_dir"}, - ) - return failure_result - - bit_cmd = ( - f"bash {bbdir}/pegasus/vivado/build-bitstream.sh " - f"--source_dir {generated_dir} " - f"--output_dir {output_dir} " - f"--top {top_module}" - ) - result = stream_run_logger( - cmd=bit_cmd, - logger=context.logger, - cwd=bbdir, - stdout_prefix="pegasus bitstream", - stderr_prefix="pegasus bitstream", - ) - - success_result, failure_result = await check_result( - context, - result.returncode, - continue_run=False, - extra_fields={ - "task": "buildbitstream", - "output_dir": output_dir, - "bitstream": f"{output_dir}/{top_module}.bit", - }, - ) - return diff --git a/api/steps/sardine/01_run_api.step.py b/api/steps/sardine/01_run_api.step.py new file mode 100644 index 0000000..fb24f90 --- /dev/null +++ b/api/steps/sardine/01_run_api.step.py @@ -0,0 +1,22 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +from utils.path import get_buckyball_path + +config = { + "name": "sardine-run-api", + "description": "running sardine", + "flows": ["sardine"], + "triggers": [api("POST", "/sardine/run")], + "enqueues": ["sardine.run"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + bbdir = get_buckyball_path() + + body = request.body or {} + + data = {"workload": body.get("workload", "")} + + await ctx.enqueue({"topic": "sardine.run", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/sardine/01_run_api_step.py b/api/steps/sardine/01_run_api_step.py deleted file mode 100644 index 2c2fc1a..0000000 --- a/api/steps/sardine/01_run_api_step.py +++ /dev/null @@ -1,45 +0,0 @@ -import subprocess -import sys -import os -import asyncio -from utils.event_common import wait_for_result - -utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) -if utils_path not in sys.path: - sys.path.insert(0, utils_path) - -from utils.path import get_buckyball_path - -config = { - "type": "api", - "name": "running sardine", - "description": "running sardine", - "path": "/sardine/run", - "method": "POST", - "emits": ["sardine.run"], - "flows": ["sardine"], -} - - -async def handler(req, context): - bbdir = get_buckyball_path() - - body = req.get("body") or {} - - data = { - "workload": body.get("workload", ""), - "coverage": body.get("coverage", False), - } - - sardine_dir = f"{bbdir}/bb-tests/sardine" - - await context.emit({"topic": "sardine.run", "data": data}) - - # ================================================================================== - # Wait for execution result - # ================================================================================== - while True: - result = await wait_for_result(context) - if result is not None: - return result - await asyncio.sleep(1) diff --git a/api/steps/sardine/01_run_event_step.py b/api/steps/sardine/01_run_event.step.py similarity index 55% rename from api/steps/sardine/01_run_event_step.py rename to api/steps/sardine/01_run_event.step.py index 39b1a24..1b4e1b3 100644 --- a/api/steps/sardine/01_run_event_step.py +++ b/api/steps/sardine/01_run_event.step.py @@ -1,30 +1,29 @@ -from contextlib import redirect_stdout import os -import subprocess import sys import time +from motia import FlowContext, queue + # Add the utils directory to the Python path utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) if utils_path not in sys.path: sys.path.insert(0, utils_path) - from utils.path import get_buckyball_path from utils.stream_run import stream_run_logger -from utils.event_common import check_result +from utils.event_common import check_result, get_origin_trace_id config = { - "type": "event", - "name": "running sardine", + "name": "sardine-run", "description": "running sardine", - "subscribes": ["sardine.run"], - "emits": ["sardine.coverage_report"], "flows": ["sardine"], + "triggers": [queue("sardine.run")], + "enqueues": ["sardine.coverage_report"], } -async def handler(data, context): +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) bbdir = get_buckyball_path() sardine_dir = f"{bbdir}/bb-tests/sardine" @@ -32,15 +31,15 @@ async def handler(data, context): # Record start time so coverage report can filter to only this run's .dat files run_start_time = time.time() - command = f"python3 run_tests.py -m \"({data.get('workload', '')})\"" - if data.get("coverage", False): + command = f"python3 run_tests.py -m \"({input_data.get('workload', '')})\"" + if input_data.get("coverage", False): command += " --coverage" - context.logger.info( + ctx.logger.info( "Executing sardine command", {"command": command, "cwd": sardine_dir} ) result = stream_run_logger( cmd=command, - logger=context.logger, + logger=ctx.logger, cwd=sardine_dir, executable="bash", stdout_prefix="sardine run", @@ -50,20 +49,14 @@ async def handler(data, context): # ================================================================================== # Return execution result # ================================================================================== - coverage = data.get("coverage", False) - - if coverage: - # When coverage is enabled, always emit to coverage report step + if input_data.get("coverage", False): + # When coverage is enabled, always continue to coverage report step # (even if some tests failed, coverage data is still valid) - success_result, failure_result = await check_result( - context, result.returncode, continue_run=True - ) - await context.emit( - {"topic": "sardine.coverage_report", "data": {**data, "run_start_time": run_start_time}} + await check_result(ctx, result.returncode, continue_run=True, trace_id=origin_tid) + await ctx.enqueue( + {"topic": "sardine.coverage_report", "data": {**input_data, "run_start_time": run_start_time}} ) else: - success_result, failure_result = await check_result( - context, result.returncode, continue_run=False - ) + await check_result(ctx, result.returncode, continue_run=False, trace_id=origin_tid) return diff --git a/api/steps/sardine/02_coverage_report_event_step.py b/api/steps/sardine/02_coverage_report_event.step.py similarity index 82% rename from api/steps/sardine/02_coverage_report_event_step.py rename to api/steps/sardine/02_coverage_report_event.step.py index ed1ac7a..93f3be2 100644 --- a/api/steps/sardine/02_coverage_report_event_step.py +++ b/api/steps/sardine/02_coverage_report_event.step.py @@ -2,6 +2,8 @@ import glob import sys +from motia import FlowContext, queue + # Add the utils directory to the Python path utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) if utils_path not in sys.path: @@ -9,19 +11,19 @@ from utils.path import get_buckyball_path from utils.stream_run import stream_run_logger -from utils.event_common import check_result +from utils.event_common import check_result, get_origin_trace_id config = { - "type": "event", "name": "coverage report", "description": "merge coverage data and generate annotated source + lcov HTML report", - "subscribes": ["sardine.coverage_report"], - "emits": [], "flows": ["sardine"], + "triggers": [queue("sardine.coverage_report")], + "enqueues": [], } -async def handler(data, context): +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) bbdir = get_buckyball_path() log_dir = f"{bbdir}/arch/log" coverage_dir = f"{bbdir}/bb-tests/sardine/reports/coverage" @@ -31,19 +33,20 @@ async def handler(data, context): # Find coverage.dat files from this run only (created after run_start_time) # ================================================================================== all_dat_files = glob.glob(f"{log_dir}/*/coverage.dat") - run_start_time = data.get("run_start_time", 0) + run_start_time = input_data.get("run_start_time", 0) dat_files = [f for f in all_dat_files if os.path.getmtime(f) >= run_start_time] if not dat_files: - context.logger.error("No coverage .dat files found", {"dir": coverage_dir}) + ctx.logger.error("No coverage .dat files found", {"dir": coverage_dir}) success_result, failure_result = await check_result( - context, + ctx, returncode=1, continue_run=False, extra_fields={"task": "coverage_report", "error": "No .dat files found"}, + trace_id=origin_tid, ) return - context.logger.info(f"Found {len(dat_files)} coverage data files") + ctx.logger.info(f"Found {len(dat_files)} coverage data files") # ================================================================================== # Merge all .dat files @@ -57,17 +60,18 @@ async def handler(data, context): merge_cmd = f"verilator_coverage -write {merged_dat} {' '.join(dat_files)}" result = stream_run_logger( cmd=merge_cmd, - logger=context.logger, + logger=ctx.logger, cwd=bbdir, stdout_prefix="coverage merge", stderr_prefix="coverage merge", ) if result.returncode != 0: await check_result( - context, + ctx, returncode=result.returncode, continue_run=False, extra_fields={"task": "coverage_report", "error": "merge failed"}, + trace_id=origin_tid, ) return @@ -78,17 +82,18 @@ async def handler(data, context): annotate_cmd = f"verilator_coverage --annotate {annotate_dir} {merged_dat}" result = stream_run_logger( cmd=annotate_cmd, - logger=context.logger, + logger=ctx.logger, cwd=bbdir, stdout_prefix="coverage annotate", stderr_prefix="coverage annotate", ) if result.returncode != 0: await check_result( - context, + ctx, returncode=result.returncode, continue_run=False, extra_fields={"task": "coverage_report", "error": "annotate failed"}, + trace_id=origin_tid, ) return @@ -101,43 +106,45 @@ async def handler(data, context): lcov_cmd = f"verilator_coverage -write-info {lcov_info} {merged_dat}" result = stream_run_logger( cmd=lcov_cmd, - logger=context.logger, + logger=ctx.logger, cwd=bbdir, stdout_prefix="coverage lcov", stderr_prefix="coverage lcov", ) if result.returncode != 0: await check_result( - context, + ctx, returncode=result.returncode, continue_run=False, extra_fields={"task": "coverage_report", "error": "lcov export failed"}, + trace_id=origin_tid, ) return genhtml_cmd = f"genhtml {lcov_info} -o {html_dir}" result = stream_run_logger( cmd=genhtml_cmd, - logger=context.logger, + logger=ctx.logger, cwd=bbdir, stdout_prefix="coverage html", stderr_prefix="coverage html", ) if result.returncode != 0: await check_result( - context, + ctx, returncode=result.returncode, continue_run=False, extra_fields={"task": "coverage_report", "error": "genhtml failed"}, + trace_id=origin_tid, ) return # ================================================================================== # Return result # ================================================================================== - context.logger.info(f"Coverage report generated: {html_dir}/index.html") + ctx.logger.info(f"Coverage report generated: {html_dir}/index.html") success_result, failure_result = await check_result( - context, + ctx, returncode=0, continue_run=False, extra_fields={ @@ -147,6 +154,7 @@ async def handler(data, context): "html_dir": html_dir, "dat_count": len(dat_files), }, + trace_id=origin_tid, ) return diff --git a/api/steps/verilator/01_clean_api.step.py b/api/steps/verilator/01_clean_api.step.py new file mode 100644 index 0000000..ab1361d --- /dev/null +++ b/api/steps/verilator/01_clean_api.step.py @@ -0,0 +1,15 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "verilator-clean-api", + "description": "clean build directory", + "flows": ["verilator"], + "triggers": [api("POST", "/verilator/clean")], + "enqueues": ["verilator.clean"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + await ctx.enqueue({"topic": "verilator.clean", "data": {**body, "task": "clean", "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/verilator/01_clean_event.step.py b/api/steps/verilator/01_clean_event.step.py new file mode 100644 index 0000000..4437df7 --- /dev/null +++ b/api/steps/verilator/01_clean_event.step.py @@ -0,0 +1,61 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "verilator-clean", + "description": "clean build directory", + "flows": ["verilator"], + "triggers": [ + queue("verilator.run"), + queue("verilator.clean"), + ], + "enqueues": ["verilator.verilog"], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + build_dir = f"{bbdir}/arch/build" + # ================================================================================== + # Execute operation + # ================================================================================== + command = f"rm -rf {build_dir}" + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + cwd=bbdir, + stdout_prefix="verilator clean", + stderr_prefix="verilator clean", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, + result.returncode, + continue_run=input_data.get("from_run_workflow", False), + extra_fields={"task": "clean"}, trace_id=origin_tid, + ) + + # ================================================================================== + # Continue routing + # ================================================================================== + if input_data.get("from_run_workflow"): + await ctx.enqueue( + {"topic": "verilator.verilog", "data": {**input_data, "task": "run"}} + ) + + return diff --git a/api/steps/verilator/02_verilog_api.step.py b/api/steps/verilator/02_verilog_api.step.py new file mode 100644 index 0000000..d128818 --- /dev/null +++ b/api/steps/verilator/02_verilog_api.step.py @@ -0,0 +1,36 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +from utils.path import get_buckyball_path + +config = { + "name": "verilator-verilog-api", + "description": "generate verilog code", + "flows": ["verilator"], + "triggers": [api("POST", "/verilator/verilog")], + "enqueues": ["verilator.verilog"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + bbdir = get_buckyball_path() + body = request.body or {} + + # Get config name, must be provided + config_name = body.get("config") + if not config_name or config_name == "None": + return ApiResponse( + status=400, + body={ + "status": "error", + "message": "Configuration name is required. Please specify --config parameter.", + "example": 'bbdev verilator --verilog "--config sims.verilator.BuckyballToyVerilatorConfig"', + }, + ) + + data = { + "config": config_name, + "balltype": body.get("balltype"), + "output_dir": body.get("output_dir", f"{bbdir}/arch/build/"), + } + await ctx.enqueue({"topic": "verilator.verilog", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/verilator/02_verilog_event.step.py b/api/steps/verilator/02_verilog_event.step.py new file mode 100644 index 0000000..a664ed6 --- /dev/null +++ b/api/steps/verilator/02_verilog_event.step.py @@ -0,0 +1,93 @@ +import os +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "verilator-verilog", + "description": "generate verilog code", + "flows": ["verilator"], + "triggers": [queue("verilator.verilog")], + "enqueues": ["verilator.build"], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + build_dir = input_data.get("output_dir", f"{bbdir}/arch/build/") + arch_dir = f"{bbdir}/arch" + + # Get config name, must be provided + config_name = input_data.get("config") + if not config_name or config_name == "None": + ctx.logger.error("Configuration name is required but not provided") + success_result, failure_result = await check_result( + ctx, + 1, + continue_run=False, + extra_fields={ + "task": "validation", + "error": "Configuration name is required. Please specify --config parameter.", + "example": 'bbdev verilator --verilog "--config sims.verilator.BuckyballToyVerilatorConfig"', + }, trace_id=origin_tid) + return failure_result + + ctx.logger.info(f"Using configuration: {config_name}") + + # ================================================================================== + # Execute operation + # ================================================================================== + if input_data.get("balltype"): + command = ( + f"mill -i __.test.runMain sims.verify.BallTopMain {input_data.get('balltype')} " + ) + else: + command = f"mill -i __.test.runMain sims.verilator.Elaborate {config_name} " + + command += "--disable-annotation-unknown -strip-debug-info -O=debug " + command += f"--split-verilog -o={build_dir}" + + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + cwd=arch_dir, + stdout_prefix="verilator verilog", + stderr_prefix="verilator verilog", + ) + + # Remove unwanted file + topname_file = f"{arch_dir}/TestHarness.sv" + if os.path.exists(topname_file): + os.remove(topname_file) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, + result.returncode, + continue_run=input_data.get("from_run_workflow", False), + extra_fields={"task": "verilog"}, trace_id=origin_tid, + ) + + # ================================================================================== + # Continue routing + # Routing to verilog or finish workflow + # For run workflow, continue to verilog; for standalone clean, complete + # ================================================================================== + if input_data.get("from_run_workflow"): + await ctx.enqueue( + {"topic": "verilator.build", "data": {**input_data, "task": "run"}} + ) + + return diff --git a/api/steps/verilator/03_build_api.step.py b/api/steps/verilator/03_build_api.step.py new file mode 100644 index 0000000..7bb0169 --- /dev/null +++ b/api/steps/verilator/03_build_api.step.py @@ -0,0 +1,19 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "verilator-build-api", + "description": "build verilator executable", + "flows": ["verilator"], + "triggers": [api("POST", "/verilator/build")], + "enqueues": ["verilator.build"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + data = { + "jobs": body.get("jobs", 16), + "cosim": body.get("cosim", False), + } + await ctx.enqueue({"topic": "verilator.build", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/verilator/03_build_event.step.py b/api/steps/verilator/03_build_event.step.py new file mode 100644 index 0000000..cd3b407 --- /dev/null +++ b/api/steps/verilator/03_build_event.step.py @@ -0,0 +1,140 @@ +import os +import subprocess +import glob +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "verilator-build", + "description": "build verilator executable", + "flows": ["verilator"], + "triggers": [queue("verilator.build")], + "enqueues": ["verilator.sim", "verilator.cosim"], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + arch_dir = f"{bbdir}/arch" + build_dir = f"{arch_dir}/build" + waveform_dir = f"{arch_dir}/waveform" + log_dir = f"{arch_dir}/log" + cosim = input_data.get("cosim", False) + + # ================================================================================== + # Execute operation + # ================================================================================== + # Find sources + vsrcs = glob.glob(f"{build_dir}/**/*.v", recursive=True) + glob.glob( + f"{build_dir}/**/*.sv", recursive=True + ) + csrcs = ( + glob.glob(f"{arch_dir}/src/csrc/**/*.c", recursive=True) + + glob.glob(f"{arch_dir}/src/csrc/**/*.cc", recursive=True) + + glob.glob(f"{arch_dir}/src/csrc/**/*.cpp", recursive=True) + + glob.glob(f"{build_dir}/**/*.c", recursive=True) + + glob.glob(f"{build_dir}/**/*.cc", recursive=True) + + glob.glob(f"{build_dir}/**/*.cpp", recursive=True) + ) + + # Setup paths: fesvr from bebop/host/spike/riscv-isa-sim (install/include, install/lib) + bebop_isa_sim = f"{bbdir}/bebop/host/spike/riscv-isa-sim" + inc_paths = [ + os.environ.get("RISCV", "") + "/include" if os.environ.get("RISCV") else "", + f"{arch_dir}/thirdparty/chipyard/tools/DRAMSim2", + f"{bebop_isa_sim}/install/include", + build_dir, + f"{arch_dir}/src/csrc/include", + ] + inc_flags = " ".join([f"-I{p}" for p in inc_paths if p]) + + if cosim: + topname = "ToyBuckyball" + else: + topname = "TestHarness" + + cflags = f"{inc_flags} -DTOP_NAME='\"V{topname}\"' -std=c++17 " + if cosim: + cflags += " -DCOSIM" + ldflags = ( + f"-lreadline -ldramsim -lfesvr -lstdc++ " + f"-L{bebop_isa_sim}/install/lib " + f"-L{bbdir}/result/lib " + f"-L{arch_dir}/thirdparty/chipyard/tools/DRAMSim2 " + f"-L{arch_dir}/thirdparty/chipyard/toolchains/riscv-tools/riscv-isa-sim/build " + f"-L{arch_dir}/thirdparty/chipyard/toolchains/riscv-tools/riscv-isa-sim/build/lib" + ) + + obj_dir = f"{build_dir}/obj_dir" + subprocess.run(f"rm -rf {obj_dir}", shell=True) + os.makedirs(obj_dir, exist_ok=True) + + sources = " ".join(vsrcs + csrcs) + jobs = input_data.get("jobs", "") + + verilator_cmd = ( + f"verilator -MMD --build -cc --trace -O3 --x-assign fast --x-initial fast --noassert -Wno-fatal " + f"--trace-fst --trace-threads 1 --output-split 10000 --output-split-cfuncs 100 " + f"--unroll-count 256 " + f"-Wno-PINCONNECTEMPTY " + f"-Wno-ASSIGNDLY " + f"-Wno-DECLFILENAME " + f"-Wno-UNUSED " + f"-Wno-UNOPTFLAT " + f"-Wno-BLKANDNBLK " + f"-Wno-style " + f"-Wall " + f"--timing -j {jobs} +incdir+{build_dir} --top {topname} {sources} " + f"-CFLAGS '{cflags}' -LDFLAGS '{ldflags}' --Mdir {obj_dir} --exe" + ) + + result = stream_run_logger( + cmd=verilator_cmd, + logger=ctx.logger, + cwd=bbdir, + stdout_prefix="verilator build", + stderr_prefix="verilator build", + ) + result = stream_run_logger( + cmd=f"make -C {obj_dir} -f V{topname}.mk V{topname}", + logger=ctx.logger, + cwd=bbdir, + stdout_prefix="verilator build", + stderr_prefix="verilator build", + ) + + # ================================================================================== + # Return result to API + # ================================================================================== + success_result, failure_result = await check_result( + ctx, + result.returncode, + continue_run=input_data.get("from_run_workflow", False), + extra_fields={"task": "build"}, trace_id=origin_tid, + ) + + # ================================================================================== + # Continue routing + # ================================================================================== + if input_data.get("from_run_workflow"): + if cosim: + await ctx.enqueue( + {"topic": "verilator.cosim", "data": {**input_data, "task": "run"}} + ) + else: + await ctx.enqueue( + {"topic": "verilator.sim", "data": {**input_data, "task": "run"}} + ) + + return diff --git a/api/steps/verilator/04_cosim_api.step.py b/api/steps/verilator/04_cosim_api.step.py new file mode 100644 index 0000000..419336e --- /dev/null +++ b/api/steps/verilator/04_cosim_api.step.py @@ -0,0 +1,27 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "verilator-cosim-api", + "description": "run verilator cosimulation", + "flows": ["verilator"], + "triggers": [api("POST", "/verilator/cosim")], + "enqueues": ["verilator.cosim"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + binary = body.get("binary", "") + if not binary: + return ApiResponse( + status=400, + body={ + "success": False, + "failure": True, + "returncode": 400, + "message": "binary parameter is required", + }, + ) + + await ctx.enqueue({"topic": "verilator.cosim", "data": {**body, "task": "cosim", "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/verilator/04_cosim_event.step.py b/api/steps/verilator/04_cosim_event.step.py new file mode 100644 index 0000000..d84f63d --- /dev/null +++ b/api/steps/verilator/04_cosim_event.step.py @@ -0,0 +1,128 @@ +import os +import subprocess +import sys +from datetime import datetime + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.search_workload import search_workload +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "verilator-cosim", + "description": "run cosimulation", + "flows": ["verilator"], + "triggers": [queue("verilator.cosim")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + # ================================================================================== + # Get simulation parameters + # ================================================================================== + bbdir = get_buckyball_path() + arch_dir = f"{bbdir}/arch" + build_dir = f"{arch_dir}/build" + + # Generate timestamp + timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M") + + binary_name = input_data.get("binary", "") + success_result, failure_result = await check_result( + ctx, returncode=(binary_name == None), continue_run=True, trace_id=origin_tid, + ) + + binary_path = search_workload(f"{bbdir}/bb-tests/output/workloads/src", binary_name) + success_result, failure_result = await check_result( + ctx, returncode=(binary_path == None), continue_run=True, trace_id=origin_tid, + ) + if failure_result: + ctx.logger.error("binary not found", failure_result) + return + + # Create log and waveform directory + log_dir = f"{arch_dir}/log/{timestamp}-{binary_name}" + waveform_dir = f"{arch_dir}/waveform/{timestamp}-{binary_name}" + topname = "ToyBuckyball" + + os.makedirs(log_dir, exist_ok=True) + os.makedirs(waveform_dir, exist_ok=True) + + bin_path = f"{build_dir}/obj_dir/V{topname}" + batch = input_data.get("batch", False) + + # Create log and waveform file + log_path = f"{log_dir}/bdb.log" + fst_path = f"{waveform_dir}/waveform.fst" + # Remove old waveform file + subprocess.run(f"rm -f {waveform_dir}/waveform.vcd", shell=True, check=True) + + # ================================================================================== + # Execute simulation script with streaming output + # ================================================================================== + bebop_isa_sim = f"{bbdir}/bebop/host/spike/riscv-isa-sim" + ld_lib_path = ( + f"{bebop_isa_sim}/install/lib:" + f"{bbdir}/result/lib:" + f"{arch_dir}/thirdparty/chipyard/tools/DRAMSim2" + ) + sim_cmd = ( + f"export LD_LIBRARY_PATH=\"{ld_lib_path}:$LD_LIBRARY_PATH\"; " + f"{bin_path} +permissive +loadmem={binary_path} +loadmem_addr=800000000 " + f"{'+batch ' if batch else ''} " + f"+fst={fst_path} +log={log_path} +permissive-off " + f"{binary_path} > >(tee {log_dir}/stdout.log) 2> >(spike-dasm > {log_dir}/disasm.log)" + ) + script_dir = os.path.dirname(__file__) + + result = stream_run_logger( + cmd=sim_cmd, + logger=ctx.logger, + cwd=script_dir, + stdout_prefix="verilator sim", + stderr_prefix="verilator sim", + executable="bash", + ) + success_result, failure_result = await check_result( + ctx, returncode=result.returncode, continue_run=True, trace_id=origin_tid) + if failure_result: + ctx.logger.error("sim failed", failure_result) + return + + if os.path.exists(f"{waveform_dir}/waveform.fst.heir"): + subprocess.run( + f"gtkwave -f {waveform_dir}/waveform.fst -H {waveform_dir}/waveform.fst.heir", + shell=True, + check=True, + ) + + # ================================================================================== + # Return simulation result + # ================================================================================== + # This is the end point of the run workflow, status will no longer be set to processing + success_result, failure_result = await check_result( + ctx, + result.returncode, + continue_run=False, + extra_fields={ + "task": "sim", + "binary": binary_path, + "log_dir": log_dir, + "waveform_dir": waveform_dir, + "timestamp": timestamp, + }, trace_id=origin_tid) + + # ================================================================================== + # Finish workflow + # ================================================================================== + + return diff --git a/api/steps/verilator/04_sim_api.step.py b/api/steps/verilator/04_sim_api.step.py new file mode 100644 index 0000000..07ad7c5 --- /dev/null +++ b/api/steps/verilator/04_sim_api.step.py @@ -0,0 +1,27 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "verilator-sim-api", + "description": "run verilator simulation", + "flows": ["verilator"], + "triggers": [api("POST", "/verilator/sim")], + "enqueues": ["verilator.sim"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + binary = body.get("binary", "") + if not binary: + return ApiResponse( + status=400, + body={ + "success": False, + "failure": True, + "returncode": 400, + "message": "binary parameter is required", + }, + ) + + await ctx.enqueue({"topic": "verilator.sim", "data": {**body, "task": "sim", "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/verilator/04_sim_event_step.py b/api/steps/verilator/04_sim_event.step.py similarity index 50% rename from api/steps/verilator/04_sim_event_step.py rename to api/steps/verilator/04_sim_event.step.py index ee9c14c..a6cf81f 100644 --- a/api/steps/verilator/04_sim_event_step.py +++ b/api/steps/verilator/04_sim_event.step.py @@ -3,6 +3,8 @@ import sys from datetime import datetime +from motia import FlowContext, queue + # Add the utils directory to the Python path utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) if utils_path not in sys.path: @@ -11,19 +13,19 @@ from utils.path import get_buckyball_path from utils.stream_run import stream_run_logger from utils.search_workload import search_workload -from utils.event_common import check_result +from utils.event_common import check_result, get_origin_trace_id config = { - "type": "event", - "name": "make sim", + "name": "verilator-sim", "description": "run simulation", - "subscribes": ["verilator.sim"], - "emits": [], "flows": ["verilator"], + "triggers": [queue("verilator.sim")], + "enqueues": [], } -async def handler(data, context): +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) # ================================================================================== # Get simulation parameters # ================================================================================== @@ -31,102 +33,97 @@ async def handler(data, context): arch_dir = f"{bbdir}/arch" build_dir = f"{arch_dir}/build" + # Generate timestamp timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M") - binary_name = data.get("binary", "") - coverage = data.get("coverage", False) + binary_name = input_data.get("binary", "") success_result, failure_result = await check_result( - context, returncode=(binary_name is None), continue_run=True + ctx, returncode=(binary_name == None), continue_run=True, trace_id=origin_tid, ) binary_path = search_workload(f"{bbdir}/bb-tests/output/workloads/src", binary_name) - context.logger.info(f"binary_path: {binary_path}") + ctx.logger.info(f"binary_path: {binary_path}") success_result, failure_result = await check_result( - context, returncode=(binary_path is None), continue_run=True + ctx, returncode=(binary_path == None), continue_run=True, trace_id=origin_tid, ) if failure_result: - context.logger.error("binary not found", failure_result) + ctx.logger.error("binary not found", failure_result) return - topname = "BBSimHarness" + # Create log and waveform directory log_dir = f"{arch_dir}/log/{timestamp}-{binary_name}" waveform_dir = f"{arch_dir}/waveform/{timestamp}-{binary_name}" + topname = "TestHarness" os.makedirs(log_dir, exist_ok=True) os.makedirs(waveform_dir, exist_ok=True) - coverage_flag = "" - if coverage: - coverage_dat_path = f"{log_dir}/coverage.dat" - coverage_flag = f"+verilator+coverage+file+{coverage_dat_path}" - bin_path = f"{build_dir}/obj_dir/V{topname}" - batch = data.get("batch", False) + batch = input_data.get("batch", False) - log_path = f"{log_dir}/bdb.ndjson" - stdout_path = f"{log_dir}/stdout.log" - meta_path = f"{log_dir}/sim_meta.txt" - fst_path = f"{waveform_dir}/waveform.fst" + # Create log and waveform file + log_path = f"{log_dir}/bdb.log" + fst_path = f"{waveform_dir}/waveform.fst" + # Remove old waveform file + subprocess.run(f"rm -f {waveform_dir}/waveform.vcd", shell=True, check=True) # ================================================================================== - # Execute simulation - # BBSimHarness uses +elf= for ELF loading (via SimDRAM_bb.cc / libelf) - # No fesvr, no +loadmem_addr needed - # - # disasm.log: only stderr -> spike-dasm (Rocket commit printf is stderr here; - # merging stdout with 2>&1 can break: full stdio buffering + non-DASM bytes). - # BDB_SIM_META moves NDJSON banner to sim_meta.txt so it does not pollute disasm. + # Execute simulation script with streaming output # ================================================================================== + bebop_isa_sim = f"{bbdir}/bebop/host/spike/riscv-isa-sim" ld_lib_path = ( + f"{bebop_isa_sim}/install/lib:" f"{bbdir}/result/lib:" f"{arch_dir}/thirdparty/chipyard/tools/DRAMSim2" ) sim_cmd = ( f"export LD_LIBRARY_PATH=\"{ld_lib_path}:$LD_LIBRARY_PATH\"; " - f"export BDB_SIM_META=\"{meta_path}\"; " - f"{bin_path} +permissive " - f"+elf={binary_path} " - f"{'+batch ' if batch else ''}" - f"{coverage_flag + ' ' if coverage_flag else ''}" - f"+fst={fst_path} +log={log_path} +stdout={stdout_path} +trace=all +permissive-off " - f"{binary_path} 2> >(spike-dasm > {log_dir}/disasm.log)" + f"{bin_path} +permissive +loadmem={binary_path} +loadmem_addr=800000000 " + f"{'+batch ' if batch else ''} " + f"+fst={fst_path} +log={log_path} +permissive-off " + f"{binary_path} > >(tee {log_dir}/stdout.log) 2> >(spike-dasm > {log_dir}/disasm.log)" ) script_dir = os.path.dirname(__file__) result = stream_run_logger( cmd=sim_cmd, - logger=context.logger, + logger=ctx.logger, cwd=script_dir, stdout_prefix="verilator sim", stderr_prefix="verilator sim", executable="bash", ) success_result, failure_result = await check_result( - context, returncode=result.returncode, continue_run=True - ) + ctx, returncode=result.returncode, continue_run=True, trace_id=origin_tid) if failure_result: - context.logger.error("sim failed", failure_result) + ctx.logger.error("sim failed", failure_result) return + if os.path.exists(f"{waveform_dir}/waveform.fst.heir"): + subprocess.run( + f"gtkwave -f {waveform_dir}/waveform.fst -H {waveform_dir}/waveform.fst.heir", + shell=True, + check=True, + ) + # ================================================================================== # Return simulation result # ================================================================================== - extra_fields = { - "task": "sim", - "binary": binary_path, - "log_dir": log_dir, - "waveform_dir": waveform_dir, - "timestamp": timestamp, - "sim_meta": meta_path, - } - if coverage: - extra_fields["coverage_dat"] = coverage_dat_path - + # This is the end point of the run workflow, status will no longer be set to processing success_result, failure_result = await check_result( - context, + ctx, result.returncode, continue_run=False, - extra_fields=extra_fields, - ) + extra_fields={ + "task": "sim", + "binary": binary_path, + "log_dir": log_dir, + "waveform_dir": waveform_dir, + "timestamp": timestamp, + }, trace_id=origin_tid) + + # ================================================================================== + # Finish workflow + # ================================================================================== return diff --git a/api/steps/verilator/05_run_api.step.py b/api/steps/verilator/05_run_api.step.py new file mode 100644 index 0000000..b599755 --- /dev/null +++ b/api/steps/verilator/05_run_api.step.py @@ -0,0 +1,25 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +config = { + "name": "verilator-run-api", + "description": "trigger complete verilator workflow", + "flows": ["verilator"], + "triggers": [api("POST", "/verilator/run")], + "enqueues": ["verilator.run"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + body = request.body or {} + + data = { + "binary": body.get("binary", ""), + "config": body.get("config", "sims.verilator.BuckyballToyVerilatorConfig"), + "jobs": body.get("jobs", "16"), + "batch": body.get("batch", False), + "cosim": body.get("cosim", False), + "from_run_workflow": True, + } + + await ctx.enqueue({"topic": "verilator.run", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/workload/01_buidl_api.step.py b/api/steps/workload/01_buidl_api.step.py new file mode 100644 index 0000000..752523d --- /dev/null +++ b/api/steps/workload/01_buidl_api.step.py @@ -0,0 +1,19 @@ +from motia import ApiRequest, ApiResponse, FlowContext, api + +from utils.path import get_buckyball_path + +config = { + "name": "workload-build-api", + "description": "build workload", + "flows": ["workload"], + "triggers": [api("POST", "/workload/build")], + "enqueues": ["workload.build"], +} + + +async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: + bbdir = get_buckyball_path() + body = request.body or {} + data = {"workload": body.get("workload", "")} + await ctx.enqueue({"topic": "workload.build", "data": {**data, "_trace_id": ctx.trace_id}}) + return ApiResponse(status=202, body={"trace_id": ctx.trace_id}) diff --git a/api/steps/workload/01_build_event.step.py b/api/steps/workload/01_build_event.step.py new file mode 100644 index 0000000..2fee01e --- /dev/null +++ b/api/steps/workload/01_build_event.step.py @@ -0,0 +1,57 @@ +import os +import subprocess +import sys + +from motia import FlowContext, queue + +# Add the utils directory to the Python path +utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if utils_path not in sys.path: + sys.path.insert(0, utils_path) + +from utils.path import get_buckyball_path +from utils.stream_run import stream_run_logger +from utils.event_common import check_result, get_origin_trace_id + +config = { + "name": "workload-build", + "description": "build workload", + "flows": ["workload"], + "triggers": [queue("workload.build")], + "enqueues": [], +} + + +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) + bbdir = get_buckyball_path() + workload_dir = f"{bbdir}/bb-tests" + build_dir = f"{workload_dir}/build" + + # os.mkdir(f"{workload_dir}/build", exist_ok=True) + subprocess.run(f"rm -rf {build_dir} && mkdir -p {build_dir}", shell=True) + + command = f"cd {build_dir} && cmake -G Ninja .. && ninja -j{os.cpu_count()}" + ctx.logger.info( + "Executing workload command", {"command": command, "cwd": build_dir} + ) + result = stream_run_logger( + cmd=command, + logger=ctx.logger, + cwd=workload_dir, + executable="bash", + stdout_prefix="workload build", + stderr_prefix="workload build", + ) + + # ================================================================================== + # Return simulation result + # ================================================================================== + # This is the end of run workflow, status no longer set to processing + success_result, failure_result = await check_result( + ctx, result.returncode, continue_run=False, trace_id=origin_tid) + + # ================================================================================== + # finish workflow + # ================================================================================== + return diff --git a/api/steps/yosys/01_synth_api_step.py b/api/steps/yosys/01_synth_api.step.py similarity index 61% rename from api/steps/yosys/01_synth_api_step.py rename to api/steps/yosys/01_synth_api.step.py index 5eaf482..b50fd19 100644 --- a/api/steps/yosys/01_synth_api_step.py +++ b/api/steps/yosys/01_synth_api.step.py @@ -1,37 +1,30 @@ -import asyncio -from utils.event_common import wait_for_result - +from motia import ApiRequest, ApiResponse, FlowContext, api from utils.path import get_buckyball_path - +from utils.event_common import wait_for_result config = { - "type": "api", "name": "Yosys Synth", "description": "run yosys synthesis for area estimation", - "path": "/yosys/synth", - "method": "POST", - "emits": ["yosys.synth"], "flows": ["yosys"], + "triggers": [api("POST", "/yosys/synth")], + "enqueues": ["yosys.synth"], } -async def handler(req, context): +async def handler(req: ApiRequest, ctx: FlowContext) -> ApiResponse: bbdir = get_buckyball_path() - body = req.get("body") or {} + body = req.body or {} data = { "output_dir": body.get("output_dir", f"{bbdir}/arch/build/"), "top": body.get("top"), "config": body.get("config"), } - await context.emit({"topic": "yosys.synth", "data": data}) + await ctx.enqueue({"topic": "yosys.synth", "data": {**data, "_trace_id": ctx.trace_id}}) # ================================================================================== # Wait for synthesis result # ================================================================================== - while True: - result = await wait_for_result(context) - if result is not None: - return result - await asyncio.sleep(1) + result = await wait_for_result(ctx) + return result diff --git a/api/steps/yosys/01_synth_event_step.py b/api/steps/yosys/01_synth_event.step.py similarity index 78% rename from api/steps/yosys/01_synth_event_step.py rename to api/steps/yosys/01_synth_event.step.py index 1cc2a19..56845c7 100644 --- a/api/steps/yosys/01_synth_event_step.py +++ b/api/steps/yosys/01_synth_event.step.py @@ -4,6 +4,8 @@ import sys import yaml +from motia import FlowContext, queue + # Add the utils directory to the Python path utils_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) if utils_path not in sys.path: @@ -11,15 +13,14 @@ from utils.path import get_buckyball_path from utils.stream_run import stream_run_logger -from utils.event_common import check_result +from utils.event_common import check_result, get_origin_trace_id config = { - "type": "event", "name": "yosys synth", "description": "run yosys synthesis for area estimation", - "subscribes": ["yosys.synth"], - "emits": [], "flows": ["yosys"], + "triggers": [queue("yosys.synth")], + "enqueues": [], } @@ -32,18 +33,19 @@ def load_yosys_config(): return {} -async def handler(data, context): +async def handler(input_data: dict, ctx: FlowContext) -> None: + origin_tid = get_origin_trace_id(input_data, ctx) bbdir = get_buckyball_path() - build_dir = data.get("output_dir", f"{bbdir}/arch/build/") + build_dir = input_data.get("output_dir", f"{bbdir}/arch/build/") arch_dir = f"{bbdir}/arch" # Load yaml config as defaults, CLI args override yosys_cfg = load_yosys_config() - top_module = data.get("top") or yosys_cfg.get("top") or "BuckyballAccelerator" + top_module = input_data.get("top") or yosys_cfg.get("top") or "BuckyballAccelerator" liberty = yosys_cfg.get("liberty") # Path to .lib file, configured in yosys-config.yaml only - elaborate_config = data.get("config") or yosys_cfg.get("elaborate_config", "sims.verilator.BuckyballToyVerilatorConfig") + elaborate_config = input_data.get("config") or yosys_cfg.get("elaborate_config", "sims.verilator.BuckyballToyVerilatorConfig") - context.logger.info(f"Top module: {top_module}, Elaborate config: {elaborate_config}") + ctx.logger.info(f"Top module: {top_module}, Elaborate config: {elaborate_config}") # ================================================================================== # Step 1: Generate Verilog via Elaborate (full SoC), filter DPI-C files later @@ -53,7 +55,7 @@ async def handler(data, context): shutil.rmtree(build_dir) os.makedirs(build_dir, exist_ok=True) - context.logger.info("Step 1: Generating Verilog via Elaborate...") + ctx.logger.info("Step 1: Generating Verilog via Elaborate...") verilog_command = ( f"mill -i __.test.runMain sims.verilator.Elaborate {elaborate_config} " "--disable-annotation-unknown -strip-debug-info -O=debug " @@ -63,19 +65,20 @@ async def handler(data, context): result = stream_run_logger( cmd=verilog_command, - logger=context.logger, + logger=ctx.logger, cwd=arch_dir, stdout_prefix="yosys verilog", stderr_prefix="yosys verilog", ) if result.returncode != 0: - context.logger.error(f"Verilog generation failed with code {result.returncode}") + ctx.logger.error(f"Verilog generation failed with code {result.returncode}") success_result, failure_result = await check_result( - context, + ctx, result.returncode, continue_run=False, extra_fields={"task": "verilog"}, + trace_id=origin_tid, ) return failure_result @@ -88,7 +91,7 @@ async def handler(data, context): # ================================================================================== # Step 2: Run Yosys synthesis for area estimation # ================================================================================== - context.logger.info("Step 2: Running Yosys synthesis for area estimation...") + ctx.logger.info("Step 2: Running Yosys synthesis for area estimation...") # Collect SystemVerilog sources only (.sv), skip .v files which may contain DPI-C imports vsrcs = glob.glob(f"{build_dir}/**/*.sv", recursive=True) @@ -107,16 +110,17 @@ async def handler(data, context): vsrcs = filtered_vsrcs if skipped: - context.logger.info(f"Skipped {len(skipped)} files with unsupported syntax: {', '.join(skipped[:10])}{'...' if len(skipped) > 10 else ''}") - context.logger.info(f"Found {len(vsrcs)} synthesizable SystemVerilog source files") + ctx.logger.info(f"Skipped {len(skipped)} files with unsupported syntax: {', '.join(skipped[:10])}{'...' if len(skipped) > 10 else ''}") + ctx.logger.info(f"Found {len(vsrcs)} synthesizable SystemVerilog source files") if not vsrcs: - context.logger.error(f"No Verilog source files found in {build_dir}") + ctx.logger.error(f"No Verilog source files found in {build_dir}") success_result, failure_result = await check_result( - context, + ctx, 1, continue_run=False, extra_fields={"task": "synth", "error": "No Verilog source files found"}, + trace_id=origin_tid, ) return failure_result @@ -137,7 +141,7 @@ async def handler(data, context): f.write(f"synth -top {top_module}\n") if liberty and os.path.exists(liberty): - context.logger.info(f"Using liberty file: {liberty}") + ctx.logger.info(f"Using liberty file: {liberty}") f.write(f"dfflibmap -liberty {liberty}\n") f.write(f"abc -liberty {liberty}\n") @@ -153,7 +157,7 @@ async def handler(data, context): f.write(f"write_verilog {yosys_output_dir}/synth_netlist.v\n") else: if liberty: - context.logger.warn(f"Liberty file not found: {liberty}, falling back to generic stat") + ctx.logger.warn(f"Liberty file not found: {liberty}, falling back to generic stat") # Hierarchical breakdown before flatten f.write(f"tee -o {yosys_output_dir}/hierarchy_report.txt stat\n") f.write("flatten\n") @@ -161,15 +165,15 @@ async def handler(data, context): f.write("stat\n") f.write(f"tee -o {yosys_output_dir}/area_report.txt stat\n") - context.logger.info(f"Yosys script written to {yosys_script}") - context.logger.info(f"Synthesizing {len(vsrcs)} source files with top module: {top_module}") + ctx.logger.info(f"Yosys script written to {yosys_script}") + ctx.logger.info(f"Synthesizing {len(vsrcs)} source files with top module: {top_module}") # Run yosys yosys_command = f"yosys -s {yosys_script}" result = stream_run_logger( cmd=yosys_command, - logger=context.logger, + logger=ctx.logger, cwd=build_dir, stdout_prefix="yosys synth", stderr_prefix="yosys synth", @@ -183,7 +187,7 @@ async def handler(data, context): timing_report_file = f"{yosys_output_dir}/timing_report.txt" if liberty and os.path.exists(liberty) and os.path.exists(netlist_file) and result.returncode == 0: - context.logger.info("Step 3: Running OpenSTA timing analysis...") + ctx.logger.info("Step 3: Running OpenSTA timing analysis...") # Get target clock period from config (default 10ns = 100MHz) clock_period = yosys_cfg.get("clock_period", 10.0) @@ -202,7 +206,7 @@ async def handler(data, context): sta_result = stream_run_logger( cmd=f"sta {sta_script}", - logger=context.logger, + logger=ctx.logger, cwd=yosys_output_dir, stdout_prefix="opensta", stderr_prefix="opensta", @@ -211,11 +215,11 @@ async def handler(data, context): if sta_result.returncode == 0 and os.path.exists(timing_report_file): with open(timing_report_file, "r") as f: extra["timing_report"] = f.read() - context.logger.info(f"Timing report saved to {timing_report_file}") + ctx.logger.info(f"Timing report saved to {timing_report_file}") else: - context.logger.warn(f"OpenSTA timing analysis failed (rc={sta_result.returncode})") + ctx.logger.warn(f"OpenSTA timing analysis failed (rc={sta_result.returncode})") elif result.returncode == 0: - context.logger.info("Skipping OpenSTA: no liberty file configured or synthesis failed") + ctx.logger.info("Skipping OpenSTA: no liberty file configured or synthesis failed") # ================================================================================== # Return result to API @@ -224,19 +228,20 @@ async def handler(data, context): if os.path.exists(report_file): with open(report_file, "r") as f: extra["area_report"] = f.read() - context.logger.info(f"Area report saved to {report_file}") + ctx.logger.info(f"Area report saved to {report_file}") hierarchy_file = f"{yosys_output_dir}/hierarchy_report.txt" if os.path.exists(hierarchy_file): with open(hierarchy_file, "r") as f: extra["hierarchy_report"] = f.read() - context.logger.info(f"Hierarchy report saved to {hierarchy_file}") + ctx.logger.info(f"Hierarchy report saved to {hierarchy_file}") success_result, failure_result = await check_result( - context, + ctx, result.returncode, continue_run=False, extra_fields=extra, + trace_id=origin_tid, ) return diff --git a/api/utils/event_common.py b/api/utils/event_common.py index 7179692..2008174 100644 --- a/api/utils/event_common.py +++ b/api/utils/event_common.py @@ -3,23 +3,36 @@ """ -async def check_result(context, returncode, continue_run=False, extra_fields=None): +def get_origin_trace_id(input_data, ctx): + """Get the origin trace_id from input_data (passed by API step) or fall back to ctx.trace_id. + + iii 0.7+ assigns a new trace_id per handler invocation, so the API step's + trace_id must be forwarded explicitly via input_data["_trace_id"]. + """ + if isinstance(input_data, dict) and "_trace_id" in input_data: + return input_data["_trace_id"] + return ctx.trace_id + + +async def check_result(ctx, returncode, continue_run=False, extra_fields=None, trace_id=None): """ Check returncode, create appropriate result objects and set state. Args: - context: The event context object + ctx: The flow context object returncode: The return code (int) continue_run: If True, set processing state instead of success/failure extra_fields: Optional dictionary of extra fields to include in result body + trace_id: Optional trace_id to use as state scope (defaults to ctx.trace_id) Returns: tuple: (success_result, failure_result) - one will be None based on returncode and continue_run """ extra_fields = extra_fields or {} + scope = trace_id or ctx.trace_id if continue_run: - await context.state.set(context.trace_id, "processing", True) + await ctx.state.set(scope, "processing", True) return None, None elif returncode != 0: failure_result = { @@ -32,7 +45,7 @@ async def check_result(context, returncode, continue_run=False, extra_fields=Non **extra_fields, }, } - await context.state.set(context.trace_id, "failure", failure_result) + await ctx.state.set(scope, "failure", failure_result) return None, failure_result else: success_result = { @@ -45,67 +58,5 @@ async def check_result(context, returncode, continue_run=False, extra_fields=Non **extra_fields, }, } - await context.state.set(context.trace_id, "success", success_result) + await ctx.state.set(scope, "success", success_result) return success_result, None - - -# ================================================================================== -# API waits for event return result -# -# Expected return result format: -# { -# "status": 200/400/500, -# "body": { -# "success": true/false, -# "failure": true/false, -# "processing": true/false, -# "return_code": 0, -# other fields -# } -# } -# -# Since the Motia framework wraps data in the data field, it needs to be unpacked -# if isinstance(result, dict) and 'data' in result: -# return result['data'] -# return result -# ================================================================================== - - -async def wait_for_result(context): - """ - Check for task completion state (success or failure). - Returns result if found, None if still processing. - - Args: - context: The event context object - - Returns: - dict or None: The result data if task completed, None if still processing - """ - # Check for success result - success_result = await context.state.get(context.trace_id, "success") - if success_result and success_result.get("data"): - # Filter out invalid null state - if success_result == {"data": None} or ( - isinstance(success_result, dict) - and success_result.get("data") is None - and len(success_result) == 1 - ): - await context.state.delete(context.trace_id, "success") - return None - context.logger.info("task completed") - - if isinstance(success_result, dict) and "data" in success_result: - return success_result["data"] - return success_result - - # Check for error status - failure_result = await context.state.get(context.trace_id, "failure") - if failure_result and failure_result.get("data"): - context.logger.error("task failed", failure_result) - - if isinstance(failure_result, dict) and "data" in failure_result: - return failure_result["data"] - return failure_result - - return None diff --git a/api/utils/port.py b/api/utils/port.py index 82e6bfe..f61b8c2 100644 --- a/api/utils/port.py +++ b/api/utils/port.py @@ -9,8 +9,25 @@ def find_available_port(start_port: int = 5000, end_port: int = 5500) -> int: sock.bind(("localhost", port)) return port except OSError: - # Port is already in use, try next one continue - # If no port is available in the range, raise an exception + raise RuntimeError(f"No available port found in range {start_port}-{end_port}") + + +def reserve_port(start_port: int = 5000, end_port: int = 5500) -> tuple[int, socket.socket]: + """Reserve a port by binding and holding the socket open. + + The caller must close the returned socket once the engine has taken over the port. + This prevents TOCTOU races when multiple bbdev instances start concurrently. + """ + for port in range(start_port, end_port + 1): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("localhost", port)) + return port, sock + except OSError: + sock.close() + continue + raise RuntimeError(f"No available port found in range {start_port}-{end_port}") diff --git a/bbdev b/bbdev index 48eb21e..a203148 100755 --- a/bbdev +++ b/bbdev @@ -8,12 +8,139 @@ import signal import shlex import json import time -import shutil +import tempfile +import threading import requests -from utils import find_available_port - -workflow_dir = os.path.dirname(os.path.abspath(__file__)) + "/api" +sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "api")) +from utils.port import reserve_port + +workflow_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "api") + +NO_PROXY = {"http": None, "https": None, "all": None} + +# Subcommand definitions: {name: {operation: help_text, ...}} +# Operations with string values accept sub-arguments; True means boolean-only (no sub-args). +WORKFLOW_COMMANDS = { + "verilator": { + "clean": True, + "verilog": 'Generate verilog files. Args: "[--balltype ] [--job ] [--batch]"', + "build": 'Build simulation executable. Args: "[--job ]"', + "sim": 'Run simulation. Args: "--binary [--batch]"', + "run": 'Integrated build+sim+run. Args: "--binary [--batch] [--job ]"', + "cosim": 'Run cosimulation. Args: "--binary [--batch]"', + }, + "vcs": { + "clean": True, + "verilog": True, + "build": 'Build simulation executable. Args: "[--job ]"', + "sim": 'Run simulation. Args: "--binary [--batch]"', + "run": 'Integrated build+sim+run. Args: "--binary [--batch] [--job ]"', + }, + "sardine": { + "run": 'Run sardine. Args: "--workload "', + }, + "agent": { + "chat": 'Run agent. Args: "--message ", "--model "', + }, + "workload": { + "build": "Build workload.", + }, + "doc": { + "deploy": True, + }, + "marshal": { + "build": "Build marshal.", + "launch": "Launch marshal.", + }, + "firesim": { + "buildbitstream": "Build bitstream.", + "infrasetup": "Infrasetup.", + "runworkload": "Run workload.", + }, + "compiler": { + "build": "Build compiler.", + }, + "funcsim": { + "build": "Build funcsim.", + "sim": "Sim funcsim.", + }, + "uvm": { + "builddut": "Build dut.", + "build": "Build uvm.", + }, + "palladium": { + "verilog": 'Generate verilog files. Args: "[--config sims.palladium.BuckyballToyP2EConfig]"', + }, +} + +ALL_COMMANDS = ["start", "stop"] + list(WORKFLOW_COMMANDS) + + +def make_iii_config(port: int, worker_port: int, quiet: bool = False) -> str: + """Create a temporary config.yaml with unique HTTP and Worker ports, return the path.""" + base_config = os.path.join(workflow_dir, "config.yaml") + with open(base_config, "r") as f: + content = f.read() + # Replace top-level engine WebSocket port and WorkerModule port (both default 49134), + # then RestApiModule HTTP port (default 3111). + content = content.replace("port: 49134", f"port: {worker_port}") + content = content.replace("port: 3111", f"port: {port}", 1) + if quiet: + content = content.replace( + "exporter: memory", + "exporter: memory\n level: warn\n logs_console_output: false", + ) + tmp = tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", prefix="iii-config-", dir=workflow_dir, delete=False + ) + tmp.write(content) + tmp.close() + return tmp.name + + +def cleanup_config(config_path: str): + """Remove temp config file, ignoring if already deleted.""" + try: + os.unlink(config_path) + except FileNotFoundError: + pass + + +def _kill_proc_tree(pid: int): + """Kill a process and all its descendants via /proc. + Kills children first (bottom-up) so they are still in the process tree + and discoverable before their parent dies.""" + def _get_children(parent_pid): + children = [] + try: + for entry in os.listdir("/proc"): + if not entry.isdigit(): + continue + try: + with open(f"/proc/{entry}/stat") as f: + stat = f.read().split() + if int(stat[3]) == parent_pid: + child_pid = int(entry) + children.append(child_pid) + children.extend(_get_children(child_pid)) + except (FileNotFoundError, IndexError, ValueError, PermissionError): + continue + except FileNotFoundError: + pass + return children + + # children first, then parent — so children are still discoverable in /proc + children = _get_children(pid) + for p in reversed(children): + try: + os.kill(p, signal.SIGTERM) + except (ProcessLookupError, OSError): + pass + try: + os.kill(pid, signal.SIGTERM) + except (ProcessLookupError, OSError): + pass def parse_args(argv: list[str]) -> argparse.Namespace: @@ -23,560 +150,331 @@ def parse_args(argv: list[str]) -> argparse.Namespace: formatter_class=argparse.RawTextHelpFormatter, ) - # Global parameters - parser.add_argument( - "--port", type=int, default=None, help="Port for dev server (default: None)" + # Global parameters shared across all subcommands + common = argparse.ArgumentParser(add_help=False) + common.add_argument( + "--port", type=int, default=None, help="Port for dev server (default: auto)" ) - parser.add_argument("--server", action="store_true", help="server mode") + common.add_argument("--server", action="store_true", help="Server mode") - # Subcommand parser subparsers = parser.add_subparsers(dest="command", help="Available commands") - # ===== start subcommand ============================================================================== - start_parser = subparsers.add_parser("start", help="Start dev server") - - # ===== stop subcommand ============================================================================== - stop_parser = subparsers.add_parser("stop", help="Stop dev server") - stop_group = stop_parser.add_mutually_exclusive_group(required=False) - stop_group.add_argument("--all", action="store_true", help="Stop all servers") - - # ===== verilator subcommand ========================================================================= - verilator_parser = subparsers.add_parser("verilator", help="Verilator operations") - # Mutually exclusive option group for verilator - only one operation can be selected - verilator_group = verilator_parser.add_mutually_exclusive_group(required=True) - verilator_group.add_argument( - "--clean", action="store_true", help="Clean verilator build directory" - ) - verilator_group.add_argument( - "--verilog", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Generate verilog files from chisel. Args: "[--balltype ] [--job ] [--batch]"', - ) - verilator_group.add_argument( - "--build", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Build verilator simulation executable. Args: "[--job ] [--coverage]"', - ) - verilator_group.add_argument( - "--sim", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Run verilator simulation. Args: "--binary [--batch] [--coverage]"', - ) - verilator_group.add_argument( - "--run", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Integrated build+sim+run. Args: "--binary [--batch] [--job ] [--coverage]"', - ) - verilator_group.add_argument( - "--cosim", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Run verilator cosimulation. Args: "--binary [--batch]"', - ) - - # ===== vcs subcommand ========================================================================= - vcs_parser = subparsers.add_parser("vcs", help="vcs operations") - # Mutually exclusive option group for vcs - only one operation can be selected - vcs_group = vcs_parser.add_mutually_exclusive_group(required=True) - vcs_group.add_argument( - "--clean", action="store_true", help="Clean vcs build directory" - ) - vcs_group.add_argument( - "--verilog", action="store_true", help="Generate verilog files" - ) - vcs_group.add_argument( - "--build", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Build vcs simulation executable. Args: "[--job ]"', - ) - vcs_group.add_argument( - "--sim", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Run vcs simulation. Args: "--binary [--batch]"', - ) - vcs_group.add_argument( - "--run", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Integrated build+sim+run. Args: "--binary [--batch] [--job ]"', - ) + # start / stop + subparsers.add_parser("start", parents=[common], help="Start dev server") + stop_parser = subparsers.add_parser("stop", parents=[common], help="Stop dev server") + stop_parser.add_argument("--all", action="store_true", help="Stop all servers") + + # Build workflow subcommands from WORKFLOW_COMMANDS table + for cmd_name, operations in WORKFLOW_COMMANDS.items(): + cmd_parser = subparsers.add_parser( + cmd_name, parents=[common], help=f"{cmd_name} operations" + ) + group = cmd_parser.add_mutually_exclusive_group(required=True) + for op_name, op_help in operations.items(): + if op_help is True: + group.add_argument(f"--{op_name}", action="store_true", help=f"{op_name.capitalize()}.") + else: + group.add_argument( + f"--{op_name}", + type=str, + nargs="?", + const="", + metavar="ARGS", + help=op_help, + ) - # ===== sardine subcommand ========================================================================= - sardine_parser = subparsers.add_parser("sardine", help="sardine operations") - sardine_group = sardine_parser.add_mutually_exclusive_group(required=True) - sardine_group.add_argument( - "--run", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Run sardine. Args: "--workload [--coverage]"', - ) + return parser.parse_args(argv) - # ===== agent subcommand ========================================================================= - agent_parser = subparsers.add_parser("agent", help="agent operations") - agent_group = agent_parser.add_mutually_exclusive_group(required=True) - agent_group.add_argument( - "--chat", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Run agent. Args: "--message ", "--model "', - ) - # ===== workload subcommand ========================================================================= - workload_parser = subparsers.add_parser("workload", help="workload operations") - workload_group = workload_parser.add_mutually_exclusive_group(required=True) - workload_group.add_argument( - "--build", - type=str, - nargs="?", - const="", - metavar="ARGS", - help="Build workload. ", - ) +def extract_command_info(args: argparse.Namespace) -> dict: + """Extract command, operation, and parsed sub-arguments from argparse Namespace.""" + result = {"command": args.command, "operation": None, "args": {}} - # ===== doc subcommand ========================================================================= - doc_parser = subparsers.add_parser("doc", help="doc operations") - doc_group = doc_parser.add_mutually_exclusive_group(required=True) - doc_group.add_argument("--deploy", action="store_true", help="Deploy doc. ") + for attr_name, attr_value in vars(args).items(): + if attr_name in ("command", "port", "server"): + continue + if attr_value is None or attr_value is False: + continue - # ===== marshal subcommand ========================================================================= - marshal_parser = subparsers.add_parser("marshal", help="marshal operations") - marshal_group = marshal_parser.add_mutually_exclusive_group(required=True) - marshal_group.add_argument( - "--build", type=str, nargs="?", const="", metavar="ARGS", help="Build marshal. " - ) - marshal_group.add_argument( - "--launch", - type=str, - nargs="?", - const="", - metavar="ARGS", - help="Launch marshal. ", - ) + result["operation"] = attr_name + if attr_value is True or attr_value == "": + result["args"] = {} + else: + result["args"] = _parse_sub_args(attr_value) + break - # ===== firesim subcommand ========================================================================= - firesim_parser = subparsers.add_parser("firesim", help="firesim operations") - firesim_group = firesim_parser.add_mutually_exclusive_group(required=True) - firesim_group.add_argument( - "--buildbitstream", - type=str, - nargs="?", - const="", - metavar="ARGS", - help="Build bitstream. ", - ) - firesim_group.add_argument( - "--infrasetup", - type=str, - nargs="?", - const="", - metavar="ARGS", - help="Infrasetup. ", - ) - firesim_group.add_argument( - "--runworkload", - type=str, - nargs="?", - const="", - metavar="ARGS", - help="Run workload. ", - ) + return result - # ===== compiler subcommand ========================================================================= - compiler_parser = subparsers.add_parser("compiler", help="compiler operations") - compiler_group = compiler_parser.add_mutually_exclusive_group(required=True) - compiler_group.add_argument( - "--build", - type=str, - nargs="?", - const="", - metavar="ARGS", - help="Build compiler. ", - ) - # ===== funcsim subcommand ========================================================================= - funcsim_parser = subparsers.add_parser("funcsim", help="funcsim operations") - funcsim_group = funcsim_parser.add_mutually_exclusive_group(required=True) - funcsim_group.add_argument( - "--build", type=str, nargs="?", const="", metavar="ARGS", help="Build funcsim. " - ) - funcsim_group.add_argument( - "--sim", type=str, nargs="?", const="", metavar="ARGS", help="Sim funcsim. " - ) +def _parse_sub_args(arg_string: str) -> dict: + """Parse a sub-argument string like '--binary /path --batch' into a dict.""" + args_dict = {} + try: + tokens = shlex.split(arg_string) + except ValueError as e: + print(f"Error parsing arguments: {e}") + return args_dict + + i = 0 + while i < len(tokens): + token = tokens[i] + if token.startswith("--"): + key = token[2:] + if i + 1 < len(tokens) and not tokens[i + 1].startswith("--"): + args_dict[key] = tokens[i + 1] + i += 2 + else: + args_dict[key] = True + i += 1 + elif token.startswith("-") and len(token) == 2: + key = token[1:] + if i + 1 < len(tokens) and not tokens[i + 1].startswith("--"): + args_dict[key] = tokens[i + 1] + i += 2 + else: + args_dict[key] = True + i += 1 + else: + i += 1 + return args_dict + + +def wait_for_http(url: str, check_fn, timeout: int, error_msg: str): + """Poll an HTTP endpoint until check_fn(response) returns True, or timeout.""" + for _ in range(timeout): + try: + resp = requests.get(url, timeout=1, proxies=NO_PROXY) + if check_fn(resp): + return True + except requests.exceptions.ConnectionError: + pass + except requests.exceptions.RequestException: + return True # non-connection error means server is reachable + time.sleep(1) + print(error_msg) + return False + + +def forward_iii_output(pipe, active_event: threading.Event, quiet: bool = False): + """Forward task output to stdout. In quiet mode, suppress engine INFO/WARN logs.""" + for line in iter(pipe.readline, ""): + if not active_event.is_set(): + continue + if quiet: + # Pass through: ANSI-colored step output and ERROR lines; drop engine INFO/WARN + if line.startswith("\033[") or "[ERROR]" in line: + sys.stdout.write(line) + sys.stdout.flush() + else: + sys.stdout.write(line) + sys.stdout.flush() + pipe.close() + + +def read_state_file(path: str): + """Read JSON from an iii state file (JSON + binary padding). Returns parsed dict or None.""" + try: + with open(path, "r", errors="ignore") as f: + raw = f.read() + state_data, _ = json.JSONDecoder().raw_decode(raw) + return state_data + except (FileNotFoundError, json.JSONDecodeError, ValueError): + return None + + +def run_server_mode(args, cmd_info): + """Handle --server mode: interact with an already-running (or new) iii server.""" + command = cmd_info["command"] + + if command == "start": + print(f"Starting dev server on port {args.port}") + config_path = make_iii_config(args.port, 49134) if args.port else os.path.join(workflow_dir, "config.yaml") + try: + subprocess.run(["iii", "-c", config_path], cwd=workflow_dir, check=True) + finally: + if args.port: + cleanup_config(config_path) - # ===== uvm subcommand ========================================================================= - uvm_parser = subparsers.add_parser("uvm", help="uvm operations") - uvm_group = uvm_parser.add_mutually_exclusive_group(required=True) - uvm_group.add_argument( - "--builddut", type=str, nargs="?", const="", metavar="ARGS", help="Build dut. " - ) - uvm_group.add_argument( - "--build", type=str, nargs="?", const="", metavar="ARGS", help="Build uvm. " - ) + elif command == "stop": + if cmd_info["operation"] == "all": + print("Stopping all servers") + subprocess.run( + "kill -9 $(ps aux | grep '[i]ii' | awk '{print $2}')", + shell=True, check=False, text=True, + ) + else: + print(f"Stopping server on port {args.port}") + subprocess.run( + f"kill -TERM $(lsof -t -i :{args.port})", + shell=True, check=False, text=True, + ) - # ===== yosys subcommand ===================================================================== - yosys_parser = subparsers.add_parser("yosys", help="yosys synthesis operations") - yosys_group = yosys_parser.add_mutually_exclusive_group(required=True) - yosys_group.add_argument( - "--synth", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Run yosys synthesis for area estimation.\n' - 'Args: "[--top ] [--config ]"\n' - ' --top : top module for synthesis (default from yosys-config.yaml)\n' - ' --config : Elaborate config class (default: sims.verilator.BuckyballToyVerilatorConfig)\n' - 'Liberty library can be configured in bbdev/api/steps/yosys/scripts/yosys-config.yaml', - ) + elif command in WORKFLOW_COMMANDS: + api_path = f"/{command}/{cmd_info['operation']}" + resp = requests.post( + f"http://localhost:{args.port}{api_path}", + json=cmd_info["args"], + timeout=30, + proxies=NO_PROXY, + ) + print(resp.text) + + else: + print(f"Unknown command: {command}") + print(f"Available commands: {', '.join(ALL_COMMANDS)}") + sys.exit(1) + + +def _submit_and_poll(port: int, api_path: str, cmd_info: dict): + """Submit a task to iii and poll for its result. Returns exit code (0=ok, 1=fail).""" + base_url = f"http://localhost:{port}" + print(f"Executing {cmd_info['command']} {cmd_info['operation']}...") + try: + submit_resp = requests.post( + f"{base_url}{api_path}", + json=cmd_info["args"], + timeout=30, + proxies=NO_PROXY, + ) + submit_data = submit_resp.json() + except Exception as e: + print(f"Error: Failed to submit task: {e}") + return 1 + + if submit_resp.status_code >= 400: + print(f"Error: {submit_data}") + return 1 + + trace_id = submit_data.get("trace_id", "") + if not trace_id: + print(f"Error: No trace_id in response: {submit_data}") + return 1 + + # Poll for result + state_file = os.path.join(workflow_dir, "data", "state_store.db", f"{trace_id}.bin") + poll_timeout = 600 # 10 minute max + task_result = None + for _ in range(poll_timeout // 2): + state_data = read_state_file(state_file) + if state_data: + if "success" in state_data: + task_result = state_data["success"].get("body", state_data["success"]) + break + elif "failure" in state_data: + task_result = state_data["failure"].get("body", state_data["failure"]) + break + time.sleep(2) + else: + print("Error: Task polling timed out after 10 minutes") + return 1 + + print(f"\nTask completed on http://localhost:{port}") + if task_result and not task_result.get("success", False): + print("Error: Task failed") + return 1 + return 0 + + +def run_script_mode(args, cmd_info): + """Handle script mode: start iii, run one task, then shut down.""" + command = cmd_info["command"] + + if command in ("start", "stop"): + print(f"'{command}' does nothing in script mode") + return + + if command not in WORKFLOW_COMMANDS: + print(f"Unknown command: {command}") + print(f"Available commands: {', '.join(ALL_COMMANDS)}") + sys.exit(1) + + api_path = f"/{command}/{cmd_info['operation']}" + + # Reserve both ports atomically to prevent TOCTOU races with concurrent bbdev instances. + # Sockets stay bound until the engine has taken over (HTTP is up), then are released. + if args.port: + available_port = args.port + http_sock = None + else: + available_port, http_sock = reserve_port(start_port=5100, end_port=5500) + worker_port, worker_sock = reserve_port(start_port=49134, end_port=49500) + print(f"Starting server on port {available_port}...") + + # Clean stale state files from previous runs + state_dir = os.path.join(workflow_dir, "data", "state_store.db") + if os.path.isdir(state_dir): + for f in os.listdir(state_dir): + if f.endswith(".bin"): + try: + os.unlink(os.path.join(state_dir, f)) + except OSError: + pass - # ===== palladium subcommand =================================================================== - palladium_parser = subparsers.add_parser("palladium", help="palladium operations") - palladium_group = palladium_parser.add_mutually_exclusive_group(required=True) - palladium_group.add_argument( - "--verilog", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Generate verilog files from chisel. Args example: "[--config sims.palladium.BuckyballToyP2EConfig]"', + config_path = make_iii_config(available_port, worker_port, quiet=True) + iii_output_active = threading.Event() + iii_output_active.set() + + # Release reserved sockets just before engine starts so it can bind the same ports. + if http_sock: + http_sock.close() + worker_sock.close() + + # Set III_URL so motia SDK connects to the correct worker port + iii_env = {**os.environ, "III_URL": f"ws://localhost:{worker_port}"} + proc = subprocess.Popen( + ["iii", "-c", config_path], + cwd=workflow_dir, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + env=iii_env, ) - # ===== pegasus subcommand =================================================================== - pegasus_parser = subparsers.add_parser("pegasus", help="pegasus operations") - pegasus_group = pegasus_parser.add_mutually_exclusive_group(required=True) - pegasus_group.add_argument( - "--verilog", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Generate verilog files from chisel. Args example: "[--config sims.pegasus.PegasusConfig]"', - ) - pegasus_group.add_argument( - "--buildbitstream", - type=str, - nargs="?", - const="", - metavar="ARGS", - help='Build AU280 bitstream from pre-generated Verilog. Args example: "[--generated_dir /path] [--output_dir /path]"', + output_thread = threading.Thread( + target=forward_iii_output, args=(proc.stdout, iii_output_active, True), daemon=True ) + output_thread.start() + + try: + base_url = f"http://localhost:{available_port}" + + # Phase 1: wait for iii engine HTTP + if not wait_for_http( + f"{base_url}/", + check_fn=lambda r: True, + timeout=30, + error_msg="Server failed to start within 30 seconds", + ): + sys.exit(1) - # Parse arguments, allowing unknown arguments (so --port can work in subcommands) - args, unknown = parser.parse_known_args(argv) - - # If --port and --server are in unknown arguments, handle them manually - while unknown: - found = False - for i in range(len(unknown)): - if unknown[i] == "--port": - try: - args.port = int(unknown[i + 1]) - unknown = unknown[i + 2 :] # Remove processed arguments - found = True - break - except (ValueError, IndexError): - break - if unknown[i] == "--server": - args.server = True - unknown = unknown[i + 1 :] # Remove processed arguments - found = True - break - if not found: - break - # If there are other unknown arguments, throw error - if unknown: - parser.error(f"unrecognized arguments: {' '.join(unknown)}") - - return args - - -def extract_command_info(args): - """Generic command info extractor, applicable to all commands""" - # Basic return structure - result = {"command": getattr(args, "command", None), "operation": None, "args": {}} - - # Dynamically extract operation type (iterate through all attributes of args) - for attr_name in dir(args): - # Skip built-in attributes and known non-operation attributes - if attr_name.startswith("_") or attr_name in ["command", "port"]: - continue + # Phase 2: wait for route registration + if not wait_for_http( + f"{base_url}{api_path}", + check_fn=lambda r: r.status_code != 404, + timeout=120, + error_msg="Routes failed to register within timeout", + ): + sys.exit(1) - attr_value = getattr(args, attr_name) - # Find attributes with value True or non-empty string (skip None and False) - if attr_value is not None and attr_value is not False: - result["operation"] = attr_name - if attr_value is True: - # Boolean operations, such as --clean, --verilog - result["args"] = {} - else: - # Operations with arguments, such as --sim "arg_string" - # Parse argument string - args_dict = {} - if attr_value: - try: - # use shlex to parse args - arg_tokens = shlex.split(attr_value) - i = 0 - while i < len(arg_tokens): - token = arg_tokens[i] - if token.startswith("--"): - # long option - option_name = token[2:] # remove -- - if i + 1 < len(arg_tokens) and not arg_tokens[ - i + 1 - ].startswith("-"): - # next token is value - args_dict[option_name] = arg_tokens[i + 1] - i += 2 - else: - # boolean flag - args_dict[option_name] = True - i += 1 - elif token.startswith("-") and len(token) == 2: - # short option - option_name = token[1:] # remove - - if i + 1 < len(arg_tokens) and not arg_tokens[ - i + 1 - ].startswith("-"): - # next token is value - args_dict[option_name] = arg_tokens[i + 1] - i += 2 - else: - # boolean flag - args_dict[option_name] = True - i += 1 - else: - # position argument, skip - i += 1 - except ValueError as e: - print(f"Error parsing arguments: {e}") - result["args"] = args_dict - break + print(f"Server is ready on port {available_port}") + rc = _submit_and_poll(available_port, api_path, cmd_info) + if rc != 0: + sys.exit(rc) - return result + finally: + # Cleanup: kill iii and all its descendant processes, then remove temp config + iii_output_active.clear() + time.sleep(0.5) + _kill_proc_tree(proc.pid) + proc.wait() + cleanup_config(config_path) if __name__ == "__main__": args = parse_args(sys.argv[1:]) cmd_info = extract_command_info(args) - # print(f"Command: {cmd_info['command']}") - # print(f"Operation: {cmd_info['operation']}") - # print(f"Arguments: {cmd_info['args']}") - - # ================================================================================== - # Two modes: server mode and script mode - # - # server mode: Manually start/stop server, can visualize and access workflows through browser - # script mode: Automatically assign port, start service when task begins, stop service when task ends - # ================================================================================== if args.server: - if cmd_info["command"] == "start": - print(f"Starting dev server on port {args.port}") - subprocess.run( - ["pnpm", "dev", "--port", str(args.port)], - cwd=workflow_dir, - check=True, - ) - - elif cmd_info["command"] == "stop": - if cmd_info["operation"] == "all": - print("Stopping all servers") - subprocess.run( - "kill -9 $(ps aux | grep '[m]otia' | awk '{print $2}')", - shell=True, - check=False, - text=True, - ) - else: - print(f"Stopping server on port {args.port}") - subprocess.run( - f"kill -TERM $(lsof -t -i :{args.port})", - shell=True, - check=False, - text=True, - ) - - elif cmd_info["command"] in [ - "verilator", - "vcs", - "sardine", - "agent", - "workload", - "doc", - "marshal", - "firesim", - "compiler", - "funcsim", - "uvm", - "palladium", - "yosys", - "pegasus", - ]: - api_path = f"/{cmd_info['command']}/{cmd_info['operation']}" - json_data = json.dumps(cmd_info["args"]) - subprocess.run( - [ - "curl", - "-X", - "POST", - f"http://localhost:{args.port}{api_path}", - "-H", - "Content-Type: application/json", - "-d", - json_data, - ], - cwd=workflow_dir, - check=True, - ) - else: - print(f"Unknown command: {cmd_info['command']}") - print( - "Available commands: start, stop, verilator, vcs, sardine, agent, \ - workload, doc, marshal, firesim, compiler, funcsim, uvm, yosys, pegasus" - ) - sys.exit(1) - else: # script mode - if cmd_info["command"] == "start": - print(" 'start' do nothing in script mode") - elif cmd_info["command"] == "stop": - print(" 'stop' do nothing in script mode") - elif cmd_info["command"] in [ - "verilator", - "vcs", - "sardine", - "agent", - "workload", - "doc", - "marshal", - "firesim", - "compiler", - "funcsim", - "uvm", - "palladium", - "yosys", - "pegasus", - ]: - # 0. Clean up stale BullMQ/Redis data to prevent replaying old events - aof_dir = os.path.join(workflow_dir, ".motia", "appendonlydir") - if os.path.exists(aof_dir): - shutil.rmtree(aof_dir) - - # 1. Start service in background ================================ - # If port is specified, use the specified port; otherwise, automatically assign port - if args.port: - available_port = args.port - else: - available_port = find_available_port(start_port=5100, end_port=5500) - print(f"Starting server on port {available_port}...") - proc = subprocess.Popen( - ["pnpm", "dev", "--port", str(available_port)], - cwd=workflow_dir, - stderr=subprocess.DEVNULL # Suppress stderr to hide BullMQ errors - ) - - # Wait for service to start - max_retries = 300 - for i in range(max_retries): - try: - # Disable proxy, connect directly to localhost - # response = requests.get(f"http://localhost:{available_port}", timeout=1) - response = requests.get( - f"http://localhost:{available_port}", - timeout=1, - proxies={"http": None, "https": None, "all": None}, - ) - if response.status_code == 200: - print(f"Server is ready on port {available_port}") - break - except requests.exceptions.RequestException: - pass - time.sleep(3) - else: - print("Server failed to start within 30 seconds") - subprocess.run( - f"kill -TERM $(lsof -t -i :{available_port})", - shell=True, - check=False, - text=True, - ) - proc.terminate() - proc.wait() - sys.exit(1) - - # 2. Execute API call ================================ - api_path = f"/{cmd_info['command']}/{cmd_info['operation']}" - json_data = json.dumps(cmd_info["args"]) - print(f"Executing {cmd_info['command']} {cmd_info['operation']}...") - # Disable proxy, connect directly to localhost - result = subprocess.run( - [ - "curl", - "-sS", - "--noproxy", - "localhost", - "-X", - "POST", - f"http://localhost:{available_port}{api_path}", - "-H", - "Content-Type: application/json", - "-d", - json_data, - ], - cwd=workflow_dir, - capture_output=True, - text=True, - ) - - # 3. Shutdown service ================================ - # Give observability plugin time to finish async operations (e.g., Redis writes) - time.sleep(1) - proc.terminate() - proc.wait() - print( - f"\nTask completed. Command running on http://localhost:{available_port} is finished" - ) - - # Check the success field returned by API - try: - response = json.loads(result.stdout) - if not response.get("success", False): - print("Error: Task failed") - sys.exit(1) - except Exception: - print("Error: Invalid API response") - sys.exit(1) - - else: - print(f"Unknown command: {cmd_info['command']}") - print( - "Available commands: start, stop, verilator, vcs, sardine, agent, \ - workload, doc, marshal, firesim, compiler, funcsim, uvm, yosys, pegasus" - ) - sys.exit(1) + run_server_mode(args, cmd_info) + else: + run_script_mode(args, cmd_info) diff --git a/flake.nix b/flake.nix index fe81a33..039d33b 100644 --- a/flake.nix +++ b/flake.nix @@ -15,11 +15,7 @@ { devShells.default = pkgs.mkShell { buildInputs = with pkgs; [ - # Node.js runtime (Motia is a Node.js framework) - nodejs_22 - nodePackages.pnpm - - # Python with project dependencies (managed by Nix, not requirements.txt) + # Python with system-level dependencies (managed by Nix) bbdevPythonPkgs # System tools @@ -29,12 +25,18 @@ shellHook = '' export PATH="$PWD:$PATH" - source "$PWD/init.sh" + + # Create venv for pip-managed packages (motia, iii-sdk) + VENV_DIR="$PWD/api/.venv" + if [ ! -d "$VENV_DIR" ]; then + echo "Creating Python venv..." + python3 -m venv "$VENV_DIR" --system-site-packages + "$VENV_DIR/bin/pip" install -q motia[otel]==1.0.0rc17 iii-sdk==0.2.0 + fi + export PATH="$VENV_DIR/bin:$PATH" echo "bbdev dev environment ready" - echo " Node.js: $(node --version)" echo " Python: $(python3 --version)" - echo " pnpm: $(pnpm --version)" echo " bbdev: $(which bbdev)" ''; };