diff --git a/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java b/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java index a8924ea..f051473 100644 --- a/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java +++ b/catalog-management/src/main/java/com/c12e/cortex/examples/catalog/RailedCommand.java @@ -257,24 +257,27 @@ public void onQueryStarted(QueryStartedEvent event) { } @Override - public void onQueryProgress(QueryProgressEvent event) { - logger.info("STREAMING LISTENER: Streaming Query in progress"); - if (event.progress().numInputRows() == 0) { - countBeforeStop--; - if (countBeforeStop == 0) { - logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); - try { - sparkSession.sqlContext().streams().get(event.progress().id()).stop(); - countBeforeStop = 1L; - } catch (TimeoutException e) { - logger.error("STREAMING LISTENER: Timeout error in query", e); - } + public void onQueryIdle(QueryIdleEvent event) { + logger.info("STREAMING LISTENER: Streaming Query idle"); + countBeforeStop--; + if(countBeforeStop == 0){ + logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); + try { + sparkSession.sqlContext().streams().get(event.id()).stop(); + } catch (TimeoutException e) { + logger.error("STREAMING LISTENER: Timeout error in query", e); } + } else { + logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } - logger.info(event.progress().prettyJson()); - logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } + @Override + public void onQueryProgress(QueryProgressEvent event) { + logger.info("STREAMING LISTENER: Streaming Query in progress"); + logger.info(event.progress().prettyJson()); + } + @Override public void onQueryTerminated(QueryTerminatedEvent event) { logger.info("STREAMING LISTENER: onQueryTerminated"); diff --git 
a/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java b/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java index f56859d..e9a5f4f 100644 --- a/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java +++ b/datasource-streaming/src/main/java/com/c12e/cortex/examples/streaming/StreamingDataSource.java @@ -105,21 +105,25 @@ public void onQueryStarted(QueryStartedEvent event) { } @Override - public void onQueryProgress(QueryProgressEvent event) { - logger.info("STREAMING LISTENER: Streaming Query in progress"); - if (event.progress().numInputRows() == 0) { - countBeforeStop--; - if(countBeforeStop == 0){ - logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); - try { - sparkSession.sqlContext().streams().get(event.progress().id()).stop(); - } catch (TimeoutException e) { - logger.error("STREAMING LISTENER: Timeout error in query", e); - } + public void onQueryIdle(QueryIdleEvent event) { + logger.info("STREAMING LISTENER: Streaming Query idle"); + countBeforeStop--; + if(countBeforeStop == 0){ + logger.info("STREAMING LISTENER: Initiating Streaming Query stop"); + try { + sparkSession.sqlContext().streams().get(event.id()).stop(); + } catch (TimeoutException e) { + logger.error("STREAMING LISTENER: Timeout error in query", e); } + } else { + logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } + } + + @Override + public void onQueryProgress(QueryProgressEvent event) { + logger.info("STREAMING LISTENER: Streaming Query in progress"); logger.info(event.progress().prettyJson()); - logger.info("STREAMING LISTENER: No processing occurred in last poll, stopping in {} poll intervals", countBeforeStop); } @Override diff --git a/main-app/src/main/resources/python/submit_job.py b/main-app/src/main/resources/python/submit_job.py index 95d4aec..424aed4 100644 --- 
a/main-app/src/main/resources/python/submit_job.py +++ b/main-app/src/main/resources/python/submit_job.py @@ -33,7 +33,7 @@ def get_runtime_args(config): args.append(key) args.append("{}={}".format(y, s_val)) args.append('--py-files') - args.append(f'local:///opt/spark/jars/profiles-sdk-{os.environ["VERSION"]}.jar,local:///opt/spark/jars/delta-spark_2.12-3.0.0.jar') + args.append(f'local:///opt/spark/jars/profiles-sdk-{os.environ["VERSION"]}.jar,local:///opt/spark/jars/delta-spark_2.12-3.1.0.jar') args.append(pyspark_args['app_location']) for x in pyspark_args['app_command']: args.append(x)