Skip to content

Commit f3ff635

Browse files
committed
sync
1 parent e7ed012 commit f3ff635

27 files changed

+617
-93
lines changed

.phpstorm.meta.php

+12-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,17 @@
33
{
44
$STATIC_METHOD_TYPES = [
55
\Zan\Framework\Foundation\Container\Di::make('') => [
6-
"" == "@",
7-
]
6+
'' == '@',
7+
],
8+
9+
// new \ZanPHP\Contracts\Container\Container => [
10+
// '' == '@',
11+
// ],
12+
// \ZanPHP\Contracts\Container\Container::make => [
13+
// '' == '@',
14+
// ],
15+
// \make => [
16+
// '' == '@',
17+
// ]
818
];
919
}

composer.json

+42-4
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,61 @@
99
"type": "git",
1010
"url": "https://git.oschina.net/zan-group/nova-generic.git"
1111
},
12+
"2": {
13+
"type": "git",
14+
"url": "https://git.oschina.net/zan-group/nova-generic.git"
15+
},
16+
"3": {
17+
"type": "git",
18+
"url": "https://github.com/zanphp/cache.git"
19+
},
20+
"4": {
21+
"type": "git",
22+
"url": "https://github.com/zanphp/container.git"
23+
},
24+
"5": {
25+
"type": "git",
26+
"url": "https://github.com/zanphp/contracts.git"
27+
},
28+
"6": {
29+
"type": "git",
30+
"url": "https://github.com/zanphp/etcd-client.git"
31+
},
32+
"7": {
33+
"type": "git",
34+
"url": "https://github.com/zanphp/loadbalance.git"
35+
},
36+
"8": {
37+
"type": "git",
38+
"url": "https://github.com/zanphp/service-chain.git"
39+
},
1240
"packagist": {
1341
"type": "composer",
1442
"url": "https://packagist.phpcomposer.com"
1543
}
1644
},
1745
"require": {
18-
"nova-service/generic": "dev-master",
19-
"zanphp/nova": "dev-master",
2046
"packaged/thrift": "0.9.2.1",
2147
"symfony/console": "3.1",
22-
"psr/log": "1.0.0"
48+
"psr/log": "1.0.0",
49+
"nova-service/generic": "dev-master",
50+
51+
52+
"zanphp/cache": "dev-master",
53+
"zanphp/container": "dev-master",
54+
"zanphp/contracts": "dev-master",
55+
"zanphp/etcd-client": "dev-master",
56+
"zanphp/loadbalance": "dev-master",
57+
"zanphp/nova": "dev-master",
58+
"zanphp/service-chain": "dev-master"
59+
2360
},
2461
"autoload": {
2562
"psr-4": {
2663
"Zan\\Framework\\": "src/"
2764
},
28-
"classmap": [],
65+
"classmap": [
66+
],
2967
"files": [
3068
"src/Foundation/Coroutine/Command/Task.php",
3169
"src/Utilities/helpers.php"

src/Foundation/Application.php

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Zan\Framework\Foundation\Booting\CheckIfBootable;
77
use Zan\Framework\Foundation\Booting\InitializeCliInput;
88
use Zan\Framework\Foundation\Booting\InitializeCache;
9+
use Zan\Framework\Foundation\Booting\InitializeContainer;
910
use Zan\Framework\Foundation\Booting\InitializeKv;
1011
use Zan\Framework\Foundation\Booting\InitializeSyslog;
1112
use Zan\Framework\Foundation\Booting\LoadFiles;
@@ -97,6 +98,7 @@ protected function bootstrap()
9798
InitializePathes::class,
9899
LoadConfiguration::class,
99100
InitializeSharedObjects::class,
101+
InitializeContainer::class,
100102
RegisterClassAliases::class,
101103
LoadFiles::class,
102104
InitializeCache::class,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Zan\Framework\Foundation\Booting;
4+
5+
use Zan\Framework\Contract\Foundation\Bootable;
6+
use Zan\Framework\Foundation\Application;
7+
use Zan\Framework\Foundation\Core\Config;
8+
use ZanPHP\Container\Container;
9+
10+
class InitializeContainer implements Bootable
11+
{
12+
13+
/**
14+
* Bootstrap the given application.
15+
*
16+
* @param \Zan\Framework\Foundation\Application $app
17+
*/
18+
public function bootstrap(Application $app)
19+
{
20+
$binds = Config::get("zan_container", []);
21+
$container = Container::getInstance();
22+
foreach ($binds as $abstract => $bindArgs) {
23+
$container->bind($abstract, ...(array)$bindArgs);
24+
}
25+
}
26+
}

src/Foundation/Coroutine/Command/Task.php

+6-6
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ function getRpcContext($key = null, $default = null)
7171
{
7272
return new SysCall(function (Task $task) use($key, $default) {
7373
$context = $task->getContext();
74-
$rpcCtx = $context->get(RpcContext::KEY, null, RpcContext::class);
74+
$rpcCtx = $context->get("rpc-context", null, RpcContext::class);
7575
if ($rpcCtx) {
7676
$task->send($rpcCtx->get($key, $default));
7777
} else {
@@ -86,10 +86,10 @@ function setRpcContext($key, $value)
8686
{
8787
return new SysCall(function (Task $task) use ($key, $value) {
8888
$context = $task->getContext();
89-
$rpcCtx = $context->get(RpcContext::KEY, null, RpcContext::class);
89+
$rpcCtx = $context->get("rpc-context", null, RpcContext::class);
9090
if ($rpcCtx === null) {
9191
$rpcCtx = new RpcContext;
92-
$context->set(RpcContext::KEY, $rpcCtx);
92+
$context->set("rpc-context", $rpcCtx);
9393
}
9494
$task->send($rpcCtx->set($key, $value));
9595
return Signal::TASK_CONTINUE;
@@ -155,10 +155,10 @@ function wait()
155155
});
156156
}
157157

158-
function parallel($coroutines)
158+
function parallel($coroutines, &$fetchCtx = [])
159159
{
160-
return new SysCall(function (Task $task) use ($coroutines) {
161-
(new Parallel($task))->call($coroutines);
160+
return new SysCall(function (Task $task) use ($coroutines, &$fetchCtx) {
161+
(new Parallel($task))->call($coroutines, $fetchCtx);
162162

163163
return Signal::TASK_WAIT;
164164
});

src/Foundation/Coroutine/Parallel.php

+29-12
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,32 @@
44

55
use Zan\Framework\Foundation\Exception\ParallelException;
66
use Zan\Framework\Foundation\Exception\System\InvalidArgumentException;
7+
use Zan\Framework\Utilities\DesignPattern\Context;
78

89
class Parallel
910
{
1011
private $task;
1112
private $childTasks = [];
1213
private $sendValues = [];
1314
private $exceptions = [];
15+
private $fetchCtx = [];
16+
17+
/**
18+
* @var Context
19+
*/
20+
private $taskContext;
1421

1522
public function __construct(Task $task)
1623
{
1724
$this->task = $task;
1825
}
1926

20-
public function call($coroutines)
27+
public function call($coroutines, &$fetchCtx = [])
2128
{
29+
$this->fetchCtx = &$fetchCtx;
30+
2231
$parentTaskId = $this->task->getTaskId();
23-
$taskContext = $this->task->getContext();
32+
$this->taskContext = $this->task->getContext();
2433

2534
$taskDoneEventName = 'parallel_task_done_' . $parentTaskId;
2635
$event = $this->task->getContext()->getEvent();
@@ -38,12 +47,14 @@ public function call($coroutines)
3847
continue;
3948
}
4049

41-
$childTask = new Task($this->catchException($key, $coroutine), $taskContext, 0, $this->task);
50+
$childTask = new Task($this->catchException($key, $coroutine), $this->taskContext, 0, $this->task);
4251
$this->childTasks[$key] = $childTask;
4352

4453
$newTaskId = $childTask->getTaskId();
4554
$evtName = 'task_event_' . $newTaskId;
4655
$eventChain->before($evtName, $taskDoneEventName);
56+
57+
4758
}
4859

4960
if ($this->childTasks == []) {
@@ -85,14 +96,20 @@ public function done()
8596
private function catchException($key, \Generator $coroutine)
8697
{
8798
try {
88-
yield $coroutine;
89-
return;
90-
} catch (\Throwable $t) {
91-
$ex = t2ex($t);
92-
} catch (\Exception $ex) { }
93-
94-
echo_exception($ex);
95-
$this->exceptions[$key] = $ex;
96-
yield $ex;
99+
$r = (yield $coroutine);
100+
} catch (\Throwable $r) {
101+
$r = t2ex($r);
102+
$this->exceptions[$key] = $r;
103+
echo_exception($r);
104+
} catch (\Exception $r) {
105+
$this->exceptions[$key] = $r;
106+
echo_exception($r);
107+
}
108+
109+
foreach ($this->fetchCtx as $ctxKey) {
110+
$this->fetchCtx[$ctxKey] = $this->taskContext->get($ctxKey);
111+
}
112+
113+
yield $r;
97114
}
98115
}

src/Network/Common/HttpClient.php

+17-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Zan\Framework\Network\Common\Exception\HostNotFoundException;
1111
use Zan\Framework\Network\Server\Timer\Timer;
1212
use Zan\Framework\Network\Common\Exception\HttpClientTimeoutException;
13+
use Zan\Framework\Network\Tcp\RpcContext;
1314
use Zan\Framework\Sdk\Trace\Constant;
1415
use Zan\Framework\Sdk\Trace\DebuggerTrace;
1516
use Zan\Framework\Sdk\Trace\Trace;
@@ -41,6 +42,9 @@ class HttpClient implements Async
4142

4243
private $callback;
4344

45+
/** @var RpcContext */
46+
private $rpcContext;
47+
4448
/** @var Trace */
4549
private $trace;
4650
private $traceHandle;
@@ -182,6 +186,7 @@ public function handle(Context $ctx = null)
182186
if ($ctx) {
183187
$this->trace = $ctx->get("trace");
184188
$this->debuggerTrace = $ctx->get('debugger_trace');
189+
$this->rpcContext = $ctx->get("rpc-context");
185190
}
186191

187192
if ($this->useHttpProxy) {
@@ -217,10 +222,8 @@ public function request($ip, $port)
217222
} else {
218223
$this->client = new \swoole_http_client($ip, $port, $this->ssl);
219224
}
225+
220226
$this->buildHeader();
221-
if (null !== $this->timeout) {
222-
Timer::after($this->timeout, [$this, 'checkTimeout'], spl_object_hash($this));
223-
}
224227

225228
if ($this->trace) {
226229
$this->traceHandle = $this->trace->transactionBegin(Constant::HTTP_CALL, $this->host . $this->uri);
@@ -236,6 +239,10 @@ public function request($ip, $port)
236239
]);
237240
}
238241

242+
if (null !== $this->timeout) {
243+
Timer::after($this->timeout, [$this, 'checkTimeout'], spl_object_hash($this));
244+
}
245+
239246
if('GET' === $this->method){
240247
if ($this->trace) {
241248
$this->trace->logEvent(Constant::GET, Constant::SUCCESS);
@@ -275,6 +282,13 @@ private function buildHeader()
275282
$this->header[DebuggerTrace::KEY] = $this->debuggerTrace->getKey();
276283
}
277284

285+
if ($this->rpcContext instanceof RpcContext) {
286+
$pairs = $this->rpcContext->get();
287+
foreach ($pairs as $key => $value) {
288+
$this->header[$key] = $value;
289+
}
290+
}
291+
278292
$this->client->setHeaders($this->header);
279293
}
280294

src/Network/Connection/Driver/NovaClient.php

+24-7
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
use Kdt\Iron\Nova\Network\Client as NovaPingClient;
1111
use Kdt\Iron\Nova\Exception\NetworkException;
1212
use Zan\Framework\Utilities\Types\Time;
13+
use ZanPHP\Contracts\LoadBalance\Node;
1314

14-
class NovaClient extends Base implements Connection
15+
class NovaClient extends Base implements Connection, Node
1516
{
1617
private $clientCb;
1718
protected $isAsync = true;
@@ -34,15 +35,17 @@ protected function closeSocket()
3435
}
3536
}
3637

37-
public function init() {
38+
public function init()
39+
{
3840
//set callback
3941
$this->getSocket()->on('connect', [$this, 'onConnect']);
4042
$this->getSocket()->on('receive', [$this, 'onReceive']);
4143
$this->getSocket()->on('close', [$this, 'onClose']);
4244
$this->getSocket()->on('error', [$this, 'onError']);
4345
}
4446

45-
public function onConnect(SwooleClient $cli) {
47+
public function onConnect(SwooleClient $cli)
48+
{
4649
//put conn to active_pool
4750
Timer::clearAfterJob($this->getConnectTimeoutJobId());
4851
Timer::clearAfterJob($this->getHeartbeatingJobId());
@@ -54,14 +57,16 @@ public function onConnect(SwooleClient $cli) {
5457
$this->inspect("connect to server", $cli);
5558
}
5659

57-
public function onClose(SwooleClient $cli){
60+
public function onClose(SwooleClient $cli)
61+
{
5862
Timer::clearAfterJob($this->getConnectTimeoutJobId());
5963
Timer::clearAfterJob($this->getHeartbeatingJobId());
6064
$this->close();
6165
$this->inspect("close", $cli);
6266
}
6367

64-
public function onReceive(SwooleClient $cli, $data) {
68+
public function onReceive(SwooleClient $cli, $data)
69+
{
6570
try {
6671
call_user_func($this->clientCb, $data);
6772
} catch (\Throwable $t) {
@@ -71,17 +76,20 @@ public function onReceive(SwooleClient $cli, $data) {
7176
}
7277
}
7378

74-
public function onError(SwooleClient $cli){
79+
public function onError(SwooleClient $cli)
80+
{
7581
Timer::clearAfterJob($this->getConnectTimeoutJobId());
7682
Timer::clearAfterJob($this->getHeartbeatingJobId());
7783
$this->close();
7884

7985
$this->inspect("error", $cli, true);
8086
}
8187

82-
public function setClientCb(callable $cb) {
88+
public function setClientCb(callable $cb)
89+
{
8390
$this->clientCb = $cb;
8491
}
92+
8593
public function heartbeat()
8694
{
8795
Timer::after($this->config['heartbeat-time'], [$this, 'heartbeating'], $this->getHeartbeatingJobId());
@@ -153,4 +161,13 @@ private function inspect($desc, SwooleClient $cli, $error = false)
153161

154162
sys_echo("nova client $desc [" . implode(", ", $buffer) . "]");
155163
}
164+
165+
/**
166+
* 0 ~ 100
167+
* @return int|null
168+
*/
169+
public function getWeight()
170+
{
171+
return isset($this->config["weight"]) ? $this->config["weight"] : 100;
172+
}
156173
}

0 commit comments

Comments
 (0)