summaryrefslogtreecommitdiff
path: root/src/filtration/filtration_processor.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/filtration/filtration_processor.h')
-rw-r--r--src/filtration/filtration_processor.h266
1 files changed, 132 insertions, 134 deletions
diff --git a/src/filtration/filtration_processor.h b/src/filtration/filtration_processor.h
index 8eea6cf..ef98570 100644
--- a/src/filtration/filtration_processor.h
+++ b/src/filtration/filtration_processor.h
@@ -32,23 +32,22 @@
#include <pcap/pcap.h>
-#include "utils/log.h"
-#include "utils/out.h"
-#include "utils/sessions.h"
-#include "utils/profiler.h"
#include "controller/parameters.h"
#include "filtration/packet.h"
#include "filtration/sessions_hash.h"
-#include "protocols/rpc/rpc_header.h"
#include "protocols/nfs3/nfs3_utils.h"
#include "protocols/nfs4/nfs4_utils.h"
+#include "protocols/rpc/rpc_header.h"
+#include "utils/log.h"
+#include "utils/out.h"
+#include "utils/profiler.h"
+#include "utils/sessions.h"
//------------------------------------------------------------------------------
namespace NST
{
namespace filtration
{
-
-using namespace NST::protocols::rpc;//FIXME: It is not good to use "using" in headers - should be removed
+using namespace NST::protocols::rpc; //FIXME: It is not good to use "using" in headers - should be removed
using ProcEnumNFS3 = API::ProcEnumNFS3;
using NFS3Validator = NST::protocols::NFS3::Validator;
using NFS4Validator = NST::protocols::NFS4::Validator;
@@ -67,77 +66,77 @@ struct UDPSession : public utils::NetworkSession
{
public:
UDPSession(Writer* w, uint32_t max_rpc_hdr)
- : collection{w, this}
- , nfs3_rw_hdr_max{max_rpc_hdr}
+ : collection{w, this}
+ , nfs3_rw_hdr_max{max_rpc_hdr}
{
}
- UDPSession(UDPSession&&) = delete;
- UDPSession(const UDPSession&) = delete;
+ UDPSession(UDPSession&&) = delete;
+ UDPSession(const UDPSession&) = delete;
UDPSession& operator=(const UDPSession&) = delete;
void collect(PacketInfo& info)
{
// TODO: this code must be generalized with RPCFiltrator class
- uint32_t hdr_len {0};
- auto msg = reinterpret_cast<const MessageHeader*const>(info.data);
+ uint32_t hdr_len{0};
+ auto msg = reinterpret_cast<const MessageHeader* const>(info.data);
switch(msg->type())
{
- case MsgType::CALL:
+ case MsgType::CALL:
+ {
+ auto call = static_cast<const CallHeader* const>(msg);
+ if(RPCValidator::check(call))
{
- auto call = static_cast<const CallHeader*const>(msg);
- if(RPCValidator::check(call))
+ if(NFS3Validator::check(call))
{
- if (NFS3Validator::check(call))
+ uint32_t proc{call->proc()};
+ if(ProcEnumNFS3::WRITE == proc) // truncate NFSv3 WRITE call message to NFSv3-RW-limit
{
- uint32_t proc {call->proc()};
- if (ProcEnumNFS3::WRITE == proc) // truncate NFSv3 WRITE call message to NFSv3-RW-limit
- {
- hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen);
- }
- else
- {
- if (ProcEnumNFS3::READ == proc)
- nfs3_read_match.insert(call->xid());
- hdr_len = info.dlen;
- }
- }
- else if (NFS4Validator::check(call))
- {
- hdr_len = info.dlen; // fully collect NFSv4 messages
+ hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen);
}
else
{
- return;
+ if(ProcEnumNFS3::READ == proc)
+ nfs3_read_match.insert(call->xid());
+ hdr_len = info.dlen;
}
}
+ else if(NFS4Validator::check(call))
+ {
+ hdr_len = info.dlen; // fully collect NFSv4 messages
+ }
else
{
return;
}
}
- break;
- case MsgType::REPLY:
+ else
{
- auto reply = static_cast<const ReplyHeader*const>(msg);
- if(RPCValidator::check(reply))
- {
- // Truncate NFSv3 READ reply message to NFSv3-RW-limit
- //* Collect fully if reply received before matching call
- if (nfs3_read_match.erase(reply->xid()) > 0)
- {
- hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen);
- }
- else
- hdr_len = info.dlen;
- }
- else // isn't RPC reply, stream is corrupt
+ return;
+ }
+ }
+ break;
+ case MsgType::REPLY:
+ {
+ auto reply = static_cast<const ReplyHeader* const>(msg);
+ if(RPCValidator::check(reply))
+ {
+ // Truncate NFSv3 READ reply message to NFSv3-RW-limit
+ //* Collect fully if reply received before matching call
+ if(nfs3_read_match.erase(reply->xid()) > 0)
{
- return;
+ hdr_len = (nfs3_rw_hdr_max < info.dlen ? nfs3_rw_hdr_max : info.dlen);
}
+ else
+ hdr_len = info.dlen;
}
- break;
- default:
+ else // isn't RPC reply, stream is corrupt
+ {
return;
+ }
+ }
+ break;
+ default:
+ return;
}
collection.allocate();
@@ -148,8 +147,8 @@ public:
}
typename Writer::Collection collection;
- uint32_t nfs3_rw_hdr_max;
- MessageSet nfs3_read_match;
+ uint32_t nfs3_rw_hdr_max;
+ MessageSet nfs3_read_match;
};
// Represents TCP conversation between node A and node B
@@ -157,28 +156,28 @@ template <typename StreamReader>
class TCPSession : public utils::NetworkSession
{
public:
-
struct Flow
{
// Helpers for comparison sequence numbers
// Idea for gt: either x > y, or y is much bigger (assume wrap)
- inline static bool GT_SEQ(uint32_t x, uint32_t y){ return (int32_t)((y) - (x)) < 0; }
- inline static bool LT_SEQ(uint32_t x, uint32_t y){ return (int32_t)((x) - (y)) < 0; }
- inline static bool GE_SEQ(uint32_t x, uint32_t y){ return (int32_t)((y) - (x)) <= 0; }
- inline static bool LE_SEQ(uint32_t x, uint32_t y){ return (int32_t)((x) - (y)) <= 0; }
- inline static bool EQ_SEQ(uint32_t x, uint32_t y){ return (x) ==(y); }
-
+ inline static bool GT_SEQ(uint32_t x, uint32_t y) { return (int32_t)((y) - (x)) < 0; }
+ inline static bool LT_SEQ(uint32_t x, uint32_t y) { return (int32_t)((x) - (y)) < 0; }
+ inline static bool GE_SEQ(uint32_t x, uint32_t y) { return (int32_t)((y) - (x)) <= 0; }
+ inline static bool LE_SEQ(uint32_t x, uint32_t y) { return (int32_t)((x) - (y)) <= 0; }
+ inline static bool EQ_SEQ(uint32_t x, uint32_t y) { return (x) == (y); }
friend class TCPSession<StreamReader>;
- Flow() : fragments{nullptr}, sequence{0}
+ Flow()
+ : fragments{nullptr}
+ , sequence{0}
{
}
~Flow()
{
reset();
}
- Flow(Flow&&) = delete;
- Flow(const Flow&) = delete;
+ Flow(Flow&&) = delete;
+ Flow(const Flow&) = delete;
Flow& operator=(const Flow&) = delete;
void reset()
@@ -196,20 +195,20 @@ public:
void reassemble(PacketInfo& info)
{
- uint32_t seq {info.tcp->seq()};
- uint32_t len {info.dlen};
+ uint32_t seq{info.tcp->seq()};
+ uint32_t len{info.dlen};
- if( sequence == 0 ) // this is the first time we have seen this src's sequence number
+ if(sequence == 0) // this is the first time we have seen this src's sequence number
{
sequence = seq + len;
- if( info.tcp->is(tcp_header::SYN) )
+ if(info.tcp->is(tcp_header::SYN))
{
sequence++;
}
if(len > 0)
{
- reader.push(info); // write out the packet data
+ reader.push(info); // write out the packet data
}
return;
@@ -217,20 +216,19 @@ public:
// if we are here, we have already seen this src, let's
// try and figure out if this packet is in the right place
- if( LT_SEQ(seq, sequence) )
+ if(LT_SEQ(seq, sequence))
{
// this sequence number seems dated, but
// check the end to make sure it has no more
// info than we have already seen
- uint32_t newseq {seq + len};
- if( GT_SEQ(newseq, sequence) )
+ uint32_t newseq{seq + len};
+ if(GT_SEQ(newseq, sequence))
{
-
// this one has more than we have seen. let's get the
// payload that we have not seen
- uint32_t new_len {sequence - seq};
+ uint32_t new_len{sequence - seq};
- if ( info.dlen <= new_len )
+ if(info.dlen <= new_len)
{
info.data = nullptr;
info.dlen = 0;
@@ -249,21 +247,22 @@ public:
}
}
- if ( EQ_SEQ(seq, sequence) ) // right on time
+ if(EQ_SEQ(seq, sequence)) // right on time
{
sequence += len;
- if( info.tcp->is(tcp_header::SYN) ) sequence++;
+ if(info.tcp->is(tcp_header::SYN)) sequence++;
- if( info.data && info.dlen > 0)
+ if(info.data && info.dlen > 0)
{
reader.push(info);
}
// done with the packet, see if it caused a fragment to fit
- while( check_fragments(0) );
+ while(check_fragments(0))
+ ;
}
else // out of order packet
{
- if(info.dlen > 0 && GT_SEQ(seq, sequence) )
+ if(info.dlen > 0 && GT_SEQ(seq, sequence))
{
//TRACE("ADD FRAGMENT seq: %u dlen: %u sequence: %u", seq, info.dlen, sequence);
fragments = Packet::create(info, fragments);
@@ -273,37 +272,37 @@ public:
bool check_fragments(const uint32_t acknowledged)
{
- Packet* current {fragments};
- if( current )
+ Packet* current{fragments};
+ if(current)
{
- Packet* prev {nullptr};
- uint32_t lowest_seq {current->tcp->seq()};
- while( current )
+ Packet* prev{nullptr};
+ uint32_t lowest_seq{current->tcp->seq()};
+ while(current)
{
- const uint32_t current_seq {current->tcp->seq()};
- const uint32_t current_len {current->dlen};
+ const uint32_t current_seq{current->tcp->seq()};
+ const uint32_t current_len{current->dlen};
- if( GT_SEQ(lowest_seq, current_seq) ) // lowest_seq > current_seq
+ if(GT_SEQ(lowest_seq, current_seq)) // lowest_seq > current_seq
{
lowest_seq = current_seq;
}
- if( LT_SEQ(current_seq, sequence) ) // current_seq < sequence
+ if(LT_SEQ(current_seq, sequence)) // current_seq < sequence
{
// this sequence number seems dated, but
// check the end to make sure it has no more
// info than we have already seen
- uint32_t newseq {current_seq + current_len};
- if( GT_SEQ(newseq, sequence) )
+ uint32_t newseq{current_seq + current_len};
+ if(GT_SEQ(newseq, sequence))
{
// this one has more than we have seen. let's get the
// payload that we have not seen. This happens when
// part of this frame has been retransmitted
- uint32_t new_pos {sequence - current_seq};
+ uint32_t new_pos{sequence - current_seq};
sequence += (current_len - new_pos);
- if ( current->dlen > new_pos )
+ if(current->dlen > new_pos)
{
current->data += new_pos;
current->dlen -= new_pos;
@@ -312,9 +311,9 @@ public:
}
// Remove the fragment from the list as the "new" part of it
- // has been processed or its data has been seen already in
+ // has been processed or its data has been seen already in
// another packet.
- if( prev )
+ if(prev)
{
prev->next = current->next;
}
@@ -328,11 +327,11 @@ public:
return true;
}
- if( EQ_SEQ(current_seq, sequence) )
+ if(EQ_SEQ(current_seq, sequence))
{
// this fragment fits the stream
sequence += current_len;
- if( prev )
+ if(prev)
{
prev->next = current->next;
}
@@ -346,11 +345,11 @@ public:
return true;
}
- prev = current;
+ prev = current;
current = current->next;
- }// end while
+ } // end while
- if( GT_SEQ(acknowledged, lowest_seq) ) // acknowledged > lowest_seq
+ if(GT_SEQ(acknowledged, lowest_seq)) // acknowledged > lowest_seq
{
//TRACE("acknowledged(%u) > lowest_seq(%u) seq:%u", acknowledged, lowest_seq, sequence);
// There are frames missing in the capture stream that were seen
@@ -365,9 +364,9 @@ public:
}
private:
- StreamReader reader; // reader of acknowledged data stream
- Packet* fragments; // list of not yet acked fragments
- uint32_t sequence;
+ StreamReader reader; // reader of acknowledged data stream
+ Packet* fragments; // list of not yet acked fragments
+ uint32_t sequence;
};
template <typename Writer>
@@ -376,16 +375,17 @@ public:
flows[0].reader.set_writer(this, w, max_rpc_hdr);
flows[1].reader.set_writer(this, w, max_rpc_hdr);
}
- TCPSession(TCPSession&&) = delete;
- TCPSession(const TCPSession&) = delete;
+ TCPSession(TCPSession&&) = delete;
+ TCPSession(const TCPSession&) = delete;
TCPSession& operator=(const TCPSession&) = delete;
void collect(PacketInfo& info)
{
- const uint32_t ack {info.tcp->ack()};
+ const uint32_t ack{info.tcp->ack()};
//check whether this frame acks fragments that were already seen.
- while( flows[1-info.direction].check_fragments(ack) );
+ while(flows[1 - info.direction].check_fragments(ack))
+ ;
flows[info.direction].reassemble(info);
}
@@ -393,24 +393,21 @@ public:
Flow flows[2];
};
-template
-<
+template <
typename Reader,
typename Writer,
- typename Filtrator
->
+ typename Filtrator>
class FiltrationProcessor
{
public:
-
explicit FiltrationProcessor(std::unique_ptr<Reader>& r,
std::unique_ptr<Writer>& w)
- : reader{std::move(r)}
- , writer{std::move(w)}
- , ipv4_tcp_sessions{writer.get()}
- , ipv4_udp_sessions{writer.get()}
- , ipv6_tcp_sessions{writer.get()}
- , ipv6_udp_sessions{writer.get()}
+ : reader{std::move(r)}
+ , writer{std::move(w)}
+ , ipv4_tcp_sessions{writer.get()}
+ , ipv4_udp_sessions{writer.get()}
+ , ipv6_tcp_sessions{writer.get()}
+ , ipv6_udp_sessions{writer.get()}
{
// check datalink layer
datalink = reader->datalink();
@@ -427,7 +424,7 @@ public:
void run()
{
- bool done {reader->loop(this, callback)};
+ bool done{reader->loop(this, callback)};
if(done)
{
throw controller::ProcessingDone("Filtration is done");
@@ -439,9 +436,9 @@ public:
reader->break_loop();
}
- static void callback(u_char *user, const struct pcap_pkthdr *pkthdr, const u_char* packet)
+ static void callback(u_char* user, const struct pcap_pkthdr* pkthdr, const u_char* packet)
{
- PROF;// Calc how much time was spent in this func
+ PROF; // Calc how much time was spent in this func
auto processor = reinterpret_cast<FiltrationProcessor*>(user);
PacketInfo info(pkthdr, packet, processor->datalink);
@@ -450,46 +447,47 @@ public:
{
if(pkthdr->caplen != pkthdr->len)
{
- LOGONCE("pcap packet was truncated by snaplen option this "
- "packed won't correclty reassembled to TCP stream");
+ LOGONCE(
+ "pcap packet was truncated by snaplen option this "
+ "packed won't correclty reassembled to TCP stream");
return;
}
- if(info.ipv4) // Ethernet:IPv4:TCP
+ if(info.ipv4) // Ethernet:IPv4:TCP
{
return processor->ipv4_tcp_sessions.collect_packet(info);
}
- else if(info.ipv6) // Ethernet:IPv6:TCP
+ else if(info.ipv6) // Ethernet:IPv6:TCP
{
return processor->ipv6_tcp_sessions.collect_packet(info);
}
}
else if(info.udp)
{
- if(info.ipv4) // Ethernet:IPv4:UDP
+ if(info.ipv4) // Ethernet:IPv4:UDP
{
return processor->ipv4_udp_sessions.collect_packet(info);
}
- else if(info.ipv6) // Ethernet:IPv6:UDP
+ else if(info.ipv6) // Ethernet:IPv6:UDP
{
return processor->ipv6_udp_sessions.collect_packet(info);
}
}
- LOGONCE("only following stack of protocol is supported: "
- "Ethernet II:IPv4|IPv6(except additional fragments):TCP|UDP");
+ LOGONCE(
+ "only following stack of protocol is supported: "
+ "Ethernet II:IPv4|IPv6(except additional fragments):TCP|UDP");
}
private:
-
std::unique_ptr<Reader> reader;
std::unique_ptr<Writer> writer;
- SessionsHash< IPv4TCPMapper, TCPSession <Filtrator> , Writer > ipv4_tcp_sessions;
- SessionsHash< IPv4UDPMapper, UDPSession < Writer > , Writer > ipv4_udp_sessions;
+ SessionsHash<IPv4TCPMapper, TCPSession<Filtrator>, Writer> ipv4_tcp_sessions;
+ SessionsHash<IPv4UDPMapper, UDPSession<Writer>, Writer> ipv4_udp_sessions;
- SessionsHash< IPv6TCPMapper, TCPSession < Filtrator> , Writer > ipv6_tcp_sessions;
- SessionsHash< IPv6UDPMapper, UDPSession < Writer > , Writer > ipv6_udp_sessions;
+ SessionsHash<IPv6TCPMapper, TCPSession<Filtrator>, Writer> ipv6_tcp_sessions;
+ SessionsHash<IPv6UDPMapper, UDPSession<Writer>, Writer> ipv6_udp_sessions;
int datalink;
};
@@ -497,5 +495,5 @@ private:
} // namespace filtration
} // namespace NST
//------------------------------------------------------------------------------
-#endif//FILTRATION_PROCESSOR_H
+#endif // FILTRATION_PROCESSOR_H
//------------------------------------------------------------------------------