当前位置: 首页 > 图灵资讯 > 技术篇> 连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?

连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?

来源:图灵教育
时间:2023-09-05 12:00:01

前言

在介绍Producer端原理之前,大家首先要对其整体结构有一个大致的了解,图如下:

看不懂这张图也没关系。在介绍Producer端原理时,我们将逐一介绍每个部分的含义及其复杂功能。

Main Thread(主线程)

在Main Thread分为四个步骤:KafkaProducer(Kafka生产)、Interceptor(拦截器)、Serializer(序列化器)和Partitioner(分区器);

在上一章中,我们介绍了KafkaProducer端的一些重要参数和使用方法。

本章主要针对剩余的三个部分:Interceptor(拦截器)、Serializer(序列化器)和Partitioner(分区器)进行讲解。

1> Interceptor拦截器

Kafka有两种拦截器,即生产者拦截器(ProducerInterceptor)以及消费者拦截器(ConsumerInterceptor

让我们来看看生产者拦截器界面定义了哪些方法,如下所示:

public interface ProducerInterceptor<K, V> extends Configurable {    /** Kafkaproducer将在[将消息序列化]和[计算分区]之前调用此方法,相应定制信息的操作 */    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);    /** Kafkaproducer会在[消息响应/消息发送失败之前调用此方法 */    void onAcknowledgement(RecordMetadata metadata, Exception exception);    /** 关闭拦截器(因为这种方法可能被KafkaProducer多次调用,所以必须是米等)*/    void close();}

我们可以在Producerrecord类中发送所需的信息和信息 onSend(ProducerRecord<K, V> record) 修改方法,例如,在发送消息之前修改ProducerRecordvalue值,从而改变消息内容。但是,最好不要修改topickeypartition 等待信息,如果要修改,需要确保判断准确,否则会偏离预期效果。以下是Producerecord类中包含的待发消息的属性列表;

public class ProducerRecord<K, V> {    private final String topic;    private final Integer partition;    private final Headers headers;    private final K key;    private final V value;    private final Long timestamp;    ... ...}

然后在Producerrecord类 onAcknowledgement(RecordMetadata metadata, Exception exception) 方法中有以下规律:

【成功发送消息】metadate不为null,exception为null;<br>[消息发送失败]metadate为null,exception不为null

因此,我们可以根据上述规则判断哪些消息发送成功,哪些消息发送失败。如果您想在源代码和注释中显示Recordmetadata类别中包含的“收据”信息,如果您想在源代码和注释中显示:

public final class RecordMetadata {    public static final int UNKNOWN_PARTITION = -1;    private final long offset; // 新闻偏移量    private final long timestamp; // 时间戳    private final int serializedKeySize; // key的序列长度    private final int serializedValueSize; // value序列化长度    private final TopicPartition topicPartition; // 主题所在分区    ... ...}
2> Serializer序列化器

在Producer端将消息发送给Kafka后,待传输的消息对象obj需要转换为 byte[]字节数组 之后才能在网络中传输,所以这里必不可少的一步就是Serializer在Consumer端,接收到的字节数组byte需要[] 然后转换为对象obj,那么这一步就是反序列化器Deserializer了。

Kafka在org.apache.kafka.common.serialization序列化器/反序列化器在目录下提供了多种类型的预置,如下所示:

Deserializer、Serializer、ByteArrayDeserializer、ByteArraySerializer <br>ByteBufferDeserializer、ByteBufferSerializer、BytesDeserializer、BytesSerializer <br>DoubleDeserializer、DoubleSerializer、FloatDeserializer、FloatSerializer<br>IntegerDeserializer、IntegerSerializer、ListDeserializer、ListSerializer<br>LongDeserializer、LongSerializer、ShortDeserializer、ShortSerializer<br>StringDeserializer、StringSerializer、UUIDDeserializer、UUIDSerializer<br>VoidDeserializer、VoidSerializer

因为本章主要介绍了Producer端的执行原理,所以我们只需要关注序列化器Serializer,接口如下:

public interface Serializer<T> extends Closeable {    /** 配置当前类 */    default void configure(Map<String, ?> configs, boolean isKey) {    }/** 将对象data转换为字节数组 */    byte[] serialize(String topic, T data);/** 将对象data转换为字节数组 */    default byte[] serialize(String topic, Headers headers, T data) {        return serialize(topic, data);    }    /** 关闭序列化器(因为这种方法可能被KafkaProducer多次调用,所以必须是米等)*/    @Override    default void close() {    }}

对于Serialize接口中的方法接口,我们需要实现序列化操作。StringSerializer例如,看看它是如何实现的,代码如下:

public class StringSerializer implements Serializer<String> {    private String encoding = StandardCharsets.UTF_8.name(); // 默认编码为UTF-8    @Override    public void configure(Map<String, ?> configs, boolean isKey) {        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";        // 首先,试着从configs中获得"key.serializer.encoding"或"value.serializer.encoding"所配置的值        Object encodingValue = configs.get(propertyName);        if (encodingValue == null)            // 假如没有配置,试着从configs中获得"serializer.encoding"所配置的值            encodingValue = configs.get("serializer.encoding");        if (encodingValue instanceof String)            encoding = (String) encodingValue; // 若配置了自定义编码,给encoding赋值;否则默认UTF-8    }    @Override    public byte[] serialize(String topic, String data) {        try {            if (data == null) return null;            else                 return data.getBytes(encoding); // 字节数组通过调用StringggetBytes获得        } catch (UnsupportedEncodingException e) {            throw new SerializationException(...);        }    }}

StringSerializer在类别中,序列化非常简单,即通过调用String的getBytes获得字节数组;此外,还可以配置自定义代码。通过将key设置为configs:"key.serializer.encoding"、"value.serializer.encoding"、"serializer.encoding"这三个,包括serializer.encoding优先级最低。如果这三个key没有配置,那么 默认编码类型是"UTF-8" ;

如果Kafka内置的这些序列化器不能满足需求,则可以自行实现定制序列化器(例如:MuseSerializer),然后,使用时,在properties指定配置:

Properties properties = new Properties();<br>properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MuseSerializer.class.getName());<br>properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MuseSerializer.class.getName());

3> Partioner分区器

构造ProducerRecord在实例对象中,如果在结构方法中指定了partition字段,则不需要分区器;否则,需要partitioner分区器key字段计算分区值。Producerrecord的结构函数如下:

当我们不在的时候ProducerRecord在构造函数中指定partition字段时,需要分区器工作,所有分区器都需要实现接口partitioner,该接口有以下三种方法:

public interface Partitioner extends Configurable, Closeable {    /** 计算给定记录的分区 */    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);    /** 关闭分区器(因为这种方法可能会被KafkaProducer多次调用,所以必须是米等)*/    void close();    /** 通知分区器即将创建新的批处理。使用sticky分区器时,这种方法可以更改新批选择的sticky分区 */    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {    }}

Kafka中默认的分区器是defaultpartitioner。这里有两个逻辑判断分支,即keyBytes是否为null(keyBytes是key字节数组

【keyBytes不是null】keyBytesmurmur2哈希计算,然后基于所有指定Topic下的分片总数计算取余寻址。<br>【keyBytes为null】需要调用StickyPartioncachepartition(...)计算方法。

分区逻辑如下:

public class DefaultPartitioner implements Partitioner {    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();... ...        public int partition(String topic, Object key, byte[] keyBytes,                          Object value, byte[] valueBytes, Cluster cluster) {        return partition(topic, key, keyBytes, value, valueBytes, cluster,                          cluster.partitionsForTopic(topic).size()); // Topic下[所有分片]总数    }    public int partition(String topic, Object key, byte[] keyBytes, Object value,                          byte[] valueBytes, Cluster cluster, int numPartitions) {        // 如果没有key的序列化值        if (keyBytes == null)             return stickyPartitionCache.partition(topic, cluster);        // 哈希计算keyBytes,并在Topic下[所有分片]中寻找地址        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;    }... ...}

如果keyBytes==null,如何在StickyPartitionCache中计算分区值?首先,主题topic去缓存keyindexCache获得分区值part,如果part如果不是空的,直接返回part,搞定!!

如果part等于null,则表示缓存中没有缓存topic的分区值,因此需要计算,计算步骤如下:

[步骤1]获取topic下面所有的分片集合partitions;<br>[步骤2]获取topic下面是所有有效的分片集合availablePartitions;<br>[步骤3]如果没有有效的分片,则基于随机数获得随机数partitions中取余寻址;<br>[步骤4]如果有一个有效的分片,则获得该分片值;<br>[步骤5]如果有多个有效分片,则获得一个基于随机数的随机数availablePartitions中取余寻址;<br>【步骤6】将topic分区值维护到缓存indexCache并返回分区值;

下面是partition方法的源代码和注释,如下所示:

public int partition(String topic, Cluster cluster) {    Integer part = indexCache.get(topic); // 试着在缓存中获取,如果获得,则直接返回    if (part == null)         return nextPartition(topic, cluster, -1); // 获取主题topic的分区号,将其维护到缓存indexcache中    return part;}public int nextPartition(String topic, Cluster cluster, int prevPartition) {    // 在topic下获得所有分片集    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);    Integer oldPart = indexCache.get(topic); // 试着在缓存中获得分片号,作为旧分片oldPart    Integer newPart = oldPart;    if (oldPart == null || oldPart == prevPartition) {        // 在Topic下获得所有[有效分片]集合        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);        // 若无有效分片,获得随机数,在partitions中,        if (availablePartitions.size() < 1) {            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());            newPart = random % partitions.size();        }                 // 若有一个有效的分片,在这里分配        else if (availablePartitions.size() == 1) {            newPart = availablePartitions.get(0).partition();        }         // 若有多个有效分片,获得随机数,在availablePartions中,        else {            while (newPart == null || newPart.equals(oldPart)) {                int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());                newPart = availablePartitions.get(random % availablePartitions.size()).partition();            }        }                // 维护到缓存indexcache,主题Topickeyy        if (oldPart == null) indexCache.putIfAbsent(topic, newPart);        else indexCache.replace(topic, prevPartition, newPart);        return indexCache.get(topic); // 获得主题Topic的分区号    }        return indexCache.get(topic); // 获得主题Topic的分区号}

以下是今天文章的内容:

写作并不容易。作者几个小时甚至几天就完成了一篇文章。我只想换取你几秒钟 点赞 & 分享 。

更多技术干货,欢迎关注微信官方账号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」