<?php
// Consumer script: declares a direct exchange named 'exchange' and a queue
// named 'logs', binds the queue to the exchange with routing key 'logs', and
// consumes the queue with callback() as the message handler.
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// declareExchange() is the current name of the deprecated declare() alias.
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName('logs');
// declareQueue() is the current name of the deprecated declare() alias.
$queue->declareQueue();
$queue->bind('exchange', 'logs');

try {
    while (true) {
        $queue->consume('callback');
    }
} finally {
    // The loop above never ends on its own, so the connection is released
    // here when an exception leaves it. The connection variable is $connect,
    // and it is shut down with disconnect().
    $connect->disconnect();
}
/**
 * Message handler passed to the queue consumer.
 *
 * Dumps the message body with var_dump() and then negatively acknowledges
 * the delivery on the queue it was handed.
 * NOTE(review): nack() is called without flags — confirm that rejecting the
 * message (instead of ack) is what this demo intends.
 *
 * @param object $envelope Delivered message; getBody() and getDeliveryTag() are called on it.
 * @param object $queue    Queue object; nack() is called on it.
 */
function callback($envelope, $queue) {
    $body = $envelope->getBody();
    var_dump($body);

    $deliveryTag = $envelope->getDeliveryTag();
    $queue->nack($deliveryTag);
}
<?php
// Publisher script: declares the direct exchange named 'exchange' and
// publishes a single message to it with routing key 'logs'.
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// declareExchange() is the current name of the deprecated declare() alias.
$exchange->declareExchange();

$exchange->publish('direct type test', 'logs');
var_dump("Send Message OK");

$connect->disconnect();
<?php
// Consumer script: declares a direct exchange named 'exchange' and a queue
// named 'logs', binds the queue to the exchange with routing key 'logs', and
// consumes the queue with callback() as the message handler.
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// declareExchange() is the current name of the deprecated declare() alias.
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName('logs');
// declareQueue() is the current name of the deprecated declare() alias, so
// no @ error suppression is needed and real failures stay visible.
$queue->declareQueue();
$queue->bind('exchange', 'logs');

try {
    while (true) {
        $queue->consume('callback');
    }
} finally {
    // The loop above never ends on its own, so the connection is released
    // here when an exception leaves it. The connection variable is $connect,
    // and it is shut down with disconnect().
    $connect->disconnect();
}
/**
 * Message handler passed to the queue consumer.
 *
 * Prints the body of the delivered message via var_dump(), then sends a
 * negative acknowledgement for its delivery tag.
 * NOTE(review): nack() is called without flags — confirm that rejecting the
 * message (instead of ack) is what this demo intends.
 *
 * @param object $envelope Delivered message; getBody() and getDeliveryTag() are called on it.
 * @param object $queue    Queue object; nack() is called on it.
 */
function callback($envelope, $queue) {
    $payload = $envelope->getBody();
    var_dump($payload);

    $tag = $envelope->getDeliveryTag();
    $queue->nack($tag);
}
<?php
// Publisher script: declares the direct exchange named 'exchange' and
// publishes the numbers 1 through 4 as separate messages with routing key
// 'logs', then deletes the exchange.
$connect = new AMQPConnection();
$connect->connect();
$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// declareExchange() is the current name of the deprecated declare() alias.
$exchange->declareExchange();

for ($index = 1; $index < 5; $index++) {
    // The message body is sent as a string; cast explicitly instead of
    // relying on implicit int-to-string coercion.
    $exchange->publish((string) $index, 'logs');
    var_dump("Send:$index");
}

// NOTE(review): the exchange is deleted right after publishing — confirm
// this is intended for the demo.
$exchange->delete();
$connect->disconnect();

// NOTE(review): the statements below look like standalone variations of the
// scripts above (a longer publish loop, a slower callback, and a channel with
// a prefetch limit) rather than a continuation of one script — confirm before
// running them together.

// Publishes the numbers 1 through 49 as separate messages with routing key 'logs'.
for ($index = 1; $index < 50; $index++) {
    // The message body is sent as a string; cast explicitly instead of
    // relying on implicit int-to-string coercion.
    $exchange->publish((string) $index, 'logs');
    var_dump("Send:$index");
}

/**
 * Message handler that dumps the message body, pauses for three seconds, and
 * then negatively acknowledges the delivery.
 *
 * @param object $envelope Delivered message; getBody() and getDeliveryTag() are called on it.
 * @param object $queue    Queue object; nack() is called on it.
 */
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    sleep(3);
    $queue->nack($envelope->getDeliveryTag());
}

// Channel created without a prefetch limit.
$channel = new AMQPChannel($connect);

// Channel created again, this time with the prefetch count set to 1.
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);
php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)
原文:http://blog.csdn.net/starparker/article/details/19179353