Streaming (Other Programming Methods)


It is also possible to write a job in any programming language, such as Python or C, that operates on tab-separated key-value pairs. The same example done above with Hive and Pig can also be written in Python and submitted as a Hadoop job using Hadoop Streaming. Submitting a job with Hadoop Streaming requires writing a mapper and a reducer. The mapper reads input line by line and generates key-value pairs for the reducer to “reduce” into a sensible result. In our case, the mapper reads each line and, if the ngram on that line appeared in only a single volume, emits the year as the key and a ‘1’ as the value. The Python code to do this is:

(Save this file as map.py)

#!/usr/bin/env python2.7
import fileinput

# Read tab-separated lines of the form: ngram, year, count, volumes.
# Emit "year<TAB>1" whenever the ngram appeared in only one volume.
for line in fileinput.input():
    arr = line.split("\t")
    try:
        if int(arr[3]) == 1:
            print("\t".join([arr[1], '1']))
    except (IndexError, ValueError):
        # Skip malformed or incomplete lines
        pass
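
To see what the mapper emits, you can feed it a single line by hand. The line below is made up purely for illustration, following the ngrams field order (ngram, year, match count, volume count):

# A made-up line in the ngrams format; the volume count is 1, so the mapper emits a pair
echo -e "financial analysis\t1988\t12\t1" | python2.7 map.py
# The mapper prints the year and a 1, separated by a tab: 1988   1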

 

Once the mapper has emitted these pairs, the reducer merely needs to sum the values for each key:

(Save this file as red.py)

#!/usr/bin/env python2.7

import fileinput

# Sum the '1' values emitted by the mapper, keyed by year
data = dict()

for line in fileinput.input():
    arr = line.split("\t")
    if arr[0] not in data:
        data[arr[0]] = int(arr[1])
    else:
        data[arr[0]] = data[arr[0]] + int(arr[1])

# Print one "year<TAB>total" line per year
for key in data:
    print("\t".join([key, str(data[key])]))

 

The streaming job can then be submitted by running the following command:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
 -Dmapreduce.job.queuename=<your_queue> \
 -input /var/ngrams \
 -output ngrams-out \
 -mapper map.py \
 -reducer red.py \
 -file map.py \
 -file red.py \
 -numReduceTasks 10


# View the last few lines of the output
hdfs dfs -cat ngrams-out/* | tail -5

[streaming output screenshot]
# Remove the output directory once you no longer need it
hdfs dfs -rm -r -skipTrash /user/<your_uniqname>/ngrams-out

Hive on Spark


Running Hive on Spark rather than on MapReduce, its default engine, can be noticeably faster. The easiest way to do this is to set the Hive execution engine to Spark within your Hive session.

# Open the interactive hive console, then switch the execution engine to Spark
hive --hiveconf mapreduce.job.queuename=<your_queue>
set hive.execution.engine=spark;

Next, you’ll want to set the number of executor instances, the number of executor cores, and the amount of executor memory. For a better idea of how to find the ideal values for these, take a look at this documentation. Tuning these settings is essential to getting the job to run quickly. An example of doing so:

set spark.executor.instances=15;
set spark.executor.cores=4;
set spark.executor.memory=3g;

Then run a query just as you would for any Hive job (such as the examples earlier in this guide), and it should run on Spark instead, typically faster.
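
For instance, the one-volume count from the Hive example earlier in this guide should work unchanged once the engine has been switched (this sketch assumes the ngrams table from that example has already been created):

SELECT year, COUNT(ngram) FROM ngrams
WHERE volumes = 1
GROUP BY year;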

For more information on how to use Hive on Spark and its advantages, check out this post.

Pig


Pig is similar to Hive and can do the same things. The Pig code to do this is a little longer due to its design. However, writing long Pig code is generally easier than writing multiple SQL queries that chain together, since Pig’s language, Pig Latin, allows for variables and other high-level constructs.

# Open the interactive pig console
pig -Dmapreduce.job.queuename=<your_queue>

# Load the data
ngrams = LOAD '/var/ngrams' USING PigStorage('\t')
    AS (ngram:chararray, year:int, count:long, volumes:long);

# Look at the schema of the ngrams variable
describe ngrams;

# Count the total number of rows (should be 1201784959)
ngrp = GROUP ngrams ALL;
count = FOREACH ngrp GENERATE COUNT(ngrams);
DUMP count;

# Select the number of words, by year, that have only appeared in a single volume
one_volume = FILTER ngrams BY volumes == 1;
by_year = GROUP one_volume BY year;
yearly_count = FOREACH by_year GENERATE group, COUNT(one_volume);
DUMP yearly_count;

The last few lines of output should look like this:

[pig output screenshot]

More information on Pig can be found on the Apache website.

Hive


To demonstrate Hive, below is a short tutorial. The tutorial uses the Google NGrams dataset, which is available in HDFS at /var/ngrams.

# Open the interactive hive console
hive --hiveconf mapreduce.job.queuename=<your_queue>

# Create a table with the Google NGrams data in /var/ngrams
CREATE EXTERNAL TABLE ngrams(ngram STRING, year INT, count BIGINT, volumes BIGINT)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    LOCATION '/var/ngrams';

# Look at the schema of the table
DESCRIBE ngrams;

# Count the total number of rows (should be 1201784959)
SELECT COUNT(*) FROM ngrams;

# Select the number of words, by year, that have only appeared in a single volume
SELECT year, COUNT(ngram) FROM ngrams
    WHERE volumes = 1
    GROUP BY year;

# Optional: delete your ngrams table
DROP TABLE ngrams;

# Exit the Hive console
QUIT;

The last few lines of output should look something like this:

[hive output screenshot]

More information can be found on the Apache website.