-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathRole.cs
296 lines (266 loc) · 11.8 KB
/
Role.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
using LanguageExt.ClassInstances;
using System.Collections.Generic;
using System.Linq;
using static LanguageExt.Prelude;
using static Echo.Process;
using LanguageExt;
namespace Echo
{
/// <summary>
/// <para>
/// Each node in the cluster has a role name and at all times the cluster-nodes
/// have a list of the alive nodes and their roles (Process.ClusterNodes). Nodes
/// are removed from Process.ClusterNodes if they don't phone in. Process.ClusterNodes
/// is at most 3 seconds out-of-date and can therefore be used to reliably find
/// out which nodes are available and what roles they do.
/// </para>
/// <para>
/// By using Role.First, Role.Broadcast, Role.LeastBusy, Role.Random and Role.RoundRobin
/// you can build a ProcessId that is resolved at the time of doing a 'tell', 'ask',
/// 'subscribe', etc. This can allow reliable messaging to Processes in the cluster.
/// </para>
/// </summary>
public static class Role
{
/// <summary>
/// The role that this node is a part of
/// </summary>
public static ProcessName Current
{
get;
private set;
}
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the message is dispatched to all nodes in the set.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Broadcast["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Broadcast["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId Broadcast;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the message is dispatched to the least busy from
/// the set.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.LeastBusy["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.LeastBusy["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId LeastBusy;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the message is dispatched to a cryptographically
/// random node from the set.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Random["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Random["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId Random;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the message is dispatched to the nodes in a round-
/// robin fashion
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.RoundRobin["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.RoundRobin["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId RoundRobin;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the node names are sorted in ascending order and
/// the message is dispatched to the first one. This can be used for leader
/// election for example.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.First["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.First["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId First;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the node names are sorted in ascending order and
/// the message is dispatched to the second one.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Second["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Second["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId Second;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the node names are sorted in ascending order and
/// the message is dispatched to the third one.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Third["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Third["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId Third;
/// <summary>
/// A ProcessId that represents a set of nodes in a cluster. When used for
/// operations like 'tell', the node names are sorted in descending order and
/// the message is dispatched to the first one.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Last["my-role"]["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Last["message-role"]["user"]["message-log"], "Hello" );
/// </example>
public static readonly ProcessId Last;
/// <summary>
/// Builds a ProcessId that represents the next node in the role that this node
/// is a part of. If there is only one node in the role then any messages sent
/// will be sent to the leaf-process with itself. Unlike other Roles, you do
/// not specify the role-name as the first child.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Next["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Next["user"]["message-log"], "Hello" );
/// </example>
public static ProcessId Next(SystemName system = default(SystemName)) =>
nextRoot[Root(system).Name];
/// <summary>
/// Builds a ProcessId that represents the previous node in the role that this
/// node is a part of. If there is only one node in the role then any messages
/// sent will be sent to the leaf-process with itself. Unlike other Roles, you
/// do not specify the role-name as the first child.
/// See remarks.
/// </summary>
/// <remarks>
/// You may create a reference to child nodes in the usual way:
/// Role.Prev["user"]["child-1"][...]
/// </remarks>
/// <example>
/// tell( Role.Prev["user"]["message-log"], "Hello" );
/// </example>
public static ProcessId Prev(SystemName system = default(SystemName)) =>
prevRoot[Root(system).Name];
public static IEnumerable<ProcessId> NodeIds(ProcessId leaf) =>
Nodes(leaf).Values.Map(node => ProcessId.Top[node.NodeName].Append(leaf.Skip(1)));
public static HashMap<ProcessName, ClusterNode> Nodes(ProcessId leaf, SystemName system = default(SystemName)) =>
ClusterNodes(system).Filter(node => node.Role == leaf.Take(1).Name);
static readonly ProcessId nextRoot;
static readonly ProcessId prevRoot;
internal static Unit init(ProcessName name)
{
Current = name;
return unit;
}
/// <summary>
/// Static ctor
/// Sets up the default roles
/// </summary>
static Role()
{
ProcessName first = "role-first";
ProcessName second = "role-second";
ProcessName third = "role-third";
ProcessName last = "role-last";
ProcessName next = "role-next";
ProcessName prev = "role-prev";
ProcessName broadcast = "role-broadcast";
ProcessName leastBusy = "role-least-busy";
ProcessName random = "role-random";
ProcessName roundRobin = "role-round-robin";
var nextNode = fun((bool fwd) => fun((ProcessId leaf) =>
{
var self = leaf.Take(1).Name;
var isNext = false;
var nodeMap = Nodes(leaf);
var nodes = fwd
? MEnumerable<ClusterNode>.Inst.Append(nodeMap.Values, nodeMap.Values)
: MEnumerable<ClusterNode>.Inst.Append(nodeMap.Values, nodeMap.Values).Reverse(); //< TODO: Inefficient
foreach (var node in nodes)
{
if (isNext)
{
return new[] { ProcessId.Top[node.NodeName].Append(leaf.Skip(1)) }.AsEnumerable();
}
if (node.NodeName == self)
{
isNext = true;
}
}
return new ProcessId[0].AsEnumerable();
}));
// Next
nextRoot = Dispatch.register(next, nextNode(true));
// Prev
prevRoot = Dispatch.register(prev, nextNode(false));
// First
First = Dispatch.register(first, leaf => NodeIds(leaf).Take(1));
// Second
Second = Dispatch.register(second, leaf => NodeIds(leaf).Skip(1).Take(1));
// Third
Third = Dispatch.register(third, leaf => NodeIds(leaf).Skip(2).Take(1));
// Last
Last = Dispatch.register(last, leaf => NodeIds(leaf).Reverse().Take(1));
// Broadcast
Broadcast = Dispatch.register(broadcast, NodeIds);
// Least busy
LeastBusy = Dispatch.register(leastBusy, leaf =>
NodeIds(leaf)
.Map(pid => Tuple(inboxCount(pid), pid))
.OrderBy(tup => tup.Item1)
.Map(tup => tup.Item2)
.Take(1));
// Random
Random = Dispatch.register(random, leaf => {
var workers = NodeIds(leaf).ToArray();
return new ProcessId[1] { workers[Prelude.random(workers.Length)] };
});
// Round-robin
object sync = new object();
HashMap<string, int> roundRobinState = HashMap<string, int>();
RoundRobin = Dispatch.register(roundRobin, leaf => {
var key = leaf.ToString();
var workers = NodeIds(leaf).ToArray();
int index = 0;
lock (sync)
{
roundRobinState = roundRobinState.AddOrUpdate(key, x => { index = x % workers.Length; return x + 1; }, 0);
}
return new ProcessId[1] { workers[index] };
});
}
}
}