最近实习有需要用到zookeeper和kafka, zookeeper主要用到的是服务注册与发现, kafka主要做的是经典的生产者和消费者模型的实现, 这里主要介绍下kafka, zookeeper的话接口非常的清晰, 入门会很快, 需要注意的一点的zookeeper的节点的问题, 临时节点和持久节点可能需要我们稍微注意下. kafka我入门也只有几天, 如果有什么不正确的地方, 欢迎读者朋友们指出!
首先介绍一下几个创建生产者必须的api, 我用的是librdkafka, 这个是c语言的接口, 其他语言的接口可以去看对应的文档, 但是思路基本是大同小异的.
- rd_kafka_conf_t *rd_kafka_conf_new(void);这个函数功能很简单, 创建一个全局配置句柄, 如果我们用到一些自定义的配置, 就需要调用这个函数, 在创建kafka句柄的时候, 这个句柄可能需要作为参数.
- rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,const char *name,const char *value,char *errstr, size_t errstr_size); 这个函数很简单, 就是修改全局的配置, conf就是第一个函数的返回值, 后面的name和value是配置的名称和我们想要配置成的值.后面两个是我们用户传入的参数, 如果创建失败, errstr修改为错误信息.
- rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size); 这个函数作用是创建一个新的kafka句柄, 然后根据传入第一个参数决定创建句柄的类型, 比如是生产者还是消费者.第二个参数是我们可选的传入的全局配置句柄, 如果我们需要对默认的配置进行修改, 我们就需要用到这个, 如果我们想使用默认的参数配置的话, 就直接传入一个空指针就好, 后面两个参数是我们自己提供的错误输出缓冲区, 如果创建句柄失败的话, 那么errstr里面就会有错误信息.
- rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void); 这个函数不细说, 就是创建一个topic配置的句柄, 没有参数.
- rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,const char *name,const char *value, char *errstr, size_t errstr_size);这个函数也很简单, 核心是对topic句柄的配置进行修改, name就是需要修改的配置的名字, value是我们希望设的值,后面两个和第一个函数一样, 不再介绍.
- void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque); 这个函数很重要,需要我们注意, 这个函数的功能是将opaque作为参数传给我们的回调函数. 这个是针对异步接口编程非常重要的一个思想, 对于吞吐量要求比较高的场景下, 我们很可能需要用异步的接口, 所谓异步的接口是当我们调用异步的接口发起请求时, 接口不会阻塞, 但是请求不一定成功, 如何判断请求是否成功了呢, 这个时候我们就需要注册回调函数, 在回调函数里面收到请求是否成功的信息,这个opaque我们现在设定好了, 到回调函数被调用的时候,opaque会原封不动的作为回调函数的参数传回来. 这样说可能有点抽象, 等下后面会有例子.
- void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,void (*dr_msg_cb) (rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque)); 这个是设置发送信息的回调函数, 第一个参数是配置的句柄, 在第一个函数中已经得到.后面就是我们需要注册的回调函数, 注意到回调函数的最后一个参数就是我们在上一步传入的参数opaque.
- int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len,const void *key, size_t keylen, void *msg_opaque); 这个函数就是生产信息了, 第一个是我们生产的消息希望发送到的topic, 第二个参数是该topic下的分区, 这个参数可以我们自己制定, 也可以使用RD_KAFKA_PARTITION_UA 参数让kafka内部帮我们选择合适的分区, msgflags是一些标志, 这里不再细说, 可以在头文件里看到注释, payload就是我们希望发送的消息, key的话这个参数我还不是特别理解, 暂时就不强行解释了,最后一个参数是用户指定的参数, 在消息传递回调函数的时候作为参数被传入.
- int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);这个函数其实很有意思, 在kafka里面, 即使我们调用了kafka的异步接口, 回调函数却需要我们手动调用rd_kafka_poll这个函数, 然后回调函数才会被调用, 因此我们必须要每隔一定时间就调用一次该函数, 避免回调的消息累计, 第一个参数不用说, 第二个参数是超时时间, 这个参数也很简单, 如果是0就是直接返回, 如果是-1就永久等待, 直到有回调消息到来, 如果是正数的话, 指定等待的时间.
下面就是介绍如何实现一个同步的生产者接口了, 实际上kafka没有提供同步的生产者接口, 当我们调用rd_kafka_produce的时候, 实际上我们无法得到消息是否发送成功的信息, 因为这个接口会把我们需要发送的消息放入自己的缓存中, 然后在一定时间内不断尝试发送, 尝试一定次数或者到达一定时间后, 如果还是发送失败, 那么就才认为是发送失败, 然后消息传递的回调函数会被调用, 其实看到这里, 我们的思路就基本出来了, 我们调用rd_kafka_produce的时候增加一段阻塞等待时间, 如果这段时间内消息发送失败的话, 那么我们就让函数返回失败, 如果这段时间内, 消息发送成功了, 那么我们就让函数返回成功. 下面是实现的介绍:
1.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39int RdkafkaManger::init(const std::string &brokers)
{
/* Kafka configuration */
rd_kafka_conf_ = rd_kafka_conf_new();
if(rd_kafka_conf_ == NULL)
{
printf("%s:Failed to create rd_kafka_conf_", __FUNCTION__);
return -1;
}
//rd_kafka_conf_set_dr_cb(rd_kafka_conf_, RdkafkaManger::msg_delivered);
/* Topic configuration */
char errstr[512] = {0};
if (rd_kafka_conf_set(rd_kafka_conf_, "metadata.broker.list",brokers.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
printf("%s:Failed to set brokers: %s.",__FUNCTION__,errstr);
return -1;
}
//注意下面这个是将this作为参数传入, 在回调函数中会被作为参数回传
rd_kafka_conf_set_opaque(rd_kafka_conf_, static_cast<void*>(this));
//下面是设置消息传递的回调函数
rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_, &RdkafkaManger::message_deliver_callback);
brokers_ = brokers;
return 0;
}
//下面是我们写的回调函数
void RdkafkaManger::message_deliver_callback(rd_kafka_t *rk, const rd_kafka_message_t * rkmessage, void *opaque)
{
//强制类型转换, opaque是我们之前传入的this, 现在再转回来
RdkafkaManger *rdkafka_manger = static_cast<RdkafkaManger*>(opaque);
if(rdkafka_manger == NULL)
{
printf("%s: Failed to static_cast from opaque to RdkafkaManger",__FUNCTION__);
}
//这个很关键, 我们之后在决定rd_kafka_produce是否阻塞需要用到这个变量
rdkafka_manger->sync_send_err_ = rkmessage->err;
if(rkmessage->err)
{
printf("%s:Message delivery failed: %s\n",__FUNCTION__, rd_kafka_err2str(rkmessage->err));
}
}2.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29int RdkafkaManger::create_rdkafka_producer(const std::vector<std::string > &topics_list)
{
char errstr[512] = {0};
rdkafka_mutex_.lock();
if (!(rd_kafka_producer_ = rd_kafka_new(RD_KAFKA_PRODUCER, rd_kafka_conf_,
errstr, sizeof(errstr)))) {
printf("%s: Failed to create new producer: %s\n",__FUNCTION__,errstr);
rdkafka_mutex_.unlock();
return -1;
}
for(int i = 0;i<topics_list.size();i++)
{
RdKafaTopicInfo rd_kafa_topic_info;
rd_kafa_topic_info.topic_conf = rd_kafka_topic_conf_new();
if (rd_kafka_topic_conf_set(rd_kafa_topic_info.topic_conf, "message.timeout.ms",std::to_string(10000).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
printf("%s:Failed to set messgae.timeoout.ms: %s.",__FUNCTION__,errstr);
return -1;
}
rd_kafa_topic_info.topic = rd_kafka_topic_new(rd_kafka_producer_, topics_list[i].c_str(), rd_kafa_topic_info.topic_conf);
if(rd_kafa_topic_info.topic != nullptr)
{
//...
}else{
printf("%s: Failed to create new producer topic: %s\n",__FUNCTION__,topics_list[i].c_str());
}
}
rdkafka_mutex_.unlock();
return 0;
}这里最关键的部分是rd_kafka_topic_conf_set(rd_kafa_topic_info.topic_conf, “message.timeout.ms”,std::to_string(10000).c_str(), errstr, sizeof(errstr)), 这一步的作用是设置超时时间, 前面我们说过, rd_kafka_produce将需要发送的消息放入自己的缓存中后, 需要尝试一定次数或者到达一定时间才会停止尝试, 并且报告发送消息失败的消息, 这一步就是设置超时时间, 这个时间你可以自己设定, 不过注意单位是毫秒, 我这里设定的是10000, 也就是10s的意思.
3.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47int RdkafkaManger::produce_message(const std::string &topic,const std::string &message)
{
char errstr[512] = {0};
int partition = 0;//假设默认分组是0
rdkafka_mutex_.lock();
//下面这个RD_KAFKA_PRODUCE_ERROR_INIT_VALUE只是一个宏定义, 随便定义一个负数就ok(最好是一个奇怪一点的负数比如-1234这种, 避免和kafka内置的一些错误相同), 如果发送消息失败或者发送消息成功, sync_send_err_会在消息传递回调函数中被修改
sync_send_err_= (rd_kafka_resp_err_t)RD_KAFKA_PRODUCE_ERROR_INIT_VALUE;
if (rd_kafka_produce(rdkafka_topic, partition,
RD_KAFKA_MSG_F_COPY,
/* Payload and length */
(void *)message.c_str(), message.length(),
/* Optional key and its length */
NULL, 0,
/* Message opaque, provided in
* delivery report callback as
* msg_opaque. */
NULL) == -1) {
fprintf(stderr,\
"%% Failed to produce to topic %s "\
"partition %i: %s\n",\
rd_kafka_topic_name(rdkafka_topic), partition,rd_kafka_err2name(rd_kafka_last_error()));
/* Poll to handle delivery reports */
rd_kafka_poll(rd_kafka_producer_, 0);
rdkafka_mutex_.unlock();
return -1;
}
else
{
int current_retry_time = 0;
while(RD_KAFKA_PRODUCE_ERROR_INIT_VALUE == sync_send_err_ && current_retry_time++ < (10+1))
{
rd_kafka_poll(rd_kafka_producer_, 1000);
}
if(RD_KAFKA_RESP_ERR_NO_ERROR != sync_send_err_)
{
LOG_ERROR("%s: Failed to send msg",__FUNCTION__);
rdkafka_mutex_.unlock();
return -1;
}
}
fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n",\
message.length(), rd_kafka_topic_name(rdkafka_topic), partition);
/* Poll to handle delivery reports */
rd_kafka_poll(rd_kafka_producer_, 0);
rdkafka_mutex_.unlock();
return 0;
}最重要的部分就是while循环的部分, 其实思路很简单, 我们维护一个变量current_retry_time, 代表尝试的次数, 然后在每次尝试, 调用rd_kafka_poll(rd_kafka_producer_, 1000);, 阻塞等待1000毫秒, 如果sync_send_err_的值被改了(意味着消息传递回调函数被调用)或者current_retry_time到达上限(说明等待时间超过上限, 已经超时), 我们就跳出循环, 检测sync_send_err_的值, 如果值不是RD_KAFKA_RESP_ERR_NO_ERROR的话, 说明就发送失败了, 我们直接返回失败就好, 否则就认为是发送成了.
介绍到这里, 基本就介绍完了, kafka这个项目开发了不少年, 确实也很复杂, 目前我也在处于继续学习的过程中, 这里写下这篇文章记录一下自己小小的总结, 未来需要继续努力鸭!