// Copyright (C) 1999-2005 Open Source Telecom Corporation. // Copyright (C) 2006-2010 David Sugar, Tycho Softworks. // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. // // As a special exception, you may use this file as part of a free software // library without restriction. Specifically, if other files instantiate // templates or use macros or inline functions from this file, or you compile // this file and link it with other files to produce an executable, this // file does not by itself cause the resulting executable to be covered by // the GNU General Public License. This exception does not however // invalidate any other reasons why the executable file might be covered by // the GNU General Public License. // // This exception applies only to the code released under the name GNU // ccRTP. If you copy code from other releases into a copy of GNU // ccRTP, as the General Public License permits, the exception does // not apply to the code that you add in this way. To avoid misleading // anyone as to the status of such modified files, you must delete // this exception notice from them. // // If you write modifications of your own for GNU ccRTP, it is your choice // whether to permit this exception to apply to your modifications. // If you do not wish that, delete this exception notice. // /** * @file control.cpp * * @short QueueRTCPManager classes implementation. **/ #include "private.h" #include #include #include #include NAMESPACE_COMMONCPP const uint16 QueueRTCPManager::TIMEOUT_MULTIPLIER = 5; const double QueueRTCPManager::RECONSIDERATION_COMPENSATION = 2.718281828 - 1.5; const SDESItemType QueueRTCPManager::firstSchedulable = SDESItemTypeNAME; const SDESItemType QueueRTCPManager::lastSchedulable = SDESItemTypePRIV; /// maximum end to end delay: unlimited const microtimeout_t QueueRTCPManager::defaultEnd2EndDelay = 0; QueueRTCPManager::QueueRTCPManager(uint32 size, RTPApplication& app): RTPDataQueue(size), RTCPCompoundHandler(RTCPCompoundHandler::defaultPathMTU), queueApplication(app), srtcpIndex(0) { controlServiceActive = false; controlBwFract = 0.05f; sendControlBwFract = 0.25; recvControlBwFract = 1-sendControlBwFract; ctrlSendCount = 0; lowerHeadersSize = networkHeaderSize() + transportHeaderSize(); nextScheduledSDESItem = SDESItemTypeNAME; // initialize RTCP timing reconsInfo.rtcpTp.tv_sec = reconsInfo.rtcpTc.tv_sec = reconsInfo.rtcpTn.tv_sec = 0; reconsInfo.rtcpTp.tv_usec = reconsInfo.rtcpTc.tv_usec = reconsInfo.rtcpTn.tv_usec = 0; reconsInfo.rtcpPMembers = 1; rtcpWeSent = false; rtcpAvgSize = sizeof(RTCPFixedHeader) + sizeof(uint32) + sizeof(SenderInfo); rtcpInitial = true; // force an initial check for incoming RTCP packets SysTime::gettimeofday(&rtcpNextCheck,NULL); // check for incoming RTCP packets every 1/4 seconds. rtcpCheckInterval.tv_sec = 0; rtcpCheckInterval.tv_usec = 250000; timersub(&rtcpNextCheck,&rtcpCheckInterval,&rtcpLastCheck); lastSendPacketCount = 0; rtcpMinInterval = 5000000; // 5 seconds. leavingDelay = 1000000; // 1 second end2EndDelay = getDefaultEnd2EndDelay(); // Fill in fixed fields that will never change RTCPPacket* pkt = reinterpret_cast(rtcpSendBuffer); pkt->fh.version = CCRTP_VERSION; // (SSRCCollision will have to take this into account) pkt->info.SR.ssrc = getLocalSSRCNetwork(); // allow to start RTCP service once everything is set up controlServiceActive = true; } // TODO Streamline this code (same as above, put into a separate method) QueueRTCPManager::QueueRTCPManager(uint32 ssrc, uint32 size, RTPApplication& app): RTPDataQueue(&ssrc, size), RTCPCompoundHandler(RTCPCompoundHandler::defaultPathMTU), queueApplication(app), srtcpIndex(0) { controlServiceActive = false; controlBwFract = 0.05f; sendControlBwFract = 0.25; recvControlBwFract = 1-sendControlBwFract; ctrlSendCount = 0; lowerHeadersSize = networkHeaderSize() + transportHeaderSize(); nextScheduledSDESItem = SDESItemTypeNAME; // initialize RTCP timing reconsInfo.rtcpTp.tv_sec = reconsInfo.rtcpTc.tv_sec = reconsInfo.rtcpTn.tv_sec = 0; reconsInfo.rtcpTp.tv_usec = reconsInfo.rtcpTc.tv_usec = reconsInfo.rtcpTn.tv_usec = 0; reconsInfo.rtcpPMembers = 1; rtcpWeSent = false; rtcpAvgSize = sizeof(RTCPFixedHeader) + sizeof(uint32) + sizeof(SenderInfo); rtcpInitial = true; // force an initial check for incoming RTCP packets SysTime::gettimeofday(&rtcpNextCheck,NULL); // check for incoming RTCP packets every 1/4 seconds. rtcpCheckInterval.tv_sec = 0; rtcpCheckInterval.tv_usec = 250000; timersub(&rtcpNextCheck,&rtcpCheckInterval,&rtcpLastCheck); lastSendPacketCount = 0; rtcpMinInterval = 5000000; // 5 seconds. leavingDelay = 1000000; // 1 second end2EndDelay = getDefaultEnd2EndDelay(); // Fill in fixed fields that will never change RTCPPacket* pkt = reinterpret_cast(rtcpSendBuffer); pkt->fh.version = CCRTP_VERSION; // (SSRCCollision will have to take this into account) pkt->info.SR.ssrc = getLocalSSRCNetwork(); // allow to start RTCP service once everything is set up controlServiceActive = true; } QueueRTCPManager::~QueueRTCPManager() { endQueueRTCPManager(); } void QueueRTCPManager::endQueueRTCPManager() { controlServiceActive = false; controlBwFract = sendControlBwFract = 0; removeOutQueueCryptoContextCtrl(NULL); // remove the outgoing crypto context removeInQueueCryptoContextCtrl(NULL); // Remove any incoming crypto contexts } bool QueueRTCPManager::checkSSRCInRTCPPkt(SyncSourceLink& sourceLink, bool is_new, InetAddress& network_address, tpport_t transport_port) { bool result = true; // Test if the source is new and it is not the local one. if ( is_new && sourceLink.getSource()->getID() != getLocalSSRC() ) return result; SyncSource *s = sourceLink.getSource(); if ( s->getControlTransportPort() != transport_port || s->getNetworkAddress() != network_address ) { // SSRC collision or a loop has happened if ( s->getID() != getLocalSSRC() ) { // TODO: Optional error counter. // Note this differs from the default in the RFC. // Discard packet only when the collision is // repeating (to avoid flip-flopping) if ( sourceLink.getPrevConflict() && ( (network_address == sourceLink.getPrevConflict()->networkAddress) && (transport_port == sourceLink.getPrevConflict()->controlTransportPort) ) ) { // discard packet and do not flip-flop result = false; } else { // Record who has collided so that in // the future we can how if the // collision repeats. sourceLink.setPrevConflict(network_address, 0,transport_port); // Change sync source transport address setControlTransportPort(*s,transport_port); setNetworkAddress(*s,network_address); } } else { // Collision or loop of own packets. ConflictingTransportAddress* conflicting = searchControlConflict(network_address, transport_port); if ( conflicting ) { // Optional error counter. updateConflict(*conflicting); result = false; } else { // New collision addConflict(s->getNetworkAddress(), s->getDataTransportPort(), s->getControlTransportPort()); dispatchBYE("SSRC collision detected when receiving RTCP packet"); renewLocalSSRC(); setNetworkAddress(*s,network_address); setControlTransportPort(*s,transport_port); setControlTransportPort(*s,0); sourceLink.initStats(); } } } return result; } void QueueRTCPManager::controlReceptionService() { if ( !controlServiceActive ) return; // A) see if there are incoming RTCP packets SysTime::gettimeofday(&(reconsInfo.rtcpTc),NULL); if ( timercmp(&(reconsInfo.rtcpTc),&rtcpNextCheck,>=) ) { while ( isPendingControl(0) ) takeInControlPacket(); // If this do loops more than once, then we have not // been in time. So it skips until the next future // instant. do { timeval tmp = rtcpNextCheck; timeradd(&rtcpLastCheck,&rtcpCheckInterval, &rtcpNextCheck); rtcpLastCheck = tmp; } while ( timercmp(&(reconsInfo.rtcpTc), &(rtcpNextCheck), >=) ); } } void QueueRTCPManager::controlTransmissionService() { if ( !controlServiceActive ) return; // B) send RTCP packets SysTime::gettimeofday(&(reconsInfo.rtcpTc),NULL); if ( timercmp(&(reconsInfo.rtcpTc),&(reconsInfo.rtcpTn),>=) ) { if ( timerReconsideration() ) { // this would update to last received RTCP packets //while ( isPendingControl(0) ) // takeInControlPacket(); rtcpLastCheck = reconsInfo.rtcpTc; dispatchControlPacket(); if (rtcpInitial) rtcpInitial = false; expireSSRCs(); reconsInfo.rtcpTp = reconsInfo.rtcpTc; // we have updated tp and sent a report, so we // have to recalculate the sending interval timeval T = computeRTCPInterval(); timeradd(&(reconsInfo.rtcpTc),&T,&(reconsInfo.rtcpTn)); // record current number of members for the // next check. reconsInfo.rtcpPMembers = getMembersCount(); } } } bool QueueRTCPManager::timerReconsideration() { bool result = false; // compute again the interval to confirm it under current // circumstances timeval T = computeRTCPInterval(); timeradd(&(reconsInfo.rtcpTp),&T,&(reconsInfo.rtcpTn)); SysTime::gettimeofday(&(reconsInfo.rtcpTc),NULL); if ( timercmp(&(reconsInfo.rtcpTc),&(reconsInfo.rtcpTn),>=) ) { reconsInfo.rtcpTp = reconsInfo.rtcpTc; result = true; } return result; } void QueueRTCPManager::expireSSRCs() {} void QueueRTCPManager::takeInControlPacket() { size_t len = 0; InetHostAddress network_address; tpport_t transport_port; len = recvControl(rtcpRecvBuffer,getPathMTU(),network_address, transport_port); // get time of arrival struct timeval recvtime; SysTime::gettimeofday(&recvtime,NULL); // process a 'len' octets long RTCP compound packet RTCPPacket *pkt = reinterpret_cast(rtcpRecvBuffer); CryptoContextCtrl* pcc = getInQueueCryptoContextCtrl(pkt->getSSRC()); if (pcc == NULL) { pcc = getInQueueCryptoContextCtrl(0); if (pcc != NULL) { pcc = pcc->newCryptoContextForSSRC(pkt->getSSRC()); if (pcc != NULL) { pcc->deriveSrtcpKeys(); setInQueueCryptoContextCtrl(pcc); } } } // If no crypto context: then SRTP/SRTCP is off // If crypto context is available then unprotect data here. If an error // occurs report the error and discard the packet. if (pcc != NULL) { int32 ret; if ((ret = unprotect(rtcpRecvBuffer, len, pcc)) < 0) { // TODO: do more error handling? return; } len = ret; // adjust length after unprotecting the packet } // Check validity of the header fields of the compound packet if ( !RTCPCompoundHandler::checkCompoundRTCPHeader(len) ) return; // TODO: for now, we do nothing with the padding bit // in the header. bool source_created; SyncSourceLink* sourceLink = getSourceBySSRC(pkt->getSSRC(),source_created); SyncSource* s = sourceLink->getSource(); if ( source_created ) { // Set control transport address. setControlTransportPort(*s,transport_port); // Network address is assumed to be the same as the control one setNetworkAddress(*s,network_address); sourceLink->initStats(); sourceLink->setProbation(getMinValidPacketSequence()); if ( sourceLink->getHello() ) onNewSyncSource(*s); } else if ( s->getControlTransportPort() == 0 ) { // Test if RTP data packets had been received but this // is the first control packet from this source. setControlTransportPort(*s,transport_port); } // record reception time sourceLink->lastRTCPPacketTime = recvtime; sourceLink->lastRTCPSRTime = recvtime; size_t pointer = 0; // Check the first packet is a report and do special // processing for SR reports. if ( RTCPPacket::tRR == pkt->fh.type ) { // no special initialization is required for // RR reports, all reports will be processed // in the do-while down here. } else if ( RTCPPacket::tSR == pkt->fh.type ){ if ( checkSSRCInRTCPPkt(*sourceLink,source_created, network_address, transport_port) ) sourceLink->lastRTCPSRTime = recvtime; onGotSR(*s,pkt->info.SR,pkt->fh.block_count); // Advance to the next packet in the compound. pointer += pkt->getLength(); pkt = reinterpret_cast(rtcpRecvBuffer +pointer); } else if ( RTCPPacket::tXR == pkt->fh.type ) { // TODO: handle XR reports. } else { // Ignore RTCP types unknown. } // Process all RR reports. while ( (pointer < len) && (RTCPPacket::tRR == pkt->fh.type) ) { sourceLink = getSourceBySSRC(pkt->getSSRC(), source_created); if ( checkSSRCInRTCPPkt(*sourceLink,source_created, network_address,transport_port) ) onGotRR(*s,pkt->info.RR,pkt->fh.block_count); // Advance to the next packet in the compound pointer += pkt->getLength(); pkt = reinterpret_cast(rtcpRecvBuffer +pointer); } // SDES, APP and BYE. process first everything but the // BYE packets. bool cname_found = false; while ( (pointer < len ) && (pkt->fh.type == RTCPPacket::tSDES || pkt->fh.type == RTCPPacket::tAPP) ) { I ( cname_found || !pkt->fh.padding ); sourceLink = getSourceBySSRC(pkt->getSSRC(), source_created); if ( checkSSRCInRTCPPkt(*sourceLink,source_created, network_address, transport_port) ) { if ( pkt->fh.type == RTCPPacket::tSDES ) { bool cname = onGotSDES(*s,*pkt); cname_found = cname_found? cname_found : cname; } else if ( pkt->fh.type == RTCPPacket::tAPP ) { onGotAPP(*s,pkt->info.APP,pkt->getLength()); // pointer += pkt->getLength(); } else { // error? } } // Get the next packet in the compound. pointer += pkt->getLength(); pkt = reinterpret_cast(rtcpRecvBuffer +pointer); } // TODO: error? if !cname_found // process BYE packets while ( pointer < len ) { if ( pkt->fh.type == RTCPPacket::tBYE ) { sourceLink = getSourceBySSRC(pkt->getSSRC(), source_created); if ( checkSSRCInRTCPPkt(*sourceLink,source_created, network_address, transport_port) ) getBYE(*pkt,pointer,len); } else if ( pkt->fh.type != RTCPPacket::tBYE ) { break; // TODO: check non-BYE out of place. } else { break; } } // Call plug-in in case there are profile extensions // at the end of the SR/RR. if ( pointer != len ) { onGotRRSRExtension(rtcpRecvBuffer + pointer, len - pointer); } // Everything went right, update the RTCP average size updateAvgRTCPSize(len); } bool QueueRTCPManager::end2EndDelayed(IncomingRTPPktLink& pl) { bool result = false; if ( 0 != getEnd2EndDelay() ) { SyncSourceLink* sl = pl.getSourceLink(); void* si = sl->getSenderInfo(); if ( NULL != si ) { RTCPSenderInfo rsi(si); uint32 tsInc = pl.getPacket()->getTimestamp() - rsi.getRTPTimestamp(); // approx. microtimeout_t Inc = tsInc * 1000 / (getCurrentRTPClockRate() / 1000); timeval timevalInc = microtimeout2Timeval(Inc); timeval tNTP = NTP2Timeval(rsi.getNTPTimestampInt(), rsi.getNTPTimestampFrac()); timeval packetTime; timeradd(&tNTP,&timevalInc,&packetTime); timeval now, diff; SysTime::gettimeofday(&now,NULL); timersub(&now,&packetTime,&diff); if ( timeval2microtimeout(diff) > getEnd2EndDelay() ) result = true; } } return result; } void QueueRTCPManager::onGotSR(SyncSource& source, SendReport& SR, uint8) { // We ignore the receiver blocks and just get the sender info // at the beginning of the SR. getLink(source)->setSenderInfo(reinterpret_cast(&(SR.sinfo))); } void QueueRTCPManager::onGotRR(SyncSource& source, RecvReport& RR, uint8 blocks) { for ( uint8 i = 0; i < blocks; i++) { // this generic RTCP manager ignores reports about // other sources than the local one if ( getLocalSSRCNetwork() == RR.ssrc ) { getLink(source)-> setReceiverInfo (reinterpret_cast(&(RR.blocks[i].rinfo))); } } } void QueueRTCPManager::updateAvgRTCPSize(size_t len) { size_t newlen = len; newlen += lowerHeadersSize; rtcpAvgSize = (uint16)(( (15 * rtcpAvgSize) >> 4 ) + ( newlen >> 4)); } bool QueueRTCPManager::getBYE(RTCPPacket& pkt, size_t& pointer, size_t) { if ( 0 == pkt.fh.block_count ) return false; char *reason = NULL; if ( (sizeof(RTCPFixedHeader) + pkt.fh.block_count * sizeof(uint32)) < pkt.getLength() ) { uint16 endpointer = (uint16)(pointer + sizeof(RTCPFixedHeader) + pkt.fh.block_count * sizeof(uint32)); uint16 len = rtcpRecvBuffer[endpointer]; reason = new char[len + 1]; memcpy(reason,rtcpRecvBuffer + endpointer + 1,len); reason[len] = '\0'; } else { // avoid dangerous conversion of NULL to a C++ string. reason = new char[1]; reason[0] = '\0'; } int i = 0; while ( i < pkt.fh.block_count ) { bool created; SyncSourceLink* srcLink = getSourceBySSRC(pkt.getSSRC(),created); i++; if( srcLink->getGoodbye() ) onGotGoodbye(*(srcLink->getSource()),reason); BYESource(pkt.getSSRC()); setState(*(srcLink->getSource()),SyncSource::stateLeaving); reverseReconsideration(); } delete [] reason; pointer += pkt.getLength(); return true; } void QueueRTCPManager::reverseReconsideration() { if ( getMembersCount() < reconsInfo.rtcpPMembers ) { timeval inc; // reconsider reconsInfo.rtcpTn (time for next RTCP packet) microtimeout_t t = (reconsInfo.rtcpTn.tv_sec - reconsInfo.rtcpTc.tv_sec) * 1000000 + (reconsInfo.rtcpTn.tv_usec - reconsInfo.rtcpTc.tv_usec); t *= getMembersCount(); t /= reconsInfo.rtcpPMembers; inc.tv_usec = t % 1000000; inc.tv_sec = t / 1000000; timeradd(&(reconsInfo.rtcpTc),&inc,&(reconsInfo.rtcpTn)); // reconsider tp (time for previous RTCP packet) t = (reconsInfo.rtcpTc.tv_sec - reconsInfo.rtcpTp.tv_sec) * 1000000 + (reconsInfo.rtcpTc.tv_usec - reconsInfo.rtcpTp.tv_usec); t *= getMembersCount(); t /= reconsInfo.rtcpPMembers; inc.tv_usec = t % 1000000; inc.tv_sec = t / 1000000; timeradd(&(reconsInfo.rtcpTc),&inc,&(reconsInfo.rtcpTp)); } reconsInfo.rtcpPMembers = getMembersCount(); } bool QueueRTCPManager::onGotSDES(SyncSource& source, RTCPPacket& pkt) { // Take into account that length fields in SDES items are // 8-bit long, so no ntoh[s|l] is required bool cname_found = false; std::ptrdiff_t pointer = reinterpret_cast(&pkt) - rtcpRecvBuffer; uint16 i = 0; do { size_t len = pkt.getLength(); pointer += sizeof(RTCPFixedHeader); SDESChunk* chunk = (SDESChunk*)(rtcpRecvBuffer + pointer); bool source_created = false; // TODO: avoid searching again the source of the first chunk. SyncSourceLink* sourceLink = getSourceBySSRC(chunk->getSSRC(), source_created); // TODO: check that there are no two chunks with the // same SSRC but different CNAME SyncSource& src = *( sourceLink->getSource() ); if ( onGotSDESChunk(source,*chunk,len) ) cname_found = true; pointer +=len; if( sourceLink->getHello() ) onNewSyncSource(src); i++; } while ( i < pkt.fh.block_count ); return cname_found; } bool QueueRTCPManager::onGotSDESChunk(SyncSource& source, SDESChunk& chunk, size_t len) { bool cname_found = false; bool end = false; SyncSourceLink* srcLink = getLink(source); Participant* part = source.getParticipant(); size_t pointer = sizeof(chunk.ssrc); // process chunk items while ( (pointer < len) && !end ) { SDESItem* item = reinterpret_cast(size_t(&(chunk)) + pointer); if ( item->type > SDESItemTypeEND && item->type <= SDESItemTypeLast) { pointer += sizeof(item->type) + sizeof(item->len) + item->len; if ( NULL == part && SDESItemTypeCNAME == item->type ) { const RTPApplication& app = getApplication(); std::string cname = std::string(item->data,item->len); const Participant* p = app.getParticipant(cname); if ( p ) { part = const_cast(p); setParticipant(*(srcLink->getSource()),*part); } else { part = new Participant("-"); addParticipant(const_cast(getApplication()),*part); } setParticipant(*(srcLink->getSource()),*part); } // support for CNAME updates if ( part ) setSDESItem(part,(SDESItemType)item->type, item->data,item->len); if ( item->type == SDESItemTypeCNAME) { cname_found = true; // note that CNAME must be send in // every RTCP compound, so we only // trust sources that include it. setState(*(srcLink->getSource()), SyncSource::stateActive); } } else if ( item->type == SDESItemTypeEND) { end = true; pointer++; pointer += (pointer & 0x03); // padding } else if ( item->type == SDESItemTypePRIV ) { std::ptrdiff_t prevpointer = pointer; uint8 plength = *( &(item->len) + 1 ); pointer += sizeof(item->type) + sizeof(item->len) + 1; if ( part ) setSDESItem(part,SDESItemTypePRIV, reinterpret_cast(item + pointer),plength); pointer += plength; setPRIVPrefix(part, reinterpret_cast(item + pointer), (item->len - 1 - plength)); pointer = prevpointer + item->len; } else { pointer++; // TODO: error: SDES unknown I( false ); } } return cname_found; } timeval QueueRTCPManager::computeRTCPInterval() { float bwfract = controlBwFract * getSessionBandwidth(); uint32 participants = getMembersCount(); if ( getSendersCount() > 0 && ( getSendersCount() < (getMembersCount() * sendControlBwFract) )) { // reserve "sendControlBwFract" fraction of the total // RTCP bandwith for senders. if (rtcpWeSent) { // we take the side of active senders bwfract *= sendControlBwFract; participants = getSendersCount(); } else { // we take the side of passive receivers bwfract *= recvControlBwFract; participants = getMembersCount() - getSendersCount(); } } microtimeout_t min_interval = rtcpMinInterval; // be a bit quicker at first if ( rtcpInitial ) min_interval /= 2; // this is the real computation: microtimeout_t interval = 0; if ( bwfract != 0 ) { interval = static_cast ((participants * rtcpAvgSize / bwfract) * 1000000); if ( interval < rtcpMinInterval ) interval = rtcpMinInterval; } else { // 100 seconds instead of infinite interval = 100000000; } interval = static_cast(interval * ( 0.5 + (rand() / (RAND_MAX + 1.0)))); timeval result; result.tv_sec = interval / 1000000; result.tv_usec = interval % 1000000; return result; } #define BYE_BUFFER_LENGTH 500 size_t QueueRTCPManager::dispatchBYE(const std::string& reason) { // for this method, see section 6.3.7 in RFC 3550 // never send a BYE packet if never sent an RTP or RTCP packet // before if ( !(getSendPacketCount() || getSendRTCPPacketCount()) ) return 0; if ( getMembersCount() > 50) { // Usurp the scheduler role and apply a back-off // algorithm to avoid BYE floods. SysTime::gettimeofday(&(reconsInfo.rtcpTc),NULL); reconsInfo.rtcpTp = reconsInfo.rtcpTc; setMembersCount(1); setPrevMembersNum(1); rtcpInitial = true; rtcpWeSent = false; rtcpAvgSize = (uint16)(sizeof(RTCPFixedHeader) + sizeof(uint32) + strlen(reason.c_str()) + (4 - (strlen(reason.c_str()) & 0x03))); SysTime::gettimeofday(&(reconsInfo.rtcpTc),NULL); timeval T = computeRTCPInterval(); timeradd(&(reconsInfo.rtcpTp),&T,&(reconsInfo.rtcpTn)); while ( timercmp(&(reconsInfo.rtcpTc),&(reconsInfo.rtcpTn),<) ) { getOnlyBye(); if ( timerReconsideration() ) break; SysTime::gettimeofday(&(reconsInfo.rtcpTc),NULL); } } unsigned char buffer[BYE_BUFFER_LENGTH] = {0}; // Build an empty RR as first packet in the compound. // TODO: provide more information if available. Not really // important, since this is the last packet being sent. RTCPPacket* pkt = reinterpret_cast(buffer); pkt->fh.version = CCRTP_VERSION; pkt->fh.padding = 0; pkt->fh.block_count = 0; pkt->fh.type = RTCPPacket::tRR; pkt->info.RR.ssrc= getLocalSSRCNetwork(); uint16 len1 = sizeof(RTCPFixedHeader) + sizeof(uint32); // 1st pkt len. pkt->fh.length = htons((len1 >> 2) - 1); uint16 len = len1; // whole compound len. // build a BYE packet uint16 padlen = 0; pkt = reinterpret_cast(buffer + len1); pkt->fh.version = CCRTP_VERSION; pkt->fh.block_count = 1; pkt->fh.type = RTCPPacket::tBYE; // add the SSRC identifier pkt->info.BYE.ssrc = getLocalSSRCNetwork(); len += sizeof(RTCPFixedHeader) + sizeof(BYEPacket); // add the optional reason if ( reason.c_str() != NULL ){ pkt->info.BYE.length = (uint8)strlen(reason.c_str()); memcpy(buffer + len,reason.c_str(),pkt->info.BYE.length); len += pkt->info.BYE.length; padlen = 4 - ((len - len1) & 0x03); if ( padlen ) { memset(buffer + len,0,padlen); len += padlen; pkt->info.BYE.length += padlen; } } pkt->fh.length = htons(((len - len1) >> 2) - 1); return sendControlToDestinations(buffer,len); } void QueueRTCPManager::getOnlyBye() { // This method is kind of simplified recvControl timeval wait; timersub(&(reconsInfo.rtcpTn),&(reconsInfo.rtcpTc),&wait); microtimeout_t timer = wait.tv_usec/1000 + wait.tv_sec * 1000; // wait up to reconsInfo.rtcpTn if ( !isPendingControl(timer) ) return; size_t len = 0; InetHostAddress network_address; tpport_t transport_port; while ( (len = recvControl(rtcpRecvBuffer,getPathMTU(), network_address,transport_port)) ) { // Process a len octets long RTCP compound packet // Check validity of the header fields of the compound packet if ( !RTCPCompoundHandler::checkCompoundRTCPHeader(len) ) return; // TODO: For now, we do nothing with the padding bit // in the header. uint32 pointer = 0; RTCPPacket* pkt; while ( pointer < len) { pkt = reinterpret_cast (rtcpRecvBuffer + pointer); if (pkt->fh.type == RTCPPacket::tBYE ) { bool created; SyncSourceLink* srcLink = getSourceBySSRC(pkt->getSSRC(), created); if( srcLink->getGoodbye() ) onGotGoodbye(*(srcLink->getSource()), ""); BYESource(pkt->getSSRC()); } pointer += pkt->getLength(); } } } size_t QueueRTCPManager::dispatchControlPacket(void) { rtcpInitial = false; // Keep in mind: always include a report (in SR or RR) and at // least a SDES with the local CNAME. It is mandatory. // (A) SR or RR, depending on whether we sent. // pkt will point to the packets of the compound RTCPPacket* pkt = reinterpret_cast(rtcpSendBuffer); // Fixed header of the first report pkt->fh.padding = 0; pkt->fh.version = CCRTP_VERSION; // length of the RTCP compound packet. It will increase till // the end of this routine. Both sender and receiver report // carry the general 32-bit long fixed header and a 32-bit // long SSRC identifier. uint16 len = sizeof(RTCPFixedHeader) + sizeof(uint32); // the fields block_count and length will be filled in later // now decide whether to send a SR or a SR if ( lastSendPacketCount != getSendPacketCount() ) { // we have sent rtp packets since last RTCP -> send SR lastSendPacketCount = getSendPacketCount(); pkt->fh.type = RTCPPacket::tSR; pkt->info.SR.ssrc = getLocalSSRCNetwork(); // Fill in sender info block. It would be more // accurate if this were done as late as possible. timeval now; SysTime::gettimeofday(&now,NULL); // NTP MSB and MSB: dependent on current payload type. pkt->info.SR.sinfo.NTPMSW = htonl(now.tv_sec + NTP_EPOCH_OFFSET); pkt->info.SR.sinfo.NTPLSW = htonl((uint32)(((double)(now.tv_usec)*(uint32)(~0))/1000000.0)); // RTP timestamp int32 tstamp = now.tv_usec - getInitialTime().tv_usec; tstamp *= (getCurrentRTPClockRate()/1000); tstamp /= 1000; tstamp += (now.tv_sec - getInitialTime().tv_sec) * getCurrentRTPClockRate(); tstamp += getInitialTimestamp(); pkt->info.SR.sinfo.RTPTimestamp = htonl(tstamp); // sender's packet and octet count pkt->info.SR.sinfo.packetCount = htonl(getSendPacketCount()); pkt->info.SR.sinfo.octetCount = htonl(getSendOctetCount()); len += sizeof(SenderInfo); } else { // RR pkt->fh.type = RTCPPacket::tRR; pkt->info.RR.ssrc = getLocalSSRCNetwork(); } // (B) put report blocks // After adding report blocks, we have to leave room for at // least a CNAME SDES item uint16 available = (uint16)(getPathMTU() - lowerHeadersSize - len - (sizeof(RTCPFixedHeader) + 2*sizeof(uint8) + getApplication().getSDESItem(SDESItemTypeCNAME).length()) - 100); // if we have to go to a new RR packet bool another = false; uint16 prevlen = 0; RRBlock* reports; if ( RTCPPacket::tRR == pkt->fh.type ) reports = pkt->info.RR.blocks; else // ( RTCPPacket::tSR == pkt->fh.type ) reports = pkt->info.SR.blocks; do { uint8 blocks = 0; pkt->fh.block_count = blocks = packReportBlocks(reports,len,available); // the length field specifies 32-bit words pkt->fh.length = htons( ((len - prevlen) >> 2) - 1); prevlen = len; if ( 31 == blocks ) { // we would need room for a new RR packet and // a CNAME SDES if ( len < (available - ( sizeof(RTCPFixedHeader) + sizeof(uint32) + sizeof(RRBlock))) ) { another = true; // Header for this new packet in the compound pkt = reinterpret_cast (rtcpSendBuffer + len); pkt->fh.version = CCRTP_VERSION; pkt->fh.padding = 0; pkt->fh.type = RTCPPacket::tRR; pkt->info.RR.ssrc = getLocalSSRCNetwork(); // appended a new Header and a report block len += sizeof(RTCPFixedHeader)+ sizeof(uint32); reports = pkt->info.RR.blocks; } else { another = false; } } else { another = false; } } while ( (len < available) && another ); // (C) SDES (CNAME) // each SDES chunk must be 32-bit multiple long // fill the padding with 0s packSDES(len); // TODO: virtual for sending APP RTCP packets? // actually send the packet. size_t count = sendControlToDestinations(rtcpSendBuffer,len); ctrlSendCount++; // Everything went right, update the RTCP average size updateAvgRTCPSize(len); return count; } void QueueRTCPManager::packSDES(uint16 &len) { uint16 prevlen = len; RTCPPacket* pkt = reinterpret_cast(rtcpSendBuffer + len); // Fill RTCP fixed header. Note fh.length is not set till the // end of this routine. pkt->fh.version = CCRTP_VERSION; pkt->fh.padding = 0; pkt->fh.block_count = 1; pkt->fh.type = RTCPPacket::tSDES; pkt->info.SDES.ssrc = getLocalSSRCNetwork(); pkt->info.SDES.item.type = SDESItemTypeCNAME; // put CNAME size_t cnameLen = getApplication().getSDESItem(SDESItemTypeCNAME).length(); const char* cname = getApplication().getSDESItem(SDESItemTypeCNAME).c_str(); pkt->info.SDES.item.len = (uint8)cnameLen; len += sizeof(RTCPFixedHeader) + sizeof(pkt->info.SDES.ssrc) + sizeof(pkt->info.SDES.item.type) + sizeof(pkt->info.SDES.item.len); memcpy((rtcpSendBuffer + len),cname,cnameLen); len += (uint16)cnameLen; // pack items other than CNAME (following priorities // stablished inside scheduleSDESItem()). SDESItemType nexttype = scheduleSDESItem(); if ( (nexttype > SDESItemTypeCNAME) && (nexttype <= SDESItemTypeLast ) ) { SDESItem *item = reinterpret_cast(rtcpSendBuffer + len); item->type = nexttype; const char *content = getApplication().getSDESItem(nexttype).c_str(); item->len = (uint8)strlen(content); len += 2; memcpy(reinterpret_cast(rtcpSendBuffer + len), content,item->len); len += item->len; } // pack END item (terminate list of items in this chunk) rtcpSendBuffer[len] = SDESItemTypeEND; len++; uint8 padding = len & 0x03; if ( padding ) { padding = 4 - padding; memset((rtcpSendBuffer + len),SDESItemTypeEND,padding); len += padding; } pkt->fh.length = htons((len - prevlen - 1) >>2); } uint8 QueueRTCPManager::packReportBlocks(RRBlock* blocks, uint16 &len, uint16& available) { uint8 j = 0; // pack as many report blocks as we can SyncSourceLink* i = getFirst(); for ( ; ( ( i != NULL ) && ( len < (available - sizeof(RTCPCompoundHandler::RRBlock)) ) && ( j < 31 ) ); i = i->getNext() ) { SyncSourceLink& srcLink = *i; // update stats. srcLink.computeStats(); blocks[j].ssrc = htonl(srcLink.getSource()->getID()); blocks[j].rinfo.fractionLost = srcLink.getFractionLost(); blocks[j].rinfo.lostMSB = (srcLink.getCumulativePacketLost() & 0xFF0000) >> 16; blocks[j].rinfo.lostLSW = htons(srcLink.getCumulativePacketLost() & 0xFFFF); blocks[j].rinfo.highestSeqNum = htonl(srcLink.getExtendedMaxSeqNum()); blocks[j].rinfo.jitter = htonl(static_cast(srcLink.getJitter())); RTCPCompoundHandler::SenderInfo* si = reinterpret_cast(srcLink.getSenderInfo()); if ( NULL == si ) { blocks[j].rinfo.lsr = 0; blocks[j].rinfo.dlsr = 0; } else { blocks[j].rinfo.lsr = htonl( ((ntohl(si->NTPMSW) & 0x0FFFF) << 16 )+ ((ntohl(si->NTPLSW) & 0xFFFF0000) >> 16) ); timeval now, diff; SysTime::gettimeofday(&now,NULL); timeval last = srcLink.getLastRTCPSRTime(); timersub(&now,&last,&diff); blocks[j].rinfo.dlsr = htonl(timevalIntervalTo65536(diff)); } len += sizeof(RTCPCompoundHandler::RRBlock); j++; } return j; } void QueueRTCPManager::setSDESItem(Participant* part, SDESItemType type, const char* const value, size_t len) { char* buf = new char[len + 1]; memcpy(buf,value,len); buf[len] = '\0'; ParticipantHandler::setSDESItem(part,type,buf); delete [] buf; } void QueueRTCPManager::setPRIVPrefix(Participant* part, const char* const value, size_t len) { char *buf = new char[len + 1]; memcpy(buf,value,len); buf[len] = '\0'; ParticipantHandler::setPRIVPrefix(part,buf); delete [] buf; } SDESItemType QueueRTCPManager::scheduleSDESItem() { uint8 i = 0; // TODO: follow, at least, standard priorities SDESItemType type = nextScheduledSDESItem; while ( (queueApplication.getSDESItem(type).length() <= 0) && i < (lastSchedulable - firstSchedulable) ) { i++; type = nextSDESType(type); } bool empty = true; if ( queueApplication.getSDESItem(type).length() > 0 ) empty = false; nextScheduledSDESItem = nextSDESType(type); if ( empty ) return SDESItemTypeEND; else return type; } SDESItemType QueueRTCPManager::nextSDESType(SDESItemType t) { t = static_cast( static_cast(t) + 1 ); if ( t > lastSchedulable ) t = firstSchedulable; return t; } size_t QueueRTCPManager::sendControlToDestinations(unsigned char* buffer, size_t len) { size_t count = 0; lockDestinationList(); // Cast to have easy access to ssrc et al RTCPPacket *pkt = reinterpret_cast(buffer); CryptoContextCtrl* pcc = getOutQueueCryptoContextCtrl(pkt->getSSRC()); if (pcc == NULL) { pcc = getOutQueueCryptoContextCtrl(0); if (pcc != NULL) { pcc = pcc->newCryptoContextForSSRC(pkt->getSSRC()); if (pcc != NULL) { pcc->deriveSrtcpKeys(); setOutQueueCryptoContextCtrl(pcc); } } } // If no crypto context: then SRTP/SRTCP is off // If crypto context is available then unprotect data here. If an error // occurs report the error and discard the packet. if (pcc != NULL) { len = protect(buffer, len, pcc); } if ( isSingleDestination() ) { count = sendControl(buffer,len); } else { // when no destination has been added, NULL == dest. for (std::list::iterator i = destList.begin(); destList.end() != i; i++) { TransportAddress* dest = *i; setControlPeer(dest->getNetworkAddress(), dest->getControlTransportPort()); count += sendControl(buffer,len); } } unlockDestinationList(); return count; } int32 QueueRTCPManager::protect(uint8* pkt, size_t len, CryptoContextCtrl* pcc) { /* Encrypt the packet */ uint32 ssrc = *(reinterpret_cast(pkt + 4)); // always SSRC of sender ssrc =ntohl(ssrc); pcc->srtcpEncrypt(pkt + 8, len - 8, srtcpIndex, ssrc); uint32 encIndex = srtcpIndex | 0x80000000; // set the E flag uint32* ip = reinterpret_cast(pkt+len); *ip = htonl(encIndex); // NO MKI support yet - here we assume MKI is zero. To build in MKI // take MKI length into account when storing the authentication tag. // Compute MAC and store in packet after the SRTCP index field pcc->srtcpAuthenticate(pkt, len, encIndex, pkt + len + sizeof(uint32)); srtcpIndex++; srtcpIndex &= ~0x80000000; // clear possible overflow return len + pcc->getTagLength() + sizeof(uint32); } int32 QueueRTCPManager::unprotect(uint8* pkt, size_t len, CryptoContextCtrl* pcc) { if (pcc == NULL) { return true; } // Compute the total length of the payload uint32 payloadLen = len - (pcc->getTagLength() + pcc->getMkiLength() + 4); // point to the SRTCP index field just after the real payload const uint32* index = reinterpret_cast(pkt + payloadLen); uint32 ssrc = *(reinterpret_cast(pkt + 4)); // always SSRC of sender ssrc =ntohl(ssrc); uint32 encIndex = ntohl(*index); uint32 remoteIndex = encIndex & ~0x80000000; // index without Encryption flag if (!pcc->checkReplay(remoteIndex)) { return -2; } uint8 mac[20]; // Now get a pointer to the authentication tag field const uint8* tag = pkt + (len - pcc->getTagLength()); // Authenticate includes the index, but not MKI and not (obviously) the tag itself pcc->srtcpAuthenticate(pkt, payloadLen, encIndex, mac); if (memcmp(tag, mac, pcc->getTagLength()) != 0) { return -1; } // Decrypt the content, exclude the very first SRTCP header (fixed, 8 bytes) if (encIndex & 0x80000000) pcc->srtcpEncrypt(pkt + 8, payloadLen - 8, remoteIndex, ssrc); // Update the Crypto-context pcc->update(remoteIndex); return payloadLen; } void QueueRTCPManager::setOutQueueCryptoContextCtrl(CryptoContextCtrl* cc) { std::list::iterator i; MutexLock lock(outCryptoMutex); // check if a CryptoContext for a SSRC already exists. If yes // remove it from list before inserting the new one. for( i = outCryptoContexts.begin(); i!= outCryptoContexts.end(); i++ ) { if( (*i)->getSsrc() == cc->getSsrc() ) { CryptoContextCtrl* tmp = *i; outCryptoContexts.erase(i); delete tmp; break; } } outCryptoContexts.push_back(cc); } void QueueRTCPManager::removeOutQueueCryptoContextCtrl(CryptoContextCtrl* cc) { std::list::iterator i; MutexLock lock(outCryptoMutex); if (cc == NULL) { // Remove any incoming crypto contexts for (i = outCryptoContexts.begin(); i != outCryptoContexts.end(); ) { CryptoContextCtrl* tmp = *i; i = outCryptoContexts.erase(i); delete tmp; } } else { for( i = outCryptoContexts.begin(); i != outCryptoContexts.end(); i++ ) { if( (*i)->getSsrc() == cc->getSsrc() ) { CryptoContextCtrl* tmp = *i; outCryptoContexts.erase(i); delete tmp; return; } } } } CryptoContextCtrl* QueueRTCPManager::getOutQueueCryptoContextCtrl(uint32 ssrc) { std::list::iterator i; MutexLock lock(outCryptoMutex); for( i = outCryptoContexts.begin(); i != outCryptoContexts.end(); i++ ){ if( (*i)->getSsrc() == ssrc) { return (*i); } } return NULL; } void QueueRTCPManager::setInQueueCryptoContextCtrl(CryptoContextCtrl* cc) { std::list::iterator i; MutexLock lock(inCryptoMutex); // check if a CryptoContext for a SSRC already exists. If yes // remove it from list before inserting the new one. for( i = inCryptoContexts.begin(); i!= inCryptoContexts.end(); i++ ) { if( (*i)->getSsrc() == cc->getSsrc() ) { CryptoContextCtrl* tmp = *i; inCryptoContexts.erase(i); delete tmp; break; } } inCryptoContexts.push_back(cc); } void QueueRTCPManager::removeInQueueCryptoContextCtrl(CryptoContextCtrl* cc) { std::list::iterator i; MutexLock lock(inCryptoMutex); if (cc == NULL) { // Remove any incoming crypto contexts for (i = inCryptoContexts.begin(); i != inCryptoContexts.end(); ) { CryptoContextCtrl* tmp = *i; i = inCryptoContexts.erase(i); delete tmp; } } else { for( i = inCryptoContexts.begin(); i!= inCryptoContexts.end(); i++ ){ if( (*i)->getSsrc() == cc->getSsrc() ) { CryptoContextCtrl* tmp = *i; inCryptoContexts.erase(i); delete tmp; return; } } } } CryptoContextCtrl* QueueRTCPManager::getInQueueCryptoContextCtrl(uint32 ssrc) { std::list::iterator i; MutexLock lock(inCryptoMutex); for( i = inCryptoContexts.begin(); i!= inCryptoContexts.end(); i++ ){ if( (*i)->getSsrc() == ssrc) { return (*i); } } return NULL; } END_NAMESPACE /** EMACS ** * Local variables: * mode: c++ * c-basic-offset: 4 * End: */