vendor/symfony/messenger/EventListener/SendFailedMessageForRetryListener.php line 42

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\EventListener;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  14. use Symfony\Component\Messenger\Envelope;
  15. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  16. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  17. use Symfony\Component\Messenger\Exception\RuntimeException;
  18. use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
  19. use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
  20. use Symfony\Component\Messenger\Stamp\DelayStamp;
  21. use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
  22. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  23. /**
  24.  * @author Tobias Schultze <http://tobion.de>
  25.  */
  class SendFailedMessageForRetryListener implements EventSubscriberInterface
  {
      /** @var ContainerInterface Locator queried by transport alias for the sender used to re-send a message. */
      private $sendersLocator;

      /** @var ContainerInterface Locator queried by transport alias for a retry strategy. */
      private $retryStrategyLocator;

      /** @var LoggerInterface|null Optional logger; nothing is logged when null. */
      private $logger;

      /**
       * @param ContainerInterface   $sendersLocator       looked up by receiver name to find the sender for retries
       * @param ContainerInterface   $retryStrategyLocator looked up by receiver name to find the retry strategy
       * @param LoggerInterface|null $logger               optional logger for retry / give-up notices
       */
      public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, ?LoggerInterface $logger = null)
      {
          $this->sendersLocator = $sendersLocator;
          $this->retryStrategyLocator = $retryStrategyLocator;
          $this->logger = $logger;
      }

      /**
       * Decides whether the failed message is retried and, if so, re-sends it.
       *
       * When a retry strategy exists for the receiver and shouldRetry() agrees,
       * the event is flagged for retry and the envelope is sent again through the
       * sender registered under the same receiver name, carrying a DelayStamp and
       * a RedeliveryStamp with the incremented retry count. Otherwise only a
       * critical log entry is written (if a logger is set).
       *
       * @throws RuntimeException when a retry is due but no sender is registered for the receiver name
       */
      public function onMessageFailed(WorkerMessageFailedEvent $event)
      {
          $retryStrategy = $this->getRetryStrategyForTransport($event->getReceiverName());
          $envelope = $event->getEnvelope();
          $throwable = $event->getThrowable();

          $message = $envelope->getMessage();
          $context = [
              'message' => $message,
              'class' => \get_class($message),
          ];

          // no strategy registered for this receiver means no retry at all
          $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);

          $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
          if ($shouldRetry) {
              $event->setForRetry();

              ++$retryCount;
              $delay = $retryStrategy->getWaitingTime($envelope);
              if (null !== $this->logger) {
                  $this->logger->error('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
              }

              // add the delay and retry stamp info
              $retryEnvelope = $envelope->with(new DelayStamp($delay), new RedeliveryStamp($retryCount));

              // re-send the message for retry
              $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
          } else {
              if (null !== $this->logger) {
                  $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
              }
          }
      }

      /**
       * Subscribes onMessageFailed() to WorkerMessageFailedEvent with priority 100.
       */
      public static function getSubscribedEvents()
      {
          return [
              // must have higher priority than SendFailedMessageToFailureTransportListener
              WorkerMessageFailedEvent::class => ['onMessageFailed', 100],
          ];
      }

      /**
       * Tells whether the failure may be retried.
       *
       * Returns false when $e is a HandlerFailedException whose nested exceptions
       * are all unrecoverable, or when $e itself is unrecoverable; in every other
       * case the decision is delegated to the retry strategy.
       */
      private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
      {
          // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
          if ($e instanceof HandlerFailedException) {
              $shouldNotRetry = true;
              foreach ($e->getNestedExceptions() as $nestedException) {
                  if (!$nestedException instanceof UnrecoverableExceptionInterface) {
                      // one recoverable nested exception is enough to keep retrying possible
                      $shouldNotRetry = false;
                      break;
                  }
              }
              if ($shouldNotRetry) {
                  return false;
              }
          }

          if ($e instanceof UnrecoverableExceptionInterface) {
              return false;
          }

          return $retryStrategy->isRetryable($envelope);
      }

      /**
       * Returns the retry strategy registered under $alias, or null when the locator has none.
       */
      private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
      {
          if ($this->retryStrategyLocator->has($alias)) {
              return $this->retryStrategyLocator->get($alias);
          }

          return null;
      }

      /**
       * Returns the sender registered under $alias.
       *
       * @throws RuntimeException when the senders locator has no entry for $alias
       */
      private function getSenderForTransport(string $alias): SenderInterface
      {
          if ($this->sendersLocator->has($alias)) {
              return $this->sendersLocator->get($alias);
          }

          throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.', $alias));
      }
  }