To, że Symfony Messenger jest niezastąpiony, wszyscy wiedzą. Za to, jak go skonfigurować – niekoniecznie. Z tego powodu właśnie powstał dzisiejszy post. Skonfigurujmy razem Messengera, aby przeprocesował komendę asynchronicznie! 🙂

Command Pattern zawsze na propsie

Jedną z zalet pracy z Symfony Messengerem jest to, że nasz kod układa się w bardzo ciekawy schemat komend, które będąc przez nas wydanymi, zostają przez system przeprocesowane. Logika, którą mamy w aplikacji staje się reużywalna; tą samą komendę możemy wykorzystać zarówno w kontekście HTTP (przy obsłudze formularza, w API) oraz w CLI. Dodatkowo, jeżeli dobrze się zakręcimy, to możemy zoptymalizować naszą aplikację, zlecając jej wykonanie mniej krytycznych czasowo operacji do tzw. backgroundu, czyli wykonania asynchronicznego.

Samo skonfigurowanie Symfony Messengera w aplikacji nie jest niczym skomplikowanym. Schody zaczynają się właśnie wtedy, kiedy chcemy pobawić się w konfigurację asynchronicznych transportów. Zwłaszcza, kiedy nie mamy jeszcze doświadczenia z systemami kolejkowymi. Dlatego właśnie przejdziemy przez ten proces tu – razem 😉

Postawmy sobie kolejkę! 🙂

Ponieważ będziemy pracowali na systemie kolejkowym, to fajnie by było mieć gdzieś w systemie postawiony serwis obsługujący kolejki. Nie chciałbym się skupiać nad tym, które kolejki warto postawić. Dla mnie wystarczy zwykły RabbitMQ na Dockerze:

# docker-compose.rabbitmq.yaml

version: "3.4"
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
        - 5672:5672
        - 15672:15672
    volumes:
        - rabbit-data:/var/lib/rabbitmq
        - rabbit-log:/var/log/rabbitmq

volumes:
  rabbit-data:
  rabbit-log:

To wyżej, to jest plik konfiguracyjny docker-compose. Aby uruchomić Rabbita na jego podstawie, trzeba skorzystać z komendy:

$ docker-compose -f ./docker-compose.rabbitmq.yaml up

Nie wiem, jak Wy, ale ja w taki sposób odpalam sobie mnóstwo zewnętrznych serwisów: Redisa, Elastic Searcha, Mailhoga, czasami nawet MySQLa. Jest to fajne rozwiązanie, kiedy przełączamy się między mnóstwem projektów, które do developmentu potrzebują różnych serwisów w różnych wersjach.

Teraz popracujmy na Symfony…

Wszystko, co przedstawiam poniżej, można wyklikać sobie na czystym Symfony. Jeżeli nie chcecie brudzić sobie swojego pobocznego projektu, to postawcie sobie nową apkę.

Aplikacja, którą tworzymy ma proste zadanie – stworzyć endpoint, którego wywołanie ma wrzucić wiadomość na kolejkę, do której będzie podpięty skrypt konsumujący wiadomości. Sama wiadomość ma dotyczyć tworzenia nowego produktu, jeżeli ten nie istnieje.

Do pracy będzie potrzebna bardzo prosta encja produktu:

<?php

declare(strict_types=1);

namespace App\Entity;

class Product
{
    protected ?int $id;

    public function __construct(
        protected string $sku,
        protected string $name,
        protected int $price,
    ) {

    }

    public function getId(): ?int
    {
        return $this->id;
    }

    public function getSku(): string
    {
        return $this->sku;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function getPrice(): int
    {
        return $this->price;
    }
}

Konfiguracja mapowania naszej encji ma się następująco:

<?xml version="1.0" encoding="UTF-8"?>
<doctrine-mapping xmlns="http://doctrine-project.org/schemas/orm/doctrine-mapping"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="http://doctrine-project.org/schemas/orm/doctrine-mapping
                                      http://doctrine-project.org/schemas/orm/doctrine-mapping.xsd">

    <entity name="App\Entity\Product" table="product">
        <id name="id" type="integer" column="id">
            <generator strategy="AUTO"/>
        </id>

        <field name="name" type="string" nullable="false" />
        <field name="sku" type="string" nullable="false" />
        <field name="price" type="integer" nullable="false" />
    </entity>
</doctrine-mapping>

Jak widać, rocket science tutaj nie ma. Trzy pola na krzyż plus identyfikator. Tak na prawdę, to chodzi o to, aby mieć jakiś zasób, który możemy utworzyć, aby stwierdzić, że skrypt konsumujący działa dobrze. Tego, w jaki sposób podpiąć mapowanie encji pod Doctrine tłumaczyć nie będę – zakładam, że tego typu wiedza znajdzie się gdzieś w głowie, albo w internecie.

Teraz tworzymy komendę, czyli wiadomość, która zostanie wysłana na kolejkę:

<?php

declare(strict_types=1);

namespace App\Command;

final class CreateNewProductCommand
{
    public function __construct(
        readonly private string $sku,
        readonly private string $name,
        readonly private int $price,
    ) {

    }

    public function getSku(): string
    {
        return $this->sku;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function getPrice(): int
    {
        return $this->price;
    }
}

Ponieważ Symfony Messenger domyślnie serializuje wiadomości za pomocą funkcji serialize / unserialize, polecam korzystać z typów prostych, wbudowanych w PHPa. Do tego, w ślad za sentencją „Raz wydanej komendy nie można zmienić” – setterów nie implementujemy 🙂 .

Następnym krokiem będzie utworzenie klasy handlera, który zazwyczaj będzie entrypointem do logiki biznesowej:

<?php

declare(strict_types=1);

namespace App\CommandHandler;

use App\Command\CreateNewProductCommand;
use App\Entity\Product;
use Doctrine\ORM\EntityManagerInterface;
use RuntimeException;

final class CreateNewProductCommandHandler
{
    public function __construct(
        readonly private EntityManagerInterface $entityManager,
    ) {

    }

    public function __invoke(CreateNewProductCommand $command): void
    {
        $repository = $this->entityManager->getRepository(Product::class);

        $sku = $command->getSku();
        $existingProduct = $repository->findOneBy(['sku' => $sku]);
        if ($existingProduct !== null) {
            throw new RuntimeException(sprintf('Can\'t create product with SKU %s', $sku));
        }

        $newProduct = new Product($sku, $command->getName(), $command->getPrice());
        $this->entityManager->persist($newProduct);
    }
}

Ponieważ u nas logika jest ultra prosta, to zawarłem ją całą w handlerze. Możecie zauważyć, że nie mamy tutaj żadnego wykonania metody flush() na obiekcie Entity Managera. To nie jest pomyłka – ten temat załatwimy gdzie indziej 🙂

Dalej tworzymy kontroler, który przechwyci żądanie, a następnie utworzy obiekt komendy i wrzuci go na szynę zdefiniowaną w konfiguracji Messengera:

<?php

declare(strict_types=1);

namespace App\Controller;

use App\Command\CreateNewProductCommand;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

final class CreateNewProductController
{
    public function __construct(
        readonly private MessageBusInterface $commandBus,
    ) {

    }

    public function __invoke(Request $request): Response
    {
        $payload = $request->getPayload();
        $sku = $payload->get('sku', 'no-sku');
        $name = $payload->get('name', 'no-name');
        $price = (int)$payload->get('price', 0);

        $this->commandBus->dispatch(
            new CreateNewProductCommand($sku, $name, $price),
        );

        return new JsonResponse(['queued' => true]);
    }
}

A teraz trochę konfiguracji…

Kodzik mamy za sobą, teraz pobawmy się w konfigurowanie. Na pewno będziemy potrzebowali definicję dwóch serwisów: kontrolera, oraz handlera.

# config/services.yaml

services:

    # ...

    App\Controller\CreateNewProductController:
        arguments:
            - "@app.command_bus"
        public: true

    App\CommandHandler\CreateNewProductCommandHandler:
        arguments:
            - "@doctrine.orm.entity_manager"
        tags:
            - "messenger.message_handler"

Kontroler jako argument potrzebuje instancję Message Busa. Tak się składa, że na podstawie swojej konfiguracji, Symfony Messenger tworzy w locie serwisy odpowiadające zdefiniowanym Message Busom. Nazwa serwisu jest taka sama, co skonfigurowana nazwa Message Busa. Do tego, ze względu na wymogi Symfony – serwis kontrolera konfigurujemy jako publiczny.

Oprócz kontrolera definiujemy również serwis dla Command Handlera. Argument, który jest potrzebny ze względu na naszą logikę biznesową – Entity Manager – wstawiamy w dosyć domyślny sposób. Oprócz tego, musimy jakoś skonfigurować nasz Command Handler, aby Messenger o nim wiedział. Po to właśnie konfigurujemy tag messenger.message_handler. Resztę, czyli to, do jakiej komendy ten Handler jest przypisany – Messenger sam sobie wyciągnie.

Oprócz serwisów konfigurujemy również routing:

# config/routes.yaml

# ...

products_post:
    path: /products
    methods: [POST]
    defaults:
        _controller: App\Controller\CreateNewProductController

Podsumowując: mamy endpoint doczepiony do kontrolera, który wrzuca wiadomość na szynę. Mamy dla tej szyny skonfigurowaną klasę Handlera. Czego nam tutaj brakuje? A no tak, samego Messengera.

Wisienka na torcie – konfiguracja Symfony Messengera

Do obsługi Rabbita będą nam potrzebne trzy paczki:

$ composer require symfony/messenger
$ composer require sroze/messenger-enqueue-transport
$ composer require enqueue/amqp-bunny

Symfony Messenger wystepuje tutaj jako biblioteka, która daje nam abstrakcję związaną z dispatchowaniem wiadomości. Dalej mamy paczkę sroze/messenger-enqueue-transport, która jest rozszerzeniem Messengera – implementuje ona transport realizujący wysyłkę wiadomości przez Enqueue – bibliotekę dającą jednolitą abstrakcję dla wielu systemów kolejkowych. To właśnie dzięki Enqueue jesteśmy w stanie w relatywnie prosty sposób przełączyć się między różnego typu systemami kolejkowymi.

Na koniec, AMQP-Bunny występuje tutaj jako driver dający nam możliwość nawiązania komunikacji z RabbitMQ przez protokół AMQP. Jest to zależność wymagana przez Enqueue do działania z Rabbitem. Co do samego AMQP, to należy pamiętać, że jest on szerszym pojęciem, które nie dotyczy wyłącznie Rabbita. Istnieje wiele różnych systemów kolejkowych opartych o ten protokół.

Tyle o samych paczkach, przechodzimy teraz do ich konfiguracji. Najpierw konfigurujemy DSN, po którym Enqueue będzie komunikował się z Rabbitem. Robimy to w miejscu, gdzie definiujemy zmienne środowiskowe:

# .env.local

###> enqueue/enqueue-bundle ###
ENQUEUE_DSN=amqp://guest:guest@localhost:5672/%2f
###< enqueue/enqueue-bundle ###

Następnie mamy domyślną konfigurację Enqueue, gdzie wykorzystujemy wyżej wspomnianą zmienną środowiskową ENQUEUE_DSN:

# config/packages/enqueue.yaml

enqueue:
    default:
        transport: '%env(resolve:ENQUEUE_DSN)%'
        client: null

Dalej konfigurujemy Messengera. Tworzymy Message Bus o nazwie app.command_bus, do którego dopinamy middleware doctrine_transaction, który opakowuje nasze wiadomości w transakcję bazodanową. Sam w sobie Messenger nie posiada tego Middleware. Jest on dostępny z poziomu paczki symfony/doctrine-bridge.

# config/packages/messenger.yaml

framework:
    messenger:
        default_bus: app.command_bus
        buses:
            app.command_bus:
                middleware:
                    - doctrine_transaction
        transports:
            async_transport:
                dsn: enqueue://default
                options:
                    delayStrategy: Enqueue\AmqpTools\RabbitMqDlxDelayStrategy
        routing:
            App\Command\CreateNewProductCommand: async_transport

Oprócz definicji Message Busa, mamy definicję transportu. Na pewno Waszą uwagę zwróci opcja delayStrategy. I na pewno zadacie sobie (mi) pytanie, po co nam tutaj ta opcja. Dodałem ją, bo gdzieś pod spodem Enqueue w sposób domyślny wykorzystuje opóźnienia w odbiorze wiadomości, przy wykorzystaniu RabbitMqDelayPluginDelayStrategy. Niestety, ten feature jest dostępny po doinstalowaniu odpowiedniego pluginu do Rabbita. Miałem do wyboru albo bawić się ze snippetem docker-compose z początku posta, albo – wyłączyć to tutaj.

No to co, uruchamiamy?

Aby uruchomić całość, potrzebujemy dwóch rzeczy: działającego skryptu konsumującego oraz wywołania utworzonego wcześniej endpointu. Skrypt konsumujący dostarcza nam Messenger przy pomocą komendy:

$ bin/console messenger:consume async_transport

A request na endpoint puszczamy chociażby w terminalu:

curl --location --request POST 'localhost:8000/products' \
--header 'Content-Type: application/json' \
--data '{
    "name": "Test",
    "sku": "test",
    "price": 123
}'

Jeżeli nie chcecie bawić się CURLem w konsoli, to polecam Postmana lub Insomnię.

Mamy to! 🙂

Chciałem napomknąć, że dzisiejszy wpis miał za zadanie przedstawić jedynie koncept, który nie powinien być wykorzystywany produkcyjnie, bez uprzedniego zbadania tematu i dostosowania konfiguracji użytych bibliotek do swoich potrzeb.

Aha, no i zapomniałbym. Dwie lekcje, które należy odrobić, aby żyło nam się lepiej:

  1. Pomimo, że w Messengerze można, to nie zwracaj nic z klasy Command Handlera. Jeżeli cokolwiek będzie uzależnione od tej wartości zwróconej, to przeniesienie wykonania do backgroundu serwerowego może okazać się trudne, bądź nawet niemożliwe do wykonania.
  2. Tworząc komendy pracuj na typach prostych. Im prostsza serializacja, tym lepiej. A już na pewno nie wrzucaj tam encji ani innego rodzaju obiektów, które przetrzymują połączenia, otwarte deskryptory do plików itp. Zamiast tego, wrzucaj do komend takie dane, które pozwolą Ci na nowo zbudować ten obiekt.

Profesjonalista, zajmujący się na co dzień aplikacjami biznesowymi w ekosystemie PHP. Jego pasją jest odkrywanie nowych konceptów programistycznych oraz wzorce architektoniczne. Uwielbia również pisać testy, gdyż jak sam uważa, dobry kod to przetestowany kod.

Comments are closed.