在本文中,我们将学习如何在 CentOS/RHEL 8 服务器上安装Apache Kafka。
在 Apache Kafka 服务器中创建一个topic
在 Apache Kafka 服务器中创建一个主题。
# /opt/kafka/bin/kafka-topics.sh --create --topic onitroad --bootstrap-server localhost:9092 Created topic onitroad.
要查看该主题的详细信息,我们可以在 Linux 命令行中使用运行以下脚本。
# /opt/kafka/bin/kafka-topics.sh --describe --topic onitroad --bootstrap-server localhost:9092 Topic: onitroad PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: onitroad Partition: 0 Leader: 0 Replicas: 0 Isr: 0
在主题中添加一些示例事件。
# /opt/kafka/bin/kafka-console-producer.sh --topic onitroad --bootstrap-server localhost:9092 >This is the First event. >This is the Second event. >This is the Third event. >^C#
要查看插入到主题中的所有事件,我们可以在 Linux 命令行中执行以下脚本。
# /opt/kafka/bin/kafka-console-consumer.sh --topic onitroad --from-beginning --bootstrap-server localhost:9092 This is the First event. This is the Second event. This is the Third event. ^CProcessed a total of 3 messages
Apache Kafka 成功安装在 CentOS/RHEL 8 上,引导服务器运行在 9092 端口。
什么是 Apache Kafka
Apache Kafka 是一个开源的流处理和消息代理软件。
Kafka 是一个开源的流处理软件平台,由 Apache Software Foundation 开发,用 Scala 和 Java 编写。
该项目旨在提供一个统一的、高吞吐量、低延迟的平台来处理实时数据馈送。
Kafka 可以通过 Kafka Connect 连接到外部系统(用于数据导入/导出),并提供 Kafka Streams ,一个 Java 流处理库。
Kafka 使用基于二进制 TCP 的协议,该协议针对效率进行了优化,并依赖于“消息集”抽象,将消息自然地组合在一起以减少网络往返的开销。
在 Linux 服务器上安装 Java 开发工具包 (JDK)
Apache Kafka 使用 Java 编程语言构建,因此需要 Java Development Kit 8 或者更高版本。
JDK 11 在标准 yum 存储库中可用,因此,我们可以通过执行以下 Linux 命令来安装 JDK 11.
# dnf install -y java-11-openjdk
在 CentOS/RHEL 8 上安装 Apache Kafka 服务器
Kafka 服务器是在 Apache License 2.0 下分发的,因此我们可以从他们的官方网站下载该软件。
从此网页复制所需版本的 Apache Kafka 软件的 URL。
使用 wget 命令复制的 URL 直接从 Linux 命令行下载 Apache Kafka 软件。
# cd /tmp # wget https://downloads.apache.org/kafka/2.6.0/kafka_2.13-2.6.0.tgz
使用 tar 命令解压下载的 tar包。
# tar xzf kafka_2.13-2.6.0.tgz
现在,将提取的文件移动到 /opt/kafka 目录。
# mv kafka_2.13-2.6.0 /opt/kafka
当前版本的 Kafka 服务器需要 Zookeeper 服务进行分布式配置。
但是,在 Kafka 文档中提到
“很快,Apache Kafka 将不再需要 ZooKeeper.”
但是现在,我们必须在 Kafka 服务器之前安装 Apache Zookeeper 服务。
Zookeeper 二进制脚本随 Kafka 安装文件一起提供。
我们可以使用它来配置 ZooKeeper 服务器。
为 Apache Zookeeper 创建一个 systemd 服务单元。
# cd /opt/kafka/ # vi /etc/systemd/system/zookeeper.service
在此文件中添加以下指令。
[Unit] Description=Apache Zookeeper server Documentation=http://zookeeper.apache.org Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple ExecStart=/usr/bin/bash /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/usr/bin/bash /opt/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
同样,为 Kafka 服务器创建一个 systemd 服务单元。
# vi /etc/systemd/system/kafka.service
其中添加以下指令。
[Unit] Description=Apache Kafka Server Documentation=http://kafka.apache.org/documentation.html Requires=zookeeper.service [Service] Type=simple Environment="JAVA_HOME=/usr/lib/jvm/jre-11-openjdk" ExecStart=/usr/bin/bash /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties ExecStop=/usr/bin/bash /opt/kafka/bin/kafka-server-stop.sh [Install] WantedBy=multi-user.target
启用并启动 Apache Zookeeper 和 Kafka 服务。
# systemctl daemon-reload # systemctl enable --now zookeeper.service Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service -> /etc/systemd/system/zookeeper.service. # systemctl enable --now kafka.service Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service -> /etc/systemd/system/kafka.service.
验证 Apache Kafka 服务的状态。
# systemctl status kafka.service