diff --git a/Spark/data/csv/._SUCCESS.crc b/Spark/data/csv/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/Spark/data/csv/._SUCCESS.crc differ
diff --git a/Spark/data/csv/.part-00000-9f43c427-2c36-4752-9fe8-930284b81f1a-c000.csv.crc b/Spark/data/csv/.part-00000-9f43c427-2c36-4752-9fe8-930284b81f1a-c000.csv.crc
new file mode 100644
index 0000000..531ac76
Binary files /dev/null and b/Spark/data/csv/.part-00000-9f43c427-2c36-4752-9fe8-930284b81f1a-c000.csv.crc differ
diff --git a/Spark/data/csv/_SUCCESS b/Spark/data/csv/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/Spark/data/csv/part-00000-9f43c427-2c36-4752-9fe8-930284b81f1a-c000.csv b/Spark/data/csv/part-00000-9f43c427-2c36-4752-9fe8-930284b81f1a-c000.csv
new file mode 100644
index 0000000..1f73d00
--- /dev/null
+++ b/Spark/data/csv/part-00000-9f43c427-2c36-4752-9fe8-930284b81f1a-c000.csv
@@ -0,0 +1,15 @@
+1,Toy Story (1995),Animation
+1,Toy Story (1995),Children's
+1,Toy Story (1995),Comedy
+2,Jumanji (1995),Adventure
+2,Jumanji (1995),Children's
+2,Jumanji (1995),Fantasy
+3,Grumpier Old Men (1995),Comedy
+3,Grumpier Old Men (1995),Romance
+4,Waiting to Exhale (1995),Comedy
+4,Waiting to Exhale (1995),Drama
+5,Father of the Bride Part II (1995),Comedy
+6,Heat (1995),Action
+6,Heat (1995),Crime
+6,Heat (1995),Thriller
+7,Sabrina (1995),Comedy
diff --git a/Spark/data/json/._SUCCESS.crc b/Spark/data/json/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/Spark/data/json/._SUCCESS.crc differ
diff --git a/Spark/data/json/.part-00000-83641c56-1ceb-46b8-9fe0-774bb6f2f71d-c000.json.crc b/Spark/data/json/.part-00000-83641c56-1ceb-46b8-9fe0-774bb6f2f71d-c000.json.crc
new file mode 100644
index 0000000..906a717
Binary files /dev/null and b/Spark/data/json/.part-00000-83641c56-1ceb-46b8-9fe0-774bb6f2f71d-c000.json.crc differ
diff --git a/Spark/data/json/_SUCCESS b/Spark/data/json/_SUCCESS
new file mode 100644
index 0000000..e69de29
diff --git a/Spark/data/json/part-00000-83641c56-1ceb-46b8-9fe0-774bb6f2f71d-c000.json b/Spark/data/json/part-00000-83641c56-1ceb-46b8-9fe0-774bb6f2f71d-c000.json
new file mode 100644
index 0000000..03b86f0
--- /dev/null
+++ b/Spark/data/json/part-00000-83641c56-1ceb-46b8-9fe0-774bb6f2f71d-c000.json
@@ -0,0 +1,2 @@
+{"age":30,"name":"John"}
+{"age":25,"name":"Doe"}
diff --git a/Spark/data/salary.txt b/Spark/data/salary.txt
new file mode 100644
index 0000000..84284ec
--- /dev/null
+++ b/Spark/data/salary.txt
@@ -0,0 +1,3 @@
+1 zhangsan 20000 10000
+2 lisi 21000 20000
+3 wangwu 22000 21000
\ No newline at end of file
diff --git a/Spark/data/stu.json b/Spark/data/stu.json
new file mode 100644
index 0000000..5f4d7d1
--- /dev/null
+++ b/Spark/data/stu.json
@@ -0,0 +1,4 @@
+[
+  {"name": "John", "age": 30},
+  {"name": "Doe", "age": 25}
+]
diff --git a/Spark/data/text/._SUCCESS.crc b/Spark/data/text/._SUCCESS.crc
new file mode 100644
index 0000000..3b7b044
Binary files /dev/null and b/Spark/data/text/._SUCCESS.crc differ
diff --git a/Spark/data/text/.part-00000-fa1ea2f4-a6ad-4d6a-bf4f-a810d6532ec5-c000.txt.crc b/Spark/data/text/.part-00000-fa1ea2f4-a6ad-4d6a-bf4f-a810d6532ec5-c000.txt.crc
new file mode 100644
index 0000000..029fe92
Binary files /dev/null and b/Spark/data/text/.part-00000-fa1ea2f4-a6ad-4d6a-bf4f-a810d6532ec5-c000.txt.crc differ
diff --git a/Spark/data/text/_SUCCESS b/Spark/data/text/_SUCCESS
new file mode 100644
index 0000000..e69de29
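Note: the csv/, json/ and text/ directories above are Spark output directories (part files plus _SUCCESS markers) produced by the write examples later in this patch. A minimal read-back sketch follows; it is not part of the patch, and the ReadBackSketch object name and the column names assigned to the header-less CSV are assumptions taken from the writer code below.

// A minimal sketch, assuming a local SparkSession, that reads the generated outputs back.
import org.apache.spark.sql.SparkSession

object ReadBackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("readBack").getOrCreate()
    // df.write.csv(...) emits no header, so the columns are named explicitly here
    val movies = spark.read.csv("Spark/data/csv").toDF("movieId", "movieName", "movieType")
    movies.show()
    // the JSON output is line-delimited, one record per line
    val people = spark.read.json("Spark/data/json")
    people.show()
    spark.stop()
  }
}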
diff --git a/Spark/data/text/part-00000-fa1ea2f4-a6ad-4d6a-bf4f-a810d6532ec5-c000.txt b/Spark/data/text/part-00000-fa1ea2f4-a6ad-4d6a-bf4f-a810d6532ec5-c000.txt
new file mode 100644
index 0000000..aade0f8
--- /dev/null
+++ b/Spark/data/text/part-00000-fa1ea2f4-a6ad-4d6a-bf4f-a810d6532ec5-c000.txt
@@ -0,0 +1,13 @@
+shenjianZ poop yuqing yuqin
+yuqing yuqin shenjianZ poop
+shenjianZ poop yuqing yuqin
+yuqing yuqin shenjianZ poop
+shenjianZ poop yuqing yuqin
+yuqing yuqin shenjianZ poop
+shenjianZ poop yuqing yuqin
+yuqing yuqin shenjianZ poop
+shenjianZ poop yuqing yuqin
+yuqing yuqin shenjianZ poop
+shenjianZ poop yuqing yuqin
+yuqing yuqin shenjianZ poop
+
diff --git a/Spark/pom.xml b/Spark/pom.xml
index 5e932fc..5b70547 100644
--- a/Spark/pom.xml
+++ b/Spark/pom.xml
@@ -33,6 +33,11 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>1.7.30</version>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.33</version>
+        </dependency>
         <dependency>
             <groupId>log4j</groupId>
diff --git a/Spark/src/main/scala/com/aisi/spark/TestMovieWithSql.scala b/Spark/src/main/scala/com/aisi/spark/TestMovieWithPureSql.scala
similarity index 71%
rename from Spark/src/main/scala/com/aisi/spark/TestMovieWithSql.scala
rename to Spark/src/main/scala/com/aisi/spark/TestMovieWithPureSql.scala
index f9cbb00..d235aeb 100644
--- a/Spark/src/main/scala/com/aisi/spark/TestMovieWithSql.scala
+++ b/Spark/src/main/scala/com/aisi/spark/TestMovieWithPureSql.scala
@@ -1,11 +1,10 @@
 package com.aisi.spark
 
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext}
 
-object TestMovieWithSql {
+object TestMovieWithPureSql {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf()
     conf.setAppName("movie")
@@ -35,15 +34,33 @@ object TestMovieWithSql {
       }).toDF("userId", "movieId", "score")
     df1.limit(10).show()
     import org.apache.spark.sql.functions._
-    df.join(df1,"movieId").groupBy("userId","movieType")
-      .agg(count("userId").as("cnt"))
-      .withColumn("rn",row_number().over(Window.partitionBy("userId").orderBy($"cnt".desc)))
-      .where("rn = 1") // keep the first row of each userId partition, i.e. the row with the largest cnt
-      .show()
-//
-// row_number(): a window function that assigns a unique, increasing row number to every row in a group, following the sort order.
-// Window.partitionBy("userId"): partitions the data by userId, so row numbers are computed independently for each user; each user's rows form their own partition.
-// orderBy($"cnt".desc): within each userId partition the rows are sorted by cnt (how many times the user watched that movie type) in descending order, and row numbers follow that order.
+
+    df.createTempView("movie")    // ("movieId", "movieName", "movieType")
+    df1.createTempView("ratings") // ("userId", "movieId", "score")
+    // cnt is how many times the user's most frequent movieType occurs among the movies they rated
+    sqlSc.sql(
+      """
+        |SELECT userId, movieType, cnt, rn
+        |FROM (
+        |  SELECT *,
+        |         row_number() OVER (PARTITION BY userId ORDER BY cnt DESC) AS rn
+        |  FROM (
+        |    SELECT COUNT(1) AS cnt,
+        |           userId,
+        |           movieType
+        |    FROM (
+        |      SELECT userId,
+        |             movieType
+        |      FROM movie m
+        |      JOIN ratings r ON m.movieId = r.movieId
+        |    ) t
+        |    GROUP BY userId, movieType
+        |  ) t1
+        |) t2
+        |WHERE rn = 1
+        |""".stripMargin
+    ).show(20)
+
     // +------+---------+---+---+
     // |userId|movieType|cnt| rn|
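The comment block removed above explained row_number(), Window.partitionBy and orderBy. The toy sketch below (not part of the patch) shows the same PARTITION BY userId ORDER BY cnt DESC ranking in isolation; the object name and the hard-coded rows are made up for illustration.

// A minimal sketch of the windowed ranking used above, on assumed toy data.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object RowNumberSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rowNumberSketch").getOrCreate()
    import spark.implicits._
    // pretend these are the per-user, per-type counts produced by the GROUP BY
    val cnts = Seq(("u1", "Comedy", 5L), ("u1", "Drama", 3L), ("u2", "Action", 7L))
      .toDF("userId", "movieType", "cnt")
    cnts.withColumn("rn", row_number().over(Window.partitionBy("userId").orderBy($"cnt".desc)))
      .where($"rn" === 1) // rn = 1 keeps each user's most-watched type
      .show()
    spark.stop()
  }
}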
diff --git a/Spark/src/main/scala/com/aisi/spark/TestMovieWithSqlApi.scala b/Spark/src/main/scala/com/aisi/spark/TestMovieWithSqlApi.scala
new file mode 100644
index 0000000..a69911d
--- /dev/null
+++ b/Spark/src/main/scala/com/aisi/spark/TestMovieWithSqlApi.scala
@@ -0,0 +1,58 @@
+package com.aisi.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * Which movie type does each user like most?
+ * What are the top three most popular movies of each type?
+ * Then recommend them to the user.
+ */
+object TestMovieWithSqlApi {
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf()
+    conf.setAppName("movie")
+    conf.setMaster("local[*]")
+    // conf.set("spark.shuffle.partitions", "20")
+    val sc = new SparkContext(conf)
+    val sqlSc = new SQLContext(sc)
+
+    import sqlSc.implicits._
+    val df: DataFrame = sc.textFile("Spark/data/movies.txt")
+      .flatMap(t => {
+        val line = t.split(",")
+        val movieId = line(0)
+        val movieTypes = line.reverse.head
+        val movieName = line.tail.reverse.tail.reverse.mkString(" ")
+        // movieTypes.split("\\|").map(movieRecord(movieId, movieName, _))
+        movieTypes.split("\\|").map(movieType => (movieId, movieName, movieType)) // return a 3-tuple
+      }).toDF("movieId", "movieName", "movieType")
+    // df.limit(10).show()
+    val df1 = sc.textFile("Spark/data/ratings.txt")
+      .map(t => {
+        val line = t.split(",")
+        val userId = line(0)
+        val movieId = line(1)
+        val score = line(2).toDouble
+        (userId, movieId, score)
+      }).toDF("userId", "movieId", "score")
+    df1.limit(10).show()
+    import org.apache.spark.sql.functions._
+    val df11 = df.join(df1, "movieId").groupBy("userId", "movieType")
+      .agg(count("userId").as("cnt"))
+      .withColumn("rn", row_number().over(Window.partitionBy("userId").orderBy($"cnt".desc)))
+      .where("rn = 1") // keep the first row of each userId partition, i.e. the row with the largest cnt
+      .select("userId", "movieType")
+    val df22 = df.join(df1, "movieId").groupBy("movieType", "movieName")
+      .agg(avg("score").as("avg"))
+      .withColumn("rn", row_number().over(Window.partitionBy("movieType").orderBy($"avg".desc)))
+      .where("rn < 4")
+      .select("movieType", "movieName")
+    df11.join(df22, "movieType").show()
+
+  }
+
+}
+//case class movieRecord(var movieId:String,var movieName:String, var movieTypes:String)
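A possible follow-on (not part of the patch): once df11 (each user's favourite type) is joined with df22 (top three movies per type), the matches can be folded into one recommendation row per user with collect_set. The sketch below is self-contained; the object name and the inline rows standing in for that join result are assumptions.

// A minimal sketch: collapse (movieType, userId, movieName) matches into a per-user list.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_set

object RecommendListSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("recommendListSketch").getOrCreate()
    import spark.implicits._
    // stands in for df11.join(df22, "movieType")
    val joined = Seq(
      ("Comedy", "u1", "Toy Story (1995)"),
      ("Comedy", "u1", "Sabrina (1995)"),
      ("Action", "u2", "Heat (1995)")
    ).toDF("movieType", "userId", "movieName")
    joined.groupBy("userId")
      .agg(collect_set("movieName").as("recommended"))
      .show(truncate = false)
    spark.stop()
  }
}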
diff --git a/Spark/src/main/scala/com/aisi/spark/TestMovieWithWriteApi.scala b/Spark/src/main/scala/com/aisi/spark/TestMovieWithWriteApi.scala
new file mode 100644
index 0000000..18052df
--- /dev/null
+++ b/Spark/src/main/scala/com/aisi/spark/TestMovieWithWriteApi.scala
@@ -0,0 +1,76 @@
+package com.aisi.spark
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import java.util.Properties
+
+/**
+ * Which movie type does each user like most?
+ * What are the top three most popular movies of each type?
+ * Then recommend them to the user.
+ */
+object TestMovieWithWriteApi {
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf()
+    conf.setAppName("movie")
+    conf.setMaster("local[*]")
+    // configuration that disables the file permission check
+    conf.set("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
+
+    val sc = new SparkContext(conf)
+    val sqlSc = new SQLContext(sc)
+
+    import sqlSc.implicits._
+
+    // read the data and convert it to a DataFrame
+    val df: DataFrame = sc.textFile("Spark/data/movies.txt")
+      .flatMap(t => {
+        val line = t.split(",")
+        val movieId = line(0)
+        val movieTypes = line.reverse.head
+        val movieName = line.tail.reverse.tail.reverse.mkString(" ")
+        // split the movie types into separate records
+        movieTypes.split("\\|").map(movieType => (movieId, movieName, movieType)) // return a 3-tuple
+      }).toDF("movieId", "movieName", "movieType").limit(15)
+
+    // print the DataFrame contents
+    df.show()
+    //
+    // (1) write to MySQL via JDBC
+    //    // set the database connection properties
+    //    val properties = new Properties()
+    //    properties.put("user", "root")
+    //    properties.put("password", "root")
+    //    properties.put("driver", "com.mysql.cj.jdbc.Driver") // the MySQL JDBC driver class
+    //    import org.apache.spark.sql.functions._
+    //    // add an id column, but leave it out when writing to the database
+    //    val dfWithId = df.withColumn("id", monotonically_increasing_id())
+    //    // select the columns to be written to MySQL, omitting the id column
+    //    val finalDf = dfWithId.select("movieId", "movieName", "movieType")
+    //    // write to the MySQL database
+    //    finalDf.write.mode("append").jdbc("jdbc:mysql://localhost:3306/test", "movie", properties)
+
+    // (2) write as CSV
+    //    df.write.csv("Spark/data/csv")
+    //    df.write.csv("data/csv")
+    //    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
+    //      .save("Spark/data/csv")
+
+    // (3) write as Parquet
+    //    df.write.parquet("Spark/data/parquet")
+    //    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
+    //      .save("Spark/data/parquet")
+
+
+    // (4) write as JSON
+    //    val df1 = sqlSc.read.json("Spark/data/stu.json")
+    //    df1.select("_corrupt_record").show(false)
+    //    df1.write.mode("overwrite").format("json").save("Spark/data/json")
+
+
+    // (5) write as plain text
+    //    val df1 = sqlSc.read.text("Spark/data/word.txt")
+    //    df1.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2")
+    //      .save("Spark/data/text")
+  }
+}
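The JDBC block above is commented out; as a sanity check, the sketch below (not part of the patch) reads the movie table back with the same URL, table name and credentials that appear in that commented code. It assumes a reachable MySQL instance and the mysql-connector-java dependency added in pom.xml.

// A minimal sketch: read back what the commented-out JDBC write would have appended.
import java.util.Properties
import org.apache.spark.sql.SparkSession

object JdbcReadBackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("jdbcReadBack").getOrCreate()
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")
    properties.put("driver", "com.mysql.cj.jdbc.Driver")
    // same URL and table name as the write example above
    val movies = spark.read.jdbc("jdbc:mysql://localhost:3306/test", "movie", properties)
    movies.show()
    spark.stop()
  }
}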
diff --git a/Spark/src/main/scala/com/aisi/spark/TestSparkPureSql.scala b/Spark/src/main/scala/com/aisi/spark/TestSparkPureSql.scala
new file mode 100644
index 0000000..18d25e6
--- /dev/null
+++ b/Spark/src/main/scala/com/aisi/spark/TestSparkPureSql.scala
@@ -0,0 +1,45 @@
+package com.aisi.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext} // import Window
+
+object TestSparkPureSql {
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf()
+    conf.setMaster("local[*]")
+    conf.setAppName("test sql")
+    val sc = new SparkContext(conf)
+    val sqlSc = new SQLContext(sc)
+    // bring the implicit conversions into scope
+    import sqlSc.implicits._
+    val rdd: RDD[(Int, String, Int, String)] = sc.textFile("Spark/data/a.txt")
+      .map(t => {
+        val line = t.split(" ")
+        val id = line(0).toInt
+        val name = line(1)
+        val age = line(2).toInt
+        val gender = line(3)
+        (id, name, age, gender)
+      })
+    val df: DataFrame = rdd.toDF("id", "name", "age", "gender")
+    df.show()        // show the table data
+    df.printSchema() // show the schema (column names and types)
+    // df.where("age > 20").groupBy("gender").sum("age").show()
+    // df.orderBy($"age".desc).show()
+    // aggregate and sort
+//    val result = df.groupBy("gender")
+//      .agg(
+//        count("id").as("count_id"), // count of id
+//        sum("age").as("sum_age")    // sum of age
+//      )
+//      .orderBy($"sum_age".desc) // order by sum_age descending
+//    result.show()
+
+    df.createTempView("stu")
+    val df1 = sqlSc.sql("select count(1) as gender_count,gender from stu group by gender")
+    df1.show()
+  }
+}
diff --git a/Spark/src/main/scala/com/aisi/spark/TestUDAF.scala b/Spark/src/main/scala/com/aisi/spark/TestUDAF.scala
new file mode 100644
index 0000000..cdca619
--- /dev/null
+++ b/Spark/src/main/scala/com/aisi/spark/TestUDAF.scala
@@ -0,0 +1,42 @@
+package com.aisi.spark
+
+
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
+
+/**
+ * Defining and using a UDAF
+ * Compute the sum of all students' ages
+ */
+object TestUDAF {
+  def main(args: Array[String]): Unit = {
+    val sparkSession = SparkSession.builder().appName("testUDAF").master("local[*]").getOrCreate()
+    import sparkSession.implicits._
+    val df: DataFrame = sparkSession.sparkContext.textFile("Spark/data/a.txt")
+      .map(t => {
+        val line = t.split(" ")
+        (line(0), line(1), line(2).toInt, line(3))
+      }).toDF("id", "name", "age", "gender")
+    import org.apache.spark.sql.functions._
+    // the conventional built-in approach
+    // val df1 = df.agg(avg("age")).show()
+    // val df2 = df.groupBy("gender").avg("age").show()
+    val mySum = new MySum()
+    sparkSession.udf.register("mysum", functions.udaf(mySum))
+    df.createTempView("stu")
+    sparkSession.sql("select mysum(age) from stu").show()
+  }
+}
+class MySum extends Aggregator[Int, Int, Int] {
+  override def zero: Int = 0
+
+  override def reduce(b: Int, a: Int): Int = b + a
+
+  override def merge(b1: Int, b2: Int): Int = b1 + b2
+
+  override def finish(reduction: Int): Int = reduction
+
+  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
+
+  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
+}
diff --git a/Spark/src/main/scala/com/aisi/spark/TestUDF.scala b/Spark/src/main/scala/com/aisi/spark/TestUDF.scala
new file mode 100644
index 0000000..e4f56ff
--- /dev/null
+++ b/Spark/src/main/scala/com/aisi/spark/TestUDF.scala
@@ -0,0 +1,30 @@
+package com.aisi.spark
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Defining and using a UDF
+ * Compute each person's yearly income: salary * 12 + bonus
+ */
+object TestUDF {
+  def main(args: Array[String]): Unit = {
+    val sparkSession = SparkSession.builder().appName("testUDF").master("local[*]").getOrCreate()
+    import sparkSession.implicits._
+    val df: DataFrame = sparkSession.sparkContext.textFile("Spark/data/salary.txt")
+      .map(t => {
+        val line = t.split(" ")
+        (line(0), line(1), line(2).toInt, line(3).toInt)
+      }).toDF("id", "name", "salary", "bonus")
+    sparkSession.udf.register("all_income", (salary: Int, bonus: Int) => salary * 12 + bonus)
+    df.createTempView("salary")
+    // call the UDF from SQL
+    sparkSession.sql("select id,name, all_income(salary,bonus) all from salary").show()
+    import org.apache.spark.sql.functions
+    // call the UDF through the DataFrame API
+    df.withColumn("all", functions.callUDF("all_income", $"salary", $"bonus"))
+      .select("id", "name", "all")
+      .show()
+
+  }
+}
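A related variant (not part of the patch): the same yearly-income function can be applied without registering it by name, by wrapping it with functions.udf and using the result as a Column expression. The sketch below is self-contained; the object name and the inline sample rows (values taken from salary.txt above) are for illustration only.

// A minimal sketch of an unregistered UDF applied directly as a Column.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udfColumnSketch").getOrCreate()
    import spark.implicits._
    val salaryDf = Seq(("1", "zhangsan", 20000, 10000), ("2", "lisi", 21000, 20000))
      .toDF("id", "name", "salary", "bonus")
    // wrap the Scala function; no name registration needed for DataFrame use
    val allIncome = udf((salary: Int, bonus: Int) => salary * 12 + bonus)
    salaryDf.withColumn("all", allIncome($"salary", $"bonus"))
      .select("id", "name", "all")
      .show()
    spark.stop()
  }
}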
diff --git a/Spark/src/main/scala/com/aisi/spark/TestWithSparkSession.scala b/Spark/src/main/scala/com/aisi/spark/TestWithSparkSession.scala
new file mode 100644
index 0000000..7371dd6
--- /dev/null
+++ b/Spark/src/main/scala/com/aisi/spark/TestWithSparkSession.scala
@@ -0,0 +1,43 @@
+package com.aisi.spark
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Using Hive tables and temporary views together in Spark does not conflict; both can be used in the same SparkSession.
+ * Why there is no conflict:
+ * Scope: a temporary view only exists in the current Spark session, while Hive tables are global. Using a temporary view does not interfere with Hive tables.
+ * Name collisions: if a temporary view and a Hive table have the same name, Spark prefers the temporary view.
+ *    When a table name appears in a SQL query, Spark looks up temporary views first and only then falls back to Hive tables, which helps avoid naming conflicts.
+ */
+object TestWithSparkSession {
+  def main(args: Array[String]): Unit = {
+    val sparkSession = SparkSession.builder()
+      .enableHiveSupport() // enable Hive support
+      .master("local[*]")
+      .appName("test")
+      .getOrCreate()
+
+    // create a sample DataFrame
+    import sparkSession.implicits._
+    val data = Seq(
+      ("John", 30),
+      ("Doe", 25)
+    )
+    val df = data.toDF("name", "age")
+
+    // create a temporary view
+    df.createOrReplaceTempView("people")
+
+    // query the temporary view with Spark SQL
+    val resultTempView = sparkSession.sql("SELECT * FROM people WHERE age > 28")
+    resultTempView.show()
+
+    // suppose Hive has a table named "hive_people"
+    // query the Hive table with Spark SQL
+    val resultHive = sparkSession.sql("SELECT * FROM hive_people WHERE age > 28")
+    resultHive.show()
+
+    // stop the SparkSession
+    sparkSession.stop()
+  }
+}
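The header comment above covers session-scoped temporary views. As an addendum (not part of the patch), Spark also offers global temporary views, which live in the reserved global_temp database and are visible to every SparkSession in the same application. A minimal sketch, with an assumed object name and sample data reused from the file above:

// A minimal sketch of a global temporary view shared across sessions.
import org.apache.spark.sql.SparkSession

object GlobalTempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("globalTempView").getOrCreate()
    import spark.implicits._
    val df = Seq(("John", 30), ("Doe", 25)).toDF("name", "age")
    df.createOrReplaceGlobalTempView("people")
    // global temp views must be qualified with the global_temp database
    spark.sql("SELECT * FROM global_temp.people WHERE age > 28").show()
    // a new session in the same application can still see the view
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    spark.stop()
  }
}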