在教程第二篇里面我们学习了如何实现一个任务队列,异步方式去处理那些比较耗时的任务。
但是如果我们需要调用一个远程主机上面的方法,并且等待它的执行结果呢?
这种模式我们通常将它称为远程方法调用(RPC)。
这一篇我们将利用RabbitMQ来构建一个RPC服务,服务器上面有一个可返回斐波拉契数的函数。
客户端通过rpc调用来获取结果。
客户端接口
为了演示的方便,我在客户端创建一个简单的类,里面有个call方法,它会发送一个rpc请求并等待执行的返回结果。
使用它的方式如下:
1
2
3
fibonacci_rpc = FibonacciRpcClient ()
result = fibonacci_rpc . call ( 4 )
print ( "fib(4) is %r " % result )
回调队列
使用RabbitMQ实现RPC原理很简单,客户端使用消息的方式发送方法名和参数,服务器将结果也作为消息回传回来,
那么这时候就需要另外一个队列来接受返回的消息,这里可指定回调队列。
1
2
3
4
5
6
7
8
9
10
11
result = channel . queue_declare ( exclusive = True )
callback_queue = result . method . queue
channel . basic_publish ( exchange = '' ,
routing_key = 'rpc_queue' ,
properties = pika . BasicProperties (
reply_to = callback_queue ,
),
body = request )
# ... and some code to read a response message from the callback_queue ...
Correlation id
上面的例子每一次的RPC请求都会创建一个新的回调队列,这个很浪费资源。
不过我们现在可以通过Correlation id,也就是关联id来对每个客户端只创建唯一的回调队列。
每次请求带上这个关联id,当获取返回值时候比较关联id,一致的说明是这个请求的返回消息就接受,不一致的忽略这个消息。
Summary
一次RPC调用流程大致如下:
客户端启动时候,创建一个匿名的排他回调队列 每一次的RPC请求,客户端发送的消息都会带两个属性:reply_to(回调队列)和correlation_id 请求发送至rpc_queue队列 RPC服务器订阅这个rpc_queue 队列,收到消息就执行然后返回结果,使用reply_to指定的回调队列。 客户端等待回调队列的数据返回,当收到消息后,检查correlation_id属性,如果一致就返回结果。 最终代码
RPC服务器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import pika
connection = pika . BlockingConnection ( pika . ConnectionParameters ( host = '192.168.217.161' , port = 5673 ))
channel = connection . channel ()
channel . queue_declare ( queue = 'rpc_queue' )
def fib ( n ):
if n == 0 :
return 0
elif n == 1 :
return 1
else :
return fib ( n - 1 ) + fib ( n - 2 )
def on_request ( ch , method , props , body ):
n = int ( body )
print ( " [.] fib( %s )" % n )
response = fib ( n )
ch . basic_publish ( exchange = '' ,
routing_key = props . reply_to ,
properties = pika . BasicProperties ( correlation_id = props . correlation_id ),
body = str ( response ))
ch . basic_ack ( delivery_tag = method . delivery_tag )
channel . basic_qos ( prefetch_count = 1 )
channel . basic_consume ( on_request , queue = 'rpc_queue' )
print ( " [x] Awaiting RPC requests" )
channel . start_consuming ()
客户端调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import pika
import uuid
class FibonacciRpcClient ( object ):
def __init__ ( self ):
self . connection = pika . BlockingConnection ( pika . ConnectionParameters ( host = '192.168.217.161' , port = 5673 ))
self . channel = self . connection . channel ()
result = self . channel . queue_declare ( exclusive = True )
self . callback_queue = result . method . queue
self . channel . basic_consume ( self . on_response , no_ack = True ,
queue = self . callback_queue )
def on_response ( self , ch , method , props , body ):
if self . corr_id == props . correlation_id :
self . response = body
def call ( self , n ):
self . response = None
self . corr_id = str ( uuid . uuid4 ())
self . channel . basic_publish ( exchange = '' ,
routing_key = 'rpc_queue' ,
properties = pika . BasicProperties (
reply_to = self . callback_queue ,
correlation_id = self . corr_id ,
),
body = str ( n ))
while self . response is None :
self . connection . process_data_events ()
return int ( self . response )
fibonacci_rpc = FibonacciRpcClient ()
print ( " [x] Requesting fib(30)" )
response = fibonacci_rpc . call ( 30 )
print ( " [.] Got %r " % response )
演示效果
rpc_server.py
1
2
[x] Awaiting RPC requests
[.] fib(30)
rpc_client.py
1
2
[x] Requesting fib(30)
[.] Got 832040