#pragma once #include #include #include "ITCPServer.h" #include using namespace std; //#define TCPIOCP_MANAGER _T("TcpServer监视") #define UPDATE_ONE_DATA WM_USER + 1001 //#define ADD_TCPIOCP WM_USER + 1002 //#define ADD_CONNECT_TCPTOCP WM_USER + 1003 //#define UPDATE_ONE_DATA_JDSP_RECV WM_USER + 1004 //#define UPDATE_ONE_DATA_JDSP_SEND WM_USER + 1005 class CIOCP { public: //CIOCP构造函数 CIOCP(int nMaxConcurrent = -1) { m_hIOCP = NULL; if (nMaxConcurrent != -1) { Create(nMaxConcurrent); } } //CIOCP析构函数 ~CIOCP() { if (m_hIOCP != NULL) { CloseHandle(m_hIOCP); } } //关闭IOCP BOOL Close() { BOOL bResult = CloseHandle(m_hIOCP); m_hIOCP = NULL; return bResult; } //创建IOCP, nMaxConcurrency指定最大线程并发数量, 0 默认为CPU核数 bool Create(int nMaxConcurrency = 0) { m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency); //ASSERT(m_hIOCP != NULL); return (m_hIOCP != NULL); } //为设备(文件,socket,邮件槽,管道等)关联一个IOCP bool AssociateDevice(HANDLE hDevice, ULONG_PTR Compkey) { bool fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, Compkey, 0) == m_hIOCP); return fOk; } //为Socket关联一个IOCP bool AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey) { return AssociateDevice((HANDLE)hSocket, CompKey); } //为IOCP传递事件通知 bool PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL) { bool fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po); return fOk; } //从IO完成端口队列中获取事件通知, IO完成队列中没有事件时,该函数阻塞 bool GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMillseconds = INFINITE) { return GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes, pCompKey, ppo, dwMillseconds); } //获取IOCP对象 const HANDLE GetIOCP() { return m_hIOCP; } private: HANDLE m_hIOCP= INVALID_HANDLE_VALUE; //IOCP句柄 }; /* 在IOCP编程模型中,需要用到GetQueuedCompletionStatus()函数获取已完成的事件 但是该函数的返回参数无 Socket或者buffer的描述信息 一个简单的方法是:创建一个新的结构,该结构第一个参数是OVERLAPPED 由于AcceptEx, WSASend 等重叠IO操作传入的是Overlapped结构体的地址,调用AcceptEx等重叠IO操作,在Overlapped结构体后面开辟新的空间 写入socket或者buffer的信息,即可将socket或者buffer的信息由GetQueuedComletionStatus带回 */ #define MAXBUF 8*1024 enum IOOperType { TYPE_ACP, //accept事件到达,有新的连接请求 TYPE_RECV, //数据接收事件 TYPE_SEND, //数据发送事件 TYPE_CLOSE_SOCK, //服务端主动关闭socket TYPE_CLOSE, //关闭事件 TYPE_NO_OPER }; class COverlappedIOInfo : public OVERLAPPED { public: COverlappedIOInfo(void) { m_socket = INVALID_SOCKET; ResetOverlapped(); ResetRecvBuffer(); ResetSendBuffer(); } ~COverlappedIOInfo(void) { if (m_socket != INVALID_SOCKET) { closesocket(m_socket); m_socket = INVALID_SOCKET; } } void ResetOverlapped() { Internal = InternalHigh = 0; Offset = OffsetHigh = 0; hEvent = NULL; } void ResetRecvBuffer() { ZeroMemory(m_crecvBuf, MAXBUF); m_recvBuf.buf = m_crecvBuf; m_recvBuf.len = MAXBUF; } void ResetSendBuffer() { ZeroMemory(m_csendBuf, MAXBUF); m_sendBuf.buf = m_csendBuf; m_sendBuf.len = MAXBUF; } public: SOCKET m_socket; //套接字 //接收缓冲区,用于AcceptEx, WSARecv操作 WSABUF m_recvBuf; char m_crecvBuf[MAXBUF]; //发送缓冲区, 用于WSASend操作 WSABUF m_sendBuf; char m_csendBuf[MAXBUF]; //对端地址 BOOL bIPv6addr=FALSE; sockaddr_in m_addr; SOCKADDR_IN6 m_addr_IPv6; ClientInfo m_cltInfo; }; //目的:防止断开连接后,其他地方还保留有连接信息。保留15分钟 struct CConnectCache { std::shared_ptr pConnInfo; //连接信息 DWORD dwDisConnect; //断开时间 }; #define HISTORY_CONN_STATIC_COUNT 500 //#define IN_WINDOWS typedef enum connDataWay { CONN_DATA_SEND = 0, CONN_DATA_RECV }ConnDataWay; typedef struct connDataNode { SYSTEMTIME time; ConnDataWay way; // 0 send. 1 recv. char* data; int iDataLen; connDataNode() { memset(&time, 0, sizeof(SYSTEMTIME)); data = NULL; iDataLen = 0; } ~connDataNode() { if (data) delete[]data; } }ConnDataNode; typedef struct connHistoryData { ConnDataNode dataBuffer[HISTORY_CONN_STATIC_COUNT]; HANDLE m_mutex; int startIndex; int iLastIndex; int endIndex; bool bIsFull; connHistoryData() { startIndex = endIndex = iLastIndex = 0; bIsFull = false; m_mutex = CreateMutex(NULL, FALSE, NULL); } ~connHistoryData() { if (m_mutex) { CloseHandle(m_mutex); m_mutex = NULL; } } bool isFull() { return bIsFull; } bool Lock() { if (m_mutex) { WaitForSingleObject(m_mutex, INFINITE); return true; } return false; } void Unlock() { if (m_mutex) ReleaseMutex(m_mutex); } void AddData(char* buf, int iLen, ConnDataWay way) { if (!Lock()) return; ::GetLocalTime(&dataBuffer[endIndex].time); dataBuffer[endIndex].way = way; if (dataBuffer[endIndex].data) delete[]dataBuffer[endIndex].data; dataBuffer[endIndex].data = new char[iLen]; if (!dataBuffer[endIndex].data) return; memcpy(dataBuffer[endIndex].data, buf, iLen); dataBuffer[endIndex].iDataLen = iLen; iLastIndex = endIndex; endIndex = (endIndex + 1) % HISTORY_CONN_STATIC_COUNT; if (bIsFull) startIndex = endIndex; else if (endIndex == startIndex) bIsFull = true; Unlock(); } }ConnHistoryData; class TCPServerIOCP : public ITCPServer { public: bool StartServer(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T("")); bool StartServer_IPv6(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T("")); void StopServer(); BOOL SendData(const char* pData, int iLen, ClientInfo* pCltInfo); BOOL SendData(const char* pData, int iLen, CString strIP); BOOL SendData(const char* pData, int iLen, CString strIP, int port, SOCKET& sock); BOOL SendData(const char* pData, int iLen, SOCKET& sock); BOOL SendData(const char* pData, int iLen); ITcpServerCallBack* m_pCallBackUser = nullptr; ITcpServerCallBack* m_pCallBackUser_IPv6 = nullptr; BOOL m_bStarted; BOOL m_bStarted_IPv6; void Log(char* sz); void (*pLog)(char*);//日志接口 std::function m_callBackTcpLog; //获取IP列表 virtual void GetIPList(std::vector &vec); virtual void GetConnectInfo(std::vector &vec); virtual void GetHistoryConnectInfo(std::map &mapHis) { mapHis.insert(m_mapConnHistory.begin(), m_mapConnHistory.end()); } virtual ConnHistoryInfo *GetHistoryConnectInfo(const CString &strIP) { auto pFind = m_mapConnHistory.find(strIP); if (pFind != m_mapConnHistory.end()) { return pFind->second; } return NULL; } private: //新连接绑定历史记录 void BindHistoryData(COverlappedIOInfo* info, bool bRecv = false); public: //构造函数 TCPServerIOCP(); //虚构函数 ~TCPServerIOCP(); //开始监听 bool StartListen(unsigned short port, string ip); bool StartListen_IPv6(unsigned short port, string ip); int CalcTimePassSecond(SYSTEMTIME* stLast, SYSTEMTIME* stNow = NULL); void ConnectionMaintain(); /* 释放3个部分步骤: 1. 清空IOCP线程队列,退出线程 2. 清空等待accept套接字m_vecAcps 3. 清空已连接套接字m_vecContInfo并清空缓存 */ void CloseServer(); //私有成员函数 //启动CPU*2个线程,返回已启动的线程个数 int StartWorkThreadPool(); int StartWorkThreadPool_IPv6(); //处理accept请求,NumberOfBytes=0表示没有收到第一帧数据, >0时表示收到了第一帧数据 bool DoAccept(SOCKET sockAccept, SOCKADDR_IN* ClientAddr); bool DoAccept_IPv6(SOCKET sockAccept, SOCKADDR_IN6* ClientAddr); //投递recv请求 bool PostRecv(COverlappedIOInfo* info); //处理recv请求 bool DoRecv(COverlappedIOInfo* info); //从已连接的socket列表中移除socket及释放空间 bool DeleteLink(SOCKET s, char* strError); BOOL IsIPOnline(CString strIP); int StaticConnData(CString strIp, const char* data, int iLen, ConnDataWay way); virtual void ClearConnHistoryInfo(); //winsock版本类型 WSAData m_wsaData; //临时加一个黑名单地址 scz ULONG m_blackaddr = 0; //端口监听套接字 SOCKET m_sListen; SOCKET m_sListen_IPv6; //已建立连接的信息,每个结构含有一个套接字、发送缓冲区、接收缓冲区、对端地址 vector m_vecContInfo; //缓冲连接信息 std::list m_listContInfo; CCriticalSection m_csCacheListLock; //操作vector的互斥访问锁 CCriticalSection m_csClientVectorLock; //CIOCP封装类 CIOCP m_iocp; CIOCP m_iocp_IPv6; //连接历史信息和状态信息 std::map m_mapConnHistory;//暂时弃用,后续启用时记得加锁防止多线程并发访问。多线程操作会出现map有两个相同的key static void ListenThread_IOCPServer(LPVOID lpParam); static void ListenThread_IOCPServer_IPv6(LPVOID lpParam); CString m_strServerIP; int m_iServerPort; DWORD m_lastError; CString m_strServerIP_IPv6; int m_iServerPort_IPv6; };