大数据开发技术之Spark SQL的多种使用方法

Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

DataSet/DataFrame

DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。

DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:

type DataFrame = Dataset[Row]。

DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。

DataSet创建

DataSet通常通过加载外部数据或通过RDD转化创建。

1.加载外部数据

以加载json和mysql为例:

..()

   ..()
.((  ,
  ,
  ,   ,   )).()

2.RDD转换为DataSet

通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码):

)
   .().(.())

)
  (:, :, :)

)
   .(  (()., (), ().))

)
  .

    (.().(  (, , )))

   .(((),()))

   .(,)

操作DataSet的两种风格语法

DSL语法

1.查询DataSet部分列中的内容

personDS.select(col("name"))

personDS.select(col("name"), col("age"))

2.查询所有的name和age和salary,并将salary加1000

personDS.select(col("name"), col("age"), col("salary") + 1000)

personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)

3.过滤age大于18的

personDS.filter(col("age") > 18)

4.按年龄进行分组并统计相同年龄的人数

personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL语法

如果想使用SQL风格的语法,需要将DataSet注册成表

personDS.registerTempTable("person")

//查询年龄最大的前两名

val result = sparkSession.sql("select * from person order by age desc limit 2")

//保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet

result.write.format("json").save("hdfs://ip:port/res2")

Spark SQL的几种使用方式

1.sparksql-shell交互式查询

就是利用Spark提供的shell命令行执行SQL

2.编程

首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:

val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();

val df = sparkSession.read.format("parquet").load("/路径/parquet文件")

然后就可以针对df进行业务处理了。

3.Thriftserver

beeline客户端连接操作

启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。

hive-jdbc驱动包来访问spark-sql的thrift服务

在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:

.()
   .(, , );
 {
     .()
     .()
   (.()) {
    (.())
  }
}  {
   :   .()
} {
  () .()
}

Spark SQL 获取Hive数据

Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式:

首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:

<property>

<name>hive.metastore.uris</name>

<value>thrift://ip:port</value>

</property>

然后,启动hive metastore

最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:

val spark = SparkSession.builder()

.config(sparkConf).enableHiveSupport().getOrCreate()

UDF、UDAF、Aggregator

UDF

UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例:

{(:)  .}
..(,)
  ..()
.()
.()

UDAF

定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例:

....{, }
 .....
 .....
 .....

    {

   :   ((, ) :: )

   :   {
    ((, ) :: (, ) :: )
  }

   :   

   :   

   (: ):   {
    ()
    ()
  }

   (: , : ):   {
     (.()) {
      ()  .()  .()
      ()  .()
    }
  }

   (: , : ):   {
    ()  .()  .()
    ()  .()  .()
  }

   (: ):   .().  .()
}

..(, )

   ..()
.()
.()
   .()
.()

Aggregator

....{, , }
 .....

  (: , : )
  ( : ,  : )

   [, , ] {

   :   (, )

   (: , : ):   {
    .  .
    .  

  }

   (: , : ):   {
    .  .
    .  .

  }

   (: ):   ..  .

   : []  .

   : []  .
}

   ..().[]
.()

   ..()
   .()
.()
(0)

相关推荐

  • Ambari HDP 下 SPARK2 与 Phoenix 整合

    Ambari HDP 下 SPARK2 与 Phoenix 整合

  • 理解Spark SQL(二)—— SQLContext和HiveContext

    使用Spark SQL,除了使用之前介绍的方法,实际上还可以使用SQLContext或者HiveContext通过编程的方式实现.前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器 ...

  • Hive,Hive on Spark和SparkSQL区别

    Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结 ...

  • 理解Spark SQL(三)—— Spark SQL程序举例

    上一篇说到,在Spark 2.x当中,实际上SQLContext和HiveContext是过时的,相反是采用SparkSession对象的sql函数来操作SQL语句的.使用这个函数执行SQL语句前需要 ...

  • Spark计算引擎之SparkSQL详解

    一.Spark SQL 二. Spark SQL 1. Spark SQL概述 1.1. Spark SQL的前世今生 Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容.Shar ...

  • 大数据开发技术之Spark Job物理执行解析

    一个复杂 job 逻辑执行图: 代码贴在本章最后.给定这样一个复杂数据依赖图,如何合理划分 stage,并未确定 task 的类型和个数? 一个直观想法是将前后关联的 RDDs 组成一个 stage, ...

  • 大数据开发面试必知必会的SQL 30题!!!

    原文链接: https://blog.csdn.net/weixin_45366499/article/details/116355430 作者: 一蓑烟雨任平生 (1)查询每个区域的用户数 (2)查 ...

  • 大数据安全分析07_大数据存储技术介绍

    鉴于网络安全数据组成的复杂性.规模,以及对实时搜索响应的需求,需要通过大数据存储集群快速实现空间的扩容,在PB级的安全数据中做到安全分析查询的秒级响应,同时需要为数据提供了冗余机制,保障数据的安全. ...

  • 海洋讲坛▏刘仁义:浙江大学海洋大数据创新技术及应用

    [作者简介]刘仁义,浙江大学教授,博士生导师.浙江大学GIS重点实验室主任,地理信息科学研究所所长,浙江大学遥感与地理信息系统学科带头人,浙江省跨世纪151人才,教育部地理科学教指委委员.近五年主持国 ...

  • 12个人工智能与大数据开发注意点

    人工智能是近年来科技发展的重要方向,在大数据时代,对数据采集.挖掘.应用的技术越来越受到瞩目.在人工智能和大数据的开发过程中,有哪些特别需要注意的要点? 人工智能领域的算法大师.华盛顿大学教授Pedr ...

  • 大数据平台技术架构方案(ppt)

    大数据平台是为了计算,现今社会所产生的越来越大的数据量,以存储.运算.展现作为目的的平台.大数据技术是指从各种各样类型的数据中,快速获得有价值信息的能力.适用于大数据的技术,包括大规模并行处理(MPP ...

  • 三网运营商大数据的技术原理是什么?

    三网运营商大数据的技术原理是什么?

  • 算法架构系列活动—大数据风控技术应用

    "羊毛党"始于线下.兴于线上,他的兴起与互联网的发展紧密相关.近几年,为了吸引注册用户,O2O企业.电商平台和各种互金网站的营销手段越来越多样化,微信红包.电商优惠券.电商免单之类 ...

  • 《魔方大数据(10):大数据预测技术的应用与发展》顺利落幕!

    <数据猿导读> 9月21日,在<魔方大数据(10):大数据预测技术的应用与发展>活动上,来自大数据不同领域的企业和技术人员,就目前大数据预测技术发展现状.技术难题.底层数据构架 ...