3.3 KiB
3.3 KiB
单词统计
// 第一种
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
// 第二种
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
// 第三种与第二种思路一致 , 但较为复杂
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.map(_._2).sum)
// 第四种
sc.textFile("/word.txt").flatMap(_.split(" ")).countByValue()
在 Spark 中,DAG(有向无环图)会被分割为多个 stage,每个 stage 包含一系列可以以 pipeline 方式运行的转换操作。stage 的划分主要基于是否需要触发 shuffle 操作。Shuffle 是当数据需要重新分区、进行全局聚合、排序等时发生的。
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
Stage 的划分依据如下:
-
sc.textFile("/word.txt")到map((_,1)):这些操作都是 窄依赖(narrow dependency),可以在同一个 stage 中完成。flatMap和map操作不会触发 shuffle,数据会在每个 partition 中直接处理。 -
groupByKey():这是一个典型的 宽依赖(wide dependency)操作,意味着数据需要重新分区、在不同的 worker 之间传输,触发了 shuffle。因此,在此处会生成一个新的 stage。 -
mapValues(_.sum):在groupByKey之后执行,它只是对每个 partition 内部的数据进行处理,不需要 shuffle,因此属于同一个 stage。
Stage 划分的依据:
- 窄依赖(Narrow Dependency):如
map、flatMap,不会触发 shuffle,可以在一个 stage 内完成。 - 宽依赖(Wide Dependency):如
groupByKey、reduceByKey,会触发 shuffle,导致 stage 划分。
编程案例
package com.aisi
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import java.net.URL
object Example {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setAppName("example")
sparkConf.setMaster("local[*]")
val sparkContext = new SparkContext(sparkConf)
val data = sparkContext.textFile("data/teacher.txt") // 文件地址:https://file.shenjianl.cn/text/teacher.txt
val value: RDD[((String, String), Int)] = data.map(t => {
val url = new URL(t)
val subject = url.getHost.split("\\.")(0)
val teacher = url.getPath.substring(1)
((subject, teacher), 1)
}).reduceByKey(_ + _)
value.sortBy(-_._2)
.take(3)
.foreach(println)
}
}
package com.aisi
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Example1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("parse")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")
val rdd:RDD[String] = sc.makeRDD(arr)
rdd.map(t => {
val strs = t.split("-")
(strs(0), strs(1))
}).flatMapValues(_.split(","))
.map(_.swap)
.reduceByKey(_+","+_)
.foreach(t=>println(t._1+"-"+t._2))
}
}