前言
RabbitMq是实现AMQP消息中间件的一种,常被用于RPC场景。RabbitMq对消息的发送端和消息的接收端进行了解耦,使消息的发送者无需了解消息的接收端,反之,接收端也无需过多了解发送端。简言之,生产者只需负责生产,消费者只负责消费。
基本概念
-
queue (队列)
队列是存放消息的地方,且其中的消息是按序排布的。根据设置,队列可以是临时的(用完即删),也可以是持久的(服务未停止前一直存在)。
-
channel (频道)
channel的作用是进行消息的传递。消息存放在队列中,以轮询的方式通过频道发送给监听频道的消费者,可以动态的增加消费者以提高消息的“消费能力”。
-
exchange (交换机)
顾名思义,它的作用是转发消息,角色行为是“调度”。调度规则有四种:direct, topic, headers and fanout
1.direct: 根据routigKey指定的队列进行消息转发
2.topic: 按规则转发消息,该类型非常灵活,甚至可以实现其它三种类型的转发方式
3.header: 不处理路由键,根据消息内容中的headers属性匹配转发规则
4.fanout: 转发消息到所有已绑定的队列
rabbitMq-server
系统环境: Centos7.3
- 安装erlang
yum install -y epel-release erlang
#查看版本
erl -version
- 下载rabbitmq-server并进行yum安装
cd /usr/src
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.11/rabbitmq-server-3.6.11-1.el6.noarch.rpm
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install -y rabbitmq-server
- 启动
# 开机自启动
chkconfig rabbitmq-server on
# 启动服务
service rabbitmq-server start
# 查看服务状态
service rabbitmq-server status
web管理工具
-
安装
rabbitmq-plugins enable rabbitmq_management
-
默认账号密码
默认账号只能在localhost等本机模式下才可登陆
guest
guest
-
添加可以远程登陆的账号
sudo rabbitmqctl add_user test 123456 sudo rabbitmqctl set_user_tags test administrator sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
-
查看已创建用户
rabbitmqctl list_users
-
浏览器访问web管理工具
注意防火墙配置,开启15672端口
rabbitmqadmin
rabbitmqadmin是远程管理接口的命令行工具
安装好web管理工具并成功访问后,可在http://server-name:15672/cli页面下载rabbitmqadmin文件
拷贝到 /usr/local/bin 或者你想要的路径
增加可执行权限 : chmod 777 rabbitmqadmin
rabbitmqctl
rabbitmqctl是一个简单的命令行工具用于管理RabbitMQ Server,适合通过ssh登陆的管理
rabbitmqctl --help # 查看rabbitmqctl相关命令
php-amqplib
composer 组件
https://github.com/php-amqplib/php-amqplib
-
安装
composer require php-amqplib/php-amqplib
-
关于使用php-amplib组件的示例代码 Rabbit.php
<?php
namespace tpr\api\index\controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use tpr\framework\Controller;
use tpr\framework\Debug;
class Rabbit extends Controller
{
public function send()
{
/**
* 连接RabbitMq-Server
*/
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//选择channel
$channel = $connection->channel();
//选择队列,队列名: 'hello'
$queue_name = 'hello';
$channel->queue_declare($queue_name, false, false, false, false);
// 连续20次发送
for ($i = 0; $i < 20; $i++) {
$msg = new AMQPMessage('Hello World!' . time() . '->count:' . $i);
$channel->basic_publish($msg, '', $queue_name);
}
$channel->close();
$connection->close();
$this->response($queue_name);
}
public function receive()
{
//接收者名称
$receiver_name = uniqid();
//使用Fork类异步执行消息接收
$this->response($receiver_name);
}
public function forkReceive($receive_name)
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//选择channel
$channel = $connection->channel();
//选择队列,队列名: 'hello'
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) use ($receive_name) {
Debug::save(ROOT_PATH . 'rabbitmq.log', $receive_name . ' : ' . $msg->body);
echo " [x] Received ", $msg->body, "\n";
};
//监听队列
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
- 测试过程
receive方法运行两次,创建两个接收端,然后调用send方法,发送20次message
- 运行结果
59a91fa0ed35a : Hello World!1504256098->count:0
59a91fa0ed35a : Hello World!1504256098->count:2
59a91fa1c2f30 : Hello World!1504256098->count:1
59a91fa0ed35a : Hello World!1504256098->count:4
59a91fa1c2f30 : Hello World!1504256098->count:3
59a91fa0ed35a : Hello World!1504256098->count:6
59a91fa1c2f30 : Hello World!1504256098->count:5
59a91fa0ed35a : Hello World!1504256098->count:8
59a91fa1c2f30 : Hello World!1504256098->count:7
59a91fa0ed35a : Hello World!1504256098->count:10
59a91fa1c2f30 : Hello World!1504256098->count:9
59a91fa0ed35a : Hello World!1504256098->count:12
59a91fa1c2f30 : Hello World!1504256098->count:11
59a91fa0ed35a : Hello World!1504256098->count:14
59a91fa1c2f30 : Hello World!1504256098->count:13
59a91fa0ed35a : Hello World!1504256098->count:16
59a91fa1c2f30 : Hello World!1504256098->count:15
59a91fa1c2f30 : Hello World!1504256098->count:17
59a91fa0ed35a : Hello World!1504256098->count:18
59a91fa1c2f30 : Hello World!1504256098->count:19
致命错误: Class 'think\Fork' not found
Fork类在tpr-framework里才有的,而且新版本的Fork类使用的是 tpr\framework\Fork 命名空间