diff --git a/.idea/encodings.xml b/.idea/encodings.xml index 459beb2..747e584 100644 --- a/.idea/encodings.xml +++ b/.idea/encodings.xml @@ -1,6 +1,8 @@ + + diff --git a/Kafka/pom.xml b/Kafka/pom.xml new file mode 100644 index 0000000..b831beb --- /dev/null +++ b/Kafka/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + com.aisi + HaiNiuProjects + 1.0-SNAPSHOT + + + Kafka + + + 8 + 8 + UTF-8 + + + + org.apache.kafka + kafka-clients + 3.3.2 + + + org.slf4j + slf4j-log4j12 + 1.7.30 + + + log4j + log4j + 1.2.17 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + false + + + com.aisi.producer.ProducerWithObjectSerializer + + + + + + + + + + + \ No newline at end of file diff --git a/Kafka/src/main/java/com/aisi/consumer/Consumer1.java b/Kafka/src/main/java/com/aisi/consumer/Consumer1.java new file mode 100644 index 0000000..9dc4af3 --- /dev/null +++ b/Kafka/src/main/java/com/aisi/consumer/Consumer1.java @@ -0,0 +1,32 @@ +package com.aisi.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.protocol.types.Field; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +public class Consumer1 { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("group.id", "aisi-group"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer(properties); + List topics = Arrays.asList("topic_a", "topic_b"); // 将每个主题作为独立的字符串 + consumer.subscribe(topics); + + consumer.subscribe(topics); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value()); + } + } + } +} diff --git a/Kafka/src/main/java/com/aisi/consumer/ConsumerWithCooperativeStickyAssignor.java b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithCooperativeStickyAssignor.java new file mode 100644 index 0000000..b9547a8 --- /dev/null +++ b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithCooperativeStickyAssignor.java @@ -0,0 +1,49 @@ +package com.aisi.consumer; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class ConsumerWithCooperativeStickyAssignor { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("group.id", "aisi-group"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + //设定分区分配策略为 cooperative-sticky + properties.put("partition.assignment.strategy", CooperativeStickyAssignor.class.getName()); + //设定consumer断开超时时间最小不能小于6s + properties.put("session.timeout.ms", 6000); + KafkaConsumer consumer = new KafkaConsumer(properties); + List topics = 
Arrays.asList("topic_d","topic_e"); + consumer.subscribe(topics); + + consumer.subscribe(topics, new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + }); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value()); + } + } + } +} diff --git a/Kafka/src/main/java/com/aisi/consumer/ConsumerWithDeserializer.java b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithDeserializer.java new file mode 100644 index 0000000..2babed0 --- /dev/null +++ b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithDeserializer.java @@ -0,0 +1,40 @@ +package com.aisi.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +public class ConsumerWithDeserializer { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("group.id", "aisi-group"); + properties.put("key.deserializer", MyStringDeserializer.class.getName()); + properties.put("value.deserializer", MyStringDeserializer.class.getName()); + KafkaConsumer consumer = new KafkaConsumer(properties); + List topics = Arrays.asList("topic_a", "topic_b"); // 将每个主题作为独立的字符串 + consumer.subscribe(topics); + + consumer.subscribe(topics); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value()); + } + } + } + + public static class MyStringDeserializer implements Deserializer { + @Override + public String deserialize(String topic, byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + } +} diff --git a/Kafka/src/main/java/com/aisi/consumer/ConsumerWithRangeAssignor.java b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithRangeAssignor.java new file mode 100644 index 0000000..540a1af --- /dev/null +++ b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithRangeAssignor.java @@ -0,0 +1,49 @@ +package com.aisi.consumer; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class ConsumerWithRangeAssignor { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("group.id", "aisi-group"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + 
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + //设定分区分配策略为range + properties.put("partition.assignment.strategy", RangeAssignor.class.getName()); + //设定consumer断开超时时间最小不能小于6s + properties.put("session.timeout.ms", 6000); + KafkaConsumer consumer = new KafkaConsumer(properties); + List topics = Arrays.asList("topic_d"); + consumer.subscribe(topics); + + consumer.subscribe(topics, new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + }); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value()); + } + } + } +} diff --git a/Kafka/src/main/java/com/aisi/consumer/ConsumerWithRoundRobinAssignor.java b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithRoundRobinAssignor.java new file mode 100644 index 0000000..8b62544 --- /dev/null +++ b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithRoundRobinAssignor.java @@ -0,0 +1,49 @@ +package com.aisi.consumer; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class ConsumerWithRoundRobinAssignor { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("group.id", "aisi-group"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + //设定分区分配策略为 round-robin + properties.put("partition.assignment.strategy", RoundRobinAssignor.class.getName()); + //设定consumer断开超时时间最小不能小于6s + properties.put("session.timeout.ms", 6000); + KafkaConsumer consumer = new KafkaConsumer(properties); + List topics = Arrays.asList("topic_d","topic_e"); + consumer.subscribe(topics); + + consumer.subscribe(topics, new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + }); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value()); + } + } + } +} diff --git a/Kafka/src/main/java/com/aisi/consumer/ConsumerWithStickyAssignor.java b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithStickyAssignor.java new file mode 100644 index 0000000..cce1f8c --- 
/dev/null +++ b/Kafka/src/main/java/com/aisi/consumer/ConsumerWithStickyAssignor.java @@ -0,0 +1,49 @@ +package com.aisi.consumer; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class ConsumerWithStickyAssignor { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("group.id", "aisi-group"); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + //设定分区分配策略为 sticky + properties.put("partition.assignment.strategy", StickyAssignor.class.getName()); + //设定consumer断开超时时间最小不能小于6s + properties.put("session.timeout.ms", 6000); + KafkaConsumer consumer = new KafkaConsumer(properties); + List topics = Arrays.asList("topic_d","topic_e"); + consumer.subscribe(topics); + + consumer.subscribe(topics, new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition()); + } + } + }); + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value()); + } + } + } +} diff --git a/Kafka/src/main/java/com/aisi/producer/Producer1.java b/Kafka/src/main/java/com/aisi/producer/Producer1.java new file mode 100644 index 0000000..490127d --- /dev/null +++ b/Kafka/src/main/java/com/aisi/producer/Producer1.java @@ -0,0 +1,78 @@ +package com.aisi.producer; + +import org.apache.kafka.clients.producer.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Properties; + +public class Producer1 { + private static final Logger log = LoggerFactory.getLogger(Producer1.class); + + public static void main(String[] args) throws Exception { + KafkaProducer kafkaProducer = getStringStringKafkaProducer(); + for (int i = 0; i < 5; i++) { + ProducerRecord record = new ProducerRecord<>("topic_b", "message-value: " + i); + kafkaProducer.send(record, (metadata, exception) -> { + if (exception == null) { + // 消息发送成功 + System.out.println("Message sent successfully to topic: " + metadata.topic() + + " partition: " + metadata.partition() + + " offset: " + metadata.offset()); + } else { + // 消息发送失败 + exception.printStackTrace(); + } + }); + } + + // 确保所有消息都被发送 + kafkaProducer.flush(); + // 关闭生产者 + kafkaProducer.close(); + } + + private static KafkaProducer getStringStringKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "nn1:9092"); + properties.put("batch.size", 16384); + properties.put("acks", "all"); // 建议将-1改为all + properties.put("retries", 3); + properties.put("linger.ms", 50); + properties.put("buffer.memory", 33554432); + properties.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.class.getName()); + return new KafkaProducer<>(properties); + } + + public static class MyInterceptor implements ProducerInterceptor { + + @Override + public ProducerRecord onSend(ProducerRecord producerRecord) { + // 对消息进行处理,例如添加时间戳等 + return new ProducerRecord<>(producerRecord.topic(), System.currentTimeMillis() + "-" + producerRecord.value()); + } + + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + log.error("Failed to send message with error: {}", e.getMessage()); + }else { + log.info("Successfully sent message to topic {}, partition {}, offset {}", + recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); + } + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } + } +} diff --git a/Kafka/src/main/java/com/aisi/producer/ProducerWithObjectSerializer.java b/Kafka/src/main/java/com/aisi/producer/ProducerWithObjectSerializer.java new file mode 100644 index 0000000..2bdabd5 --- /dev/null +++ b/Kafka/src/main/java/com/aisi/producer/ProducerWithObjectSerializer.java @@ -0,0 +1,124 @@ +package com.aisi.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +public class ProducerWithObjectSerializer { + private static final Logger log = LoggerFactory.getLogger(ProducerWithObjectSerializer.class); + + public static void main(String[] args) throws Exception { + KafkaProducer kafkaProducer = getStringStringKafkaProducer(); + + Student stu1 = new Student(); + stu1.setName("张三"); + stu1.setAge(18); + stu1.setScore(100); + Student stu2 = new Student(); + stu2.setName("李四"); + stu2.setAge(24); + stu2.setScore(100); + kafkaProducer.send(new ProducerRecord<>("student", 1,"blog-1", stu1) , (metadata, exception) -> { + if (exception != null) { + log.error("发送消息失败", exception); + }else { + log.info("发送消息成功, offset: " + metadata.offset()); + log.info("发送消息成功, topic: " + metadata.topic()); + log.info("发送消息成功, partition: " + metadata.partition()); + log.info("发送消息成功, timestamp: " + metadata.timestamp()); + } + }); + kafkaProducer.send(new ProducerRecord<>("student", 2,"blog-2", stu2), (metadata, exception) -> { + if (exception != null) { + log.error("发送消息失败", exception); + }else { + log.info("发送消息成功, offset: " + metadata.offset()); + log.info("发送消息成功, topic: " + metadata.topic()); + log.info("发送消息成功, partition: " + metadata.partition()); + log.info("发送消息成功, timestamp: " + metadata.timestamp()); + } + }); + + // 确保所有消息都被发送 + kafkaProducer.flush(); + // 关闭生产者 + kafkaProducer.close(); + } + + private static KafkaProducer getStringStringKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "nn1:9092"); + properties.put("batch.size", 16384); + properties.put("acks", "all"); + properties.put("retries", 3); + properties.put("linger.ms", 50); + properties.put("buffer.memory", 33554432); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", 
StudentSerializer.class.getName()); + return new KafkaProducer<>(properties); + } + + public static class Student implements Serializable { + private String name; + + private int score; + private int age; + + public Student(String name, int score, int age) { + this.name = name; + this.score = score; + this.age = age; + } + + public Student() { + + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getScore() { + return score; + } + + public void setScore(int score) { + this.score = score; + } + + public int getAge() { + return age; + } + + public void setAge(int age) { + this.age = age; + } + + @Override + public String toString() { + return "Student{" + + "name='" + name + '\'' + + ", score=" + score + + ", age=" + age + + '}'; + } + } + + public static class StudentSerializer implements Serializer { + @Override + public byte[] serialize(String s, Student student) { + return student.toString().getBytes(StandardCharsets.UTF_8); + } + } + +} \ No newline at end of file diff --git a/Kafka/src/main/java/com/aisi/producer/ProducerWithStringSerializer.java b/Kafka/src/main/java/com/aisi/producer/ProducerWithStringSerializer.java new file mode 100644 index 0000000..a769238 --- /dev/null +++ b/Kafka/src/main/java/com/aisi/producer/ProducerWithStringSerializer.java @@ -0,0 +1,59 @@ +package com.aisi.producer; + +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +public class ProducerWithStringSerializer { + private static final Logger log = LoggerFactory.getLogger(ProducerWithStringSerializer.class); + + public static void main(String[] args) throws Exception { + KafkaProducer kafkaProducer = getStringStringKafkaProducer(); + for (int i = 0; i < 5; i++) { + ProducerRecord record = new ProducerRecord<>("topic_b","key_:"+i,"message-value: " + i); + kafkaProducer.send(record, (metadata, exception) -> { + if (exception == null) { + // 消息发送成功 + System.out.println("Message sent successfully to topic: " + metadata.topic() + + " partition: " + metadata.partition() + + " offset: " + metadata.offset()); + } else { + // 消息发送失败 + exception.printStackTrace(); + } + }); + } + + // 确保所有消息都被发送 + kafkaProducer.flush(); + // 关闭生产者 + kafkaProducer.close(); + } + + private static KafkaProducer getStringStringKafkaProducer() { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "nn1:9092"); + properties.put("batch.size", 16384); + properties.put("acks", "all"); + properties.put("retries", 3); + properties.put("linger.ms", 50); + properties.put("buffer.memory", 33554432); + properties.put("key.serializer", MyStringSerializer.class.getName()); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + return new KafkaProducer<>(properties); + } + + public static class MyStringSerializer implements Serializer { + @Override + public byte[] serialize(String topic, String data) { +// log.warn("serialize method called with parameters: {}, {}", s, s2); + + return data.getBytes(StandardCharsets.UTF_8); + } + } + +} \ No newline at end of file diff --git a/Kafka/src/main/java/com/aisi/producer/ProducerWithUDPartitioner.java b/Kafka/src/main/java/com/aisi/producer/ProducerWithUDPartitioner.java new file mode 100644 index 0000000..6dd13f7 --- /dev/null +++ 
b/Kafka/src/main/java/com/aisi/producer/ProducerWithUDPartitioner.java @@ -0,0 +1,53 @@ +package com.aisi.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Cluster; + +import java.util.Map; +import java.util.Properties; + +public class ProducerWithUDPartitioner { + public static void main(String[] args) { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "nn1:9092"); + properties.put("batch.size", 16384); + properties.put("acks", "all"); // 建议将-1改为all + properties.put("retries", 3); + properties.put("linger.ms", 50); + properties.put("buffer.memory", 33554432); + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class); + KafkaProducer kafkaProducer = new KafkaProducer<>(properties); + for (int i = 0; i < 10; i++) { + kafkaProducer.send(new ProducerRecord<>("topic1", "key" + i, "value" + i)); + } + + kafkaProducer.close(); + + + } + public static class MyPartitioner implements Partitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + if (keyBytes == null) { + return 0; + } + return 0; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } + } +} diff --git a/Kafka/src/main/resources/log4j.properties b/Kafka/src/main/resources/log4j.properties new file mode 100644 index 0000000..4b91e0c --- /dev/null +++ b/Kafka/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=info,console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n \ No newline at end of file diff --git a/MapReduceTest/data/file.txt b/MapReduceTest/data/file.txt new file mode 100644 index 0000000..82860ba --- /dev/null +++ b/MapReduceTest/data/file.txt @@ -0,0 +1,10 @@ +2024-10-01 www.example.com +2024-10-01 www.example.com +2024-10-01 www.test.com +2024-10-02 www.example.com +2024-10-02 www.test.com +2024-10-02 www.sample.com +2024-10-03 www.example.com +2024-10-03 www.test.com +2024-10-03 www.sample.com +2024-10-03 www.example.com diff --git a/MapReduceTest/pom.xml b/MapReduceTest/pom.xml index 10098a7..cff21a1 100644 --- a/MapReduceTest/pom.xml +++ b/MapReduceTest/pom.xml @@ -33,6 +33,14 @@ log4j 1.2.17 + + org.junit.jupiter + junit-jupiter + 5.9.3 + test + + + @@ -60,28 +68,28 @@ true - true + ${project.build.directory}/${project.build.finalName}-shaded.jar - - - org.apache.commons - shade.org.apache.commons - - + + + + + + - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - + + + + + + + + - com.aisi.wordcount.WordCountDriver + com.aisi.accesscount.VisitCountDriver diff --git a/MapReduceTest/src/main/java/com/aisi/accesscount/SortMapper.java b/MapReduceTest/src/main/java/com/aisi/accesscount/SortMapper.java new file mode 100644 index 0000000..e63b8b2 --- /dev/null +++ b/MapReduceTest/src/main/java/com/aisi/accesscount/SortMapper.java @@ -0,0 +1,22 @@ +package com.aisi.accesscount; + +import 
org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        // Input line format: date \t total visit count
+        String[] fields = value.toString().split("\t");
+        if (fields.length == 2) {
+            String date = fields[0];
+            int count = Integer.parseInt(fields[1]);
+            // Emit the visit count as the key and the date as the value
+            context.write(new IntWritable(count), new Text(date));
+        }
+    }
+}
diff --git a/MapReduceTest/src/main/java/com/aisi/accesscount/SortReducer.java b/MapReduceTest/src/main/java/com/aisi/accesscount/SortReducer.java
new file mode 100644
index 0000000..f537bbc
--- /dev/null
+++ b/MapReduceTest/src/main/java/com/aisi/accesscount/SortReducer.java
@@ -0,0 +1,16 @@
+package com.aisi.accesscount;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
+    @Override
+    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        for (Text date : values) {
+            context.write(date, key);
+        }
+    }
+}
diff --git a/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountDriver.java b/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountDriver.java
new file mode 100644
index 0000000..8661ef9
--- /dev/null
+++ b/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountDriver.java
@@ -0,0 +1,51 @@
+package com.aisi.accesscount;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class VisitCountDriver {
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+
+        // First job: count the visits per day
+        Job countJob = Job.getInstance(conf, "Visit Count");
+        countJob.setJarByClass(VisitCountDriver.class);
+        countJob.setMapperClass(VisitCountMapper.class);
+        countJob.setReducerClass(VisitCountReducer.class);
+
+        countJob.setMapOutputKeyClass(Text.class);
+        countJob.setMapOutputValueClass(IntWritable.class);
+        countJob.setOutputKeyClass(Text.class);
+        countJob.setOutputValueClass(IntWritable.class);
+
+        FileInputFormat.addInputPath(countJob, new Path(args[0]));
+        Path tempOutput = new Path("temp_output");
+        FileOutputFormat.setOutputPath(countJob, tempOutput);
+
+        boolean countJobSuccess = countJob.waitForCompletion(true);
+        if (!countJobSuccess) {
+            System.exit(1);
+        }
+
+        // Second job: sort the daily counts in ascending order of visit count
+        Job sortJob = Job.getInstance(conf, "Sort Visits");
+        sortJob.setJarByClass(VisitCountDriver.class);
+        sortJob.setMapperClass(SortMapper.class);
+        sortJob.setReducerClass(SortReducer.class);
+
+        sortJob.setMapOutputKeyClass(IntWritable.class);
+        sortJob.setMapOutputValueClass(Text.class);
+        sortJob.setOutputKeyClass(Text.class);
+        sortJob.setOutputValueClass(IntWritable.class);
+
+        FileInputFormat.addInputPath(sortJob, tempOutput);
+        FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
+
+        System.exit(sortJob.waitForCompletion(true) ? 
0 : 1); + } +} diff --git a/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountMapper.java b/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountMapper.java new file mode 100644 index 0000000..9ea8775 --- /dev/null +++ b/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountMapper.java @@ -0,0 +1,20 @@ +package com.aisi.accesscount; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +import java.io.IOException; + +public class VisitCountMapper extends Mapper { + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + // 解析输入的每一行数据,假设格式为 "日期 URL" + String[] fields = value.toString().split(" "); + if (fields.length == 2) { + String date = fields[0]; + context.write(new Text(date), new IntWritable(1)); + } + } +} \ No newline at end of file diff --git a/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountReducer.java b/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountReducer.java new file mode 100644 index 0000000..660c5bd --- /dev/null +++ b/MapReduceTest/src/main/java/com/aisi/accesscount/VisitCountReducer.java @@ -0,0 +1,18 @@ +package com.aisi.accesscount; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + +public class VisitCountReducer extends Reducer { + @Override + protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + int sum = 0; + for (IntWritable value : values) { + sum += value.get(); + } + context.write(key, new IntWritable(sum)); + } +} \ No newline at end of file diff --git a/MapReduceTest/src/main/java/com/aisi/api/Test.java b/MapReduceTest/src/main/java/com/aisi/api/Test.java new file mode 100644 index 0000000..eff9cf3 --- /dev/null +++ b/MapReduceTest/src/main/java/com/aisi/api/Test.java @@ -0,0 +1,7 @@ +package com.aisi.api; + + + +public class Test { + +} diff --git a/MapReduceTest/src/test/java/Example.java b/MapReduceTest/src/test/java/Example.java new file mode 100644 index 0000000..e26bf0d --- /dev/null +++ b/MapReduceTest/src/test/java/Example.java @@ -0,0 +1,107 @@ +import org.apache.commons.compress.utils.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.*; + +public class Example { + static FileSystem fs = null; + @BeforeAll + public static void setup() throws IOException { + Configuration conf = new Configuration(); + fs = FileSystem.get(conf); + } + @AfterAll + public static void teardown() throws IOException { + if (fs != null) { + fs.close(); + } + } + @Test + public void list() throws IOException { + FileStatus[] fileStatuses = fs.listStatus(new Path("/")); + for (FileStatus fileStatus : fileStatuses) { + System.out.println(fileStatus.getPath()); + } + } + + @Test + public void mkdir() throws IOException { + boolean mkdirsed = fs.mkdirs(new Path("/test")); + if (mkdirsed) { + System.out.println("mkdirsed"); + }else + System.out.println("mkdir failed"); + } + + @Test + public void delete() throws IOException { + boolean deleted = fs.delete(new Path("/test"), true); + if (deleted) { + 
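+            // recursive=true in the delete() call above removes /test together with everything under it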
System.out.println("delete"); + }else + System.out.println("delete failed"); + } + + @Test + public void upload() throws IOException { + fs.copyFromLocalFile(new Path("d:\\tmp\\process.xml"), new Path("/test/process.xml")); + System.out.println("upload success"); + } + + @Test + public void download() throws IOException { + fs.copyToLocalFile(new Path("/test/process.xml"), new Path("d:\\tmp\\process_download.xml")); + System.out.println("download success"); + } + + @Test + public void read() throws IOException { + FSDataInputStream fsDataInputStream = fs.open(new Path("/test/process.xml")); + new BufferedReader(new InputStreamReader(fsDataInputStream)).lines().forEach(System.out::println); + fsDataInputStream.close(); + } + + @Test + public void write() throws IOException { + FSDataOutputStream fsDataOutputStream = fs.create(new Path("/test/process_replication.xml")); + FSDataInputStream fsDataInputStream = fs.open(new Path("/test/process.xml")); + BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream, "utf-8")); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, "utf-8")); + String line = ""; + while ((line = reader.readLine()) != null) { + writer.write(line); + writer.newLine(); + } + writer.close(); + reader.close(); + fsDataOutputStream.close(); + fsDataInputStream.close(); + } + + @Test + public void read1() throws IOException { + Configuration conf=new Configuration(); + //获取SequenceFile.Reader对象 + SequenceFile.Reader reader=new SequenceFile.Reader(fs,new Path("/example/part-m-00000"),conf); + //获取序列化中使用的键和值类型 + Text key=new Text(); + Text value=new Text(); + //将读取的数据写入janfeb.txt文件 + BufferedWriter out=new BufferedWriter(new OutputStreamWriter(new FileOutputStream("D:\\tmp\\5-12.txt"))); + while(reader.next(key,value)){ + out.write(key.toString()+"\t"+value.toString()+"\r\n"); + } + out.close(); + reader.close(); + } + + + +} diff --git a/MapReduceTest/src/test/java/SelectData.java b/MapReduceTest/src/test/java/SelectData.java new file mode 100644 index 0000000..35ea036 --- /dev/null +++ b/MapReduceTest/src/test/java/SelectData.java @@ -0,0 +1,56 @@ + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.conf.Configuration; + +//代码5-3 +import java.io.IOException; + +public class SelectData { + public static class MyMap extends Mapper { + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + String line = value.toString(); + String arr[] = line.split(","); + if (arr[4].contains("2021/1") || arr[4].contains("2021/2")) { + context.write(new Text(arr[2]), + new Text(arr[4].substring(0, 9))); + } + + + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if(otherArgs.length<2){ + System.err.println("必须输入读取文件路径和输出路径"); + System.exit(2); + } + Job job =Job.getInstance(conf,"Select Data"); + job.setJarByClass(SelectData.class); + job.setMapperClass(MyMap.class); + job.setOutputKeyClass(Text.class); + 
job.setOutputValueClass(Text.class); + + //设置输入格式 + job.setInputFormatClass(TextInputFormat.class); + //设置输出格式 + job.setOutputFormatClass(SequenceFileOutputFormat.class); + //设置reduce的任务数是0 + job.setNumReduceTasks(0); + for(int i=0;ihive-jdbc 3.1.2 + + + + org.apache.spark + spark-streaming_2.12 + 3.1.2 + diff --git a/Spark/src/main/java/com/aisi/spark/WordCount.scala b/Spark/src/main/java/com/aisi/sparkSql/WordCount.scala similarity index 96% rename from Spark/src/main/java/com/aisi/spark/WordCount.scala rename to Spark/src/main/java/com/aisi/sparkSql/WordCount.scala index dae298e..787690f 100644 --- a/Spark/src/main/java/com/aisi/spark/WordCount.scala +++ b/Spark/src/main/java/com/aisi/sparkSql/WordCount.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.{SparkConf, SparkContext} diff --git a/Spark/src/main/java/com/aisi/spark/WordCountForCluster.scala b/Spark/src/main/java/com/aisi/sparkSql/WordCountForCluster.scala similarity index 94% rename from Spark/src/main/java/com/aisi/spark/WordCountForCluster.scala rename to Spark/src/main/java/com/aisi/sparkSql/WordCountForCluster.scala index 1dce772..0246355 100644 --- a/Spark/src/main/java/com/aisi/spark/WordCountForCluster.scala +++ b/Spark/src/main/java/com/aisi/sparkSql/WordCountForCluster.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.SparkContext diff --git a/Spark/src/main/resources/log4j.properties b/Spark/src/main/resources/log4j.properties index a2bf17d..aa78009 100644 --- a/Spark/src/main/resources/log4j.properties +++ b/Spark/src/main/resources/log4j.properties @@ -1,4 +1,4 @@ -log4j.rootLogger=info,console +log4j.rootLogger=error,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out diff --git a/Spark/src/main/scala/MockData.scala b/Spark/src/main/scala/MockData.scala new file mode 100644 index 0000000..941a887 --- /dev/null +++ b/Spark/src/main/scala/MockData.scala @@ -0,0 +1,115 @@ +import java.io.{File, PrintWriter} +import java.text.SimpleDateFormat +import java.util.{Date, Properties, Random} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} + +object MockData { + + def randomNum(index: Int, random: Random): String = { + var str = "" + for (i <- 0 until index) { + str += random.nextInt(10) + } + str + } + + def fillZero(random: Random, num: Int, index: Int): String = { + val randomNum = random.nextInt(num) + var randomNumStr = randomNum.toString + + if (randomNum < 10) { + randomNumStr = ("%0" + index + "d").format(randomNum) + } + + randomNumStr + } + + def initFile(path: String): PrintWriter = { + new PrintWriter(new File(path)) + } + + def writeDataToFile(pw: PrintWriter, content: String): Unit = { + pw.write(content + "\n") + pw.flush() + } + + def closeFile(pw: PrintWriter): Unit = { + pw.close() + } + + def initKafkaProducer(): KafkaProducer[String, String] = { + val props = new Properties() + props.put("bootstrap.servers", "localhost:9092") + props.put("acks", "all") + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") + + new KafkaProducer[String, String](props) + } + + def writeDataToKafka(producer: KafkaProducer[String, String], content: String): Unit = { + producer.send(new ProducerRecord[String, String]("RoadRealTimeLog", content)) + } + + def closeKafka(producer: KafkaProducer[String, String]): Unit = { + 
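+    // close() blocks until any records still buffered in the producer have been flushed to the broker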
producer.close() + } + + def mock(): Unit = { + val pw = initFile("d:\\tmp\\data.txt") + val producer = initKafkaProducer() + val random = new Random() + val locations = Array("鲁", "京", "豫", "京", "沪", "赣", "津", "深", "黑", "粤") + val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + + for (i <- 0 until 30) { + val car = locations(random.nextInt(10)) + (65 + random.nextInt(26)).asInstanceOf[Char] + randomNum(5, random) + var baseActionTime = day + " " + fillZero(random, 24, 2) + + for (j <- 0 until random.nextInt(300)) { + + if (j % 30 == 0 && j != 0) { + var nextHour = "" + val baseHourParts = baseActionTime.split(" ") + + if (baseHourParts.length > 1) { + val baseHour = baseHourParts(1) + if (baseHour.startsWith("0")) { + if (baseHour.endsWith("9")) { + nextHour = "10" + } else { + nextHour = "0" + (baseHour.substring(1).toInt + 1).toString + } + } else if (baseHour == "23") { + nextHour = fillZero(random, 24, 2) + } else { + nextHour = (baseHour.toInt + 1).toString + } + baseActionTime = day + " " + nextHour + } else { + baseActionTime = day + " 00" // 如果 baseActionTime 无法正确分割,默认使用 00 时 + } + } + + val actionTime = baseActionTime + ":" + fillZero(random, 60, 2) + ":" + fillZero(random, 60, 2) + val monitorId = fillZero(random, 10, 4) + val speed = random.nextInt(200) + 1 + val roadId = random.nextInt(50) + 1 + val cameraId = "0" + randomNum(4, random) + val areald = fillZero(random, random.nextInt(8) + 1, 2) + + val content = day + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areald + writeDataToFile(pw, content) + writeDataToKafka(producer, content) + Thread.sleep(50) + } + } + + closeFile(pw) + closeKafka(producer) + } + + def main(args: Array[String]): Unit = { + mock() + } +} diff --git a/Spark/src/main/scala/com/aisi/sparkSql/A1.scala b/Spark/src/main/scala/com/aisi/sparkSql/A1.scala new file mode 100644 index 0000000..0324293 --- /dev/null +++ b/Spark/src/main/scala/com/aisi/sparkSql/A1.scala @@ -0,0 +1,51 @@ +package com.aisi.sparkSql; + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ + +object A1 { + def main(args: Array[String]): Unit = { + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.setAppName("user active") + val sc = new SparkContext(conf) + val sqlSc = new SQLContext(sc) + import sqlSc.implicits._ + + val userDF: DataFrame = sc.textFile("Spark/data/user.txt") + .map(t => { + val line = t.split(",") + val strDataTime = line(1).split("-") + val year = strDataTime(0) + val month = if (strDataTime(1).indexOf(0) == -1) strDataTime(1).substring(1) else strDataTime(1) + val day = if (strDataTime(2).indexOf(0) == -1) strDataTime(2).substring(1) else strDataTime(2) + (userRecord(line(0), year, month, day)) + }).toDF() + + userDF.show() + + // 根据用户ID和月份进行分组 + val groupedDF = userDF.groupBy("uid", "year", "month") + .agg(collect_list("day").as("days")) + + // 展开 days 列,并转换为整型 + val explodedDF = groupedDF + .withColumn("day", explode($"days")) + .withColumn("day", $"day".cast("int")) + + // 定义窗口函数,按用户、年份、月份排序天数 + val windowSpec = Window.partitionBy("uid", "year", "month").orderBy("day") + + // 计算相邻天数之间的差值 + val resultDF = explodedDF + .withColumn("prev_day", lag("day", 1).over(windowSpec)) + .withColumn("day_diff", $"day" - $"prev_day") + .withColumn("is_active", when($"day_diff" === 1, 1).otherwise(0)) + + resultDF.show() + } +} + +case 
class userRecord(uid: String, year: String, month: String, day: String) diff --git a/Spark/src/main/scala/com/aisi/sparkSql/A2.scala b/Spark/src/main/scala/com/aisi/sparkSql/A2.scala new file mode 100644 index 0000000..4686f59 --- /dev/null +++ b/Spark/src/main/scala/com/aisi/sparkSql/A2.scala @@ -0,0 +1,44 @@ +package com.aisi.sparkSql +import org.apache.spark.rdd.RDD +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{DataFrame, RelationalGroupedDataset, SQLContext, SparkSession} + +import java.text.DateFormat +import java.time.format.DateTimeFormatter + +/** + * 计算连续活跃用户的记录 + */ +object A2 { + def main(args: Array[String]): Unit = { + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.setAppName("shop count") + val sc = new SparkContext(conf) + val sqlSc = new SQLContext(sc) + import sqlSc.implicits._ + // sid,dt,money + val userDF: DataFrame = sc.textFile("Spark/data/shops.txt") + .map(t => { + val line = t.split(",") + val sid = line(0) + val strDataTime = line(1).split("-") + val year = strDataTime(0) + val month = if (strDataTime(1).indexOf(0) == -1 ) strDataTime(1).substring(1) else strDataTime(1) + val day = if (strDataTime(2).indexOf(0) == -1 ) strDataTime(2).substring(1) else strDataTime(2) + val money = line(2).toInt + shopRecord(sid,year, month, day,money) + }).toDF() + // userDF.show() + userDF.show() + // RelationalGroupedDataset: [grouping expressions: [sid: string, month: string], value: [sid: string, year: string ... 3 more fields], type: GroupBy] + val dataset = userDF.groupBy("sid","month") + println(dataset) + dataset.sum().show() + + // val sparkSession = SparkSession.builder().appName("user active").master("local[*]").getOrCreate() + // userDF.groupBy("") + // userDF.show() + } +} +case class shopRecord (sid:String, year:String,month:String,day:String,money:Int){} diff --git a/Spark/src/main/scala/com/aisi/sparkSql/MockData.scala b/Spark/src/main/scala/com/aisi/sparkSql/MockData.scala new file mode 100644 index 0000000..643c469 --- /dev/null +++ b/Spark/src/main/scala/com/aisi/sparkSql/MockData.scala @@ -0,0 +1,152 @@ +package com.aisi.sparkSql; + +import java.io.{File, PrintWriter} +import java.text.SimpleDateFormat +import java.util.{Date, Properties, Random} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} + +object MockData { + + def randomNum(index: Int, random: Random): String = { + + var str = "" + + for (i <- 0 until index) { + + str += random.nextInt(10) + + } + str + } + + def fillZero(random: Random, num: Int, index: Int): String = { + + val randomNum = random.nextInt(num) + + + var randomNumStr = randomNum.toString + + if (randomNum < 10) { + + randomNumStr = ("%0" + index + "d").format(randomNum) + + } + + randomNumStr + + } + + def initFile(path: String): PrintWriter = { + + new PrintWriter(new File(path)) + + } + + def writeDataToFile(pw: PrintWriter, content: String): Unit = { + pw.write(content + "\n") + + pw.flush() + } + + def closeFile(pw: PrintWriter): Unit = { + + pw.close() + + } + + def initKafkaProducer(): KafkaProducer[String,String] ={ + + val props = new Properties () + + props.put ("bootstrap.servers", "localhost:9092") + + props.put ("acks", "all") + + props.put ("key.serializer","org.apache.kafka.common.serialization.StringSerializer") + + props.put ("value.serializer","org.apache.kafka.common.serialization.StringSerializer") + + new KafkaProducer[String, String] (props) + + } + + def writeDataToKafka(producer:KafkaProducer[String,String],content:String):Unit = { + + 
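+    // send() is asynchronous; the returned Future[RecordMetadata] is ignored here, i.e. fire-and-forget delivery to the RoadRealTimeLog topic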
producer.send(new ProducerRecord[String,String]("RoadRealTimeLog",content)) + + } + + def closeKafka(producer:KafkaProducer[String,String]):Unit = { + + producer.close() + } + + def mock(): Unit = { + + val pw = initFile("路径") + + val producer = initKafkaProducer() + + val random = new Random() + + val locations = Array("鲁","京","豫","京","沪","赣","津","深","黑","粤") + + val day = new SimpleDateFormat ("yyyy-MM-dd").format (new Date()) + + for(i<-0 until 3000) { + + val car = locations (random.nextInt (10)) + (65 + random.nextInt (26)).asInstanceOf[Char]+ randomNum(5, random) + + var baseActionTime = day +""+ fillZero(random, 24,2) + + for(j <- 0 until random.nextInt (300)) { + + if (j % 30 == 0&j!=0) { + var nextHour = "" + + val baseHour = baseActionTime.split(" ")(1) + + if (baseHour.startsWith("0")) { + + if (baseHour.endsWith("9")) { + + nextHour = "10" + + } else { + + nextHour = "0" + (baseHour.substring(1).toInt + 1).toString + } + } else if (baseHour == "23") { + nextHour = fillZero(random, 24, 2) + } else { + nextHour = (baseHour.toInt + 1).toString + } + baseActionTime = day + " " + nextHour + } + + val actionTime = baseActionTime + ":" + fillZero(random, 60, 2) + ":" + fillZero(random, 60, 2) + + val monitorId = fillZero(random, 10, 4) + + val speed = random.nextInt (200) +1 + + val roadId = random.nextInt (50)+1 + + val cameraId= "0"+ randomNum(4, random) + + val areald = fillZero(random, random.nextInt(8) +1, 2) + + val content = day + "\t"+ monitorId +"\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t"+areald + + writeDataToFile(pw, content) + writeDataToKafka(producer,content) + Thread.sleep(50) + } + } + closeFile(pw) + closeKafka(producer) + } + def main(args:Array[String]):Unit = { + mock() + } +} \ No newline at end of file diff --git a/Spark/src/main/scala/com/aisi/spark/TestBeeline.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestBeeline.scala similarity index 95% rename from Spark/src/main/scala/com/aisi/spark/TestBeeline.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestBeeline.scala index f8d441a..7904ba4 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestBeeline.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestBeeline.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.hive.jdbc.HiveDriver diff --git a/Spark/src/main/scala/com/aisi/spark/TestMovieWithPureSql.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithPureSql.scala similarity index 99% rename from Spark/src/main/scala/com/aisi/spark/TestMovieWithPureSql.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithPureSql.scala index d235aeb..69f2cc2 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestMovieWithPureSql.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithPureSql.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.{DataFrame, SQLContext} diff --git a/Spark/src/main/scala/com/aisi/spark/TestMovieWithSqlApi.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithSqlApi.scala similarity index 98% rename from Spark/src/main/scala/com/aisi/spark/TestMovieWithSqlApi.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithSqlApi.scala index a69911d..7e61019 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestMovieWithSqlApi.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithSqlApi.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import 
org.apache.spark.rdd.RDD import org.apache.spark.sql.expressions.Window diff --git a/Spark/src/main/scala/com/aisi/spark/TestMovieWithWriteApi.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithWriteApi.scala similarity index 99% rename from Spark/src/main/scala/com/aisi/spark/TestMovieWithWriteApi.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithWriteApi.scala index 18052df..c7aa2a7 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestMovieWithWriteApi.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestMovieWithWriteApi.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} diff --git a/Spark/src/main/scala/com/aisi/spark/TestSparkPureSql.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestSparkPureSql.scala similarity index 98% rename from Spark/src/main/scala/com/aisi/spark/TestSparkPureSql.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestSparkPureSql.scala index 18d25e6..95e39b4 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestSparkPureSql.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestSparkPureSql.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.rdd.RDD import org.apache.spark.sql.expressions.Window diff --git a/Spark/src/main/scala/com/aisi/spark/TestSparkSql.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestSparkSql.scala similarity index 98% rename from Spark/src/main/scala/com/aisi/spark/TestSparkSql.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestSparkSql.scala index 40fc75f..08d8ce3 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestSparkSql.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestSparkSql.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext} diff --git a/Spark/src/main/scala/com/aisi/spark/TestUDAF.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestUDAF.scala similarity index 98% rename from Spark/src/main/scala/com/aisi/spark/TestUDAF.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestUDAF.scala index cdca619..98b6348 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestUDAF.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestUDAF.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.sql.expressions.Aggregator diff --git a/Spark/src/main/scala/com/aisi/spark/TestUDF.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestUDF.scala similarity index 97% rename from Spark/src/main/scala/com/aisi/spark/TestUDF.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestUDF.scala index e4f56ff..cabbeac 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestUDF.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestUDF.scala @@ -1,4 +1,4 @@ -package com.aisi.spark +package com.aisi.sparkSql import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} diff --git a/Spark/src/main/scala/com/aisi/spark/TestWithSparkSession.scala b/Spark/src/main/scala/com/aisi/sparkSql/TestWithSparkSession.scala similarity index 98% rename from Spark/src/main/scala/com/aisi/spark/TestWithSparkSession.scala rename to Spark/src/main/scala/com/aisi/sparkSql/TestWithSparkSession.scala index 7371dd6..b32ed4f 100644 --- a/Spark/src/main/scala/com/aisi/spark/TestWithSparkSession.scala +++ b/Spark/src/main/scala/com/aisi/sparkSql/TestWithSparkSession.scala @@ -1,4 +1,4 @@ -package com.aisi.spark 
+package com.aisi.sparkSql import org.apache.spark.sql.SparkSession diff --git a/Spark/src/main/scala/com/aisi/sparkSreaming/TestStreaming.scala b/Spark/src/main/scala/com/aisi/sparkSreaming/TestStreaming.scala new file mode 100644 index 0000000..5da62ba --- /dev/null +++ b/Spark/src/main/scala/com/aisi/sparkSreaming/TestStreaming.scala @@ -0,0 +1,21 @@ +package com.aisi.sparkSreaming + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} + +object TestStreaming { + def main(args: Array[String]): Unit = { + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.setAppName("testStreaming") + val ssc = new StreamingContext(conf, Seconds(5)) + val ds = ssc.socketTextStream("localhost", 6666) + val ds1 = ds.flatMap(_.split(" ")) + .map((_, 1)) + .reduceByKey(_ + _) + ds1.print() + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/pom.xml b/pom.xml index a2d8a9d..de5508a 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,7 @@ MapReduceTest Spark + Kafka diff --git a/路径 b/路径 new file mode 100644 index 0000000..9448f81 --- /dev/null +++ b/路径 @@ -0,0 +1,30 @@ +2024-10-24 0009 04378 沪K21792 2024-10-2406:11:57 1 20 00 +2024-10-24 0007 05341 沪K21792 2024-10-2406:56:05 43 36 03 +2024-10-24 0003 04301 沪K21792 2024-10-2406:20:17 62 49 00 +2024-10-24 0006 02290 沪K21792 2024-10-2406:13:45 5 43 02 +2024-10-24 0006 08637 沪K21792 2024-10-2406:23:29 33 21 04 +2024-10-24 0003 09518 沪K21792 2024-10-2406:39:04 26 39 00 +2024-10-24 0008 08202 沪K21792 2024-10-2406:41:44 171 34 05 +2024-10-24 0002 09586 沪K21792 2024-10-2406:22:43 69 9 01 +2024-10-24 0004 06210 沪K21792 2024-10-2406:57:41 55 16 00 +2024-10-24 0003 06017 沪K21792 2024-10-2406:22:51 114 49 01 +2024-10-24 0006 07356 沪K21792 2024-10-2406:18:28 95 21 01 +2024-10-24 0001 02689 沪K21792 2024-10-2406:05:04 92 9 03 +2024-10-24 0001 00143 沪K21792 2024-10-2406:12:22 175 44 00 +2024-10-24 0005 01871 沪K21792 2024-10-2406:55:25 184 23 03 +2024-10-24 0004 00887 沪K21792 2024-10-2406:20:25 166 28 07 +2024-10-24 0001 08940 沪K21792 2024-10-2406:59:38 10 3 04 +2024-10-24 0008 02450 沪K21792 2024-10-2406:56:13 122 6 03 +2024-10-24 0005 08706 沪K21792 2024-10-2406:24:02 128 5 01 +2024-10-24 0004 04151 沪K21792 2024-10-2406:42:51 46 24 05 +2024-10-24 0006 07990 沪K21792 2024-10-2406:30:50 88 23 01 +2024-10-24 0000 04371 沪K21792 2024-10-2406:41:15 123 14 00 +2024-10-24 0002 01350 沪K21792 2024-10-2406:16:00 134 22 00 +2024-10-24 0006 08116 沪K21792 2024-10-2406:17:44 17 34 06 +2024-10-24 0000 06980 沪K21792 2024-10-2406:14:28 104 47 03 +2024-10-24 0009 06814 沪K21792 2024-10-2406:05:47 173 29 00 +2024-10-24 0007 01068 沪K21792 2024-10-2406:06:57 169 18 00 +2024-10-24 0007 05685 沪K21792 2024-10-2406:05:57 165 20 02 +2024-10-24 0005 07818 沪K21792 2024-10-2406:08:37 157 11 00 +2024-10-24 0004 06503 沪K21792 2024-10-2406:51:26 9 22 03 +2024-10-24 0008 02831 沪K21792 2024-10-2406:46:46 25 16 03
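The trailing 路径 ("path") file above appears to be sample output accidentally committed from com.aisi.sparkSql.MockData, which passes the literal placeholder string "路径" to initFile and writes the same tab-separated records (day, monitorId, cameraId, car, actionTime, speed, roadId, areaId) to the RoadRealTimeLog Kafka topic. The sketch below is a minimal, hypothetical reader for that topic built on the kafka-clients API already used by this module; the broker address, group id, and field positions are assumptions taken from MockData, not anything defined in this change.

package com.aisi.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class RoadLogConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed: same broker MockData writes to
        props.put("group.id", "road-log-group");          // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("RoadRealTimeLog"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // MockData writes: day \t monitorId \t cameraId \t car \t actionTime \t speed \t roadId \t areaId
                    String[] fields = record.value().split("\t");
                    if (fields.length == 8) {
                        System.out.println("car=" + fields[3] + " speed=" + fields[5]
                                + " road=" + fields[6] + " at " + fields[4]);
                    }
                }
            }
        }
    }
}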