Using Hadoop

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.

Overview

Hadoop is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures. (From hadoop.apache.org)

Flux Hadoop is a technology preview and is available at no cost. It may have less technical support than the other Flux services. The Flux Hadoop cluster consists of 12 nodes offering 100TB of HDFS space. The available software includes the tools covered in this guide: Hadoop MapReduce, Hive, Pig, Hadoop Streaming, Spark, and PySpark.

Using Flux Hadoop requires a Flux user account but does not require an active Flux allocation. Information on getting a Flux account can be found on the Getting Started with Flux page. When applying for a Flux account, mention that you would like to use Hadoop and what queue name you would like. To be placed on an already existing queue, the owner/PI on the queue needs to send an email to hpc-support@umich.edu.

Hadoop consists of two components: HDFS, a filesystem built for high read speeds, and YARN, a resource manager. HDFS is not a POSIX filesystem, so normal command line tools like “cp” and “mv” will not work. Most of the common tools have been reimplemented for HDFS and can be run using the “hdfs dfs” command. All data must be in HDFS for jobs to be able to read it.

Here are a few basic commands:

# List the contents of your HDFS home directory
hdfs dfs -ls

# Copy local file data.csv to your HDFS home directory
hdfs dfs -put data.csv data.csv

# Copy HDFS file data.csv back to your local home directory
hdfs dfs -get data.csv data2.csv
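
A few more commands that often come in handy are shown below; the directory name “mydata” is only an illustration.

# Make a new directory in your HDFS home directory
hdfs dfs -mkdir mydata

# Show how much HDFS space your files use, in human-readable units
hdfs dfs -du -h

# Recursively remove a directory and its contents
hdfs dfs -rm -r mydata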

A complete reference of HDFS commands can be found on the Apache website.

Available software

There are multiple tools available (listed at the top) to perform analysis. Hive transparently transforms SQL queries to jobs that run on the Hadoop cluster, allowing researchers already familiar with SQL to continue using it as their data grows beyond the capacity of a traditional database such as MySQL or PostgreSQL. Pig is similar to Hive, except that its language, PigLatin, is slightly more complicated than SQL. However, PigLatin can be used to express complicated or long transformations and queries that are impossible or difficult to express with SQL.

Hadoop jobs may also be directly written in any programming language. Writing a map-reduce job directly gives more control than using Hive or Pig, but requires knowing or learning a programming language. Working on unstructured data generally requires writing a map-reduce job instead of using Hive or Pig, as those tools can’t work with unstructured data.

Spark and PySpark are completely separate from Hadoop, even though the jobs are run on the Hadoop cluster on files that are stored in HDFS. Spark exposes a single data structure, the Resilient Distributed Dataset (RDD), in the Java, Scala, and Python programming languages. RDDs support multiple operations (map, reduce, filter, etc.), and are held in-memory when possible to increase the speed of the analysis. Spark is a good choice over a native Hadoop job or a Streaming job, as Spark contains a number of convenient functions that Hadoop does not have.
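
As a rough illustration of the RDD model, the short PySpark snippet below (meant to be run inside the pyspark shell described later in this guide, where the SparkContext ‘sc’ is provided automatically) chains two transformations and an action:

rdd = sc.parallelize([1, 2, 3, 4, 5])         # build an RDD from a small list
squares = rdd.map(lambda x: x * x)            # transformation: 1, 4, 9, 16, 25
evens = squares.filter(lambda x: x % 2 == 0)  # transformation: 4, 16
print(evens.reduce(lambda a, b: a + b))       # action: prints 20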

About This User Guide

There are a few things that are important to note about this user guide before you begin:

  • Make sure that you always replace “<your_queue>” with the name of your queue.
  • Any backslashes (‘\’) in a command indicate that it is actually one long command, broken up simply for the sake of readability.
  • These examples all use the Google Ngrams Dataset. You may want to familiarize yourself with it in order to better understand what kind of data you are running these jobs on (an illustrative record follows this list). Essentially, its schema is:
    1. word – the word of interest
    2. year – the year of the data
    3. count – the number of times the word appeared in books that year
    4. volumes – the number of volumes the word appeared in that year
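
For illustration, each record in the dataset is a single tab-separated line containing those four fields; the values below are invented:

# word      year    count   volumes   (tab-separated; values are invented)
wandered    1867    777     5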


Using Hadoop

Writing Hadoop MapReduce code in Java is the lowest level way to program against a Hadoop cluster. Hadoop’s libraries do not contain any abstractions, like Spark RDDs or a Hive or Pig-like higher level language. All code must implement the MapReduce paradigm.

Some example Java MapReduce code can be found in the ARC-TS Hadoop examples page. There is an example job – AverageNGramLength – that generates a yearly average of the lengths of all words in Google’s NGrams dataset, which is stored in HDFS at `/var/ngrams`. This job can be built and launched from flux-login by running:

git clone https://bitbucket.org/umarcts/hadoop-examples.git
cd hadoop-examples
./gradlew jar
yarn jar build/libs/hadoop-examples-*-all.jar \
  com.alectenharmsel.examples.hadoop.AverageNGramLength \
  -Dmapreduce.job.queuename=<your_queue> \
  /var/ngrams ngrams-out

# List running jobs
yarn application -list

# Once it is done, view the output
hdfs dfs -cat ngrams-out/*

This command prints all of the output to the terminal. The output can be redirected to a file and plotted with any plotting tool, such as R or MATLAB.
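
For example, the output could be saved to a local file for later plotting (the file name is only an illustration):

hdfs dfs -cat ngrams-out/* > ngrams-results.tsv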

The last few lines of output should look like this:

[screenshot: hadoop output]

To see just those lines (without printing all of the output), run:

hdfs dfs -cat ngrams-out/* | tail -5

 

Please note that it can take around 1-5 minutes for everything to build and launch.

Afterwards, it is a good idea to remove the output directory so that you can reuse the same name in the examples that follow. To do this, run:

hdfs dfs -rm -r -skipTrash /user/<your_uniqname>/ngrams-out


Using Hive

To demonstrate Hive, below is a short tutorial. The tutorial uses the Google NGrams dataset, which is available in HDFS in /var/ngrams.

# Open the interactive hive console
hive --hiveconf mapreduce.job.queuename=<your_queue>

# Create a table with the Google NGrams data in /var/ngrams
CREATE EXTERNAL TABLE ngrams(ngram STRING, year INT, count BIGINT, volumes BIGINT)
     ROW FORMAT DELIMITED
     FIELDS TERMINATED BY '\t'
     STORED AS TEXTFILE
     LOCATION '/var/ngrams';

# Look at the schema of the table
DESCRIBE ngrams;

# Count the total number of rows (should be 1201784959)
SELECT COUNT(*) FROM ngrams;

# Select the number of words, by year, that have only appeared in a single volume
SELECT year, COUNT(ngram) FROM ngrams WHERE volumes = 1
GROUP BY year;

# Exit the Hive console
QUIT;

The last few lines of output should look something like this:

[screenshot: hive output]

More information can be found on the Apache website.
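
Hive queries can also be run non-interactively by saving them to a file and passing it to the hive command with -f, or by passing a single query with -e. The script name below is only an illustration, and these examples assume the ngrams table created above still exists:

# Run a saved HiveQL script
hive --hiveconf mapreduce.job.queuename=<your_queue> -f my_query.hql

# Run a single query directly from the command line
hive --hiveconf mapreduce.job.queuename=<your_queue> -e 'SELECT COUNT(*) FROM ngrams;'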


Using Pig

Pig is similar to Hive and can perform the same kinds of analysis. The Pig code to do so is a little longer due to its design. However, writing long Pig code is generally easier than writing multiple SQL queries that chain together, since Pig’s language, PigLatin, allows for variables and other high-level constructs.

# Open the interactive pig console
pig -Dmapreduce.job.queuename=<your_queue>

# Load the data
ngrams = LOAD '/var/ngrams' USING PigStorage('\t')
    AS (ngram:chararray, year:int, count:long, volumes:long);

# Look at the schema of the ngrams variable
describe ngrams;

# Count the total number of rows (should be 1201784959)
ngrp = GROUP ngrams ALL;
count = FOREACH ngrp GENERATE COUNT(ngrams);
DUMP count;

# Select the number of words, by year, that have only appeared in a single volume
one_volume = FILTER ngrams BY volumes == 1;
by_year = GROUP one_volume BY year;
yearly_count = FOREACH by_year GENERATE group, COUNT(one_volume);
DUMP yearly_count;

The last few lines of output should look like this:

[screenshot: pig output]

More information on Pig can be found on the Apache website.
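
Like Hive, Pig can run a saved script non-interactively, and results can be written to HDFS with STORE instead of printed with DUMP. The script and directory names below are only illustrations:

# Run a saved PigLatin script
pig -Dmapreduce.job.queuename=<your_queue> count_by_year.pig

# Inside a script (or the console), write results to HDFS instead of dumping them
STORE yearly_count INTO 'pig-out' USING PigStorage('\t');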


Using Streaming (Other Programming Methods)

It is also possible to write a job in any programming language, such as Python or C, that operates on tab-separated key-value pairs. The same example done above with Hive and Pig can also be written in Python and submitted as a Hadoop job using Hadoop Streaming. Submitting a job with Hadoop Streaming requires writing a mapper and a reducer. The mapper reads input line by line and generates key-value pairs for the reducer to “reduce” into some sort of sensible data. In our case, the mapper will read in lines and output the year as the key and a ‘1’ as the value if the ngram in that line has only appeared in a single volume. The Python code to do this is:

(Save this file as map.py)

#!/usr/bin/env python2.7
import fileinput

for line in fileinput.input():
    arr = line.split("\t")
    try:
        # Emit "year<TAB>1" for every ngram that appears in exactly one volume
        if int(arr[3]) == 1:
            print("\t".join([arr[1], '1']))
    except (IndexError, ValueError):
        # Skip malformed lines
        pass

 

Now that the mapper has done this, the reducer merely needs to sum the values based on the key:

(Save this file as red.py)

#!/usr/bin/env python2.7
import fileinput

# Sum the counts emitted by the mapper, keyed by year
data = dict()

for line in fileinput.input():
    arr = line.split("\t")
    if arr[0] not in data:
        data[arr[0]] = int(arr[1])
    else:
        data[arr[0]] = data[arr[0]] + int(arr[1])

for key in data:
    print("\t".join([key, str(data[key])]))
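
Because the mapper and reducer are plain scripts that read standard input, they can be tested locally on a small sample before submitting the job. Here “sample.tsv” is a hypothetical tab-separated extract of the data:

chmod +x map.py red.py
head -1000 sample.tsv | ./map.py | sort | ./red.py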

 

The streaming job can then be submitted by running the command below:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
 -Dmapreduce.job.queuename=<your_queue> \
 -input /var/ngrams \
 -output ngrams-out \
 -mapper map.py \
 -reducer red.py \
 -file map.py \
 -file red.py \
 -numReduceTasks 10

# Once the job has finished, view the output
hdfs dfs -cat "ngrams-out/*"

# Or view just the last few lines
hdfs dfs -cat "ngrams-out/*" | tail -5

[screenshot: streaming output]


Using Spark

Spark and PySpark utilize a container called Resilient Distributed Dataset (RDD) for storing and operating on data. The most important characteristic of Spark’s RDD is that it is immutable — once created, the data it contains cannot be updated. New RDDs can be created by transforming the data in another RDD, which is how analysis is done with Spark.

Using Spark’s native language, Scala, requires more setup than using PySpark. Some example Scala jobs, including the same example job shown in the PySpark section below, can be found in the ARC-TS spark-examples repository (cloned below). The Scala code has only trivial setup required to run a Spark job; all of the actual logic is in the ‘run’ function.

In addition to the example code, the spark-examples repository contains a Gradle build file; Gradle is a popular build tool for Java and Scala. This code can be downloaded and built by logging on to flux-login and running:

git clone https://bitbucket.org/umarcts/spark-examples
cd spark-examples
./gradlew jar

The last command, “./gradlew jar”, will download all dependencies, compile the code, run tests, and package all of the code into a Java ARchive (JAR). This JAR is submitted to the cluster to run a job. For example, the AverageNGramLength job can be launched by running:

spark-submit \
   --class com.alectenharmsel.examples.spark.AverageNGramLength \
   --master yarn-client \
   --executor-memory 3g \
   --num-executors 5 \
   --queue <your_queue> \
   build/libs/spark-examples-*-all.jar /var/ngrams ngrams-out

The output will be located in your HDFS home directory in a directory called ‘ngrams-out’, and can be viewed by running:

hdfs dfs -cat ngrams-out/*

The last few lines of output should look like this:

[screenshot: spark output]

You can also run this command to just get those lines:

hdfs dfs -cat ngrams-out/* | tail -5

 

Similar to PySpark’s interactive Python shell, Spark has an interactive Scala shell for prototyping and debugging. The Spark shell can be launched by running:

spark-shell --master yarn-client --queue <your_queue>


Using PySpark

As described in the Using Spark section above, Spark and PySpark operate on immutable Resilient Distributed Datasets (RDDs): new RDDs are created by transforming the data in existing ones, which is how analysis is done with Spark. The example below performs the same yearly average of word lengths in Python.

Save this file as job.py.

from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments, an input and output
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')
  sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)


# Read in the dataset and immediately split each line into an array of fields
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Compute the total (word length * count) per year; the result ends up in 'yearlyLength'.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
)
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Compute the total word count per year.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
).reduceByKey(
    lambda a, b: a + b
)

# Divide the total length by the total count to get the average word length per year.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])
)

# Save the results in the specified output directory.
yearlyAvg.saveAsTextFile(output)

# Finally, let Spark know that the job is done.
sc.stop()

The script above averages the lengths of words in the NGrams dataset by year. There are two main operations in the code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each element of an RDD and returns a new RDD containing the results. ‘reduceByKey’ applies a function to the group of values with the same key – for all keys – and returns an RDD with the results.
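
As a small, standalone illustration of ‘reduceByKey’ (runnable in the pyspark console described below, where ‘sc’ is provided automatically):

pairs = sc.parallelize([(2000, 4.0), (2000, 6.0), (2001, 3.0)])
totals = pairs.reduceByKey(lambda a, b: a + b)
print(totals.collect())   # [(2000, 10.0), (2001, 3.0)] (order may vary)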

The job can be submitted by running:

spark-submit \
 --master yarn-client \
 --queue <your_queue> \
 --num-executors 10 \
 --executor-memory 12g \
 --executor-cores 2 \
 job.py /var/ngrams ngrams-out

# View the output
hdfs dfs -cat ngrams-out/*

The only required arguments from the above job submission command are ‘--master yarn-client’ and ‘--queue <your_queue>’. The values passed to the other arguments may be modified in order to get better performance or conform to the limits of your queue.
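
In other words, a minimal submission could look like this, leaving the executor settings at their defaults:

spark-submit --master yarn-client --queue <your_queue> \
  job.py /var/ngrams ngrams-out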

In addition to writing a job and submitting it, Spark comes with an interactive Python console, which can be opened this way:

# Load the pyspark console 
pyspark --master yarn-client --queue <your_queue>

# Load the pyspark console with ipython as the interpreter
module load hadoop-utils
ipyspark --master yarn-client --queue <your_queue>

This interactive console can be used for prototyping or debugging.
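
For example, a quick sanity check inside the pyspark console (where ‘sc’ is created automatically) might be:

data = sc.textFile('/var/ngrams')
data.take(5)   # show the first five raw lines of the dataset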


Using Hive on Spark

Running Hive on Spark, rather than on its default engine, MapReduce, can be a faster alternative. The easiest way to do this is to set the Hive execution engine to Spark in your Hive session.

hive --hiveconf mapreduce.job.queuename=<your_queue>
set hive.execution.engine=spark;

Next, you’ll want to set the number of executor instances, executor cores, and the executor memory. For guidance on finding the ideal settings, see the Hive on Spark documentation on the Apache website. Changing these settings is essential to getting the job to run quickly. An example of doing so would be:

set spark.executor.instances=15;
set spark.executor.cores=4;
set spark.executor.memory=3g;

Then, run a query just as you would for any Hive job (such as the examples earlier in this guide), and it should run on Spark instead, typically faster than with MapReduce.
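
Putting it together, a session might look like the following; the executor settings are illustrative and should be tuned for your queue:

hive --hiveconf mapreduce.job.queuename=<your_queue>

set hive.execution.engine=spark;
set spark.executor.instances=15;
set spark.executor.cores=4;
set spark.executor.memory=3g;

SELECT year, COUNT(ngram) FROM ngrams WHERE volumes = 1
GROUP BY year;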


Order Service

Using the Flux Hadoop environment requires a user account (available at no cost), but currently does not require a Flux allocation.

To order:

Email hpc-support@umich.edu.

For more information: data-science-support@umich.edu.
