summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorColin Watson <cjwatson@debian.org>2007-08-28 11:06:10 +0100
committerColin Watson <cjwatson@debian.org>2007-08-28 11:06:10 +0100
commit14f72ab95d51d5cc6cf89b4a4517d17af9e9f780 (patch)
tree559e52d9147c561fb7fb4e877d17f3fc80433d5f /lib
parent151dbea65dba7b8c079126e80efc9b0151002e72 (diff)
* lib/pipeline.h (struct command): Add support for commands that
consist of calling a function rather than executing a process. (struct pipeline): Add want_infile and want_outfile members. Note that infile and outfile default to NULL. Add source, buffer, buflen, bufmax, line_cache, and peek_offset members. (command_new_function, command_dump, command_tostring, pipeline_connect, pipeline_pump, pipeline_read, pipeline_peek, pipeline_peek_size, pipeline_peek_skip, pipeline_readline, pipeline_peekline): New prototypes. (pipeline_join): Update description for want_infile and want_outfile. * lib/pipeline.c (command_new, command_dup, command_arg, command_argv, command_args, command_argstr, command_free): Update for 'struct command' changes. (command_new_function, command_dump, command_tostring): New functions. (pipeline_new, pipeline_join, pipeline_dump, pipeline_tostring): Update for 'struct pipeline' changes. (pipeline_dump): Use command_dump. (pipeline_tostring): Use command_tostring. (pipeline_start): Implement want_infile, want_outfile, and function commands. Make zero-command case work properly (read directly from input file). (pipeline_connect, pipeline_pump, get_block, pipeline_read, pipeline_peek, pipeline_peek_size, pipeline_peek_skip, get_line, pipeline_readline, pipeline_peekline): New functions. * lib/decompress.c, lib/decompress.h: New files, implementing a decompression abstraction layer. * lib/Makefile.in: Always compile decompress.c. * src/lexgrog.l (YY_INPUT): Define to read from a 'struct pipeline'. (find_name): Use decompress_open/decompress_fdopen rather than older decompression methods. * src/check_mandirs.c (test_manfile): Remove decompression code; find_name will handle this itself now. * src/compression.c (create_ztemp, decompress, remove_ztemp, get_ztemp): Remove. * src/man.c (checked_popen): Remove. (local_man_loop): Remove decompression code. (get_preprocessors_from_file): Convert to peeking the first line from a pipeline. (get_preprocessors): Adjust arguments and get_preprocessors_from_file call. (remove_stdintmp, create_stdintmp): Remove. (make_roff_command): Remove special handling of stdin, now handled by peeking a pipeline. (open_cat_stream): Don't start the returned pipeline. (close_cat_stream): Remove unnecessary COMP_CAT special case. (format_display_and_save): Take a decompressor pipeline argument. Simplify by using pipeline_connect and pipeline_pump. (format_display): Take a decompressor pipeline argument. Use pipeline_connect and pipeline_pump. (display): Use decompress_open/decompress_fdopen rather than older decompression methods. (display_filesystem, display_database): Drop remove_ztemp calls. * src/straycats.c (check_for_stray): Use decompression layer. Rearrange control flow a bit to reduce duplication. * src/ult_src.c (ult_src): Use pipeline and decompression layers rather than older decompression methods. * configure.ac: Remove decompressor variable. Check for gzopen in libz. * include/manconfig.h.in (DECOMPRESSOR): Remove. (comp_list): Declare extern here. (decompress, remove_ztemp, get_ztemp): Remove. * src/man_db.conf.in (decompressor): Remove. * docs/TODO: Update.
Diffstat (limited to 'lib')
-rw-r--r--lib/Makefile.in3
-rw-r--r--lib/decompress.c158
-rw-r--r--lib/decompress.h36
-rw-r--r--lib/pipeline.c744
-rw-r--r--lib/pipeline.h143
5 files changed, 1033 insertions, 51 deletions
diff --git a/lib/Makefile.in b/lib/Makefile.in
index c6b8bef3..5cce2532 100644
--- a/lib/Makefile.in
+++ b/lib/Makefile.in
@@ -38,7 +38,7 @@ ALLSRCS = alloca.c basename.c cleanup.c error.c fnmatch.c getopt.c getopt1.c \
unsetenv.c xmalloc.c xstrdup.c xstrndup.c \
waitpid.c tempfile.c mkdtemp.c mkstemp.c tempname.c hashtable.c \
xstrsignal.c xsigaction.c pipeline.c getcwdalloc.c pathsearch.c \
- linelength.c
+ linelength.c decompress.c
ALLOBJS = $(ALLSRCS:.c=.o)
@@ -46,6 +46,7 @@ XOBJS = xstrdup.o xstrndup.o xmalloc.o error.o xstrsignal.o xsigaction.o
ALL = @LIBOBJS@ @ALLOCA@ $(libobjects) cleanup.o strappend.o tempfile.o \
hashtable.o pipeline.o getcwdalloc.o pathsearch.o debug.o linelength.o \
+ decompress.o \
$(XOBJS)
all: $(library)
diff --git a/lib/decompress.c b/lib/decompress.c
new file mode 100644
index 00000000..8be68a61
--- /dev/null
+++ b/lib/decompress.c
@@ -0,0 +1,158 @@
+/*
+ * decompress.c: decompression abstraction layer
+ *
+ * Copyright (C) 2007 Colin Watson.
+ *
+ * This file is part of man-db.
+ *
+ * man-db is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * man-db is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with man-db; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif /* HAVE_CONFIG_H */
+
+#if defined(STDC_HEADERS)
+# include <string.h>
+# include <stdlib.h>
+#elif defined(HAVE_STRING_H)
+# include <string.h>
+#elif defined(HAVE_STRINGS_H)
+# include <strings.h>
+#else /* no string(s) header */
+#endif /* STDC_HEADERS */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#ifdef HAVE_UNISTD_H
+# include <unistd.h>
+#endif
+#include <fcntl.h>
+
+#ifdef HAVE_LIBZ
+# include "zlib.h"
+#endif /* HAVE_LIBZ */
+
+#include "manconfig.h"
+#include "comp_src.h"
+#include "pipeline.h"
+#include "decompress.h"
+
+#ifdef HAVE_LIBZ
+
+static void decompress_zlib (void *data ATTRIBUTE_UNUSED)
+{
+ gzFile zlibfile;
+
+ zlibfile = gzdopen (dup (fileno (stdin)), "r");
+ if (!zlibfile)
+ return;
+
+ for (;;) {
+ char buffer[4096];
+ int r = gzread (zlibfile, buffer, 4096);
+ if (r <= 0)
+ break;
+ if (fwrite (buffer, 1, (size_t) r, stdout) < (size_t) r)
+ break;
+ }
+
+ gzclose (zlibfile);
+ return;
+}
+
+#endif /* HAVE_LIBZ */
+
+pipeline *decompress_open (const char *filename)
+{
+ command *cmd;
+ pipeline *p;
+ struct stat st;
+#ifdef HAVE_LIBZ
+ size_t filename_len;
+#endif /* HAVE_LIBZ */
+ char *ext;
+ struct compression *comp;
+
+ if (stat (filename, &st) < 0)
+ return NULL;
+
+#ifdef HAVE_LIBZ
+ filename_len = strlen (filename);
+ if (filename_len > 3 && STREQ (filename + filename_len - 3, ".gz")) {
+ /* informational only; no shell quoting concerns */
+ char *name = strappend (NULL, "zcat < ", filename, NULL);
+ cmd = command_new_function (name, &decompress_zlib, NULL);
+ free (name);
+ p = pipeline_new_commands (cmd, NULL);
+ goto got_pipeline;
+ }
+#endif /* HAVE_LIBZ */
+
+ ext = strrchr (filename, '.');
+ if (ext) {
+ ++ext;
+
+ for (comp = comp_list; comp->ext; ++comp) {
+ if (!STREQ (comp->ext, ext))
+ continue;
+
+ cmd = command_new_argstr (comp->prog);
+ command_arg (cmd, filename);
+ p = pipeline_new_commands (cmd, NULL);
+ goto got_pipeline;
+ }
+ }
+
+#ifdef HAVE_GZIP
+ /* HP-UX */
+ ext = strstr (filename, ".Z/");
+ if (ext) {
+ cmd = command_new_argstr (GUNZIP " -S \"\"");
+ command_arg (cmd, filename);
+ p = pipeline_new_commands (cmd, NULL);
+ goto got_pipeline;
+ }
+#endif
+
+ p = pipeline_new ();
+
+got_pipeline:
+ p->want_infile = filename;
+ p->want_out = -1;
+ pipeline_start (p);
+ return p;
+}
+
+pipeline *decompress_fdopen (int fd)
+{
+ pipeline *p;
+#ifdef HAVE_LIBZ
+ command *cmd;
+#endif /* HAVE_LIBZ */
+
+#ifdef HAVE_LIBZ
+ cmd = command_new_function ("zcat", &decompress_zlib, NULL);
+ p = pipeline_new_commands (cmd, NULL);
+#else /* HAVE_LIBZ */
+ p = pipeline_new ();
+#endif /* HAVE_LIBZ */
+
+ p->want_in = fd;
+ p->want_out = -1;
+ pipeline_start (p);
+ return p;
+}
diff --git a/lib/decompress.h b/lib/decompress.h
new file mode 100644
index 00000000..5cbabbee
--- /dev/null
+++ b/lib/decompress.h
@@ -0,0 +1,36 @@
+/*
+ * decompress.h: interface to decompression abstraction layer
+ *
+ * Copyright (C) 2007 Colin Watson.
+ *
+ * This file is part of man-db.
+ *
+ * man-db is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * man-db is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with man-db; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef MAN_DECOMPRESS_H
+#define MAN_DECOMPRESS_H
+
+#include "lib/pipeline.h"
+
+struct decompress;
+
+/* Open a decompressor reading from FILENAME. */
+pipeline *decompress_open (const char *filename);
+
+/* Open a decompressor reading from file descriptor FD. */
+pipeline *decompress_fdopen (int fd);
+
+#endif /* MAN_DECOMPRESS_H */
diff --git a/lib/pipeline.c b/lib/pipeline.c
index 228abccb..42a1f8b0 100644
--- a/lib/pipeline.c
+++ b/lib/pipeline.c
@@ -1,6 +1,6 @@
/* Copyright (C) 1989, 1990, 1991, 1992, 2000, 2001, 2002, 2003
* Free Software Foundation, Inc.
- * Copyright (C) 2003 Colin Watson.
+ * Copyright (C) 2003, 2004, 2005, 2006, 2007 Colin Watson.
* Written for groff by James Clark (jjc@jclark.com)
* Heavily adapted and extended for man-db by Colin Watson.
*
@@ -29,17 +29,20 @@
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
+/* TODO: requires POSIX 1003.1-2001 */
+#include <sys/select.h>
+#include <sys/time.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif
+#include <fcntl.h>
#include <stdarg.h>
#include <assert.h>
-
-#ifdef HAVE_STRERROR
#include <string.h>
-#else
+
+#ifndef HAVE_STRERROR
extern char *strerror ();
#endif
@@ -65,15 +68,20 @@ extern char *strerror ();
command *command_new (const char *name)
{
command *cmd = xmalloc (sizeof *cmd);
+ struct command_process *cmdp;
char *name_copy;
+ cmd->tag = COMMAND_PROCESS;
cmd->name = xstrdup (name);
- cmd->argc = 0;
- cmd->argv_max = 4;
- cmd->argv = xmalloc (cmd->argv_max * sizeof *cmd->argv);
cmd->nice = 0;
cmd->discard_err = 0;
+ cmdp = &cmd->u.process;
+
+ cmdp->argc = 0;
+ cmdp->argv_max = 4;
+ cmdp->argv = xmalloc (cmdp->argv_max * sizeof *cmdp->argv);
+
/* argv[0] is the basename of the command name. */
name_copy = xstrdup (name);
command_arg (cmd, basename (name_copy));
@@ -243,43 +251,90 @@ command *command_new_argstr (const char *argstr)
return cmd;
}
+command *command_new_function (const char *name,
+ command_function_type *func, void *data)
+{
+ command *cmd = xmalloc (sizeof *cmd);
+ struct command_function *cmdf;
+
+ cmd->tag = COMMAND_FUNCTION;
+ cmd->name = xstrdup (name);
+ cmd->nice = 0;
+
+ cmdf = &cmd->u.function;
+
+ cmdf->func = func;
+ cmdf->data = data;
+
+ return cmd;
+}
+
command *command_dup (command *cmd)
{
command *newcmd = xmalloc (sizeof *newcmd);
int i;
+ newcmd->tag = cmd->tag;
newcmd->name = xstrdup (cmd->name);
- newcmd->argc = cmd->argc;
- newcmd->argv_max = cmd->argv_max;
- assert (newcmd->argc < newcmd->argv_max);
- newcmd->argv = xmalloc (newcmd->argv_max * sizeof *newcmd->argv);
newcmd->nice = cmd->nice;
newcmd->discard_err = cmd->discard_err;
- for (i = 0; i < cmd->argc; ++i)
- newcmd->argv[i] = xstrdup (cmd->argv[i]);
- newcmd->argv[cmd->argc] = NULL;
+ switch (newcmd->tag) {
+ case COMMAND_PROCESS: {
+ struct command_process *cmdp = &cmd->u.process;
+ struct command_process *newcmdp = &newcmd->u.process;
+
+ newcmdp->argc = cmdp->argc;
+ newcmdp->argv_max = cmdp->argv_max;
+ assert (newcmdp->argc < newcmdp->argv_max);
+ newcmdp->argv = xmalloc
+ (newcmdp->argv_max * sizeof *newcmdp->argv);
+
+ for (i = 0; i < cmdp->argc; ++i)
+ newcmdp->argv[i] = xstrdup (cmdp->argv[i]);
+ newcmdp->argv[cmdp->argc] = NULL;
+
+ break;
+ }
+
+ case COMMAND_FUNCTION: {
+ struct command_function *cmdf = &cmd->u.function;
+ struct command_function *newcmdf = &newcmd->u.function;
+
+ newcmdf->func = cmdf->func;
+ newcmdf->data = cmdf->data;
+
+ break;
+ }
+ }
return newcmd;
}
void command_arg (command *cmd, const char *arg)
{
- if (cmd->argc + 1 >= cmd->argv_max) {
- cmd->argv_max *= 2;
- cmd->argv = xrealloc (cmd->argv,
- cmd->argv_max * sizeof *cmd->argv);
+ struct command_process *cmdp;
+
+ assert (cmd->tag == COMMAND_PROCESS);
+ cmdp = &cmd->u.process;
+
+ if (cmdp->argc + 1 >= cmdp->argv_max) {
+ cmdp->argv_max *= 2;
+ cmdp->argv = xrealloc (cmdp->argv,
+ cmdp->argv_max * sizeof *cmdp->argv);
}
- cmd->argv[cmd->argc++] = xstrdup (arg);
- assert (cmd->argc < cmd->argv_max);
- cmd->argv[cmd->argc] = NULL;
+ cmdp->argv[cmdp->argc++] = xstrdup (arg);
+ assert (cmdp->argc < cmdp->argv_max);
+ cmdp->argv[cmdp->argc] = NULL;
}
void command_argv (command *cmd, va_list argv)
{
const char *arg = va_arg (argv, const char *);
+ assert (cmd->tag == COMMAND_PROCESS);
+
while (arg) {
command_arg (cmd, arg);
arg = va_arg (argv, const char *);
@@ -290,6 +345,8 @@ void command_args (command *cmd, ...)
{
va_list argv;
+ assert (cmd->tag == COMMAND_PROCESS);
+
va_start (argv, cmd);
command_argv (cmd, argv);
va_end (argv);
@@ -299,12 +356,63 @@ void command_argstr (command *cmd, const char *argstr)
{
char *arg;
+ assert (cmd->tag == COMMAND_PROCESS);
+
while ((arg = argstr_get_word (&argstr))) {
command_arg (cmd, arg);
free (arg);
}
}
+void command_dump (command *cmd, FILE *stream)
+{
+ switch (cmd->tag) {
+ case COMMAND_PROCESS: {
+ struct command_process *cmdp = &cmd->u.process;
+ int i;
+
+ fputs (cmd->name, stream);
+ for (i = 1; i < cmdp->argc; ++i) {
+ /* TODO: escape_shell()? */
+ putc (' ', stream);
+ fputs (cmdp->argv[i], stream);
+ }
+
+ break;
+ }
+
+ case COMMAND_FUNCTION:
+ fputs (cmd->name, stream);
+ break;
+ }
+}
+
+char *command_tostring (command *cmd)
+{
+ char *out = NULL;
+
+ switch (cmd->tag) {
+ case COMMAND_PROCESS: {
+ struct command_process *cmdp = &cmd->u.process;
+ int i;
+
+ out = strappend (out, cmd->name, NULL);
+ for (i = 1; i < cmdp->argc; ++i)
+ /* TODO: escape_shell()? */
+ out = strappend (out, " ", cmdp->argv[i],
+ NULL);
+
+ break;
+ }
+
+ case COMMAND_FUNCTION:
+ out = xstrdup (cmd->name);
+ break;
+ }
+
+ return out;
+}
+
void command_free (command *cmd)
{
int i;
@@ -313,9 +421,22 @@ void command_free (command *cmd)
return;
free (cmd->name);
- for (i = 0; i < cmd->argc; ++i)
- free (cmd->argv[i]);
- free (cmd->argv);
+
+ switch (cmd->tag) {
+ case COMMAND_PROCESS: {
+ struct command_process *cmdp = &cmd->u.process;
+
+ for (i = 0; i < cmdp->argc; ++i)
+ free (cmdp->argv[i]);
+ free (cmdp->argv);
+
+ break;
+ }
+
+ case COMMAND_FUNCTION:
+ break;
+ }
+
free (cmd);
}
@@ -332,8 +453,14 @@ pipeline *pipeline_new (void)
p->pids = NULL;
p->statuses = NULL;
p->want_in = p->want_out = 0;
+ p->want_infile = p->want_outfile = NULL;
p->infd = p->outfd = -1;
p->infile = p->outfile = NULL;
+ p->source = NULL;
+ p->buffer = NULL;
+ p->buflen = p->bufmax = 0;
+ p->line_cache = NULL;
+ p->peek_offset = 0;
return p;
}
@@ -373,7 +500,9 @@ pipeline *pipeline_join (pipeline *p1, pipeline *p2)
p->pids = NULL;
p->statuses = NULL;
p->want_in = p1->want_in;
+ p->want_infile = p1->want_infile;
p->want_out = p2->want_out;
+ p->want_outfile = p2->want_outfile;
p->infd = p1->infd;
p->outfd = p2->outfd;
p->infile = p1->infile;
@@ -387,6 +516,31 @@ pipeline *pipeline_join (pipeline *p1, pipeline *p2)
return p;
}
+void pipeline_connect (pipeline *source, pipeline *sink, ...)
+{
+ va_list argv;
+ pipeline *arg;
+
+ /* We must be in control of output from the source pipeline. If the
+ * source isn't started, we can force this.
+ */
+ if (!source->pids) {
+ source->want_out = -1;
+ source->want_outfile = NULL;
+ }
+ assert (source->want_out < 0);
+ assert (!source->want_outfile);
+
+ va_start (argv, sink);
+ for (arg = sink; arg; arg = va_arg (argv, pipeline *)) {
+ assert (!arg->pids); /* not started */
+ arg->source = source;
+ arg->want_in = -1;
+ arg->want_infile = NULL;
+ }
+ va_end (argv);
+}
+
void pipeline_command (pipeline *p, command *cmd)
{
if (p->ncommands >= p->commands_max) {
@@ -461,33 +615,27 @@ FILE *pipeline_get_outfile (pipeline *p)
void pipeline_dump (pipeline *p, FILE *stream)
{
- int i, j;
+ int i;
for (i = 0; i < p->ncommands; ++i) {
- fputs (p->commands[i]->name, stream);
- for (j = 1; j < p->commands[i]->argc; ++j) {
- /* TODO: escape_shell()? */
- putc (' ', stream);
- fputs (p->commands[i]->argv[j], stream);
- }
+ command_dump (p->commands[i], stream);
if (i < p->ncommands - 1)
fputs (" | ", stream);
}
- fprintf (stream, " [input: %d, output: %d]\n",
- p->want_in, p->want_out);
+ fprintf (stream, " [input: {%d, %s}, output: {%d, %s}]\n",
+ p->want_in, p->want_infile ? p->want_infile : "NULL",
+ p->want_out, p->want_outfile ? p->want_outfile : "NULL");
}
char *pipeline_tostring (pipeline *p)
{
char *out = NULL;
- int i, j;
+ int i;
for (i = 0; i < p->ncommands; ++i) {
- out = strappend (out, p->commands[i]->name, NULL);
- for (j = 1; j < p->commands[i]->argc; ++j)
- /* TODO: escape_shell()? */
- out = strappend (out, " ", p->commands[i]->argv[j],
- NULL);
+ char *cmdout = command_tostring (p->commands[i]);
+ out = strappend (out, cmdout, NULL);
+ free (cmdout);
if (i < p->ncommands - 1)
out = strappend (out, " | ", NULL);
}
@@ -603,6 +751,12 @@ void pipeline_start (pipeline *p)
p->infd = infd[1];
} else if (p->want_in > 0)
last_input = p->want_in;
+ else if (p->want_infile) {
+ last_input = open (p->want_infile, O_RDONLY);
+ if (last_input < 0)
+ error (FATAL, errno, _("can't open %s"),
+ p->want_infile);
+ }
for (i = 0; i < p->ncommands; i++) {
int pdes[2];
@@ -616,8 +770,17 @@ void pipeline_start (pipeline *p)
p->outfd = pdes[0];
output_read = pdes[0];
output_write = pdes[1];
- } else if (i == p->ncommands - 1 && p->want_out > 0)
- output_write = p->want_out;
+ } else if (i == p->ncommands - 1) {
+ if (p->want_out > 0)
+ output_write = p->want_out;
+ else if (p->want_outfile) {
+ output_write = open (p->want_outfile,
+ O_WRONLY);
+ if (output_write < 0)
+ error (FATAL, errno, "can't open %s",
+ p->want_outfile);
+ }
+ }
/* Block SIGCHLD so that the signal handler doesn't collect
* the exit status before we've filled in the pids array.
@@ -696,7 +859,26 @@ void pipeline_start (pipeline *p)
xsigaction (SIGINT, &osa_sigint, NULL);
xsigaction (SIGQUIT, &osa_sigquit, NULL);
- execvp (p->commands[i]->name, p->commands[i]->argv);
+ switch (p->commands[i]->tag) {
+ case COMMAND_PROCESS: {
+ struct command_process *cmdp =
+ &p->commands[i]->u.process;
+ execvp (p->commands[i]->name,
+ cmdp->argv);
+ }
+
+ /* TODO: ideally, could there be a facility
+ * to execute non-blocking functions without
+ * needing to fork?
+ */
+ case COMMAND_FUNCTION: {
+ struct command_function *cmdf =
+ &p->commands[i]->u.function;
+ (*cmdf->func) (cmdf->data);
+ exit (0);
+ }
+ }
+
error (EXEC_FAILED_EXIT_STATUS, errno,
_("can't execute %s"), p->commands[i]->name);
}
@@ -722,6 +904,9 @@ void pipeline_start (pipeline *p)
debug ("Started \"%s\", pid %d\n", p->commands[i]->name, pid);
}
+
+ if (p->ncommands == 0)
+ p->outfd = last_input;
}
static int sigchld = 0;
@@ -954,3 +1139,478 @@ void pipeline_install_sigchld (void)
if (xsigaction (SIGCHLD, &act, NULL) == -1)
error (FATAL, errno, _("can't install SIGCHLD handler"));
}
+
+void pipeline_pump (pipeline *p, ...)
+{
+ va_list argv;
+ int argc, i, j;
+ pipeline *arg, **pieces;
+ size_t *pos;
+ int *known_source, *blocking_in, *blocking_out, *waiting, *write_error;
+ struct sigaction sa, osa_sigpipe;
+
+ /* Count pipelines and allocate space for arrays. */
+ va_start (argv, p);
+ argc = 0;
+ for (arg = p; arg; arg = va_arg (argv, pipeline *))
+ ++argc;
+ va_end (argv);
+ pieces = xmalloc (argc * sizeof *pieces);
+ pos = xmalloc (argc * sizeof *pos);
+ known_source = xmalloc (argc * sizeof *known_source);
+ blocking_in = xmalloc (argc * sizeof *blocking_in);
+ blocking_out = xmalloc (argc * sizeof *blocking_out);
+ waiting = xmalloc (argc * sizeof *waiting);
+ write_error = xmalloc (argc * sizeof *write_error);
+
+ /* Set up arrays of pipelines and their read positions. Start all
+ * pipelines if necessary.
+ */
+ va_start (argv, p);
+ for (arg = p, i = 0; i < argc; arg = va_arg (argv, pipeline *), ++i) {
+ pieces[i] = arg;
+ pos[i] = 0;
+ if (!pieces[i]->pids)
+ pipeline_start (pieces[i]);
+ }
+ assert (arg == NULL);
+ va_end (argv);
+
+ /* All source pipelines must be supplied as arguments. */
+ memset (known_source, 0, argc * sizeof *known_source);
+ for (i = 0; i < argc; ++i) {
+ int found = 0;
+ if (!pieces[i]->source)
+ continue;
+ for (j = 0; j < argc; ++j) {
+ if (pieces[i]->source == pieces[j]) {
+ known_source[j] = found = 1;
+ break;
+ }
+ }
+ assert (found);
+ }
+
+ memset (blocking_in, 0, argc * sizeof *blocking_in);
+ memset (blocking_out, 0, argc * sizeof *blocking_out);
+ for (i = 0; i < argc; ++i) {
+ int flags;
+ if (pieces[i]->infd != -1) {
+ flags = fcntl (pieces[i]->infd, F_GETFL);
+ if (!(flags & O_NONBLOCK)) {
+ blocking_in[i] = 1;
+ fcntl (pieces[i]->infd, F_SETFL,
+ flags | O_NONBLOCK);
+ }
+ }
+ if (pieces[i]->outfd != -1) {
+ flags = fcntl (pieces[i]->outfd, F_GETFL);
+ if (!(flags & O_NONBLOCK)) {
+ blocking_out[i] = 1;
+ fcntl (pieces[i]->outfd, F_SETFL,
+ flags | O_NONBLOCK);
+ }
+ }
+ }
+
+ memset (waiting, 0, argc * sizeof *waiting);
+ memset (write_error, 0, argc * sizeof *write_error);
+
+#ifdef SIGPIPE
+ sa.sa_handler = SIG_IGN;
+ sigemptyset (&sa.sa_mask);
+ sa.sa_flags = 0;
+ xsigaction (SIGPIPE, &sa, &osa_sigpipe);
+#endif
+
+#ifdef SA_RESTART
+ /* We rely on getting EINTR from select. */
+ xsigaction (SIGCHLD, NULL, &sa);
+ sa.sa_flags &= ~SA_RESTART;
+ xsigaction (SIGCHLD, &sa, NULL);
+#endif
+
+ for (;;) {
+ fd_set rfds, wfds;
+ int maxfd = -1;
+ int ret;
+
+ /* If a source dies and all data from it has been written to
+ * all sinks, close the writing end of the pipe to each of
+ * its sinks.
+ */
+ for (i = 0; i < argc; ++i) {
+ if (!known_source[i] || pieces[i]->outfd != -1 ||
+ pipeline_peek_size (pieces[i]))
+ continue;
+ for (j = 0; j < argc; ++j) {
+ if (pieces[j]->source == pieces[i] &&
+ pieces[j]->infd != -1) {
+ if (close (pieces[j]->infd))
+ error (0, errno,
+ _("closing pipeline "
+ "input failed"));
+ pieces[j]->infd = -1;
+ }
+ }
+ }
+
+ /* If all sinks on a source have died, close the reading end
+ * of the pipe from that source.
+ */
+ for (i = 0; i < argc; ++i) {
+ int got_sink = 0;
+ if (!known_source[i] || pieces[i]->outfd == -1)
+ continue;
+ for (j = 0; j < argc; ++j) {
+ if (pieces[j]->source == pieces[i] &&
+ pieces[j]->infd != -1) {
+ got_sink = 1;
+ break;
+ }
+ }
+ if (got_sink)
+ continue;
+ if (close (pieces[i]->outfd))
+ error (0, errno,
+ _("closing pipeline output failed"));
+ pieces[i]->outfd = -1;
+ }
+
+ FD_ZERO (&rfds);
+ FD_ZERO (&wfds);
+ for (i = 0; i < argc; ++i) {
+ if (pieces[i]->source && pieces[i]->infd != -1 &&
+ !waiting[i]) {
+ FD_SET (pieces[i]->infd, &wfds);
+ if (pieces[i]->infd > maxfd)
+ maxfd = pieces[i]->infd;
+ }
+ if (known_source[i] && pieces[i]->outfd != -1) {
+ FD_SET (pieces[i]->outfd, &rfds);
+ if (pieces[i]->outfd > maxfd)
+ maxfd = pieces[i]->outfd;
+ }
+ }
+ if (maxfd == -1)
+ break; /* nothing meaningful left to do */
+
+ ret = select (maxfd + 1, &rfds, &wfds, NULL, NULL);
+ if (ret < 0 && errno == EINTR) {
+ /* Did a source or sink pipeline die? */
+ for (i = 0; i < argc; ++i) {
+ if (pieces[i]->ncommands == 0)
+ continue;
+ if (known_source[i] &&
+ pieces[i]->outfd != -1) {
+ int last = pieces[i]->ncommands - 1;
+ assert (pieces[i]->statuses);
+ if (pieces[i]->statuses[last] != -1) {
+ debug ("source pipeline %d "
+ "died\n", i);
+ close (pieces[i]->outfd);
+ pieces[i]->outfd = -1;
+ }
+ }
+ if (pieces[i]->source &&
+ pieces[i]->infd != -1) {
+ assert (pieces[i]->statuses);
+ if (pieces[i]->statuses[0] != -1) {
+ debug ("sink pipeline %d "
+ "died\n", i);
+ close (pieces[i]->infd);
+ pieces[i]->infd = -1;
+ }
+ }
+ }
+ continue;
+ } else if (ret < 0)
+ error (FATAL, errno, "select");
+
+ /* Read a block of data from each available source pipeline. */
+ for (i = 0; i < argc; ++i) {
+ size_t peek_size, len;
+
+ if (!known_source[i] || pieces[i]->outfd == -1)
+ continue;
+ if (!FD_ISSET (pieces[i]->outfd, &rfds))
+ continue;
+
+ peek_size = pipeline_peek_size (pieces[i]);
+ len = peek_size + 4096;
+ if (!pipeline_peek (pieces[i], &len) ||
+ len == peek_size) {
+ /* Error or end-of-file; skip this pipeline
+ * from now on.
+ */
+ debug ("source pipeline %d returned error "
+ "or EOF\n", i);
+ close (pieces[i]->outfd);
+ pieces[i]->outfd = -1;
+ } else
+ /* This is rather a large hammer. Whenever
+ * any data is read from any source
+ * pipeline, we go through and retry all
+ * sink pipelines, even if they aren't
+ * receiving data from the source in
+ * question. This probably results in a few
+ * more passes around the select() loop, but
+ * it eliminates some annoyingly fiddly
+ * bookkeeping.
+ */
+ memset (waiting, 0, argc * sizeof *waiting);
+ }
+
+ /* Write as much data as we can to each available sink
+ * pipeline.
+ */
+ for (i = 0; i < argc; ++i) {
+ const char *block;
+ size_t peek_size;
+ ssize_t w;
+ size_t minpos;
+
+ if (!pieces[i]->source || pieces[i]->infd == -1)
+ continue;
+ if (!FD_ISSET (pieces[i]->infd, &wfds))
+ continue;
+ peek_size = pipeline_peek_size (pieces[i]->source);
+ if (peek_size <= pos[i]) {
+ /* Disable reading until data is read from a
+ * source fd or a child process exits, so
+ * that we neither spin nor block if the
+ * source is slow.
+ */
+ waiting[i] = 1;
+ continue;
+ }
+
+ /* peek a block from the source */
+ block = pipeline_peek (pieces[i]->source, &peek_size);
+ /* should all already be in the peek cache */
+ assert (block);
+ assert (peek_size);
+
+ /* write as much of it as will fit to the sink */
+ for (;;) {
+ w = write (pieces[i]->infd, block + pos[i],
+ peek_size - pos[i]);
+ if (w >= 0 || errno == EAGAIN)
+ break;
+ if (errno == EINTR)
+ continue;
+ /* Failure to save a cat page shouldn't
+ * impede displaying the page in a pager, so
+ * we report errors later.
+ */
+ if (errno != EPIPE)
+ write_error[i] = errno;
+ close (pieces[i]->infd);
+ pieces[i]->infd = -1;
+ goto next_sink;
+ }
+ pos[i] += w;
+ minpos = pos[i];
+
+ /* check other sinks on the same source, and update
+ * the source's read position if earlier data is no
+ * longer needed by any sink
+ */
+ for (j = 0; j < argc; ++j) {
+ if (pieces[i]->source != pieces[j]->source ||
+ pieces[j]->infd == -1)
+ continue;
+ if (pos[j] < minpos)
+ minpos = pos[j];
+ /* If the source is dead and all data has
+ * been written to this sink, close the
+ * writing end of the pipe to the sink.
+ */
+ if (pieces[j]->source->outfd == -1 &&
+ pos[j] >= peek_size) {
+ close (pieces[j]->infd);
+ pieces[j]->infd = -1;
+ }
+ }
+
+ /* If some data has been written to all sinks,
+ * discard it from the source's peek cache.
+ */
+ pipeline_peek_skip (pieces[i]->source, minpos);
+ for (j = 0; j < argc; ++j) {
+ if (pieces[i]->source == pieces[j]->source)
+ pos[j] -= minpos;
+ }
+next_sink: ;
+ }
+ }
+
+#ifdef SA_RESTART
+ xsigaction (SIGCHLD, NULL, &sa);
+ sa.sa_flags |= SA_RESTART;
+ xsigaction (SIGCHLD, &sa, NULL);
+#endif
+
+#ifdef SIGPIPE
+ xsigaction (SIGPIPE, &osa_sigpipe, NULL);
+#endif
+
+ for (i = 0; i < argc; ++i) {
+ int flags;
+ if (blocking_in[i] && pieces[i]->infd != -1) {
+ flags = fcntl (pieces[i]->infd, F_GETFL);
+ fcntl (pieces[i]->infd, F_SETFL, flags & ~O_NONBLOCK);
+ }
+ if (blocking_out[i] && pieces[i]->outfd != -1) {
+ flags = fcntl (pieces[i]->outfd, F_GETFL);
+ fcntl (pieces[i]->outfd, F_SETFL, flags & ~O_NONBLOCK);
+ }
+ }
+
+ for (i = 0; i < argc; ++i) {
+ if (write_error[i])
+ error (FATAL, write_error[i], "write to sink %d", i);
+ }
+
+ free (write_error);
+ free (waiting);
+ free (blocking_out);
+ free (blocking_in);
+ free (pieces);
+ free (pos);
+}
+
+/* ---------------------------------------------------------------------- */
+
+/* Functions to read output from pipelines. */
+
+static const char *get_block (pipeline *p, size_t *len, int peek)
+{
+ size_t readstart = 0, retstart = 0;
+ size_t space = p->bufmax;
+ size_t toread = *len;
+ ssize_t r;
+
+ if (p->buffer && p->peek_offset) {
+ if (p->peek_offset >= toread) {
+ /* We've got the whole thing in the peek cache; just
+ * return it.
+ */
+ const char *buffer;
+ assert (p->peek_offset <= p->buflen);
+ buffer = p->buffer + p->buflen - p->peek_offset;
+ if (!peek)
+ p->peek_offset -= toread;
+ return buffer;
+ } else {
+ readstart = p->buflen;
+ retstart = p->buflen - p->peek_offset;
+ space -= p->buflen;
+ toread -= p->peek_offset;
+ }
+ }
+
+ if (toread > space) {
+ if (p->buffer)
+ p->bufmax = readstart + toread;
+ else
+ p->bufmax = toread;
+ p->buffer = realloc (p->buffer, p->bufmax + 1);
+ }
+
+ if (!peek)
+ p->peek_offset = 0;
+
+ assert (p->outfd != -1);
+ r = read (p->outfd, p->buffer + readstart, toread);
+ if (r == -1)
+ return NULL;
+ p->buflen = readstart + r;
+ if (peek)
+ p->peek_offset += r;
+ *len -= (toread - r);
+
+ return p->buffer + retstart;
+}
+
+const char *pipeline_read (pipeline *p, size_t *len)
+{
+ return get_block (p, len, 0);
+}
+
+const char *pipeline_peek (pipeline *p, size_t *len)
+{
+ return get_block (p, len, 1);
+}
+
+size_t pipeline_peek_size (pipeline *p)
+{
+ if (!p->buffer)
+ return 0;
+ return p->peek_offset;
+}
+
+void pipeline_peek_skip (pipeline *p, size_t len)
+{
+ if (len > 0) {
+ assert (p->buffer);
+ assert (len <= p->peek_offset);
+ p->peek_offset -= len;
+ }
+}
+
+/* readline and peekline repeatedly peek larger and larger buffers until
+ * they find a newline or they fail. readline then adjusts the peek offset.
+ */
+
+static const char *get_line (pipeline *p, size_t *outlen)
+{
+ const size_t block = 4096;
+ const char *buffer = NULL, *end = NULL;
+ int i;
+
+ if (p->line_cache) {
+ free (p->line_cache);
+ p->line_cache = NULL;
+ }
+
+ if (outlen)
+ *outlen = 0;
+
+ for (i = 0; ; ++i) {
+ size_t plen = block * (i + 1);
+
+ buffer = get_block (p, &plen, 1);
+ if (!buffer || plen == 0)
+ return NULL;
+
+ end = strchr (buffer + block * i, '\n');
+ if (!end && plen < block * (i + 1))
+ /* end of file, no newline found */
+ end = buffer + plen - 1;
+ if (end)
+ break;
+ }
+
+ if (end) {
+ p->line_cache = xstrndup (buffer, end - buffer + 1);
+ if (outlen)
+ *outlen = end - buffer + 1;
+ return p->line_cache;
+ } else
+ return NULL;
+}
+
+const char *pipeline_readline (pipeline *p)
+{
+ size_t buflen;
+ const char *buffer = get_line (p, &buflen);
+ if (buffer)
+ p->peek_offset -= buflen;
+ return buffer;
+}
+
+const char *pipeline_peekline (pipeline *p)
+{
+ return get_line (p, NULL);
+}
diff --git a/lib/pipeline.h b/lib/pipeline.h
index 123829ae..e3951856 100644
--- a/lib/pipeline.h
+++ b/lib/pipeline.h
@@ -1,6 +1,6 @@
/* Copyright (C) 1989, 1990, 1991, 1992, 2000, 2002
* Free Software Foundation, Inc.
- * Copyright (C) 2003 Colin Watson.
+ * Copyright (C) 2003, 2004, 2005, 2007 Colin Watson.
* Written for groff by James Clark (jjc@jclark.com)
* Adapted for man-db by Colin Watson.
*
@@ -28,13 +28,29 @@
#include <stdarg.h>
#include <sys/types.h>
+enum command_tag {
+ COMMAND_PROCESS,
+ COMMAND_FUNCTION
+};
+
+typedef void command_function_type (void *);
+
typedef struct command {
+ enum command_tag tag;
char *name;
- int argc;
- int argv_max; /* size of allocated array */
- char **argv;
int nice;
int discard_err; /* discard stderr? */
+ union {
+ struct command_process {
+ int argc;
+ int argv_max; /* size of allocated array */
+ char **argv;
+ } process;
+ struct command_function {
+ command_function_type *func;
+ void *data;
+ } function;
+ } u;
} command;
typedef struct pipeline {
@@ -49,17 +65,48 @@ typedef struct pipeline {
* whole pipeline. If negative, pipeline_start() will create pipes
* and store the input writing half and the output reading half in
* infd and outfd as appropriate. If zero, input and output will be
- * left as stdin and stdout.
+ * left as stdin and stdout unless want_infile or want_outfile
+ * respectively is set.
*/
int want_in, want_out;
- /* See above. The caller should consider these read-only. */
+ /* To be set (and freed) by the caller. If non-NULL, these contain
+ * files to open and use as the input and output of the whole
+ * pipeline. These are only used if want_in or want_out respectively
+ * is zero. The value of using these rather than simply opening the
+ * files before starting the pipeline is that the files will be
+ * opened with the same privileges under which the pipeline is being
+ * run.
+ */
+ const char *want_infile, *want_outfile;
+
+ /* See above. Default to -1. The caller should consider these
+ * read-only.
+ */
int infd, outfd;
/* Set by pipeline_get_infile() and pipeline_get_outfile()
- * respectively.
+ * respectively. Default to NULL.
*/
FILE *infile, *outfile;
+
+ /* Set by pipeline_connect() to record that this pipeline reads its
+ * input from another pipeline. Defaults to NULL.
+ */
+ struct pipeline *source;
+
+ /* Private buffer for use by read/peek functions. */
+ char *buffer;
+ size_t buflen, bufmax;
+
+ /* The last line returned by readline/peekline. Private. */
+ char *line_cache;
+
+ /* The amount of data at the end of buffer which has been
+ * read-ahead, either by an explicit peek or by readline/peekline
+ * reading a block at a time to save work. Private.
+ */
+ size_t peek_offset;
} pipeline;
/* ---------------------------------------------------------------------- */
@@ -83,6 +130,17 @@ command *command_new_args (const char *name, ...) ATTRIBUTE_SENTINEL;
*/
command *command_new_argstr (const char *argstr);
+/* Construct a new command that calls a given function rather than executing
+ * a process. The data argument is passed as the function's only argument.
+ * There is currently no provision for freeing data (TODO), so the caller
+ * should make sure that data's lifetime exceeds that of the command.
+ *
+ * command_* functions that deal with arguments cannot be used with the
+ * command returned by this function.
+ */
+command *command_new_function (const char *name,
+ command_function_type *func, void *data);
+
/* Return a duplicate of a command. */
command *command_dup (command *cmd);
@@ -103,6 +161,14 @@ void command_args (command *cmd, ...) ATTRIBUTE_SENTINEL;
*/
void command_argstr (command *cmd, const char *argstr);
+/* Dump a string representation of a command to stream. */
+void command_dump (command *cmd, FILE *stream);
+
+/* Return a string representation of a command. The caller should free the
+ * result.
+ */
+char *command_tostring (command *cmd);
+
/* Destroy a command. Safely does nothing on NULL. */
void command_free (command *cmd);
@@ -120,10 +186,26 @@ pipeline *pipeline_new_commandv (command *cmd1, va_list cmdv);
pipeline *pipeline_new_commands (command *cmd1, ...) ATTRIBUTE_SENTINEL;
/* Joins two pipelines, neither of which are allowed to be started. Discards
- * want_out and outfd from p1, and want_in and infd from p2.
+ * want_out, want_outfile, and outfd from p1, and want_in, want_infile, and
+ * infd from p2.
*/
pipeline *pipeline_join (pipeline *p1, pipeline *p2);
+/* Connect the input of one or more sink pipelines to the output of a source
+ * pipeline. The source pipeline may be started, but in that case want_out
+ * must be negative; otherwise, discards want_out from source. In any event,
+ * discards want_in from all sinks, none of which are allowed to be started.
+ * Terminate arguments with NULL.
+ *
+ * This is an application-level connection; data may be intercepted between
+ * the pipelines by the program before calling pipeline_pump(), which sets
+ * data flowing from the source to the sinks. It is primarily useful when
+ * more than one sink pipeline is involved, in which case the pipelines
+ * cannot simply be concatenated into one.
+ */
+void pipeline_connect (pipeline *source, pipeline *sink, ...)
+ ATTRIBUTE_SENTINEL;
+
/* Add a command to a pipeline. */
void pipeline_command (pipeline *p, command *cmd);
@@ -174,4 +256,49 @@ int pipeline_wait (pipeline *p);
*/
void pipeline_install_sigchld (void);
+/* Pump data among one or more pipelines connected using pipeline_connect()
+ * until all source pipelines have reached end-of-file and all data has been
+ * written to all sinks (or failed). All relevant pipelines must be
+ * supplied: that is, no pipeline that has been connected to a source
+ * pipeline may be supplied unless that source pipeline is also supplied.
+ * Automatically starts all pipelines if they are not already started, but
+ * does not wait for them.
+ */
+void pipeline_pump (pipeline *p, ...) ATTRIBUTE_SENTINEL;
+
+/* ---------------------------------------------------------------------- */
+
+/* Functions to read output from pipelines. */
+
+/* Read len bytes of data from the pipeline, returning the data block. len
+ * is updated with the number of bytes read.
+ */
+const char *pipeline_read (pipeline *p, size_t *len);
+
+/* Look ahead in the pipeline's output for len bytes of data, returning the
+ * data block. len is updated with the number of bytes read. The starting
+ * position of the next read or peek is not affected by this call.
+ */
+const char *pipeline_peek (pipeline *p, size_t *len);
+
+/* Return the number of bytes of data that can be read using pipeline_read
+ * or pipeline_peek solely from the peek cache, without having to read from
+ * the pipeline itself (and thus potentially block).
+ */
+size_t pipeline_peek_size (pipeline *p);
+
+/* Skip over and discard len bytes of data from the peek cache. Asserts that
+ * enough data is available to skip, so you may want to check using
+ * pipeline_peek_size first.
+ */
+void pipeline_peek_skip (pipeline *p, size_t len);
+
+/* Read a line of data from the pipeline, returning it. */
+const char *pipeline_readline (pipeline *p);
+
+/* Look ahead in the pipeline's output for a line of data, returning it. The
+ * starting position of the next read or peek is not affected by this call.
+ */
+const char *pipeline_peekline (pipeline *p);
+
#endif /* PIPELINE_H */