浅谈Spark与Python

Spark是什么?

参见:https://www.cnblogs.com/Vito2008/p/5216324.html

Spark是分布式计算框架,可以将大量数据集的计算任务分配到多台计算机上,提供高效内存计算。

分布式计算框架要解决两个问题:如何分发数据和如何分发计算?

Hadoop使用HDFS(分布式文件系统)来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。

而MapReduce要求每个步骤间的数据要序列化到磁盘,这意味着MapReduce作业的I/O成本很高,导致交互分析和迭代算法开销很大。

Spark与Hadoop相似,拥有Hadoop MapReduce所具有的优点,但不同于MapReduce的是,Spark拥有多种语言的函数式编程API,提供了除map和reduce之外更多的运算符,这些操作是通过弹性分布式数据集RDD进行的,可以将Job的中间输出结果保存在内存中,从而不再需要读写HDFS,因此,Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce算法。

Spark核心组件及功能

Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
Spark SQL:提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。
Spark Streaming:允许对实时数据流进行处理和控制。
MLlib:常用机器学习算法库,包括分类、回归等需要对大量数据集进行迭代的操作。
GraphX:控制图、并行图操作和计算的一组算法和工具的集合。

Spark本机安装

参见:http://coredumper.cn/index.php/2017/07/02/spark-run-environment-build/
下载地址:https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz

cd spark-2.1.1-bin-hadoop2.7

Spark与Python交互

参见:https://www.cnblogs.com/yangzhang-home/p/6056133.html

./bin/pyspark

val textFile = sc.textFile("README.md")
textFile.count()
textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
textFile.filter(line => line.contains("Spark")).count()

Python的map和reduce函数

参见:
https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/00141861202544241651579c69d4399a9aa135afef28c44000

map(square,[1,2,3,4])
map(f, [x1, x2, x3, x4]) = [f(x1),f(x2),f(x3),f(x4)]
reduce(f, [x1, x2, x3, x4]) = f(f(f(x1, x2), x3), x4)

弹性分布式数据集RDD是什么?

RDD(Resilient Distributed Dataset)支持两种类型的操作:actions和transformations
actions: 在数据集上运行计算后返回值,包括collect, reduce, count, save, lookupKey等。
transformations: 转换,从现有数据集创建一个新的数据集,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等。

创建RRD

textFile = sc.textFile("README.md")
textFile.count()
textFile.first()
linesWithSpark = textFile.filter(lambda line: "Spark" in line)
textFile.filter(lambda line: "Spark" in line).count() 
textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a>b) else b)
#or
def max(a, b):
	if a > b:
		return a
	else:
		return b
textFile.map(lambda line: len(line.split())).reduce(max)
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
#flatMap(func):与map相似,但是每个输入的item能够被map到0个或者更多的输出items上,也就是说func的返回值应当是一个Seq,而不是一个单独的item
#reduceByKey(func):可以作用于使用"键-值"(K, V)形式存储的数据集上,并返回一组新的数据集(K, V)
wordCounts.collect()

RRD缓存

linesWithSpark.cache()
linesWithSpark.count()

自含式应用程序(self-contained applications)调用

./bin/spark-submit --master local[4] python/pyspark/version.py
#更多例子脚本见 examples\src\main\python

Spark执行过程

text = sc.textFile("README.md")
#print text

from operator import add
def tokenize(text):
	return text.split()

words = text.flatMap(tokenize)
#print words

wc = words.map(lambda x: (x,1))
print wc.toDebugString()  #查看PipelinedRDD是怎么被转换的

counts = wc.reduceByKey(add)
counts.saveAsTextFile("wc")  #一旦我们最终调用了saveAsTextFile动作,这个分布式作业就开始执行了
ls wc/  #退出pyspark,查看运行结果
less wc/part-00000  #每个part文件表示并行进程计算得到的结果,最终被保存到磁盘上。
#备注:结果未像Hadoop一样被排序,可以使用sort在结果写入磁盘前进行排序。

参考信息

https://www.cnblogs.com/Vito2008/p/5216324.html
http://coredumper.cn/index.php/2017/07/02/spark-run-environment-build/
https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/00141861202544241651579c69d4399a9aa135afef28c44000
https://www.zhihu.com/question/26568496