diff options
author | Stéphane Glondu <glondu@debian.org> | 2021-11-28 17:37:56 +0100 |
---|---|---|
committer | Stéphane Glondu <glondu@debian.org> | 2021-11-28 17:37:56 +0100 |
commit | 72db6d910338499b96f773e0498d4f9fe32607e2 (patch) | |
tree | f6a74276a8cdab5d8dd2020079ee417625b67724 |
Import ocaml-duppy_0.9.2.orig.tar.gz
[dgit import orig ocaml-duppy_0.9.2.orig.tar.gz]
-rw-r--r-- | .github/workflows/ci.yml | 24 | ||||
-rw-r--r-- | .gitignore | 7 | ||||
-rw-r--r-- | .merlin | 4 | ||||
-rw-r--r-- | .ocamlformat | 9 | ||||
-rwxr-xr-x | .travis-ci.sh | 15 | ||||
-rw-r--r-- | CHANGES | 133 | ||||
-rw-r--r-- | COPYING | 504 | ||||
-rw-r--r-- | README.md | 48 | ||||
-rw-r--r-- | dune-project | 17 | ||||
-rw-r--r-- | duppy.opam | 29 | ||||
-rw-r--r-- | examples/dune | 9 | ||||
-rw-r--r-- | examples/http.ml | 555 | ||||
-rw-r--r-- | examples/index.html | 1 | ||||
-rw-r--r-- | examples/telnet.ml | 151 | ||||
-rw-r--r-- | src/dune | 8 | ||||
-rw-r--r-- | src/duppy.ml | 1077 | ||||
-rw-r--r-- | src/duppy.mli | 549 | ||||
-rw-r--r-- | src/duppy_stubs.c | 176 |
18 files changed, 3316 insertions, 0 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..5f6a8f0 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,24 @@ +name: CI + +on: [push] + +jobs: + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macos-latest] + steps: + - uses: actions/checkout@v1 + - name: Setup OCaml + uses: avsm/setup-ocaml@master + - name: Install depext module + run: opam install -y depext + - name: Pin locally + run: opam pin -y add --no-action . + - name: Install locally + run: opam depext -y -i duppy + - name: Build locally + run: eval $(opam env) && dune build + - name: Run tests locally + run: eval $(opam env) && dune runtest diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6fadc6f --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +*~ +_build +*.byte +*.native +_tests +.merlin +*.install @@ -0,0 +1,4 @@ +B src/** +S src/** +B +threads +PKG pcre diff --git a/.ocamlformat b/.ocamlformat new file mode 100644 index 0000000..533d804 --- /dev/null +++ b/.ocamlformat @@ -0,0 +1,9 @@ +profile = conventional +break-separators = after +space-around-lists = false +doc-comments = before +match-indent = 2 +match-indent-nested = always +parens-ite +exp-grouping = preserve +module-item-spacing = compact diff --git a/.travis-ci.sh b/.travis-ci.sh new file mode 100755 index 0000000..82e886d --- /dev/null +++ b/.travis-ci.sh @@ -0,0 +1,15 @@ +# Hacking the build into Travis-CI "C" environment +# See http://anil.recoil.org/2013/09/30/travis-and-ocaml.html + +export OPAMYES=1 +opam init +if [ -n "${OPAM_SWITCH}" ]; then + opam switch ${OPAM_SWITCH} +fi +eval `opam config env` +opam install -y depext dune +opam pin -y add --no-action . +opam depext -y -i duppy + +# compile +dune build @@ -0,0 +1,133 @@ +0.9.2 (07-10-2021) +===== +* Fix deadlock issue at shutdown. + +0.9.1 (06-21-2021) +===== +* Make `stop` synchronous, waiting for all tasks to stop + while sending `Condition.signal`. Should avoid potential + race-conditions when signaling tasks to end. + +0.9.0 (07-10-2020) +===== +* Add offset/length to writing functions. +* Convert to dune. +* Drop unused SSL and SecureTransport optional libs. + +0.8.0 (12-11-2018) +===== +* Removed camlp4 syntactic sugar (unmaintained, unused in liquidsoap now). + +0.7.4 (10-11-2018) +===== +* Fix stack overflow by making recursive function fully tail-rec. (ref savonet/liquidsoap#640) + +0.7.3 (12-09-2018) +===== +* Fix write/select logic on windows systems. (savonet/liquidsoap#610) +* Avoid race conditions when shutting down. + +0.7.2 (28-08-2018) +===== +* Add placeholder implementation for `caml_poll` on Win32. + +0.7.1 (18-08-2018) +===== +* Use poll() when available. +* Wake up all queues when shutting down. + +0.7.0 (03-11-2017) +===== +* Fix bytes compatibility with OCaml 4.06 and above. +* Fix camlp4 availability test. + +0.6.1 (23-08-2017) +===== +* Added SecureTransport support. + +0.6.0 (11-04-2017) +===== +* Added SSL support. + +0.5.2 (03-08-2015) +===== +* Dummy github release. + +0.5.1 (05-08-2013) +===== +* Removed win32 select work-around: patch applied upstream. + +0.5.0 (04-03-2013) +===== +* Remove Panic exception and let original exception bubble through. + +0.4.2 (08-10-2011) +===== +* Reimplemented monadic Mutex and Condition. +* Consume more than one char when waking up Async tasks. + +0.4.1 (04-08-2011) +===== +* Added optional timeout for + all [Duppy.Io] and [Duppy.Monad.Io] + operations. +* Fixed handling of EINTR: update the + timeout when restarting after being + interrupted. + +0.4.0 (26-06-2011) +===== +* Added a monad API to write + server code. +* Close both sides of the pipe + in Duppy.Async +* Make calls to [stop] and [wake_up] + thread-safe in Duppy.Async +* Catch Unix.EINTR when calling Unix.select. + +0.3.2 (19-08-2010) +===== +* Switch from Thread.select to + Unix.select. They are the same on + POSIX and only Unix.select is available + on Win32.. +* Do not use assertions on Mutex.try_lock + on Win32: on this plateform, a thread can + double-lock a mutex, making the assertion + inconsistent. + +0.3.1 (14-10-2009) +===== +* Really catch raised exception on Duppy.Io + operations: catching was missing on recurrent + calls. + +0.3.0 (18-06-2009) +===== +* Added support for --enable-debugging configure option +* Fixed Makefile for BSD: call $(MAKE) for generating documentation. +* Added the possibility to restart the task after the returned positive + delay in Async. +* Added unknown exceptions on Duppy.Io when calling on_error. + +0.2.0 (17-02-2009) +===== +* Fixed typo in Duppy.Async: exception is now Stopped. + +0.1.2 (01-07-2008) +===== +* Changed logic in shutdown for Async interface: + now [Duppy.Async.shutdown t] also wakes the task if + asleep. Still it can't stop a running task. +* Fixed race conditions when a queue starts the select loop: + a task could be submitted, but no queue would wake up. + +0.1.1 (15-04-2008) +===== +* Fixed Conditions usage for non-unix systems +* Fixed typos in the documentation, added some details +* Installs .cmx file + +0.1.0 (07-03-2008) +===== +* Initial release @@ -0,0 +1,504 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 2.1, February 1999 + + Copyright (C) 1991, 1999 Free Software Foundation, Inc. + 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + +[This is the first released version of the Lesser GPL. It also counts + as the successor of the GNU Library Public License, version 2, hence + the version number 2.1.] + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +Licenses are intended to guarantee your freedom to share and change +free software--to make sure the software is free for all its users. + + This license, the Lesser General Public License, applies to some +specially designated software packages--typically libraries--of the +Free Software Foundation and other authors who decide to use it. You +can use it too, but we suggest you first think carefully about whether +this license or the ordinary General Public License is the better +strategy to use in any particular case, based on the explanations below. + + When we speak of free software, we are referring to freedom of use, +not price. Our General Public Licenses are designed to make sure that +you have the freedom to distribute copies of free software (and charge +for this service if you wish); that you receive source code or can get +it if you want it; that you can change the software and use pieces of +it in new free programs; and that you are informed that you can do +these things. + + To protect your rights, we need to make restrictions that forbid +distributors to deny you these rights or to ask you to surrender these +rights. These restrictions translate to certain responsibilities for +you if you distribute copies of the library or if you modify it. + + For example, if you distribute copies of the library, whether gratis +or for a fee, you must give the recipients all the rights that we gave +you. You must make sure that they, too, receive or can get the source +code. If you link other code with the library, you must provide +complete object files to the recipients, so that they can relink them +with the library after making changes to the library and recompiling +it. And you must show them these terms so they know their rights. + + We protect your rights with a two-step method: (1) we copyright the +library, and (2) we offer you this license, which gives you legal +permission to copy, distribute and/or modify the library. + + To protect each distributor, we want to make it very clear that +there is no warranty for the free library. Also, if the library is +modified by someone else and passed on, the recipients should know +that what they have is not the original version, so that the original +author's reputation will not be affected by problems that might be +introduced by others. + + Finally, software patents pose a constant threat to the existence of +any free program. We wish to make sure that a company cannot +effectively restrict the users of a free program by obtaining a +restrictive license from a patent holder. Therefore, we insist that +any patent license obtained for a version of the library must be +consistent with the full freedom of use specified in this license. + + Most GNU software, including some libraries, is covered by the +ordinary GNU General Public License. This license, the GNU Lesser +General Public License, applies to certain designated libraries, and +is quite different from the ordinary General Public License. We use +this license for certain libraries in order to permit linking those +libraries into non-free programs. + + When a program is linked with a library, whether statically or using +a shared library, the combination of the two is legally speaking a +combined work, a derivative of the original library. The ordinary +General Public License therefore permits such linking only if the +entire combination fits its criteria of freedom. The Lesser General +Public License permits more lax criteria for linking other code with +the library. + + We call this license the "Lesser" General Public License because it +does Less to protect the user's freedom than the ordinary General +Public License. It also provides other free software developers Less +of an advantage over competing non-free programs. These disadvantages +are the reason we use the ordinary General Public License for many +libraries. However, the Lesser license provides advantages in certain +special circumstances. + + For example, on rare occasions, there may be a special need to +encourage the widest possible use of a certain library, so that it becomes +a de-facto standard. To achieve this, non-free programs must be +allowed to use the library. A more frequent case is that a free +library does the same job as widely used non-free libraries. In this +case, there is little to gain by limiting the free library to free +software only, so we use the Lesser General Public License. + + In other cases, permission to use a particular library in non-free +programs enables a greater number of people to use a large body of +free software. For example, permission to use the GNU C Library in +non-free programs enables many more people to use the whole GNU +operating system, as well as its variant, the GNU/Linux operating +system. + + Although the Lesser General Public License is Less protective of the +users' freedom, it does ensure that the user of a program that is +linked with the Library has the freedom and the wherewithal to run +that program using a modified version of the Library. + + The precise terms and conditions for copying, distribution and +modification follow. Pay close attention to the difference between a +"work based on the library" and a "work that uses the library". The +former contains code derived from the library, whereas the latter must +be combined with the library in order to run. + + GNU LESSER GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License Agreement applies to any software library or other +program which contains a notice placed by the copyright holder or +other authorized party saying it may be distributed under the terms of +this Lesser General Public License (also called "this License"). +Each licensee is addressed as "you". + + A "library" means a collection of software functions and/or data +prepared so as to be conveniently linked with application programs +(which use some of those functions and data) to form executables. + + The "Library", below, refers to any such software library or work +which has been distributed under these terms. A "work based on the +Library" means either the Library or any derivative work under +copyright law: that is to say, a work containing the Library or a +portion of it, either verbatim or with modifications and/or translated +straightforwardly into another language. (Hereinafter, translation is +included without limitation in the term "modification".) + + "Source code" for a work means the preferred form of the work for +making modifications to it. For a library, complete source code means +all the source code for all modules it contains, plus any associated +interface definition files, plus the scripts used to control compilation +and installation of the library. + + Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running a program using the Library is not restricted, and output from +such a program is covered only if its contents constitute a work based +on the Library (independent of the use of the Library in a tool for +writing it). Whether that is true depends on what the Library does +and what the program that uses the Library does. + + 1. You may copy and distribute verbatim copies of the Library's +complete source code as you receive it, in any medium, provided that +you conspicuously and appropriately publish on each copy an +appropriate copyright notice and disclaimer of warranty; keep intact +all the notices that refer to this License and to the absence of any +warranty; and distribute a copy of this License along with the +Library. + + You may charge a fee for the physical act of transferring a copy, +and you may at your option offer warranty protection in exchange for a +fee. + + 2. You may modify your copy or copies of the Library or any portion +of it, thus forming a work based on the Library, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) The modified work must itself be a software library. + + b) You must cause the files modified to carry prominent notices + stating that you changed the files and the date of any change. + + c) You must cause the whole of the work to be licensed at no + charge to all third parties under the terms of this License. + + d) If a facility in the modified Library refers to a function or a + table of data to be supplied by an application program that uses + the facility, other than as an argument passed when the facility + is invoked, then you must make a good faith effort to ensure that, + in the event an application does not supply such function or + table, the facility still operates, and performs whatever part of + its purpose remains meaningful. + + (For example, a function in a library to compute square roots has + a purpose that is entirely well-defined independent of the + application. Therefore, Subsection 2d requires that any + application-supplied function or table used by this function must + be optional: if the application does not supply it, the square + root function must still compute square roots.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Library, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Library, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote +it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Library. + +In addition, mere aggregation of another work not based on the Library +with the Library (or with a work based on the Library) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may opt to apply the terms of the ordinary GNU General Public +License instead of this License to a given copy of the Library. To do +this, you must alter all the notices that refer to this License, so +that they refer to the ordinary GNU General Public License, version 2, +instead of to this License. (If a newer version than version 2 of the +ordinary GNU General Public License has appeared, then you can specify +that version instead if you wish.) Do not make any other change in +these notices. + + Once this change is made in a given copy, it is irreversible for +that copy, so the ordinary GNU General Public License applies to all +subsequent copies and derivative works made from that copy. + + This option is useful when you wish to copy part of the code of +the Library into a program that is not a library. + + 4. You may copy and distribute the Library (or a portion or +derivative of it, under Section 2) in object code or executable form +under the terms of Sections 1 and 2 above provided that you accompany +it with the complete corresponding machine-readable source code, which +must be distributed under the terms of Sections 1 and 2 above on a +medium customarily used for software interchange. + + If distribution of object code is made by offering access to copy +from a designated place, then offering equivalent access to copy the +source code from the same place satisfies the requirement to +distribute the source code, even though third parties are not +compelled to copy the source along with the object code. + + 5. A program that contains no derivative of any portion of the +Library, but is designed to work with the Library by being compiled or +linked with it, is called a "work that uses the Library". Such a +work, in isolation, is not a derivative work of the Library, and +therefore falls outside the scope of this License. + + However, linking a "work that uses the Library" with the Library +creates an executable that is a derivative of the Library (because it +contains portions of the Library), rather than a "work that uses the +library". The executable is therefore covered by this License. +Section 6 states terms for distribution of such executables. + + When a "work that uses the Library" uses material from a header file +that is part of the Library, the object code for the work may be a +derivative work of the Library even though the source code is not. +Whether this is true is especially significant if the work can be +linked without the Library, or if the work is itself a library. The +threshold for this to be true is not precisely defined by law. + + If such an object file uses only numerical parameters, data +structure layouts and accessors, and small macros and small inline +functions (ten lines or less in length), then the use of the object +file is unrestricted, regardless of whether it is legally a derivative +work. (Executables containing this object code plus portions of the +Library will still fall under Section 6.) + + Otherwise, if the work is a derivative of the Library, you may +distribute the object code for the work under the terms of Section 6. +Any executables containing that work also fall under Section 6, +whether or not they are linked directly with the Library itself. + + 6. As an exception to the Sections above, you may also combine or +link a "work that uses the Library" with the Library to produce a +work containing portions of the Library, and distribute that work +under terms of your choice, provided that the terms permit +modification of the work for the customer's own use and reverse +engineering for debugging such modifications. + + You must give prominent notice with each copy of the work that the +Library is used in it and that the Library and its use are covered by +this License. You must supply a copy of this License. If the work +during execution displays copyright notices, you must include the +copyright notice for the Library among them, as well as a reference +directing the user to the copy of this License. Also, you must do one +of these things: + + a) Accompany the work with the complete corresponding + machine-readable source code for the Library including whatever + changes were used in the work (which must be distributed under + Sections 1 and 2 above); and, if the work is an executable linked + with the Library, with the complete machine-readable "work that + uses the Library", as object code and/or source code, so that the + user can modify the Library and then relink to produce a modified + executable containing the modified Library. (It is understood + that the user who changes the contents of definitions files in the + Library will not necessarily be able to recompile the application + to use the modified definitions.) + + b) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (1) uses at run time a + copy of the library already present on the user's computer system, + rather than copying library functions into the executable, and (2) + will operate properly with a modified version of the library, if + the user installs one, as long as the modified version is + interface-compatible with the version that the work was made with. + + c) Accompany the work with a written offer, valid for at + least three years, to give the same user the materials + specified in Subsection 6a, above, for a charge no more + than the cost of performing this distribution. + + d) If distribution of the work is made by offering access to copy + from a designated place, offer equivalent access to copy the above + specified materials from the same place. + + e) Verify that the user has already received a copy of these + materials or that you have already sent this user a copy. + + For an executable, the required form of the "work that uses the +Library" must include any data and utility programs needed for +reproducing the executable from it. However, as a special exception, +the materials to be distributed need not include anything that is +normally distributed (in either source or binary form) with the major +components (compiler, kernel, and so on) of the operating system on +which the executable runs, unless that component itself accompanies +the executable. + + It may happen that this requirement contradicts the license +restrictions of other proprietary libraries that do not normally +accompany the operating system. Such a contradiction means you cannot +use both them and the Library together in an executable that you +distribute. + + 7. You may place library facilities that are a work based on the +Library side-by-side in a single library together with other library +facilities not covered by this License, and distribute such a combined +library, provided that the separate distribution of the work based on +the Library and of the other library facilities is otherwise +permitted, and provided that you do these two things: + + a) Accompany the combined library with a copy of the same work + based on the Library, uncombined with any other library + facilities. This must be distributed under the terms of the + Sections above. + + b) Give prominent notice with the combined library of the fact + that part of it is a work based on the Library, and explaining + where to find the accompanying uncombined form of the same work. + + 8. You may not copy, modify, sublicense, link with, or distribute +the Library except as expressly provided under this License. Any +attempt otherwise to copy, modify, sublicense, link with, or +distribute the Library is void, and will automatically terminate your +rights under this License. However, parties who have received copies, +or rights, from you under this License will not have their licenses +terminated so long as such parties remain in full compliance. + + 9. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Library or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Library (or any work based on the +Library), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Library or works based on it. + + 10. Each time you redistribute the Library (or any work based on the +Library), the recipient automatically receives a license from the +original licensor to copy, distribute, link with or modify the Library +subject to these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties with +this License. + + 11. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Library at all. For example, if a patent +license would not permit royalty-free redistribution of the Library by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Library. + +If any portion of this section is held invalid or unenforceable under any +particular circumstance, the balance of the section is intended to apply, +and the section as a whole is intended to apply in other circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 12. If the distribution and/or use of the Library is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Library under this License may add +an explicit geographical distribution limitation excluding those countries, +so that distribution is permitted only in or among countries not thus +excluded. In such case, this License incorporates the limitation as if +written in the body of this License. + + 13. The Free Software Foundation may publish revised and/or new +versions of the Lesser General Public License from time to time. +Such new versions will be similar in spirit to the present version, +but may differ in detail to address new problems or concerns. + +Each version is given a distinguishing version number. If the Library +specifies a version number of this License which applies to it and +"any later version", you have the option of following the terms and +conditions either of that version or of any later version published by +the Free Software Foundation. If the Library does not specify a +license version number, you may choose any version ever published by +the Free Software Foundation. + + 14. If you wish to incorporate parts of the Library into other free +programs whose distribution conditions are incompatible with these, +write to the author to ask for permission. For software which is +copyrighted by the Free Software Foundation, write to the Free +Software Foundation; we sometimes make exceptions for this. Our +decision will be guided by the two goals of preserving the free status +of all derivatives of our free software and of promoting the sharing +and reuse of software generally. + + NO WARRANTY + + 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO +WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW. +EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR +OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY +KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME +THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN +WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY +AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU +FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR +CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE +LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING +RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A +FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF +SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Libraries + + If you develop a new library, and you want it to be of the greatest +possible use to the public, we recommend making it free software that +everyone can redistribute and change. You can do so by permitting +redistribution under these terms (or, alternatively, under the terms of the +ordinary General Public License). + + To apply these terms, attach the following notices to the library. It is +safest to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least the +"copyright" line and a pointer to where the full notice is found. + + <one line to give the library's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +Also add information on how to contact you by electronic and paper mail. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the library, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the + library `Frob' (a library for tweaking knobs) written by James Random Hacker. + + <signature of Ty Coon>, 1 April 1990 + Ty Coon, President of Vice + +That's all there is to it! + + diff --git a/README.md b/README.md new file mode 100644 index 0000000..d621f21 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +# ocaml-duppy + +ocaml-duppy is an advanced scheduler for Ocaml programmers. + +Please read the COPYING file before using this software. + +## Prerequisites: + +- ocaml >= 4.03.0 + +- findlib >= 1.8.0 + +- ocaml-pcre >= 7.3.4 + +- dune >= 2.0 + +The code may work with earlier versions but these are the one currently +supported. + +## Compilation: + +```sh +$ dune build +``` + +This should build both the native and the byte-code version of the +extension library. + +## Installation: + +Via `opam`: + +```sh +$ opam install duppy +``` + +Via `dune` (for developers): +```sh +$ dune install +``` + +This should install the library file (using ocamlfind) in the +appropriate place. + +## Author: + +This author of this software may be contacted by electronic mail +at the following address: savonet-users@lists.sourceforge.net. diff --git a/dune-project b/dune-project new file mode 100644 index 0000000..6c50961 --- /dev/null +++ b/dune-project @@ -0,0 +1,17 @@ +(lang dune 2.7) +(version 0.9.2) +(name duppy) +(source (github savonet/ocaml-duppy)) +(license GPL-2.0) +(authors "Romain Beauxis <toots@rastageeks.org>") +(maintainers "The Savonet Team <savonet-users@lists.sourceforge.net>") + +(generate_opam_files true) + +(package + (name duppy) + (synopsis "Library providing monadic threads") + (depends + dune + pcre) +) diff --git a/duppy.opam b/duppy.opam new file mode 100644 index 0000000..0095389 --- /dev/null +++ b/duppy.opam @@ -0,0 +1,29 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +version: "0.9.2" +synopsis: "Library providing monadic threads" +maintainer: ["The Savonet Team <savonet-users@lists.sourceforge.net>"] +authors: ["Romain Beauxis <toots@rastageeks.org>"] +license: "GPL-2.0" +homepage: "https://github.com/savonet/ocaml-duppy" +bug-reports: "https://github.com/savonet/ocaml-duppy/issues" +depends: [ + "dune" {>= "2.7"} + "pcre" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/savonet/ocaml-duppy.git" diff --git a/examples/dune b/examples/dune new file mode 100644 index 0000000..887559f --- /dev/null +++ b/examples/dune @@ -0,0 +1,9 @@ +(executable + (name http) + (modules http) + (libraries duppy)) + +(executable + (name telnet) + (modules telnet) + (libraries duppy)) diff --git a/examples/http.ml b/examples/http.ml new file mode 100644 index 0000000..d6e65ce --- /dev/null +++ b/examples/http.ml @@ -0,0 +1,555 @@ +let non_blocking_queues = ref 3 +let maybe_blocking_queues = ref 1 +let files_path = ref "" +let port = ref 8080 +let usage = "usage: http [options] /path/to/files" + +let () = + let pnum = ref 0 in + let arg s = + incr pnum; + if !pnum > 1 then ( + Printf.eprintf "Error: too many arguments\n"; + exit 1 ) + else files_path := s + in + Arg.parse + [ + ( "--non_blocking_queues", + Arg.Int (fun i -> non_blocking_queues := i), + Printf.sprintf "Number of non-blocking queues. (default: %d)" + !non_blocking_queues ); + ( "--maybe_blocking_queues", + Arg.Int (fun i -> maybe_blocking_queues := i), + Printf.sprintf "Number of maybe-blocking queues. (default: %d)" + !maybe_blocking_queues ); + ( "--port", + Arg.Int (fun i -> port := i), + Printf.sprintf "Port used to bind the server. (default: %d)" !port ); + ] + arg usage; + if !files_path = "" then ( + Printf.printf "%s\n" usage; + exit 1 ) + else () + +type priority = Maybe_blocking | Non_blocking + +let scheduler = Duppy.create () + +type http_method = Post | Get +type http_protocol = Http_11 | Http_10 + +let string_of_protocol = function + | Http_11 -> "HTTP/1.1" + | Http_10 -> "HTTP/1.0" + +let protocol_of_string = function + | "HTTP/1.1" -> Http_11 + | "HTTP/1.0" -> Http_10 + | _ -> assert false + +let string_of_method = function Post -> "POST" | Get -> "GET" + +let method_of_string = function + | "POST" -> Post + | "GET" -> Get + | _ -> assert false + +type data = None | String of string | File of Unix.file_descr + +type request = { + request_protocol : http_protocol; + request_method : http_method; + request_uri : string; + request_headers : (string * string) list; + request_data : data; +} + +type reply = { + reply_protocol : http_protocol; + reply_status : int * string; + reply_headers : (string * string) list; + reply_data : data; +} + +exception Assoc of string + +let assoc_uppercase x y = + try + List.iter + (fun (l, v) -> + if String.uppercase_ascii l = x then raise (Assoc v) else ()) + y; + raise Not_found + with Assoc s -> s + +let server = "dhttpd" + +let html_template = + Printf.sprintf + "<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.1//EN\" \ + \"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd\">\r\n\ + <html xmlns=\"http://www.w3.org/1999/xhtml\" xml:lang=\"en\">\r\n\ + %s</html>" + +let server_error status protocol = + let _, explanation = status in + let data = + String + (html_template + (Printf.sprintf "<head><title>%s</title></head>\r\n<body>%s !</body>" + explanation explanation)) + in + { + reply_protocol = protocol; + reply_status = status; + reply_headers = + [("Content-Type", "text/html; charset=UTF-8"); ("Server", server)]; + reply_data = data; + } + +let error_404 = server_error (404, "File Not Found") +let error_500 = server_error (500, "Bad Request") Http_10 +let error_403 = server_error (403, "Forbidden") + +let http_302 protocol uri = + { + reply_protocol = protocol; + reply_status = (302, "Found"); + reply_headers = [("Location", uri)]; + reply_data = String ""; + } + +type socket_status = Keep | Close + +let send_reply h reply = + let write s = + Duppy.Monad.Io.write ?timeout:None ~priority:Non_blocking h + (Bytes.unsafe_of_string s) + in + let code, status = reply.reply_status in + let http_header = + Printf.sprintf "%s %d %s\r\n%s\r\n\r\n" + (string_of_protocol reply.reply_protocol) + code status + (String.concat "\r\n" + (List.map + (fun (x, y) -> Printf.sprintf "%s: %s" x y) + reply.reply_headers)) + in + Duppy.Monad.bind (write http_header) (fun () -> + match reply.reply_data with + | String s -> write s + | File fd -> + let stats = Unix.fstat fd in + let ba = + Unix.map_file fd Bigarray.char Bigarray.c_layout false + [| stats.Unix.st_size |] + in + let ba = Bigarray.array1_of_genarray ba in + let close () = try Unix.close fd with _ -> () in + let on_error e = + close (); + h.Duppy.Monad.Io.on_error e + in + let h = { h with Duppy.Monad.Io.on_error } in + Duppy.Monad.bind + (Duppy.Monad.Io.write_bigarray ?timeout:None + ~priority:Non_blocking h ba) (fun () -> + Duppy.Monad.return (close ())) + | None -> Duppy.Monad.return ()) + +let parse_headers headers = + let split_header l h = + try + let rex = Pcre.regexp "([^:\\r\\n]+):\\s*([^\\r\\n]+)" in + let sub = Pcre.exec ~rex h in + Duppy.Monad.return + ((Pcre.get_substring sub 1, Pcre.get_substring sub 2) :: l) + with Not_found -> Duppy.Monad.raise error_500 + in + Duppy.Monad.fold_left split_header [] headers + +let index_uri path index protocol uri = + let uri = + try + let ret = Pcre.extract ~rex:(Pcre.regexp "([^\\?]*)\\?") uri in + ret.(1) + with Not_found -> uri + in + try + if Sys.is_directory (Printf.sprintf "%s%s" path uri) then + if uri.[String.length uri - 1] <> '/' then + Duppy.Monad.raise (http_302 protocol (Printf.sprintf "%s/" uri)) + else ( + let index = Printf.sprintf "%s/%s" uri index in + if Sys.file_exists (Printf.sprintf "%s/%s" path index) then + Duppy.Monad.return index + else Duppy.Monad.return uri ) + else Duppy.Monad.return uri + with _ -> Duppy.Monad.return uri + +let file_request path _ request = + let uri = + try + let ret = + Pcre.extract ~rex:(Pcre.regexp "([^\\?]*)\\?.*") request.request_uri + in + ret.(1) + with Not_found -> request.request_uri + in + let __pa_duppy_0 = index_uri path "index.html" request.request_protocol uri in + Duppy.Monad.bind __pa_duppy_0 (fun uri -> + let fname = Printf.sprintf "%s%s" path uri in + if Sys.file_exists fname then ( + try + let fd = Unix.openfile fname [Unix.O_RDONLY] 0o640 in + let stats = Unix.fstat fd in + let headers = + [ + ("Server", server); + ("Content-Length", string_of_int stats.Unix.st_size); + ] + in + let headers = + if Pcre.pmatch ~rex:(Pcre.regexp "\\.html$") fname then + ("Content-Type", "text/html") :: headers + else if Pcre.pmatch ~rex:(Pcre.regexp "\\.css$") fname then + ("Content-Type", "text/css") :: headers + else headers + in + Duppy.Monad.raise + { + reply_protocol = request.request_protocol; + reply_status = (200, "OK"); + reply_headers = headers; + reply_data = File fd; + } + with _ -> Duppy.Monad.raise (error_403 request.request_protocol) ) + else Duppy.Monad.raise (error_404 request.request_protocol)) + +let file_handler = ((fun _ -> Duppy.Monad.return true), file_request !files_path) + +let cgi_handler process path h request = + let uri, args, suffix = + try + let ret = + Pcre.extract ~rex:(Pcre.regexp "([^\\?]*)\\?(.*)") request.request_uri + in + try + let ans = + Pcre.extract ~rex:(Pcre.regexp "^([^/]*)/([^&=]*)$") ret.(2) + in + (ret.(1), ans.(1), ans.(2)) + with Not_found -> (ret.(1), ret.(2), "") + with Not_found -> (request.request_uri, "", "") + in + let __pa_duppy_0 = index_uri path "index.php" request.request_protocol uri in + Duppy.Monad.bind __pa_duppy_0 (fun script -> + let script = Printf.sprintf "%s%s" path script in + let env = + Printf.sprintf + "export SERVER_SOFTWARE=Duppy-httpd/1.0; export \ + SERVER_NAME=localhost; export GATEWAY_INTERFACE=CGI/1.1; export \ + SERVER_PROTOCOL=%s; export SERVER_PORT=%d; export \ + REQUEST_METHOD=%s; export REQUEST_URI=%s; export \ + REDIRECT_STATUS=200; export SCRIPT_FILENAME=%s" + (string_of_protocol request.request_protocol) + !port + (string_of_method request.request_method) + (Filename.quote uri) (Filename.quote script) + in + let env = + Printf.sprintf "%s; export QUERY_STRING=%s" env (Filename.quote args) + in + let env = + let tr_suffix = Printf.sprintf "%s%s" path suffix in + (* Trick ! *) + let tr_suffix = + Printf.sprintf "%s/%s" + (Filename.dirname tr_suffix) + (Filename.basename tr_suffix) + in + Printf.sprintf "%s; export PATH_TRANSLATED=%s; export PATH_INFO=%s" env + (Filename.quote tr_suffix) (Filename.quote suffix) + in + let sanitize s = + Pcre.replace ~pat:"-" ~templ:"_" (String.uppercase_ascii s) + in + let headers = + List.map (fun (x, y) -> (sanitize x, y)) request.request_headers + in + let append env key = + if List.mem_assoc key headers then + Printf.sprintf "%s; export %s=%s" env key + (Filename.quote (List.assoc key headers)) + else env + in + let env = append env "CONTENT_TYPE" in + let env = append env "CONTENT_LENGTH" in + let __pa_duppy_0 = + if List.mem_assoc "AUTHORIZATION" headers then ( + let ret = + Pcre.extract + ~rex:(Pcre.regexp "(^[^\\s]*\\s.*)$") + (List.assoc "AUTHORIZATION" headers) + in + if Array.length ret > 0 then + Duppy.Monad.return + (Printf.sprintf "%s; extract AUTH_TYPE=%s" env ret.(1)) + else Duppy.Monad.raise error_500 ) + else Duppy.Monad.return env + in + Duppy.Monad.bind __pa_duppy_0 (fun env -> + let f env (x, y) = + Printf.sprintf "%s; export HTTP_%s=%s" env x (Filename.quote y) + in + let env = List.fold_left f env headers in + let data = + match request.request_data with + | None -> "" + | String s -> s + | _ -> assert false + in + (* not implemented *) + let process = Printf.sprintf "%s; %s 2>/dev/null" env process in + let in_c, out_c = Unix.open_process process in + let out_s = Unix.descr_of_out_channel out_c in + let h = { h with Duppy.Monad.Io.socket = out_s; data = "" } in + let __pa_duppy_0 = + Duppy.Monad.Io.write ?timeout:None ~priority:Non_blocking h + (Bytes.unsafe_of_string data) + in + Duppy.Monad.bind __pa_duppy_0 (fun () -> + let in_s = Unix.descr_of_in_channel in_c in + let h = { h with Duppy.Monad.Io.socket = in_s; data = "" } in + let __pa_duppy_0 = + Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking + ~marker:(Duppy.Io.Split "[\r]?\n[\r]?\n") h + in + Duppy.Monad.bind __pa_duppy_0 (fun headers -> + let __pa_duppy_0 = + Duppy.Monad.catch + (Duppy.Monad.Io.read_all ?timeout:None + ~priority:Non_blocking h.Duppy.Monad.Io.scheduler in_s) + (fun (s, _) -> Duppy.Monad.return s) + in + Duppy.Monad.bind __pa_duppy_0 (fun data -> + let data = + Printf.sprintf "%s%s" h.Duppy.Monad.Io.data data + in + ignore (Unix.close_process (in_c, out_c)); + let __pa_duppy_0 = + let headers = Pcre.split ~pat:"\r\n" headers in + parse_headers headers + in + Duppy.Monad.bind __pa_duppy_0 (fun headers -> + let __pa_duppy_0 = + if List.mem_assoc "Status" headers then ( + try + let ans = + Pcre.extract + ~rex:(Pcre.regexp "([\\d]+)\\s(.*)") + (List.assoc "Status" headers) + in + Duppy.Monad.return + ( (int_of_string ans.(1), ans.(2)), + List.filter + (fun (x, _) -> x <> "Status") + headers ) + with _ -> Duppy.Monad.raise error_500 ) + else Duppy.Monad.return ((200, "OK"), headers) + in + Duppy.Monad.bind __pa_duppy_0 + (fun (status, headers) -> + let headers = + ( "Content-length", + string_of_int (String.length data) ) + :: headers + in + Duppy.Monad.raise + { + reply_protocol = request.request_protocol; + reply_status = status; + reply_headers = headers; + reply_data = String data; + }))))))) + +let php_handler = + ( (fun request -> + let __pa_duppy_0 = + index_uri !files_path "index.php" request.request_protocol + request.request_uri + in + Duppy.Monad.bind __pa_duppy_0 (fun uri -> + Duppy.Monad.return (Pcre.pmatch ~rex:(Pcre.regexp "\\.php$") uri))), + cgi_handler "php-cgi" !files_path ) + +let handlers = [php_handler; file_handler] + +let handle_request h request = + let f (check, handler) = + let __pa_duppy_0 = check request in + Duppy.Monad.bind __pa_duppy_0 (fun check -> + if check then handler h request else Duppy.Monad.return ()) + in + Duppy.Monad.catch + (Duppy.Monad.bind (Duppy.Monad.iter f handlers) (fun () -> + Duppy.Monad.return (error_404 request.request_protocol))) + (fun reply -> Duppy.Monad.return reply) + +let parse_request h r = + try + let headers = Pcre.split ~pat:"\r\n" r in + let __pa_duppy_0 = + match headers with + | e :: l -> + let __pa_duppy_0 = parse_headers l in + Duppy.Monad.bind __pa_duppy_0 (fun headers -> + Duppy.Monad.return (e, headers)) + | _ -> Duppy.Monad.raise error_500 + in + Duppy.Monad.bind __pa_duppy_0 (fun (request, headers) -> + let rex = Pcre.regexp "([\\w]+)\\s([^\\s]+)\\s(HTTP/1.[01])" in + let __pa_duppy_0 = + try + let sub = Pcre.exec ~rex request in + let http_method, uri, protocol = + ( Pcre.get_substring sub 1, + Pcre.get_substring sub 2, + Pcre.get_substring sub 3 ) + in + Duppy.Monad.return + (method_of_string http_method, uri, protocol_of_string protocol) + with _ -> Duppy.Monad.raise error_500 + in + Duppy.Monad.bind __pa_duppy_0 (fun (http_method, uri, protocol) -> + let __pa_duppy_0 = + match http_method with + | Get -> Duppy.Monad.return None + | Post -> + let __pa_duppy_0 = + try + let length = assoc_uppercase "CONTENT-LENGTH" headers in + Duppy.Monad.return (int_of_string length) + with + | Not_found -> Duppy.Monad.return 0 + | _ -> Duppy.Monad.raise error_500 + in + Duppy.Monad.bind __pa_duppy_0 (fun len -> + match len with + | 0 -> Duppy.Monad.return None + | d -> + let __pa_duppy_0 = + Duppy.Monad.Io.read ?timeout:None + ~priority:Non_blocking + ~marker:(Duppy.Io.Length d) h + in + Duppy.Monad.bind __pa_duppy_0 (fun data -> + Duppy.Monad.return (String data))) + in + Duppy.Monad.bind __pa_duppy_0 (fun data -> + Duppy.Monad.return + { + request_method = http_method; + request_protocol = protocol; + request_uri = uri; + request_headers = headers; + request_data = data; + }))) + with _ -> Duppy.Monad.raise error_500 + +let handle_client socket = + (* Read and process lines *) + let on_error _ = error_500 in + let h = { Duppy.Monad.Io.scheduler; socket; data = ""; on_error } in + let rec exec () = + let __pa_duppy_0 = + Duppy.Monad.catch + (let __pa_duppy_0 = + Duppy.Monad.Io.read ?timeout:None ~priority:Non_blocking + ~marker:(Duppy.Io.Split "\r\n\r\n") h + in + Duppy.Monad.bind __pa_duppy_0 (fun data -> + let __pa_duppy_0 = parse_request h data in + Duppy.Monad.bind __pa_duppy_0 (fun request -> + let __pa_duppy_0 = handle_request h request in + Duppy.Monad.bind __pa_duppy_0 (fun reply -> + let close_header headers = + try assoc_uppercase "CONNECTION" headers = "close" + with Not_found -> false + in + let keep = + if + request.request_protocol = Http_10 + || close_header request.request_headers + || close_header reply.reply_headers + then Close + else Keep + in + Duppy.Monad.return (keep, reply))))) + (fun reply -> Duppy.Monad.return (Close, reply)) + in + Duppy.Monad.bind __pa_duppy_0 (fun (keep, reply) -> + Duppy.Monad.bind (send_reply h reply) (fun () -> + if keep = Keep then exec () else Duppy.Monad.return ())) + in + let finish _ = try Unix.close socket with _ -> () in + Duppy.Monad.run ~return:finish ~raise:finish (exec ()) + +let new_queue ~priority ~name () = + let priorities p = p = priority in + let queue () = Duppy.queue scheduler ~log:(fun _ -> ()) ~priorities name in + Thread.create queue () + +let bind_addr_inet = Unix.inet_addr_of_string "0.0.0.0" +let bind_addr = Unix.ADDR_INET (bind_addr_inet, !port) +let max_conn = 100 +let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 + +let () = + (* See http://caml.inria.fr/mantis/print_bug_page.php?bug_id=4640 + * for this: we want Unix EPIPE error and not SIGPIPE, which + * crashes the program.. *) + Sys.set_signal Sys.sigpipe Sys.Signal_ignore; + ignore (Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigpipe]); + Unix.setsockopt sock Unix.SO_REUSEADDR true; + let rec incoming _ = + ( try + let s, _ = Unix.accept sock in + handle_client s + with e -> + Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e) + ); + [ + { + Duppy.Task.priority = Non_blocking; + events = [`Read sock]; + handler = incoming; + }; + ] + in + ( try Unix.bind sock bind_addr + with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> + failwith (Printf.sprintf "port %d already taken" !port) ); + Unix.listen sock max_conn; + Duppy.Task.add scheduler + { + Duppy.Task.priority = Non_blocking; + events = [`Read sock]; + handler = incoming; + }; + for i = 1 to !non_blocking_queues do + ignore + (new_queue ~priority:Non_blocking + ~name:(Printf.sprintf "Non blocking queue #%d" i) + ()) + done; + for i = 1 to !maybe_blocking_queues do + ignore + (new_queue ~priority:Maybe_blocking + ~name:(Printf.sprintf "Maybe blocking queue #%d" i) + ()) + done; + Duppy.queue scheduler ~log:(fun _ -> ()) "root" diff --git a/examples/index.html b/examples/index.html new file mode 100644 index 0000000..a7f8d9e --- /dev/null +++ b/examples/index.html @@ -0,0 +1 @@ +bla diff --git a/examples/telnet.ml b/examples/telnet.ml new file mode 100644 index 0000000..1511e33 --- /dev/null +++ b/examples/telnet.ml @@ -0,0 +1,151 @@ +type priority = Non_blocking | Maybe_blocking + +let io_priority = Non_blocking + +(* Create scheduler *) +let scheduler = Duppy.create () + +(* Create two queues, + * one for non blocking events + * and another for blocking + * events *) +let new_queue ~priority ~name () = + let log = Printf.printf "%s: %s\n%!" name in + let priorities p = p = priority in + let queue () = Duppy.queue scheduler ~log ~priorities name in + Thread.create queue () + +let th = + ignore (new_queue ~priority:Non_blocking ~name:"Non blocking queue" ()); + ignore (new_queue ~priority:Maybe_blocking ~name:"Maybe blocking queue #1" ()); + new_queue ~priority:Maybe_blocking ~name:"Maybe blocking queue #2" () + +let exec_command s () = + let chan = Unix.open_process_in s in + let rec aux () = + match try Some (input_line chan) with End_of_file -> None with + | None -> [] + | Some s -> s :: aux () + in + let l = aux () in + ignore (Unix.close_process_in chan); + Duppy.Monad.return (String.concat "\r\n" l) + +let commands = Hashtbl.create 10 + +let () = + Hashtbl.add commands "hello" (false, fun () -> Duppy.Monad.return "world"); + Hashtbl.add commands "foo" (false, fun () -> Duppy.Monad.return "bar"); + Hashtbl.add commands "uptime" (true, exec_command "uptime"); + Hashtbl.add commands "date" (true, exec_command "date"); + Hashtbl.add commands "whoami" (true, exec_command "whoami"); + Hashtbl.add commands "sleep" (true, exec_command "sleep 15"); + Hashtbl.add commands "exit" (true, fun () -> Duppy.Monad.raise ()) + +(* Add commands here *) +let help = Buffer.create 10 + +let () = + Buffer.add_string help "List of commands:"; + Hashtbl.iter + (fun x _ -> Buffer.add_string help (Printf.sprintf "\r\n%s" x)) + commands; + Hashtbl.add commands "help" + (false, fun () -> Duppy.Monad.return (Buffer.contents help)) + +let handle_client socket = + let on_error e = + match e with + | Duppy.Io.Io_error -> Printf.printf "Client disconnected" + | Duppy.Io.Unix (c, p, m) -> + Printf.printf "%s" (Printexc.to_string (Unix.Unix_error (c, p, m))) + | Duppy.Io.Unknown e -> Printf.printf "%s" (Printexc.to_string e) + | Duppy.Io.Timeout -> Printf.printf "Timeout" + in + let h = { Duppy.Monad.Io.scheduler; socket; data = ""; on_error } in + (* Read and process lines *) + let rec exec () = + let __pa_duppy_0 = + Duppy.Monad.Io.read ?timeout:None ~priority:io_priority + ~marker:(Duppy.Io.Split "[\r\n]+") h + in + Duppy.Monad.bind __pa_duppy_0 (fun req -> + let __pa_duppy_0 = + try + let blocking, command = Hashtbl.find commands req in + if not blocking then command () + else Duppy.Monad.Io.exec ~priority:Maybe_blocking h (command ()) + with Not_found -> + Duppy.Monad.return + "ERROR: unknown command, type \"help\" to get a list of commands." + in + Duppy.Monad.bind __pa_duppy_0 (fun ans -> + Duppy.Monad.bind + (Duppy.Monad.bind + (Duppy.Monad.Io.write ?timeout:None ~priority:io_priority h + (Bytes.unsafe_of_string "BEGIN\r\n")) + (fun () -> + Duppy.Monad.bind + (Duppy.Monad.Io.write ?timeout:None ~priority:io_priority h + (Bytes.unsafe_of_string ans)) + (fun () -> + Duppy.Monad.Io.write ?timeout:None ~priority:io_priority + h + (Bytes.unsafe_of_string "\r\nEND\r\n")))) + (fun () -> exec ()))) + in + let close () = try Unix.close socket with _ -> () in + let return () = + let on_error e = + on_error e; + close () + in + Duppy.Io.write ~priority:io_priority ~on_error ~exec:close scheduler + ~string:(Bytes.unsafe_of_string "Bye!\r\n") + socket + in + Duppy.Monad.run ~return ~raise:close (exec ()) + +open Unix + +let port = 4123 +let bind_addr_inet = inet_addr_of_string "0.0.0.0" +let bind_addr = ADDR_INET (bind_addr_inet, port) +let max_conn = 10 +let sock = socket PF_INET SOCK_STREAM 0 + +let () = + setsockopt sock SO_REUSEADDR true; + let rec incoming _ = + ( try + let s, caller = accept sock in + let ip = + let a = + match caller with ADDR_INET (a, _) -> a | _ -> assert false + in + try (gethostbyaddr a).h_name with Not_found -> string_of_inet_addr a + in + Printf.printf "New client: %s\n" ip; + handle_client s + with e -> + Printf.printf "Failed to accept new client: %S\n" (Printexc.to_string e) + ); + [ + { + Duppy.Task.priority = io_priority; + Duppy.Task.events = [`Read sock]; + Duppy.Task.handler = incoming; + }; + ] + in + ( try bind sock bind_addr + with Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> + failwith (Printf.sprintf "port %d already taken" port) ); + listen sock max_conn; + Duppy.Task.add scheduler + { + Duppy.Task.priority = io_priority; + Duppy.Task.events = [`Read sock]; + Duppy.Task.handler = incoming; + }; + Thread.join th diff --git a/src/dune b/src/dune new file mode 100644 index 0000000..0f40400 --- /dev/null +++ b/src/dune @@ -0,0 +1,8 @@ +(library + (name duppy) + (public_name duppy) + (libraries unix threads pcre bigarray) + (foreign_stubs + (language c) + (names duppy_stubs)) + (synopsis "OCaml advanced scheduler")) diff --git a/src/duppy.ml b/src/duppy.ml new file mode 100644 index 0000000..cb0d6e6 --- /dev/null +++ b/src/duppy.ml @@ -0,0 +1,1077 @@ +(***************************************************************************** + + Duppy, a task scheduler for OCaml. + Copyright 2003-2010 Savonet team + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details, fully stated in the COPYING + file at the root of the liquidsoap distribution. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + *****************************************************************************) + +type fd = Unix.file_descr + +external poll : + Unix.file_descr array -> + Unix.file_descr array -> + Unix.file_descr array -> + float -> + Unix.file_descr array * Unix.file_descr array * Unix.file_descr array + = "caml_poll" + +let poll r w e timeout = + let r = Array.of_list r in + let w = Array.of_list w in + let e = Array.of_list e in + let r, w, e = poll r w e timeout in + (Array.to_list r, Array.to_list w, Array.to_list e) + +let select, select_fname = + match Sys.os_type with + | "Unix" -> (poll, "poll") + | _ -> (Unix.select, "select") + +(** [remove f l] is like [List.find f l] but also returns the result of removing + * the found element from the original list. *) +let remove f l = + let rec aux acc = function + | [] -> raise Not_found + | x :: l -> if f x then (x, List.rev_append acc l) else aux (x :: acc) l + in + aux [] l + +(** Events and tasks from the implementation point-of-view: + * we have to hide the 'a parameter. *) + +type e = { r : fd list; w : fd list; x : fd list; t : float } + +type 'a t = { + timestamp : float; + prio : 'a; + enrich : e -> e; + is_ready : e -> (unit -> 'a t list) option; +} + +type 'a scheduler = { + out_pipe : Unix.file_descr; + in_pipe : Unix.file_descr; + compare : 'a -> 'a -> int; + select_m : Mutex.t; + mutable tasks : 'a t list; + tasks_m : Mutex.t; + mutable ready : ('a * (unit -> 'a t list)) list; + ready_m : Mutex.t; + mutable queues : Condition.t list; + queues_m : Mutex.t; + mutable stop : bool; + stop_m : Mutex.t; + queue_stopped_c : Condition.t; +} + +let clear_tasks s = + Mutex.lock s.tasks_m; + s.tasks <- []; + Mutex.unlock s.tasks_m + +let create ?(compare = compare) () = + let out_pipe, in_pipe = Unix.pipe () in + { + out_pipe; + in_pipe; + compare; + select_m = Mutex.create (); + tasks = []; + tasks_m = Mutex.create (); + ready = []; + ready_m = Mutex.create (); + queues = []; + queues_m = Mutex.create (); + stop = false; + stop_m = Mutex.create (); + queue_stopped_c = Condition.create (); + } + +let wake_up s = ignore (Unix.write s.in_pipe (Bytes.of_string "x") 0 1) + +module Task = struct + (** Events and tasks from the user's point-of-view. *) + + type event = + [ `Delay of float | `Write of fd | `Read of fd | `Exception of fd ] + + type ('a, 'b) task = { + priority : 'a; + events : 'b list; + handler : 'b list -> ('a, 'b) task list; + } + + let time () = Unix.gettimeofday () + + let rec t_of_task (task : ('a, [< event ]) task) = + let t0 = time () in + { + timestamp = t0; + prio = task.priority; + enrich = + (fun e -> + List.fold_left + (fun e -> function `Delay s -> { e with t = min e.t (t0 +. s) } + | `Read s -> { e with r = s :: e.r } + | `Write s -> { e with w = s :: e.w } + | `Exception s -> { e with x = s :: e.x }) + e task.events); + is_ready = + (fun e -> + let l = + List.filter + (fun evt -> + match (evt :> event) with + | `Delay s when time () > t0 +. s -> true + | `Read s when List.mem s e.r -> true + | `Write s when List.mem s e.w -> true + | `Exception s when List.mem s e.x -> true + | _ -> false) + task.events + in + if l = [] then None + else Some (fun () -> List.map t_of_task (task.handler l))); + } + + let add_t s items = + let f item = + match item.is_ready { r = []; w = []; x = []; t = 0. } with + | Some f -> + Mutex.lock s.ready_m; + s.ready <- (item.prio, f) :: s.ready; + Mutex.unlock s.ready_m + | None -> + Mutex.lock s.tasks_m; + s.tasks <- item :: s.tasks; + Mutex.unlock s.tasks_m + in + List.iter f items; + wake_up s + + let add s t = add_t s [t_of_task t] +end + +open Task + +let stop s = + clear_tasks s; + Mutex.lock s.stop_m; + s.stop <- true; + Mutex.unlock s.stop_m; + Mutex.lock s.queues_m; + while List.length s.queues > 0 do + wake_up s; + Mutex.lock s.ready_m; + List.iter Condition.signal s.queues; + Mutex.unlock s.ready_m; + Condition.wait s.queue_stopped_c s.queues_m + done; + Mutex.unlock s.queues_m + +let tmp = Bytes.create 1024 + +(** There should be only one call of #process at a time. + * Process waits for tasks to become ready, and moves ready tasks + * to the ready queue. *) +let process s log = + (* Compute the union of all events. *) + let e = + List.fold_left + (fun e t -> t.enrich e) + { r = [s.out_pipe]; w = []; x = []; t = infinity } + s.tasks + in + (* Poll for an event. *) + let r, w, x = + let rec f () = + try + let timeout = if e.t = infinity then -1. else max 0. (e.t -. time ()) in + log + (Printf.sprintf "Enter %s at %f, timeout %f (%d/%d/%d)." select_fname + (time ()) timeout (List.length e.r) (List.length e.w) + (List.length e.x)); + let r, w, x = select e.r e.w e.x timeout in + log + (Printf.sprintf "Left %s at %f (%d/%d/%d)." select_fname (time ()) + (List.length r) (List.length w) (List.length x)); + (r, w, x) + with + | Unix.Unix_error (Unix.EINTR, _, _) -> + (* [EINTR] means that select was interrupted by + * a signal before any of the selected events + * occurred and before the timeout interval expired. + * We catch it and restart.. *) + log (Printf.sprintf "Select interrupted at %f." (time ())); + f () + | e -> + (* Uncaught exception: + * 1) Discards all tasks currently in the loop (we do not know which + * socket caused an error). + * 2) Re-Raise e *) + clear_tasks s; + raise e + in + f () + in + (* Empty the wake_up pipe if needed. *) + let () = + if List.mem s.out_pipe r then + (* For safety, we may absorb more than + * one write. This avoids bad situation + * when exceesive wake_up may fill up the + * pipe's write buffer, causing a wake_up + * to become blocking.. *) + ignore (Unix.read s.out_pipe tmp 0 1024) + in + (* Move ready tasks to the ready list. *) + let e = { r; w; x; t = 0. } in + Mutex.lock s.tasks_m; + (* Split [tasks] into [r]eady and still [w]aiting. *) + let r, w = + List.fold_left + (fun (r, w) t -> + match t.is_ready e with + | Some f -> ((t.prio, f) :: r, w) + | None -> (r, t :: w)) + ([], []) s.tasks + in + s.tasks <- w; + Mutex.unlock s.tasks_m; + Mutex.lock s.ready_m; + s.ready <- + List.stable_sort (fun (p, _) (p', _) -> s.compare p p') (s.ready @ r); + Mutex.unlock s.ready_m + +(** Code for a queue to process ready tasks. + * Returns true a task was found (and hence processed). + * + * s.ready_m *must* be locked before calling + * this function, and is freed *only* + * if some task was processed. *) +let exec s (priorities : 'a -> bool) = + (* This assertion does not work on + * win32 because a thread can double-lock + * the same mutex.. *) + if Sys.os_type <> "Win32" then assert (not (Mutex.try_lock s.ready_m)); + try + let (_, task), remaining = remove (fun (p, _) -> priorities p) s.ready in + s.ready <- remaining; + Mutex.unlock s.ready_m; + add_t s (task ()); + true + with Not_found -> false + +exception Queue_stopped +exception Queue_processed + +(** Main loop for queues. *) +let queue ?log ?(priorities = fun _ -> true) s name = + let log = + match log with Some e -> e | None -> Printf.printf "queue %s: %s\n" name + in + let c = + let c = Condition.create () in + Mutex.lock s.queues_m; + s.queues <- c :: s.queues; + Mutex.unlock s.queues_m; + log (Printf.sprintf "Queue #%d starting..." (List.length s.queues)); + c + in + (* Try to process ready tasks, otherwise try to become the master, + * or be a slave and wait for the master to get some more ready tasks. *) + let run () = + Mutex.lock s.stop_m; + let stop = s.stop in + Mutex.unlock s.stop_m; + if stop then raise Queue_stopped; + (* Lock the ready tasks until the queue has a task to proceed, + * *or* is really ready to restart on its condition, see the + * Condition.wait call below for the atomic unlock and wait. *) + Mutex.lock s.ready_m; + log (Printf.sprintf "There are %d ready tasks." (List.length s.ready)); + if exec s priorities then raise Queue_processed; + let wake () = + let is_ready = + Mutex.lock s.ready_m; + let is_ready = s.ready <> [] in + Mutex.unlock s.ready_m; + is_ready + in + (* Wake up other queues if there are remaining tasks *) + if is_ready then begin + Mutex.lock s.queues_m; + List.iter (fun x -> if x <> c then Condition.signal x) s.queues; + Mutex.unlock s.queues_m + end + in + if Mutex.try_lock s.select_m then begin + (* Processing finished for me + * I can unlock ready_m now.. *) + Mutex.unlock s.ready_m; + process s log; + Mutex.unlock s.select_m; + wake (); + end + else begin + (* We use s.ready_m mutex here. + * Hence, we avoid race conditions + * with any other queue being processing + * a task that would create a new task: + * without this mutex, the new task may not be + * notified to this queue if it is going to sleep + * in concurrency.. + * It also avoid race conditions when restarting + * queues since s.ready_m is locked until all + * queues have been signaled. *) + Condition.wait c s.ready_m; + Mutex.unlock s.ready_m + end + in + let rec f () = + begin + try run () with Queue_processed -> () + end; + (f [@tailcall]) () + in + let on_done () = + Mutex.lock s.queues_m; + s.queues <- List.filter (fun q -> q <> c) s.queues; + Condition.signal s.queue_stopped_c; + Mutex.unlock s.queues_m + in + ( try f () with + | Queue_stopped -> () + | exn -> + on_done (); + raise exn ); + on_done () + +module Async = struct + (* m is used to make sure that + * calls to [wake_up] and [stop] + * are thread-safe. *) + type t = { stop : bool ref; mutable fd : fd option; m : Mutex.t } + + exception Stopped + + let add ~priority (scheduler : 'a scheduler) f = + (* A pipe to wake up the task *) + let out_pipe, in_pipe = Unix.pipe () in + let stop = ref false in + let tmp = Bytes.create 1024 in + let rec task l = + if List.exists (( = ) (`Read out_pipe)) l then + (* Consume data from the pipe *) + ignore (Unix.read out_pipe tmp 0 1024); + if !stop then begin + begin + try + (* This interface is purely asynchronous + * so we close both sides of the pipe here. *) + Unix.close in_pipe; + Unix.close out_pipe + with _ -> () + end; + [] + end + else begin + let delay = f () in + let event = if delay >= 0. then [`Delay delay] else [] in + [{ priority; events = `Read out_pipe :: event; handler = task }] + end + in + let task = { priority; events = [`Read out_pipe]; handler = task } in + add scheduler task; + { stop; fd = Some in_pipe; m = Mutex.create () } + + let wake_up t = + Mutex.lock t.m; + try + begin + match t.fd with + | Some t -> ignore (Unix.write t (Bytes.of_string " ") 0 1) + | None -> raise Stopped + end; + Mutex.unlock t.m + with e -> + Mutex.unlock t.m; + raise e + + let stop t = + Mutex.lock t.m; + try + begin + match t.fd with + | Some c -> + t.stop := true; + ignore (Unix.write c (Bytes.of_string " ") 0 1) + | None -> raise Stopped + end; + t.fd <- None; + Mutex.unlock t.m + with e -> + Mutex.unlock t.m; + raise e +end + +module type Transport_t = sig + type t + + type bigarray = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + val sock : t -> Unix.file_descr + val read : t -> Bytes.t -> int -> int -> int + val write : t -> Bytes.t -> int -> int -> int + val ba_write : t -> bigarray -> int -> int -> int +end + +module Unix_transport : Transport_t with type t = Unix.file_descr = struct + type t = Unix.file_descr + + type bigarray = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + let sock s = s + let read = Unix.read + let write = Unix.write + + external ba_write : t -> bigarray -> int -> int -> int + = "ocaml_duppy_write_ba" +end + +module type Io_t = sig + type socket + type marker = Length of int | Split of string + + type bigarray = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + type failure = + | Io_error + | Unix of Unix.error * string * string + | Unknown of exn + | Timeout + + val read : + ?recursive:bool -> + ?init:string -> + ?on_error:(string * failure -> unit) -> + ?timeout:float -> + priority:'a -> + 'a scheduler -> + socket -> + marker -> + (string * string option -> unit) -> + unit + + val write : + ?exec:(unit -> unit) -> + ?on_error:(failure -> unit) -> + ?bigarray:bigarray -> + ?offset:int -> + ?length:int -> + ?string:Bytes.t -> + ?timeout:float -> + priority:'a -> + 'a scheduler -> + socket -> + unit +end + +module MakeIo (Transport : Transport_t) : Io_t with type socket = Transport.t = +struct + type socket = Transport.t + type marker = Length of int | Split of string + + type failure = + | Io_error + | Unix of Unix.error * string * string + | Unknown of exn + | Timeout + + exception Io + exception Timeout_exc + + type bigarray = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + let read ?(recursive = false) ?(init = "") ?(on_error = fun _ -> ()) ?timeout + ~priority (scheduler : 'a scheduler) socket marker exec = + let length = 1024 in + let b = Buffer.create length in + let buf = Bytes.make length ' ' in + Buffer.add_string b init; + let unix_socket = Transport.sock socket in + let events, check_timeout = + match timeout with + | None -> ([`Read unix_socket], fun _ -> false) + | Some f -> ([`Read unix_socket; `Delay f], List.mem (`Delay f)) + in + let rec f l = + if check_timeout l then raise Timeout_exc; + if List.mem (`Read unix_socket) l then begin + let input = Transport.read socket buf 0 length in + if input <= 0 then raise Io; + Buffer.add_subbytes b buf 0 input + end; + let ret = + match marker with + | Split r -> + let rex = Pcre.regexp r in + let acc = Buffer.contents b in + let ret = Pcre.full_split ~max:2 ~rex acc in + let rec p l = + match l with + | Pcre.Text x :: Pcre.Delim _ :: l -> + let f b x = + match x with + | Pcre.Text s | Pcre.Delim s -> Buffer.add_string b s + | _ -> () + in + if recursive then begin + Buffer.reset b; + List.iter (f b) l; + Some (x, None) + end + else begin + let b = Buffer.create 10 in + List.iter (f b) l; + Some (x, Some (Buffer.contents b)) + end + | _ :: l' -> p l' + | [] -> None + in + p ret + | Length n when n <= Buffer.length b -> + let s = Buffer.sub b 0 n in + let rem = Buffer.sub b n (Buffer.length b - n) in + if recursive then begin + Buffer.reset b; + Buffer.add_string b rem; + Some (s, None) + end + else Some (s, Some rem) + | _ -> None + in + (* Catch all exceptions.. *) + let f x = + try f x with + | Io -> + on_error (Buffer.contents b, Io_error); + [] + | Timeout_exc -> + on_error (Buffer.contents b, Timeout); + [] + | Unix.Unix_error (x, y, z) -> + on_error (Buffer.contents b, Unix (x, y, z)); + [] + | e -> + on_error (Buffer.contents b, Unknown e); + [] + in + match ret with + | Some x -> ( + match x with + | s, Some _ when recursive -> + exec (s, None); + [{ priority; events; handler = f }] + | _ -> + exec x; + [] ) + | None -> [{ priority; events; handler = f }] + in + (* Catch all exceptions.. *) + let f x = + try f x with + | Io -> + on_error (Buffer.contents b, Io_error); + [] + | Timeout_exc -> + on_error (Buffer.contents b, Timeout); + [] + | Unix.Unix_error (x, y, z) -> + on_error (Buffer.contents b, Unix (x, y, z)); + [] + | e -> + on_error (Buffer.contents b, Unknown e); + [] + in + (* First one is without read, + * in case init contains the wanted match. + * Unless the user sets timeout to 0., this + * should not interfer with user-defined timeout.. *) + let task = + { priority; events = [`Delay 0.; `Read unix_socket]; handler = f } + in + add scheduler task + + let write ?(exec = fun () -> ()) ?(on_error = fun _ -> ()) ?bigarray + ?(offset = 0) ?length ?string ?timeout ~priority + (scheduler : 'a scheduler) socket = + let length, write = + match (string, bigarray) with + | Some s, _ -> + let length = + match length with Some length -> length | None -> Bytes.length s + in + (length, Transport.write socket s) + | None, Some b -> + let length = + match length with + | Some length -> length + | None -> Bigarray.Array1.dim b + in + (length, Transport.ba_write socket b) + | _ -> (0, fun _ _ -> 0) + in + let unix_socket = Transport.sock (socket : Transport.t) in + let exec () = + if Sys.os_type = "Win32" then Unix.clear_nonblock unix_socket; + exec () + in + let events, check_timeout = + match timeout with + | None -> ([`Write unix_socket], fun _ -> false) + | Some f -> ([`Write unix_socket; `Delay f], List.mem (`Delay f)) + in + let rec f pos l = + try + if check_timeout l then raise Timeout_exc; + assert (List.exists (( = ) (`Write unix_socket)) l); + let len = length - pos in + let n = write pos len in + if n <= 0 then ( + on_error Io_error; + [] ) + else if n < len then + [{ priority; events = [`Write unix_socket]; handler = f (pos + n) }] + else ( + exec (); + [] ) + with + | Unix.Unix_error (Unix.EWOULDBLOCK, _, _) when Sys.os_type = "Win32" -> + [{ priority; events = [`Write unix_socket]; handler = f pos }] + | Timeout_exc -> + on_error Timeout; + [] + | Unix.Unix_error (x, y, z) -> + on_error (Unix (x, y, z)); + [] + | e -> + on_error (Unknown e); + [] + in + let task = { priority; events; handler = f offset } in + if length > 0 then + (* Win32 is particularly bad with writting on sockets. It is nearly impossible + * to write proper non-blocking code. send will block on blocking sockets if + * there isn't enough data available instead of returning a partial buffer + * and WSAEventSelect will not return if the socket still has available space. + * Thus, setting the socket to non-blocking and writting as much as we can. *) + if Sys.os_type = "Win32" then begin + Unix.set_nonblock unix_socket; + List.iter (add scheduler) (f offset [`Write unix_socket]) + end + else add scheduler task + else exec () +end + +module Io : Io_t with type socket = Unix.file_descr = MakeIo (Unix_transport) + +(** A monad for implicit continuations or responses *) +module Monad = struct + type ('a, 'b) handler = { return : 'a -> unit; raise : 'b -> unit } + type ('a, 'b) t = ('a, 'b) handler -> unit + + let return x h = h.return x + let raise x h = h.raise x + + let bind f g h = + let ret x = + let process = g x in + process h + in + f { return = ret; raise = h.raise } + + let ( >>= ) = bind + let run ~return:ret ~raise f = f { return = ret; raise } + + let catch f g h = + let raise x = + let process = g x in + process h + in + f { return = h.return; raise } + + let ( =<< ) x y = catch y x + + let rec fold_left f a = function + | [] -> a + | b :: l -> fold_left f (bind a (fun a -> f a b)) l + + let fold_left f a l = fold_left f (return a) l + let iter f l = fold_left (fun () b -> f b) () l + + module Mutex_o = Mutex + + module Mutex = struct + module type Mutex_control = sig + type priority + + val scheduler : priority scheduler + val priority : priority + end + + module type Mutex_t = sig + (** Type for a mutex. *) + type mutex + + module Control : Mutex_control + + (** [create ()] creates a mutex. Implementation-wise, + * a duppy task is created that will be used to select a + * waiting computation, lock the mutex on it and resume it. + * Thus, [priority] and [s] represents, resp., the priority + * and scheduler used when running calling process' computation. *) + val create : unit -> mutex + + (** A computation that locks a mutex + * and returns [unit] afterwards. Computation + * will be blocked until the mutex is sucessfuly locked. *) + val lock : mutex -> (unit, 'a) t + + (** A computation that tries to lock a mutex. + * Returns immediatly [true] if the mutex was sucesfully locked + * or [false] otherwise. *) + val try_lock : mutex -> (bool, 'a) t + + (** A computation that unlocks a mutex. + * Should return immediatly. *) + val unlock : mutex -> (unit, 'a) t + end + + module Factory (Control : Mutex_control) = struct + (* A mutex is either locked or not + * and has a list of tasks waiting to get + * it. *) + type mutex = { + mutable locked : bool; + mutable tasks : (unit -> unit) list; + } + + module Control = Control + + let tmp = Bytes.create 1024 + let x, y = Unix.pipe () + let stop = ref false + let wake_up () = ignore (Unix.write y (Bytes.of_string " ") 0 1) + let ctl_m = Mutex_o.create () + + let finalise _ = + stop := true; + wake_up () + + let mutexes = Queue.create () + let () = Gc.finalise finalise mutexes + + let register () = + let m = { locked = false; tasks = [] } in + Queue.push m mutexes; + m + + let cleanup m = + Mutex_o.lock ctl_m; + let q = Queue.create () in + Queue.iter (fun m' -> if m <> m' then Queue.push m q) mutexes; + Queue.clear mutexes; + Queue.transfer q mutexes; + Mutex_o.unlock ctl_m + + let task f = + { + Task.priority = Control.priority; + events = [`Delay 0.]; + handler = + (fun _ -> + f (); + []); + } + + (* This should only be called when [ctl_m] is locked. *) + let process_mutex tasks m = + if not m.locked then ( + (* I don't think shuffling tasks + * matters here.. *) + match m.tasks with + | x :: l -> + m.tasks <- l; + m.locked <- true; + task x :: tasks + | _ -> tasks ) + else tasks + + let rec handler _ = + Mutex_o.lock ctl_m; + if not !stop then begin + let tasks = Queue.fold process_mutex [] mutexes in + Mutex_o.unlock ctl_m; + ignore (Unix.read x tmp 0 1024); + { Task.priority = Control.priority; events = [`Read x]; handler } + :: tasks + end + else begin + Mutex_o.unlock ctl_m; + try + Unix.close x; + Unix.close y; + [] + with _ -> [] + end + + let () = + Task.add Control.scheduler + { Task.priority = Control.priority; events = [`Read x]; handler } + + let create () = + Mutex_o.lock ctl_m; + let ret = register () in + Mutex_o.unlock ctl_m; + Gc.finalise cleanup ret; + ret + + let lock m h' = + Mutex_o.lock ctl_m; + if not m.locked then begin + m.locked <- true; + Mutex_o.unlock ctl_m; + h'.return () + end + else begin + m.tasks <- h'.return :: m.tasks; + Mutex_o.unlock ctl_m + end + + let try_lock m h' = + Mutex_o.lock ctl_m; + if not m.locked then begin + m.locked <- true; + Mutex_o.unlock ctl_m; + h'.return true + end + else begin + Mutex_o.unlock ctl_m; + h'.return false + end + + let unlock m h' = + Mutex_o.lock ctl_m; + (* Here we allow inter-thread + * and double unlock.. Double unlock + * is not necessarily a problem and + * inter-thread unlock well.. what is + * a thread here ?? :-) *) + m.locked <- false; + let wake = m.tasks <> [] in + Mutex_o.unlock ctl_m; + if wake then wake_up (); + h'.return () + end + end + + module Condition = struct + module Factory (Mutex : Mutex.Mutex_t) = struct + type condition = { + condition_m : Mutex_o.t; + waiting : (unit -> unit) Queue.t; + } + + module Control = Mutex.Control + + let create () = + { condition_m = Mutex_o.create (); waiting = Queue.create () } + + (* Mutex.unlock m needs to happen _after_ + * the task has been registered. *) + let wait c m h = + let proc () = Mutex.lock m h in + Mutex_o.lock c.condition_m; + Queue.push proc c.waiting; + Mutex_o.unlock c.condition_m; + (* Mutex.unlock does not raise exceptions (for now..) *) + let h' = { return = (fun () -> ()); raise = (fun _ -> assert false) } in + Mutex.unlock m h' + + let wake_up h = + let handler _ = + h (); + [] + in + Task.add Control.scheduler + { Task.priority = Control.priority; events = [`Delay 0.]; handler } + + let signal c h = + Mutex_o.lock c.condition_m; + let h' = Queue.pop c.waiting in + Mutex_o.unlock c.condition_m; + wake_up h'; + h.return () + + let broadcast c h = + let q = Queue.create () in + Mutex_o.lock c.condition_m; + Queue.transfer c.waiting q; + Mutex_o.unlock c.condition_m; + Queue.iter wake_up q; + h.return () + end + end + + module type Monad_io_t = sig + type socket + + module Io : Io_t with type socket = socket + + type ('a, 'b) handler = { + scheduler : 'a scheduler; + socket : Io.socket; + mutable data : string; + on_error : Io.failure -> 'b; + } + + val exec : + ?delay:float -> + priority:'a -> + ('a, 'b) handler -> + ('c, 'b) t -> + ('c, 'b) t + + val delay : priority:'a -> ('a, 'b) handler -> float -> (unit, 'b) t + + val read : + ?timeout:float -> + priority:'a -> + marker:Io.marker -> + ('a, 'b) handler -> + (string, 'b) t + + val read_all : + ?timeout:float -> + priority:'a -> + 'a scheduler -> + Io.socket -> + (string, string * Io.failure) t + + val write : + ?timeout:float -> + priority:'a -> + ('a, 'b) handler -> + ?offset:int -> + ?length:int -> + Bytes.t -> + (unit, 'b) t + + val write_bigarray : + ?timeout:float -> + priority:'a -> + ('a, 'b) handler -> + Io.bigarray -> + (unit, 'b) t + end + + module MakeIo (Io : Io_t) = struct + type socket = Io.socket + + module Io = Io + + type ('a, 'b) handler = { + scheduler : 'a scheduler; + socket : Io.socket; + mutable data : string; + on_error : Io.failure -> 'b; + } + + let exec ?(delay = 0.) ~priority h f h' = + let handler _ = + begin + try f h' with e -> h'.raise (h.on_error (Io.Unknown e)) + end; + [] + in + Task.add h.scheduler { Task.priority; events = [`Delay delay]; handler } + + let delay ~priority h delay = exec ~delay ~priority h (return ()) + + let read ?timeout ~priority ~marker h h' = + let process x = + let s = + match x with + | s, None -> + h.data <- ""; + s + | s, Some s' -> + h.data <- s'; + s + in + h'.return s + in + let init = h.data in + h.data <- ""; + let on_error (s, x) = + h.data <- s; + h'.raise (h.on_error x) + in + Io.read ?timeout ~priority ~init ~recursive:false ~on_error h.scheduler + h.socket marker process + + let read_all ?timeout ~priority s sock = + let handler = + { scheduler = s; socket = sock; data = ""; on_error = (fun e -> e) } + in + let buf = Buffer.create 1024 in + let rec f () = + let data = read ?timeout ~priority ~marker:(Io.Length 1024) handler in + let process data = + Buffer.add_string buf data; + f () + in + data >>= process + in + let catch_ret e = + Buffer.add_string buf handler.data; + match e with + | Io.Io_error -> return (Buffer.contents buf) + | e -> raise (Buffer.contents buf, e) + in + catch (f ()) catch_ret + + let write ?timeout ~priority h ?offset ?length s h' = + let on_error x = h'.raise (h.on_error x) in + let exec () = h'.return () in + Io.write ?timeout ~priority ~on_error ~exec ?offset ?length ~string:s + h.scheduler h.socket + + let write_bigarray ?timeout ~priority h ba h' = + let on_error x = h'.raise (h.on_error x) in + let exec () = h'.return () in + Io.write ?timeout ~priority ~on_error ~exec ~bigarray:ba h.scheduler + h.socket + end + + module Io = MakeIo (Io) +end diff --git a/src/duppy.mli b/src/duppy.mli new file mode 100644 index 0000000..be0a4d6 --- /dev/null +++ b/src/duppy.mli @@ -0,0 +1,549 @@ +(***************************************************************************** + + Duppy, a task scheduler for OCaml. + Copyright 2003-2010 Savonet team + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details, fully stated in the COPYING + file at the root of the liquidsoap distribution. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + *****************************************************************************) + +(** Advanced scheduler and monad for server-oriented programming. *) + +(** + * {R {i {v + * The bars could not hold me; + * Force could not control me now. + * They try to keep me down, yeah! + * But Jah put I around. + * (...) + * Let me tell you this - + * I'm a duppy conqueror ! + * v} } } + * {R {b Lee "Scratch" Perry & Bob Marley - Duppy conqueror }} + * + * {2 Duppy task scheduler for OCaml.} + * + * {!Duppy} is a task scheduler for ocaml. It implements a wrapper + * around [Unix.select]. + * + * Using {!Duppy.Task}, the programmer can easily submit tasks that need to wait + * on a socket even, or for a given timeout (possibly zero). + * + * With {!Duppy.Async}, one can use a scheduler to submit asynchronous tasks. + * + * {!Duppy.Io} implements recursive easy reading and writing to a [Unix.file_descr] + * + * Finally, {!Duppy.Monad} and {!Duppy.Monad.Io} provide a monadic interface to + * program server code that with an implicit return/reply execution flow. + * + * The scheduler can use several queues running concurently, each queue + * processing ready tasks. Of course, a queue should run in its own thread.*) + +(** A scheduler is a device for processing tasks. Several queues might run in + * different threads, processing one scheduler's tasks. + * + * ['a] is the type of objects used for priorities. *) +type 'a scheduler + +(** Initiate a new scheduler + * @param compare the comparison function used to sort tasks according to priorities. + * Works as in [List.sort] *) +val create : ?compare:('a -> 'a -> int) -> unit -> 'a scheduler + +(** [queue ~log ~priorities s name] + * starts a queue, on the scheduler [s] only processing priorities [p] + * for which [priorities p] returns [true]. + * + * Several queues can be run concurrently against [s]. + * @param log Logging function. Default: [Printf.printf "queue %s: %s\n" name] + * @param priorities Predicate specifying which priority to process. Default: [fun _ -> _ -> true] + * + * An exception is raised from this call when duppy's event loops has + * crashed. This exception should be considered a MAJOR FAILURE. All current + * non-ready tasks registered for the calling scheduler are dropped. You may + * restart Duppy's queues after it is raised but it should only be used to terminate + * the process diligently!! *) +val queue : + ?log:(string -> unit) -> + ?priorities:('a -> bool) -> + 'a scheduler -> + string -> + unit + +(** Stop all queues running on that scheduler and wait for them to return. *) +val stop : 'a scheduler -> unit + +(** Core task registration. + * + * A task will be a set of events to watch, and a corresponding function to + * execute when one of the events is trigered. + * + * The executed function may then return a list of new tasks to schedule. *) +module Task : sig + (** A task is a list of events awaited, + * and a function to process events that have occured. + * + * The ['a] parameter is the type of priorities, ['b] will be a subset of possible + * events. *) + type ('a, 'b) task = { + priority : 'a; + events : 'b list; + handler : 'b list -> ('a, 'b) task list; + } + + (** Type for possible events. + * + * Please not that currently, under win32, all socket used in ocaml-duppy + * are expected to be in blocking mode only! *) + type event = + [ `Delay of float + | `Write of Unix.file_descr + | `Read of Unix.file_descr + | `Exception of Unix.file_descr ] + + (** Schedule a task. *) + val add : 'a scheduler -> ('a, [< event ]) task -> unit +end + +(** Asynchronous task module + * + * This module implements an asychronous API to {!Duppy.scheduler} + * It allows to create a task that will run and then go to sleep. *) +module Async : sig + type t + + (** Exception raised when trying to wake_up a task + * that has been previously stopped *) + exception Stopped + + (** [add ~priority s f] creates an asynchronous task in [s] with + * priority [priority]. + * + * The task executes the function [f]. + * If the task returns a positive float, the function will be executed + * again after this delay. Otherwise it goes to sleep, and + * you can use [wake_up] to resume the task and execute [f] again. + * Only a single call to [f] is done at each time. + * Multiple [wake_up] while previous task has not + * finished will result in sequentialized calls to [f]. *) + val add : priority:'a -> 'a scheduler -> (unit -> float) -> t + + (** Wake up an asynchronous task. + * Raises [Stopped] if the task has been stopped. *) + val wake_up : t -> unit + + (** Stop and remove the asynchronous task. Doesn't quit a running task. + * Raises [Stopped] if the task has been stopped. *) + val stop : t -> unit +end + +(** Module type for Io functor. *) +module type Transport_t = sig + type t + + type bigarray = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + val sock : t -> Unix.file_descr + val read : t -> Bytes.t -> int -> int -> int + val write : t -> Bytes.t -> int -> int -> int + val ba_write : t -> bigarray -> int -> int -> int +end + +(** Easy parsing of [Unix.file_descr]. + * + * With {!Duppy.Io.read}, you can pass a file descriptor to the scheduler, + * along with a marker, and have it run the associated function when the + * marker is found. + * + * With {!Duppy.Io.write}, the schdeduler will try to write recursively to the file descriptor + * the given string. *) +module type Io_t = sig + type socket + + (** Type for markers. + * + * [Split s] recognizes all regexp allowed by the + * [Pcre] module. *) + type marker = Length of int | Split of string + + (** Type of [Bigarray] used here. *) + type bigarray = + (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t + + (** Different types of failure. + * + * [Io_error] is raised when reading or writing + * returned 0. This usually means that the socket + * was closed. *) + type failure = + | Io_error + | Unix of Unix.error * string * string + | Unknown of exn + | Timeout + + (** Wrapper to perform a read on a socket and trigger a function when + * a marker has been detected, or enough data has been read. + * It reads recursively on a socket, splitting into strings seperated + * by the marker (if any) and calls the given function on the list of strings. + * + * Can be used recursively or not, depending on the way you process strings. + * Because of Unix's semantic, it is not possible to stop reading + * at first marker, so there can be a remaining string. If not used + * recursively, the second optional argument may contain a remaining + * string. You should then initiate the next read with this value. + * + * The [on_error] function is used when reading failed on the socket. + * Depending on your usage, it can be a hard failure, or simply a lost client. + * The string passed to [on_error] contains data read before error + * occured. + * @param recursive recursively read and process, default: [true] + * @param init initial string for reading, default: [""] + * @param on_error function used when read failed, default: [fun _ -> ()] + * @param timeout Terminate with [Timeout] failure if nothing has been read + * after the given amout of time in seconds. More precisely, + * the exception is raised when no character have been read + * and the socket was not close while waiting. Default: wait + * forever. *) + val read : + ?recursive:bool -> + ?init:string -> + ?on_error:(string * failure -> unit) -> + ?timeout:float -> + priority:'a -> + 'a scheduler -> + socket -> + marker -> + (string * string option -> unit) -> + unit + + (** Similar to [read] but less complex. + * [write ?exec ?on_error ?string ?bigarray ~priority scheduler socket] + * write data from [string], or from [bigarray] if no string is given, + * to [socket], and executes [exec] or [on_error] if errors occured. + * + * Caveat: on Win32, all file descriptors are expected to be in blocking + * mode before being passed to this call due to limitations in the emulation + * of the unix/posix API. See code comments for more details. + * + * @param exec function to execute after writing, default: [fun () -> ()] + * @param on_error function to execute when an error occured, default: [fun _ -> ()] + * @param string write data from this string + * @param bigarray write data from this bigarray, if no [string] is given + * @param timeout Terminate with [Timeout] failure if nothing has been written + * after the given amout of time in seconds. More precisely, + * the exception is raised when no character have been written + * and the socket was not close while waiting. Default: wait + * forever. *) + val write : + ?exec:(unit -> unit) -> + ?on_error:(failure -> unit) -> + ?bigarray:bigarray -> + ?offset:int -> + ?length:int -> + ?string:Bytes.t -> + ?timeout:float -> + priority:'a -> + 'a scheduler -> + socket -> + unit +end + +module MakeIo (Transport : Transport_t) : Io_t with type socket = Transport.t +module Io : Io_t with type socket = Unix.file_descr + +(** Monadic interface to {!Duppy.Io}. + * + * This module can be used to write code + * that runs in various Duppy's tasks and + * raise values in a completely transparent way. + * + * You can see examples of its use + * in the [examples/] directory of the + * source code and in the files + * [src/tools/{harbor.camlp4,server.camlp4}] + * in liquidsoap's code. + * + * When a server communicates + * with a client, it performs several + * computations and, eventually, terminates. + * A computation can either return a new + * value or terminate. For instance: + * + * - Client connects. + * - Server tries to authenticate the client. + * - If authentication is ok, proceed with the next step. + * - Otherwise terminate. + * + * The purpose of the monad is to embed + * computations which can either return + * a new value or raise a value that is used + * to terminate. *) +module Monad : sig + (** Type representing a computation + * which returns a value of type ['a] + * or raises a value of type ['b] *) + type ('a, 'b) t + + (** [return x] create a computation that + * returns value [x]. *) + val return : 'a -> ('a, 'b) t + + (** [raise x] create a computation that raises + * value [x]. *) + val raise : 'b -> ('a, 'b) t + + (** Compose two computations. + * [bind f g] is equivalent to: + * [let x = f in g x] where [x] + * has f's return type. *) + val bind : ('a, 'b) t -> ('a -> ('c, 'b) t) -> ('c, 'b) t + + (** [>>=] is an alternative notation + * for [bind] *) + val ( >>= ) : ('a, 'b) t -> ('a -> ('c, 'b) t) -> ('c, 'b) t + + (** [run f ~return ~raise ()] executes [f] and process + * returned values with [return] or raised values + * with [raise]. *) + val run : return:('a -> unit) -> raise:('b -> unit) -> ('a, 'b) t -> unit + + (** [catch f g] redirects values [x] raised during + * [f]'s execution to [g]. The name suggests the + * usual [try .. with ..] exception catching. *) + val catch : ('a, 'b) t -> ('b -> ('a, 'c) t) -> ('a, 'c) t + + (** [=<<] is an alternative notation for catch. *) + val ( =<< ) : ('b -> ('a, 'c) t) -> ('a, 'b) t -> ('a, 'c) t + + (** [fold_left f a [b1; b2; ..]] returns computation + * [ (f a b1) >>= (fun a -> f a b2) >>= ...] *) + val fold_left : ('a -> 'b -> ('a, 'c) t) -> 'a -> 'b list -> ('a, 'c) t + + (** [iter f [x1; x2; ..]] returns computation + * [f x1 >>= (fun () -> f x2) >>= ...] *) + val iter : ('a -> (unit, 'b) t) -> 'a list -> (unit, 'b) t + + (** This module implements monadic + * mutex computations. They can be used + * to write blocking code that is compatible + * with duppy's tasks, i.e. [Mutex.lock m] blocks + * the calling computation and not the calling thread. *) + module Mutex : sig + (** Information used to initialize a Mutex module. + * [priority] and [scheduler] are used to initialize a task + * which treat mutexes as well as conditions from the below + * [Condition] module. *) + module type Mutex_control = sig + type priority + + val scheduler : priority scheduler + val priority : priority + end + + module type Mutex_t = sig + (** Type for a mutex. *) + type mutex + + module Control : Mutex_control + + (** [create ()] creates a mutex. *) + val create : unit -> mutex + + (** A computation that locks a mutex + * and returns [unit] afterwards. Computation + * will be blocked until the mutex is sucessfuly locked. *) + val lock : mutex -> (unit, 'a) t + + (** A computation that tries to lock a mutex. + * Returns immediatly [true] if the mutex was sucesfully locked + * or [false] otherwise. *) + val try_lock : mutex -> (bool, 'a) t + + (** A computation that unlocks a mutex. + * Should return immediatly. *) + val unlock : mutex -> (unit, 'a) t + end + + module Factory (Control : Mutex_control) : Mutex_t + end + + (** This module implements monadic + * condition computations. They can be used + * to write waiting code that is compatible + * with duppy's tasks, i.e. [Condition.wait c m] blocks + * the calling computation and not the calling thread + * until [Condition.signal c] or [Condition.broadcast c] has + * been called. *) + module Condition : sig + module Factory (Mutex : Mutex.Mutex_t) : sig + (** Type of a condition, used in [wait] and [broadcast] *) + type condition + + (** Create a condition. Implementation-wise, + * a duppy task is created that will be used to select a + * waiting computation, and resume it. + * Thus, [priority] and [s] represents, resp., the priority + * and scheduler used when running calling process' computation. *) + val create : unit -> condition + + (** [wait h m] is a computation that: + * {ul + * {- Unlock mutex [m]} + * {- Wait until [Condition.signal c] or [Condition.broadcast c] + has been called} + * {- Locks mutex [m]} + * {- Returns [unit]}} *) + val wait : condition -> Mutex.mutex -> (unit, 'a) t + + (** [broadcast c] is a computation that + * resumes all computations waiting on [c]. It should + * return immediately. *) + val broadcast : condition -> (unit, 'a) t + + (** [signal c] is a computation that resumes one + * computation waiting on [c]. It should return + * immediately. *) + val signal : condition -> (unit, 'a) t + end + end + + (** This module implements monadic computations + * using [Duppy.Io]. It can be used to create + * computations that read or write from a socket, + * and also to redirect a computation in a different + * queue with a new priority. *) + module type Monad_io_t = sig + type socket + + module Io : Io_t with type socket = socket + + (** {2 Type } *) + + (** A handler for this module + * is a record that contains the + * required elements. In particular, + * [on_error] is a function that transforms + * an error raised by [Duppy.Io] to a reply + * used to terminate the computation. + * [data] is an internal data buffer. It should + * be initialized with [""]. It contains the + * remaining data that was received when + * using [read]. If an error occured, + * [data] contain data read before the + * error. *) + type ('a, 'b) handler = { + scheduler : 'a scheduler; + socket : Io.socket; + mutable data : string; + on_error : Io.failure -> 'b; + } + + (** {2 Execution flow } *) + + (** [exec ?delay ~priority h f] redirects computation + * [f] into a new queue with priority [priority] and + * delay [delay] ([0.] by default). + * It can be used to redirect a computation that + * has to run under a different priority. For instance, + * a computation that reads from a socket is generally + * not blocking because the function is executed + * only when some data is available for reading. + * However, if the data that is read needs to be processed + * by a computation that can be blocking, then one may + * use [exec] to redirect this computation into an + * appropriate queue. *) + val exec : + ?delay:float -> + priority:'a -> + ('a, 'b) handler -> + ('c, 'b) t -> + ('c, 'b) t + + (** [delay ~priority h d] creates a computation that returns + * [unit] after delay [d] in seconds. *) + val delay : priority:'a -> ('a, 'b) handler -> float -> (unit, 'b) t + + (** {2 Read/write } *) + + (** [read ?timeout ~priority ~marker h] creates a + * computation that reads from [h.socket] + * and returns the first string split + * according to [marker]. This function + * can be used to create a computation that + * reads data from a socket. [timeout] parameter + * forces the computation to return an error if + * nothing has been read for more than [timeout] + * seconds. Default: wait forever. *) + val read : + ?timeout:float -> + priority:'a -> + marker:Io.marker -> + ('a, 'b) handler -> + (string, 'b) t + + (** [read_all ?timeout ~priority s sock] creates a + * computation that reads all data from [sock] + * and returns it. Raised value contains data + * read before an error occured. *) + val read_all : + ?timeout:float -> + priority:'a -> + 'a scheduler -> + Io.socket -> + (string, string * Io.failure) t + + (** [write ?timeout ~priority h s] creates a computation + * that writes string [s] to [h.socket]. This + * function can be used to create a computation + * that sends data to a socket. [timeout] parameter + * forces the computation to return an error if + * nothing has been written for more than [timeout] + * seconds. Default: wait forever. *) + val write : + ?timeout:float -> + priority:'a -> + ('a, 'b) handler -> + ?offset:int -> + ?length:int -> + Bytes.t -> + (unit, 'b) t + + (** [write_bigarray ?timeout ~priority h ba] creates a computation + * that writes data from [ba] to [h.socket]. This function + * can to create a computation that writes data to a socket. *) + val write_bigarray : + ?timeout:float -> + priority:'a -> + ('a, 'b) handler -> + Io.bigarray -> + (unit, 'b) t + end + + module MakeIo (Io : Io_t) : + Monad_io_t with type socket = Io.socket and module Io = Io + + module Io : Monad_io_t with type socket = Unix.file_descr and module Io = Io +end + +(** {2 Some culture..} + * {e Duppy is a Caribbean patois word of West African origin meaning ghost or spirit. + * Much of Caribbean folklore revolves around duppies. + * Duppies are generally regarded as malevolent spirits. + * They are said to come out and haunt people at night mostly, + * and people from the islands claim to have seen them. + * The 'Rolling Calf', 'Three footed horse' or 'Old Higue' are examples of the more malicious spirits. } + * {R {{:http://en.wikipedia.org/wiki/Duppy} http://en.wikipedia.org/wiki/Duppy}}*) diff --git a/src/duppy_stubs.c b/src/duppy_stubs.c new file mode 100644 index 0000000..ac70284 --- /dev/null +++ b/src/duppy_stubs.c @@ -0,0 +1,176 @@ +/* + * Copyright 2010 Savonet team + * + * This file is part of Ocaml-duppy. + * + * Ocaml-duppy is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * Ocaml-duppy is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Ocaml-duppy; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + */ + +#include <caml/alloc.h> +#include <caml/signals.h> +#include <caml/unixsupport.h> +#include <caml/memory.h> +#include <caml/bigarray.h> +#include <caml/fail.h> +#include <caml/threads.h> + +#include <errno.h> + +/* On native Windows platforms, many macros are not defined. */ +# if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ + +#ifndef EWOULDBLOCK +#define EWOULDBLOCK EAGAIN +#endif + +#endif + +#ifdef WIN32 +#define Fd_val(fd) win_CRT_fd_of_filedescr(fd) +#define Val_fd(fd) caml_failwith("Val_fd") +#else +#define Fd_val(fd) Int_val(fd) +#define Val_fd(fd) Val_int(fd) +#endif + +#ifndef WIN32 +#include <poll.h> + +CAMLprim value caml_poll(value _read, value _write, value _err, value _timeout) { + CAMLparam3(_read, _write, _err); + CAMLlocal4(_pread, _pwrite, _perr, _ret); + + struct pollfd *fds; + nfds_t nfds = 0; + nfds_t nread = 0; + nfds_t nwrite = 0; + nfds_t nerr = 0; + int timeout; + size_t last = 0; + int n, ret; + + if (Double_val(_timeout) == -1) + timeout = -1; + else + timeout = Double_val(_timeout) * 1000; + + nfds += Wosize_val(_read); + nfds += Wosize_val(_write); + nfds += Wosize_val(_err); + + fds = calloc(nfds,sizeof(struct pollfd)); + if (fds == NULL) caml_raise_out_of_memory(); + + for (n = 0; n < Wosize_val(_read); n++) { + fds[last+n].fd = Fd_val(Field(_read,n)); + fds[last+n].events = POLLIN; + } + last += Wosize_val(_read); + + for (n = 0; n < Wosize_val(_write); n++) { + fds[last+n].fd = Fd_val(Field(_write,n)); + fds[last+n].events = POLLOUT; + } + last += Wosize_val(_write); + + for (n = 0; n < Wosize_val(_err); n++) { + fds[last+n].fd = Fd_val(Field(_err,n)); + fds[last+n].events = POLLERR; + } + + caml_release_runtime_system(); + ret = poll(fds, nfds, timeout); + caml_acquire_runtime_system(); + + if (ret == -1) { + free(fds); + uerror("poll",Nothing); + } + + for (n = 0; n < nfds; n++) { + if (fds[n].revents & POLLIN) + nread++; + if (fds[n].revents & POLLOUT) + nwrite++; + if (fds[n].revents & POLLERR) + nerr++; + } + + _pread = caml_alloc_tuple(nread); + nread = 0; + + _pwrite = caml_alloc_tuple(nwrite); + nwrite = 0; + + _perr = caml_alloc_tuple(nerr); + nerr = 0; + + for (n = 0; n < nfds; n++) { + if (fds[n].revents & POLLIN) { + Store_field(_pread, nread, Val_fd(fds[n].fd)); + nread++; + } + if (fds[n].revents & POLLOUT) { + Store_field(_pwrite, nwrite, Val_fd(fds[n].fd)); + nwrite++; + } + if (fds[n].revents & POLLERR) { + Store_field(_pread, nerr, Val_fd(fds[n].fd)); + nerr++; + } + } + + free(fds); + + _ret = caml_alloc_tuple(3); + Store_field(_ret, 0, _pread); + Store_field(_ret, 1, _pwrite); + Store_field(_ret, 2, _perr); + + CAMLreturn(_ret); +} +#else +CAMLprim value caml_poll(value _read, value _write, value _err, value _timeout) { + caml_failwith("caml_poll"); +} +#endif + +CAMLprim value ocaml_duppy_write_ba(value _fd, value ba, value _ofs, value _len) +{ + CAMLparam2(ba,_fd) ; + int fd = Fd_val(_fd); + long ofs = Long_val(_ofs); + long len = Long_val(_len); + void *buf = Caml_ba_data_val(ba); + int ret; + + int written = 0; + while (len > 0) { + caml_enter_blocking_section(); + ret = write(fd, buf+ofs, len); + caml_leave_blocking_section(); + if (ret == -1) { + if ((errno == EAGAIN || errno == EWOULDBLOCK) && written > 0) break; + uerror("write", Nothing); + } + written += ret; + ofs += ret; + len -= ret; + } + + CAMLreturn(Val_long(written)); +} + |