hive map reduce in java

larry ogrodnek - 07 Oct 2009

In my last post, I went through an example of writing custom reduce scripts in Hive.

Writing a streaming reducer means redoing the same work every time to detect when the key changes. On top of that, in Java there's a fair amount of boilerplate just to read the columns from stdin.
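To make that concrete, here is a minimal sketch of a hand-written word-count reducer with no helper framework. The class name HandRolledWordCount is hypothetical, it assumes Java 8 or later, and it assumes tab-separated input that is already clustered by key. Note the key-change check inside the loop and the extra emit for the last group after end of input; both have to be repeated in every reducer written this way.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;

public class HandRolledWordCount {
  // Sums column 1 for each run of consecutive rows sharing column 0 (the key).
  // Assumes tab-separated rows, already clustered by key.
  static void run(BufferedReader in, PrintWriter out) throws IOException {
    String currentKey = null;
    int count = 0;
    String line;
    while ((line = in.readLine()) != null) {
      final String[] cols = line.split("\t", -1);
      if (currentKey != null && !currentKey.equals(cols[0])) {
        // Key changed: emit the finished group before starting the next one.
        out.println(currentKey + "\t" + count);
        count = 0;
      }
      currentKey = cols[0];
      count += Integer.parseInt(cols[1]);
    }
    if (currentKey != null) {
      // Easy to forget: the last group still has to be emitted after EOF.
      out.println(currentKey + "\t" + count);
    }
    out.flush();
  }

  public static void main(final String[] args) throws IOException {
    run(new BufferedReader(new InputStreamReader(System.in)), new PrintWriter(System.out));
  }
}
```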

To help with this, I put together a really simple framework that more closely resembles the Hadoop Mapper and Reducer interfaces.

To use it, you just need to write a really simple reduce method:

void reduce(String key, Iterator<String[]> records, Output output) throws Exception;

The helper code handles all IO, as well as grouping together records that share a key. The first column is assumed to be the key, and the 'records' Iterator walks you through every row with that key; each element of a String[] record is one column. Rows aren't buffered in memory, so a single key can have any number of rows. As with any streaming reducer, rows with the same key have to arrive next to each other, which is what Hive's CLUSTER BY (or DISTRIBUTE BY plus SORT BY) provides.
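One way such grouping can work without buffering is a one-row lookahead: peek at the next row and keep handing rows to the reducer until its first column changes. This is my own illustration under that assumption, not the helper's actual source. GroupingSketch and its nested Reducer and Output interfaces are hypothetical stand-ins that mirror the signature above; it assumes Java 8 or later and tab-separated input clustered by key.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class GroupingSketch {
  // Hypothetical stand-ins that mirror the reduce signature above.
  interface Output {
    void collect(String[] record) throws Exception;
  }

  interface Reducer {
    void reduce(String key, Iterator<String[]> records, Output output) throws Exception;
  }

  // Calls the reducer once per run of consecutive rows sharing column 0.
  static void reduce(BufferedReader in, PrintWriter out, Reducer reducer) throws Exception {
    final Output output = record -> out.println(String.join("\t", record));
    // One-row lookahead: the driver itself only ever holds this single row.
    final String[][] lookahead = { readRow(in) };
    while (lookahead[0] != null) {
      final String key = lookahead[0][0];
      final Iterator<String[]> group = new Iterator<String[]>() {
        @Override
        public boolean hasNext() {
          return lookahead[0] != null && lookahead[0][0].equals(key);
        }

        @Override
        public String[] next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          final String[] row = lookahead[0];
          try {
            lookahead[0] = readRow(in);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
          return row;
        }
      };
      reducer.reduce(key, group, output);
      // Drain rows the reducer did not consume so they don't leak into the next group.
      while (group.hasNext()) {
        group.next();
      }
    }
    out.flush();
  }

  // Reads one tab-separated row, or returns null at end of input.
  private static String[] readRow(BufferedReader in) throws IOException {
    final String line = in.readLine();
    return line == null ? null : line.split("\t", -1);
  }

  public static void main(final String[] args) throws Exception {
    // Example reducer: count the rows for each key.
    reduce(new BufferedReader(new InputStreamReader(System.in)), new PrintWriter(System.out),
        (key, records, output) -> {
          int n = 0;
          while (records.hasNext()) {
            records.next();
            n++;
          }
          output.collect(new String[] { key, String.valueOf(n) });
        });
  }
}
```

Draining unconsumed rows after each reduce call is a design choice: it lets a reducer stop early (for example, after looking only at the first row of a group) without the leftover rows ending up in the next key's group.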

Here's the complete Condenser from my reduce example, rewritten in Java (it's even shorter than the Perl version):

import java.util.Iterator;

// GenericMR, Reducer and Output come from the helper jar
public class Condenser {
  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
        final StringBuilder vals = new StringBuilder();
        while (records.hasNext()) {
          // note we use col[1] -- the key is provided again as col[0]
          vals.append(records.next()[1]);
          if (records.hasNext()) { vals.append(","); }
        }
        // emit one row per key: the key plus its comma-separated values
        output.collect(new String[] { key, vals.toString() });
      }
    });
  }
}
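Because the reducer logic only touches an iterator, it can be pulled out and exercised without stdin. Here is a minimal sketch of that, using the hypothetical names CondenserLogic and condense; it assumes Java 8 or later and swaps the manual comma check for java.util.StringJoiner, which only inserts the separator between elements.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.StringJoiner;

public class CondenserLogic {
  // Joins column 1 of every row in a group with commas; column 0 repeats the key.
  static String[] condense(String key, Iterator<String[]> records) {
    final StringJoiner vals = new StringJoiner(",");
    while (records.hasNext()) {
      vals.add(records.next()[1]);
    }
    return new String[] { key, vals.toString() };
  }

  public static void main(final String[] args) {
    final Iterator<String[]> group = Arrays.asList(
        new String[] { "alice", "x" },
        new String[] { "alice", "y" }).iterator();
    // Prints the key and "x,y" separated by a tab.
    System.out.println(String.join("\t", condense("alice", group)));
  }
}
```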

Here's a wordcount reduce example:

import java.util.Iterator;

public class WordCountReduce {
  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
        // sum the partial counts in col[1] for every row with this key
        int count = 0;
        while (records.hasNext()) {
          count += Integer.parseInt(records.next()[1]);
        }
        output.collect(new String[] { key, String.valueOf(count) });
      }
    });
  }
}

Although the real value is in making reducers easy to write, there's also support for mappers. Here's the key/value split mapper from a previous example:

public class KeyValueSplit {
  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      public void map(String[] record, Output output) throws Exception {
        // col[0] holds pairs like "a=1,b=2"; emit one (key, value) row per pair
        for (final String kvs : record[0].split(",")) {
          final String[] kv = kvs.split("=");
          output.collect(new String[] { kv[0], kv[1] });
        }
      }
    });
  }
}
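Mappers need no grouping, so the driver side is simpler: each input row is handled on its own. Here is a hedged sketch of what a map driver could look like, assuming tab-separated rows and Java 8 or later; MapDriverSketch and its nested Mapper and Output interfaces are hypothetical stand-ins, not the helper's code. It also adds a guard the KeyValueSplit mapper lacks: a pair without an '=' is skipped instead of throwing ArrayIndexOutOfBoundsException, and split("=", 2) keeps any further '=' characters inside the value.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;

public class MapDriverSketch {
  // Hypothetical stand-ins mirroring the Mapper/Output usage in KeyValueSplit.
  interface Output {
    void collect(String[] record) throws Exception;
  }

  interface Mapper {
    void map(String[] record, Output output) throws Exception;
  }

  // Splits each input line into columns and hands it to the mapper; no grouping.
  static void map(BufferedReader in, PrintWriter out, Mapper mapper) throws Exception {
    final Output output = record -> out.println(String.join("\t", record));
    String line;
    while ((line = in.readLine()) != null) {
      mapper.map(line.split("\t", -1), output);
    }
    out.flush();
  }

  // KeyValueSplit logic with a guard: pairs lacking '=' are skipped.
  static final Mapper KEY_VALUE_SPLIT = (record, output) -> {
    for (final String kvs : record[0].split(",")) {
      final String[] kv = kvs.split("=", 2);
      if (kv.length == 2) {
        output.collect(new String[] { kv[0], kv[1] });
      }
    }
  };

  public static void main(final String[] args) throws Exception {
    map(new BufferedReader(new InputStreamReader(System.in)), new PrintWriter(System.out), KEY_VALUE_SPLIT);
  }
}
```

Whether to skip or fail on a malformed pair is a judgment call: failing loudly surfaces bad data, while skipping keeps the job running.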

The full source code is available here. Or you can download a prebuilt jar here.

The only dependency is Apache Commons Lang.

I'd love to hear any feedback you may have.
