Pig is an Apache library that interprets scripts written in a
language called Pig Latin and then runs them on a Hadoop cluster.
Pig Latin is a high-level data transformation language that
lets you concentrate on the data transformations you require
rather than on individual map and reduce functions.
You can find the Pig project home page
at http://hadoop.apache.org/pig
and the documentation for Pig Latin version 0.3 (the version supported by
Amazon Elastic MapReduce)
at http://hadoop.apache.org/pig/docs/r0.3.0/piglatin.html.
Section 1 - Setting up for SSH
If you are already familiar with Amazon EC2 and with setting up SSH to
access Amazon EC2 instances, you can skip to Section 2.
1.1 - Setting up SSH on your Machine
To launch an interactive Pig job flow, you must have SSH set up on
your client PC. If you are using Linux or Mac OS X, you most likely already
have ssh installed and can run it by typing ssh on the command line.
On Microsoft Windows, if you are familiar with Linux, you can
install Cygwin and use "ssh" from the command line. Otherwise you can use
PuTTY, which requires additional configuration (described below).
1.2 - Setting up an SSH Key
The next step is setting up an SSH key. You can do this using the
AWS Management Console.
- Go to http://console.aws.amazon.com/ec2/home and sign in.
- Click the "Amazon EC2" tab.
- Click the "Key Pairs" link.
- Click the "Create Key Pair" button.
- Enter a name and save the key file. Record the key pair name and the
path to the saved file; you will need both later.
- If you are using PuTTY, you must also convert this
key file into PuTTY format. For more information, go to
http://docs.amazonwebservices.com/AmazonEC2/gsg/2007-01-19/putty.html and look under "Private Key Format."
Section 2 - Starting an Interactive Job Flow
2.1 - Starting an Interactive Job Flow from the Console
You are now ready to start your interactive Pig job flow.
- Go to http://console.aws.amazon.com/elasticmapreduce/home and sign in.
- Click the "Amazon Elastic MapReduce" tab.
- Click the "Create New Job Flow" button.
Figure 2.1.1: New Job Flow Wizard
- In the "Job Flow Name" field, type a name such as "Pig Interactive Job
Flow".
- In the "Type" field select "Pig Program", and then click "Continue".
The console displays the following dialog box.
Figure 2.1.2: Choosing Interactive Session
- Select "Start an Interactive Pig Session" and click "Continue".
The console displays the following dialog box.
Figure 2.1.3: Selecting Instance Configuration
- Set "Number of Instances" to "1" and "Type of Instances" to "m1.small".
You are using only one instance because you are working with only a small amount of data.
- In the "Amazon EC2 Key Pair" chooser, select the name of the key
pair you created in Section 1, and click "Continue".
The console displays the following confirmation screen.
Figure 2.1.4: Reviewing Job Flow Configuration
- Make sure everything looks correct and then click "Create Job
Flow". This starts your job flow and shows you a confirmation
dialog. Note that, unlike other job flows created using the AWS
Management Console, this job flow will not terminate until you
terminate it manually. To terminate the job flow manually, select the
job flow and click the "Terminate" button.
2.2 - Waiting for the Job Flow to Start
You will now be returned to the main Elastic MapReduce console screen,
where you should see the job flow you just launched in the grid. Click it
to see its details in the bottom pane.
Figure 2.2.1: Waiting for JobFlow to Start
Once the job flow has transitioned to the WAITING state, click it
again; the bottom pane should then show its details like this:
Figure 2.2.2: Job Flow is Ready
2.3 - SSH to the Master Instance
You now need to SSH to the master instance. On the Elastic
MapReduce console, note the value of Master Public DNS Name. This is
the instance you will SSH to.
If Master Public DNS Name is blank, the job flow is most likely
still in the STARTING state. Wait for the job flow to
transition to WAITING, and then click the job flow to update the
detail pane. The Master Public DNS Name field should now be
populated.
If you are using the ssh command, use the following command to SSH into
the master node. Replace </path/to/saved/keypair/file.pem> with the
location of the key pair file you created in Section 1.2, and
<master.public-dns-name.amazonaws.com> with the Master Public DNS Name you noted in the console.
$ ssh -o "ServerAliveInterval 10" -i </path/to/saved/keypair/file.pem> hadoop@<master.public-dns-name.amazonaws.com>
For information about using PuTTY, go to http://docs.amazonwebservices.com/AmazonEC2/gsg/2007-01-19/putty.html and follow the instructions under "SSH with PuTTY".
Section 3 - Accessing Pig when in Interactive Mode
3.1 - Starting an Interactive Pig Session in Local Mode
You should now be at the SSH prompt on the master instance. To
start using Pig interactively, launch the Pig Grunt shell in local
mode by typing:
$ pig -x local
The "-x local" option needs a bit of explaining. It instructs Pig
to do all of its computation locally, without talking to the Hadoop
servers running on your instance. This is very useful when developing
scripts because it removes the overhead of communicating with
the cluster and waiting for the cluster to compute results, so
you get results more quickly when working on small data sets. The downside
is that you get none of the speed-up benefits of distributed
computing, so it would be much slower for large data sets. Thus, if
you start an interactive job flow with more than one instance,
start Grunt with just the command "pig" so that it runs on the cluster.
After running pig, you should see the Grunt prompt:
grunt>
3.2 - Using the File Systems
You're going to start by looking at the different file systems you can
access using Pig. Pig supports the commands "pwd", "cd", "ls", and
"cp" for interacting with file systems. Start by looking at
where you currently are:
grunt> pwd
file:/
You are at the root of the master instance's local file system. To move to the root of the Hadoop Distributed File System (HDFS), use cd:
grunt> cd hdfs:///
grunt> ls
hdfs://domU-12-31-39-00-48-A3.compute-1.internal:9000/mnt <dir>
You are now at the root of the HDFS file system, where you
can look at directories and files. You can also cd into Amazon
S3 buckets:
grunt> cd s3://elasticmapreduce/samples/pig-apache/input/
grunt> ls
s3://elasticmapreduce/samples/pig-apache/input/access_log_1<r 1> 8803772
s3://elasticmapreduce/samples/pig-apache/input/access_log_2<r 1> 8803772
s3://elasticmapreduce/samples/pig-apache/input/access_log_3<r 1> 8803772
s3://elasticmapreduce/samples/pig-apache/input/access_log_4<r 1> 8803772
The <r 1> after each file name is Pig telling you that the file
has a replication factor of 1. These are Apache access logs that you
want your script to process. Eventually you will want to process all
of them, but while you are developing a job flow you want only a portion
of the data, so that you learn quickly if your commands are wrong. For
the same reason, you want the data on the local disk of the master
instance rather than on the remote Amazon S3 system; that way you are
not copying the file from Amazon S3 each time you process it.
To copy a file from Amazon S3 to the local file system, use the cp command:
grunt> cp s3://elasticmapreduce/samples/pig-apache/input/access_log_1 file:///home/hadoop
3.3 - The Piggybank
You are now ready to start processing the data. To help with this,
you are going to use an add-on library to Pig called the
Piggybank. People contribute user-defined functions to this open
source library. The functions are written in Java but can be called from
Pig scripts to do special types of processing. As part of supporting
Pig, Amazon has added many functions to the Piggybank to help with
string and date/time processing.
As part of setting up Pig, Elastic MapReduce copies the Piggybank
onto the local hard disk. To make it available, type:
grunt> register file:/home/hadoop/lib/pig/piggybank.jar
This command loads the jar. You also need to DEFINE aliases for any
of the classes you want to use. The class you use in this tutorial is
EXTRACT, which you can define using an alias:
grunt> DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();
3.4 - Loading and Illustrating the Data
You are going to use TextLoader, a built-in Pig function that loads each line of the source file as a tuple with a single element:
grunt> RAW_LOGS = LOAD 'file:///home/hadoop/access_log_1' USING TextLoader as (line:chararray);
Use the ILLUSTRATE command to make Pig process a few lines of the input data and display results:
grunt> illustrate RAW_LOGS;
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| RAW_LOGS | line: bytearray |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| | 122.161.184.193 - - [21/Jul/2009:13:14:17 -0700] "GET /rss.pl HTTP/1.1" 200 35942 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/4.0; SLCC1; .NET CLR 2.0.50727; .NET CLR 3.5.21022; InfoPath.2; .NET CLR 3.5.30729; .NET CLR 3.0.30618; OfficeLiveConnector.1.3; OfficeLivePatch.1.3; MSOffice 12)" |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| RAW_LOGS | line: chararray |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| | 122.161.184.193 - - [21/Jul/2009:13:14:17 -0700] "GET /rss.pl HTTP/1.1" 200 35942 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/4.0; SLCC1; .NET CLR 2.0.50727; .NET CLR 3.5.21022; InfoPath.2; .NET CLR 3.5.30729; .NET CLR 3.0.30618; OfficeLiveConnector.1.3; OfficeLivePatch.1.3; MSOffice 12)" |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
This is a little hard to read because of the wrapping. What you should see is that Pig loads each line into a tuple with just a single element: the line itself. You now need to split the line into fields. To do this, use the EXTRACT Piggybank function, which applies a regular expression to the input and extracts the matched groups as elements of a tuple. The regular expression is a little tricky because the Apache log format encloses several fields in quotes or brackets. What you get is:
'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
Unfortunately, you can't use this as-is, because in Pig string literals every backslash must itself be escaped with a backslash. With this change, you get:
'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'
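If you want to check the escaping rule before typing the expression into Grunt, the following Python 3 sketch does so on your own machine. It is not part of Pig or Elastic MapReduce. It assumes that Python's re module treats this particular expression the same way as the Java regular expressions used by Pig and the Piggybank; the constructs involved (\S, \w, \s, \d, character classes, and the lazy .+?) behave alike in both. The hypothetical helper to_pig_literal doubles every backslash. The script then applies the unescaped expression to the sample line from the ILLUSTRATE output.

```python
import re

# The regular expression as you would write it outside Pig.
APACHE_LOG_RE = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'

# The same expression as it must appear inside a Pig string literal.
PIG_LITERAL = r'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"'

# The sample access-log line shown by ILLUSTRATE RAW_LOGS.
SAMPLE = (
    '122.161.184.193 - - [21/Jul/2009:13:14:17 -0700] "GET /rss.pl HTTP/1.1" 200 35942 "-" '
    '"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Trident/4.0; SLCC1; .NET CLR 2.0.50727; '
    '.NET CLR 3.5.21022; InfoPath.2; .NET CLR 3.5.30729; .NET CLR 3.0.30618; '
    'OfficeLiveConnector.1.3; OfficeLivePatch.1.3; MSOffice 12)"'
)


def to_pig_literal(regex: str) -> str:
    """Escape every backslash with another backslash, as Pig string literals require."""
    return regex.replace("\\", "\\\\")


if __name__ == "__main__":
    # True if doubling the backslashes reproduces the Pig literal exactly.
    print(to_pig_literal(APACHE_LOG_RE) == PIG_LITERAL)
    # Print each of the nine captured groups on its own line.
    match = re.match(APACHE_LOG_RE, SAMPLE)
    for group in match.groups():
        print(group)
```

The nine groups printed correspond, in order, to the nine fields named in the AS clause of the FOREACH statement.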
Using the escaped expression in a Pig FOREACH statement to generate a new bag, you get:
grunt> LOGS_BASE = FOREACH RAW_LOGS GENERATE
FLATTEN(
EXTRACT(line, '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')
)
as (
remoteAddr: chararray,
remoteLogname: chararray,
user: chararray,
time: chararray,
request: chararray,
status: int,
bytes_string: chararray,
referrer: chararray,
browser: chararray
);
Now run ILLUSTRATE on LOGS_BASE and look at the LOGS_BASE portion of the output:
grunt> illustrate LOGS_BASE;
...
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| LOGS_BASE | remoteAddr: chararray | remoteLogname: chararray | user: chararray | time: chararray | request: chararray | status: int | bytes_string: chararray | referrer: chararray | browser: chararray |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| | 72.14.194.1 | - | - | 21/Jul/2009:18:04:54 -0700 | GET /gwidgets/alexa.xml HTTP/1.1 | 200 | 2969 | - | FeedFetcher-Google; (+http://www.google.com/feedfetcher.html) |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
You can see that each input line has been split into a tuple of nine named fields.
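You can mirror the FLATTEN ... AS step in Python to see how the nine regular expression groups map onto the LOGS_BASE schema. This is a local sketch only. LogRecord and parse_line are hypothetical names, and SAMPLE_LINE is reconstructed from the field values in the ILLUSTRATE output above. Returning None for an unparseable line is this sketch's own choice, not a statement about what EXTRACT does.

```python
import re
from typing import NamedTuple, Optional

APACHE_LOG_RE = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
)


class LogRecord(NamedTuple):
    """Hypothetical Python mirror of the LOGS_BASE schema declared in the AS clause."""
    remoteAddr: str
    remoteLogname: str
    user: str
    time: str
    request: str
    status: int
    bytes_string: str
    referrer: str
    browser: str


def parse_line(line: str) -> Optional[LogRecord]:
    """Split one access-log line into the nine LOGS_BASE fields, or return None."""
    match = APACHE_LOG_RE.match(line)
    if match is None:
        return None  # this sketch simply drops lines the expression cannot parse
    fields = list(match.groups())
    fields[5] = int(fields[5])  # status is declared as int in the Pig schema
    return LogRecord(*fields)


# Line reconstructed from the LOGS_BASE row shown by ILLUSTRATE.
SAMPLE_LINE = (
    '72.14.194.1 - - [21/Jul/2009:18:04:54 -0700] "GET /gwidgets/alexa.xml HTTP/1.1" '
    '200 2969 "-" "FeedFetcher-Google; (+http://www.google.com/feedfetcher.html)"'
)

if __name__ == "__main__":
    print(parse_line(SAMPLE_LINE))
```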
3.5 - Writing a Query
The query you will write determines the top 50 search terms that
referred visitors to our website, http://example.org.
Thus, you should look at the referrer element in the tuple. The first
thing you do is create a bag containing tuples with just this
element:
grunt> REFERRER_ONLY = FOREACH LOGS_BASE GENERATE referrer;
You want to see what types of things are in this bag. ILLUSTRATE is
not good enough, because you want to see more tuples of data, so use the
DUMP command instead. DUMP outputs the complete contents
of a bag to the screen. There is usually too much data to display, so
first add a LIMIT instruction:
grunt> TEMP = LIMIT REFERRER_ONLY 10;
grunt> DUMP TEMP;
2009-08-04 05:52:31,794 [main] INFO org.apache.pig.backend.local.executionengine.LocalPigLauncher - Successfully stored result in: "file:/tmp/temp1295722813/tmp-2054682351"
2009-08-04 05:52:31,794 [main] INFO org.apache.pig.backend.local.executionengine.LocalPigLauncher - Records written : 10
2009-08-04 05:52:31,794 [main] INFO org.apache.pig.backend.local.executionengine.LocalPigLauncher - Bytes written : 0
2009-08-04 05:52:31,794 [main] INFO org.apache.pig.backend.local.executionengine.LocalPigLauncher - 100% complete!
2009-08-04 05:52:31,794 [main] INFO org.apache.pig.backend.local.executionengine.LocalPigLauncher - Success!!
(-)
(-)
(-)
(-)
(-)
(http://example.org/)
(http://example.org/)
(-)
(-)
(-)
You get a few log messages as the command runs, then the
output. You will notice many dash (-) values; these come from requests
that have no referrer. You will also notice http://example.org/, which is
just our site referring to itself. So focus on the search
engines, and use FILTER to keep only those referrers that contain
'bing' or 'google':
grunt> FILTERED = FILTER REFERRER_ONLY BY referrer matches '.*bing.*' OR referrer matches '.*google.*';
grunt> TEMP = LIMIT FILTERED 10;
grunt> DUMP TEMP;
(http://www.bing.com/search?q=value)
(http://www.bing.com/search?q=login)
(http://www.bing.com/search?q=value)
(http://www.bing.com/search?q=value)
(http://www.google.com/search?client=safari&rls=en-us&q=examples+of+Me.com+websites&ie=UTF-8&oe=UTF-8)
(http://www.google.com/search?client=safari&rls=en-us&q=examples+of+Me.com+websites&ie=UTF-8&oe=UTF-8)
(http://www.bing.com/search?q=graduation)
(http://www.google.co.uk/search?hl=en&newwindow=1&q=domestic+flight+from+checking+procedure+heathrow+&meta=)
(http://www.bing.com/search?q=biking)
(http://www.bing.com/search?q=walla)
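Pig's matches operator uses Java regular expressions. Like Java's String.matches, it requires the pattern to match the whole string, which is why 'bing' and 'google' are wrapped in .* above. The Python sketch below mirrors the FILTER, using re.fullmatch for the same whole-string behavior. is_search_referrer and filter_referrers are hypothetical names, and the sample referrers are copied from the DUMP outputs above.

```python
import re
from typing import List

# The two patterns from the FILTER statement; they contain no backslashes to escape.
SEARCH_ENGINE_PATTERNS = [r".*bing.*", r".*google.*"]


def is_search_referrer(referrer: str) -> bool:
    """Mirror of: referrer matches '.*bing.*' OR referrer matches '.*google.*'."""
    return any(re.fullmatch(p, referrer) is not None for p in SEARCH_ENGINE_PATTERNS)


def filter_referrers(referrers: List[str]) -> List[str]:
    """Keep only the referrers that one of the search-engine patterns matches."""
    return [r for r in referrers if is_search_referrer(r)]


if __name__ == "__main__":
    sample = [
        "-",
        "http://example.org/",
        "http://www.bing.com/search?q=value",
        "http://www.google.co.uk/search?hl=en&newwindow=1&q=domestic+flight+from+checking+procedure+heathrow+&meta=",
    ]
    print(filter_referrers(sample))
    # A bare 'bing' would have to match the whole URL, so this prints None.
    print(re.fullmatch(r"bing", "http://www.bing.com/search?q=value"))
```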
Both search engines pass the search terms in the query string under
the key "q", separating the terms with "+". To extract them, first use
EXTRACT to grab everything from "q=" up to either an ampersand (&) or
the end of the string. Then FILTER out the null values left by
referrers that do not match the regular expression. Together you get:
grunt> SEARCH_TERMS = FOREACH FILTERED GENERATE FLATTEN(EXTRACT(referrer, '.*[&\\?]q=([^&]+).*')) as terms:chararray;
grunt> SEARCH_TERMS_FILTERED = FILTER SEARCH_TERMS BY NOT $0 IS NULL;
grunt> DUMP SEARCH_TERMS_FILTERED;
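You can try the search-term extraction locally in the same way. In this Python sketch, extract_terms and search_terms are hypothetical helpers. The pattern is the Pig literal '.*[&\\?]q=([^&]+).*' with its backslashes un-doubled, and None stands in for the null values that SEARCH_TERMS_FILTERED removes. The sketch uses fullmatch; because the expression begins and ends with .*, a search-style match would capture the same group.

```python
import re
from typing import List, Optional

# The Pig literal '.*[&\\?]q=([^&]+).*' with its doubled backslashes undone.
QUERY_TERMS_RE = re.compile(r".*[&\?]q=([^&]+).*")


def extract_terms(referrer: str) -> Optional[str]:
    """Return the value of the q parameter, or None when the referrer has none."""
    match = QUERY_TERMS_RE.fullmatch(referrer)
    return match.group(1) if match else None


def search_terms(referrers: List[str]) -> List[str]:
    """Mirror of SEARCH_TERMS followed by SEARCH_TERMS_FILTERED (missing values dropped)."""
    extracted = (extract_terms(r) for r in referrers)
    return [term for term in extracted if term is not None]


if __name__ == "__main__":
    print(search_terms([
        "http://www.bing.com/search?q=login",
        "http://www.google.com/search?client=safari&rls=en-us&q=examples+of+Me.com+websites&ie=UTF-8&oe=UTF-8",
        "http://www.google.com/",
    ]))
```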
Finally, you want to count the search terms. To do this, GROUP the
terms and COUNT each group, sort by count, and keep the top 50:
grunt> SEARCH_TERMS_COUNT = FOREACH (GROUP SEARCH_TERMS_FILTERED BY $0) GENERATE $0, COUNT($1) as num;
grunt> SEARCH_TERMS_COUNT_SORTED = LIMIT(ORDER SEARCH_TERMS_COUNT BY num DESC) 50;
grunt> DUMP SEARCH_TERMS_COUNT_SORTED;
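Together, the GROUP, COUNT, ORDER, and LIMIT steps compute a frequency table cut down to the 50 most common terms. A Python equivalent using collections.Counter is sketched below; top_terms is a hypothetical name. One difference to keep in mind: Counter.most_common breaks ties by first appearance, whereas Pig's ORDER does not promise any particular order among terms with equal counts.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def top_terms(terms: Iterable[str], n: int = 50) -> List[Tuple[str, int]]:
    """Mirror of GROUP + COUNT, then ORDER BY num DESC, then LIMIT n."""
    counts = Counter(terms)  # one entry per distinct term with its number of occurrences
    # most_common sorts by count, highest first, and keeps at most n entries.
    # Ties come out in first-seen order; Pig's ORDER makes no such promise.
    return counts.most_common(n)


if __name__ == "__main__":
    # Terms taken from the bing referrers in the DUMP output above.
    terms = ["value", "login", "value", "value", "graduation", "biking", "walla"]
    print(top_terms(terms, 3))
```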
3.6 - Storing the Data
Now that you have defined the computation, you need to save the results. To do that, use the STORE command to store the data in a local file:
grunt> STORE SEARCH_TERMS_COUNT_SORTED into 'file:///home/hadoop/output/run0';
When you enter STORE, Pig runs the query and the Grunt shell blocks
until it finishes. Once it completes, you can see the output using CAT:
grunt> CAT file:///home/hadoop/output/run0
Section 4 - Parameterizing the Script and Uploading It to Amazon S3
4.1 - Converting to Using Parameters
The next step is to add parameters for INPUT and OUTPUT to the
script. This will allow you to set the input and output locations of
the script while executing it in batch mode.
Change the load and store instructions to use variables $INPUT and
$OUTPUT:
grunt> RAW_LOGS = LOAD '$INPUT' USING TextLoader as (line:chararray);
grunt> STORE SEARCH_TERMS_COUNT_SORTED into '$OUTPUT';
To supply values for the parameters when running in batch mode, use the '-p' option:
$ pig -p INPUT=file:///home/hadoop/access_log_1 -p OUTPUT=file:///home/hadoop/output/run1 s3://mybucket/scripts/myscript.pig
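If you drive pig from another program rather than typing the command, passing the arguments as a list avoids any shell quoting. The Python sketch below is an illustration under assumptions. build_pig_command and run_pig are hypothetical helpers, the sketch assumes Python 3 and pig are both on the PATH of the machine running it, and it only assembles the -x local and -p NAME=value options already shown in this tutorial.

```python
import subprocess
from typing import Dict, List


def build_pig_command(script: str, params: Dict[str, str], local: bool = False) -> List[str]:
    """Assemble a pig command line that passes each parameter as -p NAME=value."""
    cmd = ["pig"]
    if local:
        cmd += ["-x", "local"]  # compute locally instead of on the Hadoop cluster
    for name, value in params.items():
        cmd += ["-p", f"{name}={value}"]
    cmd.append(script)
    return cmd


def run_pig(script: str, params: Dict[str, str], local: bool = False) -> int:
    """Run pig with the given parameters and return its exit status (requires pig)."""
    return subprocess.run(build_pig_command(script, params, local)).returncode


if __name__ == "__main__":
    print(build_pig_command(
        "s3://mybucket/scripts/myscript.pig",
        {"INPUT": "file:///home/hadoop/access_log_1",
         "OUTPUT": "file:///home/hadoop/output/run1"},
    ))
```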
Here is the complete Pig script:
register file:/home/hadoop/lib/pig/piggybank.jar
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();
RAW_LOGS = LOAD '$INPUT' USING TextLoader as (line:chararray);
LOGS_BASE = foreach RAW_LOGS generate
FLATTEN (
EXTRACT (line, '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')
)
as (
remoteAddr:chararray, remoteLogname:chararray, user:chararray, time:chararray,
request:chararray, status:int, bytes_string:chararray, referrer:chararray,
browser:chararray
)
;
REFERRER_ONLY = FOREACH LOGS_BASE GENERATE referrer;
FILTERED = FILTER REFERRER_ONLY BY referrer matches '.*bing.*' OR referrer matches '.*google.*';
SEARCH_TERMS = FOREACH FILTERED GENERATE FLATTEN(EXTRACT(referrer, '.*[&\\?]q=([^&]+).*')) as terms:chararray;
SEARCH_TERMS_FILTERED = FILTER SEARCH_TERMS BY NOT $0 IS NULL;
SEARCH_TERMS_COUNT = FOREACH (GROUP SEARCH_TERMS_FILTERED BY $0) GENERATE $0, COUNT($1) as num;
SEARCH_TERMS_COUNT_SORTED = LIMIT(ORDER SEARCH_TERMS_COUNT BY num DESC) 50;
STORE SEARCH_TERMS_COUNT_SORTED into '$OUTPUT';
4.2 - Save to file and Run from the Command Line
Let's quit Grunt and save this text to a file. At the Grunt prompt, type:
grunt> quit
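To save the script as a file on the master instance, run the following from the shell. The quotes around 'EOF' stop the shell from expanding $INPUT, $OUTPUT, $0, and $1, and from turning the doubled backslashes into single ones, inside the here-document:
$ cat > /home/hadoop/tutorial.pig <<'EOF'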
register file:/home/hadoop/lib/pig/piggybank.jar
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();
RAW_LOGS = LOAD '$INPUT' USING TextLoader as (line:chararray);
LOGS_BASE= foreach RAW_LOGS generate FLATTEN(EXTRACT(line, '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"')) as (remoteAddr:chararray, remoteLogname:chararray, user:chararray, time:chararray, request:chararray, status:int, bytes_string:chararray, referrer:chararray, browser:chararray);
REFERRER_ONLY = FOREACH LOGS_BASE GENERATE referrer;
FILTERED = FILTER REFERRER_ONLY BY referrer matches '.*bing.*' OR referrer matches '.*google.*';
SEARCH_TERMS = FOREACH FILTERED GENERATE FLATTEN(EXTRACT(referrer, '.*[&\\?]q=([^&]+).*')) as terms:chararray;
SEARCH_TERMS_FILTERED = FILTER SEARCH_TERMS BY NOT $0 IS NULL;
SEARCH_TERMS_COUNT = FOREACH (GROUP SEARCH_TERMS_FILTERED BY $0) GENERATE $0, COUNT($1) as num;
SEARCH_TERMS_COUNT_SORTED = LIMIT(ORDER SEARCH_TERMS_COUNT BY num DESC) 50;
STORE SEARCH_TERMS_COUNT_SORTED into '$OUTPUT';
EOF
Alternatively, if you are familiar with vim or nano, you can use either one to create the file. You could also
save the file on your desktop and then use scp to copy it to the master node.
You can now test that the script runs by invoking pig on it from
the command line, setting values for the input and output
parameters with the '-p' option, as follows:
$ pig -p INPUT=file:///home/hadoop/access_log_1 -p OUTPUT=file:///home/hadoop/output/run2 file:///home/hadoop/tutorial.pig
This will now run the script, and when it finishes the output should
be in '/home/hadoop/output/run2'.
4.3 - Upload to Amazon S3
To upload the script to Amazon S3, you need an Amazon S3 bucket.
If you don't have one, the tool will create the bucket for you as long
as you choose a unique name. For more information about the
limitations on bucket names, see http://docs.amazonwebservices.com/AmazonS3/2006-03-01/index.html?UsingBucket.html.
The file can be uploaded using the "hadoop dfs" command, which
despite its name can be used for interacting with any filesystem:
$ hadoop dfs -copyFromLocal /home/hadoop/tutorial.pig s3://my.bucket.name/pig/scripts/tutorial.pig
4.4 - Terminating the Interactive Job Flow
You are now finished with your interactive job flow, so you need to
terminate it. Return to the Elastic MapReduce tab in the console,
click the job flow, and then click the "Terminate" button.
Then click "OK" to terminate the cluster.
Section 5 - Running the Script on All the Data through the Console
You are now up to the final step: running the script on the full
set of data through the console. To do this, go to the AWS Management
Console and click "Create New Job Flow" as before. Type a name, select
"Pig Program" as the type again, and click "Continue". This takes
you once more to the Pig Details screen:
Figure 5.1: Starting a Batch Mode Pig Session
Enter the following:
| Script Location | The location you uploaded the script to in Section 4.3, e.g. s3://my.bucket.name/pig/scripts/tutorial.pig |
| Input Location | s3://elasticmapreduce/samples/pig-apache/input |
| Output Location | A location in your bucket where you want the output to go, e.g. s3://my.bucket.name/pig/output/run0 |
| Extra args | Leave blank |
Elastic MapReduce automatically sets the INPUT and OUTPUT parameters to the values specified above when calling your script. Continue with the defaults through the next step, and click "Create Job Flow" to launch your job flow.
Your job flow will now launch the cluster, automatically run your
script, and then shut down the cluster when the script completes. This process
should take about 15 minutes. When it is done, your output will be in
the Amazon S3 bucket you specified. A good, free tool for downloading
this data is the Amazon S3 Organizer Firefox plug-in, which you can
download from http://www.s3fox.net.
Section 6 - Further Reading and Resources