Tworząc skomplikowane systemy bardzo często sięgamy do architektury mikroserwisowej. Im więcej serwisów, tym większa również potrzeba komunikacyjna między nimi. Przyjrzyjmy się zatem jednej z form komunikacji aplikacji, jaką jest obsługa systemów kolejkowych za pomocą komponentu Symfony Messenger.

Rola mikroserwisów w świecie aplikacji biznesowych

Mikroserwisami nazywamy potocznie zestaw pomniejszych, samodzielnych oraz mocno wyspecjalizowanych aplikacji pracujących w ramach co najmniej jednego biznesu. W świecie rzeczywistym możemy wyobrazić sobie osobną aplikację służącą do autentykacji użytkowników między wieloma aplikacjami (SSO – Single sign-on), zarządzającą wysyłką notyfikacji do użytkownika czy nawet zarządzającą stanami magazynowymi produktów w sklepach wielokanałowych.

Architektura mikroserwisowa niesie za sobą zestaw korzyści:

  • Możemy tworzyć bardziej wyspecjalizowane, samodzielne jednostki kodu o jasno wyznaczonych granicach funkcjonowania (przyjazne DDD)
  • Mniejsze jednostki aplikacyjne przekładają się na lepsze ich wsparcie ze strony programistów. Każdy z serwisów może być wspierany przez małe zespoły programistyczne, co przekłada się pozytywnie na szybkość dostarczania funkcjonalności
  • Skalujemy wyłącznie najbardziej obciążone serwisy, co realnie przekłada się na zaoszczędzone fundusze

Oprócz wskazanej wyżej listy zalet architektura mikroserwisowa posiada również szereg wad:

  • Trudniejszy debugging błędów powstałych w skutek źle przeprowadzonej komunikacji między serwisami
  • Potencjalnie możliwe rozsynchronizowanie spójności danych między serwisami, zwłaszcza w sytuacjach awaryjnych
  • Im więcej serwisów, tym bardziej skomplikowana komunikacja przebiega między nimi

To, czy architektura mikroserwisowa daje nam więcej wartości czy problemów, powinno być oparte o argumenty, a decyzja związana z wykorzystaniem mikroserwisów nie powinna być podjęta w oparciu o panujące trendy.

Dzisiejszy wpis nie jest jednak o mikroserwisach samych w sobie, lecz o komunikacji między nimi w oparciu o bardzo potężne narzędzie Symfony Messenger.

Kilka słów o systemach kolejkowych

Jedną z form komunikacji aplikacji ze sobą jest wykorzystywanie systemów kolejkowych. Praca z systemami kolejkowymi polega na wysyłaniu wiadomości na szynę (ang. bus), skąd dalej trafia ona na konkretny kanał nazywany kolejką. Do każdej kolejki może zostać podpięta skończona liczba procesów konsumujących, a następnie procesujących wiadomości. Kiedy wiadomość zostanie przetworzona przez proces konsumujący (ang. consumer), do kolejki wysyłana zostaje informacja zwrotna, po czym proces konsumujący może pobrać kolejną wiadomość.

Zazwyczaj istnieje możliwość konfiguracji systemu kolejkowego w taki sposób, aby każda wiadomość mogła zostać skonsumowana tylko przez jeden proces, oraz w taki sposób, aby jedna wiadomość mogła zostać skonsumowana przez wiele procesów. Bardzo znanymi systemami kolejkowymi sa:

Ile istnieje mechanizmów kolejkowych, tyle różnych interfejsów, które należy poznać a następnie zaimplementować. Zazwyczaj odnoszą się one w nieco odmienny sposób do tego samego konceptu zaimplementowanego w komponencie Symfony Messenger – Messagingu.

Podstawy pracy z komponentem Symfony Messenger

Pierwszym fragmentem kodu, który powinien nas zainteresować będzie fragment wysyłający wiadomość do Message Busa:

<?php

use Symfony/Component/Messenger/MessageBusInterface;

class CustomMessageDispatcher
{
    private MessageBusInterface $messageBus;

    public function __construct(MessageBusInterface $messageBus)
    {
        $this->messageBus = $messageBus;
    }

    public function dispatchMessage(): void
    {
        $message = "I'm the message you want to receive";

        $this->messageBus->dispatch($message);
    }
}

Na listingu powyżej mamy prostą klasę, która ma za zadanie stworzyć wiadomość i wysłać ją na szynę, która na podstawie odpowiedniej konfiguracji decyduje o tym, co z tą wiadomością dzieje się dalej.

Symfony Messenger nieco od srodka…

To, co dzieje się z wiadomością pod spodem, to opakowanie jej przez obiekt klasy Envelope. Wewnątrz niemalże całej implementacji Messengera operuje się właśnie na obiekcie typu Envelope. Wyobraźmy sobie przykład w prawdziwym świecie: pracując na dyspozytorni paczek każda z nich jest opakowana w pudełko kartonowe. Nikogo w firmie nie interesuje to, co znajduje się wewnątrz. Tak również jest w naszym przypadku.

Do obiektu klasy Envelope możemy dołączać obiekty implementujące interfejs StampInterface. Patrząc znowu na przykład z życia, to są to znaczki, które są dołączane do przesyłki podczas jej procesowania. Dzięki nim jesteśmy w stanie poznać historię przesyłki oraz np. poznać jej dotychczasową drogę. Podobnie jest w architekturze mikroserwisowej – przykładowo, podczas sesji debuggingowej, bardzo pomocna może okazać się informacja o systemach, które przetwarzały naszą wiadomość wcześniej. Same stampy możemy traktować jako dodatkową informację obok logów, do wychwycenia skomplikowanych scenariuszy komunikacyjnych.

Po zapakowaniu wiadomości w klasę Envelope podróżuje ona przez łańcuch klas implementujących interfejs MiddlewareInterface. Każdy middleware w łańcuchu może przetworzyć wiadomość, nadawać jej nowe stampy oraz w razie, jeżeli nadejdzie potrzeba – zaprzestać propagowania kolejnych middleware-ów w łańcuchu. Przyjrzyjmy się przykładowemu middleware zaimplementowanemu przez samego Messengera:

<?php

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\ValidationFailedException;
use Symfony\Component\Messenger\Stamp\ValidationStamp;
use Symfony\Component\Validator\Validator\ValidatorInterface;

class ValidationMiddleware implements MiddlewareInterface
{
    private $validator;

    public function __construct(ValidatorInterface $validator)
    {
        $this->validator = $validator;
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();
        $groups = null;
        /** @var ValidationStamp|null $validationStamp */
        if ($validationStamp = $envelope->last(ValidationStamp::class)) {
            $groups = $validationStamp->getGroups();
        }

        $violations = $this->validator->validate($message, null, $groups);
        if (\count($violations)) {
            throw new ValidationFailedException($message, $violations);
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

Powyższy kod realizuje walidację wysłanej na szynę wiadomości. Pomijając detale implementacyjne, wyróżnić możemy dwa miejsca kończące wykonanie metody:

  • Wyrzucany jest wyjątek, jeżeli walidacja zwróci niepustą listę błędów
  • Ze stosu wszystkich middleware-ów wybierany i wykonywany zostaje następny middleware

Bardzo dobrą ideą konceptu middleware-ów jest kontrola nad dalszym wykonaniem kolejnych elementów znajdujących się na stosie. Możemy dzięki temu wykonać dowolną logikę przed oraz po wykonaniu następnego middleware. Bardzo dobrym przykładem użycia jest tutaj middleware dostarczany przez bibliotekę Doctrine:

<?php

namespace Symfony\Bridge\Doctrine\Messenger;

use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

class DoctrineTransactionMiddleware extends AbstractDoctrineMiddleware
{
    protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
    {
        $entityManager->getConnection()->beginTransaction();
        try {
            $envelope = $stack->next()->handle($envelope, $stack);
            $entityManager->flush();
            $entityManager->getConnection()->commit();

            return $envelope;
        } catch (\Throwable $exception) {
            $entityManager->getConnection()->rollBack();

            if ($exception instanceof HandlerFailedException) {
                // Remove all HandledStamp from the envelope so the retry will execute all handlers again.
                // When a handler fails, the queries of allegedly successful previous handlers just got rolled back.
                throw new HandlerFailedException($exception->getEnvelope()->withoutAll(HandledStamp::class), $exception->getNestedExceptions());
            }

            throw $exception;
        }
    }
}

Dzięki powyższemu middleware nie musimy dbać o transakcyjność zapytań wewnątrz handlerów. Jest to duże ułatwienie, dzięki któremu mamy pewność, że procesy wewnątrz naszej aplikacji działają w sposób atomowy.

Konfiguracja message busów

Domyślnie komponent Messenger wystawia serwis messenger.bus.default, który możemy używać bez żadnej konfiguracji wewnątrz naszej aplikacji. Jest on również domyślnie wstrzykiwany, podczas gdy mamy włączony autowire i korzystamy z interfejsu MessageBusInterface. Do tego busa jest podpięta również następująca lista middleware-ów:

  • add_bus_name_stamp_middleware – dodaje stamp z nazwą busa, przez który przechodzi wiadomość
  • dispatch_after_current_bus – pomaga w lepszej kontroli logiki podczas wystąpienia błędu w handlerze (tylko wielo-busowe systemy)
  • failed_message_processing_middleware – obsługuje ponownie wiadomości przeprocesowane z błędem
  • send_message – wysyła wiadomość do transportu, jeżeli ten jest skonfigurowany
  • handle_message – wywołuje wszystkie handlery skonfigurowane do obsłużenia wiadomości

Przetwarzanie wysłanej wiadomości

To, że wysłana na szynę wiadomość przechodzi przez serię middleware-ów, to już wiemy. Jednym z najczęstszych scenariuszy aplikacyjnych jest jej natychmiastowe przetworzenie. Operacja procesowania wiadomości odbywa się w HandleMessageMiddleware zaimplementowanym również wewnątrz komponentu Symfony Messenger. Mamy tam logikę związaną z wykonywaniem zarówno pojedynczej wiadomości, jak i paczki wielu wiadomości jednocześnie. To, co rzuca nam się na pewno w oczy, to to, że powyższa implementacja w dalszym ciągu korzysta z obiektu klasy Envelope.

Tworzenie własnych middleware-ów

Komponent Messenger jest doskonały pod wieloma względami. Jedną z najlepszych jego cech jest jego rozszerzalność. Przykładowo, możemy stworzyć własne middleware i włączyć je do łańcucha:

framework:
    messenger:
        buses:
            messenger.bus.default:
                # wyłącza domyślną kolekcję middleware-ów
                default_middleware: false

                middleware:
                    # serwisy implementujące Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

Domyślnie, Symfony Messenger uruchamia zdefiniowane przez siebie middleware w kolejności, w której zostały przeze mnie przedstawione powyżej. Lista zdefiniowanych przez nas middleware uruchamiana jest pomiędzy failed_message_processing_middleware oraz send_message. Istnieje jednak możliwość wyłączenia z działania middleware zaimplementowanych przez Messengera – odpowiada za to opcja default_middleware ustawiona na false.

Asynchroniczność z Symfony Messengerem

W języku PHP standardowe pojęcie asynchroniczności nie istnieje. Wszystkie operacje, od początku do końca przetwarzania żądania, są wykonywane w jednym wątku. Jednakże, dzięki komponentowi Symfony Messenger, jesteśmy w stanie zasymulować asynchroniczność. Dokładniej, to jesteśmy w stanie oddelegować część zadań do osobnego procesu uruchomionego w CLI, nazywanego consumerem.

Tuż po wysłaniu wiadomości do szyny, nasza wiadomość zostaje przechwycona przez SendMessageMiddleware. Następnie, wiadomość zostaje zserializowana i wysłana do jednego z systemów realizujących funkcjonalność kolejkowania. Do tego zadania możemy wykorzystać bazę danych, system Redis lub jeden z dedykowanych takim sytuacjom systemów kolejkowych (np. RabbitMQ lub Amazon SQS). Dalej, system kolejkowy decyduje, czy wiadomość może w danej chwili zostać przetworzona (jeżeli jest zbyt duży ruch wiadomości, to może ona czekać w kolejce na swoją kolej). W dalszym kroku system kolejkowy wysyła tę wiadomość do (conajmniej jednego) procesu nasłuchującego – consumera, który fizycznie może pracować na osobnym serwerze niż ten, który wiadomość utworzył. Rolą consumera jest deserializacja tej wiadomości a następnie jej przetworzenie (zmiana stanu systemu na podstawie informacji w niej zawartych). Ostatnim krokiem jest wysłanie informacji zwrotnej z consumera do systemu kolejkowego odnośnie stanu przetworzenia wiadomości.

Aby zmienić sposób wysyłki wiadomości z synchronicznego do asynchronicznego, potrzebujemy w konfiguracji Messengera zdefiniować transport:

framework:
    messenger:
        # ...
        transports:
            myAsyncTransport: "%env(MESSENGER_TRANSPORT_DSN)%"

Bardzo dobrą praktyką jest trzymanie wrażliwych informacji w zmiennych środowiskowych (oraz plikach .env), jak widzimy w wypadku definicji powyższego transportu. Jeżeli chcemy zdefiniować transport z większą liczbą parametrów, to robimy to w następujący (przykładowy) sposób:

framework:
    messenger:
        transports:
            myAsyncTransport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                    exchange:
                        name: high
                    queues:
                        messages_high: ~

Kolejną rzeczą jest definicja routingu, dzięki któremu Messenger będzie wiedział, w jaki sposób ma zostać obsłużona wiadomość:

framework:
    messenger:
        # ...
        routing:
            App\Message\SmsNotification: myAsyncTransport

Bardzo dużą wygodę może sprawić fakt, że możemy zdefiniować routing na podstawie klasy bazowej bądź interfejsu, dzięki czemu będziemy mieli bardzo krótką definicję routingu. Następnie należy oczywiście dziedziczyć klasy wiadomości po odpowiednich klasach bądź implementować odpowiednie interfejsy.

Ostatnim krokiem jest uruchomienie dwóch procesów: systemu obsługującego kolejkowanie wiadomości oraz procesu konsumującego wiadomości. O ile sposób uruchomienia tego pierwszego bardzo mocno zależy od samego systemu kolejkowego, to proces consumera jesteśmy w stanie uruchomić za pomocą komendy dostarczanej przez komponent Messengera:

bin/console messenger:consume myAsyncTransport

Poniżej przedstawiam kompletną konfigurację Messengera, w oparciu o wszystkie wcześniejsze listingi:

framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

        transports:
            syncTransport: 'sync://'
            asyncTransport:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
                    exchange:
                        name: high
                    queues:
                        messages_high: ~

        routing:
            App\Message\SyncTransportInterface: syncTransport
            App\Message\AsyncMessageInterface: asyncTransport

Na koniec, serdecznie zachęcam do zapoznania się z pełną dokumentacją Symfony Messengera.

Profesjonalista, zajmujący się na co dzień aplikacjami biznesowymi w ekosystemie PHP. Jego pasją jest odkrywanie nowych konceptów programistycznych oraz wzorce architektoniczne. Uwielbia również pisać testy, gdyż jak sam uważa, dobry kod to przetestowany kod.

Comments are closed.