diff --git a/src/clojure/sparkling/core.clj b/src/clojure/sparkling/core.clj
index c189207..a6e7414 100644
--- a/src/clojure/sparkling/core.clj
+++ b/src/clojure/sparkling/core.clj
@@ -28,6 +28,7 @@
    [org.apache.spark.api.java JavaSparkContext StorageLevels
     JavaRDD JavaPairRDD JavaDoubleRDD]
    [org.apache.spark HashPartitioner Partitioner]
+   [org.apache.spark.sql SparkSession]
    [org.apache.spark.rdd PartitionwiseSampledRDD PartitionerAwareUnionRDD]
    [scala.collection JavaConversions]
    [scala.reflect ClassTag$]))
@@ -741,6 +742,11 @@ so that the wrapped function returns a tuple [f(v),v]"
       (conf/app-name app-name))]
     (spark-context conf))))
 
+(defn spark-session
+  "Returns a SparkSession wrapping the SparkContext of the given JavaSparkContext."
+  [^JavaSparkContext context]
+  (SparkSession. (.sc context)))
+
 (defn local-spark-context
   [app-name]
   (let [conf (-> (conf/spark-conf)
diff --git a/test/sparkling/core_test.clj b/test/sparkling/core_test.clj
index ec67bd7..1f3d835 100644
--- a/test/sparkling/core_test.clj
+++ b/test/sparkling/core_test.clj
@@ -10,7 +10,21 @@
     ))
 
 
+(deftest spark-session-test
+  (s/with-context
+    c
+    (-> (conf/spark-conf)
+        (conf/set-sparkling-registrator)
+        (conf/set "spark.kryo.registrationRequired" "true")
+        (conf/master "local[*]")
+        (conf/app-name "api-test"))
+    (let [spark-session (s/spark-session c)
+          df (.toDF (.range spark-session 10) (into-array String ["number"]))]
+      (testing "checking SparkSession instance"
+        (is (instance? org.apache.spark.sql.SparkSession spark-session)))
+      (testing "checking DF instance"
+        (is (instance? org.apache.spark.sql.Dataset df))))))
 
 (deftest lookup
   (s/with-context