From 6b29e14a64c74ed6af2da02a26f0c1060f2e6627 Mon Sep 17 00:00:00 2001 From: Da An Date: Thu, 6 Feb 2025 17:09:02 -0800 Subject: [PATCH] add read image and process lables natebook --- .../read_image_and_process_labels.ipynb | 515 ++++++++++++++++++ 1 file changed, 515 insertions(+) create mode 100644 samples/ml/container_runtime/read_image_and_process_labels.ipynb diff --git a/samples/ml/container_runtime/read_image_and_process_labels.ipynb b/samples/ml/container_runtime/read_image_and_process_labels.ipynb new file mode 100644 index 00000000..f0799d7e --- /dev/null +++ b/samples/ml/container_runtime/read_image_and_process_labels.ipynb @@ -0,0 +1,515 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "3775908f-ca36-4846-8f38-5adca39217f2", + "metadata": { + "language": "python", + "name": "cell1", + "resultHeight": 0 + }, + "outputs": [], + "source": [ + "# Import python packages\n", + "import streamlit as st\n", + "import pandas as pd\n", + "\n", + "# We can also use Snowpark for our analyses!\n", + "from snowflake.snowpark.context import get_active_session\n", + "session = get_active_session()\n" + ] + }, + { + "cell_type": "markdown", + "id": "e61e1748-e726-4927-8f49-38647e3f28f4", + "metadata": { + "collapsed": false, + "name": "cell10", + "resultHeight": 46 + }, + "source": [ + "### Create a Data Source to read unstructured data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f71fe16b-c1df-4409-8199-ea99b4fe3769", + "metadata": { + "language": "python", + "name": "cell4", + "resultHeight": 0 + }, + "outputs": [], + "source": [ + "from snowflake.ml.ray.datasource import SFStageImageDataSource, SFStageTextDataSource\n", + "\n", + "image_source = SFStageImageDataSource(\n", + " stage_location = \"@DATA_STAGE_RAY/images/\",\n", + " database = \"ST_DB\",\n", + " schema = \"ST_SCHEMA\",\n", + " image_size=(256, 256),\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2324e409-b4c5-4405-ad1c-267831be1773", + "metadata": { + "language": "python", + "name": "cell15" + }, + "outputs": [], + "source": [ + "label_source = SFStageTextDataSource(\n", + " stage_location = \"@DATA_STAGE_RAY/labels/\",\n", + " database = \"ST_DB\",\n", + " schema = \"ST_SCHEMA\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "206f9a9e-df5b-4a75-8f6f-1e4fbb6fbdd3", + "metadata": { + "collapsed": false, + "name": "cell11", + "resultHeight": 46 + }, + "source": [ + "### Load into a ray dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "41bc513b-b40c-4b27-b429-a04bfb18b962", + "metadata": { + "language": "python", + "name": "cell5", + "resultHeight": 71 + }, + "outputs": [], + "source": [ + "import ray\n", + "\n", + "image_ds = ray.data.read_datasource(image_source)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11781ea0-dfc8-42a9-baef-3f5ce7b88280", + "metadata": { + "language": "python", + "name": "cell20" + }, + "outputs": [], + "source": [ + "print(f'Total load {image_ds.count()} images')\n", + "image_ds.show(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30d0b330-33bf-4033-bacb-fd1301933302", + "metadata": { + "language": "python", + "name": "cell16" + }, + "outputs": [], + "source": [ + "label_ds = ray.data.read_datasource(label_source, concurrency=8)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7beb066c-62be-476a-825c-5d4f66a6b6f5", + "metadata": { + "language": "python", + "name": "cell19" + }, + "outputs": [], + "source": [ + "label_ds.show(5)" + ] + }, + { + "cell_type": "markdown", + "id": "28a3dd4a-0da6-4072-a0c9-faeb0b127bb8", + "metadata": { + "collapsed": false, + "name": "cell17" + }, + "source": [ + "### Process both dataset to include addition columns\n", + "**Image Dataset**: add a join key, encode the images, standardize image\\n\n", + "\n", + "**Label Dataset**: add a join key, interrpet the labels" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b9b19b8c-ed51-45a3-b277-003a5d16fbf6", + "metadata": { + "language": "python", + "name": "cell26" + }, + "outputs": [], + "source": [ + "import numpy as np\n", + "from typing import Dict\n", + "import base64\n", + "import os\n", + "import torch\n", + "\n", + "def process_image(row):\n", + " # If grayscale (2D), convert to 3D\n", + " img = row['image']\n", + " if len(img.shape) == 2:\n", + " row['image'] = np.stack([img] * 3, axis=-1) # Duplicate grayscale channel 3 times\n", + "\n", + " encoded_image = base64.b64encode(row['image'])\n", + " row['encoded_image'] = encoded_image\n", + "\n", + " fn = row['file_name']\n", + " join_id = os.path.splitext(fn)[0].split('/')[-1]\n", + " row['join_id'] = join_id\n", + " return row\n", + "\n", + "# processed_image_ds = image_ds.map_batches(convert_to_torch, concurrency=4)\n", + "processed_image_ds = image_ds.map(process_image)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dc2af3b0-968c-4f43-9a89-c68df5cb899a", + "metadata": { + "collapsed": false, + "language": "python", + "name": "cell7" + }, + "outputs": [], + "source": [ + "processed_image_ds.show(1)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "02b39bfc-ab79-4bf9-91a0-e51122622664", + "metadata": { + "language": "python", + "name": "cell23" + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "def expand_label_column(batch: pd.DataFrame) -> pd.DataFrame:\n", + " xmin_list = []\n", + " ymin_list = []\n", + " xmax_list = []\n", + " ymax_list = []\n", + " class_list = []\n", + " file_names = []\n", + " ids = []\n", + " \n", + " # Process each row\n", + " for _, row in batch.iterrows():\n", + " # Split the text and convert to list\n", + " values = row['text'].strip().split()\n", + " \n", + " # Ensure we have exactly 5 values\n", + " if len(values) != 5:\n", + " raise ValueError(f\"Expected 5 values in text, but got {len(values)} values\")\n", + " \n", + " # Add values to respective lists\n", + " xmin_list.append(float(values[0]))\n", + " ymin_list.append(float(values[1]))\n", + " xmax_list.append(float(values[2]))\n", + " ymax_list.append(float(values[3]))\n", + " class_list.append(int(values[4]))\n", + " file_name = row['file_name']\n", + " file_names.append(file_name)\n", + " ids.append(os.path.splitext(file_name)[0].split('/')[-1] + '_test')\n", + " \n", + " # Create new dataframe\n", + " new_df = pd.DataFrame({\n", + " 'join_id': ids,\n", + " 'file_name': file_names,\n", + " 'xmin': xmin_list,\n", + " 'ymin': ymin_list,\n", + " 'xmax': xmax_list,\n", + " 'ymax': ymax_list,\n", + " 'class': class_list,\n", + " })\n", + " return new_df \n", + "\n", + "processed_label_ds = label_ds.map_batches(expand_label_column, concurrency=20, batch_format='pandas')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1a0c06a4-3ab3-445b-bcc3-dd26ee716b4a", + "metadata": { + "language": "python", + "name": "cell6" + }, + "outputs": [], + "source": [ + "processed_label_ds.show(1)" + ] + }, + { + "cell_type": "markdown", + "id": "4b98bd97-140d-4e81-a760-b37a5cfd0a5e", + "metadata": { + "collapsed": false, + "name": "cell21" + }, + "source": [ + "### Merge image source and label source into a single dataset\n", + "We have two ways of achieving this: 1) if customer is more famaliar with `pandas.Dataframe` and if the data fit into memory, then we can convert all data into pandas (or write into snowflake) and do the rest of the ops. 2) If the data does not fit into memory, we can directly leverage ray dataset to do the processing. \n", + "\n", + "**Note**: Ray dataset is not naturally architeched to support join ops, so it's better for to use other method (in memory / snowflake) to perform joins" + ] + }, + { + "cell_type": "markdown", + "id": "5d43a6e8-3c3b-42c6-bac3-8dffb59acd6b", + "metadata": { + "collapsed": false, + "name": "cell27" + }, + "source": [ + "#### Convert both dataset into pandas and perform joins" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a9a18c6e-9851-4fb9-98e1-2ff5aef48b29", + "metadata": { + "language": "python", + "name": "cell24" + }, + "outputs": [], + "source": [ + "image_df = processed_image_ds.drop_columns(cols=['image']).to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13d6ca4a-bedd-494c-a245-e799becc344b", + "metadata": { + "language": "python", + "name": "cell25" + }, + "outputs": [], + "source": [ + "image_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4f04b19-aa68-4671-9463-cb3d1d16e1a4", + "metadata": { + "language": "python", + "name": "cell31" + }, + "outputs": [], + "source": [ + "label_df = processed_label_ds.to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "43965eb1-e487-4c75-938d-cda48e871a87", + "metadata": { + "language": "python", + "name": "cell32" + }, + "outputs": [], + "source": [ + "label_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2c756579-305e-4ac7-b7f1-54c1341606c8", + "metadata": { + "language": "python", + "name": "cell33" + }, + "outputs": [], + "source": [ + "# perform merge \n", + "merged_train_df = pd.merge(image_df, label_df, how='inner', on='join_id')\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3afe32e8-7525-4385-812f-bd802d7e95ec", + "metadata": { + "language": "python", + "name": "cell34" + }, + "outputs": [], + "source": [ + "merged_train_df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "b72a2f50-8c46-4b29-b9ea-f12458100444", + "metadata": { + "collapsed": false, + "name": "cell13", + "resultHeight": 46 + }, + "source": [ + "## Save the Transformed Dataset to a snowflake table\n", + "Customer may also save the processed image dataset and label dataset into snowflake easily" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4ad11e31-bc2a-45e6-8b6a-c3ea08b2ea9b", + "metadata": { + "language": "python", + "name": "cell8", + "resultHeight": 0 + }, + "outputs": [], + "source": [ + "\n", + "from snowflake.ml.ray.datasink import SnowflakeTableDatasink\n", + "\n", + "table_to_save = \"RAY_DEMO_JAN21_IMAGE_DS\"\n", + "datasink = SnowflakeTableDatasink(\n", + " table_name=table_to_save,\n", + " database = \"ST_DB\",\n", + " schema = \"ST_SCHEMA\",\n", + " auto_create_table=True,\n", + " override=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "537c406f-a2f0-4c68-a82b-d06c40610130", + "metadata": { + "language": "python", + "name": "cell9", + "resultHeight": 41334 + }, + "outputs": [], + "source": [ + "processed_image_ds.drop_columns(cols=['image']).write_datasink(datasink, concurrency=4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bf6e353f-4334-4f76-a1ac-8bcd96c8d6b8", + "metadata": { + "language": "sql", + "name": "cell35" + }, + "outputs": [], + "source": [ + "# sql cell\n", + "\n", + "# SELECT * FROM RAY_DEMO_JAN21_IMAGE_DS;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0a5c702d-338c-47ac-a4f8-6afc208e1ba3", + "metadata": { + "language": "python", + "name": "cell36" + }, + "outputs": [], + "source": [ + "table_to_save = \"RAY_DEMO_JAN21_LABEL_DS\"\n", + "datasink = SnowflakeTableDatasink(\n", + " table_name=table_to_save,\n", + " database = \"ST_DB\",\n", + " schema = \"ST_SCHEMA\",\n", + " auto_create_table=True,\n", + " override=True,\n", + ")\n", + "processed_label_ds.write_datasink(datasink, concurrency=4)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "340d435d-57ce-461c-b72f-051e80d1ce55", + "metadata": { + "language": "sql", + "name": "cell38" + }, + "outputs": [], + "source": [ + "# sql cell\n", + "\n", + "#SELECT * FROM RAY_DEMO_JAN21_LABEL_DS;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bab9e512-cab3-4f39-9b4e-be428df0aecb", + "metadata": { + "language": "python", + "name": "cell39" + }, + "outputs": [], + "source": [ + "table_to_save = \"RAY_DEMO_JAN21_COMINED_DS\"\n", + "datasink = SnowflakeTableDatasink(\n", + " table_name=table_to_save,\n", + " database = \"ST_DB\",\n", + " schema = \"ST_SCHEMA\",\n", + " auto_create_table=True,\n", + " override=True,\n", + ")\n", + "processed_label_ds.write_datasink(datasink, concurrency=4)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Streamlit Notebook", + "name": "streamlit" + }, + "lastEditStatus": { + "authorEmail": "", + "authorId": "2713708608032", + "authorName": "ADMIN", + "lastEditTime": 1737505394238, + "notebookId": "k7sl7jhmzxh3ifg4aa7v", + "sessionId": "79387567-f895-4d0a-99a1-da87d1489cc5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}