ObsidianRepository/Spark搭建/scala与rdd编程.md

3.3 KiB
Raw Permalink Blame History

单词统计

// 第一种
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
// 第二种
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)
// 第三种与第二种思路一致 , 但较为复杂
sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupBy(_._1).mapValues(_.map(_._2).sum)
// 第四种
sc.textFile("/word.txt").flatMap(_.split(" ")).countByValue()

在 Spark 中DAG有向无环图会被分割为多个 stage,每个 stage 包含一系列可以以 pipeline 方式运行的转换操作。stage 的划分主要基于是否需要触发 shuffle 操作。Shuffle 是当数据需要重新分区、进行全局聚合、排序等时发生的。

sc.textFile("/word.txt").flatMap(_.split(" ")).map((_,1)).groupByKey().mapValues(_.sum)

Stage 的划分依据如下:

  1. sc.textFile("/word.txt")map((_,1)):这些操作都是 窄依赖narrow dependency可以在同一个 stage 中完成。flatMapmap 操作不会触发 shuffle数据会在每个 partition 中直接处理。

  2. groupByKey():这是一个典型的 宽依赖wide dependency操作意味着数据需要重新分区、在不同的 worker 之间传输,触发了 shuffle。因此,在此处会生成一个新的 stage。

  3. mapValues(_.sum):在 groupByKey 之后执行,它只是对每个 partition 内部的数据进行处理,不需要 shuffle因此属于同一个 stage。

Stage 划分的依据:

  • 窄依赖Narrow Dependency:如 mapflatMap,不会触发 shuffle可以在一个 stage 内完成。
  • 宽依赖Wide Dependency:如 groupByKeyreduceByKey,会触发 shuffle导致 stage 划分。

编程案例

package com.aisi  
  
import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  
  
import java.net.URL  
  
object Example {  
  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf()  
    sparkConf.setAppName("example")  
    sparkConf.setMaster("local[*]")  
    val sparkContext = new SparkContext(sparkConf)  
    val data = sparkContext.textFile("data/teacher.txt")  // 文件地址https://file.shenjianl.cn/text/teacher.txt
    val value: RDD[((String, String), Int)] = data.map(t => {  
      val url = new URL(t)  
      val subject = url.getHost.split("\\.")(0)  
      val teacher = url.getPath.substring(1)  
      ((subject, teacher), 1)  
    }).reduceByKey(_ + _)  
    value.sortBy(-_._2)  
      .take(3)  
      .foreach(println)  
  }  
}
package com.aisi  
  
import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  
object Example1 {  
  def main(args: Array[String]): Unit = {  
    val conf = new SparkConf()  
    conf.setAppName("parse")  
    conf.setMaster("local[*]")  
    val sc = new SparkContext(conf)  
    val arr = Array("chinese-zhangsan,lisi,wangwu", "math-zhangsan,zhaosi", "english-lisi,wangwu,zhaosi")  
    val rdd:RDD[String] = sc.makeRDD(arr)  
    rdd.map(t => {  
        val strs = t.split("-")  
        (strs(0), strs(1))  
      }).flatMapValues(_.split(","))  
      .map(_.swap)  
      .reduceByKey(_+","+_)  
      .foreach(t=>println(t._1+"-"+t._2))  
  }  
}