自学内容网 自学内容网

Hive自定义函数——简单使用

        在 Hadoop 生态系统中,特别是在 Hive 和其他 SQL-on-Hadoop 工具中,UDF(用户自定义函数),UDAF(用户自定义聚合函数),以及 UDTF(用户自定义表生成函数)允许用户定义自定义的函数逻辑,以适应特定的业务需求。这些自定义函数帮助扩展 Hive 的功能,在数据处理和分析中变得非常灵活。下面分别介绍这三种函数及其优劣势、适用场景,并给出具体的示例。

1. UDF(User Defined Function) - 用户自定义函数

定义

UDF 是一种用户自定义的单行函数,它将输入的一行数据进行处理并返回一个结果。例如,可以将 UDF 用来执行列的转换或简单的计算。

优势
  • 简单、灵活:UDF 通常处理单行输入并返回单个结果,非常适合执行简单的列级别的操作。
  • 易于实现:编写和注册 UDF 相对容易,只需实现一个特定的方法。
  • 性能较好:因为它只对单行数据操作,所以性能通常较好。
劣势
  • 只能处理一行数据,无法聚合多行数据。
  • UDF 的逻辑相对简单,不能实现复杂的表操作或数据拆分。
适用场景
  • 适用于单列或多列的简单数据转换或处理。
  • 常用于数据清洗、格式化、字符串处理等场景。
示例

假设需要一个 UDF 函数来将字符串转为大写:

public class UpperCaseUDF extends UDF {
    public String evaluate(String input) {
        return input == null ? null : input.toUpperCase();
    }
}

使用场景:

SELECT UPPERCASE(name) FROM employees;

将 employees 表中的 name 列转为大写。

2. UDAF(User Defined Aggregation Function) - 用户自定义聚合函数

定义

UDAF 是一种用户自定义的聚合函数,它处理多行数据,并返回一个聚合后的结果。类似于 SQL 中的 SUMAVG 等聚合函数。

优势
  • 能够聚合多行数据,适合处理需要计算汇总值、平均值、最大值、最小值等操作的场景。
  • 提供了灵活的多行数据处理能力,可以自定义复杂的聚合逻辑。
劣势
  • 实现较为复杂:相比 UDF,编写 UDAF 需要更多的步骤和逻辑处理,如分阶段的聚合和合并操作。
  • 聚合操作需要在不同阶段维护状态,因此可能会消耗更多内存和计算资源。
适用场景
  • 适合需要聚合多行数据的场景,如汇总计算、求平均、最大最小值等。
  • 适用于自定义复杂的统计分析,如百分位数、标准差等。
示例

假设需要计算员工工资的方差,可以编写一个自定义 UDAF。

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfoBase;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfoImpl;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.PrimitiveDoubleObjectInspector;
import org.apache.hadoop.io.DoubleWritable;

public class VarianceUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws HiveException {
        return new VarianceEvaluator();
    }

    public static class VarianceEvaluator extends GenericUDAFEvaluator {

        // 聚合状态类,用来保存聚合过程中间结果
        public static class VarianceBuffer implements UDAFEvaluator {
            private long count;      // 数据点个数
            private double sum;      // 数据和
            private double sumOfSquares; // 数据平方和

            public VarianceBuffer() {
                init();
            }

            public void init() {
                count = 0;
                sum = 0.0;
                sumOfSquares = 0.0;
            }
        }

        private PrimitiveDoubleObjectInspector inputOI;
        private ObjectInspector outputOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);

            // 定义输入和输出的 ObjectInspector
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveDoubleObjectInspector) parameters[0];
            }
            outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;

            return outputOI;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            VarianceBuffer buffer = new VarianceBuffer();
            buffer.init();
            return buffer;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((VarianceBuffer) agg).init();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters[0] != null) {
                VarianceBuffer buffer = (VarianceBuffer) agg;
                double value = inputOI.get(parameters[0]);
                buffer.count++;
                buffer.sum += value;
                buffer.sumOfSquares += value * value;
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // 返回部分聚合结果
            VarianceBuffer buffer = (VarianceBuffer) agg;
            Object[] result = new Object[3];
            result[0] = new DoubleWritable(buffer.count);
            result[1] = new DoubleWritable(buffer.sum);
            result[2] = new DoubleWritable(buffer.sumOfSquares);
            return result;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                VarianceBuffer buffer = (VarianceBuffer) agg;
                Object[] partialResult = (Object[]) partial;
                buffer.count += ((DoubleWritable) partialResult[0]).get();
                buffer.sum += ((DoubleWritable) partialResult[1]).get();
                buffer.sumOfSquares += ((DoubleWritable) partialResult[2]).get();
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            VarianceBuffer buffer = (VarianceBuffer) agg;
            if (buffer.count == 0) {
                return null;
            }
            double mean = buffer.sum / buffer.count;
            double variance = (buffer.sumOfSquares - buffer.sum * mean) / buffer.count;
            return new DoubleWritable(variance);
        }
    }
}

使用场景:

SELECT VARIANCE(salary) FROM employees;

计算 employees 表中 salary 列的工资方差。

3. UDTF(User Defined Table-Generating Function) - 用户自定义表生成函数

定义

UDTF 是一种用户自定义的表生成函数,它接受一行输入,但可以返回多行甚至多列数据。它的作用类似于 SQL 中的 EXPLODE,将一行数据拆分成多行。

优势
  • 能够从单行数据生成多行或多列数据,适合进行数据拆分和结构化。
  • 非常灵活,能够处理复杂的多行、多列数据生成场景。
  • 适合需要扩展一行数据到多行数据的场景。
劣势
  • 实现复杂度较高:与 UDF 和 UDAF 相比,UDTF 需要处理更多的输出逻辑。
  • 性能较差:由于 UDTF 会输出多行数据,可能会引入较大的开销,特别是当输出数据量大时。
适用场景
  • 适合将一行数据拆分成多行数据的场景,如列表拆分、JSON 解析等。
  • 适合处理复杂的表生成操作,如跨多列的数据展开或分组。
示例

假设有一列存储了逗号分隔的字符串,需要将其拆分成多行,可以编写一个 UDTF。

public class ExplodeUDTF extends GenericUDTF {
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        for (String word : input.split(",")) {
            forward(new Object[]{word});
        }
    }
}

使用场景:

SELECT EXPLODE(split_col) FROM table_with_comma_separated_data;

将 table_with_comma_separated_data 表中 split_col 列中的逗号分隔字符串拆分成多行。


UDF、UDAF、UDTF 的比较

特性UDFUDAFUDTF
处理的输入一行数据多行数据一行数据
输出单个结果单个聚合结果多行或多列数据
优点实现简单,适合单行数据处理适合复杂的聚合操作,如求和、平均值等适合数据拆分、扩展多行数据
缺点不能处理多行或表级别的操作实现复杂,需要维护状态实现复杂,性能可能较差
适用场景单列转换,如格式化、数据清洗多行聚合操作,如汇总、统计一行拆分多行,如 JSON 解析,列表拆分
使用示例SELECT UPPER(col)SELECT SUM(col)SELECT EXPLODE(col)

总结

  • UDF:适用于列级的简单数据转换和计算,如格式化、字符串处理等。
  • UDAF:适合需要对多行数据进行聚合的场景,如求和、求平均等。
  • UDTF:适合需要将一行数据拆分成多行的情况,如数组或字符串拆分。

每种函数类型都有其独特的优缺点,选择哪一种取决于具体的数据处理需求和应用场景。


原文地址:https://blog.csdn.net/goTsHgo/article/details/142364778

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!