diff options
Diffstat (limited to 'src/filtration/filtration_processor.h')
-rw-r--r-- | src/filtration/filtration_processor.h | 266 |
1 files changed, 132 insertions, 134 deletions
diff --git a/src/filtration/filtration_processor.h b/src/filtration/filtration_processor.h index 8eea6cf..ef98570 100644 --- a/src/filtration/filtration_processor.h +++ b/src/filtration/filtration_processor.h @@ -32,23 +32,22 @@ #include <pcap/pcap.h> -#include "utils/log.h" -#include "utils/out.h" -#include "utils/sessions.h" -#include "utils/profiler.h" #include "controller/parameters.h" #include "filtration/packet.h" #include "filtration/sessions_hash.h" -#include "protocols/rpc/rpc_header.h" #include "protocols/nfs3/nfs3_utils.h" #include "protocols/nfs4/nfs4_utils.h" +#include "protocols/rpc/rpc_header.h" +#include "utils/log.h" +#include "utils/out.h" +#include "utils/profiler.h" +#include "utils/sessions.h" //------------------------------------------------------------------------------ namespace NST { namespace filtration { - -using namespace NST::protocols::rpc;//FIXME: It is not good to use "using" in headers - should be removed +using namespace NST::protocols::rpc; //FIXME: It is not good to use "using" in headers - should be removed using ProcEnumNFS3 = API::ProcEnumNFS3; using NFS3Validator = NST::protocols::NFS3::Validator; using NFS4Validator = NST::protocols::NFS4::Validator; @@ -67,77 +66,77 @@ struct UDPSession : public utils::NetworkSession { public: UDPSession(Writer* w, uint32_t max_rpc_hdr) - : collection{w, this} - , nfs3_rw_hdr_max{max_rpc_hdr} + : collection{w, this} + , nfs3_rw_hdr_max{max_rpc_hdr} { } - UDPSession(UDPSession&&) = delete; - UDPSession(const UDPSession&) = delete; + UDPSession(UDPSession&&) = delete; + UDPSession(const UDPSession&) = delete; UDPSession& operator=(const UDPSession&) = delete; void collect(PacketInfo& info) { // TODO: this code must be generalized with RPCFiltrator class - uint32_t hdr_len {0}; - auto msg = reinterpret_cast<const MessageHeader*const>(info.data); + uint32_t hdr_len{0}; + auto msg = reinterpret_cast<const MessageHeader* const>(info.data); switch(msg->type()) { - case MsgType::CALL: + case MsgType::CALL: + { + auto call = static_cast<const CallHeader* const>(msg); + if(RPCValidator::check(call)) { - auto call = static_cast<const CallHeader*const>(msg); - if(RPCValidator::check(call)) + if(NFS3Validator::check(call)) { - if (NFS3Validator::check(call)) + uint32_t proc{call->proc()}; + if(ProcEnumNFS3::WRITE == proc) // truncate NFSv3 WRITE call message to NFSv3-RW-limit { - uint32_t proc {call->proc()}; - if (ProcEnumNFS3::WRITE == proc) // truncate NFSv3 WRITE call message to NFSv3-RW-limit - { - hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen); - } - else - { - if (ProcEnumNFS3::READ == proc) - nfs3_read_match.insert(call->xid()); - hdr_len = info.dlen; - } - } - else if (NFS4Validator::check(call)) - { - hdr_len = info.dlen; // fully collect NFSv4 messages + hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen); } else { - return; + if(ProcEnumNFS3::READ == proc) + nfs3_read_match.insert(call->xid()); + hdr_len = info.dlen; } } + else if(NFS4Validator::check(call)) + { + hdr_len = info.dlen; // fully collect NFSv4 messages + } else { return; } } - break; - case MsgType::REPLY: + else { - auto reply = static_cast<const ReplyHeader*const>(msg); - if(RPCValidator::check(reply)) - { - // Truncate NFSv3 READ reply message to NFSv3-RW-limit - //* Collect fully if reply received before matching call - if (nfs3_read_match.erase(reply->xid()) > 0) - { - hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen); - } - else - hdr_len = info.dlen; - } - else // isn't RPC reply, stream is corrupt + return; + } + } + break; + case MsgType::REPLY: + { + auto reply = static_cast<const ReplyHeader* const>(msg); + if(RPCValidator::check(reply)) + { + // Truncate NFSv3 READ reply message to NFSv3-RW-limit + //* Collect fully if reply received before matching call + if(nfs3_read_match.erase(reply->xid()) > 0) { - return; + hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen); } + else + hdr_len = info.dlen; } - break; - default: + else // isn't RPC reply, stream is corrupt + { return; + } + } + break; + default: + return; } collection.allocate(); @@ -148,8 +147,8 @@ public: } typename Writer::Collection collection; - uint32_t nfs3_rw_hdr_max; - MessageSet nfs3_read_match; + uint32_t nfs3_rw_hdr_max; + MessageSet nfs3_read_match; }; // Represents TCP conversation between node A and node B @@ -157,28 +156,28 @@ template <typename StreamReader> class TCPSession : public utils::NetworkSession { public: - struct Flow { // Helpers for comparison sequence numbers // Idea for gt: either x > y, or y is much bigger (assume wrap) - inline static bool GT_SEQ(uint32_t x, uint32_t y){ return (int32_t)((y) - (x)) < 0; } - inline static bool LT_SEQ(uint32_t x, uint32_t y){ return (int32_t)((x) - (y)) < 0; } - inline static bool GE_SEQ(uint32_t x, uint32_t y){ return (int32_t)((y) - (x)) <= 0; } - inline static bool LE_SEQ(uint32_t x, uint32_t y){ return (int32_t)((x) - (y)) <= 0; } - inline static bool EQ_SEQ(uint32_t x, uint32_t y){ return (x) ==(y); } - + inline static bool GT_SEQ(uint32_t x, uint32_t y) { return (int32_t)((y) - (x)) < 0; } + inline static bool LT_SEQ(uint32_t x, uint32_t y) { return (int32_t)((x) - (y)) < 0; } + inline static bool GE_SEQ(uint32_t x, uint32_t y) { return (int32_t)((y) - (x)) <= 0; } + inline static bool LE_SEQ(uint32_t x, uint32_t y) { return (int32_t)((x) - (y)) <= 0; } + inline static bool EQ_SEQ(uint32_t x, uint32_t y) { return (x) == (y); } friend class TCPSession<StreamReader>; - Flow() : fragments{nullptr}, sequence{0} + Flow() + : fragments{nullptr} + , sequence{0} { } ~Flow() { reset(); } - Flow(Flow&&) = delete; - Flow(const Flow&) = delete; + Flow(Flow&&) = delete; + Flow(const Flow&) = delete; Flow& operator=(const Flow&) = delete; void reset() @@ -196,20 +195,20 @@ public: void reassemble(PacketInfo& info) { - uint32_t seq {info.tcp->seq()}; - uint32_t len {info.dlen}; + uint32_t seq{info.tcp->seq()}; + uint32_t len{info.dlen}; - if( sequence == 0 ) // this is the first time we have seen this src's sequence number + if(sequence == 0) // this is the first time we have seen this src's sequence number { sequence = seq + len; - if( info.tcp->is(tcp_header::SYN) ) + if(info.tcp->is(tcp_header::SYN)) { sequence++; } if(len > 0) { - reader.push(info); // write out the packet data + reader.push(info); // write out the packet data } return; @@ -217,20 +216,19 @@ public: // if we are here, we have already seen this src, let's // try and figure out if this packet is in the right place - if( LT_SEQ(seq, sequence) ) + if(LT_SEQ(seq, sequence)) { // this sequence number seems dated, but // check the end to make sure it has no more // info than we have already seen - uint32_t newseq {seq + len}; - if( GT_SEQ(newseq, sequence) ) + uint32_t newseq{seq + len}; + if(GT_SEQ(newseq, sequence)) { - // this one has more than we have seen. let's get the // payload that we have not seen - uint32_t new_len {sequence - seq}; + uint32_t new_len{sequence - seq}; - if ( info.dlen <= new_len ) + if(info.dlen <= new_len) { info.data = nullptr; info.dlen = 0; @@ -249,21 +247,22 @@ public: } } - if ( EQ_SEQ(seq, sequence) ) // right on time + if(EQ_SEQ(seq, sequence)) // right on time { sequence += len; - if( info.tcp->is(tcp_header::SYN) ) sequence++; + if(info.tcp->is(tcp_header::SYN)) sequence++; - if( info.data && info.dlen > 0) + if(info.data && info.dlen > 0) { reader.push(info); } // done with the packet, see if it caused a fragment to fit - while( check_fragments(0) ); + while(check_fragments(0)) + ; } else // out of order packet { - if(info.dlen > 0 && GT_SEQ(seq, sequence) ) + if(info.dlen > 0 && GT_SEQ(seq, sequence)) { //TRACE("ADD FRAGMENT seq: %u dlen: %u sequence: %u", seq, info.dlen, sequence); fragments = Packet::create(info, fragments); @@ -273,37 +272,37 @@ public: bool check_fragments(const uint32_t acknowledged) { - Packet* current {fragments}; - if( current ) + Packet* current{fragments}; + if(current) { - Packet* prev {nullptr}; - uint32_t lowest_seq {current->tcp->seq()}; - while( current ) + Packet* prev{nullptr}; + uint32_t lowest_seq{current->tcp->seq()}; + while(current) { - const uint32_t current_seq {current->tcp->seq()}; - const uint32_t current_len {current->dlen}; + const uint32_t current_seq{current->tcp->seq()}; + const uint32_t current_len{current->dlen}; - if( GT_SEQ(lowest_seq, current_seq) ) // lowest_seq > current_seq + if(GT_SEQ(lowest_seq, current_seq)) // lowest_seq > current_seq { lowest_seq = current_seq; } - if( LT_SEQ(current_seq, sequence) ) // current_seq < sequence + if(LT_SEQ(current_seq, sequence)) // current_seq < sequence { // this sequence number seems dated, but // check the end to make sure it has no more // info than we have already seen - uint32_t newseq {current_seq + current_len}; - if( GT_SEQ(newseq, sequence) ) + uint32_t newseq{current_seq + current_len}; + if(GT_SEQ(newseq, sequence)) { // this one has more than we have seen. let's get the // payload that we have not seen. This happens when // part of this frame has been retransmitted - uint32_t new_pos {sequence - current_seq}; + uint32_t new_pos{sequence - current_seq}; sequence += (current_len - new_pos); - if ( current->dlen > new_pos ) + if(current->dlen > new_pos) { current->data += new_pos; current->dlen -= new_pos; @@ -312,9 +311,9 @@ public: } // Remove the fragment from the list as the "new" part of it - // has been processed or its data has been seen already in + // has been processed or its data has been seen already in // another packet. - if( prev ) + if(prev) { prev->next = current->next; } @@ -328,11 +327,11 @@ public: return true; } - if( EQ_SEQ(current_seq, sequence) ) + if(EQ_SEQ(current_seq, sequence)) { // this fragment fits the stream sequence += current_len; - if( prev ) + if(prev) { prev->next = current->next; } @@ -346,11 +345,11 @@ public: return true; } - prev = current; + prev = current; current = current->next; - }// end while + } // end while - if( GT_SEQ(acknowledged, lowest_seq) ) // acknowledged > lowest_seq + if(GT_SEQ(acknowledged, lowest_seq)) // acknowledged > lowest_seq { //TRACE("acknowledged(%u) > lowest_seq(%u) seq:%u", acknowledged, lowest_seq, sequence); // There are frames missing in the capture stream that were seen @@ -365,9 +364,9 @@ public: } private: - StreamReader reader; // reader of acknowledged data stream - Packet* fragments; // list of not yet acked fragments - uint32_t sequence; + StreamReader reader; // reader of acknowledged data stream + Packet* fragments; // list of not yet acked fragments + uint32_t sequence; }; template <typename Writer> @@ -376,16 +375,17 @@ public: flows[0].reader.set_writer(this, w, max_rpc_hdr); flows[1].reader.set_writer(this, w, max_rpc_hdr); } - TCPSession(TCPSession&&) = delete; - TCPSession(const TCPSession&) = delete; + TCPSession(TCPSession&&) = delete; + TCPSession(const TCPSession&) = delete; TCPSession& operator=(const TCPSession&) = delete; void collect(PacketInfo& info) { - const uint32_t ack {info.tcp->ack()}; + const uint32_t ack{info.tcp->ack()}; //check whether this frame acks fragments that were already seen. - while( flows[1-info.direction].check_fragments(ack) ); + while(flows[1 - info.direction].check_fragments(ack)) + ; flows[info.direction].reassemble(info); } @@ -393,24 +393,21 @@ public: Flow flows[2]; }; -template -< +template < typename Reader, typename Writer, - typename Filtrator -> + typename Filtrator> class FiltrationProcessor { public: - explicit FiltrationProcessor(std::unique_ptr<Reader>& r, std::unique_ptr<Writer>& w) - : reader{std::move(r)} - , writer{std::move(w)} - , ipv4_tcp_sessions{writer.get()} - , ipv4_udp_sessions{writer.get()} - , ipv6_tcp_sessions{writer.get()} - , ipv6_udp_sessions{writer.get()} + : reader{std::move(r)} + , writer{std::move(w)} + , ipv4_tcp_sessions{writer.get()} + , ipv4_udp_sessions{writer.get()} + , ipv6_tcp_sessions{writer.get()} + , ipv6_udp_sessions{writer.get()} { // check datalink layer datalink = reader->datalink(); @@ -427,7 +424,7 @@ public: void run() { - bool done {reader->loop(this, callback)}; + bool done{reader->loop(this, callback)}; if(done) { throw controller::ProcessingDone("Filtration is done"); @@ -439,9 +436,9 @@ public: reader->break_loop(); } - static void callback(u_char *user, const struct pcap_pkthdr *pkthdr, const u_char* packet) + static void callback(u_char* user, const struct pcap_pkthdr* pkthdr, const u_char* packet) { - PROF;// Calc how much time was spent in this func + PROF; // Calc how much time was spent in this func auto processor = reinterpret_cast<FiltrationProcessor*>(user); PacketInfo info(pkthdr, packet, processor->datalink); @@ -450,46 +447,47 @@ public: { if(pkthdr->caplen != pkthdr->len) { - LOGONCE("pcap packet was truncated by snaplen option this " - "packed won't correclty reassembled to TCP stream"); + LOGONCE( + "pcap packet was truncated by snaplen option this " + "packed won't correclty reassembled to TCP stream"); return; } - if(info.ipv4) // Ethernet:IPv4:TCP + if(info.ipv4) // Ethernet:IPv4:TCP { return processor->ipv4_tcp_sessions.collect_packet(info); } - else if(info.ipv6) // Ethernet:IPv6:TCP + else if(info.ipv6) // Ethernet:IPv6:TCP { return processor->ipv6_tcp_sessions.collect_packet(info); } } else if(info.udp) { - if(info.ipv4) // Ethernet:IPv4:UDP + if(info.ipv4) // Ethernet:IPv4:UDP { return processor->ipv4_udp_sessions.collect_packet(info); } - else if(info.ipv6) // Ethernet:IPv6:UDP + else if(info.ipv6) // Ethernet:IPv6:UDP { return processor->ipv6_udp_sessions.collect_packet(info); } } - LOGONCE("only following stack of protocol is supported: " - "Ethernet II:IPv4|IPv6(except additional fragments):TCP|UDP"); + LOGONCE( + "only following stack of protocol is supported: " + "Ethernet II:IPv4|IPv6(except additional fragments):TCP|UDP"); } private: - std::unique_ptr<Reader> reader; std::unique_ptr<Writer> writer; - SessionsHash< IPv4TCPMapper, TCPSession <Filtrator> , Writer > ipv4_tcp_sessions; - SessionsHash< IPv4UDPMapper, UDPSession < Writer > , Writer > ipv4_udp_sessions; + SessionsHash<IPv4TCPMapper, TCPSession<Filtrator>, Writer> ipv4_tcp_sessions; + SessionsHash<IPv4UDPMapper, UDPSession<Writer>, Writer> ipv4_udp_sessions; - SessionsHash< IPv6TCPMapper, TCPSession < Filtrator> , Writer > ipv6_tcp_sessions; - SessionsHash< IPv6UDPMapper, UDPSession < Writer > , Writer > ipv6_udp_sessions; + SessionsHash<IPv6TCPMapper, TCPSession<Filtrator>, Writer> ipv6_tcp_sessions; + SessionsHash<IPv6UDPMapper, UDPSession<Writer>, Writer> ipv6_udp_sessions; int datalink; }; @@ -497,5 +495,5 @@ private: } // namespace filtration } // namespace NST //------------------------------------------------------------------------------ -#endif//FILTRATION_PROCESSOR_H +#endif // FILTRATION_PROCESSOR_H //------------------------------------------------------------------------------ |