Skip to content

Commit f9b2f96

Browse files
committed
fix bugs
1 parent c219802 commit f9b2f96

22 files changed

+86
-200
lines changed

src/Amqp/AmqpConnectionConfiguration.php

+23-55
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@
1212

1313
namespace ServiceBus\Transport\Amqp;
1414

15-
use ServiceBus\Transport\Common\Exceptions\InvalidConnectionParameters;
15+
use function ServiceBus\Transport\Common\parseConnectionDSN;
16+
use function ServiceBus\Transport\Common\parseConnectionQuery;
1617

1718
/**
1819
* Amqp connection details.
1920
*/
2021
final class AmqpConnectionConfiguration
2122
{
22-
private const DEFAULT_SCHEMA = 'amqp';
23-
2423
private const DEFAULT_HOST = 'localhost';
2524

2625
private const DEFAULT_PORT = 5672;
@@ -58,7 +57,7 @@ final class AmqpConnectionConfiguration
5857
/**
5958
* @psalm-param non-empty-string $connectionDSN DSN example: amqp://user:password@host:port
6059
*
61-
* @throws \ServiceBus\Transport\Common\Exceptions\InvalidConnectionParameters Incorrect DSN
60+
* @throws \ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters Incorrect DSN
6261
*/
6362
public function __construct(string $connectionDSN)
6463
{
@@ -86,6 +85,8 @@ public function __toString(): string
8685
}
8786

8887
/**
88+
* @psalm-param non-empty-string $connectionDSN
89+
*
8990
* @psalm-return array{
9091
* scheme:string,
9192
* user:string,
@@ -97,63 +98,30 @@ public function __toString(): string
9798
* heartbeat:int
9899
* }
99100
*
100-
* @throws \ServiceBus\Transport\Common\Exceptions\InvalidConnectionParameters Incorrect DSN
101+
* @throws \ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters
101102
*/
102103
private static function extractConnectionParameters(string $connectionDSN): array
103104
{
104-
$connectionParts = self::parseUrl($connectionDSN);
105-
106-
$queryString = (string) ($connectionParts['query'] ?? '');
105+
$connectionParts = parseConnectionDSN($connectionDSN);
107106

108-
$queryParts = self::parseQuery($queryString);
107+
/**
108+
* @psalm-var array{
109+
* timeout:numeric-string|null,
110+
* vhost:non-empty-string|null,
111+
* heartbeat:numeric-string|null
112+
* } $parsedQuery
113+
*/
114+
$parsedQuery = parseConnectionQuery($connectionParts['query'] ?? '');
109115

110116
return [
111-
'scheme' => (string) ($connectionParts['scheme'] ?? self::DEFAULT_SCHEMA),
112-
'host' => (string) ($connectionParts['host'] ?? self::DEFAULT_HOST),
113-
'port' => (int) ($connectionParts['port'] ?? self::DEFAULT_PORT),
114-
'user' => (string) ($connectionParts['user'] ?? self::DEFAULT_USERNAME),
115-
'password' => (string) ($connectionParts['pass'] ?? self::DEFAULT_PASSWORD),
116-
'timeout' => (int) ($queryParts['timeout'] ?? self::DEFAULT_TIMEOUT),
117-
'vhost' => (string) ($queryParts['vhost'] ?? self::DEFAULT_VIRTUAL_HOST),
118-
'heartbeat' => (int) ($queryParts['heartbeat'] ?? self::DEFAULT_HEARTBEAT_INTERVAL),
117+
'scheme' => $connectionParts['scheme'],
118+
'host' => $connectionParts['host'] ?? self::DEFAULT_HOST,
119+
'port' => $connectionParts['port'] ?? self::DEFAULT_PORT,
120+
'user' => $connectionParts['user'] ?? self::DEFAULT_USERNAME,
121+
'password' => $connectionParts['pass'] ?? self::DEFAULT_PASSWORD,
122+
'timeout' => (int) ($parsedQuery['timeout'] ?? self::DEFAULT_TIMEOUT),
123+
'vhost' => $parsedQuery['vhost'] ?? self::DEFAULT_VIRTUAL_HOST,
124+
'heartbeat' => (int) ($parsedQuery['heartbeat'] ?? self::DEFAULT_HEARTBEAT_INTERVAL),
119125
];
120126
}
121-
122-
/**
123-
* Parse connection DSN parts.
124-
*
125-
* @throws \ServiceBus\Transport\Common\Exceptions\InvalidConnectionParameters Incorrect DSN
126-
*/
127-
private static function parseUrl(string $url): array
128-
{
129-
if ($url === '')
130-
{
131-
throw InvalidConnectionParameters::emptyDSN();
132-
}
133-
134-
$parsedParts = \parse_url($url);
135-
136-
if ($parsedParts !== false)
137-
{
138-
return $parsedParts;
139-
}
140-
141-
throw InvalidConnectionParameters::incorrectDSN($url);
142-
}
143-
144-
/**
145-
* Parse url query parts.
146-
*
147-
* @psalm-return array<string, string|int|float>
148-
*/
149-
private static function parseQuery(string $query): array
150-
{
151-
$output = [];
152-
153-
\parse_str($query, $output);
154-
155-
/** @psalm-var array<string, string|int|float> $output */
156-
157-
return $output;
158-
}
159127
}

src/Amqp/PhpInnacle/PhpInnacleConsumer.php

+2
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ private function createMessageHandler($onMessageReceived): callable
170170
'rawMessagePayload' => $message->content,
171171
]
172172
);
173+
174+
throw $throwable;
173175
}
174176
};
175177
}

src/Amqp/PhpInnacle/PhpInnacleIncomingPackage.php

+10-10
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ public function __construct(Message $message, Channel $channel)
6666
$this->originMessage = $message;
6767
$this->channel = $channel;
6868
/** @psalm-suppress MixedPropertyTypeCoercion */
69-
$this->headers = $message->headers; /* @phpstan-ignore-line */
70-
$this->id = $this->extractFromHeaders(IncomingPackage::HEADER_MESSAGE_ID, uuid());
71-
$this->traceId = $this->extractFromHeaders(IncomingPackage::HEADER_TRACE_ID, uuid());
69+
/* @phpstan-ignore-next-line */
70+
$this->headers = $message->headers;
71+
$this->id = $this->extractUuidFromHeaders(IncomingPackage::HEADER_MESSAGE_ID);
72+
$this->traceId = $this->extractUuidFromHeaders(IncomingPackage::HEADER_TRACE_ID);
7273
}
7374

7475
public function id(): string
@@ -157,21 +158,20 @@ function () use ($requeue): \Generator
157158

158159
/**
159160
* @psalm-param non-empty-string $key
160-
* @psalm-param non-empty-string $withDefault
161161
*
162162
* @psalm-return non-empty-string
163163
*/
164-
private function extractFromHeaders(string $key, string $withDefault): string
164+
private function extractUuidFromHeaders(string $key): string
165165
{
166-
$value = (string) $this->headers[$key];
166+
if (\array_key_exists($key, $this->headers) && \is_string($this->headers[$key]) && $this->headers[$key] !== '')
167+
{
168+
$value = $this->headers[$key];
167169

168-
unset($this->headers[$key]);
170+
unset($this->headers[$key]);
169171

170-
if (!empty($value))
171-
{
172172
return $value;
173173
}
174174

175-
return $withDefault;
175+
return uuid();
176176
}
177177
}

src/Amqp/PhpInnacle/PhpInnacleTransport.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ function () use ($queues, $onMessage): \Generator
165165

166166
$consumer = new PhpInnacleConsumer($queue, $channel, $this->logger);
167167

168-
$consumer->listen($onMessage);
168+
yield $consumer->listen($onMessage);
169169

170170
$this->consumers[$queue->name] = $consumer;
171171
}

src/Nsq/Exceptions/IncorrectConnectionParameters.php renamed to src/Common/Exceptions/IncorrectConnectionParameters.php

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,8 @@
11
<?php
22

3-
/**
4-
* AMQP transport implementation.
5-
*
6-
* @author Konstantin Grachev <me@grachevko.ru>
7-
* @license MIT
8-
* @license https://opensource.org/licenses/MIT
9-
*/
3+
declare(strict_types=1);
104

11-
declare(strict_types=0);
12-
13-
namespace ServiceBus\Transport\Nsq\Exceptions;
5+
namespace ServiceBus\Transport\Common\Exceptions;
146

157
final class IncorrectConnectionParameters extends \InvalidArgumentException
168
{

src/Common/Exceptions/InvalidConnectionParameters.php

-35
This file was deleted.

src/Common/functions.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace ServiceBus\Transport\Common;
1414

15-
use ServiceBus\Transport\Redis\Exceptions\IncorrectConnectionParameters;
15+
use ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters;
1616

1717
/**
1818
* @psalm-param non-empty-string $connectionDSN
@@ -27,7 +27,7 @@
2727
* query:non-empty-string|null
2828
* }
2929
*
30-
* @throws \ServiceBus\Transport\Redis\Exceptions\IncorrectConnectionParameters
30+
* @throws \ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters
3131
*/
3232
function parseConnectionDSN(string $connectionDSN): array
3333
{

src/Nsq/NsqConsumer.php

+8-21
Original file line numberDiff line numberDiff line change
@@ -122,24 +122,12 @@ private function handleMessage(Message $message, string $onChannel, callable $on
122122
{
123123
$decodedPayload = $this->decodeMessageBody($message);
124124

125-
$messageId = self::extractFromHeaders(
126-
key: IncomingPackage::HEADER_MESSAGE_ID,
127-
withDefault: uuid(),
128-
headers: $decodedPayload['headers']
129-
);
130-
131-
$traceId = self::extractFromHeaders(
132-
key: IncomingPackage::HEADER_TRACE_ID,
133-
withDefault: uuid(),
134-
headers: $decodedPayload['headers']
135-
);
136-
137125
asyncCall(
138126
$onMessage,
139127
new NsqIncomingPackage(
140128
$message,
141-
messageId: $messageId,
142-
traceId: $traceId,
129+
messageId: self::extractUuidFromHeaders(IncomingPackage::HEADER_MESSAGE_ID, $decodedPayload['headers']),
130+
traceId: self::extractUuidFromHeaders(IncomingPackage::HEADER_TRACE_ID, $decodedPayload['headers']),
143131
payload: $decodedPayload['body'],
144132
headers: $decodedPayload['headers'],
145133
fromChannel: $onChannel
@@ -209,21 +197,20 @@ private function decodeMessageBody(Message $message): array
209197

210198
/**
211199
* @psalm-param non-empty-string $key
212-
* @psalm-param non-empty-string $withDefault
213200
*
214201
* @psalm-return non-empty-string
215202
*/
216-
private static function extractFromHeaders(string $key, string $withDefault, array &$headers): string
203+
private static function extractUuidFromHeaders(string $key, array &$headers): string
217204
{
218-
$value = (string) $headers[$key];
205+
if (\array_key_exists($key, $headers) && \is_string($headers[$key]) && $headers[$key] !== '')
206+
{
207+
$value = $headers[$key];
219208

220-
unset($headers[$key]);
209+
unset($headers[$key]);
221210

222-
if (!empty($value))
223-
{
224211
return $value;
225212
}
226213

227-
return $withDefault;
214+
return uuid();
228215
}
229216
}

src/Nsq/NsqTransportConnectionConfiguration.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace ServiceBus\Transport\Nsq;
1414

15-
use ServiceBus\Transport\Nsq\Exceptions\IncorrectConnectionParameters;
15+
use ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters;
1616
use function ServiceBus\Transport\Common\parseConnectionDSN;
1717
use function ServiceBus\Transport\Common\parseConnectionQuery;
1818

@@ -58,7 +58,7 @@ final class NsqTransportConnectionConfiguration
5858
public $timeout;
5959

6060
/**
61-
* @throws \ServiceBus\Transport\Nsq\Exceptions\IncorrectConnectionParameters
61+
* @throws \ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters
6262
*/
6363
public function __construct(string $connectionDSN)
6464
{

src/Redis/Exceptions/IncorrectConnectionParameters.php

-31
This file was deleted.

src/Redis/RedisReceivedPayload.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,12 @@ public function parse(): array
7171
*/
7272
private function extractUuidHeader(string $key, array &$headers): string
7373
{
74-
$value = (string) $headers[$key];
74+
if (\array_key_exists($key, $headers) && \is_string($headers[$key]) && $headers[$key] !== '')
75+
{
76+
$value = $headers[$key];
7577

76-
unset($headers[$key]);
78+
unset($headers[$key]);
7779

78-
if ($value !== '')
79-
{
8080
return $value;
8181
}
8282

src/Redis/RedisTransportConnectionConfiguration.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace ServiceBus\Transport\Redis;
1414

15-
use ServiceBus\Transport\Redis\Exceptions\IncorrectConnectionParameters;
15+
use ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters;
1616
use function ServiceBus\Transport\Common\parseConnectionDSN;
1717
use function ServiceBus\Transport\Common\parseConnectionQuery;
1818

@@ -66,7 +66,7 @@ final class RedisTransportConnectionConfiguration
6666
public $password;
6767

6868
/**
69-
* @throws \ServiceBus\Transport\Redis\Exceptions\IncorrectConnectionParameters
69+
* @throws \ServiceBus\Transport\Common\Exceptions\IncorrectConnectionParameters
7070
*/
7171
public function __construct(string $connectionDSN)
7272
{

0 commit comments

Comments
 (0)