Official documentation:
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
With PySpark it is straightforward to work with Hive and, in particular, to use UDFs in your queries.
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession.builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH ‘./kv1.txt‘ INTO TABLE src")
If you run this on Windows 10, you need to adjust the permissions of kv1.txt after the data has been loaded so that the program can still read it, as in the sketch below.
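A minimal sketch of doing this from Python, assuming the copy that needs to become readable is the one LOAD DATA places under the warehouse directory (the path below is an assumption); note that on Windows os.chmod only toggles the read-only attribute, so NTFS ACL changes may still have to be made through the file's Properties dialog or icacls.
import os
import stat
# Assumed location: LOAD DATA LOCAL INPATH copies the file into the table's warehouse directory.
kv1_path = os.path.join(warehouse_location, "src", "kv1.txt")
# Clear the read-only attribute so the Spark job can read the file.
os.chmod(kv1_path, stat.S_IREAD | stat.S_IWRITE)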
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |     500|
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
Requirement: return the square of a column's value.
def func_two(key):
    return key * key
spark.udf.register takes three arguments: the name under which the UDF is registered, the original Python function, and the function's return type (which must be a type from pyspark.sql.types).
from pyspark.sql.types import IntegerType
spark.udf.register("func_two",func_two,IntegerType())
sc = spark.sparkContext
from pyspark.sql import HiveContext
# create a new HiveContext
hc = HiveContext(sc)
hc.sql("SELECT func_two(key) as key,value FROM src").collect()
Source: https://www.cnblogs.com/leimu/p/14846438.html