Skip to content

Commit 250a392

Browse files
committed
Fixed in memory implementation of event store
1 parent b6035db commit 250a392

File tree

3 files changed

+83
-16
lines changed

3 files changed

+83
-16
lines changed

src/Kreta/SharedKernel/Infrastructure/Persistence/InMemory/EventStore/InMemoryEventStore.php

+41-13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
namespace Kreta\SharedKernel\Infrastructure\Persistence\InMemory\EventStore;
1616

1717
use Kreta\SharedKernel\Domain\Model\AggregateDoesNotExistException;
18+
use Kreta\SharedKernel\Domain\Model\DomainEvent;
1819
use Kreta\SharedKernel\Domain\Model\DomainEventCollection;
1920
use Kreta\SharedKernel\Event\EventStore;
2021
use Kreta\SharedKernel\Event\Stream;
@@ -29,7 +30,7 @@ public function __construct()
2930
$this->store = [];
3031
}
3132

32-
public function appendTo(Stream $stream) : void
33+
public function append(Stream $stream) : void
3334
{
3435
foreach ($stream->events() as $event) {
3536
$content = [];
@@ -52,18 +53,7 @@ public function streamOfName(StreamName $name) : Stream
5253
$events = new DomainEventCollection();
5354
foreach ($this->store as $event) {
5455
if ($event['stream_name'] === $name->name()) {
55-
$eventData = json_decode($event['content']);
56-
$eventReflection = new \ReflectionClass($event['type']);
57-
$parameters = $eventReflection->getConstructor()->getParameters();
58-
$arguments = [];
59-
foreach ($parameters as $parameter) {
60-
foreach ($eventData as $key => $data) {
61-
if ($key === $parameter->getName()) {
62-
$arguments[] = $data;
63-
}
64-
}
65-
}
66-
$events->add(new $event['type'](...$arguments));
56+
$events->add($this->buildEvent($event));
6757
}
6858
}
6959
if (0 === $events->count()) {
@@ -72,4 +62,42 @@ public function streamOfName(StreamName $name) : Stream
7262

7363
return new Stream($name, $events);
7464
}
65+
66+
public function eventsSince(?\DateTimeInterface $since, int $offset = 0, int $limit = -1) : array
67+
{
68+
$since = $since instanceof \DateTimeInterface ? $since->getTimestamp() : 0;
69+
70+
$events = array_map(function (array $event) use ($since) {
71+
$domainEvent = $this->buildEvent($event);
72+
if ($domainEvent->occurredOn()->getTimestamp() >= $since) {
73+
$evenContent = json_decode($event['content'], true);
74+
$evenContent['occurredOn'] = $domainEvent->occurredOn()->getTimestamp();
75+
76+
return [
77+
'stream_name' => $event['stream_name'],
78+
'type' => $event['type'],
79+
'content' => $evenContent,
80+
];
81+
}
82+
}, $this->store);
83+
84+
return array_slice($events, $offset);
85+
}
86+
87+
private function buildEvent(array $event) : DomainEvent
88+
{
89+
$eventData = json_decode($event['content']);
90+
$eventReflection = new \ReflectionClass($event['type']);
91+
$parameters = $eventReflection->getConstructor()->getParameters();
92+
$arguments = [];
93+
foreach ($parameters as $parameter) {
94+
foreach ($eventData as $key => $data) {
95+
if ($key === $parameter->getName()) {
96+
$arguments[] = $data;
97+
}
98+
}
99+
}
100+
101+
return new $event['type'](...$arguments);
102+
}
75103
}

src/Kreta/SharedKernel/Infrastructure/Persistence/Redis/EventStore/RedisEventStore.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
use Kreta\SharedKernel\Domain\Model\AggregateDoesNotExistException;
1818
use Kreta\SharedKernel\Domain\Model\DomainEventCollection;
19-
use Kreta\SharedKernel\Event\StoredEvent;
2019
use Kreta\SharedKernel\Event\EventStore;
20+
use Kreta\SharedKernel\Event\StoredEvent;
2121
use Kreta\SharedKernel\Event\Stream;
2222
use Kreta\SharedKernel\Event\StreamName;
2323
use Kreta\SharedKernel\Serialization\Serializer;

tests/Spec/Kreta/SharedKernel/Infrastructure/Persistence/InMemory/EventStore/InMemoryEventStoreSpec.php

+41-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ function it_appends_to(Stream $stream, StreamName $streamName)
3737
$stream->events()->shouldBeCalled()->willReturn($eventCollection);
3838
$stream->name()->shouldBeCalled()->willReturn($streamName);
3939
$streamName->name()->shouldBeCalled()->willReturn('dummy');
40-
$this->appendTo($stream);
40+
$this->append($stream);
4141
}
4242

4343
function it_get_stream_of_name_given(Stream $stream, StreamName $streamName)
@@ -48,7 +48,7 @@ function it_get_stream_of_name_given(Stream $stream, StreamName $streamName)
4848
$stream->events()->shouldBeCalled()->willReturn($eventCollection);
4949
$stream->name()->shouldBeCalled()->willReturn($streamName);
5050
$streamName->name()->shouldBeCalled()->willReturn('dummy');
51-
$this->appendTo($stream);
51+
$this->append($stream);
5252

5353
$this->streamOfName($streamName)->shouldReturnAnInstanceOf(Stream::class);
5454
}
@@ -60,4 +60,43 @@ function it_does_not_get_any_aggregate(StreamName $streamName, Id $aggregateId)
6060

6161
$this->shouldThrow(AggregateDoesNotExistException::class)->duringStreamOfName($streamName);
6262
}
63+
64+
function it_gets_events_since_given_date(\DateTimeImmutable $since, Stream $stream, StreamName $streamName)
65+
{
66+
$domainEvent = new DomainEventStub('foo', 'bar');
67+
$eventCollection = new DomainEventCollection([$domainEvent]);
68+
$stream->events()->shouldBeCalled()->willReturn($eventCollection);
69+
$stream->name()->shouldBeCalled()->willReturn($streamName);
70+
$streamName->name()->shouldBeCalled()->willReturn('dummy');
71+
$this->append($stream);
72+
73+
$this->eventsSince($since)->shouldReturn([
74+
[
75+
'stream_name' => 'dummy',
76+
'type' => DomainEventStub::class,
77+
'content' => [
78+
'bar' => 'bar',
79+
'foo' => 'foo',
80+
'occurredOn' => $domainEvent->occurredOn()->getTimestamp(),
81+
],
82+
],
83+
]);
84+
}
85+
86+
function it_gets_empty_events_when_since_is_higher_than_persisted_events_occurred_on(
87+
\DateTimeImmutable $since,
88+
Stream $stream,
89+
StreamName $streamName
90+
) {
91+
$domainEvent = new DomainEventStub('foo', 'bar');
92+
$eventCollection = new DomainEventCollection([$domainEvent]);
93+
$stream->events()->shouldBeCalled()->willReturn($eventCollection);
94+
$stream->name()->shouldBeCalled()->willReturn($streamName);
95+
$streamName->name()->shouldBeCalled()->willReturn('dummy');
96+
$this->append($stream);
97+
98+
$since->getTimestamp()->willReturn($domainEvent->occurredOn()->getTimestamp() + 10);
99+
100+
$this->eventsSince($since)->shouldReturn([null]);
101+
}
63102
}

0 commit comments

Comments
 (0)