| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 | 
							- #pragma once 
 
- #include <map>
 
- #include <list>
 
- #include "ITCPServer.h"
 
- #include <list>
 
- using namespace std;
 
- //#define TCPIOCP_MANAGER _T("TcpServer监视")
 
- #define UPDATE_ONE_DATA WM_USER + 1001
 
- //#define ADD_TCPIOCP WM_USER + 1002
 
- //#define ADD_CONNECT_TCPTOCP WM_USER + 1003
 
- //#define UPDATE_ONE_DATA_JDSP_RECV WM_USER + 1004
 
- //#define UPDATE_ONE_DATA_JDSP_SEND WM_USER + 1005
 
- class CIOCP
 
- {
 
- public:
 
- 	//CIOCP构造函数
 
- 	CIOCP(int nMaxConcurrent = -1)
 
- 	{
 
- 		m_hIOCP = NULL;
 
- 		if (nMaxConcurrent != -1)
 
- 		{
 
- 			Create(nMaxConcurrent);
 
- 		}
 
- 	}
 
- 	//CIOCP析构函数
 
- 	~CIOCP()
 
- 	{
 
- 		if (m_hIOCP != NULL)
 
- 		{
 
- 			CloseHandle(m_hIOCP);
 
- 		}
 
- 	}
 
- 	//关闭IOCP
 
- 	BOOL Close()
 
- 	{
 
- 		BOOL bResult = CloseHandle(m_hIOCP);
 
- 		m_hIOCP = NULL;
 
- 		return bResult;
 
- 	}
 
- 	//创建IOCP, nMaxConcurrency指定最大线程并发数量, 0 默认为CPU核数
 
- 	bool Create(int nMaxConcurrency = 0)
 
- 	{
 
- 		m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);
 
- 		//ASSERT(m_hIOCP != NULL);
 
- 		return (m_hIOCP != NULL);
 
- 	}
 
- 	//为设备(文件,socket,邮件槽,管道等)关联一个IOCP
 
- 	bool AssociateDevice(HANDLE hDevice, ULONG_PTR Compkey)
 
- 	{
 
- 		bool fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, Compkey, 0) == m_hIOCP);
 
- 		return fOk;
 
- 	}
 
- 	//为Socket关联一个IOCP
 
- 	bool AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey)
 
- 	{
 
- 		return AssociateDevice((HANDLE)hSocket, CompKey);
 
- 	}
 
- 	//为IOCP传递事件通知
 
- 	bool PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL)
 
- 	{
 
- 		bool fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);
 
- 		return fOk;
 
- 	}
 
- 	//从IO完成端口队列中获取事件通知, IO完成队列中没有事件时,该函数阻塞
 
- 	bool GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMillseconds = INFINITE)
 
- 	{
 
- 		return GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes, pCompKey, ppo, dwMillseconds);
 
- 	}
 
- 	//获取IOCP对象
 
- 	const HANDLE GetIOCP()
 
- 	{
 
- 		return m_hIOCP;
 
- 	}
 
- private:
 
- 	HANDLE m_hIOCP= INVALID_HANDLE_VALUE;   //IOCP句柄
 
- };
 
- /*
 
- 在IOCP编程模型中,需要用到GetQueuedCompletionStatus()函数获取已完成的事件
 
- 但是该函数的返回参数无 Socket或者buffer的描述信息
 
- 一个简单的方法是:创建一个新的结构,该结构第一个参数是OVERLAPPED
 
- 由于AcceptEx, WSASend 等重叠IO操作传入的是Overlapped结构体的地址,调用AcceptEx等重叠IO操作,在Overlapped结构体后面开辟新的空间
 
- 写入socket或者buffer的信息,即可将socket或者buffer的信息由GetQueuedComletionStatus带回
 
- */
 
- #define MAXBUF 8*1024
 
- enum IOOperType
 
- {
 
- 	TYPE_ACP,					//accept事件到达,有新的连接请求
 
- 	TYPE_RECV,				   //数据接收事件
 
- 	TYPE_SEND,               //数据发送事件
 
- 	TYPE_CLOSE_SOCK,			//服务端主动关闭socket
 
- 	TYPE_CLOSE,             //关闭事件
 
- 	TYPE_NO_OPER
 
- };
 
- class COverlappedIOInfo : public OVERLAPPED
 
- {
 
- public:
 
- 	COverlappedIOInfo(void)
 
- 	{
 
- 		m_socket = INVALID_SOCKET;
 
- 		ResetOverlapped();
 
- 		ResetRecvBuffer();
 
- 		ResetSendBuffer();
 
- 	}
 
- 	~COverlappedIOInfo(void)
 
- 	{
 
- 		if (m_socket != INVALID_SOCKET)
 
- 		{
 
- 			closesocket(m_socket);
 
- 			m_socket = INVALID_SOCKET;
 
- 		}
 
- 	}
 
- 	void ResetOverlapped()
 
- 	{
 
- 		Internal = InternalHigh = 0;
 
- 		Offset = OffsetHigh = 0;
 
- 		hEvent = NULL;
 
- 	}
 
- 	void ResetRecvBuffer()
 
- 	{
 
- 		ZeroMemory(m_crecvBuf, MAXBUF);
 
- 		m_recvBuf.buf = m_crecvBuf;
 
- 		m_recvBuf.len = MAXBUF;
 
- 	}
 
- 	void ResetSendBuffer()
 
- 	{
 
- 		ZeroMemory(m_csendBuf, MAXBUF);
 
- 		m_sendBuf.buf = m_csendBuf;
 
- 		m_sendBuf.len = MAXBUF;
 
- 	}
 
- public:
 
- 	SOCKET m_socket;		//套接字
 
- 							//接收缓冲区,用于AcceptEx, WSARecv操作
 
- 	WSABUF m_recvBuf;
 
- 	char m_crecvBuf[MAXBUF];
 
- 	//发送缓冲区, 用于WSASend操作
 
- 	WSABUF m_sendBuf;
 
- 	char m_csendBuf[MAXBUF];
 
- 	//对端地址
 
- 	BOOL bIPv6addr=FALSE;
 
- 	sockaddr_in m_addr;
 
- 	SOCKADDR_IN6 m_addr_IPv6;
 
- 	ClientInfo m_cltInfo;
 
- };
 
- //目的:防止断开连接后,其他地方还保留有连接信息。保留15分钟
 
- struct CConnectCache
 
- {
 
- 	std::shared_ptr<COverlappedIOInfo>	pConnInfo;		//连接信息
 
- 	DWORD								dwDisConnect;	//断开时间
 
- };
 
- #define HISTORY_CONN_STATIC_COUNT 500
 
- //#define IN_WINDOWS 
 
- typedef enum connDataWay {
 
- 	CONN_DATA_SEND = 0,
 
- 	CONN_DATA_RECV
 
- }ConnDataWay;
 
- typedef struct connDataNode {
 
- 	SYSTEMTIME time;
 
- 	ConnDataWay way; // 0 send. 1 recv.
 
- 	char* data;
 
- 	int iDataLen;
 
- 	connDataNode() {
 
- 		memset(&time, 0, sizeof(SYSTEMTIME));
 
- 		data = NULL;
 
- 		iDataLen = 0;
 
- 	}
 
- 	~connDataNode() {
 
- 		if (data) delete[]data;
 
- 	}
 
- }ConnDataNode;
 
- typedef struct connHistoryData {
 
- 	ConnDataNode dataBuffer[HISTORY_CONN_STATIC_COUNT];
 
- 	HANDLE m_mutex;
 
- 	int startIndex;
 
- 	int iLastIndex;
 
- 	int endIndex;
 
- 	bool bIsFull;
 
- 	connHistoryData() {
 
- 		startIndex = endIndex = iLastIndex = 0;
 
- 		bIsFull = false;
 
- 		m_mutex = CreateMutex(NULL, FALSE, NULL);
 
- 	}
 
- 	~connHistoryData() {
 
- 		if (m_mutex) {
 
- 			CloseHandle(m_mutex);
 
- 			m_mutex = NULL;
 
- 		}
 
- 	}
 
- 	bool isFull() {
 
- 		return bIsFull;
 
- 	}
 
- 	bool Lock() {
 
- 		if (m_mutex) {
 
- 			WaitForSingleObject(m_mutex, INFINITE);
 
- 			return true;
 
- 		}
 
- 		return false;
 
- 	}
 
- 	void Unlock() {
 
- 		if (m_mutex) ReleaseMutex(m_mutex);
 
- 	}
 
- 	void AddData(char* buf, int iLen, ConnDataWay way) {
 
- 		if (!Lock()) return;
 
- 		::GetLocalTime(&dataBuffer[endIndex].time);
 
- 		dataBuffer[endIndex].way = way;
 
- 		if (dataBuffer[endIndex].data) delete[]dataBuffer[endIndex].data;
 
- 		dataBuffer[endIndex].data = new char[iLen];
 
- 		if (!dataBuffer[endIndex].data) return;
 
- 		memcpy(dataBuffer[endIndex].data, buf, iLen);
 
- 		dataBuffer[endIndex].iDataLen = iLen;
 
- 		iLastIndex = endIndex;
 
- 		endIndex = (endIndex + 1) % HISTORY_CONN_STATIC_COUNT;
 
- 		if (bIsFull) startIndex = endIndex;
 
- 		else if (endIndex == startIndex) bIsFull = true;
 
- 		Unlock();
 
- 	}
 
- }ConnHistoryData;
 
- class TCPServerIOCP : public ITCPServer {
 
- public:
 
- 	bool StartServer(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));
 
- 	bool StartServer_IPv6(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));
 
- 	void StopServer();
 
- 	BOOL SendData(const char* pData, int iLen, ClientInfo* pCltInfo);
 
- 	BOOL SendData(const char* pData, int iLen, CString strIP);
 
- 	BOOL SendData(const char* pData, int iLen, CString strIP, int port, SOCKET& sock);
 
- 	BOOL SendData(const char* pData, int iLen, SOCKET& sock);
 
- 	BOOL SendData(const char* pData, int iLen);
 
- 	ITcpServerCallBack* m_pCallBackUser = nullptr;
 
- 	ITcpServerCallBack* m_pCallBackUser_IPv6 = nullptr;
 
- 	BOOL m_bStarted;
 
- 	BOOL m_bStarted_IPv6;
 
- 	void Log(char* sz);
 
- 	void (*pLog)(char*);//日志接口
 
- 	std::function<void(int, CString, int, const void*, int, int, int)> m_callBackTcpLog;
 
- 	//获取IP列表
 
- 	virtual void GetIPList(std::vector<CString> &vec);
 
- 	virtual void GetConnectInfo(std::vector<ClientInfo *> &vec);
 
- 	virtual void GetHistoryConnectInfo(std::map<CString, ConnHistoryInfo *> &mapHis) 
 
- 	{
 
- 		mapHis.insert(m_mapConnHistory.begin(), m_mapConnHistory.end());
 
- 	}
 
- 	virtual ConnHistoryInfo *GetHistoryConnectInfo(const CString &strIP)
 
- 	{
 
- 		auto pFind = m_mapConnHistory.find(strIP);
 
- 		if (pFind != m_mapConnHistory.end()) {
 
- 			return pFind->second;
 
- 		}
 
- 		return NULL;
 
- 	}
 
- private:
 
- 	//新连接绑定历史记录
 
- 	void BindHistoryData(COverlappedIOInfo* info, bool bRecv = false);
 
- public:
 
- 	//构造函数
 
- 	TCPServerIOCP();
 
- 	//虚构函数
 
- 	~TCPServerIOCP();
 
- 	//开始监听
 
- 	bool StartListen(unsigned short port, string ip);
 
- 	bool StartListen_IPv6(unsigned short port, string ip);
 
- 	int CalcTimePassSecond(SYSTEMTIME* stLast, SYSTEMTIME* stNow = NULL);
 
- 	void ConnectionMaintain();
 
- 	/*
 
- 	释放3个部分步骤:
 
- 	1. 清空IOCP线程队列,退出线程
 
- 	2. 清空等待accept套接字m_vecAcps
 
- 	3. 清空已连接套接字m_vecContInfo并清空缓存
 
- 	*/
 
- 	void CloseServer();
 
- 	//私有成员函数
 
- 	//启动CPU*2个线程,返回已启动的线程个数
 
- 	int StartWorkThreadPool();
 
- 	int StartWorkThreadPool_IPv6();
 
- 	//处理accept请求,NumberOfBytes=0表示没有收到第一帧数据, >0时表示收到了第一帧数据
 
- 	bool DoAccept(SOCKET sockAccept, SOCKADDR_IN* ClientAddr);
 
- 	bool DoAccept_IPv6(SOCKET sockAccept, SOCKADDR_IN6* ClientAddr);
 
- 	//投递recv请求
 
- 	bool PostRecv(COverlappedIOInfo* info);
 
- 	//处理recv请求
 
- 	bool DoRecv(COverlappedIOInfo* info);
 
- 	//从已连接的socket列表中移除socket及释放空间
 
- 	bool DeleteLink(SOCKET s, char* strError);
 
- 	BOOL IsIPOnline(CString strIP);
 
- 	int StaticConnData(CString strIp, const char* data, int iLen, ConnDataWay way);
 
- 	virtual void ClearConnHistoryInfo();
 
- 	//winsock版本类型
 
- 	WSAData m_wsaData;
 
- 	//临时加一个黑名单地址 scz 
 
- 	ULONG m_blackaddr = 0;
 
- 	//端口监听套接字
 
- 	SOCKET m_sListen;
 
- 	SOCKET m_sListen_IPv6;
 
- 	//已建立连接的信息,每个结构含有一个套接字、发送缓冲区、接收缓冲区、对端地址
 
- 	vector<COverlappedIOInfo*> m_vecContInfo;
 
- 	//缓冲连接信息
 
- 	std::list<CConnectCache> m_listContInfo;
 
- 	CCriticalSection m_csCacheListLock;
 
- 	//操作vector的互斥访问锁
 
- 	CCriticalSection m_csClientVectorLock;
 
- 	//CIOCP封装类
 
- 	CIOCP m_iocp;
 
- 	CIOCP m_iocp_IPv6;
 
- 	//连接历史信息和状态信息
 
- 	std::map<CString, ConnHistoryInfo*> m_mapConnHistory;//暂时弃用,后续启用时记得加锁防止多线程并发访问。多线程操作会出现map有两个相同的key
 
- 	static void ListenThread_IOCPServer(LPVOID lpParam);
 
- 	static void ListenThread_IOCPServer_IPv6(LPVOID lpParam);
 
- 	CString m_strServerIP;
 
- 	int m_iServerPort;
 
- 	DWORD m_lastError;
 
- 	CString m_strServerIP_IPv6;
 
- 	int m_iServerPort_IPv6;
 
- };
 
 
  |