自学内容网 自学内容网

消息队列设计一个幂等的消费逻辑golang版

如何实现消息幂等

设计幂等的消费逻辑的关键是确保每条消息只被处理一次,即使在网络故障或消费者重启的情况下。通常使用唯一的消息ID和持久化存储来记录处理过的消息ID。

实现步骤

  1. 连接kafka和redis
  2. 检查消息ID
  3. 处理消息
  4. 标记消息已处理
package main

import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-redis/redis/v8"
"log"
"time"
)

// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
Addr:     "localhost:6379",
Password: "", // no password set
DB:       0,  // use default DB
})

// 计算消息的唯一ID(可以使用消息的内容或其他标识)
func calculateMessageID(message []byte) string {
hash := md5.Sum(message)
return hex.EncodeToString(hash[:])
}

// 检查消息ID是否已处理
func isMessageProcessed(messageID string) bool {
result, err := rdb.Get(ctx, messageID).Result()
if err == redis.Nil {
return false
} else if err != nil {
log.Fatalf("Failed to get message ID from Redis: %v", err)
}
return result == "processed"
}

// 标记消息ID为已处理
func markMessageAsProcessed(messageID string) {
err := rdb.Set(ctx, messageID, "processed", 0).Err()
if err != nil {
log.Fatalf("Failed to set message ID in Redis: %v", err)
}
}

// 处理消息的逻辑
func processMessage(message []byte) {
// 在这里添加具体的消息处理逻辑
fmt.Printf("Processing message: %s\n", string(message))
}
// 初始化Kafka消费者,读取消息,检查消息ID,处理未处理的消息,并将消息ID标记为已处理。
func main() {
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id":          "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()

consumer.Subscribe("myTopic", nil)

for {
msg, err := consumer.ReadMessage(-1)
if err == nil {
messageID := calculateMessageID(msg.Value)
if !isMessageProcessed(messageID) {
processMessage(msg.Value)
markMessageAsProcessed(messageID)
} else {
fmt.Printf("Message %s already processed\n", messageID)
}
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
time.Sleep(1 * time.Second) // 可选:添加延迟以防止消息消费过快
}
}


原文地址:https://blog.csdn.net/weixin_43683432/article/details/140081190

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!