- 論壇徽章:
- 0
|
PHP AMQP 擴展 應(yīng)用
高級消息隊列協(xié)議(AMQP)是一個異步消息傳遞所使用的應(yīng)用層協(xié)議規(guī)范。作為線路層協(xié)議,而不是API(例如JMS),AMQP 客戶端能夠無視消息的來源任意發(fā)送和接受信息,F(xiàn)在,已經(jīng)有相當一部分不同平臺的服務(wù)器和客戶端可以投入使用
我的AMQP服務(wù)器是使用RabbitMQ ,RabbitMQ 的安裝網(wǎng)上很多。
重點說下 PHP 的擴展 php-amqp,我用的是最新的 amqp-1.0.1
文檔是舊的 新的摻雜在一起 因此弄了好久才完成。
首先說下流程 即 Client - AMQP server - Client
左邊的Client向右邊的Client發(fā)送消息,流程:
1, 獲取Conection
2, 獲取Channel
3, 定義Exchange,Queue
4, 使用一個RoutingKey將Queue Binding到一個Exchange上
5, 通過指定一個Exchange和一個RoutingKey來將消息發(fā)送到對應(yīng)的Queue上,
6, 接收方在接收時也是獲取connection,接著獲取channel,然后指定一個Queue直接到它關(guān)心的Queue上取消息,它對Exchange,RoutingKey及如何binding都不關(guān)心,到對應(yīng)的Queue上去取消息就OK了
以下是PHP 的實現(xiàn):
生產(chǎn)消息:
Php代碼- 1.<?php
- 2.
- 3.//設(shè)置你的連接
- 4.$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
- 5.$conn = new AMQPConnection($conn_args);
- 6.if ($conn->connect()) {
- 7. echo "Established a connection to the broker \n";
- 8.}
- 9.else {
- 10. echo "Cannot connect to the broker \n ";
- 11.}
- 12.//你的消息
- 13.$message = json_encode(array('Hello World!','php','c++'));
- 14.//創(chuàng)建channel
- 15.$channel = new AMQPChannel($conn);
- 16.//創(chuàng)建exchange
- 17.$ex = new AMQPExchange($channel);
- 18.$ex->setName('exchange');//創(chuàng)建名字
- 19.$ex->setType(AMQP_EX_TYPE_DIRECT);
- 20.$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
- 21.echo "exchange status:".$ex->declare();
- 22.echo "\n";
- 23.//創(chuàng)建隊列
- 24.$q = new AMQPQueue($channel);
- 25.//設(shè)置隊列名字 如果不存在則添加
- 26.$q->setName('queue');
- 27.$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
- 28.echo "queue status: ".$q->declare();
- 29.echo "\n";
- 30.echo 'queue bind: '.$q->bind('exchange','route.key');//將你的隊列綁定到routingKey
- 31.echo "\n";
- 32.
- 33.$channel->startTransaction();
- 34.echo "send: ".$ex->publish($message, 'route.key'); //將你的消息通過制定routingKey發(fā)送
- 35.$channel->commitTransaction();
- 36.$conn->disconnect();
- 37.
- 38.?>
- <?php
- //設(shè)置你的連接
- $conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
- $conn = new AMQPConnection($conn_args);
- if ($conn->connect()) {
- echo "Established a connection to the broker \n";
- }
- else {
- echo "Cannot connect to the broker \n ";
- }
- //你的消息
- $message = json_encode(array('Hello World!','php','c++'));
- //創(chuàng)建channel
- $channel = new AMQPChannel($conn);
- //創(chuàng)建exchange
- $ex = new AMQPExchange($channel);
- $ex->setName('exchange');//創(chuàng)建名字
- $ex->setType(AMQP_EX_TYPE_DIRECT);
- $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
- echo "exchange status:".$ex->declare();
- echo "\n";
- //創(chuàng)建隊列
- $q = new AMQPQueue($channel);
- //設(shè)置隊列名字 如果不存在則添加
- $q->setName('queue');
- $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
- echo "queue status: ".$q->declare();
- echo "\n";
- echo 'queue bind: '.$q->bind('exchange','route.key');//將你的隊列綁定到routingKey
- echo "\n";
- $channel->startTransaction();
- echo "send: ".$ex->publish($message, 'route.key'); //將你的消息通過制定routingKey發(fā)送
- $channel->commitTransaction();
- $conn->disconnect();
- ?>
復制代碼 接收方 消費方
Php代碼- 1.
- 2.<?php
- 3.$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'
- 4. ,'vhost'=>'/');
- 5.$conn = new AMQPConnection($conn_args);
- 6.$conn->connect();
- 7.$channel = new AMQPChannel($conn);
- 8.$q = new AMQPQueue($channel);
- 9.$q->setName('queue2');
- 10.$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
- 11.echo "queue status: ".$q->declare();
- 12.echo "==========\n";
- 13.
- 14.$messages = $q->get(AMQP_AUTOACK);
- 15.print_r($messages->getBody());
- 16.echo "\n";
- 17.// disconnect
- 18.$conn->disconnect();
- 19.
- 20.?>
- <?php
- $conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'
- ,'vhost'=>'/');
- $conn = new AMQPConnection($conn_args);
- $conn->connect();
- $channel = new AMQPChannel($conn);
- $q = new AMQPQueue($channel);
- $q->setName('queue2');
- $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
- echo "queue status: ".$q->declare();
- echo "==========\n";
-
- $messages = $q->get(AMQP_AUTOACK);
- print_r($messages->getBody());
- echo "\n";
- // disconnect
- $conn->disconnect();
- ?>
復制代碼 我是初學 消息隊列這方面知識。
不知道 linux的消息隊列 和AMQP 的 優(yōu)缺點
我用php 的sysvmsg 函數(shù) 執(zhí)行while 循環(huán) 可以掛起在服務(wù)器上,實時監(jiān)聽消息隊列的消息,有消息則取,內(nèi)存不會升高
而用amqp while 循環(huán) 監(jiān)聽 即使無消息 內(nèi)存也會越來越高,最后導致PHP 使用內(nèi)存不足而 報錯。
看來這方面得多交流 多學習
|
|