Which form of connection to use with pika

The SelectConnection is useful if your application architecture can benefit from an asynchronous design, i.e. doing something else while the RabbitMQ I/O completes (for example, switching to some other I/O). This type of connection uses callbacks to signal when operations have completed. For example, you can declare callbacks for on_connected, on_channel_open, on_exchange_declared, on_queue_declared, etc. …to …

Consume multiple queues in python / pika

One possible solution is to use a non-blocking connection and consume messages:

    import pika

    def callback(channel, method, properties, body):
        # Print the message body, then acknowledge it
        print(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def on_open(connection):
        # Connection is ready: open a channel
        connection.channel(on_open_callback=on_channel_open)

    def on_channel_open(channel):
        # Register one consumer per queue; both share the same callback
        channel.basic_consume(queue="queue1", on_message_callback=callback)
        channel.basic_consume(queue="queue2", on_message_callback=callback)

    parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
    connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open)

    try:
        connection.ioloop.start()
    except KeyboardInterrupt:
        connection.close()

This registers a consumer on each queue over a single connection and channel, and messages from both queues are delivered to the same callback.

How can I recover unacknowledged AMQP messages from other channels than my connection’s own?

Unacknowledged messages are those which have been delivered across the network to a consumer but have not yet been acked or rejected, while that consumer hasn’t yet closed the channel or connection over which it originally received them. Therefore the broker can’t figure out if the consumer is just taking a long time to …

Handling long running tasks in pika / RabbitMQ

For now, your best bet is to turn off heartbeats; this will keep RabbitMQ from closing the connection if you block for too long. I am experimenting with pika’s core connection management and IO loop running in a background thread, but it’s not stable enough to release. In pika v1.1.0, heartbeats are turned off with ConnectionParameters(heartbeat=0).