channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')print(" [x] Sent 'Hello World!'")
最后别忘了关闭连接来刷新缓存,确保所有的消息都写到了队列中:
1
connection.close()
最后的接受者程序是这样的:
1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env pythonimportpikaconnection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161',port=5673))channel=connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')print(" [x] Sent 'Hello World!'")connection.close()
接受者程序差不多,不需要路由器,但是需要传递一个回调函数,每次接收到消息就被调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env pythonimportpikaconnection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161',port=5673))channel=connection.channel()channel.queue_declare(queue='hello')defcallback(ch,method,properties,body):print(" [x] Received %r"%body)channel.basic_consume(callback,queue='hello',no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
importpikaconnection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161',port=5673))channel=connection.channel()channel.queue_declare(queue='task_queue')messages=['python new_task.py First message.','python new_task.py Second message..','python new_task.py Third message...','python new_task.py Fourth message....','python new_task.py Fifth message.....','python new_task.py Sixth message......']forminmessages:channel.basic_publish(exchange='',routing_key='task_queue',body=m,properties=pika.BasicProperties(delivery_mode=2,# make message persistent))print(" [x] Sent %r"%m)connection.close()
工作进程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
importpikaimporttimeconnection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161',port=5673))channel=connection.channel()channel.queue_declare(queue='task_queue')defcallback(ch,method,properties,body):print(" [x] Received %r"%body)time.sleep(body.count(b'.'))print(" [x] Done")channel.basic_consume(callback,queue='task_queue',no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
运行的时候,可以同时开两个窗口运行worker进程,然后再运行new_task.py,最后效果:
消息发送:
1
2
3
4
5
6
[x] Sent 'python new_task.py First message.'
[x] Sent 'python new_task.py Second message..'
[x] Sent 'python new_task.py Third message...'
[x] Sent 'python new_task.py Fourth message....'
[x] Sent 'python new_task.py Fifth message.....'
[x] Sent 'python new_task.py Sixth message......'
工作进程1:
1
2
3
4
5
6
7
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'python new_task.py First message.'
[x] Done
[x] Received b'python new_task.py Third message...'
[x] Done
[x] Received b'python new_task.py Fifth message.....'
[x] Done
工作进程2:
1
2
3
4
5
6
7
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'python new_task.py Second message..'
[x] Done
[x] Received b'python new_task.py Fourth message....'
[x] Done
[x] Received b'python new_task.py Sixth message......'
[x] Done