大数据入门与实战-PySpark的使用教程

1 PySpark简介

Apache Spark是用Scala编程语言编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,您也可以使用Python编程语言处理RDD。正是由于一个名为Py4j的库,他们才能实现这一目标。
这里不介绍PySpark的环境设置,主要介绍一些实例,以便快速上手。

2 PySpark - SparkContext

SparkContext是任何spark功能的入口点。当我们运行任何Spark应用程序时,会启动一个驱动程序,它具有main函数,并且此处启动了SparkContext。然后,驱动程序在工作节点上的执行程序内运行操作。

SparkContext使用Py4J启动JVM并创建JavaSparkContext。默认情况下,PySpark将SparkContext作为'sc'提供,因此创建新的SparkContext将不起作用。

以下代码块包含PySpark类的详细信息以及SparkContext可以采用的参数。

class pyspark.SparkContext (
   master = None,
   appName = None,
   sparkHome = None,
   pyFiles = None,
   environment = None,
   batchSize = 0,
   serializer = PickleSerializer(),
   conf = None,
   gateway = None,
   jsc = None,
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

以下是SparkContext的参数具体含义:

  • Master- 它是连接到的集群的URL。

  • appName- 您的工作名称。

  • sparkHome - Spark安装目录。

  • pyFiles - 要发送到集群并添加到PYTHONPATH的.zip或.py文件。

  • environment - 工作节点环境变量。

  • batchSize - 表示为单个Java对象的Python对象的数量。设置1以禁用批处理,设置0以根据对象大小自动选择批处理大小,或设置为-1以使用无限批处理大小。

  • serializer- RDD序列化器。

  • Conf - L {SparkConf}的一个对象,用于设置所有Spark属性。

  • gateway - 使用现有网关和JVM,否则初始化新JVM。

  • JSC - JavaSparkContext实例。

  • profiler_cls - 用于进行性能分析的一类自定义Profiler(默认为pyspark.profiler.BasicProfiler)。
    在上述参数中,主要使用master和appname。任何PySpark程序的会使用以下两行:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

2.1 SparkContext示例 - PySpark Shell

现在你对SparkContext有了足够的了解,让我们在PySpark shell上运行一个简单的例子。在这个例子中,我们将计算README.md文件中带有字符“a”或“b”的行数。那么,让我们说如果一个文件中有5行,3行有字符'a',那么输出将是→ Line with a:3。字符'b'也是如此。

注 - 我们不会在以下示例中创建任何SparkContext对象,因为默认情况下,当PySpark shell启动时,Spark会自动创建名为sc的SparkContext对象。如果您尝试创建另一个SparkContext对象,您将收到以下错误 - “ValueError:无法一次运行多个SparkContexts”。

在终端输入pyspark 启动PySpark Shell:

>>> logFile="file:////opt/modules/hadoop-2.8.5/README.txt"
>>> logData=sc.textFile(logFile).cache()
>>> numAs=logData.filter(lambda s:'a' in s).count()
>>> numBs=logData.filter(lambda s:'b' in s).count()
>>> print("Line with a:%i,line with b:%i" % (numAs,numBs))
Line with a:25,line with b:7

2.2 SparkContext示例 - Python程序

让我们使用Python程序运行相同的示例。创建一个名为demo.py的Python文件,并在该文件中输入以下代码。

from pyspark import SparkContext
logFile = "file:////opt/modules/hadoop-2.8.5/README.txt"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,lines with b :%i" % (numAs, numBs))

然后我们将在终端中执行以下命令来运行此Python文件。我们将得到与上面相同的输出。

spark-submit demo.py

3 PySpark - RDD

在介绍PySpark处理RDD操作之前,我们先了解下RDD的基本概念:

RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD是不可变元素,这意味着一旦创建了RDD,就无法对其进行更改。RDD也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些RDD应用多个操作来完成某项任务。

要对这些RDD进行操作,有两种方法 :

  • Transformation

  • Action

转换 - 这些操作应用于RDD以创建新的RDD。Filter,groupBy和map是转换的示例。

操作 - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序。

要在PySpark中应用任何操作,我们首先需要创建一个PySpark RDD。以下代码块具有PySpark RDD类的详细信息 :

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

接下来让我们看看如何使用PySpark运行一些基本操作,用以下代码创建存储一组单词的RDD(spark使用parallelize方法创建RDD),我们现在将对单词进行一些操作。

3.1 count()

返回RDD中的元素个数

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)

执行spark-submit count.py,将会输出以下结果

Number of elements in RDD → 8

3.2 collect()

返回RDD中的所有元素

----------------------------------------collect.py - --------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
coll = words.collect()
print("Elements in RDD -> %s" % coll)

执行spark-submit collect.py 输出以下结果

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

3.3 foreach(func)

仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素。

# foreach.py
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)

执行spark-submit foreach.py,然后输出:

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

3.4 filter(f)

返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含''spark'的字符串。

# filter.py
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

执行spark-submit filter.py:

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

3.5 map(f, preservesPartitioning = False)

通过将该函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1

# map.py
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

执行spark-submit map.py

Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]

3.6 reduce(f)

执行指定的可交换和关联二元操作后,将返回RDD中的元素。在下面的示例中,我们从运算符导入add包并将其应用于'num'以执行简单的加法运算。说白了和Python的reduce一样:假如有一组整数[x1,x2,x3],利用reduce执行加法操作add,对第一个元素执行add后,结果为sum=x1,然后再将sum和x2执行add,sum=x1+x2,最后再将x2和sum执行add,此时sum=x1+x2+x3。

# reduce.py
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

执行spark-submit reduce.py:

Adding all the elements -> 15

3.7 join(other, numPartitions = None)

它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。

from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))

执行spark-submit join.py:

Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]

reference:https://www.tutorialspoint.com/pyspark/index.htm

(0)

相关推荐

  • 没有自己的服务器如何学习生物数据分析(上篇)

    编者注:完整文章首发于作者博客 http://huboqiang.cn/ 在这篇文章中,作者利用大数据平台 IBM data science 对生信技能树论坛的一道生物信息入门题进行了分析. 由于文章 ...

  • win10 安装 spark单机版(失败版)

    书里面使用这个spark,我这里就配置一下(失败了...) 从这里下载 https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3. ...

  • 搭建Spark计算平台 python操作Spark

    一.Spark安装及服务启动 Apache Spark是一种快速的集群计算技术,专为快速计算而设计.它基于Hadoop MapReduce,它扩展了MapReduce模型,以有效地将其用于更多类型的计 ...

  • 大数据基础:Spark工作原理及基础概念

    一.Spark 介绍及生态Spark是UC Berkeley AMP Lab开源的通用分布式并行计算框架,目前已成为Apache软件基金会的顶级开源项目.至于为什么我们要学习Spark,可以总结为下面 ...

  • Spark架构与运行流程

    Spark架构与运行流程

  • Spark 单机环境配置

    概要 大数据和人工智能已经宣传了好多年, Hadoop 和 Spark 也已经发布了很长时间, 一直想试试, 但是工作也遇不到使用的场景, 就一直拖着. 这次在极客时间上选了蔡元楠老师的<大规模 ...

  • 全面解析Spark,以及和Python的对接

    作者:@古明地盆 喜欢这篇文章的话,就点个关注吧,或者去bilibili看看我也行,虽然啥也没有.:https://space.bilibili.com/12921175 楔子 这次我们来聊一聊 Sp ...

  • 如何使用pyspark统计词频?

    假如进化的历史重来一遍, 人的出现概率是零.  -- 古德尔   Spark 作为一个用途广泛的大数据运算平台. Spark 允许用户将数据加载到多台计算机所建立的 cluster 集群的内存中存储, ...

  • 在线教育大数据营销平台实战(三):数据产品实施推广实战

    不仅仅是数据产品,所有的B端产品都会面临实施效果问题,这里所说的B端产品不仅包括ToB企业服务的商业化B端产品,也包括企业内部自建的B端产品. 内部自建B端产品实施难度相对来说小一些,因为你的用户就是 ...

  • 在线教育大数据营销平台实战(一):大数据平台构建实战

    第一篇文章,按照惯例先做个自我介绍.本人目前在一家在线教育公司担任大数据营销产品负责人,由于一些机缘巧合,我同时负责了数据产品线和营销CRM产品线,因此给了我更多的机会去思考和实践如何把数据与营销业务 ...

  • 大数据Spark企业级实战与Hadoop实战|

    本书完全从企业处理大数据业务场景的角度出发,完全基于实战代码来组织内容,从零起步,完全无痛地掌握Spark大数据处理实战技术. 本书能满足读者全面学习最新的Hadoop技术及其相关(Hive.Hbas ...

  • 大数据开发之Hadoop集群安装教程

    配置文件的修改 注意:以下所有操作都在node01主机进行. 1.1 hadoop-env.sh 1.介绍 文件中设置的是Hadoop运行时需要的环境变量.JAVA_HOME是必须设置的,即使我们当前 ...

  • 【案例】茶叶行业大数据专家解读溯源实战秘笈

    当李克强总理提出"互联网+"时,很多专家赶时髦似地开始各种解读."互联网+"对于茶行业实际存在的问题,不是灵丹妙药立马见效.不但需要时间,更需要实践!而最好的办 ...

  • 大数据 ETL 处理工具 Kettle 入门实践

    Kettle 简介 Kettle 是什么 Kettle 是一款国外开源的 ETL 工具,对商业用户也没有限制,纯 Java 编写,可以在 Window.Linux.Unix 上运行,绿色无需安装,数据 ...

  • 入门“大数据”,你需要通读这18本书

    大数据或称巨量资料,指的是需要新处理模式才能具有更强的决策力.洞察力和流程优化能力的海量.高增长率和多样化的信息资产.在移动互联网快速发展的当下,越来越多的工作岗位会运用到大数据技术.了解学习&quo ...

  • 《大数据是这样计算的》:Xlab实例入门 | 每周一本书

    数据猿导读 本周给大家推荐一本<大数据是这样计算的>,是关于Xlab实例入门,也是阿里巴巴集团重点推荐的的技术丛书.XLab是阿里巴巴集团内部最大的大数据算法平台,作者是深居阿里大数据算法 ...

  • 每周一本书 |《大数据时代营销人的变革》:送你实战”宝典“,希望不要再盲干!

    <数据猿导读> 本周给大家推荐的这本<大数据时代营销人的变革>,绝对是营销人的实战"宝典".本书以大数据和机器学习为基础,涵盖了从零售到出版.从软件到制造的 ...