Introduction
More often than not, data doesn't come packaged exactly as we'd like it for analysis. Transformation,
match-merge operations, and a host of data munging tasks
are usually needed before we can extract insights from our Big Data sources. Few people find data
munging exciting, but it has to be done. Once we've suffered that boredom, we should take steps to automate
the process. We want codify our work into repeatable units and create
workflows which we can leverage over and over again without having to write new code. In this article,
we'll look at how to use Oozie to create a workflow for the
parallel machine learning task I described on Cloudera's site.
Hive Actions: Prepping for Pig
In my parallel machine learning article, I use data from the
National Climatic Data Center to build weather models
on a state-by-state basis. NCDC makes the data freely available as gzipped files of day-over-day observations
stretching from the 1930s to today. In reading that post, one might get the impression that the data came
in a handy, ready-to-model files with convenient delimiters. The truth of it is that I need to perform
some parsing and projection on the dataset before it can be modeled. If I get more observations, I'll want
to retrain and test those models, which will require more parsing and projection. This is a good opportunity
to start building up a workflow with Oozie.
I store the data from the NCDC in HDFS and create an external Hive table
partitioned by year. This gives me flexibility of Hive's query language when I want it, but let's me put
the dataset in a directory of my choosing in case I want to treat the same data with Pig or MapReduce code.
CREATE EXTERNAL TABLE IF NOT EXISTS historic_weather(column 1, column2)
PARTITIONED BY (yr string)
STORED AS ...
LOCATION '/user/oracle/weather/historic';
As new weather data comes in from NCDC, I'll need to add partitions to my table. That's an action I should put in the workflow.
Similarly, the weather data requires parsing
in order to be useful as a set of columns. Because of their long history, the weather data is broken up into fields of specific
byte lengths: x bytes for the station ID, y bytes for the dew point, and so on. The delimiting is consistent from year to year,
so writing SerDe or a parser for transformation is simple. Once that's done, I want to select columns on which to train,
classify certain features, and place the training data in an HDFS directory for my Pig script to access.
ALTER TABLE historic_weather ADD IF NOT EXISTS PARTITION (yr='2010')
LOCATION '/user/oracle/weather/historic/yr=2011';
INSERT OVERWRITE DIRECTORY '/user/oracle/weather/cleaned_history'
SELECT w.stn, w.wban, w.weather_year, w.weather_month,
w.weather_day, w.temp, w.dewp, w.weather FROM (
FROM historic_weather SELECT TRANSFORM(...)
USING '/path/to/hive/filters/ncdc_parser.py'
as stn, wban, weather_year, weather_month, weather_day, temp, dewp, weather
) w;
Since I'm going to prepare training directories with at least the same frequency that I add partitions, I should
also add that to my workflow. Oozie is going to invoke these Hive actions using what's somewhat obviously referred to as
a Hive action. Hive actions amount to Oozie running a script file containing our query language statements,
so we can place them in a file called weather_train.hql.
Starting Our Workflow
Oozie offers two types of jobs: workflows and coordinator jobs. Workflows are straightforward: they define a set of actions
to perform as a sequence or directed acyclic graph. Coordinator jobs can take all the same actions of Workflow jobs, but
they can be automatically started either periodically or when new data arrives in a specified location. To keep things
simple we'll make a workflow job; coordinator jobs simply require another XML file for scheduling. The bare minimum
for workflow XML defines a name, a starting point, and an end point:
<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
<start to="ParseNCDCData"/>
<end name="end"/>
</workflow-app>
To this we need to add an action, and within that we'll specify the hive parameters Also, keep in mind that actions require
<ok> and <error> tags to direct the next action on success or failure.
<action name="ParseNCDCData">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<configuration>
<property>
<name>oozie.hive.defaults</name>
<value>/user/oracle/weather_ooze/hive-default.xml</value>
</property>
</configuration>
<script>ncdc_parse.hql</script>
</hive>
<ok to="WeatherMan"/>
<error to="end"/>
</action>
There are a couple of things to note here:
I have to give the FQDN (or IP) and port of my JobTracker and NameNode.
I have to include a hive-default.xml file.
I have to include a script file.
The hive-default.xml and script file must be stored in HDFS
That last point is particularly important. Oozie doesn't make assumptions about where a given workflow is being run. You might
submit workflows against different clusters, or have different hive-defaults.xml on different clusters (e.g. MySQL or Postgres-backed metastores).
A quick way to ensure that all the assets end up in the right place in HDFS is just to make a working directory locally, build your workflow.xml
in it, and copy the assets you'll need to it as you add actions to workflow.xml.
At this point, our local directory should contain:
workflow.xml
hive-defaults.xml (make sure this file contains your metastore connection data)
ncdc_parse.hql
Adding Pig to the Ooze
Adding our Pig script as an action is slightly simpler from an XML standpoint. All we do is add an action to workflow.xml as follows:
<action name="WeatherMan">
<pig>
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<script>weather_train.pig</script>
</pig>
<ok to="end"/>
<error to="end"/>
</action>
Once we've done this, we'll copy weather_train.pig to our working directory. However, there's a bit of a "gotcha" here.
My pig script registers the Weka Jar and a chunk of jython. If those aren't also in HDFS, our action will fail from the outset --
but where do we put them? The Jython script goes into the working directory at the same level as the pig script, because pig attempts to
load Jython files in the directory from which the script executes. However, that's not where our Weka jar goes.
While Oozie doesn't assume much, it does make an assumption about the Pig classpath. Anything under working_directory/lib
gets automatically added to the Pig classpath and no longer requires a REGISTER statement in the script. Anything that uses a REGISTER statement
cannot be in the working_directory/lib directory. Instead, it needs to be in a different HDFS directory and attached to the pig action with an
<archive> tag.
Yes, that's as confusing as you think it is.
You can get the exact rules for adding Jars to the
distributed cache from Oozie's Pig Cookbook.
Making the Workflow Work
We've got a workflow defined and have collected all the components we'll need to run. But we can't run anything yet, because we
still have to define some properties about the job and submit it to Oozie. We need to start with the job properties, as this is essentially the
"request" we'll submit to the Oozie server. In the same working directory, we'll make a file called job.properties as follows:
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
weatherRoot=weather_ooze
mapreduce.jobtracker.kerberos.principal=foo
dfs.namenode.kerberos.principal=foo
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${weatherRoot}
outputDir=weather-ooze
While some of the pieces of the properties file are familiar (e.g., JobTracker address), others take a bit of explaining.
The first is weatherRoot: this is essentially an environment variable for the script (as are jobTracker and queueName). We're simply
using them to simplify the directives for the Oozie job. The oozie.libpath pieces is extremely important. This is a directory
in HDFS which holds Oozie's shared libraries: a collection of Jars necessary for invoking Hive, Pig, and other actions. It's a good idea to make sure
this has been installed and copied up to HDFS. The last two lines are straightforward: run the application defined by workflow.xml at the application path listed and write the output
to the output directory.
We're finally ready to submit our job! After all that work we only need to do a few more things:
Validate our workflow.xml
Copy our working directory to HDFS
Submit our job to the Oozie server
Run our workflow
Let's do them in order. First validate the workflow:
oozie validate workflow.xml
Next, copy the working directory up to HDFS:
hadoop fs -put working_dir /user/oracle/working_dir
Now we submit the job to the Oozie server. We need to ensure that we've got the correct URL for the Oozie server, and we need to
specify our job.properties file as an argument.
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -submit
We've submitted the job, but we don't see any activity on the JobTracker? All I got was this funny bit of output:
14-20120525161321-oozie-oracle
This is because submitting a job to Oozie creates an
entry for the job and places it in PREP status. What we got back, in essence, is a ticket for our workflow to ride the
Oozie train. We're responsible for redeeming our ticket and running the job.
oozie -oozie http://url.to.oozie.server:port_number/ -start 14-20120525161321-oozie-oracle
Of course, if we really want to run the job from the outset, we can change the "-submit" argument above to "-run." This will prep and run the workflow immediately.
Takeaway
So, there you have it: the somewhat laborious process of building an Oozie workflow. It's a bit tedious the first time out, but it
does present a pair of real benefits to those of us who spend a great deal of time data munging. First, when new data arrives that requires the same processing,
we already have the workflow defined and ready to run. Second, as we build up a set of useful action definitions over time, creating new workflows becomes quicker and
quicker.