RabbitMQ之tutorial-5

RabbitMQ之tutorial-5

Tutorial Five

https://www.rabbitmq.com/tutorials/tutorial-five-python.html

direct exchange can't do routing based on multiple criteria. eg, in our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log

Topics: Receiving messages based on a pattern

Topic Exchange: Messages sent to a topic exchange can't have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message.

Eg:

- stock.usd.nyse
- quick.orange.rabbit

binding Keys: a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key.

  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.
Screen Shot 2020-07-22 at 4.16.01 PM

We created three bindings:

Q1 is bound with binding key ".orange." and Q2 with "..rabbit" and "lazy.#".

  • Q1 is interested in all the orange animals.
  • Q2 wants to hear everything about rabbits, and everything about lazy animals.

quick.orange.rabbit deliver to both; quick.orange.fox goes to first; lazy.brown.fox (or lazy.orange.male.rabbit) goes to second. If NO matching found msg will be lost.

When queue bound with #, receive all msg regardless of routing key, simiar to fanout
when queue bound with exact world (no # or * ), topic exchange behave like direct


emitLog

import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)

print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

Recive Log

import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)


print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

testing

# receive all logs
python receive_logs_topic.py "#"

#receive all logs from the facility "kern"
python receive_logs_topic.py "kern.*"

# if you want to hear only about "critical" logs:
python receive_logs_topic.py "*.critical"

# create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"

python emit_log_topic.py "kern.critical" "A critical kernel error"
python emit_log_topic.py "kern.info" "A info kernel msg"
python emit_log_topic.py "network.info" "A critical network error"