| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- #include "stdafx.h"
- #include "SuperTCPClient.h"
- #include "Simplelog.h"
- #include <ZlDataDefine.h>
- #include "AppService.h"
- #include <jsonxx.h>
- #include "MonitorObject.h"
- using namespace jsonxx;
- CSuperTCPClient::CSuperTCPClient()
- {
- }
- CSuperTCPClient::~CSuperTCPClient()
- {
- //if (m_pBuffer) m_pBuffer->Reset();
- }
- void CSuperTCPClient::Close()
- {
- if (m_hSocket != INVALID_SOCKET)
- SPDLOG_ERROR("主动断开TCP链路:{}:{}", m_strServerIP, m_nServerPort);
- return __super::Close();
- }
- void CSuperTCPClient::Insert(LPBYTE pPack, int nPackLen, uint32_t packno, E_ZL_PROTOCAL protocal, bool r)
- {
- if (m_task.size() > 1000)
- SPDLOG_WARN("待发送数据队列异常:{}", m_task.size());
- if (nPackLen > 1500) SPDLOG_INFO("send e packno:{})", packno);
- auto ret = Send(pPack, nPackLen);
- if (ret == SOCKET_ERROR)
- {
- m_pFather->sendfail++;
- SPDLOG_WARN("send fail. packno:{})", packno);
- }
- else if (nPackLen == ret)
- {
- m_pFather->sendcount++;
- m_pFather->sendlen += ret;
- }
- else
- ASSERT(0);
- //加入队列并发送
- if (r || ret == SOCKET_ERROR)
- {
- CSendTask* pTask = new CSendTask(pPack, nPackLen, packno, protocal, r);
- std::lock_guard<std::mutex> lock(m_mtx);
- m_task.emplace_back(pTask);
- if (m_task.size() > 1500)
- {
- auto it = m_task.front();
- m_task.pop_front();
- it->join();
- }
- }
- //统计发送量
- time_t tmNow;
- time(&tmNow);
- m_mapSendCout[tmNow / 3600] += nPackLen;
- if (m_mapSendCout.size() > 1)
- {
- auto it = m_mapSendCout.begin();
- string strLog;
- auto& item = it->second;
- if (it->second > 1024 * 1024 * 1024)
- strLog = fmt::format("{}GB {}MB {} KB", item / (1024 * 1024 * 1024),
- item % (1024 * 1024 * 1024) / (1024 * 1024), item % (1024 * 1024) / 1024);
- else if (it->second > 1024 * 1024)
- strLog = fmt::format("{}MB {} KB", item / (1024 * 1024), item % (1024 * 1024) / 1024);
- else
- strLog = fmt::format("{} KB ", item / 1024);
- SPDLOG_WARN("上送流量统计:{} {}", CTime(it->first * 3600).Format("%Y-%m-%d %H:%M:%S"), strLog);
- m_mapSendCout.erase(it);
- }
- }
- void CSuperTCPClient::OnConnect(int nErrorCode)
- {
- SPDLOG_ERROR("收到TCP链路成功:{}:{}", m_strServerIP, m_nServerPort);
- return __super::OnConnect(nErrorCode);
- }
- void CSuperTCPClient::OnClose(int nErrorCode)
- {
- SPDLOG_ERROR("收到TCP链路断开:{}:{}", m_strServerIP, m_nServerPort);
- return __super::OnClose(nErrorCode);
- }
- BOOL CSuperTCPClient::RecvPack(uint32_t packno, E_ZL_PROTOCAL protocal)
- {
- m_pFather->recvcount++;
- SPDLOG_INFO("recv 协议号:{} packno:{})", uint8_t(protocal), packno);
- std::lock_guard<std::mutex> lock(m_mtx);
- for (auto it = m_task.begin(); it != m_task.end(); it++)
- {
- auto p = *it;
- if (p->packno == packno && p->protocal == protocal)
- {
- m_task.erase(it);
- p->join();
- }
- return TRUE;
- }
- return FALSE;
- }
- BOOL CSuperTCPClient::ReSend()
- {
- if (m_task.size() == 0) return TRUE;
- std::lock_guard<std::mutex> lock(m_mtx);
- if (m_task.size() > 0)
- {
- auto& it = m_task.front();
- auto ret = Send(it->pdata, it->nlen) != SOCKET_ERROR;
- if (ret)
- {
- m_pFather->sendlen += ret;
- m_pFather->sendcount++;
- }
- else
- m_pFather->sendfail++;
- return ret;
- }
- return FALSE;
- }
- BOOL CSuperTCPClient::HandleSubNotify(LPHJDATAHEAD2 lpHead, char* json, int json_len)
- {
- Object obj;
- obj.parse(string(json, json_len));
- if (obj.empty()) return FALSE;
- auto momp = obj.get<string>("momp");
- if (momp.empty()) return FALSE;
- string imei; int idx;
- CMonitorObjectMng::Instance()->MOMP2IMEI(momp, imei, idx);
- return CAppService::Instance()->GetHandle()->SendMsgToDevice(imei.c_str());
- }
- void CSuperTCPClient::ProcessPack(LPBYTE pPack, int nPackLen)
- {
- //解析
- if (!CHjDataConver::conver_recvpack(pPack, nPackLen))
- return;
- LPHJDATAHEAD2 lpHead = (LPHJDATAHEAD2)pPack;
- if (lpHead->protocol == E_ZL_PROTOCAL::ZL_KEEP)
- return ;
- //SPDLOG_INFO("recv {}:{} : {}", m_strServerIP, m_nServerPort, CSimpleLog::GetHexString2(pPack, nPackLen));
- BOOL bRet = FALSE;
- switch (lpHead->protocol)
- {
- case E_ZL_PROTOCAL::ZL_MO:
- case E_ZL_PROTOCAL::ZL_MP:
- case E_ZL_PROTOCAL::ZL_RESIST_DATA:
- case E_ZL_PROTOCAL::ZL_RESIST_REAL:
- case E_ZL_PROTOCAL::ZL_TEMP_HUMI:
- case E_ZL_PROTOCAL::ZL_ALARM_SET:
- case E_ZL_PROTOCAL::ZL_ALARM_DATA:
- case E_ZL_PROTOCAL::ZL_ALARM_ACK:
- case E_ZL_PROTOCAL::ZL_ALARM_HANDLE:
- case E_ZL_PROTOCAL::ZL_SVG:
- case E_ZL_PROTOCAL::ZL_REFER:
- case E_ZL_PROTOCAL::ZL_MOVE:
- case E_ZL_PROTOCAL::ZL_ALARM_UNACK:
- bRet = RecvPack(lpHead->packno, lpHead->protocol); //发出确认包
- break;
- case E_ZL_PROTOCAL::DL_SUB_NOTIFY:
- HandleSubNotify(lpHead, (char*)pPack + sizeof(HJDATAHEAD2), lpHead->len);
- break;
- case E_ZL_PROTOCAL::DL_SYNC:
- CAppService::Instance()->SetSycnSuperTime();
- break;
- case E_ZL_PROTOCAL::DL_ALARM_ACK:
- case E_ZL_PROTOCAL::DL_ALARM_HANDLE:
- break;
- default:
- CSimpleLog::Warn(fmt::format("收到未实现的命令:{}", uint8_t(lpHead->protocol)).c_str());
- break;
- }
- }
|