summaryrefslogtreecommitdiff
path: root/analyzers/src/json/abstract_tcp_service.h
blob: 8ef0e0064a34f400a706b1aa2b2ba70fb3e1831a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//------------------------------------------------------------------------------
// Author: Ilya Storozhilov
// Description: Abstract TCP-service class declaration
// Copyright (c) 2013-2014 EPAM Systems
//------------------------------------------------------------------------------
/*
    This file is part of Nfstrace.

    Nfstrace is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, version 2 of the License.

    Nfstrace is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Nfstrace.  If not, see <http://www.gnu.org/licenses/>.
*/
//------------------------------------------------------------------------------
#ifndef ABSTRACT_TCP_SERVICE_H
#define ABSTRACT_TCP_SERVICE_H
//------------------------------------------------------------------------------
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <ctime>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include "ip_endpoint.h"
//------------------------------------------------------------------------------
//! Abstract TCP-service
/*!
 * Base class for TCP-services. A concrete service derives from it and
 * implements createTask() to supply the task object for a given socket.
 */
class AbstractTcpService
{
public:
    //! Listen backlog used when the constructor is called without one
    static constexpr int DefaultBacklog = 15;

    AbstractTcpService() = delete;
    //! Copying is disabled
    /*!
     * The service holds threads, a mutex and a condition variable, none of
     * which are copyable, so the intent is stated explicitly here.
     */
    AbstractTcpService(const AbstractTcpService&) = delete;
    AbstractTcpService& operator=(const AbstractTcpService&) = delete;
    //! Constructs TCP-service
    /*!
     * \param workersAmount Amount of workers in thread-pool
     * \param port Port to bind to
     * \param host Hostname/IP-address to listen
     * \param backlog Listen backlog - see listen(2)
     */
    AbstractTcpService(std::size_t workersAmount, int port, const std::string& host = IpEndpoint::WildcardAddress,
                       int backlog = DefaultBacklog);
    //! Destructs stopped TCP-service
    /*!
     * \note Destruction of non-stopped TCP-service causes undefined behaviour
     */
    virtual ~AbstractTcpService();

    //! Returns TRUE if service is in running state
    bool isRunning() const
    {
        return _isRunning.load();
    }
    //! Fills 'struct timespec' value using clock timeout
    /*!
     * \param ts Value to fill: ClockTimeoutMs is split into whole seconds
     *           (tv_sec) and the remaining milliseconds expressed in
     *           nanoseconds (tv_nsec)
     */
    static void fillDuration(struct timespec& ts)
    {
        ts.tv_sec = ClockTimeoutMs / 1000;
        // The remainder is below 1000 ms, so the product stays below 10^9.
        ts.tv_nsec = ClockTimeoutMs % 1000 * 1000000;
    }

    //! Starts TCP-service
    virtual void start();
    //! Stops TCP-service
    virtual void stop();
protected:
    //! Abstract TCP-service task
    class AbstractTask
    {
    public:
        //! Constructs TCP-service task
        /*!
         * \param socket Socket for I/O
         */
        explicit AbstractTask(int socket);
        AbstractTask() = delete;
        //! Copying is disabled
        /*!
         * The destructor closes the I/O socket, so a copy would close the
         * same descriptor a second time.
         */
        AbstractTask(const AbstractTask&) = delete;
        AbstractTask& operator=(const AbstractTask&) = delete;
        //! Destructs TCP-service task and closes I/O socket
        virtual ~AbstractTask();

        //! Returns a socket for I/O
        int socket() const
        {
            return _socket;
        }

        //! Task execution pure virtual method to override
        virtual void execute() = 0;
    private:
        int _socket;    //!< Socket for I/O passed to the constructor
    };

    //! Task creation pure virtual method to override
    /*!
     * \param socket Socket for I/O to pass to the created task
     * \return Pointer to the created task
     * \note NOTE(review): ownership of the returned pointer is presumably
     *       taken by the service - confirm against the implementation.
     */
    virtual AbstractTask* createTask(int socket) = 0;
private:
    using ThreadPool = std::vector<std::thread>;

    //! Timeout in milliseconds used by fillDuration()
    static constexpr int ClockTimeoutMs = 100;
    // NOTE(review): the constants below are not referenced in this header;
    // presumably they are used by the implementation - confirm.
    static constexpr std::size_t ReadBufferSize = 1024;
    static constexpr std::size_t WriteBufferSize = 4096;
    static constexpr std::size_t HeaderPartSize = 1024;
    static constexpr int MaxTasksQueueSize = 128;

    // NOTE(review): presumably the routines executed by the thread-pool
    // workers and by the listener thread respectively - confirm.
    void runWorker();
    void runListener();

    const int _port;
    const std::string _host;
    const int _backlog;
    std::atomic_bool _isRunning;                //!< Value reported by isRunning()
    ThreadPool _threadPool;
    std::thread _listenerThread;
    int _serverSocket;
    std::queue<AbstractTask*> _tasksQueue;      //!< Queue of task pointers
    // NOTE(review): presumably guard and signal _tasksQueue - confirm.
    std::mutex _tasksQueueMutex;
    std::condition_variable _tasksQueueCond;
};
//------------------------------------------------------------------------------
#endif//ABSTRACT_TCP_SERVICE_H
//------------------------------------------------------------------------------