#include "stdafx.h"
#include "TCPServerIOCP.h"

// Implementation of TCPServerIOCP: a TCP server (IPv4 and IPv6 listeners) that
// receives through I/O completion ports and sends with blocking send().
//
// NOTE(review): container element types in this file were reconstructed from
// usage (m_vecContInfo holds COverlappedIOInfo*, the disconnect cache holds
// std::shared_ptr<COverlappedIOInfo>) - confirm against TCPServerIOCP.h.

// Legacy printf-style formatter, superseded by fmt::format. Kept for reference.
//namespace iocpSock {
//    std::string format(const char* pszFmt, ...)
//    {
//        std::string str;
//        va_list args;
//        va_start(args, pszFmt);
//        {
//            int nLength = _vscprintf(pszFmt, args);
//            nLength += 1; // room for the terminating '\0'
//            std::vector<char> vectorChars(nLength);
//            _vsnprintf(vectorChars.data(), nLength, pszFmt, args);
//            str.assign(vectorChars.data());
//        }
//        va_end(args);
//        return str;
//    }
//}
//
//std::vector<TCPServerIOCP*> m_vecTCPIOCP;

namespace {

// Copies an address string into a fixed-size char array, truncating if needed
// and always NUL-terminating, so a long (IPv6) text cannot overrun the buffer.
template <size_t N>
void CopyIpText(char (&dst)[N], const CString& src)
{
    const size_t srcLen = static_cast<size_t>(src.GetLength());
    const size_t len = (srcLen < N - 1) ? srcLen : (N - 1);
    memcpy(dst, static_cast<LPCSTR>(src), len);
    dst[len] = '\0';
}

} // namespace

// Accept loop for the IPv4 listener. Runs on its own thread (started by
// StartListen) until m_bStarted is cleared and accept() returns.
void TCPServerIOCP::ListenThread_IOCPServer(LPVOID lpParam)
{
    TCPServerIOCP* pServ = static_cast<TCPServerIOCP*>(lpParam);
    if (pServ == NULL)
        return;

    for (;;)
    {
        SOCKADDR_IN clientAddr;
        memset(&clientAddr, 0, sizeof(clientAddr));
        // accept() treats the length as in/out, so it must be reset every call.
        int addrLen = sizeof(clientAddr);
        SOCKET sClient = accept(pServ->m_sListen, (SOCKADDR*)&clientAddr, &addrLen);

        if (pServ->m_bStarted == FALSE)
        {
            // Server is shutting down: do not leak a connection accepted meanwhile.
            if (sClient != INVALID_SOCKET)
                closesocket(sClient);
            break;
        }

        if (sClient != INVALID_SOCKET)
        {
            pServ->DoAccept(sClient, &clientAddr);
            continue;
        }

        const int iErr = WSAGetLastError();
        std::string str = fmt::format("[TCPServer][error]{} Failed accept, 错误码:{}", static_cast<LPCSTR>(pServ->GetIOCPName()), iErr);
        pServ->Log(const_cast<char*>(str.c_str()));
        Sleep(10); // avoid a tight error loop if accept keeps failing
    }
}

// Accept loop for the IPv6 listener; mirrors ListenThread_IOCPServer.
void TCPServerIOCP::ListenThread_IOCPServer_IPv6(LPVOID lpParam)
{
    TCPServerIOCP* pServ = static_cast<TCPServerIOCP*>(lpParam);
    if (pServ == NULL)
        return;

    for (;;)
    {
        SOCKADDR_IN6 clientAddr;
        memset(&clientAddr, 0, sizeof(clientAddr));
        int addrLen = sizeof(clientAddr);
        SOCKET sClient = accept(pServ->m_sListen_IPv6, (SOCKADDR*)&clientAddr, &addrLen);

        if (pServ->m_bStarted_IPv6 == FALSE)
        {
            if (sClient != INVALID_SOCKET)
                closesocket(sClient);
            break;
        }

        if (sClient != INVALID_SOCKET)
        {
            pServ->DoAccept_IPv6(sClient, &clientAddr);
            continue;
        }

        const int iErr = WSAGetLastError();
        std::string str = fmt::format("[TCPServer][error]{} Failed accept, 错误码:{}", static_cast<LPCSTR>(pServ->GetIOCPName()), iErr);
        pServ->Log(const_cast<char*>(str.c_str()));
        Sleep(10);
    }
}

// Thread entry point that runs the idle-connection sweeper of one server.
DWORD WINAPI ConnectionMaintainThread(LPVOID lpParam)
{
    TCPServerIOCP* pServ = static_cast<TCPServerIOCP*>(lpParam);
    if (pServ != NULL)
        pServ->ConnectionMaintain();
    return 0;
}

// Starts the IPv4 server.
//   pUser      - receiver of connection/data callbacks.
//   iPort      - TCP port to listen on.
//   strLocalIP - local address to bind; empty binds every interface.
// Returns true when the listener is up.
bool TCPServerIOCP::StartServer(ITcpServerCallBack* pUser, int iPort, CString strLocalIP /*= ""*/)
{
    m_strServerIP = strLocalIP;
    m_iServerPort = iPort;
    m_pCallBackUser = pUser;

    // Idle-timeout sweeper. NOTE(review): StartServer_IPv6 starts one too, so
    // a dual-stack server runs two sweepers; the second pass is a no-op.
    if (iAutoClearDeadConnectionTime > 0)
    {
        DWORD dwThread = 0;
        HANDLE hThread = CreateThread(NULL, 0, ::ConnectionMaintainThread, (LPVOID)this, 0, &dwThread);
        if (hThread)
            CloseHandle(hThread);
    }

    return StartListen(static_cast<unsigned short>(iPort), std::string(static_cast<LPCSTR>(strLocalIP)));
}

// Starts the IPv6 server; parameters as for StartServer.
bool TCPServerIOCP::StartServer_IPv6(ITcpServerCallBack* pUser, int iPort, CString strLocalIP /*= ""*/)
{
    m_strServerIP_IPv6 = strLocalIP;
    m_iServerPort_IPv6 = iPort;
    m_pCallBackUser_IPv6 = pUser;

    // Idle-timeout sweeper (see StartServer).
    if (iAutoClearDeadConnectionTime > 0)
    {
        DWORD dwThread = 0;
        HANDLE hThread = CreateThread(NULL, 0, ::ConnectionMaintainThread, (LPVOID)this, 0, &dwThread);
        if (hThread)
            CloseHandle(hThread);
    }

    return StartListen_IPv6(static_cast<unsigned short>(iPort), std::string(static_cast<LPCSTR>(strLocalIP)));
}

// Stops both listeners and drops every client connection.
void TCPServerIOCP::StopServer()
{
    // Clear the flags BEFORE closing the sockets: the accept threads test them
    // right after accept() fails, and would otherwise spin on a dead socket.
    const BOOL bWasStartedIPv6 = m_bStarted_IPv6;
    m_bStarted = FALSE;
    m_bStarted_IPv6 = FALSE;

    if (m_sListen != INVALID_SOCKET)
    {
        closesocket(m_sListen);
        m_sListen = INVALID_SOCKET;
    }

    CloseServer();

    if (m_sListen_IPv6 != INVALID_SOCKET)
    {
        closesocket(m_sListen_IPv6);
        m_sListen_IPv6 = INVALID_SOCKET;
    }

    // Balances the WSAStartup made by StartListen_IPv6 only; the constructor's
    // WSAStartup is balanced by the destructor.
    if (bWasStartedIPv6)
        WSACleanup();
}

// Sends to one tracked client and updates its byte counters.
// The socket is blocking: send() can stall while the peer's window is full
// (bounded by the SO_SNDTIMEO set in DoAccept).
BOOL TCPServerIOCP::SendData(const char* pData, int iLen, ClientInfo* pCltInfo)
{
    if (pData == NULL || iLen <= 0)
        return FALSE;
    if (!pCltInfo)
        return FALSE;

    const BOOL bRet = SendData(pData, iLen, pCltInfo->strIP, pCltInfo->iPort, pCltInfo->sock);
    StaticConnData(pCltInfo->strIP, pData, iLen, CONN_DATA_SEND);

    if (bRet)
        pCltInfo->iSendSucCount += iLen;
    else
        pCltInfo->iSendFailCount += iLen;
    return bRet;
}

// Sends to every live connection whose peer address equals strIP (one address
// may own both a stale and a healthy connection, so all of them are tried).
// Returns the result of the last attempt, FALSE if none matched.
BOOL TCPServerIOCP::SendData(const char* pData, int iLen, CString strIP)
{
    BOOL bRet = FALSE;
    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        COverlappedIOInfo* pInfo = *iter;
        if (pInfo && pInfo->m_cltInfo.strIP == strIP && pInfo->m_cltInfo.sock != NULL)
            bRet = SendData(pData, iLen, &pInfo->m_cltInfo);
    }
    m_csClientVectorLock.Unlock();
    return bRet;
}

// Sends on a specific socket. Tracked sockets go through the per-client path
// (counters updated); an untracked socket is written to directly.
BOOL TCPServerIOCP::SendData(const char* pData, int iLen, SOCKET& sock)
{
    BOOL bRet = FALSE;
    bool bSend = false;

    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        COverlappedIOInfo* pInfo = *iter;
        if (pInfo && pInfo->m_cltInfo.sock == sock)
        {
            bRet = SendData(pData, iLen, &pInfo->m_cltInfo);
            bSend = true;
        }
    }
    if (!bSend)
    {
        // Propagate the result: it used to be dropped, reporting FALSE on success.
        bRet = SendData(pData, iLen, "专项发送", 0, sock);
    }
    m_csClientVectorLock.Unlock();
    return bRet;
}

// Lowest-level send. On any failure the socket is closed and `sock` is reset
// to 0 so callers can see the connection is gone.
//   strIP/port - peer identity, used only for the log callback.
BOOL TCPServerIOCP::SendData(const char* pData, int iLen, CString strIP, int port, SOCKET& sock)
{
    if (sock == 0 || sock == INVALID_SOCKET)
        return FALSE;

    // Timing of send() via chrono::steady_clock was removed on 2023.9.13 after
    // crashes at one site (tentatively blamed on resource exhaustion):
    //auto s = chrono::steady_clock::now();
    const int iRet = send(sock, pData, iLen, 0);
    //auto s2 = chrono::steady_clock::now();
    //auto s_dif = chrono::duration_cast<chrono::milliseconds>(s2 - s).count();
    //if (s_dif > 500 && m_callBackTcpLog) { ...log "send took more than 500ms"... }

    // The error code is only meaningful when send() failed.
    const int iErr = (iRet == SOCKET_ERROR) ? WSAGetLastError() : 0;

    if (m_callBackTcpLog != nullptr)
        m_callBackTcpLog(0, strIP, port, pData, iLen, iRet, iErr);

    if (iRet != SOCKET_ERROR)
        return TRUE;

    std::string strMsg;
    switch (iErr)
    {
    case WSAETIMEDOUT:
        // Peer is not reading (or reads too slowly) and the send buffer filled up.
        strMsg = fmt::format("[TCPServer][error]socket:{} 数据发送失败,错误码:{}({})", sock, iErr, "超时");
        break;
    case WSAENOTSOCK:
        strMsg = fmt::format("[TCPServer][error]在一个非套接字上尝试了一个操作。,iErr={}", iErr);
        break;
    case WSAECONNABORTED:
        strMsg = fmt::format("[TCPServer][error]你的主机中的软件中止了一个已建立的连接,iErr={}", iErr);
        break;
    default:
        break; // other errors: close silently, as before
    }
    if (!strMsg.empty() && m_callBackTcpLog)
        m_callBackTcpLog(3, strIP, port, strMsg.c_str(), static_cast<int>(strMsg.length()), iRet, iErr);

    closesocket(sock);
    sock = 0;
    return FALSE;
}

// Broadcast: sends to every tracked client. Clients whose send fails are
// removed from the live list and parked in the disconnect cache.
// Returns FALSE only for an empty payload, TRUE otherwise.
BOOL TCPServerIOCP::SendData(const char* pData, int iLen)
{
    // An empty payload makes every per-client send report FALSE, which used to
    // evict ALL clients; reject it up front instead.
    if (pData == NULL || iLen <= 0)
        return FALSE;

    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end();)
    {
        COverlappedIOInfo* pInfo = *iter;
        if (pInfo == NULL)
        {
            ++iter;
            continue;
        }
        if (FALSE == SendData(pData, iLen, &pInfo->m_cltInfo))
        {
            // NOTE(review): unlike DeleteLink, no ConnStatusChange is raised here.
            m_csCacheListLock.Lock();
            m_listContInfo.push_back({ std::shared_ptr<COverlappedIOInfo>(pInfo), GetTickCount() });
            m_csCacheListLock.Unlock();
            iter = m_vecContInfo.erase(iter);
        }
        else
        {
            ++iter;
        }
    }
    m_csClientVectorLock.Unlock();
    return TRUE;
}

// Forwards a message to the optional log sink pLog.
void TCPServerIOCP::Log(char* sz)
{
    if (pLog)
    {
        pLog(sz);
    }
}

// Constructor: takes a Winsock reference and puts the object in "stopped" state.
TCPServerIOCP::TCPServerIOCP() : m_callBackTcpLog(nullptr)
{
    WSAStartup(MAKEWORD(2, 2), &m_wsaData);
    m_mapConnHistory.clear();
    //if(bAddVec)
    //{
    //    m_vecTCPIOCP.push_back(this);
    //    HWND hWnd = FindWindow(NULL, TCPIOCP_MANAGER);
    //    if (hWnd) PostMessage(hWnd, ADD_TCPIOCP, 0, 0);
    //}
    m_bStarted = FALSE;
    m_bStarted_IPv6 = FALSE;
    m_sListen = INVALID_SOCKET;
    m_sListen_IPv6 = INVALID_SOCKET;
    m_pCallBackUser = nullptr;
    m_pCallBackUser_IPv6 = nullptr;
    //pLog = NULL;
}

// Destructor: drops all connections, releases Winsock and the history records.
TCPServerIOCP::~TCPServerIOCP()
{
    CloseServer();
    WSACleanup();
    for (auto it = m_mapConnHistory.begin(); it != m_mapConnHistory.end(); ++it)
    {
        delete it->second; // deleting a null pointer is a no-op
    }
    m_mapConnHistory.clear();
}

// Creates, binds and listens on the IPv4 socket, then starts the worker and
// accept threads. `ip` empty means INADDR_ANY. Returns false on any failure
// (m_lastError holds the error code).
bool TCPServerIOCP::StartListen(unsigned short port, std::string ip)
{
    // The listen socket is associated with the completion port, so it must be
    // created with the overlapped attribute.
    m_sListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (m_sListen == INVALID_SOCKET)
    {
        m_lastError = WSAGetLastError();
        return false;
    }

    bool bOk = m_iocp.Create() && m_iocp.AssociateSocket(m_sListen, TYPE_ACP);
    if (!bOk)
        m_lastError = GetLastError();

    if (bOk)
    {
        sockaddr_in service;
        memset(&service, 0, sizeof(service));
        service.sin_family = AF_INET;
        service.sin_port = htons(port);
        service.sin_addr.s_addr = ip.empty() ? INADDR_ANY : inet_addr(ip.c_str());

        if (::bind(m_sListen, (sockaddr*)&service, sizeof(service)) == SOCKET_ERROR ||
            listen(m_sListen, SOMAXCONN) == SOCKET_ERROR)
        {
            m_lastError = WSAGetLastError();
            bOk = false;
        }
    }

    if (!bOk)
    {
        closesocket(m_sListen);
        m_sListen = INVALID_SOCKET;
        return false;
    }

    StartWorkThreadPool();
    Sleep(20);
    m_bStarted = TRUE;
    _beginthread(TCPServerIOCP::ListenThread_IOCPServer, 0, this);
    return true;
}

// Creates, binds and listens on the IPv6 socket, then starts the worker and
// accept threads. `ip` is currently ignored: the socket always binds to
// in6addr_any. Returns false on any failure.
bool TCPServerIOCP::StartListen_IPv6(unsigned short port, std::string ip)
{
    (void)ip;

    // Extra Winsock reference for the IPv6 side; released by StopServer or by
    // the failure path below.
    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        return false;

    m_sListen_IPv6 = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
    bool bOk = (m_sListen_IPv6 != INVALID_SOCKET);

    bOk = bOk && m_iocp_IPv6.Create() && m_iocp_IPv6.AssociateSocket(m_sListen_IPv6, TYPE_ACP);

    if (bOk)
    {
        sockaddr_in6 addr;
        ZeroMemory(&addr, sizeof(addr));
        addr.sin6_family = AF_INET6;
        addr.sin6_addr = in6addr_any; // every local address
        addr.sin6_port = htons(port);

        bOk = ::bind(m_sListen_IPv6, (SOCKADDR*)&addr, sizeof(addr)) != SOCKET_ERROR &&
              listen(m_sListen_IPv6, SOMAXCONN) != SOCKET_ERROR;
    }

    if (!bOk)
    {
        // These paths used to `return 1`, i.e. report success from a bool function.
        if (m_sListen_IPv6 != INVALID_SOCKET)
        {
            closesocket(m_sListen_IPv6);
            m_sListen_IPv6 = INVALID_SOCKET;
        }
        WSACleanup();
        return false;
    }

    StartWorkThreadPool_IPv6();
    Sleep(20);
    m_bStarted_IPv6 = TRUE;
    _beginthread(TCPServerIOCP::ListenThread_IOCPServer_IPv6, 0, this);
    return true;
}

// Absolute number of seconds between *stLast and *stNow (or the current local
// time when stNow is NULL). Returns 0 when stLast is NULL.
int TCPServerIOCP::CalcTimePassSecond(SYSTEMTIME* stLast, SYSTEMTIME* stNow)
{
    if (stLast == NULL)
        return 0;

    SYSTEMTIME stNowTime;
    if (stNow)
        stNowTime = *stNow;
    else
        GetLocalTime(&stNowTime);

    const CTime timeNow(stNowTime);
    const CTime timeLast(*stLast);
    const CTimeSpan span = timeNow - timeLast;
    return abs(static_cast<int>(span.GetTotalSeconds()));
}

// Idle-connection sweeper: every 5 seconds closes the socket of any client
// silent for more than iAutoClearDeadConnectionTime seconds. The entry is NOT
// removed here; closing makes the pending receive fail, and the worker thread
// then removes it through DeleteLink.
void TCPServerIOCP::ConnectionMaintain()
{
    for (;;)
    {
        Sleep(5000);

        // Stop once no listener is running, so the thread does not outlive a
        // stopped server. NOTE(review): destroying the object without calling
        // StopServer can still leave this thread touching freed memory.
        if (!m_bStarted && !m_bStarted_IPv6)
            break;
        if (iAutoClearDeadConnectionTime == 0)
            continue;

        m_csClientVectorLock.Lock();
        for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
        {
            COverlappedIOInfo* p = *iter;
            if (p == NULL || p->m_cltInfo.sock == NULL)
                continue; // already closed
            if (CalcTimePassSecond(&p->m_cltInfo.stLastActive) <= iAutoClearDeadConnectionTime)
                continue;

            if (m_callBackTcpLog != nullptr)
            {
                m_callBackTcpLog(2, p->m_cltInfo.strIP, p->m_cltInfo.iPort, "心跳超时断开", 0, 0, 1);
            }
            std::string str = fmt::format("[TCPServer][trace][{}][socketid:{}]心跳超时断开", static_cast<LPCSTR>(GetIOCPName()), static_cast<int>(p->m_cltInfo.sock));
            Log(const_cast<char*>(str.c_str()));

            closesocket(p->m_cltInfo.sock);
            p->m_cltInfo.sock = NULL;
            p->m_cltInfo.isValid = false;
            //m_iocp.PostStatus(TYPE_CLOSE_SOCK, 0, p);
        }
        m_csClientVectorLock.Unlock();
    }
}

// IOCP worker for IPv4 connections.
// COverlappedIOInfo objects are handed to the completion port by PostRecv, so
// they must not be freed while a completion may still be delivered here.
DWORD WINAPI WorkThread(LPVOID lpParam)
{
    TCPServerIOCP* pServ = static_cast<TCPServerIOCP*>(lpParam);
    if (pServ == NULL)
        return 0;

    for (;;)
    {
        DWORD NumberOfBytes = 0;
        ULONG_PTR completionKey = 0;
        OVERLAPPED* ol = NULL;

        // Blocks until a completion packet is available.
        const BOOL bOk = GetQueuedCompletionStatus(pServ->m_iocp.GetIOCP(), &NumberOfBytes, &completionKey, &ol, WSA_INFINITE);
        COverlappedIOInfo* olinfo = (COverlappedIOInfo*)ol;

        if (bOk)
        {
            if (completionKey == TYPE_CLOSE)
                break; // shutdown request from CloseServer
            if (olinfo == NULL)
                continue; // nothing to act on

            // Zero bytes on a data completion: the peer closed the connection.
            if (NumberOfBytes == 0 && (completionKey == TYPE_RECV || completionKey == TYPE_SEND))
            {
                std::string str = fmt::format("[TCPServer][trace][{}][socketid:{}]错误信息:客户端主动断开socket连接", static_cast<LPCSTR>(pServ->GetIOCPName()), static_cast<int>(olinfo->m_socket));
                pServ->Log(const_cast<char*>(str.c_str()));
                pServ->DeleteLink(olinfo->m_socket, "客户端主动断开socket连接");
                continue;
            }
            if (completionKey == TYPE_CLOSE_SOCK)
            {
                pServ->DeleteLink(olinfo->m_socket, "TYPE_CLOSE_SOCK主动关闭连接");
                continue;
            }
            if (completionKey == TYPE_RECV)
            {
                // Data is already in the overlapped buffer; deliver it, then
                // queue the next receive.
                olinfo->m_recvBuf.len = NumberOfBytes;
                pServ->DoRecv(olinfo);
                pServ->PostRecv(olinfo);
            }
            // TYPE_ACP / TYPE_SEND / anything else: nothing to do.
            continue;
        }

        // GetQueuedCompletionStatus failed.
        const int res = WSAGetLastError();
        if (olinfo == NULL)
        {
            // No packet was dequeued; with an infinite timeout this means the
            // port itself is unusable, so looping would only spin.
            break;
        }

        CString strError;
        strError.Format("[err:%d]", res);
        const SOCKET sk = olinfo->m_socket;
        switch (res)
        {
        case ERROR_NETNAME_DELETED:
        {
            std::string str = fmt::format("[TCPServer][trace][{}][socketid:{}]错误信息:指定的网络名不再可用", static_cast<LPCSTR>(pServ->GetIOCPName()), static_cast<int>(sk));
            strError += "客户端主动断开socket连接";
            pServ->Log(const_cast<char*>(str.c_str()));
        }
        break;
        case ERROR_CONNECTION_ABORTED:
        {
            std::string str = fmt::format("[TCPServer][debug][{}][socketid:{}]错误信息:服务端主动断开socket连接", static_cast<LPCSTR>(pServ->GetIOCPName()), static_cast<int>(sk));
            strError += "服务端主动断开socket连接";
            pServ->Log(const_cast<char*>(str.c_str()));
        }
        break;
        default:
            break;
        }
        pServ->DeleteLink(sk, strError.GetBuffer());
    }
    return 0;
}

// IOCP worker for IPv6 connections; same logic as WorkThread on m_iocp_IPv6.
DWORD WINAPI WorkThread_IPv6(LPVOID lpParam)
{
    TCPServerIOCP* pServ = static_cast<TCPServerIOCP*>(lpParam);
    if (pServ == NULL)
        return 0;

    for (;;)
    {
        DWORD NumberOfBytes = 0;
        ULONG_PTR completionKey = 0;
        OVERLAPPED* ol = NULL;

        const BOOL bOk = GetQueuedCompletionStatus(pServ->m_iocp_IPv6.GetIOCP(), &NumberOfBytes, &completionKey, &ol, WSA_INFINITE);
        COverlappedIOInfo* olinfo = (COverlappedIOInfo*)ol;

        if (bOk)
        {
            if (completionKey == TYPE_CLOSE)
                break;
            if (olinfo == NULL)
                continue;

            if (NumberOfBytes == 0 && (completionKey == TYPE_RECV || completionKey == TYPE_SEND))
            {
                std::string str = fmt::format("[TCPServer][trace][{}][socketid:{}]错误信息:客户端主动断开socket连接", static_cast<LPCSTR>(pServ->GetIOCPName()), static_cast<int>(olinfo->m_socket));
                pServ->Log(const_cast<char*>(str.c_str()));
                pServ->DeleteLink(olinfo->m_socket, "客户端主动断开socket连接");
                continue;
            }
            if (completionKey == TYPE_CLOSE_SOCK)
            {
                pServ->DeleteLink(olinfo->m_socket, "TYPE_CLOSE_SOCK主动关闭连接");
                continue;
            }
            if (completionKey == TYPE_RECV)
            {
                olinfo->m_recvBuf.len = NumberOfBytes;
                pServ->DoRecv(olinfo);
                pServ->PostRecv(olinfo);
            }
            continue;
        }

        const int res = WSAGetLastError();
        if (olinfo == NULL)
        {
            break; // completion port unusable; see WorkThread
        }

        CString strError;
        strError.Format("[err:%d]", res);
        const SOCKET sk = olinfo->m_socket;
        switch (res)
        {
        case ERROR_NETNAME_DELETED:
        {
            std::string str = fmt::format("[TCPServer][trace][{}][socketid:{}]错误信息:指定的网络名不再可用", static_cast<LPCSTR>(pServ->GetIOCPName()), static_cast<int>(sk));
            strError += "客户端主动断开socket连接";
            pServ->Log(const_cast<char*>(str.c_str()));
        }
        break;
        case ERROR_CONNECTION_ABORTED:
        {
            std::string str = fmt::format("[TCPServer][debug][{}][socketid:{}]错误信息:服务端主动断开socket连接", static_cast<LPCSTR>(pServ->GetIOCPName()), static_cast<int>(sk));
            strError += "服务端主动断开socket连接";
            pServ->Log(const_cast<char*>(str.c_str()));
        }
        break;
        default:
            break;
        }
        pServ->DeleteLink(sk, strError.GetBuffer());
    }
    return 0;
}

// Starts the (single) IPv4 worker thread. Always returns 0.
int TCPServerIOCP::StartWorkThreadPool()
{
    DWORD dwThread = 0;
    HANDLE hThread = CreateThread(NULL, 0, WorkThread, (LPVOID)this, 0, &dwThread);
    if (hThread)
        CloseHandle(hThread);
    return 0;
}

// Starts the (single) IPv6 worker thread. Always returns 0.
int TCPServerIOCP::StartWorkThreadPool_IPv6()
{
    DWORD dwThread = 0;
    HANDLE hThread = CreateThread(NULL, 0, WorkThread_IPv6, (LPVOID)this, 0, &dwThread);
    if (hThread)
        CloseHandle(hThread);
    return 0;
}

// Registers a freshly accepted IPv4 connection: applies the blacklist, sets
// the send timeout, associates the socket with the completion port and posts
// the first receive. Returns false if the connection was rejected.
bool TCPServerIOCP::DoAccept(SOCKET sockAccept, SOCKADDR_IN* ClientAddr)
{
    if (ClientAddr == NULL)
    {
        closesocket(sockAccept);
        return false;
    }

    // Temporary measure: refuse one blacklisted address.
    if (ClientAddr->sin_addr.S_un.S_addr == m_blackaddr)
    {
        if (m_callBackTcpLog != nullptr)
            m_callBackTcpLog(1, inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port), "连接成功, 但在黑名单内直接关闭", 0, 0, 1);
        closesocket(sockAccept);
        return false;
    }

    // Drop disconnect-cache entries older than 30 minutes.
    m_csCacheListLock.Lock();
    const DWORD dwNowTick = GetTickCount();
    m_listContInfo.remove_if([dwNowTick](const CConnectCache& it) { return dwNowTick - it.dwDisConnect > 30 * 60 * 1000; });
    m_csCacheListLock.Unlock();

    // The accepted socket is a blocking socket.
    // (Historic note: heap corruption elsewhere tends to surface as a crash on
    // this allocation.)
    COverlappedIOInfo* olinfo = new COverlappedIOInfo();
    if (!olinfo)
    {
        closesocket(sockAccept);
        return false;
    }

    // 3 s send timeout so a peer that stops reading cannot block send() forever.
    int nNetTimeout = 3000;
    setsockopt(sockAccept, SOL_SOCKET, SO_SNDTIMEO, (char*)&nNetTimeout, sizeof(int));

    olinfo->m_socket = sockAccept;
    olinfo->bIPv6addr = FALSE;
    olinfo->m_addr = *ClientAddr;
    olinfo->m_cltInfo.pTcpServer1 = this;

    CString strIP;
    strIP.Format(_T("%d.%d.%d.%d"),
        olinfo->m_addr.sin_addr.S_un.S_un_b.s_b1,
        olinfo->m_addr.sin_addr.S_un.S_un_b.s_b2,
        olinfo->m_addr.sin_addr.S_un.S_un_b.s_b3,
        olinfo->m_addr.sin_addr.S_un.S_un_b.s_b4);
    CopyIpText(olinfo->m_cltInfo.strIP, strIP);
    olinfo->m_cltInfo.sock = olinfo->m_socket;
    olinfo->m_cltInfo.iPort = ntohs(olinfo->m_addr.sin_port);
    // Start the idle clock at connect time so a client that has not sent
    // anything yet is measured from now.
    GetLocalTime(&olinfo->m_cltInfo.stLastActive);

    if (m_callBackTcpLog != nullptr)
    {
        m_callBackTcpLog(1, strIP, olinfo->m_cltInfo.iPort, "连接成功", 0, 0, 1);
    }

    // Only receive completions are used on client sockets.
    if (!m_iocp.AssociateSocket(olinfo->m_socket, TYPE_RECV))
    {
        closesocket(sockAccept);
        delete olinfo;
        return false;
    }

    // History map and client list are updated under the same lock DeleteLink uses.
    m_csClientVectorLock.Lock();
    BindHistoryData(olinfo);
    m_vecContInfo.push_back(olinfo);
    m_csClientVectorLock.Unlock();

    if (m_pCallBackUser)
        m_pCallBackUser->ConnStatusChange(&olinfo->m_cltInfo, true);
    PostRecv(olinfo);
    return true;
}

// Registers a freshly accepted IPv6 connection; see DoAccept.
bool TCPServerIOCP::DoAccept_IPv6(SOCKET sockAccept, SOCKADDR_IN6* ClientAddr)
{
    if (ClientAddr == NULL)
    {
        closesocket(sockAccept);
        return false;
    }

    // Drop disconnect-cache entries older than 30 minutes.
    m_csCacheListLock.Lock();
    const DWORD dwNowTick = GetTickCount();
    m_listContInfo.remove_if([dwNowTick](const CConnectCache& it) { return dwNowTick - it.dwDisConnect > 30 * 60 * 1000; });
    m_csCacheListLock.Unlock();

    COverlappedIOInfo* olinfo = new COverlappedIOInfo();
    if (!olinfo)
    {
        closesocket(sockAccept);
        return false;
    }

    // 3 s send timeout so a peer that stops reading cannot block send() forever.
    int nNetTimeout = 3000;
    setsockopt(sockAccept, SOL_SOCKET, SO_SNDTIMEO, (char*)&nNetTimeout, sizeof(int));

    olinfo->m_socket = sockAccept;
    olinfo->bIPv6addr = TRUE;
    olinfo->m_addr_IPv6 = *ClientAddr;
    olinfo->m_cltInfo.pTcpServer1 = this;

    const unsigned char* ipBytes = olinfo->m_addr_IPv6.sin6_addr.u.Byte;
    CString strIP;
    strIP.Format(_T("%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x:%02x%02x"),
        ipBytes[0], ipBytes[1], ipBytes[2], ipBytes[3],
        ipBytes[4], ipBytes[5], ipBytes[6], ipBytes[7],
        ipBytes[8], ipBytes[9], ipBytes[10], ipBytes[11],
        ipBytes[12], ipBytes[13], ipBytes[14], ipBytes[15]);
    CopyIpText(olinfo->m_cltInfo.strIP, strIP);
    olinfo->m_cltInfo.sock = olinfo->m_socket;
    // The port lives in the IPv6 address; m_addr (IPv4) is never filled in here.
    olinfo->m_cltInfo.iPort = ntohs(olinfo->m_addr_IPv6.sin6_port);
    GetLocalTime(&olinfo->m_cltInfo.stLastActive);

    if (m_callBackTcpLog != nullptr)
    {
        m_callBackTcpLog(1, strIP, olinfo->m_cltInfo.iPort, "连接成功", 0, 0, 1);
    }

    if (!m_iocp_IPv6.AssociateSocket(olinfo->m_socket, TYPE_RECV))
    {
        closesocket(sockAccept);
        delete olinfo;
        return false;
    }

    m_csClientVectorLock.Lock();
    BindHistoryData(olinfo);
    m_vecContInfo.push_back(olinfo);
    m_csClientVectorLock.Unlock();

    // StartServer_IPv6 registers its callback in m_pCallBackUser_IPv6.
    ITcpServerCallBack* pUser = m_pCallBackUser;
    if (m_pCallBackUser_IPv6)
        pUser = m_pCallBackUser_IPv6;
    if (pUser)
        pUser->ConnStatusChange(&olinfo->m_cltInfo, true);
    PostRecv(olinfo);
    return true;
}

// Posts one asynchronous receive on the connection.
//
// WSARecv only submits a request; when data arrives the request completes and
// a packet is queued to the completion port for the worker thread. A request
// is consumed by its completion, so a new one must be posted after every
// completion or no further data will ever be delivered.
//
// Returns true if a receive is pending (or receiving is paused via
// GetStopRecvStatus), false if the request could not be posted.
bool TCPServerIOCP::PostRecv(COverlappedIOInfo* info)
{
    if (info == NULL)
        return false;
    if (GetStopRecvStatus())
        return true;

    DWORD dwFlags = 0;
    info->ResetOverlapped();
    info->ResetRecvBuffer();

    // lpNumberOfBytesRecvd is NULL because the call is overlapped; the byte
    // count comes from the completion packet instead.
    const int recvNum = WSARecv(info->m_socket, &info->m_recvBuf, 1, NULL, &dwFlags, (OVERLAPPED*)info, NULL);
    if (recvNum == 0)
        return true;

    // These branches used to hang off the WSA_IO_PENDING test and never ran.
    switch (WSAGetLastError())
    {
    case WSA_IO_PENDING:
        return true; // normal case: the receive is queued
    case WSAENOTSOCK:
        // Socket already closed, e.g. by the idle-timeout sweeper.
        DeleteLink(info->m_socket, "socket被超时线程关闭 或 其他原因");
        return false;
    case WSAECONNRESET:
        // Forcibly closed by the remote host.
        DeleteLink(info->m_socket, "socket呗远程主机强制关闭");
        return false;
    default:
        return false;
    }
}

// Creates or refreshes the per-address history record for a new connection.
// Does not lock m_mapConnHistory itself; DoAccept calls it under
// m_csClientVectorLock.
void TCPServerIOCP::BindHistoryData(COverlappedIOInfo* info, bool bRecv/* = false*/)
{
    if (info == NULL)
        return;

    CString strIP = info->m_cltInfo.strIP;
    auto itFound = m_mapConnHistory.find(strIP);
    if (itFound == m_mapConnHistory.end())
    {
        // First connection from this address.
        ConnHistoryInfo* pNew = new ConnHistoryInfo;
        pNew->strIp = strIP;
        pNew->socket = info->m_socket;
        pNew->bOnline = true;
        pNew->bProxy = false;
        pNew->bRecving = bRecv;
        pNew->pCltInfo = &info->m_cltInfo;
        ::GetLocalTime(&pNew->lastConnectTime);
        m_mapConnHistory[strIP] = pNew;
        return;
    }

    ConnHistoryInfo* pHistory = itFound->second;
    if (pHistory == NULL)
        return;

    // Reconnect from a known address.
    pHistory->Lock();
    pHistory->socket = info->m_socket;
    pHistory->iReconnnectCount++;
    pHistory->bOnline = true;
    pHistory->bProxy = false;
    pHistory->bRecving = bRecv;
    pHistory->pCltInfo = &info->m_cltInfo;
    ::GetLocalTime(&pHistory->lastConnectTime);
    pHistory->Unlock();
}

// Delivers received bytes to the user callback and refreshes the client's
// last-activity timestamp.
bool TCPServerIOCP::DoRecv(COverlappedIOInfo* info)
{
    if (info == NULL)
        return false;

    //auto pFindOld = m_mapConnHistory.find(info->m_cltInfo.strIP);
    //if (pFindOld != m_mapConnHistory.end())
    //{
    //    if (!pFindOld->second->bRecving)
    //    {
    //        pFindOld->second->bRecving = true;
    //        HWND hWnd = FindWindow(NULL, TCPIOCP_MANAGER);
    //        if (hWnd) {
    //            SendMessage(hWnd, ADD_CONNECT_TCPTOCP, 0, (WPARAM)info->m_cltInfo.strIP);
    //        }
    //    }
    //}
    info->m_cltInfo.iRecvCount += info->m_recvBuf.len;

    // IPv6 connections report to the callback registered by StartServer_IPv6.
    ITcpServerCallBack* pUser = m_pCallBackUser;
    if (info->bIPv6addr && m_pCallBackUser_IPv6)
        pUser = m_pCallBackUser_IPv6;
    if (pUser)
        pUser->OnRecvData_TCPServer((BYTE*)info->m_recvBuf.buf, info->m_recvBuf.len, &info->m_cltInfo);

    GetLocalTime(&info->m_cltInfo.stLastActive);
    return true;
}

// Removes the connection owning socket `s`: marks its history offline,
// notifies the callbacks, closes the socket and parks the object in the
// disconnect cache (it is freed later, not here, because a completion for it
// may still be in flight). Always returns true.
bool TCPServerIOCP::DeleteLink(SOCKET s, char* strError)
{
    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        COverlappedIOInfo* ol = *iter;
        if (ol == NULL || ol->m_socket != s)
            continue;

        auto itHistory = m_mapConnHistory.find(ol->m_cltInfo.strIP);
        if (itHistory != m_mapConnHistory.end() && itHistory->second)
        {
            itHistory->second->Lock();
            itHistory->second->bOnline = false;
            itHistory->second->pCltInfo = NULL;
            itHistory->second->Unlock();
        }

        if (m_callBackTcpLog != nullptr)
        {
            m_callBackTcpLog(2, ol->m_cltInfo.strIP, ol->m_cltInfo.iPort, strError, 0, 0, 1);
        }

        ITcpServerCallBack* pUser = m_pCallBackUser;
        if (ol->bIPv6addr && m_pCallBackUser_IPv6)
            pUser = m_pCallBackUser_IPv6;
        if (pUser)
            pUser->ConnStatusChange(&ol->m_cltInfo, FALSE);

        // SendData and the idle sweeper close the socket themselves and zero
        // m_cltInfo.sock; closing the stale handle value again could hit an
        // unrelated socket that reused it.
        if (ol->m_cltInfo.sock != 0)
        {
            closesocket(s);
            ol->m_cltInfo.sock = 0;
        }

        m_vecContInfo.erase(iter);

        m_csCacheListLock.Lock();
        m_listContInfo.push_back({ std::shared_ptr<COverlappedIOInfo>(ol), GetTickCount() });
        m_csCacheListLock.Unlock();
        break; // iterator is invalid after erase; at most one entry matches
    }
    m_csClientVectorLock.Unlock();
    return true;
}

// Returns TRUE if at least one tracked connection has peer address strIP.
BOOL TCPServerIOCP::IsIPOnline(CString strIP)
{
    BOOL found = FALSE;
    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        if ((*iter) && strIP == (*iter)->m_cltInfo.strIP)
        {
            found = TRUE;
            break;
        }
    }
    m_csClientVectorLock.Unlock();
    return found;
}

// Asks both worker threads to exit and drops every client connection.
void TCPServerIOCP::CloseServer()
{
    // 1. Wake the workers with a TYPE_CLOSE packet (failure is ignored: the
    //    port may never have been created).
    m_iocp.PostStatus(TYPE_CLOSE);
    m_iocp_IPv6.PostStatus(TYPE_CLOSE);

    // 2. Close all client sockets. The objects go to the disconnect cache
    //    rather than being deleted at once, because their pending overlapped
    //    receives complete asynchronously after closesocket.
    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        COverlappedIOInfo* olinfo = *iter;
        if (olinfo == NULL)
            continue;

        if (olinfo->m_cltInfo.sock != 0)
        {
            closesocket(olinfo->m_socket);
            olinfo->m_cltInfo.sock = 0;
        }
        if (m_callBackTcpLog != nullptr)
        {
            m_callBackTcpLog(2, olinfo->m_cltInfo.strIP, olinfo->m_cltInfo.iPort, "关闭服务器连接断开", 0, 0, 1);
        }

        m_csCacheListLock.Lock();
        m_listContInfo.push_back({ std::shared_ptr<COverlappedIOInfo>(olinfo), GetTickCount() });
        m_csCacheListLock.Unlock();
    }
    m_vecContInfo.clear();
    m_csClientVectorLock.Unlock();
}

// Traffic-statistics hook; currently a no-op that returns 0. The former
// implementation (posting data to a monitor window) is kept for reference.
int TCPServerIOCP::StaticConnData(CString strIp, const char* data, int iLen, ConnDataWay way)
{
    //if (strIp == GetMonitoring() && m_mapConnHistory.find(strIp) != m_mapConnHistory.end()) {
    //    HWND hWnd = FindWindow(NULL, TCPIOCP_MANAGER);
    //    if (hWnd) {
    //        ConnDataNode* node = new ConnDataNode;
    //        ::GetLocalTime(&node->time);
    //        node->data = new char[iLen];
    //        memcpy(node->data, data, iLen);
    //        node->iDataLen = iLen;
    //        node->way = way;
    //        PostMessage(hWnd, UPDATE_ONE_DATA, (WPARAM)this, (LPARAM)node);
    //    }
    //}
    return 0;
}

// Resets the traffic and reconnect counters of every history record.
void TCPServerIOCP::ClearConnHistoryInfo()
{
    for (auto it = m_mapConnHistory.begin(); it != m_mapConnHistory.end(); ++it)
    {
        ConnHistoryInfo* pHistory = it->second;
        if (pHistory == NULL)
            continue;
        pHistory->Lock();
        pHistory->iSendSucCount = 0;
        pHistory->iSendFailCount = 0;
        pHistory->iReconnnectCount = 0;
        pHistory->iRecvCount = 0;
        pHistory->Unlock();
    }
}

// Appends the peer address of every tracked connection to `vec`.
// NOTE(review): element type reconstructed as CString - confirm against the header.
void TCPServerIOCP::GetIPList(std::vector<CString> &vec)
{
    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        if (*iter == NULL)
            continue;
        CString str = (*iter)->m_cltInfo.strIP;
        vec.push_back(str);
    }
    m_csClientVectorLock.Unlock();
}

// Appends a pointer to the ClientInfo of every tracked connection to `vec`.
// The pointers refer to live connection objects and are only safe while the
// connection exists.
// NOTE(review): element type reconstructed as ClientInfo* - confirm against the header.
void TCPServerIOCP::GetConnectInfo(std::vector<ClientInfo*> &vec)
{
    m_csClientVectorLock.Lock();
    for (auto iter = m_vecContInfo.begin(); iter != m_vecContInfo.end(); ++iter)
    {
        if (*iter == NULL)
            continue;
        vec.push_back(&(*iter)->m_cltInfo);
    }
    m_csClientVectorLock.Unlock();
}