大数据开发技术之Spark SQL的多种使用方法
Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

DataSet/DataFrame
DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。
DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:
type DataFrame = Dataset[Row]。
DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。
DataSet创建
DataSet通常通过加载外部数据或通过RDD转化创建。
1.加载外部数据
以加载json和mysql为例:
..() ..() .(( , , , , )).()
2.RDD转换为DataSet
通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码):
)
.().(.())
)
(:, :, :)
)
.( (()., (), ().))
)
.
(.().( (, , )))
.(((),()))
.(,)
操作DataSet的两种风格语法
DSL语法
1.查询DataSet部分列中的内容
personDS.select(col("name"))
personDS.select(col("name"), col("age"))
2.查询所有的name和age和salary,并将salary加1000
personDS.select(col("name"), col("age"), col("salary") + 1000)
personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)
3.过滤age大于18的
personDS.filter(col("age") > 18)
4.按年龄进行分组并统计相同年龄的人数
personDS.groupBy("age").count()
注意:直接使用col方法需要import org.apache.spark.sql.functions._
SQL语法
如果想使用SQL风格的语法,需要将DataSet注册成表
personDS.registerTempTable("person")
//查询年龄最大的前两名
val result = sparkSession.sql("select * from person order by age desc limit 2")
//保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet
result.write.format("json").save("hdfs://ip:port/res2")
Spark SQL的几种使用方式
1.sparksql-shell交互式查询
就是利用Spark提供的shell命令行执行SQL
2.编程
首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:
val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();
val df = sparkSession.read.format("parquet").load("/路径/parquet文件")
然后就可以针对df进行业务处理了。
3.Thriftserver
beeline客户端连接操作
启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。
hive-jdbc驱动包来访问spark-sql的thrift服务
在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:
.()
.(, , );
{
.()
.()
(.()) {
(.())
}
} {
: .()
} {
() .()
}
Spark SQL 获取Hive数据
Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式:
首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:port</value>
</property>
然后,启动hive metastore
最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:
val spark = SparkSession.builder()
.config(sparkConf).enableHiveSupport().getOrCreate()
UDF、UDAF、Aggregator
UDF
UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例:
{(:) .}
..(,)
..()
.()
.()
UDAF
定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例:
....{, }
.....
.....
.....
{
: ((, ) :: )
: {
((, ) :: (, ) :: )
}
:
:
(: ): {
()
()
}
(: , : ): {
(.()) {
() .() .()
() .()
}
}
(: , : ): {
() .() .()
() .() .()
}
(: ): .(). .()
}
..(, )
..()
.()
.()
.()
.()
Aggregator
....{, , }
.....
(: , : )
( : , : )
[, , ] {
: (, )
(: , : ): {
. .
.
}
(: , : ): {
. .
. .
}
(: ): .. .
: [] .
: [] .
}
..().[]
.()
..()
.()
.()
