Skip to content

Commit

Permalink
Merge pull request #133 from jmikola/mongodb
Browse files Browse the repository at this point in the history
MongoDB storage driver
  • Loading branch information
henrikbjorn committed Dec 23, 2014
2 parents 4c298c1 + 1912d54 commit 7a183cd
Show file tree
Hide file tree
Showing 5 changed files with 541 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ cache:
directories:
- $HOME/.composer/cache

services: mongodb

before_script:
- sh -c "if [ \"$TRAVIS_PHP_VERSION\" != \"hhvm\" ]; then pyrus install pecl/redis && pyrus build pecl/redis; fi"
- sh -c "if [ \"$TRAVIS_PHP_VERSION\" != \"hhvm\" ]; then echo \"extension=redis.so\" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini; fi"
- sh -c "if [ \"$TRAVIS_PHP_VERSION\" = \"hhvm\" ]; then composer require --dev mongofill/mongofill=dev-master --no-update; fi"
- "composer require --dev phpspec/phpspec:~2.0 --no-update"
- "composer install --no-progress --no-plugins"

Expand Down
43 changes: 43 additions & 0 deletions doc/drivers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Several different types of drivers are supported. Currently these are available:
* `IronMQ`_
* `Amazon SQS`_
* `Google AppEngine`_
* `MongoDB`_

Redis Extension
---------------
Expand Down Expand Up @@ -347,3 +348,45 @@ Requires the installation of pda/pheanstalk. Add the following to your
$pheanstalk = new Pheanstalk('localhost');
$driver = new PheanstalkDriver($pheanstalk);
MongoDB
-------

The MongoDB driver requires the `mongo PECL extension <http://pecl.php.net/package/mongo>`_.
On platforms where the PECL extension is unavailable, such as HHVM,
`mongofill <https://github.com/mongofill/mongofill>`_ may be used instead.

The driver should be constructed with two MongoCollection objects, which
corresponding to the queue and message collections, respectively.

.. code-block:: php
<?php
$mongoClient = new \MongoClient();
$driver = new \Bernard\Driver\MongoDBDriver(
$mongoClient->selectCollection('bernardDatabase', 'queues'),
$mongoClient->selectCollection('bernardDatabase', 'messages'),
);
.. note::

If you are using Doctrine MongoDB or the ODM, you can access the
MongoCollection objects through the ``getMongoCollection()`` method on the
``Doctrine\MongoDB\Collection`` wrapper class, which in turn may be
retrieved from a ``Doctrine\MongoDB\Database`` wrapper or DocumentManager
directly.

To support message queries, the following index should also be created:

.. code-block:: php
<?php
$mongoClient = new \MongoClient();
$collection = $mongoClient->selectCollection('bernardDatabase', 'messages');
$collection->createIndex([
'queue' => 1,
'visible' => 1,
'sentAt' => 1,
]);
150 changes: 150 additions & 0 deletions src/Bernard/Driver/MongoDBDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php

namespace Bernard\Driver;

use MongoCollection;
use MongoDate;
use MongoId;

/**
* Driver supporting MongoDB
*
* @package Bernard
*/
class MongoDBDriver implements \Bernard\Driver
{
private $messages;
private $queues;

/**
* Constructor.
*
* @param MongoCollection $queues Collection where queues will be stored
* @param MongoCollection $messages Collection where messages will be stored
*/
public function __construct(MongoCollection $queues, MongoCollection $messages)
{
$this->queues = $queues;
$this->messages = $messages;
}

/**
* {@inheritDoc}
*/
public function listQueues()
{
return $this->queues->distinct('_id');
}

/**
* {@inheritDoc}
*/
public function createQueue($queueName)
{
$data = array('_id' => (string) $queueName);

$this->queues->update($data, $data, array('upsert' => true));
}

/**
* {@inheritDoc}
*/
public function countMessages($queueName)
{
return $this->messages->count(array(
'queue' => (string) $queueName,
'visible' => true,
));
}

/**
* {@inheritDoc}
*/
public function pushMessage($queueName, $message)
{
$data = array(
'queue' => (string) $queueName,
'message' => (string) $message,
'sentAt' => new MongoDate(),
'visible' => true,
);

$this->messages->insert($data);
}

/**
* {@inheritDoc}
*/
public function popMessage($queueName, $interval = 5)
{
$runtime = microtime(true) + $interval;

while (microtime(true) < $runtime) {
$result = $this->messages->findAndModify(
array('queue' => (string) $queueName, 'visible' => true),
array('$set' => array('visible' => false)),
array('message' => 1),
array('sort' => array('sentAt' => 1))
);

if ($result) {
return array((string) $result['message'], (string) $result['_id']);
}

usleep(10000);
}

return array(null, null);
}

/**
* {@inheritDoc}
*/
public function acknowledgeMessage($queueName, $receipt)
{
$this->messages->remove(array(
'_id' => new MongoId((string) $receipt),
'queue' => (string) $queueName,
));
}

/**
* {@inheritDoc}
*/
public function peekQueue($queueName, $index = 0, $limit = 20)
{
$cursor = $this->messages->find(
array('queue' => (string) $queueName, 'visible' => true),
array('_id' => 0, 'message' => 1)
)
->sort(array('sentAt' => 1))
->limit($limit)
->skip($index)
;

return array_map(
function ($result) { return (string) $result['message']; },
iterator_to_array($cursor, false)
);
}

/**
* {@inheritDoc}
*/
public function removeQueue($queueName)
{
$this->queues->remove(array('_id' => $queueName));
$this->messages->remove(array('queue' => (string) $queueName));
}

/**
* {@inheritDoc}
*/
public function info()
{
return array(
'messages' => (string) $this->messages,
'queues' => (string) $this->queues,
);
}
}
154 changes: 154 additions & 0 deletions tests/Bernard/Tests/Driver/MongoDBDriverFunctionalTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
<?php

namespace Bernard\Tests\Driver;

use Bernard\Driver\MongoDBDriver;
use MongoClient;
use MongoCollection;
use MongoException;

/**
* @coversDefaultClass Bernard\Driver\MongoDBDriver
*/
class MongoDBDriverFunctionalTest extends \PHPUnit_Framework_TestCase
{
const DATABASE = 'bernardQueueTest';
const MESSAGES = 'bernardMessages';
const QUEUES = 'bernardQueues';

private $messages;
private $queues;
private $driver;

public function setUp()
{
if ( ! class_exists('MongoClient')) {
$this->markTestSkipped('MongoDB extension is not available.');
}

try {
$mongoClient = new MongoClient();
} catch (MongoConnectionException $e) {
$this->markTestSkipped('Cannot connect to MongoDB server.');
}

$this->queues = $mongoClient->selectCollection(self::DATABASE, self::QUEUES);
$this->messages = $mongoClient->selectCollection(self::DATABASE, self::MESSAGES);
$this->driver = new MongoDBDriver($this->queues, $this->messages);
}

public function tearDown()
{
if ( ! $this->messages instanceof MongoCollection) {
return;
}

$this->messages->drop();
$this->queues->drop();
}

/**
* @medium
* @covers ::acknowledgeMessage()
* @covers ::countMessages()
* @covers ::popMessage()
* @covers ::pushMessage()
*/
public function testMessageLifecycle()
{
$this->assertEquals(0, $this->driver->countMessages('foo'));

$this->driver->pushMessage('foo', 'message1');
$this->assertEquals(1, $this->driver->countMessages('foo'));

$this->driver->pushMessage('foo', 'message2');
$this->assertEquals(2, $this->driver->countMessages('foo'));

list($message1, $receipt1) = $this->driver->popMessage('foo');
$this->assertSame('message1', $message1, 'The first message pushed is popped first');
$this->assertRegExp('/^[a-f\d]{24}$/i', $receipt1, 'The message receipt is an ObjectId');
$this->assertEquals(1, $this->driver->countMessages('foo'));

list($message2, $receipt2) = $this->driver->popMessage('foo');
$this->assertSame('message2', $message2, 'The second message pushed is popped second');
$this->assertRegExp('/^[a-f\d]{24}$/i', $receipt2, 'The message receipt is an ObjectId');
$this->assertEquals(0, $this->driver->countMessages('foo'));

list($message3, $receipt3) = $this->driver->popMessage('foo', 1);
$this->assertNull($message3, 'Null message is returned when popping an empty queue');
$this->assertNull($receipt3, 'Null receipt is returned when popping an empty queue');

$this->assertEquals(2, $this->messages->count(), 'Popped messages remain in the database');

$this->driver->acknowledgeMessage('foo', $receipt1);
$this->assertEquals(1, $this->messages->count(), 'Acknowledged messages are removed from the database');

$this->driver->acknowledgeMessage('foo', $receipt2);
$this->assertEquals(0, $this->messages->count(), 'Acknowledged messages are removed from the database');
}

public function testPeekQueue()
{
$this->driver->pushMessage('foo', 'message1');
$this->driver->pushMessage('foo', 'message2');

$this->assertSame(array('message1', 'message2'), $this->driver->peekQueue('foo'));
$this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1));
$this->assertSame(array(), $this->driver->peekQueue('foo', 2));
$this->assertSame(array('message1'), $this->driver->peekQueue('foo', 0, 1));
$this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1, 1));
}

/**
* @covers ::createQueue()
* @covers ::listQueues()
* @covers ::removeQueue()
*/
public function testQueueLifecycle()
{
$this->driver->createQueue('foo');
$this->driver->createQueue('bar');

$queues = $this->driver->listQueues();
$this->assertCount(2, $queues);
$this->assertContains('foo', $queues);
$this->assertContains('bar', $queues);

$this->driver->removeQueue('foo');

$queues = $this->driver->listQueues();
$this->assertCount(1, $queues);
$this->assertNotContains('foo', $queues);
$this->assertContains('bar', $queues);
}

public function testRemoveQueueDeletesMessages()
{
$this->driver->pushMessage('foo', 'message1');
$this->driver->pushMessage('foo', 'message2');
$this->assertEquals(2, $this->driver->countMessages('foo'));
$this->assertEquals(2, $this->messages->count());

$this->driver->removeQueue('foo');
$this->assertEquals(0, $this->driver->countMessages('foo'));
$this->assertEquals(0, $this->messages->count());
}

public function testCreateQueueWithDuplicateNameIsNoop()
{
$this->driver->createQueue('foo');
$this->driver->createQueue('foo');

$this->assertSame(array('foo'), $this->driver->listQueues());
}

public function testInfo()
{
$info = array(
'messages' => self::DATABASE . '.' . self::MESSAGES,
'queues' => self::DATABASE . '.' . self::QUEUES,
);

$this->assertSame($info, $this->driver->info());
}
}
Loading

0 comments on commit 7a183cd

Please sign in to comment.