Jupyter on Cavium

Fuse HDFS allows you use standard posix system commands with HDFS. This may be useful, for example, if you have a program that needs to use data that is stored in HDFS. 

To use Fuse HDFS, change directories to /hadoop-fuse/user/<your_uniqname>

Once in this directory, you can use commands on your HDFS files just as you would on any other files. For example, the ls command will list the contents of your HDFS home directory.

You could also run a Python or R program that uses a file in HDFS.

You can save the below file and run it as you would regularly run a python program to access an example data file we have available to all users in HDFS.

f = open("/hadoop-fuse/var/examples/romeojuliet.txt", "r")
data = f.read()
d = {}
for word in data.split(' '):
        if word in d:
                d[word] += 1
                d[word] = 1
for word, count in d.items():
        print word + str(count)

Logging In

To log in to the Cavium Hadoop cluster, you need a terminal.  Currently the cluster is only accessible via the command line.

If you are trying to log in from off campus, or using the MGuest wireless network, you have a couple of options:

    • Install VPN software on your computer
    • First ssh to login.itd.umich.edu, then ssh to cavium-thunderx.arc-ts.umich.edu from there.

Here’s what a login looks like using a terminal emulator:

Mac using terminal: Open terminal

Type: ssh -l uniqname cavium-thunderx.arc-ts.umich.edu [replacing your uniqname in the command]

Windows using PuTTY (http://www.chiark.greenend.org.uk/~sgtatham/putty/).

Launch Putty and enter cavium-thunderx.arc-ts.umich.edu as the host name then click open.

For both Mac and Windows:

At the “Enter a passcode or select one of the following options:” prompt, type the number of your preferred choice for Duo authentication.


To demonstrate Hive, below is a short tutorial. The tutorial uses the Google NGrams dataset, which is available in HDFS in /var/ngrams.

# Open the interactive hive console

# Create a table with the Google NGrams data in /var/ngrams
CREATE EXTERNAL TABLE ngrams_your-uniqname(ngram STRING, year INT, count BIGINT, volumes BIGINT)
LOCATION ‘/var/ngrams’;

# Look at the schema of the table
DESCRIBE ngrams_your-uniqname;

# Count the total number of rows (should be 1430731493)
SELECT COUNT(*) FROM ngrams_your-uniqname;

# Select the number of words, by year, that have only appeared in a single volume
SELECT year, COUNT(ngram) FROM ngrams_your-uniqname WHERE
volumes = 1
GROUP BY year;

# Optional: delete your ngrams table
DROP table ngrams_your-uniqname;

# Exit the Hive console

Streaming (Other Programming Methods)

It is also possible to write a job in any programming language, such as Python or C, that operates on tab-separated key-value pairs. The same example done above with Hive and Pig can also be written in Python and submitted as a Hadoop job using Hadoop Streaming. Submitting a job with Hadoop Streaming requires writing a mapper and a reducer. The mapper reads input line by line and generates key-value pairs for the reducer to “reduce” into some sort of sensible data. For our case, the mapper will read in lines and output the year as the key and a ‘1’ as the value if the ngram in the line it reads has only appeared in a single volume. The python code to do this is:

(Save this file as map.py)

#!/usr/bin/env python2.7
import fileinput
for line in fileinput.input():
 arr = line.split("\t")
    if int(arr[3]) == 1:
       print("\t".join([arr[1], '1']))
 except IndexError:
 except ValueError:


Now that the mapper has done this, the reduce merely needs to sum the values based on the key:

(Save this file as red.py)

#!/usr/bin/env python2.7

import fileinput

data = dict()

for line in fileinput.input():
  arr = line.split("\t")
  if arr[0] not in data.keys():
     data[arr[0]] = int(arr[1])
     data[arr[0]] = data[arr[0]] + int(arr[1])

for key in data:
 print("\t".join([key, str(data[key])]))


Submitting this streaming job can be done by running the below command:

 -Dmapreduce.job.queuename=<your_queue> \
 -input /var/ngrams/data \
 -output ngrams-out \
 -mapper map.py \
 -reducer red.py \
 -file map.py \
 -file red.py \
 -numReduceTasks 10

hdfs dfs -cat ngrams-out/* | tail -5

streaming outputhdfs dfs -rm -r -skipTrash /user/<your_uniqname>/ngrams-out


Pig is no longer available as part of our Hadoop software stack due to decisions of the upstream Hadoop project software maintainers.


Another way to run Hadoop jobs is through mrjob. Mrjob is useful for testing out smaller data on another system (such as your laptop), and later being able to run it on something larger, like a Hadoop cluster. To run an mrjob on your laptop, you can simply remove the “-r hadoop” from the command in the example we use here.

A classic example is a word count, taken from the official mrjob documentation here.

Save this file as mrjob_test.py.

"""The classic MapReduce job: count the frequency of words.
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))

if __name__ == '__main__':

Then, run the following command:

python mrjob_test.py -r hadoop /etc/motd

You should receive an output with the word count of the file /etc/motd. You can also try this with any other file you have that contains text.

Introduction to Spark

Spark and PySpark utilize a container called Resilient Distributed Dataset (RDD) for storing and operating on data. The most important characteristic of Spark’s RDD is that it is immutable — once created, the data it contains cannot be updated. New RDDs can be created by transforming the data in another RDD, which is how analysis is done with Spark.

Using Spark’s native language, Scala, requires more setup than using PySpark. Some example Scala jobs, including the same example job in the PySpark documentation, can be found on this website. That Spark code has some trivial set up required to run a Spark job, and all of the actual logic is in the ‘run’ function.


Spark comes with an interactive Python console, which can be opened this way:

# Load the pyspark console 
pyspark --master yarn --queue <your_queue>

This interactive console can be used for prototyping or debugging, or just running simple jobs.

The following example runs a simple line count on a text file, as well as counts the number of instances of the word “words” in that textfile. You can use any text file you have for this example:

>>> textFile = sc.textFile("test.txt")
>>> textFile.count()
>>> textFile.first()
>>> textFile.filter(lambda line: "words" in line).count()


You can also submit a job using PySpark without using the interactive console.

Save this file as job.py.

from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments, an input and output
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)

# Read in the dataset and immediately transform all the lines in arrays
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Create the 'length' dataset as mentioned above. This is done using the next two variables, and the 'length' dataset ends up in 'yearlyLength'.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Create the 'words' dataset as mentioned above.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
    lambda a, b: a + b

# Create the 'average_length' dataset as mentioned above.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])

# Save the results in the specified output directory.

# Finally, let Spark know that the job is done.

This above script averages the lengths of words in the NGrams dataset by year. There are two main operations in the above code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each RDD element and returns a new RDD containing the results. ‘reduceByKey’ applies a function to the group of values with the same key – for all keys – and returns an RDD with the result.

The job can be submitted by running:

spark-submit \
 --master yarn \
 --num-executors 35 \
 --executor-memory 5g \
 --executor-cores 4 \
 job.py /var/ngrams/data ngrams-out

hdfs dfs -cat ngrams-out/*


The only required argument from the above job submission command is ‘–master yarn’. The values passed to the other arguments may be modified in order to get better performance or conform to the limits of your queue.