libkafka封装生产者同步接口

最近实习有需要用到zookeeper和kafka, zookeeper主要用到的是服务注册与发现, kafka主要做的是经典的生产者和消费者模型的实现, 这里主要介绍下kafka, zookeeper的话接口非常的清晰, 入门会很快, 需要注意的一点的zookeeper的节点的问题, 临时节点和持久节点可能需要我们稍微注意下. kafka我入门也只有几天, 如果有什么不正确的地方, 欢迎读者朋友们指出!

  1. 首先介绍一下几个创建生产者必须的api, 我用的是librdkafka, 这个是c语言的接口, 其他语言的接口可以去看对应的文档, 但是思路基本是大同小异的.

    1. rd_kafka_conf_t *rd_kafka_conf_new(void);这个函数功能很简单, 创建一个全局配置句柄, 如果我们用到一些自定义的配置, 就需要调用这个函数, 在创建kafka句柄的时候, 这个句柄可能需要作为参数.
    2. rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,const char *name,const char *value,char *errstr, size_t errstr_size); 这个函数很简单, 就是修改全局的配置, conf就是第一个函数的返回值, 后面的name和value是配置的名称和我们想要配置成的值.后面两个是我们用户传入的参数, 如果创建失败, errstr修改为错误信息.
    3. rd_kafka_t *rd_kafka_new(rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size); 这个函数作用是创建一个新的kafka句柄, 然后根据传入第一个参数决定创建句柄的类型, 比如是生产者还是消费者.第二个参数是我们可选的传入的全局配置句柄, 如果我们需要对默认的配置进行修改, 我们就需要用到这个, 如果我们想使用默认的参数配置的话, 就直接传入一个空指针就好, 后面两个参数是我们自己提供的错误输出缓冲区, 如果创建句柄失败的话, 那么errstr里面就会有错误信息.
    4. rd_kafka_topic_conf_t *rd_kafka_topic_conf_new(void); 这个函数不细说, 就是创建一个topic配置的句柄, 没有参数.
    5. rd_kafka_conf_res_t rd_kafka_topic_conf_set(rd_kafka_topic_conf_t *conf,const char *name,const char *value, char *errstr, size_t errstr_size);这个函数也很简单, 核心是对topic句柄的配置进行修改, name就是需要修改的配置的名字, value是我们希望设的值,后面两个和第一个函数一样, 不再介绍.
    6. void rd_kafka_conf_set_opaque(rd_kafka_conf_t *conf, void *opaque); 这个函数很重要,需要我们注意, 这个函数的功能是将opaque作为参数传给我们的回调函数. 这个是针对异步接口编程非常重要的一个思想, 对于吞吐量要求比较高的场景下, 我们很可能需要用异步的接口, 所谓异步的接口是当我们调用异步的接口发起请求时, 接口不会阻塞, 但是请求不一定成功, 如何判断请求是否成功了呢, 这个时候我们就需要注册回调函数, 在回调函数里面收到请求是否成功的信息,这个opaque我们现在设定好了, 到回调函数被调用的时候,opaque会原封不动的作为回调函数的参数传回来. 这样说可能有点抽象, 等下后面会有例子.
    7. void rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_t *conf,void (*dr_msg_cb) (rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque)); 这个是设置发送信息的回调函数, 第一个参数是配置的句柄, 在第一个函数中已经得到.后面就是我们需要注册的回调函数, 注意到回调函数的最后一个参数就是我们在上一步传入的参数opaque.
    8. int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len,const void *key, size_t keylen, void *msg_opaque); 这个函数就是生产信息了, 第一个是我们生产的消息希望发送到的topic, 第二个参数是该topic下的分区, 这个参数可以我们自己制定, 也可以使用RD_KAFKA_PARTITION_UA 参数让kafka内部帮我们选择合适的分区, msgflags是一些标志, 这里不再细说, 可以在头文件里看到注释, payload就是我们希望发送的消息, key的话这个参数我还不是特别理解, 暂时就不强行解释了,最后一个参数是用户指定的参数, 在消息传递回调函数的时候作为参数被传入.
    9. int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms);这个函数其实很有意思, 在kafka里面, 即使我们调用了kafka的异步接口, 回调函数却需要我们手动调用rd_kafka_poll这个函数, 然后回调函数才会被调用, 因此我们必须要每隔一定时间就调用一次该函数, 避免回调的消息累计, 第一个参数不用说, 第二个参数是超时时间, 这个参数也很简单, 如果是0就是直接返回, 如果是-1就永久等待, 直到有回调消息到来, 如果是正数的话, 指定等待的时间.
  2. 下面就是介绍如何实现一个同步的生产者接口了, 实际上kafka没有提供同步的生产者接口, 当我们调用rd_kafka_produce的时候, 实际上我们无法得到消息是否发送成功的信息, 因为这个接口会把我们需要发送的消息放入自己的缓存中, 然后在一定时间内不断尝试发送, 尝试一定次数或者到达一定时间后, 如果还是发送失败, 那么就才认为是发送失败, 然后消息传递的回调函数会被调用, 其实看到这里, 我们的思路就基本出来了, 我们调用rd_kafka_produce的时候增加一段阻塞等待时间, 如果这段时间内消息发送失败的话, 那么我们就让函数返回失败, 如果这段时间内, 消息发送成功了, 那么我们就让函数返回成功. 下面是实现的介绍:
    1.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    	int RdkafkaManger::init(const std::string &brokers)
    {
    /* Kafka configuration */
    rd_kafka_conf_ = rd_kafka_conf_new();
    if(rd_kafka_conf_ == NULL)
    {
    printf("%s:Failed to create rd_kafka_conf_", __FUNCTION__);
    return -1;
    }
    //rd_kafka_conf_set_dr_cb(rd_kafka_conf_, RdkafkaManger::msg_delivered);
    /* Topic configuration */
    char errstr[512] = {0};
    if (rd_kafka_conf_set(rd_kafka_conf_, "metadata.broker.list",brokers.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    printf("%s:Failed to set brokers: %s.",__FUNCTION__,errstr);
    return -1;
    }
    //注意下面这个是将this作为参数传入, 在回调函数中会被作为参数回传
    rd_kafka_conf_set_opaque(rd_kafka_conf_, static_cast<void*>(this));
    //下面是设置消息传递的回调函数
    rd_kafka_conf_set_dr_msg_cb(rd_kafka_conf_, &RdkafkaManger::message_deliver_callback);
    brokers_ = brokers;
    return 0;
    }
    //下面是我们写的回调函数
    void RdkafkaManger::message_deliver_callback(rd_kafka_t *rk, const rd_kafka_message_t * rkmessage, void *opaque)
    {
    //强制类型转换, opaque是我们之前传入的this, 现在再转回来
    RdkafkaManger *rdkafka_manger = static_cast<RdkafkaManger*>(opaque);
    if(rdkafka_manger == NULL)
    {
    printf("%s: Failed to static_cast from opaque to RdkafkaManger",__FUNCTION__);
    }
    //这个很关键, 我们之后在决定rd_kafka_produce是否阻塞需要用到这个变量
    rdkafka_manger->sync_send_err_ = rkmessage->err;
    if(rkmessage->err)
    {
    printf("%s:Message delivery failed: %s\n",__FUNCTION__, rd_kafka_err2str(rkmessage->err));
    }
    }

    2.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    	int RdkafkaManger::create_rdkafka_producer(const std::vector<std::string > &topics_list)
    {
    char errstr[512] = {0};
    rdkafka_mutex_.lock();
    if (!(rd_kafka_producer_ = rd_kafka_new(RD_KAFKA_PRODUCER, rd_kafka_conf_,
    errstr, sizeof(errstr)))) {
    printf("%s: Failed to create new producer: %s\n",__FUNCTION__,errstr);
    rdkafka_mutex_.unlock();
    return -1;
    }
    for(int i = 0;i<topics_list.size();i++)
    {
    RdKafaTopicInfo rd_kafa_topic_info;
    rd_kafa_topic_info.topic_conf = rd_kafka_topic_conf_new();
    if (rd_kafka_topic_conf_set(rd_kafa_topic_info.topic_conf, "message.timeout.ms",std::to_string(10000).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    printf("%s:Failed to set messgae.timeoout.ms: %s.",__FUNCTION__,errstr);
    return -1;
    }
    rd_kafa_topic_info.topic = rd_kafka_topic_new(rd_kafka_producer_, topics_list[i].c_str(), rd_kafa_topic_info.topic_conf);
    if(rd_kafa_topic_info.topic != nullptr)
    {
    //...
    }else{
    printf("%s: Failed to create new producer topic: %s\n",__FUNCTION__,topics_list[i].c_str());
    }
    }
    rdkafka_mutex_.unlock();
    return 0;
    }

    这里最关键的部分是rd_kafka_topic_conf_set(rd_kafa_topic_info.topic_conf, “message.timeout.ms”,std::to_string(10000).c_str(), errstr, sizeof(errstr)), 这一步的作用是设置超时时间, 前面我们说过, rd_kafka_produce将需要发送的消息放入自己的缓存中后, 需要尝试一定次数或者到达一定时间才会停止尝试, 并且报告发送消息失败的消息, 这一步就是设置超时时间, 这个时间你可以自己设定, 不过注意单位是毫秒, 我这里设定的是10000, 也就是10s的意思.
    3.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    	int RdkafkaManger::produce_message(const std::string &topic,const std::string &message)
    {
    char errstr[512] = {0};
    int partition = 0;//假设默认分组是0
    rdkafka_mutex_.lock();
    //下面这个RD_KAFKA_PRODUCE_ERROR_INIT_VALUE只是一个宏定义, 随便定义一个负数就ok(最好是一个奇怪一点的负数比如-1234这种, 避免和kafka内置的一些错误相同), 如果发送消息失败或者发送消息成功, sync_send_err_会在消息传递回调函数中被修改
    sync_send_err_= (rd_kafka_resp_err_t)RD_KAFKA_PRODUCE_ERROR_INIT_VALUE;
    if (rd_kafka_produce(rdkafka_topic, partition,
    RD_KAFKA_MSG_F_COPY,
    /* Payload and length */
    (void *)message.c_str(), message.length(),
    /* Optional key and its length */
    NULL, 0,
    /* Message opaque, provided in
    * delivery report callback as
    * msg_opaque. */
    NULL) == -1) {
    fprintf(stderr,\
    "%% Failed to produce to topic %s "\
    "partition %i: %s\n",\
    rd_kafka_topic_name(rdkafka_topic), partition,rd_kafka_err2name(rd_kafka_last_error()));
    /* Poll to handle delivery reports */
    rd_kafka_poll(rd_kafka_producer_, 0);
    rdkafka_mutex_.unlock();
    return -1;
    }
    else
    {
    int current_retry_time = 0;
    while(RD_KAFKA_PRODUCE_ERROR_INIT_VALUE == sync_send_err_ && current_retry_time++ < (10+1))
    {
    rd_kafka_poll(rd_kafka_producer_, 1000);
    }
    if(RD_KAFKA_RESP_ERR_NO_ERROR != sync_send_err_)
    {
    LOG_ERROR("%s: Failed to send msg",__FUNCTION__);
    rdkafka_mutex_.unlock();
    return -1;
    }
    }
    fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n",\
    message.length(), rd_kafka_topic_name(rdkafka_topic), partition);
    /* Poll to handle delivery reports */
    rd_kafka_poll(rd_kafka_producer_, 0);
    rdkafka_mutex_.unlock();
    return 0;
    }

    最重要的部分就是while循环的部分, 其实思路很简单, 我们维护一个变量current_retry_time, 代表尝试的次数, 然后在每次尝试, 调用rd_kafka_poll(rd_kafka_producer_, 1000);, 阻塞等待1000毫秒, 如果sync_send_err_的值被改了(意味着消息传递回调函数被调用)或者current_retry_time到达上限(说明等待时间超过上限, 已经超时), 我们就跳出循环, 检测sync_send_err_的值, 如果值不是RD_KAFKA_RESP_ERR_NO_ERROR的话, 说明就发送失败了, 我们直接返回失败就好, 否则就认为是发送成了.

  3. 介绍到这里, 基本就介绍完了, kafka这个项目开发了不少年, 确实也很复杂, 目前我也在处于继续学习的过程中, 这里写下这篇文章记录一下自己小小的总结, 未来需要继续努力鸭!