rpc_server.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare('rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print("[.] fib(%s)" % n)
    response = fib(n)  # 斐波那契的执行结果赋值给reponse
    # 再把得到的消息发回给客户端
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id
                     ),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 确保消息被消费,代表任务完成


# channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    'rpc_queue', on_request)

print(" [x] Awaiting RPC request")
channel.start_consuming()

 rpc_client.py

import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', virtual_host='/'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare('', exclusive=True)
        self.callback_queue = result.method.queue  # 获取queue名字
        self.channel.basic_consume(self.callback_queue,
                                   self.on_response,  # 只要收到就调用on_response()
                                   auto_ack=True
                                   )

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:  # 判断服务器端corr_id和本地corr_id相等,才往下走
            self.response = body  # response收到body的消息表示response不为空
            self.channel.stop_consuming()

    def call(self, n):
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # 指定返回到那个queue
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n)
                                   )  # 传字符串,把n传进来

        # 定义超时回调函数
        def out_of_time():
            self.response = b'-1'
            self.channel.stop_consuming()

        self.connection.call_later(2, out_of_time)
        self.channel.start_consuming()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()
request_num = 11
print(" [x] Requesting fib(%r)" % request_num)
response = fibonacci_rpc.call(request_num)
if response == -1:
    print(" [.] Expired....")
print(" [.] Got %r" % response)
D:\source\python\pikatest\Scripts\python.exe C:/Users/Administrator/PycharmProjects/pythonProject/rabbitmq/rpc_client3.py
 [x] Requesting fib(11)
 [.] Got 89

Process finished with exit code 0

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐