| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 | 
							- #include "stdafx.h"
 
- #include "SuperTCPClient.h"
 
- #include "Simplelog.h"
 
- #include <ZlDataDefine.h>
 
- #include "AppService.h"
 
- #include <jsonxx.h>
 
- #include "MonitorObject.h"
 
- using namespace jsonxx;
 
- CSuperTCPClient::CSuperTCPClient()
 
- {
 
- }
 
- CSuperTCPClient::~CSuperTCPClient()
 
- {
 
- 	//if (m_pBuffer) m_pBuffer->Reset();
 
- }
 
- void CSuperTCPClient::Close()
 
- {
 
- 	if (m_hSocket != INVALID_SOCKET)
 
- 		SPDLOG_ERROR("主动断开TCP链路:{}:{}", m_strServerIP, m_nServerPort);
 
- 	return __super::Close();
 
- }
 
- void CSuperTCPClient::Insert(LPBYTE pPack, int nPackLen, uint32_t packno, E_ZL_PROTOCAL protocal, bool r)
 
- {
 
- 	if (m_task.size() > 1000)
 
- 		SPDLOG_WARN("待发送数据队列异常:{}", m_task.size());
 
- 	if (nPackLen > 1500) SPDLOG_INFO("send e packno:{})", packno);
 
- 	auto ret = Send(pPack, nPackLen);
 
- 	if (ret == SOCKET_ERROR)
 
- 	{
 
- 		m_pFather->sendfail++;
 
- 		SPDLOG_WARN("send fail. packno:{})", packno);
 
- 	}
 
- 	else if (nPackLen == ret)
 
- 	{
 
- 		m_pFather->sendcount++;
 
- 		m_pFather->sendlen += ret;
 
- 	}
 
- 	else
 
- 		ASSERT(0);
 
- 	//加入队列并发送
 
- 	if (r || ret == SOCKET_ERROR)
 
- 	{
 
- 		CSendTask* pTask = new CSendTask(pPack, nPackLen, packno, protocal, r);
 
- 		std::lock_guard<std::mutex> lock(m_mtx);
 
- 		m_task.emplace_back(pTask);
 
- 		if (m_task.size() > 1500)
 
- 		{
 
- 			auto it = m_task.front();
 
- 			m_task.pop_front();
 
- 			it->join();
 
- 		}
 
- 	}
 
- 	//统计发送量
 
- 	time_t tmNow;
 
- 	time(&tmNow);
 
- 	m_mapSendCout[tmNow / 3600] += nPackLen;
 
- 	if (m_mapSendCout.size() > 1)
 
- 	{
 
- 		auto it = m_mapSendCout.begin();
 
- 		string strLog;
 
- 		auto& item = it->second;
 
- 		if (it->second > 1024 * 1024 * 1024)
 
- 			strLog = fmt::format("{}GB {}MB {} KB", item / (1024 * 1024 * 1024), 
 
- 				item % (1024 * 1024 * 1024) / (1024 * 1024), item % (1024 * 1024) / 1024);
 
- 		else if (it->second > 1024 * 1024)
 
- 			strLog = fmt::format("{}MB {} KB", item / (1024 * 1024), item % (1024 * 1024) / 1024);
 
- 		else 
 
- 			strLog = fmt::format("{} KB ", item / 1024);
 
- 		SPDLOG_WARN("上送流量统计:{} {}", CTime(it->first * 3600).Format("%Y-%m-%d %H:%M:%S"), strLog);
 
- 		m_mapSendCout.erase(it);
 
- 	}
 
- }
 
- void CSuperTCPClient::OnConnect(int nErrorCode)
 
- {
 
- 	SPDLOG_ERROR("收到TCP链路成功:{}:{}", m_strServerIP, m_nServerPort);
 
- 	return __super::OnConnect(nErrorCode);
 
- }
 
- void CSuperTCPClient::OnClose(int nErrorCode)
 
- {
 
- 	SPDLOG_ERROR("收到TCP链路断开:{}:{}", m_strServerIP, m_nServerPort);
 
- 	return __super::OnClose(nErrorCode);
 
- }
 
- BOOL CSuperTCPClient::RecvPack(uint32_t packno, E_ZL_PROTOCAL protocal)
 
- {
 
- 	m_pFather->recvcount++;
 
- 	SPDLOG_INFO("recv 协议号:{} packno:{})", uint8_t(protocal), packno);
 
- 	std::lock_guard<std::mutex> lock(m_mtx);
 
- 	for (auto it = m_task.begin(); it != m_task.end(); it++)
 
- 	{
 
- 		auto p = *it;
 
- 		if (p->packno == packno && p->protocal == protocal)
 
- 		{
 
- 			m_task.erase(it);
 
- 			p->join();
 
- 		}
 
- 		return TRUE;
 
- 	}
 
- 	return FALSE;
 
- }
 
- BOOL CSuperTCPClient::ReSend()
 
- {
 
- 	if (m_task.size() == 0) return TRUE;
 
- 	std::lock_guard<std::mutex> lock(m_mtx);
 
- 	if (m_task.size() > 0)
 
- 	{
 
- 		auto& it = m_task.front();
 
- 		auto ret = Send(it->pdata, it->nlen) != SOCKET_ERROR;
 
- 		if (ret)
 
- 		{
 
- 			m_pFather->sendlen += ret;
 
- 			m_pFather->sendcount++;
 
- 		}
 
- 		else
 
- 			m_pFather->sendfail++;
 
- 		return ret;
 
- 	}
 
- 	return FALSE;
 
- }
 
- BOOL CSuperTCPClient::HandleSubNotify(LPHJDATAHEAD2 lpHead, char* json, int json_len)
 
- {
 
- 	Object obj;
 
- 	obj.parse(string(json, json_len));
 
- 	if (obj.empty()) return FALSE;
 
- 	auto momp = obj.get<string>("momp");
 
- 	if (momp.empty()) return FALSE;
 
- 	string imei; int idx;
 
- 	CMonitorObjectMng::Instance()->MOMP2IMEI(momp, imei, idx);
 
- 	return CAppService::Instance()->GetHandle()->SendMsgToDevice(imei.c_str());
 
- }
 
- void CSuperTCPClient::ProcessPack(LPBYTE pPack, int nPackLen)
 
- {
 
- 	//解析
 
- 	if (!CHjDataConver::conver_recvpack(pPack, nPackLen))
 
- 		return;
 
- 	LPHJDATAHEAD2 lpHead = (LPHJDATAHEAD2)pPack;
 
- 	if (lpHead->protocol == E_ZL_PROTOCAL::ZL_KEEP)
 
- 		return ;
 
- 	//SPDLOG_INFO("recv {}:{} : {}", m_strServerIP, m_nServerPort, CSimpleLog::GetHexString2(pPack, nPackLen));
 
- 	BOOL bRet = FALSE;
 
- 	switch (lpHead->protocol)
 
- 	{
 
- 	case E_ZL_PROTOCAL::ZL_MO:
 
- 	case E_ZL_PROTOCAL::ZL_MP:
 
- 	case E_ZL_PROTOCAL::ZL_RESIST_DATA:
 
- 	case E_ZL_PROTOCAL::ZL_RESIST_REAL:
 
- 	case E_ZL_PROTOCAL::ZL_TEMP_HUMI:
 
- 	case E_ZL_PROTOCAL::ZL_ALARM_SET:
 
- 	case E_ZL_PROTOCAL::ZL_ALARM_DATA:
 
- 	case E_ZL_PROTOCAL::ZL_ALARM_ACK:
 
- 	case E_ZL_PROTOCAL::ZL_ALARM_HANDLE:
 
- 	case E_ZL_PROTOCAL::ZL_SVG:
 
- 	case E_ZL_PROTOCAL::ZL_REFER:
 
- 	case E_ZL_PROTOCAL::ZL_MOVE:
 
- 	case E_ZL_PROTOCAL::ZL_ALARM_UNACK:
 
- 		bRet = RecvPack(lpHead->packno, lpHead->protocol);  //发出确认包
 
- 		break;
 
- 	case E_ZL_PROTOCAL::DL_SUB_NOTIFY:
 
- 		HandleSubNotify(lpHead, (char*)pPack + sizeof(HJDATAHEAD2), lpHead->len);
 
- 		break;
 
- 	case E_ZL_PROTOCAL::DL_SYNC:
 
- 		CAppService::Instance()->SetSycnSuperTime();
 
- 		break;
 
- 	case E_ZL_PROTOCAL::DL_ALARM_ACK:
 
- 	case E_ZL_PROTOCAL::DL_ALARM_HANDLE:
 
- 		break;
 
- 	default:
 
- 		CSimpleLog::Warn(fmt::format("收到未实现的命令:{}", uint8_t(lpHead->protocol)).c_str());
 
- 		break;
 
- 	}
 
- }
 
 
  |