前言
在介绍Producer端原理之前,大家首先要对其整体结构有一个大致的了解,图如下:
看不懂这张图也没关系。在介绍Producer端原理时,我们将逐一介绍每个部分的含义及其复杂功能。
Main Thread(主线程)在Main Thread分为四个步骤:KafkaProducer
(Kafka生产)、Interceptor
(拦截器)、Serializer
(序列化器)和Partitioner
(分区器);
在上一章中,我们介绍了KafkaProducer端的一些重要参数和使用方法。
本章主要针对剩余的三个部分:Interceptor(拦截器)、Serializer(序列化器)和Partitioner(分区器)进行讲解。
1> Interceptor拦截器Kafka有两种拦截器,即生产者拦截器(ProducerInterceptor
)以及消费者拦截器(ConsumerInterceptor
)
让我们来看看生产者拦截器界面定义了哪些方法,如下所示:
public interface ProducerInterceptor<K, V> extends Configurable { /** Kafkaproducer将在[将消息序列化]和[计算分区]之前调用此方法,相应定制信息的操作 */ ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); /** Kafkaproducer会在[消息响应/消息发送失败之前调用此方法 */ void onAcknowledgement(RecordMetadata metadata, Exception exception); /** 关闭拦截器(因为这种方法可能被KafkaProducer多次调用,所以必须是米等)*/ void close();}
我们可以在Producerrecord类中发送所需的信息和信息 onSend(ProducerRecord<K, V> record) 修改方法,例如,在发送消息之前修改ProducerRecordvalue
值,从而改变消息内容。但是,最好不要修改topic
、key
和partition
等待信息,如果要修改,需要确保判断准确,否则会偏离预期效果。以下是Producerecord类中包含的待发消息的属性列表;
public class ProducerRecord<K, V> { private final String topic; private final Integer partition; private final Headers headers; private final K key; private final V value; private final Long timestamp; ... ...}
然后在Producerrecord类 onAcknowledgement(RecordMetadata metadata, Exception exception) 方法中有以下规律:
【成功发送消息】metadate
不为null
,exception为null
;<br>[消息发送失败]metadate为null
,exception不为null
;
因此,我们可以根据上述规则判断哪些消息发送成功,哪些消息发送失败。如果您想在源代码和注释中显示Recordmetadata类别中包含的“收据”信息,如果您想在源代码和注释中显示:
public final class RecordMetadata { public static final int UNKNOWN_PARTITION = -1; private final long offset; // 新闻偏移量 private final long timestamp; // 时间戳 private final int serializedKeySize; // key的序列长度 private final int serializedValueSize; // value序列化长度 private final TopicPartition topicPartition; // 主题所在分区 ... ...}
2> Serializer序列化器在Producer端将消息发送给Kafka后,待传输的消息对象obj需要转换为 byte[]字节数组 之后才能在网络中传输,所以这里必不可少的一步就是Serializer
在Consumer端,接收到的字节数组byte需要[] 然后转换为对象obj,那么这一步就是反序列化器Deserializer
了。
Kafka在org.apache.kafka.common.serialization
序列化器/反序列化器在目录下提供了多种类型的预置,如下所示:
Deserializer、Serializer、ByteArrayDeserializer、ByteArraySerializer <br>ByteBufferDeserializer、ByteBufferSerializer、BytesDeserializer、BytesSerializer <br>DoubleDeserializer、DoubleSerializer、FloatDeserializer、FloatSerializer<br>IntegerDeserializer、IntegerSerializer、ListDeserializer、ListSerializer<br>LongDeserializer、LongSerializer、ShortDeserializer、ShortSerializer<br>StringDeserializer、StringSerializer、UUIDDeserializer、UUIDSerializer<br>VoidDeserializer、VoidSerializer
因为本章主要介绍了Producer端的执行原理,所以我们只需要关注序列化器Serializer
,接口如下:
public interface Serializer<T> extends Closeable { /** 配置当前类 */ default void configure(Map<String, ?> configs, boolean isKey) { }/** 将对象data转换为字节数组 */ byte[] serialize(String topic, T data);/** 将对象data转换为字节数组 */ default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); } /** 关闭序列化器(因为这种方法可能被KafkaProducer多次调用,所以必须是米等)*/ @Override default void close() { }}
对于Serialize接口中的方法接口,我们需要实现序列化操作。StringSerializer
例如,看看它是如何实现的,代码如下:
public class StringSerializer implements Serializer<String> { private String encoding = StandardCharsets.UTF_8.name(); // 默认编码为UTF-8 @Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; // 首先,试着从configs中获得"key.serializer.encoding"或"value.serializer.encoding"所配置的值 Object encodingValue = configs.get(propertyName); if (encodingValue == null) // 假如没有配置,试着从configs中获得"serializer.encoding"所配置的值 encodingValue = configs.get("serializer.encoding"); if (encodingValue instanceof String) encoding = (String) encodingValue; // 若配置了自定义编码,给encoding赋值;否则默认UTF-8 } @Override public byte[] serialize(String topic, String data) { try { if (data == null) return null; else return data.getBytes(encoding); // 字节数组通过调用StringggetBytes获得 } catch (UnsupportedEncodingException e) { throw new SerializationException(...); } }}
在
StringSerializer
在类别中,序列化非常简单,即通过调用String的getBytes获得字节数组;此外,还可以配置自定义代码。通过将key设置为configs:"key.serializer.encoding
"、"value.serializer.encoding
"、"serializer.encoding
"这三个,包括serializer.encoding优先级最低。如果这三个key没有配置,那么 默认编码类型是"UTF-8" ;
如果Kafka内置的这些序列化器不能满足需求,则可以自行实现定制序列化器(例如:MuseSerializer),然后,使用时,在properties
指定配置:
3> Partioner分区器Properties properties = new Properties();<br>properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
MuseSerializer.class.getName()
);<br>properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,MuseSerializer.class.getName()
);
构造ProducerRecord
在实例对象中,如果在结构方法中指定了partition字段,则不需要分区器;否则,需要partitioner分区器key字段
计算分区值。Producerrecord的结构函数如下:
当我们不在的时候ProducerRecord
在构造函数中指定partition字段时,需要分区器工作,所有分区器都需要实现接口partitioner,该接口有以下三种方法:
public interface Partitioner extends Configurable, Closeable { /** 计算给定记录的分区 */ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); /** 关闭分区器(因为这种方法可能会被KafkaProducer多次调用,所以必须是米等)*/ void close(); /** 通知分区器即将创建新的批处理。使用sticky分区器时,这种方法可以更改新批选择的sticky分区 */ default void onNewBatch(String topic, Cluster cluster, int prevPartition) { }}
Kafka中默认的分区器是defaultpartitioner。这里有两个逻辑判断分支,即keyBytes是否为null(keyBytes是key字节数组
)
【keyBytes不是null】keyBytes
murmur2
哈希计算,然后基于所有指定Topic下的分片总数
计算取余寻址。<br>【keyBytes为null】需要调用StickyPartioncachepartition(...)
计算方法。
分区逻辑如下:
public class DefaultPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();... ... public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size()); // Topic下[所有分片]总数 } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) { // 如果没有key的序列化值 if (keyBytes == null) return stickyPartitionCache.partition(topic, cluster); // 哈希计算keyBytes,并在Topic下[所有分片]中寻找地址 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }... ...}
如果keyBytes==null
,如何在StickyPartitionCache中计算分区值?首先,主题topic
去缓存keyindexCache
获得分区值part
,如果part
如果不是空的,直接返回part
,搞定!!
如果part
等于null,则表示缓存中没有缓存topic的分区值,因此需要计算,计算步骤如下:
[步骤1]获取
topic
下面所有的分片集合partitions
;<br>[步骤2]获取topic
下面是所有有效的分片集合availablePartitions
;<br>[步骤3]如果没有有效的分片,则基于随机数获得随机数partitions
中取余寻址;<br>[步骤4]如果有一个有效的分片,则获得该分片值;<br>[步骤5]如果有多个有效分片,则获得一个基于随机数的随机数availablePartitions
中取余寻址;<br>【步骤6】将topic
和分区值
维护到缓存indexCache
并返回分区值;
下面是partition方法的源代码和注释,如下所示:
public int partition(String topic, Cluster cluster) { Integer part = indexCache.get(topic); // 试着在缓存中获取,如果获得,则直接返回 if (part == null) return nextPartition(topic, cluster, -1); // 获取主题topic的分区号,将其维护到缓存indexcache中 return part;}public int nextPartition(String topic, Cluster cluster, int prevPartition) { // 在topic下获得所有分片集 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); Integer oldPart = indexCache.get(topic); // 试着在缓存中获得分片号,作为旧分片oldPart Integer newPart = oldPart; if (oldPart == null || oldPart == prevPartition) { // 在Topic下获得所有[有效分片]集合 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); // 若无有效分片,获得随机数,在partitions中, if (availablePartitions.size() < 1) { Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = random % partitions.size(); } // 若有一个有效的分片,在这里分配 else if (availablePartitions.size() == 1) { newPart = availablePartitions.get(0).partition(); } // 若有多个有效分片,获得随机数,在availablePartions中, else { while (newPart == null || newPart.equals(oldPart)) { int random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); newPart = availablePartitions.get(random % availablePartitions.size()).partition(); } } // 维护到缓存indexcache,主题Topickeyy if (oldPart == null) indexCache.putIfAbsent(topic, newPart); else indexCache.replace(topic, prevPartition, newPart); return indexCache.get(topic); // 获得主题Topic的分区号 } return indexCache.get(topic); // 获得主题Topic的分区号}
以下是今天文章的内容:
写作并不容易。作者几个小时甚至几天就完成了一篇文章。我只想换取你几秒钟 点赞 & 分享 。
更多技术干货,欢迎关注微信官方账号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」