### 单词统计 ```scala // 第一种 sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 第二种 sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum) // 第三种与第二种思路一致 , 但较为复杂 sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.map(_._2).sum) // 第四种 sc.textFile("/word.txt").flatMap(_.split(" ")).countByValue() ``` 在 Spark 中,DAG(有向无环图)会被分割为多个 **stage**,每个 stage 包含一系列可以以 pipeline 方式运行的转换操作。**stage 的划分**主要基于是否需要触发 **shuffle** 操作。Shuffle 是当数据需要重新分区、进行全局聚合、排序等时发生的。 ```scala sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum) ``` **Stage 的划分依据**如下: 1. **`sc.textFile("/word.txt")` 到 `map((_,1))`**:这些操作都是 **窄依赖**(narrow dependency),可以在同一个 stage 中完成。`flatMap` 和 `map` 操作不会触发 shuffle,数据会在每个 partition 中直接处理。 2. **`groupByKey()`**:这是一个典型的 **宽依赖**(wide dependency)操作,意味着数据需要重新分区、在不同的 worker 之间传输,触发了 **shuffle**。因此,在此处会生成一个新的 stage。 3. **`mapValues(_.sum)`**:在 `groupByKey` 之后执行,它只是对每个 partition 内部的数据进行处理,不需要 shuffle,因此属于同一个 stage。 ### Stage 划分的依据: - **窄依赖(Narrow Dependency)**:如 `map`、`flatMap`,不会触发 shuffle,可以在一个 stage 内完成。 - **宽依赖(Wide Dependency)**:如 `groupByKey`、`reduceByKey`,会触发 shuffle,导致 stage 划分。 ### 编程案例 ```scala package com.aisi import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import java.net.URL object Example { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() sparkConf.setAppName("example") sparkConf.setMaster("local[*]") val sparkContext = new SparkContext(sparkConf) val data = sparkContext.textFile("data/teacher.txt") // 文件地址:https://file.shenjianl.cn/text/teacher.txt val value: RDD[((String, String), Int)] = data.map(t => { val url = new URL(t) val subject = url.getHost.split("\\.")(0) val teacher = url.getPath.substring(1) ((subject, teacher), 1) }).reduceByKey(_ + _) value.sortBy(-_._2) .take(3) .foreach(println) } } ``` ```scala package com.aisi import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Example1 { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("parse") conf.setMaster("local[*]") val sc = new SparkContext(conf) val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi") val rdd:RDD[String] = sc.makeRDD(arr) rdd.map(t => { val strs = t.split("-") (strs(0), strs(1)) }).flatMapValues(_.split(",")) .map(_.swap) .reduceByKey(_+","+_) .foreach(t=>println(t._1+"-"+t._2)) } } ```