Tuesday, August 15, 2017

SparkSQL


SparkSQL lets people query their data in a SQL-like language while taking advantage of the speed of Spark, a fast, general engine for data processing that runs over Hadoop. I wanted to test this out on a dataset I found from Walmart containing their stores’ weekly sales numbers. I put the CSV into our cluster’s HDFS (in /var/walmart), making it accessible to all Flux Hadoop users.


Spark has an SQLContext available by default as “sqlContext”. SQLContext is a class that allows a user to run SQL-like queries in Spark. To use it, call sqlContext.sql() from the scala> prompt; the string passed in the parentheses will be interpreted as an SQL-style query. First, I start up Spark in the normal way:


spark-shell --master yarn-client --queue <your_queue> --num-executors 35 --executor-cores 4 --executor-memory 5g


Then, I use sqlContext to create a table that describes the Walmart data. The “CREATE EXTERNAL TABLE” call will create a table in the metastore in my Flux home directory.

sqlContext.sql("CREATE EXTERNAL TABLE sales(store INT, dept INT, date STRING, weekly_sales BIGINT, IsHoliday BOOLEAN) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/var/walmart'")
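
Before querying, it can help to confirm that the table was registered with the columns I expect. The lines below are a minimal sketch for the same spark-shell session; they assume the predefined sqlContext and the “sales” table created above.

```scala
// Assumes the spark-shell session above, where sqlContext is predefined
// and the "sales" table has just been created.

// List the tables the metastore knows about; "sales" should be among them.
val tables = sqlContext.tableNames()
println(tables.mkString(", "))

// Show the column names and types declared in CREATE EXTERNAL TABLE.
sqlContext.sql("DESCRIBE sales").show()
```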


Now that I have a table, I can run queries. I will start with a simple line count:

val count = sqlContext.sql("SELECT COUNT(*) FROM sales")


Note that no computations are actually done until you view the result of your query. This means that all the above command has done so far is create a dataframe called “count”. A dataframe is essentially a data structure that contains “tabular” data. That is, it consists of rows and columns of data that can, for example, store the results of an SQL-style query. In order to have Spark perform the line count, I would run:

count.show()


This runs the computation and prints the line count of the dataset (421571). Knowing that this simple test was successful, I can now move on to running a more complex query:

val holiday = sqlContext.sql("SELECT date, store, dept FROM sales WHERE weekly_sales = 0 AND IsHoliday = TRUE GROUP BY date, store, dept")


That command created the dataframe “holiday”.  The output of this query should show all the departments from every store that had 0 sales during a holiday.  Like before, Spark will compute once I ask for the result:

holiday.show()
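
The same query can also be written with dataframe methods instead of an SQL string. This is only a sketch of one equivalent form, assuming the “sales” table from above; the name holidayDf is my own and is not part of the original session.

```scala
// Assumes the spark-shell session above with the "sales" table defined.
// holidayDf is a hypothetical name used only for this illustration.
val holidayDf = sqlContext.table("sales")
  .filter("weekly_sales = 0 AND IsHoliday = true")
  .select("date", "store", "dept")
  .distinct()   // GROUP BY with no aggregates returns each distinct combination

// Print the query plan without running the job.
holidayDf.explain()

// As with the SQL version, the query only runs when the result is requested.
holidayDf.show()
```

Both forms describe the same result, so the choice is mostly a matter of taste: the SQL string is familiar to anyone who knows SQL, while the method chain is easier to build up step by step at the prompt.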


Notice that by default show() displays only the first 20 rows of output. To view more, you can pass the number of rows you want to see as a parameter to show(). For example, since "holiday" has 31 rows, you would run:

holiday.show(31)
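
If the number of rows is not known ahead of time, it can be looked up first. The sketch below assumes the “holiday” dataframe and the “sales” table from the session above; numRows and lineCount are names I made up for the example.

```scala
// Assumes the "holiday" dataframe from the session above.
// count() is an action: it runs the query and returns the number of rows as a Long.
val numRows = holiday.count()

// show() takes an Int, so convert before passing it in.
holiday.show(numRows.toInt)

// For a single-value result such as the line count, first() returns the only row
// and getLong(0) reads its first column as a Long.
val lineCount = sqlContext.sql("SELECT COUNT(*) FROM sales").first().getLong(0)
println(lineCount)
```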


Spark keeps a dataframe’s data in memory only if you ask it to (for example, by calling holiday.cache() before the first show()), in which case subsequent queries on it will run faster. Either way, if you quit your spark-shell, your data is gone. Therefore, it may be a good idea to save the result in your HDFS home directory. Run this command:

holiday.rdd.saveAsTextFile("sales-results")


On our cluster, this would save the output in the directory “/user/<uniqname>/sales-results”.
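
To make sure the result stays in memory for follow-up queries, and to save it as plain comma-separated lines, something like the following should work. It is a sketch that assumes the “holiday” dataframe from above; the directory name “sales-results-csv” is hypothetical.

```scala
// Assumes the "holiday" dataframe from the session above.
// cache() marks the dataframe to be kept in memory once it has been computed.
holiday.cache()
holiday.count()   // an action, so the query runs and the cache is filled

// Write each row as a comma-separated line instead of the default Row formatting.
// "sales-results-csv" is a hypothetical directory name; it must not already exist.
holiday.rdd.map(row => row.mkString(",")).saveAsTextFile("sales-results-csv")

// Release the cached data when it is no longer needed.
holiday.unpersist()
```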


As mentioned before, the table “sales” was created in the metastore in my home directory. The metastore is external to Spark and my current Spark session, so the tables will be available even after a spark-shell exits. You may leave tables in your metastore if you would like to query them again in the future. However, if not, it’s a good idea to clean up your tables in case you want to use the same name in the future. I decided that I did not want the table anymore, so I ran:
sqlContext.sql("DROP TABLE sales")


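This deleted the table from the metastore. Because it was created as an external table, the CSV in /var/walmart was left in place. Then, I exited the shell (simply by typing “exit”). I wanted to view my whole output, which was conveniently saved in my HDFS home directory. I was able to do so by running: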
hdfs dfs -cat sales-results/*
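
The saved output can also be read back from a later spark-shell session. This is a small sketch assuming the “sales-results” directory written earlier; the name saved is just for illustration.

```scala
// Assumes a new spark-shell session, where sc (the SparkContext) is predefined.
// A relative path resolves against the HDFS home directory.
val saved = sc.textFile("sales-results")

// Count the saved lines, then print the first few.
println(saved.count())
saved.take(5).foreach(println)
```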


Overall, I found that SparkSQL was a great way for me to quickly run SQL-style queries and easily save the results into my HDFS home directory.