RabbitMq运行环境搭建及php-amqplib的简单使用的示例代码

前言

RabbitMq是实现AMQP消息中间件的一种,常被用于RPC场景。RabbitMq对消息的发送端和消息的接收端进行了解耦,使消息的发送者无需了解消息的接收端,反之,接收端也无需过多了解发送端。简言之,生产者只需负责生产,消费者只负责消费。

基本概念

  • queue (队列)

    队列是存放消息的地方,且其中的消息是按序排布的。根据设置,队列可以是临时的(用完即删),也可以是持久的(服务未停止前一直存在)。

  • channel (频道)

    channel的作用是进行消息的传递。消息存放在队列中,以轮询的方式通过频道发送给监听频道的消费者,可以动态的增加消费者以提高消息的“消费能力”。

  • exchange (交换机)

    顾名思义,它的作用是转发消息,角色行为是“调度”。调度规则有四种:direct, topic, headers and fanout

    1.direct: 根据routigKey指定的队列进行消息转发
    2.topic: 按规则转发消息,该类型非常灵活,甚至可以实现其它三种类型的转发方式
    3.header: 不处理路由键,根据消息内容中的headers属性匹配转发规则
    4.fanout: 转发消息到所有已绑定的队列

rabbitMq-server

系统环境: Centos7.3

  • 安装erlang
yum install -y epel-release erlang

#查看版本
erl -version
  • 下载rabbitmq-server并进行yum安装

http://www.rabbitmq.com/download.html

cd /usr/src
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.11/rabbitmq-server-3.6.11-1.el6.noarch.rpm

rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc

yum install -y rabbitmq-server
  • 启动
# 开机自启动
chkconfig rabbitmq-server on

# 启动服务
service rabbitmq-server start

# 查看服务状态
service rabbitmq-server status

web管理工具

  • 安装
rabbitmq-plugins enable rabbitmq_management
  • 默认账号密码

    默认账号只能在localhost等本机模式下才可登陆

    guest
    guest
    
  • 添加可以远程登陆的账号

sudo rabbitmqctl add_user test 123456
sudo rabbitmqctl set_user_tags test administrator
sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
  • 查看已创建用户
rabbitmqctl list_users
  • 浏览器访问web管理工具

注意防火墙配置,开启15672端口

http://domain_or_ip:15672/

rabbitmqadmin

rabbitmqadmin是远程管理接口的命令行工具

安装好web管理工具并成功访问后,可在http://server-name:15672/cli页面下载rabbitmqadmin文件

拷贝到 /usr/local/bin 或者你想要的路径

增加可执行权限 : chmod 777 rabbitmqadmin

rabbitmqctl

rabbitmqctl是一个简单的命令行工具用于管理RabbitMQ Server,适合通过ssh登陆的管理

rabbitmqctl --help # 查看rabbitmqctl相关命令

php-amqplib

composer 组件
https://github.com/php-amqplib/php-amqplib

  • 安装
composer require php-amqplib/php-amqplib
  • 关于使用php-amplib组件的示例代码

https://github.com/AxiosCros/tpr-cms/blob/master/application/api/index/controller/Rabbit.php


namespace tpr\api\index\controller; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use think\Controller; use think\Debug; use think\Fork; class Rabbit extends Controller { public function send(){ /** * 连接RabbitMq-Server */ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //选择channel $channel = $connection->channel(); //选择队列,队列名: 'hello' $queue_name = 'hello'; $channel->queue_declare($queue_name, false, false, false, false); // 连续20次发送 for ($i=0;$i<20;$i++){ $msg = new AMQPMessage('Hello World!'.time() . '->count:' . $i); $channel->basic_publish($msg, '', $queue_name); } $channel->close(); $connection->close(); $this->response($queue_name); } public function receive(){ //接收者名称 $receiver_name = uniqid(); Fork::work($this , 'forkReceive',[$receiver_name]); $this->response($receiver_name); } public function forkReceive($receive_name){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); //选择channel $channel = $connection->channel(); //选择队列,队列名: 'hello' $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "\n"; $callback = function($msg) use ($receive_name) { Debug::save(ROOT_PATH . 'rabbitmq.log',$receive_name.' : '.$msg->body); echo " [x] Received ", $msg->body, "\n"; }; //监听队列 $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } }
  • 测试过程

receive方法运行两次,创建两个接收端,然后调用send方法,发送20次message

  • 运行结果
59a91fa0ed35a : Hello World!1504256098->count:0
59a91fa0ed35a : Hello World!1504256098->count:2
59a91fa1c2f30 : Hello World!1504256098->count:1
59a91fa0ed35a : Hello World!1504256098->count:4
59a91fa1c2f30 : Hello World!1504256098->count:3
59a91fa0ed35a : Hello World!1504256098->count:6
59a91fa1c2f30 : Hello World!1504256098->count:5
59a91fa0ed35a : Hello World!1504256098->count:8
59a91fa1c2f30 : Hello World!1504256098->count:7
59a91fa0ed35a : Hello World!1504256098->count:10
59a91fa1c2f30 : Hello World!1504256098->count:9
59a91fa0ed35a : Hello World!1504256098->count:12
59a91fa1c2f30 : Hello World!1504256098->count:11
59a91fa0ed35a : Hello World!1504256098->count:14
59a91fa1c2f30 : Hello World!1504256098->count:13
59a91fa0ed35a : Hello World!1504256098->count:16
59a91fa1c2f30 : Hello World!1504256098->count:15
59a91fa1c2f30 : Hello World!1504256098->count:17
59a91fa0ed35a : Hello World!1504256098->count:18
59a91fa1c2f30 : Hello World!1504256098->count:19
添加新评论