summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStéphane Glondu <glondu@debian.org>2021-11-28 17:37:56 +0100
committerStéphane Glondu <glondu@debian.org>2021-11-28 17:37:56 +0100
commit72db6d910338499b96f773e0498d4f9fe32607e2 (patch)
treef6a74276a8cdab5d8dd2020079ee417625b67724
Import ocaml-duppy_0.9.2.orig.tar.gz
[dgit import orig ocaml-duppy_0.9.2.orig.tar.gz]
-rw-r--r--.github/workflows/ci.yml24
-rw-r--r--.gitignore7
-rw-r--r--.merlin4
-rw-r--r--.ocamlformat9
-rwxr-xr-x.travis-ci.sh15
-rw-r--r--CHANGES133
-rw-r--r--COPYING504
-rw-r--r--README.md48
-rw-r--r--dune-project17
-rw-r--r--duppy.opam29
-rw-r--r--examples/dune9
-rw-r--r--examples/http.ml555
-rw-r--r--examples/index.html1
-rw-r--r--examples/telnet.ml151
-rw-r--r--src/dune8
-rw-r--r--src/duppy.ml1077
-rw-r--r--src/duppy.mli549
-rw-r--r--src/duppy_stubs.c176
18 files changed, 3316 insertions, 0 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..5f6a8f0
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,24 @@
+name: CI
+
+on: [push]
+
+jobs:
+ build:
+ runs-on: ${{ matrix.os }}
+ strategy:
+ matrix:
+ os: [ubuntu-latest, macos-latest]
+ steps:
+ - uses: actions/checkout@v1
+ - name: Setup OCaml
+ uses: avsm/setup-ocaml@master
+ - name: Install depext module
+ run: opam install -y depext
+ - name: Pin locally
+ run: opam pin -y add --no-action .
+ - name: Install locally
+ run: opam depext -y -i duppy
+ - name: Build locally
+ run: eval $(opam env) && dune build
+ - name: Run tests locally
+ run: eval $(opam env) && dune runtest
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6fadc6f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*~
+_build
+*.byte
+*.native
+_tests
+.merlin
+*.install
diff --git a/.merlin b/.merlin
new file mode 100644
index 0000000..6db9a00
--- /dev/null
+++ b/.merlin
@@ -0,0 +1,4 @@
+B src/**
+S src/**
+B +threads
+PKG pcre
diff --git a/.ocamlformat b/.ocamlformat
new file mode 100644
index 0000000..533d804
--- /dev/null
+++ b/.ocamlformat
@@ -0,0 +1,9 @@
+profile = conventional
+break-separators = after
+space-around-lists = false
+doc-comments = before
+match-indent = 2
+match-indent-nested = always
+parens-ite
+exp-grouping = preserve
+module-item-spacing = compact
diff --git a/.travis-ci.sh b/.travis-ci.sh
new file mode 100755
index 0000000..82e886d
--- /dev/null
+++ b/.travis-ci.sh
@@ -0,0 +1,15 @@
+# Hacking the build into Travis-CI "C" environment
+# See http://anil.recoil.org/2013/09/30/travis-and-ocaml.html
+
+export OPAMYES=1
+opam init
+if [ -n "${OPAM_SWITCH}" ]; then
+ opam switch ${OPAM_SWITCH}
+fi
+eval `opam config env`
+opam install -y depext dune
+opam pin -y add --no-action .
+opam depext -y -i duppy
+
+# compile
+dune build
diff --git a/CHANGES b/CHANGES
new file mode 100644
index 0000000..dbfbb62
--- /dev/null
+++ b/CHANGES
@@ -0,0 +1,133 @@
+0.9.2 (07-10-2021)
+=====
+* Fix deadlock issue at shutdown.
+
+0.9.1 (06-21-2021)
+=====
+* Make `stop` synchronous, waiting for all tasks to stop
+ while sending `Condition.signal`. Should avoid potential
+ race-conditions when signaling tasks to end.
+
+0.9.0 (07-10-2020)
+=====
+* Add offset/length to writing functions.
+* Convert to dune.
+* Drop unused SSL and SecureTransport optional libs.
+
+0.8.0 (12-11-2018)
+=====
+* Removed camlp4 syntactic sugar (unmaintained, unused in liquidsoap now).
+
+0.7.4 (10-11-2018)
+=====
+* Fix stack overflow by making recursive function fully tail-rec. (ref savonet/liquidsoap#640)
+
+0.7.3 (12-09-2018)
+=====
+* Fix write/select logic on windows systems. (savonet/liquidsoap#610)
+* Avoid race conditions when shutting down.
+
+0.7.2 (28-08-2018)
+=====
+* Add placeholder implementation for `caml_poll` on Win32.
+
+0.7.1 (18-08-2018)
+=====
+* Use poll() when available.
+* Wake up all queues when shutting down.
+
+0.7.0 (03-11-2017)
+=====
+* Fix bytes compatibility with OCaml 4.06 and above.
+* Fix camlp4 availability test.
+
+0.6.1 (23-08-2017)
+=====
+* Added SecureTransport support.
+
+0.6.0 (11-04-2017)
+=====
+* Added SSL support.
+
+0.5.2 (03-08-2015)
+=====
+* Dummy github release.
+
+0.5.1 (05-08-2013)
+=====
+* Removed win32 select work-around: patch applied upstream.
+
+0.5.0 (04-03-2013)
+=====
+* Remove Panic exception and let original exception bubble through.
+
+0.4.2 (08-10-2011)
+=====
+* Reimplemented monadic Mutex and Condition.
+* Consume more than one char when waking up Async tasks.
+
+0.4.1 (04-08-2011)
+=====
+* Added optional timeout for
+ all [Duppy.Io] and [Duppy.Monad.Io]
+ operations.
+* Fixed handling of EINTR: update the
+ timeout when restarting after being
+ interrupted.
+
+0.4.0 (26-06-2011)
+=====
+* Added a monad API to write
+ server code.
+* Close both sides of the pipe
+ in Duppy.Async
+* Make calls to [stop] and [wake_up]
+ thread-safe in Duppy.Async
+* Catch Unix.EINTR when calling Unix.select.
+
+0.3.2 (19-08-2010)
+=====
+* Switch from Thread.select to
+ Unix.select. They are the same on
+ POSIX and only Unix.select is available
+ on Win32..
+* Do not use assertions on Mutex.try_lock
+ on Win32: on this plateform, a thread can
+ double-lock a mutex, making the assertion
+ inconsistent.
+
+0.3.1 (14-10-2009)
+=====
+* Really catch raised exception on Duppy.Io
+ operations: catching was missing on recurrent
+ calls.
+
+0.3.0 (18-06-2009)
+=====
+* Added support for --enable-debugging configure option
+* Fixed Makefile for BSD: call $(MAKE) for generating documentation.
+* Added the possibility to restart the task after the returned positive
+ delay in Async.
+* Added unknown exceptions on Duppy.Io when calling on_error.
+
+0.2.0 (17-02-2009)
+=====
+* Fixed typo in Duppy.Async: exception is now Stopped.
+
+0.1.2 (01-07-2008)
+=====
+* Changed logic in shutdown for Async interface:
+ now [Duppy.Async.shutdown t] also wakes the task if
+ asleep. Still it can't stop a running task.
+* Fixed race conditions when a queue starts the select loop:
+ a task could be submitted, but no queue would wake up.
+
+0.1.1 (15-04-2008)
+=====
+* Fixed Conditions usage for non-unix systems
+* Fixed typos in the documentation, added some details
+* Installs .cmx file
+
+0.1.0 (07-03-2008)
+=====
+* Initial release
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..b1e3f5a
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,504 @@
+ GNU LESSER GENERAL PUBLIC LICENSE
+ Version 2.1, February 1999
+
+ Copyright (C) 1991, 1999 Free Software Foundation, Inc.
+ 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+[This is the first released version of the Lesser GPL. It also counts
+ as the successor of the GNU Library Public License, version 2, hence
+ the version number 2.1.]
+
+ Preamble
+
+ The licenses for most software are designed to take away your
+freedom to share and change it. By contrast, the GNU General Public
+Licenses are intended to guarantee your freedom to share and change
+free software--to make sure the software is free for all its users.
+
+ This license, the Lesser General Public License, applies to some
+specially designated software packages--typically libraries--of the
+Free Software Foundation and other authors who decide to use it. You
+can use it too, but we suggest you first think carefully about whether
+this license or the ordinary General Public License is the better
+strategy to use in any particular case, based on the explanations below.
+
+ When we speak of free software, we are referring to freedom of use,
+not price. Our General Public Licenses are designed to make sure that
+you have the freedom to distribute copies of free software (and charge
+for this service if you wish); that you receive source code or can get
+it if you want it; that you can change the software and use pieces of
+it in new free programs; and that you are informed that you can do
+these things.
+
+ To protect your rights, we need to make restrictions that forbid
+distributors to deny you these rights or to ask you to surrender these
+rights. These restrictions translate to certain responsibilities for
+you if you distribute copies of the library or if you modify it.
+
+ For example, if you distribute copies of the library, whether gratis
+or for a fee, you must give the recipients all the rights that we gave
+you. You must make sure that they, too, receive or can get the source
+code. If you link other code with the library, you must provide
+complete object files to the recipients, so that they can relink them
+with the library after making changes to the library and recompiling
+it. And you must show them these terms so they know their rights.
+
+ We protect your rights with a two-step method: (1) we copyright the
+library, and (2) we offer you this license, which gives you legal
+permission to copy, distribute and/or modify the library.
+
+ To protect each distributor, we want to make it very clear that
+there is no warranty for the free library. Also, if the library is
+modified by someone else and passed on, the recipients should know
+that what they have is not the original version, so that the original
+author's reputation will not be affected by problems that might be
+introduced by others.
+
+ Finally, software patents pose a constant threat to the existence of
+any free program. We wish to make sure that a company cannot
+effectively restrict the users of a free program by obtaining a
+restrictive license from a patent holder. Therefore, we insist that
+any patent license obtained for a version of the library must be
+consistent with the full freedom of use specified in this license.
+
+ Most GNU software, including some libraries, is covered by the
+ordinary GNU General Public License. This license, the GNU Lesser
+General Public License, applies to certain designated libraries, and
+is quite different from the ordinary General Public License. We use
+this license for certain libraries in order to permit linking those
+libraries into non-free programs.
+
+ When a program is linked with a library, whether statically or using
+a shared library, the combination of the two is legally speaking a
+combined work, a derivative of the original library. The ordinary
+General Public License therefore permits such linking only if the
+entire combination fits its criteria of freedom. The Lesser General
+Public License permits more lax criteria for linking other code with
+the library.
+
+ We call this license the "Lesser" General Public License because it
+does Less to protect the user's freedom than the ordinary General
+Public License. It also provides other free software developers Less
+of an advantage over competing non-free programs. These disadvantages
+are the reason we use the ordinary General Public License for many
+libraries. However, the Lesser license provides advantages in certain
+special circumstances.
+
+ For example, on rare occasions, there may be a special need to
+encourage the widest possible use of a certain library, so that it becomes
+a de-facto standard. To achieve this, non-free programs must be
+allowed to use the library. A more frequent case is that a free
+library does the same job as widely used non-free libraries. In this
+case, there is little to gain by limiting the free library to free
+software only, so we use the Lesser General Public License.
+
+ In other cases, permission to use a particular library in non-free
+programs enables a greater number of people to use a large body of
+free software. For example, permission to use the GNU C Library in
+non-free programs enables many more people to use the whole GNU
+operating system, as well as its variant, the GNU/Linux operating
+system.
+
+ Although the Lesser General Public License is Less protective of the
+users' freedom, it does ensure that the user of a program that is
+linked with the Library has the freedom and the wherewithal to run
+that program using a modified version of the Library.
+
+ The precise terms and conditions for copying, distribution and
+modification follow. Pay close attention to the difference between a
+"work based on the library" and a "work that uses the library". The
+former contains code derived from the library, whereas the latter must
+be combined with the library in order to run.
+
+ GNU LESSER GENERAL PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. This License Agreement applies to any software library or other
+program which contains a notice placed by the copyright holder or
+other authorized party saying it may be distributed under the terms of
+this Lesser General Public License (also called "this License").
+Each licensee is addressed as "you".
+
+ A "library" means a collection of software functions and/or data
+prepared so as to be conveniently linked with application programs
+(which use some of those functions and data) to form executables.
+
+ The "Library", below, refers to any such software library or work
+which has been distributed under these terms. A "work based on the
+Library" means either the Library or any derivative work under
+copyright law: that is to say, a work containing the Library or a
+portion of it, either verbatim or with modifications and/or translated
+straightforwardly into another language. (Hereinafter, translation is
+included without limitation in the term "modification".)
+
+ "Source code" for a work means the preferred form of the work for
+making modifications to it. For a library, complete source code means
+all the source code for all modules it contains, plus any associated
+interface definition files, plus the scripts used to control compilation
+and installation of the library.
+
+ Activities other than copying, distribution and modification are not
+covered by this License; they are outside its scope. The act of
+running a program using the Library is not restricted, and output from
+such a program is covered only if its contents constitute a work based
+on the Library (independent of the use of the Library in a tool for
+writing it). Whether that is true depends on what the Library does
+and what the program that uses the Library does.
+
+ 1. You may copy and distribute verbatim copies of the Library's
+complete source code as you receive it, in any medium, provided that
+you conspicuously and appropriately publish on each copy an
+appropriate copyright notice and disclaimer of warranty; keep intact
+all the notices that refer to this License and to the absence of any
+warranty; and distribute a copy of this License along with the
+Library.
+
+ You may charge a fee for the physical act of transferring a copy,
+and you may at your option offer warranty protection in exchange for a
+fee.
+
+ 2. You may modify your copy or copies of the Library or any portion
+of it, thus forming a work based on the Library, and copy and
+distribute such modifications or work under the terms of Section 1
+above, provided that you also meet all of these conditions:
+
+ a) The modified work must itself be a software library.
+
+ b) You must cause the files modified to carry prominent notices
+ stating that you changed the files and the date of any change.
+
+ c) You must cause the whole of the work to be licensed at no
+ charge to all third parties under the terms of this License.
+
+ d) If a facility in the modified Library refers to a function or a
+ table of data to be supplied by an application program that uses
+ the facility, other than as an argument passed when the facility
+ is invoked, then you must make a good faith effort to ensure that,
+ in the event an application does not supply such function or
+ table, the facility still operates, and performs whatever part of
+ its purpose remains meaningful.
+
+ (For example, a function in a library to compute square roots has
+ a purpose that is entirely well-defined independent of the
+ application. Therefore, Subsection 2d requires that any
+ application-supplied function or table used by this function must
+ be optional: if the application does not supply it, the square
+ root function must still compute square roots.)
+
+These requirements apply to the modified work as a whole. If
+identifiable sections of that work are not derived from the Library,
+and can be reasonably considered independent and separate works in
+themselves, then this License, and its terms, do not apply to those
+sections when you distribute them as separate works. But when you
+distribute the same sections as part of a whole which is a work based
+on the Library, the distribution of the whole must be on the terms of
+this License, whose permissions for other licensees extend to the
+entire whole, and thus to each and every part regardless of who wrote
+it.
+
+Thus, it is not the intent of this section to claim rights or contest
+your rights to work written entirely by you; rather, the intent is to
+exercise the right to control the distribution of derivative or
+collective works based on the Library.
+
+In addition, mere aggregation of another work not based on the Library
+with the Library (or with a work based on the Library) on a volume of
+a storage or distribution medium does not bring the other work under
+the scope of this License.
+
+ 3. You may opt to apply the terms of the ordinary GNU General Public
+License instead of this License to a given copy of the Library. To do
+this, you must alter all the notices that refer to this License, so
+that they refer to the ordinary GNU General Public License, version 2,
+instead of to this License. (If a newer version than version 2 of the
+ordinary GNU General Public License has appeared, then you can specify
+that version instead if you wish.) Do not make any other change in
+these notices.
+
+ Once this change is made in a given copy, it is irreversible for
+that copy, so the ordinary GNU General Public License applies to all
+subsequent copies and derivative works made from that copy.
+
+ This option is useful when you wish to copy part of the code of
+the Library into a program that is not a library.
+
+ 4. You may copy and distribute the Library (or a portion or
+derivative of it, under Section 2) in object code or executable form
+under the terms of Sections 1 and 2 above provided that you accompany
+it with the complete corresponding machine-readable source code, which
+must be distributed under the terms of Sections 1 and 2 above on a
+medium customarily used for software interchange.
+
+ If distribution of object code is made by offering access to copy
+from a designated place, then offering equivalent access to copy the
+source code from the same place satisfies the requirement to
+distribute the source code, even though third parties are not
+compelled to copy the source along with the object code.
+
+ 5. A program that contains no derivative of any portion of the
+Library, but is designed to work with the Library by being compiled or
+linked with it, is called a "work that uses the Library". Such a
+work, in isolation, is not a derivative work of the Library, and
+therefore falls outside the scope of this License.
+
+ However, linking a "work that uses the Library" with the Library
+creates an executable that is a derivative of the Library (because it
+contains portions of the Library), rather than a "work that uses the
+library". The executable is therefore covered by this License.
+Section 6 states terms for distribution of such executables.
+
+ When a "work that uses the Library" uses material from a header file
+that is part of the Library, the object code for the work may be a
+derivative work of the Library even though the source code is not.
+Whether this is true is especially significant if the work can be
+linked without the Library, or if the work is itself a library. The
+threshold for this to be true is not precisely defined by law.
+
+ If such an object file uses only numerical parameters, data
+structure layouts and accessors, and small macros and small inline
+functions (ten lines or less in length), then the use of the object
+file is unrestricted, regardless of whether it is legally a derivative
+work. (Executables containing this object code plus portions of the
+Library will still fall under Section 6.)
+
+ Otherwise, if the work is a derivative of the Library, you may
+distribute the object code for the work under the terms of Section 6.
+Any executables containing that work also fall under Section 6,
+whether or not they are linked directly with the Library itself.
+
+ 6. As an exception to the Sections above, you may also combine or
+link a "work that uses the Library" with the Library to produce a
+work containing portions of the Library, and distribute that work
+under terms of your choice, provided that the terms permit
+modification of the work for the customer's own use and reverse
+engineering for debugging such modifications.
+
+ You must give prominent notice with each copy of the work that the
+Library is used in it and that the Library and its use are covered by
+this License. You must supply a copy of this License. If the work
+during execution displays copyright notices, you must include the
+copyright notice for the Library among them, as well as a reference
+directing the user to the copy of this License. Also, you must do one
+of these things:
+
+ a) Accompany the work with the complete corresponding
+ machine-readable source code for the Library including whatever
+ changes were used in the work (which must be distributed under
+ Sections 1 and 2 above); and, if the work is an executable linked
+ with the Library, with the complete machine-readable "work that
+ uses the Library", as object code and/or source code, so that the
+ user can modify the Library and then relink to produce a modified
+ executable containing the modified Library. (It is understood
+ that the user who changes the contents of definitions files in the
+ Library will not necessarily be able to recompile the application
+ to use the modified definitions.)
+
+ b) Use a suitable shared library mechanism for linking with the
+ Library. A suitable mechanism is one that (1) uses at run time a
+ copy of the library already present on the user's computer system,
+ rather than copying library functions into the executable, and (2)
+ will operate properly with a modified version of the library, if
+ the user installs one, as long as the modified version is
+ interface-compatible with the version that the work was made with.
+
+ c) Accompany the work with a written offer, valid for at
+ least three years, to give the same user the materials
+ specified in Subsection 6a, above, for a charge no more
+ than the cost of performing this distribution.
+
+ d) If distribution of the work is made by offering access to copy
+ from a designated place, offer equivalent access to copy the above
+ specified materials from the same place.
+
+ e) Verify that the user has already received a copy of these
+ materials or that you have already sent this user a copy.
+
+ For an executable, the required form of the "work that uses the
+Library" must include any data and utility programs needed for
+reproducing the executable from it. However, as a special exception,
+the materials to be distributed need not include anything that is
+normally distributed (in either source or binary form) with the major
+components (compiler, kernel, and so on) of the operating system on
+which the executable runs, unless that component itself accompanies
+the executable.
+
+ It may happen that this requirement contradicts the license
+restrictions of other proprietary libraries that do not normally
+accompany the operating system. Such a contradiction means you cannot
+use both them and the Library together in an executable that you
+distribute.
+
+ 7. You may place library facilities that are a work based on the
+Library side-by-side in a single library together with other library
+facilities not covered by this License, and distribute such a combined
+library, provided that the separate distribution of the work based on
+the Library and of the other library facilities is otherwise
+permitted, and provided that you do these two things:
+
+ a) Accompany the combined library with a copy of the same work
+ based on the Library, uncombined with any other library
+ facilities. This must be distributed under the terms of the
+ Sections above.
+
+ b) Give prominent notice with the combined library of the fact
+ that part of it is a work based on the Library, and explaining
+ where to find the accompanying uncombined form of the same work.
+
+ 8. You may not copy, modify, sublicense, link with, or distribute
+the Library except as expressly provided under this License. Any
+attempt otherwise to copy, modify, sublicense, link with, or
+distribute the Library is void, and will automatically terminate your
+rights under this License. However, parties who have received copies,
+or rights, from you under this License will not have their licenses
+terminated so long as such parties remain in full compliance.
+
+ 9. You are not required to accept this License, since you have not
+signed it. However, nothing else grants you permission to modify or
+distribute the Library or its derivative works. These actions are
+prohibited by law if you do not accept this License. Therefore, by
+modifying or distributing the Library (or any work based on the
+Library), you indicate your acceptance of this License to do so, and
+all its terms and conditions for copying, distributing or modifying
+the Library or works based on it.
+
+ 10. Each time you redistribute the Library (or any work based on the
+Library), the recipient automatically receives a license from the
+original licensor to copy, distribute, link with or modify the Library
+subject to these terms and conditions. You may not impose any further
+restrictions on the recipients' exercise of the rights granted herein.
+You are not responsible for enforcing compliance by third parties with
+this License.
+
+ 11. If, as a consequence of a court judgment or allegation of patent
+infringement or for any other reason (not limited to patent issues),
+conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot
+distribute so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you
+may not distribute the Library at all. For example, if a patent
+license would not permit royalty-free redistribution of the Library by
+all those who receive copies directly or indirectly through you, then
+the only way you could satisfy both it and this License would be to
+refrain entirely from distribution of the Library.
+
+If any portion of this section is held invalid or unenforceable under any
+particular circumstance, the balance of the section is intended to apply,
+and the section as a whole is intended to apply in other circumstances.
+
+It is not the purpose of this section to induce you to infringe any
+patents or other property right claims or to contest validity of any
+such claims; this section has the sole purpose of protecting the
+integrity of the free software distribution system which is
+implemented by public license practices. Many people have made
+generous contributions to the wide range of software distributed
+through that system in reliance on consistent application of that
+system; it is up to the author/donor to decide if he or she is willing
+to distribute software through any other system and a licensee cannot
+impose that choice.
+
+This section is intended to make thoroughly clear what is believed to
+be a consequence of the rest of this License.
+
+ 12. If the distribution and/or use of the Library is restricted in
+certain countries either by patents or by copyrighted interfaces, the
+original copyright holder who places the Library under this License may add
+an explicit geographical distribution limitation excluding those countries,
+so that distribution is permitted only in or among countries not thus
+excluded. In such case, this License incorporates the limitation as if
+written in the body of this License.
+
+ 13. The Free Software Foundation may publish revised and/or new
+versions of the Lesser General Public License from time to time.
+Such new versions will be similar in spirit to the present version,
+but may differ in detail to address new problems or concerns.
+
+Each version is given a distinguishing version number. If the Library
+specifies a version number of this License which applies to it and
+"any later version", you have the option of following the terms and
+conditions either of that version or of any later version published by
+the Free Software Foundation. If the Library does not specify a
+license version number, you may choose any version ever published by
+the Free Software Foundation.
+
+ 14. If you wish to incorporate parts of the Library into other free
+programs whose distribution conditions are incompatible with these,
+write to the author to ask for permission. For software which is
+copyrighted by the Free Software Foundation, write to the Free
+Software Foundation; we sometimes make exceptions for this. Our
+decision will be guided by the two goals of preserving the free status
+of all derivatives of our free software and of promoting the sharing
+and reuse of software generally.
+
+ NO WARRANTY
+
+ 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO
+WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW.
+EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR
+OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY
+KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE
+LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME
+THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN
+WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY
+AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU
+FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
+CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE
+LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
+RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
+FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
+SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGES.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Libraries
+
+ If you develop a new library, and you want it to be of the greatest
+possible use to the public, we recommend making it free software that
+everyone can redistribute and change. You can do so by permitting
+redistribution under these terms (or, alternatively, under the terms of the
+ordinary General Public License).
+
+ To apply these terms, attach the following notices to the library. It is
+safest to attach them to the start of each source file to most effectively
+convey the exclusion of warranty; and each file should have at least the
+"copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the library's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+Also add information on how to contact you by electronic and paper mail.
+
+You should also get your employer (if you work as a programmer) or your
+school, if any, to sign a "copyright disclaimer" for the library, if
+necessary. Here is a sample; alter the names:
+
+ Yoyodyne, Inc., hereby disclaims all copyright interest in the
+ library `Frob' (a library for tweaking knobs) written by James Random Hacker.
+
+ <signature of Ty Coon>, 1 April 1990
+ Ty Coon, President of Vice
+
+That's all there is to it!
+
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d621f21
--- /dev/null
+++ b/README.md
@@ -0,0 +1,48 @@
+# ocaml-duppy
+
+ocaml-duppy is an advanced scheduler for Ocaml programmers.
+
+Please read the COPYING file before using this software.
+
+## Prerequisites:
+
+- ocaml >= 4.03.0
+
+- findlib >= 1.8.0
+
+- ocaml-pcre >= 7.3.4
+
+- dune >= 2.0
+
+The code may work with earlier versions but these are the one currently
+supported.
+
+## Compilation:
+
+```sh
+$ dune build
+```
+
+This should build both the native and the byte-code version of the
+extension library.
+
+## Installation:
+
+Via `opam`:
+
+```sh
+$ opam install duppy
+```
+
+Via `dune` (for developers):
+```sh
+$ dune install
+```
+
+This should install the library file (using ocamlfind) in the
+appropriate place.
+
+## Author:
+
+This author of this software may be contacted by electronic mail
+at the following address: savonet-users@lists.sourceforge.net.
diff --git a/dune-project b/dune-project
new file mode 100644
index 0000000..6c50961
--- /dev/null
+++ b/dune-project
@@ -0,0 +1,17 @@
+(lang dune 2.7)
+(version 0.9.2)
+(name duppy)
+(source (github savonet/ocaml-duppy))
+(license GPL-2.0)
+(authors "Romain Beauxis <toots@rastageeks.org>")
+(maintainers "The Savonet Team <savonet-users@lists.sourceforge.net>")
+
+(generate_opam_files true)
+
+(package
+ (name duppy)
+ (synopsis "Library providing monadic threads")
+ (depends
+ dune
+ pcre)
+)
diff --git a/duppy.opam b/duppy.opam
new file mode 100644
index 0000000..0095389
--- /dev/null
+++ b/duppy.opam
@@ -0,0 +1,29 @@
+# This file is generated by dune, edit dune-project instead
+opam-version: "2.0"
+version: "0.9.2"
+synopsis: "Library providing monadic threads"
+maintainer: ["The Savonet Team <savonet-users@lists.sourceforge.net>"]
+authors: ["Romain Beauxis <toots@rastageeks.org>"]
+license: "GPL-2.0"
+homepage: "https://github.com/savonet/ocaml-duppy"
+bug-reports: "https://github.com/savonet/ocaml-duppy/issues"
+depends: [
+ "dune" {>= "2.7"}
+ "pcre"
+ "odoc" {with-doc}
+]
+build: [
+ ["dune" "subst"] {dev}
+ [
+ "dune"
+ "build"
+ "-p"
+ name
+ "-j"
+ jobs
+ "@install"
+ "@runtest" {with-test}
+ "@doc" {with-doc}
+ ]
+]
+dev-repo: "git+https://github.com/savonet/ocaml-duppy.git"
diff --git a/examples/dune b/examples/dune
new file mode 100644
index 0000000..887559f
--- /dev/null
+++ b/examples/dune
@@ -0,0 +1,9 @@
+(executable
+ (name http)
+ (modules http)
+ (libraries duppy))
+
+(executable
+ (name telnet)
+ (modules telnet)
+ (libraries duppy))
diff --git a/examples/http.ml b/examples/http.ml
new file mode 100644
index 0000000..d6e65ce
--- /dev/null
+++ b/examples/http.ml
@@ -0,0 +1,555 @@
+let non_blocking_queues = ref 3
+let maybe_blocking_queues = ref 1
+let files_path = ref ""
+let port = ref 8080
+let usage = "usage: http [options] /path/to/files"
+
+let () =
+ let pnum = ref 0 in
+ let arg s =
+ incr pnum;
+ if !pnum > 1 then (
+ Printf.eprintf "Error: too many arguments\n";
+ exit 1 )
+ else files_path := s
+ in
+ Arg.parse
+ [
+ ( "--non_blocking_queues",
+ Arg.Int (fun i -> non_blocking_queues := i),
+ Printf.sprintf "Number of non-blocking queues. (default: %d)"
+ !non_blocking_queues );
+ ( "--maybe_blocking_queues",
+ Arg.Int (fun i -> maybe_blocking_queues := i),
+ Printf.sprintf "Number of maybe-blocking queues. (default: %d)"
+ !maybe_blocking_queues );
+ ( "--port",
+ Arg.Int (fun i -> port := i),
+ Printf.sprintf "Port used to bind the server. (default: %d)" !port );
+ ]
+ arg usage;
+ if !files_path = "" then (
+ Printf.printf "%s\n" usage;
+ exit 1 )
+ else ()
+
+type priority = Maybe_blocking | Non_blocking
+
+let scheduler = Duppy.create ()
+
+type http_method = Post | Get
+type http_protocol = Http_11 | Http_10
+
+let string_of_protocol = function
+ | Http_11 -> "HTTP/1.1"
+ | Http_10 -> "HTTP/1.0"
+
+let protocol_of_string = function
+ | "HTTP/1.1" -> Http_11
+ | "HTTP/1.0" -> Http_10
+ | _ -> assert false
+
+let string_of_method = function Post -> "POST" | Get -> "GET"
+
+let method_of_string = function
+ | "POST" -> Post
+ | "GET" -> Get
+ | _ -> assert false
+
+type data = None | String of string | File of Unix.file_descr
+
+type request = {
+ request_protocol : http_protocol;
+ request_method : http_method;
+ request_uri : string;
+ request_headers : (string * string) list;
+ request_data : data;
+}
+
+type reply = {
+ reply_protocol : http_protocol;
+ reply_status : int * string;
+ reply_headers : (string * string) list;
+ reply_data : data;
+}
+
+exception Assoc of string
+
+let assoc_uppercase x y =
+ try
+ List.iter
+ (fun (l, v) ->
+ if String.uppercase_ascii l = x then raise (Assoc v) else ())
+ y;
+ raise Not_found
+ with Assoc s -> s
+
+let server = "dhttpd"
+
+let html_template =
+ Printf.sprintf
+ "<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.1//EN\" \
+ \"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd\">\r\n\
+ <html xmlns=\"http://www.w3.org/1999/xhtml\" xml:lang=\"en\">\r\n\
+ %s</html>"
+
+let server_error status protocol =
+ let _, explanation = status in
+ let data =
+ String
+ (html_template
+ (Printf.sprintf "<head><title>%s</title></head>\r\n<body>%s !</body>"
+ explanation explanation))
+ in
+ {
+ reply_protocol = protocol;
+ reply_status = status;
+ reply_headers =
+ [("Content-Type", "text/html; charset=UTF-8"); ("Server", server)];
+ reply_data = data;
+ }
+
+let error_404 = server_error (404, "File Not Found")
+let error_500 = server_error (500, "Bad Request") Http_10
+let error_403 = server_error (403, "Forbidden")
+
+let http_302 protocol uri =
+ {
+ reply_protocol = protocol;
+ reply_status = (302, "Found");
+ reply_headers = [("Location", uri)];
+ reply_data = String "";
+ }
+
+type socket_status = Keep | Close
+
+let send_reply h reply =
+ let write s =
+ Duppy.Monad.Io.write ?timeout:None ~priority:Non_blocking h
+ (Bytes.unsafe_of_string s)
+ in
+ let code, status = reply.reply_status in
+ let http_header =
+ Printf.sprintf "%s %d %s\r\n%s\r\n\r\n"
+ (string_of_protocol reply.reply_protocol)
+ code status
+ (String.concat "\r\n"
+ (List.map
+ (fun (x, y) -> Printf.sprintf "%s: %s" x y)
+ reply.reply_headers))
+ in
+ Duppy.Monad.bind (write http_header) (fun () ->
+ match reply.reply_data with
+ | String s -> write s
+ | File fd ->
+ let stats = Unix.fstat fd in
+ let ba =
+ Unix.map_file fd Bigarray.char Bigarray.c_layout false
+ [| stats.Unix.st_size |]
+ in
+ let ba = Bigarray.array1_of_genarray ba in
+ let close () = try Unix.close fd with _ -> () in
+ let on_error e =
+ close ();
+ h.Duppy.Monad.Io.on_error e
+ in
+ let h = { h with Duppy.Monad.Io.on_error } in
+ Duppy.Monad.bind
+ (Duppy.Monad.Io.write_bigarray ?timeout:None
+ ~priority:Non_blocking h ba) (fun () ->
+ Duppy.Monad.return (close ()))
+ | None -> Duppy.Monad.return ())
+
+let parse_headers headers =
+ let split_header l h =
+ try
+ let rex = Pcre.regexp "([^:\\r\\n]+):\\s*([^\\r\\n]+)" in
+ let sub = Pcre.exec ~rex h in
+ Duppy.Monad.return
+ ((Pcre.get_substring sub 1, Pcre.get_substring sub 2) :: l)
+ with Not_found -> Duppy.Monad.raise error_500
+ in
+ Duppy.Monad.fold_left split_header [] headers
+
+let index_uri path index protocol uri =
+ let uri =
+ try
+ let ret = Pcre.extract ~rex:(Pcre.regexp "([^\\?]*)\\?") uri in
+ ret.(1)
+ with Not_found -> uri
+ in
+ try
+ if Sys.is_directory (Printf.sprintf "%s%s" path uri) then
+ if uri.[String.length uri - 1] <> '/' then
+ Duppy.Monad.raise (http_302 protocol (Printf.sprintf "%s/" uri))
+ else (
+ let index = Printf.sprintf "%s/%s" uri index in
+ if Sys.file_exists (Printf.sprintf "%s/%s" path index) then
+ Duppy.Monad.return index
+ else Duppy.Monad.return uri )
+ else Duppy.Monad.return uri
+ with _ -> Duppy.Monad.return uri
+
+let file_request path _ request =
+ let uri =
+ try
+ let ret =
+ Pcre.extract ~rex:(Pcre.regexp "([^\\?]*)\\?.*") request.request_uri
+ in
+ ret.(1)
+ with Not_found -> request.request_uri
+ in
+ let __pa_duppy_0 = index_uri path "index.html" request.request_protocol uri in
+ Duppy.Monad.bind __pa_duppy_0 (fun uri ->
+ let fname = Printf.sprintf "%s%s" path uri in
+ if Sys.file_exists fname then (
+ try
+ let fd = Unix.openfile fname [Unix.O_RDONLY] 0o640 in
+ let stats = Unix.fstat fd in
+ let headers =
+ [
+ ("Server", server);
+ ("Content-Length", string_of_int stats.Unix.st_size);
+ ]
+ in
+ let headers =
+ if Pcre.pmatch ~rex:(Pcre.regexp "\\.html$") fname then
+ ("Content-Type", "text/html") :: headers
+ else if Pcre.pmatch ~rex:(Pcre.regexp "\\.css$") fname then
+ ("Content-Type", "text/css") :: headers
+ else headers
+ in
+ Duppy.Monad.raise
+ {
+ reply_protocol = request.request_protocol;
+ reply_status = (200, "OK");
+ reply_headers = headers;
+ reply_data = File fd;
+ }
+ with _ -> Duppy.Monad.raise (error_403 request.request_protocol) )
+ else Duppy.Monad.raise (error_404 request.request_protocol))
+
+let file_handler = ((fun _ -> Duppy.Monad.return true), file_request !files_path)
+
+let cgi_handler process path h request =
+ let uri, args, suffix =
+ try
+ let ret =
+ Pcre.extract ~rex:(Pcre.regexp "([^\\?]*)\\?(.*)") request.request_uri
+ in
+ try
+ let ans =
+ Pcre.extract ~rex:(Pcre.regexp "^([^/]*)/([^&=]*)$") ret.(2)
+ in
+ (ret.(1), ans.(1), ans.(2))
+ with Not_found -> (ret.(1), ret.(2), "")
+ with Not_found -> (request.request_uri, "", "")
+ in
+ let __pa_duppy_0 = index_uri path "index.php" request.request_protocol uri in
+ Duppy.Monad.bind __pa_duppy_0 (fun script ->
+ let script = Printf.sprintf "%s%s" path script in
+ let env =
+ Printf.sprintf
+ "export SERVER_SOFTWARE=Duppy-httpd/1.0; export \
+ SERVER_NAME=localhost; export GATEWAY_INTERFACE=CGI/1.1; export \
+ SERVER_PROTOCOL=%s; export SERVER_PORT=%d; export \
+ REQUEST_METHOD=%s; export REQUEST_URI=%s; export \
+ REDIRECT_STATUS=200; export SCRIPT_FILENAME=%s"
+ (string_of_protocol request.request_protocol)
+ !port
+ (string_of_method request.request_method)
+ (Filename.quote uri) (Filename.quote script)
+ in
+ let env =
+ Printf.sprintf "%s; export QUERY_STRING=%s" env (Filename.quote args)
+ in
+ let env =
+ let tr_suffix = Printf.sprintf "%s%s" path suffix in
+ (* Trick ! *)
+ let tr_suffix =
+ Printf.sprintf "%s/%s"
+ (Filename.dirname tr_suffix)
+ (Filename.basename tr_suffix)
+ in
+ Printf.sprintf "%s; export PATH_TRANSLATED=%s; export PATH_INFO=%s" env
+ (Filename.quote tr_suffix) (Filename.quote suffix)
+ in
+ let sanitize s =
+ Pcre.replace ~pat:"-" ~templ:"_" (String.uppercase_ascii s)
+ in
+ let headers =
+ List.map (fun (x, y) -> (sanitize x, y)) request.request_headers
+ in
+ let append env key =
+ if List.mem_assoc key headers then
+ Printf.sprintf "%s; export %s=%s" env key
+ (Filename.quote (List.assoc key headers))
+ else env
+ in
+ let env = append env "CONTENT_TYPE" in
+ let env = append env "CONTENT_LENGTH" in
+ let __pa_duppy_0 =
+ if List.mem_assoc "AUTHORIZATION" headers then (
+ let ret =
+ Pcre.extract
+ ~rex:(Pcre.regexp "(^[^\\s]*\\s.*)$")
+ (List.assoc "AUTHORIZATION" headers)
+ in
+ if Array.length ret > 0 then
+ Duppy.Monad.return
+ (Printf.sprintf "%s; extract AUTH_TYPE=%s" env ret.(1))
+ else Duppy.Monad.raise error_500 )
+ else Duppy.Monad.return env
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun env ->
+ let f env (x, y) =
+ Printf.sprintf "%s; export HTTP_%s=%s" env x (Filename.quote y)
+ in
+ let env = List.fold_left f env headers in
+ let data =
+ match request.request_data with
+ | None -> ""
+ | String s -> s
+ | _ -> assert false
+ in
+ (* not implemented *)
+ let process = Printf.sprintf "%s; %s 2>/dev/null" env process in
+ let in_c, out_c = Unix.open_process process in
+ let out_s = Unix.descr_of_out_channel out_c in
+ let h = { h with Duppy.Monad.Io.socket = out_s; data = "" } in
+ let __pa_duppy_0 =
+ Duppy.Monad.Io.write ?timeout:None ~priority:Non_blocking h
+ (Bytes.unsafe_of_string data)
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun () ->
+ let in_s = Unix.descr_of_in_channel in_c in
+ let h = { h with Duppy.Monad.Io.socket = in_s; data = "" } in
+ let __pa_duppy_0 =
+ Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking
+ ~marker:(Duppy.Io.Split "[\r]?\n[\r]?\n") h
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun headers ->
+ let __pa_duppy_0 =
+ Duppy.Monad.catch
+ (Duppy.Monad.Io.read_all ?timeout:None
+ ~priority:Non_blocking h.Duppy.Monad.Io.scheduler in_s)
+ (fun (s, _) -> Duppy.Monad.return s)
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun data ->
+ let data =
+ Printf.sprintf "%s%s" h.Duppy.Monad.Io.data data
+ in
+ ignore (Unix.close_process (in_c, out_c));
+ let __pa_duppy_0 =
+ let headers = Pcre.split ~pat:"\r\n" headers in
+ parse_headers headers
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun headers ->
+ let __pa_duppy_0 =
+ if List.mem_assoc "Status" headers then (
+ try
+ let ans =
+ Pcre.extract
+ ~rex:(Pcre.regexp "([\\d]+)\\s(.*)")
+ (List.assoc "Status" headers)
+ in
+ Duppy.Monad.return
+ ( (int_of_string ans.(1), ans.(2)),
+ List.filter
+ (fun (x, _) -> x <> "Status")
+ headers )
+ with _ -> Duppy.Monad.raise error_500 )
+ else Duppy.Monad.return ((200, "OK"), headers)
+ in
+ Duppy.Monad.bind __pa_duppy_0
+ (fun (status, headers) ->
+ let headers =
+ ( "Content-length",
+ string_of_int (String.length data) )
+ :: headers
+ in
+ Duppy.Monad.raise
+ {
+ reply_protocol = request.request_protocol;
+ reply_status = status;
+ reply_headers = headers;
+ reply_data = String data;
+ })))))))
+
+let php_handler =
+ ( (fun request ->
+ let __pa_duppy_0 =
+ index_uri !files_path "index.php" request.request_protocol
+ request.request_uri
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun uri ->
+ Duppy.Monad.return (Pcre.pmatch ~rex:(Pcre.regexp "\\.php$") uri))),
+ cgi_handler "php-cgi" !files_path )
+
+let handlers = [php_handler; file_handler]
+
+let handle_request h request =
+ let f (check, handler) =
+ let __pa_duppy_0 = check request in
+ Duppy.Monad.bind __pa_duppy_0 (fun check ->
+ if check then handler h request else Duppy.Monad.return ())
+ in
+ Duppy.Monad.catch
+ (Duppy.Monad.bind (Duppy.Monad.iter f handlers) (fun () ->
+ Duppy.Monad.return (error_404 request.request_protocol)))
+ (fun reply -> Duppy.Monad.return reply)
+
+let parse_request h r =
+ try
+ let headers = Pcre.split ~pat:"\r\n" r in
+ let __pa_duppy_0 =
+ match headers with
+ | e :: l ->
+ let __pa_duppy_0 = parse_headers l in
+ Duppy.Monad.bind __pa_duppy_0 (fun headers ->
+ Duppy.Monad.return (e, headers))
+ | _ -> Duppy.Monad.raise error_500
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun (request, headers) ->
+ let rex = Pcre.regexp "([\\w]+)\\s([^\\s]+)\\s(HTTP/1.[01])" in
+ let __pa_duppy_0 =
+ try
+ let sub = Pcre.exec ~rex request in
+ let http_method, uri, protocol =
+ ( Pcre.get_substring sub 1,
+ Pcre.get_substring sub 2,
+ Pcre.get_substring sub 3 )
+ in
+ Duppy.Monad.return
+ (method_of_string http_method, uri, protocol_of_string protocol)
+ with _ -> Duppy.Monad.raise error_500
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun (http_method, uri, protocol) ->
+ let __pa_duppy_0 =
+ match http_method with
+ | Get -> Duppy.Monad.return None
+ | Post ->
+ let __pa_duppy_0 =
+ try
+ let length = assoc_uppercase "CONTENT-LENGTH" headers in
+ Duppy.Monad.return (int_of_string length)
+ with
+ | Not_found -> Duppy.Monad.return 0
+ | _ -> Duppy.Monad.raise error_500
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun len ->
+ match len with
+ | 0 -> Duppy.Monad.return None
+ | d ->
+ let __pa_duppy_0 =
+ Duppy.Monad.Io.read ?timeout:None
+ ~priority:Non_blocking
+ ~marker:(Duppy.Io.Length d) h
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun data ->
+ Duppy.Monad.return (String data)))
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun data ->
+ Duppy.Monad.return
+ {
+ request_method = http_method;
+ request_protocol = protocol;
+ request_uri = uri;
+ request_headers = headers;
+ request_data = data;
+ })))
+ with _ -> Duppy.Monad.raise error_500
+
+let handle_client socket =
+ (* Read and process lines *)
+ let on_error _ = error_500 in
+ let h = { Duppy.Monad.Io.scheduler; socket; data = ""; on_error } in
+ let rec exec () =
+ let __pa_duppy_0 =
+ Duppy.Monad.catch
+ (let __pa_duppy_0 =
+ Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking
+ ~marker:(Duppy.Io.Split "\r\n\r\n") h
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun data ->
+ let __pa_duppy_0 = parse_request h data in
+ Duppy.Monad.bind __pa_duppy_0 (fun request ->
+ let __pa_duppy_0 = handle_request h request in
+ Duppy.Monad.bind __pa_duppy_0 (fun reply ->
+ let close_header headers =
+ try assoc_uppercase "CONNECTION" headers = "close"
+ with Not_found -> false
+ in
+ let keep =
+ if
+ request.request_protocol = Http_10
+ || close_header request.request_headers
+ || close_header reply.reply_headers
+ then Close
+ else Keep
+ in
+ Duppy.Monad.return (keep, reply)))))
+ (fun reply -> Duppy.Monad.return (Close, reply))
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun (keep, reply) ->
+ Duppy.Monad.bind (send_reply h reply) (fun () ->
+ if keep = Keep then exec () else Duppy.Monad.return ()))
+ in
+ let finish _ = try Unix.close socket with _ -> () in
+ Duppy.Monad.run ~return:finish ~raise:finish (exec ())
+
+let new_queue ~priority ~name () =
+ let priorities p = p = priority in
+ let queue () = Duppy.queue scheduler ~log:(fun _ -> ()) ~priorities name in
+ Thread.create queue ()
+
+let bind_addr_inet = Unix.inet_addr_of_string "0.0.0.0"
+let bind_addr = Unix.ADDR_INET (bind_addr_inet, !port)
+let max_conn = 100
+let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0
+
+let () =
+ (* See http://caml.inria.fr/mantis/print_bug_page.php?bug_id=4640
+ * for this: we want Unix EPIPE error and not SIGPIPE, which
+ * crashes the program.. *)
+ Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
+ ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe]);
+ Unix.setsockopt sock Unix.SO_REUSEADDR true;
+ let rec incoming _ =
+ ( try
+ let s, _ = Unix.accept sock in
+ handle_client s
+ with e ->
+ Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)
+ );
+ [
+ {
+ Duppy.Task.priority = Non_blocking;
+ events = [`Read sock];
+ handler = incoming;
+ };
+ ]
+ in
+ ( try Unix.bind sock bind_addr
+ with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
+ failwith (Printf.sprintf "port %d already taken" !port) );
+ Unix.listen sock max_conn;
+ Duppy.Task.add scheduler
+ {
+ Duppy.Task.priority = Non_blocking;
+ events = [`Read sock];
+ handler = incoming;
+ };
+ for i = 1 to !non_blocking_queues do
+ ignore
+ (new_queue ~priority:Non_blocking
+ ~name:(Printf.sprintf "Non blocking queue #%d" i)
+ ())
+ done;
+ for i = 1 to !maybe_blocking_queues do
+ ignore
+ (new_queue ~priority:Maybe_blocking
+ ~name:(Printf.sprintf "Maybe blocking queue #%d" i)
+ ())
+ done;
+ Duppy.queue scheduler ~log:(fun _ -> ()) "root"
diff --git a/examples/index.html b/examples/index.html
new file mode 100644
index 0000000..a7f8d9e
--- /dev/null
+++ b/examples/index.html
@@ -0,0 +1 @@
+bla
diff --git a/examples/telnet.ml b/examples/telnet.ml
new file mode 100644
index 0000000..1511e33
--- /dev/null
+++ b/examples/telnet.ml
@@ -0,0 +1,151 @@
+type priority = Non_blocking | Maybe_blocking
+
+let io_priority = Non_blocking
+
+(* Create scheduler *)
+let scheduler = Duppy.create ()
+
+(* Create two queues,
+ * one for non blocking events
+ * and another for blocking
+ * events *)
+let new_queue ~priority ~name () =
+ let log = Printf.printf "%s: %s\n%!" name in
+ let priorities p = p = priority in
+ let queue () = Duppy.queue scheduler ~log ~priorities name in
+ Thread.create queue ()
+
+let th =
+ ignore (new_queue ~priority:Non_blocking ~name:"Non blocking queue" ());
+ ignore (new_queue ~priority:Maybe_blocking ~name:"Maybe blocking queue #1" ());
+ new_queue ~priority:Maybe_blocking ~name:"Maybe blocking queue #2" ()
+
+let exec_command s () =
+ let chan = Unix.open_process_in s in
+ let rec aux () =
+ match try Some (input_line chan) with End_of_file -> None with
+ | None -> []
+ | Some s -> s :: aux ()
+ in
+ let l = aux () in
+ ignore (Unix.close_process_in chan);
+ Duppy.Monad.return (String.concat "\r\n" l)
+
+let commands = Hashtbl.create 10
+
+let () =
+ Hashtbl.add commands "hello" (false, fun () -> Duppy.Monad.return "world");
+ Hashtbl.add commands "foo" (false, fun () -> Duppy.Monad.return "bar");
+ Hashtbl.add commands "uptime" (true, exec_command "uptime");
+ Hashtbl.add commands "date" (true, exec_command "date");
+ Hashtbl.add commands "whoami" (true, exec_command "whoami");
+ Hashtbl.add commands "sleep" (true, exec_command "sleep 15");
+ Hashtbl.add commands "exit" (true, fun () -> Duppy.Monad.raise ())
+
+(* Add commands here *)
+let help = Buffer.create 10
+
+let () =
+ Buffer.add_string help "List of commands:";
+ Hashtbl.iter
+ (fun x _ -> Buffer.add_string help (Printf.sprintf "\r\n%s" x))
+ commands;
+ Hashtbl.add commands "help"
+ (false, fun () -> Duppy.Monad.return (Buffer.contents help))
+
+let handle_client socket =
+ let on_error e =
+ match e with
+ | Duppy.Io.Io_error -> Printf.printf "Client disconnected"
+ | Duppy.Io.Unix (c, p, m) ->
+ Printf.printf "%s" (Printexc.to_string (Unix.Unix_error (c, p, m)))
+ | Duppy.Io.Unknown e -> Printf.printf "%s" (Printexc.to_string e)
+ | Duppy.Io.Timeout -> Printf.printf "Timeout"
+ in
+ let h = { Duppy.Monad.Io.scheduler; socket; data = ""; on_error } in
+ (* Read and process lines *)
+ let rec exec () =
+ let __pa_duppy_0 =
+ Duppy.Monad.Io.read ?timeout:None ~priority:io_priority
+ ~marker:(Duppy.Io.Split "[\r\n]+") h
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun req ->
+ let __pa_duppy_0 =
+ try
+ let blocking, command = Hashtbl.find commands req in
+ if not blocking then command ()
+ else Duppy.Monad.Io.exec ~priority:Maybe_blocking h (command ())
+ with Not_found ->
+ Duppy.Monad.return
+ "ERROR: unknown command, type \"help\" to get a list of commands."
+ in
+ Duppy.Monad.bind __pa_duppy_0 (fun ans ->
+ Duppy.Monad.bind
+ (Duppy.Monad.bind
+ (Duppy.Monad.Io.write ?timeout:None ~priority:io_priority h
+ (Bytes.unsafe_of_string "BEGIN\r\n"))
+ (fun () ->
+ Duppy.Monad.bind
+ (Duppy.Monad.Io.write ?timeout:None ~priority:io_priority h
+ (Bytes.unsafe_of_string ans))
+ (fun () ->
+ Duppy.Monad.Io.write ?timeout:None ~priority:io_priority
+ h
+ (Bytes.unsafe_of_string "\r\nEND\r\n"))))
+ (fun () -> exec ())))
+ in
+ let close () = try Unix.close socket with _ -> () in
+ let return () =
+ let on_error e =
+ on_error e;
+ close ()
+ in
+ Duppy.Io.write ~priority:io_priority ~on_error ~exec:close scheduler
+ ~string:(Bytes.unsafe_of_string "Bye!\r\n")
+ socket
+ in
+ Duppy.Monad.run ~return ~raise:close (exec ())
+
+open Unix
+
+let port = 4123
+let bind_addr_inet = inet_addr_of_string "0.0.0.0"
+let bind_addr = ADDR_INET (bind_addr_inet, port)
+let max_conn = 10
+let sock = socket PF_INET SOCK_STREAM 0
+
+let () =
+ setsockopt sock SO_REUSEADDR true;
+ let rec incoming _ =
+ ( try
+ let s, caller = accept sock in
+ let ip =
+ let a =
+ match caller with ADDR_INET (a, _) -> a | _ -> assert false
+ in
+ try (gethostbyaddr a).h_name with Not_found -> string_of_inet_addr a
+ in
+ Printf.printf "New client: %s\n" ip;
+ handle_client s
+ with e ->
+ Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e)
+ );
+ [
+ {
+ Duppy.Task.priority = io_priority;
+ Duppy.Task.events = [`Read sock];
+ Duppy.Task.handler = incoming;
+ };
+ ]
+ in
+ ( try bind sock bind_addr
+ with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
+ failwith (Printf.sprintf "port %d already taken" port) );
+ listen sock max_conn;
+ Duppy.Task.add scheduler
+ {
+ Duppy.Task.priority = io_priority;
+ Duppy.Task.events = [`Read sock];
+ Duppy.Task.handler = incoming;
+ };
+ Thread.join th
diff --git a/src/dune b/src/dune
new file mode 100644
index 0000000..0f40400
--- /dev/null
+++ b/src/dune
@@ -0,0 +1,8 @@
+(library
+ (name duppy)
+ (public_name duppy)
+ (libraries unix threads pcre bigarray)
+ (foreign_stubs
+ (language c)
+ (names duppy_stubs))
+ (synopsis "OCaml advanced scheduler"))
diff --git a/src/duppy.ml b/src/duppy.ml
new file mode 100644
index 0000000..cb0d6e6
--- /dev/null
+++ b/src/duppy.ml
@@ -0,0 +1,1077 @@
+(*****************************************************************************
+
+ Duppy, a task scheduler for OCaml.
+ Copyright 2003-2010 Savonet team
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details, fully stated in the COPYING
+ file at the root of the liquidsoap distribution.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+ *****************************************************************************)
+
+type fd = Unix.file_descr
+
+external poll :
+ Unix.file_descr array ->
+ Unix.file_descr array ->
+ Unix.file_descr array ->
+ float ->
+ Unix.file_descr array * Unix.file_descr array * Unix.file_descr array
+ = "caml_poll"
+
+let poll r w e timeout =
+ let r = Array.of_list r in
+ let w = Array.of_list w in
+ let e = Array.of_list e in
+ let r, w, e = poll r w e timeout in
+ (Array.to_list r, Array.to_list w, Array.to_list e)
+
+let select, select_fname =
+ match Sys.os_type with
+ | "Unix" -> (poll, "poll")
+ | _ -> (Unix.select, "select")
+
+(** [remove f l] is like [List.find f l] but also returns the result of removing
+ * the found element from the original list. *)
+let remove f l =
+ let rec aux acc = function
+ | [] -> raise Not_found
+ | x :: l -> if f x then (x, List.rev_append acc l) else aux (x :: acc) l
+ in
+ aux [] l
+
+(** Events and tasks from the implementation point-of-view:
+ * we have to hide the 'a parameter. *)
+
+type e = { r : fd list; w : fd list; x : fd list; t : float }
+
+type 'a t = {
+ timestamp : float;
+ prio : 'a;
+ enrich : e -> e;
+ is_ready : e -> (unit -> 'a t list) option;
+}
+
+type 'a scheduler = {
+ out_pipe : Unix.file_descr;
+ in_pipe : Unix.file_descr;
+ compare : 'a -> 'a -> int;
+ select_m : Mutex.t;
+ mutable tasks : 'a t list;
+ tasks_m : Mutex.t;
+ mutable ready : ('a * (unit -> 'a t list)) list;
+ ready_m : Mutex.t;
+ mutable queues : Condition.t list;
+ queues_m : Mutex.t;
+ mutable stop : bool;
+ stop_m : Mutex.t;
+ queue_stopped_c : Condition.t;
+}
+
+let clear_tasks s =
+ Mutex.lock s.tasks_m;
+ s.tasks <- [];
+ Mutex.unlock s.tasks_m
+
+let create ?(compare = compare) () =
+ let out_pipe, in_pipe = Unix.pipe () in
+ {
+ out_pipe;
+ in_pipe;
+ compare;
+ select_m = Mutex.create ();
+ tasks = [];
+ tasks_m = Mutex.create ();
+ ready = [];
+ ready_m = Mutex.create ();
+ queues = [];
+ queues_m = Mutex.create ();
+ stop = false;
+ stop_m = Mutex.create ();
+ queue_stopped_c = Condition.create ();
+ }
+
+let wake_up s = ignore (Unix.write s.in_pipe (Bytes.of_string "x") 0 1)
+
+module Task = struct
+ (** Events and tasks from the user's point-of-view. *)
+
+ type event =
+ [ `Delay of float | `Write of fd | `Read of fd | `Exception of fd ]
+
+ type ('a, 'b) task = {
+ priority : 'a;
+ events : 'b list;
+ handler : 'b list -> ('a, 'b) task list;
+ }
+
+ let time () = Unix.gettimeofday ()
+
+ let rec t_of_task (task : ('a, [< event ]) task) =
+ let t0 = time () in
+ {
+ timestamp = t0;
+ prio = task.priority;
+ enrich =
+ (fun e ->
+ List.fold_left
+ (fun e -> function `Delay s -> { e with t = min e.t (t0 +. s) }
+ | `Read s -> { e with r = s :: e.r }
+ | `Write s -> { e with w = s :: e.w }
+ | `Exception s -> { e with x = s :: e.x })
+ e task.events);
+ is_ready =
+ (fun e ->
+ let l =
+ List.filter
+ (fun evt ->
+ match (evt :> event) with
+ | `Delay s when time () > t0 +. s -> true
+ | `Read s when List.mem s e.r -> true
+ | `Write s when List.mem s e.w -> true
+ | `Exception s when List.mem s e.x -> true
+ | _ -> false)
+ task.events
+ in
+ if l = [] then None
+ else Some (fun () -> List.map t_of_task (task.handler l)));
+ }
+
+ let add_t s items =
+ let f item =
+ match item.is_ready { r = []; w = []; x = []; t = 0. } with
+ | Some f ->
+ Mutex.lock s.ready_m;
+ s.ready <- (item.prio, f) :: s.ready;
+ Mutex.unlock s.ready_m
+ | None ->
+ Mutex.lock s.tasks_m;
+ s.tasks <- item :: s.tasks;
+ Mutex.unlock s.tasks_m
+ in
+ List.iter f items;
+ wake_up s
+
+ let add s t = add_t s [t_of_task t]
+end
+
+open Task
+
+let stop s =
+ clear_tasks s;
+ Mutex.lock s.stop_m;
+ s.stop <- true;
+ Mutex.unlock s.stop_m;
+ Mutex.lock s.queues_m;
+ while List.length s.queues > 0 do
+ wake_up s;
+ Mutex.lock s.ready_m;
+ List.iter Condition.signal s.queues;
+ Mutex.unlock s.ready_m;
+ Condition.wait s.queue_stopped_c s.queues_m
+ done;
+ Mutex.unlock s.queues_m
+
+let tmp = Bytes.create 1024
+
+(** There should be only one call of #process at a time.
+ * Process waits for tasks to become ready, and moves ready tasks
+ * to the ready queue. *)
+let process s log =
+ (* Compute the union of all events. *)
+ let e =
+ List.fold_left
+ (fun e t -> t.enrich e)
+ { r = [s.out_pipe]; w = []; x = []; t = infinity }
+ s.tasks
+ in
+ (* Poll for an event. *)
+ let r, w, x =
+ let rec f () =
+ try
+ let timeout = if e.t = infinity then -1. else max 0. (e.t -. time ()) in
+ log
+ (Printf.sprintf "Enter %s at %f, timeout %f (%d/%d/%d)." select_fname
+ (time ()) timeout (List.length e.r) (List.length e.w)
+ (List.length e.x));
+ let r, w, x = select e.r e.w e.x timeout in
+ log
+ (Printf.sprintf "Left %s at %f (%d/%d/%d)." select_fname (time ())
+ (List.length r) (List.length w) (List.length x));
+ (r, w, x)
+ with
+ | Unix.Unix_error (Unix.EINTR, _, _) ->
+ (* [EINTR] means that select was interrupted by
+ * a signal before any of the selected events
+ * occurred and before the timeout interval expired.
+ * We catch it and restart.. *)
+ log (Printf.sprintf "Select interrupted at %f." (time ()));
+ f ()
+ | e ->
+ (* Uncaught exception:
+ * 1) Discards all tasks currently in the loop (we do not know which
+ * socket caused an error).
+ * 2) Re-Raise e *)
+ clear_tasks s;
+ raise e
+ in
+ f ()
+ in
+ (* Empty the wake_up pipe if needed. *)
+ let () =
+ if List.mem s.out_pipe r then
+ (* For safety, we may absorb more than
+ * one write. This avoids bad situation
+ * when exceesive wake_up may fill up the
+ * pipe's write buffer, causing a wake_up
+ * to become blocking.. *)
+ ignore (Unix.read s.out_pipe tmp 0 1024)
+ in
+ (* Move ready tasks to the ready list. *)
+ let e = { r; w; x; t = 0. } in
+ Mutex.lock s.tasks_m;
+ (* Split [tasks] into [r]eady and still [w]aiting. *)
+ let r, w =
+ List.fold_left
+ (fun (r, w) t ->
+ match t.is_ready e with
+ | Some f -> ((t.prio, f) :: r, w)
+ | None -> (r, t :: w))
+ ([], []) s.tasks
+ in
+ s.tasks <- w;
+ Mutex.unlock s.tasks_m;
+ Mutex.lock s.ready_m;
+ s.ready <-
+ List.stable_sort (fun (p, _) (p', _) -> s.compare p p') (s.ready @ r);
+ Mutex.unlock s.ready_m
+
+(** Code for a queue to process ready tasks.
+ * Returns true a task was found (and hence processed).
+ *
+ * s.ready_m *must* be locked before calling
+ * this function, and is freed *only*
+ * if some task was processed. *)
+let exec s (priorities : 'a -> bool) =
+ (* This assertion does not work on
+ * win32 because a thread can double-lock
+ * the same mutex.. *)
+ if Sys.os_type <> "Win32" then assert (not (Mutex.try_lock s.ready_m));
+ try
+ let (_, task), remaining = remove (fun (p, _) -> priorities p) s.ready in
+ s.ready <- remaining;
+ Mutex.unlock s.ready_m;
+ add_t s (task ());
+ true
+ with Not_found -> false
+
+exception Queue_stopped
+exception Queue_processed
+
+(** Main loop for queues. *)
+let queue ?log ?(priorities = fun _ -> true) s name =
+ let log =
+ match log with Some e -> e | None -> Printf.printf "queue %s: %s\n" name
+ in
+ let c =
+ let c = Condition.create () in
+ Mutex.lock s.queues_m;
+ s.queues <- c :: s.queues;
+ Mutex.unlock s.queues_m;
+ log (Printf.sprintf "Queue #%d starting..." (List.length s.queues));
+ c
+ in
+ (* Try to process ready tasks, otherwise try to become the master,
+ * or be a slave and wait for the master to get some more ready tasks. *)
+ let run () =
+ Mutex.lock s.stop_m;
+ let stop = s.stop in
+ Mutex.unlock s.stop_m;
+ if stop then raise Queue_stopped;
+ (* Lock the ready tasks until the queue has a task to proceed,
+ * *or* is really ready to restart on its condition, see the
+ * Condition.wait call below for the atomic unlock and wait. *)
+ Mutex.lock s.ready_m;
+ log (Printf.sprintf "There are %d ready tasks." (List.length s.ready));
+ if exec s priorities then raise Queue_processed;
+ let wake () =
+ let is_ready =
+ Mutex.lock s.ready_m;
+ let is_ready = s.ready <> [] in
+ Mutex.unlock s.ready_m;
+ is_ready
+ in
+ (* Wake up other queues if there are remaining tasks *)
+ if is_ready then begin
+ Mutex.lock s.queues_m;
+ List.iter (fun x -> if x <> c then Condition.signal x) s.queues;
+ Mutex.unlock s.queues_m
+ end
+ in
+ if Mutex.try_lock s.select_m then begin
+ (* Processing finished for me
+ * I can unlock ready_m now.. *)
+ Mutex.unlock s.ready_m;
+ process s log;
+ Mutex.unlock s.select_m;
+ wake ();
+ end
+ else begin
+ (* We use s.ready_m mutex here.
+ * Hence, we avoid race conditions
+ * with any other queue being processing
+ * a task that would create a new task:
+ * without this mutex, the new task may not be
+ * notified to this queue if it is going to sleep
+ * in concurrency..
+ * It also avoid race conditions when restarting
+ * queues since s.ready_m is locked until all
+ * queues have been signaled. *)
+ Condition.wait c s.ready_m;
+ Mutex.unlock s.ready_m
+ end
+ in
+ let rec f () =
+ begin
+ try run () with Queue_processed -> ()
+ end;
+ (f [@tailcall]) ()
+ in
+ let on_done () =
+ Mutex.lock s.queues_m;
+ s.queues <- List.filter (fun q -> q <> c) s.queues;
+ Condition.signal s.queue_stopped_c;
+ Mutex.unlock s.queues_m
+ in
+ ( try f () with
+ | Queue_stopped -> ()
+ | exn ->
+ on_done ();
+ raise exn );
+ on_done ()
+
+module Async = struct
+ (* m is used to make sure that
+ * calls to [wake_up] and [stop]
+ * are thread-safe. *)
+ type t = { stop : bool ref; mutable fd : fd option; m : Mutex.t }
+
+ exception Stopped
+
+ let add ~priority (scheduler : 'a scheduler) f =
+ (* A pipe to wake up the task *)
+ let out_pipe, in_pipe = Unix.pipe () in
+ let stop = ref false in
+ let tmp = Bytes.create 1024 in
+ let rec task l =
+ if List.exists (( = ) (`Read out_pipe)) l then
+ (* Consume data from the pipe *)
+ ignore (Unix.read out_pipe tmp 0 1024);
+ if !stop then begin
+ begin
+ try
+ (* This interface is purely asynchronous
+ * so we close both sides of the pipe here. *)
+ Unix.close in_pipe;
+ Unix.close out_pipe
+ with _ -> ()
+ end;
+ []
+ end
+ else begin
+ let delay = f () in
+ let event = if delay >= 0. then [`Delay delay] else [] in
+ [{ priority; events = `Read out_pipe :: event; handler = task }]
+ end
+ in
+ let task = { priority; events = [`Read out_pipe]; handler = task } in
+ add scheduler task;
+ { stop; fd = Some in_pipe; m = Mutex.create () }
+
+ let wake_up t =
+ Mutex.lock t.m;
+ try
+ begin
+ match t.fd with
+ | Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1)
+ | None -> raise Stopped
+ end;
+ Mutex.unlock t.m
+ with e ->
+ Mutex.unlock t.m;
+ raise e
+
+ let stop t =
+ Mutex.lock t.m;
+ try
+ begin
+ match t.fd with
+ | Some c ->
+ t.stop := true;
+ ignore (Unix.write c (Bytes.of_string " ") 0 1)
+ | None -> raise Stopped
+ end;
+ t.fd <- None;
+ Mutex.unlock t.m
+ with e ->
+ Mutex.unlock t.m;
+ raise e
+end
+
+module type Transport_t = sig
+ type t
+
+ type bigarray =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+ val sock : t -> Unix.file_descr
+ val read : t -> Bytes.t -> int -> int -> int
+ val write : t -> Bytes.t -> int -> int -> int
+ val ba_write : t -> bigarray -> int -> int -> int
+end
+
+module Unix_transport : Transport_t with type t = Unix.file_descr = struct
+ type t = Unix.file_descr
+
+ type bigarray =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+ let sock s = s
+ let read = Unix.read
+ let write = Unix.write
+
+ external ba_write : t -> bigarray -> int -> int -> int
+ = "ocaml_duppy_write_ba"
+end
+
+module type Io_t = sig
+ type socket
+ type marker = Length of int | Split of string
+
+ type bigarray =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+ type failure =
+ | Io_error
+ | Unix of Unix.error * string * string
+ | Unknown of exn
+ | Timeout
+
+ val read :
+ ?recursive:bool ->
+ ?init:string ->
+ ?on_error:(string * failure -> unit) ->
+ ?timeout:float ->
+ priority:'a ->
+ 'a scheduler ->
+ socket ->
+ marker ->
+ (string * string option -> unit) ->
+ unit
+
+ val write :
+ ?exec:(unit -> unit) ->
+ ?on_error:(failure -> unit) ->
+ ?bigarray:bigarray ->
+ ?offset:int ->
+ ?length:int ->
+ ?string:Bytes.t ->
+ ?timeout:float ->
+ priority:'a ->
+ 'a scheduler ->
+ socket ->
+ unit
+end
+
+module MakeIo (Transport : Transport_t) : Io_t with type socket = Transport.t =
+struct
+ type socket = Transport.t
+ type marker = Length of int | Split of string
+
+ type failure =
+ | Io_error
+ | Unix of Unix.error * string * string
+ | Unknown of exn
+ | Timeout
+
+ exception Io
+ exception Timeout_exc
+
+ type bigarray =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+ let read ?(recursive = false) ?(init = "") ?(on_error = fun _ -> ()) ?timeout
+ ~priority (scheduler : 'a scheduler) socket marker exec =
+ let length = 1024 in
+ let b = Buffer.create length in
+ let buf = Bytes.make length ' ' in
+ Buffer.add_string b init;
+ let unix_socket = Transport.sock socket in
+ let events, check_timeout =
+ match timeout with
+ | None -> ([`Read unix_socket], fun _ -> false)
+ | Some f -> ([`Read unix_socket; `Delay f], List.mem (`Delay f))
+ in
+ let rec f l =
+ if check_timeout l then raise Timeout_exc;
+ if List.mem (`Read unix_socket) l then begin
+ let input = Transport.read socket buf 0 length in
+ if input <= 0 then raise Io;
+ Buffer.add_subbytes b buf 0 input
+ end;
+ let ret =
+ match marker with
+ | Split r ->
+ let rex = Pcre.regexp r in
+ let acc = Buffer.contents b in
+ let ret = Pcre.full_split ~max:2 ~rex acc in
+ let rec p l =
+ match l with
+ | Pcre.Text x :: Pcre.Delim _ :: l ->
+ let f b x =
+ match x with
+ | Pcre.Text s | Pcre.Delim s -> Buffer.add_string b s
+ | _ -> ()
+ in
+ if recursive then begin
+ Buffer.reset b;
+ List.iter (f b) l;
+ Some (x, None)
+ end
+ else begin
+ let b = Buffer.create 10 in
+ List.iter (f b) l;
+ Some (x, Some (Buffer.contents b))
+ end
+ | _ :: l' -> p l'
+ | [] -> None
+ in
+ p ret
+ | Length n when n <= Buffer.length b ->
+ let s = Buffer.sub b 0 n in
+ let rem = Buffer.sub b n (Buffer.length b - n) in
+ if recursive then begin
+ Buffer.reset b;
+ Buffer.add_string b rem;
+ Some (s, None)
+ end
+ else Some (s, Some rem)
+ | _ -> None
+ in
+ (* Catch all exceptions.. *)
+ let f x =
+ try f x with
+ | Io ->
+ on_error (Buffer.contents b, Io_error);
+ []
+ | Timeout_exc ->
+ on_error (Buffer.contents b, Timeout);
+ []
+ | Unix.Unix_error (x, y, z) ->
+ on_error (Buffer.contents b, Unix (x, y, z));
+ []
+ | e ->
+ on_error (Buffer.contents b, Unknown e);
+ []
+ in
+ match ret with
+ | Some x -> (
+ match x with
+ | s, Some _ when recursive ->
+ exec (s, None);
+ [{ priority; events; handler = f }]
+ | _ ->
+ exec x;
+ [] )
+ | None -> [{ priority; events; handler = f }]
+ in
+ (* Catch all exceptions.. *)
+ let f x =
+ try f x with
+ | Io ->
+ on_error (Buffer.contents b, Io_error);
+ []
+ | Timeout_exc ->
+ on_error (Buffer.contents b, Timeout);
+ []
+ | Unix.Unix_error (x, y, z) ->
+ on_error (Buffer.contents b, Unix (x, y, z));
+ []
+ | e ->
+ on_error (Buffer.contents b, Unknown e);
+ []
+ in
+ (* First one is without read,
+ * in case init contains the wanted match.
+ * Unless the user sets timeout to 0., this
+ * should not interfer with user-defined timeout.. *)
+ let task =
+ { priority; events = [`Delay 0.; `Read unix_socket]; handler = f }
+ in
+ add scheduler task
+
+ let write ?(exec = fun () -> ()) ?(on_error = fun _ -> ()) ?bigarray
+ ?(offset = 0) ?length ?string ?timeout ~priority
+ (scheduler : 'a scheduler) socket =
+ let length, write =
+ match (string, bigarray) with
+ | Some s, _ ->
+ let length =
+ match length with Some length -> length | None -> Bytes.length s
+ in
+ (length, Transport.write socket s)
+ | None, Some b ->
+ let length =
+ match length with
+ | Some length -> length
+ | None -> Bigarray.Array1.dim b
+ in
+ (length, Transport.ba_write socket b)
+ | _ -> (0, fun _ _ -> 0)
+ in
+ let unix_socket = Transport.sock (socket : Transport.t) in
+ let exec () =
+ if Sys.os_type = "Win32" then Unix.clear_nonblock unix_socket;
+ exec ()
+ in
+ let events, check_timeout =
+ match timeout with
+ | None -> ([`Write unix_socket], fun _ -> false)
+ | Some f -> ([`Write unix_socket; `Delay f], List.mem (`Delay f))
+ in
+ let rec f pos l =
+ try
+ if check_timeout l then raise Timeout_exc;
+ assert (List.exists (( = ) (`Write unix_socket)) l);
+ let len = length - pos in
+ let n = write pos len in
+ if n <= 0 then (
+ on_error Io_error;
+ [] )
+ else if n < len then
+ [{ priority; events = [`Write unix_socket]; handler = f (pos + n) }]
+ else (
+ exec ();
+ [] )
+ with
+ | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) when Sys.os_type = "Win32" ->
+ [{ priority; events = [`Write unix_socket]; handler = f pos }]
+ | Timeout_exc ->
+ on_error Timeout;
+ []
+ | Unix.Unix_error (x, y, z) ->
+ on_error (Unix (x, y, z));
+ []
+ | e ->
+ on_error (Unknown e);
+ []
+ in
+ let task = { priority; events; handler = f offset } in
+ if length > 0 then
+ (* Win32 is particularly bad with writting on sockets. It is nearly impossible
+ * to write proper non-blocking code. send will block on blocking sockets if
+ * there isn't enough data available instead of returning a partial buffer
+ * and WSAEventSelect will not return if the socket still has available space.
+ * Thus, setting the socket to non-blocking and writting as much as we can. *)
+ if Sys.os_type = "Win32" then begin
+ Unix.set_nonblock unix_socket;
+ List.iter (add scheduler) (f offset [`Write unix_socket])
+ end
+ else add scheduler task
+ else exec ()
+end
+
+module Io : Io_t with type socket = Unix.file_descr = MakeIo (Unix_transport)
+
+(** A monad for implicit continuations or responses *)
+module Monad = struct
+ type ('a, 'b) handler = { return : 'a -> unit; raise : 'b -> unit }
+ type ('a, 'b) t = ('a, 'b) handler -> unit
+
+ let return x h = h.return x
+ let raise x h = h.raise x
+
+ let bind f g h =
+ let ret x =
+ let process = g x in
+ process h
+ in
+ f { return = ret; raise = h.raise }
+
+ let ( >>= ) = bind
+ let run ~return:ret ~raise f = f { return = ret; raise }
+
+ let catch f g h =
+ let raise x =
+ let process = g x in
+ process h
+ in
+ f { return = h.return; raise }
+
+ let ( =<< ) x y = catch y x
+
+ let rec fold_left f a = function
+ | [] -> a
+ | b :: l -> fold_left f (bind a (fun a -> f a b)) l
+
+ let fold_left f a l = fold_left f (return a) l
+ let iter f l = fold_left (fun () b -> f b) () l
+
+ module Mutex_o = Mutex
+
+ module Mutex = struct
+ module type Mutex_control = sig
+ type priority
+
+ val scheduler : priority scheduler
+ val priority : priority
+ end
+
+ module type Mutex_t = sig
+ (** Type for a mutex. *)
+ type mutex
+
+ module Control : Mutex_control
+
+ (** [create ()] creates a mutex. Implementation-wise,
+ * a duppy task is created that will be used to select a
+ * waiting computation, lock the mutex on it and resume it.
+ * Thus, [priority] and [s] represents, resp., the priority
+ * and scheduler used when running calling process' computation. *)
+ val create : unit -> mutex
+
+ (** A computation that locks a mutex
+ * and returns [unit] afterwards. Computation
+ * will be blocked until the mutex is sucessfuly locked. *)
+ val lock : mutex -> (unit, 'a) t
+
+ (** A computation that tries to lock a mutex.
+ * Returns immediatly [true] if the mutex was sucesfully locked
+ * or [false] otherwise. *)
+ val try_lock : mutex -> (bool, 'a) t
+
+ (** A computation that unlocks a mutex.
+ * Should return immediatly. *)
+ val unlock : mutex -> (unit, 'a) t
+ end
+
+ module Factory (Control : Mutex_control) = struct
+ (* A mutex is either locked or not
+ * and has a list of tasks waiting to get
+ * it. *)
+ type mutex = {
+ mutable locked : bool;
+ mutable tasks : (unit -> unit) list;
+ }
+
+ module Control = Control
+
+ let tmp = Bytes.create 1024
+ let x, y = Unix.pipe ()
+ let stop = ref false
+ let wake_up () = ignore (Unix.write y (Bytes.of_string " ") 0 1)
+ let ctl_m = Mutex_o.create ()
+
+ let finalise _ =
+ stop := true;
+ wake_up ()
+
+ let mutexes = Queue.create ()
+ let () = Gc.finalise finalise mutexes
+
+ let register () =
+ let m = { locked = false; tasks = [] } in
+ Queue.push m mutexes;
+ m
+
+ let cleanup m =
+ Mutex_o.lock ctl_m;
+ let q = Queue.create () in
+ Queue.iter (fun m' -> if m <> m' then Queue.push m q) mutexes;
+ Queue.clear mutexes;
+ Queue.transfer q mutexes;
+ Mutex_o.unlock ctl_m
+
+ let task f =
+ {
+ Task.priority = Control.priority;
+ events = [`Delay 0.];
+ handler =
+ (fun _ ->
+ f ();
+ []);
+ }
+
+ (* This should only be called when [ctl_m] is locked. *)
+ let process_mutex tasks m =
+ if not m.locked then (
+ (* I don't think shuffling tasks
+ * matters here.. *)
+ match m.tasks with
+ | x :: l ->
+ m.tasks <- l;
+ m.locked <- true;
+ task x :: tasks
+ | _ -> tasks )
+ else tasks
+
+ let rec handler _ =
+ Mutex_o.lock ctl_m;
+ if not !stop then begin
+ let tasks = Queue.fold process_mutex [] mutexes in
+ Mutex_o.unlock ctl_m;
+ ignore (Unix.read x tmp 0 1024);
+ { Task.priority = Control.priority; events = [`Read x]; handler }
+ :: tasks
+ end
+ else begin
+ Mutex_o.unlock ctl_m;
+ try
+ Unix.close x;
+ Unix.close y;
+ []
+ with _ -> []
+ end
+
+ let () =
+ Task.add Control.scheduler
+ { Task.priority = Control.priority; events = [`Read x]; handler }
+
+ let create () =
+ Mutex_o.lock ctl_m;
+ let ret = register () in
+ Mutex_o.unlock ctl_m;
+ Gc.finalise cleanup ret;
+ ret
+
+ let lock m h' =
+ Mutex_o.lock ctl_m;
+ if not m.locked then begin
+ m.locked <- true;
+ Mutex_o.unlock ctl_m;
+ h'.return ()
+ end
+ else begin
+ m.tasks <- h'.return :: m.tasks;
+ Mutex_o.unlock ctl_m
+ end
+
+ let try_lock m h' =
+ Mutex_o.lock ctl_m;
+ if not m.locked then begin
+ m.locked <- true;
+ Mutex_o.unlock ctl_m;
+ h'.return true
+ end
+ else begin
+ Mutex_o.unlock ctl_m;
+ h'.return false
+ end
+
+ let unlock m h' =
+ Mutex_o.lock ctl_m;
+ (* Here we allow inter-thread
+ * and double unlock.. Double unlock
+ * is not necessarily a problem and
+ * inter-thread unlock well.. what is
+ * a thread here ?? :-) *)
+ m.locked <- false;
+ let wake = m.tasks <> [] in
+ Mutex_o.unlock ctl_m;
+ if wake then wake_up ();
+ h'.return ()
+ end
+ end
+
+ module Condition = struct
+ module Factory (Mutex : Mutex.Mutex_t) = struct
+ type condition = {
+ condition_m : Mutex_o.t;
+ waiting : (unit -> unit) Queue.t;
+ }
+
+ module Control = Mutex.Control
+
+ let create () =
+ { condition_m = Mutex_o.create (); waiting = Queue.create () }
+
+ (* Mutex.unlock m needs to happen _after_
+ * the task has been registered. *)
+ let wait c m h =
+ let proc () = Mutex.lock m h in
+ Mutex_o.lock c.condition_m;
+ Queue.push proc c.waiting;
+ Mutex_o.unlock c.condition_m;
+ (* Mutex.unlock does not raise exceptions (for now..) *)
+ let h' = { return = (fun () -> ()); raise = (fun _ -> assert false) } in
+ Mutex.unlock m h'
+
+ let wake_up h =
+ let handler _ =
+ h ();
+ []
+ in
+ Task.add Control.scheduler
+ { Task.priority = Control.priority; events = [`Delay 0.]; handler }
+
+ let signal c h =
+ Mutex_o.lock c.condition_m;
+ let h' = Queue.pop c.waiting in
+ Mutex_o.unlock c.condition_m;
+ wake_up h';
+ h.return ()
+
+ let broadcast c h =
+ let q = Queue.create () in
+ Mutex_o.lock c.condition_m;
+ Queue.transfer c.waiting q;
+ Mutex_o.unlock c.condition_m;
+ Queue.iter wake_up q;
+ h.return ()
+ end
+ end
+
+ module type Monad_io_t = sig
+ type socket
+
+ module Io : Io_t with type socket = socket
+
+ type ('a, 'b) handler = {
+ scheduler : 'a scheduler;
+ socket : Io.socket;
+ mutable data : string;
+ on_error : Io.failure -> 'b;
+ }
+
+ val exec :
+ ?delay:float ->
+ priority:'a ->
+ ('a, 'b) handler ->
+ ('c, 'b) t ->
+ ('c, 'b) t
+
+ val delay : priority:'a -> ('a, 'b) handler -> float -> (unit, 'b) t
+
+ val read :
+ ?timeout:float ->
+ priority:'a ->
+ marker:Io.marker ->
+ ('a, 'b) handler ->
+ (string, 'b) t
+
+ val read_all :
+ ?timeout:float ->
+ priority:'a ->
+ 'a scheduler ->
+ Io.socket ->
+ (string, string * Io.failure) t
+
+ val write :
+ ?timeout:float ->
+ priority:'a ->
+ ('a, 'b) handler ->
+ ?offset:int ->
+ ?length:int ->
+ Bytes.t ->
+ (unit, 'b) t
+
+ val write_bigarray :
+ ?timeout:float ->
+ priority:'a ->
+ ('a, 'b) handler ->
+ Io.bigarray ->
+ (unit, 'b) t
+ end
+
+ module MakeIo (Io : Io_t) = struct
+ type socket = Io.socket
+
+ module Io = Io
+
+ type ('a, 'b) handler = {
+ scheduler : 'a scheduler;
+ socket : Io.socket;
+ mutable data : string;
+ on_error : Io.failure -> 'b;
+ }
+
+ let exec ?(delay = 0.) ~priority h f h' =
+ let handler _ =
+ begin
+ try f h' with e -> h'.raise (h.on_error (Io.Unknown e))
+ end;
+ []
+ in
+ Task.add h.scheduler { Task.priority; events = [`Delay delay]; handler }
+
+ let delay ~priority h delay = exec ~delay ~priority h (return ())
+
+ let read ?timeout ~priority ~marker h h' =
+ let process x =
+ let s =
+ match x with
+ | s, None ->
+ h.data <- "";
+ s
+ | s, Some s' ->
+ h.data <- s';
+ s
+ in
+ h'.return s
+ in
+ let init = h.data in
+ h.data <- "";
+ let on_error (s, x) =
+ h.data <- s;
+ h'.raise (h.on_error x)
+ in
+ Io.read ?timeout ~priority ~init ~recursive:false ~on_error h.scheduler
+ h.socket marker process
+
+ let read_all ?timeout ~priority s sock =
+ let handler =
+ { scheduler = s; socket = sock; data = ""; on_error = (fun e -> e) }
+ in
+ let buf = Buffer.create 1024 in
+ let rec f () =
+ let data = read ?timeout ~priority ~marker:(Io.Length 1024) handler in
+ let process data =
+ Buffer.add_string buf data;
+ f ()
+ in
+ data >>= process
+ in
+ let catch_ret e =
+ Buffer.add_string buf handler.data;
+ match e with
+ | Io.Io_error -> return (Buffer.contents buf)
+ | e -> raise (Buffer.contents buf, e)
+ in
+ catch (f ()) catch_ret
+
+ let write ?timeout ~priority h ?offset ?length s h' =
+ let on_error x = h'.raise (h.on_error x) in
+ let exec () = h'.return () in
+ Io.write ?timeout ~priority ~on_error ~exec ?offset ?length ~string:s
+ h.scheduler h.socket
+
+ let write_bigarray ?timeout ~priority h ba h' =
+ let on_error x = h'.raise (h.on_error x) in
+ let exec () = h'.return () in
+ Io.write ?timeout ~priority ~on_error ~exec ~bigarray:ba h.scheduler
+ h.socket
+ end
+
+ module Io = MakeIo (Io)
+end
diff --git a/src/duppy.mli b/src/duppy.mli
new file mode 100644
index 0000000..be0a4d6
--- /dev/null
+++ b/src/duppy.mli
@@ -0,0 +1,549 @@
+(*****************************************************************************
+
+ Duppy, a task scheduler for OCaml.
+ Copyright 2003-2010 Savonet team
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details, fully stated in the COPYING
+ file at the root of the liquidsoap distribution.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+ *****************************************************************************)
+
+(** Advanced scheduler and monad for server-oriented programming. *)
+
+(**
+ * {R {i {v
+ * The bars could not hold me;
+ * Force could not control me now.
+ * They try to keep me down, yeah!
+ * But Jah put I around.
+ * (...)
+ * Let me tell you this -
+ * I'm a duppy conqueror !
+ * v} } }
+ * {R {b Lee "Scratch" Perry & Bob Marley - Duppy conqueror }}
+ *
+ * {2 Duppy task scheduler for OCaml.}
+ *
+ * {!Duppy} is a task scheduler for ocaml. It implements a wrapper
+ * around [Unix.select].
+ *
+ * Using {!Duppy.Task}, the programmer can easily submit tasks that need to wait
+ * on a socket even, or for a given timeout (possibly zero).
+ *
+ * With {!Duppy.Async}, one can use a scheduler to submit asynchronous tasks.
+ *
+ * {!Duppy.Io} implements recursive easy reading and writing to a [Unix.file_descr]
+ *
+ * Finally, {!Duppy.Monad} and {!Duppy.Monad.Io} provide a monadic interface to
+ * program server code that with an implicit return/reply execution flow.
+ *
+ * The scheduler can use several queues running concurently, each queue
+ * processing ready tasks. Of course, a queue should run in its own thread.*)
+
+(** A scheduler is a device for processing tasks. Several queues might run in
+ * different threads, processing one scheduler's tasks.
+ *
+ * ['a] is the type of objects used for priorities. *)
+type 'a scheduler
+
+(** Initiate a new scheduler
+ * @param compare the comparison function used to sort tasks according to priorities.
+ * Works as in [List.sort] *)
+val create : ?compare:('a -> 'a -> int) -> unit -> 'a scheduler
+
+(** [queue ~log ~priorities s name]
+ * starts a queue, on the scheduler [s] only processing priorities [p]
+ * for which [priorities p] returns [true].
+ *
+ * Several queues can be run concurrently against [s].
+ * @param log Logging function. Default: [Printf.printf "queue %s: %s\n" name]
+ * @param priorities Predicate specifying which priority to process. Default: [fun _ -> _ -> true]
+ *
+ * An exception is raised from this call when duppy's event loops has
+ * crashed. This exception should be considered a MAJOR FAILURE. All current
+ * non-ready tasks registered for the calling scheduler are dropped. You may
+ * restart Duppy's queues after it is raised but it should only be used to terminate
+ * the process diligently!! *)
+val queue :
+ ?log:(string -> unit) ->
+ ?priorities:('a -> bool) ->
+ 'a scheduler ->
+ string ->
+ unit
+
+(** Stop all queues running on that scheduler and wait for them to return. *)
+val stop : 'a scheduler -> unit
+
+(** Core task registration.
+ *
+ * A task will be a set of events to watch, and a corresponding function to
+ * execute when one of the events is trigered.
+ *
+ * The executed function may then return a list of new tasks to schedule. *)
+module Task : sig
+ (** A task is a list of events awaited,
+ * and a function to process events that have occured.
+ *
+ * The ['a] parameter is the type of priorities, ['b] will be a subset of possible
+ * events. *)
+ type ('a, 'b) task = {
+ priority : 'a;
+ events : 'b list;
+ handler : 'b list -> ('a, 'b) task list;
+ }
+
+ (** Type for possible events.
+ *
+ * Please not that currently, under win32, all socket used in ocaml-duppy
+ * are expected to be in blocking mode only! *)
+ type event =
+ [ `Delay of float
+ | `Write of Unix.file_descr
+ | `Read of Unix.file_descr
+ | `Exception of Unix.file_descr ]
+
+ (** Schedule a task. *)
+ val add : 'a scheduler -> ('a, [< event ]) task -> unit
+end
+
+(** Asynchronous task module
+ *
+ * This module implements an asychronous API to {!Duppy.scheduler}
+ * It allows to create a task that will run and then go to sleep. *)
+module Async : sig
+ type t
+
+ (** Exception raised when trying to wake_up a task
+ * that has been previously stopped *)
+ exception Stopped
+
+ (** [add ~priority s f] creates an asynchronous task in [s] with
+ * priority [priority].
+ *
+ * The task executes the function [f].
+ * If the task returns a positive float, the function will be executed
+ * again after this delay. Otherwise it goes to sleep, and
+ * you can use [wake_up] to resume the task and execute [f] again.
+ * Only a single call to [f] is done at each time.
+ * Multiple [wake_up] while previous task has not
+ * finished will result in sequentialized calls to [f]. *)
+ val add : priority:'a -> 'a scheduler -> (unit -> float) -> t
+
+ (** Wake up an asynchronous task.
+ * Raises [Stopped] if the task has been stopped. *)
+ val wake_up : t -> unit
+
+ (** Stop and remove the asynchronous task. Doesn't quit a running task.
+ * Raises [Stopped] if the task has been stopped. *)
+ val stop : t -> unit
+end
+
+(** Module type for Io functor. *)
+module type Transport_t = sig
+ type t
+
+ type bigarray =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+ val sock : t -> Unix.file_descr
+ val read : t -> Bytes.t -> int -> int -> int
+ val write : t -> Bytes.t -> int -> int -> int
+ val ba_write : t -> bigarray -> int -> int -> int
+end
+
+(** Easy parsing of [Unix.file_descr].
+ *
+ * With {!Duppy.Io.read}, you can pass a file descriptor to the scheduler,
+ * along with a marker, and have it run the associated function when the
+ * marker is found.
+ *
+ * With {!Duppy.Io.write}, the schdeduler will try to write recursively to the file descriptor
+ * the given string. *)
+module type Io_t = sig
+ type socket
+
+ (** Type for markers.
+ *
+ * [Split s] recognizes all regexp allowed by the
+ * [Pcre] module. *)
+ type marker = Length of int | Split of string
+
+ (** Type of [Bigarray] used here. *)
+ type bigarray =
+ (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+ (** Different types of failure.
+ *
+ * [Io_error] is raised when reading or writing
+ * returned 0. This usually means that the socket
+ * was closed. *)
+ type failure =
+ | Io_error
+ | Unix of Unix.error * string * string
+ | Unknown of exn
+ | Timeout
+
+ (** Wrapper to perform a read on a socket and trigger a function when
+ * a marker has been detected, or enough data has been read.
+ * It reads recursively on a socket, splitting into strings seperated
+ * by the marker (if any) and calls the given function on the list of strings.
+ *
+ * Can be used recursively or not, depending on the way you process strings.
+ * Because of Unix's semantic, it is not possible to stop reading
+ * at first marker, so there can be a remaining string. If not used
+ * recursively, the second optional argument may contain a remaining
+ * string. You should then initiate the next read with this value.
+ *
+ * The [on_error] function is used when reading failed on the socket.
+ * Depending on your usage, it can be a hard failure, or simply a lost client.
+ * The string passed to [on_error] contains data read before error
+ * occured.
+ * @param recursive recursively read and process, default: [true]
+ * @param init initial string for reading, default: [""]
+ * @param on_error function used when read failed, default: [fun _ -> ()]
+ * @param timeout Terminate with [Timeout] failure if nothing has been read
+ * after the given amout of time in seconds. More precisely,
+ * the exception is raised when no character have been read
+ * and the socket was not close while waiting. Default: wait
+ * forever. *)
+ val read :
+ ?recursive:bool ->
+ ?init:string ->
+ ?on_error:(string * failure -> unit) ->
+ ?timeout:float ->
+ priority:'a ->
+ 'a scheduler ->
+ socket ->
+ marker ->
+ (string * string option -> unit) ->
+ unit
+
+ (** Similar to [read] but less complex.
+ * [write ?exec ?on_error ?string ?bigarray ~priority scheduler socket]
+ * write data from [string], or from [bigarray] if no string is given,
+ * to [socket], and executes [exec] or [on_error] if errors occured.
+ *
+ * Caveat: on Win32, all file descriptors are expected to be in blocking
+ * mode before being passed to this call due to limitations in the emulation
+ * of the unix/posix API. See code comments for more details.
+ *
+ * @param exec function to execute after writing, default: [fun () -> ()]
+ * @param on_error function to execute when an error occured, default: [fun _ -> ()]
+ * @param string write data from this string
+ * @param bigarray write data from this bigarray, if no [string] is given
+ * @param timeout Terminate with [Timeout] failure if nothing has been written
+ * after the given amout of time in seconds. More precisely,
+ * the exception is raised when no character have been written
+ * and the socket was not close while waiting. Default: wait
+ * forever. *)
+ val write :
+ ?exec:(unit -> unit) ->
+ ?on_error:(failure -> unit) ->
+ ?bigarray:bigarray ->
+ ?offset:int ->
+ ?length:int ->
+ ?string:Bytes.t ->
+ ?timeout:float ->
+ priority:'a ->
+ 'a scheduler ->
+ socket ->
+ unit
+end
+
+module MakeIo (Transport : Transport_t) : Io_t with type socket = Transport.t
+module Io : Io_t with type socket = Unix.file_descr
+
+(** Monadic interface to {!Duppy.Io}.
+ *
+ * This module can be used to write code
+ * that runs in various Duppy's tasks and
+ * raise values in a completely transparent way.
+ *
+ * You can see examples of its use
+ * in the [examples/] directory of the
+ * source code and in the files
+ * [src/tools/{harbor.camlp4,server.camlp4}]
+ * in liquidsoap's code.
+ *
+ * When a server communicates
+ * with a client, it performs several
+ * computations and, eventually, terminates.
+ * A computation can either return a new
+ * value or terminate. For instance:
+ *
+ * - Client connects.
+ * - Server tries to authenticate the client.
+ * - If authentication is ok, proceed with the next step.
+ * - Otherwise terminate.
+ *
+ * The purpose of the monad is to embed
+ * computations which can either return
+ * a new value or raise a value that is used
+ * to terminate. *)
+module Monad : sig
+ (** Type representing a computation
+ * which returns a value of type ['a]
+ * or raises a value of type ['b] *)
+ type ('a, 'b) t
+
+ (** [return x] create a computation that
+ * returns value [x]. *)
+ val return : 'a -> ('a, 'b) t
+
+ (** [raise x] create a computation that raises
+ * value [x]. *)
+ val raise : 'b -> ('a, 'b) t
+
+ (** Compose two computations.
+ * [bind f g] is equivalent to:
+ * [let x = f in g x] where [x]
+ * has f's return type. *)
+ val bind : ('a, 'b) t -> ('a -> ('c, 'b) t) -> ('c, 'b) t
+
+ (** [>>=] is an alternative notation
+ * for [bind] *)
+ val ( >>= ) : ('a, 'b) t -> ('a -> ('c, 'b) t) -> ('c, 'b) t
+
+ (** [run f ~return ~raise ()] executes [f] and process
+ * returned values with [return] or raised values
+ * with [raise]. *)
+ val run : return:('a -> unit) -> raise:('b -> unit) -> ('a, 'b) t -> unit
+
+ (** [catch f g] redirects values [x] raised during
+ * [f]'s execution to [g]. The name suggests the
+ * usual [try .. with ..] exception catching. *)
+ val catch : ('a, 'b) t -> ('b -> ('a, 'c) t) -> ('a, 'c) t
+
+ (** [=<<] is an alternative notation for catch. *)
+ val ( =<< ) : ('b -> ('a, 'c) t) -> ('a, 'b) t -> ('a, 'c) t
+
+ (** [fold_left f a [b1; b2; ..]] returns computation
+ * [ (f a b1) >>= (fun a -> f a b2) >>= ...] *)
+ val fold_left : ('a -> 'b -> ('a, 'c) t) -> 'a -> 'b list -> ('a, 'c) t
+
+ (** [iter f [x1; x2; ..]] returns computation
+ * [f x1 >>= (fun () -> f x2) >>= ...] *)
+ val iter : ('a -> (unit, 'b) t) -> 'a list -> (unit, 'b) t
+
+ (** This module implements monadic
+ * mutex computations. They can be used
+ * to write blocking code that is compatible
+ * with duppy's tasks, i.e. [Mutex.lock m] blocks
+ * the calling computation and not the calling thread. *)
+ module Mutex : sig
+ (** Information used to initialize a Mutex module.
+ * [priority] and [scheduler] are used to initialize a task
+ * which treat mutexes as well as conditions from the below
+ * [Condition] module. *)
+ module type Mutex_control = sig
+ type priority
+
+ val scheduler : priority scheduler
+ val priority : priority
+ end
+
+ module type Mutex_t = sig
+ (** Type for a mutex. *)
+ type mutex
+
+ module Control : Mutex_control
+
+ (** [create ()] creates a mutex. *)
+ val create : unit -> mutex
+
+ (** A computation that locks a mutex
+ * and returns [unit] afterwards. Computation
+ * will be blocked until the mutex is sucessfuly locked. *)
+ val lock : mutex -> (unit, 'a) t
+
+ (** A computation that tries to lock a mutex.
+ * Returns immediatly [true] if the mutex was sucesfully locked
+ * or [false] otherwise. *)
+ val try_lock : mutex -> (bool, 'a) t
+
+ (** A computation that unlocks a mutex.
+ * Should return immediatly. *)
+ val unlock : mutex -> (unit, 'a) t
+ end
+
+ module Factory (Control : Mutex_control) : Mutex_t
+ end
+
+ (** This module implements monadic
+ * condition computations. They can be used
+ * to write waiting code that is compatible
+ * with duppy's tasks, i.e. [Condition.wait c m] blocks
+ * the calling computation and not the calling thread
+ * until [Condition.signal c] or [Condition.broadcast c] has
+ * been called. *)
+ module Condition : sig
+ module Factory (Mutex : Mutex.Mutex_t) : sig
+ (** Type of a condition, used in [wait] and [broadcast] *)
+ type condition
+
+ (** Create a condition. Implementation-wise,
+ * a duppy task is created that will be used to select a
+ * waiting computation, and resume it.
+ * Thus, [priority] and [s] represents, resp., the priority
+ * and scheduler used when running calling process' computation. *)
+ val create : unit -> condition
+
+ (** [wait h m] is a computation that:
+ * {ul
+ * {- Unlock mutex [m]}
+ * {- Wait until [Condition.signal c] or [Condition.broadcast c]
+ has been called}
+ * {- Locks mutex [m]}
+ * {- Returns [unit]}} *)
+ val wait : condition -> Mutex.mutex -> (unit, 'a) t
+
+ (** [broadcast c] is a computation that
+ * resumes all computations waiting on [c]. It should
+ * return immediately. *)
+ val broadcast : condition -> (unit, 'a) t
+
+ (** [signal c] is a computation that resumes one
+ * computation waiting on [c]. It should return
+ * immediately. *)
+ val signal : condition -> (unit, 'a) t
+ end
+ end
+
+ (** This module implements monadic computations
+ * using [Duppy.Io]. It can be used to create
+ * computations that read or write from a socket,
+ * and also to redirect a computation in a different
+ * queue with a new priority. *)
+ module type Monad_io_t = sig
+ type socket
+
+ module Io : Io_t with type socket = socket
+
+ (** {2 Type } *)
+
+ (** A handler for this module
+ * is a record that contains the
+ * required elements. In particular,
+ * [on_error] is a function that transforms
+ * an error raised by [Duppy.Io] to a reply
+ * used to terminate the computation.
+ * [data] is an internal data buffer. It should
+ * be initialized with [""]. It contains the
+ * remaining data that was received when
+ * using [read]. If an error occured,
+ * [data] contain data read before the
+ * error. *)
+ type ('a, 'b) handler = {
+ scheduler : 'a scheduler;
+ socket : Io.socket;
+ mutable data : string;
+ on_error : Io.failure -> 'b;
+ }
+
+ (** {2 Execution flow } *)
+
+ (** [exec ?delay ~priority h f] redirects computation
+ * [f] into a new queue with priority [priority] and
+ * delay [delay] ([0.] by default).
+ * It can be used to redirect a computation that
+ * has to run under a different priority. For instance,
+ * a computation that reads from a socket is generally
+ * not blocking because the function is executed
+ * only when some data is available for reading.
+ * However, if the data that is read needs to be processed
+ * by a computation that can be blocking, then one may
+ * use [exec] to redirect this computation into an
+ * appropriate queue. *)
+ val exec :
+ ?delay:float ->
+ priority:'a ->
+ ('a, 'b) handler ->
+ ('c, 'b) t ->
+ ('c, 'b) t
+
+ (** [delay ~priority h d] creates a computation that returns
+ * [unit] after delay [d] in seconds. *)
+ val delay : priority:'a -> ('a, 'b) handler -> float -> (unit, 'b) t
+
+ (** {2 Read/write } *)
+
+ (** [read ?timeout ~priority ~marker h] creates a
+ * computation that reads from [h.socket]
+ * and returns the first string split
+ * according to [marker]. This function
+ * can be used to create a computation that
+ * reads data from a socket. [timeout] parameter
+ * forces the computation to return an error if
+ * nothing has been read for more than [timeout]
+ * seconds. Default: wait forever. *)
+ val read :
+ ?timeout:float ->
+ priority:'a ->
+ marker:Io.marker ->
+ ('a, 'b) handler ->
+ (string, 'b) t
+
+ (** [read_all ?timeout ~priority s sock] creates a
+ * computation that reads all data from [sock]
+ * and returns it. Raised value contains data
+ * read before an error occured. *)
+ val read_all :
+ ?timeout:float ->
+ priority:'a ->
+ 'a scheduler ->
+ Io.socket ->
+ (string, string * Io.failure) t
+
+ (** [write ?timeout ~priority h s] creates a computation
+ * that writes string [s] to [h.socket]. This
+ * function can be used to create a computation
+ * that sends data to a socket. [timeout] parameter
+ * forces the computation to return an error if
+ * nothing has been written for more than [timeout]
+ * seconds. Default: wait forever. *)
+ val write :
+ ?timeout:float ->
+ priority:'a ->
+ ('a, 'b) handler ->
+ ?offset:int ->
+ ?length:int ->
+ Bytes.t ->
+ (unit, 'b) t
+
+ (** [write_bigarray ?timeout ~priority h ba] creates a computation
+ * that writes data from [ba] to [h.socket]. This function
+ * can to create a computation that writes data to a socket. *)
+ val write_bigarray :
+ ?timeout:float ->
+ priority:'a ->
+ ('a, 'b) handler ->
+ Io.bigarray ->
+ (unit, 'b) t
+ end
+
+ module MakeIo (Io : Io_t) :
+ Monad_io_t with type socket = Io.socket and module Io = Io
+
+ module Io : Monad_io_t with type socket = Unix.file_descr and module Io = Io
+end
+
+(** {2 Some culture..}
+ * {e Duppy is a Caribbean patois word of West African origin meaning ghost or spirit.
+ * Much of Caribbean folklore revolves around duppies.
+ * Duppies are generally regarded as malevolent spirits.
+ * They are said to come out and haunt people at night mostly,
+ * and people from the islands claim to have seen them.
+ * The 'Rolling Calf', 'Three footed horse' or 'Old Higue' are examples of the more malicious spirits. }
+ * {R {{:http://en.wikipedia.org/wiki/Duppy} http://en.wikipedia.org/wiki/Duppy}}*)
diff --git a/src/duppy_stubs.c b/src/duppy_stubs.c
new file mode 100644
index 0000000..ac70284
--- /dev/null
+++ b/src/duppy_stubs.c
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2010 Savonet team
+ *
+ * This file is part of Ocaml-duppy.
+ *
+ * Ocaml-duppy is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Ocaml-duppy is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Ocaml-duppy; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include <caml/alloc.h>
+#include <caml/signals.h>
+#include <caml/unixsupport.h>
+#include <caml/memory.h>
+#include <caml/bigarray.h>
+#include <caml/fail.h>
+#include <caml/threads.h>
+
+#include <errno.h>
+
+/* On native Windows platforms, many macros are not defined. */
+# if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
+
+#ifndef EWOULDBLOCK
+#define EWOULDBLOCK EAGAIN
+#endif
+
+#endif
+
+#ifdef WIN32
+#define Fd_val(fd) win_CRT_fd_of_filedescr(fd)
+#define Val_fd(fd) caml_failwith("Val_fd")
+#else
+#define Fd_val(fd) Int_val(fd)
+#define Val_fd(fd) Val_int(fd)
+#endif
+
+#ifndef WIN32
+#include <poll.h>
+
+CAMLprim value caml_poll(value _read, value _write, value _err, value _timeout) {
+ CAMLparam3(_read, _write, _err);
+ CAMLlocal4(_pread, _pwrite, _perr, _ret);
+
+ struct pollfd *fds;
+ nfds_t nfds = 0;
+ nfds_t nread = 0;
+ nfds_t nwrite = 0;
+ nfds_t nerr = 0;
+ int timeout;
+ size_t last = 0;
+ int n, ret;
+
+ if (Double_val(_timeout) == -1)
+ timeout = -1;
+ else
+ timeout = Double_val(_timeout) * 1000;
+
+ nfds += Wosize_val(_read);
+ nfds += Wosize_val(_write);
+ nfds += Wosize_val(_err);
+
+ fds = calloc(nfds,sizeof(struct pollfd));
+ if (fds == NULL) caml_raise_out_of_memory();
+
+ for (n = 0; n < Wosize_val(_read); n++) {
+ fds[last+n].fd = Fd_val(Field(_read,n));
+ fds[last+n].events = POLLIN;
+ }
+ last += Wosize_val(_read);
+
+ for (n = 0; n < Wosize_val(_write); n++) {
+ fds[last+n].fd = Fd_val(Field(_write,n));
+ fds[last+n].events = POLLOUT;
+ }
+ last += Wosize_val(_write);
+
+ for (n = 0; n < Wosize_val(_err); n++) {
+ fds[last+n].fd = Fd_val(Field(_err,n));
+ fds[last+n].events = POLLERR;
+ }
+
+ caml_release_runtime_system();
+ ret = poll(fds, nfds, timeout);
+ caml_acquire_runtime_system();
+
+ if (ret == -1) {
+ free(fds);
+ uerror("poll",Nothing);
+ }
+
+ for (n = 0; n < nfds; n++) {
+ if (fds[n].revents & POLLIN)
+ nread++;
+ if (fds[n].revents & POLLOUT)
+ nwrite++;
+ if (fds[n].revents & POLLERR)
+ nerr++;
+ }
+
+ _pread = caml_alloc_tuple(nread);
+ nread = 0;
+
+ _pwrite = caml_alloc_tuple(nwrite);
+ nwrite = 0;
+
+ _perr = caml_alloc_tuple(nerr);
+ nerr = 0;
+
+ for (n = 0; n < nfds; n++) {
+ if (fds[n].revents & POLLIN) {
+ Store_field(_pread, nread, Val_fd(fds[n].fd));
+ nread++;
+ }
+ if (fds[n].revents & POLLOUT) {
+ Store_field(_pwrite, nwrite, Val_fd(fds[n].fd));
+ nwrite++;
+ }
+ if (fds[n].revents & POLLERR) {
+ Store_field(_pread, nerr, Val_fd(fds[n].fd));
+ nerr++;
+ }
+ }
+
+ free(fds);
+
+ _ret = caml_alloc_tuple(3);
+ Store_field(_ret, 0, _pread);
+ Store_field(_ret, 1, _pwrite);
+ Store_field(_ret, 2, _perr);
+
+ CAMLreturn(_ret);
+}
+#else
+CAMLprim value caml_poll(value _read, value _write, value _err, value _timeout) {
+ caml_failwith("caml_poll");
+}
+#endif
+
+CAMLprim value ocaml_duppy_write_ba(value _fd, value ba, value _ofs, value _len)
+{
+ CAMLparam2(ba,_fd) ;
+ int fd = Fd_val(_fd);
+ long ofs = Long_val(_ofs);
+ long len = Long_val(_len);
+ void *buf = Caml_ba_data_val(ba);
+ int ret;
+
+ int written = 0;
+ while (len > 0) {
+ caml_enter_blocking_section();
+ ret = write(fd, buf+ofs, len);
+ caml_leave_blocking_section();
+ if (ret == -1) {
+ if ((errno == EAGAIN || errno == EWOULDBLOCK) && written > 0) break;
+ uerror("write", Nothing);
+ }
+ written += ret;
+ ofs += ret;
+ len -= ret;
+ }
+
+ CAMLreturn(Val_long(written));
+}
+