Message Queue Code Generator

Updated Nov 2, 2025

This prompt is designed to automatically generate producer and consumer code templates for message queues. By parsing the message queue type, message format, and configuration parameters, it quickly produces ready-to-use code scaffolding. It improves development efficiency, keeps code conventions consistent, supports several mainstream message queue systems, and lets developers focus on business logic instead of infrastructure plumbing. It is especially suited to microservice architectures, distributed systems, and other scenarios that need efficient messaging.

Code Overview

This template is based on Kafka + Go + Protobuf and provides an enterprise-grade producer and consumer code framework with the following features:

  • Producer: connection management, Protobuf serialization, configurable partition key, Kafka send retries (handled by kafka-go), structured logging, timeout control.
  • Consumer: consumer-group consumption, Protobuf deserialization, handler callback, retry on failure, DLQ (dead-letter queue), manual offset commits, structured logging, graceful shutdown.
  • Security: optional TLS/SASL (PLAIN/SCRAM), no hardcoded secrets.
  • Configuration: loadable from structs, environment variables, or config files; a YAML example is provided.

Typical use cases: wiring Kafka into a new project, standardizing messaging code in an existing system, or scaffolding code when migrating to Kafka from another MQ.


Dependencies

  • Go version: 1.21+
  • Modules:
    • github.com/segmentio/kafka-go v0.4.x
    • google.golang.org/protobuf v1.34.x or later
    • google.golang.org/protobuf/cmd/protoc-gen-go v1.34.x or later (generates Protobuf code)
  • Optional (SASL):
    • github.com/segmentio/kafka-go/sasl/plain
    • github.com/segmentio/kafka-go/sasl/scram

Install commands:

  • go get github.com/segmentio/kafka-go
  • go get google.golang.org/protobuf
  • go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

Producer Code

File: internal/kafka/producer.go

package mqkafka

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl"
	"github.com/segmentio/kafka-go/sasl/plain"
	"github.com/segmentio/kafka-go/sasl/scram"
	"google.golang.org/protobuf/proto"
)

// ProducerConfig defines the producer configuration.
type ProducerConfig struct {
	Brokers              []string           // Kafka broker list, e.g. []string{"kafka-1:9092","kafka-2:9092"}
	Topic                string             // target topic
	ClientID             string             // client ID identifying this producer
	Compression          kafka.Compression  // compression: kafka.Snappy / kafka.Lz4 / kafka.Gzip / kafka.Zstd / kafka.NoCompression
	RequiredAcks         kafka.RequiredAcks // kafka.RequireAll is recommended for durability
	BatchSize            int                // batch size (message count)
	BatchBytes           int64              // batch size in bytes
	BatchTimeout         time.Duration      // max wait before flushing a batch
	WriteTimeout         time.Duration      // write timeout
	AllowAutoTopicCreate bool               // allow auto topic creation (depends on broker config)
	SchemaVersion        string             // custom schema version, written to headers for compatibility control
	DefaultHeaders       map[string]string  // default headers (merged into every message's headers)

	// Security settings
	Security SecurityConfig
}

// SecurityConfig defines TLS/SASL settings.
type SecurityConfig struct {
	EnableTLS          bool
	CACertPath         string // optional: CA certificate path (PEM)
	ClientCertPath     string // optional: client certificate path (PEM)
	ClientKeyPath      string // optional: client private key path (PEM)
	InsecureSkipVerify bool   // not recommended in production
	EnableSASL         bool
	SASLMechanism      string // only used when EnableSASL=true: "PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512"
	SASLUsername       string // load from environment/config; never hardcode
	SASLPassword       string // load from environment/config; never hardcode
}

// Producer is a Kafka producer instance.
type Producer struct {
	w       *kafka.Writer
	topic   string
	log     *slog.Logger
	headers map[string]string
}

// NewProducer builds a producer instance.
func NewProducer(cfg ProducerConfig, logger *slog.Logger) (*Producer, error) {
	if logger == nil {
		logger = slog.Default()
	}
	if len(cfg.Brokers) == 0 {
		return nil, errors.New("missing brokers")
	}
	if cfg.Topic == "" {
		return nil, errors.New("missing topic")
	}
	if cfg.RequiredAcks == 0 {
		// The zero value equals kafka.RequireNone, so it is treated as "unset"
		// here; set RequiredAcks explicitly if RequireNone is really intended.
		cfg.RequiredAcks = kafka.RequireAll
	}
	if cfg.BatchTimeout == 0 {
		cfg.BatchTimeout = 50 * time.Millisecond
	}
	if cfg.WriteTimeout == 0 {
		cfg.WriteTimeout = 5 * time.Second
	}

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		ClientID:  cfg.ClientID,
		DualStack: true,
	}

	// TLS
	if cfg.Security.EnableTLS {
		tlsCfg, err := buildTLSConfig(cfg.Security)
		if err != nil {
			return nil, fmt.Errorf("build TLS config: %w", err)
		}
		dialer.TLS = tlsCfg
	}

	// SASL
	if cfg.Security.EnableSASL {
		mech, err := buildSASLMechanism(cfg.Security)
		if err != nil {
			return nil, fmt.Errorf("build SASL mechanism: %w", err)
		}
		dialer.SASLMechanism = mech
	}

	w := &kafka.Writer{
		Addr:                   kafka.TCP(cfg.Brokers...),
		Topic:                  cfg.Topic,
		Balancer:               &kafka.Hash{}, // hash the key to pick a partition, preserving per-key ordering
		Compression:            cfg.Compression,
		RequiredAcks:           cfg.RequiredAcks,
		BatchSize:              cfg.BatchSize,
		BatchBytes:             cfg.BatchBytes,
		BatchTimeout:           cfg.BatchTimeout,
		WriteTimeout:           cfg.WriteTimeout,
		AllowAutoTopicCreation: cfg.AllowAutoTopicCreate,
		Transport: &kafka.Transport{
			ClientID: cfg.ClientID,
			TLS:      dialer.TLS,           // Transport performs the TLS handshake itself;
			SASL:     dialer.SASLMechanism, // setting a custom Dial that also does TLS would wrap it twice
		},
		// kafka-go retries transient errors internally (network blips, temporary
		// unavailability), so no extra retry loop is needed here.
	}

	defaultHeaders := map[string]string{
		"content-type":  "application/x-protobuf",
		"sdk-language":  "go",
		"sdk-component": "producer",
	}
	if cfg.SchemaVersion != "" {
		defaultHeaders["schema-version"] = cfg.SchemaVersion
	}
	// merge in user-supplied default headers
	for k, v := range cfg.DefaultHeaders {
		defaultHeaders[k] = v
	}

	return &Producer{
		w:       w,
		topic:   cfg.Topic,
		log:     logger.With("component", "kafka_producer", "topic", cfg.Topic),
		headers: defaultHeaders,
	}, nil
}

// Close shuts down the producer.
func (p *Producer) Close() error {
	return p.w.Close()
}

// Send publishes a protobuf message.
// Parameters:
//   - ctx: caller context (ideally with a deadline)
//   - key: partition key (messages with the same key land in the same partition); may be nil
//   - msg: any protobuf message
//   - headers: per-message headers (merged with the default headers)
func (p *Producer) Send(ctx context.Context, key []byte, msg proto.Message, headers map[string]string) error {
	if msg == nil {
		return errors.New("nil protobuf message")
	}
	// The caller should set a deadline; if it did not, apply a conservative default.
	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
	}

	payload, err := proto.Marshal(msg)
	if err != nil {
		return fmt.Errorf("protobuf marshal: %w", err)
	}

	allHeaders := mergeHeaders(p.headers, headers)

	kmsg := kafka.Message{
		Key:     key,
		Value:   payload,
		Time:    time.Now(),
		Headers: toKafkaHeaders(allHeaders),
		// Topic is deliberately not set: the Writer already has one, and
		// kafka-go returns an error when both Writer.Topic and Message.Topic are set.
	}

	if err := p.w.WriteMessages(ctx, kmsg); err != nil {
		p.log.Error("write message failed", "err", err)
		return err
	}

	p.log.Debug("message sent", "bytes", len(payload))
	return nil
}

func mergeHeaders(base map[string]string, extra map[string]string) map[string]string {
	out := make(map[string]string, len(base)+len(extra))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range extra {
		out[k] = v
	}
	return out
}

func toKafkaHeaders(h map[string]string) []kafka.Header {
	if len(h) == 0 {
		return nil
	}
	out := make([]kafka.Header, 0, len(h))
	for k, v := range h {
		out = append(out, kafka.Header{Key: k, Value: []byte(v)})
	}
	return out
}

func buildTLSConfig(sec SecurityConfig) (*tls.Config, error) {
	tlsCfg := &tls.Config{
		InsecureSkipVerify: sec.InsecureSkipVerify, // not recommended in production
		MinVersion:         tls.VersionTLS12,
	}
	// CA
	if sec.CACertPath != "" {
		caPEM, err := os.ReadFile(sec.CACertPath)
		if err != nil {
			return nil, fmt.Errorf("read CA cert: %w", err)
		}
		cp := x509.NewCertPool()
		if !cp.AppendCertsFromPEM(caPEM) {
			return nil, errors.New("append CA cert failed")
		}
		tlsCfg.RootCAs = cp
	}
	// Client certificate
	if sec.ClientCertPath != "" && sec.ClientKeyPath != "" {
		cert, err := tls.LoadX509KeyPair(sec.ClientCertPath, sec.ClientKeyPath)
		if err != nil {
			return nil, fmt.Errorf("load client cert/key: %w", err)
		}
		tlsCfg.Certificates = []tls.Certificate{cert}
	}
	return tlsCfg, nil
}

func buildSASLMechanism(sec SecurityConfig) (sasl.Mechanism, error) {
	user := sec.SASLUsername
	pass := sec.SASLPassword
	if user == "" || pass == "" {
		return nil, errors.New("missing SASL credentials")
	}
	switch sec.SASLMechanism {
	case "PLAIN":
		return plain.Mechanism{Username: user, Password: pass}, nil
	case "SCRAM-SHA-256":
		mech, err := scram.Mechanism(scram.SHA256, user, pass)
		if err != nil {
			return nil, err
		}
		return mech, nil
	case "SCRAM-SHA-512":
		mech, err := scram.Mechanism(scram.SHA512, user, pass)
		if err != nil {
			return nil, err
		}
		return mech, nil
	default:
		return nil, fmt.Errorf("unsupported SASL mechanism: %s", sec.SASLMechanism)
	}
}

Consumer Code

File: internal/kafka/consumer.go

package mqkafka

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
	"google.golang.org/protobuf/proto"
)

// Handler is the user-supplied processing callback; implement your business logic here.
// Returning an error marks the message as failed, and the framework applies the retry policy.
type Handler func(ctx context.Context, key []byte, msg proto.Message, headers map[string]string) error

// NewMessage is a factory that creates an empty protobuf message instance for deserialization.
// Example: func() proto.Message { return &pb.Event{} }
type NewMessage func() proto.Message

// ConsumerConfig defines the consumer configuration.
type ConsumerConfig struct {
	Brokers        []string
	Topic          string
	GroupID        string
	ClientID       string
	MinBytes       int           // default 1
	MaxBytes       int           // default 10MB
	MaxWait        time.Duration // max wait when fetching messages
	ReadTimeout    time.Duration // read timeout
	CommitInterval time.Duration // auto-commit interval; set high when committing manually

	// Offset control (only applies when the group has no committed offset yet)
	StartFromEarliest bool // true: earliest, false: latest

	// Processing and retries
	OperationTimeout time.Duration // timeout for processing a single message
	MaxRetryAttempts int           // max retries after a failure (excluding the first attempt), e.g. 3
	RetryBackoff     time.Duration // backoff between retries, default 500ms

	// DLQ
	EnableDLQ bool
	DLQTopic  string

	// Security settings
	Security SecurityConfig
}

// Consumer is a Kafka consumer instance.
type Consumer struct {
	r         *kafka.Reader
	dlqWriter *kafka.Writer
	log       *slog.Logger

	handler Handler
	newMsg  NewMessage

	cfg ConsumerConfig
}

// NewConsumer builds a consumer.
func NewConsumer(cfg ConsumerConfig, handler Handler, newMsg NewMessage, logger *slog.Logger) (*Consumer, error) {
	if logger == nil {
		logger = slog.Default()
	}
	if len(cfg.Brokers) == 0 {
		return nil, errors.New("missing brokers")
	}
	if cfg.Topic == "" {
		return nil, errors.New("missing topic")
	}
	if cfg.GroupID == "" {
		return nil, errors.New("missing group id")
	}
	if handler == nil {
		return nil, errors.New("missing handler")
	}
	if newMsg == nil {
		return nil, errors.New("missing message factory")
	}

	dialer, transport, err := buildDialerTransport(cfg.Security, cfg.ClientID)
	if err != nil {
		return nil, fmt.Errorf("build dialer: %w", err)
	}

	if cfg.MinBytes == 0 {
		cfg.MinBytes = 1
	}
	if cfg.MaxBytes == 0 {
		cfg.MaxBytes = 10 << 20 // 10MB
	}
	if cfg.MaxWait == 0 {
		cfg.MaxWait = 250 * time.Millisecond
	}
	if cfg.ReadTimeout == 0 {
		cfg.ReadTimeout = 10 * time.Second
	}
	if cfg.CommitInterval == 0 {
		cfg.CommitInterval = time.Second // offsets are normally committed manually; this is a conservative fallback
	}
	if cfg.OperationTimeout == 0 {
		cfg.OperationTimeout = 30 * time.Second
	}
	if cfg.RetryBackoff == 0 {
		cfg.RetryBackoff = 500 * time.Millisecond
	}
	if cfg.MaxRetryAttempts < 0 {
		cfg.MaxRetryAttempts = 0
	}

	readerCfg := kafka.ReaderConfig{
		Brokers:         cfg.Brokers,
		GroupID:         cfg.GroupID,
		Topic:           cfg.Topic,
		MinBytes:        cfg.MinBytes,
		MaxBytes:        cfg.MaxBytes,
		MaxWait:         cfg.MaxWait,
		ReadLagInterval: -1, // disable periodic lag logging to reduce noise
		CommitInterval:  cfg.CommitInterval,
		StartOffset: func() int64 {
			if cfg.StartFromEarliest {
				return kafka.FirstOffset
			}
			return kafka.LastOffset
		}(),
		Dialer: &kafka.Dialer{
			Timeout:       dialer.Timeout,
			DualStack:     dialer.DualStack,
			ClientID:      cfg.ClientID,
			TLS:           dialer.TLS,
			SASLMechanism: dialer.SASLMechanism,
		},
		// RetentionTime left at zero: the broker default applies.
	}
	r := kafka.NewReader(readerCfg)

	var dlqWriter *kafka.Writer
	if cfg.EnableDLQ {
		if cfg.DLQTopic == "" {
			return nil, errors.New("dlq enabled but DLQTopic empty")
		}
		dlqWriter = &kafka.Writer{
			Addr:         kafka.TCP(cfg.Brokers...),
			Topic:        cfg.DLQTopic,
			Balancer:     &kafka.Hash{},
			RequiredAcks: kafka.RequireAll,
			Transport:    transport,
			WriteTimeout: 10 * time.Second,
		}
	}

	return &Consumer{
		r:         r,
		dlqWriter: dlqWriter,
		log:       logger.With("component", "kafka_consumer", "topic", cfg.Topic, "group", cfg.GroupID),
		handler:   handler,
		newMsg:    newMsg,
		cfg:       cfg,
	}, nil
}

// Run starts the consume loop (blocking). Use ctx for graceful shutdown.
func (c *Consumer) Run(ctx context.Context) error {
	c.log.Info("consumer started")
	defer c.log.Info("consumer stopped")

	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		// Fetch a message (blocks until one arrives or the ctx/reader times out)
		m, err := c.r.FetchMessage(ctx)
		if err != nil {
			// Could be context cancellation/timeout, or a network error
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				c.log.Warn("fetch context error", "err", err)
				continue
			}
			c.log.Error("fetch message failed", "err", err)
			continue
		}

		// Parse headers
		hdrs := fromKafkaHeaders(m.Headers)

		// Deserialize the Protobuf payload
		pm := c.newMsg()
		if err := proto.Unmarshal(m.Value, pm); err != nil {
			c.log.Error("protobuf unmarshal failed; sending to DLQ (if enabled)", "err", err, "partition", m.Partition, "offset", m.Offset)
			// Undeserializable message: route straight to the DLQ; if the DLQ is
			// unavailable, leave the offset uncommitted for manual intervention.
			if c.cfg.EnableDLQ {
				_ = c.sendToDLQ(ctx, m, hdrs, err, -1) // -1 marks a parse failure
				if commitErr := c.r.CommitMessages(ctx, m); commitErr != nil {
					c.log.Error("commit after DLQ failed", "err", commitErr)
				}
				continue
			}
			// No DLQ: do not commit, so the message is retained
			continue
		}

		// Invoke the handler with a timeout
		processCtx, cancel := context.WithTimeout(ctx, c.cfg.OperationTimeout)
		err = c.callWithRetry(processCtx, m, pm, hdrs)
		cancel()

		if err != nil {
			// Retries exhausted: send to the DLQ and commit the offset so a
			// poison-pill message cannot block the partition.
			if c.cfg.EnableDLQ {
				if dlqErr := c.sendToDLQ(ctx, m, hdrs, err, c.cfg.MaxRetryAttempts); dlqErr != nil {
					// DLQ write failed: do not commit, so the message is retried later
					c.log.Error("send to DLQ failed; will not commit offset", "err", dlqErr, "partition", m.Partition, "offset", m.Offset)
					continue
				}
				// DLQ write succeeded -> commit the offset
				if commitErr := c.r.CommitMessages(ctx, m); commitErr != nil {
					c.log.Error("commit after DLQ failed", "err", commitErr)
				}
				continue
			}
			// No DLQ: do not commit, so operators can intervene
			c.log.Error("message permanently failed without DLQ; offset not committed", "err", err, "partition", m.Partition, "offset", m.Offset)
			continue
		}

		// Processed successfully -> commit the offset
		if err := c.r.CommitMessages(ctx, m); err != nil {
			c.log.Error("commit message failed", "err", err, "partition", m.Partition, "offset", m.Offset)
			continue
		}
		c.log.Debug("message processed & committed", "partition", m.Partition, "offset", m.Offset)
	}
}

// Close shuts down the consumer (Reader and DLQ writer).
func (c *Consumer) Close() error {
	var err1, err2 error
	if c.r != nil {
		err1 = c.r.Close()
	}
	if c.dlqWriter != nil {
		err2 = c.dlqWriter.Close()
	}
	if err1 != nil {
		return err1
	}
	return err2
}

func (c *Consumer) callWithRetry(ctx context.Context, m kafka.Message, pm proto.Message, headers map[string]string) error {
	attempts := 0
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		err := c.handler(ctx, m.Key, pm, headers)
		if err == nil {
			return nil
		}
		attempts++
		c.log.Warn("handler failed", "attempt", attempts, "err", err)

		if attempts > c.cfg.MaxRetryAttempts {
			return fmt.Errorf("max attempts reached: %w", err)
		}
		// Backoff before the next attempt
		select {
		case <-time.After(c.cfg.RetryBackoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func (c *Consumer) sendToDLQ(ctx context.Context, m kafka.Message, headers map[string]string, rootErr error, attempts int) error {
	if c.dlqWriter == nil {
		return errors.New("dlq writer not configured")
	}
	h := mergeHeaders(headers, map[string]string{
		"x-error":       truncateErr(rootErr, 2048),
		"x-retries":     strconv.Itoa(attempts),
		"content-type":  "application/x-protobuf",
		"sdk-component": "dlq",
	})
	dlqMsg := kafka.Message{
		Key:     m.Key,
		Value:   m.Value, // preserve the original payload
		Time:    time.Now(),
		Headers: toKafkaHeaders(h),
	}
	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
	}
	if err := c.dlqWriter.WriteMessages(ctx, dlqMsg); err != nil {
		c.log.Error("write dlq message failed", "err", err)
		return err
	}
	c.log.Warn("message sent to DLQ", "partition", m.Partition, "offset", m.Offset)
	return nil
}

func fromKafkaHeaders(hdrs []kafka.Header) map[string]string {
	if len(hdrs) == 0 {
		return map[string]string{}
	}
	m := make(map[string]string, len(hdrs))
	for _, h := range hdrs {
		m[h.Key] = string(h.Value)
	}
	return m
}

func truncateErr(err error, n int) string {
	if err == nil {
		return ""
	}
	s := err.Error()
	if len(s) > n {
		return s[:n]
	}
	return s
}

func buildDialerTransport(sec SecurityConfig, clientID string) (*kafka.Dialer, *kafka.Transport, error) {
	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		ClientID:  clientID,
		DualStack: true,
	}
	if sec.EnableTLS {
		tlsCfg, err := buildTLSConfig(sec)
		if err != nil {
			return nil, nil, err
		}
		dialer.TLS = tlsCfg
	}
	if sec.EnableSASL {
		mech, err := buildSASLMechanism(sec)
		if err != nil {
			return nil, nil, err
		}
		dialer.SASLMechanism = mech
	}
	transport := &kafka.Transport{
		ClientID: clientID,
		TLS:      dialer.TLS,           // Transport performs the TLS handshake itself;
		SASL:     dialer.SASLMechanism, // no custom Dial needed (avoids double TLS wrapping)
	}
	return dialer, transport, nil
}

Usage example (integrating business logic):

// Assuming a generated pb package: package pb; message Event { string id = 1; string type = 2; bytes payload = 3; int64 ts_unix_ms = 4; }
// Producer side:
// prod, _ := mqkafka.NewProducer(prodCfg, slog.Default())
// defer prod.Close()
// evt := &pb.Event{Id: "k1", Type: "created", Payload: []byte("..."), TsUnixMs: time.Now().UnixMilli()}
// _ = prod.Send(ctx, []byte(evt.Id), evt, map[string]string{"x-source":"svc-a"})

// Consumer side:
// handler := func(ctx context.Context, key []byte, msg proto.Message, headers map[string]string) error {
//     evt := msg.(*pb.Event)
//     // TODO: business logic
//     return nil
// }
// cons, _ := mqkafka.NewConsumer(consCfg, handler, func() proto.Message { return &pb.Event{} }, slog.Default())
// defer cons.Close()
// _ = cons.Run(ctx)

Configuration Examples

  1. YAML example (config/kafka.yaml)
producer:
  brokers: ["kafka-1:9092","kafka-2:9092"]
  topic: "events"
  clientID: "svc-a-producer"
  compression: "snappy"        # options: none|snappy|lz4|gzip|zstd
  requiredAcks: "all"          # options: none|leader|all
  batchSize: 100
  batchBytes: 1048576
  batchTimeout: "50ms"
  writeTimeout: "5s"
  allowAutoTopicCreate: false
  schemaVersion: "v1"
  defaultHeaders:
    x-env: "prod"
  security:
    enableTLS: true
    caCertPath: "/etc/ssl/certs/ca.pem"
    clientCertPath: "/etc/ssl/certs/client.pem"
    clientKeyPath: "/etc/ssl/private/client.key"
    insecureSkipVerify: false
    enableSASL: true
    saslMechanism: "SCRAM-SHA-512"
    saslUsername: "${KAFKA_USERNAME}"   # load from environment variables
    saslPassword: "${KAFKA_PASSWORD}"

consumer:
  brokers: ["kafka-1:9092","kafka-2:9092"]
  topic: "events"
  groupID: "svc-b-consumer"
  clientID: "svc-b-consumer"
  minBytes: 1
  maxBytes: 10485760
  maxWait: "250ms"
  readTimeout: "10s"
  commitInterval: "1s"
  startFromEarliest: true
  operationTimeout: "30s"
  maxRetryAttempts: 3
  retryBackoff: "500ms"
  enableDLQ: true
  dlqTopic: "events.DLQ"
  security:
    enableTLS: true
    caCertPath: "/etc/ssl/certs/ca.pem"
    clientCertPath: "/etc/ssl/certs/client.pem"
    clientKeyPath: "/etc/ssl/private/client.key"
    insecureSkipVerify: false
    enableSASL: true
    saslMechanism: "SCRAM-SHA-512"
    saslUsername: "${KAFKA_USERNAME}"
    saslPassword: "${KAFKA_PASSWORD}"
  2. Environment variable example (illustrative only)
  • KAFKA_BROKERS=kafka-1:9092,kafka-2:9092
  • KAFKA_TOPIC=events
  • KAFKA_GROUP_ID=svc-b-consumer
  • KAFKA_USERNAME=your-username
  • KAFKA_PASSWORD=your-password

Protobuf Message Definition Example

File: proto/event.proto

syntax = "proto3";

package example.events;
option go_package = "your.module/path/internal/pb;pb";

message Event {
  string id = 1;           // candidate value for the Kafka partition key
  string type = 2;         // event type
  bytes payload = 3;       // business payload (binary)
  int64 ts_unix_ms = 4;    // event timestamp (milliseconds)
}

Generation command (run from the project root):

  • protoc --go_out=. proto/event.proto

Then reference the generated package in Go code:

  • import pb "your.module/path/internal/pb"

Usage Notes

  • Reliability semantics:
    • The producer defaults to acks=all, which combined with Kafka ISR provides strong durability.
    • The consumer commits offsets only after successful processing, giving at-least-once delivery. To avoid duplicate processing, make your business logic idempotent.
  • Concurrency and throughput:
    • This template consumes sequentially, favoring correctness. Scale partition-level parallelism horizontally by adding consumer instances with the same GroupID.
    • For in-process concurrency, build per-partition ordered queues and commit offsets strictly in order within each partition to avoid visibility problems from out-of-order commits.
  • DLQ:
    • When retries are exhausted or a message cannot be deserialized, the consumer forwards it unchanged to the DLQ, adding error and retry-count headers. The offset is committed only after the DLQ write succeeds, so poison-pill messages cannot block the main flow.
  • Security:
    • Never hardcode credentials (usernames, passwords, certificates) in code. Use environment variables or a secrets manager.
    • Avoid InsecureSkipVerify=true; if it is unavoidable, restrict it to non-production environments with understood risk.
  • Monitoring and observability:
    • Route slog output to your logging system (e.g. Loki/ELK) via a custom Handler, and add Prometheus metrics at key points to track produce/consume rates, DLQ volume, and latency.
  • Typical integration steps:
    1. Define and generate the Protobuf messages (proto/event.proto -> pb).
    2. Initialize the Producer/Consumer configuration (loadable from YAML/environment variables).
    3. Producer: construct a pb.Event, pick a suitable key (e.g. event.id), and call Producer.Send.
    4. Consumer: implement a Handler, assert the proto.Message to *pb.Event, and return nil on success.
    5. Call Close on application shutdown paths to release resources.
  • Versioning and compatibility:
    • The template targets the kafka-go v0.4.x API; consult the official docs if later versions introduce breaking changes.
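As noted under reliability semantics, at-least-once delivery means the handler may see duplicates, so business logic should be idempotent. A minimal in-memory dedupe sketch (the Deduper type is illustrative; production code would typically back this with Redis or a database keyed by message ID, with a TTL):

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers processed message IDs so redelivered messages
// (at-least-once semantics) are handled exactly once by business logic.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// Process runs fn only the first time id is observed; duplicates are
// acknowledged without reprocessing.
func (d *Deduper) Process(id string, fn func() error) error {
	d.mu.Lock()
	_, dup := d.seen[id]
	if !dup {
		d.seen[id] = struct{}{}
	}
	d.mu.Unlock()
	if dup {
		return nil
	}
	return fn()
}

func main() {
	d := NewDeduper()
	count := 0
	for i := 0; i < 3; i++ { // simulate three deliveries of the same message
		_ = d.Process("evt-1", func() error { count++; return nil })
	}
	fmt.Println("handled", count, "time(s)") // prints: handled 1 time(s)
}
```

Inside a Handler, the message ID (e.g. evt.Id) would be passed as the dedupe key before running the business logic.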

For further customization (e.g. custom partitioning strategies, batched sending/processing, Schema Registry integration, load testing and parameter tuning), please describe your requirements.

Code Overview

This template targets the RabbitMQ Java client, generating producer and consumer code that exchanges JSON messages, suitable for asynchronous communication in microservices and distributed systems. It includes:

  • Standardized connection and topology management (Exchange/Queue/DLQ)
  • A producer using Publisher Confirms for reliable delivery
  • A consumer using manual ACK, with prefetch and concurrent consumption
  • Jackson-based JSON serialization/deserialization
  • Complete error handling and logging
  • Configuration via environment variables or application.properties, with no hardcoded secrets

Dependencies

Example using Maven:

<dependencies>
  <!-- RabbitMQ Java Client -->
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
  </dependency>

  <!-- Jackson for JSON -->
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.17.1</version>
  </dependency>

  <!-- SLF4J API -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.13</version>
  </dependency>

  <!-- Logback (Runtime) -->
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.4.14</version>
  </dependency>
</dependencies>

Producer Code

File: src/main/java/com/example/messaging/rabbitmq/MessageProducer.java

package com.example.messaging.rabbitmq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ JSON message producer template.
 * - Uses publisher confirms for reliable delivery
 * - Sets message properties (contentType, deliveryMode, messageId, timestamp, etc.)
 * - No hardcoded secrets; configuration comes from environment variables or application.properties
 *
 * Usage:
 * MessageProducer producer = new MessageProducer(config);
 * producer.start();
 * producer.initTopology(); // optional: declare exchange/queue/bindings (idempotent)
 * producer.publish("order.created", new MessageEnvelope(...));
 * producer.close();
 */
public class MessageProducer implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);

    private final RabbitConfig config;
    private final ObjectMapper objectMapper;

    private Connection connection;
    private Channel channel;

    public MessageProducer(RabbitConfig config) {
        this.config = config;
        this.objectMapper = JsonUtil.objectMapper();
    }

    /**
     * Initializes the connection and channel and enables publisher confirms.
     */
    public void start() throws IOException, TimeoutException {
        if (this.connection != null && this.connection.isOpen()) {
            return;
        }
        ConnectionFactory factory = RabbitConfig.createConnectionFactory(config);
        // Establish the connection and channel
        this.connection = factory.newConnection(config.getClientName());
        this.channel = connection.createChannel();
        // Enable publisher confirms
        this.channel.confirmSelect();

        log.info("Producer started: vhost={}, host={}, exchange={}",
                config.getVhost(), config.getHost(), config.getExchangeName());
    }

    /**
     * Thread-safety note:
     * - Channel is not thread-safe. For concurrent publishing, use one producer
     *   instance per thread or serialize calls externally.
     */

    /**
     * Idempotent topology setup: declares the exchange, queue, and bindings (no-op if they already exist).
     */
    public void initTopology() throws IOException {
        TopologyInitializer.init(channel, config);
    }

    /**
     * Publishes a JSON message and waits for broker confirmation.
     * @param routingKey routing key
     * @param envelope   message envelope (common fields)
     * @return true if the message was confirmed; false if unconfirmed or failed
     */
    public boolean publish(String routingKey, MessageEnvelope envelope) {
        ensureStarted();

        try {
            // Serialize to JSON
            byte[] body = objectMapper.writeValueAsBytes(envelope);

            Map<String, Object> headers = new HashMap<>();
            headers.put("type", envelope.getType());
            headers.put("schema", envelope.getSchema()); // optional: JSON Schema identifier
            headers.put("origin", envelope.getOrigin()); // optional: originating system

            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .contentType("application/json")
                    .contentEncoding("utf-8")
                    .deliveryMode(2) // 2 = persistent
                    .messageId(envelope.getId())
                    .timestamp(Date.from(Instant.ofEpochMilli(envelope.getTimestamp())))
                    .correlationId(envelope.getCorrelationId())
                    .headers(headers)
                    .build();

            // Publish to the exchange
            channel.basicPublish(
                    config.getExchangeName(),
                    routingKey,
                    props,
                    body
            );

            // Wait for the confirm (fine for single messages or small batches);
            // a bounded wait prevents a broker outage from blocking the caller forever,
            // and the timeout overload makes the TimeoutException catch below reachable.
            channel.waitForConfirmsOrDie(10_000);

            log.debug("Message published and confirmed: routingKey={}, messageId={}",
                    routingKey, envelope.getId());
            return true;

        } catch (IOException e) {
            log.error("Publish failed (serialization or IO): routingKey={}, messageId={}, err={}",
                    routingKey, envelope.getId(), e.toString());
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Publish interrupted: routingKey={}, messageId={}", routingKey, envelope.getId());
            return false;
        } catch (AlreadyClosedException | TimeoutException e) {
            log.error("Channel/Connection closed or timed out during publish: {}", e.toString());
            return false;
        }
    }

    private void ensureStarted() {
        if (this.channel == null || !this.channel.isOpen()) {
            throw new IllegalStateException("Producer not started. Call start() before publish().");
        }
    }

    @Override
    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            log.warn("Error closing channel: {}", e.toString());
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            log.warn("Error closing connection: {}", e.toString());
        }
    }
}

Helper class: RabbitConfig.java

package com.example.messaging.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;

/**
 * Unified configuration loading:
 * - Environment variables take precedence, then application.properties
 * - Sensitive values such as passwords are never written to logs
 */
public class RabbitConfig {
    private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);

    private final String host;
    private final int port;
    private final String username;
    private final String password;
    private final String vhost;

    private final String exchangeName;
    private final String exchangeType; // e.g. topic, direct
    private final String queueName;
    private final String routingKey;
    private final String dlxName;
    private final String dlqName;

    private final int prefetch;
    private final int consumerCount;
    private final boolean useTls;

    private final String clientName;

    public RabbitConfig(Properties props) {
        this.host = getEnvOrProp("RABBITMQ_HOST", props, "localhost");
        this.port = Integer.parseInt(getEnvOrProp("RABBITMQ_PORT", props, "5672"));
        this.username = getEnvOrProp("RABBITMQ_USERNAME", props, "guest");
        this.password = getEnvOrProp("RABBITMQ_PASSWORD", props, "guest");
        this.vhost = getEnvOrProp("RABBITMQ_VHOST", props, "/");

        this.exchangeName = getEnvOrProp("RABBITMQ_EXCHANGE", props, "app.events");
        this.exchangeType = getEnvOrProp("RABBITMQ_EXCHANGE_TYPE", props, "topic");
        this.queueName = getEnvOrProp("RABBITMQ_QUEUE", props, "app.events.q");
        this.routingKey = getEnvOrProp("RABBITMQ_ROUTING_KEY", props, "app.*");
        this.dlxName = getEnvOrProp("RABBITMQ_DLX", props, "app.events.dlx");
        this.dlqName = getEnvOrProp("RABBITMQ_DLQ", props, "app.events.q.dlq");

        this.prefetch = Integer.parseInt(getEnvOrProp("RABBITMQ_PREFETCH", props, "50"));
        this.consumerCount = Integer.parseInt(getEnvOrProp("RABBITMQ_CONSUMER_COUNT", props, "2"));
        this.useTls = Boolean.parseBoolean(getEnvOrProp("RABBITMQ_USE_TLS", props, "false"));

        this.clientName = getEnvOrProp("RABBITMQ_CLIENT_NAME", props, "java-client");
    }

    public static ConnectionFactory createConnectionFactory(RabbitConfig cfg) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(cfg.host);
        factory.setPort(cfg.port);
        factory.setVirtualHost(cfg.vhost);
        factory.setUsername(cfg.username);
        factory.setPassword(cfg.password);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(Duration.ofSeconds(5).toMillis());

        if (cfg.useTls) {
            try {
                factory.useSslProtocol(); // default TLS; certificates and trust come from the JVM configuration
            } catch (Exception e) {
                throw new IllegalStateException("Failed to enable TLS: " + e.getMessage(), e);
            }
        }

        // Sensible timeouts
        factory.setConnectionTimeout(10_000);
        factory.setHandshakeTimeout(10_000);
        factory.setRequestedHeartbeat(30);

        return factory;
    }

    private static String getEnvOrProp(String key, Properties props, String defaultVal) {
        String env = System.getenv(key);
        if (env != null && !env.isEmpty()) return env;
        String prop = props.getProperty(key);
        if (prop != null && !prop.isEmpty()) return prop;
        return defaultVal;
    }

    public String getHost() { return host; }
    public int getPort() { return port; }
    public String getUsername() { return username; }
    public String getPassword() { return password; }
    public String getVhost() { return vhost; }
    public String getExchangeName() { return exchangeName; }
    public String getExchangeType() { return exchangeType; }
    public String getQueueName() { return queueName; }
    public String getRoutingKey() { return routingKey; }
    public String getDlxName() { return dlxName; }
    public String getDlqName() { return dlqName; }
    public int getPrefetch() { return prefetch; }
    public int getConsumerCount() { return consumerCount; }
    public boolean isUseTls() { return useTls; }
    public String getClientName() { return clientName; }
}

Helper class: TopologyInitializer.java

package com.example.messaging.rabbitmq;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchange, queue, DLQ, and bindings (idempotent).
 */
public class TopologyInitializer {

    public static void init(Channel channel, RabbitConfig cfg) throws IOException {
        // Main exchange
        channel.exchangeDeclare(cfg.getExchangeName(), cfg.getExchangeType(), true, false, null);

        // DLX and DLQ
        channel.exchangeDeclare(cfg.getDlxName(), "topic", true, false, null);
        channel.queueDeclare(cfg.getDlqName(), true, false, false, null);
        channel.queueBind(cfg.getDlqName(), cfg.getDlxName(), cfg.getQueueName() + ".dlq");

        // Main queue: attach the DLX
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", cfg.getDlxName());
        args.put("x-dead-letter-routing-key", cfg.getQueueName() + ".dlq");

        channel.queueDeclare(cfg.getQueueName(), true, false, false, args);
        channel.queueBind(cfg.getQueueName(), cfg.getExchangeName(), cfg.getRoutingKey());
    }
}

辅助类:JsonUtil.java

package com.example.messaging.rabbitmq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.DeserializationFeature;

public class JsonUtil {
    public static ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.findAndRegisterModules();
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        return mapper;
    }
}

模型类:MessageEnvelope.java

package com.example.messaging.rabbitmq;

import java.util.Map;

/**
 * 通用消息封装(可根据需要扩展)
 */
public class MessageEnvelope {
    private String id;
    private String type;
    private long timestamp;
    private String correlationId;
    private String schema;      // 可选:用于验证或路由
    private String origin;      // 可选:来源系统标识
    private Map<String, Object> payload;

    public MessageEnvelope() {}

    public MessageEnvelope(String id, String type, long timestamp, String correlationId,
                           String schema, String origin, Map<String, Object> payload) {
        this.id = id;
        this.type = type;
        this.timestamp = timestamp;
        this.correlationId = correlationId;
        this.schema = schema;
        this.origin = origin;
        this.payload = payload;
    }

    public String getId() { return id; }
    public String getType() { return type; }
    public long getTimestamp() { return timestamp; }
    public String getCorrelationId() { return correlationId; }
    public String getSchema() { return schema; }
    public String getOrigin() { return origin; }
    public Map<String, Object> getPayload() { return payload; }

    public void setId(String id) { this.id = id; }
    public void setType(String type) { this.type = type; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public void setCorrelationId(String correlationId) { this.correlationId = correlationId; }
    public void setSchema(String schema) { this.schema = schema; }
    public void setOrigin(String origin) { this.origin = origin; }
    public void setPayload(Map<String, Object> payload) { this.payload = payload; }
}

消费者代码

文件:src/main/java/com/example/messaging/rabbitmq/MessageConsumer.java

package com.example.messaging.rabbitmq;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

/**
 * RabbitMQ JSON 消费者模板
 * - 手动 ACK
 * - 每个消费者线程独立 Channel(符合 RabbitMQ 线程模型)
 * - 支持预取与并发
 * - 错误时将消息 dead-letter 到 DLQ,避免无限重试
 *
 * 使用方式:
 * MessageConsumer consumer = new MessageConsumer(config);
 * consumer.start();
 * // 运行...
 * consumer.close();
 */
public class MessageConsumer implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);

    private final RabbitConfig config;
    private final ObjectMapper objectMapper;
    private Connection connection;
    private ExecutorService executor;

    public MessageConsumer(RabbitConfig config) {
        this.config = config;
        this.objectMapper = JsonUtil.objectMapper();
    }

    public void start() throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitConfig.createConnectionFactory(config);
        this.executor = Executors.newFixedThreadPool(config.getConsumerCount(),
                r -> {
                    Thread t = new Thread(r, "rmq-consumer");
                    t.setDaemon(true);
                    return t;
                });
        this.connection = factory.newConnection(config.getClientName() + "-consumer");

        // 启动多个并发消费者,每个线程一个 Channel
        // 注意:basicConsume 为异步注册,回调由连接的消费者线程派发;
        // Channel 必须保持打开,不能用 try-with-resources 在注册后立即关闭
        for (int i = 0; i < config.getConsumerCount(); i++) {
            executor.submit(() -> {
                try {
                    Channel channel = connection.createChannel();
                    channel.basicQos(config.getPrefetch());
                    consumeLoop(channel);
                    // Channel 由 Connection 持有,连接关闭时一并释放
                } catch (IOException e) {
                    log.error("Consumer channel IO error: {}", e.toString());
                }
            });
        }

        log.info("Consumer started: vhost={}, host={}, queue={}, concurrency={}, prefetch={}",
                config.getVhost(), config.getHost(), config.getQueueName(),
                config.getConsumerCount(), config.getPrefetch());
    }

    private void consumeLoop(Channel channel) throws IOException {
        // 队列与绑定应已由 TopologyInitializer 在启动阶段统一声明
        channel.basicConsume(
                config.getQueueName(),
                false, // 手动确认
                (consumerTag, delivery) -> {
                    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                    try {
                        MessageEnvelope envelope = parseMessage(delivery.getBody());
                        // 业务处理占位(请在项目中替换为实际处理逻辑)
                        processMessage(envelope, delivery.getProperties(), delivery.getEnvelope());

                        channel.basicAck(deliveryTag, false);
                        log.debug("Message acked: tag={}, messageId={}",
                                deliveryTag, safeProp(delivery.getProperties().getMessageId()));
                    } catch (JsonProcessingException jpe) {
                        // JSON 解析失败:不可恢复,直接 dead-letter
                        channel.basicNack(deliveryTag, false, false);
                        log.warn("Message rejected (JSON parse error): tag={}, err={}",
                                deliveryTag, jpe.getOriginalMessage());
                    } catch (Exception ex) {
                        // 其他错误:默认 dead-letter,避免重试风暴
                        channel.basicNack(deliveryTag, false, false);
                        log.error("Message processing failed and dead-lettered: tag={}, err={}",
                                deliveryTag, ex.toString());
                    }
                },
                consumerTag -> log.warn("Consumer canceled: tag={}", consumerTag)
        );
    }

    private MessageEnvelope parseMessage(byte[] body) throws JsonProcessingException {
        // 可扩展 Schema 验证
        return objectMapper.readValue(body, MessageEnvelope.class);
    }

    /**
     * 业务处理占位:在此实现与业务相关的处理
     * 注意:此模板不包含具体业务逻辑,根据需要进行扩展
     */
    private void processMessage(MessageEnvelope envelope,
                                AMQP.BasicProperties properties,
                                Envelope envelopeMeta) {
        // 示例:可根据 envelope.getType() 分发到不同处理器
        // 此处仅做占位,不实现具体业务
    }

    private String safeProp(String v) {
        return v == null ? "" : v;
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            log.warn("Error closing connection: {}", e.toString());
        }
    }
}
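
扩展示意:上面的消费者在处理失败时直接 dead-letter;如需实现有限次数的重试(例如配合重试交换机将消息转回主队列),可在 nack 前根据 RabbitMQ 标准头 x-death 统计历史投递次数。以下为独立示意代码(RetryPolicy、MAX_RETRIES 均为假设命名,非模板组成部分;生产环境建议按队列与原因过滤后再统计):

```java
import java.util.List;
import java.util.Map;

// 示意:根据 RabbitMQ 标准头 x-death 判断是否已达最大重试次数。
// x-death 的值为 List<Map<String,Object>>,每个条目含 "count"(Long)等字段。
public class RetryPolicy {
    // 假设的最大重试次数
    static final long MAX_RETRIES = 3;

    @SuppressWarnings("unchecked")
    public static boolean shouldDeadLetter(Map<String, Object> headers) {
        if (headers == null) {
            return false; // 首次投递,无 x-death 头
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return false;
        }
        long count = 0;
        for (Object entry : (List<Object>) xDeath) {
            if (entry instanceof Map) {
                Object c = ((Map<String, Object>) entry).get("count");
                if (c instanceof Number) {
                    count += ((Number) c).longValue();
                }
            }
        }
        return count >= MAX_RETRIES;
    }
}
```

在消费回调的异常分支中,可调用 shouldDeadLetter(delivery.getProperties().getHeaders()) 来决定是转入重试路径还是 basicNack 到 DLQ。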

配置示例

文件:src/main/resources/application.properties

# RabbitMQ 基本连接(可通过环境变量覆盖)
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/
# 以下默认凭据仅用于本地开发;生产环境请通过环境变量或安全存储注入
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest

# 拓扑配置
RABBITMQ_EXCHANGE=app.events
RABBITMQ_EXCHANGE_TYPE=topic
RABBITMQ_QUEUE=app.events.q
RABBITMQ_ROUTING_KEY=app.*

# 死信交换机/队列
RABBITMQ_DLX=app.events.dlx
RABBITMQ_DLQ=app.events.q.dlq

# 消费者并发与预取
RABBITMQ_CONSUMER_COUNT=2
RABBITMQ_PREFETCH=50

# TLS(可选)
RABBITMQ_USE_TLS=false

# 客户端名称
RABBITMQ_CLIENT_NAME=java-client

文件:src/main/resources/logback.xml(日志配置)

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="info">
    <appender-ref ref="STDOUT"/>
  </root>

  <!-- 可针对生产者/消费者单独设置日志级别 -->
  <logger name="com.example.messaging.rabbitmq" level="debug"/>
</configuration>

使用说明

  • 初始化配置:
    • 推荐使用环境变量设置敏感信息(RABBITMQ_USERNAME、RABBITMQ_PASSWORD),避免硬编码。
    • 如启用 TLS,确保 JVM 信任库正确配置,且不要禁用主机名校验。
  • 拓扑创建:
    • 可在生产者启动时调用 initTopology() 创建交换机、队列与 DLQ;也可由独立的部署步骤完成。
  • 生产者使用:
    • 构建 RabbitConfig 后调用 MessageProducer#start()
    • 调用 publish(routingKey, MessageEnvelope) 发送消息;返回值表示发布确认是否成功。
    • 高吞吐场景建议批量发布后再确认,或使用异步发布确认策略。
  • 消费者使用:
    • 调用 MessageConsumer#start() 启动并发消费者。
    • 消费者使用手动 ACK;解析失败或处理异常时消息将 dead-letter 到 DLQ。
    • 根据业务需求可扩展有限重试策略(延迟队列或重试交换机),避免无限重试。
  • 线程与 Channel:
    • RabbitMQ Channel 非线程安全;消费者已按“每线程一个 Channel”的最佳实践实现。
    • 生产者在并发场景下建议为每个线程维护独立的生产者实例或使用外部串行化。
  • 监控与可观测性:
    • 设置合理的日志级别;在生产环境中建议将生产者发布失败与消费者处理失败进行告警。
  • 常见问题:
    • 连接自动恢复已启用;网络抖动后客户端会自动重建连接与通道,但拓扑和发布确认状态需重新确认。
    • 如果出现消息积压,适当调整 prefetch 与消费者并发数,并检查业务处理耗时。
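
针对上面"消息积压"一条的调优,可用 Little 定律(L = λ × W)粗略估算所需的消费者并发数,再结合 prefetch 预留缓冲。以下为独立示意代码(类名、方法名与数值均为假设,仅用于容量粗估,非模板组成部分):

```java
// 示意:按 Little 定律估算满足给定到达速率所需的最小消费者并发数
public class CapacityEstimator {

    /**
     * @param arrivalRatePerSec 消息到达速率(条/秒)
     * @param avgProcessingSec  单条消息平均处理耗时(秒)
     * @return 所需最小并发消费者数(向上取整,至少为 1)
     */
    public static int requiredConsumers(double arrivalRatePerSec, double avgProcessingSec) {
        if (arrivalRatePerSec <= 0 || avgProcessingSec <= 0) {
            return 1;
        }
        return (int) Math.ceil(arrivalRatePerSec * avgProcessingSec);
    }
}
```

例如到达速率 100 条/秒、单条平均处理 50ms 时,约需 5 个并发消费者;若估算值接近或超过当前并发数,应优先排查业务处理耗时。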

此模板遵循企业级开发最佳实践,避免使用过时 API,无敏感信息硬编码,可直接集成到现有项目并进行扩展。

代码概述

本模板基于 ActiveMQ(OpenWire 协议)与 C# (.NET 8),提供标准化的生产者与消费者代码框架,使用 XML 作为消息格式。代码遵循企业级最佳实践,包含:

  • 连接管理与自动重试(Failover)
  • 可配置的持久化投递、TTL、预取、超时等参数
  • XML 序列化/反序列化与可选的 XSD 验证
  • 完整的错误处理、日志记录与资源释放
  • 支持并发消费与手动确认机制

适用场景:新项目初始化、现有系统扩展、技术栈迁移,便于直接集成到现有 .NET 项目中。


依赖配置

  • .NET SDK:8.0+
  • NuGet 包:
    • Apache.NMS.ActiveMQ (>= 1.8.0)
    • Apache.NMS (>= 1.8.0)
    • Microsoft.Extensions.Logging.Abstractions (>= 8.0.0)
    • Microsoft.Extensions.Configuration (>= 8.0.0)
    • Microsoft.Extensions.Configuration.Json (>= 8.0.0)
    • Microsoft.Extensions.Configuration.EnvironmentVariables (>= 8.0.0)

安装示例:

dotnet add package Apache.NMS.ActiveMQ --version 1.8.0
dotnet add package Apache.NMS --version 1.8.0
dotnet add package Microsoft.Extensions.Logging.Abstractions
dotnet add package Microsoft.Extensions.Configuration
dotnet add package Microsoft.Extensions.Configuration.Json
dotnet add package Microsoft.Extensions.Configuration.EnvironmentVariables

目标框架建议:net8.0


生产者代码

说明:ActiveMQ XML 生产者,支持连接复用、持久化投递、TTL、消息头设置、XML 序列化与可选的 XSD 校验。

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Schema;
using System.Xml.Serialization;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.Extensions.Logging;

namespace Messaging.ActiveMqXml
{
    public sealed class ActiveMqOptions
    {
        // Broker Failover URIs,例如: tcp://mq1:61616, tcp://mq2:61616
        public List<string> BrokerUris { get; set; } = new List<string>();

        // 队列名
        public string QueueName { get; set; } = "demo.queue";

        // 可选:认证信息(建议使用安全的秘密管理)
        public string? Username { get; set; }
        public string? Password { get; set; }

        // 客户端标识(用于监控与调试)
        public string ClientId { get; set; } = $"producer-{Environment.MachineName}";

        // 生产者设置
        public bool PersistentDelivery { get; set; } = true;
        public int? ProducerPriority { get; set; } = null; // 0-9
        public int DefaultTimeToLiveMs { get; set; } = 0;  // 0 表示不过期

        // 连接与重试
        public int MaxReconnectAttempts { get; set; } = 10;
        public int InitialReconnectDelayMs { get; set; } = 500;
        public int ReceiveTimeoutMs { get; set; } = 2000; // 不用于生产者,但保持统一

        // XML 设置
        public string? XmlSchemaPath { get; set; } = null; // 若提供则会进行 XSD 校验
        public string ContentTypeHeader { get; set; } = "application/xml";

        // 高级设置(例如预取等,可在消费者端使用)
        public int PrefetchSize { get; set; } = 1000;
    }

    public interface IXmlMessageSerializer
    {
        string Serialize<T>(T obj);
        T Deserialize<T>(string xml);
        void ValidateXml(string xml); // 如果设置了 XSD
    }

    public sealed class XmlMessageSerializer : IXmlMessageSerializer
    {
        private readonly string? _xsdPath;
        public XmlMessageSerializer(string? xsdPath = null)
        {
            _xsdPath = xsdPath;
        }

        public string Serialize<T>(T obj)
        {
            var serializer = new XmlSerializer(typeof(T));
            var settings = new XmlWriterSettings
            {
                Encoding = new UTF8Encoding(false),
                Indent = true,
                OmitXmlDeclaration = false
            };
            using var sw = new StringWriter();
            using var xw = XmlWriter.Create(sw, settings);
            serializer.Serialize(xw, obj);
            var xml = sw.ToString();
            if (!string.IsNullOrWhiteSpace(_xsdPath))
            {
                ValidateXml(xml);
            }
            return xml;
        }

        public T Deserialize<T>(string xml)
        {
            var serializer = new XmlSerializer(typeof(T));
            using var sr = new StringReader(xml);
            return (T)serializer.Deserialize(sr)!;
        }

        public void ValidateXml(string xml)
        {
            if (string.IsNullOrWhiteSpace(_xsdPath))
                return;

            var schemas = new XmlSchemaSet();
            using var xsdStream = File.OpenRead(_xsdPath);
            schemas.Add(null, XmlReader.Create(xsdStream));

            var settings = new XmlReaderSettings
            {
                ValidationType = ValidationType.Schema,
                Schemas = schemas
            };
            settings.ValidationEventHandler += (s, e) =>
            {
                throw new XmlSchemaValidationException($"XML validation error: {e.Message}", e.Exception);
            };

            using var sr = new StringReader(xml);
            using var reader = XmlReader.Create(sr, settings);
            while (reader.Read()) { } // 触发验证
        }
    }

    public sealed class ActiveMqXmlProducer : IDisposable
    {
        private readonly ActiveMqOptions _options;
        private readonly ILogger? _logger;
        private readonly IXmlMessageSerializer _serializer;

        private IConnectionFactory? _factory;
        private IConnection? _connection;
        private ISession? _session;
        private IMessageProducer? _producer;
        private IDestination? _destination;
        private readonly object _sync = new();

        private bool _disposed;

        public ActiveMqXmlProducer(ActiveMqOptions options, IXmlMessageSerializer? serializer = null, ILogger? logger = null)
        {
            _options = options ?? throw new ArgumentNullException(nameof(options));
            _serializer = serializer ?? new XmlMessageSerializer(options.XmlSchemaPath);
            _logger = logger;

            EnsureInitialized();
        }

        private string BuildFailoverUri()
        {
            if (_options.BrokerUris == null || _options.BrokerUris.Count == 0)
                throw new InvalidOperationException("BrokerUris must be configured.");

            // failover:(tcp://host1:61616,tcp://host2:61616)?maxReconnectAttempts=10&initialReconnectDelay=500
            var brokers = string.Join(",", _options.BrokerUris);
            var query = $"maxReconnectAttempts={_options.MaxReconnectAttempts}&initialReconnectDelay={_options.InitialReconnectDelayMs}&randomize=false";
            return $"failover:({brokers})?{query}";
        }

        private void EnsureInitialized()
        {
            lock (_sync)
            {
                if (_factory != null) return;
                var uri = BuildFailoverUri();
                _factory = new ConnectionFactory(uri);
            }
        }

        private void EnsureConnected()
        {
            lock (_sync)
            {
                if (_connection == null)
                {
                    _connection = string.IsNullOrWhiteSpace(_options.Username)
                        ? _factory!.CreateConnection()
                        : _factory!.CreateConnection(_options.Username, _options.Password);
                    _connection.ClientId = _options.ClientId;
                    _connection.Start();
                    _logger?.LogInformation("ActiveMQ connection started. ClientId={ClientId}", _options.ClientId);
                }

                if (_session == null)
                {
                    // 生产者使用自动确认会话即可
                    _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
                }

                if (_destination == null)
                {
                    _destination = _session.GetQueue(_options.QueueName);
                }

                if (_producer == null)
                {
                    _producer = _session.CreateProducer(_destination);
                    _producer.DeliveryMode = _options.PersistentDelivery ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent;
                    if (_options.ProducerPriority.HasValue)
                        _producer.Priority = _options.ProducerPriority.Value;
                    if (_options.DefaultTimeToLiveMs > 0)
                        _producer.TimeToLive = TimeSpan.FromMilliseconds(_options.DefaultTimeToLiveMs);
                }
            }
        }

        public Task SendAsync<T>(T payload, IDictionary<string, object>? headers = null, string? correlationId = null, CancellationToken cancellationToken = default)
        {
            if (payload == null) throw new ArgumentNullException(nameof(payload));
            cancellationToken.ThrowIfCancellationRequested();

            EnsureConnected();

            try
            {
                var xml = _serializer.Serialize(payload);
                var msg = _session!.CreateTextMessage(xml);

                // 标准化头
                msg.NMSType = typeof(T).FullName;
                if (!string.IsNullOrWhiteSpace(correlationId))
                    msg.NMSCorrelationID = correlationId;

                msg.Properties.SetString("content-type", _options.ContentTypeHeader);
                msg.Properties.SetString("x-serializer", "XmlSerializer");

                // 附加自定义头
                if (headers != null)
                {
                    foreach (var kv in headers)
                    {
                        switch (kv.Value)
                        {
                            case null:
                                continue;
                            case string s:
                                msg.Properties.SetString(kv.Key, s);
                                break;
                            case int i:
                                msg.Properties.SetInt(kv.Key, i);
                                break;
                            case long l:
                                msg.Properties.SetLong(kv.Key, l);
                                break;
                            case bool b:
                                msg.Properties.SetBool(kv.Key, b);
                                break;
                            case byte bt:
                                msg.Properties.SetByte(kv.Key, bt);
                                break;
                            case short sh:
                                msg.Properties.SetShort(kv.Key, sh);
                                break;
                            case float f:
                                msg.Properties.SetFloat(kv.Key, f);
                                break;
                            case double d:
                                msg.Properties.SetDouble(kv.Key, d);
                                break;
                            default:
                                msg.Properties.SetString(kv.Key, kv.Value.ToString()!);
                                break;
                        }
                    }
                }

                _producer!.Send(msg);
                _logger?.LogInformation("Message sent to {Queue} (Persistent={Persistent}, TTL={TTL}ms).",
                    _options.QueueName,
                    _options.PersistentDelivery,
                    _options.DefaultTimeToLiveMs);

                return Task.CompletedTask;
            }
            catch (XmlSchemaValidationException xsdEx)
            {
                _logger?.LogError(xsdEx, "XML schema validation failed.");
                throw;
            }
            catch (Exception ex)
            {
                _logger?.LogError(ex, "Failed to send message.");
                throw;
            }
        }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;

            try
            {
                _producer?.Close();
                _session?.Close();
                _connection?.Close();
            }
            catch (Exception ex)
            {
                _logger?.LogWarning(ex, "Error while closing ActiveMQ resources.");
            }

            _producer = null;
            _session = null;
            _connection = null;
            _factory = null;
        }
    }
}

消费者代码

说明:ActiveMQ XML 消费者,支持并发、多线程消费,手动确认模式(ClientAcknowledge),异常时进行会话恢复触发重投,包含 XML 反序列化与可选校验。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.Extensions.Logging;

namespace Messaging.ActiveMqXml
{
    public enum AckMode
    {
        Auto,
        Client
    }

    public sealed class ConsumerOptions
    {
        public ActiveMqOptions Broker { get; set; } = new ActiveMqOptions();

        // 消费者并发度:建议与队列负载及服务实例数匹配
        public int Concurrency { get; set; } = 2;

        // 会话确认模式
        public AckMode Acknowledgement { get; set; } = AckMode.Client;

        // 每个消费者实例的接收超时
        public int ReceiveTimeoutMs { get; set; } = 1000;

        // 出错时是否 Session.Recover()
        public bool RecoverOnError { get; set; } = true;
    }

    public interface IXmlMessageHandler<T>
    {
        // 业务处理入口:实现方决定是否线程安全
        Task HandleAsync(T message, IMessage rawMessage, CancellationToken ct);
    }

    public sealed class ActiveMqXmlConsumer<T> : IDisposable
    {
        private readonly ConsumerOptions _options;
        private readonly IXmlMessageSerializer _serializer;
        private readonly IXmlMessageHandler<T> _handler;
        private readonly ILogger? _logger;

        private readonly List<ConsumerWorker> _workers = new();
        private bool _disposed;

        public ActiveMqXmlConsumer(ConsumerOptions options, IXmlMessageSerializer serializer, IXmlMessageHandler<T> handler, ILogger? logger = null)
        {
            _options = options ?? throw new ArgumentNullException(nameof(options));
            _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
            _handler = handler ?? throw new ArgumentNullException(nameof(handler));
            _logger = logger;
        }

        public void Start(CancellationToken cancellationToken = default)
        {
            if (_options.Broker.BrokerUris == null || _options.Broker.BrokerUris.Count == 0)
                throw new InvalidOperationException("BrokerUris must be configured.");

            for (int i = 0; i < _options.Concurrency; i++)
            {
                var worker = new ConsumerWorker(_options, _serializer, _handler, _logger, i);
                _workers.Add(worker);
                worker.Start(cancellationToken);
            }
            _logger?.LogInformation("ActiveMQ consumers started. Concurrency={Concurrency}", _options.Concurrency);
        }

        public async Task StopAsync()
        {
            foreach (var w in _workers)
            {
                await w.StopAsync();
            }
            _workers.Clear();
            _logger?.LogInformation("ActiveMQ consumers stopped.");
        }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            foreach (var w in _workers)
            {
                w.Dispose();
            }
            _workers.Clear();
        }

        private sealed class ConsumerWorker : IDisposable
        {
            private readonly ConsumerOptions _options;
            private readonly IXmlMessageSerializer _serializer;
            private readonly IXmlMessageHandler<T> _handler;
            private readonly ILogger? _logger;
            private readonly int _index;

            private IConnectionFactory? _factory;
            private IConnection? _connection;
            private ISession? _session;
            private IMessageConsumer? _consumer;
            private IDestination? _destination;

            private Task? _runTask;
            private CancellationTokenSource? _cts;

            public ConsumerWorker(ConsumerOptions options, IXmlMessageSerializer serializer, IXmlMessageHandler<T> handler, ILogger? logger, int index)
            {
                _options = options;
                _serializer = serializer;
                _handler = handler;
                _logger = logger;
                _index = index;
            }

            private string BuildFailoverUri()
            {
                var brokers = string.Join(",", _options.Broker.BrokerUris);
                // 注意:jms.* 为 Java 客户端参数,Apache.NMS 不识别;异步发送属生产者侧设置,此处不配置
                var query = $"maxReconnectAttempts={_options.Broker.MaxReconnectAttempts}&initialReconnectDelay={_options.Broker.InitialReconnectDelayMs}&randomize=false&nms.prefetchPolicy.queuePrefetch={_options.Broker.PrefetchSize}";
                return $"failover:({brokers})?{query}";
            }

            private void EnsureConnected()
            {
                if (_factory == null)
                {
                    _factory = new ConnectionFactory(BuildFailoverUri());
                }

                if (_connection == null)
                {
                    _connection = string.IsNullOrWhiteSpace(_options.Broker.Username)
                        ? _factory.CreateConnection()
                        : _factory.CreateConnection(_options.Broker.Username, _options.Broker.Password);

                    _connection.ClientId = $"{_options.Broker.ClientId}-consumer-{_index}";
                    _connection.Start();
                    _logger?.LogInformation("Consumer[{Index}] connection started.", _index);
                }

                if (_session == null)
                {
                    var ack = _options.Acknowledgement == AckMode.Auto ? AcknowledgementMode.AutoAcknowledge : AcknowledgementMode.ClientAcknowledge;
                    _session = _connection.CreateSession(ack);
                }

                if (_destination == null)
                {
                    _destination = _session.GetQueue(_options.Broker.QueueName);
                }

                if (_consumer == null)
                {
                    _consumer = _session.CreateConsumer(_destination);
                }
            }

            public void Start(CancellationToken externalToken)
            {
                _cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
                var ct = _cts.Token;
                _runTask = Task.Run(async () =>
                {
                    EnsureConnected();

                    var timeout = TimeSpan.FromMilliseconds(_options.ReceiveTimeoutMs);
                    while (!ct.IsCancellationRequested)
                    {
                        IMessage? raw = null;
                        try
                        {
                            raw = _consumer!.Receive(timeout);
                            if (raw == null) continue;

                            if (raw is ITextMessage textMsg)
                            {
                                var xml = textMsg.Text;
                                var contentType = textMsg.Properties.GetString("content-type");
                                if (!string.Equals(contentType, _options.Broker.ContentTypeHeader, StringComparison.OrdinalIgnoreCase))
                                {
                                    _logger?.LogWarning("Consumer[{Index}] received unexpected content-type: {ContentType}", _index, contentType);
                                }

                                T model = _serializer.Deserialize<T>(xml);

                                await _handler.HandleAsync(model, raw, ct);

                                if (_options.Acknowledgement == AckMode.Client)
                                {
                                    raw.Acknowledge();
                                }
                            }
                            else
                            {
                                _logger?.LogWarning("Consumer[{Index}] received non-text message: {Type}", _index, raw.GetType().FullName);
                                // 非文本消息通常也应确认,避免阻塞队列
                                if (_options.Acknowledgement == AckMode.Client)
                                {
                                    raw.Acknowledge();
                                }
                            }
                        }
                        catch (OperationCanceledException)
                        {
                            break;
                        }
                        catch (Exception ex)
                        {
                            _logger?.LogError(ex, "Consumer[{Index}] handling message failed.", _index);
                            if (_options.RecoverOnError && _options.Acknowledgement == AckMode.Client)
                            {
                                try
                                {
                                    // 触发重投递(遵循 ActiveMQ 的 RedeliveryPolicy/DLQ 策略)
                                    _session!.Recover();
                                }
                                catch (Exception rex)
                                {
                                    _logger?.LogWarning(rex, "Consumer[{Index}] session recover failed.", _index);
                                }
                            }
                        }
                    }
                }, ct);
            }

            public async Task StopAsync()
            {
                if (_cts != null)
                {
                    _cts.Cancel();
                    if (_runTask != null)
                    {
                        try
                        {
                            await _runTask;
                        }
                        catch { /* 忽略取消异常 */ }
                    }
                    _cts.Dispose();
                    _cts = null;
                }

                try
                {
                    _consumer?.Close();
                    _session?.Close();
                    _connection?.Close();
                }
                catch (Exception ex)
                {
                    _logger?.LogWarning(ex, "Consumer[{Index}] error while closing.", _index);
                }

                _consumer = null;
                _session = null;
                _connection = null;
                _factory = null;
            }

            public void Dispose()
            {
                // 同步等待停止,确保连接与会话被真正释放(避免 fire-and-forget)
                StopAsync().GetAwaiter().GetResult();
            }
        }
    }
}

示例处理器(模板,无业务逻辑):

using System.Threading;
using System.Threading.Tasks;
using Apache.NMS;
using Microsoft.Extensions.Logging;

namespace Messaging.ActiveMqXml
{
    // 示例:将消息打印日志;实际项目中替换为真实处理逻辑
    public sealed class LoggingXmlMessageHandler<T> : IXmlMessageHandler<T>
    {
        private readonly ILogger? _logger;

        public LoggingXmlMessageHandler(ILogger? logger = null)
        {
            _logger = logger;
        }

        public Task HandleAsync(T message, IMessage rawMessage, CancellationToken ct)
        {
            _logger?.LogInformation("Received message type={Type}, CorrelationId={CorrelationId}", typeof(T).FullName, rawMessage.NMSCorrelationID);
            // 在此处执行实际业务处理(模板不包含具体逻辑)
            return Task.CompletedTask;
        }
    }
}

配置示例

appsettings.json(敏感信息请使用环境变量或安全存储):

{
  "ActiveMq": {
    "BrokerUris": [
      "tcp://localhost:61616"
    ],
    "QueueName": "demo.xml.queue",
    "Username": "",
    "Password": "",
    "ClientId": "demo-client",
    "PersistentDelivery": true,
    "ProducerPriority": null,
    "DefaultTimeToLiveMs": 0,
    "MaxReconnectAttempts": 10,
    "InitialReconnectDelayMs": 500,
    "PrefetchSize": 1000,
    "XmlSchemaPath": null,
    "ContentTypeHeader": "application/xml"
  },
  "Consumer": {
    "Concurrency": 2,
    "Acknowledgement": "Client",
    "ReceiveTimeoutMs": 1000,
    "RecoverOnError": true
  }
}
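上面 appsettings.json 中的 Username/Password 建议留空,改由环境变量注入(Program.cs 中的 AddEnvironmentVariables 会读取)。.NET 配置系统用双下划线 `__` 表示层级,与 JSON 节点一一对应;以下变量名对应上述配置结构,取值仅为示意:

```shell
# 覆盖 ActiveMq 节点下的凭据(层级用双下划线表示)
export ActiveMq__Username="svc-user"
# 从文件/秘密管理读取密码,避免明文出现在 shell 历史与配置文件中
export ActiveMq__Password="$(cat /run/secrets/activemq_password)"
# 数组元素按下标覆盖,例如切换到 TLS 端口
export ActiveMq__BrokerUris__0="ssl://broker-1:61617"
```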

最小示例 Program.cs(演示如何加载配置并运行生产者/消费者):

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Messaging.ActiveMqXml;

class Program
{
    static async Task Main(string[] args)
    {
        var config = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json", optional: true)
            .AddEnvironmentVariables()
            .Build();

        var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger("Demo");

        var mqOptions = config.GetSection("ActiveMq").Get<ActiveMqOptions>() ?? new ActiveMqOptions
        {
            BrokerUris = new() { "tcp://localhost:61616" },
            QueueName = "demo.xml.queue"
        };

        var consumerOptions = config.GetSection("Consumer").Get<ConsumerOptions>() ?? new ConsumerOptions();
        // 注意:从配置节绑定得到的 ConsumerOptions 不包含 Broker,必须显式关联,否则消费者无法建立连接
        consumerOptions.Broker = mqOptions;

        var serializer = new XmlMessageSerializer(mqOptions.XmlSchemaPath);

        // 启动消费者
        var handler = new LoggingXmlMessageHandler<SampleMessage>(loggerFactory.CreateLogger("Handler"));
        var consumer = new ActiveMqXmlConsumer<SampleMessage>(consumerOptions, serializer, handler, loggerFactory.CreateLogger("Consumer"));
        var cts = new CancellationTokenSource();
        consumer.Start(cts.Token);

        // 发送示例消息
        using var producer = new ActiveMqXmlProducer(mqOptions, serializer, loggerFactory.CreateLogger("Producer"));
        var sample = new SampleMessage { Id = Guid.NewGuid().ToString(), CreatedAtUtc = DateTime.UtcNow, Payload = "hello xml" };
        await producer.SendAsync(sample, correlationId: sample.Id);

        Console.WriteLine("Press ENTER to stop...");
        Console.ReadLine();
        cts.Cancel();
        await consumer.StopAsync();
    }
}

public class SampleMessage
{
    public string Id { get; set; } = default!;
    public DateTime CreatedAtUtc { get; set; }
    public string Payload { get; set; } = default!;
}

使用说明

  • 架构与技术栈选择

    • 使用 Apache.NMS.ActiveMQ 连接 ActiveMQ(OpenWire 协议)。
    • Failover URI 提供自动重连,可配置重试次数与初始延迟。
    • 生产者使用 AutoAcknowledge 会话;消费者可选 Auto 或 Client 模式(推荐 Client 以便精确控制确认)。
  • 发送策略与最佳实践

    • 持久化投递(PersistentDelivery=true)确保消息不丢失,但会降低吞吐;根据业务权衡。
    • 设置 TimeToLive 控制过期;若不需要请设为 0。
    • 使用 IMessage.Properties 设置标准头,如 content-type=application/xml,便于跨语言互操作。
  • 消费模式与并发

    • Concurrency 表示创建多少个相互独立的连接、会话与消费循环,数值越大并行处理能力越强,但连接与内存开销也随之增加。
    • ClientAcknowledge 模式下,处理成功后调用 message.Acknowledge();异常时可 Session.Recover() 触发重投递(遵循 ActiveMQ RedeliveryPolicy/DLQ 策略)。
  • XML 序列化与验证

    • XmlMessageSerializer 支持可选的 XSD 校验,若配置 XmlSchemaPath 则在发送时进行验证。
    • 反序列化失败会抛出异常并触发错误处理流程。
  • 错误处理与日志

    • 所有核心操作包含 try/catch;异常记录到日志并向上抛出,便于调用方统一处理。
    • 关闭资源时捕获并记录异常,确保不会影响进程退出。
  • 安全注意事项

    • 禁止在代码中硬编码用户名/密码,请使用环境变量或安全的秘密管理。
    • 若使用 TLS,请将 BrokerUris 改为 ssl://host:61617 并在 ActiveMQ 端正确配置证书。
    • 不要在日志中输出敏感数据。
  • 配置建议

    • 生产环境建议开启持久化与合理的重投策略;
    • 配置合适的 PrefetchSize,避免消费者端因预取过大造成内存压力;
    • 根据消息大小与频率调整 ReceiveTimeoutMs 及并发度。
  • 集成步骤

    1. 按依赖安装 NuGet 包。
    2. 配置 appsettings.json 或环境变量。
    3. 将 ActiveMqXmlProducer 与 ActiveMqXmlConsumer 及 XmlMessageSerializer 集成到你的项目。
    4. 实现自定义 IXmlMessageHandler 进行业务处理。
    5. 在入口程序中启动消费者,并按需调用生产者发送消息。
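配置中的 MaxReconnectAttempts 与 InitialReconnectDelayMs 最终会体现为 failover 传输 URI 的查询参数(参数名遵循 ActiveMQ failover 传输的标准选项;broker 地址为示意,模板内部的拼接方式以实际实现为准):

```text
failover:(tcp://broker-1:61616,tcp://broker-2:61616)?maxReconnectAttempts=10&initialReconnectDelay=500
```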

该模板避免了过时和不安全的用法,保持通用性与可维护性;请根据实际业务需求扩展重试、度量指标、死信队列处理等功能。
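补充说明“XML 序列化与验证”一节:XSD 校验可以完全基于 .NET 标准库实现,不依赖 NMS,可作为 XmlMessageSerializer 校验逻辑的参考。下面是一个可独立编译运行的最小示意(Order 类型与内联 XSD 均为演示假设,并非模板自带类型):

```csharp
using System;
using System.IO;
using System.Xml;
using System.Xml.Schema;
using System.Xml.Serialization;

public class Order
{
    public string Id { get; set; } = "";
    public int Quantity { get; set; }
}

public static class XsdValidationDemo
{
    // 演示用内联 XSD;实际项目中通常从 XmlSchemaPath 指向的文件加载
    private const string Xsd = @"<?xml version=""1.0""?>
<xs:schema xmlns:xs=""http://www.w3.org/2001/XMLSchema"">
  <xs:element name=""Order"">
    <xs:complexType>
      <xs:sequence>
        <xs:element name=""Id"" type=""xs:string""/>
        <xs:element name=""Quantity"" type=""xs:int""/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>";

    // 使用 XmlSerializer 序列化为 XML 字符串
    public static string Serialize(Order order)
    {
        var serializer = new XmlSerializer(typeof(Order));
        using var sw = new StringWriter();
        serializer.Serialize(sw, order);
        return sw.ToString();
    }

    // 按 XSD 流式校验,返回是否通过以及首条错误信息
    public static (bool Ok, string? Error) Validate(string xml)
    {
        var settings = new XmlReaderSettings { ValidationType = ValidationType.Schema };
        settings.Schemas.Add(null, XmlReader.Create(new StringReader(Xsd)));
        string? error = null;
        // 设置回调后,校验错误通过事件报告而不是直接抛异常
        settings.ValidationEventHandler += (_, e) => error ??= e.Message;
        using var reader = XmlReader.Create(new StringReader(xml), settings);
        while (reader.Read()) { }
        return (error == null, error);
    }
}
```

与模板的差异:这里把 XSD 内联为字符串以便独立运行;接入模板时应将 `Schemas.Add` 的来源替换为 XmlSchemaPath 配置的文件。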

示例详情

适用用户

后端开发工程师

在新微服务或模块中,快速生成Kafka、RabbitMQ、SQS等生产者/消费者代码,标准化错误处理与日志,缩短接入与联调时间。

架构师

批量产出统一的通信模板与配置示例,在分布式系统落地一致的消息规范,降低跨团队差异与维护风险。

DevOps/平台工程师

将接入流程模板化并纳入流水线,自动生成依赖与环境配置,加速部署、灰度与回滚,减少重复脚本维护。

解决的问题

帮助研发团队在微服务与分布式场景下,以最少投入快速生成消息队列生产者/消费者代码模板,做到开箱可用、规范统一、稳定可靠。支持多种队列与主流语言,覆盖新项目搭建、架构扩展、技术栈迁移等高频场景;通过可读注释、完整配置与使用说明,缩短交付周期、降低故障率。可先行试用,升级至付费版后可获得更丰富的队列类型与语言支持、团队规范预设、合规与安全校验、私有化部署、模板库管理与持续优化服务。

特征总结

  • 一键生成生产者与消费者模板,自动匹配队列类型与语言,立即投入开发
  • 智能解析消息格式,内置发送与确认流程,轻松保障消息收发有序可靠
  • 自动补齐错误处理与日志方案,开箱可用,降低故障排查与联调成本
  • 可定制参数与并发策略,按场景调优吞吐与稳定性,兼顾性能与维护
  • 生成规范统一的代码结构与注释,便于团队协作与长期迭代升级
  • 一键输出依赖与配置示例,减少环境搭建时间,快速融入现有系统
  • 支持多队列系统与迁移场景,助力技术栈更替与异构服务统一通信
  • 自动优化序列化与反序列化流程,降低格式兼容问题,提升处理效率
  • 附带使用说明与最佳实践提示,新人也能迅速上手并交付可运行模块

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 687 tokens
- 3 个可调节参数
{ 消息队列类型 } { 编程语言 } { 消息格式 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费
