SuperTCPClient.cpp 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. #include "stdafx.h"
  2. #include "SuperTCPClient.h"
  3. #include "Simplelog.h"
  4. #include <ZlDataDefine.h>
  5. #include "AppService.h"
  6. #include <jsonxx.h>
  7. #include "MonitorObject.h"
  8. using namespace jsonxx;
  9. CSuperTCPClient::CSuperTCPClient()
  10. {
  11. }
  12. CSuperTCPClient::~CSuperTCPClient()
  13. {
  14. //if (m_pBuffer) m_pBuffer->Reset();
  15. }
  16. void CSuperTCPClient::Close()
  17. {
  18. if (m_hSocket != INVALID_SOCKET)
  19. SPDLOG_ERROR("主动断开TCP链路:{}:{}", m_strServerIP, m_nServerPort);
  20. return __super::Close();
  21. }
  22. void CSuperTCPClient::Insert(LPBYTE pPack, int nPackLen, uint32_t packno, E_ZL_PROTOCAL protocal, bool r)
  23. {
  24. if (m_task.size() > 1000)
  25. SPDLOG_WARN("待发送数据队列异常:{}", m_task.size());
  26. if (nPackLen > 1500) SPDLOG_INFO("send e packno:{})", packno);
  27. auto ret = Send(pPack, nPackLen);
  28. if (ret == SOCKET_ERROR)
  29. {
  30. m_pFather->sendfail++;
  31. SPDLOG_WARN("send fail. packno:{})", packno);
  32. }
  33. else if (nPackLen == ret)
  34. {
  35. m_pFather->sendcount++;
  36. m_pFather->sendlen += ret;
  37. }
  38. else
  39. ASSERT(0);
  40. //加入队列并发送
  41. if (r || ret == SOCKET_ERROR)
  42. {
  43. CSendTask* pTask = new CSendTask(pPack, nPackLen, packno, protocal, r);
  44. std::lock_guard<std::mutex> lock(m_mtx);
  45. m_task.emplace_back(pTask);
  46. if (m_task.size() > 1500)
  47. {
  48. auto it = m_task.front();
  49. m_task.pop_front();
  50. it->join();
  51. }
  52. }
  53. //统计发送量
  54. time_t tmNow;
  55. time(&tmNow);
  56. m_mapSendCout[tmNow / 3600] += nPackLen;
  57. if (m_mapSendCout.size() > 1)
  58. {
  59. auto it = m_mapSendCout.begin();
  60. string strLog;
  61. auto& item = it->second;
  62. if (it->second > 1024 * 1024 * 1024)
  63. strLog = fmt::format("{}GB {}MB {} KB", item / (1024 * 1024 * 1024),
  64. item % (1024 * 1024 * 1024) / (1024 * 1024), item % (1024 * 1024) / 1024);
  65. else if (it->second > 1024 * 1024)
  66. strLog = fmt::format("{}MB {} KB", item / (1024 * 1024), item % (1024 * 1024) / 1024);
  67. else
  68. strLog = fmt::format("{} KB ", item / 1024);
  69. SPDLOG_WARN("上送流量统计:{} {}", CTime(it->first * 3600).Format("%Y-%m-%d %H:%M:%S"), strLog);
  70. m_mapSendCout.erase(it);
  71. }
  72. }
  73. void CSuperTCPClient::OnConnect(int nErrorCode)
  74. {
  75. SPDLOG_ERROR("收到TCP链路成功:{}:{}", m_strServerIP, m_nServerPort);
  76. return __super::OnConnect(nErrorCode);
  77. }
  78. void CSuperTCPClient::OnClose(int nErrorCode)
  79. {
  80. SPDLOG_ERROR("收到TCP链路断开:{}:{}", m_strServerIP, m_nServerPort);
  81. return __super::OnClose(nErrorCode);
  82. }
  83. BOOL CSuperTCPClient::RecvPack(uint32_t packno, E_ZL_PROTOCAL protocal)
  84. {
  85. m_pFather->recvcount++;
  86. SPDLOG_INFO("recv 协议号:{} packno:{})", uint8_t(protocal), packno);
  87. std::lock_guard<std::mutex> lock(m_mtx);
  88. for (auto it = m_task.begin(); it != m_task.end(); it++)
  89. {
  90. auto p = *it;
  91. if (p->packno == packno && p->protocal == protocal)
  92. {
  93. m_task.erase(it);
  94. p->join();
  95. }
  96. return TRUE;
  97. }
  98. return FALSE;
  99. }
  100. BOOL CSuperTCPClient::ReSend()
  101. {
  102. if (m_task.size() == 0) return TRUE;
  103. std::lock_guard<std::mutex> lock(m_mtx);
  104. if (m_task.size() > 0)
  105. {
  106. auto& it = m_task.front();
  107. auto ret = Send(it->pdata, it->nlen) != SOCKET_ERROR;
  108. if (ret)
  109. {
  110. m_pFather->sendlen += ret;
  111. m_pFather->sendcount++;
  112. }
  113. else
  114. m_pFather->sendfail++;
  115. return ret;
  116. }
  117. return FALSE;
  118. }
  119. BOOL CSuperTCPClient::HandleSubNotify(LPHJDATAHEAD2 lpHead, char* json, int json_len)
  120. {
  121. Object obj;
  122. obj.parse(string(json, json_len));
  123. if (obj.empty()) return FALSE;
  124. auto momp = obj.get<string>("momp");
  125. if (momp.empty()) return FALSE;
  126. string imei; int idx;
  127. CMonitorObjectMng::Instance()->MOMP2IMEI(momp, imei, idx);
  128. return CAppService::Instance()->GetHandle()->SendMsgToDevice(imei.c_str());
  129. }
  130. void CSuperTCPClient::ProcessPack(LPBYTE pPack, int nPackLen)
  131. {
  132. //解析
  133. if (!CHjDataConver::conver_recvpack(pPack, nPackLen))
  134. return;
  135. LPHJDATAHEAD2 lpHead = (LPHJDATAHEAD2)pPack;
  136. if (lpHead->protocol == E_ZL_PROTOCAL::ZL_KEEP)
  137. return ;
  138. //SPDLOG_INFO("recv {}:{} : {}", m_strServerIP, m_nServerPort, CSimpleLog::GetHexString2(pPack, nPackLen));
  139. BOOL bRet = FALSE;
  140. switch (lpHead->protocol)
  141. {
  142. case E_ZL_PROTOCAL::ZL_MO:
  143. case E_ZL_PROTOCAL::ZL_MP:
  144. case E_ZL_PROTOCAL::ZL_RESIST_DATA:
  145. case E_ZL_PROTOCAL::ZL_RESIST_REAL:
  146. case E_ZL_PROTOCAL::ZL_TEMP_HUMI:
  147. case E_ZL_PROTOCAL::ZL_ALARM_SET:
  148. case E_ZL_PROTOCAL::ZL_ALARM_DATA:
  149. case E_ZL_PROTOCAL::ZL_ALARM_ACK:
  150. case E_ZL_PROTOCAL::ZL_ALARM_HANDLE:
  151. case E_ZL_PROTOCAL::ZL_SVG:
  152. case E_ZL_PROTOCAL::ZL_REFER:
  153. case E_ZL_PROTOCAL::ZL_MOVE:
  154. case E_ZL_PROTOCAL::ZL_ALARM_UNACK:
  155. bRet = RecvPack(lpHead->packno, lpHead->protocol); //发出确认包
  156. break;
  157. case E_ZL_PROTOCAL::DL_SUB_NOTIFY:
  158. HandleSubNotify(lpHead, (char*)pPack + sizeof(HJDATAHEAD2), lpHead->len);
  159. break;
  160. case E_ZL_PROTOCAL::DL_SYNC:
  161. CAppService::Instance()->SetSycnSuperTime();
  162. break;
  163. case E_ZL_PROTOCAL::DL_ALARM_ACK:
  164. case E_ZL_PROTOCAL::DL_ALARM_HANDLE:
  165. break;
  166. default:
  167. CSimpleLog::Warn(fmt::format("收到未实现的命令:{}", uint8_t(lpHead->protocol)).c_str());
  168. break;
  169. }
  170. }