Skip to content

Adding FailedRowProcessor support in soda-spark#114

Open
joaoluga wants to merge 7 commits intosodadata:mainfrom
joaoluga:adding_failedrowsprocessor
Open

Adding FailedRowProcessor support in soda-spark#114
joaoluga wants to merge 7 commits intosodadata:mainfrom
joaoluga:adding_failedrowsprocessor

Conversation

@joaoluga
Copy link
Copy Markdown

@joaoluga joaoluga commented Jun 1, 2022

Resolves #113

snippet:

from sodaspark import scan
from sodasql.scan.failed_rows_processor import FailedRowsProcessor
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


class InMemoryFailedRowProcessor(FailedRowsProcessor):

    def process(self, context):

        try:
            print(context)
        except Exception:
            raise Exception

        return {'message': 'All failed rows were printed in your terminal'}


data2 = [("1", 100),
         ("2", 200),
         ("3", None),
         ("4", 400),
         ]

schema = StructType([
    StructField("id", StringType(), True),
    StructField("number", IntegerType(), True)
    ])

df = spark.createDataFrame(data=data2, schema=schema)

scan_definition = """
table_name: my_table
metric_groups:
    - all
samples:
    table_limit: 5
    failed_limit: 5
tests:
    - row_count > 0
columns:
    number:
        tests:
            - duplicate_count == 0
            - missing_count == 0
"""

scan_result = scan.execute(scan_definition, df, failed_rows_processor=InMemoryFailedRowProcessor())

expected output:

{'sample_name': 'dataset', 'column_name': None, 'test_ids': None, 'sample_columns': [{'name': 'id', 'type': 'string'}, {'name': 'number', 'type': 'int'}], 'sample_rows': [['1', 100], ['2', 200], ['3', None], ['4', 400]], 'sample_description': 'my_table.sample', 'total_row_count': 4}
{'sample_name': 'missing', 'column_name': 'number', 'test_ids': ['{"column":"number","expression":"missing_count == 0"}'], 'sample_columns': [{'name': 'id', 'type': 'string'}, {'name': 'number', 'type': 'int'}], 'sample_rows': [['3', None]], 'sample_description': 'my_table.number.missing', 'total_row_count': 1}

joaolug added 3 commits June 1, 2022 13:55
- Allowing users to use the FailedRowsProcessor feature by passing it in the execute method
@JCZuurmond
Copy link
Copy Markdown
Contributor

Thanks @joaoluga for the PR. Could you add a test for the failed rows processor?

@joaoluga
Copy link
Copy Markdown
Author

joaoluga commented Jun 22, 2022

Thanks @joaoluga for the PR. Could you add a test for the failed rows processor?

hey @JCZuurmond, sorry for taking so long. Yes and I've just included the tests for the failed row processor just now. 😁

Copy link
Copy Markdown
Contributor

@JCZuurmond JCZuurmond left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple suggestions

class InMemoryFailedRowProcessor(FailedRowsProcessor):
def process(self, context: dict) -> dict:

try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This try except does not do anything, right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct. I was just following the pattern I found in this doc 😅 Just changed the except to throw the exception 🤔

Copy link
Copy Markdown
Contributor

@JCZuurmond JCZuurmond left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. @vijaykiran could you give a final go?

@vijaykiran
Copy link
Copy Markdown
Contributor

Thank you @joaoluga and @JCZuurmond - LGTM!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement failed rows processor

3 participants