2024-10-11 11:12:32 +08:00
commit 8b4a30b940
30 changed files with 1005115 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
package com.aisi;
//TIP To <b>run</b> the code, press <shortcut actionId="Run"/> or
// click the <icon src="AllIcons.Actions.Execute"/> icon in the gutter.
public class Main {
public static void main(String[] args) {
//TIP Press <shortcut actionId="ShowIntentionActions"/> when the caret is on highlighted text
// to see how IntelliJ IDEA suggests fixing it.
System.out.printf("Hello and welcome!");
for (int i = 1; i <= 5; i++) {
//TIP Press <shortcut actionId="Debug"/> to start debugging the code. A <icon src="AllIcons.Debugger.Db_set_breakpoint"/> breakpoint has already been set,
// but you can always add more by pressing <shortcut actionId="ToggleLineBreakpoint"/>.
System.out.println("i = " + i);
}
}
}

View File

@@ -0,0 +1,27 @@
package com.aisi.spark
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val config = new SparkConf()
config.setMaster("local")
config.setAppName("SH_wordcount")
// Disable HDFS permission checks
config.set("spark.hadoop.dfs.permissions", "false")
val sc = new SparkContext(config)
// Read input from a Windows-style local file path
val rdd = sc.textFile("D:/JetBrainsToolProject/IntelJ IDEA/HaiNiuProjects/Spark/data/word.txt")
val rdd1 = rdd.flatMap(_.split(" "))
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.groupBy(_._1)
val rdd4 = rdd3.mapValues(_.size)
// Save the results to a local path
rdd4.saveAsTextFile("D:/JetBrainsToolProject/IntelJ IDEA/HaiNiuProjects/Spark/data/res")
}
}
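
For comparison, the same count can be expressed with reduceByKey, which pre-aggregates per partition before the shuffle instead of materializing every occurrence per key the way groupBy does. A minimal sketch under the same local paths (the object name and the res_reduce output directory are placeholders, not part of this commit):

package com.aisi.spark
import org.apache.spark.{SparkConf, SparkContext}
object WordCountReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SH_wordcount_reduce")
    val sc = new SparkContext(conf)
    sc.textFile("D:/JetBrainsToolProject/IntelJ IDEA/HaiNiuProjects/Spark/data/word.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _) // combines partial counts map-side, shuffling only one record per key per partition
      .saveAsTextFile("D:/JetBrainsToolProject/IntelJ IDEA/HaiNiuProjects/Spark/data/res_reduce")
    sc.stop()
  }
}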

View File

@@ -0,0 +1,20 @@
package com.aisi.spark
import org.apache.spark.SparkContext
object WordCountForCluster {
def main(args: Array[String]): Unit = {
val sc = new SparkContext()
val Array(input,output) = args
// Read the input path from the program arguments
val rdd = sc.textFile(input)
val rdd1 = rdd.flatMap(_.split(" "))
val rdd2 = rdd1.map((_, 1))
val rdd3 = rdd2.groupBy(_._1)
val rdd4 = rdd3.mapValues(_.size)
// Save the results to the output path
rdd4.saveAsTextFile(output)
}
}
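
Because the master URL and input/output paths are left to the launch environment, this class is meant to be launched with spark-submit. A typical invocation might look like the following (the jar name and HDFS paths are placeholders):

spark-submit \
  --class com.aisi.spark.WordCountForCluster \
  --master yarn \
  --deploy-mode cluster \
  spark-wordcount.jar \
  hdfs:///data/word.txt hdfs:///data/word_out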

View File

@@ -0,0 +1,6 @@
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

View File

@@ -0,0 +1,19 @@
package com.aisi.spark
import org.apache.hive.jdbc.HiveDriver
import java.sql.DriverManager
object TestBeeline{
def main(args: Array[String]): Unit = {
classOf[HiveDriver]
val connection = DriverManager.getConnection("jdbc:hive2://nn1:20000", "hadoop", null)
val statement = connection.prepareStatement("SELECT count(1) as cnt from stu") // trailing ';' removed: some HiveServer2 versions reject it inside a JDBC statement
val resultSet = statement.executeQuery()
while (resultSet.next()){
val cnt = resultSet.getLong("cnt")
println("stu表的总条数为" + cnt)
}
connection.close()
}
}
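
One possible hardening of this snippet is to close the statement and connection in finally blocks so that a failed query still releases the JDBC resources. A minimal sketch under the same connection settings (the object name is a placeholder):

package com.aisi.spark
import org.apache.hive.jdbc.HiveDriver
import java.sql.DriverManager
object TestBeelineSafe {
  def main(args: Array[String]): Unit = {
    classOf[HiveDriver] // force the Hive JDBC driver class to load
    val connection = DriverManager.getConnection("jdbc:hive2://nn1:20000", "hadoop", null)
    try {
      val statement = connection.prepareStatement("SELECT count(1) as cnt from stu")
      try {
        val resultSet = statement.executeQuery()
        while (resultSet.next()) {
          println("Total row count of table stu: " + resultSet.getLong("cnt"))
        }
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  }
}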

View File

@@ -0,0 +1,75 @@
package com.aisi.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TestMovieWithSql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("movie")
conf.setMaster("local[*]")
conf.set("spark.shuffle.partitions", "20")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
import sqlSc.implicits._
val df: DataFrame = sc.textFile("Spark/data/movies.txt")
.flatMap(t => {
val line = t.split(",")
val movieId = line(0)
val movieTypes = line.reverse.head
val movieName = line.tail.reverse.tail.reverse.mkString(" ")
// movieTypes.split("\\|").map(movieRecord(movieId, movieName, _))
movieTypes.split("\\|").map(movieType => (movieId, movieName, movieType)) // return one (movieId, movieName, movieType) tuple per type
}).toDF("movieId", "movieName", "movieType")
// df.limit(10).show()
val df1 = sc.textFile("Spark/data/ratings.txt")
.map(t => {
val line = t.split(",")
val userId = line(0)
val movieId = line(1)
val score = line(2).toDouble
(userId, movieId, score)
}).toDF("userId", "movieId", "score")
df1.limit(10).show()
import org.apache.spark.sql.functions._
df.join(df1,"movieId").groupBy("userId","movieType")
.agg(count("userId").as("cnt"))
.withColumn("rn",row_number().over(Window.partitionBy("userId").orderBy($"cnt".desc)))
.where("rn = 1") // 取到 userId 分区的 第一个,就是 cnt最大的那条数据
.show()
//
// row_number(): a window function that assigns each row in a partition a unique, sequentially increasing row number, following the window's sort order.
// Window.partitionBy("userId"): partitions the data by userId, so row numbers are computed independently for each user.
// orderBy($"cnt".desc): within each userId partition, rows are sorted by cnt (how many times that user watched the movie type) in descending order; row numbers follow this order.
// +------+---------+---+---+
// |userId|movieType|cnt| rn|
// +------+---------+---+---+
// | 1090| Drama| 47| 1|
// | 1159| Sci-Fi|102| 1|
// | 1436| Drama| 33| 1|
// | 1512| Comedy| 19| 1|
// | 1572| Thriller| 16| 1|
// | 2069| Comedy| 30| 1|
// | 2088| Sci-Fi|201| 1|
// | 2136| Comedy| 83| 1|
// | 2162| Comedy|105| 1|
// | 2294| Drama| 35| 1|
// | 2904| Drama| 34| 1|
// | 296| Horror| 42| 1|
// | 3210| Drama| 97| 1|
// | 3414| Action|181| 1|
// | 3606| Drama| 33| 1|
// | 3959| Comedy| 11| 1|
// | 4032| Action|113| 1|
// | 467| Romance| 34| 1|
// | 4821| Drama|100| 1|
// | 4937| Drama|136| 1|
// +------+---------+---+---+
}
}
//case class movieRecord(var movieId:String,var movieName:String, var movieTypes:String)
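
The same "most-watched type per user" query can also be written in plain SQL once the two DataFrames are registered as temporary tables. A sketch that would sit inside main after df and df1 are built, assuming a Spark version whose SQL engine supports window functions:

// Hypothetical SQL equivalent of the DataFrame query above.
df.registerTempTable("movies")
df1.registerTempTable("ratings")
sqlSc.sql(
  """
    |SELECT userId, movieType, cnt,
    |       ROW_NUMBER() OVER (PARTITION BY userId ORDER BY cnt DESC) AS rn
    |FROM (
    |  SELECT r.userId, m.movieType, COUNT(*) AS cnt
    |  FROM movies m JOIN ratings r ON m.movieId = r.movieId
    |  GROUP BY r.userId, m.movieType
    |) t
  """.stripMargin)
  .where("rn = 1")
  .show()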

View File

@@ -0,0 +1,45 @@
package com.aisi.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._ // agg, count, row_number, etc.
import org.apache.spark.sql.expressions.Window // window specifications
object TestSparkSql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("test sql")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
// bring implicit conversions (toDF, $) into scope
import sqlSc.implicits._
val rdd: RDD[(Int, String, Int, String)] = sc.textFile("Spark/data/a.txt")
.map(t => {
val line = t.split(" ")
val id = line(0).toInt
val name = line(1)
val age = line(2).toInt
val gender = line(3)
(id, name, age, gender)
})
val df: DataFrame = rdd.toDF("id", "name", "age", "gender")
df.show() // show the table data
df.printSchema() // print the column names and types
// df.where("age > 20").groupBy("gender").sum("age").show()
// df.orderBy($"age".desc).show()
// Aggregate and sort
// val result = df.groupBy("gender")
// .agg(
// count("id").as("count_id"), // 计数 id
// sum("age").as("sum_age") // 求和 age
// )
// .orderBy($"sum_age".desc) // sort by sum_age in descending order
// result.show()
df.withColumn("rn",row_number().over(Window.partitionBy("gender").orderBy($"age".desc)))
.where("rn = 1")
.show()
}
}
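
For reference, the same "oldest row per gender" result can be obtained without a window function by computing the maximum age per gender and joining it back onto the original rows. A minimal sketch reusing the df built above (note that, unlike the row_number version, it returns every row that ties for the maximum age):

// Hypothetical alternative: max age per gender joined back, no window function needed.
val maxAges = df.groupBy("gender").agg(max("age").as("age"))
df.join(maxAges, Seq("gender", "age")).show()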