| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- #pragma once
- #include <map>
- #include <list>
- #include "ITCPServer.h"
- #include <list>
- using namespace std;
- //#define TCPIOCP_MANAGER _T("TcpServer监视")
// Custom message id, expressed as an offset from WM_USER.
// Parenthesized so the macro behaves as a single value inside larger expressions
// (e.g. `UPDATE_ONE_DATA * 2` or `msg - UPDATE_ONE_DATA`).
#define UPDATE_ONE_DATA (WM_USER + 1001)
- //#define ADD_TCPIOCP WM_USER + 1002
- //#define ADD_CONNECT_TCPTOCP WM_USER + 1003
- //#define UPDATE_ONE_DATA_JDSP_RECV WM_USER + 1004
- //#define UPDATE_ONE_DATA_JDSP_SEND WM_USER + 1005
- class CIOCP
- {
- public:
- //CIOCP构造函数
- CIOCP(int nMaxConcurrent = -1)
- {
- m_hIOCP = NULL;
- if (nMaxConcurrent != -1)
- {
- Create(nMaxConcurrent);
- }
- }
- //CIOCP析构函数
- ~CIOCP()
- {
- if (m_hIOCP != NULL)
- {
- CloseHandle(m_hIOCP);
- }
- }
- //关闭IOCP
- BOOL Close()
- {
- BOOL bResult = CloseHandle(m_hIOCP);
- m_hIOCP = NULL;
- return bResult;
- }
- //创建IOCP, nMaxConcurrency指定最大线程并发数量, 0 默认为CPU核数
- bool Create(int nMaxConcurrency = 0)
- {
- m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);
- //ASSERT(m_hIOCP != NULL);
- return (m_hIOCP != NULL);
- }
- //为设备(文件,socket,邮件槽,管道等)关联一个IOCP
- bool AssociateDevice(HANDLE hDevice, ULONG_PTR Compkey)
- {
- bool fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, Compkey, 0) == m_hIOCP);
- return fOk;
- }
- //为Socket关联一个IOCP
- bool AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey)
- {
- return AssociateDevice((HANDLE)hSocket, CompKey);
- }
- //为IOCP传递事件通知
- bool PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL)
- {
- bool fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);
- return fOk;
- }
- //从IO完成端口队列中获取事件通知, IO完成队列中没有事件时,该函数阻塞
- bool GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMillseconds = INFINITE)
- {
- return GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes, pCompKey, ppo, dwMillseconds);
- }
- //获取IOCP对象
- const HANDLE GetIOCP()
- {
- return m_hIOCP;
- }
- private:
- HANDLE m_hIOCP= INVALID_HANDLE_VALUE; //IOCP句柄
- };
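/*
In the IOCP programming model, completed events are retrieved with GetQueuedCompletionStatus().
However, the values that function returns carry no description of the socket or buffer involved.
A simple solution is to define a new structure whose first member is an OVERLAPPED.
Overlapped I/O calls such as AcceptEx and WSASend take the address of an OVERLAPPED structure, so extra
space placed directly after the OVERLAPPED can hold the socket or buffer information, and
GetQueuedCompletionStatus hands that information back together with the OVERLAPPED pointer.
*/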
// Size in bytes (8 KiB) used for the fixed send/receive buffers in this file.
// Parenthesized so expressions such as `x % MAXBUF` or `n / MAXBUF` group correctly.
#define MAXBUF (8 * 1024)
// I/O operation / event kinds; see the note on each value.
enum IOOperType
{
    TYPE_ACP        = 0, // accept event arrived: there is a new connection request
    TYPE_RECV       = 1, // data-receive event
    TYPE_SEND       = 2, // data-send event
    TYPE_CLOSE_SOCK = 3, // the server side actively closes the socket
    TYPE_CLOSE      = 4, // close event
    TYPE_NO_OPER    = 5  // presumably "no operation" (undocumented in the original)
};
- class COverlappedIOInfo : public OVERLAPPED
- {
- public:
- COverlappedIOInfo(void)
- {
- m_socket = INVALID_SOCKET;
- ResetOverlapped();
- ResetRecvBuffer();
- ResetSendBuffer();
- }
- ~COverlappedIOInfo(void)
- {
- if (m_socket != INVALID_SOCKET)
- {
- closesocket(m_socket);
- m_socket = INVALID_SOCKET;
- }
- }
- void ResetOverlapped()
- {
- Internal = InternalHigh = 0;
- Offset = OffsetHigh = 0;
- hEvent = NULL;
- }
- void ResetRecvBuffer()
- {
- ZeroMemory(m_crecvBuf, MAXBUF);
- m_recvBuf.buf = m_crecvBuf;
- m_recvBuf.len = MAXBUF;
- }
- void ResetSendBuffer()
- {
- ZeroMemory(m_csendBuf, MAXBUF);
- m_sendBuf.buf = m_csendBuf;
- m_sendBuf.len = MAXBUF;
- }
- public:
- SOCKET m_socket; //套接字
- //接收缓冲区,用于AcceptEx, WSARecv操作
- WSABUF m_recvBuf;
- char m_crecvBuf[MAXBUF];
- //发送缓冲区, 用于WSASend操作
- WSABUF m_sendBuf;
- char m_csendBuf[MAXBUF];
- //对端地址
- BOOL bIPv6addr=FALSE;
- sockaddr_in m_addr;
- SOCKADDR_IN6 m_addr_IPv6;
- ClientInfo m_cltInfo;
- };
- //目的:防止断开连接后,其他地方还保留有连接信息。保留15分钟
- struct CConnectCache
- {
- std::shared_ptr<COverlappedIOInfo> pConnInfo; //连接信息
- DWORD dwDisConnect; //断开时间
- };
- #define HISTORY_CONN_STATIC_COUNT 500
- //#define IN_WINDOWS
// Direction tag for a recorded piece of connection data.
enum connDataWay {
    CONN_DATA_SEND = 0, // send
    CONN_DATA_RECV = 1  // recv
};
typedef enum connDataWay ConnDataWay;
- typedef struct connDataNode {
- SYSTEMTIME time;
- ConnDataWay way; // 0 send. 1 recv.
- char* data;
- int iDataLen;
- connDataNode() {
- memset(&time, 0, sizeof(SYSTEMTIME));
- data = NULL;
- iDataLen = 0;
- }
- ~connDataNode() {
- if (data) delete[]data;
- }
- }ConnDataNode;
- typedef struct connHistoryData {
- ConnDataNode dataBuffer[HISTORY_CONN_STATIC_COUNT];
- HANDLE m_mutex;
- int startIndex;
- int iLastIndex;
- int endIndex;
- bool bIsFull;
- connHistoryData() {
- startIndex = endIndex = iLastIndex = 0;
- bIsFull = false;
- m_mutex = CreateMutex(NULL, FALSE, NULL);
- }
- ~connHistoryData() {
- if (m_mutex) {
- CloseHandle(m_mutex);
- m_mutex = NULL;
- }
- }
- bool isFull() {
- return bIsFull;
- }
- bool Lock() {
- if (m_mutex) {
- WaitForSingleObject(m_mutex, INFINITE);
- return true;
- }
- return false;
- }
- void Unlock() {
- if (m_mutex) ReleaseMutex(m_mutex);
- }
- void AddData(char* buf, int iLen, ConnDataWay way) {
- if (!Lock()) return;
- ::GetLocalTime(&dataBuffer[endIndex].time);
- dataBuffer[endIndex].way = way;
- if (dataBuffer[endIndex].data) delete[]dataBuffer[endIndex].data;
- dataBuffer[endIndex].data = new char[iLen];
- if (!dataBuffer[endIndex].data) return;
- memcpy(dataBuffer[endIndex].data, buf, iLen);
- dataBuffer[endIndex].iDataLen = iLen;
- iLastIndex = endIndex;
- endIndex = (endIndex + 1) % HISTORY_CONN_STATIC_COUNT;
- if (bIsFull) startIndex = endIndex;
- else if (endIndex == startIndex) bIsFull = true;
- Unlock();
- }
- }ConnHistoryData;
- class TCPServerIOCP : public ITCPServer {
- public:
- bool StartServer(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));
- bool StartServer_IPv6(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));
- void StopServer();
- BOOL SendData(const char* pData, int iLen, ClientInfo* pCltInfo);
- BOOL SendData(const char* pData, int iLen, CString strIP);
- BOOL SendData(const char* pData, int iLen, CString strIP, int port, SOCKET& sock);
- BOOL SendData(const char* pData, int iLen, SOCKET& sock);
- BOOL SendData(const char* pData, int iLen);
- ITcpServerCallBack* m_pCallBackUser = nullptr;
- ITcpServerCallBack* m_pCallBackUser_IPv6 = nullptr;
- BOOL m_bStarted;
- BOOL m_bStarted_IPv6;
- void Log(char* sz);
- void (*pLog)(char*);//日志接口
- std::function<void(int, CString, int, const void*, int, int, int)> m_callBackTcpLog;
- //获取IP列表
- virtual void GetIPList(std::vector<CString> &vec);
- virtual void GetConnectInfo(std::vector<ClientInfo *> &vec);
- virtual void GetHistoryConnectInfo(std::map<CString, ConnHistoryInfo *> &mapHis)
- {
- mapHis.insert(m_mapConnHistory.begin(), m_mapConnHistory.end());
- }
- virtual ConnHistoryInfo *GetHistoryConnectInfo(const CString &strIP)
- {
- auto pFind = m_mapConnHistory.find(strIP);
- if (pFind != m_mapConnHistory.end()) {
- return pFind->second;
- }
- return NULL;
- }
- private:
- //新连接绑定历史记录
- void BindHistoryData(COverlappedIOInfo* info, bool bRecv = false);
- public:
- //构造函数
- TCPServerIOCP();
- //虚构函数
- ~TCPServerIOCP();
- //开始监听
- bool StartListen(unsigned short port, string ip);
- bool StartListen_IPv6(unsigned short port, string ip);
- int CalcTimePassSecond(SYSTEMTIME* stLast, SYSTEMTIME* stNow = NULL);
- void ConnectionMaintain();
- /*
- 释放3个部分步骤:
- 1. 清空IOCP线程队列,退出线程
- 2. 清空等待accept套接字m_vecAcps
- 3. 清空已连接套接字m_vecContInfo并清空缓存
- */
- void CloseServer();
- //私有成员函数
- //启动CPU*2个线程,返回已启动的线程个数
- int StartWorkThreadPool();
- int StartWorkThreadPool_IPv6();
- //处理accept请求,NumberOfBytes=0表示没有收到第一帧数据, >0时表示收到了第一帧数据
- bool DoAccept(SOCKET sockAccept, SOCKADDR_IN* ClientAddr);
- bool DoAccept_IPv6(SOCKET sockAccept, SOCKADDR_IN6* ClientAddr);
- //投递recv请求
- bool PostRecv(COverlappedIOInfo* info);
- //处理recv请求
- bool DoRecv(COverlappedIOInfo* info);
- //从已连接的socket列表中移除socket及释放空间
- bool DeleteLink(SOCKET s, char* strError);
- BOOL IsIPOnline(CString strIP);
- int StaticConnData(CString strIp, const char* data, int iLen, ConnDataWay way);
- virtual void ClearConnHistoryInfo();
- //winsock版本类型
- WSAData m_wsaData;
- //临时加一个黑名单地址 scz
- ULONG m_blackaddr = 0;
- //端口监听套接字
- SOCKET m_sListen;
- SOCKET m_sListen_IPv6;
- //已建立连接的信息,每个结构含有一个套接字、发送缓冲区、接收缓冲区、对端地址
- vector<COverlappedIOInfo*> m_vecContInfo;
- //缓冲连接信息
- std::list<CConnectCache> m_listContInfo;
- CCriticalSection m_csCacheListLock;
- //操作vector的互斥访问锁
- CCriticalSection m_csClientVectorLock;
- //CIOCP封装类
- CIOCP m_iocp;
- CIOCP m_iocp_IPv6;
- //连接历史信息和状态信息
- std::map<CString, ConnHistoryInfo*> m_mapConnHistory;//暂时弃用,后续启用时记得加锁防止多线程并发访问。多线程操作会出现map有两个相同的key
- static void ListenThread_IOCPServer(LPVOID lpParam);
- static void ListenThread_IOCPServer_IPv6(LPVOID lpParam);
- CString m_strServerIP;
- int m_iServerPort;
- DWORD m_lastError;
- CString m_strServerIP_IPv6;
- int m_iServerPort_IPv6;
- };
|