diff options
author | Lennart Poettering <lennart@poettering.net> | 2017-12-15 22:24:52 +0100 |
---|---|---|
committer | Sven Eden <yamakuzure@gmx.net> | 2018-05-30 07:49:55 +0200 |
commit | 004bb7654fb0ca644e70ae22221adfecdcb4a8ed (patch) | |
tree | 7163c295455c55b82b02e6d1c372a8f3423f9a18 /src/libelogind/sd-bus/sd-bus.c | |
parent | 5295adaf6b864f44b0f6ad8ec5aeda540d9c236e (diff) |
sd-bus: optionally, use inotify to wait for bus sockets to appear
This adds a "watch-bind" feature to sd-bus connections. If set and the
AF_UNIX socket we are connecting to doesn't exist yet, we'll establish
an inotify watch instead, and wait for the socket to appear. In other
words, a missing AF_UNIX just makes connecting slower.
This is useful for daemons such as networkd or resolved that shall be
able to run during early-boot, before dbus-daemon is up, and want to
connect to dbus-daemon as soon as it becomes ready.
Diffstat (limited to 'src/libelogind/sd-bus/sd-bus.c')
-rw-r--r-- | src/libelogind/sd-bus/sd-bus.c | 339 |
1 files changed, 229 insertions, 110 deletions
diff --git a/src/libelogind/sd-bus/sd-bus.c b/src/libelogind/sd-bus/sd-bus.c index c71b0fa22..380881972 100644 --- a/src/libelogind/sd-bus/sd-bus.c +++ b/src/libelogind/sd-bus/sd-bus.c @@ -75,8 +75,8 @@ } while (false) static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec); -static int attach_io_events(sd_bus *b); -static void detach_io_events(sd_bus *b); +static void bus_detach_io_events(sd_bus *b); +static void bus_detach_inotify_event(sd_bus *b); static thread_local sd_bus *default_system_bus = NULL; #if 0 /// UNNEEDED by elogind @@ -84,16 +84,26 @@ static thread_local sd_bus *default_user_bus = NULL; #endif // 0 static thread_local sd_bus *default_starter_bus = NULL; -static void bus_close_fds(sd_bus *b) { +void bus_close_io_fds(sd_bus *b) { assert(b); - detach_io_events(b); + bus_detach_io_events(b); if (b->input_fd != b->output_fd) safe_close(b->output_fd); b->output_fd = b->input_fd = safe_close(b->input_fd); } +void bus_close_inotify_fd(sd_bus *b) { + assert(b); + + bus_detach_inotify_event(b); + + b->inotify_fd = safe_close(b->inotify_fd); + b->inotify_watches = mfree(b->inotify_watches); + b->n_inotify_watches = 0; +} + static void bus_reset_queues(sd_bus *b) { assert(b); @@ -137,7 +147,8 @@ static void bus_free(sd_bus *b) { if (b->default_bus_ptr) *b->default_bus_ptr = NULL; - bus_close_fds(b); + bus_close_io_fds(b); + bus_close_inotify_fd(b); free(b->label); free(b->groups); @@ -187,6 +198,7 @@ _public_ int sd_bus_new(sd_bus **ret) { r->n_ref = REFCNT_INIT; r->input_fd = r->output_fd = -1; + r->inotify_fd = -1; r->message_version = 1; r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME; r->hello_flags |= KDBUS_HELLO_ACCEPT_FD; @@ -384,6 +396,22 @@ _public_ int sd_bus_get_allow_interactive_authorization(sd_bus *bus) { return bus->allow_interactive_authorization; } +_public_ int sd_bus_set_watch_bind(sd_bus *bus, int b) { + assert_return(bus, -EINVAL); + assert_return(bus->state == BUS_UNSET, -EPERM); + assert_return(!bus_pid_changed(bus), -ECHILD); + + bus->watch_bind = b; + return 0; +} + +_public_ int sd_bus_get_watch_bind(sd_bus *bus) { + assert_return(bus, -EINVAL); + assert_return(!bus_pid_changed(bus), -ECHILD); + + return bus->watch_bind; +} + static int hello_callback(sd_bus_message *reply, void *userdata, sd_bus_error *error) { const char *s; sd_bus *bus; @@ -906,7 +934,8 @@ static int bus_start_address(sd_bus *b) { assert(b); for (;;) { - bus_close_fds(b); + bus_close_io_fds(b); + bus_close_inotify_fd(b); /* If you provide multiple different bus-addresses, we * try all of them in order and use the first one that @@ -914,20 +943,25 @@ static int bus_start_address(sd_bus *b) { if (b->exec_path) r = bus_socket_exec(b); - else if ((b->nspid > 0 || b->machine) && b->sockaddr.sa.sa_family != AF_UNSPEC) r = bus_container_connect_socket(b); - else if (b->sockaddr.sa.sa_family != AF_UNSPEC) r = bus_socket_connect(b); - else goto next; if (r >= 0) { - r = attach_io_events(b); - if (r >= 0) - return r; + int q; + + q = bus_attach_io_events(b); + if (q < 0) + return q; + + q = bus_attach_inotify_event(b); + if (q < 0) + return q; + + return r; } b->last_connect_error = -r; @@ -1320,7 +1354,8 @@ _public_ void sd_bus_close(sd_bus *bus) { * the bus object and the bus may be freed */ bus_reset_queues(bus); - bus_close_fds(bus); + bus_close_io_fds(bus); + bus_close_inotify_fd(bus); } _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) { @@ -1337,7 +1372,7 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) { static void bus_enter_closing(sd_bus *bus) { assert(bus); - if (!IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING)) + if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING)) return; bus->state = BUS_CLOSING; @@ -1988,7 +2023,16 @@ _public_ int sd_bus_get_fd(sd_bus *bus) { assert_return(bus->input_fd == bus->output_fd, -EPERM); assert_return(!bus_pid_changed(bus), -ECHILD); - return bus->input_fd; + if (bus->state == BUS_CLOSED) + return -ENOTCONN; + + if (bus->inotify_fd >= 0) + return bus->inotify_fd; + + if (bus->input_fd >= 0) + return bus->input_fd; + + return -ENOTCONN; } _public_ int sd_bus_get_events(sd_bus *bus) { @@ -1997,23 +2041,40 @@ _public_ int sd_bus_get_events(sd_bus *bus) { assert_return(bus, -EINVAL); assert_return(!bus_pid_changed(bus), -ECHILD); - if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING) + switch (bus->state) { + + case BUS_UNSET: + case BUS_CLOSED: return -ENOTCONN; - if (bus->state == BUS_OPENING) + case BUS_WATCH_BIND: + flags |= POLLIN; + break; + + case BUS_OPENING: flags |= POLLOUT; - else if (bus->state == BUS_AUTHENTICATING) { + break; + case BUS_AUTHENTICATING: if (bus_socket_auth_needs_write(bus)) flags |= POLLOUT; flags |= POLLIN; + break; - } else if (IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) { + case BUS_RUNNING: + case BUS_HELLO: if (bus->rqueue_size <= 0) flags |= POLLIN; if (bus->wqueue_size > 0) flags |= POLLOUT; + break; + + case BUS_CLOSING: + break; + + default: + assert_not_reached("Unknown state"); } return flags; @@ -2034,39 +2095,45 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) { return 1; } - if (bus->state == BUS_CLOSING) { - *timeout_usec = 0; - return 1; - } + switch (bus->state) { - if (bus->state == BUS_AUTHENTICATING) { + case BUS_AUTHENTICATING: *timeout_usec = bus->auth_timeout; return 1; - } - if (!IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) { - *timeout_usec = (uint64_t) -1; - return 0; - } + case BUS_RUNNING: + case BUS_HELLO: + if (bus->rqueue_size > 0) { + *timeout_usec = 0; + return 1; + } + + c = prioq_peek(bus->reply_callbacks_prioq); + if (!c) { + *timeout_usec = (uint64_t) -1; + return 0; + } + + if (c->timeout == 0) { + *timeout_usec = (uint64_t) -1; + return 0; + } + + *timeout_usec = c->timeout; + return 1; - if (bus->rqueue_size > 0) { + case BUS_CLOSING: *timeout_usec = 0; return 1; - } - c = prioq_peek(bus->reply_callbacks_prioq); - if (!c) { + case BUS_WATCH_BIND: + case BUS_OPENING: *timeout_usec = (uint64_t) -1; return 0; - } - if (c->timeout == 0) { - *timeout_usec = (uint64_t) -1; - return 0; + default: + assert_not_reached("Unknown or unexpected stat"); } - - *timeout_usec = c->timeout; - return 1; } static int process_timeout(sd_bus *bus) { @@ -2129,8 +2196,8 @@ static int process_timeout(sd_bus *bus) { sd_bus_slot_unref(slot); - /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and - * ignore the callback handler's return value. */ + /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log + * and ignore the callback handler's return value. */ if (is_hello) return r; @@ -2234,8 +2301,8 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) { sd_bus_slot_unref(slot); - /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log - * and ignore the callback handler's return value. */ + /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and + * ignore the callback handler's return value. */ if (is_hello) return r; @@ -2671,48 +2738,44 @@ static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priorit case BUS_CLOSED: return -ECONNRESET; + case BUS_WATCH_BIND: + r = bus_socket_process_watch_bind(bus); + break; + case BUS_OPENING: r = bus_socket_process_opening(bus); - if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { - bus_enter_closing(bus); - r = 1; - } else if (r < 0) - return r; - if (ret) - *ret = NULL; - return r; + break; case BUS_AUTHENTICATING: r = bus_socket_process_authenticating(bus); - if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { - bus_enter_closing(bus); - r = 1; - } else if (r < 0) - return r; - - if (ret) - *ret = NULL; - - return r; + break; case BUS_RUNNING: case BUS_HELLO: r = process_running(bus, hint_priority, priority, ret); - if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { - bus_enter_closing(bus); - r = 1; + if (r >= 0) + return r; - if (ret) - *ret = NULL; - } - - return r; + /* This branch initializes *ret, hence we don't use the generic error checking below */ + break; case BUS_CLOSING: return process_closing(bus, ret); + + default: + assert_not_reached("Unknown state"); } - assert_not_reached("Unknown state"); + if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) { + bus_enter_closing(bus); + r = 1; + } else if (r < 0) + return r; + + if (ret) + *ret = NULL; + + return r; } _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { @@ -2725,7 +2788,7 @@ _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_messa static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { struct pollfd p[2] = {}; - int r, e, n; + int r, n; struct timespec ts; usec_t m = USEC_INFINITY; @@ -2737,45 +2800,52 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { if (!BUS_IS_OPEN(bus->state)) return -ENOTCONN; - e = sd_bus_get_events(bus); - if (e < 0) - return e; - - if (need_more) - /* The caller really needs some more data, he doesn't - * care about what's already read, or any timeouts - * except its own. */ - e |= POLLIN; - else { - usec_t until; - /* The caller wants to process if there's something to - * process, but doesn't care otherwise */ - - r = sd_bus_get_timeout(bus, &until); - if (r < 0) - return r; - if (r > 0) { - usec_t nw; - nw = now(CLOCK_MONOTONIC); - m = until > nw ? until - nw : 0; - } - } - - if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m)) - m = timeout_usec; + if (bus->state == BUS_WATCH_BIND) { + assert(bus->inotify_fd >= 0); - p[0].fd = bus->input_fd; - if (bus->output_fd == bus->input_fd) { - p[0].events = e; + p[0].events = POLLIN; + p[0].fd = bus->inotify_fd; n = 1; } else { - p[0].events = e & POLLIN; - p[1].fd = bus->output_fd; - p[1].events = e & POLLOUT; - n = 2; + int e; + + e = sd_bus_get_events(bus); + if (e < 0) + return e; + + if (need_more) + /* The caller really needs some more data, he doesn't + * care about what's already read, or any timeouts + * except its own. */ + e |= POLLIN; + else { + usec_t until; + /* The caller wants to process if there's something to + * process, but doesn't care otherwise */ + + r = sd_bus_get_timeout(bus, &until); + if (r < 0) + return r; + if (r > 0) + m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC)); + } + + p[0].fd = bus->input_fd; + if (bus->output_fd == bus->input_fd) { + p[0].events = e; + n = 1; + } else { + p[0].events = e & POLLIN; + p[1].fd = bus->output_fd; + p[1].events = e & POLLOUT; + n = 2; + } } - r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL); + if (timeout_usec != (uint64_t) -1 && (m == USEC_INFINITY || timeout_usec < m)) + m = timeout_usec; + + r = ppoll(p, n, m == USEC_INFINITY ? NULL : timespec_store(&ts, m), NULL); if (r < 0) return -errno; @@ -2811,6 +2881,10 @@ _public_ int sd_bus_flush(sd_bus *bus) { if (!BUS_IS_OPEN(bus->state)) return -ENOTCONN; + /* We never were connected? Don't hang in inotify for good, as there's no timeout set for it */ + if (bus->state == BUS_WATCH_BIND) + return -EUNATCH; + r = bus_ensure_running(bus); if (r < 0) return r; @@ -2983,6 +3057,8 @@ static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userd assert(bus); + /* Note that this is called both on input_fd, output_fd as well as inotify_fd events */ + r = sd_bus_process(bus, NULL); if (r < 0) { log_debug_errno(r, "Processing of bus failed, closing down: %m"); @@ -3070,7 +3146,7 @@ static int quit_callback(sd_event_source *event, void *userdata) { return 1; } -static int attach_io_events(sd_bus *bus) { +int bus_attach_io_events(sd_bus *bus) { int r; assert(bus); @@ -3124,7 +3200,7 @@ static int attach_io_events(sd_bus *bus) { return 0; } -static void detach_io_events(sd_bus *bus) { +static void bus_detach_io_events(sd_bus *bus) { assert(bus); if (bus->input_io_event_source) { @@ -3138,6 +3214,44 @@ static void detach_io_events(sd_bus *bus) { } } +int bus_attach_inotify_event(sd_bus *bus) { + int r; + + assert(bus); + + if (bus->inotify_fd < 0) + return 0; + + if (!bus->event) + return 0; + + if (!bus->inotify_event_source) { + r = sd_event_add_io(bus->event, &bus->inotify_event_source, bus->inotify_fd, EPOLLIN, io_callback, bus); + if (r < 0) + return r; + + r = sd_event_source_set_priority(bus->inotify_event_source, bus->event_priority); + if (r < 0) + return r; + + r = sd_event_source_set_description(bus->inotify_event_source, "bus-inotify"); + } else + r = sd_event_source_set_io_fd(bus->inotify_event_source, bus->inotify_fd); + if (r < 0) + return r; + + return 0; +} + +static void bus_detach_inotify_event(sd_bus *bus) { + assert(bus); + + if (bus->inotify_event_source) { + sd_event_source_set_enabled(bus->inotify_event_source, SD_EVENT_OFF); + bus->inotify_event_source = sd_event_source_unref(bus->inotify_event_source); + } +} + _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) { int r; @@ -3178,7 +3292,11 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) { if (r < 0) goto fail; - r = attach_io_events(bus); + r = bus_attach_io_events(bus); + if (r < 0) + goto fail; + + r = bus_attach_inotify_event(bus); if (r < 0) goto fail; @@ -3195,7 +3313,8 @@ _public_ int sd_bus_detach_event(sd_bus *bus) { if (!bus->event) return 0; - detach_io_events(bus); + bus_detach_io_events(bus); + bus_detach_inotify_event(bus); if (bus->time_event_source) { sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF); |