From 327e6d66c95d960a654137c82ec61a2a808335c3 Mon Sep 17 00:00:00 2001 From: Michael Criscolo Date: Mon, 4 Mar 2024 09:04:21 -0600 Subject: [PATCH 1/3] FAB-6528 update spark streaming query listener to use QueryIdleEvent --- .../streaming/StreamingDataSource.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java b/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java index f56859d..e9a5f4f 100644 --- a/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java +++ b/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java @@ -105,21 +105,25 @@ public void onQueryStarted(QueryStartedEvent event) { } @Override - public void onQueryProgress(QueryProgressEvent event) { - logger.info("STREAMING LISTENER: Streaming Query in progress"); - if (event.progress().numInputRows() == 0) { - countBeforeStop--; - if(countBeforeStop == 0){ - logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); - try { - sparkSession.sqlContext().streams().get(event.progress().id()).stop(); - } catch (TimeoutException e) { - logger.error("STREAMING LISTENER: Timeout error in query", e); - } + public void onQueryIdle(QueryIdleEvent event) { + logger.info("STREAMING LISTENER: Streaming Query idle"); + countBeforeStop--; + if(countBeforeStop == 0){ + logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); + try { + sparkSession.sqlContext().streams().get(event.id()).stop(); + } catch (TimeoutException e) { + logger.error("Timeout error in query", e); } + } else { + logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } + } + + @Override + public void onQueryProgress(QueryProgressEvent event) { + logger.info("STREAMING LISTENER: Streaming Query in progress"); logger.info(event.progress().prettyJson()); - logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } @Override From 0143cd2aa4061ea79314d6ddc4a054cc13e32d5c Mon Sep 17 00:00:00 2001 From: Michael Criscolo Date: Mon, 4 Mar 2024 09:07:51 -0600 Subject: [PATCH 2/3] FAB-6528 update spark streaming query listener to use QueryIdleEvent --- .../examples/catalog/RailedCommand.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java b/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java index a8924ea..f051473 100644 --- a/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java +++ b/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java @@ -257,24 +257,27 @@ public void onQueryStarted(QueryStartedEvent event) { } @Override - public void onQueryProgress(QueryProgressEvent event) { - logger.info("STREAMING LISTENER: Streaming Query in progress"); - if (event.progress().numInputRows() == 0) { - countBeforeStop--; - if (countBeforeStop == 0) { - logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); - try { - sparkSession.sqlContext().streams().get(event.progress().id()).stop(); - countBeforeStop = 1L; - } catch (TimeoutException e) { - logger.error("STREAMING LISTENER: Timeout error in query", e); - } + public void onQueryIdle(QueryIdleEvent event) { + logger.info("STREAMING LISTENER: Streaming Query idle"); + countBeforeStop--; + if(countBeforeStop == 0){ + logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); + try { + sparkSession.sqlContext().streams().get(event.id()).stop(); + } catch (TimeoutException e) { + logger.error("Timeout error in query", e); } + } else { + logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } - logger.info(event.progress().prettyJson()); - logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } + @Override + public void onQueryProgress(QueryProgressEvent event) { + logger.info("STREAMING LISTENER: Streaming Query in progress"); + logger.info(event.progress().prettyJson()); + } + @Override public void onQueryTerminated(QueryTerminatedEvent event) { logger.info("STREAMING LISTENER: onQueryTerminated"); From b73e04302120120f1b9a19398894716d7e376c33 Mon Sep 17 00:00:00 2001 From: Michael Criscolo Date: Mon, 12 Feb 2024 10:43:24 -0600 Subject: [PATCH 3/3] FAB-6498 Delta Lake 3.1.0 upgrade (cherry picked from commit 974b34850a708d5622f4fbe65ca247fa8400c337) --- main-app/src/main/resources/python/submit_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main-app/src/main/resources/python/submit_job.py b/main-app/src/main/resources/python/submit_job.py index 95d4aec..424aed4 100644 --- a/main-app/src/main/resources/python/submit_job.py +++ b/main-app/src/main/resources/python/submit_job.py @@ -33,7 +33,7 @@ def get_runtime_args(config): args.append(key) args.append("{}={}".format(y, s_val)) args.append('--py-files') - args.append(f'local:///opt/spark/jars/profiles-sdk-{os.environ["VERSION"]}.jar,local:///opt/spark/jars/delta-spark_2.12-3.0.0.jar') + args.append(f'local:///opt/spark/jars/profiles-sdk-{os.environ["VERSION"]}.jar,local:///opt/spark/jars/delta-spark_2.12-3.1.0.jar') args.append(pyspark_args['app_location']) for x in pyspark_args['app_command']: args.append(x)