"""Consume messages from the ``topic_logs`` topic exchange and print them.

The script declares the exchange, declares an exclusive queue, binds that
queue to the exchange once per entry in ``binding_keys`` and then blocks,
printing the routing key and body of every message delivered.
"""
import sys

import pika

# Single source of truth for the exchange name: the declare and the bind
# calls must refer to the same exchange, otherwise the queue is bound to an
# exchange that this script never declared and nothing published to
# ``topic_logs`` is delivered here.
EXCHANGE_NAME = 'topic_logs'

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.217.161', port=5673))
channel = connection.channel()

# NOTE(review): the ``type=`` keyword here and the callback-first form of
# ``basic_consume`` below look like the pre-1.0 pika API -- confirm the
# installed pika version before upgrading the library.
channel.exchange_declare(exchange=EXCHANGE_NAME, type='topic')

# No queue name is passed; the name to bind and consume from is read back
# from the declare reply.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = ['disk.error', 'disk.warning']
if not binding_keys:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per key, all on the exchange declared above.
for binding_key in binding_keys:
    channel.queue_bind(exchange=EXCHANGE_NAME,
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    """Print the routing key and raw body of one delivered message."""
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback, queue=queue_name, no_ack=True)

# Blocks here, dispatching deliveries to ``callback``.
channel.start_consuming()
运行效果
emit_log_topic.py:
[x] Sent 'disk.error':'[disk.info] Hello World!'
[x] Sent 'disk.warning':'[disk.warning] Hello World!'
[x] Sent 'test.error':'[test.error] Hello World!'
receive_logs_topic.py:
[*] Waiting for logs. To exit press CTRL+C
[x] 'disk.error':b'[disk.info] Hello World!'
[x] 'disk.warning':b'[disk.warning] Hello World!'