Build Hive 0.12 against Hadoop 2.2.0:

ant clean package -Dhadoop.version=2.2.0 -Dhadoop-0.23.version=2.2.0 -Dhadoop.mr.rev=23
Export the environment variables:

export HIVE_HOME=/app/hadoop/hive012/src/build/dist
export HIVE_DEV_HOME=/app/hadoop/hive012/src
export HADOOP_HOME=/app/hadoop/hadoop220

Launch hive/console:
sbt/sbt hive/console
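Once the console is up, a quick sanity check (a sketch, not part of the original steps) is to query one of the Hive test tables that TestHive registers; the tables are created and loaded lazily on first reference:

sql("select count(*) from src").collect()

The test tables themselves are defined in TestHive.scala: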
/* From sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */
// The test tables that are defined in the Hive QTestUtil.
// /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
val hiveQTestUtilTables = Seq(
  TestTable("src",
    "CREATE TABLE src (key INT, value STRING)".cmd,
    s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
  TestTable("src1",
    "CREATE TABLE src1 (key INT, value STRING)".cmd,
    s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
  TestTable("srcpart", () => {
    runSqlHive(
      "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
    for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
      runSqlHive(
        s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
           |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
         """.stripMargin)
    }
  }),
  ......
)

Because the Hive 0.12 test data is used, two environment variables, HIVE_HOME and HIVE_DEV_HOME, need to be defined; with Hive 0.13, point them at the corresponding Hive 0.13 directories instead:

/* From sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala */
/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
/** The location of the hive source code. */
lazy val hiveDevHome = envVarToFile("HIVE_DEV_HOME")

/* From project/SparkBuild.scala */
object Hive {
  lazy val settings = Seq(
    javaOptions += "-XX:MaxPermSize=1g",
    // Multiple queries rely on the TestHive singleton. See comments there for more details.
    parallelExecution in Test := false,
    // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
    // only for this subproject.
    scalacOptions <<= scalacOptions map { currentOpts: Seq[String] =>
      currentOpts.filterNot(_ == "-deprecation")
    },
    initialCommands in console :=
      """
        |import org.apache.spark.sql.catalyst.analysis._
        |import org.apache.spark.sql.catalyst.dsl._
        |import org.apache.spark.sql.catalyst.errors._
        |import org.apache.spark.sql.catalyst.expressions._
        |import org.apache.spark.sql.catalyst.plans.logical._
        |import org.apache.spark.sql.catalyst.rules._
        |import org.apache.spark.sql.catalyst.types._
        |import org.apache.spark.sql.catalyst.util._
        |import org.apache.spark.sql.execution
        |import org.apache.spark.sql.hive._
        |import org.apache.spark.sql.hive.test.TestHive._
        |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
  )
}

Run the following in the console, line by line:
case class Person(name:String, age:Int, state:String)
sparkContext.parallelize(Person("Michael",29,"CA")::Person("Andy",30,"NY")::Person("Justin",19,"CA")::Person("Justin",25,"CA")::Nil).registerTempTable("people")
val query = sql("select * from people")
query.printSchema
query.queryExecution               // the full QueryExecution (all plan phases)
query.queryExecution.logical       // the parsed logical plan
query.queryExecution.analyzed      // the analyzed logical plan
query.queryExecution.optimizedPlan // the optimized logical plan
query.queryExecution.sparkPlan     // the physical plan
query.toDebugString                // the RDD lineage of the query
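Note that queryExecution only describes the plans; nothing is computed until an action runs on the SchemaRDD. A minimal check (the extra filter and its threshold are only for illustration):

// Forces execution: the physical plan shown above is what actually runs
query.where('age > 20).collect().foreach(println)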
Spark SQL builds plans over other data sources in the same way. Take the following nested JSON document:

{
"fullname": "Sean Kelly",
"org": "SK Consulting",
"emailaddrs": [
{"type": "work", "value": "kelly@seankelly.biz"},
{"type": "home", "pref": 1, "value": "kelly@seankelly.tv"}
],
"telephones": [
{"type": "work", "pref": 1, "value": "+1 214 555 1212"},
{"type": "fax", "value": "+1 214 555 1213"},
{"type": "mobile", "value": "+1 214 555 1214"}
],
"addresses": [
{"type": "work", "format": "us",
"value": "1234 Main StnSpringfield, TX 78080-1216"},
{"type": "home", "format": "us",
"value": "5678 Main StnSpringfield, TX 78080-1316"}
],
"urls": [
{"type": "work", "value": "http://seankelly.biz/"},
{"type": "home", "value": "http://seankelly.tv/"}
]
}

After removing the spaces and newlines (jsonFile expects one JSON record per line), save it as /home/mmicky/data/nestjson.json, read it in with jsonFile and register it as the table jsonPerson, then define a query jsonQuery:

jsonFile("/home/mmicky/data/nestjson.json").registerTempTable("jsonPerson")
val jsonQuery = sql("select * from jsonPerson")
jsonQuery.printSchema
jsonQuery.queryExecution
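The same table could also be built without a file by using jsonRDD on an RDD[String]; the record below is shortened and the table name jsonPerson2 is arbitrary (a sketch):

val oneRecord = """{"fullname": "Sean Kelly", "org": "SK Consulting"}"""
jsonRDD(sparkContext.parallelize(oneRecord :: Nil)).registerTempTable("jsonPerson2")
sql("select fullname, org from jsonPerson2").queryExecution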
parquetFile("/home/mmicky/data/spark/wiki_parquet").registerTempTable("parquetWiki")
val parquetQuery = sql("select * from parquetWiki")
parquetQuery.printSchema
parquetQuery.queryExecution
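If the wiki_parquet data set is not at hand, a Parquet file can be produced from the people table registered earlier and inspected the same way; the output path below is arbitrary (a sketch):

sql("select * from people").saveAsParquetFile("/tmp/people_parquet")
parquetFile("/tmp/people_parquet").registerTempTable("parquetPeople")
sql("select * from parquetPeople").queryExecution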
For a Hive table (here a table named sales in the local metastore), define hiveQuery in the same way:

val hiveQuery = sql("select * from sales")
hiveQuery.printSchema
hiveQuery.queryExecution
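If no sales table exists in the local Hive metastore, a stand-in can be created first and the two statements above re-run; the schema below is hypothetical and only for illustration:

// Hypothetical schema for illustration; not the author's actual sales table
sql("CREATE TABLE IF NOT EXISTS sales (id INT, product STRING, amount DOUBLE)")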
sql("select state,avg(age) from people group by state").queryExecutionsql("select state,avg(age) from people group by state").toDebugStringsql("select a.name,b.name from people a join people b where a.name=b.name").queryExecutionsql("select a.name,b.name from people a join people b where a.name=b.name").toDebugStringsql("select distinct a.name,b.name from people a join people b where a.name=b.name").queryExecutionsql("select distinct a.name,b.name from people a join people b where a.name=b.name").toDebugStringsql("select name from (select * from people where age >=19) a where a.age <30").queryExecutionsql("select name from (select name,state as location from people) a where location='CA'").queryExecutionsql("select name,1+2 from people").queryExecutionobject CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Collapse two nested Filters into one Filter whose condition is the AND of both
    case Filter(c1, Filter(c2, grandChild)) =>
      Filter(And(c1, c2), grandChild)
  }
}

Build a query with two consecutive filters, look at its analyzed plan, and then apply CombineFilters to that plan by hand:

val query = sql("select * from people").where('age >= 19).where('age < 30)
query.queryExecution.analyzed
CombineFilters(query.queryExecution.analyzed)
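For comparison, Catalyst's built-in optimizer ships an equivalent filter-combining rule, so the optimized plan of the same query already shows the two predicates merged into a single Filter:

query.queryExecution.optimizedPlan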
val hiveQuery = sql("SELECT * FROM (SELECT * FROM src) a")
hiveQuery.queryExecution.analyzed
hiveQuery.queryExecution.analyzed transform {
  case Project(projectList, child) if projectList == child.output => child
}

Spark SQL 1.1 Getting Started, Part 4: A Deep Dive into the Spark SQL Execution Plan
Original article (Chinese): http://blog.csdn.net/book_mmicky/article/details/40370607