add scala rdd (wordCount sample)
This commit is contained in:
parent
c1d2dd4fd4
commit
f5a407629d
|
|
@ -13,7 +13,7 @@
|
||||||
"state": {
|
"state": {
|
||||||
"type": "markdown",
|
"type": "markdown",
|
||||||
"state": {
|
"state": {
|
||||||
"file": "README.md.md",
|
"file": "Spark搭建/scala与rdd编程.md",
|
||||||
"mode": "source",
|
"mode": "source",
|
||||||
"source": false
|
"source": false
|
||||||
}
|
}
|
||||||
|
|
@ -85,7 +85,7 @@
|
||||||
"state": {
|
"state": {
|
||||||
"type": "backlink",
|
"type": "backlink",
|
||||||
"state": {
|
"state": {
|
||||||
"file": "README.md.md",
|
"file": "Spark搭建/scala与rdd编程.md",
|
||||||
"collapseAll": false,
|
"collapseAll": false,
|
||||||
"extraContext": false,
|
"extraContext": false,
|
||||||
"sortOrder": "alphabetical",
|
"sortOrder": "alphabetical",
|
||||||
|
|
@ -102,7 +102,7 @@
|
||||||
"state": {
|
"state": {
|
||||||
"type": "outgoing-link",
|
"type": "outgoing-link",
|
||||||
"state": {
|
"state": {
|
||||||
"file": "README.md.md",
|
"file": "Spark搭建/scala与rdd编程.md",
|
||||||
"linksCollapsed": false,
|
"linksCollapsed": false,
|
||||||
"unlinkedCollapsed": true
|
"unlinkedCollapsed": true
|
||||||
}
|
}
|
||||||
|
|
@ -125,7 +125,7 @@
|
||||||
"state": {
|
"state": {
|
||||||
"type": "outline",
|
"type": "outline",
|
||||||
"state": {
|
"state": {
|
||||||
"file": "README.md.md"
|
"file": "Spark搭建/scala与rdd编程.md"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
@ -155,12 +155,14 @@
|
||||||
"obsidian-git:Open Git source control": false
|
"obsidian-git:Open Git source control": false
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"active": "78f6b03712f9678a",
|
"active": "a6359902bf66f6d5",
|
||||||
"lastOpenFiles": [
|
"lastOpenFiles": [
|
||||||
"HaiNiuHadoop搭建/1.环境安装.md",
|
"Spark搭建/images/workers.png",
|
||||||
"HaiNiuHadoop搭建/images/000001.png",
|
"Spark搭建/scala与rdd编程.md",
|
||||||
"README.md.md",
|
|
||||||
"Spark搭建/01.md",
|
"Spark搭建/01.md",
|
||||||
|
"HaiNiuHadoop搭建/1.环境安装.md",
|
||||||
|
"README.md.md",
|
||||||
|
"HaiNiuHadoop搭建/images/000001.png",
|
||||||
"2024-09-18.md",
|
"2024-09-18.md",
|
||||||
"欢迎.md",
|
"欢迎.md",
|
||||||
"2024-09-04.md",
|
"2024-09-04.md",
|
||||||
|
|
@ -170,7 +172,6 @@
|
||||||
"HaiNiuHadoop搭建/未命名.md",
|
"HaiNiuHadoop搭建/未命名.md",
|
||||||
"HaiNiuHadoop搭建/4.Yarn配置.md",
|
"HaiNiuHadoop搭建/4.Yarn配置.md",
|
||||||
"HaiNiuHadoop搭建/3.HDFS搭建.md",
|
"HaiNiuHadoop搭建/3.HDFS搭建.md",
|
||||||
"Spark搭建/images/workers.png",
|
|
||||||
"Spark搭建/images",
|
"Spark搭建/images",
|
||||||
"Spark搭建",
|
"Spark搭建",
|
||||||
"HaiNiuHadoop搭建/Yarn配置.md",
|
"HaiNiuHadoop搭建/Yarn配置.md",
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
### 单词统计
|
||||||
|
```scala
|
||||||
|
// 第一种
|
||||||
|
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
|
||||||
|
// 第二种
|
||||||
|
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
|
||||||
|
// 第三种与第二种思路一致 , 但较为复杂
|
||||||
|
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.map(_._2).sum)
|
||||||
|
// 第四种
|
||||||
|
sc.textFile("/word.txt").flatMap(_.split(" ")).countByValue()
|
||||||
|
```
|
||||||
|
|
||||||
|
在 Spark 中,DAG(有向无环图)会被分割为多个 **stage**,每个 stage 包含一系列可以以 pipeline 方式运行的转换操作。**stage 的划分**主要基于是否需要触发 **shuffle** 操作。Shuffle 是当数据需要重新分区、进行全局聚合、排序等时发生的。
|
||||||
|
|
||||||
|
|
||||||
|
```scala
|
||||||
|
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Stage 的划分依据**如下:
|
||||||
|
|
||||||
|
1. **`sc.textFile("/word.txt")` 到 `map((_,1))`**:这些操作都是 **窄依赖**(narrow dependency),可以在同一个 stage 中完成。`flatMap` 和 `map` 操作不会触发 shuffle,数据会在每个 partition 中直接处理。
|
||||||
|
|
||||||
|
2. **`groupByKey()`**:这是一个典型的 **宽依赖**(wide dependency)操作,意味着数据需要重新分区、在不同的 worker 之间传输,触发了 **shuffle**。因此,在此处会生成一个新的 stage。
|
||||||
|
|
||||||
|
3. **`mapValues(_.sum)`**:在 `groupByKey` 之后执行,它只是对每个 partition 内部的数据进行处理,不需要 shuffle,因此属于同一个 stage。
|
||||||
|
|
||||||
|
### Stage 划分的依据:
|
||||||
|
|
||||||
|
- **窄依赖(Narrow Dependency)**:如 `map`、`flatMap`,不会触发 shuffle,可以在一个 stage 内完成。
|
||||||
|
- **宽依赖(Wide Dependency)**:如 `groupByKey`、`reduceByKey`,会触发 shuffle,导致 stage 划分。
|
||||||
|
|
||||||
|
|
||||||
|
### 编程案例
|
||||||
|
```scala
|
||||||
|
package com.aisi
|
||||||
|
|
||||||
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.{SparkConf, SparkContext}
|
||||||
|
|
||||||
|
import java.net.URL
|
||||||
|
|
||||||
|
object Example {
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val sparkConf = new SparkConf()
|
||||||
|
sparkConf.setAppName("example")
|
||||||
|
sparkConf.setMaster("local[*]")
|
||||||
|
val sparkContext = new SparkContext(sparkConf)
|
||||||
|
val data = sparkContext.textFile("data/teacher.txt") // 文件地址:https://file.shenjianl.cn/text/teacher.txt
|
||||||
|
val value: RDD[((String, String), Int)] = data.map(t => {
|
||||||
|
val url = new URL(t)
|
||||||
|
val subject = url.getHost.split("\\.")(0)
|
||||||
|
val teacher = url.getPath.substring(1)
|
||||||
|
((subject, teacher), 1)
|
||||||
|
}).reduceByKey(_ + _)
|
||||||
|
value.sortBy(-_._2)
|
||||||
|
.take(3)
|
||||||
|
.foreach(println)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
```scala
|
||||||
|
package com.aisi
|
||||||
|
|
||||||
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.{SparkConf, SparkContext}
|
||||||
|
object Example1 {
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val conf = new SparkConf()
|
||||||
|
conf.setAppName("parse")
|
||||||
|
conf.setMaster("local[*]")
|
||||||
|
val sc = new SparkContext(conf)
|
||||||
|
val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")
|
||||||
|
val rdd:RDD[String] = sc.makeRDD(arr)
|
||||||
|
rdd.map(t => {
|
||||||
|
val strs = t.split("-")
|
||||||
|
(strs(0), strs(1))
|
||||||
|
}).flatMapValues(_.split(","))
|
||||||
|
.map(_.swap)
|
||||||
|
.reduceByKey(_+","+_)
|
||||||
|
.foreach(t=>println(t._1+"-"+t._2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
Loading…
Reference in New Issue