diff --git a/command/command.go b/command/command.go index 6f1f990..85332b9 100644 --- a/command/command.go +++ b/command/command.go @@ -1,9 +1,14 @@ package command import ( + "github.com/ricbra/rabbitmq-cli-consumer/config" "strings" ) +var ( + Cconf *config.Config +) + func Factory(baseCmd string) *CommandFactory { var pcs []string if split := strings.Split(baseCmd, " "); len(split) > 1 { diff --git a/command/command_executer.go b/command/command_executer.go index 3c3bfd8..e9e6be9 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -8,27 +8,58 @@ import ( type CommandExecuter struct { errLogger *log.Logger infLogger *log.Logger + netLogger *NetLogger } func New(errLogger, infLogger *log.Logger) *CommandExecuter { + + if len(Cconf.Logs.Rpc) > 1 { + netLogger := NewNetLogger() + + return &CommandExecuter{ + errLogger: errLogger, + infLogger: infLogger, + netLogger: netLogger, + } + } + return &CommandExecuter{ errLogger: errLogger, infLogger: infLogger, } + } -func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { +func (me CommandExecuter) Execute(cmd *exec.Cmd, body []byte) bool { me.infLogger.Println("Processing message...") + me.infLogger.Printf("Cmd: %s params: %s \n", cmd.Path, cmd.Args) out, err := cmd.CombinedOutput() + //log output php script to info + me.infLogger.Printf("Output php: %s\n", string(out[:])) + if err != nil { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) me.errLogger.Printf("Error: %s\n", err) + + if len(Cconf.Logs.Rpc) > 1 { + me.infLogger.Printf("rpc parameters: %s", Cconf.Logs.Rpc) + if err := me.netLogger.Send(out[:], body[:], true); err != nil { + me.infLogger.Printf("failed sending provision error event -> error: %s", err) + } + } + return false } me.infLogger.Println("Processed!") + if len(Cconf.Logs.Rpc) > 1 { + if err := me.netLogger.Send(out[:], body[:], false); err != nil { + me.infLogger.Printf("failed sending provision success event -> error: %s", err) + } + } + return true } diff --git a/command/netlogger.go b/command/netlogger.go new file mode 100644 index 0000000..6741461 --- /dev/null +++ b/command/netlogger.go @@ -0,0 +1,70 @@ +package command + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "math/rand" + "net/http" +) + +type ProvisionEvent struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Id int32 `json:"id"` + Params *Parameters `json:"params"` +} + +type Data struct { + Error bool `json:"is_error"` + Output string `json:"out"` + Message []byte `json:"message"` +} + +type Parameters struct { + Data *Data `json:"data"` +} + +type NetLogger struct { + Address string +} + +func (n *NetLogger) Send(p []byte, bod []byte, isError bool) error { + event := ProvisionEvent{ + JsonRpc: "2.0", + Method: "Event::createProvisioningEvent", + Id: rand.Int31(), + Params: &Parameters{ + Data: &Data{ + Error: isError, + Output: string(p), + Message: bod, + }, + }, + } + return n.send(&event) +} + +func NewNetLogger() *NetLogger { + cfg := &tls.Config{ + InsecureSkipVerify: true, + } + + http.DefaultClient.Transport = &http.Transport{ + TLSClientConfig: cfg, + } + + netLogger := new(NetLogger) + netLogger.Address = Cconf.Logs.Rpc + + return netLogger +} + +func (n *NetLogger) send(p *ProvisionEvent) error { + post, _ := json.Marshal(p) + _, err := http.Post(n.Address, "encoding/json", bytes.NewBuffer(post)) + if err != nil { + return err + } + return nil +} diff --git a/config/config.go b/config/config.go index 71bf8c6..898b69b 100644 --- a/config/config.go +++ b/config/config.go @@ -12,22 +12,34 @@ type Config struct { Password string Port string Vhost string - Queue string Compression bool } Prefetch struct { - Count int - Global bool + Count int + Global bool } Exchange struct { - Name string - Autodelete bool - Type string - Durable bool + Name string + Autodelete bool + Type string + Durable bool + } + Queue struct { + Key string + Name string + } + Deadexchange struct { + Name string + AutoDelete bool + Type string + Durable bool + Queue string + Retry int } Logs struct { Error string Info string + Rpc string } } diff --git a/consumer/consumer.go b/consumer/consumer.go index f53e3d7..34cc876 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,6 +11,8 @@ import ( "github.com/streadway/amqp" "log" "net/url" + "strconv" + "time" ) type Consumer struct { @@ -21,6 +23,8 @@ type Consumer struct { ErrLogger *log.Logger InfLogger *log.Logger Executer *command.CommandExecuter + DeadLetter bool + Retry int Compression bool } @@ -32,6 +36,17 @@ func (c *Consumer) Consume() { } c.InfLogger.Println("Succeeded registering consumer.") + var sendCh *amqp.Channel + + if c.DeadLetter { + var err error + sendCh, err = c.Connection.Channel() + if err != nil { + c.ErrLogger.Println("Could not open channel to republish failed jobs %s", err) + } + defer sendCh.Close() + } + defer c.Connection.Close() defer c.Channel.Close() @@ -39,12 +54,14 @@ func (c *Consumer) Consume() { go func() { for d := range msgs { + c.InfLogger.Println("reading deliveries") input := d.Body if c.Compression { var b bytes.Buffer w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) if err != nil { c.ErrLogger.Println("Could not create zlib handler") + d.Nack(true, true) } c.InfLogger.Println("Compressed message") @@ -54,12 +71,55 @@ func (c *Consumer) Consume() { input = b.Bytes() } - cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - if c.Executer.Execute(cmd) { - d.Ack(true) + if c.DeadLetter { + var retryCount int + if d.Headers == nil { + d.Headers = make(map[string]interface{}, 0) + } + retry, ok := d.Headers["retry_count"] + if !ok { + retry = "1" + } + c.InfLogger.Println(fmt.Sprintf("retry %s", retry)) + + retryCount, err = strconv.Atoi(retry.(string)) + if err != nil { + c.ErrLogger.Fatal("could not parse retry header") + } + + c.InfLogger.Println(fmt.Sprintf("retryCount : %d max retries: %d", retryCount, c.Retry)) + + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + if c.Executer.Execute(cmd, d.Body[:]) { + d.Ack(true) + } else if retryCount >= c.Retry { + d.Nack(true, false) + } else { + //republish message with new retry header + retryCount++ + d.Headers["retry_count"] = strconv.Itoa(retryCount) + republish := amqp.Publishing{ + ContentType: d.ContentType, + ContentEncoding: d.ContentEncoding, + Timestamp: time.Now(), + Body: d.Body, + Headers: d.Headers, + } + err = sendCh.Publish("", c.Queue, false, false, republish) + if err != nil { + c.ErrLogger.Println("error republish %s", err) + } + d.Ack(true) + } } else { - d.Nack(true, true) + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + if c.Executer.Execute(cmd, d.Body[:]) { + d.Ack(true) + } else { + d.Nack(true, false) + } } + } }() c.InfLogger.Println("Waiting for messages...") @@ -100,18 +160,39 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg } infLogger.Println("Succeeded setting QoS.") - infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue) - _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil) - - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) - } - // Check for missing exchange settings to preserve BC if "" == cfg.Exchange.Name && "" == cfg.Exchange.Type && !cfg.Exchange.Durable && !cfg.Exchange.Autodelete { cfg.Exchange.Type = "direct" } + var table map[string]interface{} + deadLetter := false + + if "" != cfg.Deadexchange.Name { + infLogger.Printf("Declaring deadletter exchange \"%s\"...", cfg.Deadexchange.Name) + err = ch.ExchangeDeclare(cfg.Deadexchange.Name, cfg.Deadexchange.Type, cfg.Deadexchange.Durable, cfg.Deadexchange.AutoDelete, false, false, amqp.Table{}) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) + } + + table = make(map[string]interface{}, 0) + table["x-dead-letter-exchange"] = cfg.Deadexchange.Name + table["x-dead-letter-routing-key"] = cfg.Queue.Key + + infLogger.Printf("Declaring error queue \"%s\"...", cfg.Deadexchange.Queue) + _, err = ch.QueueDeclare(cfg.Deadexchange.Queue, true, false, false, false, amqp.Table{}) + + // Bind queue + infLogger.Printf("Binding error queue \"%s\" to dead letter exchange \"%s\"...", cfg.Deadexchange.Queue, cfg.Deadexchange.Name) + err = ch.QueueBind(cfg.Deadexchange.Queue, cfg.Queue.Key, cfg.Deadexchange.Name, false, amqp.Table{}) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue to dead-letter exchange: %s", err.Error())) + } + deadLetter = true + } + // Empty Exchange name means default, no need to declare if "" != cfg.Exchange.Name { infLogger.Printf("Declaring exchange \"%s\"...", cfg.Exchange.Name) @@ -121,23 +202,35 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) } - // Bind queue - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) - err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, nil) + //binding to exchange + + infLogger.Printf("Declaring queue \"%s\"...with args: %+v", cfg.Queue.Name, table) + _, err = ch.QueueDeclare(cfg.Queue.Name, true, false, false, false, table) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) } + + // Bind queue with key + infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, table) + + if nil != err { + return nil, errors.New(fmt.Sprintf("Failed to bind queue exchange: %s", err.Error())) + } + } return &Consumer{ Channel: ch, Connection: conn, - Queue: cfg.RabbitMq.Queue, + Queue: cfg.Queue.Name, Factory: factory, ErrLogger: errLogger, InfLogger: infLogger, Executer: command.New(errLogger, infLogger), Compression: cfg.RabbitMq.Compression, + DeadLetter: deadLetter, + Retry: cfg.Deadexchange.Retry, }, nil } diff --git a/example.conf b/example.conf index e359ef7..d6f7869 100644 --- a/example.conf +++ b/example.conf @@ -4,19 +4,31 @@ username = vagrant password = vagrant vhost=/vagrant port=5672 -queue=mail compression=On +[queue] +name=mail +key=mail + [prefetch] count=3 global=Off [exchange] -name=mail +name=master autodelete=Off type=direct durable=On +[deadexchange] +name = name +autodelete=Off +type=fanout +durable=true +queue=taskerrors +retry=3 + [logs] error = /tmp/error.log info = /tmp/info.log +rpc = https://cims2.dev.nucleus.be/rpc/index.php diff --git a/main.go b/main.go index c84a3c6..6b5cfe5 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,8 @@ func main() { logger := log.New(os.Stderr, "", log.Ldate|log.Ltime) cfg, err := config.LoadAndParse(c.String("configuration")) + command.Cconf = cfg + if err != nil { logger.Fatalf("Failed parsing configuration: %s\n", err) }