#include "stdafx.h" #include "SuperTCPClient.h" #include "Simplelog.h" #include #include "AppService.h" #include #include "MonitorObject.h" using namespace jsonxx; CSuperTCPClient::CSuperTCPClient() { } CSuperTCPClient::~CSuperTCPClient() { //if (m_pBuffer) m_pBuffer->Reset(); } void CSuperTCPClient::Close() { if (m_hSocket != INVALID_SOCKET) SPDLOG_ERROR("主动断开TCP链路:{}:{}", m_strServerIP, m_nServerPort); return __super::Close(); } void CSuperTCPClient::Insert(LPBYTE pPack, int nPackLen, uint32_t packno, E_ZL_PROTOCAL protocal, bool r) { if (m_task.size() > 1000) SPDLOG_WARN("待发送数据队列异常:{}", m_task.size()); if (nPackLen > 1500) SPDLOG_INFO("send e packno:{})", packno); auto ret = Send(pPack, nPackLen); if (ret == SOCKET_ERROR) { m_pFather->sendfail++; SPDLOG_WARN("send fail. packno:{})", packno); } else if (nPackLen == ret) { m_pFather->sendcount++; m_pFather->sendlen += ret; } else ASSERT(0); //加入队列并发送 if (r || ret == SOCKET_ERROR) { CSendTask* pTask = new CSendTask(pPack, nPackLen, packno, protocal, r); std::lock_guard lock(m_mtx); m_task.emplace_back(pTask); if (m_task.size() > 1500) { auto it = m_task.front(); m_task.pop_front(); it->join(); } } //统计发送量 time_t tmNow; time(&tmNow); m_mapSendCout[tmNow / 3600] += nPackLen; if (m_mapSendCout.size() > 1) { auto it = m_mapSendCout.begin(); string strLog; auto& item = it->second; if (it->second > 1024 * 1024 * 1024) strLog = fmt::format("{}GB {}MB {} KB", item / (1024 * 1024 * 1024), item % (1024 * 1024 * 1024) / (1024 * 1024), item % (1024 * 1024) / 1024); else if (it->second > 1024 * 1024) strLog = fmt::format("{}MB {} KB", item / (1024 * 1024), item % (1024 * 1024) / 1024); else strLog = fmt::format("{} KB ", item / 1024); SPDLOG_WARN("上送流量统计:{} {}", CTime(it->first * 3600).Format("%Y-%m-%d %H:%M:%S"), strLog); m_mapSendCout.erase(it); } } void CSuperTCPClient::OnConnect(int nErrorCode) { SPDLOG_ERROR("收到TCP链路成功:{}:{}", m_strServerIP, m_nServerPort); return __super::OnConnect(nErrorCode); } void CSuperTCPClient::OnClose(int nErrorCode) { SPDLOG_ERROR("收到TCP链路断开:{}:{}", m_strServerIP, m_nServerPort); return __super::OnClose(nErrorCode); } BOOL CSuperTCPClient::RecvPack(uint32_t packno, E_ZL_PROTOCAL protocal) { m_pFather->recvcount++; SPDLOG_INFO("recv 协议号:{} packno:{})", uint8_t(protocal), packno); std::lock_guard lock(m_mtx); for (auto it = m_task.begin(); it != m_task.end(); it++) { auto p = *it; if (p->packno == packno && p->protocal == protocal) { m_task.erase(it); p->join(); } return TRUE; } return FALSE; } BOOL CSuperTCPClient::ReSend() { if (m_task.size() == 0) return TRUE; std::lock_guard lock(m_mtx); if (m_task.size() > 0) { auto& it = m_task.front(); auto ret = Send(it->pdata, it->nlen) != SOCKET_ERROR; if (ret) { m_pFather->sendlen += ret; m_pFather->sendcount++; } else m_pFather->sendfail++; return ret; } return FALSE; } BOOL CSuperTCPClient::HandleSubNotify(LPHJDATAHEAD2 lpHead, char* json, int json_len) { Object obj; obj.parse(string(json, json_len)); if (obj.empty()) return FALSE; auto momp = obj.get("momp"); if (momp.empty()) return FALSE; string imei; int idx; CMonitorObjectMng::Instance()->MOMP2IMEI(momp, imei, idx); return CAppService::Instance()->GetHandle()->SendMsgToDevice(imei.c_str()); } void CSuperTCPClient::ProcessPack(LPBYTE pPack, int nPackLen) { //解析 if (!CHjDataConver::conver_recvpack(pPack, nPackLen)) return; LPHJDATAHEAD2 lpHead = (LPHJDATAHEAD2)pPack; if (lpHead->protocol == E_ZL_PROTOCAL::ZL_KEEP) return ; //SPDLOG_INFO("recv {}:{} : {}", m_strServerIP, m_nServerPort, CSimpleLog::GetHexString2(pPack, nPackLen)); BOOL bRet = FALSE; switch (lpHead->protocol) { case E_ZL_PROTOCAL::ZL_MO: case E_ZL_PROTOCAL::ZL_MP: case E_ZL_PROTOCAL::ZL_RESIST_DATA: case E_ZL_PROTOCAL::ZL_RESIST_REAL: case E_ZL_PROTOCAL::ZL_TEMP_HUMI: case E_ZL_PROTOCAL::ZL_ALARM_SET: case E_ZL_PROTOCAL::ZL_ALARM_DATA: case E_ZL_PROTOCAL::ZL_ALARM_ACK: case E_ZL_PROTOCAL::ZL_ALARM_HANDLE: case E_ZL_PROTOCAL::ZL_SVG: case E_ZL_PROTOCAL::ZL_REFER: case E_ZL_PROTOCAL::ZL_MOVE: case E_ZL_PROTOCAL::ZL_ALARM_UNACK: bRet = RecvPack(lpHead->packno, lpHead->protocol); //发出确认包 break; case E_ZL_PROTOCAL::DL_SUB_NOTIFY: HandleSubNotify(lpHead, (char*)pPack + sizeof(HJDATAHEAD2), lpHead->len); break; case E_ZL_PROTOCAL::DL_SYNC: CAppService::Instance()->SetSycnSuperTime(); break; case E_ZL_PROTOCAL::DL_ALARM_ACK: case E_ZL_PROTOCAL::DL_ALARM_HANDLE: break; default: CSimpleLog::Warn(fmt::format("收到未实现的命令:{}", uint8_t(lpHead->protocol)).c_str()); break; } }