Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds consumer limit subscribers and cycle event #159

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 9 additions & 58 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Bernard\Event\EnvelopeEvent;
use Bernard\Event\RejectEnvelopeEvent;
use Bernard\Event\ConsumerCycleEvent;

declare(ticks=1);

Expand All @@ -15,12 +16,7 @@ class Consumer
{
protected $router;
protected $dispatcher;
protected $shutdown = false;
protected $configured = false;
protected $options = array(
'max-runtime' => PHP_INT_MAX,
'max-messages' => null,
);
protected $consume = true;

/**
* @param Router $router
Expand All @@ -38,56 +34,25 @@ public function __construct(Router $router, EventDispatcherInterface $dispatcher
* @param Queue $queue
* @param array $options
*/
public function consume(Queue $queue, array $options = array())
public function consume(Queue $queue)
{
$this->bind();

while ($this->tick($queue, $options)) {
// NO op
}
}

/**
* Returns true do indicate it should be run again or false to indicate
* it should not be run again.
*
* @param Queue $queue
* @param array $options
*
* @return boolean
*/
public function tick(Queue $queue, array $options = array())
{
$this->configure($options);

if ($this->shutdown) {
return false;
}

if (microtime(true) > $this->options['max-runtime']) {
return false;
}
while ($this->consume) {
if ($envelope = $queue->dequeue()) {
$this->invoke($envelope, $queue);
}

if (!$envelope = $queue->dequeue()) {
return true;
$this->dispatcher->dispatch('bernard.cycle', new ConsumerCycleEvent($this));
}


$this->invoke($envelope, $queue);

if (null === $this->options['max-messages']) {
return true;
}

return (boolean) --$this->options['max-messages'];
}

/**
* Mark Consumer as shutdown
*/
public function shutdown()
{
$this->shutdown = true;
$this->consume = false;
}

/**
Expand Down Expand Up @@ -118,20 +83,6 @@ public function invoke(Envelope $envelope, Queue $queue)
}
}

/**
* @param array $options
*/
protected function configure(array $options)
{
if ($this->configured) {
return $this->options;
}

$this->options = array_filter($options) + $this->options;
$this->options['max-runtime'] += microtime(true);
$this->configured = true;
}

/**
* Setup signal handlers for unix signals.
*/
Expand Down
29 changes: 29 additions & 0 deletions src/Event/ConsumerCycleEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Bernard\Event;

use Bernard\Consumer;

/**
* @package Bernard
*/
class ConsumerCycleEvent extends \Symfony\Component\EventDispatcher\Event
{
protected $consumer;

/**
* @param Envelope $envelope
*/
public function __construct(Consumer $consumer)
{
$this->consumer = $consumer;
}

/**
* Stops the Consumer on the next cycle
*/
public function shutdown()
{
$this->consumer->shutdown();
}
}
74 changes: 74 additions & 0 deletions src/EventListener/MessageLimitSubscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

namespace Bernard\EventListener;

use Bernard\Event\ConsumerCycleEvent;
use Bernard\Event\EnvelopeEvent;
use Bernard\Event\RejectEnvelopeEvent;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
* @package Bernard
*/
class MessageLimitSubscriber implements EventSubscriberInterface
{
protected $messageLimit;
protected $messageCount = 0;
protected $countFailures = true;

/**
* @param integer $messageLimit
* @param boolean $countFailures
*/
public function __construct($messageLimit, $countFailures = true)
{
$this->messageLimit = $messageLimit;
$this->countFailures = (bool) $countFailures;
}

/**
* Check if the consumer passed the limit
*
* @param ConsumerCycleEvent $event
*/
public function onCycle(ConsumerCycleEvent $event)
{
if ($this->messageLimit <= $this->messageCount) {
$event->shutdown();
}
}

/**
* Counts an invoke
*
* @param EnvelopeEvent $event
*/
public function onInvoke(EnvelopeEvent $event)
{
$this->messageCount++;
}

/**
* Counts a reject
*
* @param RejectEnvelopeEvent $event
*/
public function onReject(RejectEnvelopeEvent $event)
{
if ($this->countFailures) {
$this->messageCount++;
}
}

/**
* @return array
*/
public static function getSubscribedEvents()
{
return array(
'bernard.cycle' => array('onCycle'),
'bernard.invoke' => array('onInvoke'),
'bernard.reject' => array('onReject'),
);
}
}
50 changes: 50 additions & 0 deletions src/EventListener/RuntimeLimitSubscruber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

namespace Bernard\EventListener;

use Bernard\Event\ConsumerCycleEvent;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
* @package Bernard
*/
class RuntimeLimitSubscriber implements EventSubscriberInterface
{
protected $timeLimit;
protected $initialized = false;

/**
* @param integer $timeLimit
*/
public function __construct($timeLimit)
{
$this->timeLimit = $timeLimit;
}

/**
* Check if the consumer passed the limit
*
* @param ConsumerCycleEvent $event
*/
public function onCycle(ConsumerCycleEvent $event)
{
if (!$this->initialized) {
$this->timeLimit += microtime(true);
$this->initialized = true;
}
if ($this->timeLimit <= microtime(true)) {
$event->shutdown();
}
}

/**
* @return array
*/
public static function getSubscribedEvents()
{
return array(
'bernard.cycle' => array('onCycle'),
);
}
}
49 changes: 6 additions & 43 deletions tests/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ class ConsumerTest extends \PHPUnit_Framework_TestCase
public function setUp()
{
$this->router = new SimpleRouter;
$this->router->add('ImportUsers', new Fixtures\Service);

$this->dispatcher = $this->getMock('Symfony\Component\EventDispatcher\EventDispatcherInterface');
$this->consumer = new Consumer($this->router, $this->dispatcher);

$this->router->add('ImportUsers', new Fixtures\Service($this->consumer));
}

public function testEmitsConsumeEvent()
Expand Down Expand Up @@ -52,33 +53,9 @@ public function testEmitsExceptionEvent()
$this->consumer->invoke($envelope, $queue);
}

public function testShutdown()
{
$queue = new InMemoryQueue('queue');

$this->consumer->shutdown();

$this->assertFalse($this->consumer->tick($queue));
}

public function testMaxRuntime()
{
$queue = new InMemoryQueue('queue');

$this->assertFalse($this->consumer->tick($queue, array(
'max-runtime' => -1 * PHP_INT_MAX,
)));
}

public function testNoEnvelopeInQueue()
{
$queue = new InMemoryQueue('queue');
$this->assertTrue($this->consumer->tick($queue));
}

public function testEnvelopeIsAcknowledged()
{
$service = new Fixtures\Service();
$service = new Fixtures\Service($this->consumer);
$envelope = new Envelope(new DefaultMessage('ImportUsers'));

$this->router->add('ImportUsers', $service);
Expand All @@ -87,38 +64,24 @@ public function testEnvelopeIsAcknowledged()
$queue->expects($this->once())->method('dequeue')->will($this->returnValue($envelope));
$queue->expects($this->once())->method('acknowledge')->with($this->equalTo($envelope));

$this->consumer->tick($queue);
$this->consumer->consume($queue);

$this->assertTrue($service::$importUsers);
}

public function testMaxMessages()
{
$this->router->add('ImportUsers', new Fixtures\Service);

$queue = new InMemoryQueue('send-newsletter');
$queue->enqueue(new Envelope(new DefaultMessage('ImportUsers')));
$queue->enqueue(new Envelope(new DefaultMessage('ImportUsers')));
$queue->enqueue(new Envelope(new DefaultMessage('ImportUsers')));

$this->assertFalse($this->consumer->tick($queue, array('max-messages' => 1)));
$this->assertTrue($this->consumer->tick($queue));
$this->assertTrue($this->consumer->tick($queue, array('max-messages' => 100)));
}

/**
* @group debug
*/
public function testEnvelopeWillBeInvoked()
{
$service = new Fixtures\Service();
$service = new Fixtures\Service($this->consumer);

$this->router->add('ImportUsers', $service);

$queue = new InMemoryQueue('send-newsletter');
$queue->enqueue(new Envelope(new DefaultMessage('ImportUsers')));

$this->consumer->tick($queue);
$this->consumer->consume($queue);

$this->assertTrue($service::$importUsers);
}
Expand Down
11 changes: 11 additions & 0 deletions tests/Fixtures/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@

namespace Bernard\Tests\Fixtures;

use Bernard\Consumer;

class Service
{
public static $importUsers = false;

protected $consumer;

public function __construct(Consumer $consumer)
{
$this->consumer = $consumer;
}

public function failSendNewsletter()
{
throw new \Exception();
Expand All @@ -14,6 +23,8 @@ public function failSendNewsletter()
public function importUsers()
{
static::$importUsers = true;

$this->consumer->shutdown();
}

public function createFile()
Expand Down