Skip to content

Commit

Permalink
reject message in consumer plugin instead of throw exception, add cla…
Browse files Browse the repository at this point in the history
…sses/methods comment, fix code style issues
  • Loading branch information
anvasiliev committed May 31, 2019
1 parent c336e36 commit 038615b
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class MassConsumerEnvelopeCallback
* @var StoreManagerInterface
*/
private $storeManager;

/**
* @var EnvelopeFactory
*/
private $envelopeFactory;

/**
* @var LoggerInterface
*/
Expand All @@ -52,13 +54,13 @@ public function __construct(

/**
* Check if amqpProperties['application_headers'] have 'store_id' and use it to setCurrentStore
* Restore currentStore of consumer process after execution.
* Restore original store value in consumer process after execution.
* Reject queue messages because of wrong store_id.
*
* @param SubjectMassConsumerEnvelopeCallback $subject
* @param callable $proceed
* @param EnvelopeInterface $message
* @return array|null
* @throws NoSuchEntityException
* @return void
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
*/
public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, callable $proceed, EnvelopeInterface $message)
Expand All @@ -75,19 +77,22 @@ public function aroundExecute(SubjectMassConsumerEnvelopeCallback $subject, call
$currentStoreId = $this->storeManager->getStore()->getId();
} catch (NoSuchEntityException $e) {
$this->logger->error(
sprintf("Can't set currentStoreId during processing queue. Error %s.", $e->getMessage())
sprintf(
"Can't set currentStoreId during processing queue. Message rejected. Error %s.",
$e->getMessage()
)
);
throw new NoSuchEntityException(__($e->getMessage()));
$subject->getQueue()->reject($message, false, $e->getMessage());
return;
}
if (isset($storeId) && $storeId !== $currentStoreId) {
$this->storeManager->setCurrentStore($storeId);
}
}
}
$result = $proceed($message);
$proceed($message);
if (isset($storeId, $currentStoreId) && $storeId !== $currentStoreId) {
$this->storeManager->setCurrentStore($currentStoreId);//restore previous current store
$this->storeManager->setCurrentStore($currentStoreId);//restore original store value
}
return $result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ class Exchange
* @var StoreManagerInterface
*/
private $storeManager;

/**
* @var EnvelopeFactory
*/
private $envelopeFactory;

/**
* @var LoggerInterface
*/
Expand All @@ -51,11 +53,14 @@ public function __construct(
}

/**
* Set current store_id in amqpProperties['application_headers']
* so consumer may check store_id and execute operation in correct store scope.
* Prevent publishing inconsistent messages because of store_id not defined or wrong.
*
* @param SubjectExchange $subject
* @param $topic
* @param EnvelopeInterface[] $envelopes
* @return array|null
* @throws NoSuchEntityException
* @return array
* @throws AMQPInvalidArgumentException
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
*/
Expand All @@ -67,7 +72,7 @@ public function beforeEnqueue(SubjectExchange $subject, $topic, array $envelopes
$this->logger->error(
sprintf("Can't get current storeId and inject to amqp message. Error %s.", $e->getMessage())
);
throw new NoSuchEntityException(__($e->getMessage()));
throw new \Exception($e->getMessage());
}

$updatedEnvelopes = [];
Expand Down
3 changes: 3 additions & 0 deletions app/code/Magento/AmqpStore/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"magento/module-store": "*",
"php": "~7.1.3||~7.2.0"
},
"suggest": {
"magento/module-asynchronous-operations": "*"
},
"type": "magento2-module",
"license": [
"OSL-3.0",
Expand Down
1 change: 0 additions & 1 deletion app/code/Magento/AmqpStore/etc/module.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
<module name="Magento_AmqpStore">
<sequence>
<module name="Magento_Amqp"/>
<module name="Magento_Store"/>
</sequence>
</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MassConsumer implements ConsumerInterface
* @var Registry
*/
private $registry;

/**
* @var MassConsumerEnvelopeCallbackFactory
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use Magento\Framework\MessageQueue\MessageController;

/**
* Class used by \Magento\AsynchronousOperations\Model\MassConsumer as public callback function.
* Class used as public callback function by async consumer.
* @SuppressWarnings(PHPMD.CouplingBetweenObjects)
*/
class MassConsumerEnvelopeCallback
Expand Down Expand Up @@ -85,6 +85,7 @@ public function __construct(
* Get transaction callback. This handles the case of async.
*
* @param EnvelopeInterface $message
* @return void
*/
public function execute(EnvelopeInterface $message)
{
Expand Down Expand Up @@ -121,4 +122,13 @@ public function execute(EnvelopeInterface $message)
}
}
}

/**
* Get message queue.
* @return QueueInterface
*/
public function getQueue()
{
return $this->queue;
}
}

0 comments on commit 038615b

Please sign in to comment.