| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 | #pragma once#include <thread>#include <HjDataDefine.h>#include <map>#include <mutex>#include <ReceiveBuffer.h>#include "libwebsockets.h"#include <rapidjson/stringbuffer.h>#include <rapidjson/writer.h>#include <rapidjson/pointer.h>#include <list>#include <vector>#define  MAX_PAYLOAD_SIZE 20480extern time_t g_stStart;typedef struct tagHist_Query{	lws* wsi = nullptr;	time_t tmStart = 0;	time_t tmEnd = 0;	string imei;	string mo_mp;	short idx = 0;	string type;}HISTORY_QUERY, *LPHISTORY_QUERY;/** * 会话上下文对象,结构根据需要自定义 */struct per_session_data {	lws* wsi = nullptr;	lws_context* context = nullptr;	//lws_protocols* protocol;	uint32_t send_count = 0;   //发送	uint32_t send_fail_count = 0; 	uint32_t recv_count = 0;   //接收	uint64_t send_size = 0;	uint64_t recv_size = 0;	time_t tmConnect = 0;      //链接时间	time_t tmLastHeart = 0;		//最后心跳时间	char	ip[32];		   //链接的IP	//char	name[32];		//链接的主机名称	uint16_t port = 0;			//连接的端口	uint16_t refer = 0;			//引用数	bool isLogin = false;			//是否登录	char token[38] = { 0 }; ;			//登录的token	char username[50] = { 0 };		//登录的用户名	char node[50] = { 0 };	//bool bin;			//是否为二进制	//bool fin;			//是否为结束包	std::list<std::string>  m_lstSubReal;  //订阅实时数据列表	std::thread  *thread_hist = nullptr;  //历史数据发送线程	bool  bWork = false;	bool  bBlock = false;  //历史数据包阻塞	int	 SendHistResist(lws* wsi, string mo_mp, time_t start, time_t end, std::map<time_t, int>& data0, std::map<time_t, int>& data1, std::map<time_t, int>& data2);	int	 SendHistResistForEcharts(lws* wsi, string mo_mp, time_t start, time_t end, std::map<time_t, int>& data0, std::map<time_t, int>& data1, std::map<time_t, int>& data2);	int	 SendHistResistDB(LPHISTORY_QUERY);	int	 SendHistResistDBForEcharts(LPHISTORY_QUERY);		per_session_data()	{		memset(ip, 0, sizeof(ip));		memset(token, 0, sizeof(token));		memset(username, 0, sizeof(username));		memset(node, 0, sizeof(node));	}	~per_session_data()	{		bWork = false;		if (thread_hist)		{			thread_hist->join();			delete thread_hist;			thread_hist = nullptr;		}	}};enum class LWS_SEND_PACK_TYPE{	LSPT_UNKOWN = 0,	LSPT_ALARM_EVENT,	LSPT_TRAFFIC_FLOW,	LSPT_PLATE_DETECT,	LSPT_BOLT_DETECT,};class CLWSServer{public:	CLWSServer();	~CLWSServer();	public:	friend class CLNHandle;	BOOL	Start(uint16_t port);	void	Stop();	BOOL	SendPackToALLClient(const uint8_t*, int len, LWS_SEND_PACK_TYPE type = LWS_SEND_PACK_TYPE::LSPT_UNKOWN, lws_write_protocol protocol = lws_write_protocol::LWS_WRITE_TEXT);	BOOL	SendPackToALLClient_with_noEncode(const uint8_t*, int len, LWS_SEND_PACK_TYPE type = LWS_SEND_PACK_TYPE::LSPT_UNKOWN, lws_write_protocol protocol = lws_write_protocol::LWS_WRITE_TEXT);	int		GetLwsSessionNum();	void	Establishe(lws*);	void	Close(lws*);	void	Destroy(lws*);	void	Recv(lws*, void* in, size_t len);	void	GetSessionDesc(std::map<DWORD_PTR, CString>& session_desc);	CReceiveBuffer*	GetBuffer() { return pBuffer; };	static uint32_t	FindJsonPack(const char * CH, uint32_t len);//协议处理	int		HandleLogin(lws*, rapidjson::Document&);	int		HandleSubNotify(lws*, rapidjson::Document&, std::string& imei_idx);	int		HanldeUnsubNotify(lws*, rapidjson::Document&);	int		HandleQueryHist(lws*, rapidjson::Document&);	int		HandleQueryHistConfirm(lws*, rapidjson::Document&);	int		HandleConfRead(lws*, rapidjson::Document&);	int		HandleConfWrite(lws*, rapidjson::Document&);	//受理报警	int		HandleAlarmAck(lws*, rapidjson::Document&, std::string&, std::string&);	//处理报警	int		HandleAlarmHandle(lws*, rapidjson::Document&, std::string&, std::string&, std::string&);//发送部分	void	SendRealResistData(const string& imei_idx, const int num, const std::vector<int>& vctData0,		const std::vector<int>& vctData1, const std::vector<int>& vctData2, const std::vector<bool>& vctResult, const CTime& atime);	void	SendHumiTemp(const char* imei, const COleDateTime& dt, int humi, int temp);	int		SendUnAckAlarm(lws* wsi);    bool	GeneralResistData(const string mo_mp, const int num, const int index, const std::vector<int>& vctData0,		const std::vector<int>& vctData1, const std::vector<int>& vctData2, const std::vector<bool>& vctResult, const CTime& atime, rapidjson::StringBuffer& strBuf);private:	std::thread *m_pThread = nullptr;	//数据接收缓冲区	CReceiveBuffer* pBuffer = nullptr;	bool m_work = false;	static void ThreadProcForQueryHist(LPHISTORY_QUERY);	static void ThreadProcForQueryHistDB(LPHISTORY_QUERY);	static void ThreadProc(DWORD_PTR, uint16_t);	//客户端链接	std::map<lws*, per_session_data> m_mapLwsSession;	std::mutex  m_mtxSession;	std::mutex  m_mtxRecv;	std::mutex  m_mtxSend;};
 |