1. Add the Kafka dependency with Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.2.1</version>
</dependency>
2. Maven configuration for log4j2 logging
If you use the Spring Boot framework, first exclude the default logging starter from spring-boot-starter:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

If you log through the SLF4J facade, only the following dependencies are needed, because log4j-slf4j-impl transitively pulls in log4j-api and log4j-core:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
</dependency>

If you do not use the facade, add log4j directly:

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
</dependency>

No version numbers are specified here because Spring Boot's dependency management provides them.
The log4j2.xml configuration file must be placed in the resources directory.
<?xml version="1.0" encoding="utf-8" ?>
<Configuration status="error">
    <!-- global properties -->
    <Properties>
        <!-- project name -->
        <Property name="PROJECT_NAME">test</Property>
        <!-- directory where log files are stored -->
        <Property name="LOG_FILE_PATH">${PROJECT_NAME}/logs</Property>
        <!-- pattern: %date = timestamp, %thread = thread name, %-5level = level padded to 5 characters,
             %logger{36} = logger name truncated to 36 characters, %msg = log message, %n = newline -->
        <Property name="LOG_PATTERN">%date{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %t %-5level %logger{36} - %msg%n"/>
        </Console>
        <!-- RollingRandomAccessFile can be 20-200% faster than RollingFile -->
        <RollingRandomAccessFile name="RollingRandomAccessFile"
                                 fileName="${LOG_FILE_PATH}/info.log"
                                 filePattern="${LOG_FILE_PATH}/$${date:yyyyMMdd}/$${date:HH}/info-%d{yyyyMMddHH}-%i.log">
            <!-- accept events at INFO level and above (onMatch); pass everything else on (onMismatch) -->
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="NEUTRAL"/>
            <!-- log line format -->
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <!-- rollover policies -->
            <Policies>
                <!-- roll over once at startup so the previous run's log file is rotated -->
                <OnStartupTriggeringPolicy/>
                <!-- interval controls how often to roll, relative to the smallest time unit in filePattern -->
                <TimeBasedTriggeringPolicy modulate="true" interval="1"/>
                <!-- roll when the file reaches this size -->
                <SizeBasedTriggeringPolicy size="100 MB"/>
            </Policies>
            <!-- maximum value of the %i counter within one directory before old files are overwritten (default 7) -->
            <DefaultRolloverStrategy max="30">
                <!-- basePath and maxDepth select which directories are scanned for deletion -->
                <Delete basePath="${LOG_FILE_PATH}/${date:yyyyMMdd}/" maxDepth="1">
                    <!-- match files by name -->
                    <IfFileName glob="*.log">
                        <!-- delete files older than the given age -->
                        <IfLastModified age="1d">
                            <!--<IfAny>-->
                            <!--<IfAccumulatedFileSize exceeds="300 KB" />-->
                            <!--<IfAccumulatedFileCount exceeds="10" />-->
                            <!--</IfAny>-->
                        </IfLastModified>
                    </IfFileName>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="RollingRandomAccessFile"/>
        </Root>
    </Loggers>
</Configuration>
3. Sending Kafka messages

1. Producer configuration parameters

private static final Properties producerProp = new Properties();

static {
    producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
    producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
    producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // acks=0: do not wait for the leader broker's response, so the send callback is effectively useless
    // acks=all/-1: the broker responds only after the message is written to its local log and to all replicas
    // acks=1: the broker responds as soon as the message is written to its local log (a compromise between the two)
    producerProp.put(ProducerConfig.ACKS_CONFIG, "1");
    // a batch is sent asynchronously when either: 1) linger.ms elapses (default 0), or 2) batch.size is reached (default 16384, i.e. 16 KB)
    producerProp.put(ProducerConfig.LINGER_MS_CONFIG, 50000);
    producerProp.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);
}
2. Synchronous send

/**
 * Synchronous send
 * @param topic
 * @param msg
 */
private static void sendMsg(String topic, String msg) {
    try {
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProp);
        LOGGER.info("============= start sync send msg =================");
        int i = 0;
        // synchronous send: block on the returned Future until the broker acknowledges
        while (true) {
            String message = "the msg is sync" + msg + " " + i++;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, message);
            producer.send(record).get();
            LOGGER.info("the msg sent successfully {}", message);
            Thread.sleep(2000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
3. Asynchronous send

/**
 * Asynchronous send
 * @param topic
 * @param msg
 */
private static void asyncSendMsg(String topic, String msg) {
    try {
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProp);
        int i = 0;
        LOGGER.info("============= start async send msg =================");
        while (true) {
            String message = "the msg is async" + msg + " " + i++;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, message);
            // asynchronous send: the callback fires when the broker responds
            producer.send(record, (recordMetadata, e) -> {
                if (e == null) {
                    LOGGER.info("send msg success {}, topic is {}", message, recordMetadata.topic());
                } else {
                    LOGGER.error("send msg failed", e);
                }
            });
            Thread.sleep(1000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
4. Test

public static void main(String[] args) {
    String message = "send my message";
    String topicName = "topicDemo";
    asyncSendMsg(topicName, message);
//  sendMsg(topicName, message);
}
4. Consuming Kafka messages

1. Consumer configuration parameters

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTest.class);

private static final Properties consumerProp = new Properties();

static {
    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
    consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
    consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
2. Consuming the messages

private static void consumerMsg(String topic) {
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProp);
    consumer.subscribe(Collections.singleton(topic));
    LOGGER.info("================== start consumer msg =======================");
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> LOGGER.info("the msg is {}", record.value()));
    }
}
3. Test

public static void main(String[] args) {
    String topicName = "topicDemo";
    consumerMsg(topicName);
}
5. Key producer parameters

1. bootstrap.servers
The addresses of the Kafka brokers used for the initial connection. In a large cluster you only need to list a few brokers; the producer discovers all the others from them. Listing more than one broker means the producer can still reach the cluster if one of the listed brokers is down or restarting.
2. key.serializer
Messages sent to the broker must be byte arrays, so keys have to be serialized either with one of the built-in serializers or with a custom one; a custom serializer only needs to implement the Serializer interface.

3. value.serializer
The serializer for the message value.
4. acks
The producer can wait for acknowledgements in three ways:
a. Send the message and immediately move on to the next one without waiting for the leader broker's response. Highest throughput, but data can be lost.
b. Send the message and wait until the broker has written it to its local log and all replicas have also written it before responding. Lowest throughput, but no data loss.
c. A compromise between the two: the broker responds as soon as it has written the message to its own local log.
The acks setting selects between these cases with the values 0, all/-1, and 1 respectively.
5. buffer.memory
When sending asynchronously, messages are first placed in a buffer; this parameter is the size of that buffer. The default is 33554432 bytes (32 MB). A larger value generally means higher throughput, because fewer I/O operations are needed.
6. retries
The number of automatic retries when a send fails.
7. batch.size
When sending asynchronously, messages for the same partition are first collected into a batch; once the batch is full, all messages in it are sent together. The value should be neither too large (memory pressure) nor too small (low throughput). The default is 16384 bytes (16 KB). Messages can also be sent before the batch is full; that is controlled by the linger.ms parameter below.
8. linger.ms
How long to wait before sending a batch even if it is not yet full. The default is 0, i.e. send immediately, which lowers the producer's throughput.
9. max.request.size
The maximum size of a request the producer will send; the default is 1048576 bytes (1 MB).
10. request.timeout.ms
The broker must return its response to the producer within this time; if it does not, a TimeoutException is raised in the callback. The default is 30 seconds.

A consolidated configuration sketch follows this list.
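As a quick reference, here is a minimal sketch that sets the parameters above explicitly. The broker address and the concrete values are illustrative only, not recommendations (imports assumed: org.apache.kafka.clients.producer.*, org.apache.kafka.common.serialization.StringSerializer, java.util.Properties):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");                 // bootstrap.servers
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());   // key.serializer
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value.serializer
props.put(ProducerConfig.ACKS_CONFIG, "1");                   // acks: 0, 1 or all/-1
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);    // buffer.memory, 32 MB default
props.put(ProducerConfig.RETRIES_CONFIG, 3);                  // retries
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);           // batch.size, 16 KB default
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);                // linger.ms
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);   // max.request.size, 1 MB default
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);   // request.timeout.ms, 30 s default
KafkaProducer<String, String> producer = new KafkaProducer<>(props);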
6. Producer partitioning strategy

1. Default partitioning strategy
a. If a key is specified, messages with the same key are sent to the same partition as far as possible; however, messages without a key may also land on that partition. If you want certain data to have a partition entirely to itself, you need a custom partitioner.
b. If no key is specified, messages are spread evenly across all partitions (round-robin in older clients; recent clients use a sticky partitioner that fills one batch per partition at a time).
2. Custom partitioning strategy
a. Create a partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface.
b. Set the partitioner.class property when building the producer object.
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

public class MyPartitionerDemo implements Partitioner {

    private Random random;

    /**
     * Compute the partition the given message is sent to
     * @param s topic name
     * @param o key
     * @param bytes serialized key bytes
     * @param o1 value
     * @param bytes1 serialized value bytes
     * @param cluster cluster metadata
     * @return partition index
     */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String key = (String) o;
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(s);
        int size = partitionInfos.size();
        // the last partition is reserved for "audit" messages, the rest are chosen at random
        int auditPartitioner = size - 1;
        return key == null || key.isEmpty() || !key.contains("audit")
                ? random.nextInt(auditPartitioner)
                : auditPartitioner;
    }

    /**
     * Close the partitioner
     */
    @Override
    public void close() {
    }

    /**
     * Initialize resources
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        random = new Random();
    }
}
Sending messages
private static final Properties producerProp = new Properties();

static {
    producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
    producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // acks=0: do not wait for the leader broker's response, so the callback is effectively useless
    // acks=all/-1: the broker responds only after all replicas have written the message to their logs
    // acks=1: the broker responds as soon as it has written to its local log (a compromise between the two)
    producerProp.put(ProducerConfig.ACKS_CONFIG, "1");
    // asynchronous batching: send after linger.ms (default 0) or when batch.size (default 16384, i.e. 16 KB) is reached
//  producerProp.put(ProducerConfig.LINGER_MS_CONFIG, 50000);
//  producerProp.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);
}

public static void main(String[] args) {
    String message = "send my message";
    String topicName = "topicDemo";
    customPartitioner(topicName);
//  asyncSendMsg(topicName, message);
//  sendMsg(topicName, message);
}

/**
 * Send messages through the custom partitioner
 * @param topic
 */
private static void customPartitioner(String topic) {
    try {
        producerProp.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, com.example.kafka.MyPartitionerDemo.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProp);
        LOGGER.info("============= start sync send msg =================");
        while (true) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message without a key");
            ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "audit", "message with key \"audit\", routed to the dedicated partition");
            ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "other", "message with key \"other\"");
            producer.send(record);
            producer.send(record1);
            producer.send(record2);
            Thread.sleep(60000);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Consuming the messages
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerTest.class);

    private static final Properties consumerProp = new Properties();

    static {
        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
        consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName = "topicDemo";
        consumerMsg(topicName);
    }

    private static void consumerMsg(String topic) {
        // generic types match the String deserializers configured above
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProp);
        consumer.subscribe(Collections.singleton(topic));
        LOGGER.info("================== start consumer msg =======================");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> LOGGER.info("the msg is {}", record.value()));
        }
    }
}
Consumer output (log output omitted here).
Serialization
By default the built-in serializers are used; custom serialization is also possible, as sketched below.
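A minimal custom-serializer sketch. The User class and the "name,age" encoding are hypothetical and purely illustrative; since the Serializer interface provides default configure/close methods, only serialize needs to be implemented:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {

    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) {
            return null;
        }
        // naive "name,age" encoding; a real project would typically use JSON, Avro, Protobuf, etc.
        return (data.getName() + "," + data.getAge()).getBytes(StandardCharsets.UTF_8);
    }
}

// registration:
// producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());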
Producer interceptors
Before a message is sent and before the callback logic runs, you can apply custom processing to it, for example modifying the message. Multiple interceptors can be specified and are chained in order; an interceptor needs to implement the ProducerInterceptor interface. A sketch follows.
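A minimal interceptor sketch, assuming String keys and values; the timestamp-prefixing logic is only an illustration:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimestampPrefixInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // called before the record is serialized and partitioned; here we prefix a timestamp to the value
        return new ProducerRecord<>(record.topic(), record.key(),
                System.currentTimeMillis() + "-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // called when the broker acknowledges the record or the send fails
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

// registration (several interceptors may be listed; they run in the given order):
// producerProp.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TimestampPrefixInterceptor.class.getName());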
Configuration for no message loss
Asynchronous sending has two problems:
a. Messages still sitting in a batch are lost if the producer crashes before the batch is sent.
b. Network jitter combined with the retry mechanism can deliver messages out of order.
Producer-side "no message loss" configuration:
- block.on.buffer.full = true
- acks = all (or -1)
- retries = Integer.MAX_VALUE
- max.in.flight.requests.per.connection = 1
- use send() with a callback
- close the producer explicitly in the callback logic on failure
- unclean.leader.election.enable = false
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
- enable.auto.commit = false
Producer-side settings:
- block.on.buffer.full = true: when the memory buffer is full the producer blocks and stops accepting new messages instead of throwing an exception; otherwise the producer's send rate could exhaust the buffer.
- acks = all: a send only counts as successful once all replicas have written the message.
- retries = Integer.MAX_VALUE: effectively retry forever.
- max.in.flight.requests.per.connection = 1: prevents reordering within a topic partition; with this set to 1, the producer will not send the next request to a broker before it has received the response to the previous one.
- Use send() with a callback so that failures are reported. In the callback, close the producer immediately with close(0) when a send fails; without close(0) the producer is allowed to keep sending the messages already queued, which can get them out of order.
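A sketch of the producer side under these recommendations. Note that block.on.buffer.full has been removed from recent client versions (max.block.ms now governs how long the producer blocks when the buffer is full), so it is omitted here; the broker address is illustrative (imports assumed: org.apache.kafka.clients.producer.*, org.apache.kafka.common.serialization.StringSerializer, java.time.Duration, java.util.Properties):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.200:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

producer.send(new ProducerRecord<>("topicDemo", "some message"), (metadata, exception) -> {
    if (exception != null) {
        // close with a zero timeout so that queued records are not sent out of order after a failure
        producer.close(Duration.ZERO);
    }
});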
Broker-side settings:
- unclean.leader.election.enable = false: disable unclean leader election, i.e. replicas that are not in the ISR may not become leader, which avoids losing messages through log truncation.
- replication.factor >= 3: keep at least three replicas of each partition's messages.
- min.insync.replicas > 1: controls how many ISR replicas a message must be written to before it counts as committed.
- replication.factor > min.insync.replicas: if the two are equal, losing a single replica makes the partition unusable; the recommended relation is replication.factor = min.insync.replicas + 1.
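The broker-side values live in the broker configuration (and the replication factor itself is set when a topic is created, or via default.replication.factor for auto-created topics). A sketch of a server.properties fragment following the recommendations above, values illustrative:

# server.properties (sketch)
unclean.leader.election.enable=false
default.replication.factor=3
min.insync.replicas=2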
Message compression
Compression reduces disk usage and network I/O at the cost of CPU cycles. Supported codecs include GZIP, Snappy, and LZ4; no compression is applied by default. Rough performance ordering: LZ4 > Snappy > GZIP. It is enabled on the producer with compression.type, as sketched below.
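A one-line sketch; the codec choice is illustrative:

// "none" (default), "gzip", "snappy", "lz4"; newer clients and brokers also support "zstd"
producerProp.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");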
Multithreaded producers
- Multiple threads, one KafkaProducer instance: a single producer is shared by all threads. Since KafkaProducer is thread-safe, this approach is also thread-safe.
- Multiple threads, multiple KafkaProducer instances: each thread builds and uses its own producer.
A sketch of the shared-instance approach follows.
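A minimal sketch of the shared-instance approach; the pool size and topic name are illustrative (imports assumed: java.util.concurrent.ExecutorService, java.util.concurrent.Executors):

// one KafkaProducer shared by a small thread pool; KafkaProducer is thread-safe
final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProp);
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int t = 0; t < 4; t++) {
    final int threadId = t;
    pool.submit(() ->
            producer.send(new ProducerRecord<>("topicDemo", "message from thread " + threadId)));
}
pool.shutdown();
// close the producer only after the pool has drained, e.g. pool.awaitTermination(...), then producer.close()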
Consumer development
Required parameters: bootstrap.servers (Kafka broker addresses), group.id (consumer group name), key.deserializer (key deserializer), value.deserializer (value deserializer). The steps to build a consumer are:
a. Build the Properties object

private static final Properties consumerProp = new Properties();

static {
    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
    consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
    consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}
b. Build the KafkaConsumer object

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProp);
c. Subscribe to the topic

consumer.subscribe(Collections.singleton(topic));
d. Poll in a loop and process the ConsumerRecords

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> LOGGER.info("the msg is {}", record.value()));
}
e. Close the KafkaConsumer

finally {
    consumer.close();
//  consumer.close(Duration.ofMillis(5000));
}
1.1 Console consumer script

[kafka@kafka200 bin]$ ./kafka-console-consumer.sh --help
This tool helps to read data from Kafka topics and outputs it to standard output.
Option  Description
------  -----------
--bootstrap-server <String: server to connect to>  REQUIRED: The server(s) to connect to.
--consumer-property <String: consumer_prop>  A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events  Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>  The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning  If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id>  The consumer group id of the consumer.
--help  Print usage information.
--isolation-level <String>  Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>  The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset>  The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition>  The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>  The properties to initialize the message formatter. Default properties include: print.timestamp=true|false, print.key=true|false, print.offset=true|false, print.partition=true|false, print.headers=true|false, print.value=true|false, key.separator=<key.separator>, line.separator=<line.separator>, headers.separator=<line.separator>, null.literal=<null.literal>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer>, header.deserializer=<header.deserializer>. Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.
--skip-message-on-error  If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>  If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>  The topic id to consume on.
--value-deserializer <String: deserializer for values>
--version  Display Kafka version.
--whitelist <String: whitelist>  Regular expression specifying whitelist of topics to include for consumption.
1.2 Key consumer parameters
- bootstrap.servers: Kafka broker addresses.
- group.id: consumer group name.
- key.deserializer: deserializer for keys.
- value.deserializer: deserializer for values.
- session.timeout.ms: how long it takes the group coordinator to detect that a member has crashed; when a consumer in the group dies, the coordinator may need up to session.timeout.ms to notice, so this should not be set too long. The default is 10 seconds.
- max.poll.interval.ms: the maximum time the consumer's message-processing logic may take between two polls.
- auto.offset.reset: what to do when there is no committed offset or the offset is out of range, i.e. the offset the consumer wants is not within the current log. Three values: earliest — start from the earliest available offset (not necessarily 0); for a brand-new group this means consuming from the beginning, while a group that has already committed offsets resumes from the committed offset; latest — start from the newest offset; none — throw an exception when no offset is found or it is out of range.
- enable.auto.commit: whether offsets are committed automatically.
- fetch.max.bytes: the maximum number of bytes the consumer fetches in a single request.
- max.poll.records: the maximum number of records returned by one call to poll; the default is 500.
- heartbeat.interval.ms: how quickly the other members of the group are told that a rebalance has started; it must be smaller than session.timeout.ms.
- connections.max.idle.ms: Kafka periodically closes idle socket connections; the default is 9 minutes. If you do not care about that overhead, set it to -1.
An example Properties block with these settings follows.
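A sketch of a Properties block that sets these parameters explicitly; the broker address is taken from the earlier examples and the numeric values are illustrative, mostly the defaults mentioned above (imports assumed: org.apache.kafka.clients.consumer.*, org.apache.kafka.common.serialization.StringDeserializer, java.util.Properties):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka200:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);        // session.timeout.ms
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);     // max.poll.interval.ms
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    // earliest / latest / none
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");      // commit offsets manually
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);        // fetch.max.bytes, 50 MB default
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);            // max.poll.records
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);      // heartbeat.interval.ms < session.timeout.ms
props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000L); // connections.max.idle.ms, 9 minutes
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);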
2. Topic subscription

2.1 Subscribing to a list of topics

consumer.subscribe(Arrays.asList(topic1, topic2, topic3));

For a single topic, consumer.subscribe(Collections.singleton(topic)) works as well.
2.2 Subscribing to topics by regular expression

consumer.subscribe(Pattern.compile("kafka-.*"));

// when the set of partitions assigned to this consumer changes, the listener callbacks fire
consumer.subscribe(Pattern.compile("kafka-.*"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
    }
});

// with auto-commit enabled you do not need custom rebalance handling; the no-op listener is enough
consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());
3. Message polling
The poll method returns data when either of two conditions is met:

- enough data has been fetched, or
- the wait has exceeded the specified timeout.
