Message idempotency is a key concern in distributed systems. In Kafka, the same message can be consumed more than once because of network failures, consumer-group rebalances, retries, and similar causes. Below are detailed approaches to handling idempotency in Java and Ruby; all of them follow the same consume-check-process-mark loop, sketched right after this list of causes.
Network failures: the consumer finishes processing a message, but the network drops while the offset is being committed; after a restart the message is consumed again.
Rebalancing: during a consumer-group rebalance, some messages may be processed by more than one consumer.
Retries: when a processing failure triggers a retry, the same message may be handled more than once.
Clock drift: abnormal system time can break timestamp-based deduplication.
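As a point of reference, here is a minimal, self-contained sketch of that shared loop using a plain in-memory HashSet as the dedup store (illustration only; the sections below replace the set with a MySQL table, Redis keys, or a bounded Guava cache):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
public class DedupLoopSketch {
// In-memory dedup set - illustration only; lost on restart and not shared between instances
private final Set<String> processed = new HashSet<>();
public void pollOnce(KafkaConsumer<String, String> consumer) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Dedup key: a business key if available, otherwise topic-partition-offset
String id = record.topic() + "-" + record.partition() + "-" + record.offset();
if (!processed.add(id)) {
continue; // already handled, skip the duplicate
}
handle(record); // business logic, ideally idempotent in its own right
}
consumer.commitSync(); // commit offsets only after the whole batch is handled
}
private void handle(ConsumerRecord<String, String> record) {
System.out.println("processing " + record.value());
}
}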
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class IdempotentConsumerWithDB {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "idempotent-consumer-group";
private Connection dbConnection;
private PreparedStatement insertMessageStmt;
private PreparedStatement checkMessageStmt;
public IdempotentConsumerWithDB() throws SQLException {
initDatabase();
}
private void initDatabase() throws SQLException {
// Initialize the database connection
dbConnection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/kafka_test", "user", "password");
dbConnection.setAutoCommit(false);
// Create the table that records processed messages
String createTableSQL = """
CREATE TABLE IF NOT EXISTS processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
topic VARCHAR(255) NOT NULL,
partition_num INT NOT NULL,
offset_num BIGINT NOT NULL,
processed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
message_content TEXT,
INDEX idx_topic_partition_offset (topic, partition_num, offset_num)
)
""";
try (Statement stmt = dbConnection.createStatement()) {
stmt.execute(createTableSQL);
}
// Prepare the SQL statements
insertMessageStmt = dbConnection.prepareStatement(
"INSERT INTO processed_messages (message_id, topic, partition_num, offset_num, message_content) VALUES (?, ?, ?, ?, ?)"
);
checkMessageStmt = dbConnection.prepareStatement(
"SELECT COUNT(*) FROM processed_messages WHERE message_id = ?"
);
}
public void consumeWithDBIdempotency() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
// Generate a unique ID for the message (a business ID or a composite ID)
String messageId = generateMessageId(record);
// Skip the message if it has already been processed
if (isMessageProcessed(messageId)) {
System.out.printf("Message already processed, skipping - ID: %s, Offset: %d%n",
messageId, record.offset());
continue;
}
// Start a database transaction
dbConnection.setAutoCommit(false);
try {
// Run the business logic
processBusinessLogic(record);
// Record that the message has been processed
markMessageAsProcessed(messageId, record);
// Commit the database transaction
dbConnection.commit();
System.out.printf("Message processed successfully - ID: %s, Offset: %d%n",
messageId, record.offset());
} catch (Exception e) {
// Roll back the database transaction
dbConnection.rollback();
throw e;
}
} catch (Exception e) {
System.err.printf("Failed to process message - Offset: %d, Error: %s%n",
record.offset(), e.getMessage());
// Decide per business requirements whether to skip this message
continue;
}
}
// Manually commit the Kafka offsets
consumer.commitSync();
}
} catch (Exception e) {
System.err.println("消费过程中出错: " + e.getMessage());
} finally {
try {
consumer.close();
if (dbConnection != null) {
dbConnection.close();
}
} catch (SQLException e) {
System.err.println("关闭数据库连接失败: " + e.getMessage());
}
}
}
private String generateMessageId(ConsumerRecord<String, String> record) {
// Option 1: use a business ID carried by the message (only if the key is unique per message)
if (record.key() != null && !record.key().isEmpty()) {
return record.key();
}
// Option 2: combine topic + partition + offset
return String.format("%s-%d-%d", record.topic(), record.partition(), record.offset());
// Option 3: hash the message payload (if the payload itself is unique)
// return DigestUtils.md5Hex(record.value());
}
private boolean isMessageProcessed(String messageId) throws SQLException {
checkMessageStmt.setString(1, messageId);
try (ResultSet rs = checkMessageStmt.executeQuery()) {
if (rs.next()) {
return rs.getInt(1) > 0;
}
}
return false;
}
private void markMessageAsProcessed(String messageId, ConsumerRecord<String, String> record)
throws SQLException {
insertMessageStmt.setString(1, messageId);
insertMessageStmt.setString(2, record.topic());
insertMessageStmt.setInt(3, record.partition());
insertMessageStmt.setLong(4, record.offset());
insertMessageStmt.setString(5, record.value());
insertMessageStmt.executeUpdate();
}
private void processBusinessLogic(ConsumerRecord<String, String> record) throws Exception {
// Simulate business processing
System.out.printf("Running business logic - Value: %s%n", record.value());
// Database writes, API calls, etc. would go here;
// if any of them fail, the exception rolls the transaction back
Thread.sleep(100); // simulate processing time
}
}
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class IdempotentConsumerWithRedis {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "redis-idempotent-consumer";
private JedisPool jedisPool;
private static final int LOCK_TIMEOUT = 30000; // lock timeout: 30 seconds
private static final int MESSAGE_TTL = 86400; // processed-record TTL: 24 hours
public IdempotentConsumerWithRedis() {
jedisPool = new JedisPool("localhost", 6379);
}
public void consumeWithRedisIdempotency() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try (Jedis jedis = jedisPool.getResource()) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String messageId = generateMessageId(record);
String lockKey = "lock:" + messageId;
String processedKey = "processed:" + messageId;
try {
// Acquire the distributed lock
String lockValue = UUID.randomUUID().toString();
if (!acquireLock(jedis, lockKey, lockValue, LOCK_TIMEOUT)) {
System.out.printf("Could not acquire lock, skipping message - ID: %s%n", messageId);
continue;
}
try {
// Skip the message if it has already been processed
if (jedis.exists(processedKey)) {
System.out.printf("Message already processed, skipping - ID: %s%n", messageId);
continue;
}
// Run the business logic
processBusinessLogic(record);
// Mark the message as processed
jedis.setex(processedKey, MESSAGE_TTL, "1");
System.out.printf("Message processed successfully - ID: %s%n", messageId);
} finally {
// Release the distributed lock
releaseLock(jedis, lockKey, lockValue);
}
} catch (Exception e) {
System.err.printf("Failed to process message - ID: %s, Error: %s%n",
messageId, e.getMessage());
continue;
}
}
// Commit the offsets
consumer.commitSync();
}
} catch (Exception e) {
System.err.println("消费过程中出错: " + e.getMessage());
} finally {
consumer.close();
jedisPool.close();
}
}
private boolean acquireLock(Jedis jedis, String lockKey, String lockValue, int timeoutMs) {
SetParams params = SetParams.setParams().nx().px(timeoutMs);
String result = jedis.set(lockKey, lockValue, params);
return "OK".equals(result);
}
private void releaseLock(Jedis jedis, String lockKey, String lockValue) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));
}
private String generateMessageId(ConsumerRecord<String, String> record) {
return String.format("%s-%d-%d", record.topic(), record.partition(), record.offset());
}
private void processBusinessLogic(ConsumerRecord<String, String> record) throws Exception {
System.out.printf("处理业务逻辑 - Value: %s%n", record.value());
Thread.sleep(100);
}
}
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class IdempotentConsumerWithCache {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "cache-idempotent-consumer";
// Local cache holding the IDs of already-processed messages
private final Cache<String, Boolean> processedMessages;
// Timestamp of the last cache-statistics log line (used below for roughly once-a-minute logging)
private long lastStatsLogTime = System.currentTimeMillis();
public IdempotentConsumerWithCache() {
// Build the local cache with an expiry time and a maximum size
this.processedMessages = CacheBuilder.newBuilder()
.maximumSize(100000) // keep at most 100,000 entries
.expireAfterWrite(1, TimeUnit.HOURS) // expire entries one hour after they are written
.concurrencyLevel(16) // concurrency level
.recordStats() // needed for the hit-rate and eviction stats printed below
.build();
}
public void consumeWithCacheIdempotency() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String messageId = generateMessageId(record);
// Skip if the local cache says the message was already processed
if (processedMessages.getIfPresent(messageId) != null) {
System.out.printf("Message already processed (cache hit), skipping - ID: %s%n", messageId);
continue;
}
try {
// Run the business logic (it must itself be idempotent)
boolean success = processBusinessLogicIdempotent(record);
if (success) {
// Add the message ID to the cache
processedMessages.put(messageId, true);
System.out.printf("Message processed successfully - ID: %s%n", messageId);
}
} catch (Exception e) {
System.err.printf("Failed to process message - ID: %s, Error: %s%n",
messageId, e.getMessage());
continue;
}
}
// Commit the offsets
consumer.commitSync();
// Periodically log cache statistics (roughly once a minute)
if (System.currentTimeMillis() - lastStatsLogTime >= 60000) {
lastStatsLogTime = System.currentTimeMillis();
printCacheStats();
}
}
} catch (Exception e) {
System.err.println("消费过程中出错: " + e.getMessage());
} finally {
consumer.close();
}
}
private String generateMessageId(ConsumerRecord<String, String> record) {
// Use several strategies to generate a unique ID
if (record.key() != null && !record.key().isEmpty()) {
return record.key();
}
return String.format("%s-%d-%d", record.topic(), record.partition(), record.offset());
}
private boolean processBusinessLogicIdempotent(ConsumerRecord<String, String> record) throws Exception {
// The business logic must be idempotent
System.out.printf("Idempotent business logic - Value: %s%n", record.value());
// Examples: database updates via UPSERT or UPDATE with a guarding WHERE clause
// Examples: calls to idempotent APIs
// Examples: idempotent file writes
Thread.sleep(100);
return true;
}
private void printCacheStats() {
System.out.printf("缓存统计 - 大小: %d, 命中率: %.2f%%, 驱逐数: %d%n",
processedMessages.size(),
processedMessages.stats().hitRate() * 100,
processedMessages.stats().evictionCount());
}
}
require 'kafka'
require 'mysql2'
require 'digest'
class IdempotentConsumerWithDB
TOPIC_NAME = 'test-topic'
BOOTSTRAP_SERVERS = ['localhost:9092']
GROUP_ID = 'ruby-idempotent-consumer'
def initialize
@kafka = Kafka.new(seed_brokers: BOOTSTRAP_SERVERS)
@db_client = Mysql2::Client.new(
host: 'localhost',
username: 'user',
password: 'password',
database: 'kafka_test'
)
setup_database
end
def setup_database
# Create the table that records processed messages
create_table_sql = <<~SQL
CREATE TABLE IF NOT EXISTS processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
topic VARCHAR(255) NOT NULL,
partition_num INT NOT NULL,
offset_num BIGINT NOT NULL,
processed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
message_content TEXT,
INDEX idx_topic_partition_offset (topic, partition_num, offset_num)
)
SQL
@db_client.query(create_table_sql)
end
def consume_with_db_idempotency
consumer = @kafka.consumer(group_id: GROUP_ID)
# ruby-kafka takes start_from_beginning on subscribe instead of auto_offset_reset
consumer.subscribe(TOPIC_NAME, start_from_beginning: true)
begin
consumer.each_batch(automatically_mark_as_processed: false) do |batch|
# Process the whole batch inside a single database transaction
@db_client.query('BEGIN')
processed_count = 0
begin
batch.messages.each do |message|
message_id = generate_message_id(message)
# Skip the message if it has already been processed
if message_processed?(message_id)
puts "Message already processed, skipping - ID: #{message_id}"
# Still mark it so the consumer's offset can advance past it
consumer.mark_message_as_processed(message)
next
end
# Run the business logic
process_business_logic(message)
# Mark the message as processed in the database
mark_message_as_processed(message_id, message)
consumer.mark_message_as_processed(message)
processed_count += 1
puts "Message processed successfully - ID: #{message_id}"
end
# Commit the database transaction
@db_client.query('COMMIT')
# Commit the Kafka offsets
consumer.commit_offsets
puts "Batch complete, #{processed_count} new messages processed"
rescue => e
# Roll back the database transaction
@db_client.query('ROLLBACK')
puts "Batch failed, rolled back: #{e.message}"
raise e
end
end
rescue => e
puts "消费过程中出错: #{e.message}"
ensure
consumer.stop
@db_client.close
end
end
private
def generate_message_id(message)
# Strategy 1: use the message key
return message.key if message.key && !message.key.empty?
# Strategy 2: topic + partition + offset
"#{message.topic}-#{message.partition}-#{message.offset}"
# Strategy 3: hash of the message payload
# Digest::MD5.hexdigest(message.value)
end
def message_processed?(message_id)
result = @db_client.query(
"SELECT COUNT(*) as count FROM processed_messages WHERE message_id = '#{@db_client.escape(message_id)}'"
)
result.first['count'] > 0
end
def mark_message_as_processed(message_id, message)
insert_sql = <<~SQL
INSERT INTO processed_messages
(message_id, topic, partition_num, offset_num, message_content)
VALUES ('#{@db_client.escape(message_id)}', '#{message.topic}', #{message.partition},
#{message.offset}, '#{@db_client.escape(message.value)}')
SQL
@db_client.query(insert_sql)
end
def process_business_logic(message)
puts "处理业务逻辑 - Value: #{message.value}"
# 这里实现具体的业务逻辑
# 确保业务逻辑本身是幂等的
sleep(0.1) # 模拟处理时间
end
end
require 'kafka'
require 'redis'
require 'securerandom'
class IdempotentConsumerWithRedis
TOPIC_NAME = 'test-topic'
BOOTSTRAP_SERVERS = ['localhost:9092']
GROUP_ID = 'ruby-redis-idempotent-consumer'
LOCK_TIMEOUT = 30 # lock timeout: 30 seconds
MESSAGE_TTL = 86400 # processed-record TTL: 24 hours
def initialize
@kafka = Kafka.new(seed_brokers: BOOTSTRAP_SERVERS)
@redis = Redis.new(host: 'localhost', port: 6379)
end
def consume_with_redis_idempotency
consumer = @kafka.consumer(group_id: GROUP_ID)
# ruby-kafka takes start_from_beginning on subscribe instead of auto_offset_reset
consumer.subscribe(TOPIC_NAME, start_from_beginning: true)
begin
consumer.each_message do |message|
message_id = generate_message_id(message)
lock_key = "lock:#{message_id}"
processed_key = "processed:#{message_id}"
# Acquire the distributed lock
lock_value = SecureRandom.uuid
if acquire_lock(lock_key, lock_value, LOCK_TIMEOUT)
begin
# Skip the message if it has already been processed
if @redis.exists?(processed_key)
puts "Message already processed, skipping - ID: #{message_id}"
next
end
# Run the business logic
process_business_logic(message)
# Mark the message as processed
@redis.setex(processed_key, MESSAGE_TTL, '1')
puts "Message processed successfully - ID: #{message_id}"
rescue => e
puts "Failed to process message - ID: #{message_id}, Error: #{e.message}"
next
ensure
# Release the distributed lock
release_lock(lock_key, lock_value)
end
else
puts "Could not acquire lock, skipping message - ID: #{message_id}"
next
end
end
rescue => e
puts "消费过程中出错: #{e.message}"
ensure
consumer.stop
@redis.quit
end
end
private
def generate_message_id(message)
return message.key if message.key && !message.key.empty?
"#{message.topic}-#{message.partition}-#{message.offset}"
end
def acquire_lock(lock_key, lock_value, timeout_seconds)
result = @redis.set(lock_key, lock_value, ex: timeout_seconds, nx: true)
result == 'OK'
end
def release_lock(lock_key, lock_value)
# Atomically check and delete the lock with a Lua script
lua_script = <<~LUA
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
LUA
@redis.eval(lua_script, keys: [lock_key], argv: [lock_value])
end
def process_business_logic(message)
puts "处理业务逻辑 - Value: #{message.value}"
sleep(0.1)
end
end
require 'kafka'
require 'lru_redux'
require 'thread'
class IdempotentConsumerWithCache
TOPIC_NAME = 'test-topic'
BOOTSTRAP_SERVERS = ['localhost:9092']
GROUP_ID = 'ruby-cache-idempotent-consumer'
CACHE_SIZE = 100000
CACHE_TTL = 3600 # 1 hour
def initialize
@kafka = Kafka.new(seed_brokers: BOOTSTRAP_SERVERS)
# Create the LRU cache
@processed_cache = LruRedux::Cache.new(CACHE_SIZE)
@cache_mutex = Mutex.new
# Start the cache-cleanup thread
start_cache_cleanup_thread
end
def consume_with_cache_idempotency
consumer = @kafka.consumer(group_id: GROUP_ID)
# ruby-kafka takes start_from_beginning on subscribe instead of auto_offset_reset
consumer.subscribe(TOPIC_NAME, start_from_beginning: true)
begin
consumer.each_message do |message|
message_id = generate_message_id(message)
# Skip if the local cache says the message was already processed
if message_processed_in_cache?(message_id)
puts "Message already processed (cache hit), skipping - ID: #{message_id}"
next
end
begin
# Run the business logic (it must be idempotent)
if process_business_logic_idempotent(message)
# Add the message ID to the cache
mark_message_as_processed_in_cache(message_id)
puts "Message processed successfully - ID: #{message_id}"
end
rescue => e
puts "处理消息失败 - ID: #{message_id}, Error: #{e.message}"
next
end
end
rescue => e
puts "消费过程中出错: #{e.message}"
ensure
consumer.stop
end
end
private
def generate_message_id(message)
return message.key if message.key && !message.key.empty?
"#{message.topic}-#{message.partition}-#{message.offset}"
end
def message_processed_in_cache?(message_id)
@cache_mutex.synchronize do
@processed_cache[message_id]
end
end
def mark_message_as_processed_in_cache(message_id)
@cache_mutex.synchronize do
@processed_cache[message_id] = {
processed_at: Time.now,
processed: true
}
end
end
def process_business_logic_idempotent(message)
puts "幂等处理业务逻辑 - Value: #{message.value}"
# 业务逻辑必须是幂等的
# 例如:使用UPSERT操作、条件更新、幂等API调用等
sleep(0.1)
true
end
def start_cache_cleanup_thread
Thread.new do
loop do
sleep(300) # clean up every 5 minutes
cleanup_expired_cache_entries
end
end
end
def cleanup_expired_cache_entries
@cache_mutex.synchronize do
current_time = Time.now
expired_keys = []
@processed_cache.each do |key, value|
if value.is_a?(Hash) && value[:processed_at]
if current_time - value[:processed_at] > CACHE_TTL
expired_keys << key
end
end
end
expired_keys.each { |key| @processed_cache.delete(key) }
puts "清理了 #{expired_keys.size} 个过期缓存条目" if expired_keys.size > 0
end
end
end
// Java example: idempotent business operations
// (sketch: assumes a Spring JdbcTemplate field named jdbcTemplate and a helper getCurrentOrderStatus)
public class IdempotentBusinessOperations {
// 1. Insert keyed on a unique business ID
public void insertOrderIdempotent(String orderId, Order order) {
try {
// Use INSERT IGNORE or ON DUPLICATE KEY UPDATE so replays do not create duplicate rows
String sql = "INSERT INTO orders (order_id, customer_id, amount, status) " +
"VALUES (?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"customer_id = VALUES(customer_id), " +
"amount = VALUES(amount), " +
"status = VALUES(status)";
// Execute the SQL...
} catch (DuplicateKeyException e) {
// Order already exists - treat as idempotent success (only reachable with a plain INSERT)
System.out.println("Order already exists: " + orderId);
}
}
// 2. State-machine style status update
public boolean updateOrderStatusIdempotent(String orderId, String fromStatus, String toStatus) {
String sql = "UPDATE orders SET status = ?, updated_at = NOW() " +
"WHERE order_id = ? AND status = ?";
int affectedRows = jdbcTemplate.update(sql, toStatus, orderId, fromStatus);
if (affectedRows == 0) {
// Check whether the order is already in the target state
String currentStatus = getCurrentOrderStatus(orderId);
return toStatus.equals(currentStatus);
}
return affectedRows > 0;
}
// 3. Optimistic-locking update using a version column
public boolean updateWithVersionIdempotent(String orderId, Order order, int expectedVersion) {
String sql = "UPDATE orders SET amount = ?, status = ?, version = version + 1 " +
"WHERE order_id = ? AND version = ?";
int affectedRows = jdbcTemplate.update(sql,
order.getAmount(), order.getStatus(), orderId, expectedVersion);
return affectedRows > 0;
}
}
# Ruby example: idempotent business operations
class IdempotentBusinessOperations
def initialize(db_client)
@db = db_client
end
# 1. Idempotent order creation
def create_order_idempotent(order_id, order_data)
begin
sql = <<~SQL
INSERT INTO orders (order_id, customer_id, amount, status)
VALUES ('#{order_id}', '#{order_data[:customer_id]}',
#{order_data[:amount]}, '#{order_data[:status]}')
ON DUPLICATE KEY UPDATE
customer_id = VALUES(customer_id),
amount = VALUES(amount),
status = VALUES(status)
SQL
@db.query(sql)
puts "订单创建/更新成功: #{order_id}"
rescue Mysql2::Error => e
if e.message.include?('Duplicate entry')
puts "订单已存在: #{order_id}"
return true
else
raise e
end
end
end
# 2. Idempotent status update
def update_order_status_idempotent(order_id, from_status, to_status)
sql = <<~SQL
UPDATE orders
SET status = '#{to_status}', updated_at = NOW()
WHERE order_id = '#{order_id}' AND status = '#{from_status}'
SQL
result = @db.query(sql)
if @db.affected_rows == 0
# Check whether the row is already in the target state
current_status = get_current_order_status(order_id)
return current_status == to_status
end
@db.affected_rows > 0
end
# 3. Idempotent balance credit
def add_balance_idempotent(user_id, amount, transaction_id)
# First check whether this transaction has already been recorded
check_sql = "SELECT id FROM transactions WHERE transaction_id = '#{transaction_id}'"
existing = @db.query(check_sql)
if existing.count > 0
puts "交易已处理过: #{transaction_id}"
return true
end
# Update the balance and insert the transaction record inside one DB transaction
@db.query('BEGIN')
begin
# Update the balance
update_sql = <<~SQL
UPDATE users
SET balance = balance + #{amount}
WHERE user_id = '#{user_id}'
SQL
@db.query(update_sql)
# Insert the transaction record
insert_sql = <<~SQL
INSERT INTO transactions (transaction_id, user_id, amount, type)
VALUES ('#{transaction_id}', '#{user_id}', #{amount}, 'credit')
SQL
@db.query(insert_sql)
@db.query('COMMIT')
puts "余额添加成功: 用户#{user_id}, 金额#{amount}"
true
rescue => e
@db.query('ROLLBACK')
raise e
end
end
private
def get_current_order_status(order_id)
result = @db.query("SELECT status FROM orders WHERE order_id = '#{order_id}'")
result.first&.fetch('status')
end
end
Database-constraint approach: suited to scenarios with high consistency requirements and moderate message volume. Its strength is reliability; its weakness is comparatively low throughput.
Redis distributed-lock approach: suited to high-concurrency scenarios where duplicate processing must be prevented. Its strength is performance; its weakness is the extra dependency it introduces.
Local-cache approach: suited to single-instance deployments or scenarios where consistency requirements are less strict. It performs best, but offers limited protection in a distributed environment.
In real projects several idempotency strategies are usually combined, for example checking the local cache first, then Redis, and finally the database. This preserves consistency while keeping most lookups cheap.
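A rough sketch of that layered check follows; the Redis handle, the processed_messages table, and the 24-hour TTL are assumptions carried over from the earlier examples rather than a fixed recipe:
import com.google.common.cache.Cache;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class LayeredIdempotencyCheck {
private final Cache<String, Boolean> localCache; // cheapest: per-instance memory
private final Jedis redis; // shared and fast
private final Connection db; // authoritative but slowest
public LayeredIdempotencyCheck(Cache<String, Boolean> localCache, Jedis redis, Connection db) {
this.localCache = localCache;
this.redis = redis;
this.db = db;
}
public boolean alreadyProcessed(String messageId) throws SQLException {
if (localCache.getIfPresent(messageId) != null) {
return true; // hit in the local cache
}
if (redis.exists("processed:" + messageId)) {
localCache.put(messageId, true); // backfill the cheaper layer
return true;
}
try (PreparedStatement ps = db.prepareStatement(
"SELECT COUNT(*) FROM processed_messages WHERE message_id = ?")) {
ps.setString(1, messageId);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next() && rs.getInt(1) > 0) {
redis.setex("processed:" + messageId, 86400, "1"); // backfill Redis
localCache.put(messageId, true);
return true;
}
}
}
return false; // not seen at any layer
}
}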
It is also advisable to monitor idempotency-related metrics, such as the proportion of duplicate messages and the number of messages that fail processing; this helps surface and resolve problems early.
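A minimal sketch of such counters, using plain AtomicLong fields (in a real project these would more likely be exported through whatever metrics library is already in use):
import java.util.concurrent.atomic.AtomicLong;
public class IdempotencyMetrics {
private final AtomicLong consumed = new AtomicLong(); // every record seen
private final AtomicLong duplicates = new AtomicLong(); // records skipped by the dedup check
private final AtomicLong failures = new AtomicLong(); // records whose processing failed
public void recordConsumed() { consumed.incrementAndGet(); }
public void recordDuplicate() { duplicates.incrementAndGet(); }
public void recordFailure() { failures.incrementAndGet(); }
// Ratio of duplicates to everything consumed - a sudden jump usually points at rebalances or retries
public double duplicateRatio() {
long total = consumed.get();
return total == 0 ? 0.0 : (double) duplicates.get() / total;
}
public long failedMessages() { return failures.get(); }
}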
With the approaches and best practices above, you can handle Kafka message idempotency effectively and keep the system reliable and its data consistent.