博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka producer 序列化& 反序列化 & 分区分配计算
阅读量:4322 次
发布时间:2019-06-06

本文共 2452 字,大约阅读时间需要 8 分钟。

消息的序列化在 Interceptor 之后,分配分区之前执行。

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。

ProducerRecord 包括

private final String topic;//所要发送的topicprivate final Integer partition;//指定的partition序号private final Headers headers;//一组键值对,与RabbitMQ中的headers类似private final K key;//消息的keyprivate final V value;//消息的value,即消息体private final Long timestamp;//消息的时间戳

在KafkaProducer的源码中,计算分区时调用的是下面的partition()方法:

private int partition(ProducerRecord
record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);}

默认的分区提供者是 org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {    List
partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List
availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }}

没有指定 key,以一种随机的方式转发。如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。

可自定义分区函数:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        List
partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (null == keyBytes || keyBytes.length<1) { return atomicInteger.getAndIncrement() % numPartitions; } //借用String的hashCode的计算方式 int hash = 0; for (byte b : keyBytes) { hash = 31 * hash + b; } return hash % numPartitions; }

 

 

 

 

转载于:https://www.cnblogs.com/hansc-blog/p/9289815.html

你可能感兴趣的文章
七天入门统计力学-第2天 系综与配分函数
查看>>
ubuntu server 10.04 apache2配置多个虚拟主机
查看>>
python标准库xml.etree.ElementTree的bug
查看>>
Tomcat服务器介绍和使用
查看>>
IOS网络方面(异步请求)
查看>>
day6 python学习
查看>>
事务分类
查看>>
《程序是怎样跑起来的》第四章读后感
查看>>
遍历datatable的几种方法(C# )
查看>>
Oracle记录(三) Scott用户的表结构
查看>>
centos静默式安装Oracle11g
查看>>
软件评测师下午题笔记
查看>>
性能测试的概念
查看>>
JavaScript中的函数上下文和apply,call
查看>>
中文排序
查看>>
少数股东损益
查看>>
SecureCRT的安装
查看>>
POJ2635-The Embarrassed Cryptographer
查看>>
css中font-family的中文字体
查看>>
学习笔记:CentOS 7学习之十二:查找命令
查看>>