Skip to content

Commit b8b9a83

Browse files
authored
Merge pull request #17 from imandra-ai/simon/synchronous-logger
sync logger
2 parents 57d58c9 + e03b180 commit b8b9a83

File tree

5 files changed

+124
-64
lines changed

5 files changed

+124
-64
lines changed

src/log/dune

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(name imandrakit_log)
44
(preprocess
55
(pps ppx_deriving.std imandrakit.ser-pack.ppx imandrakit.twine.ppx))
6+
(private_modules lock_)
67
(flags :standard -open Imandrakit)
78
(libraries
89
imandrakit

src/log/lock_.ml

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
open struct
2+
external reraise : exn -> 'a = "%reraise"
3+
end
4+
5+
type 'a t = {
6+
v: 'a;
7+
m: Mutex.t;
8+
}
9+
10+
let create v = { v; m = Mutex.create () }
11+
12+
let with_ (self : _ t) f =
13+
Mutex.lock self.m;
14+
match f self.v with
15+
| x ->
16+
Mutex.unlock self.m;
17+
x
18+
| exception e ->
19+
Mutex.unlock self.m;
20+
reraise e

src/log/log_google.ml

+12-8
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,23 @@ let event_to_json ?(other_fields = []) (ev : Log_event.t) : json =
2222
`Assoc (List.rev_append other_fields fields)
2323

2424
let logger ?other_fields oc : Logger.Output.t =
25-
Logger.Output.to_event () ~emit_ev:(fun ev ->
25+
let oc = Lock_.create oc in
26+
Logger.Output.to_event ()
27+
~flush:(fun () -> Lock_.with_ oc Stdlib.flush)
28+
~emit_ev:(fun ev ->
2629
let json = event_to_json ?other_fields ev in
2730

2831
try
29-
((* use a local buffer *)
30-
let@ buf = Apool.with_resource Logger.Output.buf_pool in
31-
assert (Buffer.length buf = 0);
32-
Yojson.Safe.to_buffer buf json;
33-
(* write buffer's content immediately *)
34-
Buffer.output_buffer oc buf);
32+
(* use a local buffer *)
33+
let@ buf = Apool.with_resource Logger.Output.buf_pool in
34+
assert (Buffer.length buf = 0);
35+
Yojson.Safe.to_buffer buf json;
3536

37+
(* write buffer's content immediately *)
38+
let@ oc = Lock_.with_ oc in
39+
Buffer.output_buffer oc buf;
3640
output_char oc '\n';
3741
flush oc
3842
with exn ->
39-
Printf.eprintf "log to json chan: failed with %s\n%!"
43+
Printf.eprintf "log to json in google format: failed with\n%s\n%!"
4044
(Printexc.to_string exn))

src/log/logger.ml

+77-50
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
open Log_level
2-
module Sync_queue = Moonpool.Blocking_queue
1+
open! Log_level
32

43
type level = Log_level.t [@@deriving show, eq]
54

@@ -51,9 +50,12 @@ open struct
5150
end
5251

5352
module Output = struct
54-
type t = { emit: Log_event.t -> unit } [@@unboxed]
53+
type t = {
54+
emit: Log_event.t -> unit;
55+
flush: unit -> unit;
56+
}
5557

56-
let to_event ~emit_ev () : t = { emit = emit_ev }
58+
let to_event ~emit_ev ~flush () : t = { emit = emit_ev; flush }
5759
let () = Fmt.set_color_default true
5860

5961
let to_str_ (ev : Log_event.t) : string =
@@ -92,41 +94,49 @@ module Output = struct
9294

9395
Buf_fmt.get_contents buf_fmt
9496

95-
let to_str ~(emit_str : string -> unit) () : t =
97+
let to_str ~(emit_str : string -> unit) ~flush () : t =
9698
{
9799
emit =
98100
(fun ev ->
99101
let s = to_str_ ev in
100102
emit_str s);
103+
flush;
101104
}
102105

103-
let to_chan (oc : out_channel) : t =
106+
let to_chan ?(autoflush = true) (oc : out_channel) : t =
107+
let oc = Lock_.create oc in
104108
to_str
109+
~flush:(fun () -> Lock_.with_ oc Stdlib.flush)
105110
~emit_str:(fun s ->
106111
try
112+
let@ oc = Lock_.with_ oc in
107113
output_string oc s;
108114
output_char oc '\n';
109-
flush oc
115+
if autoflush then Stdlib.flush oc
110116
with _ -> Printf.eprintf "logger: failed to log to chan\n%!")
111117
()
112118

113119
let stdout () = to_chan stdout
114120
let stderr () = to_chan stderr
115121

116122
let filter_level pred (self : t) : t =
117-
{ emit = (fun ev -> if pred ev.lvl then self.emit ev) }
123+
{ emit = (fun ev -> if pred ev.lvl then self.emit ev); flush = self.flush }
118124

119125
let buf_pool : Buffer.t Apool.t =
120126
Apool.create ~clear:Buffer.reset
121127
~mk_item:(fun () -> Buffer.create 256)
122128
~max_size:8 ()
123129

124130
(** Logger that writes events, one per line, on the given channel. *)
125-
let to_chan_jsonl (oc : out_channel) : t =
126-
to_event () ~emit_ev:(fun ev ->
131+
let to_chan_jsonl ?(autoflush = true) (oc : out_channel) : t =
132+
let oc = Lock_.create oc in
133+
to_event ()
134+
~flush:(fun () -> Lock_.with_ oc flush)
135+
~emit_ev:(fun ev ->
127136
let json = Log_event.to_yojson ev in
128137

129138
try
139+
let@ oc = Lock_.with_ oc in
130140
((* use a local buffer *)
131141
let@ buf = Apool.with_resource buf_pool in
132142
Buffer.clear buf;
@@ -135,16 +145,12 @@ module Output = struct
135145
Buffer.output_buffer oc buf);
136146

137147
output_char oc '\n';
138-
flush oc
148+
if autoflush then flush oc
139149
with exn ->
140150
Printf.eprintf "log to json chan: failed with %s\n%!"
141151
(Printexc.to_string exn))
142152
end
143153

144-
type task =
145-
| T_fence of { wakeup: unit Moonpool.Fut.promise }
146-
| T_emit of Log_event.t
147-
148154
type capture_meta_hook = unit -> (string * Log_meta.t) list
149155

150156
open struct
@@ -183,19 +189,23 @@ end
183189
let add_rich_tag = add_rich_tag
184190

185191
type t = {
186-
q: task Sync_queue.t;
187-
events: Log_event.t Observer.t;
192+
active: bool Atomic.t;
193+
events: Log_event.t Observer.t; (** Additional subscriptions on events *)
188194
outputs: Output.t list Atomic.t;
189-
reporter: Logs.reporter;
190-
mutable bg_thread: Thread.t option;
195+
emit_ev: Log_event.t -> unit;
196+
emit_fence: unit -> unit;
197+
reporter: Logs.reporter; (** As reporter *)
191198
}
192199

193200
let[@inline] as_reporter self = self.reporter
194201
let[@inline] events self = self.events
202+
let[@inline] shutdown self = Atomic.set self.active false
195203

196-
let shutdown self =
197-
Sync_queue.close self.q;
198-
Option.iter Thread.join self.bg_thread
204+
let trace_level_of_level : level -> Trace.Level.t = function
205+
| Info | App -> Trace.Level.Info
206+
| Error -> Trace.Level.Error
207+
| Warning -> Trace.Level.Warning
208+
| Debug -> Trace.Level.Debug3
199209

200210
let add_output self out : unit =
201211
while
@@ -267,7 +277,8 @@ let to_event_if_ (p : level -> bool) ~emit_ev : Logs.reporter =
267277
at least the TEF collector needs to know on which thread we are running. *)
268278
if Trace_core.enabled () then (
269279
let msg = Ansi_clean.remove_escape_codes msg in
270-
Trace_core.message msg ~data:(fun () ->
280+
Trace_core.message ~level:(trace_level_of_level level) msg
281+
~data:(fun () ->
271282
let meta =
272283
List.map (fun (k, v) -> k, Log_meta.to_trace_data v) meta
273284
in
@@ -290,6 +301,7 @@ let to_event_if_ (p : level -> bool) ~emit_ev : Logs.reporter =
290301
in
291302
{ Logs.report }
292303

304+
(*
293305
let bg_thread_loop_ (self : t) : unit =
294306
Trace_core.set_thread_name "logger.bg";
295307
ignore
@@ -304,7 +316,7 @@ let bg_thread_loop_ (self : t) : unit =
304316
Sys.sigusr2;
305317
Sys.sigvtalrm;
306318
]
307-
: _ list);
319+
: _ list);
308320
309321
let local_q = Queue.create () in
310322
try
@@ -323,55 +335,71 @@ let bg_thread_loop_ (self : t) : unit =
323335
Queue.clear local_q
324336
done
325337
with Sync_queue.Closed -> ()
338+
*)
326339

327-
let fence_ : (unit -> unit Moonpool.Fut.t) ref =
328-
ref (fun () -> Moonpool.Fut.return ())
329-
330-
let[@inline] emit_ev (self : t) ev : unit =
331-
try Sync_queue.push self.q (T_emit ev) with Sync_queue.Closed -> ()
340+
let fence_ : (unit -> unit) ref = ref ignore
341+
let set_as_fence_ (self : t) : unit = fence_ := self.emit_fence
332342

333343
let to_outputs (outs : Output.t list) : t =
344+
let active = Atomic.make true in
334345
let outputs = Atomic.make outs in
335346
let events = Observer.create () in
336-
let q = Sync_queue.create () in
347+
let emit_ev ev =
348+
(try Observer.emit events ev
349+
with exn ->
350+
Printf.eprintf "logger observer failed with %s\n%!"
351+
(Printexc.to_string exn));
352+
match Atomic.get outputs with
353+
| [] -> ()
354+
| outs ->
355+
List.iter
356+
(fun (out : Output.t) ->
357+
try out.emit ev
358+
with exn ->
359+
Printf.eprintf "logger output failed with %s\n%!"
360+
(Printexc.to_string exn))
361+
outs
362+
in
337363
let reporter =
338364
to_event_if_
339365
(fun _ ->
340366
(* emit event only if we have some outputs or event subscribers *)
341-
Observer.has_subscribers events
342-
|| not (CCList.is_empty (Atomic.get outputs)))
343-
~emit_ev:(fun ev ->
344-
try Sync_queue.push q (T_emit ev) with Sync_queue.Closed -> ())
367+
Atomic.get active
368+
&& (Observer.has_subscribers events
369+
|| not (CCList.is_empty (Atomic.get outputs))))
370+
~emit_ev
345371
in
346372

347-
let fence () =
348-
let fut, prom = Moonpool.Fut.make () in
349-
(try Sync_queue.push q (T_fence { wakeup = prom })
350-
with Sync_queue.Closed -> Moonpool.Fut.fulfill prom @@ Ok ());
351-
fut
373+
let emit_fence () =
374+
let outs = Atomic.get outputs in
375+
List.iter (fun (out : Output.t) -> out.flush ()) outs
352376
in
353377

354-
fence_ := fence;
355-
let self = { q; outputs; reporter; events; bg_thread = None } in
356-
self.bg_thread <- Some (Thread.create bg_thread_loop_ self);
378+
let self = { active; emit_ev; emit_fence; outputs; reporter; events } in
357379
self
358380

381+
let[@inline] emit_ev (self : t) ev : unit = self.emit_ev ev
359382
let null () : t = to_outputs []
360383

384+
let setup_ (self : t) : unit =
385+
Logs.set_reporter self.reporter;
386+
set_as_fence_ self;
387+
()
388+
361389
let with_no_logger () f =
362390
let old = Logs.reporter () in
363391
Logs.set_reporter Logs.nop_reporter;
364392
Fun.protect ~finally:(fun () -> Logs.set_reporter old) f
365393

366394
let setup_logger_to_stdout () =
367395
let outs = [ Output.stdout () ] in
368-
let reporter = to_outputs outs in
369-
Logs.set_reporter (as_reporter reporter)
396+
let logger = to_outputs outs in
397+
setup_ logger
370398

371399
let setup_logger_to_stderr () =
372400
let outs = [ Output.stderr () ] in
373-
let reporter = to_outputs outs in
374-
Logs.set_reporter (as_reporter reporter)
401+
let logger = to_outputs outs in
402+
setup_ logger
375403

376404
(** Setup a logger that emits into the file specified in ["LOG_FILE"] env,
377405
or no logger otherwise. *)
@@ -380,7 +408,8 @@ let setup_logger_to_LOG_FILE ?filename () k =
380408
| Some file, _ | None, Some file ->
381409
let@ oc = CCIO.with_out file in
382410
let outs = [ Output.to_chan oc ] in
383-
Logs.set_reporter (to_outputs outs |> as_reporter);
411+
let logger = to_outputs outs in
412+
setup_ logger;
384413
k ()
385414
| _ -> k ()
386415

@@ -390,9 +419,7 @@ module type LOG = sig
390419
val src : Logs.src
391420
end
392421

393-
let fence () =
394-
let fut = !fence_ () in
395-
Moonpool.Fut.wait_block_exn fut
422+
let[@inline] fence () = !fence_ ()
396423

397424
let mk_log_str s : (module LOG) =
398425
let src = Logs.Src.create s in

src/log/logger.mli

+14-6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ module Log_event = Log_event
88
99
We can have multiple log outputs, they each get a {!Log_event.t}.
1010
See {!Reporter.to_outputs} to see how to use these.
11+
12+
A log output must be thread-safe, as it will be called from
13+
multiple threads.
1114
*)
1215
module Output : sig
1316
type t
@@ -16,15 +19,20 @@ module Output : sig
1619

1720
val stdout : unit -> t
1821
val stderr : unit -> t
19-
val to_event : emit_ev:(Log_event.t -> unit) -> unit -> t
22+
23+
val to_event :
24+
emit_ev:(Log_event.t -> unit) -> flush:(unit -> unit) -> unit -> t
25+
2026
val filter_level : (level -> bool) -> t -> t
21-
val to_str : emit_str:(string -> unit) -> unit -> t
27+
val to_str : emit_str:(string -> unit) -> flush:(unit -> unit) -> unit -> t
2228

23-
val to_chan : out_channel -> t
24-
(** Write into the channel, as text. *)
29+
val to_chan : ?autoflush:bool -> out_channel -> t
30+
(** Write into the channel, as text.
31+
@param autoflush if true (default) channel is flushed after each event *)
2532

26-
val to_chan_jsonl : out_channel -> t
27-
(** Write into the channel as jsonl. *)
33+
val to_chan_jsonl : ?autoflush:bool -> out_channel -> t
34+
(** Write into the channel as jsonl.
35+
@param autoflush if true (default) channel is flushed after each event *)
2836

2937
val buf_pool : Buffer.t Apool.t
3038
(** Buffer pool for loggers. Please hold onto buffers for only

0 commit comments

Comments
 (0)