Using Hadoop

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.

To search this user guide, use the command + f keyboard shortcut.

Getting Started With Hadoop

Overview

Hadoop is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures. (From hadoop.apache.org)

Flux Hadoop is a technology preview and available at no cost. It may have less technical support than the other Flux services. The Flux Hadoop cluster consists of 12 nodes offering 100TB of HDFS space, and is based on the Kerberos-enabled Hortonworks Data Platform 2.6.3.0. Spark 1 and 2.x are available (and SparkR), Hive2 with Hive on Tez, as well as Anaconda Python 2 and 3.

The software available is:

Back To Top

Creating an Account

Using Flux Hadoop requires a Flux user account but does not require an active Flux allocation. Information on getting a Flux account can be found on the Flux User Guide page. You only need to pay attention to steps 1, 2, and 4 in order to use Flux-Hadoop. Please note that in step 4 rather than logging into flux-login.arc-ts.umich.edu, you need to log into flux-hadoop-login.arc-ts.umich.eduThis video will help teach you some basic Linux navigation commands if needed.

Back To Top

Hadoop and HDFS Basics

Using Hadoop and HDFS

Hadoop consists of two components; HDFS, a filesystem built for high read speeds, and YARN, a resource manager. HDFS is not a POSIX filesystem, so normal command line tools like “cp” and “mv” will not work. Most of the common tools have been reimplemented for HDFS and can be run using the “hdfs dfs” command. All data must be in HDFS for jobs to be able to read it.

Here are a few basic commands:

# List the contents of your HDFS home directory
hdfs dfs -ls

# Copy local file data.csv to your HDFS home directory
hdfs dfs -put data.csv data.csv

# Copy HDFS file data.csv back to your local home directory
hdfs dfs -get data.csv data2.csv

A complete reference of HDFS commands can be found on the Apache website.

Back To Top

Understanding MapReduce

Writing Hadoop MapReduce code in Java is the lowest level way to program against a Hadoop cluster. Hadoop’s libraries do not contain any abstractions, like Spark RDDs or a Hive or Pig-like higher level language. All code must implement the MapReduce paradigm.

This video provides a great introduction to MapReduce. This documentation provides a written explanation and an example.

Back To Top

Spark

Introduction to Spark

Spark and PySpark utilize a container called Resilient Distributed Dataset (RDD) for storing and operating on data. The most important characteristic of Spark’s RDD is that it is immutable — once created, the data it contains cannot be updated. New RDDs can be created by transforming the data in another RDD, which is how analysis is done with Spark.

Using Spark’s native language, Scala, requires more setup than using PySpark. Some example Scala jobs, including the same example job in the PySpark documentation, can be found on this website. That Spark code has some trivial set up required to run a Spark job, and all of the actual logic is in the ‘run’ function.

On our cluster, Spark 2 is the default. If you would like to use Spark 1, you need to run the following command:

export SPARK_MAJOR_VERSION=1

Back To Top

PySpark

Spark comes with an interactive Python console, which can be opened this way:

# Load the pyspark console 
pyspark --master yarn

This interactive console can be used for prototyping or debugging, or just running simple jobs.

The following example runs a simple line count on a text file, as well as counts the number of instances of the word “words” in that textfile. You can use any text file you have for this example:

>>> textFile = sc.textFile("test.txt")
>>> textFile.count()
>>> textFile.first()
>>> textFile.filter(lambda line: "words" in line).count()

 

You can also submit a job using PySpark without using the interactive console.

Save this file as job.py.

from pyspark import SparkConf, SparkContext
import sys

# This script takes two arguments, an input and output
if len(sys.argv) != 3:
  print('Usage: ' + sys.argv[0] + ' <in> <out>')
  sys.exit(1)

input = sys.argv[1]
output = sys.argv[2]

# Set up the configuration and job context
conf = SparkConf().setAppName('AnnualWordLength')
sc = SparkContext(conf=conf)


# Read in the dataset and immediately transform all the lines in arrays
data = sc.textFile(input).map(lambda line: line.split('\t'))

# Create the 'length' dataset as mentioned above. This is done using the next two variables, and the 'length' dataset ends up in 'yearlyLength'.
yearlyLengthAll = data.map(
    lambda arr: (int(arr[1]), float(len(arr[0])) * float(arr[2]))
)
yearlyLength = yearlyLengthAll.reduceByKey(lambda a, b: a + b)

# Create the 'words' dataset as mentioned above.
yearlyCount = data.map(
    lambda arr: (int(arr[1]), float(arr[2]))
).reduceByKey(
    lambda a, b: a + b
)

# Create the 'average_length' dataset as mentioned above.
yearlyAvg = yearlyLength.join(yearlyCount).map(
    lambda tup: (tup[0], tup[1][0] / tup[1][1])
)

# Save the results in the specified output directory.
yearlyAvg.saveAsTextFile(output)

# Finally, let Spark know that the job is done.
sc.stop()

This above script averages the lengths of words in the NGrams dataset by year. There are two main operations in the above code: ‘map’ and ‘reduceByKey’. ‘map’ applies a function to each RDD element and returns a new RDD containing the results. ‘reduceByKey’ applies a function to the group of values with the same key – for all keys – and returns an RDD with the result.

The job can be submitted by running:

spark-submit \
 --master yarn \
 --num-executors 35 \
 --executor-memory 5g \
 --executor-cores 4 \
 job.py /var/ngrams ngrams-out


hdfs dfs -cat ngrams-out/*

 

The only required argument from the above job submission command is ‘–master yarn-client’. The values passed to the other arguments may be modified in order to get better performance or conform to the limits of your queue.

*Note: Our default Python is Anaconda 2-5.0.1. If you would like to use Anaconda 3-5.0.1 for your PySpark job, run the following command:

export PYSPARK_PYTHON=/sw/dsi/centos7/x86-64/Anaconda3-5.0.1/bin/python

Back To Top

Spark Shell

Spark has an easy-to-use interactive shell that can be used to learn API and also analyze data interactively. Below is a simple example written in Scala. You can use any text file that you have:

spark-shell --master yarn
scala> val textFile = spark.read.textFile("test.txt")
scala> textFile.count()
scala> textFile.first()
//Count how many lines contain the word "words"
//You can replace "words" with any word you'd like
scala> textFile.filter(line => line.contains("words")).count()

Back To Top

Spark Submit

The following is a simple example of submitting a Spark job that uses an existing jar all users have access to. It estimates Pi, and the number at the end is the number of iterations it uses (more iterations = more accurate).

export SPARK_MAJOR_VERSION=2
cd /usr/hdp/current/spark2-client
spark-submit \
   --class org.apache.spark.examples.SparkPi \
   --master yarn \
examples/jars/spark-examples*.jar 10

Gradle is a popular build tool for Java and Scala. The following example is useful if you may be getting code from bitbucket, github, etc. This code can be downloaded and built by logging on to flux-hadoop-login and running:

git clone https://bitbucket.org/umarcts/spark-examples
cd spark-examples
./gradlew jar

The last command, “./gradlew jar”, will download all dependencies, compile the code, run tests, and package all of the code into a Java ARchive (JAR). This JAR is submitted to the cluster to run a job. For example, the AverageNGramLength job can be launched by running:

spark-submit \
   --class com.alectenharmsel.examples.spark.AverageNGramLength \
   --master yarn \
   --executor-memory 3g \
   --num-executors 35 \
 build/libs/spark-examples-*-all.jar /var/ngrams ngrams-out

The output will be located in your home directory in a directory called ‘ngrams-out’, and can be viewed by running:

hdfs dfs -cat ngrams-out/* | tail -5

The output should look like this:

spark output

Back To Top

Parquet Files

If you’re familiar with Spark, you know that a dataframe is essentially a data structure that contains “tabular” data in memory. That is, it consists of rows and columns of data that can, for example, store the results of an SQL-style query. Dataframes can be saved into HDFS as Parquet files. Parquet files not only preserve the schema information of the dataframe, but will also compress the data when it gets written into HDFS. This means that the saved file will take up less space in HDFS and it will load faster if you read the data again later. Therefore, it is a useful storage format for data you may want to analyze multiple times.

The Pyspark example below uses Reddit data which is available to all Flux Hadoop users in HDFS ‘/var/reddit’. This data consists of information about all posts made on the popular website Reddit, including their score, subreddit, text body, author, all of which can make for interesting data analysis.

#First, launch the pyspark shell

pyspark --master yarn-client --num-executors 35 --executor-cores 4 --executor-memory 5g

#Load the reddit data into a dataframe

>>> reddit = sqlContext.read.json("/var/reddit/RS_2016-0*")

#Set compression type to snappy

>>> sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

#Write data into a parquet file - this example puts it into your HDFS home directory as “reddit.parquet”

>>> reddit.write.parquet("reddit.parquet")

#Create a new dataframe from parquet file 

>>> parquetFile = sqlContext.read.parquet("reddit.parquet")

#Register dataframe as a SQL temporary table

>>> parquetFile.registerTempTable(“reddit_table")

#Query the table

#Can really be any query, but this query will find some of the more highly rated posts

>>> ask = sqlContext.sql(“SELECT title FROM reddit_table WHERE score > 1000 and subreddit = ‘AskReddit’”)

#Since we created the dataframe “ask” with the previous query, we can write it out to HDFS as a parquet file so it can be accessed again later

>>> ask.write.parquet(“ask.parquet”)

#Exit the pyspark console - you’ll view the contents of your parquet file after

>>> exit()

 

To view the contents of your Parquet file, use Parquet tools. Parquet tools is a command line tool that aids in the inspection of Parquet files, such as viewing its contents or its schema.

#view the output

hadoop parquet.tools.Main cat ask.parquet

#view the schema; in this case, just the “title” of the askreddit thread

hadoop parquet.tools.Main schema ask.parquet

Back To Top

Other Common Hadoop Tools

Hive

To demonstrate Hive, below is a short tutorial. The tutorial uses the Google NGrams dataset, which is available in HDFS in /var/ngrams.

# Open the interactive hive console
hive --hiveconf mapreduce.job

# Create a table with the Google NGrams data in /var/ngrams
CREATE EXTERNAL TABLE ngrams(ngram STRING, year INT, count 
BIGINT,
     volumes BIGINT)
     ROW FORMAT DELIMITED
     FIELDS TERMINATED BY '\t'
     STORED AS TEXTFILE
     LOCATION '/var/ngrams';

# Look at the schema of the table
DESCRIBE ngrams;

# Count the total number of rows (should be 1201784959)
SELECT COUNT(*) FROM ngrams;

# Select the number of words, by year, that have only appeared in a single volume
SELECT year, COUNT(ngram) FROM ngrams WHERE 
volumes = 1
GROUP BY year;

# Optional: delete your ngrams table
DROP table ngrams;

# Exit the Hive console
QUIT;

The last few lines of output should look something like this:

hive output

More information can be found on the Apache website.

Back To Top

Streaming (Other Programming Methods)

It is also possible to write a job in any programming language, such as Python or C, that operates on tab-separated key-value pairs. The same example done above with Hive and Pig can also be written in Python and submitted as a Hadoop job using Hadoop Streaming. Submitting a job with Hadoop Streaming requires writing a mapper and a reducer. The mapper reads input line by line and generates key-value pairs for the reducer to “reduce” into some sort of sensible data. For our case, the mapper will read in lines and output the year as the key and a ‘1’ as the value if the ngram in the line it reads has only appeared in a single volume. The python code to do this is:

(Save this file as map.py)

#!/usr/bin/env python2.7
import fileinput
for line in fileinput.input():
 arr = line.split("\t")
 try:
    if int(arr[3]) == 1:
       print("\t".join([arr[1], '1']))
 except IndexError:
       pass
 except ValueError:
       pass

 

Now that the mapper has done this, the reduce merely needs to sum the values based on the key:

(Save this file as red.py)

#!/usr/bin/env python2.7

import fileinput

data = dict()

for line in fileinput.input():
  arr = line.split("\t")
  if arr[0] not in data.keys():
     data[arr[0]] = int(arr[1])
  else:
     data[arr[0]] = data[arr[0]] + int(arr[1])

for key in data:
 print("\t".join([key, str(data[key])]))

 

Submitting this streaming job can be done by running the below command:

yarn jar $HADOOP_STREAMING \
 -Dmapreduce.job \
 -input /var/ngrams \
 -output ngrams-out \
 -mapper map.py \
 -reducer red.py \
 -file map.py \
 -file red.py \
 -numReduceTasks 10


hdfs dfs -cat ngrams-out/* | tail -5

streaming output
hdfs dfs -rm -r -skipTrash /user/<your_uniqname>/ngrams-out

Back To Top

Pig

Pig is similar to Hive and can do the same thing. The Pig code to do this is a little bit longer due to its design. However, writing long Pig code is generally easier that writing multiple SQL queries that that chain together, since Pig’s language, PigLatin, allows for variables and other high-level constructs.

# Open the interactive pig console
pig -Dmapreduce.job

# Load the data
ngrams = LOAD '/var/ngrams' USING PigStorage('\t') AS 
(ngram:chararray,
year:int, count:long, volumes:long);

# Look at the schema of the ngrams variable
describe ngrams;

# Count the total number of rows (should be 1201784959)
ngrp = GROUP ngrams ALL;
count = FOREACH ngrp GENERATE COUNT(ngrams);
DUMP count;

# Select the number of words, by year, that have only appeared in a single volume
one_volume = FILTER ngrams BY volumes == 1;
by_year = GROUP one_volume BY year;
yearly_count = FOREACH by_year GENERATE group, COUNT(one_volume);
DUMP yearly_count;

The last few lines of output should look like this:

pig output

More information on Pig can be found on the Apache website.

Back To Top

Policies

Order Service

Using the Flux Hadoop environment requires a user account (available at no cost), but currently does not require a Flux allocation.

To order:

Email hpc-support@umich.edu.

For more information: data-science-support@umich.edu.

Related Event

February 23 @ 1:00 pm - 5:00 pm

Advanced batch computing on the Flux cluster

Overview This course will cover some more advanced topics in cluster computing on the U-M Flux Cluster. Topics to be covered include a review of common parallel programming models and…

February 28 @ 2:00 pm - 5:00 pm

Fourier transform and its applications in data analysis

Spectral decomposition of time series (1-D) and image (2-D) data is a commonly used technique across various disciplines that use sensors for data collection. Fourier analysis is the foundation of…

March 6 @ 1:00 pm - 4:00 pm

Introduction to the Flux cluster and batch computing

Overview This workshop will provide a brief overview of the components of the Flux Cluster. The main body of the workshop will cover the resource manager and scheduler, creating submissions…

March 20 @ 2:00 pm - 4:00 pm

 Deep Neural Networks with TensorFlow: A Quick Start Introduction

Deep Neural Networks (DNNs) are used as a machine learning method for both regression and classification problems. TensorFlow is a popular software library that is often used to construct and train DNNs. In this workshop,…