-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzmq.sml
442 lines (386 loc) · 15.9 KB
/
zmq.sml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
(* Copyright (C) 2013 KC Sivaramakrishnan.
* Copyright (C) 1999-2008 Henry Cejtin, Matthew Fluet, Suresh
* Jagannathan, and Stephen Weeks.
* Copyright (C) 1997-2000 NEC Research Institute.
*
* MLton is released under a BSD-style license.
* See the file MLton-LICENSE for details.
*)
structure MLtonZMQ : MLTON_ZMQ =
struct
structure SysCall = PosixError.SysCall
(* Runs the suspended computation it is given and returns its result
 * unchanged.  Translating PosixError.SysErr into a ZMQ-specific exception is
 * currently disabled; the handler that used to do it was:
 *   handle PosixError.SysErr (str, errOpt) => raise ZMQError (str, errOpt)
 *)
fun exnWrapper thunk = thunk ()
(* KC: For readability, the name of each primitive function should mirror the
 * name of the C function it wraps. *)
structure Prim =
struct
(* Brings the ZMQ FFI bindings and option/flag constants (ctx_new, socket,
 * PAIR, DONTWAIT, ...) used throughout this structure into scope. *)
open PrimitiveFFI.MLton.ZMQ
(* Compiler primitive used for sending.  Callers in this file pass a reference
 * to the value to send, a byte-vector prefix, the socket handle and the
 * integer send flags. *)
val send = _prim "MLton_ZMQ_Send" : 'a ref * Word8.word vector * C_ZMQ_Socket.t * C_Int.t -> (C_Int.t) C_Errno.t;
(* Compiler primitive that turns a received ZMQ message into an ML value.
 * The result type is an unconstrained 'a, so the type checker cannot verify
 * that the type expected by the receiver matches what was actually sent. *)
val deserializeZMQMsg = _prim "MLton_deserializeZMQMsg" : C_ZMQ_Message.t -> 'a;
end
(* Context Management *)
type context = C_ZMQ_Context.t
datatype context_option = IO_THREADS | MAX_SOCKETS
(* Maps a context option to the integer constant exported by Prim. *)
fun contextOptionToInt IO_THREADS = Prim.IO_THREADS
  | contextOptionToInt MAX_SOCKETS = Prim.MAX_SOCKETS
(* Creates a new ZMQ context.
 * simpleResultRestart' is used because:
 * (1) the call produces a value we need (the context);
 * (2) failure is signalled by a null pointer, so errVal has to be given
 *     explicitly instead of relying on the default of ~1;
 * (3) the call must be restarted when a signal interrupts it, and signals
 *     are *always* expected because of MultiMLton's user-level threading.
 *)
fun ctxNew () =
   let
      val failure = {errVal = CUtil.C_Pointer.null}
      fun create () = SysCall.simpleResultRestart' (failure, Prim.ctx_new)
   in
      exnWrapper create
   end
(* Destroys a ZMQ context.
 * simpleRestart is sufficient here because:
 * (1) the error value is the default one (~1);
 * (2) no result is expected from the call.
 *)
fun ctxDestroy context =
   let
      fun destroyOnce () = Prim.ctx_destroy context
   in
      exnWrapper (fn () => SysCall.simpleRestart destroyOnce)
   end
(* Sets context option `opt` of `ctx` to the value `v`. *)
fun ctxSetOpt (ctx, opt, v) =
   let
      fun setOnce () = Prim.ctx_set (ctx, contextOptionToInt opt, v)
   in
      exnWrapper (fn () => SysCall.simpleRestart setOnce)
   end
(* Reads context option `opt` of `ctx` and returns the value reported by the
 * library. *)
fun ctxGetOpt (ctx, opt) =
   let
      fun getOnce () = Prim.ctx_get (ctx, contextOptionToInt opt)
   in
      exnWrapper (fn () => SysCall.simpleResultRestart getOnce)
   end
(* Sockets *)
datatype socket_kind = Pair | Pub | Sub | Req | Rep
                     | Dealer | Router | Pull | Push
                     | XPub | XSub
(* A socket pairs the native handle with the kind it was created with. *)
datatype socket = SOCKET of {hndl : C_ZMQ_Socket.t, kind : socket_kind}
(* Maps a socket kind to the integer constant exported by Prim. *)
fun socketKindToInt Pair = Prim.PAIR
  | socketKindToInt Pub = Prim.PUB
  | socketKindToInt Sub = Prim.SUB
  | socketKindToInt Req = Prim.REQ
  | socketKindToInt Rep = Prim.REP
  | socketKindToInt Dealer = Prim.DEALER
  | socketKindToInt Router = Prim.ROUTER
  | socketKindToInt Pull = Prim.PULL
  | socketKindToInt Push = Prim.PUSH
  | socketKindToInt XPub = Prim.XPUB
  | socketKindToInt XSub = Prim.XSUB
(* Creates a socket of the given kind in context `ctxt`.  A null handle is
 * treated as the error value of the underlying call.  The kind is stored next
 * to the handle so it can be retrieved later without asking the library. *)
fun sockCreate (ctxt, kind) =
   let
      fun open' () = Prim.socket (ctxt, socketKindToInt kind)
      fun attempt () =
         SysCall.simpleResultRestart' ({errVal = CUtil.C_Pointer.null}, open')
   in
      SOCKET {hndl = exnWrapper attempt, kind = kind}
   end
(* Closes the native handle of the given socket. *)
fun sockClose (SOCKET {hndl = native, ...}) =
   let
      fun closeOnce () = Prim.close native
   in
      exnWrapper (fn () => SysCall.simpleRestart closeOnce)
   end
(* Connects the socket to `endPoint`; the endpoint string is passed to the
 * primitive in null-terminated form. *)
fun sockConnect (SOCKET {hndl = native, ...}, endPoint) =
   let
      fun connectOnce () = Prim.connect (native, NullString.nullTerm endPoint)
   in
      exnWrapper (fn () => SysCall.simpleRestart connectOnce)
   end
(* Disconnects the socket from `endPoint`; the endpoint string is passed to
 * the primitive in null-terminated form. *)
fun sockDisconnect (SOCKET {hndl = native, ...}, endPoint) =
   let
      fun disconnectOnce () = Prim.disconnect (native, NullString.nullTerm endPoint)
   in
      exnWrapper (fn () => SysCall.simpleRestart disconnectOnce)
   end
(* Binds the socket to `endPoint`; the endpoint string is passed to the
 * primitive in null-terminated form. *)
fun sockBind (SOCKET {hndl = native, ...}, endPoint) =
   let
      fun bindOnce () = Prim.bind (native, NullString.nullTerm endPoint)
   in
      exnWrapper (fn () => SysCall.simpleRestart bindOnce)
   end
(* Unbinds the socket from `endPoint`; the endpoint string is passed to the
 * primitive in null-terminated form. *)
fun sockUnbind (SOCKET {hndl = native, ...}, endPoint) =
   let
      fun unbindOnce () = Prim.unbind (native, NullString.nullTerm endPoint)
   in
      exnWrapper (fn () => SysCall.simpleRestart unbindOnce)
   end
(* Socket Options *)
local
(* Option values cross the FFI boundary as raw bytes in host byte order. *)
(* Immutable byte buffer handed to setSockOpt. *)
type optvalVec = Word8.word vector
(* Mutable byte buffer filled in by getSockOpt. *)
type optvalArr = Word8.word array
val bitsPerByte = 8
(* Selects the byte index order used by the (un)marshalling loops below. *)
val isBigEndian = Primitive.MLton.Platform.Arch.hostIsBigEndian
(* int *)
(* Width of a C int in bytes, derived from its precision in bits. *)
val intLen = Int.quot (C_Int.precision', bitsPerByte)
(* Reassembles a C int from its host-byte-order byte representation.
 *
 * `wa` must hold at least intLen bytes.  Bytes are consumed from the most
 * significant to the least significant one (the index is mirrored on
 * little-endian hosts); at every step the accumulator is shifted left by one
 * whole byte and the next byte is or-ed into the low bits.
 *)
fun unmarshalInt (wa: optvalArr) : C_Int.int =
   let
      fun loop (i, acc) =
         if i >= intLen
            then acc
            else let
                    val byte =
                       Array.sub
                       (wa, if isBigEndian
                               then i
                               else (intLen - 1) - i)
                    val byte = C_Int.castFromSysWord (Word8.castToSysWord byte)
                 in
                    (* Shift by 8 bits (one byte).  The previous 4-bit shift
                     * made consecutive bytes overlap, corrupting every value
                     * that does not fit in the lowest byte. *)
                    loop (i + 1, C_Int.orb (byte, C_Int.<< (acc, 0w8)))
                 end
   in
      loop (0, 0)
   end
(* Serializes a C int into intLen bytes in host byte order.
 *
 * The loop peels off the least significant byte of the remaining value on
 * each iteration and stores it at the position dictated by the host
 * endianness, then drops that byte from the remaining value.
 *)
fun marshalInt (i: C_Int.int) : optvalVec =
   let
      val wa = Array.array (intLen, 0wx0)
      fun loop (n, rest) =
         if n >= intLen
            then ()
            else let
                    val byte = Word8.castFromSysWord (C_Int.castToSysWord rest)
                    val () =
                       Array.update
                       (wa, if isBigEndian
                               then (intLen - 1) - n
                               else n, byte)
                 in
                    (* Drop a whole byte (8 bits).  The previous 4-bit shift
                     * emitted overlapping bytes, so any value >= 16 was
                     * encoded incorrectly. *)
                    loop (n + 1, C_Int.>> (rest, 0w8))
                 end
   in
      loop (0, i)
      ; Array.vector wa
   end
(* bool *)
(* Booleans travel as C ints, so they occupy the same number of bytes. *)
val boolLen = intLen
(* Decodes a boolean option value: zero is false, anything else is true. *)
fun unmarshalBool (wa: optvalArr) : bool =
   unmarshalInt wa <> 0
(* Encodes a boolean option value as the C int 1 (true) or 0 (false). *)
fun marshalBool (b: bool) : optvalVec =
   let
      val asInt : C_Int.int = if b then 1 else 0
   in
      marshalInt asInt
   end
(* word64 *)
val word64Len = 8 (* Word64.sizeInBits / bitsPerByte *)
(* Reassembles a 64-bit word from its host-byte-order byte representation.
 *
 * `wa` must hold at least word64Len bytes.  Bytes are consumed from the most
 * significant to the least significant one (the index is mirrored on
 * little-endian hosts); at every step the accumulator is shifted left by one
 * whole byte and the next byte is or-ed into the low bits.
 *)
fun unmarshalWord64 (wa: optvalArr) : Word64.word =
   let
      fun loop (i, acc) =
         if i >= word64Len
            then acc
            else let
                    val byte =
                       Array.sub
                       (wa, if isBigEndian
                               then i
                               else (word64Len - 1) - i)
                    val byte = Word64.castFromSysWord (Word8.castToSysWord byte)
                 in
                    (* Shift by 8 bits (one byte).  The previous 4-bit shift
                     * made consecutive bytes overlap, corrupting every value
                     * that does not fit in the lowest byte. *)
                    loop (i + 1, Word64.orb (byte, Word64.<< (acc, 0w8)))
                 end
   in
      loop (0, 0wx0)
   end
(* Serializes a 64-bit word into word64Len bytes in host byte order.
 *
 * The loop peels off the least significant byte of the remaining value on
 * each iteration and stores it at the position dictated by the host
 * endianness, then drops that byte from the remaining value.
 *)
fun marshalWord64 (i: Word64.word) : optvalVec =
   let
      val wa = Array.array (word64Len, 0wx0)
      fun loop (n, rest) =
         if n >= word64Len
            then ()
            else let
                    val byte = Word8.castFromSysWord (Word64.castToSysWord rest)
                    val () =
                       Array.update
                       (wa, if isBigEndian
                               then (word64Len - 1) - n
                               else n, byte)
                 in
                    (* Drop a whole byte (8 bits).  The previous 4-bit shift
                     * emitted overlapping bytes, so any value >= 16 was
                     * encoded incorrectly. *)
                    loop (n + 1, Word64.>> (rest, 0w8))
                 end
   in
      loop (0, i)
      ; Array.vector wa
   end
(* Time *)
(* Optional times travel as a C int number of milliseconds. *)
val optTimeLen: int = intLen
(* Decodes an optional time from a C int number of milliseconds.
 *
 * This is the inverse of marshalOptTime, which encodes NONE as ~1: a decoded
 * value of ~1 therefore yields NONE (previously it was returned as
 * SOME of a negative time, so a NONE that was set could never be read back).
 * Every other value is returned as SOME of that many milliseconds.
 *)
fun unmarshalOptTime (wa: optvalArr) : Time.time option =
   let
      val milliSecs = unmarshalInt wa
   in
      if milliSecs = ~1
         then NONE
         else SOME (Time.fromMilliseconds (C_Int.toLarge milliSecs))
   end
(* Encodes an optional time as a C int number of milliseconds; NONE is
 * encoded as ~1. *)
fun marshalOptTime (to: Time.time option) : optvalVec =
   marshalInt (case to of
                  SOME t => C_Int.fromLarge (Time.toMilliseconds t)
                | NONE => ~1)
(* Builds a (getter, setter) pair for socket options whose value occupies
 * exactly `optlen` bytes.
 *   marshal   - encodes a value into the bytes handed to Prim.setSockOpt
 *   unmarshal - decodes the bytes filled in by Prim.getSockOpt
 * Both returned functions are curried on the option name first.
 *)
fun make (optlen: int,
          marshal: 'a -> optvalVec,
          unmarshal: optvalArr -> 'a) =
   let
      (* Reads the option into a zeroed buffer of optlen bytes and decodes it.
       * Raises Fail when the library reports a different length. *)
      fun getSockOpt optname (SOCKET {hndl, ...}) : 'a =
         let
            val buf = Array.array (optlen, 0wx0)
            val lenCell = ref (C_Size.fromInt optlen)
            fun query () = Prim.getSockOpt (hndl, optname, buf, lenCell)
            val () = exnWrapper (fn () => SysCall.simpleRestart query)
         in
            if C_Size.toInt (!lenCell) = optlen
               then unmarshal buf
               else raise (Fail "ZMQ.getSockOpt: optlen' <> optlen")
         end
      (* Encodes the value and hands it to the library together with optlen. *)
      fun setSockOpt optname (SOCKET {hndl, ...}, optval: 'a) : unit =
         let
            val bytes = marshal optval
            val len = C_Size.fromInt optlen
            fun store () = Prim.setSockOpt (hndl, optname, bytes, len)
         in
            exnWrapper (fn () => SysCall.simpleRestart store)
         end
   in
      (getSockOpt, setSockOpt)
   end
in
(* Typed getter/setter pairs, one per supported option value representation.
 * Each pair is built with the length constant declared for its own type. *)
val (getSockOptInt, setSockOptInt) =
   make (intLen, marshalInt, unmarshalInt)
val (getSockOptBool, setSockOptBool) =
   make (boolLen, marshalBool, unmarshalBool)
val (getSockOptWord64, setSockOptWord64) =
   make (word64Len, marshalWord64, unmarshalWord64)
val (getSockOptTime, setSockOptTime) =
   make (optTimeLen, marshalOptTime, unmarshalOptTime)
end
datatype socket_event = POLL_IN | POLL_OUT | POLL_IN_OUT | NO_EVENT
(* Classifies an event mask: exactly POLLIN, exactly POLLOUT, or exactly both
 * bits or-ed together; every other value is reported as NO_EVENT. *)
fun sockEventFromInt event =
   let
      val both = C_Int.orb (Prim.POLLIN, Prim.POLLOUT)
   in
      if event = Prim.POLLIN then POLL_IN
      else if event = Prim.POLLOUT then POLL_OUT
      else if event = both then POLL_IN_OUT
      else NO_EVENT
   end
(* Reads a variable-length, byte-valued socket option.  `optlen` is the size
 * of the zeroed buffer offered to the library; the result holds only the
 * leading bytes covered by the length the library reports back. *)
fun getSockOptWord8Vector optname optlen (SOCKET {hndl, ...}) : Word8.word vector =
   let
      val buf = Array.array (optlen, 0wx0)
      val lenCell = ref (C_Size.fromInt optlen)
      fun query () = Prim.getSockOpt (hndl, optname, buf, lenCell)
      val () = exnWrapper (fn () => SysCall.simpleRestart query)
      val written = C_Size.toInt (!lenCell)
   in
      ArraySlice.vector (ArraySlice.slice (buf, 0, SOME written))
   end
(* Sets a byte-valued socket option; the length passed to the library is the
 * length of the given vector. *)
fun setSockOptWord8Vector optname (SOCKET {hndl, ...}, optval : Word8.word vector) : unit =
   let
      fun store () =
         Prim.setSockOpt (hndl, optname, optval,
                          C_Size.fromInt (Vector.length optval))
   in
      exnWrapper (fn () => SysCall.simpleRestart store)
   end
(* Socket option getters.  Unless noted otherwise, each one is a typed getter
 * (int, bool, word64 or byte vector) partially applied to the corresponding
 * option constant from Prim, leaving the socket as the remaining argument. *)
(* The kind is read from the ML-side socket record, not from the library. *)
val sockGetType = fn (SOCKET {kind, ...}) => kind
val sockGetRcvMore = getSockOptBool Prim.RCVMORE
val sockGetSndHwm = getSockOptInt Prim.SNDHWM
val sockGetRcvHwm = getSockOptInt Prim.RCVHWM
val sockGetAffinity = getSockOptWord64 Prim.AFFINITY
(* 256 is the size in bytes of the buffer offered for the identity. *)
val sockGetIdentity = getSockOptWord8Vector Prim.IDENTITY 256
val sockGetRate = getSockOptInt Prim.RATE
val sockGetRecoveryIvl = getSockOptInt Prim.RECOVERY_IVL
val sockGetSndBuf = getSockOptInt Prim.SNDBUF
val sockGetRcvBuf = getSockOptInt Prim.RCVBUF
val sockGetLinger = getSockOptInt Prim.LINGER
val sockGetReconnectIvl = getSockOptInt Prim.RECONNECT_IVL
val sockGetReconnectIvlMax = getSockOptInt Prim.RECONNECT_IVL_MAX
val sockGetBacklog = getSockOptInt Prim.BACKLOG
(* TODO: Implement getSockOptInt64 *)
(* NOTE(review): until the TODO above is done this option is read through the
 * C-int-sized getter; libzmq documents ZMQ_MAXMSGSIZE as a 64-bit value, so
 * confirm the behaviour before relying on this getter. *)
val sockGetMaxMsgSize = getSockOptInt Prim.MAXMSGSIZE
val sockGetMulticastHops = getSockOptInt Prim.MULTICAST_HOPS
val sockGetRcvTimeo = getSockOptInt Prim.RCVTIMEO
val sockGetSndTimeo = getSockOptInt Prim.SNDTIMEO
val sockGetIPV4Only = getSockOptBool Prim.IPV4ONLY
val sockGetDelayAttachOnConnect = getSockOptBool Prim.DELAY_ATTACH_ON_CONNECT
(* Reads the FD option as a C int and reinterprets it as a basis-library
 * socket. *)
val sockGetFD = fn s => Socket.fromRep (C_Sock.castFromSysWord (C_Int.castToSysWord (getSockOptInt Prim.FD s)))
(* Reads the EVENTS mask and classifies it with sockEventFromInt. *)
val sockGetEvents = fn s => sockEventFromInt (getSockOptInt Prim.EVENTS s)
(* Reads the last endpoint into a 256-byte buffer and converts the bytes that
 * were written into a string. *)
val sockGetLastEndpoint = fn s => Byte.bytesToString (Word8Vector.fromPoly (getSockOptWord8Vector Prim.LAST_ENDPOINT 256 s))
val sockGetTCPKeepalive = getSockOptInt Prim.TCP_KEEPALIVE
val sockGetTCPKeepaliveIdle = getSockOptInt Prim.TCP_KEEPALIVE_IDLE
val sockGetTCPKeepaliveCnt = getSockOptInt Prim.TCP_KEEPALIVE_CNT
val sockGetTCPKeepaliveIntvl = getSockOptInt Prim.TCP_KEEPALIVE_INTVL
(* Socket option setters.  Each one is a typed setter (int, bool, word64 or
 * byte vector) partially applied to the corresponding option constant from
 * Prim; the remaining argument is the pair (socket, value). *)
val sockSetSndHwm = setSockOptInt Prim.SNDHWM
val sockSetRcvHwm = setSockOptInt Prim.RCVHWM
val sockSetAffinity = setSockOptWord64 Prim.AFFINITY
(* Subscription filters are passed as raw byte vectors. *)
val sockSetSubscribe = setSockOptWord8Vector Prim.SUBSCRIBE
val sockSetUnsubscribe = setSockOptWord8Vector Prim.UNSUBSCRIBE
val sockSetRate = setSockOptInt Prim.RATE
val sockSetRecoveryIvl = setSockOptInt Prim.RECOVERY_IVL
val sockSetSndBuf = setSockOptInt Prim.SNDBUF
val sockSetRcvBuf = setSockOptInt Prim.RCVBUF
val sockSetLinger = setSockOptInt Prim.LINGER
val sockSetReconnectIvl = setSockOptInt Prim.RECONNECT_IVL
val sockSetReconnectIvlMax = setSockOptInt Prim.RECONNECT_IVL_MAX
val sockSetBacklog = setSockOptInt Prim.BACKLOG
(* TODO: Implement setSockOptInt64 *)
(* NOTE(review): until the TODO above is done this option is written through
 * the C-int-sized setter; libzmq documents ZMQ_MAXMSGSIZE as a 64-bit value,
 * so confirm the behaviour before relying on this setter. *)
val sockSetMaxMsgSize = setSockOptInt Prim.MAXMSGSIZE
val sockSetMulticastHops = setSockOptInt Prim.MULTICAST_HOPS
val sockSetRcvTimeo = setSockOptInt Prim.RCVTIMEO
val sockSetSndTimeo = setSockOptInt Prim.SNDTIMEO
val sockSetIPV4Only = setSockOptBool Prim.IPV4ONLY
val sockSetDelayAttachOnConnect = setSockOptBool Prim.DELAY_ATTACH_ON_CONNECT
(* These two flags are exposed as plain ints rather than bools. *)
val sockSetRouterMandatory = setSockOptInt Prim.ROUTER_MANDATORY
val sockSetXPubVerbose = setSockOptInt Prim.XPUB_VERBOSE
val sockSetTCPKeepalive = setSockOptInt Prim.TCP_KEEPALIVE
val sockSetTCPKeepaliveIdle = setSockOptInt Prim.TCP_KEEPALIVE_IDLE
val sockSetTCPKeepaliveCnt = setSockOptInt Prim.TCP_KEEPALIVE_CNT
val sockSetTCPKeepaliveIntvl = setSockOptInt Prim.TCP_KEEPALIVE_INTVL
val sockSetTCPAcceptFilter = setSockOptWord8Vector Prim.TCP_ACCEPT_FILTER
(* Sends and Receives *)
datatype send_flag = S_DONT_WAIT | S_SEND_MORE | S_NONE
(* Maps a send flag to the integer passed to the send primitive; S_NONE is
 * the empty flag set (0). *)
fun sendFlgToInt S_DONT_WAIT = Prim.DONTWAIT
  | sendFlgToInt S_SEND_MORE = Prim.SNDMORE
  | sendFlgToInt S_NONE = 0
datatype recv_flag = R_DONT_WAIT | R_NONE
(* Maps a receive flag to the integer passed to the receive primitive; R_NONE
 * is the empty flag set (0). *)
fun recvFlgToInt R_DONT_WAIT = Prim.DONTWAIT
  | recvFlgToInt R_NONE = 0
(* Sends `msg` on the socket, preceded by the byte-vector `prefix`, using the
 * given send flag.  The value is always wrapped in a fresh reference before
 * it is handed to the send primitive. *)
fun sendWithPrefixAndFlag (SOCKET {hndl, ...}, msg, prefix, flg) =
   let
      fun sendOnce () = Prim.send (ref msg, prefix, hndl, sendFlgToInt flg)
   in
      exnWrapper (fn () => SysCall.simpleRestart sendOnce)
   end
(* Sends `msg` with the given prefix and no send flag (S_NONE). *)
fun sendWithPrefix (sock, msg, prefix) =
   let
      val noFlag = S_NONE
   in
      sendWithPrefixAndFlag (sock, msg, prefix, noFlag)
   end
(* Sends `msg` with an empty prefix and no send flag. *)
fun send (sock, msg) =
   let
      val emptyPrefix = Vector.fromList ([] : Word8.word list)
   in
      sendWithPrefix (sock, msg, emptyPrefix)
   end
(* Receives one message using the given receive flag and returns the value it
 * carries.  A null message pointer is treated as the error value of the
 * underlying call.  Because every message is sent as a reference, the
 * deserialized result is a reference that is dereferenced before returning. *)
fun recvWithFlag (SOCKET {hndl, ...}, flg) =
   let
      fun receiveOnce () = Prim.recv (hndl, recvFlgToInt flg)
      val rawMsg =
         exnWrapper (fn () => SysCall.simpleResultRestart'
                                 ({errVal = CUtil.C_Pointer.null}, receiveOnce))
      val cell = Prim.deserializeZMQMsg rawMsg
   in
      !cell
   end
(* Receives one message with no receive flag (R_NONE). *)
fun recv sock =
   let
      val noFlag = R_NONE
   in
      recvWithFlag (sock, noFlag)
   end
(* Receive with R_DONT_WAIT.  Returns SOME of the received value, or NONE when
 * the attempt fails with the error PosixError.again.  Every other exception
 * is re-raised unchanged. *)
fun recvNB sock =
   SOME (recvWithFlag (sock, R_DONT_WAIT))
   handle failure =>
      (case failure of
          PosixError.SysErr (_, SOME code) =>
             if code = PosixError.again then NONE else raise failure
        | _ => raise failure)
(* Poll *)
(* Polls three groups of sockets with the given timeout.
 *   ins / outs / inouts - the sockets handed to Prim.poll in the input,
 *                         output and input+output positions respectively
 *   timeout             - passed through to Prim.poll unchanged
 * Returns, for each group, the sockets whose result slot was set to a
 * non-zero value by the primitive, in their original order.  When the
 * primitive returns 0 all three result lists are empty.
 *)
fun poll {ins : socket list, outs : socket list,
          inouts : socket list, timeout : int} =
   let
      (* Native handles of a group plus a zeroed result array of equal size. *)
      fun prepare socks =
         let
            val handles =
               Vector.fromList (List.map (fn SOCKET {hndl, ...} => hndl) socks)
         in
            (handles, Array.array (Vector.length handles, 0))
         end
      val (inHandles, inFlags) = prepare ins
      val (outHandles, outFlags) = prepare outs
      val (inoutHandles, inoutFlags) = prepare inouts
      fun pollOnce () =
         Prim.poll (inHandles, outHandles, inoutHandles,
                    inFlags, outFlags, inoutFlags, timeout)
      val readyCount =
         exnWrapper (fn () => SysCall.simpleResultRestart pollOnce)
      (* Keeps the sockets whose slot in `flags` is non-zero. *)
      fun flagged (socks, flags) =
         let
            fun walk ([], _, acc) = List.rev acc
              | walk (s :: rest, i, acc) =
                   walk (rest, i + 1,
                         if Array.sub (flags, i) <> 0 then s :: acc else acc)
         in
            walk (socks, 0, [])
         end
   in
      if readyCount = 0
         then {ins = [], outs = [], inouts = []}
         else {ins = flagged (ins, inFlags),
               outs = flagged (outs, outFlags),
               inouts = flagged (inouts, inoutFlags)}
   end
(* Hands the native handles of the frontend and backend sockets to
 * Prim.proxy. *)
fun proxy {frontend = SOCKET {hndl = front, ...},
           backend = SOCKET {hndl = back, ...}} =
   let
      fun runProxy () = Prim.proxy (front, back)
   in
      exnWrapper (fn () => SysCall.simpleRestart runProxy)
   end
end