-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MongoDB storage driver #133
Changes from all commits
8611b73
018607c
5777bbf
9a63843
1912d54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
<?php | ||
|
||
namespace Bernard\Driver; | ||
|
||
use MongoCollection; | ||
use MongoDate; | ||
use MongoId; | ||
|
||
/** | ||
* Driver supporting MongoDB | ||
* | ||
* @package Bernard | ||
*/ | ||
class MongoDBDriver implements \Bernard\Driver | ||
{ | ||
private $messages; | ||
private $queues; | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param MongoCollection $queues Collection where queues will be stored | ||
* @param MongoCollection $messages Collection where messages will be stored | ||
*/ | ||
public function __construct(MongoCollection $queues, MongoCollection $messages) | ||
{ | ||
$this->queues = $queues; | ||
$this->messages = $messages; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function listQueues() | ||
{ | ||
return $this->queues->distinct('_id'); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function createQueue($queueName) | ||
{ | ||
$data = array('_id' => (string) $queueName); | ||
|
||
$this->queues->update($data, $data, array('upsert' => true)); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function countMessages($queueName) | ||
{ | ||
return $this->messages->count(array( | ||
'queue' => (string) $queueName, | ||
'visible' => true, | ||
)); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function pushMessage($queueName, $message) | ||
{ | ||
$data = array( | ||
'queue' => (string) $queueName, | ||
'message' => (string) $message, | ||
'sentAt' => new MongoDate(), | ||
'visible' => true, | ||
); | ||
|
||
$this->messages->insert($data); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function popMessage($queueName, $interval = 5) | ||
{ | ||
$runtime = microtime(true) + $interval; | ||
|
||
while (microtime(true) < $runtime) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @henrikbjorn: I know you asked about this in #94 (comment). I don't have any ideas for removing duplication between the three drivers that do this (looks like Doctrine DBAL, filesystem, and MongoDB), but I did have some concerns about the loop in general. As-is, it seems impossible to simply poll the queue without blocking. This is noticed in some tests where we call Ideally, I'd like to be able to specify zero as my Do other drivers block on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The other drivers do block when polling, this is especially important for services such as SQS or IronMQ where each request costs money. It would be fine to change the loops from a The interval is always There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see. That's rather unfortunate for the database and filesystem drivers, though. Ideally, I'd want to use a tailable cursor to block on results with MongoDB (instead of slamming the server with
I'll open a separate PR to change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jmikola but what if you have 2 consumers reading from Mongo ? Wouldn't this allow them to process the message twice if you don't store in MongoDB the fact that it has been sent to a consumer ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Using a tailable cursor, it's quite possible that two PHP processes might see the same message as available, but only one will be able to claim it via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to support both in a nice way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tailing would require that we always have a capped collection. I suppose we could check once when the driver is constructed and assume the collection will be the same for the duration of our execution. This is reasonable, as users shouldn't be dropping and recreating their Some caveats of capped collections (from here):
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will let it be up to you what to do here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
$result = $this->messages->findAndModify( | ||
array('queue' => (string) $queueName, 'visible' => true), | ||
array('$set' => array('visible' => false)), | ||
array('message' => 1), | ||
array('sort' => array('sentAt' => 1)) | ||
); | ||
|
||
if ($result) { | ||
return array((string) $result['message'], (string) $result['_id']); | ||
} | ||
|
||
usleep(10000); | ||
} | ||
|
||
return array(null, null); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function acknowledgeMessage($queueName, $receipt) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed that a number of drivers don't implement this method. It would be quite easy to delete the message document in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was added when we used SQS as we needed to not loose messages. I would be great to implement this for all drivers, but then i think we should have a rejectMessage aswell when something bad happens. The visible field is to not select the same message again when using multiple consumers and the dbal driver. Yes it should only be called when a message is successfully acted upon. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume Either way, it looks like the implementation in this PR is OK for the MongoDB driver, so I assume there's nothing to address here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it would. Yes it is OK! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Opened #134 to track this. |
||
{ | ||
$this->messages->remove(array( | ||
'_id' => new MongoId((string) $receipt), | ||
'queue' => (string) $queueName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you really need the queue if you have the id? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, but it's basically a free check since the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check |
||
)); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function peekQueue($queueName, $index = 0, $limit = 20) | ||
{ | ||
$cursor = $this->messages->find( | ||
array('queue' => (string) $queueName, 'visible' => true), | ||
array('_id' => 0, 'message' => 1) | ||
) | ||
->sort(array('sentAt' => 1)) | ||
->limit($limit) | ||
->skip($index) | ||
; | ||
|
||
return array_map( | ||
function ($result) { return (string) $result['message']; }, | ||
iterator_to_array($cursor, false) | ||
); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function removeQueue($queueName) | ||
{ | ||
$this->queues->remove(array('_id' => $queueName)); | ||
$this->messages->remove(array('queue' => (string) $queueName)); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function info() | ||
{ | ||
return array( | ||
'messages' => (string) $this->messages, | ||
'queues' => (string) $this->queues, | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
<?php | ||
|
||
namespace Bernard\Tests\Driver; | ||
|
||
use Bernard\Driver\MongoDBDriver; | ||
use MongoClient; | ||
use MongoCollection; | ||
use MongoException; | ||
|
||
/** | ||
* @coversDefaultClass Bernard\Driver\MongoDBDriver | ||
*/ | ||
class MongoDBDriverFunctionalTest extends \PHPUnit_Framework_TestCase | ||
{ | ||
const DATABASE = 'bernardQueueTest'; | ||
const MESSAGES = 'bernardMessages'; | ||
const QUEUES = 'bernardQueues'; | ||
|
||
private $messages; | ||
private $queues; | ||
private $driver; | ||
|
||
public function setUp() | ||
{ | ||
if ( ! class_exists('MongoClient')) { | ||
$this->markTestSkipped('MongoDB extension is not available.'); | ||
} | ||
|
||
try { | ||
$mongoClient = new MongoClient(); | ||
} catch (MongoConnectionException $e) { | ||
$this->markTestSkipped('Cannot connect to MongoDB server.'); | ||
} | ||
|
||
$this->queues = $mongoClient->selectCollection(self::DATABASE, self::QUEUES); | ||
$this->messages = $mongoClient->selectCollection(self::DATABASE, self::MESSAGES); | ||
$this->driver = new MongoDBDriver($this->queues, $this->messages); | ||
} | ||
|
||
public function tearDown() | ||
{ | ||
if ( ! $this->messages instanceof MongoCollection) { | ||
return; | ||
} | ||
|
||
$this->messages->drop(); | ||
$this->queues->drop(); | ||
} | ||
|
||
/** | ||
* @medium | ||
* @covers ::acknowledgeMessage() | ||
* @covers ::countMessages() | ||
* @covers ::popMessage() | ||
* @covers ::pushMessage() | ||
*/ | ||
public function testMessageLifecycle() | ||
{ | ||
$this->assertEquals(0, $this->driver->countMessages('foo')); | ||
|
||
$this->driver->pushMessage('foo', 'message1'); | ||
$this->assertEquals(1, $this->driver->countMessages('foo')); | ||
|
||
$this->driver->pushMessage('foo', 'message2'); | ||
$this->assertEquals(2, $this->driver->countMessages('foo')); | ||
|
||
list($message1, $receipt1) = $this->driver->popMessage('foo'); | ||
$this->assertSame('message1', $message1, 'The first message pushed is popped first'); | ||
$this->assertRegExp('/^[a-f\d]{24}$/i', $receipt1, 'The message receipt is an ObjectId'); | ||
$this->assertEquals(1, $this->driver->countMessages('foo')); | ||
|
||
list($message2, $receipt2) = $this->driver->popMessage('foo'); | ||
$this->assertSame('message2', $message2, 'The second message pushed is popped second'); | ||
$this->assertRegExp('/^[a-f\d]{24}$/i', $receipt2, 'The message receipt is an ObjectId'); | ||
$this->assertEquals(0, $this->driver->countMessages('foo')); | ||
|
||
list($message3, $receipt3) = $this->driver->popMessage('foo', 1); | ||
$this->assertNull($message3, 'Null message is returned when popping an empty queue'); | ||
$this->assertNull($receipt3, 'Null receipt is returned when popping an empty queue'); | ||
|
||
$this->assertEquals(2, $this->messages->count(), 'Popped messages remain in the database'); | ||
|
||
$this->driver->acknowledgeMessage('foo', $receipt1); | ||
$this->assertEquals(1, $this->messages->count(), 'Acknowledged messages are removed from the database'); | ||
|
||
$this->driver->acknowledgeMessage('foo', $receipt2); | ||
$this->assertEquals(0, $this->messages->count(), 'Acknowledged messages are removed from the database'); | ||
} | ||
|
||
public function testPeekQueue() | ||
{ | ||
$this->driver->pushMessage('foo', 'message1'); | ||
$this->driver->pushMessage('foo', 'message2'); | ||
|
||
$this->assertSame(array('message1', 'message2'), $this->driver->peekQueue('foo')); | ||
$this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1)); | ||
$this->assertSame(array(), $this->driver->peekQueue('foo', 2)); | ||
$this->assertSame(array('message1'), $this->driver->peekQueue('foo', 0, 1)); | ||
$this->assertSame(array('message2'), $this->driver->peekQueue('foo', 1, 1)); | ||
} | ||
|
||
/** | ||
* @covers ::createQueue() | ||
* @covers ::listQueues() | ||
* @covers ::removeQueue() | ||
*/ | ||
public function testQueueLifecycle() | ||
{ | ||
$this->driver->createQueue('foo'); | ||
$this->driver->createQueue('bar'); | ||
|
||
$queues = $this->driver->listQueues(); | ||
$this->assertCount(2, $queues); | ||
$this->assertContains('foo', $queues); | ||
$this->assertContains('bar', $queues); | ||
|
||
$this->driver->removeQueue('foo'); | ||
|
||
$queues = $this->driver->listQueues(); | ||
$this->assertCount(1, $queues); | ||
$this->assertNotContains('foo', $queues); | ||
$this->assertContains('bar', $queues); | ||
} | ||
|
||
public function testRemoveQueueDeletesMessages() | ||
{ | ||
$this->driver->pushMessage('foo', 'message1'); | ||
$this->driver->pushMessage('foo', 'message2'); | ||
$this->assertEquals(2, $this->driver->countMessages('foo')); | ||
$this->assertEquals(2, $this->messages->count()); | ||
|
||
$this->driver->removeQueue('foo'); | ||
$this->assertEquals(0, $this->driver->countMessages('foo')); | ||
$this->assertEquals(0, $this->messages->count()); | ||
} | ||
|
||
public function testCreateQueueWithDuplicateNameIsNoop() | ||
{ | ||
$this->driver->createQueue('foo'); | ||
$this->driver->createQueue('foo'); | ||
|
||
$this->assertSame(array('foo'), $this->driver->listQueues()); | ||
} | ||
|
||
public function testInfo() | ||
{ | ||
$info = array( | ||
'messages' => self::DATABASE . '.' . self::MESSAGES, | ||
'queues' => self::DATABASE . '.' . self::QUEUES, | ||
); | ||
|
||
$this->assertSame($info, $this->driver->info()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this function make sense for the Doctrine DBAL and MongoDB drivers? For both, it simply inserts a one-field record into a separate table/collection. None of the message methods seem to require us to create a queue. If anything, we only need this record to exist for listing queues, since we'd have no way to know about queues with no messages otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have to support it as the other drivers does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it matter that you can call
pushMessage()
with a non-existent queue? Or is this not really a concern because users don't call these driver methods directly? I assume your library (or Juno) is the main user here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally i dont see it as a concern, as you are correct only Juno really needs this information.