热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词专为自动化生成消息队列生产者和消费者代码模板而设计,通过智能解析消息队列类型、消息格式和配置参数,快速生成可立即使用的代码框架。它能显著提升开发效率,确保代码规范统一,支持多种主流消息队列系统,帮助开发者专注于业务逻辑实现而非基础架构搭建。特别适合微服务架构、分布式系统等需要高效消息通信的场景。
本模板基于 Kafka + Go + Protobuf,提供企业级的生产者和消费者代码框架,具备以下特性:
适用场景:新项目接入 Kafka、现有系统标准化消息代码、从其他 MQ 迁移到 Kafka 时的代码骨架搭建。
安装命令:go get github.com/segmentio/kafka-go google.golang.org/protobuf
文件:internal/kafka/producer.go
package mqkafka
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"log/slog"
"os"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"google.golang.org/protobuf/proto"
)
// ProducerConfig holds the settings NewProducer uses to build a Producer.
type ProducerConfig struct {
Brokers []string // Kafka broker list, e.g. []string{"kafka-1:9092","kafka-2:9092"}
Topic string // Target topic
ClientID string // Client ID used to identify the producer
Compression kafka.Compression // Compression codec: kafka.Snappy / kafka.Lz4 / kafka.Gzip / kafka.Zstd / kafka.NoCompression
RequiredAcks kafka.RequiredAcks // kafka.RequireAll is recommended for reliability; NewProducer replaces a zero value with kafka.RequireAll
BatchSize int // Batch size (number of messages)
BatchBytes int64 // Batch size in bytes
BatchTimeout time.Duration // How long to wait before sending an incomplete batch; NewProducer defaults it to 50ms
WriteTimeout time.Duration // Write timeout; NewProducer defaults it to 5s
AllowAutoTopicCreate bool // Whether automatic topic creation is allowed (depends on broker configuration)
SchemaVersion string // Custom schema version, written to the "schema-version" header for compatibility control
DefaultHeaders map[string]string // Default message headers (merged with each message's own headers)
// Security settings
Security SecurityConfig
}
// SecurityConfig describes the TLS and SASL settings shared by the producer
// and the consumer.
type SecurityConfig struct {
EnableTLS bool
CACertPath string // Optional: CA certificate path (PEM)
ClientCertPath string // Optional: client certificate path (PEM)
ClientKeyPath string // Optional: client private key path (PEM)
InsecureSkipVerify bool // Not recommended in production
EnableSASL bool
SASLMechanism string // Only used when EnableSASL=true: "PLAIN" | "SCRAM-SHA-256" | "SCRAM-SHA-512"
SASLUsername string // Load from environment/configuration; do not hard-code
SASLPassword string // Load from environment/configuration; do not hard-code
}
// Producer is a producer instance created by NewProducer.
type Producer struct {
// w is the underlying kafka-go writer used to send messages.
w *kafka.Writer
// topic is the configured target topic.
topic string
// log is the logger tagged with the component and topic.
log *slog.Logger
// headers are the default headers merged into every message sent.
headers map[string]string
}
// NewProducer validates cfg, applies defaults and builds a Producer.
//
// Brokers and Topic are required. A zero RequiredAcks is replaced by
// kafka.RequireAll, a zero BatchTimeout by 50ms and a zero WriteTimeout by 5s.
// A nil logger falls back to slog.Default().
//
// TLS and SASL are configured on the writer's kafka.Transport. kafka.Writer
// does not use a kafka.Dialer, so settings placed only on a Dialer (as this
// function previously did for SASL and ClientID) never reach the connection.
//
// It returns an error when required fields are missing or when the TLS/SASL
// configuration cannot be built.
func NewProducer(cfg ProducerConfig, logger *slog.Logger) (*Producer, error) {
	if logger == nil {
		logger = slog.Default()
	}
	if len(cfg.Brokers) == 0 {
		return nil, errors.New("missing brokers")
	}
	if cfg.Topic == "" {
		return nil, errors.New("missing topic")
	}
	// NOTE(review): the zero value is treated as "unset". In kafka-go the zero
	// value is kafka.RequireNone, so fire-and-forget cannot be selected here.
	if cfg.RequiredAcks == 0 {
		cfg.RequiredAcks = kafka.RequireAll
	}
	if cfg.BatchTimeout == 0 {
		cfg.BatchTimeout = 50 * time.Millisecond
	}
	if cfg.WriteTimeout == 0 {
		cfg.WriteTimeout = 5 * time.Second
	}

	transport := &kafka.Transport{
		DialTimeout: 10 * time.Second,
		ClientID:    cfg.ClientID,
	}
	if cfg.Security.EnableTLS {
		tlsCfg, err := buildTLSConfig(cfg.Security)
		if err != nil {
			return nil, fmt.Errorf("build TLS config: %w", err)
		}
		transport.TLS = tlsCfg
	}
	if cfg.Security.EnableSASL {
		mech, err := buildSASLMechanism(cfg.Security)
		if err != nil {
			return nil, fmt.Errorf("build SASL mechanism: %w", err)
		}
		transport.SASL = mech
	}

	// kafka-go retries transient write errors internally, so no extra retry
	// loop is added here.
	w := &kafka.Writer{
		Addr:                   kafka.TCP(cfg.Brokers...),
		Topic:                  cfg.Topic,
		Balancer:               &kafka.Hash{}, // hash on the key so equal keys keep their order
		Compression:            cfg.Compression,
		RequiredAcks:           cfg.RequiredAcks,
		BatchSize:              cfg.BatchSize,
		BatchBytes:             cfg.BatchBytes,
		BatchTimeout:           cfg.BatchTimeout,
		WriteTimeout:           cfg.WriteTimeout,
		AllowAutoTopicCreation: cfg.AllowAutoTopicCreate,
		Transport:              transport,
	}

	// Built-in headers first; user-supplied defaults override them.
	defaultHeaders := map[string]string{
		"content-type":  "application/x-protobuf",
		"sdk-language":  "go",
		"sdk-component": "producer",
	}
	if cfg.SchemaVersion != "" {
		defaultHeaders["schema-version"] = cfg.SchemaVersion
	}
	for k, v := range cfg.DefaultHeaders {
		defaultHeaders[k] = v
	}

	return &Producer{
		w:       w,
		topic:   cfg.Topic,
		log:     logger.With("component", "kafka_producer", "topic", cfg.Topic),
		headers: defaultHeaders,
	}, nil
}
// Close closes the underlying writer and returns its error.
// Calling Close on a nil Producer, or on one without a writer, is a no-op.
func (p *Producer) Close() error {
	if p == nil || p.w == nil {
		return nil
	}
	return p.w.Close()
}
// Send marshals msg with protobuf and writes it to the producer's topic.
//
// Parameters:
//   - ctx: caller context; if it has no deadline a 10s timeout is applied.
//   - key: partition key (messages with the same key go to the same
//     partition); may be nil.
//   - msg: any protobuf message; nil is rejected.
//   - headers: per-message headers, merged over the producer's defaults.
//
// It returns an error when msg is nil, when marshalling fails, or when the
// write fails.
func (p *Producer) Send(ctx context.Context, key []byte, msg proto.Message, headers map[string]string) error {
	if msg == nil {
		return errors.New("nil protobuf message")
	}

	// Callers should set their own deadline; otherwise use a conservative one.
	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
	}

	payload, err := proto.Marshal(msg)
	if err != nil {
		return fmt.Errorf("protobuf marshal: %w", err)
	}

	// Message.Topic is deliberately left empty: the writer is created with a
	// Topic, and kafka-go rejects messages that set it in both places.
	kmsg := kafka.Message{
		Key:     key,
		Value:   payload,
		Time:    time.Now(),
		Headers: toKafkaHeaders(mergeHeaders(p.headers, headers)),
	}

	if err := p.w.WriteMessages(ctx, kmsg); err != nil {
		p.log.Error("write message failed", "err", err)
		return err
	}
	p.log.Debug("message sent", "bytes", len(payload))
	return nil
}
// mergeHeaders returns a new map holding every entry of base overlaid with
// every entry of extra; on a key collision the value from extra wins.
// Neither input is modified, and the result is never nil.
func mergeHeaders(base map[string]string, extra map[string]string) map[string]string {
	merged := make(map[string]string, len(base)+len(extra))
	// Later sources overwrite earlier ones, so extra is applied last.
	for _, src := range []map[string]string{base, extra} {
		for name, val := range src {
			merged[name] = val
		}
	}
	return merged
}
// toKafkaHeaders converts a string map into kafka headers, one header per
// entry. An empty or nil map yields nil. The header order follows Go's map
// iteration order and is therefore unspecified.
func toKafkaHeaders(h map[string]string) []kafka.Header {
	n := len(h)
	if n == 0 {
		return nil
	}
	hdrs := make([]kafka.Header, n)
	i := 0
	for name, val := range h {
		hdrs[i] = kafka.Header{Key: name, Value: []byte(val)}
		i++
	}
	return hdrs
}
// buildTLSConfig builds a *tls.Config from sec.
//
// The minimum protocol version is TLS 1.2. When CACertPath is set, the PEM
// file is loaded as the root CA pool. When both ClientCertPath and
// ClientKeyPath are set, the key pair is loaded as the client certificate.
//
// It returns an error when a file cannot be read or parsed, or when only one
// of ClientCertPath / ClientKeyPath is provided. That half-configured case
// used to be silently ignored, which disabled mutual TLS without any warning.
func buildTLSConfig(sec SecurityConfig) (*tls.Config, error) {
	hasCert := sec.ClientCertPath != ""
	hasKey := sec.ClientKeyPath != ""
	if hasCert != hasKey {
		return nil, errors.New("client cert and key must be provided together")
	}

	tlsCfg := &tls.Config{
		InsecureSkipVerify: sec.InsecureSkipVerify, // not recommended in production
		MinVersion:         tls.VersionTLS12,
	}

	// Custom CA pool.
	if sec.CACertPath != "" {
		caPEM, err := os.ReadFile(sec.CACertPath)
		if err != nil {
			return nil, fmt.Errorf("read CA cert: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(caPEM) {
			return nil, errors.New("append CA cert failed")
		}
		tlsCfg.RootCAs = pool
	}

	// Client certificate for mutual TLS.
	if hasCert && hasKey {
		cert, err := tls.LoadX509KeyPair(sec.ClientCertPath, sec.ClientKeyPath)
		if err != nil {
			return nil, fmt.Errorf("load client cert/key: %w", err)
		}
		tlsCfg.Certificates = []tls.Certificate{cert}
	}
	return tlsCfg, nil
}
// buildSASLMechanism maps sec.SASLMechanism ("PLAIN", "SCRAM-SHA-256" or
// "SCRAM-SHA-512", matched exactly) to a kafka-go SASL mechanism using the
// configured username and password.
//
// It returns an error when either credential is empty, when the mechanism
// name is not one of the supported values, or when the SCRAM mechanism cannot
// be constructed.
func buildSASLMechanism(sec SecurityConfig) (sasl.Mechanism, error) {
	if sec.SASLUsername == "" || sec.SASLPassword == "" {
		return nil, errors.New("missing SASL credentials")
	}

	var algo scram.Algorithm
	switch name := sec.SASLMechanism; name {
	case "PLAIN":
		return plain.Mechanism{Username: sec.SASLUsername, Password: sec.SASLPassword}, nil
	case "SCRAM-SHA-256":
		algo = scram.SHA256
	case "SCRAM-SHA-512":
		algo = scram.SHA512
	default:
		return nil, fmt.Errorf("unsupported SASL mechanism: %s", name)
	}

	// Both SCRAM variants share the same construction path.
	scramMech, err := scram.Mechanism(algo, sec.SASLUsername, sec.SASLPassword)
	if err != nil {
		return nil, err
	}
	return scramMech, nil
}
文件:internal/kafka/consumer.go
package mqkafka
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"strconv"
	"time"
	"unicode/utf8"

	"github.com/segmentio/kafka-go"
	"google.golang.org/protobuf/proto"
)
// Handler is the user-supplied processing callback; business logic implements
// this function. Returning an error marks processing as failed, and the
// framework then applies its retry policy.
type Handler func(ctx context.Context, key []byte, msg proto.Message, headers map[string]string) error
// NewMessage is a factory that creates an empty protobuf message instance to
// unmarshal into.
// Example: func() proto.Message { return &pb.Event{} }
type NewMessage func() proto.Message
// ConsumerConfig holds the settings NewConsumer uses to build a Consumer.
type ConsumerConfig struct {
Brokers []string
Topic string
GroupID string
ClientID string
MinBytes int // Defaults to 1
MaxBytes int // Defaults to 10MB
MaxWait time.Duration // Maximum time to wait when fetching messages
ReadTimeout time.Duration // Read timeout
CommitInterval time.Duration // Auto-commit interval; can be set longer when committing manually
// Offset control (only takes effect when there is no committed offset yet)
StartFromEarliest bool // true: earliest, false: latest
// Processing and retries
OperationTimeout time.Duration // Timeout for processing one message; Run applies it around the handler call including its retries
MaxRetryAttempts int // Maximum number of retries after a failed handler call (not counting the first attempt), e.g. 3
RetryBackoff time.Duration // Wait between retries; defaults to 500ms
// DLQ
EnableDLQ bool
DLQTopic string
// Security settings
Security SecurityConfig
}
// Consumer is a consumer instance created by NewConsumer.
type Consumer struct {
// r is the kafka-go reader that messages are fetched from and committed on.
r *kafka.Reader
// dlqWriter writes failed messages to the DLQ topic; nil when the DLQ is disabled.
dlqWriter *kafka.Writer
// log is the logger tagged with component, topic and group.
log *slog.Logger
// handler is the user callback invoked for each decoded message.
handler Handler
// newMsg creates the empty protobuf message each payload is unmarshalled into.
newMsg NewMessage
// cfg is the configuration with defaults applied.
cfg ConsumerConfig
}
// NewConsumer validates cfg, applies defaults and builds a Consumer.
//
// Brokers, Topic, GroupID, handler and newMsg are required; DLQTopic is
// required when EnableDLQ is set. A nil logger falls back to slog.Default().
// Zero values get these defaults: MinBytes 1, MaxBytes 10MB, MaxWait 250ms,
// ReadTimeout 10s, CommitInterval 1s, OperationTimeout 30s and RetryBackoff
// 500ms. A negative MaxRetryAttempts is clamped to 0.
//
// All validation runs before the reader is created, so a configuration error
// cannot leave an unclosed reader behind. No network I/O is performed here:
// the former SetOffsetAt call dialed the brokers on a background context and
// had its error discarded, and explicit offsets do not apply to a
// consumer-group reader.
func NewConsumer(cfg ConsumerConfig, handler Handler, newMsg NewMessage, logger *slog.Logger) (*Consumer, error) {
	if logger == nil {
		logger = slog.Default()
	}
	switch {
	case len(cfg.Brokers) == 0:
		return nil, errors.New("missing brokers")
	case cfg.Topic == "":
		return nil, errors.New("missing topic")
	case cfg.GroupID == "":
		return nil, errors.New("missing group id")
	case handler == nil:
		return nil, errors.New("missing handler")
	case newMsg == nil:
		return nil, errors.New("missing message factory")
	case cfg.EnableDLQ && cfg.DLQTopic == "":
		return nil, errors.New("dlq enabled but DLQTopic empty")
	}

	dialer, transport, err := buildDialerTransport(cfg.Security, cfg.ClientID)
	if err != nil {
		return nil, fmt.Errorf("build dialer: %w", err)
	}

	if cfg.MinBytes == 0 {
		cfg.MinBytes = 1
	}
	if cfg.MaxBytes == 0 {
		cfg.MaxBytes = 10 << 20 // 10MB
	}
	if cfg.MaxWait == 0 {
		cfg.MaxWait = 250 * time.Millisecond
	}
	if cfg.ReadTimeout == 0 {
		cfg.ReadTimeout = 10 * time.Second
	}
	if cfg.CommitInterval == 0 {
		cfg.CommitInterval = time.Second // conservative; offsets are committed explicitly in Run
	}
	if cfg.OperationTimeout == 0 {
		cfg.OperationTimeout = 30 * time.Second
	}
	if cfg.RetryBackoff == 0 {
		cfg.RetryBackoff = 500 * time.Millisecond
	}
	if cfg.MaxRetryAttempts < 0 {
		cfg.MaxRetryAttempts = 0
	}

	// Only used when the group has no committed offset yet.
	startOffset := kafka.LastOffset
	if cfg.StartFromEarliest {
		startOffset = kafka.FirstOffset
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:         cfg.Brokers,
		GroupID:         cfg.GroupID,
		Topic:           cfg.Topic,
		MinBytes:        cfg.MinBytes,
		MaxBytes:        cfg.MaxBytes,
		MaxWait:         cfg.MaxWait,
		ReadLagInterval: -1, // disable periodic lag reporting to cut noise
		CommitInterval:  cfg.CommitInterval,
		StartOffset:     startOffset,
		Dialer:          dialer,
	})

	var dlqWriter *kafka.Writer
	if cfg.EnableDLQ {
		dlqWriter = &kafka.Writer{
			Addr:         kafka.TCP(cfg.Brokers...),
			Topic:        cfg.DLQTopic,
			Balancer:     &kafka.Hash{},
			RequiredAcks: kafka.RequireAll,
			Transport:    transport,
			WriteTimeout: 10 * time.Second,
		}
	}

	return &Consumer{
		r:         r,
		dlqWriter: dlqWriter,
		log:       logger.With("component", "kafka_consumer", "topic", cfg.Topic, "group", cfg.GroupID),
		handler:   handler,
		newMsg:    newMsg,
		cfg:       cfg,
	}, nil
}
// Run starts the blocking consume loop. Cancel ctx for a graceful stop.
//
// For every fetched message it unmarshals the payload, calls the handler with
// retries under OperationTimeout, and commits the offset on success. Messages
// that cannot be decoded or that exhaust their retries are written to the DLQ
// (when enabled) and committed only after the DLQ write succeeded.
//
// Run returns nil when ctx is cancelled and a wrapped io.EOF when the reader
// has been closed. Any other fetch error is logged and retried after
// RetryBackoff, so a broken connection cannot spin the loop.
//
// NOTE(review): "offset not committed" only defers the commit. This reader
// keeps advancing, and a later successful commit on the same partition moves
// the group offset past the skipped message — confirm this matches the
// intended delivery guarantee.
func (c *Consumer) Run(ctx context.Context) error {
	c.log.Info("consumer started")
	defer c.log.Info("consumer stopped")

	for {
		if ctx.Err() != nil {
			return nil
		}

		// Blocks until a message arrives, ctx ends, or the reader fails.
		m, err := c.r.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			if errors.Is(err, io.EOF) {
				// The reader was closed; fetching can never succeed again.
				return fmt.Errorf("kafka reader closed: %w", err)
			}
			c.log.Error("fetch message failed", "err", err)
			backoff := time.NewTimer(c.cfg.RetryBackoff)
			select {
			case <-backoff.C:
			case <-ctx.Done():
				backoff.Stop()
				return nil
			}
			continue
		}

		hdrs := fromKafkaHeaders(m.Headers)

		// Decode the protobuf payload.
		pm := c.newMsg()
		if err := proto.Unmarshal(m.Value, pm); err != nil {
			c.log.Error("protobuf unmarshal failed; sending to DLQ (if enabled)", "err", err, "partition", m.Partition, "offset", m.Offset)
			if c.cfg.EnableDLQ {
				c.deadLetter(ctx, m, hdrs, err, -1) // -1 marks a decode failure
			}
			// Without a DLQ the offset is left uncommitted for manual handling.
			continue
		}

		// Run the handler (with retries) under the per-message timeout.
		processCtx, cancel := context.WithTimeout(ctx, c.cfg.OperationTimeout)
		err = c.callWithRetry(processCtx, m, pm, hdrs)
		cancel()

		if err != nil {
			if ctx.Err() != nil {
				// Shutdown interrupted processing; this is not a poison
				// message, so leave it uncommitted for redelivery.
				c.log.Warn("processing interrupted by shutdown; offset not committed", "partition", m.Partition, "offset", m.Offset)
				return nil
			}
			if c.cfg.EnableDLQ {
				c.deadLetter(ctx, m, hdrs, err, c.cfg.MaxRetryAttempts)
				continue
			}
			c.log.Error("message permanently failed without DLQ; offset not committed", "err", err, "partition", m.Partition, "offset", m.Offset)
			continue
		}

		if err := c.r.CommitMessages(ctx, m); err != nil {
			c.log.Error("commit message failed", "err", err, "partition", m.Partition, "offset", m.Offset)
			continue
		}
		c.log.Debug("message processed & committed", "partition", m.Partition, "offset", m.Offset)
	}
}

// deadLetter writes m to the DLQ and commits its offset only if that write
// succeeded, so a failed DLQ write never discards the message.
func (c *Consumer) deadLetter(ctx context.Context, m kafka.Message, hdrs map[string]string, cause error, attempts int) {
	if err := c.sendToDLQ(ctx, m, hdrs, cause, attempts); err != nil {
		c.log.Error("send to DLQ failed; will not commit offset", "err", err, "partition", m.Partition, "offset", m.Offset)
		return
	}
	if err := c.r.CommitMessages(ctx, m); err != nil {
		c.log.Error("commit after DLQ failed", "err", err)
	}
}
// Close closes the reader and, when configured, the DLQ writer.
// Both are always closed; their errors are combined with errors.Join so that
// neither failure is lost. It returns nil when both closes succeed.
func (c *Consumer) Close() error {
	var readerErr, dlqErr error
	if c.r != nil {
		readerErr = c.r.Close()
	}
	if c.dlqWriter != nil {
		dlqErr = c.dlqWriter.Close()
	}
	return errors.Join(readerErr, dlqErr)
}
// callWithRetry invokes the handler until it succeeds, ctx ends, or the
// handler has failed more than MaxRetryAttempts times (one initial call plus
// MaxRetryAttempts retries). It waits RetryBackoff between attempts.
//
// It returns nil on success, ctx.Err() when ctx ends, and the last handler
// error wrapped with "max attempts reached" once retries are exhausted.
func (c *Consumer) callWithRetry(ctx context.Context, m kafka.Message, pm proto.Message, headers map[string]string) error {
	for attempt := 1; ; attempt++ {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		handlerErr := c.handler(ctx, m.Key, pm, headers)
		if handlerErr == nil {
			return nil
		}
		c.log.Warn("handler failed", "attempt", attempt, "err", handlerErr)
		if attempt > c.cfg.MaxRetryAttempts {
			return fmt.Errorf("max attempts reached: %w", handlerErr)
		}

		// Back off before the next attempt, giving up early if ctx ends.
		pause := time.NewTimer(c.cfg.RetryBackoff)
		select {
		case <-pause.C:
		case <-ctx.Done():
			pause.Stop()
			return ctx.Err()
		}
	}
}
// sendToDLQ writes the original key and payload of m to the DLQ topic.
//
// The original headers are kept and extended with the failure cause
// ("x-error", truncated to 2048 bytes), the retry count ("x-retries") and the
// message's origin ("x-original-topic", "x-original-partition",
// "x-original-offset") so a DLQ record can be traced back to its source.
// If ctx has no deadline a 10s timeout is applied.
//
// It returns an error when no DLQ writer is configured or the write fails.
func (c *Consumer) sendToDLQ(ctx context.Context, m kafka.Message, headers map[string]string, rootErr error, attempts int) error {
	if c.dlqWriter == nil {
		return errors.New("dlq writer not configured")
	}

	h := mergeHeaders(headers, map[string]string{
		"x-error":              truncateErr(rootErr, 2048),
		"x-retries":            strconv.Itoa(attempts),
		"x-original-topic":     m.Topic,
		"x-original-partition": strconv.Itoa(m.Partition),
		"x-original-offset":    strconv.FormatInt(m.Offset, 10),
		"content-type":         "application/x-protobuf",
		"sdk-component":        "dlq",
	})
	dlqMsg := kafka.Message{
		Key:     m.Key,
		Value:   m.Value, // keep the original payload untouched
		Time:    time.Now(),
		Headers: toKafkaHeaders(h),
	}

	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
		defer cancel()
	}

	if err := c.dlqWriter.WriteMessages(ctx, dlqMsg); err != nil {
		c.log.Error("write dlq message failed", "err", err)
		return err
	}
	// A successful DLQ write is noteworthy but not itself an error.
	c.log.Warn("message sent to DLQ", "partition", m.Partition, "offset", m.Offset)
	return nil
}
// fromKafkaHeaders converts kafka headers into a string map. When a key
// appears more than once the last occurrence wins. The result is never nil,
// even for an empty input.
func fromKafkaHeaders(hdrs []kafka.Header) map[string]string {
	byKey := make(map[string]string, len(hdrs))
	for i := range hdrs {
		byKey[hdrs[i].Key] = string(hdrs[i].Value)
	}
	return byKey
}
// truncateErr returns err's message limited to at most n bytes.
//
// A nil err yields "". A negative n is treated as 0. When the message has to
// be cut, the cut point is moved back to the nearest rune boundary so the
// result never ends in a partial multi-byte UTF-8 sequence; the result may
// therefore be slightly shorter than n bytes.
func truncateErr(err error, n int) string {
	if err == nil {
		return ""
	}
	s := err.Error()
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}
	// s[n] is the first byte that would be dropped; step back while it is a
	// continuation byte so a rune is never split.
	for n > 0 && !utf8.RuneStart(s[n]) {
		n--
	}
	return s[:n]
}
// buildDialerTransport builds the kafka.Dialer (used by readers) and the
// kafka.Transport (used by writers) from the same security settings.
//
// Both carry the client ID, a 10s dial timeout and, when enabled, the TLS
// configuration and SASL mechanism. The Transport needs SASL and ClientID set
// on itself: it does not inherit them from the Dialer, so a writer built on a
// Transport without them connects unauthenticated.
//
// It returns an error when the TLS configuration or SASL mechanism cannot be
// built.
func buildDialerTransport(sec SecurityConfig, clientID string) (*kafka.Dialer, *kafka.Transport, error) {
	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		ClientID:  clientID,
		DualStack: true,
	}
	if sec.EnableTLS {
		tlsCfg, err := buildTLSConfig(sec)
		if err != nil {
			return nil, nil, err
		}
		dialer.TLS = tlsCfg
	}
	if sec.EnableSASL {
		mech, err := buildSASLMechanism(sec)
		if err != nil {
			return nil, nil, err
		}
		dialer.SASLMechanism = mech
	}

	transport := &kafka.Transport{
		DialTimeout: dialer.Timeout,
		ClientID:    clientID,
		TLS:         dialer.TLS,
		SASL:        dialer.SASLMechanism,
	}
	return dialer, transport, nil
}
使用示例(集成业务逻辑):
// 假设已生成 pb 包:package pb; message Event { string id = 1; string type = 2; bytes payload = 3; int64 ts_unix_ms = 4; }
// 生产者发送:
// prod, _ := mqkafka.NewProducer(prodCfg, slog.Default())
// defer prod.Close()
// evt := &pb.Event{Id: "k1", Type: "created", Payload: []byte("..."), TsUnixMs: time.Now().UnixMilli()}
// _ = prod.Send(ctx, []byte(evt.Id), evt, map[string]string{"x-source":"svc-a"})
// 消费者处理:
// handler := func(ctx context.Context, key []byte, msg proto.Message, headers map[string]string) error {
// evt := msg.(*pb.Event)
// // TODO: 业务处理逻辑
// return nil
// }
// cons, _ := mqkafka.NewConsumer(consCfg, handler, func() proto.Message { return &pb.Event{} }, slog.Default())
// defer cons.Close()
// _ = cons.Run(ctx)
producer:
brokers: ["kafka-1:9092","kafka-2:9092"]
topic: "events"
clientID: "svc-a-producer"
compression: "snappy" # 可选: none|snappy|lz4|gzip|zstd
requiredAcks: "all" # 可选: none|leader|all
batchSize: 100
batchBytes: 1048576
batchTimeout: "50ms"
writeTimeout: "5s"
allowAutoTopicCreate: false
schemaVersion: "v1"
defaultHeaders:
x-env: "prod"
security:
enableTLS: true
caCertPath: "/etc/ssl/certs/ca.pem"
clientCertPath: "/etc/ssl/certs/client.pem"
clientKeyPath: "/etc/ssl/private/client.key"
insecureSkipVerify: false
enableSASL: true
saslMechanism: "SCRAM-SHA-512"
saslUsername: "${KAFKA_USERNAME}" # 建议从环境变量加载
saslPassword: "${KAFKA_PASSWORD}"
consumer:
brokers: ["kafka-1:9092","kafka-2:9092"]
topic: "events"
groupID: "svc-b-consumer"
clientID: "svc-b-consumer"
minBytes: 1
maxBytes: 10485760
maxWait: "250ms"
readTimeout: "10s"
commitInterval: "1s"
startFromEarliest: true
operationTimeout: "30s"
maxRetryAttempts: 3
retryBackoff: "500ms"
enableDLQ: true
dlqTopic: "events.DLQ"
security:
enableTLS: true
caCertPath: "/etc/ssl/certs/ca.pem"
clientCertPath: "/etc/ssl/certs/client.pem"
clientKeyPath: "/etc/ssl/private/client.key"
insecureSkipVerify: false
enableSASL: true
saslMechanism: "SCRAM-SHA-512"
saslUsername: "${KAFKA_USERNAME}"
saslPassword: "${KAFKA_PASSWORD}"
文件:proto/event.proto
syntax = "proto3";
package example.events;
option go_package = "your.module/path/internal/pb;pb";
// Event is the example message exchanged by the producer and consumer templates.
message Event {
string id = 1; // Candidate value for the Kafka partition key
string type = 2; // Event type
bytes payload = 3; // Business payload (binary)
int64 ts_unix_ms = 4; // Event timestamp (milliseconds)
}
生成命令(在项目根目录,需已安装 protoc 与 protoc-gen-go):protoc --go_out=. --go_opt=module=your.module/path proto/event.proto
生成后在 Go 代码中引用:import pb "your.module/path/internal/pb"
如需进一步定制(例如:自定义分区策略、批处理发送/处理、Schema Registry 集成、压测与参数调优),请告知具体需求。
本模板基于 RabbitMQ 的 Java 客户端生成生产者与消费者代码,采用 JSON 消息格式,适用于微服务和分布式系统中的异步通信场景。模板包含:
配置通过环境变量或 application.properties 提供,不包含敏感信息硬编码。依赖示例使用 Maven:
<dependencies>
<!-- RabbitMQ Java Client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
<!-- Jackson for JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.1</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.13</version>
</dependency>
<!-- Logback (Runtime) -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.14</version>
</dependency>
</dependencies>
文件:src/main/java/com/example/messaging/rabbitmq/MessageProducer.java
package com.example.messaging.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * RabbitMQ JSON message producer template.
 * <ul>
 *   <li>Uses publisher confirms for reliable delivery.</li>
 *   <li>Sets message properties (contentType, deliveryMode, messageId, timestamp, ...).</li>
 *   <li>No hard-coded secrets; configuration comes from environment variables or
 *       application.properties.</li>
 * </ul>
 *
 * Usage:
 * <pre>
 *   MessageProducer producer = new MessageProducer(config);
 *   producer.start();        // required before initTopology()/publish()
 *   producer.initTopology(); // optional: declare exchange/queue/bindings (idempotent)
 *   producer.publish("order.created", new MessageEnvelope(...));
 *   producer.close();
 * </pre>
 *
 * Thread-safety: a Channel is not thread-safe. For concurrent publishing use
 * several producer instances or serialize the calls externally.
 */
public class MessageProducer implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);

    /** Upper bound for waiting on a publisher confirm, in milliseconds. */
    private static final long CONFIRM_TIMEOUT_MS = 10_000L;

    private final RabbitConfig config;
    private final ObjectMapper objectMapper;
    private Connection connection;
    private Channel channel;

    public MessageProducer(RabbitConfig config) {
        this.config = config;
        this.objectMapper = JsonUtil.objectMapper();
    }

    /**
     * Opens the connection and channel and enables publisher confirms.
     * Does nothing when the connection is already open.
     *
     * @throws IOException      if the connection or channel cannot be created
     * @throws TimeoutException if connecting times out
     */
    public void start() throws IOException, TimeoutException {
        if (this.connection != null && this.connection.isOpen()) {
            return;
        }
        ConnectionFactory factory = RabbitConfig.createConnectionFactory(config);
        this.connection = factory.newConnection(config.getClientName());
        this.channel = connection.createChannel();
        // Enable publisher confirms on this channel.
        this.channel.confirmSelect();
        log.info("Producer started: vhost={}, host={}, exchange={}",
                config.getVhost(), config.getHost(), config.getExchangeName());
    }

    /**
     * Idempotently declares the exchange, queue, DLQ and bindings.
     *
     * @throws IllegalStateException if {@link #start()} has not been called
     * @throws IOException           if a declaration fails
     */
    public void initTopology() throws IOException {
        // Fail with a clear message instead of a NullPointerException on the channel.
        ensureStarted();
        TopologyInitializer.init(channel, config);
    }

    /**
     * Publishes a JSON message and waits for the broker's confirm.
     *
     * @param routingKey routing key
     * @param envelope   message envelope (common fields)
     * @return true when the broker confirmed the message; false when publishing
     *         failed, was interrupted, or was not confirmed within the timeout
     * @throws IllegalStateException if {@link #start()} has not been called
     */
    public boolean publish(String routingKey, MessageEnvelope envelope) {
        ensureStarted();
        try {
            byte[] body = objectMapper.writeValueAsBytes(envelope);

            Map<String, Object> headers = new HashMap<>();
            headers.put("type", envelope.getType());
            headers.put("schema", envelope.getSchema()); // optional: JSON Schema identifier
            headers.put("origin", envelope.getOrigin()); // optional: originating system

            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .contentType("application/json")
                    .contentEncoding("utf-8")
                    .deliveryMode(2) // 2 = persistent
                    .messageId(envelope.getId())
                    .timestamp(Date.from(Instant.ofEpochMilli(envelope.getTimestamp())))
                    .correlationId(envelope.getCorrelationId())
                    .headers(headers)
                    .build();

            channel.basicPublish(config.getExchangeName(), routingKey, props, body);

            // Bounded wait: the timed overload is the one that throws
            // TimeoutException, and it keeps an unresponsive broker from
            // blocking the caller forever.
            channel.waitForConfirmsOrDie(CONFIRM_TIMEOUT_MS);

            log.debug("Message published and confirmed: routingKey={}, messageId={}",
                    routingKey, envelope.getId());
            return true;
        } catch (IOException e) {
            log.error("Publish failed (serialization or IO): routingKey={}, messageId={}, err={}",
                    routingKey, envelope.getId(), e.toString());
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Publish interrupted: routingKey={}, messageId={}", routingKey, envelope.getId());
            return false;
        } catch (ShutdownSignalException | TimeoutException e) {
            // ShutdownSignalException also covers AlreadyClosedException.
            log.error("Channel/Connection closed or timed out during publish: {}", e.toString());
            return false;
        }
    }

    private void ensureStarted() {
        if (this.channel == null || !this.channel.isOpen()) {
            throw new IllegalStateException("Producer not started. Call start() before publish().");
        }
    }

    /** Closes the channel and then the connection; close errors are logged, not thrown. */
    @Override
    public void close() {
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            log.warn("Error closing channel: {}", e.toString());
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            log.warn("Error closing connection: {}", e.toString());
        }
    }
}
辅助类:RabbitConfig.java
package com.example.messaging.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Properties;
/**
 * Central configuration loader.
 * <ul>
 *   <li>Each setting is read from the environment variable first, then from the
 *       given properties, then falls back to a built-in default.</li>
 *   <li>Sensitive values (such as the password) are not written to the log.</li>
 * </ul>
 */
public class RabbitConfig {
    private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);

    private final String host;
    private final int port;
    private final String username;
    private final String password;
    private final String vhost;
    private final String exchangeName;
    private final String exchangeType; // e.g. topic, direct
    private final String queueName;
    private final String routingKey;
    private final String dlxName;
    private final String dlqName;
    private final int prefetch;
    private final int consumerCount;
    private final boolean useTls;
    private final String clientName;

    /**
     * Resolves every setting from the environment, {@code props}, or its default.
     *
     * @param props fallback properties (keys are the same names as the environment variables)
     * @throws NumberFormatException if a numeric setting is not a valid integer
     */
    public RabbitConfig(Properties props) {
        this.host = getEnvOrProp("RABBITMQ_HOST", props, "localhost");
        this.port = Integer.parseInt(getEnvOrProp("RABBITMQ_PORT", props, "5672"));
        this.username = getEnvOrProp("RABBITMQ_USERNAME", props, "guest");
        this.password = getEnvOrProp("RABBITMQ_PASSWORD", props, "guest");
        this.vhost = getEnvOrProp("RABBITMQ_VHOST", props, "/");
        this.exchangeName = getEnvOrProp("RABBITMQ_EXCHANGE", props, "app.events");
        this.exchangeType = getEnvOrProp("RABBITMQ_EXCHANGE_TYPE", props, "topic");
        this.queueName = getEnvOrProp("RABBITMQ_QUEUE", props, "app.events.q");
        this.routingKey = getEnvOrProp("RABBITMQ_ROUTING_KEY", props, "app.*");
        this.dlxName = getEnvOrProp("RABBITMQ_DLX", props, "app.events.dlx");
        this.dlqName = getEnvOrProp("RABBITMQ_DLQ", props, "app.events.q.dlq");
        this.prefetch = Integer.parseInt(getEnvOrProp("RABBITMQ_PREFETCH", props, "50"));
        this.consumerCount = Integer.parseInt(getEnvOrProp("RABBITMQ_CONSUMER_COUNT", props, "2"));
        this.useTls = Boolean.parseBoolean(getEnvOrProp("RABBITMQ_USE_TLS", props, "false"));
        this.clientName = getEnvOrProp("RABBITMQ_CLIENT_NAME", props, "java-client");
    }

    /**
     * Builds a ConnectionFactory from {@code cfg} with automatic recovery,
     * a 5s recovery interval, 10s connection/handshake timeouts and a 30s heartbeat.
     *
     * @throws IllegalStateException if TLS is requested but cannot be enabled
     */
    public static ConnectionFactory createConnectionFactory(RabbitConfig cfg) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(cfg.host);
        factory.setPort(cfg.port);
        factory.setVirtualHost(cfg.vhost);
        factory.setUsername(cfg.username);
        factory.setPassword(cfg.password);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(Duration.ofSeconds(5).toMillis());
        if (cfg.useTls) {
            try {
                // Use the JVM's default SSLContext so the server certificate is
                // validated against the JVM trust store. The no-argument
                // useSslProtocol() installs a trust-everything TrustManager and
                // is only suitable for development.
                factory.useSslProtocol(javax.net.ssl.SSLContext.getDefault());
                factory.enableHostnameVerification();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to enable TLS: " + e.getMessage(), e);
            }
        }
        // Reasonable timeouts.
        factory.setConnectionTimeout(10_000);
        factory.setHandshakeTimeout(10_000);
        factory.setRequestedHeartbeat(30);
        return factory;
    }

    /** Returns the non-empty environment value, else the non-empty property, else the default. */
    private static String getEnvOrProp(String key, Properties props, String defaultVal) {
        String env = System.getenv(key);
        if (env != null && !env.isEmpty()) return env;
        String prop = props.getProperty(key);
        if (prop != null && !prop.isEmpty()) return prop;
        return defaultVal;
    }

    public String getHost() { return host; }
    public int getPort() { return port; }
    public String getUsername() { return username; }
    public String getPassword() { return password; }
    public String getVhost() { return vhost; }
    public String getExchangeName() { return exchangeName; }
    public String getExchangeType() { return exchangeType; }
    public String getQueueName() { return queueName; }
    public String getRoutingKey() { return routingKey; }
    public String getDlxName() { return dlxName; }
    public String getDlqName() { return dlqName; }
    public int getPrefetch() { return prefetch; }
    public int getConsumerCount() { return consumerCount; }
    public boolean isUseTls() { return useTls; }
    public String getClientName() { return clientName; }
}
辅助类:TopologyInitializer.java
package com.example.messaging.rabbitmq;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Declares the main exchange, the dead-letter exchange and queue, the main
 * queue, and their bindings. Every exchange and queue is declared durable.
 */
public class TopologyInitializer {
    /**
     * Declares the topology described by {@code cfg} on {@code channel}.
     *
     * @throws IOException if any declaration or binding fails
     */
    public static void init(Channel channel, RabbitConfig cfg) throws IOException {
        final String mainExchange = cfg.getExchangeName();
        final String mainQueue = cfg.getQueueName();
        final String deadLetterExchange = cfg.getDlxName();
        final String deadLetterQueue = cfg.getDlqName();
        // Routing key used both for dead-lettering and for the DLQ binding.
        final String deadLetterKey = mainQueue + ".dlq";

        // Main exchange.
        channel.exchangeDeclare(mainExchange, cfg.getExchangeType(), true, false, null);

        // Dead-letter exchange and its queue.
        channel.exchangeDeclare(deadLetterExchange, "topic", true, false, null);
        channel.queueDeclare(deadLetterQueue, true, false, false, null);
        channel.queueBind(deadLetterQueue, deadLetterExchange, deadLetterKey);

        // Main queue: rejected messages are routed to the dead-letter exchange.
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", deadLetterExchange);
        queueArgs.put("x-dead-letter-routing-key", deadLetterKey);
        channel.queueDeclare(mainQueue, true, false, false, queueArgs);
        channel.queueBind(mainQueue, mainExchange, cfg.getRoutingKey());
    }
}
辅助类:JsonUtil.java
package com.example.messaging.rabbitmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.DeserializationFeature;
/** Factory for the ObjectMapper configuration shared by producer and consumer. */
public class JsonUtil {
    /**
     * Creates a new ObjectMapper that registers the modules found on the
     * classpath, does not write dates as timestamps, and does not fail on
     * unknown properties when reading.
     */
    public static ObjectMapper objectMapper() {
        return new ObjectMapper()
                .findAndRegisterModules()
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    }
}
模型类:MessageEnvelope.java
package com.example.messaging.rabbitmq;
import java.util.Map;
/**
 * Generic message envelope; extend it as needed.
 * A plain mutable bean with a no-argument constructor and get/set accessors.
 */
public class MessageEnvelope {
    private String id;
    private String type;
    private long timestamp;
    private String correlationId;
    private String schema; // optional: used for validation or routing
    private String origin; // optional: identifies the originating system
    private Map<String, Object> payload;

    /** Creates an empty envelope. */
    public MessageEnvelope() {
    }

    /** Creates an envelope with every field set. */
    public MessageEnvelope(String id, String type, long timestamp, String correlationId,
                           String schema, String origin, Map<String, Object> payload) {
        setId(id);
        setType(type);
        setTimestamp(timestamp);
        setCorrelationId(correlationId);
        setSchema(schema);
        setOrigin(origin);
        setPayload(payload);
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getCorrelationId() {
        return this.correlationId;
    }

    public void setCorrelationId(String correlationId) {
        this.correlationId = correlationId;
    }

    public String getSchema() {
        return this.schema;
    }

    public void setSchema(String schema) {
        this.schema = schema;
    }

    public String getOrigin() {
        return this.origin;
    }

    public void setOrigin(String origin) {
        this.origin = origin;
    }

    public Map<String, Object> getPayload() {
        return this.payload;
    }

    public void setPayload(Map<String, Object> payload) {
        this.payload = payload;
    }
}
文件:src/main/java/com/example/messaging/rabbitmq/MessageConsumer.java
package com.example.messaging.rabbitmq;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
/**
* RabbitMQ JSON 消费者模板
* - 手动 ACK
* - 每个消费者线程独立 Channel(符合 RabbitMQ 线程模型)
* - 支持预取与并发
* - 错误时将消息 dead-letter 到 DLQ,避免无限重试
*
* 使用方式:
* MessageConsumer consumer = new MessageConsumer(config);
* consumer.start();
* // 运行...
* consumer.close();
*/
public class MessageConsumer implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
private final RabbitConfig config;
private final ObjectMapper objectMapper;
private Connection connection;
private ExecutorService executor;
public MessageConsumer(RabbitConfig config) {
this.config = config;
this.objectMapper = JsonUtil.objectMapper();
}
public void start() throws IOException, TimeoutException {
ConnectionFactory factory = RabbitConfig.createConnectionFactory(config);
this.executor = Executors.newFixedThreadPool(config.getConsumerCount(),
r -> {
Thread t = new Thread(r, "rmq-consumer");
t.setDaemon(true);
return t;
});
this.connection = factory.newConnection(config.getClientName() + "-consumer");
// 启动多个并发消费者,每个线程一个 Channel
for (int i = 0; i < config.getConsumerCount(); i++) {
executor.submit(() -> {
try (Channel channel = connection.createChannel()) {
channel.basicQos(config.getPrefetch());
consumeLoop(channel);
} catch (IOException e) {
log.error("Consumer channel IO error: {}", e.toString());
}
});
}
log.info("Consumer started: vhost={}, host={}, queue={}, concurrency={}, prefetch={}",
config.getVhost(), config.getHost(), config.getQueueName(),
config.getConsumerCount(), config.getPrefetch());
}
private void consumeLoop(Channel channel) throws IOException {
// 确保队列已存在(实际可在启动时统一拓扑初始化)
channel.basicConsume(
config.getQueueName(),
false, // 手动确认
new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery delivery) throws IOException {
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
MessageEnvelope envelope = parseMessage(delivery.getBody());
// 业务处理占位(请在项目中替换为实际处理逻辑)
processMessage(envelope, delivery.getProperties(), delivery.getEnvelope());
channel.basicAck(deliveryTag, false);
log.debug("Message acked: tag={}, messageId={}",
deliveryTag, safeProp(delivery.getProperties().getMessageId()));
} catch (JsonProcessingException jpe) {
// JSON 解析失败:不可恢复,直接 dead-letter
channel.basicNack(deliveryTag, false, false);
log.warn("Message rejected (JSON parse error): tag={}, err={}",
deliveryTag, jpe.getOriginalMessage());
} catch (Exception ex) {
// 其他错误:默认 dead-letter,避免重试风暴
channel.basicNack(deliveryTag, false, false);
log.error("Message processing failed and dead-lettered: tag={}, err={}",
deliveryTag, ex.toString());
}
}
},
new CancelCallback() {
@Override
public void handle(String consumerTag) {
log.warn("Consumer canceled: tag={}", consumerTag);
}
}
);
}
/**
 * Deserializes a raw message body into a {@code MessageEnvelope}.
 * Schema validation can be added here if required.
 *
 * <p>{@code ObjectMapper.readValue(byte[], Class)} declares the broader {@code IOException},
 * which this method's signature does not allow. Jackson parse and mapping failures are
 * rethrown unchanged; any other I/O failure is wrapped in an unchecked exception.
 *
 * @param body raw message payload
 * @return the deserialized envelope
 * @throws JsonProcessingException if the payload is not valid JSON or cannot be mapped
 */
private MessageEnvelope parseMessage(byte[] body) throws JsonProcessingException {
    try {
        return objectMapper.readValue(body, MessageEnvelope.class);
    } catch (JsonProcessingException e) {
        throw e;
    } catch (IOException e) {
        // Fully-qualified name so no new import is required in the file.
        throw new java.io.UncheckedIOException(e);
    }
}
/**
 * Business-processing placeholder: implement the business-specific handling here.
 * Note: this template contains no concrete business logic; extend it as needed.
 *
 * @param envelope     the deserialized message
 * @param properties   AMQP properties of the delivery
 * @param envelopeMeta delivery envelope metadata
 */
private void processMessage(MessageEnvelope envelope,
AMQP.BasicProperties properties,
Envelope envelopeMeta) {
// Example: dispatch to different handlers based on envelope.getType().
// Placeholder only; no business logic is implemented here.
}
/**
 * Returns the given value, or an empty string when it is {@code null}.
 *
 * @param v possibly-null property value
 * @return {@code v} itself when non-null, otherwise {@code ""}
 */
private String safeProp(String v) {
    if (v == null) {
        return "";
    }
    return v;
}
/**
 * Stops the consumer: shuts the worker pool down, then closes the connection.
 *
 * <p>The pool gets up to 5 seconds to finish. If it has not terminated by then, or the
 * waiting thread is interrupted, the remaining workers are interrupted via
 * {@code shutdownNow()} so they are not left running. Errors while closing the
 * connection are logged and not rethrown.
 */
@Override
public void close() {
    if (executor != null) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // Graceful shutdown timed out: interrupt whatever is still running.
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }
    try {
        if (connection != null && connection.isOpen()) {
            connection.close();
        }
    } catch (Exception e) {
        log.warn("Error closing connection: {}", e.toString());
    }
}
}
文件:src/main/resources/application.properties
# RabbitMQ 基本连接(可通过环境变量覆盖)
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_VHOST=/
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
# 拓扑配置
RABBITMQ_EXCHANGE=app.events
RABBITMQ_EXCHANGE_TYPE=topic
RABBITMQ_QUEUE=app.events.q
RABBITMQ_ROUTING_KEY=app.*
# 死信交换机/队列
RABBITMQ_DLX=app.events.dlx
RABBITMQ_DLQ=app.events.q.dlq
# 消费者并发与预取
RABBITMQ_CONSUMER_COUNT=2
RABBITMQ_PREFETCH=50
# TLS(可选)
RABBITMQ_USE_TLS=false
# 客户端名称
RABBITMQ_CLIENT_NAME=java-client
文件:src/main/resources/logback.xml(日志配置)
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
<!-- 可针对生产者/消费者单独设置日志级别 -->
<logger name="com.example.messaging.rabbitmq" level="debug"/>
</configuration>
使用说明:initTopology() 用于创建交换机、队列与 DLQ,也可由独立的部署步骤完成。构造 RabbitConfig 后调用 MessageProducer#start()。通过 publish(routingKey, MessageEnvelope) 发送消息,返回值表示发布确认是否成功。调用 MessageConsumer#start() 启动并发消费者。性能调优时可调整 prefetch 与消费者并发数,并检查业务处理耗时。此模板遵循企业级开发最佳实践,避免使用过时 API;示例配置中的 guest/guest 仅为本地开发默认值,生产环境请通过环境变量覆盖,避免敏感信息硬编码。模板可直接集成到现有项目并进行扩展。
本模板基于 ActiveMQ(OpenWire 协议)与 C# (.NET 8),提供标准化的生产者与消费者代码框架,使用 XML 作为消息格式。代码遵循企业级最佳实践,包含:
适用场景:新项目初始化、现有系统扩展、技术栈迁移,便于直接集成到现有 .NET 项目中。
安装示例:
dotnet add package Apache.NMS.ActiveMQ --version 1.8.0
dotnet add package Apache.NMS --version 1.8.0
dotnet add package Microsoft.Extensions.Logging.Abstractions
dotnet add package Microsoft.Extensions.Configuration
dotnet add package Microsoft.Extensions.Configuration.Json
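dotnet add package Microsoft.Extensions.Configuration.EnvironmentVariables
dotnet add package Microsoft.Extensions.Configuration.Binder
dotnet add package Microsoft.Extensions.Logging.Console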
目标框架建议:net8.0
说明:ActiveMQ XML 生产者,支持连接复用、持久化投递、TTL、消息头设置、XML 序列化与可选的 XSD 校验。
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Schema;
using System.Xml.Serialization;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.Extensions.Logging;
namespace Messaging.ActiveMqXml
{
/// <summary>
/// Connection, producer and XML settings for the ActiveMQ client.
/// </summary>
public sealed class ActiveMqOptions
{
    /// <summary>Broker failover URIs, e.g. tcp://mq1:61616, tcp://mq2:61616.</summary>
    public List<string> BrokerUris { get; set; } = new();

    /// <summary>Queue name.</summary>
    public string QueueName { get; set; } = "demo.queue";

    /// <summary>Optional credentials (prefer a secure secret store).</summary>
    public string? Username { get; set; }
    public string? Password { get; set; }

    /// <summary>Client identifier, used for monitoring and debugging.</summary>
    public string ClientId { get; set; } = "producer-" + Environment.MachineName;

    /// <summary>Producer setting: whether messages are sent with persistent delivery.</summary>
    public bool PersistentDelivery { get; set; } = true;

    /// <summary>Producer setting: optional message priority, 0-9.</summary>
    public int? ProducerPriority { get; set; }

    /// <summary>Producer setting: default time-to-live in milliseconds; 0 means messages never expire.</summary>
    public int DefaultTimeToLiveMs { get; set; }

    /// <summary>Connection and retry: value for the failover URI's maxReconnectAttempts parameter.</summary>
    public int MaxReconnectAttempts { get; set; } = 10;

    /// <summary>Connection and retry: value for the failover URI's initialReconnectDelay parameter.</summary>
    public int InitialReconnectDelayMs { get; set; } = 500;

    /// <summary>Not used by the producer; kept so the options stay uniform.</summary>
    public int ReceiveTimeoutMs { get; set; } = 2000;

    /// <summary>XML setting: when provided, XSD validation is performed against this schema file.</summary>
    public string? XmlSchemaPath { get; set; }

    /// <summary>XML setting: value written to the "content-type" message property.</summary>
    public string ContentTypeHeader { get; set; } = "application/xml";

    /// <summary>Advanced setting (e.g. prefetch), usable on the consumer side.</summary>
    public int PrefetchSize { get; set; } = 1000;
}
/// <summary>
/// Contract for converting message objects to and from XML text, with optional schema validation.
/// </summary>
public interface IXmlMessageSerializer
{
/// <summary>Serializes <paramref name="obj"/> to an XML string.</summary>
string Serialize<T>(T obj);
/// <summary>Deserializes an XML string into an instance of <typeparamref name="T"/>.</summary>
T Deserialize<T>(string xml);
/// <summary>Validates the XML text against the schema.</summary>
void ValidateXml(string xml); // Applies when an XSD has been configured.
}
/// <summary>
/// <see cref="XmlSerializer"/>-based message serializer with optional XSD validation.
/// </summary>
public sealed class XmlMessageSerializer : IXmlMessageSerializer
{
    private static readonly UTF8Encoding Utf8NoBom = new(false);

    private readonly string? _xsdPath;
    private readonly object _schemaLock = new();
    private XmlSchemaSet? _schemas;

    /// <param name="xsdPath">Optional path to an XSD file; when null or blank, validation is skipped.</param>
    public XmlMessageSerializer(string? xsdPath = null)
    {
        _xsdPath = xsdPath;
    }

    /// <summary>
    /// Serializes <paramref name="obj"/> to indented XML with an XML declaration, and validates
    /// the result when an XSD path is configured.
    /// </summary>
    /// <exception cref="XmlSchemaValidationException">The produced XML violates the configured schema.</exception>
    public string Serialize<T>(T obj)
    {
        var serializer = new XmlSerializer(typeof(T));
        var settings = new XmlWriterSettings
        {
            Encoding = Utf8NoBom,
            Indent = true,
            OmitXmlDeclaration = false
        };

        // When writing to a TextWriter the declared encoding comes from the writer itself, and a
        // plain StringWriter reports UTF-16. Utf8StringWriter makes the declaration say utf-8.
        using var sw = new Utf8StringWriter();
        using (var xw = XmlWriter.Create(sw, settings))
        {
            serializer.Serialize(xw, obj);
        } // Disposing the XmlWriter flushes it before the text is read back.

        var xml = sw.ToString();
        if (!string.IsNullOrWhiteSpace(_xsdPath))
        {
            ValidateXml(xml);
        }
        return xml;
    }

    /// <summary>
    /// Deserializes <paramref name="xml"/> into <typeparamref name="T"/>. No schema validation is
    /// performed here; call <see cref="ValidateXml"/> first when validation is required.
    /// </summary>
    public T Deserialize<T>(string xml)
    {
        var serializer = new XmlSerializer(typeof(T));
        using var sr = new StringReader(xml);

        // Message bodies come from the broker and are untrusted: reject DTDs and never resolve
        // external resources. Whitespace/normalization options are set explicitly so parsing
        // otherwise stays as lenient as reading from a plain text reader.
        using var reader = new XmlTextReader(sr)
        {
            DtdProcessing = DtdProcessing.Prohibit,
            XmlResolver = null,
            WhitespaceHandling = WhitespaceHandling.Significant,
            Normalization = true
        };
        return (T)serializer.Deserialize(reader)!;
    }

    /// <summary>
    /// Validates <paramref name="xml"/> against the configured XSD. Does nothing when no XSD path
    /// was configured.
    /// </summary>
    /// <exception cref="XmlSchemaValidationException">A validation event was raised.</exception>
    public void ValidateXml(string xml)
    {
        if (string.IsNullOrWhiteSpace(_xsdPath))
            return;

        var settings = new XmlReaderSettings
        {
            ValidationType = ValidationType.Schema,
            Schemas = GetSchemas(_xsdPath)
        };
        settings.ValidationEventHandler += (s, e) =>
        {
            throw new XmlSchemaValidationException($"XML validation error: {e.Message}", e.Exception);
        };

        using var sr = new StringReader(xml);
        using var reader = XmlReader.Create(sr, settings);
        while (reader.Read()) { } // Reading the whole document triggers validation.
    }

    // Loads and compiles the XSD once, then reuses it, instead of re-reading the file per message.
    private XmlSchemaSet GetSchemas(string xsdPath)
    {
        lock (_schemaLock)
        {
            if (_schemas != null)
                return _schemas;

            var schemas = new XmlSchemaSet();
            using (var xsdStream = File.OpenRead(xsdPath))
            using (var xsdReader = XmlReader.Create(xsdStream))
            {
                schemas.Add(null, xsdReader);
            }
            schemas.Compile();

            // Cached only after a successful load, so a missing or invalid file is retried next call.
            _schemas = schemas;
            return schemas;
        }
    }

    // StringWriter that reports UTF-8 so the XML declaration matches the intended encoding.
    private sealed class Utf8StringWriter : StringWriter
    {
        public override Encoding Encoding => Utf8NoBom;
    }
}
/// <summary>
/// ActiveMQ producer that serializes payloads to XML text messages and sends them to the
/// configured queue. The connection, session and producer are created on first send and reused.
/// All session access is serialized through a single lock.
/// </summary>
public sealed class ActiveMqXmlProducer : IDisposable
{
    private readonly ActiveMqOptions _options;
    private readonly ILogger? _logger;
    private readonly IXmlMessageSerializer _serializer;
    private IConnectionFactory? _factory;
    private IConnection? _connection;
    private ISession? _session;
    private IMessageProducer? _producer;
    private IDestination? _destination;
    private readonly object _sync = new();
    private bool _disposed;

    /// <param name="options">Broker and producer settings; required.</param>
    /// <param name="serializer">Serializer to use; defaults to <see cref="XmlMessageSerializer"/> with the configured XSD path.</param>
    /// <param name="logger">Optional logger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    /// <exception cref="InvalidOperationException">No broker URI is configured.</exception>
    public ActiveMqXmlProducer(ActiveMqOptions options, IXmlMessageSerializer? serializer = null, ILogger? logger = null)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _serializer = serializer ?? new XmlMessageSerializer(options.XmlSchemaPath);
        _logger = logger;
        EnsureInitialized();
    }

    // Builds e.g. failover:(tcp://host1:61616,tcp://host2:61616)?maxReconnectAttempts=10&initialReconnectDelay=500&randomize=false
    private string BuildFailoverUri()
    {
        if (_options.BrokerUris == null || _options.BrokerUris.Count == 0)
            throw new InvalidOperationException("BrokerUris must be configured.");

        var brokers = string.Join(",", _options.BrokerUris);
        var query = $"maxReconnectAttempts={_options.MaxReconnectAttempts}&initialReconnectDelay={_options.InitialReconnectDelayMs}&randomize=false";
        return $"failover:({brokers})?{query}";
    }

    // Creates the connection factory once.
    private void EnsureInitialized()
    {
        lock (_sync)
        {
            if (_factory != null) return;
            var uri = BuildFailoverUri();
            _factory = new ConnectionFactory(uri);
        }
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
            throw new ObjectDisposedException(nameof(ActiveMqXmlProducer));
    }

    // Creates whichever of connection / session / destination / producer does not exist yet.
    private void EnsureConnected()
    {
        lock (_sync)
        {
            // Without this check a send after Dispose() would dereference the cleared factory.
            ThrowIfDisposed();

            if (_connection == null)
            {
                _connection = string.IsNullOrWhiteSpace(_options.Username)
                    ? _factory!.CreateConnection()
                    : _factory!.CreateConnection(_options.Username, _options.Password);
                _connection.ClientId = _options.ClientId;
                _connection.Start();
                _logger?.LogInformation("ActiveMQ connection started. ClientId={ClientId}", _options.ClientId);
            }

            if (_session == null)
            {
                // An auto-acknowledge session is sufficient for a producer.
                _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            }

            if (_destination == null)
            {
                _destination = _session.GetQueue(_options.QueueName);
            }

            if (_producer == null)
            {
                // Validate before creating the producer so a bad value never leaves a
                // half-configured producer behind.
                MsgPriority? priority = null;
                if (_options.ProducerPriority is int configured)
                {
                    if (configured < 0 || configured > 9)
                        throw new ArgumentOutOfRangeException(nameof(ActiveMqOptions.ProducerPriority), configured, "ProducerPriority must be between 0 and 9.");
                    // IMessageProducer.Priority is a MsgPriority enum; an int needs an explicit cast.
                    priority = (MsgPriority)configured;
                }

                _producer = _session.CreateProducer(_destination);
                _producer.DeliveryMode = _options.PersistentDelivery ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent;
                if (priority.HasValue)
                    _producer.Priority = priority.Value;
                if (_options.DefaultTimeToLiveMs > 0)
                    _producer.TimeToLive = TimeSpan.FromMilliseconds(_options.DefaultTimeToLiveMs);
            }
        }
    }

    /// <summary>
    /// Serializes <paramref name="payload"/> to XML and sends it as a text message.
    /// The send itself is synchronous; the returned task is already completed.
    /// </summary>
    /// <param name="payload">Object to serialize; must not be null.</param>
    /// <param name="headers">Optional custom message properties; null values are skipped.</param>
    /// <param name="correlationId">Optional correlation id.</param>
    /// <param name="cancellationToken">Checked once before any work starts.</param>
    /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The producer has been disposed.</exception>
    /// <exception cref="XmlSchemaValidationException">The serialized XML fails schema validation.</exception>
    public Task SendAsync<T>(T payload, IDictionary<string, object>? headers = null, string? correlationId = null, CancellationToken cancellationToken = default)
    {
        if (payload == null) throw new ArgumentNullException(nameof(payload));
        cancellationToken.ThrowIfCancellationRequested();
        EnsureConnected();

        try
        {
            var xml = _serializer.Serialize(payload);

            // The session and producer are shared by all callers, so creating and sending
            // the message is done under the same lock that guards connection setup.
            lock (_sync)
            {
                ThrowIfDisposed();

                var msg = _session!.CreateTextMessage(xml);

                // Standard headers.
                msg.NMSType = typeof(T).FullName;
                if (!string.IsNullOrWhiteSpace(correlationId))
                    msg.NMSCorrelationID = correlationId;
                msg.Properties.SetString("content-type", _options.ContentTypeHeader);
                msg.Properties.SetString("x-serializer", "XmlSerializer");

                // Custom headers.
                ApplyHeaders(msg.Properties, headers);

                _producer!.Send(msg);
            }

            _logger?.LogInformation("Message sent to {Queue} (Persistent={Persistent}, TTL={TTL}ms).",
                _options.QueueName,
                _options.PersistentDelivery,
                _options.DefaultTimeToLiveMs);
            return Task.CompletedTask;
        }
        catch (XmlSchemaValidationException xsdEx)
        {
            _logger?.LogError(xsdEx, "XML schema validation failed.");
            throw;
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "Failed to send message.");
            throw;
        }
    }

    // Copies custom headers into the message properties using the typed setter that matches
    // each value; unsupported types are stored via ToString().
    private static void ApplyHeaders(IPrimitiveMap properties, IDictionary<string, object>? headers)
    {
        if (headers == null) return;

        foreach (var kv in headers)
        {
            switch (kv.Value)
            {
                case null:
                    continue;
                case string s:
                    properties.SetString(kv.Key, s);
                    break;
                case int i:
                    properties.SetInt(kv.Key, i);
                    break;
                case long l:
                    properties.SetLong(kv.Key, l);
                    break;
                case bool b:
                    properties.SetBool(kv.Key, b);
                    break;
                case byte bt:
                    properties.SetByte(kv.Key, bt);
                    break;
                case short sh:
                    properties.SetShort(kv.Key, sh);
                    break;
                case float f:
                    properties.SetFloat(kv.Key, f);
                    break;
                case double d:
                    properties.SetDouble(kv.Key, d);
                    break;
                default:
                    properties.SetString(kv.Key, kv.Value.ToString()!);
                    break;
            }
        }
    }

    /// <summary>
    /// Closes the producer, session and connection. Each resource is closed independently, so
    /// a failure on one does not leave the others open. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        lock (_sync)
        {
            if (_disposed) return;
            _disposed = true;

            TryClose(() => _producer?.Close());
            TryClose(() => _session?.Close());
            TryClose(() => _connection?.Close());

            _producer = null;
            _destination = null;
            _session = null;
            _connection = null;
            _factory = null;
        }
    }

    // Runs one close action and logs, rather than propagates, any failure.
    private void TryClose(Action close)
    {
        try
        {
            close();
        }
        catch (Exception ex)
        {
            _logger?.LogWarning(ex, "Error while closing ActiveMQ resources.");
        }
    }
}
}
说明:ActiveMQ XML 消费者,支持并发、多线程消费,手动确认模式(ClientAcknowledge),异常时进行会话恢复触发重投,包含 XML 反序列化与可选校验。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.Extensions.Logging;
namespace Messaging.ActiveMqXml
{
/// <summary>
/// Acknowledgement mode used when a consumer worker creates its session.
/// </summary>
public enum AckMode
{
// Selects AcknowledgementMode.AutoAcknowledge for the session.
Auto,
// Selects AcknowledgementMode.ClientAcknowledge; the worker acknowledges each message itself.
Client
}
/// <summary>
/// Settings for <c>ActiveMqXmlConsumer</c>: broker connection plus consumer behaviour.
/// </summary>
public sealed class ConsumerOptions
{
    /// <summary>Broker connection settings.</summary>
    public ActiveMqOptions Broker { get; set; } = new();

    /// <summary>Number of concurrent consumers; size it to the queue load and the number of service instances.</summary>
    public int Concurrency { get; set; } = 2;

    /// <summary>Session acknowledgement mode.</summary>
    public AckMode Acknowledgement { get; set; } = AckMode.Client;

    /// <summary>Receive timeout, in milliseconds, for each consumer instance.</summary>
    public int ReceiveTimeoutMs { get; set; } = 1000;

    /// <summary>Whether to call Session.Recover() when handling a message fails.</summary>
    public bool RecoverOnError { get; set; } = true;
}
/// <summary>
/// Handler invoked by the consumer for every successfully deserialized message.
/// </summary>
public interface IXmlMessageHandler<T>
{
// Business-processing entry point; the implementer decides whether it is thread-safe.
Task HandleAsync(T message, IMessage rawMessage, CancellationToken ct);
}
/// <summary>
/// ActiveMQ consumer that runs <see cref="ConsumerOptions.Concurrency"/> independent workers.
/// Each worker owns its own connection, session and consumer, deserializes text messages
/// to <typeparamref name="T"/> and passes them to the configured handler.
/// </summary>
public sealed class ActiveMqXmlConsumer<T> : IDisposable
{
    private readonly ConsumerOptions _options;
    private readonly IXmlMessageSerializer _serializer;
    private readonly IXmlMessageHandler<T> _handler;
    private readonly ILogger? _logger;
    private readonly List<ConsumerWorker> _workers = new();
    private bool _disposed;

    /// <exception cref="ArgumentNullException">Options, serializer or handler is null.</exception>
    public ActiveMqXmlConsumer(ConsumerOptions options, IXmlMessageSerializer serializer, IXmlMessageHandler<T> handler, ILogger? logger = null)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
        _logger = logger;
    }

    /// <summary>
    /// Creates and starts the workers. Each worker connects on its own background task, so a
    /// connection failure is reported through the logger rather than thrown from this method.
    /// </summary>
    /// <param name="cancellationToken">Cancelling this token stops every worker's receive loop.</param>
    /// <exception cref="InvalidOperationException">No broker URI is configured.</exception>
    public void Start(CancellationToken cancellationToken = default)
    {
        if (_options.Broker.BrokerUris == null || _options.Broker.BrokerUris.Count == 0)
            throw new InvalidOperationException("BrokerUris must be configured.");

        for (int i = 0; i < _options.Concurrency; i++)
        {
            var worker = new ConsumerWorker(_options, _serializer, _handler, _logger, i);
            _workers.Add(worker);
            worker.Start(cancellationToken);
        }

        _logger?.LogInformation("ActiveMQ consumers started. Concurrency={Concurrency}", _options.Concurrency);
    }

    /// <summary>
    /// Stops all workers and closes their connections.
    /// </summary>
    public async Task StopAsync()
    {
        // Signal every worker before waiting on any of them. Each worker may sit in a blocking
        // receive for up to its timeout, so stopping them one by one would add those waits up.
        var pending = new List<Task>(_workers.Count);
        foreach (var w in _workers)
        {
            pending.Add(w.StopAsync());
        }
        await Task.WhenAll(pending);

        _workers.Clear();
        _logger?.LogInformation("ActiveMQ consumers stopped.");
    }

    /// <summary>
    /// Requests every worker to stop without waiting for completion.
    /// Prefer awaiting <see cref="StopAsync"/> for an orderly shutdown.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        foreach (var w in _workers)
        {
            w.Dispose();
        }
        _workers.Clear();
    }

    // One consumer with its own connection/session, driven by a background receive loop.
    private sealed class ConsumerWorker : IDisposable
    {
        private readonly ConsumerOptions _options;
        private readonly IXmlMessageSerializer _serializer;
        private readonly IXmlMessageHandler<T> _handler;
        private readonly ILogger? _logger;
        private readonly int _index;
        private IConnectionFactory? _factory;
        private IConnection? _connection;
        private ISession? _session;
        private IMessageConsumer? _consumer;
        private IDestination? _destination;
        private Task? _runTask;
        private CancellationTokenSource? _cts;

        public ConsumerWorker(ConsumerOptions options, IXmlMessageSerializer serializer, IXmlMessageHandler<T> handler, ILogger? logger, int index)
        {
            _options = options;
            _serializer = serializer;
            _handler = handler;
            _logger = logger;
            _index = index;
        }

        private string BuildFailoverUri()
        {
            var brokers = string.Join(",", _options.Broker.BrokerUris);
            var query = $"maxReconnectAttempts={_options.Broker.MaxReconnectAttempts}&initialReconnectDelay={_options.Broker.InitialReconnectDelayMs}&randomize=false&jms.useAsyncSend=true&nms.prefetchPolicy.queuePrefetch={_options.Broker.PrefetchSize}";
            return $"failover:({brokers})?{query}";
        }

        // Creates whichever of factory / connection / session / destination / consumer is missing.
        private void EnsureConnected()
        {
            if (_factory == null)
            {
                _factory = new ConnectionFactory(BuildFailoverUri());
            }

            if (_connection == null)
            {
                _connection = string.IsNullOrWhiteSpace(_options.Broker.Username)
                    ? _factory.CreateConnection()
                    : _factory.CreateConnection(_options.Broker.Username, _options.Broker.Password);
                // The index suffix keeps client ids distinct across workers.
                _connection.ClientId = $"{_options.Broker.ClientId}-consumer-{_index}";
                _connection.Start();
                _logger?.LogInformation("Consumer[{Index}] connection started.", _index);
            }

            if (_session == null)
            {
                var ack = _options.Acknowledgement == AckMode.Auto ? AcknowledgementMode.AutoAcknowledge : AcknowledgementMode.ClientAcknowledge;
                _session = _connection.CreateSession(ack);
            }

            if (_destination == null)
            {
                _destination = _session.GetQueue(_options.Broker.QueueName);
            }

            if (_consumer == null)
            {
                _consumer = _session.CreateConsumer(_destination);
            }
        }

        // Starts the background receive loop; it ends when either token is cancelled.
        public void Start(CancellationToken externalToken)
        {
            _cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
            var ct = _cts.Token;

            _runTask = Task.Run(async () =>
            {
                try
                {
                    EnsureConnected();
                }
                catch (Exception ex)
                {
                    // The task is only awaited (and its exception swallowed) on stop, so a
                    // connection failure must be logged here or it would go unnoticed.
                    _logger?.LogError(ex, "Consumer[{Index}] failed to connect; worker is not running.", _index);
                    return;
                }

                var timeout = TimeSpan.FromMilliseconds(_options.ReceiveTimeoutMs);

                while (!ct.IsCancellationRequested)
                {
                    IMessage? raw = null;
                    try
                    {
                        // Returns null on timeout, which lets the loop re-check cancellation.
                        raw = _consumer!.Receive(timeout);
                        if (raw == null) continue;

                        if (raw is ITextMessage textMsg)
                        {
                            var xml = textMsg.Text;
                            var contentType = textMsg.Properties.GetString("content-type");
                            if (!string.Equals(contentType, _options.Broker.ContentTypeHeader, StringComparison.OrdinalIgnoreCase))
                            {
                                // Logged only; the message is still processed.
                                _logger?.LogWarning("Consumer[{Index}] received unexpected content-type: {ContentType}", _index, contentType);
                            }

                            T model = _serializer.Deserialize<T>(xml);
                            await _handler.HandleAsync(model, raw, ct);

                            if (_options.Acknowledgement == AckMode.Client)
                            {
                                raw.Acknowledge();
                            }
                        }
                        else
                        {
                            _logger?.LogWarning("Consumer[{Index}] received non-text message: {Type}", _index, raw.GetType().FullName);
                            // Non-text messages are acknowledged as well so they do not block the queue.
                            if (_options.Acknowledgement == AckMode.Client)
                            {
                                raw.Acknowledge();
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger?.LogError(ex, "Consumer[{Index}] handling message failed.", _index);
                        if (_options.RecoverOnError && _options.Acknowledgement == AckMode.Client)
                        {
                            try
                            {
                                // Triggers redelivery, subject to the broker's RedeliveryPolicy/DLQ settings.
                                _session!.Recover();
                            }
                            catch (Exception rex)
                            {
                                _logger?.LogWarning(rex, "Consumer[{Index}] session recover failed.", _index);
                            }
                        }
                    }
                }
            }, ct);
        }

        // Cancels the loop, waits for it to finish, then closes consumer, session and connection.
        public async Task StopAsync()
        {
            if (_cts != null)
            {
                _cts.Cancel();
                if (_runTask != null)
                {
                    try
                    {
                        await _runTask;
                    }
                    catch { /* Cancellation is expected here. */ }
                }
                _cts.Dispose();
                _cts = null;
            }

            try
            {
                _consumer?.Close();
                _session?.Close();
                _connection?.Close();
            }
            catch (Exception ex)
            {
                _logger?.LogWarning(ex, "Consumer[{Index}] error while closing.", _index);
            }

            _consumer = null;
            _session = null;
            _connection = null;
            _factory = null;
        }

        // Fire-and-forget stop; StopAsync handles and logs its own failures.
        public void Dispose()
        {
            _ = StopAsync();
        }
    }
}
}
示例处理器(模板,无业务逻辑):
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS;
using Microsoft.Extensions.Logging;
namespace Messaging.ActiveMqXml
{
/// <summary>
/// Example handler that only logs each received message; replace it with real processing
/// logic in an actual project.
/// </summary>
public sealed class LoggingXmlMessageHandler<T> : IXmlMessageHandler<T>
{
    private readonly ILogger? _logger;

    /// <param name="logger">Optional logger; when null nothing is logged.</param>
    public LoggingXmlMessageHandler(ILogger? logger = null) => _logger = logger;

    /// <summary>Logs the message type and correlation id, then completes immediately.</summary>
    public Task HandleAsync(T message, IMessage rawMessage, CancellationToken ct)
    {
        if (_logger is { } log)
        {
            log.LogInformation("Received message type={Type}, CorrelationId={CorrelationId}", typeof(T).FullName, rawMessage.NMSCorrelationID);
        }

        // Actual business processing goes here (the template contains none).
        return Task.CompletedTask;
    }
}
}
appsettings.json(敏感信息请使用环境变量或安全存储):
{
"ActiveMq": {
"BrokerUris": [
"tcp://localhost:61616"
],
"QueueName": "demo.xml.queue",
"Username": "",
"Password": "",
"ClientId": "demo-client",
"PersistentDelivery": true,
"ProducerPriority": null,
"DefaultTimeToLiveMs": 0,
"MaxReconnectAttempts": 10,
"InitialReconnectDelayMs": 500,
"PrefetchSize": 1000,
"XmlSchemaPath": null,
"ContentTypeHeader": "application/xml"
},
"Consumer": {
"Concurrency": 2,
"Acknowledgement": "Client",
"ReceiveTimeoutMs": 1000,
"RecoverOnError": true
}
}
最小示例 Program.cs(演示如何加载配置并运行生产者/消费者):
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Messaging.ActiveMqXml;
/// <summary>
/// Minimal demo: loads configuration, starts the consumer, sends one sample message and
/// stops when ENTER is pressed.
/// </summary>
class Program
{
    static async Task Main(string[] args)
    {
        var config = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json", optional: true)
            .AddEnvironmentVariables()
            .Build();

        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());

        var mqOptions = config.GetSection("ActiveMq").Get<ActiveMqOptions>() ?? new ActiveMqOptions
        {
            BrokerUris = new() { "tcp://localhost:61616" },
            QueueName = "demo.xml.queue"
        };

        var consumerOptions = config.GetSection("Consumer").Get<ConsumerOptions>() ?? new ConsumerOptions();

        // The "Consumer" section carries no broker settings of its own, so binding it leaves
        // Broker at its empty default and Start() would reject it. Reuse the shared broker
        // options unless the consumer section supplied its own URIs.
        if (consumerOptions.Broker == null
            || consumerOptions.Broker.BrokerUris == null
            || consumerOptions.Broker.BrokerUris.Count == 0)
        {
            consumerOptions.Broker = mqOptions;
        }

        var serializer = new XmlMessageSerializer(mqOptions.XmlSchemaPath);

        // Start the consumer.
        var handler = new LoggingXmlMessageHandler<SampleMessage>(loggerFactory.CreateLogger("Handler"));
        using var consumer = new ActiveMqXmlConsumer<SampleMessage>(consumerOptions, serializer, handler, loggerFactory.CreateLogger("Consumer"));
        using var cts = new CancellationTokenSource();
        consumer.Start(cts.Token);

        // Send a sample message.
        using var producer = new ActiveMqXmlProducer(mqOptions, serializer, loggerFactory.CreateLogger("Producer"));
        var sample = new SampleMessage { Id = Guid.NewGuid().ToString(), CreatedAtUtc = DateTime.UtcNow, Payload = "hello xml" };
        await producer.SendAsync(sample, correlationId: sample.Id);

        Console.WriteLine("Press ENTER to stop...");
        Console.ReadLine();

        cts.Cancel();
        await consumer.StopAsync();
    }
}
/// <summary>
/// Sample payload type used by the demo program.
/// </summary>
public class SampleMessage
{
    /// <summary>Message identifier; the demo also uses it as the correlation id.</summary>
    public string Id { get; set; } = null!;

    /// <summary>Creation timestamp; the demo sets it from DateTime.UtcNow.</summary>
    public DateTime CreatedAtUtc { get; set; }

    /// <summary>Free-form message content.</summary>
    public string Payload { get; set; } = null!;
}
架构与技术栈选择
发送策略与最佳实践
消费模式与并发
XML 序列化与验证
错误处理与日志
安全注意事项
配置建议
集成步骤
该模板避免了过时和不安全的用法,保持通用性与可维护性;请根据实际业务需求扩展重试、度量指标、死信队列处理等功能。
在新微服务或模块中,快速生成Kafka、RabbitMQ、SQS等生产者/消费者代码,标准化错误处理与日志,缩短接入与联调时间。
批量产出统一的通信模板与配置示例,在分布式系统落地一致的消息规范,降低跨团队差异与维护风险。
将接入流程模板化并纳入流水线,自动生成依赖与环境配置,加速部署、灰度与回滚,减少重复脚本维护。
帮助研发团队在微服务与分布式场景下,以最少投入迅速生成消息队列生产者/消费者代码模板,做到开箱可用、规范统一、稳定可靠;支持多种队列与主流语言,覆盖新项目搭建、架构扩展、技术栈迁移等高频场景;通过可读注释、完整配置与使用说明,缩短交付周期、降低故障率;引导试用并升级至付费版,获得更丰富队列类型与语言支持、团队规范预设、合规与安全校验、私有化部署、模板库管理与持续优化服务。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期