summaryrefslogtreecommitdiff
path: root/src/libsystemd-bus/sd-event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libsystemd-bus/sd-event.c')
-rw-r--r--src/libsystemd-bus/sd-event.c338
1 files changed, 267 insertions, 71 deletions
diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c
index de96fde8e..511b271d4 100644
--- a/src/libsystemd-bus/sd-event.c
+++ b/src/libsystemd-bus/sd-event.c
@@ -29,10 +29,12 @@
#include "hashmap.h"
#include "util.h"
#include "time-util.h"
+#include "sd-id128.h"
#include "sd-event.h"
#define EPOLL_QUEUE_MAX 64
+#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
typedef enum EventSourceType {
SOURCE_IO,
@@ -70,8 +72,9 @@ struct sd_event_source {
} io;
struct {
sd_time_handler_t callback;
- usec_t next;
- unsigned prioq_index;
+ usec_t next, accuracy;
+ unsigned earliest_index;
+ unsigned latest_index;
} time;
struct {
sd_signal_handler_t callback;
@@ -100,8 +103,17 @@ struct sd_event {
Prioq *pending;
Prioq *prepare;
- Prioq *monotonic;
- Prioq *realtime;
+
+ /* For both clocks we maintain two priority queues each, one
+ * ordered for the earliest times the events may be
+ * dispatched, and one ordered by the latest times they must
+ * have been dispatched. The range between the top entries in
+ * the two prioqs is the time window we can freely schedule
+ * wakeups in */
+ Prioq *monotonic_earliest;
+ Prioq *monotonic_latest;
+ Prioq *realtime_earliest;
+ Prioq *realtime_latest;
sigset_t sigset;
sd_event_source **signal_sources;
@@ -110,11 +122,13 @@ struct sd_event {
unsigned n_unmuted_child_sources;
unsigned iteration;
- unsigned processed_children;
usec_t realtime_next, monotonic_next;
+ usec_t perturb;
+
bool quit;
+ bool need_process_child;
};
static int pending_prioq_compare(const void *a, const void *b) {
@@ -185,7 +199,7 @@ static int prepare_prioq_compare(const void *a, const void *b) {
return 0;
}
-static int time_prioq_compare(const void *a, const void *b) {
+static int earliest_time_prioq_compare(const void *a, const void *b) {
const sd_event_source *x = a, *y = b;
assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
@@ -218,6 +232,39 @@ static int time_prioq_compare(const void *a, const void *b) {
return 0;
}
+static int latest_time_prioq_compare(const void *a, const void *b) {
+ const sd_event_source *x = a, *y = b;
+
+ assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
+ assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
+
+ /* Unmuted ones first */
+ if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
+ return -1;
+ if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
+ return 1;
+
+ /* Move the pending ones to the end */
+ if (!x->pending && y->pending)
+ return -1;
+ if (x->pending && !y->pending)
+ return 1;
+
+ /* Order by time */
+ if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
+ return -1;
+ if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
+ return -1;
+
+ /* Stability for the rest */
+ if (x < y)
+ return -1;
+ if (x > y)
+ return 1;
+
+ return 0;
+}
+
static void event_free(sd_event *e) {
assert(e);
@@ -235,8 +282,10 @@ static void event_free(sd_event *e) {
prioq_free(e->pending);
prioq_free(e->prepare);
- prioq_free(e->monotonic);
- prioq_free(e->realtime);
+ prioq_free(e->monotonic_earliest);
+ prioq_free(e->monotonic_latest);
+ prioq_free(e->realtime_earliest);
+ prioq_free(e->realtime_latest);
free(e->signal_sources);
@@ -357,11 +406,13 @@ static void source_free(sd_event_source *s) {
break;
case SOURCE_MONOTONIC:
- prioq_remove(s->event->monotonic, s, &s->time.prioq_index);
+ prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
+ prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
break;
case SOURCE_REALTIME:
- prioq_remove(s->event->realtime, s, &s->time.prioq_index);
+ prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
+ prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
break;
case SOURCE_SIGNAL:
@@ -494,6 +545,7 @@ static int event_setup_timer_fd(
struct epoll_event ev = {};
int r, fd;
+ sd_id128_t bootid;
assert(e);
assert(timer_fd);
@@ -514,6 +566,17 @@ static int event_setup_timer_fd(
return -errno;
}
+ /* When we sleep for longer, we try to realign the wakeup to
+ the same time wihtin each second, so that events all across
+ the system can be coalesced into a single CPU
+ wakeup. However, let's take some system-specific randomness
+ for this value, so that in a network of systems with synced
+ clocks timer events are distributed a bit. Here, we
+ calculate a perturbation usec offset from the boot ID. */
+
+ if (sd_id128_get_boot(&bootid) >= 0)
+ e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
+
*timer_fd = fd;
return 0;
}
@@ -523,8 +586,10 @@ static int event_add_time_internal(
EventSourceType type,
int *timer_fd,
clockid_t id,
- Prioq **prioq,
+ Prioq **earliest,
+ Prioq **latest,
uint64_t usec,
+ uint64_t accuracy,
sd_time_handler_t callback,
void *userdata,
sd_event_source **ret) {
@@ -538,13 +603,24 @@ static int event_add_time_internal(
return -EINVAL;
if (!ret)
return -EINVAL;
+ if (usec == (uint64_t) -1)
+ return -EINVAL;
+ if (accuracy == (uint64_t) -1)
+ return -EINVAL;
assert(timer_fd);
- assert(prioq);
+ assert(earliest);
+ assert(latest);
+
+ if (!*earliest) {
+ *earliest = prioq_new(earliest_time_prioq_compare);
+ if (!*earliest)
+ return -ENOMEM;
+ }
- if (!*prioq) {
- *prioq = prioq_new(time_prioq_compare);
- if (!*prioq)
+ if (!*latest) {
+ *latest = prioq_new(latest_time_prioq_compare);
+ if (!*latest)
return -ENOMEM;
}
@@ -559,26 +635,34 @@ static int event_add_time_internal(
return -ENOMEM;
s->time.next = usec;
+ s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
s->time.callback = callback;
- s->time.prioq_index = PRIOQ_IDX_NULL;
+ s->time.earliest_index = PRIOQ_IDX_NULL;
+ s->time.latest_index = PRIOQ_IDX_NULL;
s->userdata = userdata;
- r = prioq_put(*prioq, s, &s->time.prioq_index);
- if (r < 0) {
- source_free(s);
- return r;
- }
+ r = prioq_put(*earliest, s, &s->time.earliest_index);
+ if (r < 0)
+ goto fail;
+
+ r = prioq_put(*latest, s, &s->time.latest_index);
+ if (r < 0)
+ goto fail;
*ret = s;
return 0;
+
+fail:
+ source_free(s);
+ return r;
}
-int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
- return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
+int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
+ return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
}
-int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
- return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
+int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
+ return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
}
static int event_update_signal_fd(sd_event *e) {
@@ -707,6 +791,8 @@ int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t c
}
}
+ e->need_process_child = true;
+
*ret = s;
return 0;
}
@@ -848,10 +934,10 @@ int sd_event_source_set_priority(sd_event_source *s, int priority) {
s->priority = priority;
if (s->pending)
- assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0);
+ prioq_reshuffle(s->event->pending, s, &s->pending_index);
if (s->prepare)
- assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0);
+ prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
return 0;
}
@@ -891,12 +977,14 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
case SOURCE_MONOTONIC:
s->mute = m;
- prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+ prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
break;
case SOURCE_REALTIME:
s->mute = m;
- prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+ prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
break;
case SOURCE_SIGNAL:
@@ -939,12 +1027,14 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
case SOURCE_MONOTONIC:
s->mute = m;
- prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+ prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
break;
case SOURCE_REALTIME:
s->mute = m;
- prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+ prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
break;
case SOURCE_SIGNAL:
@@ -999,6 +1089,8 @@ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
if (!s)
return -EINVAL;
+ if (usec == (uint64_t) -1)
+ return -EINVAL;
if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
return -EDOM;
@@ -1007,10 +1099,13 @@ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
s->time.next = usec;
- if (s->type == SOURCE_REALTIME)
- prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
- else
- prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+ if (s->type == SOURCE_REALTIME) {
+ prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
+ } else {
+ prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
+ }
return 0;
}
@@ -1052,40 +1147,99 @@ void* sd_event_source_get_userdata(sd_event_source *s) {
return s->userdata;
}
+static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
+ usec_t c;
+ assert(e);
+ assert(a <= b);
+
+ if (a <= 0)
+ return 0;
+
+ if (b <= a + 1)
+ return a;
+
+ /*
+ Find a good time to wake up again between times a and b. We
+ have two goals here:
+
+ a) We want to wake up as seldom as possible, hence prefer
+ later times over earlier times.
+
+ b) But if we have to wake up, then let's make sure to
+ dispatch as much as possible on the entire system.
+
+ We implement this by waking up everywhere at the same time
+ within any given second if we can, synchronised via the
+ perturbation value determined from the boot ID. If we can't,
+ then we try to find the same spot in every a 250ms
+ step. Otherwise, we pick the last possible time to wake up.
+ */
+
+ c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
+ if (c >= b) {
+ if (_unlikely_(c < USEC_PER_SEC))
+ return b;
+
+ c -= USEC_PER_SEC;
+ }
+
+ if (c >= a)
+ return c;
+
+ c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
+ if (c >= b) {
+ if (_unlikely_(c < USEC_PER_MSEC*250))
+ return b;
+
+ c -= USEC_PER_MSEC*250;
+ }
+
+ if (c >= a)
+ return c;
+
+ return b;
+}
+
static int event_arm_timer(
sd_event *e,
int timer_fd,
- Prioq *prioq,
+ Prioq *earliest,
+ Prioq *latest,
usec_t *next) {
struct itimerspec its = {};
- sd_event_source *s;
+ sd_event_source *a, *b;
+ usec_t t;
int r;
assert_se(e);
assert_se(next);
- s = prioq_peek(prioq);
- if (!s || s->mute == SD_EVENT_MUTED)
+ a = prioq_peek(earliest);
+ if (!a || a->mute == SD_EVENT_MUTED)
return 0;
- if (*next == s->time.next)
+ b = prioq_peek(latest);
+ assert_se(b && b->mute != SD_EVENT_MUTED);
+
+ t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
+ if (*next == t)
return 0;
assert_se(timer_fd >= 0);
- if (s->time.next == 0) {
+ if (t == 0) {
/* We don' want to disarm here, just mean some time looooong ago. */
its.it_value.tv_sec = 0;
its.it_value.tv_nsec = 1;
} else
- timespec_store(&its.it_value, s->time.next);
+ timespec_store(&its.it_value, t);
r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
if (r < 0)
return r;
- *next = s->time.next;
+ *next = t;
return 0;
}
@@ -1131,14 +1285,14 @@ static int flush_timer(sd_event *e, int fd, uint32_t events) {
return 0;
}
-static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
+static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
sd_event_source *s;
int r;
assert(e);
for (;;) {
- s = prioq_peek(prioq);
+ s = prioq_peek(earliest);
if (!s ||
s->time.next > n ||
s->mute == SD_EVENT_MUTED ||
@@ -1149,9 +1303,8 @@ static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
if (r < 0)
return r;
- r = prioq_reshuffle(prioq, s, &s->time.prioq_index);
- if (r < 0)
- return r;
+ prioq_reshuffle(earliest, s, &s->time.earliest_index);
+ prioq_reshuffle(latest, s, &s->time.latest_index);
}
return 0;
@@ -1164,6 +1317,8 @@ static int process_child(sd_event *e) {
assert(e);
+ e->need_process_child = false;
+
/*
So, this is ugly. We iteratively invoke waitid() with P_PID
+ WNOHANG for each PID we wait for, instead of using
@@ -1199,7 +1354,6 @@ static int process_child(sd_event *e) {
}
}
- e->processed_children = e->iteration;
return 0;
}
@@ -1323,6 +1477,19 @@ static int event_prepare(sd_event *e) {
return 0;
}
+static sd_event_source* event_next_pending(sd_event *e) {
+ sd_event_source *p;
+
+ p = prioq_peek(e->pending);
+ if (!p)
+ return NULL;
+
+ if (p->mute == SD_EVENT_MUTED)
+ return NULL;
+
+ return p;
+}
+
int sd_event_run(sd_event *e, uint64_t timeout) {
struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
sd_event_source *p;
@@ -1340,25 +1507,21 @@ int sd_event_run(sd_event *e, uint64_t timeout) {
if (r < 0)
return r;
- r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next);
- if (r < 0)
- return r;
+ if (event_next_pending(e) || e->need_process_child)
+ timeout = 0;
- r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next);
- if (r < 0)
- return r;
+ if (timeout > 0) {
+ r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
+ if (r < 0)
+ return r;
- if (e->iteration == 1 && !hashmap_isempty(e->child_sources))
- /* On the first iteration, there might be already some
- * zombies for us to care for, hence, don't wait */
- timeout = 0;
- else {
- p = prioq_peek(e->pending);
- if (p && p->mute != SD_EVENT_MUTED)
- timeout = 0;
+ r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
+ if (r < 0)
+ return r;
}
- m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
+ m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
+ timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
if (m < 0)
return m;
@@ -1379,24 +1542,22 @@ int sd_event_run(sd_event *e, uint64_t timeout) {
return r;
}
- r = process_timer(e, n.monotonic, e->monotonic);
+ r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
if (r < 0)
return r;
- r = process_timer(e, n.realtime, e->realtime);
+ r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
if (r < 0)
return r;
- if (e->iteration == 1 && e->processed_children != 1) {
- /* On the first iteration, make sure we really process
- * all children which might already be zombies. */
+ if (e->need_process_child) {
r = process_child(e);
if (r < 0)
return r;
}
- p = prioq_peek(e->pending);
- if (!p || p->mute == SD_EVENT_MUTED)
+ p = event_next_pending(e);
+ if (!p)
return 0;
return source_dispatch(p);
@@ -1438,3 +1599,38 @@ sd_event *sd_event_get(sd_event_source *s) {
return s->event;
}
+
+int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
+ if (!s)
+ return -EINVAL;
+ if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
+ return -EDOM;
+
+ if (usec == 0)
+ usec = DEFAULT_ACCURACY_USEC;
+
+ if (s->time.accuracy == usec)
+ return 0;
+
+
+ s->time.accuracy = usec;
+
+ if (s->type == SOURCE_REALTIME)
+ prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
+ else
+ prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
+
+ return 0;
+}
+
+int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
+ if (!s)
+ return -EINVAL;
+ if (!usec)
+ return -EINVAL;
+ if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
+ return -EDOM;
+
+ *usec = s->time.accuracy;
+ return 0;
+}