RabbitMq运行环境搭建及php-amqplib的简单使用的示例代码

前言

RabbitMq是实现AMQP消息中间件的一种,常被用于RPC场景。RabbitMq对消息的发送端和消息的接收端进行了解耦,使消息的发送者无需了解消息的接收端,反之,接收端也无需过多了解发送端。简言之,生产者只需负责生产,消费者只负责消费。

基本概念

  • queue (队列)

    队列是存放消息的地方,且其中的消息是按序排布的。根据设置,队列可以是临时的(用完即删),也可以是持久的(服务未停止前一直存在)。

  • channel (频道)

    channel的作用是进行消息的传递。消息存放在队列中,以轮询的方式通过频道发送给监听频道的消费者,可以动态的增加消费者以提高消息的“消费能力”。

  • exchange (交换机)

    顾名思义,它的作用是转发消息,角色行为是“调度”。调度规则有四种:direct, topic, headers and fanout

    1.direct: 根据routigKey指定的队列进行消息转发
    2.topic: 按规则转发消息,该类型非常灵活,甚至可以实现其它三种类型的转发方式
    3.header: 不处理路由键,根据消息内容中的headers属性匹配转发规则
    4.fanout: 转发消息到所有已绑定的队列

rabbitMq-server

系统环境: Centos7.3

  • 安装erlang
yum install -y epel-release erlang

#查看版本
erl -version
  • 下载rabbitmq-server并进行yum安装

http://www.rabbitmq.com/download.html

cd /usr/src
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.11/rabbitmq-server-3.6.11-1.el6.noarch.rpm

rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc

yum install -y rabbitmq-server
  • 启动
# 开机自启动
chkconfig rabbitmq-server on

# 启动服务
service rabbitmq-server start

# 查看服务状态
service rabbitmq-server status

web管理工具

  • 安装

    rabbitmq-plugins enable rabbitmq_management
  • 默认账号密码

默认账号只能在localhost等本机模式下才可登陆

guest
guest
  • 添加可以远程登陆的账号

    sudo rabbitmqctl add_user test 123456
    sudo rabbitmqctl set_user_tags test administrator
    sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
  • 查看已创建用户

    rabbitmqctl list_users
  • 浏览器访问web管理工具

注意防火墙配置,开启15672端口

http://domain_or_ip:15672/

rabbitmqadmin

rabbitmqadmin是远程管理接口的命令行工具

安装好web管理工具并成功访问后,可在http://server-name:15672/cli页面下载rabbitmqadmin文件

拷贝到 /usr/local/bin 或者你想要的路径

增加可执行权限 : chmod 777 rabbitmqadmin

rabbitmqctl

rabbitmqctl是一个简单的命令行工具用于管理RabbitMQ Server,适合通过ssh登陆的管理

rabbitmqctl --help # 查看rabbitmqctl相关命令

php-amqplib

composer 组件
https://github.com/php-amqplib/php-amqplib

  • 安装

    composer require php-amqplib/php-amqplib
  • 关于使用php-amplib组件的示例代码 Rabbit.php

<?php
namespace tpr\api\index\controller;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use tpr\framework\Controller;
use tpr\framework\Debug;

class Rabbit extends Controller
{
    public function send()
    {
        /**
         * 连接RabbitMq-Server
         */
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        //选择channel
        $channel = $connection->channel();

        //选择队列,队列名: 'hello'
        $queue_name = 'hello';
        $channel->queue_declare($queue_name, false, false, false, false);

        // 连续20次发送
        for ($i = 0; $i < 20; $i++) {
            $msg = new AMQPMessage('Hello World!' . time() . '->count:' . $i);
            $channel->basic_publish($msg, '', $queue_name);
        }

        $channel->close();
        $connection->close();
        $this->response($queue_name);
    }

    public function receive()
    {
        //接收者名称
        $receiver_name = uniqid();

        //使用Fork类异步执行消息接收
        $this->response($receiver_name);
    }

    public function forkReceive($receive_name)
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

        //选择channel
        $channel = $connection->channel();

        //选择队列,队列名: 'hello'
        $channel->queue_declare('hello', false, false, false, false);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        $callback = function ($msg) use ($receive_name) {
            Debug::save(ROOT_PATH . 'rabbitmq.log', $receive_name . ' : ' . $msg->body);
            echo " [x] Received ", $msg->body, "\n";
        };

        //监听队列
        $channel->basic_consume('hello', '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
  • 测试过程

receive方法运行两次,创建两个接收端,然后调用send方法,发送20次message

  • 运行结果
59a91fa0ed35a : Hello World!1504256098->count:0
59a91fa0ed35a : Hello World!1504256098->count:2
59a91fa1c2f30 : Hello World!1504256098->count:1
59a91fa0ed35a : Hello World!1504256098->count:4
59a91fa1c2f30 : Hello World!1504256098->count:3
59a91fa0ed35a : Hello World!1504256098->count:6
59a91fa1c2f30 : Hello World!1504256098->count:5
59a91fa0ed35a : Hello World!1504256098->count:8
59a91fa1c2f30 : Hello World!1504256098->count:7
59a91fa0ed35a : Hello World!1504256098->count:10
59a91fa1c2f30 : Hello World!1504256098->count:9
59a91fa0ed35a : Hello World!1504256098->count:12
59a91fa1c2f30 : Hello World!1504256098->count:11
59a91fa0ed35a : Hello World!1504256098->count:14
59a91fa1c2f30 : Hello World!1504256098->count:13
59a91fa0ed35a : Hello World!1504256098->count:16
59a91fa1c2f30 : Hello World!1504256098->count:15
59a91fa1c2f30 : Hello World!1504256098->count:17
59a91fa0ed35a : Hello World!1504256098->count:18
59a91fa1c2f30 : Hello World!1504256098->count:19
已有 2 条评论
  1. Mr.black Mr.black

    致命错误: Class 'think\Fork' not found

    1. axios axios

      Fork类在tpr-framework里才有的,而且新版本的Fork类使用的是 tpr\framework\Fork 命名空间

添加新评论