add spark-sql case

This commit is contained in:
2024-10-12 16:56:39 +08:00
parent 8b4a30b940
commit 4726888819
22 changed files with 364 additions and 11 deletions

View File

@@ -1,11 +1,10 @@
package com.aisi.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestMovieWithSql {
object TestMovieWithPureSql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("movie")
@@ -35,15 +34,33 @@ object TestMovieWithSql {
}).toDF("userId", "movieId", "score")
df1.limit(10).show()
import org.apache.spark.sql.functions._
df.join(df1,"movieId").groupBy("userId","movieType")
.agg(count("userId").as("cnt"))
.withColumn("rn",row_number().over(Window.partitionBy("userId").orderBy($"cnt".desc)))
.where("rn = 1") // 取到 userId 分区的 第一个就是 cnt最大的那条数据
.show()
//
// row_number()这是一个窗口函数用来为分组中的每一行分配一个唯一的行号行号按照排序规则依次递增
// Window.partitionBy("userId")partitionBy 会根据 userId 划分数据即对于每个 userId 来独立计算行号这意味着每个用户的数据被视为一个独立的分区
// orderBy($"cnt".desc)在每个 userId 分区内数据会根据 cnt即该用户对某种电影类型的观看次数降序排列行号根据这种排序结果分配
df.createTempView("movie") //("movieId", "movieName", "movieType")
df1.createTempView("ratings") // ("userId", "movieId", "score")
// cnt 即为该用户 所看影片出现类型(movieType)最多的 次数count
sqlSc.sql(
"""
|SELECT userId, movieType, cnt, rn
|FROM (
| SELECT *,
| row_number() OVER (PARTITION BY userId ORDER BY cnt DESC) AS rn
| FROM (
| SELECT COUNT(1) AS cnt,
| userId,
| movieType
| FROM (
| SELECT userId,
| movieType
| FROM movie m
| JOIN ratings r ON m.movieId = r.movieId
| ) t
| GROUP BY userId, movieType
| ) t1
|) t2
|WHERE rn = 1
|""".stripMargin
).show(20)
// +------+---------+---+---+
// |userId|movieType|cnt| rn|

View File

@@ -0,0 +1,58 @@
package com.aisi.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 每个用户最喜欢哪个类型的电影
* 每个类型中最受欢迎的前三个电影?
* 然后给用户推荐
*/
object TestMovieWithSqlApi {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("movie")
conf.setMaster("local[*]")
// conf.set("spark.shuffle.partitions", "20")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
val df: DataFrame = sc.textFile("Spark/data/movies.txt")
.flatMap(t => {
val line = t.split(",")
val movieId = line(0)
val movieTypes = line.reverse.head
val movieName = line.tail.reverse.tail.reverse.mkString(" ")
// movieTypes.split("\\|").map(movieRecord(movieId, movieName, _))
movieTypes.split("\\|").map(movieType => (movieId, movieName, movieType)) // 返回三元组
}).toDF("movieId", "movieName", "movieType")
// df.limit(10).show()
val df1 = sc.textFile("Spark/data/ratings.txt")
.map(t => {
val line = t.split(",")
val userId = line(0)
val movieId = line(1)
val score = line(2).toDouble
(userId, movieId, score)
}).toDF("userId", "movieId", "score")
df1.limit(10).show()
import org.apache.spark.sql.functions._
val df11 = df.join(df1, "movieId").groupBy("userId", "movieType")
.agg(count("userId").as("cnt"))
.withColumn("rn", row_number().over(Window.partitionBy("userId").orderBy($"cnt".desc)))
.where("rn = 1") // 取到 userId 分区的 第一个,就是 cnt最大的那条数据
.select("userId", "movieType")
val df22 = df.join(df1, "movieId").groupBy("movieType", "movieName")
.agg(avg("score").as("avg"))
.withColumn("rn", row_number().over(Window.partitionBy("movieType").orderBy($"avg".desc)))
.where("rn < 4")
.select("movieType", "movieName")
df11.join(df22, "movieType").show()
}
}
//case class movieRecord(var movieId:String,var movieName:String, var movieTypes:String)

View File

@@ -0,0 +1,76 @@
package com.aisi.spark
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties
/**
* 每个用户最喜欢哪个类型的电影
* 每个类型中最受欢迎的前三个电影?
* 然后给用户推荐
*/
object TestMovieWithWriteApi {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("movie")
conf.setMaster("local[*]")
// 添加禁用文件权限检查的配置
conf.set("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
// 读取数据并转换为 DataFrame
val df: DataFrame = sc.textFile("Spark/data/movies.txt")
.flatMap(t => {
val line = t.split(",")
val movieId = line(0)
val movieTypes = line.reverse.head
val movieName = line.tail.reverse.tail.reverse.mkString(" ")
// 将电影类型拆分为多个记录
movieTypes.split("\\|").map(movieType => (movieId, movieName, movieType)) // 返回三元组
}).toDF("movieId", "movieName", "movieType").limit(15)
// 打印 DataFrame 内容
df.show()
//
// (1) 通过 JDBC 存储到 MySql
// // 设置数据库连接属性
// val properties = new Properties()
// properties.put("user", "root")
// properties.put("password", "root")
// properties.put("driver", "com.mysql.cj.jdbc.Driver") // 指定 MySQL JDBC 驱动类
// import org.apache.spark.sql.functions._
// // 添加 id 列,但在写入数据库时省略它
// val dfWithId = df.withColumn("id", monotonically_increasing_id())
// // 选择需要写入 MySQL 的列,省略 id 列
// val finalDf = dfWithId.select("movieId", "movieName", "movieType")
// // 写入 MySQL 数据库
// finalDf.write.mode("append").jdbc("jdbc:mysql://localhost:3306/test", "movie", properties)
// (2) 存储为 CSV 格式
// df.write.csv("Spark/data/csv")
// df.write.csv("data/csv")
// df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
// .save("Spark/data/csv")
// (3) 以 parquet 写
// df.write.parquet("Spark/data/parquet")
// df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
// .save("Spark/data/parquet")
// (4) 以 json 写
// val df1 = sqlSc.read.json("Spark/data/stu.json")
// df1.select("_corrupt_record").show(false)
// df1.write.mode("overwrite").format("json").save("Spark/data/json")
// (5) 以 text文本写
// val df1 = sqlSc.read.text("Spark/data/word.txt")
// df1.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
// .save("Spark/data/text")
}
}

View File

@@ -0,0 +1,45 @@
package com.aisi.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext} // 导入 Window
object TestSparkPureSql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test sql")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
//引入环境信息
import sqlSc.implicits._
val rdd: RDD[(Int, String, Int, String)] = sc.textFile("Spark/data/a.txt")
.map(t => {
val line = t.split(" ")
val id = line(0).toInt
val name = line(1)
val age = line(2).toInt
val gender = line(3)
(id, name, age, gender)
})
val df: DataFrame = rdd.toDF("id", "name", "age", "gender")
df.show() //展示表数据
df.printSchema() //展示表格字段信息
// df.where("age > 20").groupBy("gender").sum("age").show()
// df.orderBy($"age".desc).show()
// 聚合并排序
// val result = df.groupBy("gender")
// .agg(
// count("id").as("count_id"), // 计数 id
// sum("age").as("sum_age") // 求和 age
// )
// .orderBy($"sum_age".desc) // 按 sum_age 降序排序
// result.show()
df.createTempView("stu")
val df1 = sqlSc.sql("select count(1) as gender_count,gender from stu group by gender")
df1.show()
}
}

View File

@@ -0,0 +1,42 @@
package com.aisi.spark
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
/**
* UDAF 的定义和使用
* 计算所有学生的年龄和
*/
object TestUDAF {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().appName("testUDAF").master("local[*]").getOrCreate()
import sparkSession.implicits._
val df: DataFrame = sparkSession.sparkContext.textFile("Spark/data/a.txt")
.map(t => {
val line = t.split(" ")
(line(0), line(1), line(2).toInt, line(3))
}).toDF("id", "name", "age", "gender")
import org.apache.spark.sql.functions._
// 传统的方法
// val df1 = df.agg(avg("age")).show()
// val df2 = df.groupBy("gender").avg("age").show()
val mySum = new MySum()
sparkSession.udf.register("mysum",functions.udaf(mySum))
df.createTempView("stu")
sparkSession.sql("select mysum(age) from stu").show()
}
}
class MySum extends Aggregator[Int,Int,Int]{
override def zero: Int = 0
override def reduce(b: Int, a: Int): Int = b+a
override def merge(b1: Int, b2: Int): Int = b1+b2
override def finish(reduction: Int): Int = reduction
override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

View File

@@ -0,0 +1,30 @@
package com.aisi.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* UDF 的定义和使用
* 统计每个人的一年的收入 薪资+奖金 salary * 12 + bonus
*/
object TestUDF {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().appName("testUDF").master("local[*]").getOrCreate()
import sparkSession.implicits._
val df: DataFrame = sparkSession.sparkContext.textFile("Spark/data/salary.txt")
.map(t => {
val line = t.split(" ")
(line(0), line(1), line(2).toInt, line(3).toInt)
}).toDF("id", "name", "salary", "bonus")
sparkSession.udf.register("all_income", (salary: Int, bonus: Int) => salary * 12 + bonus)
df.createTempView("salary")
// 使用 SQL 方式调用 UDF
sparkSession.sql("select id,name, all_income(salary,bonus) all from salary").show()
import org.apache.spark.sql.functions
// 使用 SQL API 方式调用 UDF
df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
.select("id","name","all")
.show()
}
}

View File

@@ -0,0 +1,43 @@
package com.aisi.spark
import org.apache.spark.sql.SparkSession
/**
* 在 Spark 中同时使用 Hive 和临时视图不会冲突,你可以在同一个 SparkSession 中同时使用它们。
* 不会冲突的原因:
* 作用范围: 临时视图只在当前 Spark 会话中有效,而 Hive 表是全局的。使用临时视图不会干扰到 Hive 表。
* 名称冲突: 如果临时视图和 Hive 表具有相同名称Spark 会优先使用临时视图。
* 在 SQL 查询中使用表名时Spark 会首先查找临时视图,如果找不到再查找 Hive 表。这种行为可以帮助避免命名冲突的问题。
*/
object TestWithSparkSession {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder()
.enableHiveSupport() // 启用 Hive 支持
.master("local[*]")
.appName("test")
.getOrCreate()
// 创建示例 DataFrame
import sparkSession.implicits._
val data = Seq(
("John", 30),
("Doe", 25)
)
val df = data.toDF("name", "age")
// 创建临时视图
df.createOrReplaceTempView("people")
// 使用 Spark SQL 查询临时视图
val resultTempView = sparkSession.sql("SELECT * FROM people WHERE age > 28")
resultTempView.show()
// 假设 Hive 中有一个表叫 "hive_people"
// 使用 Spark SQL 查询 Hive 表
val resultHive = sparkSession.sql("SELECT * FROM hive_people WHERE age > 28")
resultHive.show()
// 停止 SparkSession
sparkSession.stop()
}
}