Files
points-of-interest/server/src/MessageHandler/SignalCreatedMessageHandler.php
T

64 lines
2.1 KiB
PHP

<?php
declare(strict_types=1);
namespace App\MessageHandler;
use App\Message\SignalCreatedMessage;
use App\Repository\SignalRepository;
use App\Service\SignalSnapshotBuilder;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Update;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Serializer\Exception\ExceptionInterface;
use Symfony\Component\Serializer\SerializerInterface;
use Throwable;
/**
 * Messenger handler that publishes a fresh signal snapshot to the Mercure hub
 * each time a SignalCreatedMessage is handled.
 *
 * The message itself is never read: every invocation reloads the most recent
 * signals from the repository and broadcasts a complete snapshot built from them.
 */
#[AsMessageHandler]
final readonly class SignalCreatedMessageHandler
{
    /**
     * @param SignalRepository      $signals         Source of the recent signals included in the snapshot.
     * @param SignalSnapshotBuilder $snapshotBuilder Builds the snapshot structure from the recent signals.
     * @param HubInterface          $hub             Mercure hub the update is published to.
     * @param SerializerInterface   $serializer      Encodes the payload as JSON.
     * @param LoggerInterface       $logger          Receives publish failures.
     * @param string                $topic           Mercure topic, injected from `app.signal_stream_topic`.
     * @param int                   $snapshotLimit   Value passed to `findRecent()`, injected from
     *                                               `app.signal_snapshot_limit`.
     */
    public function __construct(
        private SignalRepository $signals,
        private SignalSnapshotBuilder $snapshotBuilder,
        private HubInterface $hub,
        private SerializerInterface $serializer,
        private LoggerInterface $logger,
        #[Autowire('%app.signal_stream_topic%')]
        private string $topic,
        #[Autowire('%app.signal_snapshot_limit%')]
        private int $snapshotLimit,
    ) {
    }

    /**
     * Builds the current snapshot, serializes it to JSON and publishes it on the configured topic.
     *
     * Publishing is best-effort: any error raised by the hub is logged and swallowed, so it
     * does not propagate out of this handler. Loading, snapshot building and serialization
     * happen outside that guard, so errors from those steps do propagate to the caller.
     *
     * @param SignalCreatedMessage $message Triggering message; its content is not used.
     *
     * @throws ExceptionInterface When the payload cannot be serialized.
     */
    public function __invoke(SignalCreatedMessage $message): void
    {
        $recentSignals = $this->signals->findRecent($this->snapshotLimit);
        $snapshot = $this->snapshotBuilder->build($recentSignals);

        // Parenthesised `new` keeps this valid before PHP 8.4; the bare
        // `new Foo()->bar()` form only parses on 8.4 and later.
        $updatedAt = (new \DateTimeImmutable('now', new \DateTimeZone('UTC')))
            ->format(\DateTimeInterface::ATOM);

        $payload = [
            'type' => 'snapshot',
            'payload' => [
                'points' => $snapshot['points'],
                'density' => $snapshot['density'],
                'latestByUser' => $snapshot['latestByUser'],
                'totals' => $snapshot['totals'],
                'updatedAt' => $updatedAt,
            ],
        ];

        $data = $this->serializer->serialize($payload, 'json');
        $update = new Update(topics: $this->topic, data: $data);

        try {
            $this->hub->publish($update);
        } catch (Throwable $exception) {
            // Deliberately swallowed after logging: a hub failure is not allowed to fail the handler.
            $this->logger->error('Failed to publish signal update to Mercure.', [
                'exception' => $exception,
            ]);
        }
    }
}