Using PySpark

Spark and PySpark utilize a container that their developers call a Resilient Distributed Dataset (RDD) for storing and operating on data. The most important characteristic of Spark’s RDD is that it is immutable – once created, the data it contains cannot be updated. New RDDs can be created by transforming the data in another RDD, which is how analysis is done with Spark.
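For example, an RDD can be built from a small Python list and transformed into a new RDD without modifying the original. The snippet below is a minimal sketch, assuming ‘sc’ is a SparkContext like the one created in job.py further down:

# A minimal sketch of RDD immutability: transformations return new RDDs.
numbers = sc.parallelize([1, 2, 3, 4])       # original RDD; never modified
doubled = numbers.map(lambda x: x * 2)       # 'map' produces a new RDD
print(doubled.collect())                     # [2, 4, 6, 8]; 'numbers' is unchanged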

Save the following script as job.py:

from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments, an input and output
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')
  sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)


# Read in the dataset and immediately split each line into an array of fields
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Create the 'length' dataset: pair each year with (word length * occurrence
# count), then sum those products per year. The totals end up in 'yearlyLength'.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
)
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Create the 'words' dataset: the total number of word occurrences per year.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
).reduceByKey(
    lambda a, b: a + b
)

# Create the 'average_length' dataset: divide each year's total length by its word count.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])
)

# Save the results in the specified output directory.
yearlyAvg.saveAsTextFile(output)

# Finally, let Spark know that the job is done.
sc.stop()

The script above computes the average length of the words in the NGrams dataset for each year. There are two main operations in the code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each element of an RDD and returns a new RDD containing the results. ‘reduceByKey’ uses a function to combine all of the values that share the same key – for every key – and returns a new RDD of (key, result) pairs.
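As a small illustration of these two operations, independent of the NGrams data (the year/value pairs below are made up):

# A sketch of 'map' and 'reduceByKey' on a tiny, made-up dataset.
pairs = sc.parallelize([('1900', 2.0), ('1900', 3.0), ('1901', 4.0)])
doubled = pairs.map(lambda kv: (kv[0], kv[1] * 2))   # transforms every element
totals = pairs.reduceByKey(lambda a, b: a + b)       # sums the values for each key
print(totals.collect())                              # e.g. [('1900', 5.0), ('1901', 4.0)]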

The job can be submitted by running:

spark-submit \
 --master yarn-client \
 --queue <your_queue> \
 --num-executors 35 \
 --executor-memory 5g \
 --executor-cores 4 \
 job.py /var/ngrams ngrams-out

Once the job has finished, the results can be viewed by reading the output directory from HDFS:

hdfs dfs -cat ngrams-out/*


The only required arguments in the job submission command above are ‘--master yarn-client’ and ‘--queue <your_queue>’. The values passed to the other arguments may be adjusted to improve performance or to conform to the limits of your queue.

Note: If you want to use Python 3.5 instead of our default 2.7 in your PySpark job, run the following commands, then submit your job normally using your Python 3.5 code:

export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0

export PYSPARK_PYTHON=/sw/lsa/centos7/python-anaconda3/created-20170424/bin/python


In addition to writing and submitting batch jobs, Spark provides an interactive Python console, which can be opened as follows:

# Load the pyspark console 
pyspark --master yarn-client --queue <your_queue>

This interactive console can be used for prototyping or debugging.
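For example, the first few records of the dataset can be inspected directly from the console before writing a full job. This is a minimal sketch; inside the pyspark shell the SparkContext ‘sc’ is created automatically:

# Peek at the first few parsed records of the NGrams dataset.
data = sc.textFile('/var/ngrams').map(lambda line: line.split('\t'))
print(data.take(5))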