// SuperManager.cpp
  1. #include "stdafx.h"
  2. #include "SuperManager.h"
  3. #include <ODBC/DBConnectPool.h>
  4. #include <ZlDataDefine.h>
  5. #include <Simplelog.h>
  6. #include <AppService.h>
  7. CSendTask::CSendTask(LPBYTE pPack, int nPackLen, uint32_t no, E_ZL_PROTOCAL proto, bool r)
  8. {
  9. pdata = new BYTE[nPackLen];
  10. //SPDLOG_ERROR("mem:new {:X}", (DWORD_PTR)pdata);
  11. memcpy(pdata, pPack, nPackLen);
  12. nlen = nPackLen;
  13. this->packno = no;
  14. this->protocal = proto;
  15. bRePeatSend = r;
  16. }
  17. void CSendTask::join()
  18. {
  19. delete this;
  20. }
  21. CSendTask::~CSendTask()
  22. {
  23. //SPDLOG_ERROR("mem:del {:X}", (DWORD_PTR)pdata);
  24. delete[] pdata;
  25. pdata = nullptr;
  26. nlen = 0;
  27. }
  28. CSuperManager::CSuperManager()
  29. {
  30. }
  31. CSuperManager::~CSuperManager()
  32. {
  33. Stop();
  34. }
  35. BOOL CSuperManager::Start(CString m_strIniPath, uint16_t port /*= 10089*/)
  36. {
  37. Stop();
  38. m_super_num = ::GetPrivateProfileInt("SET", "super_num", 0, m_strIniPath);
  39. ::WritePrivateProfileStringA("SET", "super_num", (to_string(m_super_num) + " #推送个数").c_str(), m_strIniPath);
  40. if (m_super_num == 0 || m_super_num > 10) return TRUE;
  41. //m_handler = new CSuperHandler();
  42. for (size_t i = 1; i <= m_super_num; i++)
  43. {
  44. char szTemp[100] = { 0 };
  45. ::GetPrivateProfileStringA("SET", fmt::format("super_ip_{}", i).c_str(), "", szTemp, sizeof(szTemp), m_strIniPath);
  46. if (szTemp[0x00] == 0x00)
  47. {
  48. ::WritePrivateProfileStringA("SET", fmt::format("super_ip_{}", i).c_str(), "127.0.0.1", m_strIniPath);
  49. continue;
  50. }
  51. uint16_t p = ::GetPrivateProfileInt("SET", fmt::format("super_port_{}", i).c_str(), 10090, m_strIniPath);
  52. //添加记录
  53. auto& it = m_super[szTemp];
  54. strcpy_s(it.ip, 16, szTemp);
  55. it.port = p;
  56. it.addr.sin_family = AF_INET;
  57. inet_pton(AF_INET, szTemp, &it.addr.sin_addr);
  58. it.addr.sin_port = htons(p);
  59. //设置缓冲区
  60. //it.m_socket.SetBuffer(&it.m_buffer);
  61. it.m_client.SetBuffer(&it.m_buffer);
  62. it.m_client.set_addr(it.addr);
  63. //设置处理器
  64. it.handler.SetFather(&it);
  65. it.handler.SetClient(&it.m_client);
  66. it.m_client.SetProtocolHandler(&it.handler);
  67. auto ret = it.m_client.conn_server(); //链接服务器
  68. if (ret != 0) return FALSE;
  69. }
  70. m_bWork = true;
  71. m_pThread = new std::thread(ThreadProcForSend, (DWORD_PTR)this);
  72. return TRUE;
  73. }
  74. void CSuperManager::Stop()
  75. {
  76. m_bWork = false;
  77. if (m_pThread)
  78. {
  79. m_pThread->join();
  80. delete m_pThread;
  81. m_pThread = nullptr;
  82. }
  83. //if (m_socket)
  84. //{
  85. // m_socket->Close();
  86. // m_socket = nullptr;
  87. //}
  88. //关闭链路
  89. for (auto& it : m_super)
  90. {
  91. it.second.m_socket.Close();
  92. it.second.m_client.close();
  93. }
  94. Sleep(0);
  95. m_super.clear();
  96. }
  97. CSuperManager CSuperManager::obj;
  98. void CSuperManager::ThreadProcForSend(DWORD_PTR pThis)
  99. {
  100. auto pService = (CSuperManager*)pThis;
  101. uint32_t last_pack_no = 0;
  102. E_ZL_PROTOCAL last_protocal = E_ZL_PROTOCAL::ZL_UNREALIZED;
  103. bool bSleep = false;
  104. time_t tmNow;
  105. uint8_t bBeart[100] = { 0 };
  106. int nBeart = CHjDataConver::conver_sendpack(bBeart, 0, 0, 0, 0, E_ZL_PROTOCAL::ZL_KEEP,
  107. OR_DATA_INFO(0, 0, 1, 2, OPT_TYPE::OPT_SYNC));
  108. time_t tmLastConnTime = 0;
  109. time_t tmLastSendBeart = 0;
  110. do
  111. {
  112. time(&tmNow);
  113. Sleep(950);
  114. //if (tmLastConnTime != tmNow)
  115. //{//每秒重连一次
  116. // tmLastConnTime = tmNow;
  117. // for (auto& it : pService->m_super)
  118. // {
  119. // if (it.second.m_socket.IsInactive(180))
  120. // {
  121. // SPDLOG_INFO("TCP重连 {}:{}", it.second.ip, it.second.port);
  122. // it.second.m_socket.Close();
  123. // if (false == it.second.m_socket.ConnectServer(it.second.ip, it.second.port))
  124. // SPDLOG_WARN("TCP重连失败 {}:{}", it.second.ip, it.second.port);
  125. // }
  126. // }
  127. //}
  128. if (tmNow - tmLastSendBeart >= 30)
  129. {//30秒发送一次心跳
  130. tmLastSendBeart = tmNow;
  131. for (auto& it : pService->m_super)
  132. {
  133. //auto ret = it.second.m_socket.Send(bBeart, nBeart);
  134. auto ret = it.second.handler.Send(bBeart, nBeart);
  135. if (ret)
  136. {
  137. it.second.sendlen += nBeart;
  138. if (it.second.tmLastSendBeart == 0) it.second.tmLastSendBeart = tmNow;
  139. auto dif = tmNow - it.second.tmLastSendBeart;
  140. it.second.tmLastSendBeart = tmNow;
  141. if (dif > 180)
  142. SPDLOG_WARN("心跳发送间隔超时{}:{},{} s", it.second.ip, it.second.port, dif);
  143. }
  144. }
  145. }
  146. {//1秒重新发送一次
  147. for (auto& it : pService->m_super)
  148. //it.second.m_socket.ReSend();
  149. it.second.handler.ReSend();
  150. }
  151. } while (pService->m_bWork);
  152. }