summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephane Glondu <steph@glondu.net>2023-10-09 04:21:43 +0200
committerStephane Glondu <steph@glondu.net>2023-10-09 04:21:43 +0200
commitb2055d99b4f8d996b6749e51a24930f320d55e6c (patch)
treeae90dc8cb8bdab0f43a895007784e9728ac54423
parentd27dd61d9cb4a3aafc4e0fdbf062dc9de6522364 (diff)
parent61ca6a10ed30ad7d11fc9dfc060bfaf1639b2dd0 (diff)
Update upstream source from tag 'upstream/0.9.3'
Update to upstream version '0.9.3' with Debian dir 80a10a6861c4dcdafacd3caf67c4c40ffaaa356e
-rw-r--r--.github/workflows/ci.yml27
-rw-r--r--.gitignore1
-rw-r--r--CHANGES6
-rw-r--r--README.md14
-rw-r--r--dune-project5
-rw-r--r--duppy.opam5
-rw-r--r--examples/dune7
-rw-r--r--examples/http.ml29
-rw-r--r--examples/http_domain.ml302
-rw-r--r--examples/telnet.ml31
-rw-r--r--src/dune2
-rw-r--r--src/duppy.ml60
-rw-r--r--src/duppy.mli15
13 files changed, 420 insertions, 84 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5f6a8f0..3a8e5ea 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -8,17 +8,18 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
+ ocaml-compiler:
+ - 4.08.x
steps:
- - uses: actions/checkout@v1
- - name: Setup OCaml
- uses: avsm/setup-ocaml@master
- - name: Install depext module
- run: opam install -y depext
- - name: Pin locally
- run: opam pin -y add --no-action .
- - name: Install locally
- run: opam depext -y -i duppy
- - name: Build locally
- run: eval $(opam env) && dune build
- - name: Run tests locally
- run: eval $(opam env) && dune runtest
+ - name: Checkout code
+ uses: actions/checkout@v2
+ - name: Use OCaml ${{ matrix.ocaml-compiler }}
+ uses: ocaml/setup-ocaml@v2
+ with:
+ ocaml-compiler: ${{ matrix.ocaml-compiler }}
+ - name: Install locally
+ run: opam install . --deps-only --with-test
+ - name: Build locally
+ run: opam exec -- dune build
+ - name: Run tests locally
+ run: opam exec -- dune runtest
diff --git a/.gitignore b/.gitignore
index 6fadc6f..04d31b9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ _build
_tests
.merlin
*.install
+.*.sw*
diff --git a/CHANGES b/CHANGES
index dbfbb62..30ce288 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,9 @@
+0.9.3 (2023-07-06)
+======
+* Make sure sure `ready_m` is release last to prevent any exception raised
+ after it unlocked. Refs: savonet/liquidsoap#2585
+* Added optional `on_error` to catch queue errors.
+
0.9.2 (07-10-2021)
=====
* Fix deadlock issue at shutdown.
diff --git a/README.md b/README.md
index d621f21..c42f4ed 100644
--- a/README.md
+++ b/README.md
@@ -6,16 +6,10 @@ Please read the COPYING file before using this software.
## Prerequisites:
-- ocaml >= 4.03.0
-
-- findlib >= 1.8.0
-
-- ocaml-pcre >= 7.3.4
-
-- dune >= 2.0
-
-The code may work with earlier versions but these are the one currently
-supported.
+- ocaml
+- findlib
+- ocaml-pcre
+- dune
## Compilation:
diff --git a/dune-project b/dune-project
index 6c50961..5866385 100644
--- a/dune-project
+++ b/dune-project
@@ -1,8 +1,8 @@
(lang dune 2.7)
-(version 0.9.2)
+(version 0.9.3)
(name duppy)
(source (github savonet/ocaml-duppy))
-(license GPL-2.0)
+(license LGPL-2.1-only)
(authors "Romain Beauxis <toots@rastageeks.org>")
(maintainers "The Savonet Team <savonet-users@lists.sourceforge.net>")
@@ -12,6 +12,7 @@
(name duppy)
(synopsis "Library providing monadic threads")
(depends
+ (ocaml (>= 4.07.0))
dune
pcre)
)
diff --git a/duppy.opam b/duppy.opam
index 0095389..bdff20a 100644
--- a/duppy.opam
+++ b/duppy.opam
@@ -1,13 +1,14 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
-version: "0.9.2"
+version: "0.9.3"
synopsis: "Library providing monadic threads"
maintainer: ["The Savonet Team <savonet-users@lists.sourceforge.net>"]
authors: ["Romain Beauxis <toots@rastageeks.org>"]
-license: "GPL-2.0"
+license: "LGPL-2.1-only"
homepage: "https://github.com/savonet/ocaml-duppy"
bug-reports: "https://github.com/savonet/ocaml-duppy/issues"
depends: [
+ "ocaml" {>= "4.07.0"}
"dune" {>= "2.7"}
"pcre"
"odoc" {with-doc}
diff --git a/examples/dune b/examples/dune
index 887559f..99609cf 100644
--- a/examples/dune
+++ b/examples/dune
@@ -4,6 +4,13 @@
(libraries duppy))
(executable
+ (name http_domain)
+ (modules http_domain)
+ (enabled_if
+ (<= 5.0.0 %{ocaml_version}))
+ (libraries duppy))
+
+(executable
(name telnet)
(modules telnet)
(libraries duppy))
diff --git a/examples/http.ml b/examples/http.ml
index d6e65ce..a96903a 100644
--- a/examples/http.ml
+++ b/examples/http.ml
@@ -10,7 +10,7 @@ let () =
incr pnum;
if !pnum > 1 then (
Printf.eprintf "Error: too many arguments\n";
- exit 1 )
+ exit 1)
else files_path := s
in
Arg.parse
@@ -30,7 +30,7 @@ let () =
arg usage;
if !files_path = "" then (
Printf.printf "%s\n" usage;
- exit 1 )
+ exit 1)
else ()
type priority = Maybe_blocking | Non_blocking
@@ -186,7 +186,7 @@ let index_uri path index protocol uri =
let index = Printf.sprintf "%s/%s" uri index in
if Sys.file_exists (Printf.sprintf "%s/%s" path index) then
Duppy.Monad.return index
- else Duppy.Monad.return uri )
+ else Duppy.Monad.return uri)
else Duppy.Monad.return uri
with _ -> Duppy.Monad.return uri
@@ -226,7 +226,7 @@ let file_request path _ request =
reply_headers = headers;
reply_data = File fd;
}
- with _ -> Duppy.Monad.raise (error_403 request.request_protocol) )
+ with _ -> Duppy.Monad.raise (error_403 request.request_protocol))
else Duppy.Monad.raise (error_404 request.request_protocol))
let file_handler = ((fun _ -> Duppy.Monad.return true), file_request !files_path)
@@ -298,7 +298,7 @@ let cgi_handler process path h request =
if Array.length ret > 0 then
Duppy.Monad.return
(Printf.sprintf "%s; extract AUTH_TYPE=%s" env ret.(1))
- else Duppy.Monad.raise error_500 )
+ else Duppy.Monad.raise error_500)
else Duppy.Monad.return env
in
Duppy.Monad.bind __pa_duppy_0 (fun env ->
@@ -358,7 +358,7 @@ let cgi_handler process path h request =
List.filter
(fun (x, _) -> x <> "Status")
headers )
- with _ -> Duppy.Monad.raise error_500 )
+ with _ -> Duppy.Monad.raise error_500)
else Duppy.Monad.return ((200, "OK"), headers)
in
Duppy.Monad.bind __pa_duppy_0
@@ -516,12 +516,11 @@ let () =
ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe]);
Unix.setsockopt sock Unix.SO_REUSEADDR true;
let rec incoming _ =
- ( try
- let s, _ = Unix.accept sock in
- handle_client s
- with e ->
- Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)
- );
+ (try
+ let s, _ = Unix.accept sock in
+ handle_client s
+ with e ->
+ Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e));
[
{
Duppy.Task.priority = Non_blocking;
@@ -530,9 +529,9 @@ let () =
};
]
in
- ( try Unix.bind sock bind_addr
- with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
- failwith (Printf.sprintf "port %d already taken" !port) );
+ (try Unix.bind sock bind_addr
+ with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
+ failwith (Printf.sprintf "port %d already taken" !port));
Unix.listen sock max_conn;
Duppy.Task.add scheduler
{
diff --git a/examples/http_domain.ml b/examples/http_domain.ml
new file mode 100644
index 0000000..603bcc9
--- /dev/null
+++ b/examples/http_domain.ml
@@ -0,0 +1,302 @@
+let queue_mode = ref `Thread
+let queues = ref 3
+let port = ref 8080
+let usage = "usage: http_domain [options]"
+let ( let* ) = Duppy.Monad.bind
+
+let () =
+ let () =
+ match Domain.recommended_domain_count () with
+ | 1 -> queue_mode := `Thread
+ | n ->
+ queues := n - 1;
+ queue_mode := `Domain
+ in
+ let arg _ =
+ Printf.eprintf "Error: too many arguments\n";
+ exit 1
+ in
+ Arg.parse
+ [
+ ( "--queues",
+ Arg.Int (fun i -> queues := i),
+ Printf.sprintf "Number of non-blocking queues. (default: %d)" !queues );
+ ( "--mode",
+ Arg.String
+ (fun m ->
+ match m with
+ | "thread" -> queue_mode := `Thread
+ | "domain" -> queue_mode := `Thread
+ | v -> failwith ("Invalid queue mode: " ^ v)),
+ Printf.sprintf "Queue mode. (default: %s)"
+ (match !queue_mode with `Thread -> "thread" | `Domain -> "domain") );
+ ( "--port",
+ Arg.Int (fun i -> port := i),
+ Printf.sprintf "Port used to bind the server. (default: %d)" !port );
+ ]
+ arg usage
+
+type priority = Non_blocking
+
+let scheduler = Duppy.create ()
+
+type http_method = Post | Get
+type http_protocol = Http_11 | Http_10
+
+let string_of_protocol = function
+ | Http_11 -> "HTTP/1.1"
+ | Http_10 -> "HTTP/1.0"
+
+let protocol_of_string = function
+ | "HTTP/1.1" -> Http_11
+ | "HTTP/1.0" -> Http_10
+ | _ -> assert false
+
+let string_of_method = function Post -> "POST" | Get -> "GET"
+
+let method_of_string = function
+ | "POST" -> Post
+ | "GET" -> Get
+ | _ -> assert false
+
+type data = None | String of string
+
+type request = {
+ request_protocol : http_protocol;
+ request_method : http_method;
+ request_uri : string;
+ request_headers : (string * string) list;
+ request_data : data;
+}
+
+type reply = {
+ reply_protocol : http_protocol;
+ reply_status : int * string;
+ reply_headers : (string * string) list;
+ reply_data : data;
+}
+
+exception Assoc of string
+
+let assoc_uppercase x y =
+ try
+ List.iter
+ (fun (l, v) ->
+ if String.uppercase_ascii l = x then raise (Assoc v) else ())
+ y;
+ raise Not_found
+ with Assoc s -> s
+
+let server = "dhttpd"
+
+let html_template =
+ Printf.sprintf
+ "<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.1//EN\" \
+ \"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd\">\r\n\
+ <html xmlns=\"http://www.w3.org/1999/xhtml\" xml:lang=\"en\">\r\n\
+ %s</html>"
+
+let server_error status protocol =
+ let _, explanation = status in
+ let data =
+ String
+ (html_template
+ (Printf.sprintf "<head><title>%s</title></head>\r\n<body>%s !</body>"
+ explanation explanation))
+ in
+ {
+ reply_protocol = protocol;
+ reply_status = status;
+ reply_headers =
+ [("Content-Type", "text/html; charset=UTF-8"); ("Server", server)];
+ reply_data = data;
+ }
+
+let error_404 = server_error (404, "File Not Found")
+let error_500 = server_error (500, "Bad Request") Http_10
+let error_403 = server_error (403, "Forbidden")
+
+let http_302 protocol uri =
+ {
+ reply_protocol = protocol;
+ reply_status = (302, "Found");
+ reply_headers = [("Location", uri)];
+ reply_data = String "";
+ }
+
+let send_reply h reply =
+ let write s =
+ Duppy.Monad.Io.write ?timeout:None ~priority:Non_blocking h
+ (Bytes.unsafe_of_string s)
+ in
+ let code, status = reply.reply_status in
+ let http_header =
+ Printf.sprintf "%s %d %s\r\n%s\r\n\r\n"
+ (string_of_protocol reply.reply_protocol)
+ code status
+ (String.concat "\r\n"
+ (List.map
+ (fun (x, y) -> Printf.sprintf "%s: %s" x y)
+ reply.reply_headers))
+ in
+ let* () = write http_header in
+ match reply.reply_data with
+ | String s -> write s
+ | None -> Duppy.Monad.return ()
+
+let parse_headers headers =
+ let split_header l h =
+ try
+ let rex = Pcre.regexp "([^:\\r\\n]+):\\s*([^\\r\\n]+)" in
+ let sub = Pcre.exec ~rex h in
+ Duppy.Monad.return
+ ((Pcre.get_substring sub 1, Pcre.get_substring sub 2) :: l)
+ with Not_found -> Duppy.Monad.raise error_500
+ in
+ Duppy.Monad.fold_left split_header [] headers
+
+let payload = String.init 4096 (fun i -> Char.chr (i mod 100))
+
+let handle_request request =
+ if request.request_uri = "/" then (
+ let headers =
+ [
+ ("Server", server);
+ ("Content-Length", string_of_int (String.length payload));
+ ("Content-Type", "application/octet-stream");
+ ]
+ in
+ Duppy.Monad.raise
+ {
+ reply_protocol = request.request_protocol;
+ reply_status = (200, "OK");
+ reply_headers = headers;
+ reply_data = String payload;
+ })
+ else Duppy.Monad.return (error_404 request.request_protocol)
+
+let parse_request h r =
+ try
+ let headers = Pcre.split ~pat:"\r\n" r in
+ let* request, headers =
+ match headers with
+ | e :: l ->
+ let* headers = parse_headers l in
+ Duppy.Monad.return (e, headers)
+ | _ -> Duppy.Monad.raise error_500
+ in
+ let rex = Pcre.regexp "([\\w]+)\\s([^\\s]+)\\s(HTTP/1.[01])" in
+ let* http_method, uri, protocol =
+ try
+ let sub = Pcre.exec ~rex request in
+ let http_method, uri, protocol =
+ ( Pcre.get_substring sub 1,
+ Pcre.get_substring sub 2,
+ Pcre.get_substring sub 3 )
+ in
+ Duppy.Monad.return
+ (method_of_string http_method, uri, protocol_of_string protocol)
+ with _ -> Duppy.Monad.raise error_500
+ in
+ let* data =
+ match http_method with
+ | Get -> Duppy.Monad.return None
+ | Post -> (
+ let* len =
+ try
+ let length = assoc_uppercase "CONTENT-LENGTH" headers in
+ Duppy.Monad.return (int_of_string length)
+ with
+ | Not_found -> Duppy.Monad.return 0
+ | _ -> Duppy.Monad.raise error_500
+ in
+ match len with
+ | 0 -> Duppy.Monad.return None
+ | d ->
+ let* data =
+ Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking
+ ~marker:(Duppy.Io.Length d) h
+ in
+ Duppy.Monad.return (String data))
+ in
+ Duppy.Monad.return
+ {
+ request_method = http_method;
+ request_protocol = protocol;
+ request_uri = uri;
+ request_headers = headers;
+ request_data = data;
+ }
+ with _ -> Duppy.Monad.raise error_500
+
+let handle_client socket =
+ (* Read and process lines *)
+ let on_error _ = error_500 in
+ let h = { Duppy.Monad.Io.scheduler; socket; data = ""; on_error } in
+ let exec =
+ let* reply =
+ Duppy.Monad.catch
+ (let* data =
+ Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking
+ ~marker:(Duppy.Io.Split "\r\n\r\n") h
+ in
+ let* request = parse_request h data in
+ handle_request request)
+ (fun reply -> Duppy.Monad.return reply)
+ in
+ send_reply h reply
+ in
+ let finish _ = try Unix.close socket with _ -> () in
+ Duppy.Monad.run ~return:finish ~raise:finish exec
+
+let new_queue ~priority ~name () =
+ let priorities p = p = priority in
+ let queue () = Duppy.queue scheduler ~log:(fun _ -> ()) ~priorities name in
+ match !queue_mode with
+ | `Thread -> `Thread (Thread.create queue ())
+ | `Domain -> `Domain (Domain.spawn queue)
+
+let bind_addr_inet = Unix.inet_addr_of_string "0.0.0.0"
+let bind_addr = Unix.ADDR_INET (bind_addr_inet, !port)
+let max_conn = 100
+let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0
+
+let () =
+ (* See http://caml.inria.fr/mantis/print_bug_page.php?bug_id=4640
+ * for this: we want Unix EPIPE error and not SIGPIPE, which
+ * crashes the program.. *)
+ Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
+ ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe]);
+ Unix.setsockopt sock Unix.SO_REUSEADDR true;
+ let rec incoming _ =
+ (try
+ let s, _ = Unix.accept sock in
+ handle_client s
+ with e ->
+ Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e));
+ [
+ {
+ Duppy.Task.priority = Non_blocking;
+ events = [`Read sock];
+ handler = incoming;
+ };
+ ]
+ in
+ (try Unix.bind sock bind_addr
+ with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
+ failwith (Printf.sprintf "port %d already taken" !port));
+ Unix.listen sock max_conn;
+ Duppy.Task.add scheduler
+ {
+ Duppy.Task.priority = Non_blocking;
+ events = [`Read sock];
+ handler = incoming;
+ };
+ for i = 1 to !queues do
+ Printf.printf "Initiating queue %d\n%!" i;
+ ignore
+ (new_queue ~priority:Non_blocking
+ ~name:(Printf.sprintf "Non blocking queue #%d" i)
+ ())
+ done;
+ Duppy.queue scheduler ~log:(fun _ -> ()) "root"
diff --git a/examples/telnet.ml b/examples/telnet.ml
index 1511e33..be808ed 100644
--- a/examples/telnet.ml
+++ b/examples/telnet.ml
@@ -117,19 +117,18 @@ let sock = socket PF_INET SOCK_STREAM 0
let () =
setsockopt sock SO_REUSEADDR true;
let rec incoming _ =
- ( try
- let s, caller = accept sock in
- let ip =
- let a =
- match caller with ADDR_INET (a, _) -> a | _ -> assert false
- in
- try (gethostbyaddr a).h_name with Not_found -> string_of_inet_addr a
- in
- Printf.printf "New client: %s\n" ip;
- handle_client s
- with e ->
- Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)
- );
+ (try
+ let s, caller = accept sock in
+ let ip =
+ let a =
+ match caller with ADDR_INET (a, _) -> a | _ -> assert false
+ in
+ try (gethostbyaddr a).h_name with Not_found -> string_of_inet_addr a
+ in
+ Printf.printf "New client: %s\n" ip;
+ handle_client s
+ with e ->
+ Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e));
[
{
Duppy.Task.priority = io_priority;
@@ -138,9 +137,9 @@ let () =
};
]
in
- ( try bind sock bind_addr
- with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
- failwith (Printf.sprintf "port %d already taken" port) );
+ (try bind sock bind_addr
+ with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
+ failwith (Printf.sprintf "port %d already taken" port));
listen sock max_conn;
Duppy.Task.add scheduler
{
diff --git a/src/dune b/src/dune
index 0f40400..1f5caa0 100644
--- a/src/dune
+++ b/src/dune
@@ -1,7 +1,7 @@
(library
(name duppy)
(public_name duppy)
- (libraries unix threads pcre bigarray)
+ (libraries unix threads pcre)
(foreign_stubs
(language c)
(names duppy_stubs))
diff --git a/src/duppy.ml b/src/duppy.ml
index cb0d6e6..dc82e26 100644
--- a/src/duppy.ml
+++ b/src/duppy.ml
@@ -64,6 +64,7 @@ type 'a t = {
}
type 'a scheduler = {
+ on_error : exn -> Printexc.raw_backtrace -> unit;
out_pipe : Unix.file_descr;
in_pipe : Unix.file_descr;
compare : 'a -> 'a -> int;
@@ -84,9 +85,10 @@ let clear_tasks s =
s.tasks <- [];
Mutex.unlock s.tasks_m
-let create ?(compare = compare) () =
+let create ?(on_error = Printexc.raise_with_backtrace) ?(compare = compare) () =
let out_pipe, in_pipe = Unix.pipe () in
{
+ on_error;
out_pipe;
in_pipe;
compare;
@@ -126,7 +128,8 @@ module Task = struct
enrich =
(fun e ->
List.fold_left
- (fun e -> function `Delay s -> { e with t = min e.t (t0 +. s) }
+ (fun e -> function
+ | `Delay s -> { e with t = min e.t (t0 +. s) }
| `Read s -> { e with r = s :: e.r }
| `Write s -> { e with w = s :: e.w }
| `Exception s -> { e with x = s :: e.x })
@@ -268,13 +271,21 @@ let exec s (priorities : 'a -> bool) =
* win32 because a thread can double-lock
* the same mutex.. *)
if Sys.os_type <> "Win32" then assert (not (Mutex.try_lock s.ready_m));
- try
- let (_, task), remaining = remove (fun (p, _) -> priorities p) s.ready in
- s.ready <- remaining;
- Mutex.unlock s.ready_m;
- add_t s (task ());
- true
- with Not_found -> false
+ match remove (fun (p, _) -> priorities p) s.ready with
+ | (_, task), remaining ->
+ s.ready <- remaining;
+ Mutex.unlock s.ready_m;
+ let tasks =
+ match task () with
+ | exception exn ->
+ let bt = Printexc.get_raw_backtrace () in
+ s.on_error exn bt;
+ []
+ | v -> v
+ in
+ add_t s tasks;
+ true
+ | exception Not_found -> false
exception Queue_stopped
exception Queue_processed
@@ -325,7 +336,7 @@ let queue ?log ?(priorities = fun _ -> true) s name =
Mutex.unlock s.ready_m;
process s log;
Mutex.unlock s.select_m;
- wake ();
+ wake ()
end
else begin
(* We use s.ready_m mutex here.
@@ -354,11 +365,12 @@ let queue ?log ?(priorities = fun _ -> true) s name =
Condition.signal s.queue_stopped_c;
Mutex.unlock s.queues_m
in
- ( try f () with
+ (try f () with
| Queue_stopped -> ()
| exn ->
- on_done ();
- raise exn );
+ let bt = Printexc.get_raw_backtrace () in
+ (try on_done () with _ -> ());
+ Printexc.raise_with_backtrace exn bt);
on_done ()
module Async = struct
@@ -404,8 +416,8 @@ module Async = struct
try
begin
match t.fd with
- | Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1)
- | None -> raise Stopped
+ | Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1)
+ | None -> raise Stopped
end;
Mutex.unlock t.m
with e ->
@@ -417,10 +429,10 @@ module Async = struct
try
begin
match t.fd with
- | Some c ->
- t.stop := true;
- ignore (Unix.write c (Bytes.of_string " ") 0 1)
- | None -> raise Stopped
+ | Some c ->
+ t.stop := true;
+ ignore (Unix.write c (Bytes.of_string " ") 0 1)
+ | None -> raise Stopped
end;
t.fd <- None;
Mutex.unlock t.m
@@ -593,7 +605,7 @@ struct
[{ priority; events; handler = f }]
| _ ->
exec x;
- [] )
+ [])
| None -> [{ priority; events; handler = f }]
in
(* Catch all exceptions.. *)
@@ -658,12 +670,12 @@ struct
let n = write pos len in
if n <= 0 then (
on_error Io_error;
- [] )
+ [])
else if n < len then
[{ priority; events = [`Write unix_socket]; handler = f (pos + n) }]
else (
exec ();
- [] )
+ [])
with
| Unix.Unix_error (Unix.EWOULDBLOCK, _, _) when Sys.os_type = "Win32" ->
[{ priority; events = [`Write unix_socket]; handler = f pos }]
@@ -818,12 +830,12 @@ module Monad = struct
if not m.locked then (
(* I don't think shuffling tasks
* matters here.. *)
- match m.tasks with
+ match m.tasks with
| x :: l ->
m.tasks <- l;
m.locked <- true;
task x :: tasks
- | _ -> tasks )
+ | _ -> tasks)
else tasks
let rec handler _ =
diff --git a/src/duppy.mli b/src/duppy.mli
index be0a4d6..480e050 100644
--- a/src/duppy.mli
+++ b/src/duppy.mli
@@ -61,7 +61,20 @@ type 'a scheduler
(** Initiate a new scheduler
* @param compare the comparison function used to sort tasks according to priorities.
* Works as in [List.sort] *)
-val create : ?compare:('a -> 'a -> int) -> unit -> 'a scheduler
+val create :
+ ?on_error:(exn -> Printexc.raw_backtrace -> unit) ->
+ ?compare:('a -> 'a -> int) ->
+ unit ->
+ 'a scheduler
+
+(** Internal polling function. Uses `Unix.select` on windows and
+ `poll` otherwise. *)
+val poll :
+ Unix.file_descr list ->
+ Unix.file_descr list ->
+ Unix.file_descr list ->
+ float ->
+ Unix.file_descr list * Unix.file_descr list * Unix.file_descr list
(** [queue ~log ~priorities s name]
* starts a queue, on the scheduler [s] only processing priorities [p]