# RabbitMQ常用模式 ### 1、普通模式 ["Hello World!"](https://www.rabbitmq.com/tutorials/tutorial-one-python.html) > 一名生产者一名消费者,一对一关系  - 生产者 ```php connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN'); //建立连接通道 $this->channel = $this->connection->channel(); } public function hello() { //声明队列 $this->channel->queue_declare('hello', false, false, false, false, false); //发送信息的参数设置 $option = []; //实例化发送信息类 $msg = new AMQPMessage('Hello World ' . time(), $option); //写入队列 $this->channel->basic_publish($msg, '', 'hello'); //关闭通道 $this->channel->close(); //关闭连接 $this->connection->close(); } } ``` - 效果图,数据写入队列成功   - 消费者 ```php channel(); // 回调函数 队列内容处理逻辑 $callback = function ($msg) { echo 'Received ', $msg->body, "\n"; //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失 //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $msg->ack(); }; //消费信息 $channel->basic_consume('hello', '', false, false, false, false, $callback); //脚本执行完成或者 exit() 后关闭连接和通道 function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); //通过callbacks数组判断是否还在活动 while ($channel->is_consuming()) { $channel->wait(); } ``` - 效果图,读取成功,队列数据消失   ### 2、工作模式 [Work queues](https://www.rabbitmq.com/tutorials/tutorial-two-python.html) > 一个生产者对应多个消费者,工作队列又称任务队列  工作模式代码同普通模式一致,只是同时存在多个消费者 - 结果图  ### 3、发布/订阅模式 [Publish/Subscribe](https://www.rabbitmq.com/tutorials/tutorial-three-python.html) > 消息可以被多个消费者同时获取,生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机,当生产者发送消息后所有注册的队列的消费者都可以收到消息。场景:多个单位需要同时调整时,共同注册交换机。  - 生产者 ```php connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN'); //建立连接通道 $this->channel = $this->connection->channel(); } public function fanout() { //发送信息的参数设置 $option = []; //实例化发送信息类 $msg = new AMQPMessage('Fanout ' . time(), $option); //声明一个交换机 $this->channel->exchange_declare('fanout', AMQPExchangeType::FANOUT, false, false, true); //写入交换机 $this->channel->basic_publish($msg, 'fanout'); //关闭通道 $this->channel->close(); //关闭连接 $this->connection->close(); } } ``` - 消费者 ,因为队列名称是随机字符串,消费者代码一致,一个脚本启动两次就可以 ```php channel(); // 回调函数 队列内容处理逻辑 $callback = function ($msg) { echo 'Received ', $msg->body, "\n"; //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失 //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $msg->ack(); }; //声明队列 队列名称为空 会随机字符串 list($queue_name, ,) = $channel->queue_declare('', false, false, false, true); //声明交换机 $channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, true); //绑定 $channel->queue_bind($queue_name, $exchange); //消费信息 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //脚本执行完成或者 exit() 后关闭连接和通道 function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); //通过callbacks数组判断是否还在活动 while ($channel->is_consuming()) { $channel->wait(); } ``` - 结果图   ### 4、路由模式 [Routing](https://www.rabbitmq.com/tutorials/tutorial-four-python.html) > 生产者将消息发送到了type为direct模式的交换机,消费者的队列在将自己绑定到路由的时候会给自己绑定一个key,只有消费者发送对应key格式的消息时候队列才会收到消息,类似于带筛选条件的订阅模式。  - 生产者 ```php connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN'); //建立连接通道 $this->channel = $this->connection->channel(); } public function direct() { $array = ['k1', 'k2', 'k3']; $type = $array[array_rand($array)]; //发送信息的参数设置 $option = []; //实例化发送信息类 $msg = new AMQPMessage('Direct ' . $type . time(), $option); //声明一个交换机 $this->channel->exchange_declare('direct', AMQPExchangeType::DIRECT, false, false, true); //写入交换机 $this->channel->basic_publish($msg, 'direct', $type); //关闭通道 $this->channel->close(); //关闭连接 $this->connection->close(); } } ``` - 消费者 ```php channel(); // 回调函数 队列内容处理逻辑 $callback = function ($msg) { echo 'Received ', $msg->body, "\n"; //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失 //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $msg->ack(); }; //声明队列 队列名称为空 会随机字符串 list($queue_name, ,) = $channel->queue_declare('', false, false, false, true); //声明交换机 $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, false, true); //绑定 $channel->queue_bind($queue_name, $exchange, $argv['1']); //消费信息 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //脚本执行完成或者 exit() 后关闭连接和通道 function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); //通过callbacks数组判断是否还在活动 while ($channel->is_consuming()) { $channel->wait(); } ``` - 结果图  ### 5、主题模式 [Topics](https://www.rabbitmq.com/tutorials/tutorial-five-python.html) > 类似于路由模式,key变得更宽泛,可以模糊匹配。 > \#:可以替代零个或多个单词。 > *:可以代替一个单词。  - 生产者 ```php connection = new AMQPStreamConnection('localhost', 5672, '*****', '******', 'qone', false, 'AMQPLAIN', '登录成功', 'zh-CN'); //建立连接通道 $this->channel = $this->connection->channel(); } //主题模式 public function topic() { $arr = ['mysql.select.k1', 'mysql.select.k2', 'mysql.update.k1']; $type = $arr[array_rand($arr)]; //发送信息的参数设置 $option = []; //实例化发送信息类 $msg = new AMQPMessage('Topic ' . $type, $option); //声明一个交换机 $this->channel->exchange_declare('topic', AMQPExchangeType::TOPIC, false, false, true); //写入交换机 $this->channel->basic_publish($msg, 'topic', $type); //关闭通道 $this->channel->close(); //关闭连接 $this->connection->close(); } } ``` - 消费者 ```php channel(); // 回调函数 队列内容处理逻辑 $callback = function ($msg) { echo 'Received ', $msg->body, "\n"; //确认后队列才会删除信息,basic_consume()第四参数为ture自动需要确认,建议手动确认,防止数据丢失 //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $msg->ack(); }; //声明队列 队列名称为空 会随机字符串 list($queue_name, ,) = $channel->queue_declare('', false, false, false, true); //声明交换机 $channel->exchange_declare($exchange, AMQPExchangeType::TOPIC, false, false, true); //绑定 $channel->queue_bind($queue_name, $exchange, $argv['1']); //消费信息 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //脚本执行完成或者 exit() 后关闭连接和通道 function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); //通过callbacks数组判断是否还在活动 while ($channel->is_consuming()) { $channel->wait(); } ``` - 结果图  ### 6、[RPC](https://www.rabbitmq.com/tutorials/tutorial-six-python.html)模式 > RPC是远程过程调用(Remote Procedure Call)的缩写形式。 > > [进程间通信](https://baike.baidu.com/item/进程间通信)(IPC)是在[多任务操作系统](https://baike.baidu.com/item/多任务操作系统)或联网的计算机之间运行的程序和进程所用的通信技术。有两种类型的进程间通信。 > > LPC用在多任务操作系统中,使得同时运行的任务能互相会话。这些任务[共享内存](https://baike.baidu.com/item/共享内存)空间使任务同步和互相发送信息。 > > 远程过程调用(RPC)RPC类似于LPC,只是在网上工作。   - 客户端 ```php connection = new AMQPStreamConnection('localhost', 5672, '*****', '*****', '*****', false, 'AMQPLAIN', '登录成功', 'zh-CN'); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false); $this->channel->basic_consume($this->callback_queue, '', false, true, false, false, array( $this, 'onResponse' ) ); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string)$n, array( 'correlation_id' => $this->corr_id, 'php' => $n, 'reply_to' => $this->callback_queue ) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while (!$this->response) { $this->channel->wait(); } return intval($this->response); } } if (empty($argv['1'])) { exit('参数异常'); } $rpc = new RpcClient(); $response = $rpc->call($argv['1']); echo 'Got ', $response, "\n"; ``` - 服务端,这里业务逻辑只是做了+1处理 ```php channel(); $channel->queue_declare('rpc_queue', false, false, false, false); echo "Awaiting RPC requests\r\n"; $callback = function ($req) { $n = intval($req->body); echo $n."\r\n"; $msg = new AMQPMessage( (string) ($n+1), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to') ); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag'] ); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close(); ``` - 结果图  Last modification:November 26th, 2020 at 05:42 pm © 允许规范转载 Support 如果觉得我的文章对你有用,请随意赞赏 ×Close Appreciate the author Sweeping payments Pay by AliPay Pay by WeChat