Пример использования symfony/messenger и symfony/console в качестве независимых компонентов

[2024-05-27] PHP

Описание моего опыта по изучению компонентов symfony/messenger и symfony/console фреймворка Symfony.

С ходу непонятно как начать использовать Symfony Messenger

Потребовалось мне на PHP кое-что многопоточно и асинхронно пообрабатывать...
Как я понял, компонент Symfony Messenger - это просто идеальное решение для моей задачи.

Когда-то я проработал книгу Symfony Быстрый старт (https://symfony.com/doc/5.4/the-fast-track/ru/), где на странице https://symfony.com/doc/5.4/the-fast-track/ru/18-async.html показано, как использовать Symfony Messenger в рамках фреймворка Symfony.

Использовать целый фреймворк ради только одного компонента мне не хотелось и я начал искать примеры использования Symfony Messenger в качестве отдельного компонента.
Однако мне не удалось найти какого-то простого и понятного для чайника законченного примера работы Symfony Messenger.

Отдельно выделю две статьи на Хабре, найденные мной в процессе изучения вопроса:
https://habr.com/ru/articles/483584/
https://habr.com/ru/articles/596559/

В первой из них пример есть. Но меня отпугнуло то, что он какой-то слишком уж профессиональный.
Во второй показаны вроде бы понятные пути решения, причём с использованием компонента Symfony Console (прям как мне и надо). Но всё же в этой статье нет законченного рабочего примера...
К тому же, обе статьи "заточены" под дополнительную установку либо БД Redis, либо брокера сообщений RabbitMQ.

Пример использования Symfony Messenger и Symfony Console

Ставить Redis либо RabbitMQ только ради того, чтобы попробовать, как работает Symfony Messenger, мне не хотелось и, взяв за основу вышеуказанные статьи, а также изучив https://symfony.com/doc/current/components/messenger.html и https://symfony.com/doc/current/messenger.html, я создал собственный самодостаточный пример, в котором очередь сообщений хранится в БД SQLite, используемой через Doctrine.

Где скачать пример использования Symfony Messenger

Пример можно взять отсюда: https://github.com/balpom/symfony-messenger-sample

Минимальные требования:
php >=8.1
Symfony >=6.4

Чтобы установить пример, откройте консоль и выполните команду:
composer create balpom/symfony-messenger-sample

Простой пример работы Symfony Messenger

После установки откройте консоль и перейдите в созданную Composer'ом директорию symfony-messenger-sample.
Выполните команду:
php bin/console messenger:consume doctrine-async
Эта команда запустит простой Worker, имитирующий отправку SMS. Сейчас он ждёт, когда в очереди появятся сообщения.

Откройте другую консоль и перейдите в созданную Composer'ом директорию symfony-messenger-sample.
Выполните команду:
php tests/send.php
Эта команда запустит простой скрипт, который добавит несколько сообщений в очередь.
После этого в первой консоли можно увидеть, как Worker "отправляет" SMS, взятые им из очереди.

Пример работы Symfony Messenger в несколько потоков

Из директории symfony-messenger-sample откройте несколько консолей и в каждой из них выполните команду:
php bin/console messenger:consume doctrine-async

Откройте ещё одну консоль и выполните команду:
php tests/sendmany.php
Эта команда запустит простой скрипт, который добавит в очередь несколько десятков сообщений.
После этого в ранее открытых консолях можно увидеть, как Worker'ы совместно "отправляют" SMS, берущиеся ими из очереди.

Symfony Messenger: как остановить работу Worker'ов

Выполните команду: php bin/console messenger:stop-workers
При этом все Worker'ы должны мягко остановиться.

Тонкости работы Symfony Messenger

Не буду здесь подробно описывать как работает вышеуказанный пример - на то он и пример, чтобы его изучить и принцип работы понять в процессе изучения.
За основу была взята статья https://habr.com/ru/articles/596559/ и более-менее этот пример сделан по этой статье.

В Symfony 6 реализации транспорта не входят в компонент

Как я понимаю, примеры в вышеуказанных статьях делались на версии Symfony младше версии 6.
И в этих версиях, как я понимаю, реализации транспорта Ampq, Redis и Doctrine входили в состав компонента Messenger.

Начиная с Symfony 6 эти компоненты нужно устанавливать как отдельные компоненты:
Ampq - symfony/amqp-messenger
Redis - symfony/redis-messenger
Doctrine - symfony/doctrine-messenger

Также существуют компоненты транспорта symfony/amazon-sqs-messenger и symfony/beanstalkd-messenger.

Не останавливаются Worker'ы?

Отдельно остановлюсь вот на каком моменте: по идее при выполнении команды php bin/console messenger:stop-workers все работающие Worker'ы должны мягко завершить свою работу.
Причём сам Worker при запуске радостно сообщает:
The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.

The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command. Ну то есть как бы подразумевается, что по умолчанию команда php bin/console messenger:stop-workers должна работать "из коробки".
Однако в первоначальной версии моего примера по этой команде Worker'ы не останавливались...

В статье https://habr.com/ru/articles/596559/ автор упоминал следующее (цитата):
"Для команды stopWorkersCommand нужно передать адаптер кэширования, для возможности мягкой остановки воркеров, чтобы избежать ситуации когда воркер уже взял сообщение из очереди, но ещё не успел его обработать."

Как я понял, команда messenger:stop-workers прописывает куда-то в кэш некое значение, при появлении которого Worker, работающий в бесконечном цикле, прекращает свою работу.
И действительно, в файле bin/console автор этой статьи передаёт команде stopWorkersCommand экземпляр FilesystemAdapter.

Я чувствовал, что каким-то образом этот же адаптер должен быть известен и команде ConsumeMessagesCommand, также описанной в файле bin/console. Но нигде и никак этот FilesystemAdapter больше не фигурировал...

Сломав весь мозг, пытаясь понять, в чём дело, я полез во внутренности класса Symfony\Component\Messenger\Command\ConsumeMessagesCommand.
Его изучение натолкнуло меня на мысль, что по умолчанию он почему-то не слушает событие StopWorkerOnRestartSignalListener, которое, по идее, останавливает работающий Worker.

В-общем, я придумал, как решить проблему неостанавливающихся Worker'ов...
Исходно команды в файле bin/console описывались следующим образом (по сути, точно как у автора вышеупомянутой статьи):

$cacheItemPool = $container->get(CacheItemPoolInterface::class);
$commands = [
    new ConsumeMessagesCommand(
            new RoutableMessageBus($container),
            $container,
            new EventDispatcher(),
            new ConsoleLogger($output, [])
    ),
    new StopWorkersCommand($cacheItemPool)
];

После добавления в EventDispatcher подписчика StopWorkerOnRestartSignalListener, инициализированного FilesystemAdapter'ом, Worker'ы начали умирать как миленькие: ;-)

$cacheItemPool = $container->get(CacheItemPoolInterface::class);
$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnRestartSignalListener($cacheItemPool));
$commands = [
    new ConsumeMessagesCommand(
            new RoutableMessageBus($container),
            $container,
            $eventDispatcher,
            new ConsoleLogger($output, [])
    ),
    new StopWorkersCommand($cacheItemPool)
];

Заключение

Хочу высказать своё небольшое "фи" создателям компонента Symfony Messenger:
не знаю как остальные, но я, как "symfony-чайник", выводимое Worker'ом сообщение The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command воспринимаю как поведение по-умолчанию. В том смысле, что "из коробки" Worker должен "дохнуть" при исполнении этой команды.

На этом всё.
Успехов в изучении Symfony Messenger и Symfony Console!


© 1999 - 2024 www.Balpom.ru