diff options

| field | value | date |
|---|---|---|
| author | Colin Watson <cjwatson@debian.org> | 2007-08-28 11:06:10 +0100 |
| committer | Colin Watson <cjwatson@debian.org> | 2007-08-28 11:06:10 +0100 |
| commit | 14f72ab95d51d5cc6cf89b4a4517d17af9e9f780 (patch) | |
| tree | 559e52d9147c561fb7fb4e877d17f3fc80433d5f /lib | |
| parent | 151dbea65dba7b8c079126e80efc9b0151002e72 (diff) | |

* lib/pipeline.h (struct command): Add support for commands that
consist of calling a function rather than executing a process.
(struct pipeline): Add want_infile and want_outfile members. Note
that infile and outfile default to NULL. Add source, buffer,
buflen, bufmax, line_cache, and peek_offset members.
(command_new_function, command_dump, command_tostring,
pipeline_connect, pipeline_pump, pipeline_read, pipeline_peek,
pipeline_peek_size, pipeline_peek_skip, pipeline_readline,
pipeline_peekline): New prototypes.
(pipeline_join): Update description for want_infile and
want_outfile.
* lib/pipeline.c (command_new, command_dup, command_arg,
command_argv, command_args, command_argstr, command_free): Update
for 'struct command' changes.
(command_new_function, command_dump, command_tostring): New
functions.
(pipeline_new, pipeline_join, pipeline_dump, pipeline_tostring):
Update for 'struct pipeline' changes.
(pipeline_dump): Use command_dump.
(pipeline_tostring): Use command_tostring.
(pipeline_start): Implement want_infile, want_outfile, and
function commands. Make zero-command case work properly (read
directly from input file).
(pipeline_connect, pipeline_pump, get_block, pipeline_read,
pipeline_peek, pipeline_peek_size, pipeline_peek_skip, get_line,
pipeline_readline, pipeline_peekline): New functions.
* lib/decompress.c, lib/decompress.h: New files, implementing a
decompression abstraction layer.
* lib/Makefile.in: Always compile decompress.c.
* src/lexgrog.l (YY_INPUT): Define to read from a 'struct pipeline'.
(find_name): Use decompress_open/decompress_fdopen rather than
older decompression methods.
* src/check_mandirs.c (test_manfile): Remove decompression code;
find_name will handle this itself now.
* src/compression.c (create_ztemp, decompress, remove_ztemp,
get_ztemp): Remove.
* src/man.c (checked_popen): Remove.
(local_man_loop): Remove decompression code.
(get_preprocessors_from_file): Convert to peeking the first line
from a pipeline.
(get_preprocessors): Adjust arguments and
get_preprocessors_from_file call.
(remove_stdintmp, create_stdintmp): Remove.
(make_roff_command): Remove special handling of stdin, now handled
by peeking a pipeline.
(open_cat_stream): Don't start the returned pipeline.
(close_cat_stream): Remove unnecessary COMP_CAT special case.
(format_display_and_save): Take a decompressor pipeline argument.
Simplify by using pipeline_connect and pipeline_pump.
(format_display): Take a decompressor pipeline argument. Use
pipeline_connect and pipeline_pump.
(display): Use decompress_open/decompress_fdopen rather than older
decompression methods.
(display_filesystem, display_database): Drop remove_ztemp calls.
* src/straycats.c (check_for_stray): Use decompression layer.
Rearrange control flow a bit to reduce duplication.
* src/ult_src.c (ult_src): Use pipeline and decompression layers
rather than older decompression methods.
* configure.ac: Remove decompressor variable. Check for gzopen in
libz.
* include/manconfig.h.in (DECOMPRESSOR): Remove.
(comp_list): Declare extern here.
(decompress, remove_ztemp, get_ztemp): Remove.
* src/man_db.conf.in (decompressor): Remove.
* docs/TODO: Update.
Diffstat (limited to 'lib')

| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | lib/Makefile.in | 3 |
| -rw-r--r-- | lib/decompress.c | 158 |
| -rw-r--r-- | lib/decompress.h | 36 |
| -rw-r--r-- | lib/pipeline.c | 744 |
| -rw-r--r-- | lib/pipeline.h | 143 |

5 files changed, 1033 insertions, 51 deletions
diff --git a/lib/Makefile.in b/lib/Makefile.in index c6b8bef3..5cce2532 100644 --- a/lib/Makefile.in +++ b/lib/Makefile.in @@ -38,7 +38,7 @@ ALLSRCS = alloca.c basename.c cleanup.c error.c fnmatch.c getopt.c getopt1.c \ unsetenv.c xmalloc.c xstrdup.c xstrndup.c \ waitpid.c tempfile.c mkdtemp.c mkstemp.c tempname.c hashtable.c \ xstrsignal.c xsigaction.c pipeline.c getcwdalloc.c pathsearch.c \ - linelength.c + linelength.c decompress.c ALLOBJS = $(ALLSRCS:.c=.o) @@ -46,6 +46,7 @@ XOBJS = xstrdup.o xstrndup.o xmalloc.o error.o xstrsignal.o xsigaction.o ALL = @LIBOBJS@ @ALLOCA@ $(libobjects) cleanup.o strappend.o tempfile.o \ hashtable.o pipeline.o getcwdalloc.o pathsearch.o debug.o linelength.o \ + decompress.o \ $(XOBJS) all: $(library) diff --git a/lib/decompress.c b/lib/decompress.c new file mode 100644 index 00000000..8be68a61 --- /dev/null +++ b/lib/decompress.c @@ -0,0 +1,158 @@ +/* + * decompress.c: decompression abstraction layer + * + * Copyright (C) 2007 Colin Watson. + * + * This file is part of man-db. + * + * man-db is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * man-db is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with man-db; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif /* HAVE_CONFIG_H */ + +#if defined(STDC_HEADERS) +# include <string.h> +# include <stdlib.h> +#elif defined(HAVE_STRING_H) +# include <string.h> +#elif defined(HAVE_STRINGS_H) +# include <strings.h> +#else /* no string(s) header */ +#endif /* STDC_HEADERS */ + +#include <stdio.h> +#include <sys/types.h> +#include <sys/stat.h> +#ifdef HAVE_UNISTD_H +# include <unistd.h> +#endif +#include <fcntl.h> + +#ifdef HAVE_LIBZ +# include "zlib.h" +#endif /* HAVE_LIBZ */ + +#include "manconfig.h" +#include "comp_src.h" +#include "pipeline.h" +#include "decompress.h" + +#ifdef HAVE_LIBZ + +static void decompress_zlib (void *data ATTRIBUTE_UNUSED) +{ + gzFile zlibfile; + + zlibfile = gzdopen (dup (fileno (stdin)), "r"); + if (!zlibfile) + return; + + for (;;) { + char buffer[4096]; + int r = gzread (zlibfile, buffer, 4096); + if (r <= 0) + break; + if (fwrite (buffer, 1, (size_t) r, stdout) < (size_t) r) + break; + } + + gzclose (zlibfile); + return; +} + +#endif /* HAVE_LIBZ */ + +pipeline *decompress_open (const char *filename) +{ + command *cmd; + pipeline *p; + struct stat st; +#ifdef HAVE_LIBZ + size_t filename_len; +#endif /* HAVE_LIBZ */ + char *ext; + struct compression *comp; + + if (stat (filename, &st) < 0) + return NULL; + +#ifdef HAVE_LIBZ + filename_len = strlen (filename); + if (filename_len > 3 && STREQ (filename + filename_len - 3, ".gz")) { + /* informational only; no shell quoting concerns */ + char *name = strappend (NULL, "zcat < ", filename, NULL); + cmd = command_new_function (name, &decompress_zlib, NULL); + free (name); + p = pipeline_new_commands (cmd, NULL); + goto got_pipeline; + } +#endif /* HAVE_LIBZ */ + + ext = strrchr (filename, '.'); + if (ext) { + ++ext; + + for (comp = comp_list; 
comp->ext; ++comp) { + if (!STREQ (comp->ext, ext)) + continue; + + cmd = command_new_argstr (comp->prog); + command_arg (cmd, filename); + p = pipeline_new_commands (cmd, NULL); + goto got_pipeline; + } + } + +#ifdef HAVE_GZIP + /* HP-UX */ + ext = strstr (filename, ".Z/"); + if (ext) { + cmd = command_new_argstr (GUNZIP " -S \"\""); + command_arg (cmd, filename); + p = pipeline_new_commands (cmd, NULL); + goto got_pipeline; + } +#endif + + p = pipeline_new (); + +got_pipeline: + p->want_infile = filename; + p->want_out = -1; + pipeline_start (p); + return p; +} + +pipeline *decompress_fdopen (int fd) +{ + pipeline *p; +#ifdef HAVE_LIBZ + command *cmd; +#endif /* HAVE_LIBZ */ + +#ifdef HAVE_LIBZ + cmd = command_new_function ("zcat", &decompress_zlib, NULL); + p = pipeline_new_commands (cmd, NULL); +#else /* HAVE_LIBZ */ + p = pipeline_new (); +#endif /* HAVE_LIBZ */ + + p->want_in = fd; + p->want_out = -1; + pipeline_start (p); + return p; +} diff --git a/lib/decompress.h b/lib/decompress.h new file mode 100644 index 00000000..5cbabbee --- /dev/null +++ b/lib/decompress.h @@ -0,0 +1,36 @@ +/* + * decompress.h: interface to decompression abstraction layer + * + * Copyright (C) 2007 Colin Watson. + * + * This file is part of man-db. + * + * man-db is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * man-db is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with man-db; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef MAN_DECOMPRESS_H +#define MAN_DECOMPRESS_H + +#include "lib/pipeline.h" + +struct decompress; + +/* Open a decompressor reading from FILENAME. */ +pipeline *decompress_open (const char *filename); + +/* Open a decompressor reading from file descriptor FD. */ +pipeline *decompress_fdopen (int fd); + +#endif /* MAN_DECOMPRESS_H */ diff --git a/lib/pipeline.c b/lib/pipeline.c index 228abccb..42a1f8b0 100644 --- a/lib/pipeline.c +++ b/lib/pipeline.c @@ -1,6 +1,6 @@ /* Copyright (C) 1989, 1990, 1991, 1992, 2000, 2001, 2002, 2003 * Free Software Foundation, Inc. - * Copyright (C) 2003 Colin Watson. + * Copyright (C) 2003, 2004, 2005, 2006, 2007 Colin Watson. * Written for groff by James Clark (jjc@jclark.com) * Heavily adapted and extended for man-db by Colin Watson. * @@ -29,17 +29,20 @@ #include <stdlib.h> #include <signal.h> #include <errno.h> +/* TODO: requires POSIX 1003.1-2001 */ +#include <sys/select.h> +#include <sys/time.h> #include <sys/types.h> #include <fcntl.h> #ifdef HAVE_UNISTD_H # include <unistd.h> #endif +#include <fcntl.h> #include <stdarg.h> #include <assert.h> - -#ifdef HAVE_STRERROR #include <string.h> -#else + +#ifndef HAVE_STRERROR extern char *strerror (); #endif @@ -65,15 +68,20 @@ extern char *strerror (); command *command_new (const char *name) { command *cmd = xmalloc (sizeof *cmd); + struct command_process *cmdp; char *name_copy; + cmd->tag = COMMAND_PROCESS; cmd->name = xstrdup (name); - cmd->argc = 0; - cmd->argv_max = 4; - cmd->argv = xmalloc (cmd->argv_max * sizeof *cmd->argv); cmd->nice = 0; cmd->discard_err = 0; + cmdp = &cmd->u.process; + + cmdp->argc = 0; + cmdp->argv_max = 4; + cmdp->argv = xmalloc (cmdp->argv_max * sizeof *cmdp->argv); + /* argv[0] is the basename of the command name. 
*/ name_copy = xstrdup (name); command_arg (cmd, basename (name_copy)); @@ -243,43 +251,90 @@ command *command_new_argstr (const char *argstr) return cmd; } +command *command_new_function (const char *name, + command_function_type *func, void *data) +{ + command *cmd = xmalloc (sizeof *cmd); + struct command_function *cmdf; + + cmd->tag = COMMAND_FUNCTION; + cmd->name = xstrdup (name); + cmd->nice = 0; + + cmdf = &cmd->u.function; + + cmdf->func = func; + cmdf->data = data; + + return cmd; +} + command *command_dup (command *cmd) { command *newcmd = xmalloc (sizeof *newcmd); int i; + newcmd->tag = cmd->tag; newcmd->name = xstrdup (cmd->name); - newcmd->argc = cmd->argc; - newcmd->argv_max = cmd->argv_max; - assert (newcmd->argc < newcmd->argv_max); - newcmd->argv = xmalloc (newcmd->argv_max * sizeof *newcmd->argv); newcmd->nice = cmd->nice; newcmd->discard_err = cmd->discard_err; - for (i = 0; i < cmd->argc; ++i) - newcmd->argv[i] = xstrdup (cmd->argv[i]); - newcmd->argv[cmd->argc] = NULL; + switch (newcmd->tag) { + case COMMAND_PROCESS: { + struct command_process *cmdp = &cmd->u.process; + struct command_process *newcmdp = &newcmd->u.process; + + newcmdp->argc = cmdp->argc; + newcmdp->argv_max = cmdp->argv_max; + assert (newcmdp->argc < newcmdp->argv_max); + newcmdp->argv = xmalloc + (newcmdp->argv_max * sizeof *newcmdp->argv); + + for (i = 0; i < cmdp->argc; ++i) + newcmdp->argv[i] = xstrdup (cmdp->argv[i]); + newcmdp->argv[cmdp->argc] = NULL; + + break; + } + + case COMMAND_FUNCTION: { + struct command_function *cmdf = &cmd->u.function; + struct command_function *newcmdf = &newcmd->u.function; + + newcmdf->func = cmdf->func; + newcmdf->data = cmdf->data; + + break; + } + } return newcmd; } void command_arg (command *cmd, const char *arg) { - if (cmd->argc + 1 >= cmd->argv_max) { - cmd->argv_max *= 2; - cmd->argv = xrealloc (cmd->argv, - cmd->argv_max * sizeof *cmd->argv); + struct command_process *cmdp; + + assert (cmd->tag == COMMAND_PROCESS); + cmdp = 
&cmd->u.process; + + if (cmdp->argc + 1 >= cmdp->argv_max) { + cmdp->argv_max *= 2; + cmdp->argv = xrealloc (cmdp->argv, + cmdp->argv_max * sizeof *cmdp->argv); } - cmd->argv[cmd->argc++] = xstrdup (arg); - assert (cmd->argc < cmd->argv_max); - cmd->argv[cmd->argc] = NULL; + cmdp->argv[cmdp->argc++] = xstrdup (arg); + assert (cmdp->argc < cmdp->argv_max); + cmdp->argv[cmdp->argc] = NULL; } void command_argv (command *cmd, va_list argv) { const char *arg = va_arg (argv, const char *); + assert (cmd->tag == COMMAND_PROCESS); + while (arg) { command_arg (cmd, arg); arg = va_arg (argv, const char *); @@ -290,6 +345,8 @@ void command_args (command *cmd, ...) { va_list argv; + assert (cmd->tag == COMMAND_PROCESS); + va_start (argv, cmd); command_argv (cmd, argv); va_end (argv); @@ -299,12 +356,63 @@ void command_argstr (command *cmd, const char *argstr) { char *arg; + assert (cmd->tag == COMMAND_PROCESS); + while ((arg = argstr_get_word (&argstr))) { command_arg (cmd, arg); free (arg); } } +void command_dump (command *cmd, FILE *stream) +{ + switch (cmd->tag) { + case COMMAND_PROCESS: { + struct command_process *cmdp = &cmd->u.process; + int i; + + fputs (cmd->name, stream); + for (i = 1; i < cmdp->argc; ++i) { + /* TODO: escape_shell()? */ + putc (' ', stream); + fputs (cmdp->argv[i], stream); + } + + break; + } + + case COMMAND_FUNCTION: + fputs (cmd->name, stream); + break; + } +} + +char *command_tostring (command *cmd) +{ + char *out = NULL; + + switch (cmd->tag) { + case COMMAND_PROCESS: { + struct command_process *cmdp = &cmd->u.process; + int i; + + out = strappend (out, cmd->name, NULL); + for (i = 1; i < cmdp->argc; ++i) + /* TODO: escape_shell()? 
*/ + out = strappend (out, " ", cmdp->argv[i], + NULL); + + break; + } + + case COMMAND_FUNCTION: + out = xstrdup (cmd->name); + break; + } + + return out; +} + void command_free (command *cmd) { int i; @@ -313,9 +421,22 @@ void command_free (command *cmd) return; free (cmd->name); - for (i = 0; i < cmd->argc; ++i) - free (cmd->argv[i]); - free (cmd->argv); + + switch (cmd->tag) { + case COMMAND_PROCESS: { + struct command_process *cmdp = &cmd->u.process; + + for (i = 0; i < cmdp->argc; ++i) + free (cmdp->argv[i]); + free (cmdp->argv); + + break; + } + + case COMMAND_FUNCTION: + break; + } + free (cmd); } @@ -332,8 +453,14 @@ pipeline *pipeline_new (void) p->pids = NULL; p->statuses = NULL; p->want_in = p->want_out = 0; + p->want_infile = p->want_outfile = NULL; p->infd = p->outfd = -1; p->infile = p->outfile = NULL; + p->source = NULL; + p->buffer = NULL; + p->buflen = p->bufmax = 0; + p->line_cache = NULL; + p->peek_offset = 0; return p; } @@ -373,7 +500,9 @@ pipeline *pipeline_join (pipeline *p1, pipeline *p2) p->pids = NULL; p->statuses = NULL; p->want_in = p1->want_in; + p->want_infile = p1->want_infile; p->want_out = p2->want_out; + p->want_outfile = p2->want_outfile; p->infd = p1->infd; p->outfd = p2->outfd; p->infile = p1->infile; @@ -387,6 +516,31 @@ pipeline *pipeline_join (pipeline *p1, pipeline *p2) return p; } +void pipeline_connect (pipeline *source, pipeline *sink, ...) +{ + va_list argv; + pipeline *arg; + + /* We must be in control of output from the source pipeline. If the + * source isn't started, we can force this. 
+ */ + if (!source->pids) { + source->want_out = -1; + source->want_outfile = NULL; + } + assert (source->want_out < 0); + assert (!source->want_outfile); + + va_start (argv, sink); + for (arg = sink; arg; arg = va_arg (argv, pipeline *)) { + assert (!arg->pids); /* not started */ + arg->source = source; + arg->want_in = -1; + arg->want_infile = NULL; + } + va_end (argv); +} + void pipeline_command (pipeline *p, command *cmd) { if (p->ncommands >= p->commands_max) { @@ -461,33 +615,27 @@ FILE *pipeline_get_outfile (pipeline *p) void pipeline_dump (pipeline *p, FILE *stream) { - int i, j; + int i; for (i = 0; i < p->ncommands; ++i) { - fputs (p->commands[i]->name, stream); - for (j = 1; j < p->commands[i]->argc; ++j) { - /* TODO: escape_shell()? */ - putc (' ', stream); - fputs (p->commands[i]->argv[j], stream); - } + command_dump (p->commands[i], stream); if (i < p->ncommands - 1) fputs (" | ", stream); } - fprintf (stream, " [input: %d, output: %d]\n", - p->want_in, p->want_out); + fprintf (stream, " [input: {%d, %s}, output: {%d, %s}]\n", + p->want_in, p->want_infile ? p->want_infile : "NULL", + p->want_out, p->want_outfile ? p->want_outfile : "NULL"); } char *pipeline_tostring (pipeline *p) { char *out = NULL; - int i, j; + int i; for (i = 0; i < p->ncommands; ++i) { - out = strappend (out, p->commands[i]->name, NULL); - for (j = 1; j < p->commands[i]->argc; ++j) - /* TODO: escape_shell()? 
*/ - out = strappend (out, " ", p->commands[i]->argv[j], - NULL); + char *cmdout = command_tostring (p->commands[i]); + out = strappend (out, cmdout, NULL); + free (cmdout); if (i < p->ncommands - 1) out = strappend (out, " | ", NULL); } @@ -603,6 +751,12 @@ void pipeline_start (pipeline *p) p->infd = infd[1]; } else if (p->want_in > 0) last_input = p->want_in; + else if (p->want_infile) { + last_input = open (p->want_infile, O_RDONLY); + if (last_input < 0) + error (FATAL, errno, _("can't open %s"), + p->want_infile); + } for (i = 0; i < p->ncommands; i++) { int pdes[2]; @@ -616,8 +770,17 @@ void pipeline_start (pipeline *p) p->outfd = pdes[0]; output_read = pdes[0]; output_write = pdes[1]; - } else if (i == p->ncommands - 1 && p->want_out > 0) - output_write = p->want_out; + } else if (i == p->ncommands - 1) { + if (p->want_out > 0) + output_write = p->want_out; + else if (p->want_outfile) { + output_write = open (p->want_outfile, + O_WRONLY); + if (output_write < 0) + error (FATAL, errno, "can't open %s", + p->want_outfile); + } + } /* Block SIGCHLD so that the signal handler doesn't collect * the exit status before we've filled in the pids array. @@ -696,7 +859,26 @@ void pipeline_start (pipeline *p) xsigaction (SIGINT, &osa_sigint, NULL); xsigaction (SIGQUIT, &osa_sigquit, NULL); - execvp (p->commands[i]->name, p->commands[i]->argv); + switch (p->commands[i]->tag) { + case COMMAND_PROCESS: { + struct command_process *cmdp = + &p->commands[i]->u.process; + execvp (p->commands[i]->name, + cmdp->argv); + } + + /* TODO: ideally, could there be a facility + * to execute non-blocking functions without + * needing to fork? 
+ */ + case COMMAND_FUNCTION: { + struct command_function *cmdf = + &p->commands[i]->u.function; + (*cmdf->func) (cmdf->data); + exit (0); + } + } + error (EXEC_FAILED_EXIT_STATUS, errno, _("can't execute %s"), p->commands[i]->name); } @@ -722,6 +904,9 @@ void pipeline_start (pipeline *p) debug ("Started \"%s\", pid %d\n", p->commands[i]->name, pid); } + + if (p->ncommands == 0) + p->outfd = last_input; } static int sigchld = 0; @@ -954,3 +1139,478 @@ void pipeline_install_sigchld (void) if (xsigaction (SIGCHLD, &act, NULL) == -1) error (FATAL, errno, _("can't install SIGCHLD handler")); } + +void pipeline_pump (pipeline *p, ...) +{ + va_list argv; + int argc, i, j; + pipeline *arg, **pieces; + size_t *pos; + int *known_source, *blocking_in, *blocking_out, *waiting, *write_error; + struct sigaction sa, osa_sigpipe; + + /* Count pipelines and allocate space for arrays. */ + va_start (argv, p); + argc = 0; + for (arg = p; arg; arg = va_arg (argv, pipeline *)) + ++argc; + va_end (argv); + pieces = xmalloc (argc * sizeof *pieces); + pos = xmalloc (argc * sizeof *pos); + known_source = xmalloc (argc * sizeof *known_source); + blocking_in = xmalloc (argc * sizeof *blocking_in); + blocking_out = xmalloc (argc * sizeof *blocking_out); + waiting = xmalloc (argc * sizeof *waiting); + write_error = xmalloc (argc * sizeof *write_error); + + /* Set up arrays of pipelines and their read positions. Start all + * pipelines if necessary. + */ + va_start (argv, p); + for (arg = p, i = 0; i < argc; arg = va_arg (argv, pipeline *), ++i) { + pieces[i] = arg; + pos[i] = 0; + if (!pieces[i]->pids) + pipeline_start (pieces[i]); + } + assert (arg == NULL); + va_end (argv); + + /* All source pipelines must be supplied as arguments. 
*/ + memset (known_source, 0, argc * sizeof *known_source); + for (i = 0; i < argc; ++i) { + int found = 0; + if (!pieces[i]->source) + continue; + for (j = 0; j < argc; ++j) { + if (pieces[i]->source == pieces[j]) { + known_source[j] = found = 1; + break; + } + } + assert (found); + } + + memset (blocking_in, 0, argc * sizeof *blocking_in); + memset (blocking_out, 0, argc * sizeof *blocking_out); + for (i = 0; i < argc; ++i) { + int flags; + if (pieces[i]->infd != -1) { + flags = fcntl (pieces[i]->infd, F_GETFL); + if (!(flags & O_NONBLOCK)) { + blocking_in[i] = 1; + fcntl (pieces[i]->infd, F_SETFL, + flags | O_NONBLOCK); + } + } + if (pieces[i]->outfd != -1) { + flags = fcntl (pieces[i]->outfd, F_GETFL); + if (!(flags & O_NONBLOCK)) { + blocking_out[i] = 1; + fcntl (pieces[i]->outfd, F_SETFL, + flags | O_NONBLOCK); + } + } + } + + memset (waiting, 0, argc * sizeof *waiting); + memset (write_error, 0, argc * sizeof *write_error); + +#ifdef SIGPIPE + sa.sa_handler = SIG_IGN; + sigemptyset (&sa.sa_mask); + sa.sa_flags = 0; + xsigaction (SIGPIPE, &sa, &osa_sigpipe); +#endif + +#ifdef SA_RESTART + /* We rely on getting EINTR from select. */ + xsigaction (SIGCHLD, NULL, &sa); + sa.sa_flags &= ~SA_RESTART; + xsigaction (SIGCHLD, &sa, NULL); +#endif + + for (;;) { + fd_set rfds, wfds; + int maxfd = -1; + int ret; + + /* If a source dies and all data from it has been written to + * all sinks, close the writing end of the pipe to each of + * its sinks. + */ + for (i = 0; i < argc; ++i) { + if (!known_source[i] || pieces[i]->outfd != -1 || + pipeline_peek_size (pieces[i])) + continue; + for (j = 0; j < argc; ++j) { + if (pieces[j]->source == pieces[i] && + pieces[j]->infd != -1) { + if (close (pieces[j]->infd)) + error (0, errno, + _("closing pipeline " + "input failed")); + pieces[j]->infd = -1; + } + } + } + + /* If all sinks on a source have died, close the reading end + * of the pipe from that source. 
+ */ + for (i = 0; i < argc; ++i) { + int got_sink = 0; + if (!known_source[i] || pieces[i]->outfd == -1) + continue; + for (j = 0; j < argc; ++j) { + if (pieces[j]->source == pieces[i] && + pieces[j]->infd != -1) { + got_sink = 1; + break; + } + } + if (got_sink) + continue; + if (close (pieces[i]->outfd)) + error (0, errno, + _("closing pipeline output failed")); + pieces[i]->outfd = -1; + } + + FD_ZERO (&rfds); + FD_ZERO (&wfds); + for (i = 0; i < argc; ++i) { + if (pieces[i]->source && pieces[i]->infd != -1 && + !waiting[i]) { + FD_SET (pieces[i]->infd, &wfds); + if (pieces[i]->infd > maxfd) + maxfd = pieces[i]->infd; + } + if (known_source[i] && pieces[i]->outfd != -1) { + FD_SET (pieces[i]->outfd, &rfds); + if (pieces[i]->outfd > maxfd) + maxfd = pieces[i]->outfd; + } + } + if (maxfd == -1) + break; /* nothing meaningful left to do */ + + ret = select (maxfd + 1, &rfds, &wfds, NULL, NULL); + if (ret < 0 && errno == EINTR) { + /* Did a source or sink pipeline die? */ + for (i = 0; i < argc; ++i) { + if (pieces[i]->ncommands == 0) + continue; + if (known_source[i] && + pieces[i]->outfd != -1) { + int last = pieces[i]->ncommands - 1; + assert (pieces[i]->statuses); + if (pieces[i]->statuses[last] != -1) { + debug ("source pipeline %d " + "died\n", i); + close (pieces[i]->outfd); + pieces[i]->outfd = -1; + } + } + if (pieces[i]->source && + pieces[i]->infd != -1) { + assert (pieces[i]->statuses); + if (pieces[i]->statuses[0] != -1) { + debug ("sink pipeline %d " + "died\n", i); + close (pieces[i]->infd); + pieces[i]->infd = -1; + } + } + } + continue; + } else if (ret < 0) + error (FATAL, errno, "select"); + + /* Read a block of data from each available source pipeline. 
*/ + for (i = 0; i < argc; ++i) { + size_t peek_size, len; + + if (!known_source[i] || pieces[i]->outfd == -1) + continue; + if (!FD_ISSET (pieces[i]->outfd, &rfds)) + continue; + + peek_size = pipeline_peek_size (pieces[i]); + len = peek_size + 4096; + if (!pipeline_peek (pieces[i], &len) || + len == peek_size) { + /* Error or end-of-file; skip this pipeline + * from now on. + */ + debug ("source pipeline %d returned error " + "or EOF\n", i); + close (pieces[i]->outfd); + pieces[i]->outfd = -1; + } else + /* This is rather a large hammer. Whenever + * any data is read from any source + * pipeline, we go through and retry all + * sink pipelines, even if they aren't + * receiving data from the source in + * question. This probably results in a few + * more passes around the select() loop, but + * it eliminates some annoyingly fiddly + * bookkeeping. + */ + memset (waiting, 0, argc * sizeof *waiting); + } + + /* Write as much data as we can to each available sink + * pipeline. + */ + for (i = 0; i < argc; ++i) { + const char *block; + size_t peek_size; + ssize_t w; + size_t minpos; + + if (!pieces[i]->source || pieces[i]->infd == -1) + continue; + if (!FD_ISSET (pieces[i]->infd, &wfds)) + continue; + peek_size = pipeline_peek_size (pieces[i]->source); + if (peek_size <= pos[i]) { + /* Disable reading until data is read from a + * source fd or a child process exits, so + * that we neither spin nor block if the + * source is slow. 
+ */ + waiting[i] = 1; + continue; + } + + /* peek a block from the source */ + block = pipeline_peek (pieces[i]->source, &peek_size); + /* should all already be in the peek cache */ + assert (block); + assert (peek_size); + + /* write as much of it as will fit to the sink */ + for (;;) { + w = write (pieces[i]->infd, block + pos[i], + peek_size - pos[i]); + if (w >= 0 || errno == EAGAIN) + break; + if (errno == EINTR) + continue; + /* Failure to save a cat page shouldn't + * impede displaying the page in a pager, so + * we report errors later. + */ + if (errno != EPIPE) + write_error[i] = errno; + close (pieces[i]->infd); + pieces[i]->infd = -1; + goto next_sink; + } + pos[i] += w; + minpos = pos[i]; + + /* check other sinks on the same source, and update + * the source's read position if earlier data is no + * longer needed by any sink + */ + for (j = 0; j < argc; ++j) { + if (pieces[i]->source != pieces[j]->source || + pieces[j]->infd == -1) + continue; + if (pos[j] < minpos) + minpos = pos[j]; + /* If the source is dead and all data has + * been written to this sink, close the + * writing end of the pipe to the sink. + */ + if (pieces[j]->source->outfd == -1 && + pos[j] >= peek_size) { + close (pieces[j]->infd); + pieces[j]->infd = -1; + } + } + + /* If some data has been written to all sinks, + * discard it from the source's peek cache. 
+ */ + pipeline_peek_skip (pieces[i]->source, minpos); + for (j = 0; j < argc; ++j) { + if (pieces[i]->source == pieces[j]->source) + pos[j] -= minpos; + } +next_sink: ; + } + } + +#ifdef SA_RESTART + xsigaction (SIGCHLD, NULL, &sa); + sa.sa_flags |= SA_RESTART; + xsigaction (SIGCHLD, &sa, NULL); +#endif + +#ifdef SIGPIPE + xsigaction (SIGPIPE, &osa_sigpipe, NULL); +#endif + + for (i = 0; i < argc; ++i) { + int flags; + if (blocking_in[i] && pieces[i]->infd != -1) { + flags = fcntl (pieces[i]->infd, F_GETFL); + fcntl (pieces[i]->infd, F_SETFL, flags & ~O_NONBLOCK); + } + if (blocking_out[i] && pieces[i]->outfd != -1) { + flags = fcntl (pieces[i]->outfd, F_GETFL); + fcntl (pieces[i]->outfd, F_SETFL, flags & ~O_NONBLOCK); + } + } + + for (i = 0; i < argc; ++i) { + if (write_error[i]) + error (FATAL, write_error[i], "write to sink %d", i); + } + + free (write_error); + free (waiting); + free (blocking_out); + free (blocking_in); + free (pieces); + free (pos); +} + +/* ---------------------------------------------------------------------- */ + +/* Functions to read output from pipelines. */ + +static const char *get_block (pipeline *p, size_t *len, int peek) +{ + size_t readstart = 0, retstart = 0; + size_t space = p->bufmax; + size_t toread = *len; + ssize_t r; + + if (p->buffer && p->peek_offset) { + if (p->peek_offset >= toread) { + /* We've got the whole thing in the peek cache; just + * return it. 
+ */ + const char *buffer; + assert (p->peek_offset <= p->buflen); + buffer = p->buffer + p->buflen - p->peek_offset; + if (!peek) + p->peek_offset -= toread; + return buffer; + } else { + readstart = p->buflen; + retstart = p->buflen - p->peek_offset; + space -= p->buflen; + toread -= p->peek_offset; + } + } + + if (toread > space) { + if (p->buffer) + p->bufmax = readstart + toread; + else + p->bufmax = toread; + p->buffer = realloc (p->buffer, p->bufmax + 1); + } + + if (!peek) + p->peek_offset = 0; + + assert (p->outfd != -1); + r = read (p->outfd, p->buffer + readstart, toread); + if (r == -1) + return NULL; + p->buflen = readstart + r; + if (peek) + p->peek_offset += r; + *len -= (toread - r); + + return p->buffer + retstart; +} + +const char *pipeline_read (pipeline *p, size_t *len) +{ + return get_block (p, len, 0); +} + +const char *pipeline_peek (pipeline *p, size_t *len) +{ + return get_block (p, len, 1); +} + +size_t pipeline_peek_size (pipeline *p) +{ + if (!p->buffer) + return 0; + return p->peek_offset; +} + +void pipeline_peek_skip (pipeline *p, size_t len) +{ + if (len > 0) { + assert (p->buffer); + assert (len <= p->peek_offset); + p->peek_offset -= len; + } +} + +/* readline and peekline repeatedly peek larger and larger buffers until + * they find a newline or they fail. readline then adjusts the peek offset. 
+ */ + +static const char *get_line (pipeline *p, size_t *outlen) +{ + const size_t block = 4096; + const char *buffer = NULL, *end = NULL; + int i; + + if (p->line_cache) { + free (p->line_cache); + p->line_cache = NULL; + } + + if (outlen) + *outlen = 0; + + for (i = 0; ; ++i) { + size_t plen = block * (i + 1); + + buffer = get_block (p, &plen, 1); + if (!buffer || plen == 0) + return NULL; + + end = strchr (buffer + block * i, '\n'); + if (!end && plen < block * (i + 1)) + /* end of file, no newline found */ + end = buffer + plen - 1; + if (end) + break; + } + + if (end) { + p->line_cache = xstrndup (buffer, end - buffer + 1); + if (outlen) + *outlen = end - buffer + 1; + return p->line_cache; + } else + return NULL; +} + +const char *pipeline_readline (pipeline *p) +{ + size_t buflen; + const char *buffer = get_line (p, &buflen); + if (buffer) + p->peek_offset -= buflen; + return buffer; +} + +const char *pipeline_peekline (pipeline *p) +{ + return get_line (p, NULL); +} diff --git a/lib/pipeline.h b/lib/pipeline.h index 123829ae..e3951856 100644 --- a/lib/pipeline.h +++ b/lib/pipeline.h @@ -1,6 +1,6 @@ /* Copyright (C) 1989, 1990, 1991, 1992, 2000, 2002 * Free Software Foundation, Inc. - * Copyright (C) 2003 Colin Watson. + * Copyright (C) 2003, 2004, 2005, 2007 Colin Watson. * Written for groff by James Clark (jjc@jclark.com) * Adapted for man-db by Colin Watson. * @@ -28,13 +28,29 @@ #include <stdarg.h> #include <sys/types.h> +enum command_tag { + COMMAND_PROCESS, + COMMAND_FUNCTION +}; + +typedef void command_function_type (void *); + typedef struct command { + enum command_tag tag; char *name; - int argc; - int argv_max; /* size of allocated array */ - char **argv; int nice; int discard_err; /* discard stderr? 
*/ + union { /* process: argument vector of a command to execute; function: callback and its data pointer. NOTE(review): the active member is presumably chosen by the tag field -- confirm */ + struct command_process { + int argc; + int argv_max; /* size of allocated array */ + char **argv; + } process; + struct command_function { + command_function_type *func; + void *data; + } function; + } u; } command; typedef struct pipeline { @@ -49,17 +65,48 @@ typedef struct pipeline { * whole pipeline. If negative, pipeline_start() will create pipes * and store the input writing half and the output reading half in * infd and outfd as appropriate. If zero, input and output will be - * left as stdin and stdout. + * left as stdin and stdout unless want_infile or want_outfile + * respectively is set. */ int want_in, want_out; - /* See above. The caller should consider these read-only. */ + /* To be set (and freed) by the caller. If non-NULL, these contain + * files to open and use as the input and output of the whole + * pipeline. These are only used if want_in or want_out respectively + * is zero. The value of using these rather than simply opening the + * files before starting the pipeline is that the files will be + * opened with the same privileges under which the pipeline is being + * run. + */ + const char *want_infile, *want_outfile; + + /* See above. Default to -1. The caller should consider these + * read-only. + */ int infd, outfd; /* Set by pipeline_get_infile() and pipeline_get_outfile() - * respectively. + * respectively. Default to NULL. */ FILE *infile, *outfile; + + /* Set by pipeline_connect() to record that this pipeline reads its + * input from another pipeline. Defaults to NULL. + */ + struct pipeline *source; + + /* Private buffer for use by read/peek functions. buflen is the + * number of bytes currently held in it; bufmax is the capacity last + * requested when it was grown. + */ + char *buffer; + size_t buflen, bufmax; + + /* The last line returned by readline/peekline. Private. + */ + char *line_cache; + + /* The amount of data at the end of buffer which has been + * read-ahead, either by an explicit peek or by readline/peekline + * reading a block at a time to save work. Private. 
+ */ + size_t peek_offset; } pipeline; /* ---------------------------------------------------------------------- */ @@ -83,6 +130,17 @@ command *command_new_args (const char *name, ...) ATTRIBUTE_SENTINEL; */ command *command_new_argstr (const char *argstr); +/* Construct a new command that calls a given function rather than executing + * a process. func must match command_function_type, i.e. void (void *). + * The data argument is passed as the function's only argument. + * There is currently no provision for freeing data (TODO), so the caller + * should make sure that data's lifetime exceeds that of the command. + * + * command_* functions that deal with arguments cannot be used with the + * command returned by this function. + */ +command *command_new_function (const char *name, + command_function_type *func, void *data); + /* Return a duplicate of a command. */ command *command_dup (command *cmd); @@ -103,6 +161,14 @@ void command_args (command *cmd, ...) ATTRIBUTE_SENTINEL; */ void command_argstr (command *cmd, const char *argstr); +/* Write a string representation of a command to stream. */ +void command_dump (command *cmd, FILE *stream); + +/* Return a string representation of a command. The caller owns the + * result and should free it. + */ +char *command_tostring (command *cmd); + /* Destroy a command. Safely does nothing on NULL. */ void command_free (command *cmd); @@ -120,10 +186,26 @@ pipeline *pipeline_new_commandv (command *cmd1, va_list cmdv); pipeline *pipeline_new_commands (command *cmd1, ...) ATTRIBUTE_SENTINEL; /* Joins two pipelines, neither of which may have been started. Discards - * want_out and outfd from p1, and want_in and infd from p2. + * want_out, want_outfile, and outfd from p1, and want_in, want_infile, and + * infd from p2. */ pipeline *pipeline_join (pipeline *p1, pipeline *p2); +/* Connect the input of one or more sink pipelines to the output of a source + * pipeline. The source pipeline may be started, but in that case want_out + * must be negative; otherwise, discards want_out from source. 
In any event, + * discards want_in from all sinks, none of which are allowed to be started. + * Terminate arguments with NULL. + * + * This is an application-level connection; data may be intercepted between + * the pipelines by the program before calling pipeline_pump(), which sets + * data flowing from the source to the sinks. It is primarily useful when + * more than one sink pipeline is involved, in which case the pipelines + * cannot simply be concatenated into one. + */ +void pipeline_connect (pipeline *source, pipeline *sink, ...) + ATTRIBUTE_SENTINEL; + /* Add a command to a pipeline. */ void pipeline_command (pipeline *p, command *cmd); @@ -174,4 +256,49 @@ int pipeline_wait (pipeline *p); */ void pipeline_install_sigchld (void); +/* Pump data among one or more pipelines connected using pipeline_connect() + * until all source pipelines have reached end-of-file and all data has been + * written to all sinks (or failed). All relevant pipelines must be + * supplied: that is, no pipeline that has been connected to a source + * pipeline may be supplied unless that source pipeline is also supplied. + * Automatically starts all pipelines if they are not already started, but + * does not wait for them. + */ +void pipeline_pump (pipeline *p, ...) ATTRIBUTE_SENTINEL; + +/* ---------------------------------------------------------------------- */ + +/* Functions to read output from pipelines. */ + +/* Read up to *len bytes of data from the pipeline, returning the data + * block. *len is updated with the number of bytes actually read, which + * may be fewer than requested. Returns NULL if the underlying read fails. + * The returned block points into the private buffer of the pipeline, which + * later read or peek calls may reallocate, so the caller should not free + * it or keep it across such calls. + */ +const char *pipeline_read (pipeline *p, size_t *len); + +/* Look ahead in the pipeline's output for len bytes of data, returning the + * data block. len is updated with the number of bytes read. The starting + * position of the next read or peek is not affected by this call. 
+ */ +const char *pipeline_peek (pipeline *p, size_t *len); + +/* Return the number of bytes of data that can be read using pipeline_read + * or pipeline_peek solely from the peek cache, without having to read from + * the pipeline itself (and thus potentially block). Returns 0 if nothing + * has been buffered yet. + */ +size_t pipeline_peek_size (pipeline *p); + +/* Skip over and discard len bytes of data from the peek cache. Asserts that + * enough data is available to skip, so you may want to check using + * pipeline_peek_size first. A len of zero is always accepted and does + * nothing. + */ +void pipeline_peek_skip (pipeline *p, size_t len); + +/* Read a line of data from the pipeline, returning it. The line keeps its + * trailing newline when one was found before end of input. The returned + * string is owned by the pipeline and is freed by the next readline or + * peekline call. Returns NULL when no data is left or reading fails. + */ +const char *pipeline_readline (pipeline *p); + +/* Look ahead in the pipeline's output for a line of data, returning it. The + * starting position of the next read or peek is not affected by this call. + * The returned string is owned by the pipeline and is freed by the next + * readline or peekline call. Returns NULL when no data is left or reading + * fails. + */ +const char *pipeline_peekline (pipeline *p); + #endif /* PIPELINE_H */ |