This repository includes input/output plugins for RabbitMQ.
fluentd >= 0.14.0
$ fluent-gem install fluent-plugin-rabbitmq
$ rabbitmq-server
$ rake test
<source>
@type rabbitmq
tag foo
host 127.0.0.1
# or hosts ["192.168.1.1", "192.168.1.2"]
user guest
pass guest
vhost /
exchange foo # not required. if specified, the queue will be bound to the exchange
queue bar
routing_key hoge # if not specified, the tag is used
heartbeat 10 # integer as seconds or :server (interval specified by server)
<parse>
@type json # or msgpack, ltsv, none
</parse>
</source>
| key | example | default value | description |
|---|---|---|---|
| durable | true | false | set durable flag of the queue |
| exclusive | true | false | set exclusive flag of the queue |
| auto_delete | true | false | set auto_delete flag of the queue |
| ttl | 60000 | nil | queue ttl in ms |
| prefetch_count | 10 | nil | |
| consumer_pool_size | 5 | nil | |
| include_headers | true | false | include headers in events |
| headers_key | string | header | key name of headers |
| create_exchange | true | false | create exchange or not |
| exchange_to_bind | string | nil | exchange to bind created exchange |
| exchange_type | direct | topic | type of created exchange |
| exchange_routing_key | hoge | nil | created exchange routing key |
| exchange_durable | true | false | durability of create exchange |
| exchange_no_declare | true | false | passive declare option for exchange |
| manual_ack | true | false | manual ACK |
| queue_mode | "lazy" | nil | queue mode |
| queue_type | "quorum" | nil | queue type |
<match pattern>
@type rabbitmq
host 127.0.0.1
# or hosts ["192.168.1.1", "192.168.1.2"]
user guest
pass guest
vhost /
format json # or msgpack, ltsv, none
exchange foo # required: name of exchange
exchange_type fanout # required: type of exchange e.g. topic, direct
exchange_durable false
routing_key hoge # if not specified, the tag is used
heartbeat 10 # integer as seconds or :server (interval specified by server)
<format>
@type json # or msgpack, ltsv, none
</format>
<buffer> # to use in buffered mode
</buffer>
</match>
| key | example | default value | description |
|---|---|---|---|
| persistent | true | false | messages is persistent to disk |
| timestamp | true | false | if true, time of record is used as timestamp in AMQP message |
| content_type | application/json | nil | message content type |
| frame_max | 131072 | nil | maximum permissible size of a frame |
| mandatory | true | nil | |
| expiration | 3600 | nil | message time-to-live |
| message_type | nil | ||
| priority | nil | ||
| app_id | nil | ||
| id_key | message_id | nil | id to specify message_id |
tls false # enable TLS or not
tls_cert /path/to/cert
tls_key /path/to/key
tls_ca_certificates ["/path/to/ca_certificate"]
verify_peer true
| key | example | default value | description |
|---|---|---|---|
| automatically_recover | true | nil | automatic network failure recovery |
| network_recovery_interval | 30 | nil | interval between reconnection attempts |
| recovery_attempts | 3 | nil | limits the number of connection recovery |
| connection_timeout | 30 | nil | |
| continuation_timeout | 600 | nil |
The gem is available as open source under the terms of the Apache License, Version 2.0.