Skip to content
This repository was archived by the owner on Feb 21, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions command/command.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package command

import (
"github.com/ricbra/rabbitmq-cli-consumer/config"
"strings"
)

var (
Cconf *config.Config
)

func Factory(baseCmd string) *CommandFactory {
var pcs []string
if split := strings.Split(baseCmd, " "); len(split) > 1 {
Expand Down
33 changes: 32 additions & 1 deletion command/command_executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,58 @@ import (
type CommandExecuter struct {
errLogger *log.Logger
infLogger *log.Logger
netLogger *NetLogger
}

func New(errLogger, infLogger *log.Logger) *CommandExecuter {

if len(Cconf.Logs.Rpc) > 1 {
netLogger := NewNetLogger()

return &CommandExecuter{
errLogger: errLogger,
infLogger: infLogger,
netLogger: netLogger,
}
}

return &CommandExecuter{
errLogger: errLogger,
infLogger: infLogger,
}

}

func (me CommandExecuter) Execute(cmd *exec.Cmd) bool {
func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool {
me.infLogger.Println("Processing message...")
me.infLogger.Printf("Cmd: %s params: %s \n", cmd.Path, cmd.Args)
out, err := cmd.CombinedOutput()

//log output php script to info
me.infLogger.Printf("Output php: %s\n", string(out[:]))

if err != nil {
me.infLogger.Println("Failed. Check error log for details.")
me.errLogger.Printf("Failed: %s\n", string(out[:]))
me.errLogger.Printf("Error: %s\n", err)

if len(Cconf.Logs.Rpc) > 1 {
me.infLogger.Printf("rpc parameters: %s", Cconf.Logs.Rpc)
if err := me.netLogger.Send(out[:], body[:], true); err != nil {
me.infLogger.Printf("failed sending provision error event -> error: %s", err)
}
}

return false
}

me.infLogger.Println("Processed!")

if len(Cconf.Logs.Rpc) > 1 {
if err := me.netLogger.Send(out[:], body[:], false); err != nil {
me.infLogger.Printf("failed sending provision success event -> error: %s", err)
}
}

return true
}
70 changes: 70 additions & 0 deletions command/netlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package command

import (
"bytes"
"crypto/tls"
"encoding/json"
"math/rand"
"net/http"
)

type ProvisionEvent struct {
JsonRpc string `json:"jsonrpc"`
Method string `json:"method"`
Id int32 `json:"id"`
Params *Parameters `json:"params"`
}

type Data struct {
Error bool `json:"is_error"`
Output string `json:"out"`
Message []byte `json:"message"`
}

type Parameters struct {
Data *Data `json:"data"`
}

type NetLogger struct {
Address string
}

func (n *NetLogger) Send(p []byte, bod []byte, isError bool) error {
event := ProvisionEvent{
JsonRpc: "2.0",
Method: "Event::createProvisioningEvent",
Id: rand.Int31(),
Params: &Parameters{
Data: &Data{
Error: isError,
Output: string(p),
Message: bod,
},
},
}
return n.send(&event)
}

func NewNetLogger() *NetLogger {
cfg := &tls.Config{
InsecureSkipVerify: true,
}

http.DefaultClient.Transport = &http.Transport{
TLSClientConfig: cfg,
}

netLogger := new(NetLogger)
netLogger.Address = Cconf.Logs.Rpc

return netLogger
}

func (n *NetLogger) send(p *ProvisionEvent) error {
post, _ := json.Marshal(p)
_, err := http.Post(n.Address, "encoding/json", bytes.NewBuffer(post))
if err != nil {
return err
}
return nil
}
26 changes: 19 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,34 @@ type Config struct {
Password string
Port string
Vhost string
Queue string
Compression bool
}
Prefetch struct {
Count int
Global bool
Count int
Global bool
}
Exchange struct {
Name string
Autodelete bool
Type string
Durable bool
Name string
Autodelete bool
Type string
Durable bool
}
Queue struct {
Key string
Name string
}
Deadexchange struct {
Name string
AutoDelete bool
Type string
Durable bool
Queue string
Retry int
}
Logs struct {
Error string
Info string
Rpc string
}
}

Expand Down
125 changes: 109 additions & 16 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/streadway/amqp"
"log"
"net/url"
"strconv"
"time"
)

type Consumer struct {
Expand All @@ -21,6 +23,8 @@ type Consumer struct {
ErrLogger *log.Logger
InfLogger *log.Logger
Executer *command.CommandExecuter
DeadLetter bool
Retry int
Compression bool
}

Expand All @@ -32,19 +36,32 @@ func (c *Consumer) Consume() {
}
c.InfLogger.Println("Succeeded registering consumer.")

var sendCh *amqp.Channel

if c.DeadLetter {
var err error
sendCh, err = c.Connection.Channel()
if err != nil {
c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err)
}
defer sendCh.Close()
}

defer c.Connection.Close()
defer c.Channel.Close()

forever := make(chan bool)

go func() {
for d := range msgs {
c.InfLogger.Println("reading deliveries")
input := d.Body
if c.Compression {
var b bytes.Buffer
w, err := zlib.NewWriterLevel(&b, zlib.BestCompression)
if err != nil {
c.ErrLogger.Println("Could not create zlib handler")

d.Nack(true, true)
}
c.InfLogger.Println("Compressed message")
Expand All @@ -54,12 +71,55 @@ func (c *Consumer) Consume() {
input = b.Bytes()
}

cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input))
if c.Executer.Execute(cmd) {
d.Ack(true)
if c.DeadLetter {
var retryCount int
if d.Headers == nil {
d.Headers = make(map[string]interface{}, 0)
}
retry, ok := d.Headers["retry_count"]
if !ok {
retry = "1"
}
c.InfLogger.Println(fmt.Sprintf("retry %s", retry))

retryCount, err = strconv.Atoi(retry.(string))
if err != nil {
c.ErrLogger.Fatal("could not parse retry header")
}

c.InfLogger.Println(fmt.Sprintf("retryCount : %d max retries: %d", retryCount, c.Retry))

cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input))
if c.Executer.Execute(cmd, d.Body[:]) {
d.Ack(true)
} else if retryCount >= c.Retry {
d.Nack(true, false)
} else {
//republish message with new retry header
retryCount++
d.Headers["retry_count"] = strconv.Itoa(retryCount)
republish := amqp.Publishing{
ContentType: d.ContentType,
ContentEncoding: d.ContentEncoding,
Timestamp: time.Now(),
Body: d.Body,
Headers: d.Headers,
}
err = sendCh.Publish("", c.Queue, false, false, republish)
if err != nil {
c.ErrLogger.Println("error republish %s", err)
}
d.Ack(true)
}
} else {
d.Nack(true, true)
cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input))
if c.Executer.Execute(cmd, d.Body[:]) {
d.Ack(true)
} else {
d.Nack(true, false)
}
}

}
}()
c.InfLogger.Println("Waiting for messages...")
Expand Down Expand Up @@ -100,18 +160,39 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg
}
infLogger.Println("Succeeded setting QoS.")

infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue)
_, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil)

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error()))
}

// Check for missing exchange settings to preserve BC
if "" == cfg.Exchange.Name && "" == cfg.Exchange.Type && !cfg.Exchange.Durable && !cfg.Exchange.Autodelete {
cfg.Exchange.Type = "direct"
}

var table map[string]interface{}
deadLetter := false

if "" != cfg.Deadexchange.Name {
infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Deadexchange.Name)
err = ch.ExchangeDeclare(cfg.Deadexchange.Name, cfg.Deadexchange.Type, cfg.Deadexchange.Durable, cfg.Deadexchange.AutoDelete, false, false, amqp.Table{})

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error()))
}

table = make(map[string]interface{}, 0)
table["x-dead-letter-exchange"] = cfg.Deadexchange.Name
table["x-dead-letter-routing-key"] = cfg.Queue.Key

infLogger.Printf("Declaring error queue \"%s\"...", cfg.Deadexchange.Queue)
_, err = ch.QueueDeclare(cfg.Deadexchange.Queue, true, false, false, false, amqp.Table{})

// Bind queue
infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name)
err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Queue.Key, cfg.Deadexchange.Name, false, amqp.Table{})

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error()))
}
deadLetter = true
}

// Empty Exchange name means default, no need to declare
if "" != cfg.Exchange.Name {
infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name)
Expand All @@ -121,23 +202,35 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg
return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error()))
}

// Bind queue
infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name)
err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, nil)
//binding to exchange

infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table)
_, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table)

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error()))
return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error()))
}

// Bind queue with key
infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name)
err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table)

if nil != err {
return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error()))
}

}

return &Consumer{
Channel: ch,
Connection: conn,
Queue: cfg.RabbitMq.Queue,
Queue: cfg.Queue.Name,
Factory: factory,
ErrLogger: errLogger,
InfLogger: infLogger,
Executer: command.New(errLogger, infLogger),
Compression: cfg.RabbitMq.Compression,
DeadLetter: deadLetter,
Retry: cfg.Deadexchange.Retry,
}, nil
}
Loading