Using PySpark

By March 20, 2016

Spark and PySpark use a container that their developers call a Resilient Distributed Dataset (RDD) to store and operate on data. The most important characteristic of an RDD is that it is immutable: once created, the data it contains cannot be updated. Instead, new RDDs are created by transforming the data in an existing RDD, and chaining such transformations is how analysis is done with Spark.
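To make the immutability point concrete, here is a small model of it in plain Python. This is not Spark code. The hypothetical `LocalDataset` class keeps its elements in a tuple, and its `map` method returns a new `LocalDataset` instead of changing the existing one, which mirrors how an RDD transformation behaves. Spark additionally evaluates transformations lazily and spreads the data across a cluster; neither is modeled here.

```python
from typing import Callable, Generic, Iterable, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class LocalDataset(Generic[T]):
    """Hypothetical single-machine stand-in for an RDD (not part of PySpark)."""

    def __init__(self, items: Iterable[T]) -> None:
        # Store the elements in a tuple so they cannot be modified in place.
        self._items: Tuple[T, ...] = tuple(items)

    def map(self, f: Callable[[T], U]) -> "LocalDataset[U]":
        # A transformation builds a brand-new dataset; self is left untouched.
        return LocalDataset(f(x) for x in self._items)

    def collect(self) -> list:
        # Return a copy so callers cannot mutate the internal state.
        return list(self._items)


words = LocalDataset(["spark", "rdd", "python"])
lengths = words.map(len)

print(words.collect())    # ['spark', 'rdd', 'python']
print(lengths.collect())  # [5, 3, 6]
```

After `map`, the original `words` dataset still holds the same three strings; the lengths live only in the new `lengths` dataset.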

Save the following script as job.py.

from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments: an input path and an output path
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')
  sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)


# Read in the dataset and immediately split each tab-separated line into a list of fields
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Build the 'length' dataset: for each year, the total number of characters across all word occurrences
# (word length times occurrence count). 'yearlyLengthAll' holds the per-line values; 'yearlyLength' holds the per-year sums.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
)
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Build the 'words' dataset: the total number of word occurrences for each year.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
).reduceByKey(
    lambda a, b: a + b
)

# Build the 'average_length' dataset: join the two datasets on year, then divide total characters by total occurrences.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])
)

# Save the results in the specified output directory.
yearlyAvg.saveAsTextFile(output)

# Finally, let Spark know that the job is done.
sc.stop()

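The script above computes the average word length in the NGrams dataset for each year, weighting each word by its occurrence count. It relies on two main operations, ‘map’ and ‘reduceByKey’, plus a ‘join’ to combine the two per-year results. ‘map’ applies a function to each element of an RDD and returns a new RDD containing the results. ‘reduceByKey’ works on an RDD of (key, value) pairs: for every key, it merges all the values with that key using the given function and returns a new RDD with one (key, result) pair per key. Because Spark combines values in no fixed order, that function should be associative and commutative, as addition is.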
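The same pipeline can be traced by hand on a few lines of data. The sketch below is plain Python, not Spark: `reduce_by_key` is a hypothetical helper that models how reduceByKey first combines values inside each partition and then merges the partial results, and the join is modeled as an inner join over years present in both dictionaries. The sample lines are made up; the field layout (word, year, occurrence count, then further fields that are ignored) is assumed from the indexes job.py uses.

```python
from operator import add
from typing import Callable, Dict, Hashable, List, Tuple

# Made-up sample lines in the layout job.py expects:
# word <TAB> year <TAB> occurrence count <TAB> (further fields ignored)
SAMPLE = [
    "cat\t1900\t10\t3",
    "horse\t1900\t5\t2",
    "cat\t1901\t4\t1",
    "elephant\t1901\t1\t1",
]


def reduce_by_key(partitions: List[List[Tuple[Hashable, float]]],
                  f: Callable[[float, float], float]) -> Dict[Hashable, float]:
    """Plain-Python model of reduceByKey (not Spark itself).

    Values are first combined inside each partition, then the partial
    results are merged. Because the grouping and order differ from a
    simple left-to-right fold, f must be associative and commutative.
    """
    partials = []
    for part in partitions:
        local: Dict[Hashable, float] = {}
        for key, value in part:
            local[key] = f(local[key], value) if key in local else value
        partials.append(local)

    merged: Dict[Hashable, float] = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = f(merged[key], value) if key in merged else value
    return merged


# Equivalent of sc.textFile(...).map(lambda line: line.split('\t'))
rows = [line.split("\t") for line in SAMPLE]
# Spread the rows over two "partitions" so each year appears in both.
partitions = [rows[0::2], rows[1::2]]

# 'length': (year, word length * count), summed per year
length = reduce_by_key(
    [[(int(r[1]), float(len(r[0])) * float(r[2])) for r in p] for p in partitions],
    add)
# 'words': (year, count), summed per year
count = reduce_by_key(
    [[(int(r[1]), float(r[2])) for r in p] for p in partitions],
    add)
# Inner join on year, then divide total characters by total occurrences
average = {year: length[year] / count[year] for year in length if year in count}

print(average)  # 1900: 55/15 (about 3.67), 1901: 20/5 = 4.0
```

For 1900, "cat" contributes 3 × 10 = 30 characters and "horse" 5 × 5 = 25, so the average is 55 / 15; the unweighted mean of the two word lengths (4.0) would give a different answer.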

The job can be submitted, and its output printed, by running:

spark-submit \
 --master yarn-client \
 --queue <your_queue> \
 --num-executors 10 \
 --executor-memory 12g \
 --executor-cores 2 \
 job.py /var/ngrams ngrams-out
hdfs dfs -cat ngrams-out/*

The only required options in the job submission command above are ‘--master yarn-client’ and ‘--queue <your_queue>’. The values passed to the other options can be adjusted to improve performance or to stay within the resource limits of your queue.
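In PySpark, saveAsTextFile writes one line per element using the element's string form, so each line printed by `hdfs dfs -cat ngrams-out/*` should look like a Python tuple, for example `(1900, 3.5)`. A minimal sketch for loading those lines back into Python follows; the function name `parse_output` is hypothetical, and the sample lines stand in for real job output.

```python
import ast
from typing import Dict, Iterable


def parse_output(lines: Iterable[str]) -> Dict[int, float]:
    """Turn lines like '(1900, 3.5)' into a {year: average_length} dict."""
    result: Dict[int, float] = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # literal_eval parses Python literals without executing arbitrary code
        year, avg = ast.literal_eval(line)
        result[int(year)] = float(avg)
    return result


# Stand-in for the text printed by: hdfs dfs -cat ngrams-out/*
sample = ["(1901, 4.0)\n", "(1900, 3.5)\n", "\n"]
averages = parse_output(sample)
for year in sorted(averages):
    print(year, averages[year])
```

On a real run, `sys.stdin` could be passed instead of `sample`, with the output of `hdfs dfs -cat ngrams-out/*` piped into the script. The part files are not guaranteed to be in year order, which is why the loop sorts the keys.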

Besides writing and submitting jobs, you can use the interactive Python console that comes with Spark, which can be opened as follows:

# Load the pyspark console
pyspark --master yarn-client --queue <your_queue>

# Load the pyspark console with ipython as the interpreter
module load hadoop-utils
ipyspark --master yarn-client --queue <your_queue>

This interactive console can be used for prototyping or debugging. In the standard pyspark shell, a SparkContext has already been created and is available as sc.
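One way to prototype in the console is to pull a handful of lines with `sc.textFile('/var/ngrams').take(5)` and check the per-line logic on them before running the full job. The sketch below performs the same check in plain Python so it runs without a cluster. The helper names `split_line` and `to_year_length` are hypothetical, and the sample lines are made up; named functions like these could then be passed to `map` in job.py in place of the lambdas.

```python
from typing import List, Tuple


def split_line(line: str) -> List[str]:
    # Same parsing as job.py: fields are tab-separated.
    return line.split("\t")


def to_year_length(arr: List[str]) -> Tuple[int, float]:
    # (year, word length weighted by its occurrence count), as in yearlyLengthAll
    return int(arr[1]), float(len(arr[0])) * float(arr[2])


# In the pyspark console these lines could come from sc.textFile(path).take(5);
# here they are hard-coded so the check runs locally.
sample = ["cat\t1900\t10\t3", "horse\t1900\t5\t2"]
pairs = [to_year_length(split_line(line)) for line in sample]
print(pairs)  # [(1900, 30.0), (1900, 25.0)]
```

A named function is also easier to call directly on a suspicious line when a task fails with an IndexError or ValueError, since it can be tested in isolation.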