在这篇文章中,我们将深入了解用户定义表函数(UDTF),该函数的实现是通过继承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF这个抽象通用类,UDTF相对UDF更为复杂,但是通过它,我们读入一个数据域,输出多行多列,而UDF只能输出单行单列。
 
 
代码
文章中所有的代码可以在这里找到:hive examples、GitHub repository
 
示例数据
首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。
- ~$ cat ./people.txt  
-   
- John Smith  
- John and Ann White  
- Ted Green  
- Dorothy  
 
把该文件上载到hdfs目录/user/matthew/people中:
 
- hadoop fs -mkdir people  
- hadoop fs -put ./people.txt people  
 
下面要创建hive外部表,在hive shell中执行
 
- CREATE EXTERNAL TABLE people (name string)  
- ROW FORMAT DELIMITED FIELDS   
-     TERMINATED BY ‘\t‘   
-     ESCAPED BY ‘‘   
-     LINES TERMINATED BY ‘\n‘  
- STORED AS TEXTFILE   
- LOCATION ‘/user/matthew/people‘;  
 
UDTF的输出值
上一文章讲解的UDF与GenericUDF函数是操作单个数据域。它们必须要返回一个值。但是这并不适用于所用的数据处理任务。Hive可以存储许多类型的数据,而有时候我们并不想单数据域输入、单数据域输出。对于每一行的输入,可能我们想输出多行,又或是不输出,举个例子,想一下函数explode(一个hive内置函数)的作用。
同样,可能我们也想输出多列,而不是输出单列。
以上所有的要求我们可以用UDTF去完成。
 
实例
首先我们先假设我们想清洗people这张表中的人名,这个新的表有:
1、姓和名 两个分开的列
2、所有记录都包含姓名
3、每条记录或有包含多个人名(eg Nick and Nicole Smith)
 
为了达到这个实例目的,我们将实现以下API:
- org.apache.hadoop.hive.ql.udf.generic.GenericUDTF  
 
我们将覆盖以下三个方法:
 
- abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;   
-   
- abstract void process(Object[] record) throws HiveException;  
-   
- abstract void close() throws HiveException;  
 
代码实现
 
完整代码
- public class NameParserGenericUDTF extends GenericUDTF {  
-   
-       private PrimitiveObjectInspector stringOI = null;  
-   
-       @Override  
-       public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException {  
-   
-         if (args.length != 1) {  
-           throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");  
-         }  
-   
-         if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE  
-             && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {  
-           throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");  
-         }  
-           
-         
-         stringOI = (PrimitiveObjectInspector) args[0];  
-   
-         
-         List<String> fieldNames = new ArrayList<String>(2);  
-         List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);  
-         fieldNames.add("name");  
-         fieldNames.add("surname");  
-         fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
-         fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
-         return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);  
-       }  
-             
-       public ArrayList<Object[]> processInputRecord(String name){  
-             ArrayList<Object[]> result = new ArrayList<Object[]>();  
-             
-             
-             if (name == null || name.isEmpty()) {  
-               return result;  
-             }  
-               
-             String[] tokens = name.split("\\s+");  
-               
-             if (tokens.length == 2){  
-                 result.add(new Object[] { tokens[0], tokens[1] });  
-             }else if (tokens.length == 4 && tokens[1].equals("and")){  
-                 result.add(new Object[] { tokens[0], tokens[3] });  
-                 result.add(new Object[] { tokens[2], tokens[3] });  
-             }  
-               
-             return result;  
-       }  
-         
-       @Override  
-       public void process(Object[] record) throws HiveException {  
-   
-         final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();  
-   
-         ArrayList<Object[]> results = processInputRecord(name);  
-   
-         Iterator<Object[]> it = results.iterator();  
-           
-         while (it.hasNext()){  
-             Object[] r = it.next();  
-             forward(r);  
-         }  
-       }  
-   
-       @Override  
-       public void close() throws HiveException {  
-         
-       }  
- }  
 
以上代码可以从:github目录 check 下来。
 
 
代码走读
该UDTF以string类型作为参数,返回一个拥有两个属性的对象,与GenericUDF比较相似,指定输入输出数据格式(objectinspector),以便hive能识别输入与输出。
我们为输入的string参数定义了数据格式PrimitiveObjectInspector
- stringOI = (PrimitiveObjectInspector) args[0]  
 
定义输出数据格式(objectinspectors) 需要我们先定义两个属性名称,因为(objectinspectors)需要读取每一个属性(在这个实例中,两个属性都是string类型)。
 
- List<String> fieldNames = new ArrayList<String>(2);  
- fieldNames.add("name");  
- fieldNames.add("surname");  
-   
- List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);  
- fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
- fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);  
-   
- return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);  
 
我们主要的处理逻辑放在这个比较直观的processInputRecord方法当中。分开逻辑处理有利我们进行更简单的单元测试,而不用涉及到繁琐的objectinspector。
 
最后,一旦得到结果就可以对其进行forward,把基注册为hive处理后的输出记录对象。
- while (it.hasNext()){  
-             Object[] r = it.next();  
-             forward(r);  
-     }  
- }  
 
使用该UDTF函数
我们可以在hive中创建我们自己的函数
 
- mvn package  
- cp target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar ./ext.jar  
 
然后在hive中使用
 
- ADD JAR ./ext.jar;  
-   
- CREATE TEMPORARY FUNCTION process_names as ‘com.matthewrathbone.example.NameParserGenericUDTF‘;   
-   
- SELECT   
-     adTable.name,  
-     adTable.surname   
- FROM people   
-     lateral view process_names(name) adTable as name, surname;  
 
输出
 
- OK  
- John    Smith  
- John    White  
- Ann     White  
- Ted     Green  
 
原文链接