diff options
Diffstat (limited to 'rtsp.c')
-rw-r--r-- | rtsp.c | 1360 |
1 files changed, 925 insertions, 435 deletions
@@ -3,7 +3,7 @@ * Copyright (c) James Laird 2013 * Modifications associated with audio synchronization, mutithreading and - * metadata handling copyright (c) Mike Brady 2014-2018 + * metadata handling copyright (c) Mike Brady 2014-2019 * All rights reserved. * * Permission is hereby granted, free of charge, to any person @@ -46,15 +46,16 @@ #include "config.h" -#ifdef HAVE_LIBSSL +#ifdef CONFIG_OPENSSL #include <openssl/md5.h> #endif -#ifdef HAVE_LIBMBEDTLS +#ifdef CONFIG_MBEDTLS #include <mbedtls/md5.h> +#include <mbedtls/version.h> #endif -#ifdef HAVE_LIBPOLARSSL +#ifdef CONFIG_POLARSSL #include <polarssl/md5.h> #endif @@ -63,10 +64,14 @@ #include "rtp.h" #include "rtsp.h" -#ifdef HAVE_METADATA_HUB +#ifdef CONFIG_METADATA_HUB #include "metadata_hub.h" #endif +#ifdef CONFIG_MQTT +#include "mqtt.h" +#endif + #ifdef AF_INET6 #define INETx_ADDRSTRLEN INET6_ADDRSTRLEN #else @@ -85,7 +90,11 @@ enum rtsp_read_request_response { }; // Mike Brady's part... -static pthread_mutex_t play_lock = PTHREAD_MUTEX_INITIALIZER; + +int metadata_running = 0; + +// always lock use this when accessing the playing conn value +static pthread_mutex_t playing_conn_lock = PTHREAD_MUTEX_INITIALIZER; // every time we want to retain or release a reference count, lock it with this // if a reference count is read as zero, it means the it's being deallocated. @@ -97,8 +106,6 @@ static pthread_mutex_t reference_counter_lock = PTHREAD_MUTEX_INITIALIZER; // static int please_shutdown = 0; // static pthread_t playing_thread = 0; -static rtsp_conn_info **conns = NULL; - int RTSP_connection_index = 1; #ifdef CONFIG_METADATA @@ -115,7 +122,10 @@ typedef struct { } pc_queue; // producer-consumer queue #endif +static int msg_indexes = 1; + typedef struct { + int index_number; uint32_t referenceCount; // we might start using this... unsigned int nheaders; char *name[16]; @@ -152,6 +162,12 @@ void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t the_queue->eoq = 0; } +void pc_queue_delete(pc_queue *the_queue) { + pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal); + pthread_cond_destroy(&the_queue->pc_queue_item_added_signal); + pthread_mutex_destroy(&the_queue->pc_queue_lock); +} + int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, int block); @@ -159,17 +175,26 @@ int send_ssnc_metadata(uint32_t code, char *data, uint32_t length, int block) { return send_metadata('ssnc', code, data, length, NULL, block); } +void pc_queue_cleanup_handler(void *arg) { + // debug(1, "pc_queue_cleanup_handler called."); + pc_queue *the_queue = (pc_queue *)arg; + int rc = pthread_mutex_unlock(&the_queue->pc_queue_lock); + if (rc) + debug(1, "Error unlocking for pc_queue_add_item or pc_queue_get_item."); +} + int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { int rc; if (the_queue) { if (block == 0) { - rc = pthread_mutex_trylock(&the_queue->pc_queue_lock); + rc = debug_mutex_lock(&the_queue->pc_queue_lock, 10000, 2); if (rc == EBUSY) return EBUSY; } else rc = pthread_mutex_lock(&the_queue->pc_queue_lock); if (rc) debug(1, "Error locking for pc_queue_add_item"); + pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue); while (the_queue->count == the_queue->capacity) { rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock); if (rc) @@ -192,9 +217,7 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) { rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal); if (rc) debug(1, "Error signalling after pc_queue_add_item"); - rc = pthread_mutex_unlock(&the_queue->pc_queue_lock); - if (rc) - debug(1, "Error unlocking for pc_queue_add_item"); + pthread_cleanup_pop(1); // unlock the queue lock. } else { debug(1, "Adding an item to a NULL queue"); } @@ -207,6 +230,7 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { rc = pthread_mutex_lock(&the_queue->pc_queue_lock); if (rc) debug(1, "Error locking for pc_queue_get_item"); + pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue); while (the_queue->count == 0) { rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal, &the_queue->pc_queue_lock); if (rc) @@ -227,9 +251,7 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal); if (rc) debug(1, "Error signalling after pc_queue_removed_item"); - rc = pthread_mutex_unlock(&the_queue->pc_queue_lock); - if (rc) - debug(1, "Error unlocking for pc_queue_get_item"); + pthread_cleanup_pop(1); // unlock the queue lock. } else { debug(1, "Removing an item from a NULL queue"); } @@ -238,6 +260,60 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) { #endif +int have_player(rtsp_conn_info *conn) { + int response = 0; + debug_mutex_lock(&playing_conn_lock, 1000000, 3); + if (playing_conn == conn) // this connection definitely has the play lock + response = 1; + debug_mutex_unlock(&playing_conn_lock, 3); + return response; +} + +void player_watchdog_thread_cleanup_handler(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + debug(3, "Connection %d: Watchdog Exit.", conn->connection_number); +} + +void *player_watchdog_thread_code(void *arg) { + pthread_cleanup_push(player_watchdog_thread_cleanup_handler, arg); + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + do { + usleep(2000000); // check every two seconds + // debug(3, "Connection %d: Check the thread is doing something...", conn->connection_number); + if ((config.dont_check_timeout == 0) && (config.timeout != 0)) { + debug_mutex_lock(&conn->watchdog_mutex, 1000, 0); + uint64_t last_watchdog_bark_time = conn->watchdog_bark_time; + debug_mutex_unlock(&conn->watchdog_mutex, 0); + if (last_watchdog_bark_time != 0) { + uint64_t time_since_last_bark = (get_absolute_time_in_fp() - last_watchdog_bark_time) >> 32; + uint64_t ct = config.timeout; // go from int to 64-bit int + + if (time_since_last_bark >= ct) { + conn->watchdog_barks++; + if (conn->watchdog_barks == 1) { + // debuglev = 3; // tell us everything. + debug(1, "Connection %d: As Yeats almost said, \"Too long a silence / can make a stone " + "of the heart\".", + conn->connection_number); + conn->stop = 1; + pthread_cancel(conn->thread); + } else if (conn->watchdog_barks == 3) { + if ((config.cmd_unfixable) && (conn->unfixable_error_reported == 0)) { + conn->unfixable_error_reported = 1; + command_execute(config.cmd_unfixable, "unable_to_cancel_play_session", 1); + } else { + warn("an unrecoverable error, \"unable_to_cancel_play_session\", has been detected.", + conn->connection_number); + } + } + } + } + } + } while (1); + pthread_cleanup_pop(0); // should never happen + pthread_exit(NULL); +} + void ask_other_rtsp_conversation_threads_to_stop(pthread_t except_this_thread); void rtsp_request_shutdown_stream(void) { @@ -257,7 +333,20 @@ static void track_thread(rtsp_conn_info *conn) { } } -static void cleanup_threads(void) { +void cancel_all_RTSP_threads(void) { + int i; + for (i = 0; i < nconns; i++) { + debug(1, "Connection %d: cancelling.", conns[i]->connection_number); + pthread_cancel(conns[i]->thread); + } + for (i = 0; i < nconns; i++) { + debug(1, "Connection %d: joining.", conns[i]->connection_number); + pthread_join(conns[i]->thread, NULL); + free(conns[i]); + } +} + +void cleanup_threads(void) { void *retval; int i; // debug(2, "culling threads."); @@ -267,8 +356,6 @@ static void cleanup_threads(void) { conns[i]->connection_number); pthread_join(conns[i]->thread, &retval); debug(3, "RTSP connection thread %d deleted...", conns[i]->connection_number); - if (conns[i] == playing_conn) - playing_conn = NULL; free(conns[i]); nconns--; if (nconns) @@ -288,8 +375,11 @@ void ask_other_rtsp_conversation_threads_to_stop(pthread_t except_this_thread) { for (i = 0; i < nconns; i++) { if (((except_this_thread == 0) || (pthread_equal(conns[i]->thread, except_this_thread) == 0)) && (conns[i]->running != 0)) { - conns[i]->stop = 1; - pthread_kill(conns[i]->thread, SIGUSR1); + pthread_cancel(conns[i]->thread); + pthread_join(conns[i]->thread, NULL); + debug(1, "Connection %d: asked to stop.", conns[i]->connection_number); + // conns[i]->stop = 1; + // pthread_kill(conns[i]->thread, SIGUSR1); } } } @@ -317,32 +407,35 @@ static char *nextline(char *in, int inbuf) { return out; } -static void msg_retain(rtsp_message *msg) { - if (msg) { - int rc = pthread_mutex_lock(&reference_counter_lock); - if (rc) - debug(1, "Error %d locking reference counter lock"); +void msg_retain(rtsp_message *msg) { + int rc = pthread_mutex_lock(&reference_counter_lock); + if (rc) + debug(1, "Error %d locking reference counter lock"); + if (msg > (rtsp_message *)0x00010000) { msg->referenceCount++; + // debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount); rc = pthread_mutex_unlock(&reference_counter_lock); if (rc) debug(1, "Error %d unlocking reference counter lock"); } else { - debug(1, "null rtsp_message pointer passed to retain"); + debug(1, "invalid rtsp_message pointer 0x%x passed to retain", (uintptr_t)msg); } } -static rtsp_message *msg_init(void) { +rtsp_message *msg_init(void) { rtsp_message *msg = malloc(sizeof(rtsp_message)); if (msg) { memset(msg, 0, sizeof(rtsp_message)); msg->referenceCount = 1; // from now on, any access to this must be protected with the lock + msg->index_number = msg_indexes++; } else { - die("can not allocate memory for an rtsp_message."); + die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes); } + // debug(1,"msg_init -- create item %d.", msg->index_number); return msg; } -static int msg_add_header(rtsp_message *msg, char *name, char *value) { +int msg_add_header(rtsp_message *msg, char *name, char *value) { if (msg->nheaders >= sizeof(msg->name) / sizeof(char *)) { warn("too many headers?!"); return 1; @@ -355,7 +448,7 @@ static int msg_add_header(rtsp_message *msg, char *name, char *value) { return 0; } -static char *msg_get_header(rtsp_message *msg, char *name) { +char *msg_get_header(rtsp_message *msg, char *name) { unsigned int i; for (i = 0; i < msg->nheaders; i++) if (!strcasecmp(msg->name[i], name)) @@ -363,7 +456,7 @@ static char *msg_get_header(rtsp_message *msg, char *name) { return NULL; } -static void debug_print_msg_headers(int level, rtsp_message *msg) { +void debug_print_msg_headers(int level, rtsp_message *msg) { unsigned int i; for (i = 0; i < msg->nheaders; i++) { debug(level, " Type: \"%s\", content: \"%s\"", msg->name[i], msg->value[i]); @@ -393,16 +486,11 @@ static void debug_print_msg_content(int level, rtsp_message *msg) { } */ -static void msg_free(rtsp_message *msg) { - - if (msg) { - int rc = pthread_mutex_lock(&reference_counter_lock); - if (rc) - debug(1, "Error %d locking reference counter lock during msg_free()", rc); +void msg_free(rtsp_message **msgh) { + debug_mutex_lock(&reference_counter_lock, 1000, 0); + if (*msgh > (rtsp_message *)0x00010000) { + rtsp_message *msg = *msgh; msg->referenceCount--; - rc = pthread_mutex_unlock(&reference_counter_lock); - if (rc) - debug(1, "Error %d unlocking reference counter lock during msg_free()", rc); if (msg->referenceCount == 0) { unsigned int i; for (i = 0; i < msg->nheaders; i++) { @@ -411,17 +499,27 @@ static void msg_free(rtsp_message *msg) { } if (msg->content) free(msg->content); + // debug(1,"msg_free item %d -- free.",msg->index_number); + uintptr_t index = (msg->index_number) & 0xFFFF; + if (index == 0) + index = 0x10000; // ensure it doesn't fold to zero. + *msgh = + (rtsp_message *)(index); // put a version of the index number of the freed message in here free(msg); - } // else { - // debug(1,"rtsp_message reference count non-zero: - // %d!",msg->referenceCount); - //} - } else { - debug(1, "null rtsp_message pointer passed to msg_free()"); + } else { + // debug(1,"msg_free item %d -- decrement reference to + // %d.",msg->index_number,msg->referenceCount); + } + } else if (*msgh != NULL) { + debug(1, + "msg_free: error attempting to free an allocated but already-freed rtsp_message, number " + "%d.", + (uintptr_t)*msgh); } + debug_mutex_unlock(&reference_counter_lock, 0); } -static int msg_handle_line(rtsp_message **pmsg, char *line) { +int msg_handle_line(rtsp_message **pmsg, char *line) { rtsp_message *msg = *pmsg; if (!msg) { @@ -430,7 +528,7 @@ static int msg_handle_line(rtsp_message **pmsg, char *line) { char *sp, *p; sp = NULL; // this is to quieten a compiler warning - // debug(1, "received request: %s", line); + debug(3, "RTSP Message Received: \"%s\".", line); p = strtok_r(line, " ", &sp); if (!p) @@ -471,36 +569,43 @@ static int msg_handle_line(rtsp_message **pmsg, char *line) { } fail: - *pmsg = NULL; - msg_free(msg); + msg_free(pmsg); return 0; } -static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, - rtsp_message **the_packet) { - enum rtsp_read_request_response reply = rtsp_read_request_response_ok; - ssize_t buflen = 4096; - char *buf = malloc(buflen + 1); +enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, rtsp_message **the_packet) { - rtsp_message *msg = NULL; + *the_packet = NULL; // need this for erro handling + enum rtsp_read_request_response reply = rtsp_read_request_response_ok; + ssize_t buflen = 4096; + int release_buffer = 0; // on exit, don't deallocate the buffer if everything was okay + char *buf = malloc(buflen + 1); // add a NUL at the end + if (!buf) { + warn("rtsp_read_request: can't get a buffer."); + return (rtsp_read_request_response_error); + } + pthread_cleanup_push(malloc_cleanup, buf); ssize_t nread; ssize_t inbuf = 0; int msg_size = -1; while (msg_size < 0) { - fd_set readfds; - FD_ZERO(&readfds); - FD_SET(conn->fd, &readfds); - do { - memory_barrier(); - } while (conn->stop == 0 && - pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0); + /* +fd_set readfds; +FD_ZERO(&readfds); +FD_SET(conn->fd, &readfds); +do { + memory_barrier(); +} while (conn->stop == 0 && + pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0); +*/ if (conn->stop != 0) { debug(3, "RTSP conversation thread %d shutdown requested.", conn->connection_number); reply = rtsp_read_request_response_immediate_shutdown_requested; goto shutdown; } + nread = read(conn->fd, buf + inbuf, buflen - inbuf); if (nread == 0) { @@ -513,9 +618,16 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, if (nread < 0) { if (errno == EINTR) continue; - char errorstring[1024]; - strerror_r(errno, (char *)errorstring, sizeof(errorstring)); - debug(1, "rtsp_read_request_response_read_error %d: \"%s\".", errno, (char *)errorstring); + if (errno == EAGAIN) { + debug(1, "Getting Error 11 -- EAGAIN from a blocking read!"); + continue; + } + if (errno != ECONNRESET) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "Connection %d: rtsp_read_request_response_read_error %d: \"%s\".", + conn->connection_number, errno, (char *)errorstring); + } reply = rtsp_read_request_response_read_error; goto shutdown; } @@ -523,9 +635,9 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, char *next; while (msg_size < 0 && (next = nextline(buf, inbuf))) { - msg_size = msg_handle_line(&msg, buf); + msg_size = msg_handle_line(the_packet, buf); - if (!msg) { + if (!(*the_packet)) { warn("no RTSP header received"); reply = rtsp_read_request_response_bad_packet; goto shutdown; @@ -538,7 +650,7 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, } if (msg_size > buflen) { - buf = realloc(buf, msg_size); + buf = realloc(buf, msg_size + 1); if (!buf) { warn("too much content"); reply = rtsp_read_request_response_error; @@ -571,6 +683,8 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, warning_message_sent = 1; } } + + /* fd_set readfds; FD_ZERO(&readfds); FD_SET(conn->fd, &readfds); @@ -578,6 +692,8 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, memory_barrier(); } while (conn->stop == 0 && pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0); + */ + if (conn->stop != 0) { debug(1, "RTSP shutdown requested."); reply = rtsp_read_request_response_immediate_shutdown_requested; @@ -586,7 +702,7 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, size_t read_chunk = msg_size - inbuf; if (read_chunk > max_read_chunk) read_chunk = max_read_chunk; - usleep(40000); // wait about 40 milliseconds between reads of up to about 64 kB + usleep(80000); // wait about 80 milliseconds between reads of up to about 64 kB nread = read(conn->fd, buf + inbuf, read_chunk); if (!nread) { reply = rtsp_read_request_response_error; @@ -602,23 +718,22 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, inbuf += nread; } + rtsp_message *msg = *the_packet; msg->contentlength = inbuf; msg->content = buf; + char *jp = inbuf + buf; + *jp = '\0'; *the_packet = msg; - return reply; - shutdown: - if (msg) { - msg_free(msg); // which will free the content and everything else + if (reply != rtsp_read_request_response_ok) { + msg_free(the_packet); + release_buffer = 1; // allow the buffer to be released } - // in case the message wasn't formed or wasn't fully initialised - if ((msg && (msg->content == NULL)) || (!msg)) - free(buf); - *the_packet = NULL; + pthread_cleanup_pop(release_buffer); return reply; } -static void msg_write_response(int fd, rtsp_message *resp) { +int msg_write_response(int fd, rtsp_message *resp) { char pkt[2048]; int pktfree = sizeof(pkt); char *p = pkt; @@ -636,8 +751,10 @@ static void msg_write_response(int fd, rtsp_message *resp) { n = snprintf(p, pktfree, "%s: %s\r\n", resp->name[i], resp->value[i]); pktfree -= n; p += n; - if (pktfree <= 1024) - die("Attempted to write overlong RTSP packet 1"); + if (pktfree <= 1024) { + debug(1, "Attempted to write overlong RTSP packet 1"); + return -1; + } } // Here, if there's content, write the Content-Length header ... @@ -647,8 +764,10 @@ static void msg_write_response(int fd, rtsp_message *resp) { n = snprintf(p, pktfree, "Content-Length: %d\r\n", resp->contentlength); pktfree -= n; p += n; - if (pktfree <= 1024) - die("Attempted to write overlong RTSP packet 2"); + if (pktfree <= 1024) { + debug(1, "Attempted to write overlong RTSP packet 2"); + return -2; + } debug(1, "Content is \"%s\"", resp->content); memcpy(p, resp->content, resp->contentlength); pktfree -= resp->contentlength; @@ -659,212 +778,248 @@ static void msg_write_response(int fd, rtsp_message *resp) { pktfree -= n; p += n; - if (pktfree <= 1024) - die("Attempted to write overlong RTSP packet 3"); - - if (write(fd, pkt, p - pkt) != p - pkt) - debug(1, "Error writing an RTSP packet -- requested bytes not fully written."); + if (pktfree <= 1024) { + debug(1, "Attempted to write overlong RTSP packet 3"); + return -3; + } + ssize_t reply = write(fd, pkt, p - pkt); + if (reply == -1) { + char errorstring[1024]; + strerror_r(errno, (char *)errorstring, sizeof(errorstring)); + debug(1, "msg_write_response error %d: \"%s\".", errno, (char *)errorstring); + return -4; + } + if (reply != p - pkt) { + debug(1, "msg_write_response error -- requested bytes: %d not fully written: %d.", p - pkt, + reply); + return -5; + } + return 0; } -static void handle_record(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { +void handle_record(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { debug(2, "Connection %d: RECORD", conn->connection_number); + if (have_player(conn)) { + if (conn->player_thread) + warn("Connection %d: RECORD: Duplicate RECORD message -- ignored", conn->connection_number); + else + player_play(conn); // the thread better be 0 - if (conn->player_thread) - warn("Duplicate RECORD message -- ignored"); - else - player_play(conn); // the thread better be 0 - - resp->respcode = 200; - // I think this is for telling the client what the absolute minimum latency - // actually is, - // and when the client specifies a latency, it should be added to this figure. + resp->respcode = 200; + // I think this is for telling the client what the absolute minimum latency + // actually is, + // and when the client specifies a latency, it should be added to this figure. - // Thus, [the old version of] AirPlay's latency figure of 77175, when added to 11025 gives you - // exactly 88200 - // and iTunes' latency figure of 88553, when added to 11025 gives you 99578, - // pretty close to the 99400 we guessed. + // Thus, [the old version of] AirPlay's latency figure of 77175, when added to 11025 gives you + // exactly 88200 + // and iTunes' latency figure of 88553, when added to 11025 gives you 99578, + // pretty close to the 99400 we guessed. - msg_add_header(resp, "Audio-Latency", "11025"); + msg_add_header(resp, "Audio-Latency", "11025"); - char *p; - uint32_t rtptime = 0; - char *hdr = msg_get_header(req, "RTP-Info"); + char *p; + uint32_t rtptime = 0; + char *hdr = msg_get_header(req, "RTP-Info"); - if (hdr) { - // debug(1,"FLUSH message received: \"%s\".",hdr); - // get the rtp timestamp - p = strstr(hdr, "rtptime="); - if (p) { - p = strchr(p, '='); + if (hdr) { + // debug(1,"FLUSH message received: \"%s\".",hdr); + // get the rtp timestamp + p = strstr(hdr, "rtptime="); if (p) { - rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1 - // rtptime--; - // debug(1,"RTSP Flush Requested by handle_record: %u.",rtptime); - player_flush(rtptime, conn); + p = strchr(p, '='); + if (p) { + rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1 + // rtptime--; + // debug(1,"RTSP Flush Requested by handle_record: %u.",rtptime); + player_flush(rtptime, conn); + } } } + } else { + warn("Connection %d RECORD received without having the player (no ANNOUNCE?)", + conn->connection_number); + resp->respcode = 451; } } -static void handle_options(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req, - rtsp_message *resp) { +void handle_options(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req, + rtsp_message *resp) { debug(3, "Connection %d: OPTIONS", conn->connection_number); resp->respcode = 200; - msg_add_header(resp, "Public", - "ANNOUNCE, SETUP, RECORD, " - "PAUSE, FLUSH, TEARDOWN, " - "OPTIONS, GET_PARAMETER, SET_PARAMETER"); + msg_add_header(resp, "Public", "ANNOUNCE, SETUP, RECORD, " + "PAUSE, FLUSH, TEARDOWN, " + "OPTIONS, GET_PARAMETER, SET_PARAMETER"); } -static void handle_teardown(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req, - rtsp_message *resp) { +void handle_teardown(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req, + rtsp_message *resp) { debug(2, "Connection %d: TEARDOWN", conn->connection_number); - // if (!rtsp_playing()) - // debug(1, "This RTSP connection thread (%d) doesn't think it's playing, but " - // "it's sending a response to teardown anyway",conn->connection_number); - resp->respcode = 200; - msg_add_header(resp, "Connection", "close"); - - debug(3, + if (have_player(conn)) { + resp->respcode = 200; + msg_add_header(resp, "Connection", "close"); + debug( + 3, "TEARDOWN: synchronously terminating the player thread of RTSP conversation thread %d (2).", conn->connection_number); - player_stop(conn); - debug(3, "TEARDOWN: successful termination of playing thread of RTSP conversation thread %d.", - conn->connection_number); + player_stop(conn); + debug(3, "TEARDOWN: successful termination of playing thread of RTSP conversation thread %d.", + conn->connection_number); + } else { + warn("Connection %d TEARDOWN received without having the player (no ANNOUNCE?)", + conn->connection_number); + resp->respcode = 451; + } } -static void handle_flush(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { +void handle_flush(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { debug(3, "Connection %d: FLUSH", conn->connection_number); - // if (!rtsp_playing()) - // debug(1, "This RTSP conversation thread (%d) doesn't think it's playing, but " - // "it's sending a response to flush anyway",conn->connection_number); - char *p = NULL; - uint32_t rtptime = 0; - char *hdr = msg_get_header(req, "RTP-Info"); - - if (hdr) { - // debug(1,"FLUSH message received: \"%s\".",hdr); - // get the rtp timestamp - p = strstr(hdr, "rtptime="); - if (p) { - p = strchr(p, '='); - if (p) - rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1 + if (have_player(conn)) { + char *p = NULL; + uint32_t rtptime = 0; + char *hdr = msg_get_header(req, "RTP-Info"); + + if (hdr) { + // debug(1,"FLUSH message received: \"%s\".",hdr); + // get the rtp timestamp + p = strstr(hdr, "rtptime="); + if (p) { + p = strchr(p, '='); + if (p) + rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1 + } } - } // debug(1,"RTSP Flush Requested: %u.",rtptime); #ifdef CONFIG_METADATA - if (p) - send_metadata('ssnc', 'flsr', p + 1, strlen(p + 1), req, 1); - else - send_metadata('ssnc', 'flsr', NULL, 0, NULL, 0); + if (p) + send_metadata('ssnc', 'flsr', p + 1, strlen(p + 1), req, 1); + else + send_metadata('ssnc', 'flsr', NULL, 0, NULL, 0); #endif - player_flush(rtptime, conn); // will not crash even it there is no player thread. - resp->respcode = 200; + player_flush(rtptime, conn); // will not crash even it there is no player thread. + resp->respcode = 200; + + } else { + warn("Connection %d FLUSH received without having the player (no ANNOUNCE?)", + conn->connection_number); + resp->respcode = 451; + } } -static void handle_setup(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { +void handle_setup(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { debug(3, "Connection %d: SETUP", conn->connection_number); - uint16_t cport, tport; - - char *ar = msg_get_header(req, "Active-Remote"); - if (ar) { - debug(2, "Connection %d: SETUP -- Active-Remote string seen: \"%s\".", conn->connection_number, - ar); - // get the active remote - char *p; - conn->dacp_active_remote = strtoul(ar, &p, 10); + resp->respcode = 451; // invalid arguments -- expect them + if (have_player(conn)) { + uint16_t cport, tport; + char *ar = msg_get_header(req, "Active-Remote"); + if (ar) { + debug(2, "Connection %d: SETUP -- Active-Remote string seen: \"%s\".", + conn->connection_number, ar); + // get the active remote + char *p; + conn->dacp_active_remote = strtoul(ar, &p, 10); #ifdef CONFIG_METADATA - send_metadata('ssnc', 'acre', ar, strlen(ar), req, 1); + send_metadata('ssnc', 'acre', ar, strlen(ar), req, 1); #endif - } else { - debug(2, "Connection %d: SETUP -- Note: no Active-Remote information the SETUP Record.", - conn->connection_number); - conn->dacp_active_remote = 0; - } + } else { + debug(2, "Connection %d: SETUP -- Note: no Active-Remote information the SETUP Record.", + conn->connection_number); + conn->dacp_active_remote = 0; + } - ar = msg_get_header(req, "DACP-ID"); - if (ar) { - debug(2, "Connection %d: SETUP -- DACP-ID string seen: \"%s\".", conn->connection_number, ar); - if (conn->dacp_id) // this is in case SETUP was previously called - free(conn->dacp_id); - conn->dacp_id = strdup(ar); + ar = msg_get_header(req, "DACP-ID"); + if (ar) { + debug(2, "Connection %d: SETUP -- DACP-ID string seen: \"%s\".", conn->connection_number, ar); + if (conn->dacp_id) // this is in case SETUP was previously called + free(conn->dacp_id); + conn->dacp_id = strdup(ar); #ifdef CONFIG_METADATA - send_metadata('ssnc', 'daid', ar, strlen(ar), req, 1); + send_metadata('ssnc', 'daid', ar, strlen(ar), req, 1); #endif - } else { - debug(2, "Connection %d: SETUP doesn't include DACP-ID string information.", - conn->connection_number); - if (conn->dacp_id) // this is in case SETUP was previously called - free(conn->dacp_id); - conn->dacp_id = NULL; - } - - char *hdr = msg_get_header(req, "Transport"); - if (!hdr) { - debug(1, "Connection %d: SETUP doesn't contain a Transport header.", conn->connection_number); - goto error; - } + } else { + debug(2, "Connection %d: SETUP doesn't include DACP-ID string information.", + conn->connection_number); + if (conn->dacp_id) // this is in case SETUP was previously called + free(conn->dacp_id); + conn->dacp_id = NULL; + } - char *p; - p = strstr(hdr, "control_port="); - if (!p) { - debug(1, "Connection %d: SETUP doesn't specify a control_port.", conn->connection_number); - goto error; - } - p = strchr(p, '=') + 1; - cport = atoi(p); + char *hdr = msg_get_header(req, "Transport"); + if (hdr) { + char *p; + p = strstr(hdr, "control_port="); + if (p) { + p = strchr(p, '=') + 1; + cport = atoi(p); - p = strstr(hdr, "timing_port="); - if (!p) { - debug(1, "Connection %d: SETUP doesn't specify a timing_port.", conn->connection_number); - goto error; - } - p = strchr(p, '=') + 1; - tport = atoi(p); - - if (conn->rtp_running) { - if ((conn->remote_control_port != cport) || (conn->remote_timing_port != tport)) { - warn("Connection %d: Duplicate SETUP message with different control (old %u, new %u) or " - "timing (old %u, new " - "%u) ports! This is probably fatal!", - conn->connection_number, conn->remote_control_port, cport, conn->remote_timing_port, - tport); + p = strstr(hdr, "timing_port="); + if (p) { + p = strchr(p, '=') + 1; + tport = atoi(p); + + if (conn->rtp_running) { + if ((conn->remote_control_port != cport) || (conn->remote_timing_port != tport)) { + warn("Connection %d: Duplicate SETUP message with different control (old %u, new %u) " + "or " + "timing (old %u, new " + "%u) ports! This is probably fatal!", + conn->connection_number, conn->remote_control_port, cport, + conn->remote_timing_port, tport); + } else { + warn("Connection %d: Duplicate SETUP message with the same control (%u) and timing " + "(%u) " + "ports. This is " + "probably not fatal.", + conn->connection_number, conn->remote_control_port, conn->remote_timing_port); + } + } else { + rtp_setup(&conn->local, &conn->remote, cport, tport, conn); + } + if (conn->local_audio_port != 0) { + + char resphdr[256] = ""; + snprintf(resphdr, sizeof(resphdr), + "RTP/AVP/" + "UDP;unicast;interleaved=0-1;mode=record;control_port=%d;" + "timing_port=%d;server_" + "port=%d", + conn->local_control_port, conn->local_timing_port, conn->local_audio_port); + + msg_add_header(resp, "Transport", resphdr); + + msg_add_header(resp, "Session", "1"); + + resp->respcode = 200; // it all worked out okay + debug(1, "Connection %d: SETUP DACP-ID \"%s\" from %s to %s with UDP ports Control: " + "%d, Timing: %d and Audio: %d.", + conn->connection_number, conn->dacp_id, &conn->client_ip_string, + &conn->self_ip_string, conn->local_control_port, conn->local_timing_port, + conn->local_audio_port); + + } else { + debug(1, "Connection %d: SETUP seems to specify a null audio port.", + conn->connection_number); + } + } else { + debug(1, "Connection %d: SETUP doesn't specify a timing_port.", conn->connection_number); + } + } else { + debug(1, "Connection %d: SETUP doesn't specify a control_port.", conn->connection_number); + } } else { - warn("Connection %d: Duplicate SETUP message with the same control (%u) and timing (%u) " - "ports. This is " - "probably not fatal.", - conn->connection_number, conn->remote_control_port, conn->remote_timing_port); + debug(1, "Connection %d: SETUP doesn't contain a Transport header.", conn->connection_number); } + if (resp->respcode != 200) { + debug(1, "Connection %d: SETUP error -- releasing the player lock.", conn->connection_number); + debug_mutex_lock(&playing_conn_lock, 1000000, 3); + if (playing_conn == conn) // if we have the player + playing_conn = NULL; // let it go + debug_mutex_unlock(&playing_conn_lock, 3); + } + } else { - rtp_setup(&conn->local, &conn->remote, cport, tport, conn); - } - if (conn->local_audio_port == 0) { - debug(1, "Connection %d: SETUP seems to specify a null audio port.", conn->connection_number); - goto error; + warn("Connection %d SETUP received without having the player (no ANNOUNCE?)", + conn->connection_number); } - - char resphdr[256] = ""; - snprintf(resphdr, sizeof(resphdr), - "RTP/AVP/" - "UDP;unicast;interleaved=0-1;mode=record;control_port=%d;" - "timing_port=%d;server_" - "port=%d", - conn->local_control_port, conn->local_timing_port, conn->local_audio_port); - - msg_add_header(resp, "Transport", resphdr); - - msg_add_header(resp, "Session", "1"); - - resp->respcode = 200; - return; - -error: - warn("Connection %d: SETUP -- Error in setup request -- unlocking play lock.", - conn->connection_number); - playing_conn = NULL; - pthread_mutex_unlock(&play_lock); - resp->respcode = 451; // invalid arguments } /* @@ -874,26 +1029,44 @@ static void handle_ignore(rtsp_conn_info *conn, rtsp_message *req, rtsp_message } */ -static void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req, - __attribute__((unused)) rtsp_message *resp) { +void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req, + __attribute__((unused)) rtsp_message *resp) { + char *cp = req->content; int cp_left = req->contentlength; + /* + int k = cp_left; + if (k>max_bytes) + k = max_bytes; + for (i = 0; i < k; i++) + snprintf((char *)buf + 2 * i, 3, "%02x", cp[i]); + debug(1, "handle_set_parameter_parameter: \"%s\".",buf); + */ + char *next; while (cp_left && cp) { next = nextline(cp, cp_left); - cp_left -= next - cp; + // note: "next" will return NULL if there is no \r or \n or \r\n at the end of this + // but we are always guaranteed that if cp is not null, it will be pointing to something + // NUL-terminated - if (!strncmp(cp, "volume: ", 8)) { - float volume = atof(cp + 8); + if (next) + cp_left -= (next - cp); + else + cp_left = 0; + + if (!strncmp(cp, "volume: ", strlen("volume: "))) { + float volume = atof(cp + strlen("volume: ")); // debug(2, "AirPlay request to set volume to: %f.", volume); player_volume(volume, conn); } else #ifdef CONFIG_METADATA - if (!strncmp(cp, "progress: ", 10)) { - char *progress = cp + 10; - // debug(2, "progress: \"%s\"\n", - // progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per second + if (!strncmp(cp, "progress: ", strlen("progress: "))) { + char *progress = cp + strlen("progress: "); + // debug(2, "progress: \"%s\"",progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per + // second send_ssnc_metadata('prgr', strdup(progress), strlen(progress), 1); + } else #endif { @@ -931,6 +1104,8 @@ static void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *r // 'PICT' -- the payload is a picture, either a JPEG or a PNG. Check the // first few bytes to see // which. +// 'abeg' -- active mode entered. No arguments +// 'aend' -- active mode exited. No arguments // 'pbeg' -- play stream begin. No arguments // 'pend' -- play stream end. No arguments // 'pfls' -- play stream flush. No arguments @@ -1044,9 +1219,9 @@ static char *metadata_sockmsg; #define metadata_queue_size 500 metadata_package metadata_queue_items[metadata_queue_size]; -static pthread_t metadata_thread; +pthread_t metadata_thread; -void metadata_create(void) { +void metadata_create_multicast_socket(void) { if (config.metadata_enabled == 0) return; @@ -1068,7 +1243,7 @@ void metadata_create(void) { if (metadata_sockmsg) { memset(metadata_sockmsg, 0, config.metadata_sockmsglength); } else { - die("Could not malloc metadata socket buffer"); + die("Could not malloc metadata multicast socket buffer"); } } } @@ -1087,6 +1262,15 @@ void metadata_create(void) { umask(oldumask); } +void metadata_delete_multicast_socket(void) { + if (config.metadata_enabled == 0) + return; + shutdown(metadata_sock, SHUT_RDWR); // we want to immediately deallocate the buffer + close(metadata_sock); + if (metadata_sockmsg) + free(metadata_sockmsg); +} + void metadata_open(void) { if (config.metadata_enabled == 0) return; @@ -1104,12 +1288,12 @@ void metadata_open(void) { free(path); } -/* static void metadata_close(void) { + if (fd < 0) + return; close(fd); fd = -1; } -*/ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length) { // debug(2, "Process metadata with type %x, code %x and length %u.", type, code, length); @@ -1231,32 +1415,63 @@ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length) } } +void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) { + debug(2, "metadata_thread_cleanup_function called"); + metadata_delete_multicast_socket(); + metadata_close(); + pc_queue_delete(&metadata_queue); +} + +void metadata_pack_cleanup_function(void *arg) { + // debug(1, "metadata_pack_cleanup_function called"); + metadata_package *pack = (metadata_package *)arg; + if (pack->carrier) + msg_free(&pack->carrier); // release the message + else if (pack->data) + free(pack->data); +} + void *metadata_thread_function(__attribute__((unused)) void *ignore) { - metadata_create(); + // create a pc_queue for passing information to a threaded metadata handler + pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package), + metadata_queue_size); + metadata_create_multicast_socket(); metadata_package pack; + pthread_cleanup_push(metadata_thread_cleanup_function, NULL); while (1) { pc_queue_get_item(&metadata_queue, &pack); + pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack); if (config.metadata_enabled) { metadata_process(pack.type, pack.code, pack.data, pack.length); -#ifdef HAVE_METADATA_HUB +#ifdef CONFIG_METADATA_HUB metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length); #endif + +#ifdef CONFIG_MQTT + if (config.mqtt_enabled) { + mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length); + } +#endif } - if (pack.carrier) - msg_free(pack.carrier); // release the message - else if (pack.data) - free(pack.data); + pthread_cleanup_pop(1); } + pthread_cleanup_pop(1); // will never happen pthread_exit(NULL); } void metadata_init(void) { - // create a pc_queue for passing information to a threaded metadata handler - pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package), - metadata_queue_size); int ret = pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL); if (ret) debug(1, "Failed to create metadata thread!"); + metadata_running = 1; +} + +void metadata_stop(void) { + if (metadata_running) { + debug(2, "metadata_stop called."); + pthread_cancel(metadata_thread); + pthread_join(metadata_thread, NULL); + } } int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier, @@ -1293,14 +1508,17 @@ int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rts pack.code = code; pack.data = data; pack.length = length; - if (carrier) - msg_retain(carrier); pack.carrier = carrier; + if (pack.carrier) + msg_retain(pack.carrier); int rc = pc_queue_add_item(&metadata_queue, &pack, block); - if ((rc == EBUSY) && (carrier)) - msg_free(carrier); - if (rc == EBUSY) - warn("Metadata queue is busy, dropping message of type 0x%08X, code 0x%08X.", type, code); + if (rc == EBUSY) { + if (pack.carrier) + msg_free(&pack.carrier); + else if (data) + free(data); + warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code); + } return rc; } @@ -1367,7 +1585,7 @@ static void handle_set_parameter(rtsp_conn_info *conn, rtsp_message *req, rtsp_m char *ct = msg_get_header(req, "Content-Type"); if (ct) { - // debug(2, "SET_PARAMETER Content-Type:\"%s\".", ct); +// debug(2, "SET_PARAMETER Content-Type:\"%s\".", ct); #ifdef CONFIG_METADATA // It seems that the rtptime of the message is used as a kind of an ID that @@ -1449,59 +1667,93 @@ static void handle_set_parameter(rtsp_conn_info *conn, rtsp_message *req, rtsp_m } static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { - debug(2, "Connection %d: ANNOUNCE", conn->connection_number); + debug(3, "Connection %d: ANNOUNCE", conn->connection_number); + int have_the_player = 0; + int should_wait = 0; // this will be true if you're trying to break in to the current session + int interrupting_current_session = 0; + + // try to become the current playing_conn - // interrupt session if permitted - if (pthread_mutex_trylock(&play_lock) == 0) { + debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it + + if (playing_conn == NULL) { + playing_conn = conn; have_the_player = 1; - } else if ((playing_conn) && - (playing_conn->connection_number == conn->connection_number)) { // duplicate ANNOUNCE - warn("Duplicate ANNOUNCE, by the look of it!"); + } else if (playing_conn == conn) { have_the_player = 1; - } else { - int should_wait = 0; - - if (!playing_conn) - die("Non existent playing_conn with play_lock enabled."); - debug(1, "RTSP Conversation thread %d already playing when asked by thread %d.", - playing_conn->connection_number, conn->connection_number); - if (playing_conn->stop) { - debug(1, "Playing connection is already shutting down; waiting for it..."); - should_wait = 1; - } else if (config.allow_session_interruption == 1) { - // some other thread has the player ... ask it to relinquish the thread - debug(1, "ANNOUNCE: playing connection %d being interrupted by connection %d.", - playing_conn->connection_number, conn->connection_number); - if (playing_conn == conn) { - debug(1, "ANNOUNCE asking to stop itself."); - } else { - playing_conn->stop = 1; - memory_barrier(); - pthread_kill(playing_conn->thread, SIGUSR1); - should_wait = 1; + warn("Duplicate ANNOUNCE, by the look of it!"); + } else if (playing_conn->stop) { + debug(1, "Connection %d ANNOUNCE is waiting for connection %d to shut down.", + conn->connection_number, playing_conn->connection_number); + should_wait = 1; + } else if (config.allow_session_interruption == 1) { + debug(2, "Connection %d: ANNOUNCE: asking playing connection %d to shut down.", + conn->connection_number, playing_conn->connection_number); + playing_conn->stop = 1; + interrupting_current_session = 1; + should_wait = 1; + pthread_cancel(playing_conn->thread); // asking the RTSP thread to exit + } + debug_mutex_unlock(&playing_conn_lock, 3); + + if (should_wait) { + int time_remaining = 3000000; // must be signed, as it could go negative... + + while ((time_remaining > 0) && (have_the_player == 0)) { + debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it + if (playing_conn == NULL) { + playing_conn = conn; + have_the_player = 1; + } + debug_mutex_unlock(&playing_conn_lock, 3); + + if (have_the_player == 0) { + usleep(100000); + time_remaining -= 100000; } } - if (should_wait) { - usleep(1000000); // here, it is possible for other connections to come in and nab the player. - debug(1, "Try to get the player now"); + if ((have_the_player == 1) && (interrupting_current_session == 1)) { + debug(2, "Connection %d: ANNOUNCE got the player", conn->connection_number); + } else { + debug(2, "Connection %d: ANNOUNCE failed to get the player", conn->connection_number); } - if (pthread_mutex_trylock(&play_lock) == 0) - have_the_player = 1; - else - debug(1, "ANNOUNCE failed to get the player"); } if (have_the_player) { - playing_conn = conn; // the present connection is now playing - debug(3, "RTSP conversation thread %d has acquired play lock.", conn->connection_number); + debug(3, "Connection %d: ANNOUNCE has acquired play lock.", conn->connection_number); + + // now, if this new session did not break in, then it's okay to reset the next UDP ports + // to the start of the range + + if (interrupting_current_session == 0) { // will be zero if it wasn't waiting to break in + resetFreeUDPPort(); + } + + /* + { + char *cp = req->content; + int cp_left = req->contentlength; + while (cp_left > 1) { + if (strlen(cp) != 0) + debug(1,">>>>>> %s", cp); + cp += strlen(cp) + 1; + cp_left -= strlen(cp) + 1; + } + } +*/ + + conn->stream.type = ast_unknown; resp->respcode = 456; // 456 - Header Field Not Valid for Resource + char *pssid = NULL; char *paesiv = NULL; char *prsaaeskey = NULL; char *pfmtp = NULL; char *pminlatency = NULL; char *pmaxlatency = NULL; + // char *pAudioMediaInfo = NULL; + char *pUncompressedCDAudio = NULL; char *cp = req->content; int cp_left = req->contentlength; char *next; @@ -1509,24 +1761,67 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag next = nextline(cp, cp_left); cp_left -= next - cp; - if (!strncmp(cp, "a=fmtp:", 7)) - pfmtp = cp + 7; + if (!strncmp(cp, "a=rtpmap:96 L16/44100/2", strlen("a=rtpmap:96 L16/44100/2"))) + pUncompressedCDAudio = cp + strlen("a=rtpmap:96 L16/44100/2"); + + // if (!strncmp(cp, "m=audio", strlen("m=audio"))) + // pAudioMediaInfo = cp + strlen("m=audio"); + + if (!strncmp(cp, "o=iTunes", strlen("o=iTunes"))) + pssid = cp + strlen("o=iTunes"); + + if (!strncmp(cp, "a=fmtp:", strlen("a=fmtp:"))) + pfmtp = cp + strlen("a=fmtp:"); - if (!strncmp(cp, "a=aesiv:", 8)) - paesiv = cp + 8; + if (!strncmp(cp, "a=aesiv:", strlen("a=aesiv:"))) + paesiv = cp + strlen("a=aesiv:"); - if (!strncmp(cp, "a=rsaaeskey:", 12)) - prsaaeskey = cp + 12; + if (!strncmp(cp, "a=rsaaeskey:", strlen("a=rsaaeskey:"))) + prsaaeskey = cp + strlen("a=rsaaeskey:"); - if (!strncmp(cp, "a=min-latency:", 14)) - pminlatency = cp + 14; + if (!strncmp(cp, "a=min-latency:", strlen("a=min-latency:"))) + pminlatency = cp + strlen("a=min-latency:"); - if (!strncmp(cp, "a=max-latency:", 14)) - pmaxlatency = cp + 14; + if (!strncmp(cp, "a=max-latency:", strlen("a=max-latency:"))) + pmaxlatency = cp + strlen("a=max-latency:"); cp = next; } + if (pUncompressedCDAudio) { + debug(2, "An uncompressed PCM stream has been detected."); + conn->stream.type = ast_uncompressed; + conn->max_frames_per_packet = 352; // number of audio frames per packet. + conn->input_rate = 44100; + conn->input_num_channels = 2; + conn->input_bit_depth = 16; + conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8); + + /* + int y = strlen(pAudioMediaInfo); + if (y > 0) { + char obf[4096]; + if (y > 4096) + y = 4096; + char *p = pAudioMediaInfo; + char *obfp = obf; + int obfc; + for (obfc = 0; obfc < y; obfc++) { + snprintf(obfp, 3, "%02X", (unsigned int)*p); + p++; + obfp += 2; + }; + *obfp = 0; + debug(1, "AudioMediaInfo: \"%s\".", obf); + } + */ + } + + if (pssid) { + uint32_t ssid = uatoi(pssid); + debug(3, "Synchronisation Source Identifier: %08X,%u", ssid, ssid); + } + if (pminlatency) { conn->minimum_latency = atoi(pminlatency); debug(3, "Minimum latency %d specified", conn->minimum_latency); @@ -1545,22 +1840,6 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag // debug(1,"Encrypted session requested"); } - if (!pfmtp) { - warn("FMTP params missing from the following ANNOUNCE message:"); - // print each line of the request content - // the problem is that nextline has replace all returns, newlines, etc. by - // NULLs - char *cp = req->content; - int cp_left = req->contentlength; - while (cp_left > 1) { - if (strlen(cp) != 0) - warn(" %s", cp); - cp += strlen(cp) + 1; - cp_left -= strlen(cp) + 1; - } - goto out; - } - if (conn->stream.encrypted) { int len, keylen; uint8_t *aesiv = base64_dec(paesiv, &len); @@ -1583,12 +1862,41 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag memcpy(conn->stream.aeskey, aeskey, 16); free(aeskey); } - unsigned int i; - for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++) - conn->stream.fmtp[i] = atoi(strsep(&pfmtp, " \t")); - // here we should check the sanity ot the fmtp values - // for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++) - // debug(1," fmtp[%2d] is: %10d",i,conn->stream.fmtp[i]); + + if (pfmtp) { + conn->stream.type = ast_apple_lossless; + debug(3, "An ALAC stream has been detected."); + unsigned int i; + for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++) + conn->stream.fmtp[i] = atoi(strsep(&pfmtp, " \t")); + // here we should check the sanity ot the fmtp values + // for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++) + // debug(1," fmtp[%2d] is: %10d",i,conn->stream.fmtp[i]); + + // set the parameters of the player (as distinct from the parameters of the decoder -- that's + // done later). + conn->max_frames_per_packet = conn->stream.fmtp[1]; // number of audio frames per packet. + conn->input_rate = conn->stream.fmtp[11]; + conn->input_num_channels = conn->stream.fmtp[7]; + conn->input_bit_depth = conn->stream.fmtp[3]; + conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8); + } + + if (conn->stream.type == ast_unknown) { + warn("Can not process the following ANNOUNCE message:"); + // print each line of the request content + // the problem is that nextline has replace all returns, newlines, etc. by + // NULLs + char *cp = req->content; + int cp_left = req->contentlength; + while (cp_left > 1) { + if (strlen(cp) != 0) + warn(" %s", cp); + cp += strlen(cp) + 1; + cp_left -= strlen(cp) + 1; + } + goto out; + } char *hdr = msg_get_header(req, "X-Apple-Client-Name"); if (hdr) { @@ -1621,15 +1929,18 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag resp->respcode = 200; } else { resp->respcode = 453; - debug(1, "Already playing."); + debug(1, "Connection %d: ANNOUNCE failed because another connection is already playing.", + conn->connection_number); } out: if (resp->respcode != 200 && resp->respcode != 453) { - debug(1, "Error in handling ANNOUNCE on conversation thread %d. Unlocking the play lock.", + debug(1, "Connection %d: Error in handling ANNOUNCE. Unlocking the play lock.", conn->connection_number); - playing_conn = NULL; - pthread_mutex_unlock(&play_lock); + debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it + if (playing_conn == conn) // if we managed to acquire it + playing_conn = NULL; // let it go + debug_mutex_unlock(&playing_conn_lock, 3); } } @@ -1702,21 +2013,21 @@ static void apple_challenge(int fd, rtsp_message *req, rtsp_message *resp) { if (padding) *padding = 0; - msg_add_header(resp, "Apple-Response", encoded); + msg_add_header(resp, "Apple-Response", encoded); // will be freed when the response is freed. free(challresp); free(encoded); } static char *make_nonce(void) { uint8_t random[8]; - int fd = open("/dev/random", O_RDONLY); + int fd = open("/dev/urandom", O_RDONLY); if (fd < 0) - die("could not open /dev/random!"); + die("could not open /dev/urandom!"); // int ignore = if (read(fd, random, sizeof(random)) != sizeof(random)) - debug(1, "Error reading /dev/random"); + debug(1, "Error reading /dev/urandom"); close(fd); - return base64_enc(random, 8); + return base64_enc(random, 8); // returns a pointer to malloc'ed memory } static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { @@ -1760,9 +2071,11 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { uint8_t digest_urp[16], digest_mu[16], digest_total[16]; -#ifdef HAVE_LIBSSL +#ifdef CONFIG_OPENSSL MD5_CTX ctx; + int oldState; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); MD5_Init(&ctx); MD5_Update(&ctx, username, strlen(username)); MD5_Update(&ctx, ":", 1); @@ -1775,9 +2088,25 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { MD5_Update(&ctx, ":", 1); MD5_Update(&ctx, uri, strlen(uri)); MD5_Final(digest_mu, &ctx); + pthread_setcancelstate(oldState, NULL); #endif -#ifdef HAVE_LIBMBEDTLS +#ifdef CONFIG_MBEDTLS +#if MBEDTLS_VERSION_MINOR >= 7 + mbedtls_md5_context tctx; + mbedtls_md5_starts_ret(&tctx); + mbedtls_md5_update_ret(&tctx, (const unsigned char *)username, strlen(username)); + mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1); + mbedtls_md5_update_ret(&tctx, (const unsigned char *)realm, strlen(realm)); + mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1); + mbedtls_md5_update_ret(&tctx, (const unsigned char *)config.password, strlen(config.password)); + mbedtls_md5_finish_ret(&tctx, digest_urp); + mbedtls_md5_starts_ret(&tctx); + mbedtls_md5_update_ret(&tctx, (const unsigned char *)req->method, strlen(req->method)); + mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1); + mbedtls_md5_update_ret(&tctx, (const unsigned char *)uri, strlen(uri)); + mbedtls_md5_finish_ret(&tctx, digest_mu); +#else mbedtls_md5_context tctx; mbedtls_md5_starts(&tctx); mbedtls_md5_update(&tctx, (const unsigned char *)username, strlen(username)); @@ -1792,8 +2121,9 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { mbedtls_md5_update(&tctx, (const unsigned char *)uri, strlen(uri)); mbedtls_md5_finish(&tctx, digest_mu); #endif +#endif -#ifdef HAVE_LIBPOLARSSL +#ifdef CONFIG_POLARSSL md5_context tctx; md5_starts(&tctx); md5_update(&tctx, (const unsigned char *)username, strlen(username)); @@ -1814,7 +2144,8 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { for (i = 0; i < 16; i++) snprintf((char *)buf + 2 * i, 3, "%02x", digest_urp[i]); -#ifdef HAVE_LIBSSL +#ifdef CONFIG_OPENSSL + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); MD5_Init(&ctx); MD5_Update(&ctx, buf, 32); MD5_Update(&ctx, ":", 1); @@ -1824,9 +2155,21 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { snprintf((char *)buf + 2 * i, 3, "%02x", digest_mu[i]); MD5_Update(&ctx, buf, 32); MD5_Final(digest_total, &ctx); + pthread_setcancelstate(oldState, NULL); #endif -#ifdef HAVE_LIBMBEDTLS +#ifdef CONFIG_MBEDTLS +#if MBEDTLS_VERSION_MINOR >= 7 + mbedtls_md5_starts_ret(&tctx); + mbedtls_md5_update_ret(&tctx, buf, 32); + mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1); + mbedtls_md5_update_ret(&tctx, (const unsigned char *)*nonce, strlen(*nonce)); + mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1); + for (i = 0; i < 16; i++) + snprintf((char *)buf + 2 * i, 3, "%02x", digest_mu[i]); + mbedtls_md5_update_ret(&tctx, buf, 32); + mbedtls_md5_finish_ret(&tctx, digest_total); +#else mbedtls_md5_starts(&tctx); mbedtls_md5_update(&tctx, buf, 32); mbedtls_md5_update(&tctx, (unsigned char *)":", 1); @@ -1837,8 +2180,9 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) { mbedtls_md5_update(&tctx, buf, 32); mbedtls_md5_finish(&tctx, digest_total); #endif +#endif -#ifdef HAVE_LIBPOLARSSL +#ifdef CONFIG_POLARSSL md5_starts(&tctx); md5_update(&tctx, buf, 32); md5_update(&tctx, (unsigned char *)":", 1); @@ -1867,37 +2211,138 @@ authenticate: return 1; } +void rtsp_conversation_thread_cleanup_function(void *arg) { + rtsp_conn_info *conn = (rtsp_conn_info *)arg; + int oldState; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); + + debug(3, "Connection %d: rtsp_conversation_thread_func_cleanup_function called.", + conn->connection_number); + if (conn->player_thread) + player_stop(conn); + + debug(3, "Closing timing, control and audio sockets..."); + if (conn->control_socket) + close(conn->control_socket); + if (conn->timing_socket) + close(conn->timing_socket); + if (conn->audio_socket) + close(conn->audio_socket); + + if (conn->fd > 0) { + debug(3, "Connection %d: closing fd %d.", conn->connection_number, conn->fd); + close(conn->fd); + debug(3, "Connection %d: closed fd %d.", conn->connection_number, conn->fd); + } + if (conn->auth_nonce) { + free(conn->auth_nonce); + conn->auth_nonce = NULL; + } + rtp_terminate(conn); + + if (conn->dacp_id) { + free(conn->dacp_id); + conn->dacp_id = NULL; + } + + // remove flow control and mutexes + int rc = pthread_mutex_destroy(&conn->volume_control_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying volume_control_mutex.", conn->connection_number, + rc); + rc = pthread_cond_destroy(&conn->flowcontrol); + if (rc) + debug(1, "Connection %d: error %d destroying flow control condition variable.", + conn->connection_number, rc); + rc = pthread_mutex_destroy(&conn->ab_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying ab_mutex.", conn->connection_number, rc); + rc = pthread_mutex_destroy(&conn->flush_mutex); + if (rc) + debug(1, "Connection %d: error %d destroying flush_mutex.", conn->connection_number, rc); + + debug(3, "Cancel watchdog thread."); + pthread_cancel(conn->player_watchdog_thread); + debug(3, "Join watchdog thread."); + pthread_join(conn->player_watchdog_thread, NULL); + debug(3, "Delete watchdog mutex."); + pthread_mutex_destroy(&conn->watchdog_mutex); + + debug(3, "Connection %d: Checking play lock.", conn->connection_number); + debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it + if (playing_conn == conn) { // if it's ours + debug(3, "Connection %d: Unlocking play lock.", conn->connection_number); + playing_conn = NULL; // let it go + } + debug_mutex_unlock(&playing_conn_lock, 3); + + debug(2, "Connection %d: terminated.", conn->connection_number); + conn->running = 0; + pthread_setcancelstate(oldState, NULL); +} + +void msg_cleanup_function(void *arg) { + // debug(3, "msg_cleanup_function called."); + msg_free((rtsp_message **)arg); +} + static void *rtsp_conversation_thread_func(void *pconn) { rtsp_conn_info *conn = pconn; - // create the player thread lock. - int rwli = pthread_rwlock_init(&conn->player_thread_lock, NULL); - if (rwli != 0) - die("Error %d initialising player_thread_lock for conversation thread %d.", rwli, - conn->connection_number); + // create the watchdog mutex, initialise the watchdog time and start the watchdog thread; + conn->watchdog_bark_time = get_absolute_time_in_fp(); + pthread_mutex_init(&conn->watchdog_mutex, NULL); + pthread_create(&conn->player_watchdog_thread, NULL, &player_watchdog_thread_code, (void *)conn); + + int rc = pthread_mutex_init(&conn->flush_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising flush_mutex.", conn->connection_number, rc); + rc = pthread_mutex_init(&conn->ab_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising ab_mutex.", conn->connection_number, rc); +// set the flowcontrol condition variable to wait on a monotonic clock +#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD + pthread_condattr_t attr; + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it. + rc = pthread_cond_init(&conn->flowcontrol, &attr); +#endif +#ifdef COMPILE_FOR_OSX + rc = pthread_cond_init(&conn->flowcontrol, NULL); +#endif + if (rc) + die("Connection %d: error %d initialising flow control condition variable.", + conn->connection_number, rc); + rc = pthread_mutex_init(&conn->volume_control_mutex, NULL); + if (rc) + die("Connection %d: error %d initialising volume_control_mutex.", conn->connection_number, rc); - rtp_initialise(conn); + // nothing before this is cancellable + pthread_cleanup_push(rtsp_conversation_thread_cleanup_function, (void *)conn); - rtsp_message *req, *resp; - char *hdr, *auth_nonce = NULL; + rtp_initialise(conn); + char *hdr = NULL; enum rtsp_read_request_response reply; int rtsp_read_request_attempt_count = 1; // 1 means exit immediately + rtsp_message *req, *resp; while (conn->stop == 0) { int debug_level = 3; // for printing the request and response reply = rtsp_read_request(conn, &req); if (reply == rtsp_read_request_response_ok) { + pthread_cleanup_push(msg_cleanup_function, (void *)&req); + resp = msg_init(); + pthread_cleanup_push(msg_cleanup_function, (void *)&resp); + resp->respcode = 400; + if (strcmp(req->method, "OPTIONS") != 0) // the options message is very common, so don't log it until level 3 debug_level = 2; - debug(debug_level, - "RTSP thread %d received an RTSP Packet of type \"%s\":", conn->connection_number, - req->method), + debug(debug_level, "Connection %d: Received an RTSP Packet of type \"%s\":", + conn->connection_number, req->method), debug_print_msg_headers(debug_level, req); - resp = msg_init(); - resp->respcode = 400; apple_challenge(conn->fd, req, resp); hdr = msg_get_header(req, "CSeq"); @@ -1906,7 +2351,7 @@ static void *rtsp_conversation_thread_func(void *pconn) { // msg_add_header(resp, "Audio-Jack-Status", "connected; type=analog"); msg_add_header(resp, "Server", "AirTunes/105.1"); - if ((conn->authorized == 1) || (rtsp_auth(&auth_nonce, req, resp)) == 0) { + if ((conn->authorized == 1) || (rtsp_auth(&conn->auth_nonce, req, resp)) == 0) { conn->authorized = 1; // it must have been authorized or didn't need a password struct method_handler *mh; int method_selected = 0; @@ -1917,12 +2362,32 @@ static void *rtsp_conversation_thread_func(void *pconn) { break; } } - if (method_selected == 0) - debug(1, "RTSP thread %d: Unrecognised and unhandled rtsp request \"%s\".", + if (method_selected == 0) { + debug(3, "Connection %d: Unrecognised and unhandled rtsp request \"%s\".", conn->connection_number, req->method); + + int y = req->contentlength; + if (y > 0) { + char obf[4096]; + if (y > 4096) + y = 4096; + char *p = req->content; + char *obfp = obf; + int obfc; + for (obfc = 0; obfc < y; obfc++) { + snprintf(obfp, 3, "%02X", (unsigned int)*p); + p++; + obfp += 2; + }; + *obfp = 0; + debug(3, "Content: \"%s\".", obf); + } + } } - debug(debug_level, "RTSP thread %d: RTSP Response:", conn->connection_number); + debug(debug_level, "Connection %d: RTSP Response:", conn->connection_number); debug_print_msg_headers(debug_level, resp); + + /* fd_set writefds; FD_ZERO(&writefds); FD_SET(conn->fd, &writefds); @@ -1930,11 +2395,27 @@ static void *rtsp_conversation_thread_func(void *pconn) { memory_barrier(); } while (conn->stop == 0 && pselect(conn->fd + 1, NULL, &writefds, NULL, NULL, &pselect_sigset) <= 0); + */ + if (conn->stop == 0) { - msg_write_response(conn->fd, resp); + int err = msg_write_response(conn->fd, resp); + if (err) { + debug(1, "Connection %d: Unable to write an RTSP message response. Terminating the " + "connection.", + conn->connection_number); + struct linger so_linger; + so_linger.l_onoff = 1; // "true" + so_linger.l_linger = 0; + err = setsockopt(conn->fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger); + if (err) + debug(1, "Could not set the RTSP socket to abort due to a write error on closing."); + conn->stop = 1; + // if (debuglev >= 1) + // debuglev = 3; // see what happens next + } } - msg_free(req); - msg_free(resp); + pthread_cleanup_pop(1); + pthread_cleanup_pop(1); } else { int tstop = 0; if (reply == rtsp_read_request_response_immediate_shutdown_requested) @@ -1943,58 +2424,41 @@ static void *rtsp_conversation_thread_func(void *pconn) { (reply == rtsp_read_request_response_read_error)) { if (conn->player_thread) { rtsp_read_request_attempt_count--; - if (rtsp_read_request_attempt_count == 0) + if (rtsp_read_request_attempt_count == 0) { tstop = 1; - else { + if (reply == rtsp_read_request_response_read_error) { + struct linger so_linger; + so_linger.l_onoff = 1; // "true" + so_linger.l_linger = 0; + int err = setsockopt(conn->fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger); + if (err) + debug(1, "Could not set the RTSP socket to abort due to a read error on closing."); + } + // debuglev = 3; // see what happens next + } else { if (reply == rtsp_read_request_response_channel_closed) - debug(2, "RTSP channel unexpectedly closed -- will try again %d time(s).", - rtsp_read_request_attempt_count); + debug(2, + "Connection %d: RTSP channel unexpectedly closed -- will try again %d time(s).", + conn->connection_number, rtsp_read_request_attempt_count); if (reply == rtsp_read_request_response_read_error) - debug(2, "RTSP channel read error -- will try again %d time(s).", - rtsp_read_request_attempt_count); + debug(2, "Connection %d: RTSP channel read error -- will try again %d time(s).", + conn->connection_number, rtsp_read_request_attempt_count); usleep(20000); } } else { tstop = 1; } } else { - debug(1, "rtsp_read_request error %d, packet ignored.", (int)reply); + debug(1, "Connection %d: rtsp_read_request error %d, packet ignored.", + conn->connection_number, (int)reply); } if (tstop) { - debug(3, "Synchronously terminate playing thread of RTSP conversation thread %d.", - conn->connection_number); - - if (conn->player_thread) - debug(1, "RTSP Channel unexpectedly closed or a serious error occured -- closing the " - "player thread."); - player_stop(conn); - debug(3, "Successful termination of playing thread of RTSP conversation thread %d.", - conn->connection_number); - debug(3, "Request termination of RTSP conversation thread %d.", conn->connection_number); + debug(3, "Connection %d: Terminate RTSP connection.", conn->connection_number); conn->stop = 1; } } } - - if (conn->fd > 0) - close(conn->fd); - if (auth_nonce) - free(auth_nonce); - rtp_terminate(conn); - if (playing_conn == conn) { - debug(3, "Unlocking play lock on RTSP conversation thread %d.", conn->connection_number); - playing_conn = NULL; - pthread_mutex_unlock(&play_lock); - } - debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number); - conn->running = 0; - - // release the player_thread_lock - int rwld = pthread_rwlock_destroy(&conn->player_thread_lock); - if (rwld) - debug(1, "Error %d destroying player_thread_lock for conversation thread %d.", rwld, - conn->connection_number); - + pthread_cleanup_pop(1); pthread_exit(NULL); } @@ -2017,7 +2481,21 @@ static const char *format_address(struct sockaddr *fsa) { } */ +void rtsp_listen_loop_cleanup_handler(__attribute__((unused)) void *arg) { + int oldState; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); + debug(1, "rtsp_listen_loop_cleanup_handler called."); + cancel_all_RTSP_threads(); + int *sockfd = (int *)arg; + mdns_unregister(); + if (sockfd) + free(sockfd); + pthread_setcancelstate(oldState, NULL); +} + void rtsp_listen_loop(void) { + int oldState; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); struct addrinfo hints, *info, *p; char portstr[6]; int *sockfd = NULL; @@ -2059,6 +2537,18 @@ void rtsp_listen_loop(void) { fcntl(fd, F_SETFD, FD_CLOEXEC); ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + struct timeval tv; + tv.tv_sec = 3; // three seconds write timeout + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof tv) == -1) + debug(1, "Error %d setting send timeout for rtsp writeback.", errno); + + if ((config.dont_check_timeout == 0) && (config.timeout != 0)) { + tv.tv_sec = config.timeout; // 120 seconds read timeout by default. + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv) == -1) + debug(1, "Error %d setting read timeout for rtsp connection.", errno); + } #ifdef IPV6_V6ONLY // some systems don't support v4 access on v6 sockets, but some do. // since we need to account for two sockets we might as well @@ -2110,12 +2600,12 @@ void rtsp_listen_loop(void) { mdns_register(); - // printf("Listening for connections."); - // shairport_startup_complete(); - + pthread_setcancelstate(oldState, NULL); int acceptfd; struct timeval tv; + pthread_cleanup_push(rtsp_listen_loop_cleanup_handler, (void *)sockfd); do { + pthread_testcancel(); tv.tv_sec = 60; tv.tv_usec = 0; @@ -2150,7 +2640,8 @@ void rtsp_listen_loop(void) { conn->fd = accept(acceptfd, (struct sockaddr *)&conn->remote, &slen); if (conn->fd < 0) { - debug(1, "New RTSP connection on port %d not accepted:", config.port); + debug(1, "Connection %d: New connection on port %d not accepted:", conn->connection_number, + config.port); perror("failed to accept connection"); free(conn); } else { @@ -2169,8 +2660,8 @@ void rtsp_listen_loop(void) { sa = (struct sockaddr_in *)&conn->remote; inet_ntop(AF_INET, &(sa->sin_addr), remote_ip4, INET_ADDRSTRLEN); unsigned short int rport = ntohs(sa->sin_port); - debug(2, "New RTSP connection from %s:%u to self at %s:%u on conversation thread %d.", - remote_ip4, rport, ip4, tport, conn->connection_number); + debug(2, "Connection %d: new connection from %s:%u to self at %s:%u.", + conn->connection_number, remote_ip4, rport, ip4, tport); } #ifdef AF_INET6 if (local_info->SAFAMILY == AF_INET6) { @@ -2186,8 +2677,8 @@ void rtsp_listen_loop(void) { sa6 = (struct sockaddr_in6 *)&conn->remote; // pretend this is loaded with something inet_ntop(AF_INET6, &(sa6->sin6_addr), remote_ip6, INET6_ADDRSTRLEN); u_int16_t rport = ntohs(sa6->sin6_port); - debug(2, "New RTSP connection from [%s]:%u to self at [%s]:%u on conversation thread %d.", - remote_ip6, rport, ip6, tport, conn->connection_number); + debug(2, "Connection %d: new connection from [%s]:%u to self at [%s]:%u.", + conn->connection_number, remote_ip6, rport, ip6, tport); } #endif @@ -2199,23 +2690,22 @@ void rtsp_listen_loop(void) { // conn->thread = rtsp_conversation_thread; // conn->stop = 0; // record's memory has been zeroed // conn->authorized = 0; // record's memory has been zeroed - fcntl(conn->fd, F_SETFL, O_NONBLOCK); + // fcntl(conn->fd, F_SETFL, O_NONBLOCK); ret = pthread_create(&conn->thread, NULL, rtsp_conversation_thread_func, conn); // also acts as a memory barrier - if (ret) - die("Failed to create RTSP receiver thread %d!", conn->connection_number); + if (ret) { + char errorstring[1024]; + strerror_r(ret, (char *)errorstring, sizeof(errorstring)); + die("Connection %d: cannot create an RTSP conversation thread. Error %d: \"%s\".", + conn->connection_number, ret, (char *)errorstring); + } debug(3, "Successfully created RTSP receiver thread %d.", conn->connection_number); conn->running = 1; // this must happen before the thread is tracked track_thread(conn); } } while (1); - mdns_unregister(); - - if (sockfd) - free(sockfd); - - // perror("select"); - // die("fell out of the RTSP select loop"); + pthread_cleanup_pop(1); // should never happen + debug(1, "Oops -- fell out of the RTSP select loop"); } |