In this walkthrough, we will cover how to use Spark SQL to perform Cassandra data operations. We will be using the Spark shell instead of the Spark SQL shell because of the volume of log output that accompanies every command in the Spark SQL shell. We will also be using the Catalog method from DataStax's Spark Cassandra Connector.
Prerequisites:
- Docker
- Spark 3.0.x
Clone the demo repository and start Cassandra in Docker, mounting the repo into the container:

```bash
git clone https://github.com/Anant/example-cassandra-spark-sql.git
cd example-cassandra-spark-sql
docker run --name cassandra -p 9042:9042 -d -v "$(pwd)":/example-cassandra-spark-sql cassandra:latest
```

Open cqlsh in the container:

```bash
docker exec -it cassandra cqlsh
```

Inside cqlsh, source the setup script:

```sql
source '/example-cassandra-spark-sql/setup.cql'
```

Then, from your Spark installation directory, start the Spark master:

```bash
./sbin/start-master.sh
```

You can find your Spark master URL at localhost:8080.
Start a worker and attach it to the master:

```bash
./sbin/start-slave.sh <master-url>
```

Now launch the Spark shell with the Spark Cassandra Connector package, the connector's SQL extensions, and a catalog named `cassandra`:

```bash
./bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 \
--master <spark-master-url> \
--conf spark.cassandra.connection.host=127.0.0.1 \
--conf spark.cassandra.connection.port=9042 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
--conf spark.sql.catalog.cassandra=com.datastax.spark.connector.datasource.CassandraCatalog
```
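Once the shell is up, a quick sanity check is to list the keyspaces Spark sees through the catalog we just registered; the `demo` keyspace created by setup.cql should appear:

```scala
// List the Cassandra keyspaces visible through the "cassandra" catalog
spark.sql("SHOW NAMESPACES FROM cassandra").show
```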
spark.sql("CREATE TABLE cassandra.demo.testTable (key_1 Int, key_2 Int, key_3 Int, cc1 STRING, cc2 String, cc3 String, value String) USING cassandra PARTITIONED BY (key_1, key_2, key_3) TBLPROPERTIES (clustering_key='cc1.asc, cc2.desc, cc3.asc', compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}')")spark.sql("ALTER TABLE cassandra.demo.testTable ADD COLUMNS (newCol INT)")spark.sql("describe table cassandra.demo.testTable").showspark.sql("DROP TABLE cassandra.demo.testTable")spark.sql("SHOW TABLES from cassandra.demo").showPerform a basic read
spark.sql("SELECT * from cassandra.demo.previous_employees_by_job_title").showWrite data to a table from another table and use SQL functions
spark.sql("INSERT INTO cassandra.demo.days_worked_by_previous_employees_by_job_title SELECT job_title, employee_id, employee_name, abs(datediff(last_day, first_day)) as number_of_days_worked from cassandra.demo.previous_employees_by_job_title")Join data from two tables together
spark.sql("""
SELECT cassandra.demo.previous_employees_by_job_title.job_title, cassandra.demo.previous_employees_by_job_title.employee_name, cassandra.demo.previous_employees_by_job_title.first_day, cassandra.demo.previous_employees_by_job_title.last_day, cassandra.demo.days_worked_by_previous_employees_by_job_title.number_of_days_worked 
FROM cassandra.demo.previous_employees_by_job_title 
LEFT JOIN cassandra.demo.days_worked_by_previous_employees_by_job_title ON cassandra.demo.previous_employees_by_job_title.employee_id=cassandra.demo.days_worked_by_previous_employees_by_job_title.employee_id 
WHERE cassandra.demo.days_worked_by_previous_employees_by_job_title.job_title='Dentist'
""").showTRUNCATE TABLE demo.previous_employees_by_job_title ; TRUNCATE TABLE demo.days_worked_by_previous_employees_by_job_title ; ./bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 \
--master <spark-master-url> \
--conf spark.cassandra.connection.host=127.0.0.1 \
--conf spark.cassandra.connection.port=9042 \
--conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
--conf spark.sql.catalog.cassandra=com.datastax.spark.connector.datasource.CassandraCatalog \
--files /path/to/example-cassandra-spark-sql/previous_employees_by_job_title.csv
```

Read the CSV into a DataFrame, register it as a temporary view, and insert its rows into the Cassandra table:

```scala
val csv_df = spark.read.format("csv").option("header", "true").load("/path/to/example-cassandra-spark-sql/previous_employees_by_job_title.csv")
csv_df.createOrReplaceTempView("source")
spark.sql("INSERT INTO cassandra.demo.previous_employees_by_job_title SELECT * FROM source")
```

And that wraps up the walkthrough on using Spark SQL for basic Cassandra data operations. If you want to watch a live recording of the walkthrough, be sure to check out the YouTube video linked below!