[转]Hive中对group结果分组取limit N值的实现

转载引言:

数据处理中遇到了取全国各个省份的效果数据先排序后limit 100的需求,HIVE自带功能无法实现,网上搜了下该文章的方法直接拷贝过来就能实现。将其中的代码复制过来后可以用maven打成Jar包,然后在hive中即可使用。

观察代码可以看出,由于是取各个分组的top数据,因此可以先用distribute和sort进行数据分区并排序,在各个reduce节点上,由于运行的是单个JVM虚拟机,所以在JAVA类中使用static变量即可进行整个处理过程的数据共享。于是comparedColumn字符串数组被用来记录每一组的key值,同时用rowNum来记录每一组值的最大标记。最终使用数字标记和所需的数字比较,取出group后的limit数目。

背景

假设有一个学生各门课的成绩的表单,应用hive取出每科成绩前100名的学生成绩。

这个就是典型在分组取Top N的需求。

解决思路

对于取出每科成绩前100名的学生成绩,针对学生成绩表,根据学科,成绩做order by排序,然后对排序后的成绩,执行自定义函数row_number(),必须带一个或者多个列参数,如ROW_NUMBER(col1, ....),它的作用是按指定的列进行分组生成行序列。在ROW_NUMBER(a,b) 时,若两条记录的a,b列相同,则行序列+1,否则重新计数。

只要返回row_number()返回值小于100的的成绩记录,就可以返回每个单科成绩前一百的学生。

解决过程

成绩表结构

create table score_table (
  subject        string,
  student       string,
  score           int)
partitioned by (date string)

 

 如果要查询2012年每科成绩前100的学生成绩,sql如下

create temporary function row_number as 'com.blue.hive.udf.RowNumber';select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_scorewhere row_number(subject) <= 100;

com.blue.hive.udf.RowNumber是自定义函数,函数的作用是按指定的列进行分组生成行序列。这里根据每个科目的所有成绩,生成序列,序列值从1开始自增。

假设成绩表的记录如下:

物理  80 张三
数学  100 李一
物理  90  张二
数学  90  李二
物理  100 张一
数学  80  李三
.....

经过order by全局排序后,记录如下

物理  100 张一
物理  90  张二
物理  80 张三
.....
数学  100 李一
数学  90  李二
数学  80  李三
....

接着执行row_number函数,返回值如下

科目  成绩 学生   row_number
物理  100 张一      1
物理  90  张二      2
物理  80  张三      3
.....
数学  100 李一      1
数学  90  李二      2
数学  80  李三      3
....

因为hive是基于MAPREADUCE的,必须保证row_number执行是在reducer中执行。上述的语句保证了成绩表的记录,按照科目和成绩做了全局排序,然后在reducer端执行row_number函数,如果在map端执行了row_number,那么结果将是错误的。

要查看row_number函数在map端还是reducer端执行,可以查看hive的执行计划:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_scorewhere row_number(subject) <= 100;

 

explain不会执行mapreduce计算,只会显示执行计划。

只要row_number函数在reducer端执行,除了使用order by全局排序配合,也可以使用distribute by + sort by。distribute by可以让相同科目的成绩记录发送到同一个reducer,而sort by可以在reducer端对记录做排序。

而使用order by全局排序,只有一个reducer,未能充分利用资源,相比之下,distribute by + sort by在这里更有性能优势,可以在多个reducer做排序,再做row_number的计算。

sql如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';select subject,score,student from
    (select subject,score,student from score where dt='2012'  distribute by subject sort by subject asc, socre desc) order_scorewhere row_number(subject) <= 100;

如果成绩有学院字段college,要找出学院里,单科成绩前一百的学生,解决方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,subject,score,student from
    (select college,subject,score,student from score where dt='2012'  order by college asc,subject asc,socre desc) order_scorewhere row_number(college,subject) <= 100;

如果成绩有学院字段college,要找出学院里,总成绩前一百的学生,解决方法如下:

create temporary function row_number as 'com.blue.hive.udf.RowNumber';
explain select college,totalscore,student from
    (select college,student,sum(score) as totalscore from score where dt='2012'  group by college,student  order by college asc,totalscore desc) order_scorewhere row_number(college) <= 100;

row_number的源码

函数row_number(),必须带一个或者多个列参数,如ROW_NUMBER(col1, ....),它的作用是按指定的列进行分组生成行序列。在ROW_NUMBER(a,b) 时,若两条记录的a,b列相同,则行序列+1,否则重新计数。

package com.blue.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class RowNumber extends UDF {

	private static int MAX_VALUE = 50;
	private static String comparedColumn[] = new String[MAX_VALUE];
	private static int rowNum = 1;

	public int evaluate(Object... args) {
		String columnValue[] = new String[args.length];
		for (int i = 0; i < args.length; i++) {
			columnValue[i] = args[i].toString();
		}
		if (rowNum == 1) {
			for (int i = 0; i < columnValue.length; i++)
				comparedColumn[i] = columnValue[i];
		}

		for (int i = 0; i < columnValue.length; i++) {
			if (!comparedColumn[i].equals(columnValue[i])) {
				for (int j = 0; j < columnValue.length; j++) {
					comparedColumn[j] = columnValue[j];
				}
				rowNum = 1;
				return rowNum++;
			}
		}
		return rowNum++;
	}
}

编译后,打包成一个jar包,如/usr/local/hive/udf/blueudf.jar

然后在hive shell下使用,如下:

add jar /usr/local/hive/udf/blueudf.jar;create temporary function row_number as 'com.blue.hive.udf.RowNumber';select subject,score,student from
    (select subject,score,student from score where dt='2012'  order by subject,socre desc) order_scorewhere row_number(subject) <= 100;

转自:http://www.cnblogs.com/ggjucheng/archive/2013/01/30/2868993.html

本文地址:http://crazyant.net/1409.html

相关推荐

Leave a Comment