当前位置: 首页 > 图灵资讯 > 技术篇> Kafka系列(二)将消息数据写入Kafka系统--生产者【异步发送、同步发送、单线程发送、多线程发送、配置生产者属性、自定义序列化、自定义主题分区】

Kafka系列(二)将消息数据写入Kafka系统--生产者【异步发送、同步发送、单线程发送、多线程发送、配置生产者属性、自定义序列化、自定义主题分区】

来源:图灵教育
时间:2024-02-04 13:50:34

(Kafka系列)

转自 Kafka不难学!入门,进步,商业实战

发送消息到 Kafka 主题

Kafka 0.10.0.0 以后的版本重构了生产者代码的底层实现。kafka.producer.Producer类 org.apache.kafka.clients.producer.KafkaProducer 替换Kafka 系统支持两种不同的发送方式-同步模式(Sync)和异步模式(ASync)。

了解异步模式

在 Kafka 0.10.0.0 在以后的版本中,客户端应用程序调用生产者的应用接口,默认使用异步发送信息。当生产者客户端通过异步模式发送信息时,通常会调用回调函数 send()发送消息的方法。生产者端收到信息。 Kafka 回调函数将在代理节点响应后触发。

  1. 如果生产者的客户端和客户端在什么场景下需要使用异步模式 Kafka 网络延迟(100ms)存在于集群节点之间,此时发送 10 如果有消息记录,延迟将达到 1s。大数据场景中有大量的消息记录,发送的消息记录远不止 十、延迟将非常严重。在大数据场景中,如果采用异步模式发送消息记录,几乎没有时间,则可以通过回调函数知道消息发送的结果。
  2. 例如,异步模式数据的写入过程(ip login)有6个分区。当生产者客户端写入消息记录时,消息记录将首先写入缓冲区,生产者客户端将直接得到结果(此时,缓冲区的数据尚未写入) Kafka代理节点中主题的某个分区)。之后,==缓冲区==中的数据将通过异步模式发送到 Kafka 主题在代理节点的某个分区。
//将消息记录对象实例化,用于保存主题名、分区索引、键、值和时间戳ProducerRecord<byte[],byte[]> record =new ProducerRecord<byte[],byte[]>("ip login", key, value);//调用 send()方法和回调函数producer.send(myRecord,new Callback() {public void onCompletion (RecordMetadata metadata, Exception e){if (e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is:" + metadata.offset());}}};

提交消息记录 send()方法结束后,实际上将消息记录放入缓冲区的发送队列,==然后通过后台线程从缓冲区队列中取出发送;发送成功后,将触发send方法的回调函数-Callback.==

了解同步模式

生产者客户端通过 send()方法实现同步模式发送消息,并返回一个 Future 同时调用get()方法等待对象 Future 对象,看 send()方法发送是否成功。

  1. 如果要收集用户访问网页的数据,==在什么场景下使用同步模式? Kafka 在使用同步模式时,需要立即知道新闻是否成功地写入集群代理节点。==
// 将字符串转换为字节数组byte[] key = "key".getBytes();byte[] value ="value".getBytes();// 实例化一个消息记录对象,用于保存主题名、分区索引、键、值和时间戳ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("ip_login",key, value);//调用 send()函数之后,再次通过 get()等待返回结果的方法.send(record).get();

这里通过调用 Future 接口中的 get()等待方法 Kafka 集群代理节点(Broker)如果返回状态 Producer 如果发送消息记录成功,则返回 RecordMetadata 该对象可用于查看消息记录的偏移量(Offset)。

线程发送信息的步骤

在 Kafka 在系统中,通常使用线程发送信息,以确保生产者客户端应用程序的独立运行。创建一个简单的生产者应用程序的步骤如下。(1)实例 Properties 类别对象,==配置生产者响应机制==。必须设置以下三个属性。其他属性一般都有默认值,可以根据需要添加。

  • bootstrap.servers:配置Kafka集群代理节点地址;
  • key.serializer:序列化新闻主键;
  • value.serializer:序列化新闻数据内容,

(2)根据属性对象实例化 KafkaProducer.(3)通过实例化Producerrecord 对象,以“键-值”的形式封装==消息记录==。(4)通过调用 KafkaProducer 对象中==具有回调函数 发送消息的send方法 Kafka 集群==。(5)关闭Kafkaproducer 对象,释放连接资源,

生产者使用单线程发送消息
/** * 生产者客户端应用程序的实现. */public class JProducer extends Thread {private final Logger LOG = LoggerFactory.getLogger(JProducer.class);/** Kafka连接信息配置. */public Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("acks", "1"); // 设置应答模式, 1表示Kafka代理节点返回结果props.put("retries", 0); // props重试次数.put("batch.size", 16384); // 大小propsprops批量提交.put("linger.ms", 1); // props延迟提交.put("buffer.memory", 33554432); // props缓冲大小.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 主键props序列化.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 序列化值returnnn props;}public static void main(String[] args) {JProducer producer = new JProducer();producer.start();}/** 单线程制造商的客户端实现. */public void run() {Producer<String, String> producer = new KafkaProducer<>(configure());// 发送100个JSON格式的数据for (int i = 0; i < 100; i++) {// JSONojectect封装JSON格式 json = new JSONObject();json.put("id", i);json.put("ip", "192.168.0." + i);json.put("date", new Date().toString());String k = "key" + i;// 异步发送producer.send(new ProducerRecord<String, String>("test_kafka_game_x", k, json.toJSONString()), new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {LOG.error("Send error, msg is " + e.getMessage());} else {LOG.info("The offset of the record we just sent is: " + metadata.offset());}}});}try {sleep(3000);// 间隔3秒} catch (InterruptedException e) {LOG.error("Interrupted thread error, msg is " + e.getMessage());}producer.close();// 关闭生产者对象}

这里的主题只有一个分区和一个副本。因此,发送的所有信息都将写入同一分区。如果您想在发送消息后获得一些返回信息(如获取消息的偏移、分区索引值、提交时间戳等),您可以通过回调函数 CallBack 返回的 RecordMetadata 实现对象。由于 Kafka 系统的生产者对象是线程安全的,因此可以通过增加生产者对象的线程数量来提高 Kafka 系统吞吐量。

制造商用多线程发送消息
public class JProducerThread extends Thread {// 创建日志对象privatete final Logger LOG = LoggerFactory.getLogger(JProducerThread.class);// 最大线程数privatete final static int MAX_THREAD_SIZE = 6;/** Kafka连接信息配置. */public Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("acks", "1"); // 设置应答模式, 1表示Kafka代理节点返回结果props.put("retries", 0); // props重试次数.put("batch.size", 16384); // 大小propsprops批量提交.put("linger.ms", 1); // props延迟提交.put("buffer.memory", 33554432); // props缓冲大小.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 主键props序列化.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// props序列化值.put("partitioner.class", "org.smartloli.kafka.game.x.book_4.JPartitioner");// 指定自定义分区returnn props;}public static void main(String[] args) {// 在ExecutorService中创建一个固定线程数量的线程池 executorService = Executors.newFixedThreadPool(MAX_THREAD_SIZE);// executorservicer提交任务.submit(new JProducerThread());// executorservice关闭线程池.shutdown();}/** 单线程制造商的客户端实现. */public void run() {Producer<String, String> producer = new KafkaProducer<>(configure());// 发送100个JSON格式的数据for (int i = 0; i < 10; i++) {// JSONojectect封装JSON格式 json = new JSONObject();json.put("id", i);json.put("ip", "192.168.0." + i);json.put("date", new Date().toString());String k = "key" + i;// 异步发送producer.send(new ProducerRecord<String, String>("ip_login_rt", k, json.toJSONString()), new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {LOG.error("Send error, msg is " + e.getMessage());} else {LOG.info("The offset of the record we just sent is: " + metadata.offset());}}});}try {sleep(3000);// 间隔3秒} catch (InterruptedException e) {LOG.error("Interrupted thread error, msg is " + e.getMessage());}producer.close();// 关闭生产者对象}
生产者属性的配置

在这里插入图片描述在这里插入图片描述在这里插入图片描述

保存对象的各种属性一序列化序列化

在分布式环境中,任何格式的数据都将分解为二进制,以便存储在文件中或在网络上传输。序列化是用一系列字节描述对象,以解决读写操作中对象引起的问题。序列化可以将对象的状态写入数据流,并在文件或数据库中进行网络传输或保存,必要时取出数据流,重建相同的对象。

  1. 为什么在传统的企业应用中需要序列化,不同的组件分布在不同的系统和网络中?==如果两个组件想要通信,它们之间必须有数据转换机制。实现此过程需要按照协议传输对象,这意味着接收端需要知道发送端使用的协议才能重建对象,以确保两个组件之间的通信安全。==
public class JObjectSerial implements Serializable {private static Logger LOG = LoggerFactory.getLogger(JObjectSerial.class);/** * ID的序列化版本. */private static final long serialVersionUID = 1L;public byte id = 1; // IDpubliclicicc用户ID byte money = 100; // 充值金额/** 实例入口函数. */public static void main(String[] args) {try {FileOutputStream fos = new FileOutputStream("/tmp/salary.out"); // 实例化输出流对象Objectoutstream oos = new ObjectOutputStream(fos);// 实例化一个对象输出流JObjectserial jos = new JObjectSerial(); // 实例化序列化类型oos化.writeObject(jos); // 写入对象OOS.flush(); // 刷新数据流oos.close();// 关闭连接} catch (Exception e) {LOG.error("Serial has error, msg is " + e.getMessage());// 打印异常信息}
序列化对象的存储格式

在这里插入图片描述

自己实现 序列化步骤

在这里插入图片描述如果使用原始序列化,则需要将传输的内容拼接成字符串或转换为字符数组或其他类型,以便在实现代码时更加麻烦。而 Kafka 为了解决这一问题,提供了一个序列接口,使用户能够定制对象的序列化方法来完成对象的传输。下面的例子将演示生产者客户端应用程序中的序列化用法 Serializable 接口序列化对象。

1. 创建序列化对象
/** * 声明序列化类. *  * @author smartloli. * *         Created by Apr 30, 2018 */public class JSalarySerial implements Serializable {/** * ID的序列化版本. */private static final long serialVersionUID = 1L;private String id;// IDprivatete用户IDprivate String salary;// 金额public String getId() {return id;}public void setId(String id) {this.id = id;}public String getSalary() {return salary;}public void setSalary(String salary) {this.salary = salary;}// 打印对象的属性值@Overridepublic String toString() {return "JSalarySerial [id=" + id + ", salary=" + salary + "]";}}
2. 编制序列化工具类
/** * 包装一个序列化的工具类. *  * @author smartloli. * *         Created by Apr 30, 2018 */public class SerializeUtils {/** 实现序列化. */public static byte[] serialize(Object object) {try {return object.toString().getBytes("UTF8");// 返回字节数组} catch (Exception e) {e.printStackTrace(); // 抛出异常信息}return null;}/** 实现反序列化. */public static <T> Object deserialize(byte[] bytes) {try {return new String(bytes, "UTF8");// 反序列化} catch (Exception e) {e.printStackTrace();}return null;}}
3. 编写自定义序列逻辑代码
/** * 实现自定义序列化. *  * @author smartloli. * *         Created by Apr 30, 2018 */public class JSalarySeralizer implements Serializer<JSalarySerial> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}/** 实现自定义序列化. */@Overridepublic byte[] serialize(String topic, JSalarySerial data) {return SerializeUtils.serialize(data);}@Overridepublic void close() {}}
4. 为生产者编写应用程序
/** * 自定义序列化, 向Kafkaka发送消息. *  * @author smartloli. * *         Created by Apr 30, 2018 */public class JProducerSerial extends Thread {private static Logger LOG = LoggerFactory.getLogger(JProducerSerial.class);/** Kafka连接信息配置. */public Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("acks", "1"); // 设置响应模式, 1表示Kafka代理节点返回结果props.put("retries", 0); // props重试次数.put("batch.size", 16384); // 大小propsprops批量提交.put("linger.ms", 1); // props延迟提交.put("buffer.memory", 33554432); // props缓冲大小.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 主键props序列化.put("value.serializer", "org.smartloli.kafka.game.x.book_4.serialization.JSalarySeralizer");// 自定义序列化值returnn props;}public static void main(String[] args) {JProducerSerial producer = new JProducerSerial();producer.start();}/** 单线程制造商的客户端实现. */public void run() {Producer<String, JSalarySerial> producer = new KafkaProducer<>(configure());JSalarySerial jss = new JSalarySerial();jss.setId("2018");jss.setSalary("100");producer.send(new ProducerRecord<String, JSalarySerial>("test_topic_ser_des", "key", jss), new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {LOG.error("Send error, msg is " + e.getMessage());} else {LOG.info("The offset of the record we just sent is: " + metadata.offset());}}});try {sleep(3000);// 间隔3秒} catch (InterruptedException e) {LOG.error("Interrupted thread error, msg is " + e.getMessage());}producer.close();// 关闭生产者对象}}}

在这里插入图片描述

自定义主题分区

在这里插入图片描述

编写自定义主题分区算法
/** * 实现自定义分区分类. * * @author smartloli. * *         Created by Apr 30, 2018 */public class JPartitioner implements Partitioner {@Overridepublic void configure(Map<String, ?> configs) {}/** Kafka主题分区索引算法. */@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int partition = 0;String k = (String) key;partition = Math.abs(k.hashCode()) % cluster.partitionCountForTopic(topic);return partition;}@Overridepublic void close() {}}
演示自定义分区的作用
/** * 生产者客户端应用程序的实现. *  * @author smartloli. * *         Created by Apr 27, 2018 */public class JProducerThread extends Thread {// 创建日志对象privatete final Logger LOG = LoggerFactory.getLogger(JProducerThread.class);// 最大线程数privatete final static int MAX_THREAD_SIZE = 6;/** Kafka连接信息配置. */public Properties configure() {Properties props = new Properties();props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址props.put("acks", "1"); // 设置应答模式, 1表示Kafka代理节点返回结果props.put("retries", 0); // props重试次数.put("batch.size", 16384); // 大小propsprops批量提交.put("linger.ms", 1); // props延迟提交.put("buffer.memory", 33554432); // props缓冲大小.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 主键props序列化.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// props序列化值.put("partitioner.class", "org.smartloli.kafka.game.x.book_4.JPartitioner");// 指定自定义分区returnn props;}public static void main(String[] args) {// 在ExecutorService中创建一个固定线程数量的线程池 executorService = Executors.newFixedThreadPool(MAX_THREAD_SIZE);// executorservicer提交任务.submit(new JProducerThread());// executorservice关闭线程池.shutdown();}/** 实现单线程制造商客户端. */public void run() {Producer<String, String> producer = new KafkaProducer<>(configure());// 发送100个JSON格式的数据for (int i = 0; i < 10; i++) {// JSONojectect封装JSON格式 json = new JSONObject();json.put("id", i);json.put("ip", "192.168.0." + i);json.put("date", new Date().toString());String k = "key" + i;// 异步发送producer.send(new ProducerRecord<String, String>("ip_login_rt", k, json.toJSONString()), new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if (e != null) {LOG.error("Send error, msg is " + e.getMessage());} else {LOG.info("The offset of the record we just sent is: " + metadata.offset());}}});}try {sleep(3000);// 间隔3秒} catch (InterruptedException e) {LOG.error("Interrupted thread error, msg is " + e.getMessage());}producer.close();// 关闭生产者对象}

在这里插入图片描述