diff --git a/.obsidian/core-plugins.json b/.obsidian/core-plugins.json index 9405bfd..436f43c 100644 --- a/.obsidian/core-plugins.json +++ b/.obsidian/core-plugins.json @@ -1,20 +1,30 @@ -[ - "file-explorer", - "global-search", - "switcher", - "graph", - "backlink", - "canvas", - "outgoing-link", - "tag-pane", - "page-preview", - "daily-notes", - "templates", - "note-composer", - "command-palette", - "editor-status", - "bookmarks", - "outline", - "word-count", - "file-recovery" -] \ No newline at end of file +{ + "file-explorer": true, + "global-search": true, + "switcher": true, + "graph": true, + "backlink": true, + "canvas": true, + "outgoing-link": true, + "tag-pane": true, + "properties": false, + "page-preview": true, + "daily-notes": true, + "templates": true, + "note-composer": true, + "command-palette": true, + "slash-command": false, + "editor-status": true, + "bookmarks": true, + "markdown-importer": false, + "zk-prefixer": false, + "random-note": false, + "outline": true, + "word-count": true, + "slides": false, + "audio-recorder": false, + "workspaces": false, + "file-recovery": true, + "publish": false, + "sync": false +} \ No newline at end of file diff --git a/.obsidian/plugins/obsidian-git/data.json b/.obsidian/plugins/obsidian-git/data.json index f2cab91..be93772 100644 --- a/.obsidian/plugins/obsidian-git/data.json +++ b/.obsidian/plugins/obsidian-git/data.json @@ -15,7 +15,7 @@ "syncMethod": "merge", "customMessageOnAutoBackup": false, "autoBackupAfterFileChange": false, - "treeStructure": true, + "treeStructure": false, "refreshSourceControl": true, "basePath": "", "differentIntervalCommitAndPush": false, diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json index 7d6345c..62275aa 100644 --- a/.obsidian/workspace.json +++ b/.obsidian/workspace.json @@ -4,19 +4,21 @@ "type": "split", "children": [ { - "id": "b206ab15670086d3", + "id": "40d05df79b704c51", "type": "tabs", "children": [ { - "id": "78f6b03712f9678a", + "id": "849267c81e2cb715", "type": "leaf", "state": { "type": "markdown", "state": { - "file": "Kafka/3.Customer.md", + "file": "Kafka/4.Kafka高级原理篇.md", "mode": "source", "source": false - } + }, + "icon": "lucide-file", + "title": "4.Kafka高级原理篇" } } ] @@ -39,7 +41,9 @@ "type": "file-explorer", "state": { "sortOrder": "byCreatedTimeReverse" - } + }, + "icon": "lucide-folder-closed", + "title": "文件列表" } }, { @@ -54,7 +58,9 @@ "collapseAll": false, "extraContext": false, "sortOrder": "alphabetical" - } + }, + "icon": "lucide-search", + "title": "搜索" } }, { @@ -62,7 +68,9 @@ "type": "leaf", "state": { "type": "bookmarks", - "state": {} + "state": {}, + "icon": "lucide-bookmark", + "title": "书签" } } ] @@ -85,7 +93,7 @@ "state": { "type": "backlink", "state": { - "file": "Kafka/3.Customer.md", + "file": "Kafka/4.Kafka高级原理篇.md", "collapseAll": false, "extraContext": false, "sortOrder": "alphabetical", @@ -93,7 +101,9 @@ "searchQuery": "", "backlinkCollapsed": false, "unlinkedCollapsed": true - } + }, + "icon": "links-coming-in", + "title": "4.Kafka高级原理篇 的反向链接列表" } }, { @@ -102,10 +112,12 @@ "state": { "type": "outgoing-link", "state": { - "file": "Kafka/3.Customer.md", + "file": "Kafka/4.Kafka高级原理篇.md", "linksCollapsed": false, "unlinkedCollapsed": true - } + }, + "icon": "links-going-out", + "title": "4.Kafka高级原理篇 的出链列表" } }, { @@ -116,7 +128,9 @@ "state": { "sortOrder": "frequency", "useHierarchy": true - } + }, + "icon": "lucide-tags", + "title": "标签" } }, { @@ -125,8 +139,10 @@ "state": { "type": "outline", "state": { - "file": "Kafka/3.Customer.md" - } + "file": "Kafka/4.Kafka高级原理篇.md" + }, + "icon": "lucide-list", + "title": "4.Kafka高级原理篇 的大纲" } }, { @@ -134,7 +150,9 @@ "type": "leaf", "state": { "type": "git-view", - "state": {} + "state": {}, + "icon": "git-pull-request", + "title": "Source Control" } } ], @@ -155,13 +173,15 @@ "obsidian-git:Open Git source control": false } }, - "active": "c615f431a27fc4e9", + "active": "849267c81e2cb715", "lastOpenFiles": [ + "Kafka/assert/2.png", + "Kafka/assert/5.png", + "Kafka/assert/6.png", "Kafka/assert/4.png", - "Kafka/assert/3.png", - "Kafka/assert/2.png", - "Kafka/assert/2.png", + "Kafka/4.Kafka高级原理篇.md", "Kafka/3.Customer.md", + "Kafka/assert/3.png", "Kafka/assert/1.png", "HaiNiuHadoop搭建/2.ZooKeeper配置.md", "HaiNiuHadoop搭建/未命名.md", @@ -190,7 +210,6 @@ "HaiNiuHadoop搭建/Yarn配置.md", "HaiNiuHadoop搭建/images/Pasted image 20240910224258.png", "Hadoop/脚本.md", - "Pasted image 20240910224258.png", "HaiNiuHadoop搭建/images", "HaiNiuHadoop搭建/新建文件夹", "HaiNiuHadoop搭建", diff --git a/Kafka/3.Customer.md b/Kafka/3.Customer.md index f6d71d8..bd88b9e 100644 --- a/Kafka/3.Customer.md +++ b/Kafka/3.Customer.md @@ -23,4 +23,5 @@ kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_off # 重置消费者组对topic消费的offset kafka-consumer-groups --bootstrap-server localhost:9092 --group aisi-group-01 --reset-offsets --to-earliest --all-topics --execute ``` -![](./assert/4.png) \ No newline at end of file +![](./assert/4.png) + diff --git a/Kafka/4.Kafka高级原理篇.md b/Kafka/4.Kafka高级原理篇.md new file mode 100644 index 0000000..1bf5021 --- /dev/null +++ b/Kafka/4.Kafka高级原理篇.md @@ -0,0 +1,81 @@ +### Kafka的文件存储原理 +>kafka 是一个分布式的缓存队列,文件存储时 将topic流数据进行分区存储(存放到多个broker节点),这样起到负载均衡、分布式并行处理的作用,因此具体存储的形式是 以`topic-partition` 形式命名的。 + +- 随着单文件的大小增大,检索效率降低,因此 参数 `log.segment.bytes` 设置 单文件最大为1G,这意味着topic的一个分区也将分成多个segment,每个segment最大为1G,文件检索时效率更高。文件追加写入的策略是buffer满足4k,将数据写入*.log,并向*.index, *.timeindex写入索引 +- kafka的写是追加写,这会跳过文件的检索步骤,kafka官网指出追加写入速率为`600MB/S` ,随机写速率为 `100KB/S` ,使得使用机械硬盘的性能不逊于固态硬盘。 +- kafka的读,当读取对应topic的相应分区时,先通过segment来选择具体的读取文件,然后通过 offset向*.index 查询物理偏移量,再去*.log读取数据。此外,kafka实现零拷贝,使得数据无需在用户缓存空间进行传递拷贝,仅需要复制到内核缓冲区,然后发送到网卡,省去两次拷贝的开销 + +### 文件的清理 +>kafka时一个分布式的缓存队列,它的数据不像HDFS那样长期存储,kafka默认会将存在7天的过期数据进行删除。 +- `log.cleanup.policy`, 可选为 delete、compact,默认是delete +- `log.retention.hours` 默认 167 ,7天时间过期 +- `log.retention.bytes` 默认 -1 ,分区内的数据最大值,默认没有限制 +- `log.segment.bytes`,默认1073741824 , 分区内的 `segment` 的最大值,默认为1G +- `log.cleaner.interval.ms` ,默认为 30000 ,5 分钟,这个参数适用于所有日志清理策略,包括删除(`log.cleanup.policy=delete`)和压缩(`log.cleanup.policy=compact`)。 +如果`log.cleanup.policy = compact` ,这不会删除过期数据,而是将重复建的记录进行合并,仅仅保留最新的记录,旧的记录将被清除。因此每个键在压缩后的日志中是唯一的最新值。需要配合使用下列参数 +- `log.cleaner.enable=true` 启用日志压缩功能。 +- `log.cleaner.min.cleanable.ratio` 定义清理的最低比例。即当未清理的数据量超过指定比例`(默认 0.5)`时,Kafka 会触发压缩。 +- `log.cleaner.backoff.ms`,默认15000, 当 Kafka 完成一次日志压缩后,等待指定时间后才会执行下一次压缩。 + +### kafka的监控 +> [kafka-console-ui]([[https://drive.google.com/file/d/1-38QwSMsq3MvrpTofa8YeNLpp4KyE407/view?usp=drive_link]]) 下载 +```bash +unzip kafka-console-ui.zip -d /usr/local/ +cd /usr/local/kafka-console-ui bin/start.sh +bin/start.sh +``` +访问localhost:7766,点击最右边的运维,添加集群信息 + +### Kafka集群动态添加节点 +![](./assert/5.png) + +### Kafka的选举机制 +>`controller` 选举 + +各个`broker`节点会向`zookeper`的 注册`/controller`节点 ,如果注册了这个节点,这个`broker`就是 `controller`节点 +当 controller broker 宕机时,Zookeeper 会检测到 `/controller` 节点的会话过期,并自动删除该节点。 +其他的 broker 都会监听该节点的变化,一旦监听到`/controller`节点被删除,brokers 会竞相尝试创建新的 `/controller` 节点》 +![controller节点](./assert/6.png) + +>`partition leader` 选举 + +`topic`存在多个分区,而这每个分区又存在副本,这些副本之间有主从关系,为主从分区。主分区负责处理读写请求,从分区从 Leader 副本同步数据,确保数据冗余和容错。 +集群的`/controller`的`broker`节点有`controller`组件,他会进行主从分区的选取,形成一个`ISR`(动态副本集) + +- 问题:如果`topic`的主分区所在节点宕机,需要从`ISR`中重新选举主分区,这个选举是顺序性的,最大程度保证和宕机的leader的最新数据是同步一致的。`controller`会协调这个选举过程,一旦检测到leader没有heartbeat,认为其宕机,立即选取新leader,并将zookeeper和broker的元数据进行更新,保证`topic`分区可用 +- **`unclean.leader.election.enable`**:当设置为 `true` 时,允许非 ISR 副本成为 Leader;当设置为 `false` 时,仅允许 ISR 中的副本成为 Leader(默认值为 `false`,即不启用非 ISR 选举) + +### Kafka的数据同步 +>上述说到了topic的主分区宕机之后的leader选举,此时还需要进行分区数据的同步,选举出新的leader分区将自己的数据完整地同步到follower分区 +- 情况一:就是当从分区正在同步主分区的数据,但是此时leader宕机,从分区的数据对于宕机leader的数据来说是不完整的,那么只能从`ISR`中选择第一个,数据最新的从分区为新的主分区,然后进行数据的同步 +- 情况二:就是宕机的是从分区`broker`节点,这不会影响集群的运行,会将从分区从`ISR`中去除,如果从分区的`broker`恢复过来,从分区会向主分区同步,并重新加入`ISR` + +### Kafka的数据均衡 +>Kafka的一些`broker`因为`leader`的频繁选举导致负载较重,此时可以进行分区再平衡,**Leader 分区转移为 Follower 分区**,从而减少该节点的压力。 + +我们可以选择关闭Kafka的leader平衡管理,改为自动管理。 +`auto.leader.rebalance.enable`,默认为true,集群自动管理leader的切换选举 +`leader.imbalance.check.interval.seconds`,默认300,集群检查是否选举切换leader的间隔时间 + +现在我们关闭集群的leader平衡管理,使用手动管理 +- 直接指定`topic`的leader + ```bash + kafka-leader-election.sh --bootstrap-server nn1:9092 --topic topic_a --partition 1 --election-type preferred +``` +- 也可以手动生成均衡计划 + ```bash + # 首先创建一个topic.json 输入如下内容 填写需要均衡的分区 + {"topics":[{"topic":"topic_a"}],"version":1} + # 使用这个均衡优化命令生成优化计划 + kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --broker-list 0,1,2,3,4 --topics-to-move-json-file topic.json --generate , + # 将上述命令生成的优化计划 编辑为 reassignment.json + kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --execute --reassignment-json-file reassignment.json + # 验证是否重新分配成功 + kafka-reassign-partitions.sh --bootstrap-server nn1:9092 --reassignment-json-file reassignment.json --verify +``` + + + + + + diff --git a/Kafka/assert/4.png b/Kafka/assert/4.png index 8803a2b..aae11ee 100644 Binary files a/Kafka/assert/4.png and b/Kafka/assert/4.png differ diff --git a/Kafka/assert/5.png b/Kafka/assert/5.png new file mode 100644 index 0000000..ae46c4b Binary files /dev/null and b/Kafka/assert/5.png differ diff --git a/Kafka/assert/6.png b/Kafka/assert/6.png new file mode 100644 index 0000000..4d3ee4c Binary files /dev/null and b/Kafka/assert/6.png differ