Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具有高吞吐量、可持久化、可扩展的特点,广泛应用于大数据和微服务架构中。
Topic(主题):消息的分类,类似于数据库中的表。生产者发送消息到特定的Topic,消费者从Topic中读取消息。
Partition(分区):每个Topic可以分为多个Partition,这是Kafka实现并行处理和扩展性的基础。消息在Partition内是有序的,但跨Partition不保证顺序。
Producer(生产者):负责向Kafka Topic发送消息的客户端应用程序。
Consumer(消费者):从Kafka Topic读取消息的客户端应用程序。
Consumer Group(消费者组):多个消费者实例组成的组,同一个组内的消费者会协作消费Topic的消息,每个Partition只会被组内一个消费者消费。
Broker(代理):Kafka集群中的服务器节点,负责存储数据和服务客户端请求。
Offset(偏移量):消息在Partition中的唯一标识,表示消息的位置。
Replication(副本):为了保证高可用性,每个Partition可以有多个副本分布在不同的Broker上。
Kafka采用分布式架构,主要包含以下组件:
ZooKeeper集群:负责协调和管理Kafka集群的元数据,包括Broker注册、Topic配置、分区分配等。在较新版本中,Kafka正在移除对ZooKeeper的依赖。
Broker集群:由多个Broker节点组成,每个Broker负责存储部分数据和处理客户端请求。
生产者客户端:将消息发送到指定Topic的应用程序。
消费者客户端:从Topic读取消息的应用程序。
生产者将消息发送到Topic的特定Partition,Kafka将消息持久化到磁盘。消费者通过订阅Topic来读取消息,可以按照不同的消费模式进行处理。
Kafka支持多种分区策略,包括轮询分区、键值哈希分区、自定义分区器等。合理的分区策略能够提高并行度和负载均衡。
首先需要下载并启动Kafka服务。可以从Apache Kafka官网下载,或使用Docker运行:
# 使用Docker Compose启动Kafka
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# 创建一个名为test-topic的Topic,包含3个分区
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
在Maven项目的pom.xml中添加Kafka客户端依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerExample {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
// 配置生产者属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 性能优化配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 延迟时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩类型
// 可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证消息顺序
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 发送消息的几种方式
// 1. 异步发送(最常用)
for (int i = 0; i < 100; i++) {
String key = "key-" + i;
String value = "message-" + i + "-" + System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送失败: " + exception.getMessage());
} else {
System.out.printf("消息发送成功 - Topic: %s, Partition: %d, Offset: %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}
// 2. 同步发送
ProducerRecord<String, String> syncRecord = new ProducerRecord<>(TOPIC_NAME, "sync-key", "同步消息");
try {
RecordMetadata metadata = producer.send(syncRecord).get();
System.out.printf("同步发送成功 - Partition: %d, Offset: %d%n",
metadata.partition(), metadata.offset());
} catch (Exception e) {
System.err.println("同步发送失败: " + e.getMessage());
}
// 3. 指定分区发送
ProducerRecord<String, String> partitionRecord = new ProducerRecord<>(TOPIC_NAME, 0, "partition-key", "指定分区消息");
producer.send(partitionRecord);
} finally {
// 确保所有消息都被发送
producer.flush();
producer.close();
}
}
}
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test-consumer-group";
public static void main(String[] args) {
// 自动提交模式消费者
autoCommitConsumer();
// 手动提交模式消费者
// manualCommitConsumer();
// 指定分区消费者
// assignPartitionConsumer();
}
// 自动提交模式(最简单)
public static void autoCommitConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 自动提交配置
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// 消费者行为配置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次拉取的最大消息数
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 最小拉取字节数
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 最大等待时间
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("接收到消息 - Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
// 处理业务逻辑
processMessage(record);
}
}
} catch (Exception e) {
System.err.println("消费消息时出错: " + e.getMessage());
} finally {
consumer.close();
}
}
// 手动提交模式(更可靠)
public static void manualCommitConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + "-manual");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 关闭自动提交
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 手动提交单个消息的偏移量
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);
System.out.printf("手动提交成功 - Partition: %d, Offset: %d%n",
record.partition(), record.offset());
} catch (Exception e) {
System.err.println("处理消息失败: " + e.getMessage());
// 可以选择跳过这条消息或进行重试
}
}
}
} finally {
consumer.close();
}
}
// 指定分区消费
public static void assignPartitionConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 指定消费特定分区
TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);
TopicPartition partition1 = new TopicPartition(TOPIC_NAME, 1);
consumer.assign(Arrays.asList(partition0, partition1));
// 可以指定从特定偏移量开始消费
consumer.seekToBeginning(Arrays.asList(partition0, partition1));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("分区消费 - Partition: %d, Offset: %d, Value: %s%n",
record.partition(), record.offset(), record.value());
}
}
} finally {
consumer.close();
}
}
private static void processMessage(ConsumerRecord<String, String> record) {
// 模拟消息处理
try {
Thread.sleep(100); // 模拟处理时间
System.out.println("处理完成: " + record.value());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
在Gemfile中添加ruby-kafka gem:
gem 'ruby-kafka', '~> 1.5'
然后运行:
bundle install
require 'kafka'
class KafkaProducerExample
TOPIC_NAME = 'test-topic'
BOOTSTRAP_SERVERS = ['localhost:9092']
def initialize
@kafka = Kafka.new(
seed_brokers: BOOTSTRAP_SERVERS,
client_id: 'ruby-producer',
logger: Logger.new(STDOUT)
)
@producer = @kafka.producer(
# 性能配置
max_buffer_size: 10_000, # 缓冲区大小
max_buffer_bytesize: 10_000_000, # 缓冲区字节大小
compression_codec: :snappy, # 压缩算法
# 可靠性配置
required_acks: :all, # 等待所有副本确认
ack_timeout: 10, # 确认超时时间
retry_backoff: 1, # 重试间隔
max_retries: 3, # 最大重试次数
# 批处理配置
delivery_threshold: 100, # 批次消息数量阈值
delivery_interval: 10 # 批次时间间隔(秒)
)
end
def send_messages
begin
# 发送单条消息
@producer.produce(
value: "单条消息: #{Time.now}",
key: "single-key",
topic: TOPIC_NAME,
partition: 0 # 可选:指定分区
)
# 批量发送消息
100.times do |i|
@producer.produce(
value: "批量消息 #{i}: #{Time.now}",
key: "batch-key-#{i}",
topic: TOPIC_NAME,
headers: {
'message-id' => i.to_s,
'timestamp' => Time.now.to_i.to_s
}
)
end
# 立即发送所有缓冲的消息
@producer.deliver_messages
puts "所有消息发送成功"
rescue Kafka::DeliveryFailed => e
puts "消息发送失败: #{e.message}"
# 可以获取失败的消息进行重试
e.failed_messages.each do |message|
puts "失败消息: #{message}"
end
rescue => e
puts "发送过程中出错: #{e.message}"
ensure
@producer.shutdown
end
end
def send_with_custom_partitioner
# 自定义分区器
partitioner = Proc.new do |partition_count, message|
# 根据消息键的哈希值选择分区
message.key.hash % partition_count
end
custom_producer = @kafka.producer(partitioner: partitioner)
begin
10.times do |i|
custom_producer.produce(
value: "自定义分区消息 #{i}",
key: "custom-#{i}",
topic: TOPIC_NAME
)
end
custom_producer.deliver_messages
puts "自定义分区消息发送完成"
ensure
custom_producer.shutdown
end
end
def send_transactional_messages
# 事务性发送(需要Kafka 0.11+)
begin
@producer.transaction do
@producer.produce(
value: "事务消息1",
key: "tx-key-1",
topic: TOPIC_NAME
)
@producer.produce(
value: "事务消息2",
key: "tx-key-2",
topic: TOPIC_NAME
)
# 如果这里抛出异常,整个事务会回滚
# raise "模拟异常"
end
puts "事务提交成功"
rescue => e
puts "事务失败,已回滚: #{e.message}"
end
end
end
# 使用示例
if __FILE__ == $0
producer = KafkaProducerExample.new
producer.send_messages
producer.send_with_custom_partitioner
producer.send_transactional_messages
end
require 'kafka'
class KafkaConsumerExample
TOPIC_NAME = 'test-topic'
BOOTSTRAP_SERVERS = ['localhost:9092']
GROUP_ID = 'ruby-consumer-group'
def initialize
@kafka = Kafka.new(
seed_brokers: BOOTSTRAP_SERVERS,
client_id: 'ruby-consumer',
logger: Logger.new(STDOUT)
)
end
# 自动提交模式消费
def auto_commit_consumer
consumer = @kafka.consumer(
group_id: GROUP_ID,
# 偏移量管理
auto_offset_reset: :earliest, # 从最早的消息开始
offset_commit_interval: 10, # 自动提交间隔(秒)
offset_commit_threshold: 100, # 自动提交阈值
# 拉取配置
max_bytes: 1048576, # 每次拉取最大字节数
min_bytes: 1024, # 每次拉取最小字节数
max_wait_time: 5, # 最大等待时间(秒)
# 会话配置
session_timeout: 30, # 会话超时时间
heartbeat_interval: 10 # 心跳间隔
)
# 订阅主题
consumer.subscribe(TOPIC_NAME)
begin
# 消费消息
consumer.each_message do |message|
puts "自动提交消费 - Topic: #{message.topic}, " \
"Partition: #{message.partition}, " \
"Offset: #{message.offset}, " \
"Key: #{message.key}, " \
"Value: #{message.value}"
# 处理消息头
if message.headers
puts "Headers: #{message.headers}"
end
# 处理业务逻辑
process_message(message)
end
rescue Kafka::ProcessingError => e
puts "处理消息时出错: #{e.message}"
puts "导致错误的消息: #{e.message}"
rescue => e
puts "消费过程中出错: #{e.message}"
ensure
consumer.stop
end
end
# 手动提交模式消费
def manual_commit_consumer
consumer = @kafka.consumer(
group_id: "#{GROUP_ID}-manual",
auto_offset_reset: :earliest
)
consumer.subscribe(TOPIC_NAME)
begin
consumer.each_batch(
automatically_mark_as_processed: false # 关闭自动标记
) do |batch|
batch.messages.each do |message|
begin
puts "手动提交消费 - Partition: #{message.partition}, " \
"Offset: #{message.offset}, Value: #{message.value}"
# 处理消息
process_message(message)
# 手动标记消息为已处理
batch.mark_message_as_processed(message)
rescue => e
puts "处理消息失败: #{e.message}"
# 可以选择跳过或重试
next
end
end
# 提交整个批次
consumer.commit_offsets
puts "批次提交成功,处理了 #{batch.messages.size} 条消息"
end
ensure
consumer.stop
end
end
# 指定分区消费
def partition_consumer
consumer = @kafka.consumer(
group_id: "#{GROUP_ID}-partition"
)
# 指定消费的分区
partitions = [0, 1] # 消费分区0和1
partitions.each do |partition|
consumer.assign(TOPIC_NAME, partition)
# 可以指定从特定偏移量开始
# consumer.seek(TOPIC_NAME, partition, 100) # 从偏移量100开始
end
begin
consumer.each_message do |message|
puts "分区消费 - Partition: #{message.partition}, " \
"Offset: #{message.offset}, Value: #{message.value}"
process_message(message)
end
ensure
consumer.stop
end
end
# 消费者组协调消费
def consumer_group_example
# 创建多个消费者实例
consumers = []
3.times do |i|
consumer = @kafka.consumer(
group_id: "#{GROUP_ID}-coordination",
auto_offset_reset: :earliest
)
consumer.subscribe(TOPIC_NAME)
consumers << consumer
end
# 并行消费
threads = consumers.map.with_index do |consumer, index|
Thread.new do
begin
consumer.each_message do |message|
puts "消费者#{index} - Partition: #{message.partition}, " \
"Offset: #{message.offset}, Value: #{message.value}"
process_message(message)
end
rescue => e
puts "消费者#{index}出错: #{e.message}"
ensure
consumer.stop
end
end
end
# 等待所有线程完成(在实际应用中,你可能需要优雅关闭)
threads.each(&:join)
end
# 带重试机制的消费者
def retry_consumer
consumer = @kafka.consumer(
group_id: "#{GROUP_ID}-retry",
auto_offset_reset: :earliest
)
consumer.subscribe(TOPIC_NAME)
begin
consumer.each_message do |message|
retry_count = 0
max_retries = 3
begin
puts "处理消息 - Offset: #{message.offset}, 重试次数: #{retry_count}"
# 模拟可能失败的处理
if message.value.include?("error") && retry_count < 2
raise "模拟处理失败"
end
process_message(message)
rescue => e
retry_count += 1
if retry_count <= max_retries
puts "处理失败,第#{retry_count}次重试: #{e.message}"
sleep(retry_count) # 指数退避
retry
else
puts "重试#{max_retries}次后仍失败,跳过消息: #{e.message}"
# 可以将失败的消息发送到死信队列
send_to_dead_letter_queue(message, e)
end
end
end
ensure
consumer.stop
end
end
private
def process_message(message)
# 模拟消息处理
sleep(0.1)
puts "消息处理完成: #{message.value}"
end
def send_to_dead_letter_queue(message, error)
# 实现死信队列逻辑
puts "发送到死信队列 - Offset: #{message.offset}, Error: #{error.message}"
# 这里可以发送到专门的错误处理topic
# dead_letter_producer.produce(
# value: message.value,
# key: message.key,
# topic: 'dead-letter-queue',
# headers: {
# 'original-topic' => message.topic,
# 'original-partition' => message.partition.to_s,
# 'original-offset' => message.offset.to_s,
# 'error-message' => error.message
# }
# )
end
end
# 使用示例
if __FILE__ == $0
consumer = KafkaConsumerExample.new
# 选择一种消费模式
consumer.auto_commit_consumer
# consumer.manual_commit_consumer
# consumer.partition_consumer
# consumer.consumer_group_example
# consumer.retry_consumer
end
这是最简单的消费模式,Kafka会定期自动提交偏移量。优点是使用简单,缺点是可能出现消息丢失或重复消费的情况。适用于对数据一致性要求不高的场景。
消费者完全控制偏移量的提交时机,可以在确保消息处理完成后再提交。这种模式提供了更强的数据一致性保证,但需要处理更多的异常情况。
直接指定要消费的分区,不参与消费者组的协调。这种模式适用于需要精确控制消费行为的场景,比如数据迁移、特定分区的数据分析等。
多个消费者实例组成消费者组,Kafka会自动进行分区分配和负载均衡。当消费者实例发生变化时,会触发重平衡操作。
生产者端优化:合理设置批次大小、压缩算法、缓冲区大小等参数。使用异步发送可以提高吞吐量,但需要处理好回调函数中的异常。
消费者端优化:调整拉取参数,如每次拉取的最大消息数、最小字节数、最大等待时间等。合理设置消费者组的大小,通常不应超过Topic的分区数。
分区策略:合理的分区数量可以提高并行度,但过多的分区会增加管理开销。分区键的选择要考虑数据分布的均匀性。
消息持久化:Kafka将消息持久化到磁盘,但需要合理配置副本数量来保证高可用性。生产环境建议至少3个副本。
事务支持:Kafka支持事务操作,可以保证多个操作的原子性。但事务会带来性能开销,需要根据业务需求权衡。
重复消费处理:由于网络异常、重平衡等原因,可能出现消息重复消费。业务逻辑需要保证幂等性,或者通过唯一键去重。
偏移量监控:定期监控消费者的消费进度,防止出现消费滞后。可以通过Kafka自带的工具或第三方监控系统。
集群健康监控:监控Broker的CPU、内存、磁盘使用情况,以及网络延迟、吞吐量等指标。
日志管理:合理配置日志保留策略,平衡存储成本和数据可用性。
连接异常:网络中断、Broker宕机等情况下,客户端会自动重试。需要合理设置重试次数和间隔。
序列化异常:消息序列化/反序列化失败时,需要有相应的错误处理机制。
消费异常:消息处理失败时,可以选择跳过、重试或发送到死信队列。
认证授权:生产环境应该启用SASL认证和ACL授权,限制客户端的访问权限。
加密传输:敏感数据应该使用SSL/TLS加密传输。
网络安全:合理配置防火墙规则,限制不必要的网络访问。
这份指南涵盖了Kafka的核心概念、架构设计、Java和Ruby的完整实现代码,以及生产环境中的重要注意事项。通过这些内容,你应该能够在项目中成功使用Kafka进行消息队列的开发和部署。记住在实际使用中要根据具体的业务需求调整配置参数,并做好充分的测试。