Thursday 9 February 2023

PHP: looking at spatie/async

G'day:

For no good reason at all, other than it piquing my interest, I've decided to mess around with spatie/async, which is - in its own words:

[A] small and easy wrapper around PHP's PCNTL extension. It allows running of different processes in parallel, with an easy-to-use API.

I got onto it because after messing around with Guzzle the other day ("PHP: looking at ways of making HTTP requests"), I got to wondering how PHP is doing async stuff. It occurs to me now I didn't think to just check which lib(s) Guzzle was using; I just googled, and found a few options, and spatie/async seemed to be the simplest / most-no-nonsense of the lot, so I thought I'd start here.

I'm gonna "experiment via test", as is my wont.


Set-up

First of all(*) though, I need to install PCNTL, which is not installed by default in PHP's stock Docker image. It's easy though, just add this to my Dockerfile and rebuild my container:

RUN docker-php-ext-configure pcntl && docker-php-ext-install pcntl

(*) I say "first of all", but - full disclosure - I missed the note in the first sentence of the docs (ie: that sentence I quoted above) that it was required, and wrote a whole bunch of code, and was left wondering "why is this not running async?". But only for like an hour or two. Ahem. Cough. Anyway. Moving right along.
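
In hindsight, a quick sanity check would have saved me that hour or two. As I read the docs, the pool quietly falls back to running tasks synchronously if the pcntl and posix extensions aren't there, so nothing errors to tip you off. This is just a minimal sketch using PHP's own extension_loaded; the helper name is made up for the example:

```php
<?php

/**
 * Report whether the extensions needed for forking are loaded.
 * (asyncExtensionStatus is a hypothetical helper, not part of spatie/async.)
 *
 * @return array<string, bool>
 */
function asyncExtensionStatus(): array
{
    return [
        'pcntl' => extension_loaded('pcntl'),
        'posix' => extension_loaded('posix'),
    ];
}

// Print one line per extension so a missing one is obvious.
foreach (asyncExtensionStatus() as $extension => $loaded) {
    printf("%s: %s\n", $extension, $loaded ? 'loaded' : 'MISSING');
}
```

Running that inside the container before and after the Dockerfile change should show pcntl flip from MISSING to loaded.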

First I have set up a proc that I can run that just takes ages to run. I'm gonna use this to tie-up PHP processing. Pardon my pathetic MySQL skillz:

USE db1;

DROP PROCEDURE IF EXISTS sleep_and_return;
DELIMITER //
CREATE PROCEDURE sleep_and_return(IN seconds INT)
BEGIN
    DO SLEEP(seconds);
    SELECT seconds;
END //
DELIMITER ;

Baseline: it runs stuff asynchronously

And now my first test:

/** @testdox It can call a slow proc multiple times async */
public function testSlowProcAsync()
{
    $connection = DB::getDbalConnection();

    $pool = Pool::create();

    $startTime = microtime(true);
    for ($i = 1; $i <= 3; $i++) {
        $pool->add(function () use ($connection, $i, $startTime) {
            $result = $connection->executeQuery("CALL sleep_and_return(?)", [2]);
            return sprintf(
                 "%d:%d:%d",
                 $i,
                 $result->fetchOne(),
                 microtime(true) - $startTime
             );
        });
    }
    $poolPopulationTime = microtime(true) - $startTime;
    $this->assertLessThanOrEqual(1, $poolPopulationTime);

    $startTime = microtime(true);
    $results = $pool->wait();
    $endTime = microtime(true);
    $executionTime = $endTime - $startTime;

    $this->assertLessThan(3, $executionTime);
    $this->assertCount(3, $results);
    $this->assertContains("1:2:2", $results, "1:2:2 not found in " . implode(",", $results));
    $this->assertContains("2:2:2", $results, "2:2:2 not found in " . implode(",", $results));
    $this->assertContains("3:2:2", $results, "3:2:2 not found in " . implode(",", $results));
}

There's a bit going on here (it's not my first iteration of this test, I have to admit).

  • The premise is that each task I need to process async is a call to that proc which I am making sleep for two seconds. Three tasks doing that is six seconds of synchronous work; or only around two seconds or so if each is running asynchronously.
  • This thing seems to work by having a pool, then adding processes to run asynchronously.
  • I take note of the time before I start adding processes to the pool, so I can take note of how long each takes to run (should be ~2sec).
  • I return those metrics, and they end up being what's returned from the wait call at the end. Easy.
  • I also note how long it took to populate the pool. This should be pretty quick (less than even one of the processes running).
  • I then time how long the pool takes to run. Each process is ~2sec, and if they're running asynchronously the whole thing should finish in less time than it would take two of them to run.
  • Lastly I verify the results are what I'd expect. Note that I'm not testing an exact array, but just looking for each element: as each process is async, I can't really assume that they'll complete (and populate that array) in any specific sequence.

The metrics I'm grabbing are not as clever as they could be. I'm assuming each process will finish around 2sec after I start the timer before adding them to the pool. There's overhead in the adding, and calling the code as well as the 2sec pause at the DB, and sometimes the latter 1-2 assertions fail. I've added the failure message to see what actually gets returned, but sadly I have not been able to see it fail since I added those. I imagine it's a case that ~3sec has passed between one of the latter process callbacks being added, and them actually finishing running. So like the metrics returned for that process might be 2:2:3, not 2:2:2: the last number is the duration since we started adding the processes to the pool.

Update

I'm writing this the day after I wrote the paragraph above.

I got the test failure, and this bears out my theory from before, pretty much:

tests of spatie/async (https://github.com/spatie/async)
  It can call a slow proc multiple times async
   
    1:2:2 not found in 3:2:3,1:2:3,2:2:3            
    Failed asserting that an array contains '1:2:2'.
   
    /var/www/tests/Functional/Async/SpatieAsyncTest.php:41
     
  

Note the array returned in the results: 3:2:3,1:2:3,2:2:3. In each of these the process took three seconds to complete. Probably because I had only just started my containers, and the PHP code was not compiled and cached, and the DB had only just started. So, yeah, not a great test, but it helped show this issue, which is something.
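
The brittleness comes down to that last %d in the sprintf call: it casts the elapsed float to an int, dropping the fraction, so anything from 2.0 up to just under 3.0 reports as 2, and the moment the overhead tips past 3.0 it reports as 3. Here's a small sketch of just that formatting step. The elapsed times are hypothetical values I've picked to show the boundary, not measurements from my runs:

```php
<?php

/**
 * Build the same "iteration:procResult:elapsed" string the tests use.
 * The %d conversion turns the float into an int, discarding the fraction.
 */
function formatMetric(int $iteration, int $procResult, float $elapsedSeconds): string
{
    return sprintf("%d:%d:%d", $iteration, $procResult, $elapsedSeconds);
}

// Hypothetical elapsed times either side of the three-second boundary.
echo formatMetric(1, 2, 2.04) . PHP_EOL; // 1:2:2
echo formatMetric(1, 2, 2.97) . PHP_EOL; // 1:2:2
echo formatMetric(1, 2, 3.02) . PHP_EOL; // 1:2:3
```

A sturdier test would probably return the elapsed time as a float and compare it with a tolerance (PHPUnit's assertEqualsWithDelta would be one way), rather than baking a truncated number into a string.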


Adding a then handler

The next test is very similar, except I'm chaining a then handler onto the process:

/** @testdox It uses a then handler which acts on the result */
public function testSlowProcAsyncThen()
{
    $connection = DB::getDbalConnection();

    $pool = Pool::create();

    $startTime = microtime(true);
    for ($i = 1; $i <= 3; $i++) {
        $pool
            ->add(function () use ($connection) {
                return $connection->executeQuery("CALL sleep_and_return(?)", [2]);
            })
            ->then(function ($result) use ($i, $startTime) {
                return sprintf(
                    "%d:%d:%d",
                    $i,
                    $result->fetchOne(),
                    microtime(true) - $startTime
                );
            });
    }

    $startTime = microtime(true);
    $results = $pool->wait();
    $endTime = microtime(true);
    $executionTime = $endTime - $startTime;

    $this->assertLessThan(3, $executionTime);
    $this->assertCount(3, $results);
    $this->assertContains("1:2:2", $results, "1:2:2 not found in " . implode(",", $results));
    $this->assertContains("2:2:2", $results, "2:2:2 not found in " . implode(",", $results));
    $this->assertContains("3:2:2", $results, "3:2:2 not found in " . implode(",", $results));
}

  • The task returns the DB statement this time.
  • And the then handler receives that, and now it returns the metrics.

Everything else is the same.

And this test goes splat:

Exception : Serialization of 'PDOStatement' is not allowed

OK, fair enough. Not everything is serialisable, and I'm guessing this is how objects are being passed to different PHP processes. Good to know. I will update my test to pass the result, not the whole statement object:

$pool
    ->add(function () use ($connection) {
        $result = $connection->executeQuery("CALL sleep_and_return(?)", [2]);
        return $result->fetchOne();
    })
    ->then(function ($result) use ($i, $startTime) {
        return sprintf(
            "%d:%d:%d",
            $i,
            $result,
            microtime(true) - $startTime
        );
    });

But this is wrong too:

1:2:2 not found in 2,2,2
Failed asserting that an array contains '1:2:2'.

What's going on here? Ah. OK, so the final result that is returned from $pool->wait() is the return values from the initial process callbacks, not the end result of the then handler. My process callbacks are returning the result of the proc call, which is just the number originally passed into it (2), hence the result array being [2,2,2]. If I want to access the result of the processing done in the then I'm gonna need to grab it separately:

public function testSlowProcAsyncThen()
{
    $connection = DB::getDbalConnection();

    $pool = Pool::create();

    $metrics = [];
    $startTime = microtime(true);
    for ($i = 1; $i <= 3; $i++) {
        $pool
            ->add(function () use ($connection) {
                $result = $connection->executeQuery("CALL sleep_and_return(?)", [2]);
                return $result->fetchOne();
            })
            ->then(function ($result) use (&$metrics, $i, $startTime) {
                $metrics[] = sprintf(
                    "%d:%d:%d",
                    $i,
                    $result,
                    microtime(true) - $startTime
                );
            });
    }

    $startTime = microtime(true);
    $pool->wait();
    $endTime = microtime(true);
    $executionTime = $endTime - $startTime;

    $this->assertLessThan(3, $executionTime);
    $this->assertCount(3, $metrics);
    $this->assertContains("1:2:2", $metrics, "1:2:2 not found in " . implode(",", $metrics));
    $this->assertContains("2:2:2", $metrics, "2:2:2 not found in " . implode(",", $metrics));
    $this->assertContains("3:2:2", $metrics, "3:2:2 not found in " . implode(",", $metrics));
}

And now this gives expected results.
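
Back on that serialisation error for a moment: it's easy to reproduce the same class of failure in plain PHP, without the library or a DB. In this sketch I'm using a Closure as a stand-in for PDOStatement (my substitution, purely so it runs anywhere); both are built-ins that refuse to be serialised, whereas a scalar like the value fetchOne() gives me goes through fine. The trySerialize helper is made up for the example:

```php
<?php

/**
 * Try to serialise a value, returning the serialised string,
 * or the exception message if PHP refuses.
 * (trySerialize is a hypothetical helper for this example only.)
 */
function trySerialize(mixed $value): string
{
    try {
        return serialize($value);
    } catch (\Throwable $e) {
        return "FAILED: " . $e->getMessage();
    }
}

// A scalar, like the proc's return value, serialises without complaint.
echo trySerialize(2) . PHP_EOL; // i:2;

// A Closure stands in for PDOStatement here: PHP refuses to serialise it.
echo trySerialize(function () {}) . PHP_EOL; // FAILED: Serialization of 'Closure' is not allowed
```

So if my guess about how values get passed between processes is right, the rule of thumb is to return plain data from a task, not a resource-y object.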


Handling timeouts

Next up, I'm having a look at the timeout one can put on a pool. Similar code again:

/** @testdox It supports a timeout */
public function testSlowProcAsyncTimeout()
{
    $connection = DB::getDbalConnection();

    $pool = Pool::create();
    $pool->timeout(1);

    $timeOuts = [];

    $startTime = microtime(true);
    for ($i = 1; $i <= 3; $i++) {
        $pool
            ->add(function () use ($connection) {
                $result = $connection->executeQuery("CALL sleep_and_return(?)", [2]);
                return $result->fetchOne();
            })
            ->timeout(function () use (&$timeOuts, $i, $startTime) {
                $timeOuts[] = sprintf(
                    "TIMED OUT ON ITERATION %d after %d seconds",
                    $i,
                    microtime(true) - $startTime
                );
                return false;
            });
    }

    $results = $pool->wait();
    $endTime = microtime(true);
    $executionTime = $endTime - $startTime;

    $this->assertEquals(
        [
            "TIMED OUT ON ITERATION 1 after 1 seconds",
            "TIMED OUT ON ITERATION 2 after 1 seconds",
            "TIMED OUT ON ITERATION 3 after 1 seconds"
        ],
        $timeOuts
    );

    $this->assertLessThan(2, $executionTime);

    $this->assertEquals([], $results);
}

  • I set a timeout of 1sec.
  • I have a timeout handler that records some metrics.
  • Note that the value returned from wait is an empty array this time.

Handling exceptions

Next a much more simplified test, looking at exception handling:

/** @testdox it supports exception handling */
public function testAsyncException()
{
    $pool = Pool::create();
    $pool
        ->add(function () {
            throw new \Exception("This is an exception");
        })
        ->catch(function (\Exception $exception) {
            $this->assertEquals("This is an exception", $exception->getMessage());
        });

    $pool->wait();
}

There's no surprises there, except the assertion fails, because $exception->getMessage() is not "This is an exception", it's this mess:

'This is an exception\n
\n
#0 [internal function]: adamcameron\php8\tests\Functional\Async\SpatieAsyncTest::{closure}()\n
#1 /var/www/vendor/laravel/serializable-closure/src/Serializers/Native.php(91): call_user_func_array(Object(Closure), Array)\n
#2 [internal function]: Laravel\SerializableClosure\Serializers\Native->__invoke()\n
#3 /var/www/vendor/laravel/serializable-closure/src/SerializableClosure.php(48): call_user_func_array(Object(Laravel\SerializableClosure\Serializers\Native), Array)\n
#4 /var/www/vendor/spatie/async/src/Runtime/ChildRuntime.php(26): Laravel\SerializableClosure\SerializableClosure->__invoke()\n
#5 {main}'

Exactly that. The exception message has somehow been polluted with a stack trace.

Initially I thought it might be because the library was wrapping my exception in another one (kinda fair enough), but it's not: it's just an exception. If I wasn't a complete n00b with this (and how PHP handles async stuff), and also wasn't incredibly rusty with PHP, I'd probably raise an issue with the library maintainers. But: I'll mull it over a bit first, to see if I can work out what dumbarsery I might be engaging in.
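
In the meantime, one way to keep the assertion meaningful is to compare only the first line of the message, given the original text comes first and the trace-like lines follow after a line break. This is a workaround sketch rather than a fix; the helper name is made up, and the polluted message below is a shortened imitation of the one above, not the library's exact output:

```php
<?php

/**
 * Return only the text before the first line break in an exception message.
 * (firstLineOfMessage is a hypothetical helper for this example only.)
 */
function firstLineOfMessage(\Throwable $exception): string
{
    // Limit of 2 means we split once, at the first newline only.
    return explode("\n", $exception->getMessage(), 2)[0];
}

// Imitate the polluted message: original text, a blank line, then trace-like lines.
$polluted = new \Exception("This is an exception\n\n#0 [internal function]: {closure}()\n#1 {main}");

echo firstLineOfMessage($polluted) . PHP_EOL; // This is an exception
```

In the catch handler, that would mean asserting against firstLineOfMessage($exception) instead of the raw getMessage() value.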


I decided to push the exception down a bit, and throw it in a then handler. Disappointing results here:

/** @testdox it does not support exception handling from a then handler */
public function testAsyncExceptionFromThen()
{
    $this->expectException(\Exception::class);
    $this->expectExceptionMessage("This is an exception");

    $pool = Pool::create();
    $pool
        ->add(function () {
            // do nothing
        })
        ->then(function () {
            throw new \Exception("This is an exception");
        })
        ->catch(function (\Exception $exception) {
            $this->assertStringStartsWith("This is an exception", $exception->getMessage());
        });

    $pool->wait();
}

The exception wasn't handled by the catch handler. It seems it'll only catch exceptions thrown by the task callback passed to add, not ones thrown in a then handler.


Stopping the pool in its tracks

I'm changing the test approach now. Indeed I'm lifting their sample code from the docs and turning it into a test:

/**  @testdox a pool can be stopped */
public function testPoolStop()
{
    $pool = Pool::create();

    for ($i = 0; $i < 10000; $i++) {
        $pool->add(function () {
            return rand(0, 100);
        })->then(function ($output) use ($pool) {
            // If one of them randomly picks 100, end the pool early.
            if ($output === 100) {
                $pool->stop();
            }
        });
    }

    $results = $pool->wait();

    $this->assertLessThan(10000, count($results));
    $this->assertContains(100, $results);
}

This is obviously really contrived, but it's a way to see that the stop method does the trick.

I decided to see if it would also work in a catch handler:

/**  @testdox a pool can be stopped in a catch */
public function testPoolStopInCatch()
{
    $pool = Pool::create();

    for ($i = 0; $i < 10000; $i++) {
        $pool->add(function () {
            $result = rand(0, 100);
            if ($result === 100) {
                throw new \Exception("Something went wrong");
            }
            return $result;
        })->catch(function (\Exception $_) use ($pool) {
            $pool->stop();
        });
    }

    $results = $pool->wait();

    $this->assertLessThan(10000, count($results));
    $this->assertNotContains(100, $results);
}

Note that in this case, unlike the previous one, the results won't include the 100, because it never gets returned.


Stopping the Cameron in its tracks

There's more to look at, but this article is already quite long, and this is a reasonable stopping point because the next bit deals with alternative syntaxes. I'll do that tomorrow (-ish).

This seems like pretty handy stuff so far, although there's that exception weirdness to mull over, and also I need to reason why I can't catch exceptions thrown in the then handlers.

All the methods are linked-to inline, but for the record they're all here: /tests/Functional/Async/SpatieAsyncTest.php.

Righto.

--
Adam