Refactor point value object and add observability

This commit is contained in:
Bernard Ngandu
2025-10-10 14:55:36 +02:00
parent 8a43d3967c
commit 68eb54995f
46 changed files with 3691 additions and 229 deletions
@@ -0,0 +1,67 @@
<?php
declare(strict_types=1);
namespace App\MessageHandler;
use App\Message\SignalCreatedMessage;
use App\Repository\SignalRepository;
use App\Service\SignalSnapshotBuilder;
use DateTimeImmutable;
use DateTimeZone;
use Symfony\Component\Mercure\HubInterface;
use Symfony\Component\Mercure\Update;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use function json_encode;
use const JSON_THROW_ON_ERROR;
use Throwable;
#[AsMessageHandler]
final class SignalCreatedMessageHandler
{
public function __construct(
private readonly SignalRepository $signals,
private readonly SignalSnapshotBuilder $snapshotBuilder,
private readonly HubInterface $hub,
#[Autowire('%app.signal_stream_topic%')]
private readonly string $topic,
#[Autowire('%app.signal_snapshot_limit%')]
private readonly int $snapshotLimit,
private readonly ?LoggerInterface $logger = null,
) {
}
public function __invoke(SignalCreatedMessage $message): void
{
$recentSignals = $this->signals->findRecent($this->snapshotLimit);
$snapshot = $this->snapshotBuilder->build($recentSignals);
$payload = [
'type' => 'snapshot',
'payload' => [
'points' => $snapshot['points'],
'density' => $snapshot['density'],
'latestByUser' => $snapshot['latestByUser'],
'totals' => $snapshot['totals'],
'updatedAt' => (new DateTimeImmutable('now', new DateTimeZone('UTC')))->format(DATE_ATOM),
],
];
$data = json_encode($payload, JSON_THROW_ON_ERROR);
$update = new Update(
topics: $this->topic,
data: $data,
);
try {
$this->hub->publish($update);
} catch (Throwable $exception) {
$this->logger?->error('Failed to publish signal update to Mercure.', [
'exception' => $exception,
]);
}
}
}