G'day:
Context
What am I on about with that title? OK, let me explain.
In a few gigs I've had in the past, we've had the issue of there being more work to do to fulfil a request than we could reasonably let run within a web request. Say there's half a dozen things to do when updating a record, and collectively they take 10sec to run: we can't have the user sitting around for 10sec waiting for the next call to action. This is made more annoying when most of that overhead is not actually needed for the call to action to be provided; it's (necessary) housekeeping to keep the application working, rather than to keep the user working. The user should not have to wait whilst the app gets its shit together, basically.
Previously I've seen this being solved in a number of ways:
- Hide the processing in separate threads spun-off by the main request thread. This improves things for the user, but it doesn't lighten the load for the web app at all. And it's also at this point questionable as to whether this is actually the web app's job. It's clearly nothing to do with fulfilling web requests, if it's being done in the background.
- Set some sort of work data somewhere, that indicates some processing of it needs to be done. Then have a separate process running tasks that check for the existence of work data, and process it if there is any. This is like a hand-cranked queuing system which operates via a task going "got anything for me? No. How about now? No. Now? Ooh yes, got one. What about now? No?". It works, but there's a lot of time having some code asking a question when the answer is "no". For most cases, we know when the work needs to be done: when the code that identifies the work says so. That's when it needs to be done.
So that's what I'm trying to solve. I've never had to solve this sort of thing before, but I have a suspicion that it's gonna involve a message queuing system.
Situation
The example I am going to use is the Elasticsearch integration I put into my test site over the last coupla days:
- Integrating Elasticsearch into a Symfony app
- Making sure this app also deletes from Elasticsearch when I delete an entity
The first article is long; the second one is very short for me. In summary I have a test app that provides data-entry forms for a number of DB-backed entities. For the purposes of the search functionality, I am using Elasticsearch. And the articles show how to keep Elasticsearch up to date at the same time as the DB updates are made.
Task at hand
Unfortunately for this exercise, the Elasticsearch operations are lightning quick, so it's really no issue to have the code execution "inline" with the web requests. But this processing falls foul of something I touched on above:
[That] overhead is not actually needed for the call to action to be provided[…]. The user should not have to wait whilst the app gets its shit together[.]
So I'm going to ignore the premature optimisation red flag, and am going to farm the Elasticsearch updates off to some other process that the web request doesn't need to worry about.
My chosen path here is to use the Symfony Messenger Component, and ultimately have that working with RabbitMQ under the hood so I can farm off the processing to a completely different app container, so as to reduce load on the web-app container.
I have never done any of this before, and I have to admit that I am currently struggling to even understand the docs. So this should… errr… be "interesting". Or a dumpster fire. One of those.
Spoiler
I do not get to the point of introducing RabbitMQ in this article. It was enough effort to get the messaging stuff working "in-process", so I'm splitting this exercise into 2-3 chunks.
Logging
To start with, I'm just gonna try to get messages and handlers working in-process via Symfony. So no performance gains, no RabbitMQ, and no second app container. And to watch things working (or not), my aim is to simply get a handler to log something if it receives a message. To do this I'm gonna need to install Monolog, which is straightforward.
# config/packages/monolog.yaml
monolog:
    channels:
        - messaging
    handlers:
        messaging:
            type: stream
            path: '%kernel.logs_dir%/messaging.log'
            level: debug
            channels: ['messaging']
And test it:
// tests/Functional/System/MonologTest.php
namespace App\Tests\Functional\System;

use Monolog\Handler\TestHandler;
use Monolog\Level;
use PHPUnit\Framework\Attributes\TestDox;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;

class MonologTest extends KernelTestCase
{
    #[TestDox('It logs messages to the messaging log file')]
    public function testMessagingLog()
    {
        self::bootKernel();
        $container = self::$kernel->getContainer();
        $logger = $container->get('monolog.logger.messaging');

        $testHandler = new TestHandler();
        $logger->pushHandler($testHandler);

        $uniqueMessage = 'Test message ' . uniqid('', true);
        $logger->info($uniqueMessage);

        $this->assertTrue($testHandler->hasInfoRecords());
        $this->assertTrue($testHandler->hasRecord($uniqueMessage, Level::Info));

        $records = $testHandler->getRecords();
        $infoRecords = array_filter($records, fn($r) => $r['level'] === Level::Info->value);
        $lastInfoRecord = end($infoRecords);
        $this->assertEquals($uniqueMessage, $lastInfoRecord['message']);
    }
}
That's working fine.
Symfony Messenger
Installation is obvious: it's the usual Composer require, for the symfony/messenger package.
That spews out a bunch of stuff, all simply informational. It's also installed a config/packages/messenger.yaml, but there are no actual settings in it. According to the informational stuff, we're good to go now.
Further up I linked to the docs for the Symfony Messenger Component. Those are a good example of docs written for the author of the docs; or for someone who already knows what the docs are saying. They're largely impenetrable to me, someone who needs the docs to find out how the thing works because I don't already know. So many technical documents are written for the wrong audience like this. However I googled further and found these instead: Messenger: Sync & Queued Message Handling. These docs make sense.
The system works in two parts:
- Message
- A class that represents something needing to be done, and contains the data needed to do that thing.
- MessageHandler
- A class that is the handler for that sort of Message. When a Message is dispatched to the message bus, it's the MessageHandler that actually [does the thing].
In my case:
- Message
- The SearchIndexer's event handlers need some indexing work done by Elasticsearch. They will dispatch a Message with the data needing to be processed (add/update/delete). They will no longer do the actual indexing.
- MessageHandler
- When a Message is dispatched, Symfony will give the Message to the correct MessageHandler to process. The indexing code will be shifted to here.
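To make that split concrete, here's a minimal plain-PHP sketch of the idea. Everything in it (GreetMessage, GreetMessageHandler, ToyMessageBus) is made up for illustration and is not Symfony's implementation; it only shows a Message carrying data, a handler doing the work, and a bus connecting the two.

```php
<?php
declare(strict_types=1);

// A Message: a plain object carrying the data needed to do the job.
final class GreetMessage
{
    public function __construct(public readonly string $name) {}
}

// A MessageHandler: an invokable class that does the actual work.
final class GreetMessageHandler
{
    /** @var string[] */
    public array $greeted = [];

    public function __invoke(GreetMessage $message): void
    {
        $this->greeted[] = "G'day, {$message->name}";
    }
}

// A toy bus (hypothetical, NOT Symfony's): maps a Message class name to the
// handler registered for it, and calls that handler synchronously.
final class ToyMessageBus
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    public function dispatch(object $message): void
    {
        $handler = $this->handlers[$message::class]
            ?? throw new LogicException('No handler for ' . $message::class);
        $handler($message);
    }
}

$handler = new GreetMessageHandler();
$bus = new ToyMessageBus();
$bus->register(GreetMessage::class, $handler);

// The dispatching code only knows about the Message and the bus.
$bus->dispatch(new GreetMessage('Adam'));
```

The point is that the code calling dispatch() has no idea who does the work, or how. That's what should let the work be moved elsewhere later without the dispatching side changing.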
Logging via messaging
But before I run, I need to walk. Before breaking my indexing by messing around with it, I'm going to create a Message that is a message to log; and a MessageHandler to log it.
// src/Message/LogMessage.php
namespace App\Message;

use Monolog\Level;

class LogMessage
{
    public function __construct(
        private readonly string $message,
        private readonly Level $level = Level::Info,
        private readonly array $context = []
    )
    {
    }

    public function getMessage(): string
    {
        return $this->message;
    }

    public function getLevel(): Level
    {
        return $this->level;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}
This Message takes the fields that we want to log. The docs say something enigmatic:
There are no specific requirements for a message class, except that it can be serialized
I mean that's fine, but it's not like their example implements the Serializable interface like one might expect from that guidance? From their example, I can only assume they mean "stick some getters on it", which is not really the same thing. Oh well.
Anyway, that's why I have those getters in there.
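For what it's worth, my understanding is that the "can be serialized" requirement only matters once a message goes through a transport, and that Messenger's default there is PHP's native serialize() / unserialize(). If that's right, a class of this shape already qualifies as-is. Here's a dependency-free sketch to check the PHP side of that; DemoLevel and DemoLogMessage are stand-ins I've made up so it runs without Monolog.

```php
<?php
declare(strict_types=1);

// Stand-in for Monolog\Level so the sketch runs without any dependencies.
enum DemoLevel: int
{
    case Info = 200;
    case Error = 400;
}

// Same shape as LogMessage: no Serializable interface, no magic methods.
final class DemoLogMessage
{
    public function __construct(
        private readonly string $message,
        private readonly DemoLevel $level = DemoLevel::Info,
        private readonly array $context = [],
    ) {
    }

    public function getMessage(): string
    {
        return $this->message;
    }

    public function getLevel(): DemoLevel
    {
        return $this->level;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

$original = new DemoLogMessage('hello', DemoLevel::Error, ['id' => 193]);

// Round-trip through PHP's native serializer.
$copy = unserialize(serialize($original));

var_dump($copy->getMessage(), $copy->getLevel(), $copy->getContext());
```

So strings, arrays and enums in private readonly properties survive the round trip fine. What would not survive is anything PHP itself refuses to serialize, a closure for example, so a Message is best kept to plain data.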
Now the MessageHandler:
// src/MessageHandler/LogMessageHandler.php
namespace App\MessageHandler;

use App\Message\LogMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class LogMessageHandler
{
    public function __construct(
        private readonly LoggerInterface $messagingLogger,
    ) {
    }

    public function __invoke(LogMessage $message): void
    {
        $this->messagingLogger->log(
            $message->getLevel(),
            $message->getMessage(),
            $message->getContext()
        );
    }
}
The trick here is the AsMessageHandler attribute which makes sure the autowiring system knows what to do with this; and then the magic __invoke method does the work.
Having done that, I tail the log file, and make an edit to one of my entities:
$ docker exec -it php tail -f /var/log/symfony/messaging.log
[2025-07-25T15:37:45.699028+01:00] messaging.INFO: SearchIndexer: postUpdate {"entity":{"App\\Entity\\Institution":{"id":193,"name":"Alenestad WootyWoo Polytechnic","address":"9319 King WootyWoo Suite 361Kerlukeshire, LA 00306","city":"Alenestad WootyWoo","postalCode":"64378-8198","country":"Cocos (WootyWoo) Islands","establishedYear":1998,"type":"polytechnic","website":"https://example.com/accusamus-esse-natus-excepturi-consequuntur-culpa-ut-officia-asperiores"}}} []
And there we have a log entry going in. Cool. That worked. Easy.
I did actually have an issue the first time around. Before running anything from the UI, I had run that test to ensure logging was working. Because that's run from the shell, it runs as root… which means the log file gets created as root too. When I then triggered the message handling via the UI, the code was running as www-data (this is what php-fpm runs as), and it did not have perms to write to the log file, so it went splat. This just required me changing the ownership on the log file to www-data, and we were all good.
Elasticsearch indexing via Messaging
OK, time to do it for realsies.
Looking at what I need to do, I have three different indexing events to deal with: adding a record on postPersist, updating a record on postUpdate, and deleting a record on preRemove. Each handler method receives different arguments, so in a way I wonder if these are three different Messages? On the other hand, I could use one Message and have an "action" property which is then used in the MessageHandler to decide what indexing task to run. The latter sounds like less code; but it also seems like subpar design, having to have a magic string on both ends of the messaging system to be matched against one another. Hrm.
…
…
OK, I can see how I can possibly make this not awful (low bar I'm setting myself there). How's this…
First I have an abstract Message class:
// src/Message/AbstractSearchIndexMessage.php
namespace App\Message;

use Doctrine\Persistence\Event\LifecycleEventArgs;

abstract class AbstractSearchIndexMessage
{
    public function __construct(
        private readonly LifecycleEventArgs $args
    )
    { }

    public function getArgs(): LifecycleEventArgs
    {
        return $this->args;
    }
}
LifecycleEventArgs is the base class of all of those subclasses that are the args for each of the event handlers, eg:
public function postPersist(PostPersistEventArgs $args): void
// [...]
public function postUpdate(PostUpdateEventArgs $args): void
// [...]
public function preRemove(PreRemoveEventArgs $args): void
// [...]
Then we have three concrete implementations, one for each of the three Message types:
// src/Message/SearchIndexAddMessage.php
namespace App\Message;
class SearchIndexAddMessage extends AbstractSearchIndexMessage
{
}
// src/Message/SearchIndexUpdateMessage.php
namespace App\Message;
class SearchIndexUpdateMessage extends AbstractSearchIndexMessage
{
}
// src/Message/SearchIndexDeleteMessage.php
namespace App\Message;
class SearchIndexDeleteMessage extends AbstractSearchIndexMessage
{
}
No code duplication. The sub-classing is just to give us unique Messages to dispatch and then handle.
We can deal with all of those in one MessageHandler class:
// src/MessageHandler/SearchIndexMessageHandler.php
namespace App\MessageHandler;

use App\Message\SearchIndexAddMessage;
use App\Message\SearchIndexDeleteMessage;
use App\Message\SearchIndexUpdateMessage;
use App\Service\ElasticSearchIndexerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

class SearchIndexMessageHandler
{
    public function __construct(
        private readonly ElasticSearchIndexerService $searchIndexer,
    ) {
    }

    #[AsMessageHandler]
    public function handleAdd(SearchIndexAddMessage $message): void
    {
        $this->searchIndexer->sync($message->getArgs()->getObject());
    }

    #[AsMessageHandler]
    public function handleUpdate(SearchIndexUpdateMessage $message): void
    {
        $this->searchIndexer->sync($message->getArgs()->getObject());
    }

    #[AsMessageHandler]
    public function handleRemove(SearchIndexDeleteMessage $message): void
    {
        $this->searchIndexer->delete($message->getArgs()->getObject());
    }
}
Unlike the LogMessageHandler we had before, where the entire class was marked as #[AsMessageHandler], here I'm tagging individual methods. And this is why we needed the separate Message classes: Symfony uses the type-hint on each handler method's $message argument to work out which Message that method handles. See how each method expects a different one of those subclasses, as appropriate. That is how the right handler method gets picked for each Message type.
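I haven't dug into how Symfony does this internally, but picking a handler from the argument's type-hint can be sketched in plain PHP with reflection. Everything here is hypothetical and for illustration only: DemoHandler stands in for #[AsMessageHandler], and buildHandlerMap() is my own toy, not anything from the framework.

```php
<?php
declare(strict_types=1);

// Hypothetical marker attribute, standing in for #[AsMessageHandler].
#[Attribute(Attribute::TARGET_METHOD)]
final class DemoHandler
{
}

final class AddMessage
{
}

final class DeleteMessage
{
}

final class DemoIndexHandler
{
    /** @var string[] */
    public array $calls = [];

    #[DemoHandler]
    public function handleAdd(AddMessage $message): void
    {
        $this->calls[] = 'add';
    }

    #[DemoHandler]
    public function handleRemove(DeleteMessage $message): void
    {
        $this->calls[] = 'delete';
    }

    // Not tagged, so it is ignored when building the map.
    public function helper(AddMessage $message): void
    {
        $this->calls[] = 'helper';
    }
}

/**
 * Builds a map of message class => method name, by reading the type-hint of
 * the first parameter of every method tagged with #[DemoHandler].
 *
 * @return array<string, string>
 */
function buildHandlerMap(object $handler): array
{
    $map = [];
    foreach ((new ReflectionObject($handler))->getMethods() as $method) {
        if ($method->getAttributes(DemoHandler::class) === []) {
            continue; // not tagged, so not a handler method
        }
        $type = ($method->getParameters()[0] ?? null)?->getType();
        if ($type instanceof ReflectionNamedType) {
            $map[$type->getName()] = $method->getName();
        }
    }

    return $map;
}

$handler = new DemoIndexHandler();
$map = buildHandlerMap($handler);

// "Dispatch" by looking up each message's class in the map.
foreach ([new DeleteMessage(), new AddMessage()] as $message) {
    $handler->{$map[$message::class]}($message);
}
```

With a lookup like this, one shared Message class would leave nothing to tell the three methods apart, which is why the empty subclasses earn their keep.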
You possibly noticed that I'm introducing a new service class here: ElasticSearchIndexerService:
namespace App\Service;

use App\Entity\SyncableToElasticsearch;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

class ElasticSearchIndexerService
{
    public function __construct(
        private readonly ElasticsearchAdapter $adapter,
        private readonly UrlGeneratorInterface $urlGenerator,
    )
    {
    }

    public function sync(object $object): void
    {
        if (!$object instanceof SyncableToElasticsearch) {
            return;
        }

        $doc = $object->toElasticsearchDocument();
        $doc['body']['_meta'] = [
            'type' => $object->getShortName(),
            'title' => $object->getSearchTitle(),
            'url' => $this->urlGenerator->generate(
                $object->getShortName() . '_view',
                ['id' => $object->getId()]
            ),
        ];

        $this->adapter->indexDocument($doc['id'], $doc['body']);
    }

    public function delete(object $object): void
    {
        if (!$object instanceof SyncableToElasticsearch) {
            return;
        }

        $this->adapter->deleteDocument($object->getElasticsearchId());
    }
}
This code was in SearchIndexer (1.2) before, because that's where it was needed. Now the indexer is much simpler:
// src/EventListener/SearchIndexer.php (1.3)
namespace App\EventListener;

use App\Message\SearchIndexAddMessage;
use App\Message\SearchIndexDeleteMessage;
use App\Message\SearchIndexUpdateMessage;
use Doctrine\Bundle\DoctrineBundle\Attribute\AsDoctrineListener;
use Doctrine\ORM\Event\PostPersistEventArgs;
use Doctrine\ORM\Event\PostUpdateEventArgs;
use Doctrine\ORM\Event\PreRemoveEventArgs;
use Doctrine\ORM\Events;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsDoctrineListener(event: Events::postPersist, priority: 500, connection: 'default')]
#[AsDoctrineListener(event: Events::postUpdate, priority: 500, connection: 'default')]
#[AsDoctrineListener(event: Events::preRemove, priority: 500, connection: 'default')]
class SearchIndexer
{
    public function __construct(
        private readonly MessageBusInterface $bus
    ) {}

    public function postPersist(PostPersistEventArgs $args): void
    {
        $indexMessage = new SearchIndexAddMessage($args);
        $this->bus->dispatch($indexMessage);
    }

    public function postUpdate(PostUpdateEventArgs $args): void
    {
        $indexMessage = new SearchIndexUpdateMessage($args);
        $this->bus->dispatch($indexMessage);
    }

    public function preRemove(PreRemoveEventArgs $args): void
    {
        $indexMessage = new SearchIndexDeleteMessage($args);
        $this->bus->dispatch($indexMessage);
    }
}
All it does is listen for the Doctrine events, and dispatch the relevant messages. It doesn't know anything about how to index stuff, it just knows how to ask "someone needs to do this, please".
And the only other thing I needed to do was to update the ReindexSearchCommand to use the ElasticSearchIndexerService rather than SearchIndexer, given I had moved that code.
Having done that lot: everything works the same as before. So not much forward progress, but our ducks are far better lined up, ready for the next bit of the work.
That was a chunk of work, code and writing. I'm going to pause this exercise here and publish this article as it seems like a reasonable juncture to do so. Plus I want a beer. I have some thinking to do before I introduce RabbitMQ into this, and to use a different PHP container to run the code for the message handling. And I think there will be a code refactoring exercise first. However I am not sure yet, and need to sleep on it.
Righto.
--
Adam