Kafka Message Idempotency in Detail

Message idempotency is a key concern in distributed systems. In Kafka, the same message may be consumed more than once due to network failures, rebalances, retry mechanisms, and similar causes. Below are detailed approaches for handling idempotency in Java and Ruby.

1. The Challenge of Idempotency

Common duplicate-consumption scenarios

Network failures: the consumer finishes processing a message but the network drops while the offset is being committed; after a restart the message is consumed again (see the sketch after this list).

Rebalancing: when the consumer group rebalances, some messages may be processed by more than one consumer.

Retries: when a processing failure triggers a retry, the same message may be handled twice.

Clock rollback: a system-clock anomaly can break timestamp-based deduplication.
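
To make the first scenario concrete, here is a minimal sketch of a plain at-least-once consumer loop (the broker address, group ID, topic name, and handle method are placeholder assumptions). The gap between the side effect and commitSync() is exactly where a crash turns into redelivery:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class AtLeastOnceWindowDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic")); // assumed topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    handle(record); // the side effect happens here
                    // If the process crashes HERE -- after the side effect but
                    // before commitSync() -- the group rebalances and this record
                    // is delivered again: at-least-once, hence the need for idempotency.
                }
                consumer.commitSync();
            }
        }
    }

    private static void handle(ConsumerRecord<String, String> record) {
        System.out.println("processed: " + record.value());
    }
}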

2. Java Idempotency Solutions

Option 1: Database Unique Constraint

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.*;
import java.time.Duration;
import java.util.*;

public class IdempotentConsumerWithDB {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "idempotent-consumer-group";
    
    private Connection dbConnection;
    private PreparedStatement insertMessageStmt;
    private PreparedStatement checkMessageStmt;
    
    public IdempotentConsumerWithDB() throws SQLException {
        initDatabase();
    }
    
    private void initDatabase() throws SQLException {
        // Initialize the database connection
        dbConnection = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/kafka_test", "user", "password");
        dbConnection.setAutoCommit(false);
        
        // Create the processed-message tracking table
        String createTableSQL = """
            CREATE TABLE IF NOT EXISTS processed_messages (
                message_id VARCHAR(255) PRIMARY KEY,
                topic VARCHAR(255) NOT NULL,
                partition_num INT NOT NULL,
                offset_num BIGINT NOT NULL,
                processed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                message_content TEXT,
                INDEX idx_topic_partition_offset (topic, partition_num, offset_num)
            )
        """;
        
        try (Statement stmt = dbConnection.createStatement()) {
            stmt.execute(createTableSQL);
        }
        
        // Prepare the SQL statements
        insertMessageStmt = dbConnection.prepareStatement(
            "INSERT INTO processed_messages (message_id, topic, partition_num, offset_num, message_content) VALUES (?, ?, ?, ?, ?)"
        );
        
        checkMessageStmt = dbConnection.prepareStatement(
            "SELECT COUNT(*) FROM processed_messages WHERE message_id = ?"
        );
    }
    
    public void consumeWithDBIdempotency() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Generate a unique message ID (a business ID or a composite ID both work)
                        String messageId = generateMessageId(record);
                        
                        // Fast-path check; the PRIMARY KEY on message_id is what
                        // actually guards against a race between check and insert
                        if (isMessageProcessed(messageId)) {
                            System.out.printf("Message already processed, skipping - ID: %s, Offset: %d%n", 
                                messageId, record.offset());
                            continue;
                        }
                        
                        // Process and record the message in one database transaction
                        // (the connection is already in manual-commit mode, see initDatabase)
                        try {
                            // Run the business logic
                            processBusinessLogic(record);
                            
                            // Record the message as processed
                            markMessageAsProcessed(messageId, record);
                            
                            // Commit the database transaction
                            dbConnection.commit();
                            
                            System.out.printf("Message processed successfully - ID: %s, Offset: %d%n", 
                                messageId, record.offset());
                            
                        } catch (Exception e) {
                            // Roll back the database transaction
                            dbConnection.rollback();
                            throw e;
                        }
                        
                    } catch (Exception e) {
                        System.err.printf("Failed to process message - Offset: %d, Error: %s%n", 
                            record.offset(), e.getMessage());
                        // Decide per business requirements whether to skip this message
                        continue;
                    }
                }
                
                // Manually commit Kafka offsets
                consumer.commitSync();
            }
            
        } catch (Exception e) {
            System.err.println("消费过程中出错: " + e.getMessage());
        } finally {
            try {
                consumer.close();
                if (dbConnection != null) {
                    dbConnection.close();
                }
            } catch (SQLException e) {
                System.err.println("关闭数据库连接失败: " + e.getMessage());
            }
        }
    }
    
    private String generateMessageId(ConsumerRecord<String, String> record) {
        // Option 1: use a business ID (if the message carries one as its key)
        if (record.key() != null && !record.key().isEmpty()) {
            return record.key();
        }
        
        // Option 2: use the topic+partition+offset combination
        return String.format("%s-%d-%d", record.topic(), record.partition(), record.offset());
        
        // Option 3: use a hash of the message body (if the content is unique)
        // return DigestUtils.md5Hex(record.value());
    }
    
    private boolean isMessageProcessed(String messageId) throws SQLException {
        checkMessageStmt.setString(1, messageId);
        try (ResultSet rs = checkMessageStmt.executeQuery()) {
            if (rs.next()) {
                return rs.getInt(1) > 0;
            }
        }
        return false;
    }
    
    private void markMessageAsProcessed(String messageId, ConsumerRecord<String, String> record) 
            throws SQLException {
        insertMessageStmt.setString(1, messageId);
        insertMessageStmt.setString(2, record.topic());
        insertMessageStmt.setInt(3, record.partition());
        insertMessageStmt.setLong(4, record.offset());
        insertMessageStmt.setString(5, record.value());
        insertMessageStmt.executeUpdate();
    }
    
    private void processBusinessLogic(ConsumerRecord<String, String> record) throws Exception {
        // Simulate business processing
        System.out.printf("Processing business logic - Value: %s%n", record.value());
        
        // Real work might include database writes, API calls, etc.
        // If any of it fails, the exception triggers a transaction rollback.
        
        Thread.sleep(100); // Simulate processing time
    }
}

Option 2: Redis Distributed Lock and Deduplication

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class IdempotentConsumerWithRedis {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "redis-idempotent-consumer";
    
    private JedisPool jedisPool;
    private static final int LOCK_TIMEOUT = 30000; // Lock timeout: 30s; must exceed worst-case processing time or the lock can expire mid-flight
    private static final int MESSAGE_TTL = 86400; // Processed-message record TTL: 24 hours
    
    public IdempotentConsumerWithRedis() {
        jedisPool = new JedisPool("localhost", 6379);
    }
    
    public void consumeWithRedisIdempotency() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        
        try (Jedis jedis = jedisPool.getResource()) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    String messageId = generateMessageId(record);
                    String lockKey = "lock:" + messageId;
                    String processedKey = "processed:" + messageId;
                    
                    try {
                        // Acquire the distributed lock
                        String lockValue = UUID.randomUUID().toString();
                        if (!acquireLock(jedis, lockKey, lockValue, LOCK_TIMEOUT)) {
                            System.out.printf("Could not acquire lock, skipping message - ID: %s%n", messageId);
                            continue;
                        }
                        
                        try {
                            // Check whether the message has already been processed
                            if (jedis.exists(processedKey)) {
                                System.out.printf("Message already processed, skipping - ID: %s%n", messageId);
                                continue;
                            }
                            
                            // Run the business logic
                            processBusinessLogic(record);
                            
                            // Mark the message as processed
                            jedis.setex(processedKey, MESSAGE_TTL, "1");
                            
                            System.out.printf("Message processed successfully - ID: %s%n", messageId);
                            
                        } finally {
                            // Release the distributed lock
                            releaseLock(jedis, lockKey, lockValue);
                        }
                        
                    } catch (Exception e) {
                        System.err.printf("Failed to process message - ID: %s, Error: %s%n", 
                            messageId, e.getMessage());
                        continue;
                    }
                }
                
                // Commit offsets
                consumer.commitSync();
            }
            
        } catch (Exception e) {
            System.err.println("消费过程中出错: " + e.getMessage());
        } finally {
            consumer.close();
            jedisPool.close();
        }
    }
    
    private boolean acquireLock(Jedis jedis, String lockKey, String lockValue, int timeoutMs) {
        SetParams params = SetParams.setParams().nx().px(timeoutMs);
        String result = jedis.set(lockKey, lockValue, params);
        return "OK".equals(result);
    }
    
    private void releaseLock(Jedis jedis, String lockKey, String lockValue) {
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "    return redis.call('del', KEYS[1]) " +
            "else " +
            "    return 0 " +
            "end";
        jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(lockValue));
    }
    
    private String generateMessageId(ConsumerRecord<String, String> record) {
        return String.format("%s-%d-%d", record.topic(), record.partition(), record.offset());
    }
    
    private void processBusinessLogic(ConsumerRecord<String, String> record) throws Exception {
        System.out.printf("处理业务逻辑 - Value: %s%n", record.value());
        Thread.sleep(100);
    }
}

Option 3: Local-Cache-Based Idempotency

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class IdempotentConsumerWithCache {
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "cache-idempotent-consumer";
    
    // Local cache of already-processed message IDs
    private final Cache<String, Boolean> processedMessages;
    
    public IdempotentConsumerWithCache() {
        // Build the local cache with an expiry time and a maximum size
        this.processedMessages = CacheBuilder.newBuilder()
                .maximumSize(100000)                    // Cache at most 100,000 entries
                .expireAfterWrite(1, TimeUnit.HOURS)    // Expire entries 1 hour after write
                .concurrencyLevel(16)                   // Concurrency level
                .recordStats()                          // Required for the statistics printed below
                .build();
    }
    
    public void consumeWithCacheIdempotency() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        
        try {
            long lastStatsPrint = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    String messageId = generateMessageId(record);
                    
                    // Check the local cache first
                    if (processedMessages.getIfPresent(messageId) != null) {
                        System.out.printf("Message already processed (cache hit), skipping - ID: %s%n", messageId);
                        continue;
                    }
                    
                    try {
                        // Run the business logic (which must itself be idempotent)
                        boolean success = processBusinessLogicIdempotent(record);
                        
                        if (success) {
                            // Add the message ID to the cache
                            processedMessages.put(messageId, true);
                            System.out.printf("Message processed successfully - ID: %s%n", messageId);
                        }
                        
                    } catch (Exception e) {
                        System.err.printf("Failed to process message - ID: %s, Error: %s%n", 
                            messageId, e.getMessage());
                        continue;
                    }
                }
                
                // Commit offsets
                consumer.commitSync();
                
                // Print cache statistics roughly once a minute
                // (the original modulo-on-clock check almost never fired)
                long now = System.currentTimeMillis();
                if (now - lastStatsPrint >= 60000) {
                    printCacheStats();
                    lastStatsPrint = now;
                }
            }
            
        } catch (Exception e) {
            System.err.println("消费过程中出错: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
    
    private String generateMessageId(ConsumerRecord<String, String> record) {
        // Fall back through several ID strategies: key first, then topic+partition+offset
        if (record.key() != null && !record.key().isEmpty()) {
            return record.key();
        }
        return String.format("%s-%d-%d", record.topic(), record.partition(), record.offset());
    }
    
    private boolean processBusinessLogicIdempotent(ConsumerRecord<String, String> record) throws Exception {
        // The business logic must be idempotent
        System.out.printf("Idempotent business processing - Value: %s%n", record.value());
        
        // Example: update a database row via UPSERT or a conditional UPDATE
        // Example: call an idempotent API endpoint
        // Example: perform an idempotent file write
        
        Thread.sleep(100);
        return true;
    }
    
    private void printCacheStats() {
        System.out.printf("缓存统计 - 大小: %d, 命中率: %.2f%%, 驱逐数: %d%n",
                processedMessages.size(),
                processedMessages.stats().hitRate() * 100,
                processedMessages.stats().evictionCount());
    }
}

3. Ruby Idempotency Solutions

Option 1: Database-Based Idempotency

require 'kafka'
require 'mysql2'
require 'digest'

class IdempotentConsumerWithDB
  TOPIC_NAME = 'test-topic'
  BOOTSTRAP_SERVERS = ['localhost:9092']
  GROUP_ID = 'ruby-idempotent-consumer'
  
  def initialize
    @kafka = Kafka.new(seed_brokers: BOOTSTRAP_SERVERS)
    @db_client = Mysql2::Client.new(
      host: 'localhost',
      username: 'user',
      password: 'password',
      database: 'kafka_test'
    )
    
    setup_database
  end
  
  def setup_database
    # Create the processed-message tracking table
    create_table_sql = <<~SQL
      CREATE TABLE IF NOT EXISTS processed_messages (
        message_id VARCHAR(255) PRIMARY KEY,
        topic VARCHAR(255) NOT NULL,
        partition_num INT NOT NULL,
        offset_num BIGINT NOT NULL,
        processed_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        message_content TEXT,
        INDEX idx_topic_partition_offset (topic, partition_num, offset_num)
      )
    SQL
    
    @db_client.query(create_table_sql)
  end
  
  def consume_with_db_idempotency
    consumer = @kafka.consumer(
      group_id: GROUP_ID,
      auto_offset_reset: :earliest
    )
    
    consumer.subscribe(TOPIC_NAME)
    
    begin
      consumer.each_batch(automatically_mark_as_processed: false) do |batch|
        # Process the whole batch inside a database transaction
        @db_client.query('BEGIN')
        
        processed_count = 0
        
        begin
          batch.messages.each do |message|
            message_id = generate_message_id(message)
            
            # Check whether the message has already been processed
            if message_processed?(message_id)
              puts "Message already processed, skipping - ID: #{message_id}"
              next
            end
            
            # Run the business logic
            process_business_logic(message)
            
            # Mark the message as processed
            mark_message_as_processed(message_id, message)
            
            # Marking is done on the consumer, not the batch object
            consumer.mark_message_as_processed(message)
            processed_count += 1
            
            puts "Message processed successfully - ID: #{message_id}"
          end
          
          # Commit the database transaction
          @db_client.query('COMMIT')
          
          # Commit Kafka offsets
          consumer.commit_offsets
          
          puts "Batch complete, #{processed_count} new messages processed"
          
        rescue => e
          # Roll back the database transaction
          @db_client.query('ROLLBACK')
          puts "Batch failed, rolled back: #{e.message}"
          raise e
        end
      end
      
    rescue => e
      puts "消费过程中出错: #{e.message}"
    ensure
      consumer.stop
      @db_client.close
    end
  end
  
  private
  
  def generate_message_id(message)
    # Strategy 1: use the message key
    return message.key if message.key && !message.key.empty?
    
    # Strategy 2: use topic+partition+offset
    "#{message.topic}-#{message.partition}-#{message.offset}"
    
    # Strategy 3: use a hash of the message body
    # Digest::MD5.hexdigest(message.value)
  end
  
  def message_processed?(message_id)
    # Use a prepared statement so untrusted IDs cannot inject SQL
    statement = @db_client.prepare(
      'SELECT COUNT(*) AS count FROM processed_messages WHERE message_id = ?'
    )
    statement.execute(message_id).first['count'] > 0
  end
  
  def mark_message_as_processed(message_id, message)
    # A prepared statement keeps message content out of the SQL text
    statement = @db_client.prepare(<<~SQL)
      INSERT INTO processed_messages
      (message_id, topic, partition_num, offset_num, message_content)
      VALUES (?, ?, ?, ?, ?)
    SQL
    
    statement.execute(message_id, message.topic, message.partition,
                      message.offset, message.value)
  end
  
  def process_business_logic(message)
    puts "Processing business logic - Value: #{message.value}"
    
    # Implement the actual business logic here,
    # and make sure it is itself idempotent
    
    sleep(0.1) # Simulate processing time
  end
end

Option 2: Redis-Based Idempotency

require 'kafka'
require 'redis'
require 'securerandom'

class IdempotentConsumerWithRedis
  TOPIC_NAME = 'test-topic'
  BOOTSTRAP_SERVERS = ['localhost:9092']
  GROUP_ID = 'ruby-redis-idempotent-consumer'
  LOCK_TIMEOUT = 30 # Lock timeout: 30 seconds
  MESSAGE_TTL = 86400 # Processed-message record TTL: 24 hours
  
  def initialize
    @kafka = Kafka.new(seed_brokers: BOOTSTRAP_SERVERS)
    @redis = Redis.new(host: 'localhost', port: 6379)
  end
  
  def consume_with_redis_idempotency
    consumer = @kafka.consumer(
      group_id: GROUP_ID,
      auto_offset_reset: :earliest
    )
    
    consumer.subscribe(TOPIC_NAME)
    
    begin
      consumer.each_message do |message|
        message_id = generate_message_id(message)
        lock_key = "lock:#{message_id}"
        processed_key = "processed:#{message_id}"
        
        # Acquire the distributed lock
        lock_value = SecureRandom.uuid
        
        if acquire_lock(lock_key, lock_value, LOCK_TIMEOUT)
          begin
            # Check whether the message has already been processed
            if @redis.exists?(processed_key)
              puts "Message already processed, skipping - ID: #{message_id}"
              next
            end
            
            # Run the business logic
            process_business_logic(message)
            
            # Mark the message as processed
            @redis.setex(processed_key, MESSAGE_TTL, '1')
            
            puts "Message processed successfully - ID: #{message_id}"
            
          rescue => e
            puts "Failed to process message - ID: #{message_id}, Error: #{e.message}"
            next
          ensure
            # Release the distributed lock
            release_lock(lock_key, lock_value)
          end
        else
          puts "Could not acquire lock, skipping message - ID: #{message_id}"
          next
        end
      end
      
    rescue => e
      puts "消费过程中出错: #{e.message}"
    ensure
      consumer.stop
      @redis.quit
    end
  end
  
  private
  
  def generate_message_id(message)
    return message.key if message.key && !message.key.empty?
    "#{message.topic}-#{message.partition}-#{message.offset}"
  end
  
  def acquire_lock(lock_key, lock_value, timeout_seconds)
    # With nx: true, redis-rb returns true/false rather than "OK",
    # so comparing against 'OK' here would always fail
    @redis.set(lock_key, lock_value, ex: timeout_seconds, nx: true)
  end
  
  def release_lock(lock_key, lock_value)
    # Atomically check and delete the lock with a Lua script
    lua_script = <<~LUA
      if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
      else
        return 0
      end
    LUA
    
    @redis.eval(lua_script, keys: [lock_key], argv: [lock_value])
  end
  
  def process_business_logic(message)
    puts "处理业务逻辑 - Value: #{message.value}"
    sleep(0.1)
  end
end

Option 3: In-Memory-Cache-Based Idempotency

require 'kafka'
require 'lru_redux'

class IdempotentConsumerWithCache
  TOPIC_NAME = 'test-topic'
  BOOTSTRAP_SERVERS = ['localhost:9092']
  GROUP_ID = 'ruby-cache-idempotent-consumer'
  CACHE_SIZE = 100000
  CACHE_TTL = 3600 # 1 hour
  
  def initialize
    @kafka = Kafka.new(seed_brokers: BOOTSTRAP_SERVERS)
    
    # Create the LRU cache
    @processed_cache = LruRedux::Cache.new(CACHE_SIZE)
    @cache_mutex = Mutex.new
    
    # Start the background cache-cleanup thread
    start_cache_cleanup_thread
  end
  
  def consume_with_cache_idempotency
    consumer = @kafka.consumer(
      group_id: GROUP_ID,
      auto_offset_reset: :earliest
    )
    
    consumer.subscribe(TOPIC_NAME)
    
    begin
      consumer.each_message do |message|
        message_id = generate_message_id(message)
        
        # Check whether the cache already holds this message ID
        if message_processed_in_cache?(message_id)
          puts "Message already processed (cache hit), skipping - ID: #{message_id}"
          next
        end
        
        begin
          # Run the business logic (which must itself be idempotent)
          if process_business_logic_idempotent(message)
            # Add the message ID to the cache
            mark_message_as_processed_in_cache(message_id)
            puts "Message processed successfully - ID: #{message_id}"
          end
          
        rescue => e
          puts "Failed to process message - ID: #{message_id}, Error: #{e.message}"
          next
        end
      end
      
    rescue => e
      puts "消费过程中出错: #{e.message}"
    ensure
      consumer.stop
    end
  end
  
  private
  
  def generate_message_id(message)
    return message.key if message.key && !message.key.empty?
    "#{message.topic}-#{message.partition}-#{message.offset}"
  end
  
  def message_processed_in_cache?(message_id)
    @cache_mutex.synchronize do
      @processed_cache[message_id]
    end
  end
  
  def mark_message_as_processed_in_cache(message_id)
    @cache_mutex.synchronize do
      @processed_cache[message_id] = {
        processed_at: Time.now,
        processed: true
      }
    end
  end
  
  def process_business_logic_idempotent(message)
    puts "Idempotent business processing - Value: #{message.value}"
    
    # The business logic must be idempotent:
    # e.g. UPSERTs, conditional updates, idempotent API calls
    
    sleep(0.1)
    true
  end
  
  def start_cache_cleanup_thread
    Thread.new do
      loop do
        sleep(300) # Clean up every 5 minutes
        cleanup_expired_cache_entries
      end
    end
  end
  
  def cleanup_expired_cache_entries
    @cache_mutex.synchronize do
      current_time = Time.now
      expired_keys = []
      
      @processed_cache.each do |key, value|
        if value.is_a?(Hash) && value[:processed_at]
          if current_time - value[:processed_at] > CACHE_TTL
            expired_keys << key
          end
        end
      end
      
      expired_keys.each { |key| @processed_cache.delete(key) }
      
      puts "清理了 #{expired_keys.size} 个过期缓存条目" if expired_keys.size > 0
    end
  end
end

4. Idempotency at the Business Level

Principles for designing idempotent operations

// Java example: idempotent business operations
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;

public class IdempotentBusinessOperations {
    
    private JdbcTemplate jdbcTemplate; // injected/configured elsewhere; Order is the domain type
    
    // 1. Insert keyed on a unique business ID
    public void insertOrderIdempotent(String orderId, Order order) {
        try {
            // Use INSERT IGNORE or ON DUPLICATE KEY UPDATE
            String sql = "INSERT INTO orders (order_id, customer_id, amount, status) " +
                        "VALUES (?, ?, ?, ?) " +
                        "ON DUPLICATE KEY UPDATE " +
                        "customer_id = VALUES(customer_id), " +
                        "amount = VALUES(amount), " +
                        "status = VALUES(status)";
            
            // Execute the SQL...
            
        } catch (DuplicateKeyException e) {
            // With ON DUPLICATE KEY UPDATE this will not normally fire;
            // it covers the plain-INSERT variant of the same pattern.
            System.out.println("Order already exists: " + orderId);
        }
    }
    
    // 2. State-machine-style status update
    public boolean updateOrderStatusIdempotent(String orderId, String fromStatus, String toStatus) {
        String sql = "UPDATE orders SET status = ?, updated_at = NOW() " +
                    "WHERE order_id = ? AND status = ?";
        
        int affectedRows = jdbcTemplate.update(sql, toStatus, orderId, fromStatus);
        
        if (affectedRows == 0) {
            // Check whether the row is already in the target state
            String currentStatus = getCurrentOrderStatus(orderId);
            return toStatus.equals(currentStatus);
        }
        
        return affectedRows > 0;
    }
    
    // 3. Optimistic-locking update with a version number
    public boolean updateWithVersionIdempotent(String orderId, Order order, int expectedVersion) {
        String sql = "UPDATE orders SET amount = ?, status = ?, version = version + 1 " +
                    "WHERE order_id = ? AND version = ?";
        
        int affectedRows = jdbcTemplate.update(sql, 
            order.getAmount(), order.getStatus(), orderId, expectedVersion);
        
        return affectedRows > 0;
    }
    
    private String getCurrentOrderStatus(String orderId) {
        // Look up the current status so a repeated update can be treated as success
        return jdbcTemplate.queryForObject(
            "SELECT status FROM orders WHERE order_id = ?", String.class, orderId);
    }
}

# Ruby example: idempotent business operations
class IdempotentBusinessOperations
  def initialize(db_client)
    @db = db_client
  end
  
  # 1. Idempotent order creation
  def create_order_idempotent(order_id, order_data)
    begin
      # Escape untrusted values before interpolating them into the SQL
      sql = <<~SQL
        INSERT INTO orders (order_id, customer_id, amount, status)
        VALUES ('#{@db.escape(order_id)}', '#{@db.escape(order_data[:customer_id])}',
                #{order_data[:amount].to_f}, '#{@db.escape(order_data[:status])}')
        ON DUPLICATE KEY UPDATE
        customer_id = VALUES(customer_id),
        amount = VALUES(amount),
        status = VALUES(status)
      SQL
      
      @db.query(sql)
      puts "Order created/updated: #{order_id}"
      
    rescue Mysql2::Error => e
      # With ON DUPLICATE KEY UPDATE a duplicate-entry error will not normally
      # occur; this rescue covers the plain-INSERT variant of the same pattern.
      if e.message.include?('Duplicate entry')
        puts "Order already exists: #{order_id}"
        return true
      else
        raise e
      end
    end
  end
  
  # 2. Idempotent status update
  def update_order_status_idempotent(order_id, from_status, to_status)
    sql = <<~SQL
      UPDATE orders 
      SET status = '#{@db.escape(to_status)}', updated_at = NOW()
      WHERE order_id = '#{@db.escape(order_id)}' AND status = '#{@db.escape(from_status)}'
    SQL
    
    @db.query(sql)
    
    if @db.affected_rows == 0
      # Check whether the row is already in the target state
      current_status = get_current_order_status(order_id)
      return current_status == to_status
    end
    
    @db.affected_rows > 0
  end
  
  # 3. Idempotent balance change
  def add_balance_idempotent(user_id, amount, transaction_id)
    # First check whether this transaction has already been recorded.
    # A UNIQUE constraint on transaction_id should back this check,
    # since check-then-insert alone is racy across processes.
    check_sql = "SELECT id FROM transactions WHERE transaction_id = '#{@db.escape(transaction_id)}'"
    existing = @db.query(check_sql)
    
    if existing.count > 0
      puts "Transaction already processed: #{transaction_id}"
      return true
    end
    
    # Update the balance and insert the transaction record in one transaction
    @db.query('BEGIN')
    
    begin
      # Update the balance
      update_sql = <<~SQL
        UPDATE users 
        SET balance = balance + #{amount.to_f}
        WHERE user_id = '#{@db.escape(user_id)}'
      SQL
      @db.query(update_sql)
      
      # Insert the transaction record
      insert_sql = <<~SQL
        INSERT INTO transactions (transaction_id, user_id, amount, type)
        VALUES ('#{@db.escape(transaction_id)}', '#{@db.escape(user_id)}', #{amount.to_f}, 'credit')
      SQL
      @db.query(insert_sql)
      
      @db.query('COMMIT')
      puts "Balance credited: user #{user_id}, amount #{amount}"
      true
      
    rescue => e
      @db.query('ROLLBACK')
      raise e
    end
  end
  
  private
  
  def get_current_order_status(order_id)
    result = @db.query("SELECT status FROM orders WHERE order_id = '#{@db.escape(order_id)}'")
    result.first&.fetch('status')
  end
end

5. Best Practices Summary

Choosing the right idempotency approach

Database constraint approach: suited to scenarios with strong consistency requirements and moderate message volume. Pro: highly reliable. Con: comparatively low throughput.

Redis distributed-lock approach: suited to high-concurrency scenarios where duplicate processing must be prevented. Pro: good performance. Con: adds an extra infrastructure dependency.

Local-cache approach: suited to single-instance deployments or scenarios with relaxed consistency requirements. Pro: best performance. Con: limited in a distributed environment, since the cache is per-process and lost on restart.

Combining multiple strategies

In real projects, several idempotency strategies are usually combined: check the local cache first, then Redis, and finally the database. This preserves consistency while getting much better performance; a sketch follows.
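
A minimal sketch of that layered check (the class, the MessageStore interface, and the "processed:" key prefix from Option 2 are illustrative assumptions, not a specific library's API): consult the cheap process-local cache first, then Redis, and only fall through to the database, back-filling the faster tiers on the way out.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.TimeUnit;

// Hypothetical tiered deduplication: local cache -> Redis -> database.
public class TieredIdempotencyChecker {
    private final Cache<String, Boolean> localCache = CacheBuilder.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(1, TimeUnit.HOURS)
            .build();
    private final JedisPool jedisPool;
    private final MessageStore store; // any DB-backed lookup, e.g. the processed_messages table

    public interface MessageStore {
        boolean isProcessed(String messageId);
    }

    public TieredIdempotencyChecker(JedisPool jedisPool, MessageStore store) {
        this.jedisPool = jedisPool;
        this.store = store;
    }

    public boolean isDuplicate(String messageId) {
        // Tier 1: process-local cache, the cheapest check
        if (localCache.getIfPresent(messageId) != null) {
            return true;
        }
        // Tier 2: Redis, shared across consumer instances
        try (var jedis = jedisPool.getResource()) {
            if (jedis.exists("processed:" + messageId)) {
                localCache.put(messageId, true); // back-fill the faster tier
                return true;
            }
        }
        // Tier 3: the database, the source of truth
        if (store.isProcessed(messageId)) {
            localCache.put(messageId, true);
            return true;
        }
        return false;
    }
}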

Monitoring and alerting

It is worth monitoring idempotency-related metrics, such as the ratio of duplicate messages and the number of failed messages, so that problems surface early; a minimal sketch follows.
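
As one simple, library-agnostic sketch (class and method names here are assumptions), plain atomic counters are enough to derive a duplicate ratio and a failure count that a metrics scraper or a periodic log line can report:

import java.util.concurrent.atomic.AtomicLong;

// Minimal in-process counters for idempotency monitoring; in production these
// would typically be registered with a metrics library instead.
public class IdempotencyMetrics {
    private final AtomicLong consumed = new AtomicLong();
    private final AtomicLong duplicates = new AtomicLong();
    private final AtomicLong failures = new AtomicLong();

    public void recordConsumed()  { consumed.incrementAndGet(); }
    public void recordDuplicate() { duplicates.incrementAndGet(); }
    public void recordFailure()   { failures.incrementAndGet(); }

    // Fraction of consumed messages that were duplicates; alert when this spikes
    public double duplicateRatio() {
        long total = consumed.get();
        return total == 0 ? 0.0 : (double) duplicates.get() / total;
    }

    public long failureCount() { return failures.get(); }
}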

With the approaches and practices above, you can handle Kafka message idempotency effectively and keep your system reliable and your data consistent.