Kafka客户端使用 - Go语言中文社区

Kafka客户端使用


Consumer客户端

1 消费者模型

在开始编码之前, 我们先回顾一下一些基本概念。 在Kafka中, 每个topic被分成一组称为PartitionslogsProducer向这些logs的末尾写入消息, Consumer则自己按自己的节奏读取log。 Kafka通过在一个Consumer Group中分配Partitions来伸缩topic的消费, Consumer Group是一组有同样Group id的consumers。 下图表示的是一个拥有3个分区的topic, 以及一个有两个成员的Consumer Group。 每一个分区都只会分配给组内的一个成员。
这里写图片描述

旧的Consumer依赖于Zookeeper来进行Group管理, 新的消费者使用kafka内部的Group coordination协议。 对于每个Group, brokers中的一个被选为Group coordinator。 这个coordinator负责管理Group的状态, 它的主要工作是当新的成员加入, 或者原本的成员离开, 或者topic的元数据发生了改变时, 协调Partition分配。 重新分配Partition这个过程被称为rebalancing the Group。

当Group第一次初始化时, Consumer通常从分区的最开始或者最末尾开始读取。 每个Partition log中的消息都是按顺序读取。 随着Consumer的读取, 它将会commit它所成功处理的message的offset。 例如下图中, Consumer的位置位于offset 6, 上一次commit的offset为1。
这里写图片描述

当一个Partition被重分配给组内的另一个Consumer, 其最初位置会被设置成上一次commit的offset。 如果上例中的Consumer突然奔溃了, 那么接管这个Partition的另外一个组成员会从offset 1继续消费。 在这种情况下,它会重新处理上一次提交位置到消费者奔溃的位置6的消息。

这张图还有两个log中比较重要的位置。 log end offset是最后一条写入log的message的offset, 而high watermark是最后一条message成功复制到所有的log replica的message offset。 从Consumer的角度来看, 所知道的最主要的事情就是其最多能够读到high watermark的message。 这能够阻止Consumer读取到未被复制到其他broker的, 可能会丢失的message。


2 配置和初始化

开始使用Consumer之前, 需要将kafka-clients依赖加入到你的项目中。

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.0-cp1</version>
</dependency>12345

Consumer使用一个Properties file来进行构建。 以下提供了使用Consumer Group所需要的最小配置。

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);123456

和之前的Consumer, Producer一样, 我们需要为Consumer提供一个broker的初始化列表, 以用于发现集群中其他的broker。 这个配置并不需要提供所有的broker, client会从给定的配置中发现所有的当前存活的broker。 这里我们假设broker运行在本地, 同时Consumer还需要知道如何反序列化message key和value。 最后, 为了加入Consumer Group, 我们需要配置一个Group id。 随着教程的继续, 我们会介绍更多的配置。


3 Topic订阅

为了开始消费, 你必须指定你的应用需要读取的topics, 在下面的例子中, 我们订阅了topic foo和bar。

Consumer.subscribe(Arrays.asList(“foo”, “bar”));

订阅之后, Consumer会与组内其他Consumer协调来获取其分区分配。 这都是在你开始消费数据时自动处理的。 后面我们会说明如何使用assign api来手动分配分区, 但是要注意的是, 不能同时混合使用自动和手动的分配。

subscribe方法并不是递增的: 你必须包含所有你想要消费的topics。 你可以在任何时刻改变你想消费的topic集, 当你调用subscribe时, 之前订阅的topics列表会被新的列表取代。


3 基本的Poll事件循环

3.1 Consumer使用了NIO技术

Consumer需要能够并行获取数据, 在众多brokers中获取多个topic的多个分区消息。 为了实现这个目的, Consumer API的设计成风格类似于unix中的poll或者select调用: 一旦topic注册了, 所有的coordination, rebalancing和data fetch都是由一个处于循环中的poll调用来驱动。 这样就提供了一个能在一个线程里处理所有的IO的简单有效的实现。

3.2 启动Poll Loop

在你订阅一个topic之后, 你需要启动这个event loop以获得Partition分配和开始获取数据。 听起来很复杂, 但是所有你需要做的就只有调用poll, 然后Consumer客户端本身负责处理其他的工作。 每一次poll调用都会返回从所分配的Partition获取的一组消息(也许是空的)。 下面的例子展示了一个基本的poll循环, 打印获取的records的offset和value。

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000); // 超时时间1000毫秒
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
  }
} finally {
  Consumer.close();
}

poll API根据当前的位置返回records,当Group第一次创建时, 消费开始的位置会被根据reset policy(一般设置成从每个分区的最早的offset或者最新的offset开始)来设置。 只要Consumer开始提交offset, 那么之后的rebalance都会重置消费开始位置到最新的被提交的offset。 传递给poll的参数是Consumer在当前位置等待有record返回时需要被阻塞的时间。 一旦有record时, Consumer会立即返回, 如果没有record, 它将会等待直到超时。

Consumer被设计成只在自己的线程中运行, 在没有外部同步措施的情况下, 在多线程中使用时不安全的, 同时也不建议这样做。 在这个例子中, 我们使用了一个flag来使得当应用停止时能够从循环中跳出。 当这个flag被另一个线程设置成false时, pool返回时循环会跳出, 无论返回什么record, 处理过程都会结束。这个的例子使用了一个相对较少的超时时间, 以使得关闭Consumer并不会有太大的延时。

你还可以设置一个较长的timeout, 并且使用wakeup API来使得其从循环中跳出。

try {
  while (true) {
    ConsumerRecords<String, String> records = Consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + “: ” + record.value());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  Consumer.close();
}

我们将timeout改为了Long.MAX_VALUE, 意味着Consumer会一直阻塞直到有record返回。 和前面设置flag不同, 用于触发shutdown的线程可以调用Consumer.wakeup()来中断一次poll, 使其抛出WakeupExection。 这个API是线程安全的。 注意如果当前没有活跃的poll, 那么异常会在下一次poll调用时抛出。 在这个例子中, 我们捕捉这个异常, 阻止其继续传播。

4 完整代码

在接下来的例子中, 我们将所有的代码块放到一起来构建一个task, 初始化Consumer, 订阅一个topic列表, 并且执行poll调用直到外部关闭它。

public class ConsumerLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final int id;

  public ConsumerLoop(int id,
                      String groupId, 
                      List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(“group.id”, groupId);
    props.put(“key.deserializer”, StringDeserializer.class.getName());
    props.put(“value.deserializer”, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("Partition", record.Partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out.println(this.id + ": " + data);
        }
      }
    } catch (WakeupException e) {
      // ignore for shutdown 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}

为了测试这个例子, 你需要一个运行0.9.0.0的kafka发行版的broker, 以及一个提供一些string数据进行消费的topic。 写入一些string数据的最简单的方法就是通过kafka-verifiable-producer.sh脚本。 你需要确保topic有超过一个Partition。 例如对于只有一个运行在localhost的kafka broker和Zookeeper, 你可以在kafka发行包根目录下执行以下的命令:

# bin/kafka-topics.sh --create --topic Consumer-tutorial --replication-factor 1 --Partitions 3 --zookeeper localhost:2181
# bin/kafka-verifiable-producer.sh --topic Consumer-tutorial --max-messages 200000 --broker-list localhost:9092

然后我们可以用以下代码来设置一个有3个成员的Consumer Group, 所有的成员都订阅我们刚刚创建的topic。

public static void main(String[] args) {
  int numConsumers = 3;
  String groupId = "consumer-tutorial-group"
  List<String> topics = Arrays.asList("consumer-tutorial");
  ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

  final List<ConsumerLoop> consumers = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
    consumers.add(consumer);
    executor.submit(consumer);
  }

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (ConsumerLoop consumer : consumers) {
        consumer.shutdown();
      } 
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace;
      }
    }
  });
}

这个例子将三个Runnable Consumer提交到一个executor。 每一个线程都有一个单独的id, 所以你可以看出来哪个线程正在接收数据。 当你准备结束进程时, shutdown hook会被调用, 它会调用wakeup来终止三个线程, 并且等待它们关闭。 以下是程序运行的数据:

2: {Partition=0, offset=928, value=2786}
2: {Partition=0, offset=929, value=2789}
1: {Partition=2, offset=297, value=891}
2: {Partition=0, offset=930, value=2792}
1: {Partition=2, offset=298, value=894}
2: {Partition=0, offset=931, value=2795}
0: {Partition=1, offset=278, value=835}
2: {Partition=0, offset=932, value=2798}
0: {Partition=1, offset=279, value=838}
1: {Partition=2, offset=299, value=897}
1: {Partition=2, offset=300, value=900}
1: {Partition=2, offset=301, value=903}
1: {Partition=2, offset=302, value=906}
1: {Partition=2, offset=303, value=909}
1: {Partition=2, offset=304, value=912}
0: {Partition=1, offset=280, value=841}
2: {Partition=0, offset=933, value=2801}

5 消费者存活

每个Consumer都是被分配它所订阅的topics中所有Partitions的一部分Partitions。 就像是对这些Partitions的Group lock。 只要这个锁被持有, 组内其他的成员就无法从这些分区中读取数据。 当Consumer还存活着, 这就是你想要的, 这是能够避免重复消费的方法(加锁)。 但是如果消费者因为机器或者应用的故障挂掉了, 那么你希望锁能够释放, 并且分区能够分配给其他存活的Consumer。

Kafka group coordination协议通过心跳机制来解决这个问题。 在每次重分配之后, 当前所有的成员会周期性发送心跳给Group coordinator。 只要coordinator接收心跳, 它就会假设成员是存活的。 每次接收到心跳, coordinator都会开始(或重置)一个timer。 当timer过期时, 如果还没有心跳收到, coordinator会将这个成员标记为挂掉, 然后通知其他组内其他成员需要重新分配分区。 timer的时间被称为session timeout, 由客户端的session.timeout.ms进行设置。

props.put(“session.timeout.ms”, “60000”);

session timeout能够保证, 当机器或者应用奔溃, 或者网络分区导致Consumer与coordinator分隔开来时, 锁会被释放。 然而应用故障判断却有点讨巧, 因为有可能Consumer一直在发送心跳给coordinator, 但并意味着应用的状态是正常的。

Consumer的poll api就是设计用于解决这个问题的(发送心跳), 当你调用poll或者其他阻塞API时所有的网络IO都是在前台处理的, Consumer并不使用任何后台线程。 意味着发送给coordinator的心跳只会在poll调用时被发送( 译者注: 这个在0.10.1.0版本改成后台发送心跳), 如果应用停止poll( 无论是处理record的代码抛出了异常或者系统奔溃了),那么心跳将会停止发送, 那么session timeout就会过期, 组内的分区就会重新分配。

唯一一个你需要注意的问题就是: 如果你处理消息的时间比session timeout要长的话, 那么会使得coordinator认为Consumer已经挂了, 导致Partition重新分配。 你应该将session timeout设置的足够大, 以使得这种情况不会发生。 默认session timeout是30s, 但是将它设置成几分钟是不可行的, 这样就会导致coordinator需要更长的时间来检测真正的Consumer奔溃。


6. offset提交

kafka为每个分区的每条消息保持偏移量的值,这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。也就是说,一个位置是5的消费者,说明已经消费了0到4的消息(记录)并下一个接收消息的偏移量设置为5。关于的消费者,实际上“位置”有2个概念。

  • 消费者将给出下一个消息的偏移量的位置,这个是消费者在分区中能看到的最后的偏移量,消费者收到的数据称为poll(long)[长轮询],每次接收消息,偏移量会自动增长,
  • “已提交”的位置是已安全保存的最后偏移量,如果处理失败,这个偏移量会恢复并重新开始。消费者可以自动定期提交偏移量,也可以选择通过调用commitSync来控制,这是阻塞的,直到偏移量提交成功或在提交过程中发生致命的错误,commitAsync是非阻塞式的,当成功或失败时,会引发OffsetCommitCallback。

当一个Consumer Group被建立时, 最开始消费的offset位置是由auto.offset.reset配置来控制的。 一旦Consumer开始消费, 它会根据应用的需要自动提交offset。 每次rebalance,消费开始的position都会被设置成分区最后提交的offset。如果Consumer在为已经成功处理的message提交offset之前奔溃了, 那么重新分配到这个分区的Consumer会重复消费这些消息。 所以, 你提交消息的频率越频繁, 那么因为奔溃而造成的重复消费就会越少。

到现在,我们都假定开启了自动提交。 当我们将enable.auto.commit设置成true(默认是true), Consumer会根据auto.commit.interval.ms的设置, 来周期性自动触发offset提交。 通过减少commit间隔, 你可以减少在奔溃之后消费者需要重新处理的message数量。

为了使用Consumer的commit API,你应该首先通过在Consumer的配置中设置enable.auto.commit为false来禁止自动commit。

props.put(“enable.auto.commit”, “false”);

commit API本身用起来不难, 但是重要的是你如何把它放入到代码中。 下面使用同步commit API是手动commit最简单的方法。

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      Consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  Consumer.close();
}

使用无参的commitSync方法会提交上次poll返回的最新的offset。 这个调用是阻塞式的, 直到commit成功或者因为不可恢复的错误而失败。 你需要担心的最大的问题就是消息处理时间可能比session timeout长。 当这种情况发生时, coordinator会把这个Consumer踢出Group, 那么Consumer就会抛出CommitFailedException。 你的应用应该处理这个错误, 回滚自从上次成功提交offset以来的消费的message造成的改变。

一般你应该在消息被成功处理完后才去提交offset。 如果Consumer在commit发送之前就奔溃了, 那么message将会被重新处理。 如果commit策略保证最后提交的offset不会超过当前的消费position, 那么你就会获得”at least once(至少一次)”的消息分发语义。

这里写图片描述

提交的offset超前于当前消费位置

通过改变commit策略来保证当前的消费位置绝对不超过最后一次committed offset 如上图所示, 那么你会获得”at most once(至多一次)”的语义。 如果消费者在处理到最后一次committed offset之前就奔溃了, 那么所有在这个间隔之间的数据都会丢失, 但是你可以确保没有消息会被处理超过一遍。 为了实现这种策略, 我们只需要改变commit和消息处理的顺序。

try {
  while (running) {
  ConsumerRecords<String, String> records = Consumer.poll(1000);

  try {
    Consumer.commitSync();
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  Consumer.close();
}

使用自动提交为你提供”at least once”的语义, 因为Consumer保证只commit返回给应用的message的offset。你可能会重复处理的message就被限定在, 两次commit时间间隔(由auto.commit.interval.ms设置)之间你的应用处理的message.

通过使用commit API, 你对可以接受多少重复处理有更好的控制。 在最极端的情况下, 你可以在每次处理完消息都提交offset。如下面所示:

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000);

    try {
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
        Consumer.commitSync(Collections.singletonMap(record.Partition(), new OffsetAndMetadata(record.offset() + 1)));
      }
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  Consumer.close();
}12345678910111213141516

在这个例子中, 我们在commitSync方法中显式的传递了我们想要提交的offset. 提交的offset应该是你的应用将要读取的下一条message的offset. 当调用无参的commitSync方法时, 就会提交返回给应用的最后一条message的offset(再加上1), 但是我们不能这样做, 因为这样会使得提交的位置超过实际处理的进度。

明显每消费一条消息就commit offset并不合适大多数场景, 因为处理线程必须阻塞, 直到commit请求从服务器返回。 这样会大大减少吞吐量。 一个更加合理的方式就是每N条记录就提交一次, N可以进行调整以获得更好的性能。

commitSync方法的参数是一个键是topic Partition, 值是offsetAndMetadata实例的map. commit API允许在每次commit时包含一些Metadata, 这些数据可以是记录commit的时间, 发送commit的host地址, 或者任何你的应用需要的信息。 在这个例子中, 我们让它为空。

相对每次接收到消息都提交offset, 更加合理的方法就是处理完一个分区的消息后再commit offset. **ConsumerRecords**Collection提供访问其中包含的Partitions集合的方法, 以及每个Partition的messages的方法。 下面的例子就展示了用法。

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(Long.MAX_VALUE);
    for (TopicPartition Partition : records.Partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(Partition);
      for (ConsumerRecord<String, String> record : partitionRecords)
        System.out.println(record.offset() + ": " + record.value());

      long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      Consumer.commitSync(Collections.singletonMap(Partition, new OffsetAndMetadata(lastoffset + 1)));
    }
  }
} finally {
  Consumer.close();
}123456789101112131415

到目前为止, 所有的例子都是关注于同步commit API, 但是Consumer同样也有一个异步的API, commitAsync, 使用异步的commit可以使得你的应用拥有更高的吞吐量, 因为你可以不用等待commit返回, 开始处理下一批message. 需要权衡的就是你可能随后发现, commit失败了。 以下的例子展示了基本用法:

try {
  while (running) {
    ConsumerRecords<String, String> records = Consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    Consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
                             Exception exception) {
        if (exception != null) {
          // application specific failure handling
        }
      }
    });
  }
} finally {
  Consumer.close();
}

注意到对于commitAsync我们提供了一个callback, 这个方法在Consumer commit结束时被调用(无论commit成功与否), 如果你不需要callback, 那么你可以调用无参的commitAsync


7. 查看Consumer Group信息

当一个Consumer Group处于活跃时, 你可以在命令行中使用位于Kafka发行包bin目录下的Consumer-Groups.sh脚本获取Partition assignment和消费进度。

# bin/kafka-consumer-groups.sh –new-consumer –describe –group consumer-tutorial-group –bootstrap-server localhost:9092 1

例子输出结果如下所示:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.11234

上面显示了Consumer Group中所有的Partition assignment, 哪个消费者实例拥有哪个分区, 以及最新的committed offset(这里的current offset)。 lag表示log end offset与最新的committed offset的差值。 管理员可以使用这个命令来监控以保证Consumer Group跟上了Producer的进度。


8. 使用手动分区分配

在教程前面提到的, 新的Consumer为一些并不需要Consumer Group的应用场景实现了更底层的访问。 推荐使用这个API的理由就是它的易用性。 旧的”simple” Consumer同样也提供了这个功能, 但是需要你自己进行一大堆的异常处理。 使用新的Consumer API, 你只需要分配你想要读取数据的Partition, 然后开始poll data就可以了。

下面的例子展示了如何好使用partitionFor API来将一个topic的所有分区分配给一个Consumer.

List<TopicPartition> Partitions = new ArrayList<>();
for (PartitionInfo Partition : Consumer.partitionsFor(topic))
  Partitions.add(new TopicPartition(topic, Partition.Partition()));
Consumer.assign(Partitions);

subscribe方法类似, assign方法需要传递你想要读取的Partition的列表。 一旦Partition被分配了, poll的工作方式就和之前讲述的一致。

需要注意的是, 无论是使用一个simple Consumer或者是Consumer Group, 所有的offset commit都会经过Group coordinator。 所以如果你想要commit offsets,你仍然需要设置Group.id, 以防止与其他Consumer发生冲突。 如果一个simple Consumer试着使用和一个活跃的Consumer Group相同的Group id来commit offset, 那么coordinator会拒绝这个commit(提交的结果就是抛出CommitFailedException)。 有相同的Group id的simple Consumer却不会有错误。

9. Conclusion

新的Consumer为Kafka社区带来很多的好处, 包括一个清晰的API, 更好的安全性以及更少的依赖。 这篇教程介绍了基本的应用, 集中于poll的语义,以及使用commit API来控制消息传递语义。 虽然还有很多细节没有涉及到, 但是这对于开始使用kafka Consumer来说已经足够了。


Producer客户端

KafkaProducer(org.apache.kafka.clients.producer.KafkaProducer)是一个用于向kafka集群发送数据的Java客户端。该Java客户端是线程安全的,多个线程可以共享同一个producer实例,而且这通常比在多个线程中每个线程创建一个实例速度要快些。

1 代码示例

public class TestProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.137.200:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //生产者发送消息 
        String topic = "mytopic";
        Producer<String, String> procuder = new KafkaProducer<String,String>(props);
        for (int i = 1; i <= 10; i++) {
            String value = "value_" + i;
            ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, value);
            procuder.send(msg);
        }
        //列出topic的相关信息
        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>() ;
        partitions = procuder.partitionsFor(topic);
        for(PartitionInfo p:partitions)
        {
            System.out.println(p);
        }

        System.out.println("send message over.");
        procuder.close(100,TimeUnit.MILLISECONDS);
    }
}

2 配置说明

  • bootstrap.servers: 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2。
  • acks: 生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。
  • retries: 生产者发送失败后,重试的次数。
  • batch.size: 当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。
  • linger.ms: 默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。 batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。
  • buffer.memory:生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。
  • key.serializer,value.serializer:说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

producer包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息。位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群。如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露。


3 消息发送方法send

/*
 *record:key-value形式的待发送数据
 *callback:到发送的消息被borker端确认后的回调函数
 */
public Future<RecordMetadata> send(ProducerRecord<K,V> record); // Equivalent to send(record, null)
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback);

send方法负责将缓冲池中的消息异步的发送到broker的指定topic中。异步发送是指,方法将消息存储到底层待发送的I/O缓存后,将立即返回,这可以实现并行无阻塞的发送更多消息。send方法的返回值是RecordMetadata类型,它含有消息将被投递的partition信息,该条消息的offset,以及时间戳。因为send返回的是Future对象,因此在该对象上调用get()方法将阻塞,直到相关的发送请求完成并返回元数据信息;或者在发送时抛出异常而退出。

阻塞发送的方法如下:

String key = "Key";
String value = "Value";
ProducerRecord<String, String> record = new ProducerRecord<String, String>(key, value);
producer.send(record).get();
ProducerRecord<String,String> record = new ProducerRecord<String,String>("the-topic", key, value);
producer.send(myRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if(e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                        }
                    }
                });
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/Dustin_CDS/article/details/79434170
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-02-29 21:07:24
  • 阅读 ( 1190 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢