diff options
author | Mike Brady <mikebrady@eircom.net> | 2018-07-30 10:07:11 +0100 |
---|---|---|
committer | Mike Brady <mikebrady@eircom.net> | 2018-07-30 10:07:11 +0100 |
commit | 76c5dd92856cde63067c6a830cbd0f454444ff86 (patch) | |
tree | f671144fc8ccb9dd487a05fbb53a4b5472b797eb | |
parent | d70d7e5ec5c7782f52ba27c192deb4b82db11cad (diff) | |
parent | 4ea9de7b8bdc4c9af6c978411d9288b1eed50f25 (diff) |
Stop using SIGUSR1 for cancelling threads, use pthread_cancel and friends instead, fix some memory leaks, add accurate ouput rate calculation to statistics, add "quit" verb to MPRIS and native d-bus interfaces. Probably still buggy.
-rw-r--r-- | audio.c | 1 | ||||
-rw-r--r-- | audio.h | 4 | ||||
-rw-r--r-- | audio_alsa.c | 470 | ||||
-rw-r--r-- | audio_ao.c | 2 | ||||
-rw-r--r-- | audio_dummy.c | 4 | ||||
-rw-r--r-- | audio_pa.c | 3 | ||||
-rw-r--r-- | audio_pipe.c | 3 | ||||
-rw-r--r-- | audio_sndio.c | 5 | ||||
-rw-r--r-- | audio_soundio.c | 3 | ||||
-rw-r--r-- | audio_stdout.c | 3 | ||||
-rw-r--r-- | common.c | 57 | ||||
-rw-r--r-- | common.h | 12 | ||||
-rw-r--r-- | dacp.c | 104 | ||||
-rw-r--r-- | dacp.h | 6 | ||||
-rw-r--r-- | dbus-service.c | 48 | ||||
-rw-r--r-- | dbus-service.h | 1 | ||||
-rw-r--r-- | mdns_avahi.c | 139 | ||||
-rw-r--r-- | metadata_hub.c | 48 | ||||
-rw-r--r-- | metadata_hub.h | 3 | ||||
-rw-r--r-- | mpris-service.c | 17 | ||||
-rw-r--r-- | mqtt.c | 282 | ||||
-rw-r--r-- | mqtt.h | 11 | ||||
-rw-r--r-- | org.gnome.ShairportSync.xml | 1 | ||||
-rw-r--r-- | player.c | 868 | ||||
-rw-r--r-- | player.h | 20 | ||||
-rw-r--r-- | rtp.c | 18 | ||||
-rw-r--r-- | rtsp.c | 269 | ||||
-rw-r--r-- | rtsp.h | 6 | ||||
-rw-r--r-- | shairport.c | 267 |
29 files changed, 1562 insertions, 1113 deletions
@@ -1,6 +1,7 @@ /* * Audio driver handler. This file is part of Shairport. * Copyright (c) James Laird 2013 + * Modifications (c) Mike Brady 2014 -- 2018 * All rights reserved. * * Permission is hereby granted, free of charge, to any person @@ -22,7 +22,7 @@ typedef struct { void (*start)(int sample_rate, int sample_format); // block of samples - void (*play)(void *buf, int samples); + int (*play)(void *buf, int samples); void (*stop)(void); // may be null if not implemented @@ -33,6 +33,8 @@ typedef struct { // will change dynamically, so keep watching it. Implemented in ALSA only. // returns a negative error code if there's a problem int (*delay)(long *the_delay); // snd_pcm_sframes_t is a signed long + int (*rate_info)(uint64_t *elapsed_time, + uint64_t *frames_played); // use this to get the true rate of the DAC // may be NULL, in which case soft volume is applied void (*volume)(double vol); diff --git a/audio_alsa.c b/audio_alsa.c index e2a53cc..27995d5 100644 --- a/audio_alsa.c +++ b/audio_alsa.c @@ -30,6 +30,7 @@ #include "audio.h" #include "common.h" #include <alsa/asoundlib.h> +#include <inttypes.h> #include <math.h> #include <memory.h> #include <pthread.h> @@ -40,11 +41,12 @@ static void help(void); static int init(int argc, char **argv); static void deinit(void); static void start(int i_sample_rate, int i_sample_format); -static void play(void *buf, int samples); +static int play(void *buf, int samples); static void stop(void); static void flush(void); int delay(long *the_delay); void do_mute(int request); +int get_rate_information(uint64_t *elapsed_time, uint64_t *frames_played); static void volume(double vol); void do_volume(double vol); @@ -64,6 +66,7 @@ audio_output audio_alsa = { .flush = &flush, .delay = &delay, .play = &play, + .rate_info = &get_rate_information, .mute = NULL, // a function will be provided if it can, and is allowed to, do hardware mute .volume = NULL, // a function will be provided if it can do hardware volume .parameters = ¶meters}; @@ -110,6 +113,16 @@ int alsa_characteristics_already_listed = 0; static snd_pcm_uframes_t period_size_requested, buffer_size_requested; static int set_period_size_request, set_buffer_size_request; +static uint64_t measurement_start_time; +static uint64_t frames_played_at_measurement_start_time; + +static uint64_t measurement_time; +static uint64_t frames_played_at_measurement_time; + +static uint64_t frames_sent_for_playing; +static uint64_t frame_index; +static int measurement_data_is_valid; + static void help(void) { printf(" -d output-device set the output device [default*|...]\n" " -m mixer-device set the mixer device ['output-device'*|...]\n" @@ -121,6 +134,7 @@ static void help(void) { void set_alsa_out_dev(char *dev) { alsa_out_dev = dev; } int open_mixer() { + int response = 0; if (hardware_mixer) { debug(3, "Open Mixer"); int ret = 0; @@ -128,25 +142,38 @@ int open_mixer() { snd_mixer_selem_id_set_index(alsa_mix_sid, alsa_mix_index); snd_mixer_selem_id_set_name(alsa_mix_sid, alsa_mix_ctrl); - if ((snd_mixer_open(&alsa_mix_handle, 0)) < 0) - die("Failed to open mixer"); - debug(3, "Mixer device name is \"%s\".", alsa_mix_dev); - if ((snd_mixer_attach(alsa_mix_handle, alsa_mix_dev)) < 0) - die("Failed to attach mixer"); - if ((snd_mixer_selem_register(alsa_mix_handle, NULL, NULL)) < 0) - die("Failed to register mixer element"); - - ret = snd_mixer_load(alsa_mix_handle); - if (ret < 0) - die("Failed to load mixer element"); - debug(3, "Mixer Control name is \"%s\".", alsa_mix_ctrl); - alsa_mix_elem = snd_mixer_find_selem(alsa_mix_handle, alsa_mix_sid); - if (!alsa_mix_elem) - die("Failed to find mixer element"); - return 1; - } else { - return 0; + if ((snd_mixer_open(&alsa_mix_handle, 0)) < 0) { + debug(1, "Failed to open mixer"); + response = -1; + } else { + debug(3, "Mixer device name is \"%s\".", alsa_mix_dev); + if ((snd_mixer_attach(alsa_mix_handle, alsa_mix_dev)) < 0) { + debug(1, "Failed to attach mixer"); + response = -2; + } else { + if ((snd_mixer_selem_register(alsa_mix_handle, NULL, NULL)) < 0) { + debug(1, "Failed to register mixer element"); + response = -3; + } else { + ret = snd_mixer_load(alsa_mix_handle); + if (ret < 0) { + debug(1, "Failed to load mixer element"); + response = -4; + } else { + debug(3, "Mixer Control name is \"%s\".", alsa_mix_ctrl); + alsa_mix_elem = snd_mixer_find_selem(alsa_mix_handle, alsa_mix_sid); + if (!alsa_mix_elem) { + debug(1, "Failed to find mixer element"); + response = -5; + } else { + response = 1; // we found a hardware mixer and successfully opened it + } + } + } + } + } } + return response; } void close_mixer() { @@ -166,8 +193,8 @@ void do_snd_mixer_selem_set_playback_dB_all(snd_mixer_elem_t *mix_elem, double v } static int init(int argc, char **argv) { - debug_mutex_lock(&alsa_mutex, 1000, 1); // debug(2,"audio_alsa init called."); + int response = 0; // this will be what we return to the caller. const char *str; int value; // double dvalue; @@ -219,8 +246,9 @@ static int init(int argc, char **argv) { else if (strcasecmp(str, "yes") == 0) config.no_sync = 1; else { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid disable_synchronization option choice \"%s\". It should be \"yes\" or \"no\""); + warn("Invalid disable_synchronization option choice \"%s\". It should be \"yes\" or " + "\"no\". It is set to \"no\"."); + config.no_sync = 0; } } @@ -233,9 +261,9 @@ static int init(int argc, char **argv) { else if (strcasecmp(str, "yes") == 0) config.alsa_use_hardware_mute = 1; else { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid mute_using_playback_switch option choice \"%s\". It should be \"yes\" or " - "\"no\""); + warn("Invalid mute_using_playback_switch option choice \"%s\". It should be \"yes\" or " + "\"no\". It is set to \"no\"."); + config.alsa_use_hardware_mute = 0; } } @@ -246,9 +274,9 @@ static int init(int argc, char **argv) { else if (strcasecmp(str, "yes") == 0) config.alsa_use_hardware_mute = 1; else { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid use_hardware_mute_if_available option choice \"%s\". It should be \"yes\" or " - "\"no\""); + warn("Invalid use_hardware_mute_if_available option choice \"%s\". It should be \"yes\" or " + "\"no\". It is set to \"no\"."); + config.alsa_use_hardware_mute = 0; } } @@ -269,11 +297,11 @@ static int init(int argc, char **argv) { else if (strcasecmp(str, "S8") == 0) config.output_format = SPS_FORMAT_S8; else { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid output format \"%s\". It should be \"U8\", \"S8\", \"S16\", \"S24\", " - "\"S24_3LE\", \"S24_3BE\" or " - "\"S32\"", - str); + warn("Invalid output format \"%s\". It should be \"U8\", \"S8\", \"S16\", \"S24\", " + "\"S24_3LE\", \"S24_3BE\" or " + "\"S32\". It is set to \"S16\".", + str); + config.output_format = SPS_FORMAT_S16; } } @@ -288,8 +316,10 @@ static int init(int argc, char **argv) { config.output_rate = value; break; default: - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid output rate \"%d\". It should be a multiple of 44,100 up to 352,800", value); + warn("Invalid output rate \"%d\". It should be a multiple of 44,100 up to 352,800. It is " + "set to 44,100", + value); + config.output_rate = 44100; } } @@ -300,8 +330,9 @@ static int init(int argc, char **argv) { else if (strcasecmp(str, "yes") == 0) config.no_mmap = 0; else { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid use_mmap_if_available option choice \"%s\". It should be \"yes\" or \"no\""); + warn("Invalid use_mmap_if_available option choice \"%s\". It should be \"yes\" or \"no\". " + "It is set to \"yes\"."); + config.no_mmap = 0; } } /* Get the optional period size value */ @@ -309,10 +340,10 @@ static int init(int argc, char **argv) { set_period_size_request = 1; debug(1, "Value read for period size is %d.", value); if (value < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid alsa period size setting \"%d\". It " - "must be greater than 0.", - value); + warn("Invalid alsa period size setting \"%d\". It " + "must be greater than 0. No setting is made.", + value); + set_period_size_request = 0; } else { period_size_requested = value; } @@ -323,10 +354,10 @@ static int init(int argc, char **argv) { set_buffer_size_request = 1; debug(1, "Value read for buffer size is %d.", value); if (value < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid alsa buffer size setting \"%d\". It " - "must be greater than 0.", - value); + warn("Invalid alsa buffer size setting \"%d\". It " + "must be greater than 0. No setting is made.", + value); + set_buffer_size_request = 0; } else { buffer_size_requested = value; } @@ -361,15 +392,13 @@ static int init(int argc, char **argv) { alsa_mix_index = strtol(optarg, NULL, 10); break; default: + warn("Invalid audio option \"-%c\" specified -- ignored.", opt); help(); - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid audio option -%c specified", opt); } } if (optind < argc) { - debug_mutex_unlock(&alsa_mutex, 3); - die("Invalid audio argument: %s", argv[optind]); + warn("Invalid audio argument: \"%s\" -- ignored", argv[optind]); } debug(1, "alsa output device name is \"%s\".", alsa_out_dev); @@ -379,91 +408,94 @@ static int init(int argc, char **argv) { if (alsa_mix_dev == NULL) alsa_mix_dev = alsa_out_dev; - // Open mixer - - open_mixer(); - - if (snd_mixer_selem_get_playback_volume_range(alsa_mix_elem, &alsa_mix_minv, &alsa_mix_maxv) < - 0) - debug(1, "Can't read mixer's [linear] min and max volumes."); - else { - if (snd_mixer_selem_get_playback_dB_range(alsa_mix_elem, &alsa_mix_mindb, &alsa_mix_maxdb) == - 0) { - - audio_alsa.volume = &volume; // insert the volume function now we know it can do dB stuff - audio_alsa.parameters = ¶meters; // likewise the parameters stuff - if (alsa_mix_mindb == SND_CTL_TLV_DB_GAIN_MUTE) { - // For instance, the Raspberry Pi does this - debug(1, "Lowest dB value is a mute"); - mixer_volume_setting_gives_mute = 1; - alsa_mix_mute = SND_CTL_TLV_DB_GAIN_MUTE; // this may not be necessary -- it's always - // going to be SND_CTL_TLV_DB_GAIN_MUTE, right? - // debug(1, "Try minimum volume + 1 as lowest true attenuation value"); - if (snd_mixer_selem_ask_playback_vol_dB(alsa_mix_elem, alsa_mix_minv + 1, - &alsa_mix_mindb) != 0) - debug(1, "Can't get dB value corresponding to a minimum volume + 1."); - } - debug(1, "Hardware mixer has dB volume from %f to %f.", (1.0 * alsa_mix_mindb) / 100.0, - (1.0 * alsa_mix_maxdb) / 100.0); - } else { - // use the linear scale and do the db conversion ourselves - debug(1, "note: the hardware mixer specified -- \"%s\" -- does not have " - "a dB volume scale.", - alsa_mix_ctrl); - - if (snd_ctl_open(&ctl, alsa_mix_dev, 0) < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("Cannot open control \"%s\"", alsa_mix_dev); - } - if (snd_ctl_elem_id_malloc(&elem_id) < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("Cannot allocate memory for control \"%s\"", alsa_mix_dev); - } - snd_ctl_elem_id_set_interface(elem_id, SND_CTL_ELEM_IFACE_MIXER); - snd_ctl_elem_id_set_name(elem_id, alsa_mix_ctrl); + // Now, start trying to initialise the alsa device with the settings obtained + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 1000, 1); + if (open_mixer() == 1) { + if (snd_mixer_selem_get_playback_volume_range(alsa_mix_elem, &alsa_mix_minv, &alsa_mix_maxv) < + 0) + debug(1, "Can't read mixer's [linear] min and max volumes."); + else { + if (snd_mixer_selem_get_playback_dB_range(alsa_mix_elem, &alsa_mix_mindb, + &alsa_mix_maxdb) == 0) { - if (snd_ctl_get_dB_range(ctl, elem_id, &alsa_mix_mindb, &alsa_mix_maxdb) == 0) { - debug(1, "Volume control \"%s\" has dB volume from %f to %f.", alsa_mix_ctrl, - (1.0 * alsa_mix_mindb) / 100.0, (1.0 * alsa_mix_maxdb) / 100.0); - has_softvol = 1; audio_alsa.volume = &volume; // insert the volume function now we know it can do dB stuff audio_alsa.parameters = ¶meters; // likewise the parameters stuff + if (alsa_mix_mindb == SND_CTL_TLV_DB_GAIN_MUTE) { + // For instance, the Raspberry Pi does this + debug(1, "Lowest dB value is a mute"); + mixer_volume_setting_gives_mute = 1; + alsa_mix_mute = SND_CTL_TLV_DB_GAIN_MUTE; // this may not be necessary -- it's always + // going to be SND_CTL_TLV_DB_GAIN_MUTE, right? + // debug(1, "Try minimum volume + 1 as lowest true attenuation value"); + if (snd_mixer_selem_ask_playback_vol_dB(alsa_mix_elem, alsa_mix_minv + 1, + &alsa_mix_mindb) != 0) + debug(1, "Can't get dB value corresponding to a minimum volume + 1."); + } + debug(1, "Hardware mixer has dB volume from %f to %f.", (1.0 * alsa_mix_mindb) / 100.0, + (1.0 * alsa_mix_maxdb) / 100.0); } else { - debug(1, "Cannot get the dB range from the volume control \"%s\"", alsa_mix_ctrl); + // use the linear scale and do the db conversion ourselves + warn("The hardware mixer specified -- \"%s\" -- does not have " + "a dB volume scale.", + alsa_mix_ctrl); + + if (snd_ctl_open(&ctl, alsa_mix_dev, 0) < 0) { + warn("Cannot open control \"%s\"", alsa_mix_dev); + response = -1; + } + if (snd_ctl_elem_id_malloc(&elem_id) < 0) { + debug(1, "Cannot allocate memory for control \"%s\"", alsa_mix_dev); + elem_id = NULL; + response = -2; + } else { + snd_ctl_elem_id_set_interface(elem_id, SND_CTL_ELEM_IFACE_MIXER); + snd_ctl_elem_id_set_name(elem_id, alsa_mix_ctrl); + + if (snd_ctl_get_dB_range(ctl, elem_id, &alsa_mix_mindb, &alsa_mix_maxdb) == 0) { + debug(1, "Volume control \"%s\" has dB volume from %f to %f.", alsa_mix_ctrl, + (1.0 * alsa_mix_mindb) / 100.0, (1.0 * alsa_mix_maxdb) / 100.0); + has_softvol = 1; + audio_alsa.volume = + &volume; // insert the volume function now we know it can do dB stuff + audio_alsa.parameters = ¶meters; // likewise the parameters stuff + } else { + debug(1, "Cannot get the dB range from the volume control \"%s\"", alsa_mix_ctrl); + } + } + /* + debug(1, "Min and max volumes are %d and + %d.",alsa_mix_minv,alsa_mix_maxv); + alsa_mix_maxdb = 0; + if ((alsa_mix_maxv!=0) && (alsa_mix_minv!=0)) + alsa_mix_mindb = + -20*100*(log10(alsa_mix_maxv*1.0)-log10(alsa_mix_minv*1.0)); + else if (alsa_mix_maxv!=0) + alsa_mix_mindb = -20*100*log10(alsa_mix_maxv*1.0); + audio_alsa.volume = &linear_volume; // insert the linear volume function + audio_alsa.parameters = ¶meters; // likewise the parameters stuff + debug(1,"Max and min dB calculated are %d and + %d.",alsa_mix_maxdb,alsa_mix_mindb); + */ } - - /* - debug(1, "Min and max volumes are %d and - %d.",alsa_mix_minv,alsa_mix_maxv); - alsa_mix_maxdb = 0; - if ((alsa_mix_maxv!=0) && (alsa_mix_minv!=0)) - alsa_mix_mindb = - -20*100*(log10(alsa_mix_maxv*1.0)-log10(alsa_mix_minv*1.0)); - else if (alsa_mix_maxv!=0) - alsa_mix_mindb = -20*100*log10(alsa_mix_maxv*1.0); - audio_alsa.volume = &linear_volume; // insert the linear volume function - audio_alsa.parameters = ¶meters; // likewise the parameters stuff - debug(1,"Max and min dB calculated are %d and - %d.",alsa_mix_maxdb,alsa_mix_mindb); - */ } + if (((config.alsa_use_hardware_mute == 1) && + (snd_mixer_selem_has_playback_switch(alsa_mix_elem))) || + mixer_volume_setting_gives_mute) { + audio_alsa.mute = &mute; // insert the mute function now we know it can do muting stuff + // debug(1, "Has mixer and mute ability we will use."); + } else { + // debug(1, "Has mixer but not using hardware mute."); + } + close_mixer(); } - if (((config.alsa_use_hardware_mute == 1) && - (snd_mixer_selem_has_playback_switch(alsa_mix_elem))) || - mixer_volume_setting_gives_mute) { - audio_alsa.mute = &mute; // insert the mute function now we know it can do muting stuff - // debug(1, "Has mixer and mute ability we will use."); - } else { - // debug(1, "Has mixer but not using hardware mute."); - } - close_mixer(); + debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // release the mutex } else { // debug(1, "Has no mixer and thus no hardware mute."); } alsa_mix_handle = NULL; - debug_mutex_unlock(&alsa_mutex, 3); - return 0; + return response; } static void deinit(void) { @@ -473,7 +505,6 @@ static void deinit(void) { int open_alsa_device(void) { // the alsa mutex is already acquired when this is called - const snd_pcm_uframes_t minimal_buffer_headroom = 352 * 2; // we accept this much headroom in the hardware buffer, but we'll // accept less @@ -501,17 +532,16 @@ int open_alsa_device(void) { ret = snd_pcm_open(&alsa_handle, alsa_out_dev, SND_PCM_STREAM_PLAYBACK, 0); if (ret < 0) - return (ret); + return (-10); snd_pcm_hw_params_alloca(&alsa_params); ret = snd_pcm_hw_params_any(alsa_handle, alsa_params); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - ; - die("audio_alsa: Broken configuration for device \"%s\": no configurations " - "available", - alsa_out_dev); + warn("audio_alsa: Broken configuration for device \"%s\": no configurations " + "available", + alsa_out_dev); + return -11; } if ((config.no_mmap == 0) && @@ -534,9 +564,9 @@ int open_alsa_device(void) { ret = snd_pcm_hw_params_set_access(alsa_handle, alsa_params, access); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: Access type not available for device \"%s\": %s", alsa_out_dev, - snd_strerror(ret)); + warn("audio_alsa: Access type not available for device \"%s\": %s", alsa_out_dev, + snd_strerror(ret)); + return -12; } snd_pcm_format_t sf; switch (sample_format) { @@ -562,28 +592,29 @@ int open_alsa_device(void) { sf = SND_PCM_FORMAT_S32; break; default: - debug_mutex_unlock(&alsa_mutex, 3); sf = SND_PCM_FORMAT_S16; // this is just to quieten a compiler warning - die("Unsupported output format at audio_alsa.c"); + debug(1, "Unsupported output format at audio_alsa.c"); + return -1; } ret = snd_pcm_hw_params_set_format(alsa_handle, alsa_params, sf); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: Sample format %d not available for device \"%s\": %s", sample_format, - alsa_out_dev, snd_strerror(ret)); + warn("audio_alsa: Sample format %d not available for device \"%s\": %s", sample_format, + alsa_out_dev, snd_strerror(ret)); + return -2; } ret = snd_pcm_hw_params_set_channels(alsa_handle, alsa_params, 2); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: Channels count (2) not available for device \"%s\": %s", alsa_out_dev, - snd_strerror(ret)); + warn("audio_alsa: Channels count (2) not available for device \"%s\": %s", alsa_out_dev, + snd_strerror(ret)); + return -3; } ret = snd_pcm_hw_params_set_rate_near(alsa_handle, alsa_params, &my_sample_rate, &dir); if (ret < 0) { - die("audio_alsa: Rate %iHz not available for playback: %s", desired_sample_rate, - snd_strerror(ret)); + warn("audio_alsa: Rate %iHz not available for playback: %s", desired_sample_rate, + snd_strerror(ret)); + return -4; } if (set_period_size_request != 0) { @@ -591,9 +622,10 @@ int open_alsa_device(void) { ret = snd_pcm_hw_params_set_period_size_near(alsa_handle, alsa_params, &period_size_requested, &dir); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: cannot set period size of %lu: %s", period_size_requested, - snd_strerror(ret)); + warn("audio_alsa: cannot set period size of %lu: %s", period_size_requested, + snd_strerror(ret)); + return -5; + } else { snd_pcm_uframes_t actual_period_size; snd_pcm_hw_params_get_period_size(alsa_params, &actual_period_size, &dir); if (actual_period_size != period_size_requested) @@ -607,35 +639,36 @@ int open_alsa_device(void) { debug(1, "Attempting to set the buffer size to %lu", buffer_size_requested); ret = snd_pcm_hw_params_set_buffer_size_near(alsa_handle, alsa_params, &buffer_size_requested); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: cannot set buffer size of %lu: %s", buffer_size_requested, - snd_strerror(ret)); + warn("audio_alsa: cannot set buffer size of %lu: %s", buffer_size_requested, + snd_strerror(ret)); + return -6; + } else { + snd_pcm_uframes_t actual_buffer_size; + snd_pcm_hw_params_get_buffer_size(alsa_params, &actual_buffer_size); + if (actual_buffer_size != buffer_size_requested) + inform("Actual period size set to a different value than requested. Requested: %lu, actual " + "setting: %lu", + buffer_size_requested, actual_buffer_size); } - snd_pcm_uframes_t actual_buffer_size; - snd_pcm_hw_params_get_buffer_size(alsa_params, &actual_buffer_size); - if (actual_buffer_size != buffer_size_requested) - inform("Actual period size set to a different value than requested. Requested: %lu, actual " - "setting: %lu", - buffer_size_requested, actual_buffer_size); } ret = snd_pcm_hw_params(alsa_handle, alsa_params); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: Unable to set hw parameters for device \"%s\": %s.", alsa_out_dev, - snd_strerror(ret)); + warn("audio_alsa: Unable to set hw parameters for device \"%s\": %s.", alsa_out_dev, + snd_strerror(ret)); + return -7; } if (my_sample_rate != desired_sample_rate) { - debug_mutex_unlock(&alsa_mutex, 3); - die("Can't set the D/A converter to %d.", desired_sample_rate); + warn("Can't set the D/A converter to %d.", desired_sample_rate); + return -8; } ret = snd_pcm_hw_params_get_buffer_size(alsa_params, &actual_buffer_length); if (ret < 0) { - debug_mutex_unlock(&alsa_mutex, 3); - die("audio_alsa: Unable to get hw buffer length for device \"%s\": %s.", alsa_out_dev, - snd_strerror(ret)); + warn("audio_alsa: Unable to get hw buffer length for device \"%s\": %s.", alsa_out_dev, + snd_strerror(ret)); + return -9; } if (actual_buffer_length < config.audio_backend_buffer_desired_length + minimal_buffer_headroom) { @@ -794,7 +827,7 @@ int open_alsa_device(void) { } } - return (0); + return 0; } static void start(int i_sample_rate, int i_sample_format) { @@ -808,6 +841,9 @@ static void start(int i_sample_rate, int i_sample_format) { sample_format = SPS_FORMAT_S16; // default else sample_format = i_sample_format; + + frame_index = 0; + measurement_data_is_valid = 0; } int delay(long *the_delay) { @@ -817,7 +853,7 @@ int delay(long *the_delay) { if (alsa_handle == NULL) { return -ENODEV; } else { - debug_mutex_lock(&alsa_mutex, 10000, 1); + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 10000, 1); int derr; if (snd_pcm_state(alsa_handle) == SND_PCM_STATE_RUNNING) { *the_delay = 0; // just to see what happens @@ -827,23 +863,29 @@ int delay(long *the_delay) { snd_strerror(reply), *the_delay); snd_pcm_recover(alsa_handle, reply, 1); } - } else if (snd_pcm_state(alsa_handle) == SND_PCM_STATE_PREPARED) { - *the_delay = 0; - reply = 0; // no error } else { - if (snd_pcm_state(alsa_handle) == SND_PCM_STATE_XRUN) { + frame_index = 0; // we'll be starting over... + measurement_data_is_valid = 0; + + if (snd_pcm_state(alsa_handle) == SND_PCM_STATE_PREPARED) { *the_delay = 0; reply = 0; // no error } else { - reply = -EIO; - debug(1, "Error -- ALSA delay(): bad state: %d.", snd_pcm_state(alsa_handle)); - } - if ((derr = snd_pcm_prepare(alsa_handle))) { - snd_pcm_recover(alsa_handle, derr, 1); - debug(1, "Error preparing after delay error: \"%s\".", snd_strerror(derr)); + if (snd_pcm_state(alsa_handle) == SND_PCM_STATE_XRUN) { + *the_delay = 0; + reply = 0; // no error + } else { + reply = -EIO; + debug(1, "Error -- ALSA delay(): bad state: %d.", snd_pcm_state(alsa_handle)); + } + if ((derr = snd_pcm_prepare(alsa_handle))) { + snd_pcm_recover(alsa_handle, derr, 1); + debug(1, "Error preparing after delay error: \"%s\".", snd_strerror(derr)); + } } } debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // here, occasionally pretend there's a problem with pcm_get_delay() // if ((random() % 100000) < 3) // keep it pretty rare // reply = -EPERM; // pretend something bad has happened @@ -851,11 +893,22 @@ int delay(long *the_delay) { } } -static void play(void *buf, int samples) { +int get_rate_information(uint64_t *elapsed_time, uint64_t *frames_played) { + if (measurement_data_is_valid) { + *elapsed_time = measurement_time - measurement_start_time; + *frames_played = frames_played_at_measurement_time - frames_played_at_measurement_start_time; + } else { + *elapsed_time = 0; + *frames_played = 0; + } + return 0; +} + +static int play(void *buf, int samples) { // debug(3,"audio_alsa play called."); int ret = 0; if (alsa_handle == NULL) { - debug_mutex_lock(&alsa_mutex, 10000, 1); + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 10000, 1); ret = open_alsa_device(); if (ret == 0) { if (audio_alsa.volume) @@ -863,20 +916,23 @@ static void play(void *buf, int samples) { if (audio_alsa.mute) do_mute(0); } + debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // release the mutex } if (ret == 0) { - debug_mutex_lock(&alsa_mutex, 10000, 1); + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 10000, 1); // snd_pcm_sframes_t current_delay = 0; - int err; + int err, err2; if (snd_pcm_state(alsa_handle) == SND_PCM_STATE_XRUN) { if ((err = snd_pcm_prepare(alsa_handle))) { snd_pcm_recover(alsa_handle, err, 1); debug(1, "Error preparing after underrun: \"%s\".", snd_strerror(err)); } - } - if ((snd_pcm_state(alsa_handle) == SND_PCM_STATE_PREPARED) || - (snd_pcm_state(alsa_handle) == SND_PCM_STATE_RUNNING)) { + frame_index = 0; // we'll be starting over + measurement_data_is_valid = 0; + } else if ((snd_pcm_state(alsa_handle) == SND_PCM_STATE_PREPARED) || + (snd_pcm_state(alsa_handle) == SND_PCM_STATE_RUNNING)) { if (buf == NULL) debug(1, "NULL buffer passed to pcm_writei -- skipping it"); if (samples == 0) @@ -884,10 +940,41 @@ static void play(void *buf, int samples) { if ((samples != 0) && (buf != NULL)) { err = alsa_pcm_write(alsa_handle, buf, samples); if (err < 0) { + frame_index = 0; + measurement_data_is_valid = 0; debug(1, "Error %d writing %d samples in play(): \"%s\".", err, samples, snd_strerror(err)); snd_pcm_recover(alsa_handle, err, 1); } + if (frame_index == 0) { + frames_sent_for_playing = samples; + } else { + frames_sent_for_playing += samples; + } + const uint64_t start_measurement_from_this_frame = + (2 * 44100) / 352; // two seconds of frames… + frame_index++; + if ((frame_index == start_measurement_from_this_frame) || (frame_index % 32 == 0)) { + long fl = 0; + err2 = snd_pcm_delay(alsa_handle, &fl); + if (err2 != 0) { + frame_index = 0; + measurement_data_is_valid = 0; + debug(1, "Error %d in delay(): \"%s\". Delay reported is %d frames.", err2, + snd_strerror(err2), fl); + snd_pcm_recover(alsa_handle, err2, 1); + } + uint64_t tf = get_absolute_time_in_fp(); + frames_played_at_measurement_time = frames_sent_for_playing - fl; + if (frame_index == start_measurement_from_this_frame) { + frames_played_at_measurement_start_time = frames_played_at_measurement_time; + measurement_start_time = tf; + } else { + frames_played_at_measurement_time = frames_played_at_measurement_time; + measurement_time = tf; + measurement_data_is_valid = 1; + } + } } } else { debug(1, "Error -- ALSA device in incorrect state (%d) for play.", @@ -896,16 +983,21 @@ static void play(void *buf, int samples) { snd_pcm_recover(alsa_handle, err, 1); debug(1, "Error preparing after play error: \"%s\".", snd_strerror(err)); } + frame_index = 0; + measurement_data_is_valid = 0; } debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // release the mutex } + return ret; } static void flush(void) { // debug(2,"audio_alsa flush called."); - debug_mutex_lock(&alsa_mutex, 10000, 1); + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 10000, 1); int derr; do_mute(1); + if (alsa_handle) { if ((derr = snd_pcm_drop(alsa_handle))) @@ -917,10 +1009,12 @@ static void flush(void) { // flush also closes the device if ((derr = snd_pcm_close(alsa_handle))) debug(1, "Error %d (\"%s\") closing the output device.", derr, snd_strerror(derr)); - + frame_index = 0; + measurement_data_is_valid = 0; alsa_handle = NULL; } debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // release the mutex } static void stop(void) { @@ -941,7 +1035,7 @@ static void parameters(audio_parameters *info) { void do_volume(double vol) { // caller is assumed to have the alsa_mutex when using this function debug(3, "Setting volume db to %f.", vol); set_volume = vol; - if (volume_set_request && open_mixer()) { + if (volume_set_request && (open_mixer() == 1)) { if (has_softvol) { if (ctl && elem_id) { snd_ctl_elem_value_t *value; @@ -973,10 +1067,11 @@ void do_volume(double vol) { // caller is assumed to have the alsa_mutex when us } void volume(double vol) { - debug_mutex_lock(&alsa_mutex, 1000, 1); + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 1000, 1); volume_set_request = 1; // an external request has been made to set the volume do_volume(vol); debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // release the mutex } /* @@ -999,11 +1094,12 @@ static void linear_volume(double vol) { static void mute(int mute_state_requested) { // debug(1,"External Mute Request: %d",mute_state_requested); - debug_mutex_lock(&alsa_mutex, 10000, 1); + pthread_cleanup_debug_mutex_lock(&alsa_mutex, 10000, 1); mute_request_pending = 1; overriding_mute_state_requested = mute_state_requested; do_mute(mute_state_requested); debug_mutex_unlock(&alsa_mutex, 3); + pthread_cleanup_pop(0); // release the mutex } void do_mute(int mute_state_requested) { @@ -1024,7 +1120,7 @@ void do_mute(int mute_state_requested) { if (config.alsa_use_hardware_mute == 1) { if (mute_request_pending == 0) local_mute_state_requested = mute_state_requested; - if (open_mixer()) { + if (open_mixer() == 1) { if (local_mute_state_requested) { // debug(1,"Playback Switch mute actually done"); if (snd_mixer_selem_has_playback_switch(alsa_mix_elem)) @@ -121,7 +121,7 @@ static void deinit(void) { static void start(__attribute__((unused)) int sample_rate, __attribute__((unused)) int sample_format) {} -static void play(void *buf, int samples) { ao_play(dev, buf, samples * 4); } +static int play(void *buf, int samples) { return ao_play(dev, buf, samples * 4); } static void stop(void) {} diff --git a/audio_dummy.c b/audio_dummy.c index 5f55bab..3f30cb4 100644 --- a/audio_dummy.c +++ b/audio_dummy.c @@ -48,7 +48,9 @@ static void start(int sample_rate, __attribute__((unused)) int sample_format) { debug(1, "dummy audio output started at Fs=%d Hz\n", sample_rate); } -static void play(__attribute__((unused)) void *buf, __attribute__((unused)) int samples) {} +static int play(__attribute__((unused)) void *buf, __attribute__((unused)) int samples) { + return 0; +} static void stop(void) { debug(1, "dummy audio stopped\n"); } @@ -198,7 +198,7 @@ static void start(__attribute__((unused)) int sample_rate, pa_threaded_mainloop_unlock(mainloop); } -static void play(void *buf, int samples) { +static int play(void *buf, int samples) { // debug(1,"pa_play of %d samples.",samples); // copy the samples into the queue size_t bytes_to_transfer = samples * 2 * 2; @@ -224,6 +224,7 @@ static void play(void *buf, int samples) { pa_stream_cork(stream, 0, stream_success_cb, mainloop); pa_threaded_mainloop_unlock(mainloop); } + return 0; } int pa_delay(long *the_delay) { diff --git a/audio_pipe.c b/audio_pipe.c index c080277..057a975 100644 --- a/audio_pipe.c +++ b/audio_pipe.c @@ -54,7 +54,7 @@ static void start(__attribute__((unused)) int sample_rate, } } -static void play(void *buf, int samples) { +static int play(void *buf, int samples) { // if the file is not open, try to open it. char errorstring[1024]; if (fd == -1) { @@ -73,6 +73,7 @@ static void play(void *buf, int samples) { warn("Error %d opening the pipe named \"%s\": \"%s\".", errno, pipename, errorstring); warned = 1; } + return warned; } static void stop(void) { diff --git a/audio_sndio.c b/audio_sndio.c index d9cef65..203a7db 100644 --- a/audio_sndio.c +++ b/audio_sndio.c @@ -33,7 +33,7 @@ static int init(int, char **); static void onmove_cb(void *, int); static void deinit(void); static void start(int, int); -static void play(void *, int); +static int play(void *, int); static void stop(void); static void onmove_cb(void *, int); static int delay(long *); @@ -222,12 +222,13 @@ static void start(__attribute__((unused)) int sample_rate, pthread_mutex_unlock(&sndio_mutex); } -static void play(void *buf, int frames) { +static int play(void *buf, int frames) { if (frames > 0) { pthread_mutex_lock(&sndio_mutex); written += sio_write(hdl, buf, frames * framesize); pthread_mutex_unlock(&sndio_mutex); } + return 0; } static void stop() { diff --git a/audio_soundio.c b/audio_soundio.c index c46e00d..2fed5ee 100644 --- a/audio_soundio.c +++ b/audio_soundio.c @@ -167,7 +167,7 @@ static void start(int sample_rate, int sample_format) { debug(1, "libsoundio output started\n"); } -static void play(void *buf, int samples) { +static int play(void *buf, int samples) { // int err; int free_bytes = soundio_ring_buffer_free_count(ring_buffer); int written_bytes = 0; @@ -186,6 +186,7 @@ static void play(void *buf, int samples) { soundio_ring_buffer_advance_write_ptr(ring_buffer, write_bytes); debug(3, "[<<---] Written to buffer : %d\n", written_bytes); } + return 0; } static void parameters(audio_parameters *info) { diff --git a/audio_stdout.c b/audio_stdout.c index 3e27ad1..72f10c8 100644 --- a/audio_stdout.c +++ b/audio_stdout.c @@ -43,7 +43,7 @@ static void start(__attribute__((unused)) int sample_rate, fd = STDOUT_FILENO; } -static void play(void *buf, int samples) { +static int play(void *buf, int samples) { char errorstring[1024]; int warned = 0; int rc = write(fd, buf, samples * 4); @@ -52,6 +52,7 @@ static void play(void *buf, int samples) { warn("Error %d writing to stdout: \"%s\".", errno, errorstring); warned = 1; } + return rc; } static void stop(void) { @@ -120,7 +120,6 @@ void die(const char *format, ...) { daemon_log(LOG_EMERG, "% 20.9f|*fatal error: %s", tss, s); else daemon_log(LOG_EMERG, "fatal error: %s", s); - shairport_shutdown(); exit(1); } @@ -178,11 +177,26 @@ void debug(int level, const char *format, ...) { void inform(const char *format, ...) { char s[1024]; s[0] = 0; + uint64_t time_now = get_absolute_time_in_fp(); + uint64_t time_since_start = time_now - fp_time_at_startup; + uint64_t time_since_last_debug_message = time_now - fp_time_at_last_debug_message; + fp_time_at_last_debug_message = time_now; + uint64_t divisor = (uint64_t)1 << 32; + double tss = 1.0 * time_since_start / divisor; + double tsl = 1.0 * time_since_last_debug_message / divisor; va_list args; va_start(args, format); vsnprintf(s, sizeof(s), format, args); va_end(args); - daemon_log(LOG_INFO, "%s", s); + + if ((debuglev) && (config.debugger_show_elapsed_time) && (config.debugger_show_relative_time)) + daemon_log(LOG_INFO, "|% 20.9f|% 20.9f|%s", tss, tsl, s); + else if ((debuglev) && (config.debugger_show_relative_time)) + daemon_log(LOG_INFO, "% 20.9f|%s", tsl, s); + else if ((debuglev) && (config.debugger_show_elapsed_time)) + daemon_log(LOG_INFO, "% 20.9f|%s", tss, s); + else + daemon_log(LOG_INFO, "%s", s); } // The following two functions are adapted slightly and with thanks from Jonathan Leffler's sample @@ -385,7 +399,8 @@ uint8_t *base64_dec(char *input, int *outlen) { nread = BIO_read(b64, buf, bufsize); - BIO_free_all(bmem); + // BIO_free_all(bmem); + BIO_free_all(b64); *outlen = nread; return buf; @@ -419,8 +434,7 @@ static char super_secret_key[] = #ifdef HAVE_LIBSSL uint8_t *rsa_apply(uint8_t *input, int inlen, int *outlen, int mode) { - static RSA *rsa = NULL; - + RSA *rsa = NULL; if (!rsa) { BIO *bmem = BIO_new_mem_buf(super_secret_key, -1); rsa = PEM_read_bio_RSAPrivateKey(bmem, NULL, NULL, NULL); @@ -732,19 +746,15 @@ double flat_vol2attn(double vol, long max_db, long min_db) { double vol2attn(double vol, long max_db, long min_db) { -// We use a little coordinate geometry to build a transfer function from the volume passed in to the -// device's dynamic range. -// (See the diagram in the documents folder.) -// The x axis is the "volume in" which will be from -30 to 0. The y axis will be the "volume out" -// which will be from the bottom of the range to the top. -// We build the transfer function from one or more lines. We characterise each line with two -// numbers: -// the first is where on x the line starts when y=0 (x can be from 0 to -30); the second is where on -// y the line stops when when x is -30. -// thus, if the line was characterised as {0,-30}, it would be an identity transfer. -// Assuming, for example, a dynamic range of lv=-60 to hv=0 -// Typically we'll use three lines -- a three order transfer function -// First: {0,30} giving a gentle slope -- the 30 comes from half the dynamic range +// We use a little coordinate geometry to build a transfer function from the volume passed in to +// the device's dynamic range. (See the diagram in the documents folder.) The x axis is the +// "volume in" which will be from -30 to 0. The y axis will be the "volume out" which will be from +// the bottom of the range to the top. We build the transfer function from one or more lines. We +// characterise each line with two numbers: the first is where on x the line starts when y=0 (x +// can be from 0 to -30); the second is where on y the line stops when when x is -30. thus, if the +// line was characterised as {0,-30}, it would be an identity transfer. Assuming, for example, a +// dynamic range of lv=-60 to hv=0 Typically we'll use three lines -- a three order transfer +// function First: {0,30} giving a gentle slope -- the 30 comes from half the dynamic range // Second: {-5,-30-(lv+30)/2} giving a faster slope from y=0 at x=-12 to y=-42.5 at x=-30 // Third: {-17,lv} giving a fast slope from y=0 at x=-19 to y=-60 at x=-30 @@ -950,8 +960,6 @@ int64_t r64i() { return (ranval(&rx) >> 1); } /* generate an array of 64-bit random numbers */ const int ranarraylength = 1009; // these will be 8-byte numbers. -uint64_t *ranarray; - int ranarraynext; void ranarrayinit() { @@ -1068,13 +1076,14 @@ int sps_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time, int sps_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time, const char *debugmessage, int debuglevel) { + // this is not pthread_cancellation safe because is contains a cancellation point useconds_t time_to_wait = dally_time; int r = pthread_mutex_trylock(mutex); while ((r == EBUSY) && (time_to_wait > 0)) { useconds_t st = time_to_wait; if (st > 1000) st = 1000; - sps_nanosleep(0, st * 1000); + sps_nanosleep(0, st * 1000); // this contains a cancellation point time_to_wait -= st; r = pthread_mutex_trylock(mutex); } @@ -1096,6 +1105,8 @@ int sps_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time, int _debug_mutex_lock(pthread_mutex_t *mutex, useconds_t dally_time, const char *filename, const int line, int debuglevel) { + if (debuglevel > debuglev) + return pthread_mutex_lock(mutex); uint64_t time_at_start = get_absolute_time_in_fp(); char dstring[1000]; memset(dstring, 0, sizeof(dstring)); @@ -1116,6 +1127,8 @@ int _debug_mutex_lock(pthread_mutex_t *mutex, useconds_t dally_time, const char int _debug_mutex_unlock(pthread_mutex_t *mutex, const char *filename, const int line, int debuglevel) { + if (debuglevel > debuglev) + return pthread_mutex_unlock(mutex); char dstring[1000]; char errstr[512]; memset(dstring, 0, sizeof(dstring)); @@ -1128,6 +1141,8 @@ int _debug_mutex_unlock(pthread_mutex_t *mutex, const char *filename, const int return r; } +void pthread_cleanup_debug_mutex_unlock(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); } + char *get_version_string() { char *version_string = malloc(200); if (version_string) { @@ -2,6 +2,7 @@ #define _COMMON_H #include <libconfig.h> +#include <pthread.h> #include <signal.h> #include <stdint.h> #include <sys/socket.h> @@ -79,6 +80,7 @@ typedef struct { char *password; char *service_name; // the name for the shairport service, e.g. "Shairport Sync Version %v running // on host %h" + #ifdef CONFIG_PA char *pa_application_name; // the name under which Shairport Sync shows up as an "Application" in // the Sound Preferences in most desktop Linuxes. @@ -227,6 +229,7 @@ void r64init(uint64_t seed); uint64_t r64u(); int64_t r64i(); +uint64_t *ranarray; void r64arrayinit(); uint64_t ranarray64u(); int64_t ranarray64i(); @@ -263,6 +266,9 @@ uint64_t fp_time_at_startup, fp_time_at_last_debug_message; long endianness; uint32_t uatoi(const char *nptr); +// this is for allowing us to cancel the whole program +pthread_t main_thread_id; + shairport_cfg config; config_t config_file_stuff; @@ -292,6 +298,12 @@ int _debug_mutex_unlock(pthread_mutex_t *mutex, const char *filename, const int #define debug_mutex_unlock(mu, d) _debug_mutex_unlock(mu, __FILE__, __LINE__, d) +void pthread_cleanup_debug_mutex_unlock(void *arg); + +#define pthread_cleanup_debug_mutex_lock(mu, t, d) \ + if (_debug_mutex_lock(mu, t, __FILE__, __LINE__, d) == 0) \ + pthread_cleanup_push(pthread_cleanup_debug_mutex_unlock, (void *)mu) + char *get_version_string(); // mallocs a string space -- remember to free it afterwards void sps_nanosleep(const time_t sec, @@ -119,6 +119,35 @@ static pthread_mutex_t dacp_conversation_lock; static pthread_mutex_t dacp_server_information_lock; static pthread_cond_t dacp_server_information_cv = PTHREAD_COND_INITIALIZER; +void addrinfo_cleanup(void *arg) { + // debug(1, "addrinfo cleanup called."); + struct addrinfo **info = (struct addrinfo **)arg; + freeaddrinfo(*info); +} + +void mutex_lock_cleanup(void *arg) { + // debug(1, "mutex lock cleanup called."); + pthread_mutex_t *m = (pthread_mutex_t *)arg; + pthread_mutex_unlock(m); +} + +void connect_cleanup(void *arg) { + // debug(1, "connect cleanup called."); + int *fd = (int *)arg; + close(*fd); +} + +void http_cleanup(void *arg) { + // debug(1, "http cleanup called."); + struct http_roundtripper *rt = (struct http_roundtripper *)arg; + http_free(rt); +} + +void malloc_cleanup(void *arg) { + // debug(1, "malloc cleanup called."); + free(arg); +} + int dacp_send_command(const char *command, char **body, ssize_t *bodysize) { // will malloc space for the body or set it to NULL -- the caller should free it. @@ -163,12 +192,13 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) { // debug(1,"Error %d \"%s\" at getaddrinfo.",ires,gai_strerror(ires)); response.code = 498; // Bad Address information for the DACP server } else { - + pthread_cleanup_push(addrinfo_cleanup, (void *)&res); // only do this one at a time -- not sure it is necessary, but better safe than sorry int mutex_reply = sps_pthread_mutex_timedlock(&dacp_conversation_lock, 2000000, command, 1); // int mutex_reply = pthread_mutex_lock(&dacp_conversation_lock); if (mutex_reply == 0) { + pthread_cleanup_push(mutex_lock_cleanup, (void *)&dacp_conversation_lock); // debug(1,"dacp_conversation_lock acquired for command \"%s\".",command); // make a socket: @@ -192,6 +222,7 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) { debug(3, "DACP connect failed with errno %d.", errno); response.code = 496; // Can't connect to the DACP server } else { + pthread_cleanup_push(connect_cleanup, (void *)&sockfd); // debug(1,"DACP connect succeeded."); snprintf(message, sizeof(message), @@ -210,9 +241,11 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) { response.body = malloc(2048); // it can resize this if necessary response.malloced_size = 2048; + pthread_cleanup_push(malloc_cleanup, response.body); struct http_roundtripper rt; http_init(&rt, responseFuncs, &response); + pthread_cleanup_push(http_cleanup, &rt); int needmore = 1; int looperror = 0; @@ -251,13 +284,18 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) { response.size = 0; } // debug(1,"Size of response body is %d",response.size); - http_free(&rt); + pthread_cleanup_pop(1); // this should call http_cleanup + // http_free(&rt); + pthread_cleanup_pop( + 0); // this should *not* free the malloced buffer -- just pop the malloc cleanup } + pthread_cleanup_pop(1); // this should close the socket + // close(sockfd); + // debug(1,"DACP socket closed."); } - close(sockfd); - // debug(1,"DACP socket closed."); } - pthread_mutex_unlock(&dacp_conversation_lock); + pthread_cleanup_pop(1); // this should unlock the dacp_conversation_lock); + // pthread_mutex_unlock(&dacp_conversation_lock); // debug(1,"Sent command\"%s\" with a response body of size %d.",command,response.size); // debug(1,"dacp_conversation_lock released."); } else { @@ -266,6 +304,8 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) { command); response.code = 494; // This client is already busy } + pthread_cleanup_pop(1); // this should free the addrinfo + // freeaddrinfo(res); } *body = response.body; *bodysize = response.size; @@ -290,12 +330,10 @@ void relinquish_dacp_server_information(rtsp_conn_info *conn) { // as the conn's connection number // this is to signify that the player has stopped, but only if another thread (with a different // index) hasn't already taken over the dacp service - sps_pthread_mutex_timedlock( - &dacp_server_information_lock, 500000, - "set_dacp_server_information couldn't get DACP server information lock in 0.5 second!.", 2); + debug_mutex_lock(&dacp_server_information_lock, 500000, 2); if (dacp_server.players_connection_thread_index == conn->connection_number) dacp_server.players_connection_thread_index = 0; - pthread_mutex_unlock(&dacp_server_information_lock); + debug_mutex_unlock(&dacp_server_information_lock, 3); } // this will be running on the thread of its caller, not of the conversation thread... @@ -305,9 +343,7 @@ void relinquish_dacp_server_information(rtsp_conn_info *conn) { // Thus, we can keep the DACP port that might have previously been discovered void set_dacp_server_information(rtsp_conn_info *conn) { // debug(1, "set_dacp_server_information"); - sps_pthread_mutex_timedlock( - &dacp_server_information_lock, 500000, - "set_dacp_server_information couldn't get DACP server information lock in 0.5 second!.", 2); + debug_mutex_lock(&dacp_server_information_lock, 500000, 2); dacp_server.players_connection_thread_index = conn->connection_number; if ((conn->dacp_id == NULL) || (strcmp(conn->dacp_id, dacp_server.dacp_id) != 0)) { @@ -356,23 +392,20 @@ void set_dacp_server_information(rtsp_conn_info *conn) { debug(2, "set_dacp_server_information set active-remote id to %" PRIu32 ".", dacp_server.active_remote_id); pthread_cond_signal(&dacp_server_information_cv); - pthread_mutex_unlock(&dacp_server_information_lock); + debug_mutex_unlock(&dacp_server_information_lock, 3); } void dacp_monitor_port_update_callback(char *dacp_id, uint16_t port) { debug(2, "dacp_monitor_port_update_callback with Remote ID \"%s\" and port number %d.", dacp_id, port); - sps_pthread_mutex_timedlock( - &dacp_server_information_lock, 500000, - "dacp_monitor_port_update_callback couldn't get DACP server information lock in 0.5 second!.", - 2); + debug_mutex_lock(&dacp_server_information_lock, 500000, 2); if (strcmp(dacp_id, dacp_server.dacp_id) == 0) { dacp_server.port = port; if (port == 0) dacp_server.scan_enable = 0; else { dacp_server.scan_enable = 1; - debug(2, "dacp_monitor_port_update_callback enables scan"); + // debug(2, "dacp_monitor_port_update_callback enables scan"); } // metadata_hub_modify_prolog(); // int ch = metadata_store.dacp_server_active != dacp_server.scan_enable; @@ -382,8 +415,14 @@ void dacp_monitor_port_update_callback(char *dacp_id, uint16_t port) { debug(1, "dacp port monitor reporting on an out-of-use remote."); } pthread_cond_signal(&dacp_server_information_cv); + debug_mutex_unlock(&dacp_server_information_lock, 3); +} + +void dacp_monitor_thread_code_cleanup(__attribute__((unused)) void *arg) { + // debug(1, "dacp_monitor_thread_code_cleanup called."); pthread_mutex_unlock(&dacp_server_information_lock); } + void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { int scan_index = 0; // char server_reply[10000]; @@ -397,6 +436,9 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { sps_pthread_mutex_timedlock( &dacp_server_information_lock, 500000, "dacp_monitor_thread_code couldn't get DACP server information lock in 0.5 second!.", 2); + int32_t the_volume; + + pthread_cleanup_push(dacp_monitor_thread_code_cleanup, NULL); if (dacp_server.scan_enable == 0) { metadata_hub_modify_prolog(); int ch = (metadata_store.dacp_server_active != 0) || @@ -416,7 +458,6 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { idle_scan_count = 0; } scan_index++; - int32_t the_volume; result = dacp_get_volume(&the_volume); // just want the http code if ((result == 496) || (result == 403) || (result == 501)) { @@ -438,7 +479,9 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { debug(1, "DACP server status scanning stopped."); dacp_server.scan_enable = 0; } - pthread_mutex_unlock(&dacp_server_information_lock); + pthread_cleanup_pop(1); + + // pthread_mutex_unlock(&dacp_server_information_lock); // debug(1, "DACP Server ID \"%u\" at \"%s:%u\", scan %d.", dacp_server.active_remote_id, // dacp_server.ip_string, dacp_server.port, scan_index); @@ -764,7 +807,7 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) { sleep(config.scan_interval_when_inactive); } } - debug(1, "DACP monitor thread exiting."); + debug(1, "DACP monitor thread exiting -- should never happen."); pthread_exit(NULL); } @@ -816,6 +859,14 @@ void dacp_monitor_start() { pthread_create(&dacp_monitor_thread, NULL, dacp_monitor_thread_code, NULL); } +void dacp_monitor_stop() { + debug(1, "dacp_monitor_stop"); + pthread_cancel(dacp_monitor_thread); + pthread_join(dacp_monitor_thread, NULL); + pthread_mutex_destroy(&dacp_server_information_lock); + pthread_mutex_destroy(&dacp_conversation_lock); +} + uint32_t dacp_tlv_crawl(char **p, int32_t *length) { char typecode[5]; memcpy(typecode, *p, 4); @@ -914,12 +965,13 @@ int dacp_get_speaker_list(dacp_spkr_stuff *speaker_info, int max_size_of_array, sp -= item_size; le -= 8; speaker_index++; - if (speaker_index == max_size_of_array) + if (speaker_index == max_size_of_array) { return 413; // Payload Too Large -- too many speakers + } speaker_info[speaker_index].active = 0; speaker_info[speaker_index].speaker_number = 0; speaker_info[speaker_index].volume = 0; - speaker_info[speaker_index].name = NULL; + speaker_info[speaker_index].name[0] = '\0'; } else { le -= item_size + 8; char *t; @@ -929,8 +981,10 @@ int dacp_get_speaker_list(dacp_spkr_stuff *speaker_info, int max_size_of_array, switch (type) { case 'minm': t = sp - item_size; - speaker_info[speaker_index].name = strndup(t, item_size); - // debug(1," \"%s\"",speaker_info[speaker_index].name); + strncpy((char *)&speaker_info[speaker_index].name, t, + sizeof(speaker_info[speaker_index].name)); + speaker_info[speaker_index].name[sizeof(speaker_info[speaker_index].name) - 1] = + '\0'; // just in case break; case 'cmvo': t = sp - item_size; @@ -10,10 +10,11 @@ typedef struct dacp_speaker_stuff { int64_t speaker_number; int active; int32_t volume; - char *name; // this is really just for debugging + char name[128]; // this is really just for debugging } dacp_spkr_stuff; void dacp_monitor_start(); +void dacp_monitor_stop(); uint32_t dacp_tlv_crawl( char **p, @@ -26,6 +27,9 @@ int dacp_get_speaker_list(dacp_spkr_stuff *speaker_array, int max_size_of_array, void set_dacp_server_information(rtsp_conn_info *conn); // tell the DACP conversation thread that // the dacp server information has been set // or changed +void relinquish_dacp_server_information(rtsp_conn_info *conn); // tell the DACP conversation thread + // that the player thread is no + // longer associated with it. void dacp_monitor_port_update_callback( char *dacp_id, uint16_t port); // a callback to say the port is no longer in use int send_simple_dacp_command(const char *command); diff --git a/dbus-service.c b/dbus-service.c index ebcd337..90e68a0 100644 --- a/dbus-service.c +++ b/dbus-service.c @@ -19,6 +19,8 @@ ShairportSyncDiagnostics *shairportSyncDiagnosticsSkeleton = NULL; ShairportSyncRemoteControl *shairportSyncRemoteControlSkeleton = NULL; ShairportSyncAdvancedRemoteControl *shairportSyncAdvancedRemoteControlSkeleton = NULL; +guint ownerID = 0; + void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused)) void *userdata) { char response[100]; const char *th; @@ -43,16 +45,15 @@ void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused)) shairport_sync_advanced_remote_control_set_available(shairportSyncAdvancedRemoteControlSkeleton, FALSE); } - + if (argc->progress_string) { - // debug(1, "Check progress string"); - th = shairport_sync_remote_control_get_progress_string( - shairportSyncRemoteControlSkeleton); - if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) { - // debug(1, "Progress string should be changed"); - shairport_sync_remote_control_set_progress_string( - shairportSyncRemoteControlSkeleton, argc->progress_string); - } + // debug(1, "Check progress string"); + th = shairport_sync_remote_control_get_progress_string(shairportSyncRemoteControlSkeleton); + if ((th == NULL) || (strcasecmp(th, argc->progress_string) != 0)) { + // debug(1, "Progress string should be changed"); + shairport_sync_remote_control_set_progress_string(shairportSyncRemoteControlSkeleton, + argc->progress_string); + } } switch (argc->player_state) { @@ -162,7 +163,8 @@ void dbus_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused)) if ((argc->track_metadata) && (argc->track_metadata->item_id)) { char trackidstring[128]; // debug(1, "Set ID using mper ID: \"%u\".",argc->item_id); - snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id); + snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", + argc->track_metadata->item_id); GVariant *trackid = g_variant_new("o", trackidstring); g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid); } @@ -544,6 +546,18 @@ gboolean notify_loop_status_callback(ShairportSyncAdvancedRemoteControl *skeleto return TRUE; } +static gboolean on_handle_quit(ShairportSync *skeleton, GDBusMethodInvocation *invocation, + __attribute__((unused)) const gchar *command, + __attribute__((unused)) gpointer user_data) { + debug(1, "quit requested (native interface)"); + if (main_thread_id) + debug(1, "Cancelling main thread results in %d.", pthread_cancel(main_thread_id)); + else + debug(1, "Main thread ID is NULL."); + shairport_sync_complete_quit(skeleton, invocation); + return TRUE; +} + static gboolean on_handle_remote_command(ShairportSync *skeleton, GDBusMethodInvocation *invocation, const gchar *command, __attribute__((unused)) gpointer user_data) { @@ -589,6 +603,8 @@ static void on_dbus_name_acquired(GDBusConnection *connection, const gchar *name g_signal_connect(shairportSyncSkeleton, "notify::loudness-threshold", G_CALLBACK(notify_loudness_threshold_callback), NULL); + g_signal_connect(shairportSyncSkeleton, "handle-quit", G_CALLBACK(on_handle_quit), NULL); + g_signal_connect(shairportSyncSkeleton, "handle-remote-command", G_CALLBACK(on_handle_remote_command), NULL); @@ -760,7 +776,15 @@ int start_dbus_service() { dbus_bus_type = G_BUS_TYPE_SESSION; // debug(1, "Looking for a Shairport Sync native D-Bus interface \"org.gnome.ShairportSync\" on // the %s bus.",(config.dbus_service_bus_type == DBT_session) ? "session" : "system"); - g_bus_own_name(dbus_bus_type, "org.gnome.ShairportSync", G_BUS_NAME_OWNER_FLAGS_NONE, NULL, - on_dbus_name_acquired, on_dbus_name_lost, NULL, NULL); + ownerID = g_bus_own_name(dbus_bus_type, "org.gnome.ShairportSync", G_BUS_NAME_OWNER_FLAGS_NONE, + NULL, on_dbus_name_acquired, on_dbus_name_lost, NULL, NULL); return 0; // this is just to quieten a compiler warning } + +void stop_dbus_service() { + debug(1, "stopping dbus service"); + if (ownerID) + g_bus_unown_name(ownerID); + else + debug(1, "Zero OwnerID for \"org.gnome.ShairportSync\"."); +} diff --git a/dbus-service.h b/dbus-service.h index ea6e976..a00d06e 100644 --- a/dbus-service.h +++ b/dbus-service.h @@ -6,5 +6,6 @@ ShairportSync *shairportSyncSkeleton; int start_dbus_service(); +void stop_dbus_service(); #endif /* #ifndef DBUS_SERVICE_H */ diff --git a/mdns_avahi.c b/mdns_avahi.c index 28aa9ec..d9eb4b0 100644 --- a/mdns_avahi.c +++ b/mdns_avahi.c @@ -84,33 +84,33 @@ static void resolve_callback(AvahiServiceResolver *r, AVAHI_GCC_UNUSED AvahiIfIn /* Called whenever a service has been resolved successfully or timed out */ switch (event) { - case AVAHI_RESOLVER_FAILURE: - debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name, - type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r)))); - break; - case AVAHI_RESOLVER_FOUND: { - // char a[AVAHI_ADDRESS_STR_MAX], *t; - // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain); - char *dacpid = strstr(name, "iTunes_Ctrl_"); - if (dacpid) { - dacpid += strlen("iTunes_Ctrl_"); - if (strcmp(dacpid, dbs->dacp_id) == 0) { - debug(3, "Client's DACP port: %u.", port); - #ifdef HAVE_DACP_CLIENT - dacp_monitor_port_update_callback(dacpid, port); - #endif - #ifdef CONFIG_METADATA - char portstring[20]; - memset(portstring, 0, sizeof(portstring)); - snprintf(portstring, sizeof(portstring), "%u", port); - send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0); - #endif - } - } else { - debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!"); + case AVAHI_RESOLVER_FAILURE: + debug(2, "(Resolver) Failed to resolve service '%s' of type '%s' in domain '%s': %s.", name, + type, domain, avahi_strerror(avahi_client_errno(avahi_service_resolver_get_client(r)))); + break; + case AVAHI_RESOLVER_FOUND: { + // char a[AVAHI_ADDRESS_STR_MAX], *t; + // debug(1, "Resolve callback: Service '%s' of type '%s' in domain '%s':", name, type, domain); + char *dacpid = strstr(name, "iTunes_Ctrl_"); + if (dacpid) { + dacpid += strlen("iTunes_Ctrl_"); + if (strcmp(dacpid, dbs->dacp_id) == 0) { + debug(3, "Client's DACP port: %u.", port); +#ifdef HAVE_DACP_CLIENT + dacp_monitor_port_update_callback(dacpid, port); +#endif +#ifdef CONFIG_METADATA + char portstring[20]; + memset(portstring, 0, sizeof(portstring)); + snprintf(portstring, sizeof(portstring), "%u", port); + send_ssnc_metadata('dapo', strdup(portstring), strlen(portstring), 0); +#endif } + } else { + debug(1, "Resolve callback: Can't see a DACP string in a DACP Record!"); } } + } // debug(1,"service resolver freed by resolve_callback"); check_avahi_response(1, avahi_service_resolver_free(r)); } @@ -164,47 +164,47 @@ static void register_service(AvahiClient *c); static void egroup_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, AVAHI_GCC_UNUSED void *userdata) { switch (state) { - case AVAHI_ENTRY_GROUP_ESTABLISHED: - /* The entry group has been established successfully */ - debug(1, "avahi: service '%s' successfully added.", service_name); - break; - - case AVAHI_ENTRY_GROUP_COLLISION: { - char *n; - - /* A service name collision with a remote service - * happened. Let's pick a new name */ - debug(1, "avahi name collision -- look for another"); - n = avahi_alternative_service_name(service_name); - if (service_name) - avahi_free(service_name); - else - debug(1, "avahi attempt to free a NULL service name"); - service_name = n; + case AVAHI_ENTRY_GROUP_ESTABLISHED: + /* The entry group has been established successfully */ + debug(1, "avahi: service '%s' successfully added.", service_name); + break; - debug(2, "avahi: service name collision, renaming service to '%s'", service_name); + case AVAHI_ENTRY_GROUP_COLLISION: { + char *n; - /* And recreate the services */ - register_service(avahi_entry_group_get_client(g)); - break; - } + /* A service name collision with a remote service + * happened. Let's pick a new name */ + debug(1, "avahi name collision -- look for another"); + n = avahi_alternative_service_name(service_name); + if (service_name) + avahi_free(service_name); + else + debug(1, "avahi attempt to free a NULL service name"); + service_name = n; + + debug(2, "avahi: service name collision, renaming service to '%s'", service_name); + + /* And recreate the services */ + register_service(avahi_entry_group_get_client(g)); + break; + } - case AVAHI_ENTRY_GROUP_FAILURE: - debug(1, "avahi: entry group failure: %s", - avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g)))); - break; + case AVAHI_ENTRY_GROUP_FAILURE: + debug(1, "avahi: entry group failure: %s", + avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g)))); + break; - case AVAHI_ENTRY_GROUP_UNCOMMITED: - debug(2, "avahi: service '%s' group is not yet committed.", service_name); - break; + case AVAHI_ENTRY_GROUP_UNCOMMITED: + debug(2, "avahi: service '%s' group is not yet committed.", service_name); + break; - case AVAHI_ENTRY_GROUP_REGISTERING: - debug(2, "avahi: service '%s' group is registering.", service_name); - break; + case AVAHI_ENTRY_GROUP_REGISTERING: + debug(2, "avahi: service '%s' group is registering.", service_name); + break; - default: - debug(1, "avahi: unhandled egroup state: %d", state); - break; + default: + debug(1, "avahi: unhandled egroup state: %d", state); + break; } } @@ -385,21 +385,27 @@ static int avahi_register(char *srvname, int srvport) { static void avahi_unregister(void) { debug(1, "avahi: avahi_unregister."); if (tpoll) { + debug(1, "avahi: stop the threaded poll."); avahi_threaded_poll_stop(tpoll); if (client) { + debug(1, "avahi: free the client."); avahi_client_free(client); client = NULL; } else { debug(1, "avahi attempting to unregister a NULL client"); } + debug(1, "avahi: free the threaded poll."); avahi_threaded_poll_free(tpoll); tpoll = NULL; + } else { + debug(1, "No avahi threaded poll."); } - if (service_name) + if (service_name) { + debug(1, "avahi: free the service name."); free(service_name); - else + } else debug(1, "avahi attempt to free NULL service name"); service_name = NULL; } @@ -464,21 +470,18 @@ void *avahi_dacp_monitor(char *dacp_id) { } void avahi_dacp_dont_monitor(void *userdata) { - debug(3,"avahi_dacp_dont_monitor"); + debug(3, "avahi_dacp_dont_monitor"); if (userdata) { dacp_browser_struct *dbs = (dacp_browser_struct *)userdata; // stop and dispose of everything - /*if (dbs->service_poll) - avahi_threaded_poll_stop((dbs)->service_poll); - */ if (dbs->service_poll) { - avahi_threaded_poll_stop(dbs->service_poll); + avahi_threaded_poll_stop(dbs->service_poll); avahi_threaded_poll_lock(dbs->service_poll); if (dbs->service_browser) avahi_service_browser_free(dbs->service_browser); if (dbs->service_client) avahi_client_free(dbs->service_client); - avahi_threaded_poll_unlock(dbs->service_poll); + avahi_threaded_poll_unlock(dbs->service_poll); avahi_threaded_poll_free(dbs->service_poll); } free(dbs->dacp_id); @@ -487,7 +490,7 @@ void avahi_dacp_dont_monitor(void *userdata) { } else { debug(1, "Avahi DACP Monitor is not running."); } - debug(3,"avahi_dacp_dont_monitor exit"); + debug(3, "avahi_dacp_dont_monitor exit"); } mdns_backend mdns_avahi = {.name = "avahi", diff --git a/metadata_hub.c b/metadata_hub.c index 9d648fc..3ddbcb4 100644 --- a/metadata_hub.c +++ b/metadata_hub.c @@ -90,12 +90,30 @@ void metadata_hub_release_track_metadata(struct track_metadata_bundle *track_met } } +void metadata_hub_release_track_artwork(void) { + // debug(1,"release track artwork"); + release_char_string(&metadata_store.cover_art_pathname); +} + void metadata_hub_init(void) { // debug(1, "Metadata bundle initialisation."); memset(&metadata_store, 0, sizeof(metadata_store)); track_metadata = NULL; } +void metadata_hub_stop(void) { + debug(1, "metadata_hub_stop."); + metadata_hub_release_track_artwork(); + if (metadata_store.track_metadata) { + metadata_hub_release_track_metadata(metadata_store.track_metadata); + metadata_store.track_metadata = NULL; + } + if (track_metadata) { + metadata_hub_release_track_metadata(track_metadata); + track_metadata = NULL; + } +} + void add_metadata_watcher(metadata_watcher fn, void *userdata) { int i; for (i = 0; i < number_of_watchers; i++) { @@ -108,32 +126,34 @@ void add_metadata_watcher(metadata_watcher fn, void *userdata) { } } -void metadata_hub_modify_prolog(void) { - // always run this before changing an entry or a sequence of entries in the metadata_hub - // debug(1, "locking metadata hub for writing"); - if (pthread_rwlock_trywrlock(&metadata_hub_re_lock) != 0) { - debug(2, "Metadata_hub write lock is already taken -- must wait."); - pthread_rwlock_wrlock(&metadata_hub_re_lock); - debug(2, "Okay -- acquired the metadata_hub write lock."); - } -} - -void metadata_hub_release_track_artwork(void) { - // debug(1,"release track artwork"); - release_char_string(&metadata_store.cover_art_pathname); +void metadata_hub_unlock_hub_mutex_cleanup(__attribute__((unused)) void *arg) { + // debug(1, "metadata_hub_unlock_hub_mutex_cleanup called."); + pthread_rwlock_unlock(&metadata_hub_re_lock); } void run_metadata_watchers(void) { int i; // debug(1, "locking metadata hub for reading"); pthread_rwlock_rdlock(&metadata_hub_re_lock); + pthread_cleanup_push(metadata_hub_unlock_hub_mutex_cleanup, NULL); for (i = 0; i < number_of_watchers; i++) { if (metadata_store.watchers[i]) { metadata_store.watchers[i](&metadata_store, metadata_store.watchers_data[i]); } } // debug(1, "unlocking metadata hub for reading"); - pthread_rwlock_unlock(&metadata_hub_re_lock); + // pthread_rwlock_unlock(&metadata_hub_re_lock); + pthread_cleanup_pop(1); +} + +void metadata_hub_modify_prolog(void) { + // always run this before changing an entry or a sequence of entries in the metadata_hub + // debug(1, "locking metadata hub for writing"); + if (pthread_rwlock_trywrlock(&metadata_hub_re_lock) != 0) { + debug(2, "Metadata_hub write lock is already taken -- must wait."); + pthread_rwlock_unlock(&metadata_hub_re_lock); + debug(2, "Okay -- acquired the metadata_hub write lock."); + } } void metadata_hub_modify_epilog(int modified) { diff --git a/metadata_hub.h b/metadata_hub.h index 66368f9..65e93a2 100644 --- a/metadata_hub.h +++ b/metadata_hub.h @@ -56,7 +56,7 @@ typedef struct metadata_bundle { char *client_ip; // IP number used by the audio source (i.e. the "client"), which is also the DACP // server char *server_ip; // IP number used by Shairport Sync - char *progress_string; // progress string, emitted by the source from time to time + char *progress_string; // progress string, emitted by the source from time to time int player_thread_active; // true if a play thread is running int dacp_server_active; // true if there's a reachable DACP server (assumed to be the Airplay // client) ; false otherwise @@ -92,6 +92,7 @@ struct metadata_bundle metadata_store; void add_metadata_watcher(metadata_watcher fn, void *userdata); void metadata_hub_init(void); +void metadata_hub_stop(void); void metadata_hub_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length); void metadata_hub_reset_track_metadata(void); void metadata_hub_release_track_artwork(void); diff --git a/mpris-service.c b/mpris-service.c index 3d6cf96..668d776 100644 --- a/mpris-service.c +++ b/mpris-service.c @@ -123,7 +123,8 @@ void mpris_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused) } else if ((argc->track_metadata) && (argc->track_metadata->item_id)) { char trackidstring[128]; // debug(1, "Set ID using mper ID: \"%u\".",argc->item_id); - snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", argc->track_metadata->item_id); + snprintf(trackidstring, sizeof(trackidstring), "/org/gnome/ShairportSync/mper_%u", + argc->track_metadata->item_id); GVariant *trackid = g_variant_new("o", trackidstring); g_variant_builder_add(dict_builder, "{sv}", "mpris:trackid", trackid); } @@ -182,6 +183,14 @@ void mpris_metadata_watcher(struct metadata_bundle *argc, __attribute__((unused) // media_player2_player_set_volume(mprisPlayerPlayerSkeleton, metadata_store.speaker_volume); } +static gboolean on_handle_quit(MediaPlayer2 *skeleton, GDBusMethodInvocation *invocation, + __attribute__((unused)) gpointer user_data) { + debug(1, "quit requested (MPRIS interface)."); + pthread_cancel(main_thread_id); + media_player2_complete_quit(skeleton, invocation); + return TRUE; +} + static gboolean on_handle_next(MediaPlayer2Player *skeleton, GDBusMethodInvocation *invocation, __attribute__((unused)) gpointer user_data) { send_simple_dacp_command("nextitem"); @@ -242,7 +251,7 @@ static void on_mpris_name_acquired(GDBusConnection *connection, const gchar *nam media_player2_set_desktop_entry(mprisPlayerSkeleton, "shairport-sync"); media_player2_set_identity(mprisPlayerSkeleton, "Shairport Sync"); - media_player2_set_can_quit(mprisPlayerSkeleton, FALSE); + media_player2_set_can_quit(mprisPlayerSkeleton, TRUE); media_player2_set_can_raise(mprisPlayerSkeleton, FALSE); media_player2_set_has_track_list(mprisPlayerSkeleton, FALSE); media_player2_set_supported_uri_schemes(mprisPlayerSkeleton, empty_string_array); @@ -260,6 +269,8 @@ static void on_mpris_name_acquired(GDBusConnection *connection, const gchar *nam media_player2_player_set_can_seek(mprisPlayerPlayerSkeleton, FALSE); media_player2_player_set_can_control(mprisPlayerPlayerSkeleton, TRUE); + g_signal_connect(mprisPlayerSkeleton, "handle-quit", G_CALLBACK(on_handle_quit), NULL); + g_signal_connect(mprisPlayerPlayerSkeleton, "handle-play", G_CALLBACK(on_handle_play), NULL); g_signal_connect(mprisPlayerPlayerSkeleton, "handle-pause", G_CALLBACK(on_handle_pause), NULL); g_signal_connect(mprisPlayerPlayerSkeleton, "handle-play-pause", G_CALLBACK(on_handle_play_pause), @@ -291,7 +302,7 @@ static void on_mpris_name_lost(__attribute__((unused)) GDBusConnection *connecti // name,(mpris_bus_type==G_BUS_TYPE_SESSION) ? "session" : "system"); pid_t pid = getpid(); char interface_name[256] = ""; - snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid); + snprintf(interface_name, sizeof(interface_name), "org.mpris.MediaPlayer2.ShairportSync.i%d", pid); GBusType mpris_bus_type = G_BUS_TYPE_SYSTEM; if (config.mpris_service_bus_type == DBT_session) mpris_bus_type = G_BUS_TYPE_SESSION; @@ -11,170 +11,168 @@ #include "rtp.h" #include "dacp.h" -#include <mosquitto.h> #include "mqtt.h" +#include <mosquitto.h> -//this holds the mosquitto client +// this holds the mosquitto client struct mosquitto *global_mosq = NULL; char *topic = NULL; int connected = 0; -//mosquitto logging -void _cb_log( __attribute__((unused)) struct mosquitto *mosq, - __attribute__((unused)) void *userdata, int level, const char *str){ - switch(level){ - case MOSQ_LOG_DEBUG: - debug(1, str); - break; - case MOSQ_LOG_INFO: - debug(2, str); - break; - case MOSQ_LOG_NOTICE: - debug(3, str); - break; - case MOSQ_LOG_WARNING: - inform(str); - break; - case MOSQ_LOG_ERR: { - die("MQTT: Error: %s\n", str); - } +// mosquitto logging +void _cb_log(__attribute__((unused)) struct mosquitto *mosq, __attribute__((unused)) void *userdata, + int level, const char *str) { + switch (level) { + case MOSQ_LOG_DEBUG: + debug(1, str); + break; + case MOSQ_LOG_INFO: + debug(2, str); + break; + case MOSQ_LOG_NOTICE: + debug(3, str); + break; + case MOSQ_LOG_WARNING: + inform(str); + break; + case MOSQ_LOG_ERR: { + die("MQTT: Error: %s\n", str); + } } } -//mosquitto message handler -void on_message( __attribute__((unused)) struct mosquitto* mosq, - __attribute__((unused)) void* userdata, const struct mosquitto_message* msg){ - - //null-terminate the payload - char payload[msg->payloadlen+1]; - memcpy(payload,msg->payload,msg->payloadlen); - payload[msg->payloadlen]=0; - - debug(1, "[MQTT]: received Message on topic %s: %s\n",msg->topic, payload); - - //All recognized commands - char* commands[] = { - "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause", - "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup", - NULL}; - - int it=0; - - //send command if it's a valid one - while(commands[it++]!=NULL){ - if( (size_t)msg->payloadlen>=strlen(commands[it]) && - strncmp(msg->payload, commands[it], strlen(commands[it]))==0 - ){ - debug(1, "[MQTT]: DACP Command: %s\n",commands[it]); +// mosquitto message handler +void on_message(__attribute__((unused)) struct mosquitto *mosq, + __attribute__((unused)) void *userdata, const struct mosquitto_message *msg) { + + // null-terminate the payload + char payload[msg->payloadlen + 1]; + memcpy(payload, msg->payload, msg->payloadlen); + payload[msg->payloadlen] = 0; + + debug(1, "[MQTT]: received Message on topic %s: %s\n", msg->topic, payload); + + // All recognized commands + char *commands[] = {"command", "beginff", "beginrew", "mutetoggle", "nextitem", + "previtem", "pause", "playpause", "play", "stop", + "playresume", "shuffle_songs", "volumedown", "volumeup", NULL}; + + int it = 0; + + // send command if it's a valid one + while (commands[it++] != NULL) { + if ((size_t)msg->payloadlen >= strlen(commands[it]) && + strncmp(msg->payload, commands[it], strlen(commands[it])) == 0) { + debug(1, "[MQTT]: DACP Command: %s\n", commands[it]); send_simple_dacp_command(commands[it]); break; } } } -void on_disconnect( __attribute__((unused)) struct mosquitto* mosq, - __attribute__((unused)) void* userdata, - __attribute__((unused)) int rc){ +void on_disconnect(__attribute__((unused)) struct mosquitto *mosq, + __attribute__((unused)) void *userdata, __attribute__((unused)) int rc) { connected = 0; debug(1, "[MQTT]: disconnected"); } -void on_connect(struct mosquitto* mosq, - __attribute__((unused)) void* userdata, - __attribute__((unused)) int rc){ +void on_connect(struct mosquitto *mosq, __attribute__((unused)) void *userdata, + __attribute__((unused)) int rc) { connected = 1; debug(1, "[MQTT]: connected"); - - //subscribe if requested - if(config.mqtt_enable_remote){ - char remotetopic[strlen(config.mqtt_topic)+8]; - snprintf(remotetopic,strlen(config.mqtt_topic)+8,"%s/remote",config.mqtt_topic); - mosquitto_subscribe(mosq,NULL,remotetopic,0); + + // subscribe if requested + if (config.mqtt_enable_remote) { + char remotetopic[strlen(config.mqtt_topic) + 8]; + snprintf(remotetopic, strlen(config.mqtt_topic) + 8, "%s/remote", config.mqtt_topic); + mosquitto_subscribe(mosq, NULL, remotetopic, 0); } } -//helper function to publish under a topic and automatically append the main topic -void mqtt_publish(char* topic, char* data, uint32_t length){ - char fulltopic[strlen(config.mqtt_topic)+strlen(topic)+3]; - snprintf(fulltopic, strlen(config.mqtt_topic)+strlen(topic)+2, "%s/%s", config.mqtt_topic, topic); - debug(1, "[MQTT]: publishing under %s",fulltopic); - +// helper function to publish under a topic and automatically append the main topic +void mqtt_publish(char *topic, char *data, uint32_t length) { + char fulltopic[strlen(config.mqtt_topic) + strlen(topic) + 3]; + snprintf(fulltopic, strlen(config.mqtt_topic) + strlen(topic) + 2, "%s/%s", config.mqtt_topic, + topic); + debug(1, "[MQTT]: publishing under %s", fulltopic); + int rc; - if((rc=mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0))!=MOSQ_ERR_SUCCESS) { - switch(rc){ - case MOSQ_ERR_NO_CONN: - debug(1, "[MQTT]: Publish failed: not connected to broker"); - break; - default: - debug(1, "[MQTT]: Publish failed: unknown error"); - break; + if ((rc = mosquitto_publish(global_mosq, NULL, fulltopic, length, data, 0, 0)) != + MOSQ_ERR_SUCCESS) { + switch (rc) { + case MOSQ_ERR_NO_CONN: + debug(1, "[MQTT]: Publish failed: not connected to broker"); + break; + default: + debug(1, "[MQTT]: Publish failed: unknown error"); + break; } } } -//handler for incoming metadata -void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length){ - if(global_mosq==NULL || connected!=1){ +// handler for incoming metadata +void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length) { + if (global_mosq == NULL || connected != 1) { debug(3, "[MQTT]: Client not connected, skipping metadata handling"); return; } - if(config.mqtt_publish_raw){ + if (config.mqtt_publish_raw) { uint32_t val; char topic[] = "____/____"; - - val=htonl(type); - memcpy(topic,&val, 4); - val=htonl(code); - memcpy(topic+5,&val, 4); + + val = htonl(type); + memcpy(topic, &val, 4); + val = htonl(code); + memcpy(topic + 5, &val, 4); mqtt_publish(topic, data, length); } - if(config.mqtt_publish_parsed){ - if(type=='core'){ + if (config.mqtt_publish_parsed) { + if (type == 'core') { switch (code) { - case 'asar': - mqtt_publish("artist", data, length); - break; - case 'asal': - mqtt_publish("album", data, length); - break; - case 'minm': - mqtt_publish("title", data, length); - break; - case 'asgn': - mqtt_publish("genre", data, length); - break; - case 'asfm': - mqtt_publish("format", data, length); - break; + case 'asar': + mqtt_publish("artist", data, length); + break; + case 'asal': + mqtt_publish("album", data, length); + break; + case 'minm': + mqtt_publish("title", data, length); + break; + case 'asgn': + mqtt_publish("genre", data, length); + break; + case 'asfm': + mqtt_publish("format", data, length); + break; } - }else if(type=='ssnc'){ + } else if (type == 'ssnc') { switch (code) { - case 'asal': - mqtt_publish("songalbum", data, length); - break; - case 'pvol': - mqtt_publish("volume", data, length); - break; - case 'clip': - mqtt_publish("client_ip", data, length); - break; - case 'pbeg': - mqtt_publish("play_start", data, length); - break; - case 'pend': - mqtt_publish("play_end", data, length); - break; - case 'pfls': - mqtt_publish("play_flush", data, length); - break; - case 'prsm': - mqtt_publish("play_resume", data, length); - break; - case 'PICT': - if(config.mqtt_publish_parsed){ - mqtt_publish("cover", data, length); - } - break; + case 'asal': + mqtt_publish("songalbum", data, length); + break; + case 'pvol': + mqtt_publish("volume", data, length); + break; + case 'clip': + mqtt_publish("client_ip", data, length); + break; + case 'pbeg': + mqtt_publish("play_start", data, length); + break; + case 'pend': + mqtt_publish("play_end", data, length); + break; + case 'pfls': + mqtt_publish("play_flush", data, length); + break; + case 'prsm': + mqtt_publish("play_resume", data, length); + break; + case 'PICT': + if (config.mqtt_publish_parsed) { + mqtt_publish("cover", data, length); + } + break; } } } @@ -182,50 +180,44 @@ void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t le return; } - int initialise_mqtt() { debug(1, "Initialising MQTT"); - if(config.mqtt_hostname==NULL){ + if (config.mqtt_hostname == NULL) { debug(1, "[MQTT]: Not initialized, as the hostname is not set"); return 0; } int keepalive = 60; mosquitto_lib_init(); - if( !(global_mosq = mosquitto_new(config.service_name, true, NULL)) ){ + if (!(global_mosq = mosquitto_new(config.service_name, true, NULL))) { die("[MQTT]: FATAL: Could not create mosquitto object! %d\n", global_mosq); } - if( - config.mqtt_cafile != NULL || - config.mqtt_capath != NULL || - config.mqtt_certfile != NULL || - config.mqtt_keyfile != NULL - ){ - if(mosquitto_tls_set(global_mosq,config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) { + if (config.mqtt_cafile != NULL || config.mqtt_capath != NULL || config.mqtt_certfile != NULL || + config.mqtt_keyfile != NULL) { + if (mosquitto_tls_set(global_mosq, config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, + config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) { die("[MQTT]: TLS Setup failed"); } } - if( - config.mqtt_username != NULL || - config.mqtt_password != NULL - ){ - if(mosquitto_username_pw_set(global_mosq,config.mqtt_username,config.mqtt_password) != MOSQ_ERR_SUCCESS) { + if (config.mqtt_username != NULL || config.mqtt_password != NULL) { + if (mosquitto_username_pw_set(global_mosq, config.mqtt_username, config.mqtt_password) != + MOSQ_ERR_SUCCESS) { die("[MQTT]: Username/Password set failed"); } } mosquitto_log_callback_set(global_mosq, _cb_log); - - if(config.mqtt_enable_remote){ + + if (config.mqtt_enable_remote) { mosquitto_message_callback_set(global_mosq, on_message); } - + mosquitto_disconnect_callback_set(global_mosq, on_disconnect); mosquitto_connect_callback_set(global_mosq, on_connect); - if(mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)){ + if (mosquitto_connect(global_mosq, config.mqtt_hostname, config.mqtt_port, keepalive)) { inform("[MQTT]: Could not establish a mqtt connection"); } - if(mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS){ + if (mosquitto_loop_start(global_mosq) != MOSQ_ERR_SUCCESS) { inform("[MQTT]: Could start MQTT Main loop"); } @@ -1,15 +1,14 @@ #ifndef MQTT_H #define MQTT_H -#include <stdint.h> #include <mosquitto.h> - +#include <stdint.h> int initialise_mqtt(); void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length); -void mqtt_publish(char* topic, char* data, uint32_t length); +void mqtt_publish(char *topic, char *data, uint32_t length); void mqtt_setup(); -void on_connect(struct mosquitto* mosq, void* userdata, int rc); -void on_disconnect(struct mosquitto* mosq, void* userdata, int rc); -void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg); +void on_connect(struct mosquitto *mosq, void *userdata, int rc); +void on_disconnect(struct mosquitto *mosq, void *userdata, int rc); +void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg); void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str); #endif /* #ifndef MQTT_H */ diff --git a/org.gnome.ShairportSync.xml b/org.gnome.ShairportSync.xml index aa693fd..1b734bf 100644 --- a/org.gnome.ShairportSync.xml +++ b/org.gnome.ShairportSync.xml @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8" ?> <node name="/" xmlns:doc="http://www.freedesktop.org/dbus/1.0/doc.dtd"> <interface name="org.gnome.ShairportSync"> + <method name="Quit"/> <property name="LoudnessFilterActive" type="b" access="readwrite" /> <property name="LoudnessThreshold" type="d" access="readwrite" /> <method name="RemoteCommand"> @@ -315,18 +315,16 @@ static int alac_decode(short *dest, int *destlen, uint8_t *buf, int len, rtsp_co } if (outsize > toutsize) { - debug(2, - "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- " - "truncated, but buffer overflow possible! Encrypted = %d.", + debug(2, "Output from alac_decode larger (%d bytes, not frames) than expected (%d bytes) -- " + "truncated, but buffer overflow possible! Encrypted = %d.", outsize, toutsize, conn->stream.encrypted); reply = -1; // output packet is the wrong size } *destlen = outsize / conn->input_bytes_per_frame; if ((outsize % conn->input_bytes_per_frame) != 0) - debug(1, - "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) " - "and the audio frame size (%d).", + debug(1, "Number of audio frames (%d) does not correspond exactly to the number of bytes (%d) " + "and the audio frame size (%d).", *destlen, outsize, conn->input_bytes_per_frame); return reply; } @@ -420,7 +418,8 @@ static int init_decoder(int32_t fmtp[12], rtsp_conn_info *conn) { conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8); - alac = alac_create(conn->input_bit_depth, conn->input_num_channels); + alac = alac_create(conn->input_bit_depth, + conn->input_num_channels); // no pthread cancellation point in here if (!alac) return 1; conn->decoder_info = alac; @@ -436,10 +435,10 @@ static int init_decoder(int32_t fmtp[12], rtsp_conn_info *conn) { alac->setinfo_82 = fmtp[9]; alac->setinfo_86 = fmtp[10]; alac->setinfo_8a_rate = fmtp[11]; - alac_allocate_buffers(alac); + alac_allocate_buffers(alac); // no pthread cancellation point in here #ifdef HAVE_APPLE_ALAC - apple_alac_init(fmtp); + apple_alac_init(fmtp); // no pthread cancellation point in here #endif return 0; @@ -465,193 +464,172 @@ static void free_audio_buffers(rtsp_conn_info *conn) { free(conn->audio_buffer[i].data); } -void player_thread_lock_cleanup(void *arg) { - rtsp_conn_info *conn = (rtsp_conn_info *)arg; - debug(3, "Cleaning up player_thread_lock."); - pthread_rwlock_unlock(&conn->player_thread_lock); -} - void player_put_packet(seq_t seqno, uint32_t actual_timestamp, int64_t timestamp, uint8_t *data, int len, rtsp_conn_info *conn) { - if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) { - pthread_cleanup_push(player_thread_lock_cleanup, (void *)conn); - if (conn->player_thread != NULL) { + // all timestamps are done at the output rate + // the "actual_timestamp" is the one that comes in the packet, and is carried over for + // debugging + // and checking only. - // all timestamps are done at the output rate - // the "actual_timestamp" is the one that comes in the packet, and is carried over for - // debugging - // and checking only. + int64_t ltimestamp = timestamp * conn->output_sample_ratio; - int64_t ltimestamp = timestamp * conn->output_sample_ratio; - - // ignore a request to flush that has been made before the first packet... - if (conn->packet_count == 0) { - debug_mutex_lock(&conn->flush_mutex, 1000, 1); - conn->flush_requested = 0; - conn->flush_rtp_timestamp = 0; - debug_mutex_unlock(&conn->flush_mutex, 3); - } + // ignore a request to flush that has been made before the first packet... + if (conn->packet_count == 0) { + debug_mutex_lock(&conn->flush_mutex, 1000, 1); + conn->flush_requested = 0; + conn->flush_rtp_timestamp = 0; + debug_mutex_unlock(&conn->flush_mutex, 3); + } - debug_mutex_lock(&conn->ab_mutex, 30000, 1); - conn->packet_count++; - conn->time_of_last_audio_packet = get_absolute_time_in_fp(); - if (conn->connection_state_to_output) { // if we are supposed to be processing these packets + debug_mutex_lock(&conn->ab_mutex, 30000, 1); + conn->packet_count++; + conn->time_of_last_audio_packet = get_absolute_time_in_fp(); + if (conn->connection_state_to_output) { // if we are supposed to be processing these packets + + // if (flush_rtp_timestamp != 0) + // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp); + + if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) { + debug(3, + "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to " + "timestamp: %lld.", + seqno, ltimestamp, conn->flush_rtp_timestamp); + } else { + if ((conn->flush_rtp_timestamp != 0x0) && + (ltimestamp > conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time + conn->flush_rtp_timestamp = 0x0; - // if (flush_rtp_timestamp != 0) - // debug(1,"Flush_rtp_timestamp is %u",flush_rtp_timestamp); + abuf_t *abuf = 0; - if ((conn->flush_rtp_timestamp != 0) && (ltimestamp <= conn->flush_rtp_timestamp)) { - debug( - 3, - "Dropping flushed packet in player_put_packet, seqno %u, timestamp %lld, flushing to " - "timestamp: %lld.", - seqno, ltimestamp, conn->flush_rtp_timestamp); - } else { - if ((conn->flush_rtp_timestamp != 0x0) && - (ltimestamp > - conn->flush_rtp_timestamp)) // if we have gone past the flush boundary time - conn->flush_rtp_timestamp = 0x0; - - abuf_t *abuf = 0; - - if (!conn->ab_synced) { - debug(3, "syncing to seqno %u.", seqno); - conn->ab_write = seqno; - conn->ab_read = seqno; - conn->ab_synced = 1; - } + if (!conn->ab_synced) { + debug(3, "syncing to seqno %u.", seqno); + conn->ab_write = seqno; + conn->ab_read = seqno; + conn->ab_synced = 1; + } - // here, we should check for missing frames - int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals - const int number_of_resend_attempts = 8; - int latency_based_resend_interval = - (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet); - if (latency_based_resend_interval > resend_interval) - resend_interval = latency_based_resend_interval; - - if (conn->resend_interval != resend_interval) { - debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.", - conn->latency, resend_interval); - conn->resend_interval = resend_interval; - } + // here, we should check for missing frames + int resend_interval = (((250 * 44100) / 352) / 1000); // approximately 250 ms intervals + const int number_of_resend_attempts = 8; + int latency_based_resend_interval = + (conn->latency) / (number_of_resend_attempts * conn->max_frames_per_packet); + if (latency_based_resend_interval > resend_interval) + resend_interval = latency_based_resend_interval; + + if (conn->resend_interval != resend_interval) { + debug(2, "Resend interval for latency of %" PRId64 " frames is %d frames.", conn->latency, + resend_interval); + conn->resend_interval = resend_interval; + } - if (conn->ab_write == seqno) { // expected packet - abuf = conn->audio_buffer + BUFIDX(seqno); - conn->ab_write = SUCCESSOR(seqno); - } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected - // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8) - // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and - // seqno: - // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno); - int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read); - if (gap <= 0) - debug(1, "Unexpected gap size: %d.", gap); - int i; - for (i = 0; i < gap; i++) { - abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i)); - abuf->ready = 0; // to be sure, to be sure - abuf->resend_level = 0; - abuf->timestamp = 0; - abuf->given_timestamp = 0; - abuf->sequence_number = 0; - } - // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write); - abuf = conn->audio_buffer + BUFIDX(seqno); - // rtp_request_resend(ab_write, gap); - // resend_requests++; - conn->ab_write = SUCCESSOR(seqno); - } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played - conn->late_packets++; - abuf = conn->audio_buffer + BUFIDX(seqno); - /* - if (abuf->ready) - debug(1,"Late apparently duplicate packet received that is %d packets - late.",seq_diff(seqno, conn->ab_write, conn->ab_read)); - else - debug(1,"Late packet received that is %d packets late.",seq_diff(seqno, - conn->ab_write, conn->ab_read)); - */ - } else { // too late. + if (conn->ab_write == seqno) { // expected packet + abuf = conn->audio_buffer + BUFIDX(seqno); + conn->ab_write = SUCCESSOR(seqno); + } else if (seq_order(conn->ab_write, seqno, conn->ab_read)) { // newer than expected + // if (ORDINATE(seqno)>(BUFFER_FRAMES*7)/8) + // debug(1,"An interval of %u frames has opened, with ab_read: %u, ab_write: %u and + // seqno: + // %u.",seq_diff(ab_read,seqno),ab_read,ab_write,seqno); + int32_t gap = seq_diff(conn->ab_write, seqno, conn->ab_read); + if (gap <= 0) + debug(1, "Unexpected gap size: %d.", gap); + int i; + for (i = 0; i < gap; i++) { + abuf = conn->audio_buffer + BUFIDX(seq_sum(conn->ab_write, i)); + abuf->ready = 0; // to be sure, to be sure + abuf->resend_level = 0; + abuf->timestamp = 0; + abuf->given_timestamp = 0; + abuf->sequence_number = 0; + } + // debug(1,"N %d s %u.",seq_diff(ab_write,PREDECESSOR(seqno))+1,ab_write); + abuf = conn->audio_buffer + BUFIDX(seqno); + // rtp_request_resend(ab_write, gap); + // resend_requests++; + conn->ab_write = SUCCESSOR(seqno); + } else if (seq_order(conn->ab_read, seqno, conn->ab_read)) { // late but not yet played + conn->late_packets++; + abuf = conn->audio_buffer + BUFIDX(seqno); + /* + if (abuf->ready) + debug(1,"Late apparently duplicate packet received that is %d packets + late.",seq_diff(seqno, conn->ab_write, conn->ab_read)); + else + debug(1,"Late packet received that is %d packets late.",seq_diff(seqno, + conn->ab_write, conn->ab_read)); + */ + } else { // too late. - // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno, - // conn->ab_write, conn->ab_read)); - conn->too_late_packets++; - } - // pthread_mutex_unlock(&ab_mutex); - - if (abuf) { - int datalen = conn->max_frames_per_packet; - if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) { - abuf->ready = 1; - abuf->length = datalen; - abuf->timestamp = ltimestamp; - abuf->given_timestamp = actual_timestamp; - abuf->sequence_number = seqno; - } else { - debug(1, "Bad audio packet detected and discarded."); - abuf->ready = 0; - abuf->resend_level = 0; - abuf->timestamp = 0; - abuf->given_timestamp = 0; - abuf->sequence_number = 0; - } - } + // debug(1,"Too late packet received that is %d packets late.",seq_diff(seqno, + // conn->ab_write, conn->ab_read)); + conn->too_late_packets++; + } + // pthread_mutex_unlock(&ab_mutex); + + if (abuf) { + int datalen = conn->max_frames_per_packet; + if (alac_decode(abuf->data, &datalen, data, len, conn) == 0) { + abuf->ready = 1; + abuf->length = datalen; + abuf->timestamp = ltimestamp; + abuf->given_timestamp = actual_timestamp; + abuf->sequence_number = seqno; + } else { + debug(1, "Bad audio packet detected and discarded."); + abuf->ready = 0; + abuf->resend_level = 0; + abuf->timestamp = 0; + abuf->given_timestamp = 0; + abuf->sequence_number = 0; + } + } - // pthread_mutex_lock(&ab_mutex); - int rc = pthread_cond_signal(&conn->flowcontrol); - if (rc) - debug(1, "Error signalling flowcontrol."); - - // if it's at the expected time, do a look back for missing packets - // but release the ab_mutex when doing a resend - if (!conn->ab_buffering) { - int j; - for (j = 1; j <= number_of_resend_attempts; j++) { - // check j times, after a short period of has elapsed, assuming 352 frames per packet - // the higher the step_exponent, the less it will try. 1 means it will try very - // hard. 2.0 seems good. - float step_exponent = 2.0; - int back_step = (int)(resend_interval * pow(j, step_exponent)); - int k; - for (k = -1; k <= 1; k++) { - if ((back_step + k) < - seq_diff(conn->ab_read, conn->ab_write, - conn->ab_read)) { // if it's within the range of frames in use... - int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff; - seq_t next = item_to_check; - abuf_t *check_buf = conn->audio_buffer + BUFIDX(next); - if ((!check_buf->ready) && - (check_buf->resend_level < - j)) { // prevent multiple requests from the same level of lookback - check_buf->resend_level = j; - if (config.disable_resend_requests == 0) { - if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >= - seq_diff(conn->ab_read, conn->ab_write, conn->ab_read)) - debug(3, - "Last-ditch (#%d) resend request for packet %u in range %u to %u. " - "Looking back %d packets.", - j, next, conn->ab_read, conn->ab_write, back_step + k); - debug_mutex_unlock(&conn->ab_mutex, 3); - rtp_request_resend(next, 1, conn); - conn->resend_requests++; - debug_mutex_lock(&conn->ab_mutex, 20000, 1); - } - } + // pthread_mutex_lock(&ab_mutex); + int rc = pthread_cond_signal(&conn->flowcontrol); + if (rc) + debug(1, "Error signalling flowcontrol."); + + // if it's at the expected time, do a look back for missing packets + // but release the ab_mutex when doing a resend + if (!conn->ab_buffering) { + int j; + for (j = 1; j <= number_of_resend_attempts; j++) { + // check j times, after a short period of has elapsed, assuming 352 frames per packet + // the higher the step_exponent, the less it will try. 1 means it will try very + // hard. 2.0 seems good. + float step_exponent = 2.0; + int back_step = (int)(resend_interval * pow(j, step_exponent)); + int k; + for (k = -1; k <= 1; k++) { + if ((back_step + k) < + seq_diff(conn->ab_read, conn->ab_write, + conn->ab_read)) { // if it's within the range of frames in use... + int item_to_check = (conn->ab_write - (back_step + k)) & 0xffff; + seq_t next = item_to_check; + abuf_t *check_buf = conn->audio_buffer + BUFIDX(next); + if ((!check_buf->ready) && + (check_buf->resend_level < + j)) { // prevent multiple requests from the same level of lookback + check_buf->resend_level = j; + if (config.disable_resend_requests == 0) { + if (((int)(resend_interval * pow(j + 1, step_exponent)) + k) >= + seq_diff(conn->ab_read, conn->ab_write, conn->ab_read)) + debug(3, "Last-ditch (#%d) resend request for packet %u in range %u to %u. " + "Looking back %d packets.", + j, next, conn->ab_read, conn->ab_write, back_step + k); + debug_mutex_unlock(&conn->ab_mutex, 3); + rtp_request_resend(next, 1, conn); + conn->resend_requests++; + debug_mutex_lock(&conn->ab_mutex, 20000, 1); } } } } } } - debug_mutex_unlock(&conn->ab_mutex, 3); - } else { - debug(1, "player_put_packet discarded packet %d because the player thread was gone."); } - pthread_cleanup_pop(1); - // pthread_rwlock_unlock(&conn->player_thread_lock); - } else { - debug(1, "player_put_packet discarded packet %d because the player thread was locked.", seqno); } + debug_mutex_unlock(&conn->ab_mutex, 3); } int32_t rand_in_range(int32_t exclusive_range_limit) { @@ -793,6 +771,11 @@ static inline void process_sample(int32_t sample, char **outp, enum sps_format_t *outp += result; } +void buffer_get_frame_cleanup_handler(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + debug_mutex_unlock(&conn->ab_mutex, 3); +} + // get the next frame, when available. return 0 if underrun/stream reset. static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { // int16_t buf_fill; @@ -802,8 +785,12 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { int notified_buffer_empty = 0; // diagnostic only debug_mutex_lock(&conn->ab_mutex, 30000, 1); + int wait; long dac_delay = 0; // long because alsa returns a long + + pthread_cleanup_push(buffer_get_frame_cleanup_handler, + (void *)conn); // undo what's been done so far do { // get the time local_time_now = get_absolute_time_in_fp(); // type okay @@ -820,12 +807,12 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { // if (conn->packet_count>500) { //for testing -- about 4 seconds of play first if ((local_time_now > conn->time_of_last_audio_packet) && (local_time_now - conn->time_of_last_audio_packet >= ct << 32)) { - debug(1, - "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" " - "from RTSP conversation %d.", + debug(1, "As Yeats almost said, \"Too long a silence / can make a stone of the heart\" " + "from RTSP conversation %d.", conn->connection_number); conn->stop = 1; - pthread_kill(conn->thread, SIGUSR1); + pthread_cancel(conn->thread); + // pthread_kill(conn->thread, SIGUSR1); } } int rco = get_requested_connection_state_to_output(); @@ -843,8 +830,8 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { debug_mutex_lock(&conn->flush_mutex, 1000, 1); if (conn->flush_requested == 1) { if (config.output->flush) - config.output->flush(); - ab_resync(conn); + config.output->flush(); // no cancellation points + ab_resync(conn); // no cancellation points conn->first_packet_timestamp = 0; conn->first_packet_time_to_play = 0; conn->time_since_play_started = 0; @@ -1212,8 +1199,7 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { } do_wait = 1; } - wait = (conn->ab_buffering || (do_wait != 0) || (!conn->ab_synced)) && - (!conn->player_thread_please_stop); + wait = (conn->ab_buffering || (do_wait != 0) || (!conn->ab_synced)); if (wait) { uint64_t time_to_wait_for_wakeup_fp = @@ -1230,7 +1216,8 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { time_of_wakeup.tv_sec = sec; time_of_wakeup.tv_nsec = nsec; // pthread_cond_timedwait(&conn->flowcontrol, &conn->ab_mutex, &time_of_wakeup); - int rc = pthread_cond_timedwait(&conn->flowcontrol, &conn->ab_mutex, &time_of_wakeup); + int rc = pthread_cond_timedwait(&conn->flowcontrol, &conn->ab_mutex, + &time_of_wakeup); // this is a pthread cancellation point if (rc != 0) debug(3, "pthread_cond_timedwait returned error code %d.", rc); #endif @@ -1245,14 +1232,7 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { } } while (wait); - if (conn->player_thread_please_stop) { - debug(3, "buffer_get_frame exiting due to thread stop request."); - debug_mutex_unlock(&conn->ab_mutex, 3); - return 0; - } - // seq_t read = conn->ab_read; - if (!curframe->ready) { // debug(1, "Supplying a silent frame for frame %u", read); conn->missing_packets++; @@ -1261,7 +1241,8 @@ static abuf_t *buffer_get_frame(rtsp_conn_info *conn) { curframe->ready = 0; curframe->resend_level = 0; conn->ab_read = SUCCESSOR(conn->ab_read); - debug_mutex_unlock(&conn->ab_mutex, 3); + + pthread_cleanup_pop(1); return curframe; } @@ -1439,41 +1420,85 @@ typedef struct stats { // statistics for running averages int64_t sync_error, correction, drift; } stats_t; -void *player_thread_func(void *arg) { - // note, the thread will be started up with the player_thread_lock locked. You must release it - // quickly. - +void player_thread_cleanup_handler(void *arg) { + debug(1, "player_thread_cleanup_handler called"); rtsp_conn_info *conn = (rtsp_conn_info *)arg; - pthread_rwlock_wrlock(&conn->player_thread_lock); + debug(3, "Connection %d: player thread main loop exit.", conn->connection_number); - int rc = pthread_mutex_init(&conn->ab_mutex, NULL); - if (rc) - debug(1, "Error initialising ab_mutex."); - rc = pthread_mutex_init(&conn->flush_mutex, NULL); - if (rc) - debug(1, "Error initialising flush_mutex."); -// set the flowcontrol condition variable to wait on a monotonic clock -#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD - pthread_condattr_t attr; - pthread_condattr_init(&attr); - pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it. - rc = pthread_cond_init(&conn->flowcontrol, &attr); -#endif -#ifdef COMPILE_FOR_OSX - rc = pthread_cond_init(&conn->flowcontrol, NULL); + if (config.statistics_requested) { + int rawSeconds = (int)difftime(time(NULL), conn->playstart); + int elapsedHours = rawSeconds / 3600; + int elapsedMin = (rawSeconds / 60) % 60; + int elapsedSec = rawSeconds % 60; + if (conn->frame_rate_status == 0) + inform("Playback Stopped. Total playing time %02d:%02d:%02d at %0.2f frames per second.", + elapsedHours, elapsedMin, elapsedSec, conn->frame_rate); + else + inform("Playback Stopped. Total playing time %02d:%02d:%02d.", elapsedHours, elapsedMin, + elapsedSec); + } + +#ifdef HAVE_DACP_CLIENT + + relinquish_dacp_server_information( + conn); // say it doesn't belong to this conversation thread any more... + +#else + // stop watching for DACP port number stuff + // this is only used for compatability, if dacp stuff isn't enabled. + if (conn->dapo_private_storage) { + mdns_dacp_dont_monitor(conn->dapo_private_storage); + conn->dapo_private_storage = NULL; + } else { + debug(2, "DACP Monitor already stopped"); + } #endif - if (rc) - debug(1, "Error initialising flowcontrol condition variable."); - pthread_rwlock_unlock(&conn->player_thread_lock); + debug(2, "Cancelling timing, control and audio threads..."); + debug(2, "Cancel timing thread."); + pthread_cancel(conn->rtp_timing_thread); + debug(2, "Join timing thread."); + pthread_join(conn->rtp_timing_thread, NULL); + debug(2, "Timing thread terminated."); + debug(2, "Cancel control thread."); + pthread_cancel(conn->rtp_control_thread); + debug(2, "Join control thread."); + pthread_join(conn->rtp_control_thread, NULL); + debug(2, "Control thread terminated."); + debug(2, "Cancel audio thread."); + pthread_cancel(conn->rtp_audio_thread); + debug(2, "Join audio thread."); + pthread_join(conn->rtp_audio_thread, NULL); + debug(2, "Audio thread terminated."); + + if (conn->outbuf) { + free(conn->outbuf); + conn->outbuf = NULL; + } + if (conn->sbuf) { + free(conn->sbuf); + conn->sbuf = NULL; + } + if (conn->tbuf) { + free(conn->tbuf); + conn->tbuf = NULL; + } + free_audio_buffers(conn); + terminate_decoders(conn); + if (config.output->stop) + config.output->stop(); - // it's safe now + clear_reference_timestamp(conn); + conn->rtp_running = 0; +} + +void *player_thread_func(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; conn->packet_count = 0; conn->previous_random_number = 0; conn->input_bytes_per_frame = 4; - conn->player_thread_please_stop = 0; conn->decoder_in_use = 0; conn->ab_buffering = 1; conn->ab_synced = 0; @@ -1487,12 +1512,13 @@ void *player_thread_func(void *arg) { conn->latency = 88200; } - config.output->start(config.output_rate, config.output_format); + config.output->start(config.output_rate, config.output_format); // will need a corresponding stop init_decoder((int32_t *)&conn->stream.fmtp, - conn); // this sets up incoming rate, bit depth, channels - // must be after decoder init - init_buffer(conn); + conn); // this sets up incoming rate, bit depth, channels. + // No pthread cancellation point in here + // This must be after init_decoder + init_buffer(conn); // will need a corresponding deallocation if (conn->stream.encrypted) { #ifdef HAVE_LIBMBEDTLS @@ -1541,12 +1567,6 @@ void *player_thread_func(void *arg) { debug(3, "Output frame bytes is %d.", conn->output_bytes_per_frame); - // create and start the timing, control and audio receiver threads - pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread; - pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, (void *)conn); - pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, (void *)conn); - pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, (void *)conn); - conn->session_corrections = 0; conn->play_segment_reference_frame = 0; // zero signals that we are not in a play segment @@ -1573,7 +1593,10 @@ void *player_thread_func(void *arg) { int32_t minimum_buffer_occupancy = INT32_MAX; int32_t maximum_buffer_occupancy = INT32_MIN; - time_t playstart = time(NULL); + conn->playstart = time(NULL); + + conn->frame_rate = 0.0; + conn->frame_rate_status = -1; // zero means okay conn->buffer_occupancy = 0; @@ -1594,12 +1617,7 @@ void *player_thread_func(void *arg) { static char rnstate[256]; initstate(time(NULL), rnstate, 256); - signed short *inbuf, *tbuf; - - int32_t *sbuf; - - char *outbuf; - + signed short *inbuf; int inbuflength; int output_bit_depth = 16; // default; @@ -1639,27 +1657,26 @@ void *player_thread_func(void *arg) { // if ((input_rate!=config.output_rate) || (input_bit_depth!=output_bit_depth)) { // debug(1,"Define tbuf of length // %d.",output_bytes_per_frame*(max_frames_per_packet*output_sample_ratio+max_frame_size_change)); - tbuf = malloc( - sizeof(int32_t) * 2 * - (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change)); - if (tbuf == NULL) + conn->tbuf = + malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio + + conn->max_frame_size_change)); + if (conn->tbuf == NULL) die("Failed to allocate memory for the transition buffer."); - sbuf = 0; // initialise this, because soxr stuffing might be chosen later - sbuf = malloc( - sizeof(int32_t) * 2 * - (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change)); - if (sbuf == NULL) - debug(1, "Failed to allocate memory for the sbuf buffer."); + conn->sbuf = + malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio + + conn->max_frame_size_change)); + if (conn->sbuf == NULL) + die("Failed to allocate memory for the sbuf buffer."); // The size of these dependents on the number of frames, the size of each frame and the maximum // size change - outbuf = malloc( + conn->outbuf = malloc( conn->output_bytes_per_frame * (conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change)); - if (outbuf == NULL) + if (conn->outbuf == NULL) die("Failed to allocate memory for an output buffer."); conn->first_packet_timestamp = 0; conn->missing_packets = conn->late_packets = conn->too_late_packets = conn->resend_requests = 0; @@ -1670,7 +1687,9 @@ void *player_thread_func(void *arg) { // stop looking elsewhere for DACP stuff #ifdef HAVE_DACP_CLIENT - set_dacp_server_information(conn); // this will start scanning when a port is registered by the + // debug(1, "Set dacp server info"); + // this does not have pthread cancellation points in it (assuming avahi doesn't) + set_dacp_server_information(conn); // this will start scanning when a port is registered by the // code initiated by the mdns_dacp_monitor #else // this is only used for compatability, if dacp stuff isn't enabled. @@ -1679,7 +1698,8 @@ void *player_thread_func(void *arg) { if (conn->dapo_private_storage) debug(1, "DACP monitor already initialised?"); else - conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); + // this does not have pthread cancellation points in it (assuming avahi doesn't) + conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); // ?? #endif conn->framesProcessedInThisEpoch = 0; @@ -1699,7 +1719,8 @@ void *player_thread_func(void *arg) { "resend requests, " "min DAC queue size, " "min buffer occupancy, " - "max buffer occupancy"); + "max buffer occupancy, " + "frames per second"); } else { inform("sync error in milliseconds, " "total packets, " @@ -1725,12 +1746,19 @@ void *player_thread_func(void *arg) { // set the default volume to whaterver it was before, as stored in the config airplay_volume debug(3, "Set initial volume to %f.", config.airplay_volume); - - player_volume(config.airplay_volume, conn); + player_volume(config.airplay_volume, conn); // ?? int64_t frames_to_drop = 0; + + // create and start the timing, control and audio receiver threads + pthread_create(&conn->rtp_audio_thread, NULL, &rtp_audio_receiver, (void *)conn); + pthread_create(&conn->rtp_control_thread, NULL, &rtp_control_receiver, (void *)conn); + pthread_create(&conn->rtp_timing_thread, NULL, &rtp_timing_receiver, (void *)conn); + + pthread_cleanup_push(player_thread_cleanup_handler, arg); // undo what's been done so far + // debug(1, "Play begin"); - while (!conn->player_thread_please_stop) { - abuf_t *inframe = buffer_get_frame(conn); + while (1) { + abuf_t *inframe = buffer_get_frame(conn); // this has cancellation point(s) if (inframe) { inbuf = inframe->data; inbuflength = inframe->length; @@ -1752,9 +1780,8 @@ void *player_thread_func(void *arg) { } else { // the player may change the contents of the buffer, so it has to be zeroed each time; // might as well malloc and freee it locally - memset(silence, 0, - conn->output_bytes_per_frame * conn->max_frames_per_packet * - conn->output_sample_ratio); + memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet * + conn->output_sample_ratio); config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio); free(silence); } @@ -1774,9 +1801,8 @@ void *player_thread_func(void *arg) { } else { // the player may change the contents of the buffer, so it has to be zeroed each time; // might as well malloc and freee it locally - memset(silence, 0, - conn->output_bytes_per_frame * conn->max_frames_per_packet * - conn->output_sample_ratio); + memset(silence, 0, conn->output_bytes_per_frame * conn->max_frames_per_packet * + conn->output_sample_ratio); config.output->play(silence, conn->max_frames_per_packet * conn->output_sample_ratio); free(silence); } @@ -1806,7 +1832,7 @@ void *player_thread_func(void *arg) { int32_t ll = 0, rl = 0; int16_t *inps = inbuf; // int16_t *outps = tbuf; - int32_t *outpl = (int32_t *)tbuf; + int32_t *outpl = (int32_t *)conn->tbuf; for (i = 0; i < inbuflength; i++) { ls = *inps++; rs = *inps++; @@ -1923,9 +1949,8 @@ void *player_thread_func(void *arg) { SUCCESSOR(conn->last_seqno_read); // int32_t from seq_t, i.e. uint16_t, so okay. if (inframe->sequence_number != conn->last_seqno_read) { // seq_t, ei.e. uint16_t and int32_t, so okay - debug(2, - "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u " - "and ab_write: %u.", + debug(2, "Player: packets out of sequence: expected: %u, got: %u, with ab_read: %u " + "and ab_write: %u.", conn->last_seqno_read, inframe->sequence_number, conn->ab_read, conn->ab_write); conn->last_seqno_read = inframe->sequence_number; // reset warning... } @@ -1989,7 +2014,7 @@ void *player_thread_func(void *arg) { abs_sync_error = -abs_sync_error; if ((config.no_sync == 0) && (inframe->timestamp != 0) && - (!conn->player_thread_please_stop) && (config.resyncthreshold > 0.0) && + (config.resyncthreshold > 0.0) && (abs_sync_error > config.resyncthreshold * config.output_rate)) { if (abs_sync_error > 3 * config.output_rate) { warn("Very large sync error: %" PRId64 " frames, with delay: %" PRId64 @@ -2020,12 +2045,15 @@ void *player_thread_func(void *arg) { silence_length = filler_length * 5; char *long_silence = malloc(conn->output_bytes_per_frame * silence_length); - if (long_silence == NULL) - die("Failed to allocate memory for a long_silence buffer of %d frames.", - silence_length); - memset(long_silence, 0, conn->output_bytes_per_frame * silence_length); - config.output->play(long_silence, silence_length); - free(long_silence); + if (long_silence) { + memset(long_silence, 0, conn->output_bytes_per_frame * silence_length); + config.output->play(long_silence, silence_length); + free(long_silence); + } else { + warn("Failed to allocate memory for a long_silence buffer of %d frames for a " + "sync error of %" PRId64 " frames.", + silence_length, sync_error); + } } } else { @@ -2076,8 +2104,8 @@ void *player_thread_func(void *arg) { #ifdef CONFIG_CONVOLUTION || config.convolution #endif - ) { - int32_t *tbuf32 = (int32_t *)tbuf; + ) { + int32_t *tbuf32 = (int32_t *)conn->tbuf; float fbuf_l[inbuflength]; float fbuf_r[inbuflength]; @@ -2127,15 +2155,15 @@ void *player_thread_func(void *arg) { case ST_basic: // if (amount_to_stuff) debug(1,"Basic stuff..."); play_samples = - stuff_buffer_basic_32((int32_t *)tbuf, inbuflength, config.output_format, - outbuf, amount_to_stuff, enable_dither, conn); + stuff_buffer_basic_32((int32_t *)conn->tbuf, inbuflength, config.output_format, + conn->outbuf, amount_to_stuff, enable_dither, conn); break; case ST_soxr: #ifdef HAVE_LIBSOXR // if (amount_to_stuff) debug(1,"Soxr stuff..."); - play_samples = stuff_buffer_soxr_32((int32_t *)tbuf, (int32_t *)sbuf, inbuflength, - config.output_format, outbuf, amount_to_stuff, - enable_dither, conn); + play_samples = stuff_buffer_soxr_32((int32_t *)conn->tbuf, (int32_t *)conn->sbuf, + inbuflength, config.output_format, conn->outbuf, + amount_to_stuff, enable_dither, conn); #endif break; } @@ -2155,20 +2183,20 @@ void *player_thread_func(void *arg) { } */ - if (outbuf == NULL) + if (conn->outbuf == NULL) debug(1, "NULL outbuf to play -- skipping it."); else { if (play_samples == 0) debug(1, "play_samples==0 skipping it (1)."); else - config.output->play(outbuf, play_samples); + config.output->play(conn->outbuf, play_samples); } // check for loss of sync // timestamp of zero means an inserted silent frame in place of a missing frame /* if ((config.no_sync == 0) && (inframe->timestamp != 0) && - (!conn->player_thread_please_stop) && (config.resyncthreshold > 0.0) && + && (config.resyncthreshold > 0.0) && (abs_sync_error > config.resyncthreshold * config.output_rate)) { sync_error_out_of_bounds++; // debug(1,"Sync error out of bounds: Error: %lld; previous error: %lld; DAC: %lld; @@ -2190,12 +2218,13 @@ void *player_thread_func(void *arg) { } else { // if there is no delay procedure, or it's not working or not allowed, there can be no // synchronising - play_samples = stuff_buffer_basic_32((int32_t *)tbuf, inbuflength, config.output_format, - outbuf, 0, enable_dither, conn); - if (outbuf == NULL) + play_samples = + stuff_buffer_basic_32((int32_t *)conn->tbuf, inbuflength, config.output_format, + conn->outbuf, 0, enable_dither, conn); + if (conn->outbuf == NULL) debug(1, "NULL outbuf to play -- skipping it."); else - config.output->play(outbuf, play_samples); // remove the (short*)! + config.output->play(conn->outbuf, play_samples); // remove the (short*)! } // mark the frame as finished @@ -2264,55 +2293,62 @@ void *player_thread_func(void *arg) { if (at_least_one_frame_seen) { if ((config.output->delay)) { if (config.no_sync == 0) { - inform( - " %*.1f," /* Sync error in milliseconds */ - "%*.1f," /* net correction in ppm */ - "%*.1f," /* corrections in ppm */ - "%*d," /* total packets */ - "%*llu," /* missing packets */ - "%*llu," /* late packets */ - "%*llu," /* too late packets */ - "%*llu," /* resend requests */ - "%*lli," /* min DAC queue size */ - "%*d," /* min buffer occupancy */ - "%*d", /* max buffer occupancy */ - 9, /* should be 10, but there's an explicit space at the start to ensure - alignment */ - 1000 * moving_average_sync_error / config.output_rate, 10, - moving_average_correction * 1000000 / (352 * conn->output_sample_ratio), 10, - moving_average_insertions_plus_deletions * 1000000 / - (352 * conn->output_sample_ratio), - 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, - conn->too_late_packets, 7, conn->resend_requests, 7, minimum_dac_queue_size, - 5, minimum_buffer_occupancy, 5, maximum_buffer_occupancy); + if (config.output->rate_info) { + uint64_t elapsed_play_time, frames_played; + conn->frame_rate_status = + config.output->rate_info(&elapsed_play_time, &frames_played); + if (conn->frame_rate_status == 0) { + conn->frame_rate = + 1.0 * (frames_played * (uint64_t)0x100000000) / elapsed_play_time; + } + } + inform("%*.1f," /* Sync error in milliseconds */ + "%*.1f," /* net correction in ppm */ + "%*.1f," /* corrections in ppm */ + "%*d," /* total packets */ + "%*llu," /* missing packets */ + "%*llu," /* late packets */ + "%*llu," /* too late packets */ + "%*llu," /* resend requests */ + "%*lli," /* min DAC queue size */ + "%*d," /* min buffer occupancy */ + "%*d," /* max buffer occupancy */ + "%*.2f", /* frame rate */ + 10, + 1000 * moving_average_sync_error / config.output_rate, 10, + moving_average_correction * 1000000 / (352 * conn->output_sample_ratio), + 10, moving_average_insertions_plus_deletions * 1000000 / + (352 * conn->output_sample_ratio), + 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, + conn->too_late_packets, 7, conn->resend_requests, 7, + minimum_dac_queue_size, 5, minimum_buffer_occupancy, 5, + maximum_buffer_occupancy, 10, conn->frame_rate); } else { - inform(" %*.1f," /* Sync error in milliseconds */ - "%*d," /* total packets */ - "%*llu," /* missing packets */ - "%*llu," /* late packets */ - "%*llu," /* too late packets */ - "%*llu," /* resend requests */ - "%*lli," /* min DAC queue size */ - "%*d," /* min buffer occupancy */ - "%*d", /* max buffer occupancy */ - 9, /* should be 10, but there's an explicit space at the start to ensure - alignment */ + inform("%*.1f," /* Sync error in milliseconds */ + "%*d," /* total packets */ + "%*llu," /* missing packets */ + "%*llu," /* late packets */ + "%*llu," /* too late packets */ + "%*llu," /* resend requests */ + "%*lli," /* min DAC queue size */ + "%*d," /* min buffer occupancy */ + "%*d", /* max buffer occupancy */ + 10, 1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7, conn->resend_requests, 7, minimum_dac_queue_size, 5, minimum_buffer_occupancy, 5, maximum_buffer_occupancy); } } else { - inform(" %*.1f," /* Sync error in milliseconds */ - "%*d," /* total packets */ - "%*llu," /* missing packets */ - "%*llu," /* late packets */ - "%*llu," /* too late packets */ - "%*llu," /* resend requests */ - "%*d," /* min buffer occupancy */ - "%*d", /* max buffer occupancy */ - 9, /* should be 10, but there's an explicit space at the start to ensure - alignment */ + inform("%*.1f," /* Sync error in milliseconds */ + "%*d," /* total packets */ + "%*llu," /* missing packets */ + "%*llu," /* late packets */ + "%*llu," /* too late packets */ + "%*llu," /* resend requests */ + "%*d," /* min buffer occupancy */ + "%*d", /* max buffer occupancy */ + 10, 1000 * moving_average_sync_error / config.output_rate, 12, play_number, 7, conn->missing_packets, 7, conn->late_packets, 7, conn->too_late_packets, 7, conn->resend_requests, 5, minimum_buffer_occupancy, 5, @@ -2331,83 +2367,83 @@ void *player_thread_func(void *arg) { } } - debug(3, "Connection %d: player thread main loop exit.", conn->connection_number); + debug(1, "This should never be called."); - if (config.statistics_requested) { - int rawSeconds = (int)difftime(time(NULL), playstart); - int elapsedHours = rawSeconds / 3600; - int elapsedMin = (rawSeconds / 60) % 60; - int elapsedSec = rawSeconds % 60; - inform("Playback Stopped. Total playing time %02d:%02d:%02d.", elapsedHours, elapsedMin, - elapsedSec); - } + /* all done in the cleanup... -#ifndef HAVE_DACP_CLIENT - // stop watching for DACP port number stuff - // this is only used for compatability, if dacp stuff isn't enabled. - if (conn->dapo_private_storage) { - mdns_dacp_dont_monitor(conn->dapo_private_storage); - conn->dapo_private_storage = NULL; - } else { - debug(2, "DACP Monitor already stopped"); - } -#endif + debug(3, "Connection %d: player thread main loop exit.", conn->connection_number); - debug(3, "Connection %d: stopping output device.", conn->connection_number); + if (config.statistics_requested) { + int rawSeconds = (int)difftime(time(NULL), conn->playstart); + int elapsedHours = rawSeconds / 3600; + int elapsedMin = (rawSeconds / 60) % 60; + int elapsedSec = rawSeconds % 60; + inform("Playback Stopped. Total playing time %02d:%02d:%02d.", elapsedHours, elapsedMin, + elapsedSec); + } - if (config.output->stop) - config.output->stop(); + #ifdef HAVE_DACP_CLIENT - debug(2, "Cancelling timing, control and audio threads..."); - debug(2, "Cancel timing thread."); - pthread_cancel(rtp_timing_thread); - debug(2, "Join timing thread."); - pthread_join(rtp_timing_thread, NULL); - debug(2, "Timing thread terminated."); - debug(2, "Cancel control thread."); - pthread_cancel(rtp_control_thread); - debug(2, "Join control thread."); - pthread_join(rtp_control_thread, NULL); - debug(2, "Control thread terminated."); - debug(2, "Cancel audio thread."); - pthread_cancel(rtp_audio_thread); - debug(2, "Join audio thread."); - pthread_join(rtp_audio_thread, NULL); - debug(2, "Audio thread terminated."); - clear_reference_timestamp(conn); - conn->rtp_running = 0; - - debug(2, "Freeing audio buffers and decoders."); + relinquish_dacp_server_information(conn); // say it doesn't belong to this conversation thread + any more... - free_audio_buffers(conn); - terminate_decoders(conn); - // remove flow control and mutexes - rc = pthread_cond_destroy(&conn->flowcontrol); - if (rc) - debug(1, "Error destroying flowcontrol condition variable."); - rc = pthread_mutex_destroy(&conn->flush_mutex); - if (rc) - debug(1, "Error destroying flush_mutex variable."); - rc = pthread_mutex_destroy(&conn->ab_mutex); - if (rc) - debug(1, "Error destroying ab_mutex variable."); - debug(2, "Connection %d: player thread terminated.", conn->connection_number); - if (conn->dacp_id) { - free(conn->dacp_id); - conn->dacp_id = NULL; - } - if (outbuf) - free(outbuf); - if (tbuf) - free(tbuf); - if (sbuf) - free(sbuf); + #else + // stop watching for DACP port number stuff + // this is only used for compatability, if dacp stuff isn't enabled. + if (conn->dapo_private_storage) { + mdns_dacp_dont_monitor(conn->dapo_private_storage); + conn->dapo_private_storage = NULL; + } else { + debug(2, "DACP Monitor already stopped"); + } + #endif + + debug(2, "Cancelling timing, control and audio threads..."); + debug(2, "Cancel timing thread."); + pthread_cancel(rtp_timing_thread); + debug(2, "Join timing thread."); + pthread_join(rtp_timing_thread, NULL); + debug(2, "Timing thread terminated."); + debug(2, "Cancel control thread."); + pthread_cancel(rtp_control_thread); + debug(2, "Join control thread."); + pthread_join(rtp_control_thread, NULL); + debug(2, "Control thread terminated."); + debug(2, "Cancel audio thread."); + pthread_cancel(rtp_audio_thread); + debug(2, "Join audio thread."); + pthread_join(rtp_audio_thread, NULL); + debug(2, "Audio thread terminated."); + clear_reference_timestamp(conn); + conn->rtp_running = 0; + + debug(3, "Connection %d: stopping output device.", conn->connection_number); + + if (config.output->stop) + config.output->stop(); + + debug(2, "Freeing audio buffers and decoders."); + + free_audio_buffers(conn); + terminate_decoders(conn); + debug(2, "Connection %d: player thread terminated.", conn->connection_number); + if (outbuf) + free(outbuf); + if (tbuf) + free(tbuf); + if (sbuf) + free(sbuf); + */ + pthread_cleanup_pop(1); pthread_exit(NULL); } // takes the volume as specified by the airplay protocol void player_volume_without_notification(double airplay_volume, rtsp_conn_info *conn) { + // no cancellation points here if we assume that the mute call to the back end has no cancellation + // points + // The volume ranges -144.0 (mute) or -30 -- 0. See // http://git.zx2c4.com/Airtunes2/about/#setting-volume // By examination, the -30 -- 0 range is linear on the slider; i.e. the slider is calibrated in 30 @@ -2441,7 +2477,7 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c int32_t hw_min_db, hw_max_db, hw_range_db, min_db, max_db; // hw_range_db is a flag; if 0 means no mixer - if (config.output->parameters) { + if (config.output->parameters) { // no cancellation points in here audio_parameters audio_information; // have a hardware mixer config.output->parameters(&audio_information); @@ -2571,9 +2607,9 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c if (config.ignore_volume_control == 1) scaled_attenuation = max_db; else if (config.volume_control_profile == VCP_standard) - scaled_attenuation = vol2attn(airplay_volume, max_db, min_db); + scaled_attenuation = vol2attn(airplay_volume, max_db, min_db); // no cancellation points else if (config.volume_control_profile == VCP_flat) - scaled_attenuation = flat_vol2attn(airplay_volume, max_db, min_db); + scaled_attenuation = flat_vol2attn(airplay_volume, max_db, min_db); // no cancellation points else debug(1, "Unrecognised volume control profile"); @@ -2608,7 +2644,7 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c // %f",software_attenuation,temp_fix_volume,airplay_volume); conn->fix_volume = temp_fix_volume; - memory_barrier(); + memory_barrier(); // no cancellation points if (config.loudness) loudness_set_volume(software_attenuation / 100); @@ -2640,36 +2676,31 @@ void player_volume(double airplay_volume, rtsp_conn_info *conn) { } void do_flush(int64_t timestamp, rtsp_conn_info *conn) { + debug(3, "Flush requested up to %u. It seems as if 0 is special.", timestamp); debug_mutex_lock(&conn->flush_mutex, 1000, 1); conn->flush_requested = 1; // if (timestamp!=0) conn->flush_rtp_timestamp = timestamp; // flush all packets up to (and including?) this - debug_mutex_unlock(&conn->flush_mutex, 3); conn->play_segment_reference_frame = 0; conn->play_number_after_flush = 0; + debug_mutex_unlock(&conn->flush_mutex, 3); + #ifdef CONFIG_METADATA // only send a flush metadata message if the first packet has been seen -- it's a bogus message // otherwise if (conn->first_packet_timestamp) { debug(2, "pfls"); - send_ssnc_metadata('pfls', NULL, 0, 1); + send_ssnc_metadata('pfls', NULL, 0, 1); // contains cancellation points } #endif + debug(3, "Flush request made."); } void player_flush(int64_t timestamp, rtsp_conn_info *conn) { debug(3, "player_flush"); - if (pthread_rwlock_tryrdlock(&conn->player_thread_lock) == 0) { - if (conn->player_thread != NULL) - do_flush(timestamp, conn); - else - debug(1, "Flush requested when player thread is gone."); - pthread_rwlock_unlock(&conn->player_thread_lock); - } else { - debug(3, "Can't acquire a read lock for a flush -- ignored."); - } + do_flush(timestamp, conn); } int player_play(rtsp_conn_info *conn) { @@ -2682,7 +2713,7 @@ int player_play(rtsp_conn_info *conn) { command_start(); #ifdef CONFIG_METADATA debug(2, "pbeg"); - send_ssnc_metadata('pbeg', NULL, 0, 1); + send_ssnc_metadata('pbeg', NULL, 0, 1); // contains cancellation points #endif pthread_t *pt = malloc(sizeof(pthread_t)); if (pt == NULL) @@ -2694,8 +2725,6 @@ int player_play(rtsp_conn_info *conn) { int rc = pthread_attr_setstacksize(&tattr, size); if (rc) debug(1, "Error setting stack size for player_thread: %s", strerror(errno)); - - // hack alert -- the player thread itself releases the player_thread_lock rwlock as soon as it's // finished initialising. pthread_create(pt, &tattr, player_thread_func, (void *)conn); pthread_attr_destroy(&tattr); @@ -2703,28 +2732,23 @@ int player_play(rtsp_conn_info *conn) { } int player_stop(rtsp_conn_info *conn) { + // note -- this may be called from another connection thread. debug(3, "player_stop"); - pthread_rwlock_wrlock(&conn->player_thread_lock); - debug(3, "player_thread_lock acquired"); if (conn->player_thread) { - debug(3, "player_thread exists"); - conn->player_thread_please_stop = 1; - pthread_cond_signal(&conn->flowcontrol); // tell it to give up - pthread_kill(*conn->player_thread, SIGUSR1); - debug(3, "player_thread signalled"); + debug(2, "player_thread cancel..."); + pthread_cancel(*conn->player_thread); pthread_join(*conn->player_thread, NULL); + debug(2, "player_thread joined."); free(conn->player_thread); conn->player_thread = NULL; - pthread_rwlock_unlock(&conn->player_thread_lock); #ifdef CONFIG_METADATA debug(2, "pend"); - send_ssnc_metadata('pend', NULL, 0, 1); + send_ssnc_metadata('pend', NULL, 0, 1); // contains cancellation points #endif command_stop(); return 0; } else { - pthread_rwlock_unlock(&conn->player_thread_lock); - debug(3, "player thread of RTSP conversation %d is already deleted.", conn->connection_number); + debug(3, "Connection %d: player thread already deleted.", conn->connection_number); return -1; } } @@ -76,17 +76,26 @@ typedef struct { // otherwise int fd; - int authorized; // set if a password is required and has been supplied + int authorized; // set if a password is required and has been supplied + char *auth_nonce; // the session nonce, if needed stream_cfg stream; SOCKADDR remote, local; int stop; int running; - pthread_t thread, timer_requester; - + time_t playstart; + pthread_t thread, timer_requester, rtp_audio_thread, rtp_control_thread, rtp_timing_thread; // pthread_t *ptp; - pthread_t *player_thread; - pthread_rwlock_t player_thread_lock; // used to control access by "outsiders" + // buffers to delete on exit + signed short *tbuf; + int32_t *sbuf; + char *outbuf; + + // for holding the rate information until printed out at the end of a session + double frame_rate; + int frame_rate_status; + + pthread_t *player_thread; abuf_t audio_buffer[BUFFER_FRAMES]; int max_frames_per_packet, input_num_channels, input_bit_depth, input_rate; int input_bytes_per_frame, output_bytes_per_frame, output_sample_ratio; @@ -95,7 +104,6 @@ typedef struct { alac_file *decoder_info; uint64_t packet_count; int connection_state_to_output; - int player_thread_please_stop; uint64_t first_packet_time_to_play; int64_t time_since_play_started; // nanoseconds // stats @@ -68,9 +68,9 @@ void rtp_terminate(rtsp_conn_info *conn) { void rtp_audio_receiver_cleanup_handler(void *arg) { debug(3, "Audio Receiver Cleanup."); rtsp_conn_info *conn = (rtsp_conn_info *)arg; - debug(1,"shutdown audio socket."); - shutdown(conn->audio_socket,SHUT_RDWR); - debug(1,"close audio socket."); + debug(1, "shutdown audio socket."); + shutdown(conn->audio_socket, SHUT_RDWR); + debug(1, "close audio socket."); close(conn->audio_socket); debug(3, "Audio Receiver Cleanup Successful."); } @@ -191,9 +191,9 @@ void *rtp_audio_receiver(void *arg) { void rtp_control_handler_cleanup_handler(void *arg) { debug(3, "Control Receiver Cleanup."); rtsp_conn_info *conn = (rtsp_conn_info *)arg; - debug(1,"shutdown control socket."); - shutdown(conn->control_socket,SHUT_RDWR); - debug(1,"close control socket."); + debug(1, "shutdown control socket."); + shutdown(conn->control_socket, SHUT_RDWR); + debug(1, "close control socket."); close(conn->control_socket); debug(3, "Control Receiver Cleanup Successful."); } @@ -477,9 +477,9 @@ void rtp_timing_receiver_cleanup_handler(void *arg) { rtsp_conn_info *conn = (rtsp_conn_info *)arg; pthread_cancel(conn->timer_requester); pthread_join(conn->timer_requester, NULL); - debug(1,"shutdown timing socket."); - shutdown(conn->timing_socket,SHUT_RDWR); - debug(1,"close timing socket."); + debug(1, "shutdown timing socket."); + shutdown(conn->timing_socket, SHUT_RDWR); + debug(1, "close timing socket."); close(conn->timing_socket); debug(3, "Timing Receiver Cleanup Successful."); } @@ -101,8 +101,6 @@ static pthread_mutex_t reference_counter_lock = PTHREAD_MUTEX_INITIALIZER; // static int please_shutdown = 0; // static pthread_t playing_thread = 0; -static rtsp_conn_info **conns = NULL; - int RTSP_connection_index = 1; #ifdef CONFIG_METADATA @@ -156,6 +154,12 @@ void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t the_queue->eoq = 0; } +void pc_queue_delete(pc_queue *the_queue) { + pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal); + pthread_cond_destroy(&the_queue->pc_queue_item_added_signal); + pthread_mutex_destroy(&the_queue->pc_queue_lock); +} + int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, int block); @@ -163,6 +167,13 @@ int send_ssnc_metadata(uint32_t code, char *data, uint32_t length, int block) { return send_metadata('ssnc', code, data, length, NULL, block); } +void pc_queue_cleanup_handler(void *arg) { + pc_queue *the_queue = (pc_queue *)arg; + int rc = pthread_mutex_unlock(&the_queue->pc_queue_lock); + if (rc) + debug(1, "Error unlocking for pc_queue_add_item or pc_queue_get_item."); +} + int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { int rc; if (the_queue) { @@ -174,6 +185,7 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { rc = pthread_mutex_lock(&the_queue->pc_queue_lock); if (rc) debug(1, "Error locking for pc_queue_add_item"); + pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue); while (the_queue->count == the_queue->capacity) { rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock); if (rc) @@ -196,9 +208,7 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal); if (rc) debug(1, "Error signalling after pc_queue_add_item"); - rc = pthread_mutex_unlock(&the_queue->pc_queue_lock); - if (rc) - debug(1, "Error unlocking for pc_queue_add_item"); + pthread_cleanup_pop(1); // unlock the queue lock. } else { debug(1, "Adding an item to a NULL queue"); } @@ -211,6 +221,7 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { rc = pthread_mutex_lock(&the_queue->pc_queue_lock); if (rc) debug(1, "Error locking for pc_queue_get_item"); + pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue); while (the_queue->count == 0) { rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal, &the_queue->pc_queue_lock); if (rc) @@ -231,9 +242,7 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal); if (rc) debug(1, "Error signalling after pc_queue_removed_item"); - rc = pthread_mutex_unlock(&the_queue->pc_queue_lock); - if (rc) - debug(1, "Error unlocking for pc_queue_get_item"); + pthread_cleanup_pop(1); // unlock the queue lock. } else { debug(1, "Removing an item from a NULL queue"); } @@ -261,6 +270,21 @@ static void track_thread(rtsp_conn_info *conn) { } } +void cancel_all_RTSP_threads(void) { + int i; + for (i = 0; i < nconns; i++) { + debug(1, "Connection %d: cancelling.", conns[i]->connection_number); + pthread_cancel(conns[i]->thread); + } + for (i = 0; i < nconns; i++) { + debug(1, "Connection %d: joining.", conns[i]->connection_number); + pthread_join(conns[i]->thread, NULL); + if (conns[i] == playing_conn) + playing_conn = NULL; + free(conns[i]); + } +} + static void cleanup_threads(void) { void *retval; int i; @@ -292,8 +316,11 @@ void ask_other_rtsp_conversation_threads_to_stop(pthread_t except_this_thread) { for (i = 0; i < nconns; i++) { if (((except_this_thread == 0) || (pthread_equal(conns[i]->thread, except_this_thread) == 0)) && (conns[i]->running != 0)) { - conns[i]->stop = 1; - pthread_kill(conns[i]->thread, SIGUSR1); + pthread_cancel(conns[i]->thread); + pthread_join(conns[i]->thread, NULL); + debug(1, "Connection %d: asked to stop.", conns[i]->connection_number); + // conns[i]->stop = 1; + // pthread_kill(conns[i]->thread, SIGUSR1); } } } @@ -893,9 +920,10 @@ static void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *r #ifdef CONFIG_METADATA if (!strncmp(cp, "progress: ", 10)) { char *progress = cp + 10; - // debug(2, "progress: \"%s\"\n", - // progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per second + // debug(2, "progress: \"%s\"\n",progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per + // second send_ssnc_metadata('prgr', strdup(progress), strlen(progress), 1); + } else #endif { @@ -1046,9 +1074,9 @@ static char *metadata_sockmsg; #define metadata_queue_size 500 metadata_package metadata_queue_items[metadata_queue_size]; -static pthread_t metadata_thread; +pthread_t metadata_thread; -void metadata_create(void) { +void metadata_create_multicast_socket(void) { if (config.metadata_enabled == 0) return; @@ -1070,7 +1098,7 @@ void metadata_create(void) { if (metadata_sockmsg) { memset(metadata_sockmsg, 0, config.metadata_sockmsglength); } else { - die("Could not malloc metadata socket buffer"); + die("Could not malloc metadata multicast socket buffer"); } } } @@ -1089,6 +1117,15 @@ void metadata_create(void) { umask(oldumask); } +void metadata_delete_multicast_socket(void) { + if (config.metadata_enabled == 0) + return; + shutdown(metadata_sock, SHUT_RDWR); // we want to immediately deallocate the buffer + close(metadata_sock); + if (metadata_sockmsg) + free(metadata_sockmsg); +} + void metadata_open(void) { if (config.metadata_enabled == 0) return; @@ -1106,12 +1143,12 @@ void metadata_open(void) { free(path); } -/* static void metadata_close(void) { + if (fd < 0) + return; close(fd); fd = -1; } -*/ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length) { // debug(2, "Process metadata with type %x, code %x and length %u.", type, code, length); @@ -1233,11 +1270,32 @@ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length) } } +void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) { + debug(1, "metadata_thread_cleanup_function called"); + metadata_delete_multicast_socket(); + metadata_close(); + pc_queue_delete(&metadata_queue); +} + +void metadata_pack_cleanup_function(void *arg) { + // debug(1, "metadata_pack_cleanup_function called"); + metadata_package *pack = (metadata_package *)arg; + if (pack->carrier) + msg_free(pack->carrier); // release the message + else if (pack->data) + free(pack->data); +} + void *metadata_thread_function(__attribute__((unused)) void *ignore) { - metadata_create(); + // create a pc_queue for passing information to a threaded metadata handler + pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package), + metadata_queue_size); + metadata_create_multicast_socket(); metadata_package pack; + pthread_cleanup_push(metadata_thread_cleanup_function, NULL); while (1) { pc_queue_get_item(&metadata_queue, &pack); + pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack); if (config.metadata_enabled) { metadata_process(pack.type, pack.code, pack.data, pack.length); #ifdef HAVE_METADATA_HUB @@ -1249,23 +1307,24 @@ void *metadata_thread_function(__attribute__((unused)) void *ignore) { } #endif } - if (pack.carrier) - msg_free(pack.carrier); // release the message - else if (pack.data) - free(pack.data); + pthread_cleanup_pop(1); } + pthread_cleanup_pop(1); // will never happen pthread_exit(NULL); } void metadata_init(void) { - // create a pc_queue for passing information to a threaded metadata handler - pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package), - metadata_queue_size); int ret = pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL); if (ret) debug(1, "Failed to create metadata thread!"); } +void metadata_stop(void) { + debug(1, "metadata_stop called."); + pthread_cancel(metadata_thread); + pthread_join(metadata_thread, NULL); +} + int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, int block) { @@ -1471,21 +1530,20 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag if (!playing_conn) die("Non existent playing_conn with play_lock enabled."); - debug(1, "RTSP Conversation thread %d already playing when asked by thread %d.", + debug(1, "Connection %d: already playing when asked by connection %d.", playing_conn->connection_number, conn->connection_number); if (playing_conn->stop) { debug(1, "Playing connection is already shutting down; waiting for it..."); should_wait = 1; } else if (config.allow_session_interruption == 1) { - // some other thread has the player ... ask it to relinquish the thread + // some thread has the player ... ask it to relinquish the thread debug(1, "ANNOUNCE: playing connection %d being interrupted by connection %d.", playing_conn->connection_number, conn->connection_number); if (playing_conn == conn) { - debug(1, "ANNOUNCE asking to stop itself."); + debug(1, "ANNOUNCE asking to stop itself! Nothing done."); } else { + player_stop(playing_conn); playing_conn->stop = 1; - memory_barrier(); - pthread_kill(playing_conn->thread, SIGUSR1); should_wait = 1; } } @@ -1497,7 +1555,7 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag if (pthread_mutex_trylock(&play_lock) == 0) have_the_player = 1; else - debug(1, "ANNOUNCE failed to get the player"); + debug(1, "Connection %d: ANNOUNCE failed to get the player", conn->connection_number); } if (have_the_player) { @@ -1638,7 +1696,8 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag resp->respcode = 200; } else { resp->respcode = 453; - debug(1, "Already playing."); + debug(1, "Connection %d: failed because a connection is already playing.", + conn->connection_number); } out: @@ -1719,7 +1778,7 @@ static void apple_challenge(int fd, rtsp_message *req, rtsp_message *resp) { if (padding) *padding = 0; - msg_add_header(resp, "Apple-Response", encoded); + msg_add_header(resp, "Apple-Response", encoded); // will be freed when the response is freed. free(challresp); free(encoded); } @@ -1733,7 +1792,7 @@ static char *make_nonce(void) { if (read(fd, random, sizeof(random)) != sizeof(random)) debug(1, "Error reading /dev/random"); close(fd); - return base64_enc(random, 8); + return base64_enc(random, 8); // returns a pointer to malloc'ed memory } static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { @@ -1884,36 +1943,100 @@ authenticate: return 1; } +void rtsp_conversation_thread_cleanup_function(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + // debug(1, "Connection %d: rtsp_conversation_thread_func_cleanup_function called.", + // conn->connection_number); + player_stop(conn); + if (conn->fd > 0) { + // debug(1, "Connection %d: closing fd %d.", + // conn->connection_number,conn->fd); + close(conn->fd); + } + if (conn->auth_nonce) { + free(conn->auth_nonce); + conn->auth_nonce = NULL; + } + rtp_terminate(conn); + if (playing_conn == conn) { + debug(3, "Unlocking play lock on RTSP conversation thread %d.", conn->connection_number); + playing_conn = NULL; + pthread_mutex_unlock(&play_lock); + } + + if (conn->dacp_id) { + free(conn->dacp_id); + conn->dacp_id = NULL; + } + + debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number); + conn->running = 0; + + // remove flow control and mutexes + int rc = pthread_cond_destroy(&conn->flowcontrol); + if (rc) + debug(1, "Connection %d: error %d destroying flow control condition variable.", + conn->connection_number, rc); + rc = pthread_mutex_destroy(&conn->ab_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying ab_mutex.", conn->connection_number, rc); + rc = pthread_mutex_destroy(&conn->flush_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying flush_mutex.", conn->connection_number, rc); +} + +void msg_cleanup_function(void *arg) { + // debug(1, "msg_cleanup_function called."); + msg_free((rtsp_message *)arg); +} + static void *rtsp_conversation_thread_func(void *pconn) { rtsp_conn_info *conn = pconn; - // create the player thread lock. - int rwli = pthread_rwlock_init(&conn->player_thread_lock, NULL); - if (rwli != 0) - die("Error %d initialising player_thread_lock for conversation thread %d.", rwli, - conn->connection_number); + int rc = pthread_mutex_init(&conn->flush_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising flush_mutex.", conn->connection_number, rc); + rc = pthread_mutex_init(&conn->ab_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising ab_mutex.", conn->connection_number, rc); +// set the flowcontrol condition variable to wait on a monotonic clock +#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it. + rc = pthread_cond_init(&conn->flowcontrol, &attr); +#endif +#ifdef COMPILE_FOR_OSX + rc = pthread_cond_init(&conn->flowcontrol, NULL); +#endif + if (rc) + die("Connection %d: error %d initialising flow control condition variable.", + conn->connection_number, rc); rtp_initialise(conn); - - rtsp_message *req, *resp; - char *hdr, *auth_nonce = NULL; + char *hdr = NULL; enum rtsp_read_request_response reply; int rtsp_read_request_attempt_count = 1; // 1 means exit immediately + rtsp_message *req, *resp; + pthread_cleanup_push(rtsp_conversation_thread_cleanup_function, (void *)conn); while (conn->stop == 0) { int debug_level = 3; // for printing the request and response reply = rtsp_read_request(conn, &req); if (reply == rtsp_read_request_response_ok) { + pthread_cleanup_push(msg_cleanup_function, (void *)req); + resp = msg_init(); + pthread_cleanup_push(msg_cleanup_function, (void *)resp); + resp->respcode = 400; + if (strcmp(req->method, "OPTIONS") != 0) // the options message is very common, so don't log it until level 3 debug_level = 2; debug(debug_level, "RTSP thread %d received an RTSP Packet of type \"%s\":", conn->connection_number, req->method), debug_print_msg_headers(debug_level, req); - resp = msg_init(); - resp->respcode = 400; apple_challenge(conn->fd, req, resp); hdr = msg_get_header(req, "CSeq"); @@ -1922,7 +2045,7 @@ static void *rtsp_conversation_thread_func(void *pconn) { // msg_add_header(resp, "Audio-Jack-Status", "connected; type=analog"); msg_add_header(resp, "Server", "AirTunes/105.1"); - if ((conn->authorized == 1) || (rtsp_auth(&auth_nonce, req, resp)) == 0) { + if ((conn->authorized == 1) || (rtsp_auth(&conn->auth_nonce, req, resp)) == 0) { conn->authorized = 1; // it must have been authorized or didn't need a password struct method_handler *mh; int method_selected = 0; @@ -1949,8 +2072,8 @@ static void *rtsp_conversation_thread_func(void *pconn) { if (conn->stop == 0) { msg_write_response(conn->fd, resp); } - msg_free(req); - msg_free(resp); + pthread_cleanup_pop(1); + pthread_cleanup_pop(1); } else { int tstop = 0; if (reply == rtsp_read_request_response_immediate_shutdown_requested) @@ -1991,26 +2114,7 @@ static void *rtsp_conversation_thread_func(void *pconn) { } } } - - if (conn->fd > 0) - close(conn->fd); - if (auth_nonce) - free(auth_nonce); - rtp_terminate(conn); - if (playing_conn == conn) { - debug(3, "Unlocking play lock on RTSP conversation thread %d.", conn->connection_number); - playing_conn = NULL; - pthread_mutex_unlock(&play_lock); - } - debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number); - conn->running = 0; - - // release the player_thread_lock - int rwld = pthread_rwlock_destroy(&conn->player_thread_lock); - if (rwld) - debug(1, "Error %d destroying player_thread_lock for conversation thread %d.", rwld, - conn->connection_number); - + pthread_cleanup_pop(1); pthread_exit(NULL); } @@ -2033,6 +2137,15 @@ static const char *format_address(struct sockaddr *fsa) { } */ +void rtsp_listen_loop_cleanup_handler(__attribute__((unused)) void *arg) { + debug(1, "rtsp_listen_loop_cleanup_handler called."); + cancel_all_RTSP_threads(); + int *sockfd = (int *)arg; + mdns_unregister(); + if (sockfd) + free(sockfd); +} + void rtsp_listen_loop(void) { struct addrinfo hints, *info, *p; char portstr[6]; @@ -2131,7 +2244,9 @@ void rtsp_listen_loop(void) { int acceptfd; struct timeval tv; + pthread_cleanup_push(rtsp_listen_loop_cleanup_handler, (void *)sockfd); do { + pthread_testcancel(); tv.tv_sec = 60; tv.tv_usec = 0; @@ -2166,7 +2281,8 @@ void rtsp_listen_loop(void) { conn->fd = accept(acceptfd, (struct sockaddr *)&conn->remote, &slen); if (conn->fd < 0) { - debug(1, "New RTSP connection on port %d not accepted:", config.port); + debug(1, "Connection %d: New connection on port %d not accepted:", conn->connection_number, + config.port); perror("failed to accept connection"); free(conn); } else { @@ -2185,8 +2301,8 @@ void rtsp_listen_loop(void) { sa = (struct sockaddr_in *)&conn->remote; inet_ntop(AF_INET, &(sa->sin_addr), remote_ip4, INET_ADDRSTRLEN); unsigned short int rport = ntohs(sa->sin_port); - debug(2, "New RTSP connection from %s:%u to self at %s:%u on conversation thread %d.", - remote_ip4, rport, ip4, tport, conn->connection_number); + debug(2, "Connection %d: new connection from %s:%u to self at %s:%u.", + conn->connection_number, remote_ip4, rport, ip4, tport); } #ifdef AF_INET6 if (local_info->SAFAMILY == AF_INET6) { @@ -2202,8 +2318,8 @@ void rtsp_listen_loop(void) { sa6 = (struct sockaddr_in6 *)&conn->remote; // pretend this is loaded with something inet_ntop(AF_INET6, &(sa6->sin6_addr), remote_ip6, INET6_ADDRSTRLEN); u_int16_t rport = ntohs(sa6->sin6_port); - debug(2, "New RTSP connection from [%s]:%u to self at [%s]:%u on conversation thread %d.", - remote_ip6, rport, ip6, tport, conn->connection_number); + debug(2, "Connection %d: new connection from [%s]:%u to self at [%s]:%u.", + conn->connection_number, remote_ip6, rport, ip6, tport); } #endif @@ -2227,11 +2343,6 @@ void rtsp_listen_loop(void) { } } while (1); - mdns_unregister(); - - if (sockfd) - free(sockfd); - - // perror("select"); - // die("fell out of the RTSP select loop"); + pthread_cleanup_pop(0); + debug(1, "Oops -- fell out of the RTSP select loop"); } @@ -4,14 +4,18 @@ #include "player.h" rtsp_conn_info *playing_conn; +rtsp_conn_info **conns; void rtsp_listen_loop(void); // void rtsp_shutdown_stream(void); void rtsp_request_shutdown_stream(void); -// initialise the metadata stuff +void cancel_all_RTSP_threads(void); + +// initialise and completely delete the metadata stuff void metadata_init(void); +void metadata_stop(void); // sends metadata out to the metadata pipe, if enabled. // It is sent with the type 'ssnc' the given code, data and length diff --git a/shairport.c b/shairport.c index c5450ad..5ce011f 100644 --- a/shairport.c +++ b/shairport.c @@ -36,7 +36,6 @@ #include <stdio.h> #include <stdlib.h> #include <sys/socket.h> -#include <sys/socket.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> @@ -60,6 +59,11 @@ #include <glib.h> #endif +#include "common.h" +#include "mdns.h" +#include "rtp.h" +#include "rtsp.h" + #if defined(HAVE_DACP_CLIENT) #include "dacp.h" #endif @@ -80,11 +84,6 @@ #include "mpris-service.h" #endif -#include "common.h" -#include "mdns.h" -#include "rtp.h" -#include "rtsp.h" - #include <libdaemon/dexec.h> #include <libdaemon/dfork.h> #include <libdaemon/dlog.h> @@ -95,20 +94,20 @@ #include <FFTConvolver/convolver.h> #endif -static inline int config_set_lookup_bool(config_t* cfg, char* where, int* dst) { +static inline int config_set_lookup_bool(config_t *cfg, char *where, int *dst) { const char *str = 0; if (config_lookup_string(cfg, where, &str)) { - if (strcasecmp(str, "no") == 0){ - (*dst)=0; + if (strcasecmp(str, "no") == 0) { + (*dst) = 0; return 1; - }else if (strcasecmp(str, "yes") == 0){ - (*dst)=1; + } else if (strcasecmp(str, "yes") == 0) { + (*dst) = 1; return 1; - }else{ + } else { die("Invalid %s option choice \"%s\". It should be \"yes\" or \"no\"", where, str); return 0; } - }else{ + } else { return 0; } } @@ -117,22 +116,11 @@ static int shutting_down = 0; char configuration_file_path[4096 + 1]; char actual_configuration_file_path[4096 + 1]; -void shairport_shutdown() { - if (shutting_down) - return; - shutting_down = 1; - mdns_unregister(); - rtsp_request_shutdown_stream(); - if (config.output) - config.output->deinit(); -} - static void sig_ignore(__attribute__((unused)) int foo, __attribute__((unused)) siginfo_t *bar, __attribute__((unused)) void *baz) {} static void sig_shutdown(__attribute__((unused)) int foo, __attribute__((unused)) siginfo_t *bar, __attribute__((unused)) void *baz) { debug(1, "shutdown requested..."); - shairport_shutdown(); // daemon_log(LOG_NOTICE, "exit..."); daemon_retval_send(255); daemon_pid_file_remove(); @@ -392,6 +380,7 @@ int parse_options(int argc, char **argv) { debug(2, "Looking for configuration file at full path \"%s\"", config_file_real_path); /* Read the file. If there is an error, report it and exit. */ if (config_read_file(&config_file_stuff, config_file_real_path)) { + free(config_file_real_path); // make config.cfg point to it config.cfg = &config_file_stuff; /* Get the Service Name. */ @@ -404,7 +393,8 @@ int parse_options(int argc, char **argv) { config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_with_pid_file", &daemonisewith); /* Get the Just_Daemonize setting. */ - config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", &daemonisewithout); + config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", + &daemonisewithout); if ((daemonisewith) && (daemonisewithout)) die("Select either daemonize_with_pid_file or daemonize_without_pid_file -- you have " @@ -474,7 +464,8 @@ int parse_options(int argc, char **argv) { } /* Get the statistics setting. */ - if (!config_set_lookup_bool(config.cfg, "general.statistics", &(config.statistics_requested))) { + if (!config_set_lookup_bool(config.cfg, "general.statistics", + &(config.statistics_requested))) { warn("The \"general\" \"statistics\" setting is deprecated. Please use the \"diagnostics\" " "\"statistics\" setting instead."); } @@ -881,63 +872,61 @@ int parse_options(int argc, char **argv) { #endif #ifdef CONFIG_MQTT - int tmpval=0; - config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled); - if(config.mqtt_enabled && !config.metadata_enabled){ - die("You need to have metadata enabled in order to use mqtt"); - } - if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) { - config.mqtt_hostname = (char *)str; - //TODO: Document that, if this is false, whole mqtt func is disabled - } - if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) { - config.mqtt_port = tmpval; - }else{ - //TODO: Is this the correct way to set a default value? - config.mqtt_port = 1883; - } - - if (config_lookup_string(config.cfg, "mqtt.username", &str)) { - config.mqtt_username = (char *)str; - } - if (config_lookup_string(config.cfg, "mqtt.password", &str)) { - config.mqtt_password = (char *)str; - } - int capath=0; - if (config_lookup_string(config.cfg, "mqtt.capath", &str)) { - config.mqtt_capath = (char *)str; - capath=1; - } - if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) { - if(capath) - die("Supply either mqtt cafile or mqtt capath -- you have supplied both!"); - config.mqtt_cafile = (char *)str; - } - int certkeynum=0; - if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) { - config.mqtt_certfile = (char *)str; - certkeynum++; - } - if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) { - config.mqtt_keyfile = (char *)str; - certkeynum++; - } - if( certkeynum!=0 && certkeynum!=2){ - die("If you want to use TLS Client Authentication, you have to specify " - "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n" - "If you do not want to use TLS Client Authentication, leave both empty." - ); - } - - if(config_lookup_string(config.cfg, "mqtt.topic", &str)){ - config.mqtt_topic = (char *)str; - } - config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw); - config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed); - config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover); - config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote); + int tmpval = 0; + config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled); + if (config.mqtt_enabled && !config.metadata_enabled) { + die("You need to have metadata enabled in order to use mqtt"); + } + if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) { + config.mqtt_hostname = (char *)str; + // TODO: Document that, if this is false, whole mqtt func is disabled + } + if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) { + config.mqtt_port = tmpval; + } else { + // TODO: Is this the correct way to set a default value? + config.mqtt_port = 1883; + } + + if (config_lookup_string(config.cfg, "mqtt.username", &str)) { + config.mqtt_username = (char *)str; + } + if (config_lookup_string(config.cfg, "mqtt.password", &str)) { + config.mqtt_password = (char *)str; + } + int capath = 0; + if (config_lookup_string(config.cfg, "mqtt.capath", &str)) { + config.mqtt_capath = (char *)str; + capath = 1; + } + if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) { + if (capath) + die("Supply either mqtt cafile or mqtt capath -- you have supplied both!"); + config.mqtt_cafile = (char *)str; + } + int certkeynum = 0; + if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) { + config.mqtt_certfile = (char *)str; + certkeynum++; + } + if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) { + config.mqtt_keyfile = (char *)str; + certkeynum++; + } + if (certkeynum != 0 && certkeynum != 2) { + die("If you want to use TLS Client Authentication, you have to specify " + "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n" + "If you do not want to use TLS Client Authentication, leave both empty."); + } + + if (config_lookup_string(config.cfg, "mqtt.topic", &str)) { + config.mqtt_topic = (char *)str; + } + config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw); + config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed); + config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover); + config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote); #endif - free(config_file_real_path); } // now, do the command line options again, but this time do them fully -- it's a unix convention @@ -1035,13 +1024,13 @@ int parse_options(int argc, char **argv) { free(i2); free(i3); free(vs); - + #ifdef CONFIG_MQTT // mqtt topic was not set. As we have the service name just now, set it - if(config.mqtt_topic==NULL){ - int topic_length=1+strlen(config.service_name)+1; - char* topic=malloc(topic_length+1); - snprintf(topic,topic_length,"/%s/",config.service_name); + if (config.mqtt_topic == NULL) { + int topic_length = 1 + strlen(config.service_name) + 1; + char *topic = malloc(topic_length + 1); + snprintf(topic, topic_length, "/%s/", config.service_name); config.mqtt_topic = topic; } #endif @@ -1062,13 +1051,14 @@ int parse_options(int argc, char **argv) { } #if defined(HAVE_DBUS) || defined(HAVE_MPRIS) -GMainLoop *loop; +GMainLoop *g_main_loop; pthread_t dbus_thread; void *dbus_thread_func(__attribute__((unused)) void *arg) { - loop = g_main_loop_new(NULL, FALSE); - g_main_loop_run(loop); - return NULL; + g_main_loop = g_main_loop_new(NULL, FALSE); + g_main_loop_run(g_main_loop); + debug(1, "g_main_loop thread exit"); + pthread_exit(NULL); } #endif @@ -1130,7 +1120,18 @@ const char *pid_file_proc(void) { } void exit_function() { - // debug(1, "exit function called..."); + debug(1, "exit function called..."); + // cancel_all_RTSP_threads(); + if (conns) + free(conns); // make sure the connections have been deleted first + if (config.service_name) + free(config.service_name); + if (config.regtype) + free(config.regtype); + if (config.computed_piddir) + free(config.computed_piddir); + if (ranarray) + free((void *)ranarray); if (config.cfg) config_destroy(config.cfg); if (config.appName) @@ -1138,7 +1139,54 @@ void exit_function() { // probably should be freeing malloc'ed memory here, including strdup-created strings... } +void main_cleanup_handler(__attribute__((unused)) void *arg) { + + debug(1, "main cleanup handler called."); +#ifdef HAVE_MQTT + if (config.mqtt_enabled) { + // terminate_mqtt(); + } +#endif + +#if defined(HAVE_DBUS) || defined(HAVE_MPRIS) +#ifdef HAVE_MPRIS +// stop_mpris_service(); +#endif +#ifdef HAVE_DBUS + stop_dbus_service(); +#endif + debug(1, "Stopping DBUS Loop Thread"); + g_main_loop_quit(g_main_loop); + pthread_join(dbus_thread, NULL); +#endif + +#ifdef HAVE_DACP_CLIENT + debug(1, "Stopping DACP Monitor"); + dacp_monitor_stop(); +#endif + +#ifdef HAVE_METADATA_HUB + debug(1, "Stopping metadata hub"); + metadata_hub_stop(); +#endif + +#ifdef CONFIG_METADATA + metadata_stop(); // close down the metadata pipe +#endif + if (config.output->deinit) { + debug(1, "Deinitialise the audio backend."); + config.output->deinit(); + } + daemon_log(LOG_NOTICE, "Unexpected exit..."); + daemon_retval_send(0); + daemon_pid_file_remove(); + daemon_signal_done(); + exit(0); +} + int main(int argc, char **argv) { + conns = NULL; // no connections active + memset((void *)&main_thread_id, 0, sizeof(main_thread_id)); fp_time_at_startup = get_absolute_time_in_fp(); fp_time_at_last_debug_message = fp_time_at_startup; // debug(1,"startup"); @@ -1251,7 +1299,7 @@ int main(int argc, char **argv) { daemon_pid_file_ident = daemon_log_ident = daemon_ident_from_argv0(argv[0]); daemon_pid_file_proc = pid_file_proc; - + /* Check if we are called with -D or --disconnectFromOutput parameter */ if (argc >= 2 && ((strcmp(argv[1], "-D") == 0) || (strcmp(argv[1], "--disconnectFromOutput") == 0))) { @@ -1361,10 +1409,11 @@ int main(int argc, char **argv) { /* Close FDs */ if (daemon_close_all(-1) < 0) { daemon_log(LOG_ERR, "Failed to close all file descriptors: %s", strerror(errno)); - /* Send the error condition to the parent process */ daemon_retval_send(1); - goto finish; + + daemon_signal_done(); + return 0; } /* Create the PID file if required */ @@ -1375,12 +1424,16 @@ int main(int argc, char **argv) { if ((result != 0) && (result != -EEXIST)) { // error creating or accessing the PID file directory daemon_retval_send(3); - goto finish; + + daemon_signal_done(); + return 0; } if (daemon_pid_file_create() < 0) { daemon_log(LOG_ERR, "Could not create PID file (%s).", strerror(errno)); + daemon_retval_send(2); - goto finish; + daemon_signal_done(); + return 0; } } @@ -1390,6 +1443,10 @@ int main(int argc, char **argv) { /* end libdaemon stuff */ } + main_thread_id = pthread_self(); + if (!main_thread_id) + debug(1, "Main thread is set up to be NULL!"); + signal_setup(); // make sure the program can create files that group and world can read @@ -1402,6 +1459,8 @@ int main(int argc, char **argv) { } config.output->init(argc - audio_arg, argv + audio_arg); + pthread_cleanup_push(main_cleanup_handler, NULL); + // daemon_log(LOG_NOTICE, "startup"); switch (endianness) { @@ -1577,7 +1636,7 @@ int main(int argc, char **argv) { #endif #ifdef HAVE_MQTT - if(config.mqtt_enabled){ + if (config.mqtt_enabled) { initialise_mqtt(); } #endif @@ -1586,10 +1645,10 @@ int main(int argc, char **argv) { rtsp_listen_loop(); // should not reach this... - shairport_shutdown(); -finish: - daemon_log(LOG_NOTICE, "Unexpected exit..."); - daemon_retval_send(255); - daemon_pid_file_remove(); - return 1; + // daemon_log(LOG_NOTICE, "Unexpected exit..."); + // daemon_retval_send(0); + // daemon_pid_file_remove(); + pthread_cleanup_pop(1); + debug(1, "Odd exit point"); + pthread_exit(NULL); } |