diff --git a/pkg/events/rabbitmq/options.go b/pkg/events/rabbitmq/options.go index 6e2eb62..43243c8 100644 --- a/pkg/events/rabbitmq/options.go +++ b/pkg/events/rabbitmq/options.go @@ -7,6 +7,54 @@ import ( "github.com/gothunder/thunder/internal/events/rabbitmq" ) +func WithURL(url string) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.URL = url + } +} + +func WithExchangeName(name string) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.ExchangeName = name + } +} + +func WithQueueName(name string) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.QueueName = name + } +} + +func WithConsumerName(name string) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.ConsumerName = name + } +} + +func WithConsumerConcurrency(concurrency int) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.ConsumerConcurrency = concurrency + } +} + +func WithPrefetchCount(prefetch int) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.PrefetchCount = prefetch + } +} + +func WithMaxRetries(maxRetries int) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.MaxRetries = maxRetries + } +} + +func WithDeleteDLX(deleteDLX bool) rabbitmq.RabbitmqConfigOption { + return func(c *rabbitmq.Config) { + c.DeleteDLX = deleteDLX + } +} + func WithQueueNamePosfix(posfix string) rabbitmq.RabbitmqConfigOption { return func(c *rabbitmq.Config) { c.QueueName = fmt.Sprintf("%s_%s", c.QueueName, posfix) diff --git a/pkg/log/slog_bridge.go b/pkg/log/slog_bridge.go new file mode 100644 index 0000000..e0768c9 --- /dev/null +++ b/pkg/log/slog_bridge.go @@ -0,0 +1,65 @@ +package log + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/rs/zerolog" +) + +type slogWriter struct { + logger *slog.Logger +} + +func (w *slogWriter) Write(p []byte) (n int, err error) { + var entry map[string]interface{} + if err := json.Unmarshal(p, &entry); err != nil { + w.logger.Info(string(p)) + return len(p), nil + } + + level := slog.LevelInfo + if lvl, ok := entry["level"].(string); ok { + switch lvl { + case "debug", "trace": + level = slog.LevelDebug + case "info": + level = slog.LevelInfo + case "warn", "warning": + level = slog.LevelWarn + case "error", "fatal", "panic": + level = slog.LevelError + } + delete(entry, "level") + } + + msg := "" + if m, ok := entry["message"].(string); ok { + msg = m + delete(entry, "message") + } + + delete(entry, "time") + + args := make([]any, 0, len(entry)*2) + for k, v := range entry { + args = append(args, k, v) + } + + w.logger.Log(context.Background(), level, msg, args...) + return len(p), nil +} + +// NewLoggerFromSlog creates a *zerolog.Logger that outputs to the given slog.Logger. +// Use this when your service uses slog but needs to pass a logger to Thunder. +// +// Example: +// +// logger := log.NewLoggerFromSlog(slog.Default()) +// consumer, _ := rabbitmq.NewRabbitMQConsumer(logger, ...) +func NewLoggerFromSlog(slogger *slog.Logger) *zerolog.Logger { + writer := &slogWriter{logger: slogger} + logger := zerolog.New(writer).With().Timestamp().Logger() + return &logger +}