举杯邀月

yii2 使用 rabbitmq 扩展监听、发送消息

摘要:RabbitMQ是一个在AMQP基础上实现的企业级消息系统。何谓消息系统,就是消息队列系统,消息队列是“消费-生产者模型”的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

RabbitMQ是一个在AMQP基础上实现的企业级消息系统。何谓消息系统,就是消息队列系统,消息队列是“消费-生产者模型”的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

在开始前确保php安装了amqp扩展

扩展安装

composer require mikemadisonweb/yii2-rabbitmq "1.7.0"
1
1
1
1
1

配置及使用

1.新建rabbitmq.php文件,单独放其配置,在配置前,要先保证,rabbitmq的服务已经配置好

/**
 * rabbitmq的配置
 */
$rabbitMq = [
    'on before_consume' => function ($event) {
        if (isset(\Yii::$app->db)) {
            $db = \Yii::$app->db;
            if ($db->getIsActive()) {
                $db->close();
            }
            $db->open();
        }
    },
    'logger' => [
        'enable' => true,
        'category' => 'amqp',
        'print_console' => true,
    ],
    'class' => 'mikemadisonweb\rabbitmq\Configuration',
    'connections' => [
        'default' => [
            'host' => '127.0.0.1',
            'port' => '5672',
            'user' => '你的登陆账号',
            'password' => '你的密码',
            'vhost' => '虚拟主机',
            'heartbeat' => 0,
        ],
    ],
    'producers' => [  //生产者,发送消息需要在次配置
        'import_data' => [ 
            'connection' => 'default',
            'exchange_options' => [
                'name' => 'import_data',
                'type' => 'direct',
            ],
            'queue_options' => [
                'name' => 'import_data', // Queue name which will be binded to the exchange adove
                'routing_keys' => ['import_data'], // Your custom options
                'durable' => true,
                'auto_delete' => false,
            ],
        ],
    ],
    'consumers' => [  //消费者,接受消息需要再次配置
        'import_data' => [
            'connection' => 'default',
            'exchange_options' => [
                'name' => 'import_data', // Name of exchange to declare
                'type' => 'direct', // Type of exchange
            ],
            'queue_options' => [
                'name' => 'import_data', // Queue name which will be binded to the exchange adove
                'routing_keys' => ['import_data'], // Your custom options
                'durable' => true,
                'auto_delete' => false,
            ],
            'callback' => \common\components\rabbitmq\ImportDataConsumer::class,
        ],
    ],
];

return $rabbitMq;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

2.在主配置文件,common/config/main.php中添加组件

'components' => [
    "rabbitmq" => require(__DIR__ . '/rabbitmq.php'),
],
1
2
3
1
2
3
1
2
3
1
2
3
1
2
3

3.配置回调
在\common\components\rabbitmq目录下,配置回调类

namespace common\components\rabbitmq;

use mikemadisonweb\rabbitmq\components\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class ImportDataConsumer implements ConsumerInterface
{
    /**
     * @param AMQPMessage $msg
     * @return bool
     */
    public function execute(AMQPMessage $msg)
    {
        $data = unserialize($msg->body);
        var_dump($data);
        echo '我刚消费了消息';
        return ConsumerInterface::MSG_ACK;
       
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

4.配置启动命令 在console/config/main.php中添加启动配置

'controllerMap' => [
        'rabbitmq-consumer' => \mikemadisonweb\rabbitmq\controllers\ConsumerController::class,
        'rabbitmq-producer' => \mikemadisonweb\rabbitmq\controllers\ProducerController::class,
    ],
1
2
3
4
1
2
3
4
1
2
3
4
1
2
3
4
1
2
3
4

5.启动命令

yii rabbitmq-consumer/single import_data
1
1
1
1
1

6.发送消息

\Yii::$app->rabbitmq->load();
$producer = \Yii::$container->get(sprintf('rabbit_mq.producer.%s', 'import_data'));
$msg = serialize(['id' => rand(1,100), 'msg' => 'hello world']);
$producer->publish($msg, 'import_data');
1
2
3
4
1
2
3
4
1
2
3
4
1
2
3
4
1
2
3
4

7.消费消息,在第三步配置的回调类中

作者:举杯邀月

出处: http://www.hug-code.cn/archives/5fc5b2d6c1d66.html

2020-04-27 标签: phpyii2rabbitmq