Spark中的RDD到底是什么意思?
1.RDD是弹性分布式数据集,是一个分布式对象的集合。对个RDD可分为多个片,分片可以在集群环境下的不同节点上计算
2.可以通过两种方式创建RDD:
a.加载外部数据集
b.在驱动程序中部署对象集合。
c.创建RDD最简单的方法就是采用现有的内存集合并把它传递给sc的并行化方法。适合测试,不适合生产
优势在于可以快速创建自己的RDD并对其执行相关的操作。
val line = sc.parallelize(List("pandas","i like pandas"))
d.可以加载外部存储数据用sc.textFile("file:///home/ubuntu/simple.txt")来加载一个将文本文件作为字符串的RDD.
val r = sc.textFile("file:///home/ubuntu/simple.txt")
r: org.apache.spark.rdd.RDD[String] = file:///home/ubuntu/simple.txt MapPartitionsRDD[5] at textFile at <console>:24
4.RDD两种类型的操作:装换和动作
a.转换就是将原来的RDD构建成新的RDD,例如:map,filter
val r1 = r.filter(line => line.contains("20"))
r1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at filter at <console>:27
val r2 = r.filter(line => line.contains("10"))
val r3 = r1.union(r2)
r3: org.apache.spark.rdd.RDD[String] = UnionRDD[11] at union at <console>:32
a_1.当相互转换得到并使用新的RDD时,Spark跟踪记录且设定不同的RDD之间的依赖关系,这种关系称为血统图(图1-7)
它使用这个信息来按照需求计算每个RDD,以及恢复持续化的RDD丢失的那一部分数据。我们每次调用一个新的动作,
整个RDD必须从头开始计算,可以使用持久化来提高效率。
b.动作是通过RDD来计算的结果,并且将结果返回给驱动程序或者保存到外部存储系统(HDFS),
如:count()它返回计数,
first()
take(n)包含了前n行记录。
collect()用来获取整个RDD,不应该用在大型数据集上。
大多数情况下,RDD不能仅仅被collect()到驱动,原因是数据量太大,一般是把数据写到HDFS或S3
RDD的内容可以使用saveAsTextFile()或者savaAsSequenceFile()以及其他动作来保存。
scala> r1.first
res12: String = 1201 wang 20
c.惰性评估(Lazy Evaluation)
c1.它意味着,当我们调用RDD的转换时,不立即执行该操作,相反,Spark在内部记录元数据以表明该操作已被请求,而不是考虑RDD包含的具体的数据。
c2.Spark通过使用惰性评估,以减少其在各种转换操作中所需要存储的中间数据。
5.RDD只有在第一次使用它们中的动作时才计算,可以避免浪费大量的存储空间。因为我们随后会立即过滤掉一部分不需要的行
一旦Spark看到整个变换链,他可以计算仅需其结果的数据,对于first()动作,Spark只扫描文件,直到他找到第一个匹配的行,不读整个文件。
6.RDDS在默认的情况下每次运行它们都要进行重新计算。如果重用多个动作,可以使用持久化的方法:RDD.persist(),计算第一次后,Spark将RDD
内容存储在内存中(整个机器的集群分区),默认不适用持久化的意义在于:如果不重用大数据集,可以避免浪费空间。
7.一般会经常使用持久化去加载数据集到内存中,方面重复的查询和使用
Copyright © 广州京杭网络科技有限公司 2005-2024 版权所有 粤ICP备16019765号
广州京杭网络科技有限公司 版权所有