Showing posts with label Symfony Messaging. Show all posts

Wednesday, 20 August 2025

Symfony Scheduler: Handling task failures properly

G'day:

Right, so a couple of weeks ago we built a complete database-driven scheduled task system for Symfony. We got dynamic configuration through a web interface, timezone handling, working days filtering, execution tracking, and - the real kicker - a worker restart mechanism that actually updates running schedules when users change tasks without redeploying anything.

Then last week we debugged all the spectacular bugs in that implementation - Doctrine gotchas, timezone configuration self-sabotage, entity detachment through message queues, and every other way you can break an ORM if you set your mind to it.

All working perfectly. Execution tracking shows when tasks last ran and what happened. Users can configure schedules through a proper web interface. The whole system updates dynamically without manual intervention. Job done, time to move on to the next thing, right?

Well, not quite. Turns out we'd built a lovely scheduling system that could run tasks reliably and track their execution, but we'd forgotten to implement something rather important: what happens when tasks actually fail?

Our system would dutifully run a task every 30 seconds, log when it completed successfully, update the execution tracking data, and carry on to the next one. But if a task failed? It would log the error, then cheerfully schedule it to run again in another 30 seconds. And again. And again. Forever.

No failure limits, no automatic deactivation, no "maybe we should stop trying this after it's failed a dozen times" logic. Tasks could fail endlessly without consequence, which is not as helpful as it could be in a production scheduling system.

What followed was an afternoon of learning exactly why Doctrine events and database transactions don't play nicely with worker restarts, and discovering that sometimes the obvious solution really is the best one - if you can stop yourself from overthinking it.

The missing piece: what happens when tasks fail?

So what exactly had we forgotten to implement? Failure handling. We'd built all the infrastructure for running tasks and tracking their execution, but we'd never actually defined what should happen when a task fails repeatedly.

Our AbstractTaskHandler was doing comprehensive logging when tasks failed:

} catch (Throwable $e) {
    $this->tasksLogger->error('Task failed', [
        'task_id' => $taskId,
        'task_type' => $this->getTaskTypeFromClassName(),
        'error' => $e->getMessage(),
        'exception' => $e
    ]);
    throw $e;
}

So we knew when tasks were failing. The logs showed every error in detail. But none of that information was being used to make any decisions about what to do next. The task would just get scheduled to run again at its normal interval, fail again, get logged again, and repeat the cycle indefinitely.

In a production system, you need some kind of circuit breaker logic. If a task fails three times in a row, maybe there's something fundamentally wrong and it shouldn't keep trying. Maybe the external API it's calling is down, or there's a configuration issue, or the task itself is buggy. Continuing to hammer away every 30 seconds just wastes resources and fills up your logs with noise.

The obvious solution seemed straightforward: track how many times each task has failed consecutively, and automatically deactivate tasks that hit a failure threshold. Keep a failureCount in the execution tracking data, increment it on failures, reset it on success, and disable the task when it hits 3.

Simple business logic. How hard could it be to implement?

Turns out, quite hard. Because implementing failure handling properly meant diving head-first into the murky waters of Doctrine events, database transactions, and worker restart timing. What should have been a 20-minute addition turned into an afternoon of debugging increasingly creative ways for the system to break itself.

The obvious solution that wasn't so obvious

The implementation plan seemed dead simple. We already had a TaskExecution entity for tracking execution data, so we just needed to add a failureCount field:

#[ORM\Column(nullable: false, options: ['default' => 0])]
private ?int $failureCount = 0;

Then update the AbstractTaskHandler to increment the failure count on errors and reset it on success:

try {
    $this->handle($task);
    
    // Reset failure count on success
    $execution->setFailureCount(0);
    $this->updateTaskExecution($task, $startTime, $executionTime, 'SUCCESS');
    
} catch (Throwable $e) {
    // Increment failure count
    $currentFailures = $execution->getFailureCount() + 1;
    $execution->setFailureCount($currentFailures);
    
    // Deactivate task after 3 failures
    if ($currentFailures >= 3) {
        $task->setActive(false);
        $this->entityManager->persist($task);
    }
    
    $this->updateTaskExecution($task, $startTime, $executionTime, 'ERROR: ' . $e->getMessage());
    throw $e;
}

Dead straightforward. Count failures, reset on success, deactivate after three strikes. The kind of logic you'd expect to find in any robust scheduling system.

We tested it with a task that was guaranteed to fail - threw an exception every time it ran. First failure: count goes to 1, task keeps running. Second failure: count goes to 2, still active. Third failure: count goes to 3, task gets deactivated and disappears from the schedule.
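That test can be reproduced without any of the Symfony or Doctrine machinery. What follows is a minimal sketch of the three-strikes rule only: FakeTask, FakeExecution and runOnce() are hypothetical stand-ins made up for illustration, not the project's real classes, and unlike the real handler it swallows the exception rather than rethrowing it.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the task entity: only the "active" flag matters here.
final class FakeTask
{
    public bool $active = true;
}

// Hypothetical stand-in for the execution-tracking record.
final class FakeExecution
{
    public int $failureCount = 0;
}

const MAX_FAILURES = 3;

/**
 * Runs $work once and applies the three-strikes rule:
 * reset the counter on success, increment it on failure,
 * and deactivate the task once the limit is reached.
 * (The real handler rethrows the exception; this sketch does not.)
 */
function runOnce(FakeTask $task, FakeExecution $execution, callable $work): void
{
    try {
        $work();
        $execution->failureCount = 0;
    } catch (Throwable $e) {
        $execution->failureCount++;
        if ($execution->failureCount >= MAX_FAILURES) {
            $task->active = false;
        }
    }
}

$task = new FakeTask();
$execution = new FakeExecution();
$alwaysFails = static function (): void {
    throw new RuntimeException('Simulated failure');
};

// Record [failureCount, active] after each of three failing runs.
$history = [];
for ($run = 1; $run <= 3; $run++) {
    runOnce($task, $execution, $alwaysFails);
    $history[] = [$execution->failureCount, $task->active];
}
```

After the loop, $history holds one [failureCount, active] pair per run, which is the same progression described above: still active after the first two failures, deactivated on the third.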

Perfect. Except for one small problem: the task didn't actually disappear from the schedule.

The active field got updated in the database correctly. The failure count was tracking properly. But the running scheduler kept trying to execute the task every 30 seconds, completely ignoring the fact that we'd just deactivated it. The worker would dutifully run the failed task again, see it was supposed to be inactive, increment the failure count to 4, try to deactivate it again, and carry on in an endless loop.

The problem was timing. We were updating the task configuration during task execution, which should have triggered a schedule reload so the worker would pick up the change. But the reload was happening at exactly the wrong moment, creating a race condition that turned our elegant failure handling into an infinite loop.

The Doctrine event problem

The issue was with our existing worker restart mechanism. We'd been using a TaskChangeListener that listened for Doctrine events and triggered schedule reloads whenever task configuration changed:

#[AsDoctrineListener(event: Events::postUpdate)]
#[AsDoctrineListener(event: Events::postPersist)]
#[AsDoctrineListener(event: Events::postRemove)]
class TaskChangeListener
{
    private function handleTaskChange($entity): void
    {
        if (!$entity instanceof DynamicTaskMessage) {
            return;
        }

        $this->tasksLogger->info('Task change detected, triggering worker restart');
        $this->triggerWorkerRestart();
    }
}

This worked perfectly when users updated tasks through the web interface. Change a schedule from "every 5 minutes" to "every 30 seconds", hit save, and within a few seconds the new schedule was live and running.

But when our failure handling logic updated the active field on a DynamicTaskMessage, it triggered the same listener. So the sequence became:

  1. Task fails for the third time in the handler
  2. Handler sets active = false and saves the entity
  3. Doctrine postUpdate event fires
  4. TaskChangeListener triggers a worker restart
  5. Worker restart happens while the handler is still running

That last step was the killer. The postUpdate event fires while you're still inside the database transaction that's updating the task. The worker restart spawns a new process that tries to read the updated task configuration, but the transaction hasn't committed yet. So the new worker process sees the task as still active, thinks it's overdue (because it just "failed" but the schedule hasn't been updated), and immediately runs it again.

Meanwhile, the original handler finishes its transaction and commits the active = false change. But it's too late - the new worker is already executing the task again with the old data, which will fail again, increment the failure count that it thinks is still 2, try to deactivate the task again, trigger another restart, and round we go.

Transaction isolation nightmare. The event system was designed for "fire and forget" notifications, not "coordinate complex multi-process state changes". We needed the worker restart to happen after the transaction committed, not during it.
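The timing problem is easier to see in a toy model. This is a sketch only: ToyDatabase is a made-up class that mimics "uncommitted writes are invisible to other processes", and has nothing to do with Doctrine's actual internals.

```php
<?php

declare(strict_types=1);

// Toy model of a transactional store: writes only become visible to other
// "processes" once commit() has been called. Entirely hypothetical.
final class ToyDatabase
{
    /** @var array<string, bool> Values every process can see. */
    private array $committed = ['task.active' => true];

    /** @var array<string, bool> Values written but not yet committed. */
    private array $pending = [];

    public function write(string $key, bool $value): void
    {
        $this->pending[$key] = $value;
    }

    public function commit(): void
    {
        // Pending values overwrite committed ones with the same key.
        $this->committed = array_merge($this->committed, $this->pending);
        $this->pending = [];
    }

    // What a different process (e.g. a freshly restarted worker) would read.
    public function readAsOtherProcess(string $key): bool
    {
        return $this->committed[$key];
    }
}

$db = new ToyDatabase();

// The handler deactivates the task inside its transaction...
$db->write('task.active', false);

// ...a postUpdate-style hook fires here, before the commit,
// and the restarted worker reads the task configuration.
$seenDuringHook = $db->readAsOtherProcess('task.active');

// Only now does the original transaction commit.
$db->commit();
$seenAfterCommit = $db->readAsOtherProcess('task.active');
```

The read made during the hook still sees the task as active; only the read made after the commit sees the deactivation. Anything triggered before the commit is working from stale data.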

postFlush: when the cure is worse than the disease

The "obvious" fix was to use postFlush events instead of postUpdate. The postFlush event fires after Doctrine commits all pending changes to the database, so there's no transaction timing issue. Perfect!

Except for one small problem: postFlush events don't tell you which entities were updated. The event just says "something got flushed to the database", but you have no idea what that something was.

So we'd end up with worker restarts triggered by every single database write in the entire application. User updates their profile? Worker restart. Product price gets updated? Worker restart. Log entry gets written? Worker restart. Session data gets saved? Worker restart.

In a typical web application, database writes happen constantly. Every page load, every form submission, every background process touching the database would trigger a schedule reload. We'd have workers restarting dozens of times per minute, which is roughly the opposite of what you want from a stable scheduling system.

We tried a few approaches to work around this:

  • Track entity changes manually - store a list of modified entities during the request, check it in the postFlush handler. Complicated and error-prone.
  • Use unit of work change sets - inspect Doctrine's internal change tracking to see what actually changed. Fragile and dependent on internal APIs.
  • Custom flush operations - separate the task updates from other database operations. Architectural nightmare.

All of these solutions were more complex than the original problem. We were trying to hack around the fundamental limitation that postFlush events give you the right timing but no entity context, while postUpdate events give you entity context but the wrong timing.
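For the record, the first of those workarounds looks roughly like this once the Doctrine wiring is stripped away. It's a sketch under assumptions: DeferredRestartTracker and the two Fake* classes are hypothetical, and entityChanged() / flushed() stand in for whatever postUpdate-style and postFlush-style hooks would call them.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for a task entity and some unrelated entity.
final class FakeTaskEntity
{
}

final class FakeUserProfile
{
}

final class DeferredRestartTracker
{
    private bool $taskChanged = false;

    public int $restarts = 0;

    // Would be called from a postUpdate/postPersist/postRemove-style hook:
    // we know which entity changed, but it is too early to act on it.
    public function entityChanged(object $entity): void
    {
        if ($entity instanceof FakeTaskEntity) {
            $this->taskChanged = true;
        }
    }

    // Would be called from a postFlush-style hook: the timing is right,
    // so act only if a task change was remembered earlier.
    public function flushed(): void
    {
        if (!$this->taskChanged) {
            return;
        }

        $this->taskChanged = false;
        $this->restarts++; // the real thing would trigger the worker restart here
    }
}

$tracker = new DeferredRestartTracker();

// An unrelated write: no restart.
$tracker->entityChanged(new FakeUserProfile());
$tracker->flushed();
$restartsAfterProfileUpdate = $tracker->restarts;

// A task write: exactly one restart, after the flush.
$tracker->entityChanged(new FakeTaskEntity());
$tracker->flushed();
$restartsAfterTaskUpdate = $tracker->restarts;
```

It works in isolation, but it means carrying mutable state between two hooks in a long-running worker, and it does nothing about restarts triggered by the handler's own updates. That's the "complicated and error-prone" part.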

Doctrine events just weren't going to work for this use case. The combination of "need entity-specific filtering" + "need post-transaction timing" + "avoid restart loops from execution updates" was impossible to solve cleanly with the lifecycle events.

Time to try a completely different approach.

The message bus epiphany

After wrestling with Doctrine events for the better part of an afternoon, we stepped back and had one of those "hang on a minute" moments. We were trying to use database lifecycle events to trigger application-level actions - worker restarts, schedule reloads, process coordination. But Doctrine events are designed for database concerns: maintaining referential integrity, updating timestamps, logging changes.

What we were trying to do wasn't really a database concern at all. We wanted to send a message to the scheduling system saying "hey, something changed, you might want to reload your config". That's application logic, not data persistence logic.

And we already had the perfect tool for sending messages between different parts of the application: Symfony's message bus. The same message bus that was handling our task execution was sitting right there, designed exactly for this kind of "do something after the current operation finishes" use case.

So instead of trying to hack around Doctrine event timing, why not just dispatch a ScheduleReloadMessage when we need a worker restart?

// In the failure handling logic
if ($currentFailures >= 3) {
    $task->setActive(false);
    $this->entityManager->persist($task);
    $this->entityManager->flush();
    
    // Tell the scheduler to reload after this transaction commits
    $this->messageBus->dispatch(new ScheduleReloadMessage());
}

(NB: that's not the actual code being run, it's simplified for the sake of demonstration, the real code is @ src/MessageHandler/AbstractTaskHandler.php)

The message bus naturally handles the timing. Because the dispatch happens after flush() has returned, the change is already committed by the time the message is handled (assuming nothing wraps the call in an outer transaction), so there's no race condition between updating the database and reloading the schedule. The transaction commits first, then the message gets processed, then the worker restart happens with the correct data.

Plus we get proper separation of concerns: the task handler focuses on business logic (tracking failures, deactivating problematic tasks), and the message bus handles infrastructure concerns (coordinating worker restarts).

Sometimes the "clever" solution that requires fighting the framework is wrong, and the simple solution that works with the framework is right. We'd been so focused on making Doctrine events do what we wanted that we'd forgotten about the message infrastructure we'd already built.

Hindsight, eh?

The actual implementation is beautifully simple. The ScheduleReloadMessage is just an empty class - no properties, no constructor, just a marker to tell the system "reload the schedule":

class ScheduleReloadMessage
{
    // That's it. Sometimes the simplest solutions are the best ones.
}

And the ScheduleReloadMessageHandler just writes the timestamp to the file that triggers the worker restart:

#[AsMessageHandler]
class ScheduleReloadMessageHandler
{
    public function __invoke(ScheduleReloadMessage $message): void
    {
        file_put_contents($this->restartFilePath, time());
        $this->logger->info('Schedule reload triggered via message bus');
    }
}

Amazing how little code is required to solve what felt like a complex coordination problem.
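The worker side of that file-based trigger isn't shown in this article, so here is one way it could work. This is a sketch under assumptions, not the project's actual check: restartRequestedSince() is a hypothetical helper that compares the timestamp in the file against the time the worker started.

```php
<?php

declare(strict_types=1);

/**
 * Returns true if the restart file holds a timestamp later than the moment
 * this worker started. Hypothetical helper: the real worker-side check is
 * not shown in the article.
 */
function restartRequestedSince(string $restartFilePath, int $workerStartedAt): bool
{
    if (!is_file($restartFilePath)) {
        return false; // nobody has ever asked for a restart
    }

    $contents = file_get_contents($restartFilePath);
    if ($contents === false) {
        return false; // unreadable file: treat as "no restart requested"
    }

    return (int) trim($contents) > $workerStartedAt;
}

// Usage with a temporary file and fixed timestamps so the outcome is predictable.
$path = tempnam(sys_get_temp_dir(), 'restart_');
$workerStartedAt = 1000;

file_put_contents($path, (string) 999);   // written before the worker started
$staleRequest = restartRequestedSince($path, $workerStartedAt);

file_put_contents($path, (string) 1001);  // written after the worker started
$freshRequest = restartRequestedSince($path, $workerStartedAt);

unlink($path);
```

A strict "later than" comparison means a request written in the same second the worker started is ignored. Whether that is the right call depends on how the real worker records its start time.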

Implementation walkthrough

With the message bus approach sorted, the actual implementation was straightforward. We ditched the TaskChangeListener entirely - no more Doctrine events, no more transaction timing issues, no more endless restart loops.

The failure tracking logic lives in the AbstractTaskHandler, which now takes the message bus as a constructor parameter:

public function __construct(
    private readonly LoggerInterface $tasksLogger,
    private readonly EntityManagerInterface $entityManager,
    private readonly MessageBusInterface $messageBus
) {}

The execution logic tracks failures and handles deactivation cleanly:

$execution = $this->getOrCreateExecution($task);

try {
    $result = $this->handle($task);
    
    // Reset failure count on success
    $execution->setFailureCount(0);
    $this->updateTaskExecution($task, $startTime, $executionTime, $result);
    
} catch (Throwable $e) {
    $currentFailures = $execution->getFailureCount() + 1;
    $execution->setFailureCount($currentFailures);
    
    $errorMessage = 'ERROR: ' . $e->getMessage();
    
    if ($currentFailures >= self::MAX_FAILURES) {
        $task->setActive(false);
        $this->entityManager->persist($task);
        $this->entityManager->flush();
        
        $this->tasksLogger->warning('Task deactivated after repeated failures', [
            'task_id' => $task->getId(),
            'failure_count' => $currentFailures
        ]);
        
        // Schedule reload after transaction commits
        $this->messageBus->dispatch(new ScheduleReloadMessage());
        
        $errorMessage .= ' (Task deactivated after ' . $currentFailures . ' failures)';
    }
    
    $this->updateTaskExecution($task, $startTime, $executionTime, $errorMessage);
    throw $e;
}

The TaskExecution entity got the new failure tracking field:

#[ORM\Column(nullable: false, options: ['default' => 0])]
private ?int $failureCount = 0;

And we needed to update the web interface to dispatch schedule reload messages when users make changes through the UI. The DynamicTaskController now injects the message bus and triggers reloads on create/update/delete operations:

public function create(Request $request): Response
{
    // ... form handling ...
    
    if ($form->isSubmitted() && $form->isValid()) {
        $this->entityManager->persist($task);
        $this->entityManager->flush();
        
        $this->messageBus->dispatch(new ScheduleReloadMessage());
        
        return $this->redirectToRoute('dynamic_task_index');
    }
}

Now both programmatic changes (task failures) and user-driven changes (web interface updates) use the same mechanism for triggering schedule reloads. Consistent, predictable, and no transaction timing issues.

Bonus features that fell out for free

Once we had the message bus approach working for failure handling, a few other features became trivial to implement. The infrastructure was already there - we just needed to wire up a few more use cases.

Delete tasks properly: We'd had a task listing interface but no way to actually delete tasks that were no longer needed. Adding a delete action to the controller was straightforward:

public function delete(DynamicTaskMessage $task): Response
{
    $this->entityManager->remove($task);
    $this->entityManager->flush();
    
    $this->messageBus->dispatch(new ScheduleReloadMessage());
    
    return $this->redirectToRoute('dynamic_task_index');
}

Delete the task, flush the change, tell the scheduler to reload. The deleted task disappears from the running schedule within seconds.

Ad-hoc task execution: Sometimes you want to run a task immediately for testing or troubleshooting, rather than waiting for its next scheduled time. Since we already had the message infrastructure, this was just a matter of dispatching a TaskMessage directly:

public function run(DynamicTaskMessage $task): Response
{
    $taskMessage = new TaskMessage(
        $task->getType(),
        $task->getId(),
        $task->getMetadata() ?? []
    );
    
    $this->messageBus->dispatch($taskMessage);
    
    $this->addFlash('success', 'Task execution requested');
    return $this->redirectToRoute('dynamic_task_index');
}

Add a "Run Now" button to the task listing, and users can trigger immediate execution without disrupting the normal schedule. Handy for testing new tasks or dealing with one-off requirements.

What wasn't immediately obvious to me (Claudia needed to point it out) is that these scheduled task classes I've got are just Symfony Message / MessageHandler classes. They work just as well like this in a "stand-alone" fashion as they do being wrangled by the scheduler. Really handy.

NullTaskHandler for testing: We added a NullTaskHandler that does absolutely nothing except log that it ran:

class NullTaskHandler extends AbstractTaskHandler
{
    protected function handle(DynamicTaskMessage $task): string
    {
        // Deliberately do nothing
        return 'NULL task completed successfully (did nothing)';
    }
}

Perfect for testing the scheduling system without any side effects. Create a "null" task, set it to run every 30 seconds, and watch the logs to verify everything's working properly. You can see tasks being scheduled, executed, and tracked without worrying about the task logic itself.

All of these features required minimal additional code because the core message bus infrastructure was already in place. Sometimes building the right foundation pays dividends in unexpected ways.

Claudia's summary: When the simple solution is staring you in the face

Right, Adam's asked me to reflect on this whole exercise. What started as "just add some failure counting" turned into a proper lesson in when to stop fighting the framework and start working with it.

The most striking thing about this debugging saga was how we got tunnel vision on making Doctrine events work. We spent ages trying to solve the transaction timing problem, then the entity filtering problem, then considering all sorts of hacky workarounds, when the answer was sitting right there in the message bus we'd already built.

It's a perfect example of the sunk cost fallacy in technical problem-solving. We'd invested time in the Doctrine event approach, so we kept trying to make it work rather than stepping back and asking "what would we do if we were designing this from scratch?"

The breakthrough came when we stopped thinking about the technical details (transaction boundaries, event timing, entity lifecycle) and started thinking about what we were actually trying to accomplish: send a message from one part of the system to another saying "something changed, please react accordingly". That's literally what message buses are designed for.

The resulting solution is cleaner than what we started with. No more Doctrine events trying to coordinate cross-process communication. No more transaction timing issues. Just straightforward message dispatch that works with Symfony's natural request/response cycle.

Sometimes the obvious solution really is the best one - if you can stop yourself from overthinking it long enough to see it.

Adam's bit

(also written by Claudia this time… a bit cheeky of her ;-)

Building robust failure handling taught me something important about production systems: the edge cases aren't really edge cases. Tasks will fail. Networks will be unreliable. External APIs will go down at the worst possible moment. Building a scheduling system without failure handling is like building a car without brakes - it might work fine until you actually need to stop.

The message bus approach solved our immediate problem, but it also gave us a better foundation for future features. Need to send notifications when tasks fail? Dispatch a message. Want to collect metrics about task performance? Another message. Need to coordinate with external systems? You get the idea.

Most importantly, we learned when to stop being clever. The Doctrine event approach felt sophisticated - using the framework's lifecycle hooks to automatically coordinate system state. But sophisticated isn't always better. Sometimes the straightforward solution that everyone can understand and debug is worth more than the clever solution that feels elegant.

Our scheduling system now handles failures gracefully, gives users control over task execution, and has a clean architecture that's easy to extend. Not bad for an afternoon's work, once we stopped overthinking it.

Righto.

--
Adam

Monday, 28 July 2025

Using RabbitMQ as a transport layer for Symfony Messaging

G'day:

First up, I'm building on the codebase I worked through in these previous articles:

Reading that lot would be good for context, but in short the first two go over setting up a (very) basic CRUD website that allows the editing of some entities, and on create/update/delete also does the relevant reindexing on Elasticsearch. The third article removes the indexing code from inline, and uses the Symfony Messaging system to dispatch messages ("index this"), and message handlers ("OK, I'll index that"). These were all running in-process during the web request.

The overall object of this exercise is to deal with this notion, when it comes to the Elasticsearch indexing:

[That] overhead is not actually needed for the call to action to be provided[…]. The user should not have to wait whilst the app gets its shit together[.]

When serving a web request, the process should be focusing on just what is necessary to respond with the next page. It should not be doing behind-the-scenes housework too.

Using the messaging system is step one of this - it separates the web request code from the Elasticsearch indexing code - but it's all still running as part of that process. We need to offload the indexing work to another process. This is what we're doing today.


Step 0

Step 0 is: confirm RabbitMQ will work for me here. I only know RabbitMQ exists as a thing. I've never used it, and never actually even read any docs on it. I kinda just assumed that "well it's an industry-standard queuing thingey, and Symfony works with a bunch of stuff out of the box, so it'll probably be fine". I read some stuff on their website (RabbitMQ), and RabbitMQ's Wikipedia page too. Seems legit. And I also checked Symfony for mention of integrating with it, and landed on this: Using RabbitMQ as a Message Broker, and that looked promising.


RabbitMQ Docker container

Something I can do without messing around with any app code or Symfony config is getting a RabbitMQ container up and running (and sitting there doing nothing).

There's an official RabbitMQ image, and the example docker run statements look simple enough:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3

As this is gonna be used internally, no need for running securely or with a non-default user etc, but obviously all those options are catered for too.

I also noted there's a variant of the image that contains a management app, so I decided to run with that. Converting the generic docker run statement to something for my docker-compose.yml file was easy enough:

docker/docker-compose.yml
services:

  # [...]

  rabbitmq:
    container_name: rabbitmq

    hostname: rabbitmq

    image: rabbitmq:4.1.2-management

    ports:
      - "5672:5672"
      - "15672:15672"

    stdin_open: true
    tty: true

Port 5672 is its standard application comms port; 15672 is for the manager. The hostname is needed for RabbitMQ's internals, and doesn't matter what it is for what I'm doing.

Building that worked fine, and the management UI was also up (login: guest, password: guest).

The only thing I'm going to be using the management UI for is to watch for messages going into the queue, should I need to troubleshoot anything.


Splitting the PHP work: web and worker

The main thing I am trying to achieve here is to lighten the load for the web-app, by farming work off to another "worker". This will be running the same codebase as the web app, but instead of running php-fpm to listen to traffic from a web server, it's going to run [whatever] needs to be run to pull messages off the queue and handle them. The web app puts a message on the queue; the worker app pulls them off the queue and does the processing.

So I need a second PHP container.

My initial naive attempt at doing this was to simply duplicate the entry in docker-compose.yml, and remove the port 9000 port mapping from php-worker as it won't be listening for web requests. This built and ran "fine", except that the entrypoints of both php-web and php-worker conflicted with each other, as they were both trying to do a composer install on the same volume-mounted vendor directory (the physical directory being on my host PC). This screwed both containers.

After a lot of trial and error (mostly error), I came up with a process as follows:

  1. Copy composer.json and composer.lock to /tmp/composer in the image file system, and run composer install during the build phase. This means that processing is already done by the time either container is brought up. For the Dockerfile to be able to do this, I needed to shift the build context in docker-compose.yml to be the app root, so it can see composer.json and composer.lock.
  2. Having those files in /tmp/composer/vendor is no help to anyone, so we need to copy the vendor directory to the app root directory once the container is up.
  3. As both php-web and php-worker need these same (exact same: they're looking at the same location in the host file system) vendor files, we're going to get just php-web to do the file copy, and get php-worker to wait until php-web is done before it comes up.

Here are the code changes, first for docker-compose.yml:

# docker/docker-compose.yml

services:
  # [...]

  # Before the change (old service name and build context):
  # php:
  #   container_name: php
  #   build:
  #     context: php
  #     dockerfile: Dockerfile

  php-web:
    container_name: php-web
    build:
      context: ..
      dockerfile: docker/php/Dockerfile

    # [...]
    
    entrypoint: ["/usr/local/bin/entrypoint-web.sh"]

  php-worker:
    container_name: php-worker
    build:
      context: ..
      dockerfile: docker/php/Dockerfile

    env_file:
      - mariadb/envVars.public
      - elasticsearch/envVars.public
      - php/envVars.public

    stdin_open: true
    tty: true

    volumes:
      - ..:/var/www

    healthcheck:
      test: ["CMD", "pgrep", "-f", "php bin/console messenger:consume rabbitmq"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 10s

    extra_hosts:
      - host.docker.internal:host-gateway

    secrets:
      - app_secrets

    entrypoint: ["/usr/local/bin/entrypoint-worker.sh"]

    depends_on:
      php-web:
        condition: service_healthy

Notes:

  • Just the naming and build context changes for the original PHP container.
  • Oh and it has its own entrypoint script now.
  • The config for php-worker is much the same as for php-web except:
    • Port 9000 doesn't need to be exposed: it's not going to be serving web requests.
    • It overrides the healthcheck in the Dockerfile with its own check: just that messenger:consume is running.
    • It has a different entrypoint than the web container.

Here are the relevant Dockerfile changes:

# docker/php/Dockerfile

FROM php:8.4.10-fpm-bookworm

RUN ["apt-get", "update"]
RUN ["apt-get", "install", "-y", "zip", "unzip", "git", "vim", "procps"]

# [...]
COPY docker/php/usr/local/etc/php/conf.d/error_reporting.ini /usr/local/etc/php/conf.d/error_reporting.ini
COPY docker/php/usr/local/etc/php/conf.d/app.ini /usr/local/etc/php/conf.d/app.ini

# [...]

RUN pecl install xdebug && docker-php-ext-enable xdebug
COPY docker/php/usr/local/etc/php/conf.d/xdebug.ini /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini

# [...]

WORKDIR /var/www
ENV COMPOSER_ALLOW_SUPERUSER=1

COPY --chmod=755 usr/local/bin/entrypoint.sh /usr/local/bin/
ENTRYPOINT ["entrypoint.sh"]
WORKDIR /tmp/composer
COPY composer.json composer.lock /tmp/composer/
ENV COMPOSER_ALLOW_SUPERUSER=1
RUN composer install --no-interaction --prefer-dist --no-scripts

# [...]

COPY --chmod=755 docker/php/usr/local/bin/entrypoint-web.sh /usr/local/bin/
COPY --chmod=755 docker/php/usr/local/bin/entrypoint-worker.sh /usr/local/bin/


EXPOSE 9000

And the entry point scripts:

# docker/php/usr/local/bin/entrypoint-web.sh

#!/bin/bash

rm -f /var/www/vendor/up.dat
cp -a /tmp/composer/vendor/. /var/www/vendor/
touch /var/www/vendor/up.dat

exec php-fpm

# docker/php/usr/local/bin/entrypoint-worker.sh

#!/bin/bash

exec php bin/console messenger:consume rabbitmq

That up.dat is checked in php-web's healthcheck now:

# bin/healthCheck.php

if (file_exists('/var/www/vendor/up.dat')) {
    echo 'pong';
}

This means it won't claim to be up until it's finished copying the vendor files, and php-worker won't come up until php-web is healthy.

I'm pretty sure those're all the PHP changes. I now have two PHP containers running: one handling web, the other handling messages.


Symfony config

The Messenger: Sync & Queued Message Handling docs explained all this pretty clearly.

I needed to install symfony/amqp-messenger, and for that to work I also needed to install the ext-amqp PHP extension. This needed some tweaks in the Dockerfile:

# docker/php/Dockerfile

# [...]

RUN [ \
    "apt-get", "install", "-y",  \
    "libz-dev", \
    "libzip-dev", \
    "libfcgi0ldbl", \
    "librabbitmq-dev" \
]
# [...]

RUN pecl install amqp
RUN docker-php-ext-enable amqp

# [...]

Then I needed to configure a MESSENGER_TRANSPORT_DSN Symfony "environment" variable in .env:

MESSENGER_TRANSPORT_DSN=amqp://guest:guest@host.docker.internal:5672/%2f/messages

(In prod I'd have to be more secure about that password, but it doesn't matter here).
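That DSN packs a fair bit into one string. As I understand Symfony's AMQP transport, the first path segment is the vhost (URL-encoded, so %2f is the default "/" vhost) and the second is the exchange name. Here's a minimal sketch that pulls the pieces apart with plain parse_url(). describeAmqpDsn() is a hypothetical helper for illustration, not how Symfony itself parses the DSN.

```php
<?php

declare(strict_types=1);

/**
 * Splits an AMQP DSN into its parts. Hypothetical helper for illustration:
 * it assumes the path is "/<url-encoded vhost>/<exchange>".
 *
 * @return array{host: ?string, port: ?int, user: ?string, vhost: string, exchange: ?string}
 */
function describeAmqpDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false) {
        throw new InvalidArgumentException('Could not parse DSN');
    }

    // "/%2f/messages" becomes ['%2f', 'messages']
    $segments = explode('/', trim($parts['path'] ?? '', '/'));

    return [
        'host' => $parts['host'] ?? null,
        'port' => $parts['port'] ?? null,
        'user' => $parts['user'] ?? null,
        'vhost' => urldecode($segments[0] ?? ''),
        'exchange' => $segments[1] ?? null,
    ];
}

$described = describeAmqpDsn('amqp://guest:guest@host.docker.internal:5672/%2f/messages');
```

With the DSN above, $described identifies the host as host.docker.internal, the port as 5672, the vhost as "/" and the exchange as "messages".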

And finally configure the Messaging system to use it:

# config/packages/messenger.yaml

framework:
  messenger:
    transports:
      rabbitmq:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
    routing:
      'App\Message\*': rabbitmq

At this point when I rebuilt the containers, everything was happy until I ran my code…


App changes

In my last article I indicated some derision/bemusement about something in the docs:

The docs say something enigmatic:

There are no specific requirements for a message class, except that it can be serialized

Creating a Message & Handler

I mean that's fine, but it's not like their example implements the Serializable interface like one might expect from that guidance? From their example, I can only assume they mean "stick some getters on it", which is not really the same thing. Oh well.

Using Symfony's Messaging system to separate the request for work to be done and doing the actual work

And now I come to discover what they actually meant. And what they meant is that yes, the Message data should be Serializable. As in: via the interface implementation.

I had been passing around a LifecycleEventArgs implementation (one of PostPersistEventArgs, PostUpdateEventArgs or PreRemoveEventArgs) inside each Message. The listener now passes just the entity instead:

# src/EventListener/SearchIndexer.php

class SearchIndexer
{
    public function __construct(
        private readonly MessageBusInterface $bus
    ) {}

    public function postPersist(PostPersistEventArgs $args): void
    {
        $indexMessage = new SearchIndexAddMessage($args->getObject());
        $this->bus->dispatch($indexMessage);
    }

    public function postUpdate(PostUpdateEventArgs  $args): void
    {
        $indexMessage = new SearchIndexUpdateMessage($args->getObject());
        $this->bus->dispatch($indexMessage);
    }

    public function preRemove(PreRemoveEventArgs $args): void
    {
        $indexMessage = new SearchIndexDeleteMessage($args->getObject());
        $this->bus->dispatch($indexMessage);
    }
}

Those event args don't serialize, so I had to update the listener to pass only the entity object that getObject() returns. I then likewise updated src/MessageHandler/SearchIndexMessageHandler.php: it no longer needs to call getObject() itself, as the Message already holds the entity. These are the handler methods that needed changing:

#[AsMessageHandler]
public function handleAdd(SearchIndexAddMessage $message): void
{
    $this->searchIndexer->sync($message->getArgs()->getObject());
}

#[AsMessageHandler]
public function handleUpdate(SearchIndexUpdateMessage $message): void
{
    $this->searchIndexer->sync($message->getArgs()->getObject());
}

#[AsMessageHandler]
public function handleRemove(SearchIndexDeleteMessage $message): void
{
    $this->searchIndexer->delete($message->getArgs()->getObject());
}
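For what it's worth, the failure is easy to reproduce in plain PHP. This is a minimal sketch, assuming the transport serializes Messages with PHP's native serialize(), and assuming the event args fail because they drag a non-serializable service along with the entity. FakeEventArgs, Institution and IndexMessage are hypothetical stand-ins (a Closure plays the part of the non-serializable dependency); none of this is Doctrine's or Symfony's actual code.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an entity: plain data only.
final class Institution
{
    public function __construct(
        public readonly int $id,
        public readonly string $name,
    ) {}
}

// Hypothetical stand-in for Doctrine's event args: the entity plus a
// dependency PHP refuses to serialize (a Closure here).
final class FakeEventArgs
{
    public function __construct(
        private readonly object $entity,
        private readonly \Closure $manager,
    ) {}

    public function getObject(): object
    {
        return $this->entity;
    }
}

final class IndexMessage
{
    public function __construct(private readonly object $payload) {}

    public function getPayload(): object
    {
        return $this->payload;
    }
}

// True if PHP's native serializer accepts the message.
function canSerialize(object $message): bool
{
    try {
        serialize($message);
        return true;
    } catch (\Throwable) {
        return false;
    }
}

$args = new FakeEventArgs(new Institution(193, 'Some Polytechnic'), fn () => null);

var_dump(canSerialize(new IndexMessage($args)));              // whole event args
var_dump(canSerialize(new IndexMessage($args->getObject()))); // entity only
```

The first call reports false and the second true: the Message is only as serializable as the least serializable thing it holds.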

Once I fixed that one glitch: it worked. When I edited an entity I could see a message going into the RabbitMQ queue (via the manager UI), and could see it being removed, and could see the results of the Elasticsearch update. Cool.

It took ages to get the PHP containers working properly - mostly the composer installation stuff - but the RabbitMQ and Symfony bits were really easy! Nice.

Righto.

--
Adam

Friday, 25 July 2025

Using Symfony's Messaging system to separate the request for work to be done and doing the actual work

G'day:

Context

What am I on about with that title? OK, let me explain.

In a few gigs I've had in the past, we've had the issue of there being more work to do to fulfil a request than we could allow to run within a web request. Say there's half a dozen things to do when updating a record, and collectively they take 10sec to run: we can't have the user sitting around for 10sec to get the next call to action. This is made more annoying when most of that overhead is not actually needed for the call to action to be provided; it's (necessary) housekeeping to keep the application working, rather than to keep the user working. The user should not have to wait whilst the app gets its shit together, basically.

Previously I've seen this being solved in a number of ways:

  • Hide the processing in separate threads spun-off by the main request thread. This improves things for the user, but it doesn't lighten the load for the web app at all. And it's also at this point questionable as to whether this is actually the web app's job. It's clearly nothing to do with fulfilling web requests, if it's being done in the background.
  • Set some sort of work data somewhere, that indicates some processing of it needs to be done. Then have a separate process running tasks that check for the existence of work data, and process it if there is any. This is like a hand-cranked queuing system which operates via a task going "got anything for me? No. How about now? No. Now? Ooh yes, got one. What about now? No?". It works, but there's a lot of time having some code asking a question when the answer is "no". For most cases, we know when the work needs to be done: when the code that identifies the work says so. That's when it needs to be done.

So that's what I'm trying to solve. I've never had to solve this sort of thing before, but I have a suspicion that it's gonna involve a message queuing system.


Situation

The example I am going to use is the Elasticsearch integration I put into my test site over the last coupla days:

The first article is long; the second one is very short for me. In summary I have a test app that provides data-entry forms for a number of DB-backed entities. For the purposes of the search functionality, I am using Elasticsearch. And the articles show how to keep Elasticsearch up to date at the same time as the DB updates are made.


Task at hand

Unfortunately for this exercise, the Elasticsearch operations are lightning quick, so it's really no issue to have the code execution "inline" with the web requests. But this processing falls foul of something I touched on above:

[That] overhead is not actually needed for the call to action to be provided[…]. The user should not have to wait whilst the app gets its shit together[.]

So I'm going to ignore the premature optimisation red flag, and am going to farm the Elasticsearch updates off to some other process that the web request doesn't need to worry about.

My chosen path here is to use the Symfony Messenger Component, and ultimately have that working with RabbitMQ under the hood so I can farm-off the processing to a completely different app container, so as to reduce load on the web-app container.

I have never done any of this before, and I have to admit that I am currently struggling to even understand the docs. So this should… errr… be "interesting". Or a dumpster fire. One of those.

Spoiler

I do not get to the point of introducing RabbitMQ in this article. It was enough effort to get the messaging stuff working "in-process", so I'm splitting this exercise into 2-3 chunks.


Logging

To start with, I'm just gonna try to get messages and handlers working in-process via Symfony. So no performance gains, no RabbitMQ, and no second app container. And to watch things working (or not), my aim is to simply get a handler to log something if it receives a message. To do this I'm gonna need to install Monolog, which is straightforward.

docker exec php composer require symfony/monolog-bundle:^3.1.0

# config/packages/monolog.yaml

monolog:
  channels:
    - messaging

  handlers:
    messaging:
      type: stream
      path: '%kernel.logs_dir%/messaging.log'
      level: debug
      channels: ['messaging']

And test it:

// tests/Functional/System/MonologTest.php

namespace App\Tests\Functional\System;

use Monolog\Handler\TestHandler;
use Monolog\Level;
use PHPUnit\Framework\Attributes\TestDox;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;

class MonologTest extends KernelTestCase
{
    #[TestDox('It logs messages to the messaging log file')]
    public function testMessagingLog()
    {
        self::bootKernel();
        $container = self::$kernel->getContainer();

        $logger = $container->get('monolog.logger.messaging');

        $testHandler = new TestHandler();
        $logger->pushHandler($testHandler);

        $uniqueMessage = 'Test message ' . uniqid('', true);
        $logger->info($uniqueMessage);

        $this->assertTrue($testHandler->hasInfoRecords());
        $this->assertTrue($testHandler->hasRecord($uniqueMessage, Level::Info));

        $records = $testHandler->getRecords();
        $infoRecords = array_filter($records, fn($r) => $r['level'] === Level::Info->value);
        $lastInfoRecord = end($infoRecords);
        $this->assertEquals($uniqueMessage, $lastInfoRecord['message']);
    }
}

That's working fine.


Symfony Messenger

Installation is obvious:

docker exec php composer require symfony/messenger:7.3.*

That spews out a bunch of stuff, all simply informational. It's also installed a config/packages/messenger.yaml, but there's no actual settings in it. According to the informational stuff, we're good to go now.

Further up I linked to the docs for the Symfony Messenger Component. Those are a good example of docs written for the author of the docs; or for someone who already knows what the docs are saying. They're largely impenetrable to me, someone who needs the docs to find out how the thing works because I don't already know. So many technical documents are written for the wrong audience like this. However I googled further and found these instead: Messenger: Sync & Queued Message Handling. These docs make sense.

The system works in two parts:

Message
A class that represents something needing to be done, and contains the data needed to do that thing.
MessageHandler
A class that is the handler for that sort of Message. When a Message is dispatched to the message bus, it's the MessageHandler that actually [does the thing].

In my case:

Message
The SearchIndexer's event handlers need some indexing work done by Elasticsearch. They will dispatch a Message with the data needing to be processed (add/update/delete). They will no longer do the actual indexing.
MessageHandler
When a Message is dispatched, Symfony will give the Message to the correct MessageHandler to process. The indexing code will be shifted to here.

Logging via messaging

But before I run, I need to walk. Before breaking my indexing by messing around for it, I'm going to create a Message that is a message to log; and a MessageHandler to log it.

// src/Message/LogMessage.php

namespace App\Message;

use Monolog\Level;

class LogMessage
{
    public function __construct(
        private readonly string $message,
        private readonly Level $level = Level::Info,
        private readonly array $context = []
    )
    {
    }

    public function getMessage(): string
    {
        return $this->message;
    }

    public function getLevel(): Level
    {
        return $this->level;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

This Message takes the fields that we want to log. The docs say something enigmatic:

There are no specific requirements for a message class, except that it can be serialized

Creating a Message & Handler

I mean that's fine, but it's not like their example implements the Serializable interface like one might expect from that guidance? From their example, I can only assume they mean "stick some getters on it", which is not really the same thing. Oh well.

I note from the docs @ the link above for the Serializable interface: PHP has deprecated it. Instead it is running with "just sling some magic methods in yer class, it'll all be grand". This seems like a backwards step, and a return to the wayward PHP language design of the 1990s-2000s. Shudder.

Anyway, that's why I have those getters in there.

Now the MessageHandler:

// src/MessageHandler/LogMessageHandler.php

namespace App\MessageHandler;

use App\Message\LogMessage;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class LogMessageHandler
{
    public function __construct(
        private readonly LoggerInterface $messagingLogger,
    ) {
    }

    public function __invoke(LogMessage $message): void
    {
        $this->messagingLogger->log(
            $message->getLevel(),
            $message->getMessage(),
            $message->getContext()
        );
    }
}

The trick here is the AsMessageHandler attribute which makes sure the autowiring system knows what to do with this; and then the magic __invoke method does the work.
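To demystify the __invoke part a bit, here's a minimal plain-PHP sketch of the same shape: a Message, an invokable handler, and a toy bus that calls the handler as if it were a function. TinyBus, Greeting and GreetingHandler are all hypothetical names made up for illustration, and the explicit register() call stands in for what the attribute and autowiring do in Symfony. It is not how Symfony's bus is implemented.

```php
<?php
declare(strict_types=1);

// A Message: just the data needed to do the work.
final class Greeting
{
    public function __construct(public readonly string $text) {}
}

// A handler: __invoke() makes an instance callable like a function.
final class GreetingHandler
{
    /** @var string[] */
    public array $handled = [];

    public function __invoke(Greeting $message): void
    {
        $this->handled[] = strtoupper($message->text);
    }
}

// Hypothetical toy bus: maps a message class to its handlers.
final class TinyBus
{
    /** @var array<string, callable[]> */
    private array $handlers = [];

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass][] = $handler;
    }

    public function dispatch(object $message): void
    {
        foreach ($this->handlers[$message::class] ?? [] as $handler) {
            $handler($message); // calls __invoke() on the handler object
        }
    }
}

$handler = new GreetingHandler();
$bus = new TinyBus();
$bus->register(Greeting::class, $handler);
$bus->dispatch(new Greeting('hello'));

print_r($handler->handled);
```

The code doing the dispatching only knows about the bus and the Message; it never references the handler directly.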

I'm coming back to this a week or so later, as I have to implement this stuff in another codebase. I realise now I missed a step in this bit of the exercise: as well as creating the Message and MessageHandler, I also need to actually dispatch a message, don't I? Otherwise ain't nuthin gonna get logged.

As this logging test was only temporary, I no longer have it in the codebase, but I'm gonna try to recreate here. The log entry (below) is being triggered in the postUpdate method of src/EventListener/SearchIndexer.php. We'd need to add the dispatch call like this:

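public function postUpdate(PostUpdateEventArgs $args): void
{
    // [...]

    // LogMessage's second constructor argument is the log level, so the
    // entity goes into the context array (the level stays Level::Info).
    $logMessage = new LogMessage(
        'SearchIndexer: postUpdate',
        context: ['entity' => $args->getObject()],
    );
    $this->bus->dispatch($logMessage);

    // [...]
}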

Once we do that, then… we can continue below.

Having done that, I tail the log file, and make an edit to one of my entities:

$ docker exec -it php tail -f /var/log/symfony/messaging.log

[2025-07-25T15:37:45.699028+01:00] messaging.INFO: SearchIndexer: postUpdate {"entity":{"App\\Entity\\Institution":{"id":193,"name":"Alenestad WootyWoo Polytechnic","address":"9319 King WootyWoo Suite 361Kerlukeshire, LA 00306","city":"Alenestad WootyWoo","postalCode":"64378-8198","country":"Cocos (WootyWoo) Islands","establishedYear":1998,"type":"polytechnic","website":"https://example.com/accusamus-esse-natus-excepturi-consequuntur-culpa-ut-officia-asperiores"}}} []

And there we have a log entry going in. Cool. That worked. Easy.

I did actually have an issue the first time around. Before running anything from the UI, I had run that test to ensure logging was working. Because that's run from the shell, it runs as root… which means the log file gets created as root too. When I then triggered the message handling via the UI, the code was running as www-data (this is what php-fpm runs as), and it did not have perms to write to the log file, so it went splat. This just required me changing the ownership on the log file to www-data, and we were all good.


Elasticsearch indexing via Messaging

OK, time to do it for realsies.

Looking at what I need to do, I have three different indexing events to deal with: adding a record on postPersist; updating a record on postUpdate, and deleting a record on preRemove. Each handler method receives different arguments, so in a way I wonder if these are three different Messages? On the other hand, I could use one Message and have an "action" property which is then used in the MessageHandler to decide what indexing task to run. The latter sounds like less code; but it also seems like subpar design, having to have a magic string on both ends of the messaging system to be matched against one another. Hrm.

OK, I can see how I can possibly make this not awful (low bar I'm setting myself there). How's this…

First I have an abstract Message class:

// src/Message/AbstractSearchIndexMessage.php

namespace App\Message;

use Doctrine\Persistence\Event\LifecycleEventArgs;

abstract class AbstractSearchIndexMessage
{
    public function __construct(
        private readonly LifecycleEventArgs $args
    )
    { }

    public function getArgs(): LifecycleEventArgs
    {
        return $this->args;
    }
}

LifecycleEventArgs is the base class of all of those subclasses that are the args for each of the event handlers, eg:

public function postPersist(PostPersistEventArgs $args): void
// [...]

public function postUpdate(PostUpdateEventArgs  $args): void
// [...]

public function preRemove(PreRemoveEventArgs $args): void
// [...]

Then we have three concrete implementations, one for each of the Message types we need:

// src/Message/SearchIndexAddMessage.php


namespace App\Message;

class SearchIndexAddMessage extends AbstractSearchIndexMessage
{

}



// src/Message/SearchIndexUpdateMessage.php

namespace App\Message;

class SearchIndexUpdateMessage extends AbstractSearchIndexMessage
{

}


// src/Message/SearchIndexDeleteMessage.php

namespace App\Message;

class SearchIndexDeleteMessage extends AbstractSearchIndexMessage
{

}

No code duplication. The sub-classing is just to give us unique Messages to dispatch and then handle.

We can deal with all of those in one MessageHandler class:

// src/MessageHandler/SearchIndexMessageHandler.php

namespace App\MessageHandler;

use App\Message\SearchIndexAddMessage;
use App\Message\SearchIndexDeleteMessage;
use App\Message\SearchIndexUpdateMessage;
use App\Service\ElasticSearchIndexerService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

class SearchIndexMessageHandler
{
    public function __construct(
        private readonly ElasticSearchIndexerService $searchIndexer,
    ) {
    }

    #[AsMessageHandler]
    public function handleAdd(SearchIndexAddMessage $message): void
    {
        $this->searchIndexer->sync($message->getArgs()->getObject());
    }

    #[AsMessageHandler]
    public function handleUpdate(SearchIndexUpdateMessage $message): void
    {
        $this->searchIndexer->sync($message->getArgs()->getObject());
    }

    #[AsMessageHandler]
    public function handleRemove(SearchIndexDeleteMessage $message): void
    {
        $this->searchIndexer->delete($message->getArgs()->getObject());
    }
}

Unlike the LogMessageHandler we had before where the entire class was marked as #[AsMessageHandler], here I'm tagging individual methods. And this is why we needed the separate Message classes: see how each handler method expects its $message to be one of those subclasses, as appropriate. Symfony uses that type hint to work out which handler method to call for each Message type.
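The general idea of matching a Message to a method via its type hint can be sketched in plain PHP with reflection. This is only an illustration under my own assumptions: AddMessage, DeleteMessage, IndexHandler and mapHandlers are hypothetical names, and the sketch looks at type hints alone, whereas Symfony also relies on the attribute to know which methods are handlers in the first place.

```php
<?php
declare(strict_types=1);

final class AddMessage {}
final class DeleteMessage {}

final class IndexHandler
{
    /** @var string[] */
    public array $calls = [];

    public function handleAdd(AddMessage $message): void
    {
        $this->calls[] = 'add';
    }

    public function handleRemove(DeleteMessage $message): void
    {
        $this->calls[] = 'delete';
    }
}

// Hypothetical helper: build a map of message class => method name
// from the type hint of each single-parameter public method.
function mapHandlers(object $handler): array
{
    $map = [];
    $methods = (new ReflectionClass($handler))->getMethods(ReflectionMethod::IS_PUBLIC);
    foreach ($methods as $method) {
        $params = $method->getParameters();
        if (count($params) !== 1) {
            continue;
        }
        $type = $params[0]->getType();
        if ($type instanceof ReflectionNamedType && !$type->isBuiltin()) {
            $map[$type->getName()] = $method->getName();
        }
    }
    return $map;
}

$handler = new IndexHandler();
$map = mapHandlers($handler);

// Route each message to the method whose type hint matches its class.
foreach ([new AddMessage(), new DeleteMessage()] as $message) {
    $handler->{$map[$message::class]}($message);
}

print_r($map);
```

This is also why one Message class with an "action" string would not work with this approach: there would be only one type to match on, so the decision would have to move into the handler body.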

You possibly noticed that I'm introducing a new service class here: ElasticSearchIndexerService:

namespace App\Service;

use App\Entity\SyncableToElasticsearch;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;

class ElasticSearchIndexerService
{
    public function __construct(
        private readonly ElasticsearchAdapter $adapter,
        private readonly UrlGeneratorInterface $urlGenerator,
    )
    {
    }

    public function sync(object $object): void
    {
        if (!$object instanceof SyncableToElasticsearch) {
            return;
        }

        $doc = $object->toElasticsearchDocument();
        $doc['body']['_meta'] = [
            'type' => $object->getShortName(),
            'title' => $object->getSearchTitle(),
            'url' => $this->urlGenerator->generate(
                $object->getShortName() . '_view',
                ['id' => $object->getId()]
            ),
        ];

        $this->adapter->indexDocument($doc['id'], $doc['body']);
    }

    public function delete(object $object): void
    {
        if (!$object instanceof SyncableToElasticsearch) {
            return;
        }

        $this->adapter->deleteDocument($object->getElasticsearchId());
    }
}

This code was in SearchIndexer (1.2) before, because that's where it was needed. Now the indexer is much simpler:

// src/EventListener/SearchIndexer.php (1.3)

namespace App\EventListener;

use App\Message\SearchIndexAddMessage;
use App\Message\SearchIndexDeleteMessage;
use App\Message\SearchIndexUpdateMessage;
use Doctrine\Bundle\DoctrineBundle\Attribute\AsDoctrineListener;
use Doctrine\ORM\Event\PostPersistEventArgs;
use Doctrine\ORM\Event\PostUpdateEventArgs;
use Doctrine\ORM\Event\PreRemoveEventArgs;
use Doctrine\ORM\Events;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsDoctrineListener(event: Events::postPersist, priority: 500, connection: 'default')]
#[AsDoctrineListener(event: Events::postUpdate, priority: 500, connection: 'default')]
#[AsDoctrineListener(event: Events::preRemove, priority: 500, connection: 'default')]
class SearchIndexer
{
    public function __construct(
        private readonly MessageBusInterface $bus
    ) {}

    public function postPersist(PostPersistEventArgs $args): void
    {
        $indexMessage = new SearchIndexAddMessage($args);
        $this->bus->dispatch($indexMessage);
    }

    public function postUpdate(PostUpdateEventArgs  $args): void
    {
        $indexMessage = new SearchIndexUpdateMessage($args);
        $this->bus->dispatch($indexMessage);
    }

    public function preRemove(PreRemoveEventArgs $args): void
    {
        $indexMessage = new SearchIndexDeleteMessage($args);
        $this->bus->dispatch($indexMessage);
    }
}

All it does is listen for the Doctrine events, and dispatch relevant messages. It doesn't know anything about how to index stuff, it just knows how to ask "someone needs to do this, please".

And the only other thing I needed to do was to update the ReindexSearchCommand to use the ElasticSearchIndexerService rather than SearchIndexer, given I had moved that code.

Having done that lot: everything works the same as before. So not much forward progress, but our ducks are far better lined-up ready for the next bit of the work.


That was a chunk of work, code and writing. I'm going to pause this exercise here and publish this article as it seems like a reasonable juncture to do so. Plus I want a beer. I have some thinking to do before I introduce RabbitMQ into this, and to use a different PHP container to run the code for the message handling. And I think there will be a code refactoring exercise first. However I am not sure yet, and need to sleep on it.

Righto.

--
Adam