| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 | #include "stdafx.h"#include "SuperTCPClient.h"#include "Simplelog.h"#include <ZlDataDefine.h>#include "AppService.h"#include <jsonxx.h>#include "MonitorObject.h"using namespace jsonxx;CSuperTCPClient::CSuperTCPClient(){}CSuperTCPClient::~CSuperTCPClient(){	//if (m_pBuffer) m_pBuffer->Reset();}void CSuperTCPClient::Close(){	if (m_hSocket != INVALID_SOCKET)		SPDLOG_ERROR("主动断开TCP链路:{}:{}", m_strServerIP, m_nServerPort);	return __super::Close();}void CSuperTCPClient::Insert(LPBYTE pPack, int nPackLen, uint32_t packno, E_ZL_PROTOCAL protocal, bool r){	if (m_task.size() > 1000)		SPDLOG_WARN("待发送数据队列异常:{}", m_task.size());	if (nPackLen > 1500) SPDLOG_INFO("send e packno:{})", packno);	auto ret = Send(pPack, nPackLen);	if (ret == SOCKET_ERROR)	{		m_pFather->sendfail++;		SPDLOG_WARN("send fail. packno:{})", packno);	}	else if (nPackLen == ret)	{		m_pFather->sendcount++;		m_pFather->sendlen += ret;	}	else		ASSERT(0);	//加入队列并发送	if (r || ret == SOCKET_ERROR)	{		CSendTask* pTask = new CSendTask(pPack, nPackLen, packno, protocal, r);		std::lock_guard<std::mutex> lock(m_mtx);		m_task.emplace_back(pTask);		if (m_task.size() > 1500)		{			auto it = m_task.front();			m_task.pop_front();			it->join();		}	}	//统计发送量	time_t tmNow;	time(&tmNow);	m_mapSendCout[tmNow / 3600] += nPackLen;	if (m_mapSendCout.size() > 1)	{		auto it = m_mapSendCout.begin();		string strLog;		auto& item = it->second;		if (it->second > 1024 * 1024 * 1024)			strLog = fmt::format("{}GB {}MB {} KB", item / (1024 * 1024 * 1024), 				item % (1024 * 1024 * 1024) / (1024 * 1024), item % (1024 * 1024) / 1024);		else if (it->second > 1024 * 1024)			strLog = fmt::format("{}MB {} KB", item / (1024 * 1024), item % (1024 * 1024) / 1024);		else 			strLog = fmt::format("{} KB ", item / 1024);		SPDLOG_WARN("上送流量统计:{} {}", CTime(it->first * 3600).Format("%Y-%m-%d %H:%M:%S"), strLog);		m_mapSendCout.erase(it);	}}void CSuperTCPClient::OnConnect(int nErrorCode){	SPDLOG_ERROR("收到TCP链路成功:{}:{}", m_strServerIP, m_nServerPort);	return __super::OnConnect(nErrorCode);}void CSuperTCPClient::OnClose(int nErrorCode){	SPDLOG_ERROR("收到TCP链路断开:{}:{}", m_strServerIP, m_nServerPort);	return __super::OnClose(nErrorCode);}BOOL CSuperTCPClient::RecvPack(uint32_t packno, E_ZL_PROTOCAL protocal){	m_pFather->recvcount++;	SPDLOG_INFO("recv 协议号:{} packno:{})", uint8_t(protocal), packno);	std::lock_guard<std::mutex> lock(m_mtx);	for (auto it = m_task.begin(); it != m_task.end(); it++)	{		auto p = *it;		if (p->packno == packno && p->protocal == protocal)		{			m_task.erase(it);			p->join();		}		return TRUE;	}	return FALSE;}BOOL CSuperTCPClient::ReSend(){	if (m_task.size() == 0) return TRUE;	std::lock_guard<std::mutex> lock(m_mtx);	if (m_task.size() > 0)	{		auto& it = m_task.front();		auto ret = Send(it->pdata, it->nlen) != SOCKET_ERROR;		if (ret)		{			m_pFather->sendlen += ret;			m_pFather->sendcount++;		}		else			m_pFather->sendfail++;		return ret;	}	return FALSE;}BOOL CSuperTCPClient::HandleSubNotify(LPHJDATAHEAD2 lpHead, char* json, int json_len){	Object obj;	obj.parse(string(json, json_len));	if (obj.empty()) return FALSE;	auto momp = obj.get<string>("momp");	if (momp.empty()) return FALSE;	string imei; int idx;	CMonitorObjectMng::Instance()->MOMP2IMEI(momp, imei, idx);	return CAppService::Instance()->GetHandle()->SendMsgToDevice(imei.c_str());}void CSuperTCPClient::ProcessPack(LPBYTE pPack, int nPackLen){	//解析	if (!CHjDataConver::conver_recvpack(pPack, nPackLen))		return;	LPHJDATAHEAD2 lpHead = (LPHJDATAHEAD2)pPack;	if (lpHead->protocol == E_ZL_PROTOCAL::ZL_KEEP)		return ;	//SPDLOG_INFO("recv {}:{} : {}", m_strServerIP, m_nServerPort, CSimpleLog::GetHexString2(pPack, nPackLen));	BOOL bRet = FALSE;	switch (lpHead->protocol)	{	case E_ZL_PROTOCAL::ZL_MO:	case E_ZL_PROTOCAL::ZL_MP:	case E_ZL_PROTOCAL::ZL_RESIST_DATA:	case E_ZL_PROTOCAL::ZL_RESIST_REAL:	case E_ZL_PROTOCAL::ZL_TEMP_HUMI:	case E_ZL_PROTOCAL::ZL_ALARM_SET:	case E_ZL_PROTOCAL::ZL_ALARM_DATA:	case E_ZL_PROTOCAL::ZL_ALARM_ACK:	case E_ZL_PROTOCAL::ZL_ALARM_HANDLE:	case E_ZL_PROTOCAL::ZL_SVG:	case E_ZL_PROTOCAL::ZL_REFER:	case E_ZL_PROTOCAL::ZL_MOVE:	case E_ZL_PROTOCAL::ZL_ALARM_UNACK:		bRet = RecvPack(lpHead->packno, lpHead->protocol);  //发出确认包		break;	case E_ZL_PROTOCAL::DL_SUB_NOTIFY:		HandleSubNotify(lpHead, (char*)pPack + sizeof(HJDATAHEAD2), lpHead->len);		break;	case E_ZL_PROTOCAL::DL_SYNC:		CAppService::Instance()->SetSycnSuperTime();		break;	case E_ZL_PROTOCAL::DL_ALARM_ACK:	case E_ZL_PROTOCAL::DL_ALARM_HANDLE:		break;	default:		CSimpleLog::Warn(fmt::format("收到未实现的命令:{}", uint8_t(lpHead->protocol)).c_str());		break;	}}
 |