diff --git a/.travis.yml b/.travis.yml index e6a4bea1..0256a8e0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,9 +13,12 @@ cache: directories: - $HOME/.composer/cache +services: mongodb + before_script: - sh -c "if [ \"$TRAVIS_PHP_VERSION\" != \"hhvm\" ]; then pyrus install pecl/redis && pyrus build pecl/redis; fi" - sh -c "if [ \"$TRAVIS_PHP_VERSION\" != \"hhvm\" ]; then echo \"extension=redis.so\" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini; fi" + - sh -c "if [ \"$TRAVIS_PHP_VERSION\" = \"hhvm\" ]; then composer require --dev mongofill/mongofill=dev-master --no-update; fi" - "composer require --dev phpspec/phpspec:~2.0 --no-update" - "composer install --no-progress --no-plugins" diff --git a/doc/drivers.rst b/doc/drivers.rst index 54f85886..e516e083 100644 --- a/doc/drivers.rst +++ b/doc/drivers.rst @@ -9,6 +9,7 @@ Several different types of drivers are supported. Currently these are available: * `IronMQ`_ * `Amazon SQS`_ * `Google AppEngine`_ +* `MongoDB`_ Redis Extension --------------- @@ -347,3 +348,45 @@ Requires the installation of pda/pheanstalk. Add the following to your $pheanstalk = new Pheanstalk('localhost'); $driver = new PheanstalkDriver($pheanstalk); + +MongoDB +------- + +The MongoDB driver requires the `mongo PECL extension `_. +On platforms where the PECL extension is unavailable, such as HHVM, +`mongofill `_ may be used instead. + +The driver should be constructed with two MongoCollection objects, which +corresponding to the queue and message collections, respectively. + +.. code-block:: php + + selectCollection('bernardDatabase', 'queues'), + $mongoClient->selectCollection('bernardDatabase', 'messages'), + ); + +.. note:: + + If you are using Doctrine MongoDB or the ODM, you can access the + MongoCollection objects through the ``getMongoCollection()`` method on the + ``Doctrine\MongoDB\Collection`` wrapper class, which in turn may be + retrieved from a ``Doctrine\MongoDB\Database`` wrapper or DocumentManager + directly. + +To support message queries, the following index should also be created: + +.. code-block:: php + + selectCollection('bernardDatabase', 'messages'); + $collection->createIndex([ + 'queue' => 1, + 'visible' => 1, + 'sentAt' => 1, + ]); diff --git a/src/Bernard/Driver/MongoDBDriver.php b/src/Bernard/Driver/MongoDBDriver.php new file mode 100644 index 00000000..2e42fff7 --- /dev/null +++ b/src/Bernard/Driver/MongoDBDriver.php @@ -0,0 +1,150 @@ +queues = $queues; + $this->messages = $messages; + } + + /** + * {@inheritDoc} + */ + public function listQueues() + { + return $this->queues->distinct('_id'); + } + + /** + * {@inheritDoc} + */ + public function createQueue($queueName) + { + $data = array('_id' => (string) $queueName); + + $this->queues->update($data, $data, array('upsert' => true)); + } + + /** + * {@inheritDoc} + */ + public function countMessages($queueName) + { + return $this->messages->count(array( + 'queue' => (string) $queueName, + 'visible' => true, + )); + } + + /** + * {@inheritDoc} + */ + public function pushMessage($queueName, $message) + { + $data = array( + 'queue' => (string) $queueName, + 'message' => (string) $message, + 'sentAt' => new MongoDate(), + 'visible' => true, + ); + + $this->messages->insert($data); + } + + /** + * {@inheritDoc} + */ + public function popMessage($queueName, $interval = 5) + { + $runtime = microtime(true) + $interval; + + while (microtime(true) < $runtime) { + $result = $this->messages->findAndModify( + array('queue' => (string) $queueName, 'visible' => true), + array('$set' => array('visible' => false)), + array('message' => 1), + array('sort' => array('sentAt' => 1)) + ); + + if ($result) { + return array((string) $result['message'], (string) $result['_id']); + } + + usleep(10000); + } + + return array(null, null); + } + + /** + * {@inheritDoc} + */ + public function acknowledgeMessage($queueName, $receipt) + { + $this->messages->remove(array( + '_id' => new MongoId((string) $receipt), + 'queue' => (string) $queueName, + )); + } + + /** + * {@inheritDoc} + */ + public function peekQueue($queueName, $index = 0, $limit = 20) + { + $cursor = $this->messages->find( + array('queue' => (string) $queueName, 'visible' => true), + array('_id' => 0, 'message' => 1) + ) + ->sort(array('sentAt' => 1)) + ->limit($limit) + ->skip($index) + ; + + return array_map( + function ($result) { return (string) $result['message']; }, + iterator_to_array($cursor, false) + ); + } + + /** + * {@inheritDoc} + */ + public function removeQueue($queueName) + { + $this->queues->remove(array('_id' => $queueName)); + $this->messages->remove(array('queue' => (string) $queueName)); + } + + /** + * {@inheritDoc} + */ + public function info() + { + return array( + 'messages' => (string) $this->messages, + 'queues' => (string) $this->queues, + ); + } +} diff --git a/tests/Bernard/Tests/Driver/MongoDBDriverFunctionalTest.php b/tests/Bernard/Tests/Driver/MongoDBDriverFunctionalTest.php new file mode 100644 index 00000000..e16d5d55 --- /dev/null +++ b/tests/Bernard/Tests/Driver/MongoDBDriverFunctionalTest.php @@ -0,0 +1,154 @@ +markTestSkipped('MongoDB extension is not available.'); + } + + try { + $mongoClient = new MongoClient(); + } catch (MongoConnectionException $e) { + $this->markTestSkipped('Cannot connect to MongoDB server.'); + } + + $this->queues = $mongoClient->selectCollection(self::DATABASE, self::QUEUES); + $this->messages = $mongoClient->selectCollection(self::DATABASE, self::MESSAGES); + $this->driver = new MongoDBDriver($this->queues, $this->messages); + } + + public function tearDown() + { + if ( ! $this->messages instanceof MongoCollection) { + return; + } + + $this->messages->drop(); + $this->queues->drop(); + } + + /** + * @medium + * @covers ::acknowledgeMessage() + * @covers ::countMessages() + * @covers ::popMessage() + * @covers ::pushMessage() + */ + public function testMessageLifecycle() + { + $this->assertEquals(0, $this->driver->countMessages('foo')); + + $this->driver->pushMessage('foo', 'message1'); + $this->assertEquals(1, $this->driver->countMessages('foo')); + + $this->driver->pushMessage('foo', 'message2'); + $this->assertEquals(2, $this->driver->countMessages('foo')); + + list($message1, $receipt1) = $this->driver->popMessage('foo'); + $this->assertSame('message1', $message1, 'The first message pushed is popped first'); + $this->assertRegExp('/^[a-f\d]{24}$/i', $receipt1, 'The message receipt is an ObjectId'); + $this->assertEquals(1, $this->driver->countMessages('foo')); + + list($message2, $receipt2) = $this->driver->popMessage('foo'); + $this->assertSame('message2', $message2, 'The second message pushed is popped second'); + $this->assertRegExp('/^[a-f\d]{24}$/i', $receipt2, 'The message receipt is an ObjectId'); + $this->assertEquals(0, $this->driver->countMessages('foo')); + + list($message3, $receipt3) = $this->driver->popMessage('foo', 1); + $this->assertNull($message3, 'Null message is returned when popping an empty queue'); + $this->assertNull($receipt3, 'Null receipt is returned when popping an empty queue'); + + $this->assertEquals(2, $this->messages->count(), 'Popped messages remain in the database'); + + $this->driver->acknowledgeMessage('foo', $receipt1); + $this->assertEquals(1, $this->messages->count(), 'Acknowledged messages are removed from the database'); + + $this->driver->acknowledgeMessage('foo', $receipt2); + $this->assertEquals(0, $this->messages->count(), 'Acknowledged messages are removed from the database'); + } + + public function testPeekQueue() + { + $this->driver->pushMessage('foo', 'message1'); + $this->driver->pushMessage('foo', 'message2'); + + $this->assertSame(array('message1', 'message2'), $this->driver->peekQueue('foo')); + $this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1)); + $this->assertSame(array(), $this->driver->peekQueue('foo', 2)); + $this->assertSame(array('message1'), $this->driver->peekQueue('foo', 0, 1)); + $this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1, 1)); + } + + /** + * @covers ::createQueue() + * @covers ::listQueues() + * @covers ::removeQueue() + */ + public function testQueueLifecycle() + { + $this->driver->createQueue('foo'); + $this->driver->createQueue('bar'); + + $queues = $this->driver->listQueues(); + $this->assertCount(2, $queues); + $this->assertContains('foo', $queues); + $this->assertContains('bar', $queues); + + $this->driver->removeQueue('foo'); + + $queues = $this->driver->listQueues(); + $this->assertCount(1, $queues); + $this->assertNotContains('foo', $queues); + $this->assertContains('bar', $queues); + } + + public function testRemoveQueueDeletesMessages() + { + $this->driver->pushMessage('foo', 'message1'); + $this->driver->pushMessage('foo', 'message2'); + $this->assertEquals(2, $this->driver->countMessages('foo')); + $this->assertEquals(2, $this->messages->count()); + + $this->driver->removeQueue('foo'); + $this->assertEquals(0, $this->driver->countMessages('foo')); + $this->assertEquals(0, $this->messages->count()); + } + + public function testCreateQueueWithDuplicateNameIsNoop() + { + $this->driver->createQueue('foo'); + $this->driver->createQueue('foo'); + + $this->assertSame(array('foo'), $this->driver->listQueues()); + } + + public function testInfo() + { + $info = array( + 'messages' => self::DATABASE . '.' . self::MESSAGES, + 'queues' => self::DATABASE . '.' . self::QUEUES, + ); + + $this->assertSame($info, $this->driver->info()); + } +} diff --git a/tests/Bernard/Tests/Driver/MongoDBDriverTest.php b/tests/Bernard/Tests/Driver/MongoDBDriverTest.php new file mode 100644 index 00000000..a0e3798b --- /dev/null +++ b/tests/Bernard/Tests/Driver/MongoDBDriverTest.php @@ -0,0 +1,191 @@ +markTestSkipped('MongoDB extension is not available.'); + } + + $this->queues = $this->getMockMongoCollection(); + $this->messages = $this->getMockMongoCollection(); + $this->driver = new MongoDBDriver($this->queues, $this->messages); + } + + public function testListQueues() + { + $this->queues->expects($this->once()) + ->method('distinct') + ->with('_id') + ->will($this->returnValue(array('foo', 'bar'))); + + $this->assertSame(array('foo', 'bar'), $this->driver->listQueues()); + } + + public function testCreateQueue() + { + $this->queues->expects($this->once()) + ->method('update') + ->with(array('_id' => 'foo'), array('_id' => 'foo'), array('upsert' => true)); + + $this->driver->createQueue('foo'); + } + + public function testCountMessages() + { + $this->messages->expects($this->once()) + ->method('count') + ->with(array('queue' => 'foo', 'visible' => true)) + ->will($this->returnValue(2)); + + $this->assertSame(2, $this->driver->countMessages('foo')); + } + + public function testPushMessage() + { + $this->messages->expects($this->once()) + ->method('insert') + ->with($this->callback(function($data) { + return $data['queue'] === 'foo' && + $data['message'] === 'message1' && + $data['sentAt'] instanceof MongoDate && + $data['visible'] === true; + })); + + $this->driver->pushMessage('foo', 'message1'); + } + + public function testPopMessageWithFoundMessage() + { + $this->messages->expects($this->atLeastOnce()) + ->method('findAndModify') + ->with( + array('queue' => 'foo', 'visible' => true), + array('$set' => array('visible' => false)), + array('message' => 1), + array('sort' => array('sentAt' => 1)) + ) + ->will($this->returnValue(array('message' => 'message1', '_id' => '000000000000000000000000'))); + + list($message, $receipt) = $this->driver->popMessage('foo'); + $this->assertSame('message1', $message); + $this->assertSame('000000000000000000000000', $receipt); + } + + /** + * @medium + */ + public function testPopMessageWithMissingMessage() + { + $this->messages->expects($this->atLeastOnce()) + ->method('findAndModify') + ->with( + array('queue' => 'foo', 'visible' => true), + array('$set' => array('visible' => false)), + array('message' => 1), + array('sort' => array('sentAt' => 1)) + ) + ->will($this->returnValue(false)); + + list($message, $receipt) = $this->driver->popMessage('foo', 1); + $this->assertNull($message); + $this->assertNull($receipt); + } + + public function testAcknowledgeMessage() + { + $this->messages->expects($this->once()) + ->method('remove') + ->with($this->callback(function($query) { + return $query['_id'] instanceof MongoId && + (string) $query['_id'] === '000000000000000000000000' && + $query['queue'] === 'foo'; + })); + + $this->driver->acknowledgeMessage('foo', '000000000000000000000000'); + } + + public function testPeekQueue() + { + $cursor = $this->getMockBuilder('MongoCursor') + ->disableOriginalConstructor() + ->getMock(); + + $this->messages->expects($this->once()) + ->method('find') + ->with(array('queue' => 'foo', 'visible' => true), array('_id' => 0, 'message' => 1)) + ->will($this->returnValue($cursor)); + + $cursor->expects($this->at(0)) + ->method('sort') + ->with(array('sentAt' => 1)) + ->will($this->returnValue($cursor)); + + $cursor->expects($this->at(1)) + ->method('limit') + ->with(20) + ->will($this->returnValue($cursor)); + + /* Rather than mock MongoCursor's iterator interface, take advantage of + * the final fluent method call and return an ArrayIterator. */ + $cursor->expects($this->at(2)) + ->method('skip') + ->with(0) + ->will($this->returnValue(new ArrayIterator(array( + array('message' => 'message1'), + array('message' => 'message2'), + )))); + + $this->assertSame(array('message1', 'message2'), $this->driver->peekQueue('foo')); + } + + public function testRemoveQueue() + { + $this->queues->expects($this->once()) + ->method('remove') + ->with(array('_id' => 'foo')); + + $this->messages->expects($this->once()) + ->method('remove') + ->with(array('queue' => 'foo')); + + $this->driver->removeQueue('foo'); + } + + public function testInfo() + { + $this->queues->expects($this->once()) + ->method('__toString') + ->will($this->returnValue('db.queues')); + + $this->messages->expects($this->once()) + ->method('__toString') + ->will($this->returnValue('db.messages')); + + $info = array( + 'messages' => 'db.messages', + 'queues' => 'db.queues', + ); + + $this->assertSame($info, $this->driver->info()); + } + + private function getMockMongoCollection() + { + return $this->getMockBuilder('MongoCollection') + ->disableOriginalConstructor() + ->getMock(); + } +}