TCPServerIOCP.h 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. #pragma once
  2. #include <map>
  3. #include <list>
  4. #include "ITCPServer.h"
  5. #include <list>
  6. using namespace std;
  7. //#define TCPIOCP_MANAGER _T("TcpServer监视")
  8. #define UPDATE_ONE_DATA WM_USER + 1001
  9. //#define ADD_TCPIOCP WM_USER + 1002
  10. //#define ADD_CONNECT_TCPTOCP WM_USER + 1003
  11. //#define UPDATE_ONE_DATA_JDSP_RECV WM_USER + 1004
  12. //#define UPDATE_ONE_DATA_JDSP_SEND WM_USER + 1005
  13. class CIOCP
  14. {
  15. public:
  16. //CIOCP构造函数
  17. CIOCP(int nMaxConcurrent = -1)
  18. {
  19. m_hIOCP = NULL;
  20. if (nMaxConcurrent != -1)
  21. {
  22. Create(nMaxConcurrent);
  23. }
  24. }
  25. //CIOCP析构函数
  26. ~CIOCP()
  27. {
  28. if (m_hIOCP != NULL)
  29. {
  30. CloseHandle(m_hIOCP);
  31. }
  32. }
  33. //关闭IOCP
  34. BOOL Close()
  35. {
  36. BOOL bResult = CloseHandle(m_hIOCP);
  37. m_hIOCP = NULL;
  38. return bResult;
  39. }
  40. //创建IOCP, nMaxConcurrency指定最大线程并发数量, 0 默认为CPU核数
  41. bool Create(int nMaxConcurrency = 0)
  42. {
  43. m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);
  44. //ASSERT(m_hIOCP != NULL);
  45. return (m_hIOCP != NULL);
  46. }
  47. //为设备(文件,socket,邮件槽,管道等)关联一个IOCP
  48. bool AssociateDevice(HANDLE hDevice, ULONG_PTR Compkey)
  49. {
  50. bool fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, Compkey, 0) == m_hIOCP);
  51. return fOk;
  52. }
  53. //为Socket关联一个IOCP
  54. bool AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey)
  55. {
  56. return AssociateDevice((HANDLE)hSocket, CompKey);
  57. }
  58. //为IOCP传递事件通知
  59. bool PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL)
  60. {
  61. bool fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);
  62. return fOk;
  63. }
  64. //从IO完成端口队列中获取事件通知, IO完成队列中没有事件时,该函数阻塞
  65. bool GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMillseconds = INFINITE)
  66. {
  67. return GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes, pCompKey, ppo, dwMillseconds);
  68. }
  69. //获取IOCP对象
  70. const HANDLE GetIOCP()
  71. {
  72. return m_hIOCP;
  73. }
  74. private:
  75. HANDLE m_hIOCP= INVALID_HANDLE_VALUE; //IOCP句柄
  76. };
  77. /*
  78. 在IOCP编程模型中,需要用到GetQueuedCompletionStatus()函数获取已完成的事件
  79. 但是该函数的返回参数无 Socket或者buffer的描述信息
  80. 一个简单的方法是:创建一个新的结构,该结构第一个参数是OVERLAPPED
  81. 由于AcceptEx, WSASend 等重叠IO操作传入的是Overlapped结构体的地址,调用AcceptEx等重叠IO操作,在Overlapped结构体后面开辟新的空间
  82. 写入socket或者buffer的信息,即可将socket或者buffer的信息由GetQueuedComletionStatus带回
  83. */
  84. #define MAXBUF 8*1024
  85. enum IOOperType
  86. {
  87. TYPE_ACP, //accept事件到达,有新的连接请求
  88. TYPE_RECV, //数据接收事件
  89. TYPE_SEND, //数据发送事件
  90. TYPE_CLOSE_SOCK, //服务端主动关闭socket
  91. TYPE_CLOSE, //关闭事件
  92. TYPE_NO_OPER
  93. };
  94. class COverlappedIOInfo : public OVERLAPPED
  95. {
  96. public:
  97. COverlappedIOInfo(void)
  98. {
  99. m_socket = INVALID_SOCKET;
  100. ResetOverlapped();
  101. ResetRecvBuffer();
  102. ResetSendBuffer();
  103. }
  104. ~COverlappedIOInfo(void)
  105. {
  106. if (m_socket != INVALID_SOCKET)
  107. {
  108. closesocket(m_socket);
  109. m_socket = INVALID_SOCKET;
  110. }
  111. }
  112. void ResetOverlapped()
  113. {
  114. Internal = InternalHigh = 0;
  115. Offset = OffsetHigh = 0;
  116. hEvent = NULL;
  117. }
  118. void ResetRecvBuffer()
  119. {
  120. ZeroMemory(m_crecvBuf, MAXBUF);
  121. m_recvBuf.buf = m_crecvBuf;
  122. m_recvBuf.len = MAXBUF;
  123. }
  124. void ResetSendBuffer()
  125. {
  126. ZeroMemory(m_csendBuf, MAXBUF);
  127. m_sendBuf.buf = m_csendBuf;
  128. m_sendBuf.len = MAXBUF;
  129. }
  130. public:
  131. SOCKET m_socket; //套接字
  132. //接收缓冲区,用于AcceptEx, WSARecv操作
  133. WSABUF m_recvBuf;
  134. char m_crecvBuf[MAXBUF];
  135. //发送缓冲区, 用于WSASend操作
  136. WSABUF m_sendBuf;
  137. char m_csendBuf[MAXBUF];
  138. //对端地址
  139. BOOL bIPv6addr=FALSE;
  140. sockaddr_in m_addr;
  141. SOCKADDR_IN6 m_addr_IPv6;
  142. ClientInfo m_cltInfo;
  143. };
  144. //目的:防止断开连接后,其他地方还保留有连接信息。保留15分钟
  145. struct CConnectCache
  146. {
  147. std::shared_ptr<COverlappedIOInfo> pConnInfo; //连接信息
  148. DWORD dwDisConnect; //断开时间
  149. };
  150. #define HISTORY_CONN_STATIC_COUNT 500
  151. //#define IN_WINDOWS
  152. typedef enum connDataWay {
  153. CONN_DATA_SEND = 0,
  154. CONN_DATA_RECV
  155. }ConnDataWay;
  156. typedef struct connDataNode {
  157. SYSTEMTIME time;
  158. ConnDataWay way; // 0 send. 1 recv.
  159. char* data;
  160. int iDataLen;
  161. connDataNode() {
  162. memset(&time, 0, sizeof(SYSTEMTIME));
  163. data = NULL;
  164. iDataLen = 0;
  165. }
  166. ~connDataNode() {
  167. if (data) delete[]data;
  168. }
  169. }ConnDataNode;
  170. typedef struct connHistoryData {
  171. ConnDataNode dataBuffer[HISTORY_CONN_STATIC_COUNT];
  172. HANDLE m_mutex;
  173. int startIndex;
  174. int iLastIndex;
  175. int endIndex;
  176. bool bIsFull;
  177. connHistoryData() {
  178. startIndex = endIndex = iLastIndex = 0;
  179. bIsFull = false;
  180. m_mutex = CreateMutex(NULL, FALSE, NULL);
  181. }
  182. ~connHistoryData() {
  183. if (m_mutex) {
  184. CloseHandle(m_mutex);
  185. m_mutex = NULL;
  186. }
  187. }
  188. bool isFull() {
  189. return bIsFull;
  190. }
  191. bool Lock() {
  192. if (m_mutex) {
  193. WaitForSingleObject(m_mutex, INFINITE);
  194. return true;
  195. }
  196. return false;
  197. }
  198. void Unlock() {
  199. if (m_mutex) ReleaseMutex(m_mutex);
  200. }
  201. void AddData(char* buf, int iLen, ConnDataWay way) {
  202. if (!Lock()) return;
  203. ::GetLocalTime(&dataBuffer[endIndex].time);
  204. dataBuffer[endIndex].way = way;
  205. if (dataBuffer[endIndex].data) delete[]dataBuffer[endIndex].data;
  206. dataBuffer[endIndex].data = new char[iLen];
  207. if (!dataBuffer[endIndex].data) return;
  208. memcpy(dataBuffer[endIndex].data, buf, iLen);
  209. dataBuffer[endIndex].iDataLen = iLen;
  210. iLastIndex = endIndex;
  211. endIndex = (endIndex + 1) % HISTORY_CONN_STATIC_COUNT;
  212. if (bIsFull) startIndex = endIndex;
  213. else if (endIndex == startIndex) bIsFull = true;
  214. Unlock();
  215. }
  216. }ConnHistoryData;
  217. class TCPServerIOCP : public ITCPServer {
  218. public:
  219. bool StartServer(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));
  220. bool StartServer_IPv6(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));
  221. void StopServer();
  222. BOOL SendData(const char* pData, int iLen, ClientInfo* pCltInfo);
  223. BOOL SendData(const char* pData, int iLen, CString strIP);
  224. BOOL SendData(const char* pData, int iLen, CString strIP, int port, SOCKET& sock);
  225. BOOL SendData(const char* pData, int iLen, SOCKET& sock);
  226. BOOL SendData(const char* pData, int iLen);
  227. ITcpServerCallBack* m_pCallBackUser = nullptr;
  228. ITcpServerCallBack* m_pCallBackUser_IPv6 = nullptr;
  229. BOOL m_bStarted;
  230. BOOL m_bStarted_IPv6;
  231. void Log(char* sz);
  232. void (*pLog)(char*);//日志接口
  233. std::function<void(int, CString, int, const void*, int, int, int)> m_callBackTcpLog;
  234. //获取IP列表
  235. virtual void GetIPList(std::vector<CString> &vec);
  236. virtual void GetConnectInfo(std::vector<ClientInfo *> &vec);
  237. virtual void GetHistoryConnectInfo(std::map<CString, ConnHistoryInfo *> &mapHis)
  238. {
  239. mapHis.insert(m_mapConnHistory.begin(), m_mapConnHistory.end());
  240. }
  241. virtual ConnHistoryInfo *GetHistoryConnectInfo(const CString &strIP)
  242. {
  243. auto pFind = m_mapConnHistory.find(strIP);
  244. if (pFind != m_mapConnHistory.end()) {
  245. return pFind->second;
  246. }
  247. return NULL;
  248. }
  249. private:
  250. //新连接绑定历史记录
  251. void BindHistoryData(COverlappedIOInfo* info, bool bRecv = false);
  252. public:
  253. //构造函数
  254. TCPServerIOCP();
  255. //虚构函数
  256. ~TCPServerIOCP();
  257. //开始监听
  258. bool StartListen(unsigned short port, string ip);
  259. bool StartListen_IPv6(unsigned short port, string ip);
  260. int CalcTimePassSecond(SYSTEMTIME* stLast, SYSTEMTIME* stNow = NULL);
  261. void ConnectionMaintain();
  262. /*
  263. 释放3个部分步骤:
  264. 1. 清空IOCP线程队列,退出线程
  265. 2. 清空等待accept套接字m_vecAcps
  266. 3. 清空已连接套接字m_vecContInfo并清空缓存
  267. */
  268. void CloseServer();
  269. //私有成员函数
  270. //启动CPU*2个线程,返回已启动的线程个数
  271. int StartWorkThreadPool();
  272. int StartWorkThreadPool_IPv6();
  273. //处理accept请求,NumberOfBytes=0表示没有收到第一帧数据, >0时表示收到了第一帧数据
  274. bool DoAccept(SOCKET sockAccept, SOCKADDR_IN* ClientAddr);
  275. bool DoAccept_IPv6(SOCKET sockAccept, SOCKADDR_IN6* ClientAddr);
  276. //投递recv请求
  277. bool PostRecv(COverlappedIOInfo* info);
  278. //处理recv请求
  279. bool DoRecv(COverlappedIOInfo* info);
  280. //从已连接的socket列表中移除socket及释放空间
  281. bool DeleteLink(SOCKET s, char* strError);
  282. BOOL IsIPOnline(CString strIP);
  283. int StaticConnData(CString strIp, const char* data, int iLen, ConnDataWay way);
  284. virtual void ClearConnHistoryInfo();
  285. //winsock版本类型
  286. WSAData m_wsaData;
  287. //临时加一个黑名单地址 scz
  288. ULONG m_blackaddr = 0;
  289. //端口监听套接字
  290. SOCKET m_sListen;
  291. SOCKET m_sListen_IPv6;
  292. //已建立连接的信息,每个结构含有一个套接字、发送缓冲区、接收缓冲区、对端地址
  293. vector<COverlappedIOInfo*> m_vecContInfo;
  294. //缓冲连接信息
  295. std::list<CConnectCache> m_listContInfo;
  296. CCriticalSection m_csCacheListLock;
  297. //操作vector的互斥访问锁
  298. CCriticalSection m_csClientVectorLock;
  299. //CIOCP封装类
  300. CIOCP m_iocp;
  301. CIOCP m_iocp_IPv6;
  302. //连接历史信息和状态信息
  303. std::map<CString, ConnHistoryInfo*> m_mapConnHistory;//暂时弃用,后续启用时记得加锁防止多线程并发访问。多线程操作会出现map有两个相同的key
  304. static void ListenThread_IOCPServer(LPVOID lpParam);
  305. static void ListenThread_IOCPServer_IPv6(LPVOID lpParam);
  306. CString m_strServerIP;
  307. int m_iServerPort;
  308. DWORD m_lastError;
  309. CString m_strServerIP_IPv6;
  310. int m_iServerPort_IPv6;
  311. };