summaryrefslogtreecommitdiff
path: root/rtsp.c
diff options
context:
space:
mode:
Diffstat (limited to 'rtsp.c')
-rw-r--r--rtsp.c1360
1 files changed, 925 insertions, 435 deletions
diff --git a/rtsp.c b/rtsp.c
index 0267e7f..20fc6b8 100644
--- a/rtsp.c
+++ b/rtsp.c
@@ -3,7 +3,7 @@
* Copyright (c) James Laird 2013
* Modifications associated with audio synchronization, mutithreading and
- * metadata handling copyright (c) Mike Brady 2014-2018
+ * metadata handling copyright (c) Mike Brady 2014-2019
* All rights reserved.
*
* Permission is hereby granted, free of charge, to any person
@@ -46,15 +46,16 @@
#include "config.h"
-#ifdef HAVE_LIBSSL
+#ifdef CONFIG_OPENSSL
#include <openssl/md5.h>
#endif
-#ifdef HAVE_LIBMBEDTLS
+#ifdef CONFIG_MBEDTLS
#include <mbedtls/md5.h>
+#include <mbedtls/version.h>
#endif
-#ifdef HAVE_LIBPOLARSSL
+#ifdef CONFIG_POLARSSL
#include <polarssl/md5.h>
#endif
@@ -63,10 +64,14 @@
#include "rtp.h"
#include "rtsp.h"
-#ifdef HAVE_METADATA_HUB
+#ifdef CONFIG_METADATA_HUB
#include "metadata_hub.h"
#endif
+#ifdef CONFIG_MQTT
+#include "mqtt.h"
+#endif
+
#ifdef AF_INET6
#define INETx_ADDRSTRLEN INET6_ADDRSTRLEN
#else
@@ -85,7 +90,11 @@ enum rtsp_read_request_response {
};
// Mike Brady's part...
-static pthread_mutex_t play_lock = PTHREAD_MUTEX_INITIALIZER;
+
+int metadata_running = 0;
+
+// always lock use this when accessing the playing conn value
+static pthread_mutex_t playing_conn_lock = PTHREAD_MUTEX_INITIALIZER;
// every time we want to retain or release a reference count, lock it with this
// if a reference count is read as zero, it means the it's being deallocated.
@@ -97,8 +106,6 @@ static pthread_mutex_t reference_counter_lock = PTHREAD_MUTEX_INITIALIZER;
// static int please_shutdown = 0;
// static pthread_t playing_thread = 0;
-static rtsp_conn_info **conns = NULL;
-
int RTSP_connection_index = 1;
#ifdef CONFIG_METADATA
@@ -115,7 +122,10 @@ typedef struct {
} pc_queue; // producer-consumer queue
#endif
+static int msg_indexes = 1;
+
typedef struct {
+ int index_number;
uint32_t referenceCount; // we might start using this...
unsigned int nheaders;
char *name[16];
@@ -152,6 +162,12 @@ void pc_queue_init(pc_queue *the_queue, char *items, size_t item_size, uint32_t
the_queue->eoq = 0;
}
+void pc_queue_delete(pc_queue *the_queue) {
+ pthread_cond_destroy(&the_queue->pc_queue_item_removed_signal);
+ pthread_cond_destroy(&the_queue->pc_queue_item_added_signal);
+ pthread_mutex_destroy(&the_queue->pc_queue_lock);
+}
+
int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
int block);
@@ -159,17 +175,26 @@ int send_ssnc_metadata(uint32_t code, char *data, uint32_t length, int block) {
return send_metadata('ssnc', code, data, length, NULL, block);
}
+void pc_queue_cleanup_handler(void *arg) {
+ // debug(1, "pc_queue_cleanup_handler called.");
+ pc_queue *the_queue = (pc_queue *)arg;
+ int rc = pthread_mutex_unlock(&the_queue->pc_queue_lock);
+ if (rc)
+ debug(1, "Error unlocking for pc_queue_add_item or pc_queue_get_item.");
+}
+
int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) {
int rc;
if (the_queue) {
if (block == 0) {
- rc = pthread_mutex_trylock(&the_queue->pc_queue_lock);
+ rc = debug_mutex_lock(&the_queue->pc_queue_lock, 10000, 2);
if (rc == EBUSY)
return EBUSY;
} else
rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
if (rc)
debug(1, "Error locking for pc_queue_add_item");
+ pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
while (the_queue->count == the_queue->capacity) {
rc = pthread_cond_wait(&the_queue->pc_queue_item_removed_signal, &the_queue->pc_queue_lock);
if (rc)
@@ -192,9 +217,7 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) {
rc = pthread_cond_signal(&the_queue->pc_queue_item_added_signal);
if (rc)
debug(1, "Error signalling after pc_queue_add_item");
- rc = pthread_mutex_unlock(&the_queue->pc_queue_lock);
- if (rc)
- debug(1, "Error unlocking for pc_queue_add_item");
+ pthread_cleanup_pop(1); // unlock the queue lock.
} else {
debug(1, "Adding an item to a NULL queue");
}
@@ -207,6 +230,7 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
if (rc)
debug(1, "Error locking for pc_queue_get_item");
+ pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
while (the_queue->count == 0) {
rc = pthread_cond_wait(&the_queue->pc_queue_item_added_signal, &the_queue->pc_queue_lock);
if (rc)
@@ -227,9 +251,7 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
rc = pthread_cond_signal(&the_queue->pc_queue_item_removed_signal);
if (rc)
debug(1, "Error signalling after pc_queue_removed_item");
- rc = pthread_mutex_unlock(&the_queue->pc_queue_lock);
- if (rc)
- debug(1, "Error unlocking for pc_queue_get_item");
+ pthread_cleanup_pop(1); // unlock the queue lock.
} else {
debug(1, "Removing an item from a NULL queue");
}
@@ -238,6 +260,60 @@ int pc_queue_get_item(pc_queue *the_queue, void *the_stuff) {
#endif
+int have_player(rtsp_conn_info *conn) {
+ int response = 0;
+ debug_mutex_lock(&playing_conn_lock, 1000000, 3);
+ if (playing_conn == conn) // this connection definitely has the play lock
+ response = 1;
+ debug_mutex_unlock(&playing_conn_lock, 3);
+ return response;
+}
+
+void player_watchdog_thread_cleanup_handler(void *arg) {
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ debug(3, "Connection %d: Watchdog Exit.", conn->connection_number);
+}
+
+void *player_watchdog_thread_code(void *arg) {
+ pthread_cleanup_push(player_watchdog_thread_cleanup_handler, arg);
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ do {
+ usleep(2000000); // check every two seconds
+ // debug(3, "Connection %d: Check the thread is doing something...", conn->connection_number);
+ if ((config.dont_check_timeout == 0) && (config.timeout != 0)) {
+ debug_mutex_lock(&conn->watchdog_mutex, 1000, 0);
+ uint64_t last_watchdog_bark_time = conn->watchdog_bark_time;
+ debug_mutex_unlock(&conn->watchdog_mutex, 0);
+ if (last_watchdog_bark_time != 0) {
+ uint64_t time_since_last_bark = (get_absolute_time_in_fp() - last_watchdog_bark_time) >> 32;
+ uint64_t ct = config.timeout; // go from int to 64-bit int
+
+ if (time_since_last_bark >= ct) {
+ conn->watchdog_barks++;
+ if (conn->watchdog_barks == 1) {
+ // debuglev = 3; // tell us everything.
+ debug(1, "Connection %d: As Yeats almost said, \"Too long a silence / can make a stone "
+ "of the heart\".",
+ conn->connection_number);
+ conn->stop = 1;
+ pthread_cancel(conn->thread);
+ } else if (conn->watchdog_barks == 3) {
+ if ((config.cmd_unfixable) && (conn->unfixable_error_reported == 0)) {
+ conn->unfixable_error_reported = 1;
+ command_execute(config.cmd_unfixable, "unable_to_cancel_play_session", 1);
+ } else {
+ warn("an unrecoverable error, \"unable_to_cancel_play_session\", has been detected.",
+ conn->connection_number);
+ }
+ }
+ }
+ }
+ }
+ } while (1);
+ pthread_cleanup_pop(0); // should never happen
+ pthread_exit(NULL);
+}
+
void ask_other_rtsp_conversation_threads_to_stop(pthread_t except_this_thread);
void rtsp_request_shutdown_stream(void) {
@@ -257,7 +333,20 @@ static void track_thread(rtsp_conn_info *conn) {
}
}
-static void cleanup_threads(void) {
+void cancel_all_RTSP_threads(void) {
+ int i;
+ for (i = 0; i < nconns; i++) {
+ debug(1, "Connection %d: cancelling.", conns[i]->connection_number);
+ pthread_cancel(conns[i]->thread);
+ }
+ for (i = 0; i < nconns; i++) {
+ debug(1, "Connection %d: joining.", conns[i]->connection_number);
+ pthread_join(conns[i]->thread, NULL);
+ free(conns[i]);
+ }
+}
+
+void cleanup_threads(void) {
void *retval;
int i;
// debug(2, "culling threads.");
@@ -267,8 +356,6 @@ static void cleanup_threads(void) {
conns[i]->connection_number);
pthread_join(conns[i]->thread, &retval);
debug(3, "RTSP connection thread %d deleted...", conns[i]->connection_number);
- if (conns[i] == playing_conn)
- playing_conn = NULL;
free(conns[i]);
nconns--;
if (nconns)
@@ -288,8 +375,11 @@ void ask_other_rtsp_conversation_threads_to_stop(pthread_t except_this_thread) {
for (i = 0; i < nconns; i++) {
if (((except_this_thread == 0) || (pthread_equal(conns[i]->thread, except_this_thread) == 0)) &&
(conns[i]->running != 0)) {
- conns[i]->stop = 1;
- pthread_kill(conns[i]->thread, SIGUSR1);
+ pthread_cancel(conns[i]->thread);
+ pthread_join(conns[i]->thread, NULL);
+ debug(1, "Connection %d: asked to stop.", conns[i]->connection_number);
+ // conns[i]->stop = 1;
+ // pthread_kill(conns[i]->thread, SIGUSR1);
}
}
}
@@ -317,32 +407,35 @@ static char *nextline(char *in, int inbuf) {
return out;
}
-static void msg_retain(rtsp_message *msg) {
- if (msg) {
- int rc = pthread_mutex_lock(&reference_counter_lock);
- if (rc)
- debug(1, "Error %d locking reference counter lock");
+void msg_retain(rtsp_message *msg) {
+ int rc = pthread_mutex_lock(&reference_counter_lock);
+ if (rc)
+ debug(1, "Error %d locking reference counter lock");
+ if (msg > (rtsp_message *)0x00010000) {
msg->referenceCount++;
+ // debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount);
rc = pthread_mutex_unlock(&reference_counter_lock);
if (rc)
debug(1, "Error %d unlocking reference counter lock");
} else {
- debug(1, "null rtsp_message pointer passed to retain");
+ debug(1, "invalid rtsp_message pointer 0x%x passed to retain", (uintptr_t)msg);
}
}
-static rtsp_message *msg_init(void) {
+rtsp_message *msg_init(void) {
rtsp_message *msg = malloc(sizeof(rtsp_message));
if (msg) {
memset(msg, 0, sizeof(rtsp_message));
msg->referenceCount = 1; // from now on, any access to this must be protected with the lock
+ msg->index_number = msg_indexes++;
} else {
- die("can not allocate memory for an rtsp_message.");
+ die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes);
}
+ // debug(1,"msg_init -- create item %d.", msg->index_number);
return msg;
}
-static int msg_add_header(rtsp_message *msg, char *name, char *value) {
+int msg_add_header(rtsp_message *msg, char *name, char *value) {
if (msg->nheaders >= sizeof(msg->name) / sizeof(char *)) {
warn("too many headers?!");
return 1;
@@ -355,7 +448,7 @@ static int msg_add_header(rtsp_message *msg, char *name, char *value) {
return 0;
}
-static char *msg_get_header(rtsp_message *msg, char *name) {
+char *msg_get_header(rtsp_message *msg, char *name) {
unsigned int i;
for (i = 0; i < msg->nheaders; i++)
if (!strcasecmp(msg->name[i], name))
@@ -363,7 +456,7 @@ static char *msg_get_header(rtsp_message *msg, char *name) {
return NULL;
}
-static void debug_print_msg_headers(int level, rtsp_message *msg) {
+void debug_print_msg_headers(int level, rtsp_message *msg) {
unsigned int i;
for (i = 0; i < msg->nheaders; i++) {
debug(level, " Type: \"%s\", content: \"%s\"", msg->name[i], msg->value[i]);
@@ -393,16 +486,11 @@ static void debug_print_msg_content(int level, rtsp_message *msg) {
}
*/
-static void msg_free(rtsp_message *msg) {
-
- if (msg) {
- int rc = pthread_mutex_lock(&reference_counter_lock);
- if (rc)
- debug(1, "Error %d locking reference counter lock during msg_free()", rc);
+void msg_free(rtsp_message **msgh) {
+ debug_mutex_lock(&reference_counter_lock, 1000, 0);
+ if (*msgh > (rtsp_message *)0x00010000) {
+ rtsp_message *msg = *msgh;
msg->referenceCount--;
- rc = pthread_mutex_unlock(&reference_counter_lock);
- if (rc)
- debug(1, "Error %d unlocking reference counter lock during msg_free()", rc);
if (msg->referenceCount == 0) {
unsigned int i;
for (i = 0; i < msg->nheaders; i++) {
@@ -411,17 +499,27 @@ static void msg_free(rtsp_message *msg) {
}
if (msg->content)
free(msg->content);
+ // debug(1,"msg_free item %d -- free.",msg->index_number);
+ uintptr_t index = (msg->index_number) & 0xFFFF;
+ if (index == 0)
+ index = 0x10000; // ensure it doesn't fold to zero.
+ *msgh =
+ (rtsp_message *)(index); // put a version of the index number of the freed message in here
free(msg);
- } // else {
- // debug(1,"rtsp_message reference count non-zero:
- // %d!",msg->referenceCount);
- //}
- } else {
- debug(1, "null rtsp_message pointer passed to msg_free()");
+ } else {
+ // debug(1,"msg_free item %d -- decrement reference to
+ // %d.",msg->index_number,msg->referenceCount);
+ }
+ } else if (*msgh != NULL) {
+ debug(1,
+ "msg_free: error attempting to free an allocated but already-freed rtsp_message, number "
+ "%d.",
+ (uintptr_t)*msgh);
}
+ debug_mutex_unlock(&reference_counter_lock, 0);
}
-static int msg_handle_line(rtsp_message **pmsg, char *line) {
+int msg_handle_line(rtsp_message **pmsg, char *line) {
rtsp_message *msg = *pmsg;
if (!msg) {
@@ -430,7 +528,7 @@ static int msg_handle_line(rtsp_message **pmsg, char *line) {
char *sp, *p;
sp = NULL; // this is to quieten a compiler warning
- // debug(1, "received request: %s", line);
+ debug(3, "RTSP Message Received: \"%s\".", line);
p = strtok_r(line, " ", &sp);
if (!p)
@@ -471,36 +569,43 @@ static int msg_handle_line(rtsp_message **pmsg, char *line) {
}
fail:
- *pmsg = NULL;
- msg_free(msg);
+ msg_free(pmsg);
return 0;
}
-static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
- rtsp_message **the_packet) {
- enum rtsp_read_request_response reply = rtsp_read_request_response_ok;
- ssize_t buflen = 4096;
- char *buf = malloc(buflen + 1);
+enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, rtsp_message **the_packet) {
- rtsp_message *msg = NULL;
+ *the_packet = NULL; // need this for erro handling
+ enum rtsp_read_request_response reply = rtsp_read_request_response_ok;
+ ssize_t buflen = 4096;
+ int release_buffer = 0; // on exit, don't deallocate the buffer if everything was okay
+ char *buf = malloc(buflen + 1); // add a NUL at the end
+ if (!buf) {
+ warn("rtsp_read_request: can't get a buffer.");
+ return (rtsp_read_request_response_error);
+ }
+ pthread_cleanup_push(malloc_cleanup, buf);
ssize_t nread;
ssize_t inbuf = 0;
int msg_size = -1;
while (msg_size < 0) {
- fd_set readfds;
- FD_ZERO(&readfds);
- FD_SET(conn->fd, &readfds);
- do {
- memory_barrier();
- } while (conn->stop == 0 &&
- pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+ /*
+fd_set readfds;
+FD_ZERO(&readfds);
+FD_SET(conn->fd, &readfds);
+do {
+ memory_barrier();
+} while (conn->stop == 0 &&
+ pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+*/
if (conn->stop != 0) {
debug(3, "RTSP conversation thread %d shutdown requested.", conn->connection_number);
reply = rtsp_read_request_response_immediate_shutdown_requested;
goto shutdown;
}
+
nread = read(conn->fd, buf + inbuf, buflen - inbuf);
if (nread == 0) {
@@ -513,9 +618,16 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
if (nread < 0) {
if (errno == EINTR)
continue;
- char errorstring[1024];
- strerror_r(errno, (char *)errorstring, sizeof(errorstring));
- debug(1, "rtsp_read_request_response_read_error %d: \"%s\".", errno, (char *)errorstring);
+ if (errno == EAGAIN) {
+ debug(1, "Getting Error 11 -- EAGAIN from a blocking read!");
+ continue;
+ }
+ if (errno != ECONNRESET) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "Connection %d: rtsp_read_request_response_read_error %d: \"%s\".",
+ conn->connection_number, errno, (char *)errorstring);
+ }
reply = rtsp_read_request_response_read_error;
goto shutdown;
}
@@ -523,9 +635,9 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
char *next;
while (msg_size < 0 && (next = nextline(buf, inbuf))) {
- msg_size = msg_handle_line(&msg, buf);
+ msg_size = msg_handle_line(the_packet, buf);
- if (!msg) {
+ if (!(*the_packet)) {
warn("no RTSP header received");
reply = rtsp_read_request_response_bad_packet;
goto shutdown;
@@ -538,7 +650,7 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
}
if (msg_size > buflen) {
- buf = realloc(buf, msg_size);
+ buf = realloc(buf, msg_size + 1);
if (!buf) {
warn("too much content");
reply = rtsp_read_request_response_error;
@@ -571,6 +683,8 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
warning_message_sent = 1;
}
}
+
+ /*
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(conn->fd, &readfds);
@@ -578,6 +692,8 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
memory_barrier();
} while (conn->stop == 0 &&
pselect(conn->fd + 1, &readfds, NULL, NULL, NULL, &pselect_sigset) <= 0);
+ */
+
if (conn->stop != 0) {
debug(1, "RTSP shutdown requested.");
reply = rtsp_read_request_response_immediate_shutdown_requested;
@@ -586,7 +702,7 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
size_t read_chunk = msg_size - inbuf;
if (read_chunk > max_read_chunk)
read_chunk = max_read_chunk;
- usleep(40000); // wait about 40 milliseconds between reads of up to about 64 kB
+ usleep(80000); // wait about 80 milliseconds between reads of up to about 64 kB
nread = read(conn->fd, buf + inbuf, read_chunk);
if (!nread) {
reply = rtsp_read_request_response_error;
@@ -602,23 +718,22 @@ static enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn,
inbuf += nread;
}
+ rtsp_message *msg = *the_packet;
msg->contentlength = inbuf;
msg->content = buf;
+ char *jp = inbuf + buf;
+ *jp = '\0';
*the_packet = msg;
- return reply;
-
shutdown:
- if (msg) {
- msg_free(msg); // which will free the content and everything else
+ if (reply != rtsp_read_request_response_ok) {
+ msg_free(the_packet);
+ release_buffer = 1; // allow the buffer to be released
}
- // in case the message wasn't formed or wasn't fully initialised
- if ((msg && (msg->content == NULL)) || (!msg))
- free(buf);
- *the_packet = NULL;
+ pthread_cleanup_pop(release_buffer);
return reply;
}
-static void msg_write_response(int fd, rtsp_message *resp) {
+int msg_write_response(int fd, rtsp_message *resp) {
char pkt[2048];
int pktfree = sizeof(pkt);
char *p = pkt;
@@ -636,8 +751,10 @@ static void msg_write_response(int fd, rtsp_message *resp) {
n = snprintf(p, pktfree, "%s: %s\r\n", resp->name[i], resp->value[i]);
pktfree -= n;
p += n;
- if (pktfree <= 1024)
- die("Attempted to write overlong RTSP packet 1");
+ if (pktfree <= 1024) {
+ debug(1, "Attempted to write overlong RTSP packet 1");
+ return -1;
+ }
}
// Here, if there's content, write the Content-Length header ...
@@ -647,8 +764,10 @@ static void msg_write_response(int fd, rtsp_message *resp) {
n = snprintf(p, pktfree, "Content-Length: %d\r\n", resp->contentlength);
pktfree -= n;
p += n;
- if (pktfree <= 1024)
- die("Attempted to write overlong RTSP packet 2");
+ if (pktfree <= 1024) {
+ debug(1, "Attempted to write overlong RTSP packet 2");
+ return -2;
+ }
debug(1, "Content is \"%s\"", resp->content);
memcpy(p, resp->content, resp->contentlength);
pktfree -= resp->contentlength;
@@ -659,212 +778,248 @@ static void msg_write_response(int fd, rtsp_message *resp) {
pktfree -= n;
p += n;
- if (pktfree <= 1024)
- die("Attempted to write overlong RTSP packet 3");
-
- if (write(fd, pkt, p - pkt) != p - pkt)
- debug(1, "Error writing an RTSP packet -- requested bytes not fully written.");
+ if (pktfree <= 1024) {
+ debug(1, "Attempted to write overlong RTSP packet 3");
+ return -3;
+ }
+ ssize_t reply = write(fd, pkt, p - pkt);
+ if (reply == -1) {
+ char errorstring[1024];
+ strerror_r(errno, (char *)errorstring, sizeof(errorstring));
+ debug(1, "msg_write_response error %d: \"%s\".", errno, (char *)errorstring);
+ return -4;
+ }
+ if (reply != p - pkt) {
+ debug(1, "msg_write_response error -- requested bytes: %d not fully written: %d.", p - pkt,
+ reply);
+ return -5;
+ }
+ return 0;
}
-static void handle_record(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
+void handle_record(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
debug(2, "Connection %d: RECORD", conn->connection_number);
+ if (have_player(conn)) {
+ if (conn->player_thread)
+ warn("Connection %d: RECORD: Duplicate RECORD message -- ignored", conn->connection_number);
+ else
+ player_play(conn); // the thread better be 0
- if (conn->player_thread)
- warn("Duplicate RECORD message -- ignored");
- else
- player_play(conn); // the thread better be 0
-
- resp->respcode = 200;
- // I think this is for telling the client what the absolute minimum latency
- // actually is,
- // and when the client specifies a latency, it should be added to this figure.
+ resp->respcode = 200;
+ // I think this is for telling the client what the absolute minimum latency
+ // actually is,
+ // and when the client specifies a latency, it should be added to this figure.
- // Thus, [the old version of] AirPlay's latency figure of 77175, when added to 11025 gives you
- // exactly 88200
- // and iTunes' latency figure of 88553, when added to 11025 gives you 99578,
- // pretty close to the 99400 we guessed.
+ // Thus, [the old version of] AirPlay's latency figure of 77175, when added to 11025 gives you
+ // exactly 88200
+ // and iTunes' latency figure of 88553, when added to 11025 gives you 99578,
+ // pretty close to the 99400 we guessed.
- msg_add_header(resp, "Audio-Latency", "11025");
+ msg_add_header(resp, "Audio-Latency", "11025");
- char *p;
- uint32_t rtptime = 0;
- char *hdr = msg_get_header(req, "RTP-Info");
+ char *p;
+ uint32_t rtptime = 0;
+ char *hdr = msg_get_header(req, "RTP-Info");
- if (hdr) {
- // debug(1,"FLUSH message received: \"%s\".",hdr);
- // get the rtp timestamp
- p = strstr(hdr, "rtptime=");
- if (p) {
- p = strchr(p, '=');
+ if (hdr) {
+ // debug(1,"FLUSH message received: \"%s\".",hdr);
+ // get the rtp timestamp
+ p = strstr(hdr, "rtptime=");
if (p) {
- rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1
- // rtptime--;
- // debug(1,"RTSP Flush Requested by handle_record: %u.",rtptime);
- player_flush(rtptime, conn);
+ p = strchr(p, '=');
+ if (p) {
+ rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1
+ // rtptime--;
+ // debug(1,"RTSP Flush Requested by handle_record: %u.",rtptime);
+ player_flush(rtptime, conn);
+ }
}
}
+ } else {
+ warn("Connection %d RECORD received without having the player (no ANNOUNCE?)",
+ conn->connection_number);
+ resp->respcode = 451;
}
}
-static void handle_options(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req,
- rtsp_message *resp) {
+void handle_options(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req,
+ rtsp_message *resp) {
debug(3, "Connection %d: OPTIONS", conn->connection_number);
resp->respcode = 200;
- msg_add_header(resp, "Public",
- "ANNOUNCE, SETUP, RECORD, "
- "PAUSE, FLUSH, TEARDOWN, "
- "OPTIONS, GET_PARAMETER, SET_PARAMETER");
+ msg_add_header(resp, "Public", "ANNOUNCE, SETUP, RECORD, "
+ "PAUSE, FLUSH, TEARDOWN, "
+ "OPTIONS, GET_PARAMETER, SET_PARAMETER");
}
-static void handle_teardown(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req,
- rtsp_message *resp) {
+void handle_teardown(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req,
+ rtsp_message *resp) {
debug(2, "Connection %d: TEARDOWN", conn->connection_number);
- // if (!rtsp_playing())
- // debug(1, "This RTSP connection thread (%d) doesn't think it's playing, but "
- // "it's sending a response to teardown anyway",conn->connection_number);
- resp->respcode = 200;
- msg_add_header(resp, "Connection", "close");
-
- debug(3,
+ if (have_player(conn)) {
+ resp->respcode = 200;
+ msg_add_header(resp, "Connection", "close");
+ debug(
+ 3,
"TEARDOWN: synchronously terminating the player thread of RTSP conversation thread %d (2).",
conn->connection_number);
- player_stop(conn);
- debug(3, "TEARDOWN: successful termination of playing thread of RTSP conversation thread %d.",
- conn->connection_number);
+ player_stop(conn);
+ debug(3, "TEARDOWN: successful termination of playing thread of RTSP conversation thread %d.",
+ conn->connection_number);
+ } else {
+ warn("Connection %d TEARDOWN received without having the player (no ANNOUNCE?)",
+ conn->connection_number);
+ resp->respcode = 451;
+ }
}
-static void handle_flush(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
+void handle_flush(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
debug(3, "Connection %d: FLUSH", conn->connection_number);
- // if (!rtsp_playing())
- // debug(1, "This RTSP conversation thread (%d) doesn't think it's playing, but "
- // "it's sending a response to flush anyway",conn->connection_number);
- char *p = NULL;
- uint32_t rtptime = 0;
- char *hdr = msg_get_header(req, "RTP-Info");
-
- if (hdr) {
- // debug(1,"FLUSH message received: \"%s\".",hdr);
- // get the rtp timestamp
- p = strstr(hdr, "rtptime=");
- if (p) {
- p = strchr(p, '=');
- if (p)
- rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1
+ if (have_player(conn)) {
+ char *p = NULL;
+ uint32_t rtptime = 0;
+ char *hdr = msg_get_header(req, "RTP-Info");
+
+ if (hdr) {
+ // debug(1,"FLUSH message received: \"%s\".",hdr);
+ // get the rtp timestamp
+ p = strstr(hdr, "rtptime=");
+ if (p) {
+ p = strchr(p, '=');
+ if (p)
+ rtptime = uatoi(p + 1); // unsigned integer -- up to 2^32-1
+ }
}
- }
// debug(1,"RTSP Flush Requested: %u.",rtptime);
#ifdef CONFIG_METADATA
- if (p)
- send_metadata('ssnc', 'flsr', p + 1, strlen(p + 1), req, 1);
- else
- send_metadata('ssnc', 'flsr', NULL, 0, NULL, 0);
+ if (p)
+ send_metadata('ssnc', 'flsr', p + 1, strlen(p + 1), req, 1);
+ else
+ send_metadata('ssnc', 'flsr', NULL, 0, NULL, 0);
#endif
- player_flush(rtptime, conn); // will not crash even it there is no player thread.
- resp->respcode = 200;
+ player_flush(rtptime, conn); // will not crash even it there is no player thread.
+ resp->respcode = 200;
+
+ } else {
+ warn("Connection %d FLUSH received without having the player (no ANNOUNCE?)",
+ conn->connection_number);
+ resp->respcode = 451;
+ }
}
-static void handle_setup(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
+void handle_setup(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
debug(3, "Connection %d: SETUP", conn->connection_number);
- uint16_t cport, tport;
-
- char *ar = msg_get_header(req, "Active-Remote");
- if (ar) {
- debug(2, "Connection %d: SETUP -- Active-Remote string seen: \"%s\".", conn->connection_number,
- ar);
- // get the active remote
- char *p;
- conn->dacp_active_remote = strtoul(ar, &p, 10);
+ resp->respcode = 451; // invalid arguments -- expect them
+ if (have_player(conn)) {
+ uint16_t cport, tport;
+ char *ar = msg_get_header(req, "Active-Remote");
+ if (ar) {
+ debug(2, "Connection %d: SETUP -- Active-Remote string seen: \"%s\".",
+ conn->connection_number, ar);
+ // get the active remote
+ char *p;
+ conn->dacp_active_remote = strtoul(ar, &p, 10);
#ifdef CONFIG_METADATA
- send_metadata('ssnc', 'acre', ar, strlen(ar), req, 1);
+ send_metadata('ssnc', 'acre', ar, strlen(ar), req, 1);
#endif
- } else {
- debug(2, "Connection %d: SETUP -- Note: no Active-Remote information the SETUP Record.",
- conn->connection_number);
- conn->dacp_active_remote = 0;
- }
+ } else {
+ debug(2, "Connection %d: SETUP -- Note: no Active-Remote information the SETUP Record.",
+ conn->connection_number);
+ conn->dacp_active_remote = 0;
+ }
- ar = msg_get_header(req, "DACP-ID");
- if (ar) {
- debug(2, "Connection %d: SETUP -- DACP-ID string seen: \"%s\".", conn->connection_number, ar);
- if (conn->dacp_id) // this is in case SETUP was previously called
- free(conn->dacp_id);
- conn->dacp_id = strdup(ar);
+ ar = msg_get_header(req, "DACP-ID");
+ if (ar) {
+ debug(2, "Connection %d: SETUP -- DACP-ID string seen: \"%s\".", conn->connection_number, ar);
+ if (conn->dacp_id) // this is in case SETUP was previously called
+ free(conn->dacp_id);
+ conn->dacp_id = strdup(ar);
#ifdef CONFIG_METADATA
- send_metadata('ssnc', 'daid', ar, strlen(ar), req, 1);
+ send_metadata('ssnc', 'daid', ar, strlen(ar), req, 1);
#endif
- } else {
- debug(2, "Connection %d: SETUP doesn't include DACP-ID string information.",
- conn->connection_number);
- if (conn->dacp_id) // this is in case SETUP was previously called
- free(conn->dacp_id);
- conn->dacp_id = NULL;
- }
-
- char *hdr = msg_get_header(req, "Transport");
- if (!hdr) {
- debug(1, "Connection %d: SETUP doesn't contain a Transport header.", conn->connection_number);
- goto error;
- }
+ } else {
+ debug(2, "Connection %d: SETUP doesn't include DACP-ID string information.",
+ conn->connection_number);
+ if (conn->dacp_id) // this is in case SETUP was previously called
+ free(conn->dacp_id);
+ conn->dacp_id = NULL;
+ }
- char *p;
- p = strstr(hdr, "control_port=");
- if (!p) {
- debug(1, "Connection %d: SETUP doesn't specify a control_port.", conn->connection_number);
- goto error;
- }
- p = strchr(p, '=') + 1;
- cport = atoi(p);
+ char *hdr = msg_get_header(req, "Transport");
+ if (hdr) {
+ char *p;
+ p = strstr(hdr, "control_port=");
+ if (p) {
+ p = strchr(p, '=') + 1;
+ cport = atoi(p);
- p = strstr(hdr, "timing_port=");
- if (!p) {
- debug(1, "Connection %d: SETUP doesn't specify a timing_port.", conn->connection_number);
- goto error;
- }
- p = strchr(p, '=') + 1;
- tport = atoi(p);
-
- if (conn->rtp_running) {
- if ((conn->remote_control_port != cport) || (conn->remote_timing_port != tport)) {
- warn("Connection %d: Duplicate SETUP message with different control (old %u, new %u) or "
- "timing (old %u, new "
- "%u) ports! This is probably fatal!",
- conn->connection_number, conn->remote_control_port, cport, conn->remote_timing_port,
- tport);
+ p = strstr(hdr, "timing_port=");
+ if (p) {
+ p = strchr(p, '=') + 1;
+ tport = atoi(p);
+
+ if (conn->rtp_running) {
+ if ((conn->remote_control_port != cport) || (conn->remote_timing_port != tport)) {
+ warn("Connection %d: Duplicate SETUP message with different control (old %u, new %u) "
+ "or "
+ "timing (old %u, new "
+ "%u) ports! This is probably fatal!",
+ conn->connection_number, conn->remote_control_port, cport,
+ conn->remote_timing_port, tport);
+ } else {
+ warn("Connection %d: Duplicate SETUP message with the same control (%u) and timing "
+ "(%u) "
+ "ports. This is "
+ "probably not fatal.",
+ conn->connection_number, conn->remote_control_port, conn->remote_timing_port);
+ }
+ } else {
+ rtp_setup(&conn->local, &conn->remote, cport, tport, conn);
+ }
+ if (conn->local_audio_port != 0) {
+
+ char resphdr[256] = "";
+ snprintf(resphdr, sizeof(resphdr),
+ "RTP/AVP/"
+ "UDP;unicast;interleaved=0-1;mode=record;control_port=%d;"
+ "timing_port=%d;server_"
+ "port=%d",
+ conn->local_control_port, conn->local_timing_port, conn->local_audio_port);
+
+ msg_add_header(resp, "Transport", resphdr);
+
+ msg_add_header(resp, "Session", "1");
+
+ resp->respcode = 200; // it all worked out okay
+ debug(1, "Connection %d: SETUP DACP-ID \"%s\" from %s to %s with UDP ports Control: "
+ "%d, Timing: %d and Audio: %d.",
+ conn->connection_number, conn->dacp_id, &conn->client_ip_string,
+ &conn->self_ip_string, conn->local_control_port, conn->local_timing_port,
+ conn->local_audio_port);
+
+ } else {
+ debug(1, "Connection %d: SETUP seems to specify a null audio port.",
+ conn->connection_number);
+ }
+ } else {
+ debug(1, "Connection %d: SETUP doesn't specify a timing_port.", conn->connection_number);
+ }
+ } else {
+ debug(1, "Connection %d: SETUP doesn't specify a control_port.", conn->connection_number);
+ }
} else {
- warn("Connection %d: Duplicate SETUP message with the same control (%u) and timing (%u) "
- "ports. This is "
- "probably not fatal.",
- conn->connection_number, conn->remote_control_port, conn->remote_timing_port);
+ debug(1, "Connection %d: SETUP doesn't contain a Transport header.", conn->connection_number);
}
+ if (resp->respcode != 200) {
+ debug(1, "Connection %d: SETUP error -- releasing the player lock.", conn->connection_number);
+ debug_mutex_lock(&playing_conn_lock, 1000000, 3);
+ if (playing_conn == conn) // if we have the player
+ playing_conn = NULL; // let it go
+ debug_mutex_unlock(&playing_conn_lock, 3);
+ }
+
} else {
- rtp_setup(&conn->local, &conn->remote, cport, tport, conn);
- }
- if (conn->local_audio_port == 0) {
- debug(1, "Connection %d: SETUP seems to specify a null audio port.", conn->connection_number);
- goto error;
+ warn("Connection %d SETUP received without having the player (no ANNOUNCE?)",
+ conn->connection_number);
}
-
- char resphdr[256] = "";
- snprintf(resphdr, sizeof(resphdr),
- "RTP/AVP/"
- "UDP;unicast;interleaved=0-1;mode=record;control_port=%d;"
- "timing_port=%d;server_"
- "port=%d",
- conn->local_control_port, conn->local_timing_port, conn->local_audio_port);
-
- msg_add_header(resp, "Transport", resphdr);
-
- msg_add_header(resp, "Session", "1");
-
- resp->respcode = 200;
- return;
-
-error:
- warn("Connection %d: SETUP -- Error in setup request -- unlocking play lock.",
- conn->connection_number);
- playing_conn = NULL;
- pthread_mutex_unlock(&play_lock);
- resp->respcode = 451; // invalid arguments
}
/*
@@ -874,26 +1029,44 @@ static void handle_ignore(rtsp_conn_info *conn, rtsp_message *req, rtsp_message
}
*/
-static void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req,
- __attribute__((unused)) rtsp_message *resp) {
+void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req,
+ __attribute__((unused)) rtsp_message *resp) {
+
char *cp = req->content;
int cp_left = req->contentlength;
+ /*
+ int k = cp_left;
+ if (k>max_bytes)
+ k = max_bytes;
+ for (i = 0; i < k; i++)
+ snprintf((char *)buf + 2 * i, 3, "%02x", cp[i]);
+ debug(1, "handle_set_parameter_parameter: \"%s\".",buf);
+ */
+
char *next;
while (cp_left && cp) {
next = nextline(cp, cp_left);
- cp_left -= next - cp;
+ // note: "next" will return NULL if there is no \r or \n or \r\n at the end of this
+ // but we are always guaranteed that if cp is not null, it will be pointing to something
+ // NUL-terminated
- if (!strncmp(cp, "volume: ", 8)) {
- float volume = atof(cp + 8);
+ if (next)
+ cp_left -= (next - cp);
+ else
+ cp_left = 0;
+
+ if (!strncmp(cp, "volume: ", strlen("volume: "))) {
+ float volume = atof(cp + strlen("volume: "));
// debug(2, "AirPlay request to set volume to: %f.", volume);
player_volume(volume, conn);
} else
#ifdef CONFIG_METADATA
- if (!strncmp(cp, "progress: ", 10)) {
- char *progress = cp + 10;
- // debug(2, "progress: \"%s\"\n",
- // progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per second
+ if (!strncmp(cp, "progress: ", strlen("progress: "))) {
+ char *progress = cp + strlen("progress: ");
+ // debug(2, "progress: \"%s\"",progress); // rtpstampstart/rtpstampnow/rtpstampend 44100 per
+ // second
send_ssnc_metadata('prgr', strdup(progress), strlen(progress), 1);
+
} else
#endif
{
@@ -931,6 +1104,8 @@ static void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *r
// 'PICT' -- the payload is a picture, either a JPEG or a PNG. Check the
// first few bytes to see
// which.
+// 'abeg' -- active mode entered. No arguments
+// 'aend' -- active mode exited. No arguments
// 'pbeg' -- play stream begin. No arguments
// 'pend' -- play stream end. No arguments
// 'pfls' -- play stream flush. No arguments
@@ -1044,9 +1219,9 @@ static char *metadata_sockmsg;
#define metadata_queue_size 500
metadata_package metadata_queue_items[metadata_queue_size];
-static pthread_t metadata_thread;
+pthread_t metadata_thread;
-void metadata_create(void) {
+void metadata_create_multicast_socket(void) {
if (config.metadata_enabled == 0)
return;
@@ -1068,7 +1243,7 @@ void metadata_create(void) {
if (metadata_sockmsg) {
memset(metadata_sockmsg, 0, config.metadata_sockmsglength);
} else {
- die("Could not malloc metadata socket buffer");
+ die("Could not malloc metadata multicast socket buffer");
}
}
}
@@ -1087,6 +1262,15 @@ void metadata_create(void) {
umask(oldumask);
}
+void metadata_delete_multicast_socket(void) {
+ if (config.metadata_enabled == 0)
+ return;
+ shutdown(metadata_sock, SHUT_RDWR); // we want to immediately deallocate the buffer
+ close(metadata_sock);
+ if (metadata_sockmsg)
+ free(metadata_sockmsg);
+}
+
void metadata_open(void) {
if (config.metadata_enabled == 0)
return;
@@ -1104,12 +1288,12 @@ void metadata_open(void) {
free(path);
}
-/*
static void metadata_close(void) {
+ if (fd < 0)
+ return;
close(fd);
fd = -1;
}
-*/
void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length) {
// debug(2, "Process metadata with type %x, code %x and length %u.", type, code, length);
@@ -1231,32 +1415,63 @@ void metadata_process(uint32_t type, uint32_t code, char *data, uint32_t length)
}
}
+void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) {
+ debug(2, "metadata_thread_cleanup_function called");
+ metadata_delete_multicast_socket();
+ metadata_close();
+ pc_queue_delete(&metadata_queue);
+}
+
+void metadata_pack_cleanup_function(void *arg) {
+ // debug(1, "metadata_pack_cleanup_function called");
+ metadata_package *pack = (metadata_package *)arg;
+ if (pack->carrier)
+ msg_free(&pack->carrier); // release the message
+ else if (pack->data)
+ free(pack->data);
+}
+
void *metadata_thread_function(__attribute__((unused)) void *ignore) {
- metadata_create();
+ // create a pc_queue for passing information to a threaded metadata handler
+ pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package),
+ metadata_queue_size);
+ metadata_create_multicast_socket();
metadata_package pack;
+ pthread_cleanup_push(metadata_thread_cleanup_function, NULL);
while (1) {
pc_queue_get_item(&metadata_queue, &pack);
+ pthread_cleanup_push(metadata_pack_cleanup_function, (void *)&pack);
if (config.metadata_enabled) {
metadata_process(pack.type, pack.code, pack.data, pack.length);
-#ifdef HAVE_METADATA_HUB
+#ifdef CONFIG_METADATA_HUB
metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length);
#endif
+
+#ifdef CONFIG_MQTT
+ if (config.mqtt_enabled) {
+ mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length);
+ }
+#endif
}
- if (pack.carrier)
- msg_free(pack.carrier); // release the message
- else if (pack.data)
- free(pack.data);
+ pthread_cleanup_pop(1);
}
+ pthread_cleanup_pop(1); // will never happen
pthread_exit(NULL);
}
void metadata_init(void) {
- // create a pc_queue for passing information to a threaded metadata handler
- pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package),
- metadata_queue_size);
int ret = pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL);
if (ret)
debug(1, "Failed to create metadata thread!");
+ metadata_running = 1;
+}
+
+void metadata_stop(void) {
+ if (metadata_running) {
+ debug(2, "metadata_stop called.");
+ pthread_cancel(metadata_thread);
+ pthread_join(metadata_thread, NULL);
+ }
}
int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rtsp_message *carrier,
@@ -1293,14 +1508,17 @@ int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rts
pack.code = code;
pack.data = data;
pack.length = length;
- if (carrier)
- msg_retain(carrier);
pack.carrier = carrier;
+ if (pack.carrier)
+ msg_retain(pack.carrier);
int rc = pc_queue_add_item(&metadata_queue, &pack, block);
- if ((rc == EBUSY) && (carrier))
- msg_free(carrier);
- if (rc == EBUSY)
- warn("Metadata queue is busy, dropping message of type 0x%08X, code 0x%08X.", type, code);
+ if (rc == EBUSY) {
+ if (pack.carrier)
+ msg_free(&pack.carrier);
+ else if (data)
+ free(data);
+ warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code);
+ }
return rc;
}
@@ -1367,7 +1585,7 @@ static void handle_set_parameter(rtsp_conn_info *conn, rtsp_message *req, rtsp_m
char *ct = msg_get_header(req, "Content-Type");
if (ct) {
- // debug(2, "SET_PARAMETER Content-Type:\"%s\".", ct);
+// debug(2, "SET_PARAMETER Content-Type:\"%s\".", ct);
#ifdef CONFIG_METADATA
// It seems that the rtptime of the message is used as a kind of an ID that
@@ -1449,59 +1667,93 @@ static void handle_set_parameter(rtsp_conn_info *conn, rtsp_message *req, rtsp_m
}
static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
- debug(2, "Connection %d: ANNOUNCE", conn->connection_number);
+ debug(3, "Connection %d: ANNOUNCE", conn->connection_number);
+
int have_the_player = 0;
+ int should_wait = 0; // this will be true if you're trying to break in to the current session
+ int interrupting_current_session = 0;
+
+ // try to become the current playing_conn
- // interrupt session if permitted
- if (pthread_mutex_trylock(&play_lock) == 0) {
+ debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it
+
+ if (playing_conn == NULL) {
+ playing_conn = conn;
have_the_player = 1;
- } else if ((playing_conn) &&
- (playing_conn->connection_number == conn->connection_number)) { // duplicate ANNOUNCE
- warn("Duplicate ANNOUNCE, by the look of it!");
+ } else if (playing_conn == conn) {
have_the_player = 1;
- } else {
- int should_wait = 0;
-
- if (!playing_conn)
- die("Non existent playing_conn with play_lock enabled.");
- debug(1, "RTSP Conversation thread %d already playing when asked by thread %d.",
- playing_conn->connection_number, conn->connection_number);
- if (playing_conn->stop) {
- debug(1, "Playing connection is already shutting down; waiting for it...");
- should_wait = 1;
- } else if (config.allow_session_interruption == 1) {
- // some other thread has the player ... ask it to relinquish the thread
- debug(1, "ANNOUNCE: playing connection %d being interrupted by connection %d.",
- playing_conn->connection_number, conn->connection_number);
- if (playing_conn == conn) {
- debug(1, "ANNOUNCE asking to stop itself.");
- } else {
- playing_conn->stop = 1;
- memory_barrier();
- pthread_kill(playing_conn->thread, SIGUSR1);
- should_wait = 1;
+ warn("Duplicate ANNOUNCE, by the look of it!");
+ } else if (playing_conn->stop) {
+ debug(1, "Connection %d ANNOUNCE is waiting for connection %d to shut down.",
+ conn->connection_number, playing_conn->connection_number);
+ should_wait = 1;
+ } else if (config.allow_session_interruption == 1) {
+ debug(2, "Connection %d: ANNOUNCE: asking playing connection %d to shut down.",
+ conn->connection_number, playing_conn->connection_number);
+ playing_conn->stop = 1;
+ interrupting_current_session = 1;
+ should_wait = 1;
+ pthread_cancel(playing_conn->thread); // asking the RTSP thread to exit
+ }
+ debug_mutex_unlock(&playing_conn_lock, 3);
+
+ if (should_wait) {
+ int time_remaining = 3000000; // must be signed, as it could go negative...
+
+ while ((time_remaining > 0) && (have_the_player == 0)) {
+ debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it
+ if (playing_conn == NULL) {
+ playing_conn = conn;
+ have_the_player = 1;
+ }
+ debug_mutex_unlock(&playing_conn_lock, 3);
+
+ if (have_the_player == 0) {
+ usleep(100000);
+ time_remaining -= 100000;
}
}
- if (should_wait) {
- usleep(1000000); // here, it is possible for other connections to come in and nab the player.
- debug(1, "Try to get the player now");
+ if ((have_the_player == 1) && (interrupting_current_session == 1)) {
+ debug(2, "Connection %d: ANNOUNCE got the player", conn->connection_number);
+ } else {
+ debug(2, "Connection %d: ANNOUNCE failed to get the player", conn->connection_number);
}
- if (pthread_mutex_trylock(&play_lock) == 0)
- have_the_player = 1;
- else
- debug(1, "ANNOUNCE failed to get the player");
}
if (have_the_player) {
- playing_conn = conn; // the present connection is now playing
- debug(3, "RTSP conversation thread %d has acquired play lock.", conn->connection_number);
+ debug(3, "Connection %d: ANNOUNCE has acquired play lock.", conn->connection_number);
+
+ // now, if this new session did not break in, then it's okay to reset the next UDP ports
+ // to the start of the range
+
+ if (interrupting_current_session == 0) { // will be zero if it wasn't waiting to break in
+ resetFreeUDPPort();
+ }
+
+ /*
+ {
+ char *cp = req->content;
+ int cp_left = req->contentlength;
+ while (cp_left > 1) {
+ if (strlen(cp) != 0)
+ debug(1,">>>>>> %s", cp);
+ cp += strlen(cp) + 1;
+ cp_left -= strlen(cp) + 1;
+ }
+ }
+*/
+
+ conn->stream.type = ast_unknown;
resp->respcode = 456; // 456 - Header Field Not Valid for Resource
+ char *pssid = NULL;
char *paesiv = NULL;
char *prsaaeskey = NULL;
char *pfmtp = NULL;
char *pminlatency = NULL;
char *pmaxlatency = NULL;
+ // char *pAudioMediaInfo = NULL;
+ char *pUncompressedCDAudio = NULL;
char *cp = req->content;
int cp_left = req->contentlength;
char *next;
@@ -1509,24 +1761,67 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag
next = nextline(cp, cp_left);
cp_left -= next - cp;
- if (!strncmp(cp, "a=fmtp:", 7))
- pfmtp = cp + 7;
+ if (!strncmp(cp, "a=rtpmap:96 L16/44100/2", strlen("a=rtpmap:96 L16/44100/2")))
+ pUncompressedCDAudio = cp + strlen("a=rtpmap:96 L16/44100/2");
+
+ // if (!strncmp(cp, "m=audio", strlen("m=audio")))
+ // pAudioMediaInfo = cp + strlen("m=audio");
+
+ if (!strncmp(cp, "o=iTunes", strlen("o=iTunes")))
+ pssid = cp + strlen("o=iTunes");
+
+ if (!strncmp(cp, "a=fmtp:", strlen("a=fmtp:")))
+ pfmtp = cp + strlen("a=fmtp:");
- if (!strncmp(cp, "a=aesiv:", 8))
- paesiv = cp + 8;
+ if (!strncmp(cp, "a=aesiv:", strlen("a=aesiv:")))
+ paesiv = cp + strlen("a=aesiv:");
- if (!strncmp(cp, "a=rsaaeskey:", 12))
- prsaaeskey = cp + 12;
+ if (!strncmp(cp, "a=rsaaeskey:", strlen("a=rsaaeskey:")))
+ prsaaeskey = cp + strlen("a=rsaaeskey:");
- if (!strncmp(cp, "a=min-latency:", 14))
- pminlatency = cp + 14;
+ if (!strncmp(cp, "a=min-latency:", strlen("a=min-latency:")))
+ pminlatency = cp + strlen("a=min-latency:");
- if (!strncmp(cp, "a=max-latency:", 14))
- pmaxlatency = cp + 14;
+ if (!strncmp(cp, "a=max-latency:", strlen("a=max-latency:")))
+ pmaxlatency = cp + strlen("a=max-latency:");
cp = next;
}
+ if (pUncompressedCDAudio) {
+ debug(2, "An uncompressed PCM stream has been detected.");
+ conn->stream.type = ast_uncompressed;
+ conn->max_frames_per_packet = 352; // number of audio frames per packet.
+ conn->input_rate = 44100;
+ conn->input_num_channels = 2;
+ conn->input_bit_depth = 16;
+ conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8);
+
+ /*
+ int y = strlen(pAudioMediaInfo);
+ if (y > 0) {
+ char obf[4096];
+ if (y > 4096)
+ y = 4096;
+ char *p = pAudioMediaInfo;
+ char *obfp = obf;
+ int obfc;
+ for (obfc = 0; obfc < y; obfc++) {
+ snprintf(obfp, 3, "%02X", (unsigned int)*p);
+ p++;
+ obfp += 2;
+ };
+ *obfp = 0;
+ debug(1, "AudioMediaInfo: \"%s\".", obf);
+ }
+ */
+ }
+
+ if (pssid) {
+ uint32_t ssid = uatoi(pssid);
+ debug(3, "Synchronisation Source Identifier: %08X,%u", ssid, ssid);
+ }
+
if (pminlatency) {
conn->minimum_latency = atoi(pminlatency);
debug(3, "Minimum latency %d specified", conn->minimum_latency);
@@ -1545,22 +1840,6 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag
// debug(1,"Encrypted session requested");
}
- if (!pfmtp) {
- warn("FMTP params missing from the following ANNOUNCE message:");
- // print each line of the request content
- // the problem is that nextline has replace all returns, newlines, etc. by
- // NULLs
- char *cp = req->content;
- int cp_left = req->contentlength;
- while (cp_left > 1) {
- if (strlen(cp) != 0)
- warn(" %s", cp);
- cp += strlen(cp) + 1;
- cp_left -= strlen(cp) + 1;
- }
- goto out;
- }
-
if (conn->stream.encrypted) {
int len, keylen;
uint8_t *aesiv = base64_dec(paesiv, &len);
@@ -1583,12 +1862,41 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag
memcpy(conn->stream.aeskey, aeskey, 16);
free(aeskey);
}
- unsigned int i;
- for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++)
- conn->stream.fmtp[i] = atoi(strsep(&pfmtp, " \t"));
- // here we should check the sanity ot the fmtp values
- // for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++)
- // debug(1," fmtp[%2d] is: %10d",i,conn->stream.fmtp[i]);
+
+ if (pfmtp) {
+ conn->stream.type = ast_apple_lossless;
+ debug(3, "An ALAC stream has been detected.");
+ unsigned int i;
+ for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++)
+ conn->stream.fmtp[i] = atoi(strsep(&pfmtp, " \t"));
+ // here we should check the sanity ot the fmtp values
+ // for (i = 0; i < sizeof(conn->stream.fmtp) / sizeof(conn->stream.fmtp[0]); i++)
+ // debug(1," fmtp[%2d] is: %10d",i,conn->stream.fmtp[i]);
+
+ // set the parameters of the player (as distinct from the parameters of the decoder -- that's
+ // done later).
+ conn->max_frames_per_packet = conn->stream.fmtp[1]; // number of audio frames per packet.
+ conn->input_rate = conn->stream.fmtp[11];
+ conn->input_num_channels = conn->stream.fmtp[7];
+ conn->input_bit_depth = conn->stream.fmtp[3];
+ conn->input_bytes_per_frame = conn->input_num_channels * ((conn->input_bit_depth + 7) / 8);
+ }
+
+ if (conn->stream.type == ast_unknown) {
+ warn("Can not process the following ANNOUNCE message:");
+ // print each line of the request content
+ // the problem is that nextline has replace all returns, newlines, etc. by
+ // NULLs
+ char *cp = req->content;
+ int cp_left = req->contentlength;
+ while (cp_left > 1) {
+ if (strlen(cp) != 0)
+ warn(" %s", cp);
+ cp += strlen(cp) + 1;
+ cp_left -= strlen(cp) + 1;
+ }
+ goto out;
+ }
char *hdr = msg_get_header(req, "X-Apple-Client-Name");
if (hdr) {
@@ -1621,15 +1929,18 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag
resp->respcode = 200;
} else {
resp->respcode = 453;
- debug(1, "Already playing.");
+ debug(1, "Connection %d: ANNOUNCE failed because another connection is already playing.",
+ conn->connection_number);
}
out:
if (resp->respcode != 200 && resp->respcode != 453) {
- debug(1, "Error in handling ANNOUNCE on conversation thread %d. Unlocking the play lock.",
+ debug(1, "Connection %d: Error in handling ANNOUNCE. Unlocking the play lock.",
conn->connection_number);
- playing_conn = NULL;
- pthread_mutex_unlock(&play_lock);
+ debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it
+ if (playing_conn == conn) // if we managed to acquire it
+ playing_conn = NULL; // let it go
+ debug_mutex_unlock(&playing_conn_lock, 3);
}
}
@@ -1702,21 +2013,21 @@ static void apple_challenge(int fd, rtsp_message *req, rtsp_message *resp) {
if (padding)
*padding = 0;
- msg_add_header(resp, "Apple-Response", encoded);
+ msg_add_header(resp, "Apple-Response", encoded); // will be freed when the response is freed.
free(challresp);
free(encoded);
}
static char *make_nonce(void) {
uint8_t random[8];
- int fd = open("/dev/random", O_RDONLY);
+ int fd = open("/dev/urandom", O_RDONLY);
if (fd < 0)
- die("could not open /dev/random!");
+ die("could not open /dev/urandom!");
// int ignore =
if (read(fd, random, sizeof(random)) != sizeof(random))
- debug(1, "Error reading /dev/random");
+ debug(1, "Error reading /dev/urandom");
close(fd);
- return base64_enc(random, 8);
+ return base64_enc(random, 8); // returns a pointer to malloc'ed memory
}
static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
@@ -1760,9 +2071,11 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
uint8_t digest_urp[16], digest_mu[16], digest_total[16];
-#ifdef HAVE_LIBSSL
+#ifdef CONFIG_OPENSSL
MD5_CTX ctx;
+ int oldState;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState);
MD5_Init(&ctx);
MD5_Update(&ctx, username, strlen(username));
MD5_Update(&ctx, ":", 1);
@@ -1775,9 +2088,25 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
MD5_Update(&ctx, ":", 1);
MD5_Update(&ctx, uri, strlen(uri));
MD5_Final(digest_mu, &ctx);
+ pthread_setcancelstate(oldState, NULL);
#endif
-#ifdef HAVE_LIBMBEDTLS
+#ifdef CONFIG_MBEDTLS
+#if MBEDTLS_VERSION_MINOR >= 7
+ mbedtls_md5_context tctx;
+ mbedtls_md5_starts_ret(&tctx);
+ mbedtls_md5_update_ret(&tctx, (const unsigned char *)username, strlen(username));
+ mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1);
+ mbedtls_md5_update_ret(&tctx, (const unsigned char *)realm, strlen(realm));
+ mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1);
+ mbedtls_md5_update_ret(&tctx, (const unsigned char *)config.password, strlen(config.password));
+ mbedtls_md5_finish_ret(&tctx, digest_urp);
+ mbedtls_md5_starts_ret(&tctx);
+ mbedtls_md5_update_ret(&tctx, (const unsigned char *)req->method, strlen(req->method));
+ mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1);
+ mbedtls_md5_update_ret(&tctx, (const unsigned char *)uri, strlen(uri));
+ mbedtls_md5_finish_ret(&tctx, digest_mu);
+#else
mbedtls_md5_context tctx;
mbedtls_md5_starts(&tctx);
mbedtls_md5_update(&tctx, (const unsigned char *)username, strlen(username));
@@ -1792,8 +2121,9 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
mbedtls_md5_update(&tctx, (const unsigned char *)uri, strlen(uri));
mbedtls_md5_finish(&tctx, digest_mu);
#endif
+#endif
-#ifdef HAVE_LIBPOLARSSL
+#ifdef CONFIG_POLARSSL
md5_context tctx;
md5_starts(&tctx);
md5_update(&tctx, (const unsigned char *)username, strlen(username));
@@ -1814,7 +2144,8 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
for (i = 0; i < 16; i++)
snprintf((char *)buf + 2 * i, 3, "%02x", digest_urp[i]);
-#ifdef HAVE_LIBSSL
+#ifdef CONFIG_OPENSSL
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState);
MD5_Init(&ctx);
MD5_Update(&ctx, buf, 32);
MD5_Update(&ctx, ":", 1);
@@ -1824,9 +2155,21 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
snprintf((char *)buf + 2 * i, 3, "%02x", digest_mu[i]);
MD5_Update(&ctx, buf, 32);
MD5_Final(digest_total, &ctx);
+ pthread_setcancelstate(oldState, NULL);
#endif
-#ifdef HAVE_LIBMBEDTLS
+#ifdef CONFIG_MBEDTLS
+#if MBEDTLS_VERSION_MINOR >= 7
+ mbedtls_md5_starts_ret(&tctx);
+ mbedtls_md5_update_ret(&tctx, buf, 32);
+ mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1);
+ mbedtls_md5_update_ret(&tctx, (const unsigned char *)*nonce, strlen(*nonce));
+ mbedtls_md5_update_ret(&tctx, (unsigned char *)":", 1);
+ for (i = 0; i < 16; i++)
+ snprintf((char *)buf + 2 * i, 3, "%02x", digest_mu[i]);
+ mbedtls_md5_update_ret(&tctx, buf, 32);
+ mbedtls_md5_finish_ret(&tctx, digest_total);
+#else
mbedtls_md5_starts(&tctx);
mbedtls_md5_update(&tctx, buf, 32);
mbedtls_md5_update(&tctx, (unsigned char *)":", 1);
@@ -1837,8 +2180,9 @@ static int rtsp_auth(char **nonce, rtsp_message *req, rtsp_message *resp) {
mbedtls_md5_update(&tctx, buf, 32);
mbedtls_md5_finish(&tctx, digest_total);
#endif
+#endif
-#ifdef HAVE_LIBPOLARSSL
+#ifdef CONFIG_POLARSSL
md5_starts(&tctx);
md5_update(&tctx, buf, 32);
md5_update(&tctx, (unsigned char *)":", 1);
@@ -1867,37 +2211,138 @@ authenticate:
return 1;
}
+void rtsp_conversation_thread_cleanup_function(void *arg) {
+ rtsp_conn_info *conn = (rtsp_conn_info *)arg;
+ int oldState;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState);
+
+ debug(3, "Connection %d: rtsp_conversation_thread_func_cleanup_function called.",
+ conn->connection_number);
+ if (conn->player_thread)
+ player_stop(conn);
+
+ debug(3, "Closing timing, control and audio sockets...");
+ if (conn->control_socket)
+ close(conn->control_socket);
+ if (conn->timing_socket)
+ close(conn->timing_socket);
+ if (conn->audio_socket)
+ close(conn->audio_socket);
+
+ if (conn->fd > 0) {
+ debug(3, "Connection %d: closing fd %d.", conn->connection_number, conn->fd);
+ close(conn->fd);
+ debug(3, "Connection %d: closed fd %d.", conn->connection_number, conn->fd);
+ }
+ if (conn->auth_nonce) {
+ free(conn->auth_nonce);
+ conn->auth_nonce = NULL;
+ }
+ rtp_terminate(conn);
+
+ if (conn->dacp_id) {
+ free(conn->dacp_id);
+ conn->dacp_id = NULL;
+ }
+
+ // remove flow control and mutexes
+ int rc = pthread_mutex_destroy(&conn->volume_control_mutex);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying volume_control_mutex.", conn->connection_number,
+ rc);
+ rc = pthread_cond_destroy(&conn->flowcontrol);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying flow control condition variable.",
+ conn->connection_number, rc);
+ rc = pthread_mutex_destroy(&conn->ab_mutex);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying ab_mutex.", conn->connection_number, rc);
+ rc = pthread_mutex_destroy(&conn->flush_mutex);
+ if (rc)
+ debug(1, "Connection %d: error %d destroying flush_mutex.", conn->connection_number, rc);
+
+ debug(3, "Cancel watchdog thread.");
+ pthread_cancel(conn->player_watchdog_thread);
+ debug(3, "Join watchdog thread.");
+ pthread_join(conn->player_watchdog_thread, NULL);
+ debug(3, "Delete watchdog mutex.");
+ pthread_mutex_destroy(&conn->watchdog_mutex);
+
+ debug(3, "Connection %d: Checking play lock.", conn->connection_number);
+ debug_mutex_lock(&playing_conn_lock, 1000000, 3); // get it
+ if (playing_conn == conn) { // if it's ours
+ debug(3, "Connection %d: Unlocking play lock.", conn->connection_number);
+ playing_conn = NULL; // let it go
+ }
+ debug_mutex_unlock(&playing_conn_lock, 3);
+
+ debug(2, "Connection %d: terminated.", conn->connection_number);
+ conn->running = 0;
+ pthread_setcancelstate(oldState, NULL);
+}
+
+void msg_cleanup_function(void *arg) {
+ // debug(3, "msg_cleanup_function called.");
+ msg_free((rtsp_message **)arg);
+}
+
static void *rtsp_conversation_thread_func(void *pconn) {
rtsp_conn_info *conn = pconn;
- // create the player thread lock.
- int rwli = pthread_rwlock_init(&conn->player_thread_lock, NULL);
- if (rwli != 0)
- die("Error %d initialising player_thread_lock for conversation thread %d.", rwli,
- conn->connection_number);
+ // create the watchdog mutex, initialise the watchdog time and start the watchdog thread;
+ conn->watchdog_bark_time = get_absolute_time_in_fp();
+ pthread_mutex_init(&conn->watchdog_mutex, NULL);
+ pthread_create(&conn->player_watchdog_thread, NULL, &player_watchdog_thread_code, (void *)conn);
+
+ int rc = pthread_mutex_init(&conn->flush_mutex, NULL);
+ if (rc)
+ die("Connection %d: error %d initialising flush_mutex.", conn->connection_number, rc);
+ rc = pthread_mutex_init(&conn->ab_mutex, NULL);
+ if (rc)
+ die("Connection %d: error %d initialising ab_mutex.", conn->connection_number, rc);
+// set the flowcontrol condition variable to wait on a monotonic clock
+#ifdef COMPILE_FOR_LINUX_AND_FREEBSD_AND_CYGWIN_AND_OPENBSD
+ pthread_condattr_t attr;
+ pthread_condattr_init(&attr);
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); // can't do this in OS X, and don't need it.
+ rc = pthread_cond_init(&conn->flowcontrol, &attr);
+#endif
+#ifdef COMPILE_FOR_OSX
+ rc = pthread_cond_init(&conn->flowcontrol, NULL);
+#endif
+ if (rc)
+ die("Connection %d: error %d initialising flow control condition variable.",
+ conn->connection_number, rc);
+ rc = pthread_mutex_init(&conn->volume_control_mutex, NULL);
+ if (rc)
+ die("Connection %d: error %d initialising volume_control_mutex.", conn->connection_number, rc);
- rtp_initialise(conn);
+ // nothing before this is cancellable
+ pthread_cleanup_push(rtsp_conversation_thread_cleanup_function, (void *)conn);
- rtsp_message *req, *resp;
- char *hdr, *auth_nonce = NULL;
+ rtp_initialise(conn);
+ char *hdr = NULL;
enum rtsp_read_request_response reply;
int rtsp_read_request_attempt_count = 1; // 1 means exit immediately
+ rtsp_message *req, *resp;
while (conn->stop == 0) {
int debug_level = 3; // for printing the request and response
reply = rtsp_read_request(conn, &req);
if (reply == rtsp_read_request_response_ok) {
+ pthread_cleanup_push(msg_cleanup_function, (void *)&req);
+ resp = msg_init();
+ pthread_cleanup_push(msg_cleanup_function, (void *)&resp);
+ resp->respcode = 400;
+
if (strcmp(req->method, "OPTIONS") !=
0) // the options message is very common, so don't log it until level 3
debug_level = 2;
- debug(debug_level,
- "RTSP thread %d received an RTSP Packet of type \"%s\":", conn->connection_number,
- req->method),
+ debug(debug_level, "Connection %d: Received an RTSP Packet of type \"%s\":",
+ conn->connection_number, req->method),
debug_print_msg_headers(debug_level, req);
- resp = msg_init();
- resp->respcode = 400;
apple_challenge(conn->fd, req, resp);
hdr = msg_get_header(req, "CSeq");
@@ -1906,7 +2351,7 @@ static void *rtsp_conversation_thread_func(void *pconn) {
// msg_add_header(resp, "Audio-Jack-Status", "connected; type=analog");
msg_add_header(resp, "Server", "AirTunes/105.1");
- if ((conn->authorized == 1) || (rtsp_auth(&auth_nonce, req, resp)) == 0) {
+ if ((conn->authorized == 1) || (rtsp_auth(&conn->auth_nonce, req, resp)) == 0) {
conn->authorized = 1; // it must have been authorized or didn't need a password
struct method_handler *mh;
int method_selected = 0;
@@ -1917,12 +2362,32 @@ static void *rtsp_conversation_thread_func(void *pconn) {
break;
}
}
- if (method_selected == 0)
- debug(1, "RTSP thread %d: Unrecognised and unhandled rtsp request \"%s\".",
+ if (method_selected == 0) {
+ debug(3, "Connection %d: Unrecognised and unhandled rtsp request \"%s\".",
conn->connection_number, req->method);
+
+ int y = req->contentlength;
+ if (y > 0) {
+ char obf[4096];
+ if (y > 4096)
+ y = 4096;
+ char *p = req->content;
+ char *obfp = obf;
+ int obfc;
+ for (obfc = 0; obfc < y; obfc++) {
+ snprintf(obfp, 3, "%02X", (unsigned int)*p);
+ p++;
+ obfp += 2;
+ };
+ *obfp = 0;
+ debug(3, "Content: \"%s\".", obf);
+ }
+ }
}
- debug(debug_level, "RTSP thread %d: RTSP Response:", conn->connection_number);
+ debug(debug_level, "Connection %d: RTSP Response:", conn->connection_number);
debug_print_msg_headers(debug_level, resp);
+
+ /*
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(conn->fd, &writefds);
@@ -1930,11 +2395,27 @@ static void *rtsp_conversation_thread_func(void *pconn) {
memory_barrier();
} while (conn->stop == 0 &&
pselect(conn->fd + 1, NULL, &writefds, NULL, NULL, &pselect_sigset) <= 0);
+ */
+
if (conn->stop == 0) {
- msg_write_response(conn->fd, resp);
+ int err = msg_write_response(conn->fd, resp);
+ if (err) {
+ debug(1, "Connection %d: Unable to write an RTSP message response. Terminating the "
+ "connection.",
+ conn->connection_number);
+ struct linger so_linger;
+ so_linger.l_onoff = 1; // "true"
+ so_linger.l_linger = 0;
+ err = setsockopt(conn->fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger);
+ if (err)
+ debug(1, "Could not set the RTSP socket to abort due to a write error on closing.");
+ conn->stop = 1;
+ // if (debuglev >= 1)
+ // debuglev = 3; // see what happens next
+ }
}
- msg_free(req);
- msg_free(resp);
+ pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
} else {
int tstop = 0;
if (reply == rtsp_read_request_response_immediate_shutdown_requested)
@@ -1943,58 +2424,41 @@ static void *rtsp_conversation_thread_func(void *pconn) {
(reply == rtsp_read_request_response_read_error)) {
if (conn->player_thread) {
rtsp_read_request_attempt_count--;
- if (rtsp_read_request_attempt_count == 0)
+ if (rtsp_read_request_attempt_count == 0) {
tstop = 1;
- else {
+ if (reply == rtsp_read_request_response_read_error) {
+ struct linger so_linger;
+ so_linger.l_onoff = 1; // "true"
+ so_linger.l_linger = 0;
+ int err = setsockopt(conn->fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger);
+ if (err)
+ debug(1, "Could not set the RTSP socket to abort due to a read error on closing.");
+ }
+ // debuglev = 3; // see what happens next
+ } else {
if (reply == rtsp_read_request_response_channel_closed)
- debug(2, "RTSP channel unexpectedly closed -- will try again %d time(s).",
- rtsp_read_request_attempt_count);
+ debug(2,
+ "Connection %d: RTSP channel unexpectedly closed -- will try again %d time(s).",
+ conn->connection_number, rtsp_read_request_attempt_count);
if (reply == rtsp_read_request_response_read_error)
- debug(2, "RTSP channel read error -- will try again %d time(s).",
- rtsp_read_request_attempt_count);
+ debug(2, "Connection %d: RTSP channel read error -- will try again %d time(s).",
+ conn->connection_number, rtsp_read_request_attempt_count);
usleep(20000);
}
} else {
tstop = 1;
}
} else {
- debug(1, "rtsp_read_request error %d, packet ignored.", (int)reply);
+ debug(1, "Connection %d: rtsp_read_request error %d, packet ignored.",
+ conn->connection_number, (int)reply);
}
if (tstop) {
- debug(3, "Synchronously terminate playing thread of RTSP conversation thread %d.",
- conn->connection_number);
-
- if (conn->player_thread)
- debug(1, "RTSP Channel unexpectedly closed or a serious error occured -- closing the "
- "player thread.");
- player_stop(conn);
- debug(3, "Successful termination of playing thread of RTSP conversation thread %d.",
- conn->connection_number);
- debug(3, "Request termination of RTSP conversation thread %d.", conn->connection_number);
+ debug(3, "Connection %d: Terminate RTSP connection.", conn->connection_number);
conn->stop = 1;
}
}
}
-
- if (conn->fd > 0)
- close(conn->fd);
- if (auth_nonce)
- free(auth_nonce);
- rtp_terminate(conn);
- if (playing_conn == conn) {
- debug(3, "Unlocking play lock on RTSP conversation thread %d.", conn->connection_number);
- playing_conn = NULL;
- pthread_mutex_unlock(&play_lock);
- }
- debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number);
- conn->running = 0;
-
- // release the player_thread_lock
- int rwld = pthread_rwlock_destroy(&conn->player_thread_lock);
- if (rwld)
- debug(1, "Error %d destroying player_thread_lock for conversation thread %d.", rwld,
- conn->connection_number);
-
+ pthread_cleanup_pop(1);
pthread_exit(NULL);
}
@@ -2017,7 +2481,21 @@ static const char *format_address(struct sockaddr *fsa) {
}
*/
+void rtsp_listen_loop_cleanup_handler(__attribute__((unused)) void *arg) {
+ int oldState;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState);
+ debug(1, "rtsp_listen_loop_cleanup_handler called.");
+ cancel_all_RTSP_threads();
+ int *sockfd = (int *)arg;
+ mdns_unregister();
+ if (sockfd)
+ free(sockfd);
+ pthread_setcancelstate(oldState, NULL);
+}
+
void rtsp_listen_loop(void) {
+ int oldState;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState);
struct addrinfo hints, *info, *p;
char portstr[6];
int *sockfd = NULL;
@@ -2059,6 +2537,18 @@ void rtsp_listen_loop(void) {
fcntl(fd, F_SETFD, FD_CLOEXEC);
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
+ struct timeval tv;
+ tv.tv_sec = 3; // three seconds write timeout
+ tv.tv_usec = 0;
+ if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof tv) == -1)
+ debug(1, "Error %d setting send timeout for rtsp writeback.", errno);
+
+ if ((config.dont_check_timeout == 0) && (config.timeout != 0)) {
+ tv.tv_sec = config.timeout; // 120 seconds read timeout by default.
+ tv.tv_usec = 0;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv) == -1)
+ debug(1, "Error %d setting read timeout for rtsp connection.", errno);
+ }
#ifdef IPV6_V6ONLY
// some systems don't support v4 access on v6 sockets, but some do.
// since we need to account for two sockets we might as well
@@ -2110,12 +2600,12 @@ void rtsp_listen_loop(void) {
mdns_register();
- // printf("Listening for connections.");
- // shairport_startup_complete();
-
+ pthread_setcancelstate(oldState, NULL);
int acceptfd;
struct timeval tv;
+ pthread_cleanup_push(rtsp_listen_loop_cleanup_handler, (void *)sockfd);
do {
+ pthread_testcancel();
tv.tv_sec = 60;
tv.tv_usec = 0;
@@ -2150,7 +2640,8 @@ void rtsp_listen_loop(void) {
conn->fd = accept(acceptfd, (struct sockaddr *)&conn->remote, &slen);
if (conn->fd < 0) {
- debug(1, "New RTSP connection on port %d not accepted:", config.port);
+ debug(1, "Connection %d: New connection on port %d not accepted:", conn->connection_number,
+ config.port);
perror("failed to accept connection");
free(conn);
} else {
@@ -2169,8 +2660,8 @@ void rtsp_listen_loop(void) {
sa = (struct sockaddr_in *)&conn->remote;
inet_ntop(AF_INET, &(sa->sin_addr), remote_ip4, INET_ADDRSTRLEN);
unsigned short int rport = ntohs(sa->sin_port);
- debug(2, "New RTSP connection from %s:%u to self at %s:%u on conversation thread %d.",
- remote_ip4, rport, ip4, tport, conn->connection_number);
+ debug(2, "Connection %d: new connection from %s:%u to self at %s:%u.",
+ conn->connection_number, remote_ip4, rport, ip4, tport);
}
#ifdef AF_INET6
if (local_info->SAFAMILY == AF_INET6) {
@@ -2186,8 +2677,8 @@ void rtsp_listen_loop(void) {
sa6 = (struct sockaddr_in6 *)&conn->remote; // pretend this is loaded with something
inet_ntop(AF_INET6, &(sa6->sin6_addr), remote_ip6, INET6_ADDRSTRLEN);
u_int16_t rport = ntohs(sa6->sin6_port);
- debug(2, "New RTSP connection from [%s]:%u to self at [%s]:%u on conversation thread %d.",
- remote_ip6, rport, ip6, tport, conn->connection_number);
+ debug(2, "Connection %d: new connection from [%s]:%u to self at [%s]:%u.",
+ conn->connection_number, remote_ip6, rport, ip6, tport);
}
#endif
@@ -2199,23 +2690,22 @@ void rtsp_listen_loop(void) {
// conn->thread = rtsp_conversation_thread;
// conn->stop = 0; // record's memory has been zeroed
// conn->authorized = 0; // record's memory has been zeroed
- fcntl(conn->fd, F_SETFL, O_NONBLOCK);
+ // fcntl(conn->fd, F_SETFL, O_NONBLOCK);
ret = pthread_create(&conn->thread, NULL, rtsp_conversation_thread_func,
conn); // also acts as a memory barrier
- if (ret)
- die("Failed to create RTSP receiver thread %d!", conn->connection_number);
+ if (ret) {
+ char errorstring[1024];
+ strerror_r(ret, (char *)errorstring, sizeof(errorstring));
+ die("Connection %d: cannot create an RTSP conversation thread. Error %d: \"%s\".",
+ conn->connection_number, ret, (char *)errorstring);
+ }
debug(3, "Successfully created RTSP receiver thread %d.", conn->connection_number);
conn->running = 1; // this must happen before the thread is tracked
track_thread(conn);
}
} while (1);
- mdns_unregister();
-
- if (sockfd)
- free(sockfd);
-
- // perror("select");
- // die("fell out of the RTSP select loop");
+ pthread_cleanup_pop(1); // should never happen
+ debug(1, "Oops -- fell out of the RTSP select loop");
}