diff options
author | Stephane Glondu <steph@glondu.net> | 2023-10-09 04:21:43 +0200 |
---|---|---|
committer | Stephane Glondu <steph@glondu.net> | 2023-10-09 04:21:43 +0200 |
commit | b2055d99b4f8d996b6749e51a24930f320d55e6c (patch) | |
tree | ae90dc8cb8bdab0f43a895007784e9728ac54423 | |
parent | d27dd61d9cb4a3aafc4e0fdbf062dc9de6522364 (diff) | |
parent | 61ca6a10ed30ad7d11fc9dfc060bfaf1639b2dd0 (diff) |
Update upstream source from tag 'upstream/0.9.3'
Update to upstream version '0.9.3'
with Debian dir 80a10a6861c4dcdafacd3caf67c4c40ffaaa356e
-rw-r--r-- | .github/workflows/ci.yml | 27 | ||||
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | CHANGES | 6 | ||||
-rw-r--r-- | README.md | 14 | ||||
-rw-r--r-- | dune-project | 5 | ||||
-rw-r--r-- | duppy.opam | 5 | ||||
-rw-r--r-- | examples/dune | 7 | ||||
-rw-r--r-- | examples/http.ml | 29 | ||||
-rw-r--r-- | examples/http_domain.ml | 302 | ||||
-rw-r--r-- | examples/telnet.ml | 31 | ||||
-rw-r--r-- | src/dune | 2 | ||||
-rw-r--r-- | src/duppy.ml | 60 | ||||
-rw-r--r-- | src/duppy.mli | 15 |
13 files changed, 420 insertions, 84 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5f6a8f0..3a8e5ea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,17 +8,18 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] + ocaml-compiler: + - 4.08.x steps: - - uses: actions/checkout@v1 - - name: Setup OCaml - uses: avsm/setup-ocaml@master - - name: Install depext module - run: opam install -y depext - - name: Pin locally - run: opam pin -y add --no-action . - - name: Install locally - run: opam depext -y -i duppy - - name: Build locally - run: eval $(opam env) && dune build - - name: Run tests locally - run: eval $(opam env) && dune runtest + - name: Checkout code + uses: actions/checkout@v2 + - name: Use OCaml ${{ matrix.ocaml-compiler }} + uses: ocaml/setup-ocaml@v2 + with: + ocaml-compiler: ${{ matrix.ocaml-compiler }} + - name: Install locally + run: opam install . --deps-only --with-test + - name: Build locally + run: opam exec -- dune build + - name: Run tests locally + run: opam exec -- dune runtest @@ -5,3 +5,4 @@ _build _tests .merlin *.install +.*.sw* @@ -1,3 +1,9 @@ +0.9.3 (2023-07-06) +====== +* Make sure sure `ready_m` is release last to prevent any exception raised + after it unlocked. Refs: savonet/liquidsoap#2585 +* Added optional `on_error` to catch queue errors. + 0.9.2 (07-10-2021) ===== * Fix deadlock issue at shutdown. @@ -6,16 +6,10 @@ Please read the COPYING file before using this software. ## Prerequisites: -- ocaml >= 4.03.0 - -- findlib >= 1.8.0 - -- ocaml-pcre >= 7.3.4 - -- dune >= 2.0 - -The code may work with earlier versions but these are the one currently -supported. +- ocaml +- findlib +- ocaml-pcre +- dune ## Compilation: diff --git a/dune-project b/dune-project index 6c50961..5866385 100644 --- a/dune-project +++ b/dune-project @@ -1,8 +1,8 @@ (lang dune 2.7) -(version 0.9.2) +(version 0.9.3) (name duppy) (source (github savonet/ocaml-duppy)) -(license GPL-2.0) +(license LGPL-2.1-only) (authors "Romain Beauxis <toots@rastageeks.org>") (maintainers "The Savonet Team <savonet-users@lists.sourceforge.net>") @@ -12,6 +12,7 @@ (name duppy) (synopsis "Library providing monadic threads") (depends + (ocaml (>= 4.07.0)) dune pcre) ) @@ -1,13 +1,14 @@ # This file is generated by dune, edit dune-project instead opam-version: "2.0" -version: "0.9.2" +version: "0.9.3" synopsis: "Library providing monadic threads" maintainer: ["The Savonet Team <savonet-users@lists.sourceforge.net>"] authors: ["Romain Beauxis <toots@rastageeks.org>"] -license: "GPL-2.0" +license: "LGPL-2.1-only" homepage: "https://github.com/savonet/ocaml-duppy" bug-reports: "https://github.com/savonet/ocaml-duppy/issues" depends: [ + "ocaml" {>= "4.07.0"} "dune" {>= "2.7"} "pcre" "odoc" {with-doc} diff --git a/examples/dune b/examples/dune index 887559f..99609cf 100644 --- a/examples/dune +++ b/examples/dune @@ -4,6 +4,13 @@ (libraries duppy)) (executable + (name http_domain) + (modules http_domain) + (enabled_if + (<= 5.0.0 %{ocaml_version})) + (libraries duppy)) + +(executable (name telnet) (modules telnet) (libraries duppy)) diff --git a/examples/http.ml b/examples/http.ml index d6e65ce..a96903a 100644 --- a/examples/http.ml +++ b/examples/http.ml @@ -10,7 +10,7 @@ let () = incr pnum; if !pnum > 1 then ( Printf.eprintf "Error: too many arguments\n"; - exit 1 ) + exit 1) else files_path := s in Arg.parse @@ -30,7 +30,7 @@ let () = arg usage; if !files_path = "" then ( Printf.printf "%s\n" usage; - exit 1 ) + exit 1) else () type priority = Maybe_blocking | Non_blocking @@ -186,7 +186,7 @@ let index_uri path index protocol uri = let index = Printf.sprintf "%s/%s" uri index in if Sys.file_exists (Printf.sprintf "%s/%s" path index) then Duppy.Monad.return index - else Duppy.Monad.return uri ) + else Duppy.Monad.return uri) else Duppy.Monad.return uri with _ -> Duppy.Monad.return uri @@ -226,7 +226,7 @@ let file_request path _ request = reply_headers = headers; reply_data = File fd; } - with _ -> Duppy.Monad.raise (error_403 request.request_protocol) ) + with _ -> Duppy.Monad.raise (error_403 request.request_protocol)) else Duppy.Monad.raise (error_404 request.request_protocol)) let file_handler = ((fun _ -> Duppy.Monad.return true), file_request !files_path) @@ -298,7 +298,7 @@ let cgi_handler process path h request = if Array.length ret > 0 then Duppy.Monad.return (Printf.sprintf "%s; extract AUTH_TYPE=%s" env ret.(1)) - else Duppy.Monad.raise error_500 ) + else Duppy.Monad.raise error_500) else Duppy.Monad.return env in Duppy.Monad.bind __pa_duppy_0 (fun env -> @@ -358,7 +358,7 @@ let cgi_handler process path h request = List.filter (fun (x, _) -> x <> "Status") headers ) - with _ -> Duppy.Monad.raise error_500 ) + with _ -> Duppy.Monad.raise error_500) else Duppy.Monad.return ((200, "OK"), headers) in Duppy.Monad.bind __pa_duppy_0 @@ -516,12 +516,11 @@ let () = ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe]); Unix.setsockopt sock Unix.SO_REUSEADDR true; let rec incoming _ = - ( try - let s, _ = Unix.accept sock in - handle_client s - with e -> - Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e) - ); + (try + let s, _ = Unix.accept sock in + handle_client s + with e -> + Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)); [ { Duppy.Task.priority = Non_blocking; @@ -530,9 +529,9 @@ let () = }; ] in - ( try Unix.bind sock bind_addr - with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> - failwith (Printf.sprintf "port %d already taken" !port) ); + (try Unix.bind sock bind_addr + with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> + failwith (Printf.sprintf "port %d already taken" !port)); Unix.listen sock max_conn; Duppy.Task.add scheduler { diff --git a/examples/http_domain.ml b/examples/http_domain.ml new file mode 100644 index 0000000..603bcc9 --- /dev/null +++ b/examples/http_domain.ml @@ -0,0 +1,302 @@ +let queue_mode = ref `Thread +let queues = ref 3 +let port = ref 8080 +let usage = "usage: http_domain [options]" +let ( let* ) = Duppy.Monad.bind + +let () = + let () = + match Domain.recommended_domain_count () with + | 1 -> queue_mode := `Thread + | n -> + queues := n - 1; + queue_mode := `Domain + in + let arg _ = + Printf.eprintf "Error: too many arguments\n"; + exit 1 + in + Arg.parse + [ + ( "--queues", + Arg.Int (fun i -> queues := i), + Printf.sprintf "Number of non-blocking queues. (default: %d)" !queues ); + ( "--mode", + Arg.String + (fun m -> + match m with + | "thread" -> queue_mode := `Thread + | "domain" -> queue_mode := `Thread + | v -> failwith ("Invalid queue mode: " ^ v)), + Printf.sprintf "Queue mode. (default: %s)" + (match !queue_mode with `Thread -> "thread" | `Domain -> "domain") ); + ( "--port", + Arg.Int (fun i -> port := i), + Printf.sprintf "Port used to bind the server. (default: %d)" !port ); + ] + arg usage + +type priority = Non_blocking + +let scheduler = Duppy.create () + +type http_method = Post | Get +type http_protocol = Http_11 | Http_10 + +let string_of_protocol = function + | Http_11 -> "HTTP/1.1" + | Http_10 -> "HTTP/1.0" + +let protocol_of_string = function + | "HTTP/1.1" -> Http_11 + | "HTTP/1.0" -> Http_10 + | _ -> assert false + +let string_of_method = function Post -> "POST" | Get -> "GET" + +let method_of_string = function + | "POST" -> Post + | "GET" -> Get + | _ -> assert false + +type data = None | String of string + +type request = { + request_protocol : http_protocol; + request_method : http_method; + request_uri : string; + request_headers : (string * string) list; + request_data : data; +} + +type reply = { + reply_protocol : http_protocol; + reply_status : int * string; + reply_headers : (string * string) list; + reply_data : data; +} + +exception Assoc of string + +let assoc_uppercase x y = + try + List.iter + (fun (l, v) -> + if String.uppercase_ascii l = x then raise (Assoc v) else ()) + y; + raise Not_found + with Assoc s -> s + +let server = "dhttpd" + +let html_template = + Printf.sprintf + "<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.1//EN\" \ + \"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd\">\r\n\ + <html xmlns=\"http://www.w3.org/1999/xhtml\" xml:lang=\"en\">\r\n\ + %s</html>" + +let server_error status protocol = + let _, explanation = status in + let data = + String + (html_template + (Printf.sprintf "<head><title>%s</title></head>\r\n<body>%s !</body>" + explanation explanation)) + in + { + reply_protocol = protocol; + reply_status = status; + reply_headers = + [("Content-Type", "text/html; charset=UTF-8"); ("Server", server)]; + reply_data = data; + } + +let error_404 = server_error (404, "File Not Found") +let error_500 = server_error (500, "Bad Request") Http_10 +let error_403 = server_error (403, "Forbidden") + +let http_302 protocol uri = + { + reply_protocol = protocol; + reply_status = (302, "Found"); + reply_headers = [("Location", uri)]; + reply_data = String ""; + } + +let send_reply h reply = + let write s = + Duppy.Monad.Io.write ?timeout:None ~priority:Non_blocking h + (Bytes.unsafe_of_string s) + in + let code, status = reply.reply_status in + let http_header = + Printf.sprintf "%s %d %s\r\n%s\r\n\r\n" + (string_of_protocol reply.reply_protocol) + code status + (String.concat "\r\n" + (List.map + (fun (x, y) -> Printf.sprintf "%s: %s" x y) + reply.reply_headers)) + in + let* () = write http_header in + match reply.reply_data with + | String s -> write s + | None -> Duppy.Monad.return () + +let parse_headers headers = + let split_header l h = + try + let rex = Pcre.regexp "([^:\\r\\n]+):\\s*([^\\r\\n]+)" in + let sub = Pcre.exec ~rex h in + Duppy.Monad.return + ((Pcre.get_substring sub 1, Pcre.get_substring sub 2) :: l) + with Not_found -> Duppy.Monad.raise error_500 + in + Duppy.Monad.fold_left split_header [] headers + +let payload = String.init 4096 (fun i -> Char.chr (i mod 100)) + +let handle_request request = + if request.request_uri = "/" then ( + let headers = + [ + ("Server", server); + ("Content-Length", string_of_int (String.length payload)); + ("Content-Type", "application/octet-stream"); + ] + in + Duppy.Monad.raise + { + reply_protocol = request.request_protocol; + reply_status = (200, "OK"); + reply_headers = headers; + reply_data = String payload; + }) + else Duppy.Monad.return (error_404 request.request_protocol) + +let parse_request h r = + try + let headers = Pcre.split ~pat:"\r\n" r in + let* request, headers = + match headers with + | e :: l -> + let* headers = parse_headers l in + Duppy.Monad.return (e, headers) + | _ -> Duppy.Monad.raise error_500 + in + let rex = Pcre.regexp "([\\w]+)\\s([^\\s]+)\\s(HTTP/1.[01])" in + let* http_method, uri, protocol = + try + let sub = Pcre.exec ~rex request in + let http_method, uri, protocol = + ( Pcre.get_substring sub 1, + Pcre.get_substring sub 2, + Pcre.get_substring sub 3 ) + in + Duppy.Monad.return + (method_of_string http_method, uri, protocol_of_string protocol) + with _ -> Duppy.Monad.raise error_500 + in + let* data = + match http_method with + | Get -> Duppy.Monad.return None + | Post -> ( + let* len = + try + let length = assoc_uppercase "CONTENT-LENGTH" headers in + Duppy.Monad.return (int_of_string length) + with + | Not_found -> Duppy.Monad.return 0 + | _ -> Duppy.Monad.raise error_500 + in + match len with + | 0 -> Duppy.Monad.return None + | d -> + let* data = + Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking + ~marker:(Duppy.Io.Length d) h + in + Duppy.Monad.return (String data)) + in + Duppy.Monad.return + { + request_method = http_method; + request_protocol = protocol; + request_uri = uri; + request_headers = headers; + request_data = data; + } + with _ -> Duppy.Monad.raise error_500 + +let handle_client socket = + (* Read and process lines *) + let on_error _ = error_500 in + let h = { Duppy.Monad.Io.scheduler; socket; data = ""; on_error } in + let exec = + let* reply = + Duppy.Monad.catch + (let* data = + Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking + ~marker:(Duppy.Io.Split "\r\n\r\n") h + in + let* request = parse_request h data in + handle_request request) + (fun reply -> Duppy.Monad.return reply) + in + send_reply h reply + in + let finish _ = try Unix.close socket with _ -> () in + Duppy.Monad.run ~return:finish ~raise:finish exec + +let new_queue ~priority ~name () = + let priorities p = p = priority in + let queue () = Duppy.queue scheduler ~log:(fun _ -> ()) ~priorities name in + match !queue_mode with + | `Thread -> `Thread (Thread.create queue ()) + | `Domain -> `Domain (Domain.spawn queue) + +let bind_addr_inet = Unix.inet_addr_of_string "0.0.0.0" +let bind_addr = Unix.ADDR_INET (bind_addr_inet, !port) +let max_conn = 100 +let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 + +let () = + (* See http://caml.inria.fr/mantis/print_bug_page.php?bug_id=4640 + * for this: we want Unix EPIPE error and not SIGPIPE, which + * crashes the program.. *) + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; + ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe]); + Unix.setsockopt sock Unix.SO_REUSEADDR true; + let rec incoming _ = + (try + let s, _ = Unix.accept sock in + handle_client s + with e -> + Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)); + [ + { + Duppy.Task.priority = Non_blocking; + events = [`Read sock]; + handler = incoming; + }; + ] + in + (try Unix.bind sock bind_addr + with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> + failwith (Printf.sprintf "port %d already taken" !port)); + Unix.listen sock max_conn; + Duppy.Task.add scheduler + { + Duppy.Task.priority = Non_blocking; + events = [`Read sock]; + handler = incoming; + }; + for i = 1 to !queues do + Printf.printf "Initiating queue %d\n%!" i; + ignore + (new_queue ~priority:Non_blocking + ~name:(Printf.sprintf "Non blocking queue #%d" i) + ()) + done; + Duppy.queue scheduler ~log:(fun _ -> ()) "root" diff --git a/examples/telnet.ml b/examples/telnet.ml index 1511e33..be808ed 100644 --- a/examples/telnet.ml +++ b/examples/telnet.ml @@ -117,19 +117,18 @@ let sock = socket PF_INET SOCK_STREAM 0 let () = setsockopt sock SO_REUSEADDR true; let rec incoming _ = - ( try - let s, caller = accept sock in - let ip = - let a = - match caller with ADDR_INET (a, _) -> a | _ -> assert false - in - try (gethostbyaddr a).h_name with Not_found -> string_of_inet_addr a - in - Printf.printf "New client: %s\n" ip; - handle_client s - with e -> - Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e) - ); + (try + let s, caller = accept sock in + let ip = + let a = + match caller with ADDR_INET (a, _) -> a | _ -> assert false + in + try (gethostbyaddr a).h_name with Not_found -> string_of_inet_addr a + in + Printf.printf "New client: %s\n" ip; + handle_client s + with e -> + Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)); [ { Duppy.Task.priority = io_priority; @@ -138,9 +137,9 @@ let () = }; ] in - ( try bind sock bind_addr - with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> - failwith (Printf.sprintf "port %d already taken" port) ); + (try bind sock bind_addr + with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> + failwith (Printf.sprintf "port %d already taken" port)); listen sock max_conn; Duppy.Task.add scheduler { @@ -1,7 +1,7 @@ (library (name duppy) (public_name duppy) - (libraries unix threads pcre bigarray) + (libraries unix threads pcre) (foreign_stubs (language c) (names duppy_stubs)) diff --git a/src/duppy.ml b/src/duppy.ml index cb0d6e6..dc82e26 100644 --- a/src/duppy.ml +++ b/src/duppy.ml @@ -64,6 +64,7 @@ type 'a t = { } type 'a scheduler = { + on_error : exn -> Printexc.raw_backtrace -> unit; out_pipe : Unix.file_descr; in_pipe : Unix.file_descr; compare : 'a -> 'a -> int; @@ -84,9 +85,10 @@ let clear_tasks s = s.tasks <- []; Mutex.unlock s.tasks_m -let create ?(compare = compare) () = +let create ?(on_error = Printexc.raise_with_backtrace) ?(compare = compare) () = let out_pipe, in_pipe = Unix.pipe () in { + on_error; out_pipe; in_pipe; compare; @@ -126,7 +128,8 @@ module Task = struct enrich = (fun e -> List.fold_left - (fun e -> function `Delay s -> { e with t = min e.t (t0 +. s) } + (fun e -> function + | `Delay s -> { e with t = min e.t (t0 +. s) } | `Read s -> { e with r = s :: e.r } | `Write s -> { e with w = s :: e.w } | `Exception s -> { e with x = s :: e.x }) @@ -268,13 +271,21 @@ let exec s (priorities : 'a -> bool) = * win32 because a thread can double-lock * the same mutex.. *) if Sys.os_type <> "Win32" then assert (not (Mutex.try_lock s.ready_m)); - try - let (_, task), remaining = remove (fun (p, _) -> priorities p) s.ready in - s.ready <- remaining; - Mutex.unlock s.ready_m; - add_t s (task ()); - true - with Not_found -> false + match remove (fun (p, _) -> priorities p) s.ready with + | (_, task), remaining -> + s.ready <- remaining; + Mutex.unlock s.ready_m; + let tasks = + match task () with + | exception exn -> + let bt = Printexc.get_raw_backtrace () in + s.on_error exn bt; + [] + | v -> v + in + add_t s tasks; + true + | exception Not_found -> false exception Queue_stopped exception Queue_processed @@ -325,7 +336,7 @@ let queue ?log ?(priorities = fun _ -> true) s name = Mutex.unlock s.ready_m; process s log; Mutex.unlock s.select_m; - wake (); + wake () end else begin (* We use s.ready_m mutex here. @@ -354,11 +365,12 @@ let queue ?log ?(priorities = fun _ -> true) s name = Condition.signal s.queue_stopped_c; Mutex.unlock s.queues_m in - ( try f () with + (try f () with | Queue_stopped -> () | exn -> - on_done (); - raise exn ); + let bt = Printexc.get_raw_backtrace () in + (try on_done () with _ -> ()); + Printexc.raise_with_backtrace exn bt); on_done () module Async = struct @@ -404,8 +416,8 @@ module Async = struct try begin match t.fd with - | Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1) - | None -> raise Stopped + | Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1) + | None -> raise Stopped end; Mutex.unlock t.m with e -> @@ -417,10 +429,10 @@ module Async = struct try begin match t.fd with - | Some c -> - t.stop := true; - ignore (Unix.write c (Bytes.of_string " ") 0 1) - | None -> raise Stopped + | Some c -> + t.stop := true; + ignore (Unix.write c (Bytes.of_string " ") 0 1) + | None -> raise Stopped end; t.fd <- None; Mutex.unlock t.m @@ -593,7 +605,7 @@ struct [{ priority; events; handler = f }] | _ -> exec x; - [] ) + []) | None -> [{ priority; events; handler = f }] in (* Catch all exceptions.. *) @@ -658,12 +670,12 @@ struct let n = write pos len in if n <= 0 then ( on_error Io_error; - [] ) + []) else if n < len then [{ priority; events = [`Write unix_socket]; handler = f (pos + n) }] else ( exec (); - [] ) + []) with | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) when Sys.os_type = "Win32" -> [{ priority; events = [`Write unix_socket]; handler = f pos }] @@ -818,12 +830,12 @@ module Monad = struct if not m.locked then ( (* I don't think shuffling tasks * matters here.. *) - match m.tasks with + match m.tasks with | x :: l -> m.tasks <- l; m.locked <- true; task x :: tasks - | _ -> tasks ) + | _ -> tasks) else tasks let rec handler _ = diff --git a/src/duppy.mli b/src/duppy.mli index be0a4d6..480e050 100644 --- a/src/duppy.mli +++ b/src/duppy.mli @@ -61,7 +61,20 @@ type 'a scheduler (** Initiate a new scheduler * @param compare the comparison function used to sort tasks according to priorities. * Works as in [List.sort] *) -val create : ?compare:('a -> 'a -> int) -> unit -> 'a scheduler +val create : + ?on_error:(exn -> Printexc.raw_backtrace -> unit) -> + ?compare:('a -> 'a -> int) -> + unit -> + 'a scheduler + +(** Internal polling function. Uses `Unix.select` on windows and + `poll` otherwise. *) +val poll : + Unix.file_descr list -> + Unix.file_descr list -> + Unix.file_descr list -> + float -> + Unix.file_descr list * Unix.file_descr list * Unix.file_descr list (** [queue ~log ~priorities s name] * starts a queue, on the scheduler [s] only processing priorities [p] |