diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json
index 2c1dba9..efa9fc5 100644
--- a/.obsidian/workspace.json
+++ b/.obsidian/workspace.json
@@ -13,7 +13,7 @@
           "state": {
             "type": "markdown",
             "state": {
-              "file": "README.md.md",
+              "file": "Spark搭建/scala与rdd编程.md",
               "mode": "source",
               "source": false
             }
@@ -85,7 +85,7 @@
           "state": {
             "type": "backlink",
             "state": {
-              "file": "README.md.md",
+              "file": "Spark搭建/scala与rdd编程.md",
               "collapseAll": false,
               "extraContext": false,
               "sortOrder": "alphabetical",
@@ -102,7 +102,7 @@
           "state": {
             "type": "outgoing-link",
             "state": {
-              "file": "README.md.md",
+              "file": "Spark搭建/scala与rdd编程.md",
               "linksCollapsed": false,
               "unlinkedCollapsed": true
             }
@@ -125,7 +125,7 @@
           "state": {
             "type": "outline",
             "state": {
-              "file": "README.md.md"
+              "file": "Spark搭建/scala与rdd编程.md"
             }
           }
         },
@@ -155,12 +155,14 @@
     "obsidian-git:Open Git source control": false
   }
 },
-  "active": "78f6b03712f9678a",
+  "active": "a6359902bf66f6d5",
   "lastOpenFiles": [
-    "HaiNiuHadoop搭建/1.环境安装.md",
-    "HaiNiuHadoop搭建/images/000001.png",
-    "README.md.md",
+    "Spark搭建/images/workers.png",
+    "Spark搭建/scala与rdd编程.md",
     "Spark搭建/01.md",
+    "HaiNiuHadoop搭建/1.环境安装.md",
+    "README.md.md",
+    "HaiNiuHadoop搭建/images/000001.png",
     "2024-09-18.md",
     "欢迎.md",
     "2024-09-04.md",
@@ -170,7 +172,6 @@
     "HaiNiuHadoop搭建/未命名.md",
     "HaiNiuHadoop搭建/4.Yarn配置.md",
     "HaiNiuHadoop搭建/3.HDFS搭建.md",
-    "Spark搭建/images/workers.png",
     "Spark搭建/images",
     "Spark搭建",
     "HaiNiuHadoop搭建/Yarn配置.md",
diff --git a/Spark搭建/scala与rdd编程.md b/Spark搭建/scala与rdd编程.md
new file mode 100644
index 0000000..27b0eca
--- /dev/null
+++ b/Spark搭建/scala与rdd编程.md
@@ -0,0 +1,85 @@
+### Word count
+```scala
+// Variant 1
+sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
+// Variant 2
+sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
+// Variant 3: same idea as variant 2, but more roundabout
+sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.map(_._2).sum)
+// Variant 4
+sc.textFile("/word.txt").flatMap(_.split(" ")).countByValue()
+```
+
+In Spark, the DAG (directed acyclic graph) is split into multiple **stages**, and each stage contains a sequence of transformations that can run as a pipeline. **Stage boundaries** are determined mainly by whether a **shuffle** is needed. A shuffle happens when data has to be repartitioned, globally aggregated, sorted, and so on.
+
+```scala
+sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
+```
+
+**The stage split for this job** is as follows:
+
+1. **`sc.textFile("/word.txt")` through `map((_,1))`**: these are all **narrow dependencies**, so they can run within a single stage. `flatMap` and `map` do not trigger a shuffle; the data is processed directly inside each partition.
+
+2. **`groupByKey()`**: a typical **wide dependency**. The data has to be repartitioned and transferred between workers, which triggers a **shuffle**, so a new stage is created here.
+
+3. **`mapValues(_.sum)`**: runs after `groupByKey` and only processes the data inside each partition; it needs no shuffle, so it stays in the same stage (a quick way to verify this split is sketched below).
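+
+A quick way to check that split (a minimal sketch, not part of the original note: it assumes the same `/word.txt` input and a `spark-shell` session where `sc` is already defined) is to print the lineage with `toDebugString`; the shuffle introduced by `groupByKey` shows up as a new indentation level in the output:
+
+```scala
+// Sketch only: assumes a spark-shell session (sc in scope) and the /word.txt file used above.
+val counts = sc.textFile("/word.txt")
+  .flatMap(_.split(" "))
+  .map((_, 1))
+  .groupByKey()        // wide dependency: the second stage starts here
+  .mapValues(_.sum)
+
+// A new indentation level in the lineage string marks the shuffle, i.e. the stage boundary.
+println(counts.toDebugString)
+```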
+
+### Criteria for stage division
+
+- **Narrow dependency**: e.g. `map` and `flatMap`; no shuffle is triggered, so the work can finish within a single stage.
+- **Wide dependency**: e.g. `groupByKey` and `reduceByKey`; a shuffle is triggered, which forces a stage boundary.
+
+### Programming examples
+```scala
+package com.aisi
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+import java.net.URL
+
+object Example {
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+    sparkConf.setAppName("example")
+    sparkConf.setMaster("local[*]")
+    val sparkContext = new SparkContext(sparkConf)
+    val data = sparkContext.textFile("data/teacher.txt") // data file: https://file.shenjianl.cn/text/teacher.txt
+    // Parse each line as a URL: the first host segment is the subject, the path
+    // (without the leading "/") is the teacher; count each (subject, teacher) pair.
+    val value: RDD[((String, String), Int)] = data.map(t => {
+      val url = new URL(t)
+      val subject = url.getHost.split("\\.")(0)
+      val teacher = url.getPath.substring(1)
+      ((subject, teacher), 1)
+    }).reduceByKey(_ + _)
+    // Sort by count in descending order and print the top 3.
+    value.sortBy(-_._2)
+      .take(3)
+      .foreach(println)
+  }
+}
+```
+
+```scala
+package com.aisi
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+
+object Example1 {
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf()
+    conf.setAppName("parse")
+    conf.setMaster("local[*]")
+    val sc = new SparkContext(conf)
+    // Each element maps a subject to a comma-separated list of teachers.
+    val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")
+    val rdd: RDD[String] = sc.makeRDD(arr)
+    rdd.map(t => {
+      val strs = t.split("-")
+      (strs(0), strs(1))                // (subject, "teacher1,teacher2,...")
+    }).flatMapValues(_.split(","))      // one (subject, teacher) pair per teacher
+      .map(_.swap)                      // invert to (teacher, subject)
+      .reduceByKey(_ + "," + _)         // join each teacher's subjects into one string
+      .foreach(t => println(t._1 + "-" + t._2))
+  }
+}
+```
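+
+On a real cluster the final `foreach(println)` in both examples runs on the executors, so the printed lines end up in the executor logs rather than in the driver console; with `local[*]` everything shares one JVM, which is why the output is visible here. Below is a minimal sketch of the `Example1` pipeline that instead collects the (tiny) result to the driver before printing; the object name `Example1Collect` is only for illustration and is not part of the original code.
+
+```scala
+package com.aisi
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+// Hypothetical variant of Example1: same pipeline, but the small result is
+// collected to the driver before printing, so the output also appears in the
+// driver console when the job runs on a cluster.
+object Example1Collect {
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("parse-collect").setMaster("local[*]")
+    val sc = new SparkContext(conf)
+    val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")
+    sc.makeRDD(arr)
+      .map { t =>
+        val strs = t.split("-")
+        (strs(0), strs(1))
+      }
+      .flatMapValues(_.split(","))
+      .map(_.swap)
+      .reduceByKey(_ + "," + _)
+      .collect()                        // the result is tiny, so collecting to the driver is safe
+      .foreach(t => println(t._1 + "-" + t._2))
+    sc.stop()
+  }
+}
+```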