MongoDB storage driver #133

Merged
merged 5 commits into from
Dec 23, 2014
3 changes: 3 additions & 0 deletions .travis.yml
@@ -11,9 +11,12 @@ matrix:
allow_failures:
- php: hhvm

services: mongodb

before_script:
- sh -c "if [ \"$TRAVIS_PHP_VERSION\" != \"hhvm\" ]; then pyrus install pecl/redis && pyrus build pecl/redis; fi"
- sh -c "if [ \"$TRAVIS_PHP_VERSION\" != \"hhvm\" ]; then echo \"extension=redis.so\" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini; fi"
- sh -c "if [ \"$TRAVIS_PHP_VERSION\" = \"hhvm\" ]; then composer require --dev mongofill/mongofill=dev-master --no-update; fi"
- "composer require --dev phpspec/phpspec:~2.0 --no-update"
- "composer install --no-progress --no-plugins"

43 changes: 43 additions & 0 deletions doc/drivers.rst
@@ -9,6 +9,7 @@ Several different types of drivers are supported. Currently these are available:
* `IronMQ`_
* `Amazon SQS`_
* `Google AppEngine`_
* `MongoDB`_

Redis Extension
---------------
@@ -347,3 +348,45 @@ Requires the installation of pda/pheanstalk. Add the following to your
$pheanstalk = new Pheanstalk('localhost');

$driver = new PheanstalkDriver($pheanstalk);

MongoDB
-------

The MongoDB driver requires the `mongo PECL extension <http://pecl.php.net/package/mongo>`_.
On platforms where the PECL extension is unavailable, such as HHVM,
`mongofill <https://github.com/mongofill/mongofill>`_ may be used instead.

The driver should be constructed with two MongoCollection objects, which
correspond to the queue and message collections, respectively.

.. code-block:: php

<?php

$mongoClient = new \MongoClient();
$driver = new \Bernard\Driver\MongoDBDriver(
$mongoClient->selectCollection('bernardDatabase', 'queues'),
$mongoClient->selectCollection('bernardDatabase', 'messages')
);

.. note::

If you are using Doctrine MongoDB or the ODM, you can access the
MongoCollection objects through the ``getMongoCollection()`` method on the
``Doctrine\MongoDB\Collection`` wrapper class, which in turn may be
retrieved from a ``Doctrine\MongoDB\Database`` wrapper or DocumentManager
directly.

To support message queries, the following index should also be created:

.. code-block:: php

<?php

$mongoClient = new \MongoClient();
$collection = $mongoClient->selectCollection('bernardDatabase', 'messages');
$collection->createIndex([
'queue' => 1,
'visible' => 1,
'sentAt' => 1,
]);
150 changes: 150 additions & 0 deletions src/Bernard/Driver/MongoDBDriver.php
@@ -0,0 +1,150 @@
<?php

namespace Bernard\Driver;

use MongoCollection;
use MongoDate;
use MongoId;

/**
* Driver supporting MongoDB
*
* @package Bernard
*/
class MongoDBDriver implements \Bernard\Driver
{
private $messages;
private $queues;

/**
* Constructor.
*
* @param MongoCollection $queues Collection where queues will be stored
* @param MongoCollection $messages Collection where messages will be stored
*/
public function __construct(MongoCollection $queues, MongoCollection $messages)
{
$this->queues = $queues;
$this->messages = $messages;
}

/**
* {@inheritDoc}
*/
public function listQueues()
{
return $this->queues->distinct('_id');
}

/**
* {@inheritDoc}
*/
public function createQueue($queueName)
Contributor Author

Does this function make sense for the Doctrine DBAL and MongoDB drivers? For both, it simply inserts a one-field record into a separate table/collection. None of the message methods seem to require us to create a queue. If anything, we only need this record to exist for listing queues, since we'd have no way to know about queues with no messages otherwise.

Contributor

I think we have to support it, as the other drivers do.

Contributor Author

Does it matter that you can call pushMessage() with a non-existent queue? Or is this not really a concern because users don't call these driver methods directly? I assume your library (or Juno) is the main user here.

Contributor

Personally, I don't see it as a concern. You are correct that only Juno really needs this information.

{
$data = array('_id' => (string) $queueName);

$this->queues->update($data, $data, array('upsert' => true));
}

/**
* {@inheritDoc}
*/
public function countMessages($queueName)
{
return $this->messages->count(array(
'queue' => (string) $queueName,
'visible' => true,
));
}

/**
* {@inheritDoc}
*/
public function pushMessage($queueName, $message)
{
$data = array(
'queue' => (string) $queueName,
'message' => (string) $message,
'sentAt' => new MongoDate(),
'visible' => true,
);

$this->messages->insert($data);
}

/**
* {@inheritDoc}
*/
public function popMessage($queueName, $interval = 5)
{
$runtime = microtime(true) + $interval;

while (microtime(true) < $runtime) {
Contributor Author

@henrikbjorn: I know you asked about this in #94 (comment). I don't have any ideas for removing duplication between the three drivers that do this (looks like Doctrine DBAL, filesystem, and MongoDB), but I did have some concerns about the loop in general. As-is, it seems impossible to simply poll the queue without blocking. This is noticed in some tests where we call popMessage() on an empty queue and effectively slam the database for five seconds.

Ideally, I'd like to be able to specify zero as my $interval to do a single query. This would require changing the while loop to a do while. Thoughts?

Do other drivers block on popMessage() when the queue is empty? If not, perhaps $interval should always default to zero so users can handle polling on their own.
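
A minimal sketch of the do/while idea raised above, written against a plain callable instead of MongoDB so it runs on its own. The helper name `pollWithTimeout` and its parameters are hypothetical and are not part of this PR; the point is only that a do/while loop always makes one attempt, so an `$interval` of zero becomes a single non-blocking query.

```php
<?php

/**
 * Calls $fetch until it returns a non-null value or $interval seconds pass.
 * Hypothetical helper: the name and signature are illustrative only.
 *
 * @param callable $fetch       Returns a result, or null when nothing is available
 * @param float    $interval    Maximum number of seconds to keep polling
 * @param int      $sleepMicros Pause between attempts, in microseconds
 */
function pollWithTimeout(callable $fetch, $interval = 5, $sleepMicros = 10000)
{
    $deadline = microtime(true) + $interval;

    do {
        // The body runs before the deadline is checked, so there is
        // always at least one attempt, even when $interval is zero.
        $result = $fetch();

        if ($result !== null) {
            return $result;
        }

        // Skip the final sleep once the deadline has passed.
        if (microtime(true) >= $deadline) {
            break;
        }

        usleep($sleepMicros);
    } while (microtime(true) < $deadline);

    return null;
}
```

With this shape, `pollWithTimeout($fetch, 0)` behaves as a single poll, while the default still blocks for up to five seconds.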

Contributor

The other drivers do block when polling, this is especially important for services such as SQS or IronMQ where each request costs money.

It would be fine to change the loops from a while to a do while if it is done for all of the drivers.

The interval is always 5 as there is no way to set it currently when using the consumer.

Contributor Author

> The other drivers do block when polling, this is especially important for services such as SQS or IronMQ where each request costs money.

I see. That's rather unfortunate for the database and filesystem drivers, though. Ideally, I'd want to use a tailable cursor to block on results with MongoDB (instead of slamming the server with findAndModify commands), but that would require making the collection capped and also complicate the method quite a bit:

  • Attempt a findAndModify. If something was found, return it; otherwise, continue.
  • Create a tailable cursor to find a message and block up to $interval seconds on it. If nothing is found, return; otherwise, continue.
  • Re-try a findAndModify command, since we expect there is a message waiting. Regardless of whether we find it or not, return afterwards.

I'll open a separate PR to change while to do while after this is merged. Additionally, I think it'd be good to make the sleep interval configurable as well. I noticed that the database and filesystem sleeps were inconsistent, but I wasn't sure if that was intentional.
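
The three steps listed above can be sketched as follows. This is an illustration under stated assumptions, not the driver's code: `$claim` stands in for the findAndModify call and `$waitForData` stands in for blocking on a tailable cursor. Both callables, and the function name `popWithTailing`, are hypothetical.

```php
<?php

/**
 * Three-step pop: try to claim, block until data appears, try to claim again.
 *
 * @param callable $claim       Returns array(message, receipt), or null if nothing
 *                              could be claimed (stand-in for findAndModify)
 * @param callable $waitForData Takes a number of seconds and returns true if new
 *                              data appeared (stand-in for a tailable cursor)
 * @param int      $interval    Maximum number of seconds to block in step 2
 */
function popWithTailing(callable $claim, callable $waitForData, $interval = 5)
{
    // Step 1: a message may already be waiting.
    $result = $claim();
    if ($result !== null) {
        return $result;
    }

    // Step 2: block instead of spinning; give up if nothing arrives in time.
    if (!$waitForData($interval)) {
        return array(null, null);
    }

    // Step 3: another consumer may have claimed the message first,
    // so return after this attempt whether or not it succeeds.
    $result = $claim();

    return $result !== null ? $result : array(null, null);
}
```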

@jmikola but what if you have two consumers reading from Mongo? Wouldn't this allow them to process the message twice if you don't store in MongoDB the fact that it has been sent to a consumer?

Contributor Author

findAndModify is atomic, so we can either update the document's visible field (as we do now) or delete the document entirely in the same operation that will return it. Either way, we don't have to worry about another query picking up the document.

Using a tailable cursor, it's quite possible that two PHP processes might see the same message as available, but only one will be able to claim it via findAndModify. In this case, I'd just be using the tailable cursor to block on new data without spinning PHP. If multiple processes find new data and it turns out a few new messages were added, they can each take one. Otherwise, one will get there first and the others will no-op on the second findAndModify before returning.

Contributor

Would it be possible to support both in a nice way?

Contributor Author

Tailing would require that we always have a capped collection. I suppose we could check once when the driver is constructed and assume the collection will be the same for the duration of our execution. This is reasonable, as users shouldn't be dropping and recreating their messages collection while a system is running.

Some caveats of capped collections (from here):

  • We can't grow documents. This should be fine because our updates merely flip the visible field.
  • We can't delete objects. This means we'd either rely on visible as the deleted flag or introduce a new field to track deleted messages. Since the driver has no way to set visible from false to true, I think using visible to track deleted status is fine. This does mean that acknowledgeMessage() would be a no-op if we're dealing with a capped collection. I think we can still delete messages from non-capped collections, as we wouldn't want those to grow indefinitely.

Contributor

I will let it be up to you what to do here.

Contributor Author

Opened #135 for capped collection support and #136 for refactoring the poll loops for database and filesystem drivers.

$result = $this->messages->findAndModify(
array('queue' => (string) $queueName, 'visible' => true),
array('$set' => array('visible' => false)),
array('message' => 1),
array('sort' => array('sentAt' => 1))
);

if ($result) {
return array((string) $result['message'], (string) $result['_id']);
}

usleep(10000);
}

return array(null, null);
}

/**
* {@inheritDoc}
*/
public function acknowledgeMessage($queueName, $receipt)
Contributor Author

I noticed that a number of drivers don't implement this method. It would be quite easy to delete the message document in popMessage() as we select it with the remove option. Even in the Doctrine DBAL driver, couldn't we delete the message after popping it (within the same transaction)? What is the value of a visible field in the message records? Is acknowledgeMessage() only intended to be called after a popped message is successfully acted upon?

Contributor

This was added when we used SQS, as we needed to not lose messages. It would be great to implement this for all drivers, but then I think we should have a rejectMessage as well for when something bad happens.

The visible field is there so that the same message is not selected again when using multiple consumers and the DBAL driver.

Yes, it should only be called when a message is successfully acted upon.

Contributor Author

I assume rejectMessage() would just reset the visibility flag?

Either way, it looks like the implementation in this PR is OK for the MongoDB driver, so I assume there's nothing to address here.

Contributor

Yes it would. Yes it is OK!
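
To make the visible-flag lifecycle in this thread concrete, here is a small in-memory model. It is a sketch only: the class `VisibleFlagQueue` is hypothetical, it stores messages in a PHP array instead of a MongoDB collection, and `rejectMessage()` does not exist in this PR (it is the idea tracked separately). Popping hides a message, acknowledging deletes it, and rejecting makes it visible again.

```php
<?php

/**
 * In-memory model of pop / acknowledge / reject using a visible flag.
 * Hypothetical class for illustration; it is not the MongoDB driver.
 */
class VisibleFlagQueue
{
    private $messages = array();
    private $nextId = 1;

    public function pushMessage($message)
    {
        $this->messages[$this->nextId++] = array(
            'message' => $message,
            'visible' => true,
        );
    }

    /** Hides the oldest visible message and returns it with its receipt. */
    public function popMessage()
    {
        foreach ($this->messages as $id => $row) {
            if ($row['visible']) {
                $this->messages[$id]['visible'] = false;

                return array($row['message'], $id);
            }
        }

        return array(null, null);
    }

    /** Called after a message was handled successfully: delete it. */
    public function acknowledgeMessage($receipt)
    {
        unset($this->messages[$receipt]);
    }

    /** Hypothetical: called after a failure, makes the message poppable again. */
    public function rejectMessage($receipt)
    {
        if (isset($this->messages[$receipt])) {
            $this->messages[$receipt]['visible'] = true;
        }
    }

    /** Number of stored messages, visible or not. */
    public function countStored()
    {
        return count($this->messages);
    }
}
```

In this model a popped message stays stored until it is acknowledged, which matches the "Popped messages remain in the database" assertion in the functional test of this PR.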

Contributor Author

Opened #134 to track this.

{
$this->messages->remove(array(
'_id' => new MongoId((string) $receipt),
'queue' => (string) $queueName,
Contributor

Do you really need the queue if you have the id?

Contributor Author

No, but it's basically a free check since the _id index will be used anyway. Since we do take both arguments, I think it's best to actually use them. This avoids the odd case where a user acknowledges a message but gets the queue name wrong.

Contributor

Check

));
}

/**
* {@inheritDoc}
*/
public function peekQueue($queueName, $index = 0, $limit = 20)
{
$cursor = $this->messages->find(
array('queue' => (string) $queueName, 'visible' => true),
array('_id' => 0, 'message' => 1)
)
->sort(array('sentAt' => 1))
->limit($limit)
->skip($index)
;

return array_map(
function ($result) { return (string) $result['message']; },
iterator_to_array($cursor, false)
);
}

/**
* {@inheritDoc}
*/
public function removeQueue($queueName)
{
$this->queues->remove(array('_id' => (string) $queueName));
$this->messages->remove(array('queue' => (string) $queueName));
}

/**
* {@inheritDoc}
*/
public function info()
{
return array(
'messages' => (string) $this->messages,
'queues' => (string) $this->queues,
);
}
}
154 changes: 154 additions & 0 deletions tests/Bernard/Tests/Driver/MongoDBDriverFunctionalTest.php
@@ -0,0 +1,154 @@
<?php

namespace Bernard\Tests\Driver;

use Bernard\Driver\MongoDBDriver;
use MongoClient;
use MongoCollection;
use MongoException;

/**
* @coversDefaultClass Bernard\Driver\MongoDBDriver
*/
class MongoDBDriverFunctionalTest extends \PHPUnit_Framework_TestCase
{
const DATABASE = 'bernardQueueTest';
const MESSAGES = 'bernardMessages';
const QUEUES = 'bernardQueues';

private $messages;
private $queues;
private $driver;

public function setUp()
{
if ( ! class_exists('MongoClient')) {
$this->markTestSkipped('MongoDB extension is not available.');
}

try {
$mongoClient = new MongoClient();
} catch (MongoException $e) {
$this->markTestSkipped('Cannot connect to MongoDB server.');
}

$this->queues = $mongoClient->selectCollection(self::DATABASE, self::QUEUES);
$this->messages = $mongoClient->selectCollection(self::DATABASE, self::MESSAGES);
$this->driver = new MongoDBDriver($this->queues, $this->messages);
}

public function tearDown()
{
if ( ! $this->messages instanceof MongoCollection) {
return;
}

$this->messages->drop();
$this->queues->drop();
}

/**
* @medium
* @covers ::acknowledgeMessage()
* @covers ::countMessages()
* @covers ::popMessage()
* @covers ::pushMessage()
*/
public function testMessageLifecycle()
{
$this->assertEquals(0, $this->driver->countMessages('foo'));

$this->driver->pushMessage('foo', 'message1');
$this->assertEquals(1, $this->driver->countMessages('foo'));

$this->driver->pushMessage('foo', 'message2');
$this->assertEquals(2, $this->driver->countMessages('foo'));

list($message1, $receipt1) = $this->driver->popMessage('foo');
$this->assertSame('message1', $message1, 'The first message pushed is popped first');
$this->assertRegExp('/^[a-f\d]{24}$/i', $receipt1, 'The message receipt is an ObjectId');
$this->assertEquals(1, $this->driver->countMessages('foo'));

list($message2, $receipt2) = $this->driver->popMessage('foo');
$this->assertSame('message2', $message2, 'The second message pushed is popped second');
$this->assertRegExp('/^[a-f\d]{24}$/i', $receipt2, 'The message receipt is an ObjectId');
$this->assertEquals(0, $this->driver->countMessages('foo'));

list($message3, $receipt3) = $this->driver->popMessage('foo', 1);
$this->assertNull($message3, 'Null message is returned when popping an empty queue');
$this->assertNull($receipt3, 'Null receipt is returned when popping an empty queue');

$this->assertEquals(2, $this->messages->count(), 'Popped messages remain in the database');

$this->driver->acknowledgeMessage('foo', $receipt1);
$this->assertEquals(1, $this->messages->count(), 'Acknowledged messages are removed from the database');

$this->driver->acknowledgeMessage('foo', $receipt2);
$this->assertEquals(0, $this->messages->count(), 'Acknowledged messages are removed from the database');
}

public function testPeekQueue()
{
$this->driver->pushMessage('foo', 'message1');
$this->driver->pushMessage('foo', 'message2');

$this->assertSame(array('message1', 'message2'), $this->driver->peekQueue('foo'));
$this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1));
$this->assertSame(array(), $this->driver->peekQueue('foo', 2));
$this->assertSame(array('message1'), $this->driver->peekQueue('foo', 0, 1));
$this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1, 1));
}

/**
* @covers ::createQueue()
* @covers ::listQueues()
* @covers ::removeQueue()
*/
public function testQueueLifecycle()
{
$this->driver->createQueue('foo');
$this->driver->createQueue('bar');

$queues = $this->driver->listQueues();
$this->assertCount(2, $queues);
$this->assertContains('foo', $queues);
$this->assertContains('bar', $queues);

$this->driver->removeQueue('foo');

$queues = $this->driver->listQueues();
$this->assertCount(1, $queues);
$this->assertNotContains('foo', $queues);
$this->assertContains('bar', $queues);
}

public function testRemoveQueueDeletesMessages()
{
$this->driver->pushMessage('foo', 'message1');
$this->driver->pushMessage('foo', 'message2');
$this->assertEquals(2, $this->driver->countMessages('foo'));
$this->assertEquals(2, $this->messages->count());

$this->driver->removeQueue('foo');
$this->assertEquals(0, $this->driver->countMessages('foo'));
$this->assertEquals(0, $this->messages->count());
}

public function testCreateQueueWithDuplicateNameIsNoop()
{
$this->driver->createQueue('foo');
$this->driver->createQueue('foo');

$this->assertSame(array('foo'), $this->driver->listQueues());
}

public function testInfo()
{
$info = array(
'messages' => self::DATABASE . '.' . self::MESSAGES,
'queues' => self::DATABASE . '.' . self::QUEUES,
);

$this->assertSame($info, $this->driver->info());
}
}