#include "stdafx.h"
#include "SuperManager.h"
// NOTE(review): this copy of the file had four further #include directives whose
// header names were missing. The four standard headers below are inferred from what
// this file uses (memcpy, std::to_string, std::thread, time) — confirm against
// version control and restore any project/third-party headers that were lost.
#include <cstring>
#include <ctime>
#include <string>
#include <thread>

/// Creates a send task holding a private copy of the packet bytes.
///
/// @param pPack     Source bytes; copied, the caller keeps ownership of its buffer.
/// @param nPackLen  Number of bytes to copy from pPack.
/// @param no        Packet number stored in `packno`.
/// @param proto     Protocol tag stored in `protocal`.
/// @param r         Stored in `bRePeatSend`.
///
/// A null pPack or a non-positive nPackLen yields an empty payload (nlen == 0)
/// instead of handing invalid arguments to new[]/memcpy.
CSendTask::CSendTask(LPBYTE pPack, int nPackLen, uint32_t no, E_ZL_PROTOCAL proto, bool r)
{
	const int nCopyLen = (pPack != nullptr && nPackLen > 0) ? nPackLen : 0;

	// Allocate even for an empty payload so pdata stays non-null, as before.
	pdata = new BYTE[nCopyLen];
	if (nCopyLen > 0)
		memcpy(pdata, pPack, nCopyLen);
	nlen = nCopyLen;

	this->packno = no;
	this->protocal = proto;
	bRePeatSend = r;
}

/// Destroys this task (`delete this`); the object must not be touched afterwards.
void CSendTask::join()
{
	delete this;
}

/// Releases the packet copy made by the constructor.
CSendTask::~CSendTask()
{
	delete[] pdata;
	pdata = nullptr;
	nlen = 0;
}

CSuperManager::CSuperManager()
{
}

/// Stops the sender thread and closes every link (see Stop()).
CSuperManager::~CSuperManager()
{
	Stop();
}

/// Reads the peer list from the INI file, connects to every configured peer and
/// starts the heartbeat/resend thread.
///
/// INI section "SET": `super_num` (peer count), `super_ip_<i>` and `super_port_<i>`
/// (port defaults to 10090) for i = 1..super_num.
///
/// @param m_strIniPath  Path of the INI file.
/// @param port          Not used by this function.
/// @return TRUE when started, and also when `super_num` is 0 or greater than 10
///         (nothing is started in that case); FALSE as soon as one peer fails to
///         connect.
///
/// NOTE(review): on a FALSE return the peers configured so far stay in m_super with
/// their connections open until the next Stop()/Start() — confirm this is intended.
BOOL CSuperManager::Start(CString m_strIniPath, uint16_t port /*= 10089*/)
{
	Stop();

	m_super_num = ::GetPrivateProfileInt("SET", "super_num", 0, m_strIniPath);
	// Write the value back together with an inline description so the key exists in the file.
	::WritePrivateProfileStringA("SET", "super_num", (std::to_string(m_super_num) + " #推送个数").c_str(), m_strIniPath);
	if (m_super_num == 0 || m_super_num > 10)
		return TRUE;

	for (size_t i = 1; i <= m_super_num; i++)
	{
		const auto ipKey = fmt::format("super_ip_{}", i);

		char szTemp[100] = { 0 };
		::GetPrivateProfileStringA("SET", ipKey.c_str(), "", szTemp, sizeof(szTemp), m_strIniPath);
		if (szTemp[0x00] == 0x00)
		{
			// Missing entry: seed the file with a default address and skip this slot.
			::WritePrivateProfileStringA("SET", ipKey.c_str(), "127.0.0.1", m_strIniPath);
			continue;
		}

		// Validate before touching m_super. A valid dotted IPv4 string is at most 15
		// characters, so it is guaranteed to fit the 16-byte copy below; an unchecked
		// longer value would make strcpy_s invoke the CRT invalid-parameter handler.
		in_addr parsedAddr = {};
		if (inet_pton(AF_INET, szTemp, &parsedAddr) != 1)
		{
			SPDLOG_WARN("invalid IPv4 address '{}' for {} , entry skipped", szTemp, ipKey);
			continue;
		}

		const uint16_t p = static_cast<uint16_t>(
			::GetPrivateProfileInt("SET", fmt::format("super_port_{}", i).c_str(), 10090, m_strIniPath));

		// Add (or reuse) the record keyed by the IP string.
		auto& it = m_super[szTemp];
		strcpy_s(it.ip, 16, szTemp);
		it.port = p;
		it.addr.sin_family = AF_INET;
		it.addr.sin_addr = parsedAddr;
		it.addr.sin_port = htons(p);

		// Attach the buffer and the target address to the client.
		it.m_client.SetBuffer(&it.m_buffer);
		it.m_client.set_addr(it.addr);

		// Wire up the protocol handler.
		it.handler.SetFather(&it);
		it.handler.SetClient(&it.m_client);
		it.m_client.SetProtocolHandler(&it.handler);

		// Connect to the server; a non-zero result is treated as failure.
		auto ret = it.m_client.conn_server();
		if (ret != 0)
			return FALSE;
	}

	m_bWork = true;
	m_pThread = new std::thread(ThreadProcForSend, reinterpret_cast<DWORD_PTR>(this));

	return TRUE;
}

/// Stops the sender thread (waiting for it to finish), closes every link and
/// clears the peer table. Does nothing harmful when nothing is running.
void CSuperManager::Stop()
{
	m_bWork = false;
	if (m_pThread)
	{
		// The thread checks m_bWork once per loop pass, so this can block for about a second.
		m_pThread->join();
		delete m_pThread;
		m_pThread = nullptr;
	}

	// Close the links.
	for (auto& it : m_super)
	{
		it.second.m_socket.Close();
		it.second.m_client.close();
	}
	Sleep(0);
	m_super.clear();
}

CSuperManager CSuperManager::obj;

/// Worker thread body: roughly once per second asks every peer handler to resend,
/// and every 30 seconds sends a keep-alive packet to every peer.
///
/// @param pThis  The owning CSuperManager, passed as an integer by Start().
///
/// Runs until m_bWork becomes false; the flag is tested at the end of each pass.
/// NOTE(review): a periodic TCP reconnect (close + ConnectServer when a link had been
/// inactive for 180 s) used to live in this loop via m_socket and is currently disabled.
void CSuperManager::ThreadProcForSend(DWORD_PTR pThis)
{
	auto pService = reinterpret_cast<CSuperManager*>(pThis);

	// Build the keep-alive (ZL_KEEP) packet once; it is reused for every heartbeat.
	uint8_t bBeart[100] = { 0 };
	int nBeart = CHjDataConver::conver_sendpack(bBeart, 0, 0, 0, 0, E_ZL_PROTOCAL::ZL_KEEP, OR_DATA_INFO(0, 0, 1, 2, OPT_TYPE::OPT_SYNC));

	time_t tmNow;
	time_t tmLastSendBeart = 0;
	do
	{
		time(&tmNow);
		Sleep(950);

		if (tmNow - tmLastSendBeart >= 30)
		{
			// Send a heartbeat every 30 seconds.
			tmLastSendBeart = tmNow;
			for (auto& entry : pService->m_super)
			{
				auto& peer = entry.second;
				auto ret = peer.handler.Send(bBeart, nBeart);
				if (ret)
				{
					peer.sendlen += nBeart;
					if (peer.tmLastSendBeart == 0)
						peer.tmLastSendBeart = tmNow;
					// Gap since the previous successful heartbeat to this peer.
					auto dif = tmNow - peer.tmLastSendBeart;
					peer.tmLastSendBeart = tmNow;
					if (dif > 180)
						SPDLOG_WARN("心跳发送间隔超时{}:{},{} s", peer.ip, peer.port, dif);
				}
			}
		}

		// Trigger a resend pass on every peer once per loop iteration (about 1 s).
		for (auto& entry : pService->m_super)
			entry.second.handler.ReSend();
	} while (pService->m_bWork);
}