Oozing Caribou

October 12th, 2013
PerlHive

Oozing Caribou

Meet Oozie’s Workflows

Oozie is a workflow scheduler for Hadoop. But that’s not terribly important right now. What is important is that it defines its workflows using an XML dialect. And as all XML things go, the result is… shall we say, less than easy on the eyes and the typing fingers. As a piece of evidence, I bring you this simple example workflow, part of the Oozie distribution:

<!--
    Demo workflow: deletes the demo output directory, forks into a Pig job
    and a streaming map-reduce job, joins them, runs a further map-reduce job
    over both outputs, and finally moves that job's output to 'final-data' if
    it exists. Every action routes its error transition to the 'fail' kill node.
-->
<workflow-app name="demo-wf">

    <!-- Entry point: the workflow begins at 'cleanup-node'. -->
    <start to="cleanup-node"/>

    <!-- Deletes the demo output directory, then continues to the fork. -->
    <action name="cleanup-node">
        <fs>
            <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo"/>
        </fs>
        <ok to="fork-node"/>
        <error to="fail"/>
    </action>

    <!-- Splits execution into the 'pig-node' and 'streaming-node' paths. -->
    <fork name="fork-node">
        <path start="pig-node"/>
        <path start="streaming-node"/>
    </fork>

    <!-- Pig action: runs the id.pig script with INPUT/OUTPUT parameters. -->
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.map.output.compress</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=/user/${wf:user()}/${examplesRoot}/input-data/text</param>
            <param>OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node</param>
        </pig>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>

    <!-- Streaming map-reduce action: /bin/cat as mapper, /usr/bin/wc as reducer. -->
    <action name="streaming-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node"/>
            </prepare>
            <streaming>
                <mapper>/bin/cat</mapper>
                <reducer>/usr/bin/wc</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>

                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>

    <!-- Both forked paths transition here; the workflow then proceeds to 'mr-node'. -->
    <join name="join-node" to="mr-node"/>


    <!-- Map-reduce action whose input is the output of both 'pig-node' and 'streaming-node'. -->
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>

                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.DemoMapper</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.DemoReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node,/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="decision-node"/>
        <error to="fail"/>
    </action>

    <!-- Goes to 'hdfs-node' when the mr-node output path exists; otherwise straight to 'end'. -->
    <decision name="decision-node">
        <switch>
            <case to="hdfs-node">${fs:exists(concat(concat(concat(concat(concat(nameNode, '/user/'), wf:user()), '/'), examplesRoot), '/output-data/demo/mr-node')) == "true"}</case>
            <default to="end"/>
        </switch>
    </decision>

    <!-- Moves the mr-node output to 'final-data'. -->
    <action name="hdfs-node">
        <fs>
            <move source="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node"
                  target="/user/${wf:user()}/${examplesRoot}/output-data/demo/final-data"/>
        </fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <!-- Target of every action's error transition; reports the last error node's message. -->
    <kill name="fail">
        <message>Demo workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <!-- Terminal node of the workflow. -->
    <end name="end"/>

</workflow-app>

Not the worst tag soup ever, I’ll give you. But still, that’s hefty on the eyes.

For the Love of the FSM, DSL That ML

At the core, the XML representation of the workflow is a fine thing. It’s very easily machine-parsable and well-defined. It’s just not very friendly to us humans, and it’s one case where I think DSLs do wonders to abstract most of the tediousness and verbosity of the job.

Enter Template::Caribou, that toy templating system of mine. While its primary raison d’être is HTML templating, it has been designed such that it’s friendly to any XML dialect. Indeed, with the help of a Hive tag library (currently available on the ‘hive’ branch of the GitHub repo of Caribou), here is what the workflow above could look like.

First, we need a wrapping class:

#!/usr/bin/perl

use strict;
use warnings;

# Minimal template class: a Moose class that consumes the Caribou roles
# and prints the rendering of the 'demo' template.
package Workflow;

use Moose;
use Template::Caribou;

# Core templating role.
with 'Template::Caribou';

# File-based templates; '.' is the only directory handed to the role
# (presumably where demo.bou is looked up -- confirm against the role's docs).
with 'Template::Caribou::Files' => { dirs => ['.'] };

# Build a throwaway instance and emit the rendered 'demo' template on STDOUT.
print Workflow->new->render('demo');

and there is demo.bou, in all its glory:

use Template::Caribou::Tags::Hive ':all';

# The 'demo-wf' Oozie workflow written with the Caribou Hive tag library.
# It mirrors the reference workflow.xml node for node: cleanup, fork into a
# Pig job and a streaming job, join, a final map-reduce job, a decision, an
# optional HDFS move, plus the shared 'fail' kill node.
#
# All '${...}' strings are single-quoted on purpose: they are Oozie EL
# expressions that must reach the generated XML verbatim, not Perl variables.
workflow 'demo-wf',
    start => 'cleanup-node',
    end   => 'end',
    sub {

    # Remove the demo output directory before anything else runs.
    action 'cleanup-node',
        ok    => 'fork-node',
        error => 'fail',
        sub {
            fs {
                oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo';
            };
        };

    # Fork into the two paths named after the fork node itself.
    oozie_fork 'fork-node', qw/
        pig-node
        streaming-node
    /;

    # Pig action running id.pig.
    # NOTE(review): this action uses hyphenated keys ('job-tracker') while the
    # map_reduce actions below use underscored ones (job_tracker) -- confirm
    # which spelling(s) the tag library accepts.
    action 'pig-node',
        ok    => 'join-node',
        error => 'fail',
        sub {
            pig
                # EL variable spelled ${jobTracker}, as in the reference XML.
                'job-tracker' => '${jobTracker}',
                'name-node'   => '${nameNode}',
                prepare       => sub {
                    oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node';
                },
                configuration => {
                    'mapred.job.queue.name'      => '${queueName}',
                    'mapred.map.output.compress' => 'false',
                },
                script => 'id.pig',
                params => [
                    'INPUT=/user/${wf:user()}/${examplesRoot}/input-data/text',
                    'OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node',
                ],
            ;
        };

    # Streaming map-reduce action (cat | wc).
    action 'streaming-node',
        ok    => 'join-node',
        error => 'fail',
        sub {
            map_reduce
                job_tracker => '${jobTracker}',
                name_node   => '${nameNode}',
                prepare     => sub {
                    # Call the tag, as the sibling actions do; a bare
                    # "delete => '...'" pair would emit nothing.
                    oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node';
                },
                streaming => {
                    mapper  => '/bin/cat',
                    reducer => '/usr/bin/wc',
                },
                configuration => {
                    'mapred.job.queue.name' => '${queueName}',
                    'mapred.input.dir'      => '/user/${wf:user()}/${examplesRoot}/input-data/text',
                    'mapred.output.dir'     => '/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node',
                },
            ;
        };

    # Both forked paths meet here, then continue to 'mr-node'.
    oozie_join 'join-node' => 'mr-node';

    # Map-reduce action reading the output of both forked actions.
    action 'mr-node',
        ok    => 'decision-node',
        error => 'fail',
        sub {
            map_reduce
                job_tracker => '${jobTracker}',
                name_node   => '${nameNode}',
                prepare     => sub {
                    oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node';
                },
                configuration => {
                    'mapred.job.queue.name'        => '${queueName}',
                    'mapred.mapper.class'          => 'org.apache.oozie.example.DemoMapper',
                    'mapred.mapoutput.key.class'   => 'org.apache.hadoop.io.Text',
                    'mapred.mapoutput.value.class' => 'org.apache.hadoop.io.IntWritable',
                    'mapred.reducer.class'         => 'org.apache.oozie.example.DemoReducer',
                    'mapred.map.tasks'             => 1,
                    'mapred.input.dir'             => '/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node,/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node',
                    'mapred.output.dir'            => '/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node',
                },
            ;
        };

    # Arguments: node name, default target, then target => condition pairs.
    decision 'decision-node', 'end',
        'hdfs-node' => q[${fs:exists(concat(concat(concat(concat(concat(nameNode, '/user/'), wf:user()), '/'), examplesRoot), '/output-data/demo/mr-node')) == "true"}];

    # Move the mr-node output to its final location.
    action 'hdfs-node',
        ok    => 'end',
        error => 'fail',
        sub {
            fs {
                move '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node' => '/user/${wf:user()}/${examplesRoot}/output-data/demo/final-data';
            };
        };

    # Kill node targeted by every action's error transition. The heredoc is
    # single-quoted so the EL expression is not interpolated by Perl.
    oozie_kill 'fail' => <<'URGH';
Demo workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
URGH

};

Reading the whole thing still doesn’t feel like Christmas, but it’s an improvement. And now that it’s part of a templating system, we can split the different actions into their own template/file, and then use a little bit of programmatic magic to slurp them all into the main workflow.

Also, while the example we’re using is simple enough that no great feats of simplification can be done, it’s easy to think of cases where a for loop will be our best friend. For instance, we might need to create lots of actions based on some parameter:

# somewhere in the 'workflow' template

# Fork node with one 'update_<table>' path per table returned by
# $self->tables_to_update.
my @update_nodes = map { "update_$_" } $self->tables_to_update;
oozie_fork 'update_tables' => @update_nodes;

# sub-template creating the 'update_*table*' action node
for ( $self->tables_to_update ) {
    show( 'table_update', table_name => $_ );
}

# Join node that follows the update actions and leads on to the next node.
oozie_join 'all_updates_done' => 'some-next-node';

Of course, for loops we could also do something similar with an XSLT transform. But… y’know… no. Just… no.

Bonus Feature: Workflow Graph!

Unrelated to the stuff above, but just because I find it cute: want to make a quick graph of the workflow? Here’s a quick and dirty way to do it:

#!/usr/bin/env perl

use strict;
use warnings;

use Web::Query;
use Graph::Easy;

# Read the workflow XML from the files named on the command line (or STDIN)
# and query it with CSS-style selectors; the graph is built up node type by
# node type and printed as ASCII art at the end.
my $workflow = Web::Query->new_from_html( join '', <> );
my $diagram  = Graph::Easy->new;

# <start>: edge from a synthetic 'START' node to the first real node.
$workflow->find('start')->each( sub {
    my ( undef, $start ) = @_;
    $diagram->add_edge( 'START', $start->attr('to') );
} );

# <end>: has no outgoing transition, so it is only registered as a node.
$workflow->find('end')->each( sub {
    my ( undef, $end ) = @_;
    $diagram->add_node( $end->attr('name') );
} );

# <action>: one labelled edge per <ok>/<error> transition that has a target.
$workflow->find('action')->each( sub {
    my ( undef, $action ) = @_;
    foreach my $outcome ( 'ok', 'error' ) {
        my $target = $action->find($outcome)->attr('to');
        next unless $target;
        my $edge = $diagram->add_edge( $action->attr('name'), $target );
        $edge->set_attribute( label => $outcome );
    }
} );

# <fork>: one edge per <path start="...">.
$workflow->find('fork')->each( sub {
    my ( undef, $fork ) = @_;
    my $origin = $fork->attr('name');
    $fork->find('path')->each( sub {
        my ( undef, $branch ) = @_;
        $diagram->add_edge( $origin, $branch->attr('start') );
    } );
} );

# <join>: a single edge from the join node to its 'to' target.
$workflow->find('join')->each( sub {
    my ( undef, $join ) = @_;
    $diagram->add_edge( $join->attr('name'), $join->attr('to') );
} );

# <decision>: one edge per <case> and for the <default>.
$workflow->find('decision')->each( sub {
    my ( undef, $decision ) = @_;
    my $origin = $decision->attr('name');
    $decision->find('case,default')->each( sub {
        my ( undef, $choice ) = @_;
        $diagram->add_edge( $origin, $choice->attr('to') );
    } );
} );

print $diagram->as_ascii;

Which gives us

$ perl graph.pl workflow.xml

                                               +----------------+  ok
  +------------------------------------------- | streaming-node | ------------------------------+
  |                                            +----------------+                               |
  |                                              ^                                              |
  |                                              |                                              |                                   +-----------------------------------------+
  |                                              |                                              v                                   |                                         v
  |  +-------+          +--------------+  ok   +----------------+          +----------+  ok   +-----------+     +---------+  ok   +---------------+     +-----------+  ok   +-----+
  |  | START | -------> | cleanup-node | ----> |   fork-node    | -------> | pig-node | ----> | join-node | --> | mr-node | ----> | decision-node | --> | hdfs-node | ----> | end |
  |  +-------+          +--------------+       +----------------+          +----------+       +-----------+     +---------+       +---------------+     +-----------+       +-----+
  |                       |                                                  |                                    |                                       |
  |                       | error                                            |                                    |                                       |
  |                       v                                                  |                                    |                                       |
  |            error    +---------------------------------------+  error     |                                    |                                       |
  +-------------------> |                                       | <----------+                                    |                                       |
                        |                                       |                                                 |                                       |
                        |                                       |  error                                          |                                       |
                        |                 fail                  | <-----------------------------------------------+                                       |
                        |                                       |                                                                                         |
                        |                                       |  error                                                                                  |
                        |                                       | <---------------------------------------------------------------------------------------+
                        +---------------------------------------+