update kafka consumer-01

This commit is contained in:
2024-10-27 10:55:40 +08:00
parent 4726888819
commit eece6503c9
45 changed files with 1444 additions and 28 deletions

64
Kafka/pom.xml Normal file
View File

@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.aisi</groupId>
<artifactId>HaiNiuProjects</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>Kafka</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aisi.producer.ProducerWithObjectSerializer</mainClass> <!-- 指定你的主类 -->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,32 @@
package com.aisi.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.protocol.types.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class Consumer1 {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "aisi-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
List<String> topics = Arrays.asList("topic_a", "topic_b"); // 将每个主题作为独立的字符串
consumer.subscribe(topics);
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value());
}
}
}
}

View File

@@ -0,0 +1,49 @@
package com.aisi.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
public class ConsumerWithCooperativeStickyAssignor {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "aisi-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//设定分区分配策略为 cooperative-sticky
properties.put("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
//设定consumer断开超时时间最小不能小于6s
properties.put("session.timeout.ms", 6000);
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
List<String> topics = Arrays.asList("topic_d","topic_e");
consumer.subscribe(topics);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value());
}
}
}
}

View File

@@ -0,0 +1,40 @@
package com.aisi.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class ConsumerWithDeserializer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "aisi-group");
properties.put("key.deserializer", MyStringDeserializer.class.getName());
properties.put("value.deserializer", MyStringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
List<String> topics = Arrays.asList("topic_a", "topic_b"); // 将每个主题作为独立的字符串
consumer.subscribe(topics);
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value());
}
}
}
public static class MyStringDeserializer implements Deserializer<String> {
@Override
public String deserialize(String topic, byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}
}

View File

@@ -0,0 +1,49 @@
package com.aisi.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
public class ConsumerWithRangeAssignor {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "aisi-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//设定分区分配策略为range
properties.put("partition.assignment.strategy", RangeAssignor.class.getName());
//设定consumer断开超时时间最小不能小于6s
properties.put("session.timeout.ms", 6000);
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
List<String> topics = Arrays.asList("topic_d");
consumer.subscribe(topics);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value());
}
}
}
}

View File

@@ -0,0 +1,49 @@
package com.aisi.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
public class ConsumerWithRoundRobinAssignor {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "aisi-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//设定分区分配策略为 round-robin
properties.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
//设定consumer断开超时时间最小不能小于6s
properties.put("session.timeout.ms", 6000);
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
List<String> topics = Arrays.asList("topic_d","topic_e");
consumer.subscribe(topics);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value());
}
}
}
}

View File

@@ -0,0 +1,49 @@
package com.aisi.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
public class ConsumerWithStickyAssignor {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "aisi-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//设定分区分配策略为 sticky
properties.put("partition.assignment.strategy", StickyAssignor.class.getName());
//设定consumer断开超时时间最小不能小于6s
properties.put("session.timeout.ms", 6000);
KafkaConsumer<String, String> consumer = new KafkaConsumer(properties);
List<String> topics = Arrays.asList("topic_d","topic_e");
consumer.subscribe(topics);
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区移除:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition topicPartition : partitions) {
System.out.println("分区添加:::::"+topicPartition.topic() + ":" + topicPartition.partition());
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "->" + record.partition() + "->" + record.offset() + "->" + record.key() + "->" + record.value());
}
}
}
}

View File

@@ -0,0 +1,78 @@
package com.aisi.producer;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
public class Producer1 {
private static final Logger log = LoggerFactory.getLogger(Producer1.class);
public static void main(String[] args) throws Exception {
KafkaProducer<String, String> kafkaProducer = getStringStringKafkaProducer();
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", "message-value: " + i);
kafkaProducer.send(record, (metadata, exception) -> {
if (exception == null) {
// 消息发送成功
System.out.println("Message sent successfully to topic: " + metadata.topic() +
" partition: " + metadata.partition() +
" offset: " + metadata.offset());
} else {
// 消息发送失败
exception.printStackTrace();
}
});
}
// 确保所有消息都被发送
kafkaProducer.flush();
// 关闭生产者
kafkaProducer.close();
}
private static KafkaProducer<String, String> getStringStringKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "nn1:9092");
properties.put("batch.size", 16384);
properties.put("acks", "all"); // 建议将-1改为all
properties.put("retries", 3);
properties.put("linger.ms", 50);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyInterceptor.class.getName());
return new KafkaProducer<>(properties);
}
public static class MyInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
// 对消息进行处理,例如添加时间戳等
return new ProducerRecord<>(producerRecord.topic(), System.currentTimeMillis() + "-" + producerRecord.value());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("Failed to send message with error: {}", e.getMessage());
}else {
log.info("Successfully sent message to topic {}, partition {}, offset {}",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
}

View File

@@ -0,0 +1,124 @@
package com.aisi.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
public class ProducerWithObjectSerializer {
private static final Logger log = LoggerFactory.getLogger(ProducerWithObjectSerializer.class);
public static void main(String[] args) throws Exception {
KafkaProducer<String, Student> kafkaProducer = getStringStringKafkaProducer();
Student stu1 = new Student();
stu1.setName("张三");
stu1.setAge(18);
stu1.setScore(100);
Student stu2 = new Student();
stu2.setName("李四");
stu2.setAge(24);
stu2.setScore(100);
kafkaProducer.send(new ProducerRecord<>("student", 1,"blog-1", stu1) , (metadata, exception) -> {
if (exception != null) {
log.error("发送消息失败", exception);
}else {
log.info("发送消息成功, offset: " + metadata.offset());
log.info("发送消息成功, topic: " + metadata.topic());
log.info("发送消息成功, partition: " + metadata.partition());
log.info("发送消息成功, timestamp: " + metadata.timestamp());
}
});
kafkaProducer.send(new ProducerRecord<>("student", 2,"blog-2", stu2), (metadata, exception) -> {
if (exception != null) {
log.error("发送消息失败", exception);
}else {
log.info("发送消息成功, offset: " + metadata.offset());
log.info("发送消息成功, topic: " + metadata.topic());
log.info("发送消息成功, partition: " + metadata.partition());
log.info("发送消息成功, timestamp: " + metadata.timestamp());
}
});
// 确保所有消息都被发送
kafkaProducer.flush();
// 关闭生产者
kafkaProducer.close();
}
private static KafkaProducer<String, Student> getStringStringKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "nn1:9092");
properties.put("batch.size", 16384);
properties.put("acks", "all");
properties.put("retries", 3);
properties.put("linger.ms", 50);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", StudentSerializer.class.getName());
return new KafkaProducer<>(properties);
}
public static class Student implements Serializable {
private String name;
private int score;
private int age;
public Student(String name, int score, int age) {
this.name = name;
this.score = score;
this.age = age;
}
public Student() {
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", score=" + score +
", age=" + age +
'}';
}
}
public static class StudentSerializer implements Serializer<Student> {
@Override
public byte[] serialize(String s, Student student) {
return student.toString().getBytes(StandardCharsets.UTF_8);
}
}
}

View File

@@ -0,0 +1,59 @@
package com.aisi.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
public class ProducerWithStringSerializer {
private static final Logger log = LoggerFactory.getLogger(ProducerWithStringSerializer.class);
public static void main(String[] args) throws Exception {
KafkaProducer<String, String> kafkaProducer = getStringStringKafkaProducer();
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("topic_b","key_:"+i,"message-value: " + i);
kafkaProducer.send(record, (metadata, exception) -> {
if (exception == null) {
// 消息发送成功
System.out.println("Message sent successfully to topic: " + metadata.topic() +
" partition: " + metadata.partition() +
" offset: " + metadata.offset());
} else {
// 消息发送失败
exception.printStackTrace();
}
});
}
// 确保所有消息都被发送
kafkaProducer.flush();
// 关闭生产者
kafkaProducer.close();
}
private static KafkaProducer<String, String> getStringStringKafkaProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "nn1:9092");
properties.put("batch.size", 16384);
properties.put("acks", "all");
properties.put("retries", 3);
properties.put("linger.ms", 50);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", MyStringSerializer.class.getName());
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(properties);
}
public static class MyStringSerializer implements Serializer<String> {
@Override
public byte[] serialize(String topic, String data) {
// log.warn("serialize method called with parameters: {}, {}", s, s2);
return data.getBytes(StandardCharsets.UTF_8);
}
}
}

View File

@@ -0,0 +1,53 @@
package com.aisi.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Properties;
public class ProducerWithUDPartitioner {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "nn1:9092");
properties.put("batch.size", 16384);
properties.put("acks", "all"); // 建议将-1改为all
properties.put("retries", 3);
properties.put("linger.ms", 50);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("topic1", "key" + i, "value" + i));
}
kafkaProducer.close();
}
public static class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return 0;
}
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
}

View File

@@ -0,0 +1,5 @@
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n