W trakcie działania aplikacji biznesowych niejednokrotnie wiele się dzieje. Wielokrotnie zmieniamy stan systemu, o czym dobrze by było np. poinformować inne systemy bądź wysłać maila. Jednym, z bardzo dobrych do tego celu wzorców, jest architektura sterowana zdarzeniami, o której właśnie będzie ten wpis.

System skoncentrowany na zdarzeniach

Aplikacje biznesowe są bardzo specyficznymi programami. Są one ukierunkowane na procesy, które mają za zadanie procesować dane w oparciu o reguły biznesowe, czego efektem są właśnie zdarzenia. Przykładowo, podczas procesu zakupowego w domenie e-commerce, możemy wyróżnić następujące zdarzenia:

  • Produkt został dodany do koszyka
  • Kupon rabatowy został podany przez użytkownika
  • Zamówienie zostało złożone
  • Stan magazynowy produktu został zmieniony
  • Produkt został wyłączony ze sprzedaży

Zdarzenia same w sobie nie powodują nic w systemie. Są to niemutowalne obiekty, które przenoszą informację o tym, co wydarzyło się w systemie oraz w jakim kontekście. Poniżej przykładowa implementacja zdarzenia Produkt został dodany do koszyka:

<?php

use \DateTimeImmutable;

final class ProductHasBeenAddedToCart
{
    private int $productId;
    private int $quantity;
    private DateTimeInterface $eventTimestamp

    public function __construct(
        int $productId,
        int $quantity,
        DateTimeImmutable $eventTimestamp
    ) {
        if ($quantity < 1) {
            throw new InvalidProductQuantityAddedToCartException($this);
        }

        $this->productId = $productId;
        $this->quantity = $quantity;
        $this->eventTimestamp = $eventTimestamp;
    }

    public function getProductId(): int
    {
        return $this->productId;
    }

    public function getQuantity(): int
    {
        return $this->quantity;
    }

    public function getEventTimestamp(): DateTimeImmutable
    {
        return $this->eventTimestamp;
    }
}

To, co na pierwszy rzut oka widać, to operowanie na typach maksymalnie prostych. Powody ku temu są co najmniej dwa:

  • Raz powstałe zdarzenie nie powinno mieć możliwości mutacji (zmiany swojego stanu). Gdybyśmy do konstruktora przekazali np. obiekt encji, to stan zdarzenia moglibyśmy niejako zmodyfikować poprzez modyfikację encji (która jest przekazywana do zdarzenia przez referencję). Jeżeli potrzebujemy przekazać do zdarzenia dodatkowe informacje o stanie systemu w trakcie jego powstania, to powinniśmy tą informację przekazać do zdarzenia w sposób niemutowalny.
  • Przy dobrze rozplanowanej architekturze zdarzenia mogą być obsługiwane w sposób asynchroniczny. Wiąże się z tym konieczność ich serializacji. Niestety, część klas (takich jak np. encje) sprawiają problemy podczas deserializacji. Drugim potencjalnym problemem związanym z deserializacją zdarzeń może być fakt, że wewnątrz niego mogą znajdować się obiekty, których klasy nie znajdują się w systemie reagującym na zdarzenie (przecież nie musimy obsługiwać zdarzenia wyłącznie w systemie, w którym ono zostało utworzone).

Te dwa argumenty powodują, że najbardziej preferowanym dla zdarzeń będzie wykorzystywanie typów prostych. Do grona tych typów można zaliczyć również klasę \DateTimeImmutable, której obiektów nie można modyfikować.

Implementacja Symfony Messenger?

Do implementacji konceptu posłużymy się bardzo prostym przypadkiem. Będziemy operowali na encji Product, zmieniając jej quantity. W chwili, kiedy „wykupimy” ostatnią sztukę produktu, system to wyłapie przez obsługę odpowiedniego zdarzenia, a następnie ustawi flagę enabled na wartość false.

Do pracy będziemy potrzebowali dwóch skonfigurowanych busów – jeden dla komend, drugi dla zdarzeń:

# config/packages/messenger.yaml

framework:
    messenger:
        default_bus: app.command_bus

        buses:
            app.command_bus:
                middleware:
                    - doctrine_transaction
            app.event_bus:
                default_middleware: allow_no_handlers
                middleware:
                    - doctrine_transaction

Obydwa busy będą korzystały z middleware doctrine_transaction, aby mogły pracować w obrębie osobnych transakcji bazodanowych.

Następnym krokiem jest zdefiniowanie komendy oraz obsługującego ją handlera:

<?php

declare(strict_types=1);

namespace App\Command;

final class BuyProductCommand
{
    private int $productId;

    private int $quantity;

    public function __construct(
        int $productId,
        int $quantity
    ) {
        $this->productId = $productId;
        $this->quantity = $quantity;
    }

    public function getProductId(): int
    {
        return $this->productId;
    }

    public function getQuantity(): int
    {
        return $this->quantity;
    }
}

oraz

<?php

declare(strict_types=1);

namespace App\CommandHandler;

use App\Command\BuyProductCommand;
use App\Entity\Product;
use App\Event\ProductHasBeenBoughtEvent;
use App\Repository\ProductRepository;
use LogicException;
use Symfony\Component\Messenger\MessageBusInterface;

final class BuyProductCommandHandler
{
    private MessageBusInterface $eventBus;

    private ProductRepository $productRepository;

    public function __construct(
        MessageBusInterface $eventBus,
        ProductRepository $productRepository
    ) {
        $this->eventBus = $eventBus;
        $this->productRepository = $productRepository;
    }

    public function __invoke(BuyProductCommand $command): void
    {
        $product = $this->productRepository->find($command->getProductId());

        /** @var Product */
        if ($product === null) {
            throw new LogicException(sprintf('Product with ID %d has not been found', $command->getProductId()));
        }

        if ($product->quantity < $command->getQuantity()) {
            throw new LogicException(sprintf('Not enough quantity. You can buy at most %s', $product->quantity));
        }

        $product->quantity -= $command->getQuantity();

        $this->eventBus->dispatch(
            new ProductHasBeenBoughtEvent($command->getProductId())
        );
    }
}

Jak możemy zauważyć, na końcu kodu handlera wyrzucana jest wiadomość na bus przetwarzający zdarzenia. Dzięki dobrodziejstwu Symfony Messenger, ta wiadomość czeka aż do momentu zakończenia obsługi komendy BuyProductCommand.

Klasa ProductHasBeenBoughtEvent wygląda następująco:

<?php

declare(strict_types=1);

namespace App\Event;

final class ProductHasBeenBoughtEvent
{
    private int $productId;

    public function __construct(int $productId)
    {
        $this->productId = $productId;
    }

    public function getProductId(): int
    {
        return $this->productId;
    }
}

Następnym etapem jest obsługa zdarzenia ProductHasBeenBoughtEvent poprzez zdefiniowany poniżej subscriber:

<?php

declare(strict_types=1);

namespace App\EventSubscriber;

use App\Event\ProductHasBeenBoughtEvent;
use App\Repository\ProductRepository;
use LogicException;

final class DisableOutOfStockProductEventSubscriber
{
    private ProductRepository $productRepository;

    public function __construct(
        ProductRepository $productRepository
    ) {
        $this->productRepository = $productRepository;
    }

    public function __invoke(ProductHasBeenBoughtEvent $event)
    {
        $product = $this->productRepository->find($event->getProductId());

        if ($product === null) {
            throw new LogicException(sprintf('Product with ID %d has not been found', $event->getProductId()));
        }

        if ($product->quantity === 0) {
            $product->enabled = false;
        }
    }
}

Całość opakowujemy w prosty kontroler, który wrzuca komendę BuyProductCommand do odpowiedniego busa:

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Command\BuyProductCommand;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;

final class BuyProductController
{
    private MessageBusInterface $commandBus;

    public function __construct(
        MessageBusInterface $commandBus
    ) {
        $this->commandBus = $commandBus;
    }

    public function __invoke(Request $request): Response
    {
        $productId = (int)$request->get('id', 0);
        $quantity = (int)$request->get('quantity', 0);

        if ($quantity <= 0) {
            return new JsonResponse([
                'status' => 'error',
                'error' => 'Expected quantity greater than zero'
            ], Response::HTTP_BAD_REQUEST);
        }

        try {
            $this->commandBus->dispatch(
                new BuyProductCommand($productId, $quantity)
            );

            return new JsonResponse([
                'status' => 'ok',
            ]);
        } catch (HandlerFailedException $exception) {
            return new JsonResponse([
                'status' => 'error',
                'error' => $exception->getPrevious()->getMessage()
            ], Response::HTTP_UNPROCESSABLE_ENTITY);
        }
    }
}

Całość kodu wraz z konfiguracją (Symfony 6) zamieściłem na GitHubie: https://github.com/senghe/GildiaDeveloperow/tree/master/EventBusExample.

Uwaga, to tylko implementacja konceptu!

Przykład przeze mnie zaimplementowany jest jedynie prostym, nieskupiającym się na żadnej poważnej logice kodem. Na pierwszy rzut oka może nie być widać zalet, jakie wiąże ze sobą poruszana w tym wpisie architektura. Aby móc coś powiedzieć o jej zaletach oraz wadach, należy zaimplementować ten wzorzec szeroko w aplikacji.

Eventy obsługiwane asynchronicznie

Prawdziwą potęgą Symfony Messenger jest oczywiście współpraca ze skończoną liczbą systemów kolejkowych, które umożliwiają asynchroniczną obsługę powstałych w systemie wiadomości. W scenariuszu obsługi zdarzeń ma to szczególne znaczenie, gdyż najczęstszym scenariuszem będzie tutaj informacja systemu zewnętrznego o zmianie, która zaszła w systemie. Samych systemów zewnętrznych prawie nigdy nie musimy informować w tym samym czasie, w którym zaszła zmiana w systemie. Przykładowo:

  • Użytkownik może kilka sekund dłużej poczekać na notyfikacje takie jak e-mail czy push message
  • System indeksujący może doindeksować sobie dane chwilkę później
  • Produkt może zostać zdjęty ze sklepu jedną lub dwie sekundy później

Podobnych przykładów, które „mogą chwilkę poczekać” możemy mnożyć i mnożyć. Dlatego właśnie może okazać się, że w naszym systemie większość ze zdarzeń (o ile infrastruktura serwerowa jest na to przygotowana) wpadać będzie właśnie na kolejkę asynchroniczną.

Połączenie architektury Event Driven oraz CQRS

Okazuje się, że architektura oparta o zdarzenia potrafi bardzo dobrze działać w parze z wzorcem Command Query Responsibility Segregation, zwłaszcza, kiedy do implementacji wykorzystujemy Symfony Messengera.

W systemie będziemy mieli wtedy co najmniej dwa busy: jeden do obsługi komend, drugi do obsługi zdarzeń. W serwisach aplikacyjnych produkujemy zdarzenia, które następnie mogą być obsługiwane przez ewentualne listenery/subscribery. Podczas wykonywania logiki obsługi zdarzeń teoretycznie nie powinniśmy rzucać nowych zdarzeń. W tym miejscu jedynie reagujemy na zmiany, które nastąpiły w systemie. Co jednak, jeżeli na konkretne zdarzenie chciałby zareagować… ten sam system, który je utworzył? Według mojego doświadczenia, jest to poprawne zachowanie, pod warunkiem, że podczas obsługi zdarzenia zostanie utworzona nowa komenda, którą następnie wrzucimy na szynę. Dodatkowa komenda jest nam tutaj potrzebna, ponieważ wg wzorca CQRS zmian systemu dokonujemy tylko podczas obsługi komend.

W takiej sytuacji tworzy się nam wtedy łańcuch wywołań command – event – command – event (zapętlić tutaj), który pod względem architektury jest poprawnym rozwiązaniem. Podczas tego typu implementacji musimy uważać jednak na kilka potencjalnych problemów:

  • W systemie mogą zaistnieć zapętlenia, które jest bardzo, ale to bardzo ciężko wyłapać. To, co szczególnie trudno uchwycić, to początek zapętlonej ścieżki (zwłaszcza, jeżeli zapętlenie trwa długo).
  • Mamy znacznie utrudniony debugging procesu biznesowego. Dlatego dobrze jest zadbać o odpowiedni zestaw stampów, które zapamiętają istotne dla nas momenty, takie jak np. komenda inicjująca proces.
  • Znacznie trudniej jest mieć cały proces biznesowy pod kontrolą, przez co może powstać sporo edge case-ów.

Pomimo wyżej zawartych wad, warto jest moim zdaniem rozważyć współpracę tych dwóch wzorców, zwłaszcza, jeżeli mamy na prawdę skomplikowane procesy (np. importery danych), które można by wykonywać w dużej części asynchronicznie, na kilku consumerach jednocześnie.

Profesjonalista, zajmujący się na co dzień aplikacjami biznesowymi w ekosystemie PHP. Jego pasją jest odkrywanie nowych konceptów programistycznych oraz wzorce architektoniczne. Uwielbia również pisać testy, gdyż jak sam uważa, dobry kod to przetestowany kod.

Comments are closed.