W poprzednim wpisie zajmowaliśmy się tematem przetwarzania asynchronicznego wiadomości za pomocą komponentu Symfony Messenger. Czy wiecie, że Messenger służy również do komunikacji dwóch serwisów ze sobą? Nie? No to zaraz się dowiecie 😉

Background i potrzeby biznesowe

Wszyscy żyjemy w świecie biznesu, który nieuchronnie dąży do specjalizacji rozwiązań. Jest coraz mniej systemów odpowiedzialnych za więcej niż jedną dziedzinę naraz. Często nawet pod płaszczem rozwiązań wielo-domenowych kryje się architektura rozproszona, czyli podzielona na wiele komunikujących się ze sobą mikroserwisów.

Każdy z mikroserwisów, niezależnie od tego, czy jest on częścią czegoś wielkiego, czy też bierze udział w stosunkowo małym przedsięwzięciu – musi z czymś gadać. Inaczej nie byłby on mikroserwisem. Generalnie, do komunikacji stosujemy najczęściej dwa podejścia: przez API oraz systemy kolejkowe. O ile komunikacja przez API jest czymś, z czym większość programistów ma do czynienia na codzień, o tyle z kolejkami już nie. I jest to powód, dlaczego w ogóle powstał ten wpis 🙂

Kiedy projektujemy dwa mikroserwisy, które będą ze sobą rozmawiały, musimy zastanowić się nad poziomem skomplikowania tej komunikacji. Jednym z pytań, które sobie zadamy powinno być: Komunikujemy się jednostronnie, czy dwustronnie?

Komunikacja jednostronna vs dwustronna

Jeżeli komunikujemy się jednostronnie, to znaczy, że serwis wysyłający wiadomość do drugiego serwisu, nie oczekuje informacji zwrotnej. Dajemy wtedy znać, że „u nas” wydarzyło się coś, co może zainteresować tego drugiego. Przykładem może być np. informacja o tym, że samochód realizujący spedycję zakończył trasę. Kiedy system śledzący trasy uzyska informację o tym wydarzeniu, może przesłać tą informację dalej na szynę, na której nasłuchuje system magazynowy. Bo magazyn może być bezpośrednio zainteresowany informacją o wolnej ciężarówce, która może zrealizować następną dostawę. Z perspektywy systemu śledzącego trasę nie ma potrzeby uzyskania odpowiedzi – jego nie interesuje, co dalej dzieje się z tym samochodem, dopóki nie przyjdzie czas na kolejny wyjazd w trasę.

I w tym miejscu możemy przejść do komunikacji dwustronnej. Bo jeżeli magazyn skompletuje nowe zamówienie i wyznaczy odpowiedni samochód do spedycji, to na pewno poprosi system śledzący trasy o wyznaczenie nowej trasy, na podstawie adresu zamówienia. W tym wypadku potrzebna będzie druga szyna komunikacyjna, która będzie wysyłała wiadomości w przeciwną stronę niż poprzednia. Różnica między jedną a drugą szyną będzie taka, że inny system będzie wysyłał wiadomości na szynę, a inny będzie na niej nasłuchiwał oraz przetwarzał te wiadomości.

Pobawmy się w komunikację dwustronną! 🙂

Celem dzisiejszego wpisu jest przedstawienie wszystkich czynności, które należy wykonać, aby móc skomunikować ze sobą dwie aplikacje przez system kolejkowy. Ponieważ w świecie krąży plotka, że komunikacja dwustronna znacznie podnosi trudność całego przedsięwzięcia, to ja tutaj spróbuję ją zdementować.

Kilka słów o architekturze rozwiązania

Okej, przechodzimy do tej fajnej części wpisu – technicznej 😉 . Wszystko najlepiej będzie zacząć od utworzenia dwóch aplikacji w Symfony. Nie ukrywam, że liczę tutaj na to, że znacie chociaż trochę Symfony. Zatem, plan działania ma się następująco:

  1. Utworzymy dwie aplikacje w Symfony: RoutePlanner oraz Warehouse z bardzo prostym modelem składającym się z jednej, góra dwóch encji.
  2. Aplikacje będą wymieniały ze sobą wiadomości na dwóch szynach (kolejkach):
    • warehouse, do której wiadomości będzie wysyłał RoutePlanner, a konsumować z niej będzie Warehouse,
    • route_planner, do której wiadomości będzie wysyłał Warehourse, a nasłuchiwać na niej będzie RoutePlanner.
  3. Efektem technicznym naszego rozwiązania będzie utworzenie kontrolera w aplikacji Warehouse, który utworzy u siebie encję wysyłki, po czym wyśle wiadomość do RoutePlannera o wyznaczenie trasy. Ten z kolei utworzy u siebie encję trasy i odeśle z powrotem wiadomość do Warehouse z identyfikatorem trasy, czasem trwania podróży oraz kosztem trasy.

Sami przyznacie, że nie wygląda to skomplikowanie. A skoro tak jest, to… do roboty! 🙂

Zaczynamy od infrastruktury i… kolejek

Pozwólcie, że nad kwestią tego, jaki system kolejkowy wybrać, nie będę się zbytnio roztrząsać. Nie ma to jak stary, dobry RabbitMQ postawiony na Dockerze:

# docker-compose.rabbitmq.yaml

version: "3.4"
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
        - 5672:5672
        - 15672:15672
    volumes:
        - rabbit-data:/var/lib/rabbitmq
        - rabbit-log:/var/log/rabbitmq

volumes:
  rabbit-data:
  rabbit-log:

Do uruchomienia Rabbita otwieramy nową kartę w terminalu i podajemy komendę:

$ docker-compose -f ./docker-compose.rabbitmq.yaml up

Komendy i zdarzenia jako osobne repozytorium

Zanim przejdziemy do jednej, czy drugiej aplikacji, musimy przejść przez kwestię wiadomości, którymi te systemy będą między sobą się wymieniać. Ponieważ będziemy korzystać z domyślnej konfiguracji Symfony Messengera, to te wiadomości będą serializowane za pomocą funkcji serialize / unserialize. W pełnej wiadomości będziemy mieli informację o tym, do postaci obiektu jakiej klasy wiadomość ma być zdeserializowana. Dlatego najlepiej by było, aby obydwie aplikacje korzystały z tych samych klas. A skoro tak, to dlaczego by nie skorzystać z composera? Możemy podpiąć sobie do obydwu aplikacji prywatne repozytorium, które będzie zawierało klasy komend i zdarzeń. Nie chciałbym zbytnio omawiać tego, jak to zrobić krok po kroku, najlepiej abyście przeczytali sobie o tym na stronie composera.

Jeżeli jednak nie chcecie bawić się w dodatkowe repozytoria, to możecie po prostu przekleić klasy komend i zdarzeń do obydwu aplikacji. Najważniejsze z perspektywy działania tego przykładu jest to, aby te klasy były dostępne w obydwu aplikacjach w takiej samej postaci.

Przechodząc do kodu: będziemy mieli komendę, którą Warehouse będzie wysyłać do RoutePlannera:

<?php

declare(strict_types=1);

namespace App\Command\RoutePlanner;

final class CreateRouteForPackageCommand
{
    public function __construct(
        public readonly int $packageId,
        public readonly int $carId,
        public readonly int $packageWeight,
        public readonly string $address,
    ) {

    }
}

Sama komenda należy do API mikroserwisu RoutePlannera, ponieważ traktujemy ją jako interfejs komunikacyjny udostępniony właśnie przez niego. Do tego będzie nam również potrzebne zdarzenie, które RoutePlanner odeśle z powrotem do Warehouse, kiedy trasa zostanie wyznaczona:

<?php

declare(strict_types=1);

namespace App\Event\RoutePlanner;

final class RouteHasBeenCreatedEvent
{
    public function __construct(
        public readonly int $packageId,
        public readonly int $routeId,
        public readonly int $duration,
        public readonly int $cost,
    ) {

    }
}

Zdarzenie również będzie należało do API RoutePlannera. Bardzo istotnym tutaj jest, aby nasze komendy oraz zdarzenia były immutable (ang. – niezmienne). W nowym PHP (8.1) możemy to zrobić za pomocą słowa kluczowego readonly. Jeżeli korzystamy ze starszego PHPa, to możemy to zrobić w podobny sposób jak ten, który przedstawiłem we wpisie Symfony Messenger asynchronicznie.

Implementacja aplikacji Warehouse

Aplikacja Warehouse będzie składała się z:

  • Kontrolera, który utworzy egzemplarz encji paczki, a następnie wyśle komendę na szynę komunikacyjną do aplikacji RoutePlanner,
  • Encji oraz konfiguracji mappingu,
  • Subscriber nasłuchujący na zdarzenie RouteHasBeenCreatedEvent, który do odpowiedniego egzemplarza encji paczki dopisze informacje dotyczące wyznaczonej dla niej trasy.

W tym miejscu tworzymy nowy projekt Symfony dla aplikacji Warehouse. Klasy komendy oraz zdarzenia zostały zdefiniowane wyżej. Tutaj zakładamy, że gdzieś po drodze composer został tak dokonfigurowany, aby zarówno Warehouse oraz RoutePlanner widziały te klasy. Albo, że przekleiliście je również tutaj.

Encja paczki w tym projekcie będzie wyglądała następująco:

<?php

declare(strict_types=1);

namespace App\Entity;

class Package
{
    protected ?int $id;

    protected ?int $routeId = null;
    protected ?int $routeDuration = null;
    protected ?int $routeCost = null;

    public function __construct(
        protected int $carId,
        protected int $weight,
        protected string $address,
    ) {

    }

    public function getId(): ?int
    {
        return $this->id;
    }

    public function getCarId(): int
    {
        return $this->carId;
    }

    public function getWeight(): int
    {
        return $this->weight;
    }

    public function getAddress(): string
    {
        return $this->address;
    }

    public function setRouteDetails(
        int $routeId,
        int $duration,
        int $cost,
    ): void {
        $this->routeId = $routeId;
        $this->routeDuration = $duration;
        $this->routeCost = $cost;
    }

    public function getRouteId(): ?int
    {
        return $this->routeId;
    }

    public function getRouteDuration(): ?int
    {
        return $this->routeDuration;
    }

    public function getRouteCost(): ?int
    {
        return $this->routeCost;
    }
}

Natomiast jej mapowanie wygląda następująco:

<?xml version="1.0" encoding="UTF-8"?>
<doctrine-mapping xmlns="http://doctrine-project.org/schemas/orm/doctrine-mapping"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="http://doctrine-project.org/schemas/orm/doctrine-mapping
                                      http://doctrine-project.org/schemas/orm/doctrine-mapping.xsd">

    <entity name="App\Entity\Package" table="package">
        <id name="id" type="integer" column="id">
            <generator strategy="AUTO"/>
        </id>

        <field name="carId" type="integer" nullable="false" />
        <field name="weight" type="integer" nullable="false" />
        <field name="address" type="string" nullable="false" />
        <field name="routeId" type="integer" nullable="true" />
        <field name="routeDuration" type="integer" nullable="true" />
        <field name="routeCost" type="integer" nullable="true" />
    </entity>
</doctrine-mapping>

Jak widać, zbyt wiele filozofii tu nie ma. Chociaż w sumie to warto wspomnieć o tym, że encja zawsze powinna być w stanie poprawnym biznesowo. Dlatego mamy konstruktor i jedną metodę ustawiającą szczegóły związane z trasą. Z setterów rezygnujemy, bo na dłuższą metę takie podejście służy wyciekowi logiki biznesowej poza warstwę domeny.

Kolejnym elementem Warehouse jest kontroler, który utworzy egzemplarz encji paczki oraz wyśle komendę na szynę do RoutePlannera. Poniżej mamy konfigurację routingu:

# config/routes.yaml

# ...

packages_post:
    path: /packages
    methods: [POST]
    defaults:
        _controller: App\Controller\CreateNewPackageController

Poniżej mamy definicję serwisu kontrolera w kontenerze Dependency Injection:

# config/services.yaml

services:

    # ...

    App\Controller\CreateNewPackageController:
        arguments:
            - "@doctrine.orm.entity_manager"
            - "@app.command_bus"
        public: true

A tu z kolei sam kontroler:

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Command\RoutePlanner\CreateRouteForPackageCommand;
use App\Entity\Package;
use Doctrine\Orm\EntityManagerInterface;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

final class CreateNewPackageController
{
    public function __construct(
        readonly private EntityManagerInterface $entityManager,
        readonly private MessageBusInterface $commandBus,
    ) {

    }

    public function __invoke(Request $request): Response
    {
        $payload = $request->getPayload();
        $carId = (int)$payload->get('carId', 0);
        $weight = (int)$payload->get('weight', 0);
        $address = $payload->get('address', 'No address');

        $package = new Package($carId, $weight, $address);
        $this->entityManager->persist($package);
        $this->entityManager->flush(); // Flush is needed to have entity ID

        $this->commandBus->dispatch(
            new CreateRouteForPackageCommand(
                $package->getId(),
                $package->getCarId(),
                $package->getWeight(),
                $package->getAddress(),
            ),
        );

        return new JsonResponse(['queued' => true]);
    }
}

Jak możecie zauważyć – tworzymy encję, wysyłamy komendę na szynę i… tyle. Nie czekamy na odpowiedź. Cała magia w tym miejscu dzieje się w sposób zupełnie asynchroniczny. Synchronicznie wysyłamy jedynie informację na kolejkę (metoda dispatch(...)). Dalej aplikacja RoutePlanner pobierze (w odpowiednim do aktualnych warunków czasie) tą wiadomość w skrypcie konsumującym. Następnie przetworzy ją i odeśle informację zwrotną – również do systemu kolejkowego. Odpowiedź pójdzie wtedy na inną kolejkę, do której będzie podłączony skrypt konsumujący aplikacji Warehouse.

Skrypt konsumujący dostarczony zostanie przez Symfony Messengera. Jedyne, co musimy zrobić, to zdefiniować klasę subscribera, który obsłuży zdarzenie zwrócone przez RoutePlannera:

<?php

declare(strict_types=1);

namespace App\EventSubscriber;

use App\Event\RoutePlanner\RouteHasBeenCreatedEvent;
use App\Entity\Package;
use Doctrine\ORM\EntityManagerInterface;
use RuntimeException;

final class ApplyRouteDetailsToPackageSubscriber
{
    public function __construct(
        readonly private EntityManagerInterface $entityManager,
    ) {

    }

    public function __invoke(RouteHasBeenCreatedEvent $event): void
    {
        $repository = $this->entityManager->getRepository(Package::class);

        $package = $repository->find($event->packageId);
        if ($package === null) {
            throw new RuntimeException(sprintf('No package found with id %d', $event->packageId));
        }

        $package->setRouteDetails(
            $event->routeId,
            $event->duration,
            $event->cost
        );
    }
}

Poniżej definicja serwisu tego subscribera:

# config/services.yaml

services:

    # ...

    App\EventSubscriber\ApplyRouteDetailsToPackageSubscriber:
        arguments:
            - "@doctrine.orm.entity_manager"
        tags:
            - "messenger.message_handler"

W tym miejscu należałoby omówić konfigurację Messengera. Jednak wydaje mi się, że lepiej będzie zrobić to pod koniec wpisu, z wyjaśnieniem co gdzie i dlaczego, w związku z obydwoma aplikacjami.

Implementacja aplikacji RoutePlanner

W tym miejscu tworzymy nowy projekt Symfony dla aplikacji RoutePlanner, która będzie nieco prostsza niż Warehouse. Tutaj zdefiniujemy encję trasy oraz klasę handlera, która obsłuży komendę wysłaną z Warehouse. Samo obsłużenie będzie polegało na utworzeniu encji trasy, zapisie jej w bazie danych oraz odesłaniu informacji zwrotnej na drugą szynę.

Wewnątrz handlera komendy CreateRouteForPackageCommand będziemy tworzyli encję trasy (z jakimiś testowymi danymi), w celu zasymulowania logiki planowania trasy. Następnie zdobędziemy identyfikator tej trasy i przekażemy wszystkie potrzebne informacje do zdarzenia RouteHasBeenCreatedEvent, na które nasłuchuje Warehouse (subscriber nasłuchujący to zdarzenie jest zaimplementowany wyżej).

Encja trasy wygląda następująco:

<?php

declare(strict_types=1);

namespace App\Entity;

class Route
{
    protected ?int $id;

    public function __construct(
        protected int $packageId,
        protected int $carId,
        protected int $duration,
        protected int $cost,
    ) {

    }

    public function getId(): ?int
    {
        return $this->id;
    }

    public function getPackageId(): int
    {
        return $this->packageId;
    }

    public function getCarId(): int
    {
        return $this->carId;
    }

    public function getDuration(): int
    {
        return $this->duration;
    }

    public function getCost(): int
    {
        return $this->cost;
    }
}

A mapowanie tej encji mamy poniżej:

<?xml version="1.0" encoding="UTF-8"?>
<doctrine-mapping xmlns="http://doctrine-project.org/schemas/orm/doctrine-mapping"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="http://doctrine-project.org/schemas/orm/doctrine-mapping
                                      http://doctrine-project.org/schemas/orm/doctrine-mapping.xsd">

    <entity name="App\Entity\Route" table="route">
        <id name="id" type="integer" column="id">
            <generator strategy="AUTO"/>
        </id>

        <field name="packageId" type="integer" nullable="false" />
        <field name="carId" type="integer" nullable="false" />
        <field name="duration" type="integer" nullable="false" />
        <field name="cost" type="integer" nullable="false" />
    </entity>
</doctrine-mapping>

Kolejnym elementem będzie wspomniany wyżej command handler:

<?php

declare(strict_types=1);

namespace App\CommandHandler;

use App\Command\RoutePlanner\CreateRouteForPackageCommand;
use App\Entity\Route;
use App\Event\RoutePlanner\RouteHasBeenCreatedEvent;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\MessageBusInterface;

final class CreateRouteForPackageCommandHandler
{
    private const DURATION_MULTIPLIER = 2.75;

    private const COST_MULTIPLIER = 17.45;

    public function __construct(
        readonly private EntityManagerInterface $entityManager,
        readonly private MessageBusInterface $eventBus,
    ) {

    }

    public function __invoke(CreateRouteForPackageCommand $command): void
    {
        $repository = $this->entityManager->getRepository(Route::class);

        $route = $repository->findOneBy(['packageId' => $command->packageId]);
        if ($route === null) {
            $route = $this->createNewRoute(
                $command->packageId,
                $command->carId,
                $command->packageWeight,
                $command->address,
            );
            $this->entityManager->persist($route);
            $this->entityManager->flush(); // Flush is needed to have entity ID
        }

        $this->eventBus->dispatch(
            new RouteHasBeenCreatedEvent(
                $route->getPackageId(),
                $route->getId(),
                $route->getDuration(),
                $route->getCost(),
            )
        );
    }

    private function createNewRoute(
        int $packageId,
        int $carId,
        int $packageWeight,
        string $address,
    ): Route {
        return new Route(
            $packageId,
            $carId,
            (int)($packageWeight * self::DURATION_MULTIPLIER),
            (int)($packageWeight + strlen($address) * self::COST_MULTIPLIER),
        );
    }
}

Tutaj już trochę się dzieje 🙂 W przetwarzanej obecnie komendzie mamy zapisane podstawowe informacje dotyczące paczki, w tym jej identyfikator. Na jego podstawie szukamy w repozytorium, czy gdzieś wcześniej nie została już wyznaczona trasa dla tej paczki. Jeżeli została wyznaczona, to korzystamy z niej. Jeżeli nie – tworzymy nowy egzemplarz encji. Wyliczenia, które gdzieś tam po drodze robimy są oczywiście nic nie warte 😉 Kiedy utworzymy mamy już encję trasy, to wysyłamy na szynę (nasłuchiwaną przez Warehouse) zdarzenie z odpowiednimi danymi.

Definicja serwisu tego command handlera wygląda tak:

# config/services.yaml

services:

    # ...

    App\CommandHandler\CreateRouteForPackageCommandHandler:
        arguments:
            - "@doctrine.orm.entity_manager"
            - "@app.event_bus"
        tags:
            - "messenger.message_handler"

Poza encją i handlerem komendy CreateRouteForPackageCommand nic więcej nie wchodzi w skład aplikacji RoutePlannera.

Konfigurujemy Messengera

Aby konfigurować, najpierw trzeba mieć co 🙂 Zatem na obydwu projektach potrzeba będzie uruchomić poniższy zestaw komend:

$ composer require symfony/messenger
$ composer require sroze/messenger-enqueue-transport
$ composer require enqueue/amqp-bunny

Każda paczka ma konkretną rolę w konfiguracji:

  • symfony.messenger wystepuje tutaj jako biblioteka, która daje nam abstrakcję związaną z dispatchowaniem wiadomości.
  • sroze/messenger-enqueue-transport, to paczka, która jest niejako rozszerzeniem Messengera – implementuje ona transport realizujący wysyłkę wiadomości za pomocą Enqueue – biblioteki dającej jednolitą abstrakcję dla wielu systemów kolejkowych. To właśnie dzięki Enqueue jesteśmy w stanie w relatywnie prosty sposób przełączyć się między różnego typu systemami kolejkowymi.
  • AMQP-Bunny jest driverem dającym nam możliwość nawiązania komunikacji z RabbitMQ przez protokół AMQP. Jest to zależność wymagana przez Enqueue do działania z Rabbitem. Co do samego AMQP, to należy pamiętać, że jest on szerszym pojęciem, które nie dotyczy wyłącznie Rabbita. Istnieje wiele różnych systemów kolejkowych opartych o ten protokół.

W dalszej kolejności potrzebujemy skonfigurować Messengera tak, aby każdy z projektów posiadał dwa rodzaje szyn:

  • outgoing – szyna, która będzie służyła do wysyłki wiadomości do drugiego systemu,
  • incoming – szyna, do której będzie uruchomiony skrypt konsumujący.

Konfiguracja dla aplikacji Warehouse wygląda następująco:

# config/packages/messenger.yaml

framework:
    messenger:
        default_bus: app.command_bus
        buses:
            app.command_bus: ~
            app.event_bus: 
                middleware:
                    - doctrine_transaction
        transports:
            incoming_transport:
                dsn: enqueue://default?queue[name]=warehouse&topic[name]=warehouse
                options:
                    delayStrategy: Enqueue\AmqpTools\RabbitMqDlxDelayStrategy
            outgoing_transport:
                dsn: enqueue://default?queue[name]=route_planner&topic[name]=route_planner
                options:
                    delayStrategy: Enqueue\AmqpTools\RabbitMqDlxDelayStrategy
        routing:
            App\Command\RoutePlanner\CreateRouteForPackageCommand: outgoing_transport
            App\Event\RoutePlanner\RouteHasBeenCreatedEvent: incoming_transport

A tutaj jest konfiguracja dla aplikacji RoutePlannera:

# config/packages/messenger.yaml

framework:
    messenger:
        default_bus: app.event_bus
        buses:
            app.command_bus: ~
            app.event_bus: 
                middleware:
                    - doctrine_transaction
        transports:
            incoming_transport:
                dsn: enqueue://default?queue[name]=route_planner&topic[name]=route_planner
                options:
                    delayStrategy: Enqueue\AmqpTools\RabbitMqDlxDelayStrategy
            outgoing_transport:
                dsn: enqueue://default?queue[name]=warehouse&topic[name]=warehouse
                options:
                    delayStrategy: Enqueue\AmqpTools\RabbitMqDlxDelayStrategy
        routing:
            App\Command\RoutePlanner\CreateRouteForPackageCommand: incoming_transport
            App\Event\RoutePlanner\RouteHasBeenCreatedEvent: outgoing_transport

Na pewno zauważyliście, że obydwie definicje różnią się ze sobą w bardzo drobnych detalach. W obydwu miejscach mamy transporty incoming_transport oraz outgoing_transport. Pomimo tego, że w obydwu przypadkach nazwy są takie same, to zauważymy, że wartości już nie. W tym wypadku wygląda to tak, że co jest outgoing w rozumieniu aplikacji Warehouse, to jest incoming w rozumieniu aplikacji RoutePlanner. I odwrotnie: to, co jest incoming dla Warehouse, w rozumieniu RoutePlannera jest outgoing. Jest tak, ponieważ jeden system podłącza się do szyny jako producer, a drugi podłącza się do tej samej szyny jako consumer.

Jeżeli mówimy o szynie, to musimy odpowiedzieć sobie na pytanie: czym jest szyna, i w jaki sposób koreluje ona z takimi nazwami jak queue oraz topic? Zacznijmy od środka; jako queue rozumiemy fizyczną kolejkę, która zostaje utworzona w systemie kolejkowym – w tym przykładzie na Rabbicie. Na kolejkę wysyłamy wiadomości, które są zserializowanymi obiektami klas komend i zdarzeń. Jako topic rozumiemy tutaj intencję, z którą dana wiadomość została wrzucona na tą konkretną kolejkę. Na jednej kolejce możemy mieć wiadomości z różnymi tematami. Jako transport definiujemy kombinację kolejek z tematami wiadomości, które się na nich znajdują. Do tego dochodzi to, że różne systemy mogą w różny sposób produkować i konsumować wiadomości w obrębie transportu. Na przykład możemy mieć dwa różne consumery, które konsumują na tej samej kolejce, ale wiadomości o innych tematach. Możemy również wysłać jedną wiadomość z dwoma tematami, które będą konsumowane przez dwa inne procesu. I to wszystko w sumie składa się na definicję szyny.

Na koniec mamy wspólną konfigurację paczki enqueue, którą wrzucamy do obydwu projektów:

# config/packages/enqueue.yaml

enqueue:
    default:
        transport: '%env(resolve:ENQUEUE_DSN)%'
        client: null

Dalej mamy również konfigurację DSN, którym enqueue będzie posługiwał się, do połączenia z Rabbitem (wrzucamy zarówno do Warehouse, jak i RoutePlannera):

# .env.local

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###

Sprawdźmy, czy to działa! 🙂

Zanim zaczniemy bawić się w consumery, to musimy uruchomić kilka komend (per projekt), które standardowo uruchamiamy na projekcie w Symfony:

$ bin/console cache:clear
$ bin/console doctrine:database:create
$ bin/console doctrine:schema:create

Skoro mamy już wszystko działające, to teraz trzeba sprawdzić, czy całość działa. Aby to zrobić, będzie trzeba uruchomić trzy procesy:

  • serwer HTTP dla projektu Warehouse, abyśmy mogli uruchomić kontroler rozpoczynający cały proces,
  • skrypt konsumujący dla aplikacji RoutePlanner, który przejmie i przetworzy komendy wysyłane z projektu Warehouse,
  • skrypt konsumujący dla aplikacji Warehouse, który będzie pobierał i przetwarzał zdarzenia, które zostaną odesłane z powrotem z RoutePlannera do Warehouse.

I ile z odpaleniem serwera raczej nie powinniście mieć problemu, to poniżej zamieszczam komendę, którą należy uruchomić dwukrotnie – po jednym razie na projekt:

$ bin/console messenger:consume incoming_transport -vvv

Teraz możemy wysłać żądanie HTTP do aplikacji Warehouse, które rozpocznie cały proces. Można to zrobić za pomocą Postmana lub Insomni. Jeżeli chcecie pobawić się przez termninal, to zamiast tego możecie odpalić CURLa:

curl --location --request POST 'localhost:8000/packages' \
--header 'Content-Type: application/json' \
--data '{
    "carId": 123,
    "weight": 15000,
    "address": "My street"
}'

Oczekiwanym efektem całości powinno być to, że w aplikacji Warehouse każdy wiersz będzie miał douzupełnione wartości, które zostają wygenerowane przez RoutePlannera.

Dawaj to na repo! 🙂

W sumie, mógłbym całość wrzucić na repozytorium Gildii, aczkolwiek raczej tego nie zrobię. Chciałbym przez to zachęcić Was do przejścia całego tego flow samodzielnie, co na pewno pozytywnie wpłynie na zrozumienie zależności wszystkich tych kroków 😉

Profesjonalista, zajmujący się na co dzień aplikacjami biznesowymi w ekosystemie PHP. Jego pasją jest odkrywanie nowych konceptów programistycznych oraz wzorce architektoniczne. Uwielbia również pisać testy, gdyż jak sam uważa, dobry kod to przetestowany kod.

Comments are closed.