generated from spatie/package-skeleton-php
-
-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathConnection.php
120 lines (85 loc) · 2.72 KB
/
Connection.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php
namespace Spatie\Fork;
use Generator;
use Socket;
/**
 * One end of a non-blocking Unix stream socket pair.
 *
 * Instances are obtained through {@see Connection::createPair()}; the
 * constructor is protected. Every read and write first waits for the socket
 * with `socket_select()`, bounded by the configured timeout.
 */
class Connection
{
    /** Whole-second part of the select timeout. */
    protected int $timeoutSeconds;

    /** Sub-second part of the select timeout, expressed in microseconds. */
    protected int $timeoutMicroseconds;

    /**
     * @param Socket $socket     Socket to wrap; it is switched to non-blocking mode.
     * @param int    $bufferSize Maximum number of bytes requested per `socket_read()` call.
     * @param float  $timeout    Select timeout in seconds (fractions allowed).
     */
    protected function __construct(
        protected Socket $socket,
        protected int $bufferSize = 1024,
        protected float $timeout = 0.0001,
    ) {
        socket_set_nonblock($this->socket);

        // floor() returns a float and the microsecond remainder may carry a
        // fractional part; cast explicitly so the int properties never rely on
        // implicit float-to-int coercion (deprecated for fractional values
        // since PHP 8.1 and a TypeError under strict types).
        $this->timeoutSeconds = (int) floor($this->timeout);
        $this->timeoutMicroseconds = (int) (
            ($this->timeout * 1_000_000) - ($this->timeoutSeconds * 1_000_000)
        );
    }

    /**
     * Create two connected endpoints backed by an AF_UNIX stream socket pair.
     *
     * @return self[] Two connections, one per end of the pair.
     *
     * @throws \RuntimeException When the socket pair cannot be created.
     */
    public static function createPair(): array
    {
        $sockets = null;

        if (! socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets)) {
            throw new \RuntimeException(
                'Could not create socket pair: ' . socket_strerror(socket_last_error())
            );
        }

        [$socketToParent, $socketToChild] = $sockets;

        return [
            new self($socketToParent),
            new self($socketToChild),
        ];
    }

    /**
     * Close the underlying socket.
     *
     * @return $this
     */
    public function close(): self
    {
        socket_close($this->socket);

        return $this;
    }

    /**
     * Write the payload to the socket, retrying after partial writes.
     *
     * Writing stops without raising an error when the socket does not become
     * writable within the timeout, when `socket_select()` fails for a reason
     * other than an interrupted system call, or when `socket_write()` fails.
     * In those cases the unsent remainder of the payload is dropped.
     *
     * @param string $payload Bytes to send.
     *
     * @return $this
     */
    public function write(string $payload): self
    {
        socket_set_nonblock($this->socket);

        while ($payload !== '') {
            $write = [$this->socket];
            $read = null;
            $except = null;

            $selectResult = @socket_select(
                $read,
                $write,
                $except,
                $this->timeoutSeconds,
                $this->timeoutMicroseconds
            );

            // An interrupted system call is transient: wait again.
            if ($selectResult === false && socket_last_error() === SOCKET_EINTR) {
                continue;
            }

            // Any other select failure, or a timeout (no writable socket).
            if ($selectResult === false || $selectResult <= 0) {
                break;
            }

            $length = strlen($payload);
            $amountOfBytesSent = socket_write($this->socket, $payload, $length);

            if ($amountOfBytesSent === false || $amountOfBytesSent === $length) {
                break;
            }

            // Partial write: keep only the bytes that still have to be sent.
            $payload = substr($payload, $amountOfBytesSent);
        }

        return $this;
    }

    /**
     * Lazily read chunks from the socket.
     *
     * Each yielded string holds at most `$bufferSize` bytes. The generator
     * ends when no data becomes readable within the timeout, when
     * `socket_select()` fails for a reason other than an interrupted system
     * call, or when `socket_read()` returns `false` or an empty string.
     *
     * @return Generator Yields the non-empty strings returned by `socket_read()`.
     */
    public function read(): Generator
    {
        socket_set_nonblock($this->socket);

        while (true) {
            $read = [$this->socket];
            $write = null;
            $except = null;

            $selectResult = @socket_select(
                $read,
                $write,
                $except,
                $this->timeoutSeconds,
                $this->timeoutMicroseconds
            );

            // An interrupted system call is transient: wait again.
            if ($selectResult === false && socket_last_error() === SOCKET_EINTR) {
                continue;
            }

            // Any other select failure, or a timeout (nothing to read).
            if ($selectResult === false || $selectResult <= 0) {
                break;
            }

            $outputFromSocket = socket_read($this->socket, $this->bufferSize);

            if ($outputFromSocket === false || $outputFromSocket === '') {
                break;
            }

            yield $outputFromSocket;
        }
    }
}