学术能力的缺乏并不意味着你就擅长混社会,说不定还不如在学校的表现

发掘积累过程的快感

首页 » BIBLE模型 » 运维 » PHP安装Kafka实例

PHP安装Kafka实例


Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

安装 kafka(不需要安装,解包即可)

# 官方下载地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0

启动 kafka server

# 需先启动zookeeper
# -daemon 可启动后台守护模式
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

安装 kafka 的 php 扩展

# 先安装rdkfka库文件
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure 
make
sudo make install

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
;添加kafka扩展
extension=rdkafka.so

启动 kafka 客户端测试

# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?

# 启动一个消费者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?

PHP 生产者代码

<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i < 20; $i++) {
    //RD_KAFKA_PARTITION_UA自动选择分区
    //$option可选
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
}


$len = $rk->getOutQLen();
while ($len > 0) {
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);
}

Kafka 角色介绍

producer:生产者。
consumer:消费者。
topic: 消息以 topic 为类别记录,Kafka 将消息种子(Feed)分类, 每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker;消费者可以订阅一个或多个主题(topic), 并从 Broker 拉数据,从而消费这些已发布的消息。

经典模型

  1. 一个主题下的分区不能小于消费者数量,即一个主题下消费者数量不能大于分区属,大了就浪费了空闲了
  2. 一个主题下的一个分区可以同时被不同消费组其中某一个消费者消费
  3. 一个主题下的一个分区只能被同一个消费组的一个消费者消费

常用参数说明

request.required.acks

Kafka producer的ack有3中机制,初始化producer时的producerconfig可以通过配置request.required.acks不同的值来实现。

0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。

1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。

-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 
此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。

三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

auto.offset.reset

1. earliest:自动将偏移重置为最早的偏移量
2. latest:自动将偏移量重置为最新的偏移量(默认)
3. none:如果consumer group没有发现先前的偏移量,则向consumer抛出异常。
4. 其他的参数:向consumer抛出异常(无效参数)
互联网信息太多太杂,各互联网公司不断推送娱乐花边新闻,SNS,微博不断转移我们的注意力。但是,我们的时间和精力却是有限的。这里是互联网浩瀚的海洋中的一座宁静与美丽的小岛,供开发者歇息与静心潜心修炼。 “Bible”是圣经,有权威的书,我们的本意就是为开发者提供真正有用的的资料。 我的电子邮件 1217179982@qq.com,您在开发过程中遇到任何问题,欢迎与我联系。
Copyright © 2024. All rights reserved. 本站由 Helay 纯手工打造. 蜀ICP备15017444号