Symfony Messenger jest świetny. Zgodzicie się? Instalujecie jedną paczkę i możecie wysłać wiadomość na kolejkę… oh wait. No jednak nie. Chociaż nie jest to tak trywialne jak w Doctrine, to jest to dosyć proste, o czym będzie dzisiejszy post.
Czym w ogóle jest Symfony Messenger?
Symfony Messenger jest komponentem frameworku Symfony, który służy do dysponowania wiadomościami na tzw. szyny. Każda z szyn może świadczyć wysyłkę synchroniczną w obrębie aplikacji oraz wysyłkę asynchroniczną, dzięki której możemy uzyskać funkcję asynchronicznego wykonywania zadań lub komunikacji z innymi aplikacjami. Napisałem ostatnio dwa wpisy, w których przedstawiam jak wykorzystać Symfony Messenger asynchronicznie oraz o Komunikacji dwóch mikroserwisów z Symfony Messengerem, z którymi zachęcam, abyście się zapoznali, bo moim zdaniem warto poszerzyć swoją wiedzę o ten kawałek tortu 🙂
Symfony Messenger jako dobra, ale abstrakcja
Messenger pozwala na wykorzystanie popularnych w dzisiejszym świecie technologii, dzięki którym jest w stanie zaserwować mechanizmy asynchroniczne. Jeżeli przyjrzymy się jego repozytorium, a dokładniej katalogowi src/Transport, to zauważymy, że oprócz bardzo generycznych mechanizmów, nie ma tam nic. Bo sam Messenger w sobie jest tylko abstrakcją, która pozwala na bardzo sprawne rozszerzanie.
Transporty, które możemy sobie doinstalować
Pomimo, że repozytorium Messengera zostaje abstrakcją, to Symfony daje nam kilka implementacji transportów, z których możemy skorzystać:
- AMQP – paczka (link do repozytorium), dzięki której możemy podłączyć się pod dowolny system kolejkowy obsługujący protokół AMQP. Najpopularniejszym narzędziem tutaj jest RabbitMQ.
- AmazonSQS – paczka (link do repozytorium), która realizuje wysyłkę FIFO na kolejkach Amazona (SQS)
- Beanstalkd – paczka (link do repozytorium), dzięki której będziemy mogli wykonywać asynchroniczne zadania przy pomocy Beanstalkd
- Doctrine – paczka (link do repozytorium) realizująca kolejki przy pomocy Doctrine
- Redis – paczka (link do repozytorium), która daje nam możliwość skorzystania z Redis streams
Aby móc zainstalować każdą z tych paczek, należy uruchomić komendę instalacji (composer). Przykładowo, dla transportu Doctrine będzie to wyglądało następująco:
$ composer require symfony/doctrine-messenger
Po wykonaniu tej komendy w katalogu vendor
pojawi nam się katalog vendor/symfony/doctrine-messenger
, który… no właśnie. Jak popatrzymy sobie w źródła, to zauważymy, że nie ma tam żadnej konfiguracji Symfony. Bo nie jest to żaden bundle. Pytaniem zatem jest, w jaki sposób Symfony dowiaduje się o istnieniu tej paczki? W jaki sposób jest ona konfigurowana przez framework?
Cała magia znajduje się w paczce Symfony Framework Bundle, a dokładniej w miejscu, w którym Symfony rozszerza zwój kontener, czyli DependencyInjection Extension. Znajduje się tam metoda registerMessengerConfiguration
, która sprawdza po kolei, czy dana paczka będzie dostępna i znajduje się w niej klasa fabryki konkretnego transportu, Jeżeli tak – to łapie tą fabrykę ze środka i ustawia jej odpowiedni tag, aby mogła ona być widoczna przez kontener:
<?php
// https://github.com/symfony/framework-bundle/blob/6.3/DependencyInjection/FrameworkExtension.php#L2085
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, PhpFileLoader $loader, array $validationConfig)
{
// ...
if (ContainerBuilder::willBeAvailable('symfony/amqp-messenger', AmqpTransportFactory::class, ['symfony/framework-bundle', 'symfony/messenger'], true)) {
$container->getDefinition('messenger.transport.amqp.factory')->addTag('messenger.transport_factory');
}
if (ContainerBuilder::willBeAvailable('symfony/redis-messenger', RedisTransportFactory::class, ['symfony/framework-bundle', 'symfony/messenger'], true)) {
$container->getDefinition('messenger.transport.redis.factory')->addTag('messenger.transport_factory');
}
if (ContainerBuilder::willBeAvailable('symfony/amazon-sqs-messenger', AmazonSqsTransportFactory::class, ['symfony/framework-bundle', 'symfony/messenger'], true)) {
$container->getDefinition('messenger.transport.sqs.factory')->addTag('messenger.transport_factory');
}
if (ContainerBuilder::willBeAvailable('symfony/beanstalkd-messenger', BeanstalkdTransportFactory::class, ['symfony/framework-bundle', 'symfony/messenger'], true)) {
$container->getDefinition('messenger.transport.beanstalkd.factory')->addTag('messenger.transport_factory');
}
// ...
}
Co ciekawe, w tym miejscu mamy również zdefiniowaną domyślną szynę oraz listę middleware, które przetwarzają wiadomość wysyłaną przed i po zdefiniowanych przez użytkownika middleware:
<?php
// https://github.com/symfony/framework-bundle/blob/6.3/DependencyInjection/FrameworkExtension.php#L2115
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, PhpFileLoader $loader, array $validationConfig)
{
// ...
if (null === $config['default_bus'] && 1 === \count($config['buses'])) {
$config['default_bus'] = key($config['buses']);
}
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
'after' => [
['id' => 'send_message'],
['id' => 'handle_message'],
],
];
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
if ($bus['default_middleware']['enabled']) {
$defaultMiddleware['after'][0]['arguments'] = [$bus['default_middleware']['allow_no_senders']];
$defaultMiddleware['after'][1]['arguments'] = [$bus['default_middleware']['allow_no_handlers']];
// argument to add_bus_name_stamp_middleware
$defaultMiddleware['before'][0]['arguments'] = [$busId];
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
}
// ...
}
// ...
}
Myślę, że jeżeli ktoś chce przestudiować głębiej to, w jaki sposób Symfony konfiguruje bibliotekę Messengera (oraz jego transporty oczywiście), to klasa FrameworkExtension z paczki symfony/framework-bundle
będzie właściwym miejscem. A teraz wracamy do głównego tematu dzisiejszego wpisu – transportów.
Konstrukcja standardowego transportu
Jeżeli przyjrzycie się każdemu z repozytoriów wymienionych transportów to zauważycie, że każdy z nich wygląda niemal identycznie. Na każdą z tych paczek mamy 6 do 8 klas, które tworzą spójną całość. Kiedy spojrzymy na to nieco bardziej case-agnostycznie, to wyjdzie nam, że transport potrzebuje dosłownie kilku elementów, które różnią się między sobą dosłownie kilkoma szczegółami związanymi z implementacją konkretnego drivera.
Jedną z postawowych klas transportu jest klasa fabryki (TransportFactory), implementująca interfejs TransportFactoryInterface
(zamieszczony poniżej):
<?php
// https://github.com/symfony/messenger/blob/6.3/Transport/TransportFactoryInterface.php
namespace Symfony\Component\Messenger\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
interface TransportFactoryInterface
{
public function createTransport(#[\SensitiveParameter] string $dsn, array $options, SerializerInterface $serializer): TransportInterface;
public function supports(#[\SensitiveParameter] string $dsn, array $options): bool;
}
Klasę TransportFactory można potraktować jako składnik wzorca strategii, który jest wykorzystywany do zdeterminowania transportu, który mógłby obsłużyć podany w konfiguracji DSN (metoda supports(...)
. Czyli jeden poziom wyżej jest coś, co iteruje po wszystkich zdefiniowanych transportach, próbując dopasować ten właściwy. Drugą metodą jest createTransport(...)
, która (jak to fabryka) konstruuje obiekt. W tym wypadku obiekt transportu.
Dalej mamy transport, czyli klasę, która musi implementować interfejs TransportInterface
:
<?php
// https://github.com/symfony/messenger/blob/6.3/Transport/TransportInterface.php
namespace Symfony\Component\Messenger\Transport;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
interface TransportInterface extends ReceiverInterface, SenderInterface
{
}
Tutaj wychodzi na to, że każdy transport musi mieć funkcję tzw. receivera oraz sendera, czyli mechanizmów kolejno odbierających oraz wysyłających wiadomości. O ile nie ma nigdzie wymogu, aby dla funkcji sendera oraz receivera istniały osobne klasy, to Symfony zgodnie z dobrymi praktykami, stosuje tutaj wzorzec dekoratora, delegując metody transportu związane z senderem do obiektu klasy Sender, a metody transportu związane z receiverem – do osobnej instancji klasy Receiver. Oprócz tego, Symfony implementuje w klasie transportu dodatkowe interfejsy, które nie wydają się konieczne do omówienia.
Dalej mamy dwie podobne, ale zarazem różne od siebie klasy: Receivera oraz Sendera. Te dwie klasy są podobne, ponieważ obydwie korzystają z obiektu klasy Connection, która realizuje funkcje związane już bezpośrednio z danym driverem. Tak na prawdę klasa Conection nie jest tutaj konieczna – nie implementuje ona żadnego interfejsu Symfony ani Messengera. Ale musicie przyznać, że oddelegowanie tego typu zadań do osobnego adaptera jest czymś właściwym.
Wracając do Sendera oraz Receivera: implementują one konkretne interfejsy, które dają nam bardzo istotne funkcje:
<?php
// https://github.com/symfony/messenger/blob/6.3/Transport/Sender/SenderInterface.php
namespace Symfony\Component\Messenger\Transport\Sender;
use Symfony\Component\Messenger\Envelope;
interface SenderInterface
{
public function send(Envelope $envelope): Envelope;
}
oraz:
<?php
// https://github.com/symfony/messenger/blob/6.3/Transport/Receiver/ReceiverInterface.php
namespace Symfony\Component\Messenger\Transport\Receiver;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
interface ReceiverInterface
{
public function get(): iterable;
public function ack(Envelope $envelope): void;
public function reject(Envelope $envelope): void;
}
O ile w przypadku Sendera sprawa jest jasna: służy on wysłaniu wiadomości, to w przypadku Receivera jest już nieco bardziej skomplikowanie. O ile sama abstrakcja pozwala na zwrotkę więcej niż jednej wiadomości naraz, to poszczególne implementacje Senderów decydują się jednak na zwrotkę najwyżej jednej wiadomości równocześnie. Zwrotka więcej niż jednej wiadomości mogłaby skutkować przyblokowaniem wiadomości, która została już pobrana i synchronicznie czeka na wykonanie poprzedniej, kiedy tak na prawdę mogłaby być przetworzona przez inny wolny w tym czasie consumer. Wydaje mi się, że stąd właśnie decyzje związane ze zwrotką tylko jednej wiadomości per implementacja. Dodatkowo, każdy receiver może przekazać kolejce (lub innemu mechanizmowi), że wiadomość została przetworzona (metoda ack(...)
) lub została odrzucona (metoda reject(...)
).
Pamiętajcie o konfiguracji!
Jeżeli chcielibyście spróbować własnych sił i stworzyć własny transport, to pamiętajcie o tym, że potrzeba będzie jeszcze odpowiednio skonfigurować Waszą klasę TransportFactory, aby była widoczna przez framework. Wszystkie transporty serwowane przez Symfony są automatycznie konfigurowane przez FrameworkBundle, ale te utworzone przez Was – nie będą.
Comments are closed.