ObsidianRepository/Spark搭建/scala与rdd编程.md

85 lines
3.3 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

### 单词统计
```scala
// 第一种
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
// 第二种
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
// 第三种与第二种思路一致 , 但较为复杂
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.map(_._2).sum)
// 第四种
sc.textFile("/word.txt").flatMap(_.split(" ")).countByValue()
```
在 Spark 中DAG有向无环图会被分割为多个 **stage**,每个 stage 包含一系列可以以 pipeline 方式运行的转换操作。**stage 的划分**主要基于是否需要触发 **shuffle** 操作。Shuffle 是当数据需要重新分区、进行全局聚合、排序等时发生的。
```scala
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
```
**Stage 的划分依据**如下:
1. **`sc.textFile("/word.txt")``map((_,1))`**:这些操作都是 **窄依赖**narrow dependency可以在同一个 stage 中完成。`flatMap` 和 `map` 操作不会触发 shuffle数据会在每个 partition 中直接处理。
2. **`groupByKey()`**:这是一个典型的 **宽依赖**wide dependency操作意味着数据需要重新分区、在不同的 worker 之间传输,触发了 **shuffle**。因此,在此处会生成一个新的 stage。
3. **`mapValues(_.sum)`**:在 `groupByKey` 之后执行,它只是对每个 partition 内部的数据进行处理,不需要 shuffle因此属于同一个 stage。
### Stage 划分的依据:
- **窄依赖Narrow Dependency**:如 `map`、`flatMap`,不会触发 shuffle可以在一个 stage 内完成。
- **宽依赖Wide Dependency**:如 `groupByKey`、`reduceByKey`,会触发 shuffle导致 stage 划分。
### 编程案例
```scala
package com.aisi
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import java.net.URL
object Example {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setAppName("example")
sparkConf.setMaster("local[*]")
val sparkContext = new SparkContext(sparkConf)
val data = sparkContext.textFile("data/teacher.txt") // 文件地址https://file.shenjianl.cn/text/teacher.txt
val value: RDD[((String, String), Int)] = data.map(t => {
val url = new URL(t)
val subject = url.getHost.split("\\.")(0)
val teacher = url.getPath.substring(1)
((subject, teacher), 1)
}).reduceByKey(_ + _)
value.sortBy(-_._2)
.take(3)
.foreach(println)
}
}
```
```scala
package com.aisi
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Example1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("parse")
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")
val rdd:RDD[String] = sc.makeRDD(arr)
rdd.map(t => {
val strs = t.split("-")
(strs(0), strs(1))
}).flatMapValues(_.split(","))
.map(_.swap)
.reduceByKey(_+","+_)
.foreach(t=>println(t._1+"-"+t._2))
}
}
```