From 7b373eeca0ebb86f7fcb0312a77475e702c555ae Mon Sep 17 00:00:00 2001
From: georgi
Date: Fri, 31 Oct 2025 00:38:45 +0100
Subject: [PATCH] feat: add csv data processor dsl example

---
 .../examples/nodetool-base/README_EXAMPLES.md |  5 +-
 .../nodetool-base/csv_data_processor_dsl.py   | 87 +++++++++++++++++++
 2 files changed, 90 insertions(+), 2 deletions(-)
 create mode 100644 src/nodetool/examples/nodetool-base/csv_data_processor_dsl.py

diff --git a/src/nodetool/examples/nodetool-base/README_EXAMPLES.md b/src/nodetool/examples/nodetool-base/README_EXAMPLES.md
index 3c93b96..1988f60 100644
--- a/src/nodetool/examples/nodetool-base/README_EXAMPLES.md
+++ b/src/nodetool/examples/nodetool-base/README_EXAMPLES.md
@@ -1,6 +1,6 @@
 # NodeTool Base DSL Examples
 
-This directory contains 22 comprehensive examples demonstrating the capabilities of NodeTool's DSL for building AI-powered workflows.
+This directory contains 23 comprehensive examples demonstrating the capabilities of NodeTool's DSL for building AI-powered workflows.
 
 ## Original Examples (10)
 
@@ -20,7 +20,7 @@ This directory contains 22 comprehensive examples demonstrating the capabilities
 9. **categorize_mails_dsl.py** - Automatically classify emails with AI and apply labels
 10. **data_generator_dsl.py** - Generate synthetic datasets using AI models
 
-## New Examples (12)
+## New Examples (13)
 
 ### AI Content Generation
 11. **social_media_sentiment_dsl.py** - Analyze sentiment and emotions in social media posts
@@ -37,6 +37,7 @@ This directory contains 22 comprehensive examples demonstrating the capabilities
 20. **job_application_analyzer_dsl.py** - Analyze job descriptions and provide application advice
 21. **competitive_analysis_dsl.py** - Analyze competitor offerings and market positioning
 22. **data_validation_pipeline_dsl.py** - Validate and clean data for quality assurance
+23. **csv_data_processor_dsl.py** - Clean CSV sales pipelines and export prioritized client tables
 
 ## How to Run Examples
 
diff --git a/src/nodetool/examples/nodetool-base/csv_data_processor_dsl.py b/src/nodetool/examples/nodetool-base/csv_data_processor_dsl.py
new file mode 100644
index 0000000..2bf04e6
--- /dev/null
+++ b/src/nodetool/examples/nodetool-base/csv_data_processor_dsl.py
@@ -0,0 +1,87 @@
+"""
+CSV Data Processor DSL Example
+
+Ingest, clean, and export sales pipeline data stored in CSV files.
+
+Workflow:
+1. **Seed Workspace** – Write a sample sales CSV file into the NodeTool workspace
+2. **Filter Records** – Keep only completed deals above a revenue threshold
+3. **Map Columns** – Rename and select the most relevant columns for reporting
+4. **Export Outputs** – Save the refined table and expose a priority client list
+"""
+
+from nodetool.dsl.graph import create_graph, run_graph
+from nodetool.dsl.nodetool.workspace import WriteTextFile
+from nodetool.dsl.nodetool.data import (
+    LoadCSVFile,
+    Filter,
+    Rename,
+    SelectColumn,
+    SaveCSVDataframeFile,
+    ToList,
+)
+from nodetool.dsl.nodetool.list import MapField
+from nodetool.dsl.nodetool.output import DataframeOutput, ListOutput
+
+
+# --- Workspace setup ---------------------------------------------------------
+sales_seed_file = WriteTextFile(
+    path="sales_pipeline.csv",
+    content=(
+        "company,region,status,revenue\n"
+        "Acme Rockets,North America,Completed,12500\n"
+        "Beacon Analytics,Europe,Prospecting,4200\n"
+        "Cascade Systems,North America,Completed,9800\n"
+        "Delta Freight,Asia Pacific,Completed,4700\n"
+        "Evergreen Labs,Europe,Completed,15750\n"
+        "Futura Robotics,Latin America,Prospecting,3100\n"
+        "Glide Solar,North America,Completed,6200\n"
+    ),
+)
+
+
+# --- Dataframe transformations -----------------------------------------------
+raw_sales = LoadCSVFile(file_path=sales_seed_file.output)
+qualified_sales = Filter(
+    df=raw_sales.output,
+    condition="status == 'Completed' and revenue >= 6000",
+)
+renamed_sales = Rename(
+    dataframe=qualified_sales.output,
+    rename_map="company:client_name,revenue:total_revenue",
+)
+selected_columns = SelectColumn(
+    dataframe=renamed_sales.output,
+    columns="client_name,region,total_revenue",
+)
+
+export_csv = SaveCSVDataframeFile(
+    dataframe=selected_columns.output,
+    filename="qualified_sales.csv",
+)
+
+sales_records = ToList(dataframe=selected_columns.output)
+priority_clients = MapField(values=sales_records.output, field="client_name")
+
+
+# --- Workflow outputs --------------------------------------------------------
+clean_table_output = DataframeOutput(
+    name="qualified_sales_table",
+    value=export_csv.output,
+)
+client_list_output = ListOutput(
+    name="priority_clients",
+    value=priority_clients.output,
+)
+
+
+graph = create_graph(clean_table_output, client_list_output)
+
+
+if __name__ == "__main__":
+    result = run_graph(graph)
+    print(
+        "Qualified sales table rows:",
+        result["qualified_sales_table"],
+    )
+    print("Priority clients:", result["priority_clients"])
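For reference, here is a rough plain-pandas sketch of the same filter/rename/select/export pipeline, useful for checking what the Filter, Rename, SelectColumn, and MapField nodes are expected to produce from the seeded CSV. This snippet is not part of the patch above; it assumes pandas is installed and simply mirrors the example data and threshold.

# Plain-pandas approximation of the DSL pipeline (illustrative only; not the NodeTool API).
import io

import pandas as pd

# Same rows that WriteTextFile seeds into the workspace.
CSV_TEXT = (
    "company,region,status,revenue\n"
    "Acme Rockets,North America,Completed,12500\n"
    "Beacon Analytics,Europe,Prospecting,4200\n"
    "Cascade Systems,North America,Completed,9800\n"
    "Delta Freight,Asia Pacific,Completed,4700\n"
    "Evergreen Labs,Europe,Completed,15750\n"
    "Futura Robotics,Latin America,Prospecting,3100\n"
    "Glide Solar,North America,Completed,6200\n"
)

# Load the CSV (WriteTextFile + LoadCSVFile in the graph).
df = pd.read_csv(io.StringIO(CSV_TEXT))

# Filter: keep completed deals with revenue >= 6000.
qualified = df.query("status == 'Completed' and revenue >= 6000")

# Rename and select the reporting columns.
report = qualified.rename(
    columns={"company": "client_name", "revenue": "total_revenue"}
)[["client_name", "region", "total_revenue"]]

# Export the refined table and derive the priority client list
# (SaveCSVDataframeFile + ToList/MapField in the graph).
report.to_csv("qualified_sales.csv", index=False)
priority_clients = report["client_name"].tolist()
print(priority_clients)  # ['Acme Rockets', 'Cascade Systems', 'Evergreen Labs', 'Glide Solar']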