Using GenericUDFs to return multiple values in Apache Hive

Darren Lee - 18 Jan 2012

A basic user defined function (UDF) in Hive is very easy to write: you simply subclass org.apache.hadoop.hive.ql.exec.UDF and implement an evaluate method. We've previously written about this strategy, and it works well for most simple cases.

The first case where this breaks down is when you want to return multiple values from your UDF. For us, this often comes up when serialized data is stored in a single Hive field and we want to extract several pieces of information from it.

For example, suppose we have a simple Person object (leaving out all of the error checking code):

case class Person(firstName: String, lastName: String)

object Person {
  def serialize(p: Person): String = {
    p.firstName + "|" + p.lastName
  }

  def deserialize(s: String): Person = {
    // Split on the literal '|' character; split("|") would treat the pipe as a regex
    val parts = s.split('|')
    Person(parts(0), parts(1))
  }
}
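A quick round-trip check of these helpers, written as a standalone Scala sketch (the `RoundTrip` object and the sample name are made up for illustration, and it assumes the `|` delimiter never appears inside a name). It also shows why the delimiter is split as a `Char`: Java's `String.split(String)` treats its argument as a regular expression, and a bare `"|"` is an empty alternation that matches between every character.

```scala
// Standalone copy of the Person helpers (error handling still omitted).
// Assumes the '|' delimiter never occurs inside a first or last name.
case class Person(firstName: String, lastName: String)

object Person {
  def serialize(p: Person): String = p.firstName + "|" + p.lastName

  def deserialize(s: String): Person = {
    // split(Char) splits on the literal character
    val parts = s.split('|')
    Person(parts(0), parts(1))
  }
}

// Hypothetical demo values
object RoundTrip {
  val ada: Person = Person("Ada", "Lovelace")
  val encoded: String = Person.serialize(ada)
  val decoded: Person = Person.deserialize(encoded)

  // The regex pitfall: "|" matches the empty string, so this splits
  // between characters instead of at the pipe
  val regexParts: Array[String] = encoded.split("|")
}
```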

We want to convert a data table containing these serialized objects into one containing firstName and lastName columns.

create table input(serializedPerson string);
load data local inpath ... ;
create table output(firstName string, lastName string) ;

So, what should our UDF and query look like?

Using the previous strategy, we could create two separate UDFs:

insert overwrite table output
select firstName(serializedPerson), lastName(serializedPerson)
from input ;

Unfortunately, the two invocations will have to separately deserialize their inputs, which could be expensive in less trivial examples. It also requires writing two separate implementation classes whose only difference is which field to pull out of your model object.
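To make the cost argument concrete, here is a plain-Scala model with no Hive involved. `FirstName`, `LastName`, and `DeserializePerson` are hypothetical stand-ins for the UDFs, and a counter records how often `deserialize` runs. Assuming each UDF call in the select list is evaluated independently, the two-UDF query deserializes every row twice, while a single struct-returning function does it once.

```scala
case class Person(firstName: String, lastName: String)

object Person {
  // Counts deserialize calls so the per-row cost is visible
  var deserializeCalls = 0

  def deserialize(s: String): Person = {
    deserializeCalls += 1
    val parts = s.split('|')
    Person(parts(0), parts(1))
  }
}

// Two single-field "UDFs": each must deserialize its own copy of the input
object FirstName { def evaluate(s: String): String = Person.deserialize(s).firstName }
object LastName  { def evaluate(s: String): String = Person.deserialize(s).lastName }

// One "UDF" that deserializes once and returns both fields together
object DeserializePerson {
  def evaluate(s: String): (String, String) = {
    val p = Person.deserialize(s)
    (p.firstName, p.lastName)
  }
}

object CostDemo {
  val rows: Seq[String] = Seq("Ada|Lovelace", "Alan|Turing", "Grace|Hopper")

  // Models: select firstName(serializedPerson), lastName(serializedPerson) from input
  def twoUdfCalls(): Int = {
    Person.deserializeCalls = 0
    rows.foreach { r => FirstName.evaluate(r); LastName.evaluate(r) }
    Person.deserializeCalls
  }

  // Models: select deserializePerson(serializedPerson) from input
  def oneUdfCalls(): Int = {
    Person.deserializeCalls = 0
    rows.foreach(r => DeserializePerson.evaluate(r))
    Person.deserializeCalls
  }
}
```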

An alternative is to use a GenericUDF and return a struct instead of a simple string. This requires using object inspectors to specify the input and output types, just like in a UDTF:

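import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, PrimitiveObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class DeserializePerson extends GenericUDF {
  private var inputInspector: PrimitiveObjectInspector = _

  // Called once: record how to read the input and declare the output struct type
  override def initialize(inputs: Array[ObjectInspector]): StructObjectInspector = {
    this.inputInspector = inputs(0).asInstanceOf[PrimitiveObjectInspector]
    val stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector
    val outputFieldNames = Seq("firstName", "lastName")
    val outputInspectors = Seq[ObjectInspector](stringOI, stringOI)
    // The factory takes java.util.List arguments, hence asJava
    ObjectInspectorFactory.getStandardStructObjectInspector(outputFieldNames.asJava, outputInspectors.asJava)
  }

  override def getDisplayString(children: Array[String]): String =
    "deserializePerson(" + children.mkString(",") + ")"

  // Called per row: element i of the returned array is the value of output field i
  override def evaluate(args: Array[DeferredObject]): Object = {
    val input = inputInspector.getPrimitiveJavaObject(args(0).get())
    if (input == null) return null
    val person = Person.deserialize(input.asInstanceOf[String])
    Array[AnyRef](person.firstName, person.lastName)
  }
}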

Here, we're specifying that we expect a single primitive object inspector as an input (error handling code omitted) and returning a struct containing two fields, both of which are strings. We can now use the following query:
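The returned struct is positional: the i-th element of the array from `evaluate` is the value of the i-th name in `outputFieldNames`. The toy model below is a hypothetical, much simplified stand-in for Hive's real struct object inspector (`ToyStructInspector` is not a Hive class); it only illustrates why the two lists must stay in the same order. The lower-case lookup is an assumption meant to mirror Hive's case-insensitive identifiers.

```scala
// Toy model (not Hive's API): resolves a field name to a position
// in the positional value returned by evaluate
final case class ToyStructInspector(fieldNames: Seq[String]) {
  // Assumed case-insensitive, like Hive identifiers
  private val index: Map[String, Int] =
    fieldNames.map(_.toLowerCase).zipWithIndex.toMap

  def getField(struct: Array[AnyRef], name: String): AnyRef =
    struct(index(name.toLowerCase))
}

object StructDemo {
  // Same order as the outputFieldNames declared in initialize
  val inspector: ToyStructInspector = ToyStructInspector(Seq("firstName", "lastName"))

  // Same shape as what evaluate returns for "Ada|Lovelace"
  val value: Array[AnyRef] = Array("Ada", "Lovelace")

  // If evaluate built the array in the wrong order, lookups would silently swap
  val swapped: Array[AnyRef] = Array("Lovelace", "Ada")
}
```

Nothing checks that the array order matches the declared names, so a mismatch shows up as wrong data rather than an error.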

create temporary function deserializePerson as 'com.bizo.udf.DeserializePerson' ;

insert overwrite table output
select person.firstName, person.lastName
from (
  select deserializePerson(serializedPerson) as person
  from input
) parsed ;

This query deserializes the person only once but gives you access to both of the values returned by the UDF.

Note that this method does not allow you to return multiple rows; for that, you still need to use a UDTF.
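As a rough analogy in plain Scala (no Hive classes; the `;`-separated input format is hypothetical): a UDF behaves like `map`, producing exactly one value per input row even if that value is itself a collection, whereas a UDTF behaves like `flatMap` and can turn one input row into several output rows.

```scala
case class Person(firstName: String, lastName: String)

object Cardinality {
  // Hypothetical input: several people per field, separated by ';'
  val input: Seq[String] = Seq("Ada|Lovelace;Alan|Turing", "Grace|Hopper")

  private def parse(s: String): Person = {
    val parts = s.split('|')
    Person(parts(0), parts(1))
  }

  // UDF-like: exactly one output value per input row (here a collection of people)
  val udfStyle: Seq[Seq[Person]] = input.map(row => row.split(';').toSeq.map(parse))

  // UDTF-like: every person becomes its own output row
  val udtfStyle: Seq[Person] = input.flatMap(row => row.split(';').toSeq.map(parse))
}
```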
