summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2017-12-15 22:24:52 +0100
committerSven Eden <yamakuzure@gmx.net>2018-05-30 07:49:55 +0200
commit004bb7654fb0ca644e70ae22221adfecdcb4a8ed (patch)
tree7163c295455c55b82b02e6d1c372a8f3423f9a18 /src
parent5295adaf6b864f44b0f6ad8ec5aeda540d9c236e (diff)
sd-bus: optionally, use inotify to wait for bus sockets to appear
This adds a "watch-bind" feature to sd-bus connections. If set and the AF_UNIX socket we are connecting to doesn't exist yet, we'll establish an inotify watch instead, and wait for the socket to appear. In other words, a missing AF_UNIX just makes connecting slower. This is useful for daemons such as networkd or resolved that shall be able to run during early-boot, before dbus-daemon is up, and want to connect to dbus-daemon as soon as it becomes ready.
Diffstat (limited to 'src')
-rw-r--r--src/libelogind/libelogind.sym6
-rw-r--r--src/libelogind/sd-bus/bus-internal.h13
-rw-r--r--src/libelogind/sd-bus/bus-socket.c279
-rw-r--r--src/libelogind/sd-bus/bus-socket.h1
-rw-r--r--src/libelogind/sd-bus/sd-bus.c339
-rw-r--r--src/libelogind/sd-bus/test-bus-watch-bind.c239
-rw-r--r--src/systemd/sd-bus.h4
-rw-r--r--src/test/meson.build6
8 files changed, 762 insertions, 125 deletions
diff --git a/src/libelogind/libelogind.sym b/src/libelogind/libelogind.sym
index ddf51760b..5aa253697 100644
--- a/src/libelogind/libelogind.sym
+++ b/src/libelogind/libelogind.sym
@@ -536,3 +536,9 @@ global:
sd_bus_message_new;
sd_bus_message_seal;
} LIBSYSTEMD_234;
+
+LIBSYSTEMD_237 {
+global:
+ sd_bus_set_watch_bind;
+ sd_bus_get_watch_bind;
+} LIBSYSTEMD_236;
diff --git a/src/libelogind/sd-bus/bus-internal.h b/src/libelogind/sd-bus/bus-internal.h
index 8557d67e1..ab84dde5f 100644
--- a/src/libelogind/sd-bus/bus-internal.h
+++ b/src/libelogind/sd-bus/bus-internal.h
@@ -157,6 +157,7 @@ struct sd_bus_slot {
enum bus_state {
BUS_UNSET,
+ BUS_WATCH_BIND, /* waiting for the socket to appear via inotify */
BUS_OPENING,
BUS_AUTHENTICATING,
BUS_HELLO,
@@ -188,6 +189,7 @@ struct sd_bus {
enum bus_state state;
int input_fd, output_fd;
+ int inotify_fd;
int message_version;
int message_endian;
@@ -210,6 +212,7 @@ struct sd_bus {
bool exited:1;
bool exit_triggered:1;
bool is_local:1;
+ bool watch_bind:1;
int use_memfd;
@@ -293,6 +296,7 @@ struct sd_bus {
sd_event_source *output_io_event_source;
sd_event_source *time_event_source;
sd_event_source *quit_event_source;
+ sd_event_source *inotify_event_source;
sd_event *event;
int event_priority;
@@ -312,6 +316,9 @@ struct sd_bus {
LIST_HEAD(sd_bus_slot, slots);
LIST_HEAD(sd_bus_track, tracks);
+
+ int *inotify_watches;
+ size_t n_inotify_watches;
};
/* For method calls we time-out at 25s, like in the D-Bus reference implementation */
@@ -369,6 +376,12 @@ bool bus_pid_changed(sd_bus *bus);
char *bus_address_escape(const char *v);
+int bus_attach_io_events(sd_bus *b);
+int bus_attach_inotify_event(sd_bus *b);
+
+void bus_close_inotify_fd(sd_bus *b);
+void bus_close_io_fds(sd_bus *b);
+
#define OBJECT_PATH_FOREACH_PREFIX(prefix, path) \
for (char *_slash = ({ strcpy((prefix), (path)); streq((prefix), "/") ? NULL : strrchr((prefix), '/'); }) ; \
_slash && !(_slash[(_slash) == (prefix)] = 0); \
diff --git a/src/libelogind/sd-bus/bus-socket.c b/src/libelogind/sd-bus/bus-socket.c
index 5034d5147..74d90ecdf 100644
--- a/src/libelogind/sd-bus/bus-socket.c
+++ b/src/libelogind/sd-bus/bus-socket.c
@@ -32,9 +32,12 @@
#include "bus-socket.h"
#include "fd-util.h"
#include "format-util.h"
+//#include "fs-util.h"
#include "hexdecoct.h"
+//#include "io-util.h"
#include "macro.h"
#include "missing.h"
+//#include "path-util.h"
#include "selinux-util.h"
#include "signal-util.h"
#include "stdio-util.h"
@@ -688,30 +691,249 @@ int bus_socket_start_auth(sd_bus *b) {
return bus_socket_start_auth_client(b);
}
+static int bus_socket_inotify_setup(sd_bus *b) {
+ _cleanup_free_ int *new_watches = NULL;
+ _cleanup_free_ char *absolute = NULL;
+ size_t n_allocated = 0, n = 0, done = 0, i;
+ unsigned max_follow = 32;
+ const char *p;
+ int wd, r;
+
+ assert(b);
+ assert(b->watch_bind);
+ assert(b->sockaddr.sa.sa_family == AF_UNIX);
+ assert(b->sockaddr.un.sun_path[0] != 0);
+
+ /* Sets up an inotify fd in case watch_bind is enabled: wait until the configured AF_UNIX file system socket
+ * appears before connecting to it. The implemented is pretty simplistic: we just subscribe to relevant changes
+ * to all prefix components of the path, and every time we get an event for that we try to reconnect again,
+ * without actually caring what precisely the event we got told us. If we still can't connect we re-subscribe
+ * to all relevant changes of anything in the path, so that our watches include any possibly newly created path
+ * components. */
+
+ if (b->inotify_fd < 0) {
+ b->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC);
+ if (b->inotify_fd < 0)
+ return -errno;
+ }
+
+ /* Make sure the path is NUL terminated */
+ p = strndupa(b->sockaddr.un.sun_path, sizeof(b->sockaddr.un.sun_path));
+
+ /* Make sure the path is absolute */
+ r = path_make_absolute_cwd(p, &absolute);
+ if (r < 0)
+ goto fail;
+
+ /* Watch all parent directories, and don't mind any prefix that doesn't exist yet. For the innermost directory
+ * that exists we want to know when files are created or moved into it. For all parents of it we just care if
+ * they are removed or renamed. */
+
+ if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
+ r = -ENOMEM;
+ goto fail;
+ }
+
+ /* Start with the top-level directory, which is a bit simpler than the rest, since it can't be a symlink, and
+ * always exists */
+ wd = inotify_add_watch(b->inotify_fd, "/", IN_CREATE|IN_MOVED_TO);
+ if (wd < 0) {
+ r = log_debug_errno(errno, "Failed to add inotify watch on /: %m");
+ goto fail;
+ } else
+ new_watches[n++] = wd;
+
+ for (;;) {
+ _cleanup_free_ char *component = NULL, *prefix = NULL, *destination = NULL;
+ size_t n_slashes, n_component;
+ char *c = NULL;
+
+ n_slashes = strspn(absolute + done, "/");
+ n_component = n_slashes + strcspn(absolute + done + n_slashes, "/");
+
+ if (n_component == 0) /* The end */
+ break;
+
+ component = strndup(absolute + done, n_component);
+ if (!component) {
+ r = -ENOMEM;
+ goto fail;
+ }
+
+ /* A trailing slash? That's a directory, and not a socket then */
+ if (path_equal(component, "/")) {
+ r = -EISDIR;
+ goto fail;
+ }
+
+ /* A single dot? Let's eat this up */
+ if (path_equal(component, "/.")) {
+ done += n_component;
+ continue;
+ }
+
+ prefix = strndup(absolute, done + n_component);
+ if (!prefix) {
+ r = -ENOMEM;
+ goto fail;
+ }
+
+ if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
+ r = -ENOMEM;
+ goto fail;
+ }
+
+ wd = inotify_add_watch(b->inotify_fd, prefix, IN_DELETE_SELF|IN_MOVE_SELF|IN_ATTRIB|IN_CREATE|IN_MOVED_TO|IN_DONT_FOLLOW);
+ log_debug("Added inotify watch for %s on bus %s: %i", prefix, strna(b->description), wd);
+
+ if (wd < 0) {
+ if (IN_SET(errno, ENOENT, ELOOP))
+ break; /* This component doesn't exist yet, or the path contains a cyclic symlink right now */
+
+ r = log_debug_errno(errno, "Failed to add inotify watch on %s: %m", isempty(prefix) ? "/" : prefix);
+ goto fail;
+ } else
+ new_watches[n++] = wd;
+
+ /* Check if this is possibly a symlink. If so, let's follow it and watch it too. */
+ r = readlink_malloc(prefix, &destination);
+ if (r == -EINVAL) { /* not a symlink */
+ done += n_component;
+ continue;
+ }
+ if (r < 0)
+ goto fail;
+
+ if (isempty(destination)) { /* Empty symlink target? Yuck! */
+ r = -EINVAL;
+ goto fail;
+ }
+
+ if (max_follow <= 0) { /* Let's make sure we don't follow symlinks forever */
+ r = -ELOOP;
+ goto fail;
+ }
+
+ if (path_is_absolute(destination)) {
+ /* For absolute symlinks we build the new path and start anew */
+ c = strjoin(destination, absolute + done + n_component);
+ done = 0;
+ } else {
+ _cleanup_free_ char *t = NULL;
+
+ /* For relative symlinks we replace the last component, and try again */
+ t = strndup(absolute, done);
+ if (!t)
+ return -ENOMEM;
+
+ c = strjoin(t, "/", destination, absolute + done + n_component);
+ }
+ if (!c) {
+ r = -ENOMEM;
+ goto fail;
+ }
+
+ free(absolute);
+ absolute = c;
+
+ max_follow--;
+ }
+
+ /* And now, let's remove all watches from the previous iteration we don't need anymore */
+ for (i = 0; i < b->n_inotify_watches; i++) {
+ bool found = false;
+ size_t j;
+
+ for (j = 0; j < n; j++)
+ if (new_watches[j] == b->inotify_watches[i]) {
+ found = true;
+ break;
+ }
+
+ if (found)
+ continue;
+
+ (void) inotify_rm_watch(b->inotify_fd, b->inotify_watches[i]);
+ }
+
+ free_and_replace(b->inotify_watches, new_watches);
+ b->n_inotify_watches = n;
+
+ return 0;
+
+fail:
+ bus_close_inotify_fd(b);
+ return r;
+}
+
int bus_socket_connect(sd_bus *b) {
+ bool inotify_done = false;
int r;
assert(b);
- assert(b->input_fd < 0);
- assert(b->output_fd < 0);
- assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
- b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
- if (b->input_fd < 0)
- return -errno;
+ for (;;) {
+ assert(b->input_fd < 0);
+ assert(b->output_fd < 0);
+ assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
- b->output_fd = b->input_fd;
+ b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
+ if (b->input_fd < 0)
+ return -errno;
- bus_socket_setup(b);
+ b->output_fd = b->input_fd;
+ bus_socket_setup(b);
- r = connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size);
- if (r < 0) {
- if (errno == EINPROGRESS)
- return 1;
+ if (connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size) < 0) {
+ if (errno == EINPROGRESS) {
- return -errno;
+ /* If we have any inotify watches open, close them now, we don't need them anymore, as
+ * we have successfully initiated a connection */
+ bus_close_inotify_fd(b);
+
+ /* Note that very likely we are already in BUS_OPENING state here, as we enter it when
+ * we start parsing the address string. The only reason we set the state explicitly
+ * here, is to undo BUS_WATCH_BIND, in case we did the inotify magic. */
+ b->state = BUS_OPENING;
+ return 1;
+ }
+
+ if (IN_SET(errno, ENOENT, ECONNREFUSED) && /* ENOENT → unix socket doesn't exist at all; ECONNREFUSED → unix socket stale */
+ b->watch_bind &&
+ b->sockaddr.sa.sa_family == AF_UNIX &&
+ b->sockaddr.un.sun_path[0] != 0) {
+
+ /* This connection attempt failed, let's release the socket for now, and start with a
+ * fresh one when reconnecting. */
+ bus_close_io_fds(b);
+
+ if (inotify_done) {
+ /* inotify set up already, don't do it again, just return now, and remember
+ * that we are waiting for inotify events now. */
+ b->state = BUS_WATCH_BIND;
+ return 1;
+ }
+
+ /* This is a file system socket, and the inotify logic is enabled. Let's create the necessary inotify fd. */
+ r = bus_socket_inotify_setup(b);
+ if (r < 0)
+ return r;
+
+ /* Let's now try to connect a second time, because in theory there's otherwise a race
+ * here: the socket might have been created in the time between our first connect() and
+ * the time we set up the inotify logic. But let's remember that we set up inotify now,
+ * so that we don't do the connect() more than twice. */
+ inotify_done = true;
+
+ } else
+ return -errno;
+ } else
+ break;
}
+ /* Yay, established, we don't need no inotify anymore! */
+ bus_close_inotify_fd(b);
+
return bus_socket_start_auth(b);
}
@@ -1069,3 +1291,34 @@ int bus_socket_process_authenticating(sd_bus *b) {
return bus_socket_read_auth(b);
}
+
+int bus_socket_process_watch_bind(sd_bus *b) {
+ int r, q;
+
+ assert(b);
+ assert(b->state == BUS_WATCH_BIND);
+ assert(b->inotify_fd >= 0);
+
+ r = flush_fd(b->inotify_fd);
+ if (r <= 0)
+ return r;
+
+ log_debug("Got inotify event on bus %s.", strna(b->description));
+
+ /* We flushed events out of the inotify fd. In that case, maybe the socket is valid now? Let's try to connect
+ * to it again */
+
+ r = bus_socket_connect(b);
+ if (r < 0)
+ return r;
+
+ q = bus_attach_io_events(b);
+ if (q < 0)
+ return q;
+
+ q = bus_attach_inotify_event(b);
+ if (q < 0)
+ return q;
+
+ return r;
+}
diff --git a/src/libelogind/sd-bus/bus-socket.h b/src/libelogind/sd-bus/bus-socket.h
index 915a283f5..c180562f9 100644
--- a/src/libelogind/sd-bus/bus-socket.h
+++ b/src/libelogind/sd-bus/bus-socket.h
@@ -34,5 +34,6 @@ int bus_socket_read_message(sd_bus *bus);
int bus_socket_process_opening(sd_bus *b);
int bus_socket_process_authenticating(sd_bus *b);
+int bus_socket_process_watch_bind(sd_bus *b);
bool bus_socket_auth_needs_write(sd_bus *b);
diff --git a/src/libelogind/sd-bus/sd-bus.c b/src/libelogind/sd-bus/sd-bus.c
index c71b0fa22..380881972 100644
--- a/src/libelogind/sd-bus/sd-bus.c
+++ b/src/libelogind/sd-bus/sd-bus.c
@@ -75,8 +75,8 @@
} while (false)
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
-static int attach_io_events(sd_bus *b);
-static void detach_io_events(sd_bus *b);
+static void bus_detach_io_events(sd_bus *b);
+static void bus_detach_inotify_event(sd_bus *b);
static thread_local sd_bus *default_system_bus = NULL;
#if 0 /// UNNEEDED by elogind
@@ -84,16 +84,26 @@ static thread_local sd_bus *default_user_bus = NULL;
#endif // 0
static thread_local sd_bus *default_starter_bus = NULL;
-static void bus_close_fds(sd_bus *b) {
+void bus_close_io_fds(sd_bus *b) {
assert(b);
- detach_io_events(b);
+ bus_detach_io_events(b);
if (b->input_fd != b->output_fd)
safe_close(b->output_fd);
b->output_fd = b->input_fd = safe_close(b->input_fd);
}
+void bus_close_inotify_fd(sd_bus *b) {
+ assert(b);
+
+ bus_detach_inotify_event(b);
+
+ b->inotify_fd = safe_close(b->inotify_fd);
+ b->inotify_watches = mfree(b->inotify_watches);
+ b->n_inotify_watches = 0;
+}
+
static void bus_reset_queues(sd_bus *b) {
assert(b);
@@ -137,7 +147,8 @@ static void bus_free(sd_bus *b) {
if (b->default_bus_ptr)
*b->default_bus_ptr = NULL;
- bus_close_fds(b);
+ bus_close_io_fds(b);
+ bus_close_inotify_fd(b);
free(b->label);
free(b->groups);
@@ -187,6 +198,7 @@ _public_ int sd_bus_new(sd_bus **ret) {
r->n_ref = REFCNT_INIT;
r->input_fd = r->output_fd = -1;
+ r->inotify_fd = -1;
r->message_version = 1;
r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
@@ -384,6 +396,22 @@ _public_ int sd_bus_get_allow_interactive_authorization(sd_bus *bus) {
return bus->allow_interactive_authorization;
}
+_public_ int sd_bus_set_watch_bind(sd_bus *bus, int b) {
+ assert_return(bus, -EINVAL);
+ assert_return(bus->state == BUS_UNSET, -EPERM);
+ assert_return(!bus_pid_changed(bus), -ECHILD);
+
+ bus->watch_bind = b;
+ return 0;
+}
+
+_public_ int sd_bus_get_watch_bind(sd_bus *bus) {
+ assert_return(bus, -EINVAL);
+ assert_return(!bus_pid_changed(bus), -ECHILD);
+
+ return bus->watch_bind;
+}
+
static int hello_callback(sd_bus_message *reply, void *userdata, sd_bus_error *error) {
const char *s;
sd_bus *bus;
@@ -906,7 +934,8 @@ static int bus_start_address(sd_bus *b) {
assert(b);
for (;;) {
- bus_close_fds(b);
+ bus_close_io_fds(b);
+ bus_close_inotify_fd(b);
/* If you provide multiple different bus-addresses, we
* try all of them in order and use the first one that
@@ -914,20 +943,25 @@ static int bus_start_address(sd_bus *b) {
if (b->exec_path)
r = bus_socket_exec(b);
-
else if ((b->nspid > 0 || b->machine) && b->sockaddr.sa.sa_family != AF_UNSPEC)
r = bus_container_connect_socket(b);
-
else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
r = bus_socket_connect(b);
-
else
goto next;
if (r >= 0) {
- r = attach_io_events(b);
- if (r >= 0)
- return r;
+ int q;
+
+ q = bus_attach_io_events(b);
+ if (q < 0)
+ return q;
+
+ q = bus_attach_inotify_event(b);
+ if (q < 0)
+ return q;
+
+ return r;
}
b->last_connect_error = -r;
@@ -1320,7 +1354,8 @@ _public_ void sd_bus_close(sd_bus *bus) {
* the bus object and the bus may be freed */
bus_reset_queues(bus);
- bus_close_fds(bus);
+ bus_close_io_fds(bus);
+ bus_close_inotify_fd(bus);
}
_public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
@@ -1337,7 +1372,7 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
static void bus_enter_closing(sd_bus *bus) {
assert(bus);
- if (!IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
+ if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
return;
bus->state = BUS_CLOSING;
@@ -1988,7 +2023,16 @@ _public_ int sd_bus_get_fd(sd_bus *bus) {
assert_return(bus->input_fd == bus->output_fd, -EPERM);
assert_return(!bus_pid_changed(bus), -ECHILD);
- return bus->input_fd;
+ if (bus->state == BUS_CLOSED)
+ return -ENOTCONN;
+
+ if (bus->inotify_fd >= 0)
+ return bus->inotify_fd;
+
+ if (bus->input_fd >= 0)
+ return bus->input_fd;
+
+ return -ENOTCONN;
}
_public_ int sd_bus_get_events(sd_bus *bus) {
@@ -1997,23 +2041,40 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
assert_return(bus, -EINVAL);
assert_return(!bus_pid_changed(bus), -ECHILD);
- if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
+ switch (bus->state) {
+
+ case BUS_UNSET:
+ case BUS_CLOSED:
return -ENOTCONN;
- if (bus->state == BUS_OPENING)
+ case BUS_WATCH_BIND:
+ flags |= POLLIN;
+ break;
+
+ case BUS_OPENING:
flags |= POLLOUT;
- else if (bus->state == BUS_AUTHENTICATING) {
+ break;
+ case BUS_AUTHENTICATING:
if (bus_socket_auth_needs_write(bus))
flags |= POLLOUT;
flags |= POLLIN;
+ break;
- } else if (IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) {
+ case BUS_RUNNING:
+ case BUS_HELLO:
if (bus->rqueue_size <= 0)
flags |= POLLIN;
if (bus->wqueue_size > 0)
flags |= POLLOUT;
+ break;
+
+ case BUS_CLOSING:
+ break;
+
+ default:
+ assert_not_reached("Unknown state");
}
return flags;
@@ -2034,39 +2095,45 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
return 1;
}
- if (bus->state == BUS_CLOSING) {
- *timeout_usec = 0;
- return 1;
- }
+ switch (bus->state) {
- if (bus->state == BUS_AUTHENTICATING) {
+ case BUS_AUTHENTICATING:
*timeout_usec = bus->auth_timeout;
return 1;
- }
- if (!IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) {
- *timeout_usec = (uint64_t) -1;
- return 0;
- }
+ case BUS_RUNNING:
+ case BUS_HELLO:
+ if (bus->rqueue_size > 0) {
+ *timeout_usec = 0;
+ return 1;
+ }
+
+ c = prioq_peek(bus->reply_callbacks_prioq);
+ if (!c) {
+ *timeout_usec = (uint64_t) -1;
+ return 0;
+ }
+
+ if (c->timeout == 0) {
+ *timeout_usec = (uint64_t) -1;
+ return 0;
+ }
+
+ *timeout_usec = c->timeout;
+ return 1;
- if (bus->rqueue_size > 0) {
+ case BUS_CLOSING:
*timeout_usec = 0;
return 1;
- }
- c = prioq_peek(bus->reply_callbacks_prioq);
- if (!c) {
+ case BUS_WATCH_BIND:
+ case BUS_OPENING:
*timeout_usec = (uint64_t) -1;
return 0;
- }
- if (c->timeout == 0) {
- *timeout_usec = (uint64_t) -1;
- return 0;
+ default:
+ assert_not_reached("Unknown or unexpected stat");
}
-
- *timeout_usec = c->timeout;
- return 1;
}
static int process_timeout(sd_bus *bus) {
@@ -2129,8 +2196,8 @@ static int process_timeout(sd_bus *bus) {
sd_bus_slot_unref(slot);
- /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and
- * ignore the callback handler's return value. */
+ /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log
+ * and ignore the callback handler's return value. */
if (is_hello)
return r;
@@ -2234,8 +2301,8 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) {
sd_bus_slot_unref(slot);
- /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log
- * and ignore the callback handler's return value. */
+ /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and
+ * ignore the callback handler's return value. */
if (is_hello)
return r;
@@ -2671,48 +2738,44 @@ static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priorit
case BUS_CLOSED:
return -ECONNRESET;
+ case BUS_WATCH_BIND:
+ r = bus_socket_process_watch_bind(bus);
+ break;
+
case BUS_OPENING:
r = bus_socket_process_opening(bus);
- if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
- bus_enter_closing(bus);
- r = 1;
- } else if (r < 0)
- return r;
- if (ret)
- *ret = NULL;
- return r;
+ break;
case BUS_AUTHENTICATING:
r = bus_socket_process_authenticating(bus);
- if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
- bus_enter_closing(bus);
- r = 1;
- } else if (r < 0)
- return r;
-
- if (ret)
- *ret = NULL;
-
- return r;
+ break;
case BUS_RUNNING:
case BUS_HELLO:
r = process_running(bus, hint_priority, priority, ret);
- if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
- bus_enter_closing(bus);
- r = 1;
+ if (r >= 0)
+ return r;
- if (ret)
- *ret = NULL;
- }
-
- return r;
+ /* This branch initializes *ret, hence we don't use the generic error checking below */
+ break;
case BUS_CLOSING:
return process_closing(bus, ret);
+
+ default:
+ assert_not_reached("Unknown state");
}
- assert_not_reached("Unknown state");
+ if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
+ bus_enter_closing(bus);
+ r = 1;
+ } else if (r < 0)
+ return r;
+
+ if (ret)
+ *ret = NULL;
+
+ return r;
}
_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
@@ -2725,7 +2788,7 @@ _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_messa
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
struct pollfd p[2] = {};
- int r, e, n;
+ int r, n;
struct timespec ts;
usec_t m = USEC_INFINITY;
@@ -2737,45 +2800,52 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
if (!BUS_IS_OPEN(bus->state))
return -ENOTCONN;
- e = sd_bus_get_events(bus);
- if (e < 0)
- return e;
-
- if (need_more)
- /* The caller really needs some more data, he doesn't
- * care about what's already read, or any timeouts
- * except its own. */
- e |= POLLIN;
- else {
- usec_t until;
- /* The caller wants to process if there's something to
- * process, but doesn't care otherwise */
-
- r = sd_bus_get_timeout(bus, &until);
- if (r < 0)
- return r;
- if (r > 0) {
- usec_t nw;
- nw = now(CLOCK_MONOTONIC);
- m = until > nw ? until - nw : 0;
- }
- }
-
- if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
- m = timeout_usec;
+ if (bus->state == BUS_WATCH_BIND) {
+ assert(bus->inotify_fd >= 0);
- p[0].fd = bus->input_fd;
- if (bus->output_fd == bus->input_fd) {
- p[0].events = e;
+ p[0].events = POLLIN;
+ p[0].fd = bus->inotify_fd;
n = 1;
} else {
- p[0].events = e & POLLIN;
- p[1].fd = bus->output_fd;
- p[1].events = e & POLLOUT;
- n = 2;
+ int e;
+
+ e = sd_bus_get_events(bus);
+ if (e < 0)
+ return e;
+
+ if (need_more)
+ /* The caller really needs some more data, he doesn't
+ * care about what's already read, or any timeouts
+ * except its own. */
+ e |= POLLIN;
+ else {
+ usec_t until;
+ /* The caller wants to process if there's something to
+ * process, but doesn't care otherwise */
+
+ r = sd_bus_get_timeout(bus, &until);
+ if (r < 0)
+ return r;
+ if (r > 0)
+ m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC));
+ }
+
+ p[0].fd = bus->input_fd;
+ if (bus->output_fd == bus->input_fd) {
+ p[0].events = e;
+ n = 1;
+ } else {
+ p[0].events = e & POLLIN;
+ p[1].fd = bus->output_fd;
+ p[1].events = e & POLLOUT;
+ n = 2;
+ }
}
- r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
+ if (timeout_usec != (uint64_t) -1 && (m == USEC_INFINITY || timeout_usec < m))
+ m = timeout_usec;
+
+ r = ppoll(p, n, m == USEC_INFINITY ? NULL : timespec_store(&ts, m), NULL);
if (r < 0)
return -errno;
@@ -2811,6 +2881,10 @@ _public_ int sd_bus_flush(sd_bus *bus) {
if (!BUS_IS_OPEN(bus->state))
return -ENOTCONN;
+ /* We never were connected? Don't hang in inotify for good, as there's no timeout set for it */
+ if (bus->state == BUS_WATCH_BIND)
+ return -EUNATCH;
+
r = bus_ensure_running(bus);
if (r < 0)
return r;
@@ -2983,6 +3057,8 @@ static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userd
assert(bus);
+ /* Note that this is called both on input_fd, output_fd as well as inotify_fd events */
+
r = sd_bus_process(bus, NULL);
if (r < 0) {
log_debug_errno(r, "Processing of bus failed, closing down: %m");
@@ -3070,7 +3146,7 @@ static int quit_callback(sd_event_source *event, void *userdata) {
return 1;
}
-static int attach_io_events(sd_bus *bus) {
+int bus_attach_io_events(sd_bus *bus) {
int r;
assert(bus);
@@ -3124,7 +3200,7 @@ static int attach_io_events(sd_bus *bus) {
return 0;
}
-static void detach_io_events(sd_bus *bus) {
+static void bus_detach_io_events(sd_bus *bus) {
assert(bus);
if (bus->input_io_event_source) {
@@ -3138,6 +3214,44 @@ static void detach_io_events(sd_bus *bus) {
}
}
+int bus_attach_inotify_event(sd_bus *bus) {
+ int r;
+
+ assert(bus);
+
+ if (bus->inotify_fd < 0)
+ return 0;
+
+ if (!bus->event)
+ return 0;
+
+ if (!bus->inotify_event_source) {
+ r = sd_event_add_io(bus->event, &bus->inotify_event_source, bus->inotify_fd, EPOLLIN, io_callback, bus);
+ if (r < 0)
+ return r;
+
+ r = sd_event_source_set_priority(bus->inotify_event_source, bus->event_priority);
+ if (r < 0)
+ return r;
+
+ r = sd_event_source_set_description(bus->inotify_event_source, "bus-inotify");
+ } else
+ r = sd_event_source_set_io_fd(bus->inotify_event_source, bus->inotify_fd);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+static void bus_detach_inotify_event(sd_bus *bus) {
+ assert(bus);
+
+ if (bus->inotify_event_source) {
+ sd_event_source_set_enabled(bus->inotify_event_source, SD_EVENT_OFF);
+ bus->inotify_event_source = sd_event_source_unref(bus->inotify_event_source);
+ }
+}
+
_public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
int r;
@@ -3178,7 +3292,11 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
if (r < 0)
goto fail;
- r = attach_io_events(bus);
+ r = bus_attach_io_events(bus);
+ if (r < 0)
+ goto fail;
+
+ r = bus_attach_inotify_event(bus);
if (r < 0)
goto fail;
@@ -3195,7 +3313,8 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
if (!bus->event)
return 0;
- detach_io_events(bus);
+ bus_detach_io_events(bus);
+ bus_detach_inotify_event(bus);
if (bus->time_event_source) {
sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
diff --git a/src/libelogind/sd-bus/test-bus-watch-bind.c b/src/libelogind/sd-bus/test-bus-watch-bind.c
new file mode 100644
index 000000000..38c80fde6
--- /dev/null
+++ b/src/libelogind/sd-bus/test-bus-watch-bind.c
@@ -0,0 +1,239 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+/***
+ This file is part of systemd.
+
+ Copyright 2017 Lennart Poettering
+
+ systemd is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 2.1 of the License, or
+ (at your option) any later version.
+
+ systemd is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+//#include <pthread.h>
+
+//#include "sd-bus.h"
+//#include "sd-event.h"
+//#include "sd-id128.h"
+
+//#include "alloc-util.h"
+//#include "fd-util.h"
+//#include "fileio.h"
+//#include "fs-util.h"
+//#include "mkdir.h"
+//#include "path-util.h"
+//#include "random-util.h"
+//#include "rm-rf.h"
+//#include "socket-util.h"
+//#include "string-util.h"
+
+static int method_foobar(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+ log_info("Got Foobar() call.");
+
+ assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
+ return sd_bus_reply_method_return(m, NULL);
+}
+
+static int method_exit(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+ log_info("Got Exit() call");
+ assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 1) >= 0);
+ return sd_bus_reply_method_return(m, NULL);
+}
+
+static const sd_bus_vtable vtable[] = {
+ SD_BUS_VTABLE_START(0),
+ SD_BUS_METHOD("Foobar", NULL, NULL, method_foobar, SD_BUS_VTABLE_UNPRIVILEGED),
+ SD_BUS_METHOD("Exit", NULL, NULL, method_exit, SD_BUS_VTABLE_UNPRIVILEGED),
+ SD_BUS_VTABLE_END,
+};
+
+static void* thread_server(void *p) {
+ _cleanup_free_ char *suffixed = NULL, *suffixed2 = NULL, *d = NULL;
+ _cleanup_close_ int fd = -1;
+ union sockaddr_union u = {
+ .un.sun_family = AF_UNIX,
+ };
+ const char *path = p;
+
+ log_debug("Initializing server");
+
+ /* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */
+ (void) usleep(100 * USEC_PER_MSEC);
+
+ assert_se(mkdir_parents(path, 0755) >= 0);
+ (void) usleep(100 * USEC_PER_MSEC);
+
+ d = dirname_malloc(path);
+ assert_se(d);
+ assert_se(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()) >= 0);
+ assert_se(rename(d, suffixed) >= 0);
+ (void) usleep(100 * USEC_PER_MSEC);
+
+ assert_se(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()) >= 0);
+ assert_se(symlink(suffixed2, d) >= 0);
+ (void) usleep(100 * USEC_PER_MSEC);
+
+ assert_se(symlink(basename(suffixed), suffixed2) >= 0);
+ (void) usleep(100 * USEC_PER_MSEC);
+
+ strncpy(u.un.sun_path, path, sizeof(u.un.sun_path));
+
+ fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+ assert_se(fd >= 0);
+
+ assert_se(bind(fd, &u.sa, SOCKADDR_UN_LEN(u.un)) >= 0);
+ usleep(100 * USEC_PER_MSEC);
+
+ assert_se(listen(fd, SOMAXCONN) >= 0);
+ usleep(100 * USEC_PER_MSEC);
+
+ assert_se(touch(path) >= 0);
+ usleep(100 * USEC_PER_MSEC);
+
+ log_debug("Initialized server");
+
+ for (;;) {
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+ _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+ sd_id128_t id;
+ int bus_fd, code;
+
+ assert_se(sd_id128_randomize(&id) >= 0);
+
+ assert_se(sd_event_new(&event) >= 0);
+
+ bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
+ assert_se(bus_fd >= 0);
+
+ log_debug("Accepted server connection");
+
+ assert_se(sd_bus_new(&bus) >= 0);
+ assert_se(sd_bus_set_description(bus, "server") >= 0);
+ assert_se(sd_bus_set_fd(bus, bus_fd, bus_fd) >= 0);
+ assert_se(sd_bus_set_server(bus, true, id) >= 0);
+ /* assert_se(sd_bus_set_anonymous(bus, true) >= 0); */
+
+ assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
+
+ assert_se(sd_bus_add_object_vtable(bus, NULL, "/foo", "foo.TestInterface", vtable, NULL) >= 0);
+
+ assert_se(sd_bus_start(bus) >= 0);
+
+ assert_se(sd_event_loop(event) >= 0);
+
+ assert_se(sd_event_get_exit_code(event, &code) >= 0);
+
+ if (code > 0)
+ break;
+ }
+
+ log_debug("Server done");
+
+ return NULL;
+}
+
+static void* thread_client1(void *p) {
+ _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+ const char *path = p, *t;
+ int r;
+
+ log_debug("Initializing client1");
+
+ assert_se(sd_bus_new(&bus) >= 0);
+ assert_se(sd_bus_set_description(bus, "client1") >= 0);
+
+ t = strjoina("unix:path=", path);
+ assert_se(sd_bus_set_address(bus, t) >= 0);
+ assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
+ assert_se(sd_bus_start(bus) >= 0);
+
+ r = sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", &error, NULL, NULL);
+ assert_se(r >= 0);
+
+ log_debug("Client1 done");
+
+ return NULL;
+}
+
+static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+ assert_se(sd_bus_message_is_method_error(m, NULL) == 0);
+ assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
+ return 0;
+}
+
+static void* thread_client2(void *p) {
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+ _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+ const char *path = p, *t;
+
+ log_debug("Initializing client2");
+
+ assert_se(sd_event_new(&event) >= 0);
+ assert_se(sd_bus_new(&bus) >= 0);
+ assert_se(sd_bus_set_description(bus, "client2") >= 0);
+
+ t = strjoina("unix:path=", path);
+ assert_se(sd_bus_set_address(bus, t) >= 0);
+ assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
+ assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
+ assert_se(sd_bus_start(bus) >= 0);
+
+ assert_se(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL) >= 0);
+
+ assert_se(sd_event_loop(event) >= 0);
+
+ log_debug("Client2 done");
+
+ return NULL;
+}
+
+static void request_exit(const char *path) {
+ _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+ const char *t;
+
+ assert_se(sd_bus_new(&bus) >= 0);
+
+ t = strjoina("unix:path=", path);
+ assert_se(sd_bus_set_address(bus, t) >= 0);
+ assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
+ assert_se(sd_bus_set_description(bus, "request-exit") >= 0);
+ assert_se(sd_bus_start(bus) >= 0);
+
+ assert_se(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL) >= 0);
+}
+
+int main(int argc, char *argv[]) {
+ _cleanup_(rm_rf_physical_and_freep) char *d = NULL;
+ pthread_t server, client1, client2;
+ char *path;
+
+ log_set_max_level(LOG_DEBUG);
+
+ /* We use /dev/shm here rather than /tmp, since some weird distros might set up /tmp as some weird fs that
+ * doesn't support inotify properly. */
+ assert_se(mkdtemp_malloc("/dev/shm/elogind-watch-bind-XXXXXX", &d) >= 0);
+
+ path = strjoina(d, "/this/is/a/socket");
+
+ assert_se(pthread_create(&server, NULL, thread_server, path) == 0);
+ assert_se(pthread_create(&client1, NULL, thread_client1, path) == 0);
+ assert_se(pthread_create(&client2, NULL, thread_client2, path) == 0);
+
+ assert_se(pthread_join(client1, NULL) == 0);
+ assert_se(pthread_join(client2, NULL) == 0);
+
+ request_exit(path);
+
+ assert_se(pthread_join(server, NULL) == 0);
+
+ return 0;
+}
diff --git a/src/systemd/sd-bus.h b/src/systemd/sd-bus.h
index c13c9b7d3..62f10b632 100644
--- a/src/systemd/sd-bus.h
+++ b/src/systemd/sd-bus.h
@@ -150,8 +150,10 @@ int sd_bus_set_allow_interactive_authorization(sd_bus *bus, int b);
int sd_bus_get_allow_interactive_authorization(sd_bus *bus);
int sd_bus_set_exit_on_disconnect(sd_bus *bus, int b);
int sd_bus_get_exit_on_disconnect(sd_bus *bus);
+int sd_bus_set_watch_bind(sd_bus *bus, int b);
+int sd_bus_get_watch_bind(sd_bus *bus);
-int sd_bus_start(sd_bus *ret);
+int sd_bus_start(sd_bus *bus);
int sd_bus_try_close(sd_bus *bus);
void sd_bus_close(sd_bus *bus);
diff --git a/src/test/meson.build b/src/test/meson.build
index ac8467ee6..72fb33d9a 100644
--- a/src/test/meson.build
+++ b/src/test/meson.build
@@ -821,7 +821,11 @@ tests += [
[threads]],
#if 0 /// UNNEEDED in elogind
-# [['src/libelogind/sd-bus/test-bus-chat.c'],
+# [['src/libsystemd/sd-bus/test-bus-watch-bind.c'],
+# [],
+# [threads], '', 'timeout=120'],
+#
+# [['src/libsystemd/sd-bus/test-bus-chat.c'],
# [],
# [threads]],
#