spark2.2官方教程笔记-spark编程向导
首先,spark应用程序由一个驱动程序构成,由它运行用户的main函数,并且在集群上执行各种并行化操作。这个抽象的spark应用函数提供弹性分部式数据集【Spark provides is a resilient distributed dataset (RDD)】,一个rdd可以从hadoop文件系统,或者现存的scala集合,或者从其它集合中转换生成。我们可以让rdd保存在内存中,可以让rdd可以高效的做序列化操作。并且RDD还有一个牛逼的功能,就是自动恢复无效的节点。
其次,spark支持并行化操作中共享变量。当一个spark在不同的节点上运行一系列任务,spark能复制函数中每一个变量到每一个任务节点中。然而,有时一个变量要在不同集群中共享,spark支持两种方式解决这个问题:广播变量(缓存一个数值到所有节点中),累加器(只能用来累加和求和的变量)
链接相关库
建立spark2.2.0需要依赖相关库,如果需要使用hadoop 集群还需要使用hdfs的库,maven示例
spark core
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0
hadoop
groupId = org.apache.hadoop
artifactId = hadoop-client
version =
初始化spark
spark编程的第一件事就是创建sparkContext对象,该对象告诉spark如何访问集群。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
每个jvm只能实例化一个spark上下文,在创建一个新的sparkContext之前你必须调用stop函数(估计是单例模式?)
appName是显示在集群控制界面Ui上的名称,master是运行spark的模式,有 Spark, Mesos or YARN cluster URL,或者是本地模式“local”,如果是运行集群模式,就不需要以硬编码方式设置master,而是以spark-submit的方式启动程序,并且在集群中接受运行。对于本地测试和单机测试,建议使用设置“local”的方式去在进程中运行spark。
在shell中运行
在spark shell中,一个sparkContext已经内建好了,变量名是sc,创建的SparkContext是不生效的。
在shell中,可以通过--master参数设置运行模式,--jars添加jar包,--packages添加额外包
$ ./bin/spark-shell --master local[4]
$ ./bin/spark-shell --master local[4] --jars code.jar
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
弹性分布式数据集(RDD)
spark的核心概念是rdd,一个容错的可并行处理的分布式数据集合框架。rdd通过两种方式创建:1,外部数据源(各种集合对象并行化,文件系统,hdfs,hbase等);2,rdd转换。
并行化集合
在程序中现有的集合中调用sparkContext的parallelize(集合对象)方法。举个栗子:创建1到5的并行化集合对象
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
代码很简单,spark官方文档这里絮絮叨叨的讲了一些废话,只有一句是重点,
We describe operations on distributed datasets later on.spark是懒执行,既是遇到action操作的时候才运行程序。
对于并行化集合的一个重要的参数是数据集拆分的数量。spark将在集群上的每个节点运行一个任务。通常集群的cpu数量和分区数一致比较好。
外部数据集
spark能通过以下几种方式创建rdd:本地文件系统,hdfs, Cassandra(一款开源分布式NOSql图形数据库), HBase, Amazon S3等,spark支持文本文件,序列化文件,其它hadoop输入格式(官方文档重复了n次了,估计是spark引以自豪的东西)。
对于文本文件,rdd能够使用 SparkContext 的 textFile 方法创建rdd对象。这个方法参数uri可以是本地路径,hdfs://,s3n://,然后读取其中的每一行。举个栗子,我们可以通过这种方式累加文本文件的行数的长度。
distFile.map(s => s.length).reduce((a, b) => a + b).
一些注意点
如果uri是本地参数,那么集群中的每个节点相同路径下均要能够访问到这个文件,可以使用文件拷贝或者网络mount共享这文件。
spark文件输入中,支持输入文件夹,压缩文件和甚至支持通配符。举个栗子
textFile("/my/directory"),
textFile("/my/directory/*.txt"),
textFile("/my/directory/*.gz")
这里讲textFile(filepath,minPartitions)第二个参数minPartitions,指定数据最小分区。The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.举个栗子
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/" //local file
val rdd1 = sc.textFile(path,2)
从本地系统中读取licenses这个文件夹下的所有文件
這里特别注意的是,比如這个文件夹下有35个文件,上面分区数设置是2,那么整个RDD的分区数是35*2?這是错误的,這个RDD的分区数不管你的partition数设置为多少时,只要license這个文件夹下的這个文件a.txt(比如有a.txt)没有超过128m,那么a.txt就只有一个partition。那么就是说只要这35个文件其中没有一个超过128m,那么分区数就是 35个。
SparkContext.wholeTextFiles() 读取路径下的所有文本文件,返回 (filename, content) 对。
RDD.saveAsObjectFile and SparkContext.objectFile 将rdd对象序列化后保存
时间:2018-10-09 22:47 来源: 转发量:次
声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。
相关文章:
相关推荐:
网友评论: