diff options
Diffstat (limited to 'src/send.cc')
-rw-r--r-- | src/send.cc | 377 |
1 files changed, 255 insertions, 122 deletions
diff --git a/src/send.cc b/src/send.cc index 1b854fc..647ffc7 100644 --- a/src/send.cc +++ b/src/send.cc @@ -1,5 +1,5 @@ // nullmailer -- a simple relay-only MTA -// Copyright (C) 2012 Bruce Guenter <bruce@untroubled.org> +// Copyright (C) 2016 Bruce Guenter <bruce@untroubled.org> // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -24,6 +24,7 @@ #include <dirent.h> #include <errno.h> #include <signal.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/stat.h> @@ -31,23 +32,44 @@ #include <sys/wait.h> #include <unistd.h> #include "ac/time.h" +#include "argparse.h" +#include "autoclose.h" #include "configio.h" #include "defines.h" #include "errcodes.h" #include "fdbuf/fdbuf.h" +#include "forkexec.h" #include "hostname.h" #include "itoa.h" #include "list.h" #include "selfpipe.h" #include "setenv.h" +const char* cli_program = "nullmailer-send"; + selfpipe selfpipe; +typedef enum { tempfail=-1, permfail=0, success=1 } tristate; + +struct message +{ + time_t timestamp; + mystring filename; +}; + typedef list<mystring> slist; +typedef list<struct message> msglist; -#define fail(MSG) do { fout << MSG << endl; return false; } while(0) -#define fail2(MSG1,MSG2) do{ fout << MSG1 << MSG2 << endl; return false; }while(0) -#define fail_sys(MSG) do{ fout << MSG << strerror(errno) << endl; return false; }while(0) +#define msg1(MSG) do{ fout << MSG << endl; }while(0) +#define msg2(MSG1,MSG2) do{ fout << MSG1 << MSG2 << endl; }while(0) +#define msg1sys(MSG) do{ fout << MSG << strerror(errno) << endl; }while(0) +#define fail(MSG) do { msg1(MSG); return false; } while(0) +#define fail2(MSG1,MSG2) do{ msg2(MSG1,MSG2); return false; }while(0) +#define fail1sys(MSG) do{ msg1sys(MSG); return false; }while(0) +#define tempfail1sys(MSG) do{ msg1sys(MSG); return tempfail; }while(0) + +static mystring trigger_path; +static mystring msg_dir; struct remote { @@ -55,7 +77,8 @@ struct remote mystring host; mystring proto; - slist options; + mystring program; + mystring options; remote(const slist& list); ~remote(); }; @@ -66,76 +89,62 @@ remote::remote(const slist& lst) { slist::const_iter iter = lst; host = *iter; + options = "host=" + host + "\n"; ++iter; if(!iter) proto = default_proto; else { proto = *iter; - for(++iter; iter; ++iter) - options.append(*iter); + for(++iter; iter; ++iter) { + mystring option = *iter; + // Strip prefix "--" + if (option[0] == '-' && option[1] == '-') + option = option.right(2); + options += option; + options += '\n'; + } } + options += '\n'; + program = CONFIG_PATH(PROTOCOLS, NULL, proto.c_str()); } remote::~remote() { } typedef list<remote> rlist; -unsigned ws_split(const mystring& str, slist& lst) -{ - lst.empty(); - const char* ptr = str.c_str(); - const char* end = ptr + str.length(); - unsigned count = 0; - for(;;) { - while(ptr < end && isspace(*ptr)) - ++ptr; - const char* start = ptr; - while(ptr < end && !isspace(*ptr)) - ++ptr; - if(ptr == start) - break; - lst.append(mystring(start, ptr-start)); - ++count; - } - return count; -} - static rlist remotes; static int minpause = 60; static int pausetime = minpause; static int maxpause = 24*60*60; static int sendtimeout = 60*60; +static int queuelifetime = 7*24*60*60; bool load_remotes() { slist rtmp; - if(!config_readlist("remotes", rtmp) || - rtmp.count() == 0) - return false; + config_readlist("remotes", rtmp); remotes.empty(); for(slist::const_iter r(rtmp); r; r++) { if((*r)[0] == '#') continue; - slist parts; - if(!ws_split(*r, parts)) + arglist parts; + if (!parse_args(parts, *r)) continue; remotes.append(remote(parts)); } - return remotes.count() > 0; + if (remotes.count() == 0) + fail("No remote hosts listed for delivery"); + return true; } bool load_config() { mystring hh; - bool result = true; if (!config_read("helohost", hh)) hh = me; setenv("HELOHOST", hh.c_str(), 1); - if(!load_remotes()) - result = false; - int oldminpause = minpause; if(!config_readint("pausetime", minpause)) minpause = 60; @@ -143,69 +152,61 @@ bool load_config() maxpause = 24*60*60; if(!config_readint("sendtimeout", sendtimeout)) sendtimeout = 60*60; + if(!config_readint("queuelifetime", queuelifetime)) + queuelifetime = 7*24*60*60; if (minpause != oldminpause) pausetime = minpause; - return result; + return load_remotes(); } -static slist files; -static bool reload_files = false; +static msglist messages; +static bool reload_messages = false; void catch_alrm(int) { signal(SIGALRM, catch_alrm); - reload_files = true; + reload_messages = true; } -bool load_files() +bool load_messages() { - reload_files = false; + reload_messages = false; fout << "Rescanning queue." << endl; DIR* dir = opendir("."); if(!dir) - fail_sys("Cannot open queue directory: "); - files.empty(); + fail1sys("Cannot open queue directory: "); + messages.empty(); struct dirent* entry; while((entry = readdir(dir)) != 0) { const char* name = entry->d_name; - if(name[0] == '.') + if (name[0] == '.') + continue; + struct stat st; + if (stat(name, &st) < 0) { + fout << "Could not stat " << name << ", skipping." << endl; continue; - files.append(name); + } + struct message m = { st.st_mtime, name }; + messages.append(m); } closedir(dir); return true; } -void exec_protocol(int fd, remote& remote) +tristate catchsender(fork_exec& fp) { - if(close(0) == -1 || dup2(fd, 0) == -1 || close(fd) == -1) - return; - mystring program = PROTOCOL_DIR + remote.proto; - const char* args[3+remote.options.count()]; - unsigned i = 0; - args[i++] = program.c_str(); - for(slist::const_iter opt(remote.options); opt; opt++) - args[i++] = strdup((*opt).c_str()); - args[i++] = remote.host.c_str(); - args[i++] = 0; - execv(args[0], (char**)args); -} - -bool catchsender(pid_t pid) -{ - int status; - for (;;) { switch (selfpipe.waitsig(sendtimeout)) { case 0: // timeout - kill(pid, SIGTERM); - waitpid(pid, &status, 0); + fout << "Sending timed out, killing protocol" << endl; + fp.kill(SIGTERM); selfpipe.waitsig(); // catch the signal from killing the child - fail("Sending timed out, killing protocol"); + return tempfail; case -1: - fail_sys("Error waiting for the child signal: "); + msg1sys("Error waiting for the child signal: "); + return tempfail; case SIGCHLD: break; default: @@ -214,72 +215,201 @@ bool catchsender(pid_t pid) break; } - if(waitpid(pid, &status, 0) == -1) - fail_sys("Error catching the child process return value: "); + int status = fp.wait_status(); + if(status < 0) { + fout << "Error catching the child process return value: " + << strerror(errno) << endl; + return tempfail; + } else { if(WIFEXITED(status)) { status = WEXITSTATUS(status); - if(status) - fail2("Sending failed: ", errorstr(status)); + if(status) { + fout << "Sending failed: " << errorstr(status) << endl; + return (status & ERR_PERMANENT_FLAG) ? permfail : tempfail; + } else { fout << "Sent file." << endl; - return true; + return success; } } - else - fail("Sending process crashed or was killed."); + else { + fout << "Sending process crashed or was killed." << endl; + return tempfail; + } } } -bool send_one(mystring filename, remote& remote) +bool log_msg(mystring& filename, remote& remote, int fd) { - int fd = open(filename.c_str(), O_RDONLY); - if(fd == -1) { + fout << "Starting delivery:" + << " host: " << remote.host + << " protocol: " << remote.proto + << " file: " << filename << endl; + fdibuf in(fd); + mystring line; + mystring msg; + if (in.getline(line, '\n')) { + msg = "From: <"; + msg += line; + msg += '>'; + bool has_to = false; + while (in.getline(line, '\n')) { + if (!line) + break; + msg += has_to ? ", " : " to: "; + has_to = true; + msg += '<'; + msg += line; + msg += '>'; + } + fout << msg << endl; + while (in.getline(line, '\n')) { + if (!line) + break; + if (line.left(11).lower() == "message-id:") + fout << line << endl; + } + lseek(fd, 0, SEEK_SET); + return true; + } + fout << endl << "Can't read message" << endl; + return false; +} + +static bool copy_output(int fd, mystring& output) +{ + output = ""; + char buf[256]; + ssize_t rd; + while ((rd = read(fd, buf, sizeof buf)) > 0) + output += mystring(buf, rd); + return rd == 0; +} + +tristate send_one(mystring filename, remote& remote, mystring& output) +{ + autoclose fd = open(filename.c_str(), O_RDONLY); + if(fd < 0) { fout << "Can't open file '" << filename << "'" << endl; + return tempfail; + } + log_msg(filename, remote, fd); + + fork_exec fp(remote.proto.c_str()); + int redirs[] = { REDIRECT_PIPE_TO, REDIRECT_PIPE_FROM, REDIRECT_NONE, fd }; + if (!fp.start(remote.program.c_str(), 4, redirs)) + return tempfail; + + if (write(redirs[0], remote.options.c_str(), remote.options.length()) != (ssize_t)remote.options.length()) + fout << "Warning: Writing options to protocol failed" << endl; + close(redirs[0]); + + tristate result = catchsender(fp); + if (!copy_output(redirs[1], output)) + fout << "Warning: Could not read output from protocol" << endl; + close(redirs[1]); + return result; +} + +static void parse_output(const mystring& output, const remote& remote, mystring& status, mystring& diag) +{ + diag = remote.proto.upper(); + diag += "; "; + diag += output.strip(); + diag.subst('\n', '/'); + status = "5.0.0"; + for (unsigned i = 0; i < output.length()-5; i++) + if (isdigit(output[i]) + && output[i+1] == '.' + && isdigit(output[i+2]) + && output[i+3] == '.' + && isdigit(output[i+4])) { + status = output.sub(i, 5); + break; + } +} + +bool bounce_msg(const message& msg, const remote& remote, const mystring& output) +{ + mystring failed = "../failed/"; + failed += msg.filename; + fout << "Moving message " << msg.filename << " into failed" << endl; + if (rename(msg.filename.c_str(), failed.c_str()) == -1) { + fout << "Can't rename file: " << strerror(errno) << endl; return false; } - fout << "Starting delivery: protocol: " << remote.proto - << " host: " << remote.host - << " file: " << filename << endl; - pid_t pid = fork(); - switch(pid) { - case -1: - fail_sys("Fork failed: "); - case 0: - exec_protocol(fd, remote); - exit(ERR_EXEC_FAILED); - default: - close(fd); - if(!catchsender(pid)) - return false; - if(unlink(filename.c_str()) == -1) - fail_sys("Can't unlink file: "); + autoclose fd = open(failed.c_str(), O_RDONLY); + if (fd < 0) + fout << "Can't open file '" << failed << "' to create bounce message" << endl; + else { + fout << "Generating bounce for '" << msg.filename << "'" << endl; + queue_pipe qp; + autoclose pfd = qp.start(); + if (pfd > 0) { + mystring program = program_path("nullmailer-dsn"); + fork_exec dsn("nullmailer-dsn"); + int redirs[] = { fd, pfd }; + mystring status_code, diag_code; + parse_output(output, remote, status_code, diag_code); + const char* args[] = { program.c_str(), + "--last-attempt", itoa(time(NULL)), + "--remote", remote.host.c_str(), + "--diagnostic-code", diag_code.c_str(), + status_code.c_str(), NULL }; + dsn.start(args, 2, redirs); + // Everything else cleans up itself + } } return true; } -bool send_all() +void send_all() { - if(!load_config()) - fail("Could not load the config"); - if(remotes.count() <= 0) - fail("No remote hosts listed for delivery"); - if(files.count() == 0) - return true; + if(!load_config()) { + fout << "Could not load the config" << endl; + return; + } + if(remotes.count() <= 0) { + fout << "No remote hosts listed for delivery"; + return; + } + if(messages.count() == 0) + return; fout << "Starting delivery, " - << itoa(files.count()) << " message(s) in queue." << endl; + << itoa(messages.count()) << " message(s) in queue." << endl; + mystring output; for(rlist::iter remote(remotes); remote; remote++) { - slist::iter file(files); - while(file) { - if(send_one(*file, *remote)) - files.remove(file); - else - file++; + msglist::iter msg(messages); + while(msg) { + switch (send_one((*msg).filename, *remote, output)) { + case tempfail: + if (time(0) - (*msg).timestamp > queuelifetime) { + if (bounce_msg(*msg, *remote, output)) { + messages.remove(msg); + continue; + } + } + msg++; + break; + case permfail: + if (bounce_msg(*msg, *remote, output)) + messages.remove(msg); + else + msg++; + break; + default: + if(unlink((*msg).filename.c_str()) == -1) { + fout << "Can't unlink file: " << strerror(errno) << endl; + msg++; + } + else + messages.remove(msg); + } } } fout << "Delivery complete, " - << itoa(files.count()) << " message(s) remain." << endl; - return true; + << itoa(messages.count()) << " message(s) remain." << endl; } static int trigger; @@ -289,12 +419,12 @@ static int trigger2; bool open_trigger() { - trigger = open(QUEUE_TRIGGER, O_RDONLY|O_NONBLOCK); + trigger = open(trigger_path.c_str(), O_RDONLY|O_NONBLOCK); #ifdef NAMEDPIPEBUG - trigger2 = open(QUEUE_TRIGGER, O_WRONLY|O_NONBLOCK); + trigger2 = open(trigger_path.c_str(), O_WRONLY|O_NONBLOCK); #endif if(trigger == -1) - fail_sys("Could not open trigger file: "); + fail1sys("Could not open trigger file: "); return true; } @@ -318,7 +448,7 @@ bool do_select() FD_SET(trigger, &readfds); struct timeval timeout; - if (files.count() == 0) + if (messages.count() == 0) pausetime = maxpause; timeout.tv_sec = pausetime; timeout.tv_usec = 0; @@ -331,20 +461,23 @@ bool do_select() if(s == 1) { fout << "Trigger pulled." << endl; read_trigger(); - reload_files = true; + reload_messages = true; pausetime = minpause; } else if(s == -1 && errno != EINTR) - fail_sys("Internal error in select: "); + fail1sys("Internal error in select: "); else if(s == 0) - reload_files = true; - if(reload_files) - load_files(); + reload_messages = true; + if(reload_messages) + load_messages(); return true; } int main(int, char*[]) { + trigger_path = CONFIG_PATH(QUEUE, NULL, "trigger"); + msg_dir = CONFIG_PATH(QUEUE, NULL, "queue"); + read_hostnames(); if(!selfpipe) { @@ -355,7 +488,7 @@ int main(int, char*[]) if(!open_trigger()) return 1; - if(chdir(QUEUE_MSG_DIR) == -1) { + if(chdir(msg_dir.c_str()) == -1) { fout << "Could not chdir to queue message directory." << endl; return 1; } @@ -363,7 +496,7 @@ int main(int, char*[]) signal(SIGALRM, catch_alrm); signal(SIGHUP, SIG_IGN); load_config(); - load_files(); + load_messages(); for(;;) { send_all(); if (minpause == 0) break; |