Hive是什么?
Hive是Facebook开源的用于解决海量结构化日志的数据统计,是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并且提供类SQL查询功能,本质是将HQL转化成MapReduce程序。
数据存储在HDFS,分析数据底层实现默认是MapReduce,执行程序运行在Yarn上。
如果没有Hive
想象一下数据统计的时候写大量的MapReduce程序,那会是多么痛苦。如果是写SQL就开心多了,尤其是离线数据仓库方面广泛应用。
由于 Hive 采用了类似SQL 的查询语言 HQL(hive query language),因此很容易将 Hive 理解为数据库。其实从结构上来看,Hive 和数据库除了拥有类似的查询语言,再无类似之处。
Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。
mysql字段类型 | hive:ods字段类型 | hive:dwd字段类型 |
---|---|---|
tinyint | tinyint | tinyint |
int | int | int |
bigint | bigint | bigint |
varchar | string | string |
datetime | bigint | string |
bit | boolean | int |
double | double | double |
hiveserver2
beeline
!connect jdbc:hive2://cdh01.cm:10000
CREATE DATABASE 数据库名字
show databases
drop database 数据库名字
drop database 数据库名字 cascade
desc database 数据库名字
desc database extended 数据库名字
use 数据库名字
show tables
desc formatted 表名
建表语法
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
建表常用参数
EXTERNAL:外部表关键字,Hive分内部表(元数据和数据会被一起删除,一般不太重要的表如:中间临时表)和外部表(只删除元数据,不删除数据,一般而言都采用外部表,因为数据安全些),一般而言采用外部表。
PARTITIONED BY:分区指定字段为‘dt‘。
PARTITIONED BY (
`dt` String COMMENT ‘partition‘
)
row format delimited fields terminated by ‘\t‘ :数据使用什么做切分,这里使用制表符。
stored as parquet:文件存储格式,推荐parquet(spark天然支持parquet)。
location ‘/warehouse/层名/库名/表名‘ :文件存储位置。
tblproperties ("parquet.compression"="snappy") :文件压缩策略,推荐snappy。
CLUSTERED BY创建分桶表(抽样场景使用)
SORTED BY排序(一般情况查询语句都有排序,故不常用)
建表例子
#删除表
drop table if exists dwd.student
#创建表(库名.表名)
CREATE EXTERNAL TABLE `dwd.student`(
`ID` bigint COMMENT ‘‘,
`CreatedBy` string COMMENT ‘创建人‘,
`CreatedTime` string COMMENT ‘创建时间‘,
`UpdatedBy` string COMMENT ‘更新人‘,
`UpdatedTime` string COMMENT ‘更新时间‘,
`Version` int COMMENT ‘版本号‘,
`name` string COMMENT ‘姓名‘
) COMMENT ‘学生表‘
PARTITIONED BY (
`dt` String COMMENT ‘partition‘
)
row format delimited fields terminated by ‘\t‘
location ‘/warehouse/dwd/test/student/‘
stored as parquet
tblproperties ("parquet.compression"="snappy")
创建内部表就是去掉EXTERNAL关键字,使用插入数据案例测试一下删除表后重新建立的效果吧。
INSERT INTO TABLE dwd.student partition(dt=‘2020-04-05‘) VALUES(1,"heaton","2020-04-05","","","1","zhangsan")
INSERT INTO TABLE dwd.student partition(dt=‘2020-04-06‘) VALUES(2,"heaton","2020-04-06","","","1","lisi")
SELECT * FROM dwd.student
SELECT * FROM dwd.student WHERE dt="2020-04-05"
SELECT * FROM dwd.student WHERE dt="2020-04-06"
LOAD DATA INPATH ‘/warehouse/dwd/test/student/dt=2020-04-05/000000_0‘ INTO TABLE dwd.student1 partition(dt=‘2020-04-05‘)
TRUNCATE TABLE dwd.student1
insert into table dwd.student1 partition(dt=‘2020-04-05‘) select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
由于student1表2020-04-05分区中已经有一条张三数据,这时插入李四数据,查询结果为2条数据
insert overwrite table dwd.student1 partition(dt=‘2020-04-05‘) select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
由上面的例子使student1表2020-04-05分区中有2条数据,这时候在插入一条,发现只有一条,因为数据被复写。
with
s1 as
(
select id,createdby,createdtime,version,name from dwd.student where dt="2020-04-05"
),
s2 as
(
select id,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
)
insert overwrite table dwd.student1 partition(dt=‘2020-04-06‘)
select
s1.id as id,
s1.createdby as createdby,
s1.createdtime as createdtime,
"" as updatedby,
"0" as updatedtime,
s1.version as version,
s1.name as name
from
s1
union all
select
s2.id as id,
"" as createdby,
"0" as createdtime,
s2.updatedby as updatedby,
s2.updatedtime as updatedtime,
s2.version as version,
s2.name as name
from
s2
hive -e “select * from xx”:hive命令行SQL语句
hive -f xx.hql:hive执行sql文件
CLUSTERED BY(字段名) into 分桶数 buckets
指定分桶条件。set hive.enforce.bucketing=true; set mapreduce.job.reduces=-1;
y必须是table总bucket数的倍数或者因子,根据Y值决定抽样的比例,如:table共4个桶,当y=2时,抽取4/2=2个桶数据,等y=8时,抽取4/8=1/2的bucket数据。
x是从哪个桶开始抽,然后依次抽取x+y的桶数据,如:table共4个桶,当y=2,x=1时,抽取4/2=2个桶数据,桶数为1号,1+2=3号。
#建表sql
CREATE EXTERNAL TABLE `dwd.student_buck`(
`ID` bigint COMMENT ‘‘,
`name` string COMMENT ‘姓名‘
) COMMENT ‘学生表‘
CLUSTERED BY(ID) into 4 buckets
row format delimited fields terminated by ‘\t‘
stored as parquet
location ‘/warehouse/dwd/test/student_buck/‘
tblproperties ("parquet.compression"="snappy")
#插入数据
INSERT INTO TABLE dwd.student_buck VALUES(1,"zhangsan"),(2,"lisi") ,(3,"wangwu") ,(4,"zhaoliu") ,(5,"haqi") ,(6,"xiba") ,(7,"heijiu") ,(8,"washi")
#抽样查询
SELECT * from dwd.student_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON id)
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
Order By 全局排序(全局一个Reducer)
Group By 分组(会根据跟的字段,发到不同的Reducer中)
Distribute By 类似MR中的partition进行分区,需结合Sort By使用,而且要写在前面,如:根据日期分区,在根据年龄排序
#多reduce才有效果 select * from student distribute by createtime sort by age desc
Sort By每个Reducer内部排序,对全局结果来说不是排序,结合Distribute By使用。(多Reducer区内排序)
Cluster By 当分区字段和区内排序字段相同,也就是distribute by和sort by需要字段相同可以使用其代替,但是只能升序排列。
NVL(string,替换值),如果string为NULL,则返回替换值,否则返回string值
# 1 格式化时间-> 2020-04-05 00:00:00
SELECT date_format("2020-04-05","yyyy-MM-dd HH:mm:ss")
# 2 时间天数相加-> 2020-04-05 ->2020-04-01
SELECT date_add("2020-04-05",4)
SELECT date_add("2020-04-05",-4)
# 3 时间天数相减->2020-04-01(一般直接用date_add了,这个不怎么用)
SELECT date_sub("2020-04-05",4)
# 4 两个时间相减-> 4
SELECT datediff("2020-04-09","2020-04-05")
# 5 时间转时间戳-> 1586016000
SELECT unix_timestamp("2020-04-05","yyyy-MM-dd")
# 6 时间戳转时间-> 2020-04-05
SELECT from_unixtime(cast("1586016000" as BIGINT),"yyyy-MM-dd")
# 7 将日期转为星期-> 1 (星期一)
SELECT pmod(datediff("2020-04-06","1920-01-01")-3,7)
财务部 | 男 |
---|---|
财务部 | 男 |
财务部 | 女 |
科技部 | 女 |
人事部 | 女 |
人事部 | 男 |
如上图假设表 emp 中有6个人,求每个部门下面的男女个数。
#使用CASE WHEN
select
department,
sum(case sex when ‘男‘ then 1 else 0 end) man_count,
sum(case sex when ‘女‘ then 1 else 0 end) woman_count,
from
emp
group by
department
#还可以使用IF
select
department,
sum(if(sex=‘男‘,1,0)) man_count,
sum(if(sex=‘女‘,1,0)) woman_count,
from
emp
group by
department
# 1 concat拼接-> 2020-04-05
SELECT concat("2020","-","04","-","05")
# 2 concat_ws拼接-> 2020-04-05 注意 concat_ws可以传collect_set,需要字段为string
SELECT concat_ws("-","2020","04","05")
# 3 汇总成Array类型字段-> [8,4,5,1,6,2,7,3]
SELECT collect_set(id) from dwd.student_buck
EXPLODE(字段名):将一列复杂的array或者map结构拆分成多行
炸裂函数必须配合侧写视图函数:LATERAL VIEW udtf(expression) 侧写视图别名 AS 侧写结果列别名
# 建表
CREATE EXTERNAL TABLE `dwd.hobby`(
`id` bigint COMMENT ‘‘,
`name` string COMMENT ‘姓名‘,
`hobby_name` array<string> COMMENT ‘爱好名字‘
) COMMENT ‘爱好表‘
PARTITIONED BY (
`dt` String COMMENT ‘partition‘
)
row format delimited fields terminated by ‘\t‘
collection items terminated by ‘,‘
stored as parquet
location ‘/warehouse/dwd/test/hobby/‘
tblproperties ("parquet.compression"="snappy")
# 插入3条数据
INSERT INTO TABLE dwd.hobby partition(dt=‘2020-04-05‘)
SELECT 1,"zhangsan",array("篮球","足球","羽毛球","恶作剧")
INSERT INTO TABLE dwd.hobby partition(dt=‘2020-04-05‘)
SELECT 2,"lisi",array("足球","恶作剧")
INSERT INTO TABLE dwd.hobby partition(dt=‘2020-04-05‘)
SELECT 3,"wangwu",array("羽毛球","恶作剧")
# 使用侧写视图将炸裂结果作为临时视图,聚合需要的结果
select
name,hobby_name_col
from
dwd.hobby
lateral view explode(hobby_name) hobby_tmp as hobby_name_col
主要解决1行和n行数据无法聚合在一起展示的问题。
#建表
CREATE EXTERNAL TABLE `dwd.order`(
`name` string COMMENT ‘姓名‘,
`order_date` string COMMENT ‘购买日期‘,
`price` bigint COMMENT ‘消费金额‘
) COMMENT ‘订单表‘
PARTITIONED BY (
`dt` String COMMENT ‘partition‘
)
row format delimited fields terminated by ‘\t‘
stored as parquet
location ‘/warehouse/dwd/test/order/‘
tblproperties ("parquet.compression"="snappy")
#插入数据
INSERT INTO TABLE `dwd.order` partition(dt=‘2020-04-05‘) VALUES("zhangsan","2020-01-01",15),("lisi","2020-01-02",22),("zhangsan","2020-04-01",34),("lisi","2020-04-01",15),("zhangsan","2020-04-04",42),("zhangsan","2020-03-01",24),("lisi","2020-02-01",65),("wangwu","2020-04-01",33),("zhangsan","2020-05-01",43),("zhangsan","2020-07-01",12),("wangwu","2020-02-05",32),("zhangsan","2020-03-06",22),("lisi","2020-04-07",14)
SELECT name,count(*) OVER() FROM dwd.`order` WHERE order_date BETWEEN "2020-04-01" AND "2020-04-30" GROUP BY name
SELECT *,sum(price) OVER() FROM dwd.`order`
SELECT *,sum(price) OVER(ORDER BY order_date) FROM dwd.`order`
SELECT *,sum(price) OVER(ORDER BY order_date DESC) FROM dwd.`order`
over函数传参,还是给所有结果集进行开窗,但是根据参数限定窗口大小,上面sql的意思为:
1号zhangsan数据窗口内只含有 1号张三
2号lisi数据窗口内含有 1号张三,2号李四
3号lisi数据窗口内含有 1号张三,2号李四,3号李四
依次类推
SELECT *,sum(price) OVER(distribute by name) FROM dwd.`order`
SELECT *,sum(price) OVER(distribute by name sort by order_date) FROM dwd.`order`
SELECT *,sum(price) OVER(distribute by name rows between UNBOUNDED PRECEDING and CURRENT ROW) FROM dwd.`order`
SELECT *,lag(order_date,1,"1970-01-01") OVER(distribute by name sort by order_date) FROM dwd.`order`
SELECT *,ntile(5) OVER() FROM dwd.`order`
SELECT * FROM (SELECT *,ntile(5) OVER(ORDER BY order_date) ntile_5 FROM dwd.`order`) t1 WHERE t1.ntile_5=1
SELECT *,row_number() OVER(order by order_date) FROM dwd.`order`
SELECT *,rank() OVER(order by order_date) FROM dwd.`order`
SELECT *,dense_rank() OVER(order by order_date) FROM dwd.`order`
https://cwiki.apache.org/confluence/display/Hive/HivePlugins
必须有返回值,可以返回null,但是类型不能是void。
需要继承UDF类,并且方法名为evaluate
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.1</version>
</dependency>
import org.apache.hadoop.hive.ql.exec.UDF;
//模拟concat
public class MyUDF extends UDF {
public String evaluate(String... args) {
StringBuilder s = new StringBuilder();
for (String arg : args) {
s.append(arg);
}
return s.toString();
}
}
# 1 添加jar包,注意服务器本地路径(hive客户端关闭则消失,需要重新添加,如果不想重新添加,可以直接使用4将jar包放入hive/lib下)
add jar /root/test/function-1.0-SNAPSHOT.jar
# 2 添加函数,注意hive中的函数别名,还有jar包全类名,如下为永久创建和临时创建temporary(hive客户端关闭则消失)
create function myconcat as "com.hive.function.udf.MyUDF"
create temporary function myconcat as "com.hive.function.udf.MyUDF"
# 3 加载jar包的第二种方式,上传jar包至hdfs集群
# create temporary function myconcat as "com.hive.function.udf.MyUDF" using jar ‘hdfs:///hive_jar/function-1.0-SNAPSHOT.jar‘;
# 4 加载jar包的第三种方式,直接放在hive的lib目录下,启动hive,使用2添加函数即可。下面是cdh环境的jars包路径,配置软连接到hive/lib下
# ln -s /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/function-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive/lib
# 5 删除函数
# drop function myconcat
SELECT myconcat("a","-","b","-","c")
用户自定义UDAF必须继承UDAF,必须提供一个实现了UDAFEvaluator接口的内部类
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
//模拟avg
public class MyUDAF extends UDAF {
public static class AvgState {
private long mCount;
private double mSum;
}
public static class AvgEvaluator implements UDAFEvaluator {
AvgState state;
public AvgEvaluator() {
super();
state = new AvgState();
init();
}
/**
* init函数类似于构造函数,用于UDAF的初始化
*/
public void init() {
state.mSum = 0;
state.mCount = 0;
}
/**
* iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean * * @param o * @return
*/
public boolean iterate(Double o) {
if (o != null) {
state.mSum += o;
state.mCount++;
}
return true;
}
/**
* terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据, * terminatePartial类似于hadoop的Combiner * * @return
*/
public AvgState terminatePartial() {
// combiner
return state.mCount == 0 ? null : state;
}
/**
* merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean * * @param o * @return
*/
public boolean merge(AvgState avgState) {
if (avgState != null) {
state.mCount += avgState.mCount;
state.mSum += avgState.mSum;
}
return true;
}
/**
* terminate返回最终的聚集函数结果 * * @return
*/
public Double terminate() {
return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);
}
}
}
用户自定义UDAF必须继承GenericUDTF,重写initialize(),process(),close()方法。
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;
import java.util.List;
//模拟EXPLODE和split
public class MyUDTF extends GenericUDTF {
private List<String> dataList = new ArrayList<>();
//初始化方法,返回对象结构校验器
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//列名,会被用户传递的覆盖
List<String> fieldNames = new ArrayList<>();
fieldNames.add("word1");
//返回列以什么格式输出,这里是string,添加几个就是几个列,和上面的名字个数对应个数。
List<ObjectInspector> fieldOIs = new ArrayList<>();
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] objects) throws HiveException {
//获取数据
String data = objects[0].toString();
//获取分隔符
String splitKey = objects[1].toString();
//切分数据
String[] words = data.split(splitKey);
//遍历写出
for (String word : words) {
//将数据放入集合
dataList.clear();
dataList.add(word);
//写出数据到缓冲区
forward(dataList);
}
}
@Override
public void close() throws HiveException {
//没有流操作
}
}
hive.execution.engine配置为spark
Fetch抓取修改为more,可以使全局查找,字段查找,limit查找等都不走计算引擎,而是直接读取表对应储存目录下的文件,大大普通查询速度。
hive.fetch.task.conversion配置为more
hive可以通过本地模式在单台机器上处理所有的任务,对于小的数据集,执行时间可以明显被缩短。
hive-site.xml调整下面3个参数,开启本地模式,文件不超过50M,个数不超过10个。
hive.exec.mode.local.auto=true
hive.exec.mode.local.auto.inputbytes.max=50000000 (50M左右,默认128M->134217728,机器资源足建议使用默认值)
hive.exec.mode.local.auto.input.files.max=10 (模式4个)
<property><name>hive.exec.mode.local.auto</name><value>true</value></property>
<property><name>hive.exec.mode.local.auto.inputbytes.max</name><value>50000000</value></property>
<property><name>hive.exec.mode.local.auto.input.files.max</name><value>10</value></property>
在join问题上,让小表放在左边 去左链接(left join)大表,这样可以有效的减少内存溢出错误发生的几率。
hive.auto.convert.join开启。(默认开启)
hive.mapjoin.smalltable.filesize(默认25000000->接近24M,如果机器内存足可以适当调大,需在hive-site.xml中设置,如7.3)
大表和大表join,空key会打到同一个reduce上,造成数据倾斜,任务缓慢,内存泄漏。(reduce任务某个非常慢,其他很快,及发生数据倾斜)
- 空key过滤:使用子查询过滤掉空key,可以有效的提升查询速率。
- 空key转换:附随机值,使其可以随机均匀的分布在不同的reduce上,使用case when then或if判断null值,赋予rand()函数。
默认情况下map阶段同一个key发送给一个reduce,当一个key数据过大时就发生数据倾斜。
那么把某些聚合操作提到Map端进行部分聚合,最后在reduce端得出最终结果,也可以有效的提升执行效率。
hive.map.aggr开启。(默认开启)
hive.groupby.mapaggr.checkinterval=100000(默认100000条,在map端进行聚合操作的条目数目,需在hive-site.xml中设置,如7.3)
hive.groupby.skewindata=true(默认false,有数据倾斜时进行负载均衡,需在hive-site.xml中设置,如7.3)
hive.groupby.skewindata当选项设置为true时,生成的查询计划会有两个MR Job,第一个MR Job会将key加随机数均匀的分布到Reduce中,做部分聚合操作(预处理),第二个MR Job在根据预处理结果还原原始key,按照Group By Key分布到Reduce中进行聚合运算,完成最终操作。
数据量小的时候没关系,大数据量下,由于Count Distinct操作需要用一个Reduce任务来完成,这一个Reduce需要处理的数据量太大,会导致Job缓慢,可以使用子查询Group By再Count的方式替换。
列处理:不使用Select *,使用什么字段就写什么字段,就算是所有字段,也要一一列出,养成好习惯。
行处理:在join操作中,不要直接关联表后使用where条件,这样会使全表关联后在过滤,使用子查询过滤后在join来替换,使查询效率提高。如下
#错误写法
select id from student s left join class c on s.cid=c.id where c.id<=10
#正确写法
select id from student s left join (select id from class where id<=10 ) c on s.cid=c.id
当数据会根据情况变化的时候,先有数据,再想分区的情况。
设计表时尽量避免动态分区,速度比静态分区(也就是直接指定分区慢很多)。
开启动态分区参数
必要参数
相关参数
建表方式同静态分区,插入方式要注意,查询结果最后一个字段即为分区字段(不管名字是否一样,会将最后一个字段的值,直接拿来分区)
#错误写法:会将name值直接插入dt分区字段。
insert into table dwd.student1 partition(dt)
select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student
#正确写法
insert into table dwd.student1 partition(dt)
select id,createdby,createdtime,updatedby,updatedtime,version,name,dt from dwd.student
hive.exec.parallel=true;默认false。需在hive-site.xml中设置,如7.3
hive.exec.parallel.thread.number=16;默认8,同一个sql允许的最大并行度,针对集群资源适当增加。需在hive-site.xml中设置,如7.3
在执行的查询sql前加上Explain指令,查询分析sql执行过程。
原文:https://www.cnblogs.com/ttzzyy/p/12650543.html