-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathQueue.hx
134 lines (120 loc) · 3.64 KB
/
Queue.hx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
* SPDX-FileCopyrightText: © Vegard IT GmbH (https://vegardit.com) and contributors
* SPDX-FileContributor: Sebastian Thomschke, Vegard IT GmbH
* SPDX-License-Identifier: Apache-2.0
*/
package hx.concurrent.collection;
import hx.concurrent.atomic.AtomicInt;
import hx.concurrent.lock.RLock;
import hx.concurrent.thread.Threads;
/**
* Unbound thread-safe first-in-first-out message queue.
*/
class Queue<T> {
#if (cpp || cs || (threads && eval) || java || neko || hl)
final _queue = new sys.thread.Deque<T>();
#elseif python
final _queue:Dynamic;
#else
final _queue = new List<T>();
final _queueLock = new RLock();
#end
public var length(get, never):Int;
var _length = new AtomicInt(0);
inline function get_length():Int return _length;
public function new() {
#if python
python.Syntax.code("import collections");
_queue = untyped collections.deque();
#end
}
#if threads
/**
* Pop a message from the queue head.
*
* By default (with timeoutMS=0) this function is non-blocking, meaning if no message is available in the queue
* `null` is returned immediately.
*
* If <code>timeoutMS</code> is set to value > 0, the function waits up to the given timespan for a new message.
* If <code>timeoutMS</code> is set to `-1`, the function waits indefinitely until a new message is available.
* If <code>timeoutMS</code> is set to value lower than -1, results in an exception.
*/
public function pop(timeoutMS:Int = 0):Null<T> {
var msg:Null<T> = null;
if (timeoutMS < -1)
throw "[timeoutMS] must be >= -1";
if (timeoutMS == 0) {
#if (cpp || cs || (threads && eval) || java || neko || hl)
msg = _queue.pop(false);
#elseif python
msg = try _queue.pop() catch (ex) null;
#else
_queueLock.acquire();
msg = _queue.pop();
_queueLock.release();
#end
} else {
Threads.await(function() {
#if (cpp || cs || (threads && eval) || java || neko || hl)
msg = _queue.pop(false);
#elseif python
msg = try _queue.pop() catch (ex) null;
#else
_queueLock.acquire();
msg = _queue.pop();
_queueLock.release();
#end
return msg != null;
}, timeoutMS);
}
if (msg != null) _length--;
return msg;
}
#else
public function pop():Null<T> {
_queueLock.acquire();
final msg = _queue.pop();
if (msg != null) _length--;
_queueLock.release();
return msg;
}
#end
/**
* Skips the quue and adds the given message to the head of the queue.
*
* @throws exception if given msg is null
*/
public function pushHead(msg:T):Void {
if (msg == null)
throw "[msg] must not be null";
#if (cpp || cs || (threads && eval) || java || neko || hl)
_queue.push(msg);
#elseif python
_queue.append(msg);
#else
_queueLock.acquire();
_queue.push(msg);
_queueLock.release();
#end
_length++;
}
/**
* Add a message at the end of the queue.
*
* @throws exception if given msg is null
*/
public function push(msg:T):Void {
if (msg == null)
throw "[msg] must not be null";
#if (cpp || cs || (threads && eval) || java || neko || hl)
_queue.add(msg);
#elseif python
_queue.appendleft(msg);
#else
_queueLock.acquire();
_queue.add(msg);
_queueLock.release();
#end
_length++;
}
}