在本文中, 我们借由深入剖析wordcount.py, 来揭开Spark内部各种概念的面纱。我们再次回顾wordcount.py代码来回答如下问题
成都创新互联公司专注于企业成都营销网站建设、网站重做改版、夏津网站定制设计、自适应品牌网站建设、H5技术、成都做商城网站、集团公司官网建设、成都外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为夏津等各大城市提供网站开发制作服务。对于大多数语言的Hello Word示例,都有main()函数, wordcount.py的main函数,或者说调用Spark的main() 在哪里
数据的读入,各个RDD数据如何转换
map与flatMap的工作机制,以及区别
reduceByKey的作用
WordCount.py 的代码如下:
from __future__ import print_functionimport sysfrom operator import add# SparkSession:是一个对Spark的编程入口,取代了原本的SQLContext与HiveContext,方便调用Dataset和DataFrame API# SparkSession可用于创建DataFrame,将DataFrame注册为表,在表上执行SQL,缓存表和读取parquet文件。from pyspark.sql import SparkSessionif __name__ == "__main__": # Python 常用的简单参数传入 if len(sys.argv) != 2: print("Usage: wordcountSpark 入口 SparkSession
Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,这边不妨对照Http Session, 在此Spark就在充当Web service的角色,程序调用Spark功能的时候需要先建立一个Session。因此看到getOrCreate()就很容易理解了, 表明可以视情况新建session或利用已有的session。
spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate()既然将Spark 想象成一个Web server, 也就意味着可能用多个访问在进行,为了便于监控管理, 对应用命名一个恰当的名称是个好办法。Web UI并不是本文的重点,有兴趣的同学可以参考 Spark Application’s Web Console
加载数据
在建立SparkSession之后, 就是读入数据并写入到Dateset中。
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])为了更好的分解执行过程,是时候借助PySpark了, PySpark是python调用Spark的 API,它可以启动一个交互式Python Shell。为了方便脚本调试,暂时切换到Linux执行
# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13) [GCC 4.8.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.1.0 /_/ Using Python version 2.7.6 (default, Jun 22 2015 17:58:13) SparkSession available as 'spark'.>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')>>> type(ds)交互式Shell的好处是可以方便的查看变量内容和类型。此刻文件a.txt已经加载到lines中,它是RDD(Resilient Distributed Datasets)弹性分布式数据集的实例。
RDD操作
RDD在内存中的结构可以参考论文, 理解RDD有两点比较重要:
一是RDD一种只读、只能由已存在的RDD变换而来的共享内存,然后将所有数据都加载到内存中,方便进行多次重用。
二是RDD的数据默认情况下存放在集群中不同节点的内存中,本身提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。
为了探究RDD内部的数据内容,可以利用collect()函数, 它能够以数组的形式,返回RDD数据集的所有元素。
>>> lines = ds.rdd>>> for i in lines.collect():... print i... Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')lines存储的是Row object类型,而我们希望的是对String类型进行处理,所以需要利用map api进一步转换RDD
>>> lines_map = lines.map(lambda x: x[0])>>> for i in lines_map.collect():... print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.为了统计每个单词的出现频率,需要对每个单词分别统计,那么第一步需要将上面的字符串以空格作为分隔符将单词提取出来,并为每个词设置一个计数器。比如 These出现次数是1, 我们期望的数据结构是['There', 1]。但是如何将包含字符串的RDD转换成元素为类似 ['There', 1] 的RDD呢?
>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))>>> rdd_map = flat_map.map(lambda x: [x, 1])>>> for i in rdd_map.collect():... print i... [u'These', 1] [u'examples', 1] [u'give', 1] [u'a', 1] [u'quick', 1]下图简要的讲述了flatMap 和 map的转换过程。
transfrom.png
不难看出,map api只是为所有出现的单词初始化了计数器为1,并没有统计相同词,接下来这个任务由reduceByKey()来完成。在rdd_map 中,所有的词被视为一个key,而key相同的value则执行reduceByKey内的算子操作,因为统计相同key是累加操作,所以可以直接add操作。
>>> from operator import add>>> add_map = rdd_map.reduceByKey(add)>>> for i in add_map.collect():... print i... (u'a', 1) (u'on', 1) (u'of', 2) (u'arbitrary', 1) (u'quick', 1) (u'the', 2) (u'or', 1)>>> print rdd_map.count()26>>> print add_map.count()23根据a.txt 的内容,可知只有 of 和 the 两个单词出现了两次,符合预期。
总结
以上的分解步骤,可以帮我们理解RDD的操作,需要提示的是,RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。也就是说,上面所有的RDD都是通过collect()触发的, 那么如果将上述的transformation放入一条简练语句中, 则展现为原始wordcount.py的书写形式。
counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add)而真正的action 则是由collect()完成。
output = counts.collect()至此,已经完成了对wordcount.py的深入剖析,但是有意的忽略了一些更底层的执行过程,比如DAG, stage, 以及Driver程序。
作者:或然子
链接:https://www.jianshu.com/p/067907b23546
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
新闻名称:PySpark进阶--深入剖析wordcount.py-创新互联
标题网址:http://cqwzjz.cn/article/ccchjo.html