From collectd Wiki
|Callbacks:||config, shutdown, write|
|Copyright:||2009 Sebastien Pahl, 2010 Florian Forster|
The AMQP plugin transmits or receives values collected by collectd via the Advanced Message Queuing Protocol (AMQP). Data is sent to or received from a "message broker", a daemon that relays messages. Values are encoded either in the plain-text protocol or as JSON, though at the moment the plugin can only parse the plain-text protocol. Messages can be sent in either persistent delivery mode (guaranteed delivery) or transient delivery mode (more efficient, but values may be lost).
<Plugin "amqp">
  # Send values to an AMQP broker
  <Publish "some_name">
    Host "localhost"
    Port "5672"
    VHost "/"
    User "guest"
    Password "guest"
    Exchange "amq.fanout"
#   ExchangeType "fanout"
#   RoutingKey "collectd"
#   Persistent false
#   Format "command"
#   StoreRates false
  </Publish>

  # Receive values from an AMQP broker (work in progress)
  <Subscribe "some_name">
    Host "localhost"
    Port "5672"
    VHost "/"
    User "guest"
    Password "guest"
    Exchange "amq.fanout"
#   ExchangeType "fanout"
#   Queue "queue_name"
#   RoutingKey "collectd.#"
  </Subscribe>
</Plugin>
AMQP uses three components to deliver messages from a publisher to a subscriber: exchanges, queues, and bindings. Exchanges are where publishers post their messages; queues are where subscribers receive messages from. An exchange and a queue may be connected with a binding, which allows messages to be routed from the exchange to the queue.
Which messages are sent to which queue depends on the type of the exchange and the binding.
- A fanout exchange sends messages to all queues bound to it.
- A direct exchange compares the routing keys of the message and the binding and sends the message to the appropriate queue only if they are identical.
- A topic exchange compares the routing keys of the message and the binding and allows for wildcards (placeholders) in the binding's routing key.
- A headers exchange matches header fields attached to a message to make a routing decision. Currently the AMQP plugin doesn't support the creation of header fields and can't create appropriate bindings automatically.
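The difference between the two simplest exchange types can be sketched in a few lines of Python. This is only an illustrative model, not broker or plugin code: the `route` function, the `(queue_name, binding_key)` pairs, and the queue names are all hypothetical, and topic and headers exchanges are left out to keep the sketch short.

```python
def route(exchange_type, routing_key, bindings):
    """Return the names of the queues a message would reach.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        # Routing keys are ignored; every bound queue gets a copy.
        return [queue for queue, _ in bindings]
    if exchange_type == "direct":
        # Only bindings whose key is identical to the message key match.
        return [queue for queue, key in bindings if key == routing_key]
    raise ValueError(f"exchange type not covered by this sketch: {exchange_type}")


# Hypothetical queues and binding keys, for illustration only.
bindings = [("all_metrics", "collectd"), ("cpu_only", "cpu")]
print(route("fanout", "collectd", bindings))
print(route("direct", "collectd", bindings))
```

With the fanout exchange both queues receive the message; with the direct exchange only `all_metrics` does, because its binding key equals the message's routing key.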
The AMQP plugin can automatically create an appropriate exchange after connecting to the broker. This happens if the ExchangeType configuration option has been set.
The type is given as a string which is passed to the broker. Usually this would be one of the standard strings ("direct", "fanout", "topic", or "headers"), but the standard explicitly allows for implementation-specific types.
Exchanges are created as non-durable and auto-delete. This means the exchange will not survive a restart of the broker and is removed automatically once no queues are bound to it anymore.
When subscribing, the plugin will automatically create a queue after connecting. If the Queue configuration option has been specified, this name is used; otherwise a unique name is chosen automatically by the broker.
The queue is created with the flags non-durable, non-exclusive, and auto-delete.
If the Exchange configuration option is given in a Subscribe block, the (possibly newly created) queue will be bound to the given exchange. The binding is created using the routing key (if one is configured) and an empty "argument table". (Setting this argument table would be required for headers exchanges.)
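The declare-and-bind sequence described above can be sketched against a channel object in the style of the Python pika client. This is a rough model of the described behaviour, not the plugin's actual C implementation: the function name `setup_subscriber` and its parameters are hypothetical, and sending an empty routing key when none is configured is an assumption of this sketch.

```python
def setup_subscriber(channel, exchange=None, exchange_type=None,
                     queue=None, routing_key=None):
    """Mimic the subscriber setup steps on a pika-style channel.

    Returns the name of the queue that was declared.
    """
    if exchange is not None and exchange_type is not None:
        # The exchange is only created when a type has been configured.
        # Flags: non-durable, auto-delete.
        channel.exchange_declare(exchange=exchange,
                                 exchange_type=exchange_type,
                                 durable=False, auto_delete=True)

    # An empty queue name asks the broker to choose a unique one.
    # Flags: non-durable, non-exclusive, auto-delete.
    result = channel.queue_declare(queue=queue or "",
                                   durable=False, exclusive=False,
                                   auto_delete=True)
    queue_name = result.method.queue

    if exchange is not None:
        # Bind with the configured routing key and no extra arguments.
        # Assumption: a missing routing key is sent as an empty string.
        channel.queue_bind(queue=queue_name, exchange=exchange,
                           routing_key=routing_key or "")
    return queue_name
```

With pika, a suitable channel could be obtained from `pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel()`; any object offering the same three methods works for experimenting.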
If a routing key has been configured in a Publish block, the data sent to the broker will always use that routing key. If no routing key has been configured, one is created from the value's identifier. Because a routing key consists of several fields separated by dots, all dots within the identifier are replaced by slashes. For example, the host name "host.example.com" appears as "host/example/com" in the generated routing key.
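The substitution can be sketched in Python as follows. The function name is hypothetical, and two details are assumptions not stated on this page: the leading `collectd` field and the order of the five identifier fields, which are modeled on the routing-key example in the collectd.conf(5) manual page. Check the plugin source before relying on the exact layout.

```python
def default_routing_key(host, plugin, plugin_instance, type_, type_instance,
                        prefix="collectd"):
    """Build a routing key from the parts of a collectd identifier.

    Assumption: the key starts with a fixed prefix followed by the five
    identifier fields, joined by dots.
    """
    fields = [prefix, host, plugin, plugin_instance, type_, type_instance]
    # Dots inside a field would be read as AMQP field separators,
    # so they are replaced by slashes before joining.
    return ".".join(field.replace(".", "/") for field in fields)


print(default_routing_key("host.example.com", "cpu", "0", "cpu", "user"))
# prints: collectd.host/example/com.cpu.0.cpu.user
```

Keeping one value field per routing-key field is what lets a topic exchange filter on individual parts such as the plugin name.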
The routing key specified in Subscribe blocks is copied into automatically created bindings. How this routing key is evaluated depends on the type of exchange the queue is bound to. In the case of a topic exchange, you can use wildcards to select only certain values. For example, a routing key such as "collectd.*.cpu.#" requests only cpu statistics from the exchange, provided the publishers use the automatically generated routing keys.
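How a topic exchange evaluates such a wildcard key can be sketched in Python. In AMQP topic matching, `*` stands for exactly one dot-separated word and `#` for zero or more words. The function below is a simplified model, not broker code, and the sample routing keys are hypothetical values that assume a `collectd.<host>.<plugin>...` layout.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the topic binding_key.

    '*' matches exactly one word, '#' matches zero or more words.
    """
    def match(pattern, words):
        if not pattern:
            # The pattern is used up: match only if no words remain.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # Let '#' swallow 0, 1, 2, ... of the remaining words.
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        return head in ("*", words[0]) and match(rest, words[1:])

    return match(binding_key.split("."), routing_key.split("."))


# Hypothetical routing keys, for illustration only.
keys = [
    "collectd.host/example/com.cpu.0.cpu.user",
    "collectd.host/example/com.df.root.df_complex.free",
]
print([k for k in keys if topic_matches("collectd.*.cpu.#", k)])
```

Only the first key is selected by `collectd.*.cpu.#`, while a binding key of `collectd.#` would select both.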
- The RabbitMQ homepage
- The sp/amqp branch in octo's repository on GitHub.
- The amqp branch in Sebastien's repository on GitHub.