main.cpp 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287
  1. ////////////////////////////////////////////////////////////////////////////////////////////////////
  2. //
  3. #include <stdio.h>
  4. #include <signal.h>
  5. #include <linux/limits.h>
  6. #include <sys/socket.h>
  7. #include <arpa/inet.h>
  8. #include <linux/if_arp.h>
  9. #include <netinet/in.h>
  10. #include <ifaddrs.h>
  11. #include <sys/ioctl.h>
  12. #include <fcntl.h>
  13. #include <unistd.h>
  14. #include <getopt.h>
  15. #include <gfa/svc/common/strutil.h>
  16. #include <gfa/svc/common/fileutil.h>
  17. #include <gfa/svc/common/instance.h>
  18. #include <gfa/svc/common/processclock.h>
  19. #include <gfa/svc/common/debug.h>
  20. #include <gfa/svc/mqttcl/mqttclient.h>
  21. #include <gfa/svc/mqttcl/mqttcfg.h>
  22. #include <gfa/svc/mqttcl/mqttdbg.h>
  23. #include "projal.h"
  24. ////////////////////////////////////////////////////////////////////////////////////////////////////
  25. #if _ENABLE_MEM_TRACE
  26. #include <mcheck.h>
  // Debug helper (only compiled when _ENABLE_MEM_TRACE is set): constructing an
  // instance points glibc's malloc tracer at a fixed log file and starts tracing.
  class CMtrace
  {
  public:
      CMtrace(void)
      {
          // setenv() copies both strings, so no string literal has to be handed
          // to an API that takes a writable 'char *' (as putenv() does).
          setenv("MALLOC_TRACE", "/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log", 1);
          mtrace();
      }

      ~CMtrace(void)
      {
          // Tracing is deliberately left enabled until the process ends.
          // muntrace();
      }
  };
  38. #endif // _ENABLE_MEM_TRACE
  39. #if _TRACK_TIMES
  40. static CProcessClock g_pc;
  41. unsigned long g_nDbgCounter1 = 0;
  42. unsigned long g_nDbgCounter2 = 0;
  43. unsigned long g_nDbgCounter3 = 0;
  44. #endif // _TRACK_TIMES
  45. ////////////////////////////////////////////////////////////////////////////////////////////////////
  46. // app control
  // ---- app control identity ---------------------------------------------------------------------
  #define _APPID                          GFA_APPCTRL_APPID_MQTTCL
  #define _APPNAME                        "MqttCl"
  // Bit mask of the app ids this process depends on; it is compared against the
  // mask of dependencies currently reported as running.
  #define _DEPENDENCIES                   ((appid_t)(GFA_APPCTRL_APPID_REMANENT))

  // ---- timing and files -------------------------------------------------------------------------
  #define _UPDATE_INTERVAL_MS             100         // main loop / MQTT loop interval in milliseconds
  #define _RECONN_INTERVAL_MS             1000        // pause between (re)connect attempts in milliseconds
  #define _LOGFILE_NAME                   "mqttcl.log"

  // ---- helpers ----------------------------------------------------------------------------------
  #define _SIG_BLOCK(s)                   sigprocmask(SIG_BLOCK, (s), NULL)
  #define _SIG_UNBLOCK(s)                 sigprocmask(SIG_UNBLOCK, (s), NULL)
  #define _NSEC_FROM_MSEC(ms)             ((ms) * _PC_NS_PER_MS)
  // NOTE(review): multiplies by _PC_NS_PER_US; presumably that factor (1000) is
  // numerically the same as microseconds per millisecond - confirm.
  #define _USEC_FROM_MSEC(ms)             ((ms) * _PC_NS_PER_US)

  // ---- JSON keys accepted in the payload of a CONTROL message -------------------------------------
  #define _TOPIC_CTRL_KEY_BINLE           "binLe"
  #define _TOPIC_CTRL_KEY_BINBE           "binBe"
  #define _TOPIC_CTRL_KEY_JSON            "json"
  #define _TOPIC_CTRL_KEY_PBUF            "pBuf"
  #define _TOPIC_CTRL_KEY_QOS             "qos"
  #define _TOPIC_CTRL_KEY_RETAIN          "retained"
  #define _TOPIC_CTRL_KEY_REM_RETAINED    "delRetained"

  // ---- command segments used when building the subscribed topics ----------------------------------
  #define _TOPIC_CMD_CTRL                 "CONTROL"
  #define _TOPIC_CMD_SET                  "SET"
  #define _TOPIC_CMD_STATUS               "STATUS"

  // Number of failed connect attempts tolerated before the error state is entered:
  // 30 for MOSQ_ERR_EAI, 3 for every other error code.
  #define _CONNECT_MAX_RETRIES(err)       (((err) == MOSQ_ERR_EAI) ? 30 : 3)
  71. ////////////////////////////////////////////////////////////////////////////////////////////////////
  72. //
  // Identifies the kind of command topic. The values are used as array indices
  // (control topic map, format table in _ProcessIncoming), so their order and
  // numbering must not change.
  enum _TOPIC_CTRL_CMD
  {
      TCC_Control,    // CONTROL topic
      TCC_SetBinLe,   // SET topic, binLe format
      TCC_SetBinBe,   // SET topic, binBe format
      TCC_SetJson,    // SET topic, json format
      TCC_SetPBuf,    // SET topic, pBuf format
      TCC_Status      // STATUS topic
  };
  typedef enum _TOPIC_CTRL_CMD TOPIC_CTRL_CMD;
  typedef TOPIC_CTRL_CMD *LPTOPIC_CTRL_CMD;
  typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
  83. ////////////////////////////////////////////////////////////////////////////////////////////////////
  84. //
  // States of the MQTT client state machine.
  // The display names in g_pszStateNames are indexed by these values; the
  // static_assert below fails the build if the two get out of sync in length.
  typedef enum _MQTT_CLIENT_STATES
  {
      CLS_NotInit,
      CLS_SetLastWill,
      CLS_SetTLS,
      CLS_Unconnected,
      CLS_Connect,
      CLS_Reconnect,
      CLS_Connecting,
      CLS_Connected,
      CLS_Subscribe,
      CLS_Subscribing,
      CLS_Subscribed,
      CLS_PublishConnect,
      CLS_ProcMsg,
      CLS_PublishDisconnect,
      CLS_Err,
      CLS_ShutDown,
      CLS_Unsubscribe,
      CLS_Disconnect,
      CLS_Cleanup,
      CLS_Paused,
      CLS_Exit
  }MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
  typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;

  // Human readable state names, one entry per MQTT_CLIENT_STATES value, in enum order.
  static const char *g_pszStateNames[] =
  {
      "Not Init",
      "Set Last Will",
      "Set TLS",
      "Unconnected",
      "Connect",
      "Reconnect",
      "Connecting",
      "Connected",
      "Subscribe",
      "Subscribing",
      "Subscribed",
      "Publish Connect",
      "ProcMsg",
      "Publish Disconnect",
      "Error",
      "ShutDown",
      "Unsubscribe",
      "Disconnect",
      "Cleanup",
      "Paused",
      "Exit"
  };

  // Compile-time guard: adding or removing a state without touching the name
  // table would otherwise lead to a wrong name or an out-of-bounds read.
  static_assert(sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0]) == static_cast<size_t>(CLS_Exit) + 1,
                "g_pszStateNames must contain exactly one entry per MQTT_CLIENT_STATES value");

  // Returns the display name of a client state.
  //   cs      - state to look up
  //   returns - the matching entry of g_pszStateNames, or an empty string if
  //             cs is outside the range CLS_NotInit .. CLS_Exit
  static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
  {
      if(cs >= CLS_NotInit && cs <= CLS_Exit)
          return g_pszStateNames[cs];
      return "";
  }
  140. ////////////////////////////////////////////////////////////////////////////////////////////////////
  141. //
  // One subscribed command topic together with the offset at which the
  // variable path starts inside a concrete (received) topic string.
  typedef struct _SUB_CTRL_TOPIC
  {
      // Intentionally not explicit: instances are created by implicit
      // conversion from std::string when a topic table is initialised.
      //   s - the subscription topic; taken by value so that temporaries are
      //       adopted without a copy and lvalues are accepted as well
      _SUB_CTRL_TOPIC(std::string s)
      {
          this->sTopic.swap(s); // take over the buffer instead of copying it
          // The offset is the topic length without its last two characters
          // (the topics built in main end in "/#"). Clamp to 0 so that a topic
          // shorter than two characters cannot wrap the unsigned value around.
          const size_t nLen = this->sTopic.length();
          this->nVarOffset = (nLen >= 2) ? (nLen - 2) : 0;
      }

      std::string sTopic;     // subscription topic
      size_t nVarOffset;      // sTopic.length() - 2, or 0 for very short topics
  }SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
  typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
  152. ////////////////////////////////////////////////////////////////////////////////////////////////////
  153. //
  154. //static std::string _CreateDeviceID(void);
  155. static volatile bool g_fRun = false;
  156. static volatile bool g_fPauseImp = false;
  157. static volatile bool g_fPauseCmd = false;
  158. static volatile bool g_fZombie = false;
  159. static appid_t g_nDepRunning = 0;
  160. static sigset_t g_set;
  161. static CLogfile g_lf;
  162. static int g_nLastSig = -1;
  163. static shm_t g_shmShadow;
  164. static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
  165. static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
  166. static bool g_bConnected = false;
  167. static int g_nSubcribed = 0;
  168. static bool g_bIntr = false;
  169. static int g_nErrRetries = 0;
  170. ////////////////////////////////////////////////////////////////////////////////////////////////////
  171. //
  172. static const char* _GetBaseDir(std::string &rstrBaseDir)
  173. {
  174. char szBaseDir[PATH_MAX];
  175. rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  176. rtrim(rstrBaseDir, "/");
  177. return rstrBaseDir.c_str();
  178. }
  179. ////////////////////////////////////////////////////////////////////////////////////////////////////
  180. //
  181. static void _SigHandler(int sig)
  182. {
  183. g_nLastSig = sig;
  184. g_bIntr = true;
  185. g_fPauseImp = g_fPauseCmd = g_fZombie = false;
  186. }
  187. ////////////////////////////////////////////////////////////////////////////////////////////////////
  188. //
  189. static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
  190. {
  191. if(usleep(t) < 0 && errno == EINTR)
  192. return CLS_ShutDown;
  193. return csNext;
  194. }
  195. ////////////////////////////////////////////////////////////////////////////////////////////////////
  196. //
  197. static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
  198. {
  199. CMqttVar *pVar;
  200. CMqttMessage *pMsg;
  201. std::string sVarPath;
  202. while((pMsg = rcl.PopRcvMsg()))
  203. {
  204. if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
  205. {
  206. sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
  207. if((pVar = vt.Find(sVarPath.c_str())))
  208. {
  209. bool bChanged = false;
  210. int nType, nQos;
  211. CJson_t jtRoot, jtVal;
  212. std::string err;
  213. uint32_t nMaskOn = 0, nMaskOff = 0;
  214. if(pMsg->GetPayloadAsJSON(jtRoot, err))
  215. {
  216. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
  217. {
  218. switch((nType = jtVal.Type()))
  219. {
  220. case JSON_TRUE:
  221. bChanged = pVar->SetRetained(true) || bChanged;
  222. break;
  223. case JSON_FALSE:
  224. bChanged = pVar->SetRetained(false) || bChanged;
  225. break;
  226. default:
  227. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
  228. break;
  229. }
  230. }
  231. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
  232. {
  233. switch((nType = jtVal.Type()))
  234. {
  235. case JSON_TRUE:
  236. pVar->RemoveRetained(cfg.GetTopicPrefix(), nullptr, rcl.GetMsgQueueSnd(), true);
  237. break;
  238. case JSON_FALSE:
  239. g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
  240. break;
  241. default:
  242. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
  243. break;
  244. }
  245. }
  246. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
  247. {
  248. switch((nType = jtVal.Type()))
  249. {
  250. case JSON_INTEGER:
  251. nQos = (int)json_integer_value(jtVal);
  252. if(nQos < MQTTCL_MIN_QOS)
  253. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
  254. else if(nQos > MQTTCL_MAX_QOS)
  255. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
  256. bChanged = pVar->SetQoS(nQos) || bChanged;
  257. break;
  258. default:
  259. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
  260. break;
  261. }
  262. }
  263. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
  264. {
  265. switch((nType = jtVal.Type()))
  266. {
  267. case JSON_TRUE:
  268. nMaskOn |= MQTT_VALUE_BINLE;
  269. break;
  270. case JSON_FALSE:
  271. nMaskOff |= MQTT_VALUE_BINLE;
  272. break;
  273. default:
  274. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
  275. break;
  276. }
  277. }
  278. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
  279. {
  280. switch((nType = jtVal.Type()))
  281. {
  282. case JSON_TRUE:
  283. nMaskOn |= MQTT_VALUE_BINBE;
  284. break;
  285. case JSON_FALSE:
  286. nMaskOff |= MQTT_VALUE_BINBE;
  287. break;
  288. default:
  289. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
  290. break;
  291. }
  292. }
  293. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
  294. {
  295. switch((nType = jtVal.Type()))
  296. {
  297. case JSON_TRUE:
  298. nMaskOn |= MQTT_VALUE_JSON;
  299. break;
  300. case JSON_FALSE:
  301. nMaskOff |= MQTT_VALUE_JSON;
  302. break;
  303. default:
  304. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
  305. break;
  306. }
  307. }
  308. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
  309. {
  310. switch((nType = jtVal.Type()))
  311. {
  312. case JSON_TRUE:
  313. nMaskOn |= MQTT_VALUE_PBUF;
  314. break;
  315. case JSON_FALSE:
  316. nMaskOff |= MQTT_VALUE_PBUF;
  317. break;
  318. default:
  319. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
  320. break;
  321. }
  322. }
  323. if(nMaskOff)
  324. {
  325. if(pVar->DisablePublish(nMaskOff, &vt))
  326. {
  327. bChanged = true;
  328. }
  329. }
  330. if(nMaskOn)
  331. {
  332. if(pVar->EnablePublish(nMaskOn, &vt))
  333. {
  334. bChanged = true;
  335. }
  336. }
  337. #if _DUMP_ENABLED_VARS
  338. if(bChanged)
  339. vt.DumpPubEnabled();
  340. #endif // _DUMP_ENABLED_VARS
  341. }
  342. else
  343. {
  344. g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
  345. }
  346. }
  347. }
  348. else
  349. {
  350. int nLocks = 0;
  351. static const uint32_t nFormats[] =
  352. {
  353. 0,
  354. MQTT_VALUE_BINLE,
  355. MQTT_VALUE_BINBE,
  356. MQTT_VALUE_JSON,
  357. MQTT_VALUE_PBUF
  358. };
  359. for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
  360. {
  361. if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
  362. {
  363. sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
  364. if((pVar = vt.Find(sVarPath.c_str())))
  365. {
  366. if(pVar->PublishEnabled())
  367. {
  368. if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
  369. {
  370. // !!!
  371. }
  372. }
  373. else
  374. {
  375. // !!!
  376. }
  377. break;
  378. }
  379. else
  380. {
  381. // !!!
  382. break;
  383. }
  384. }
  385. }
  386. }
  387. pMsg->Release();
  388. }
  389. }
  390. ////////////////////////////////////////////////////////////////////////////////////////////////////
  391. //
  392. static int _ProcessOutgoing(CMqttClient &rcl)
  393. {
  394. int nRet = 0;
  395. CMqttMessage *pMsg;
  396. while((pMsg = rcl.PopSndMsg()))
  397. {
  398. rcl.publish(pMsg);
  399. pMsg->Release();
  400. ++nRet;
  401. }
  402. return nRet;
  403. }
  404. ////////////////////////////////////////////////////////////////////////////////////////////////////
  405. //
  406. static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  407. {
  408. int nRet;
  409. for(size_t i = 0; i < nLenMap; i++)
  410. {
  411. if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
  412. break;
  413. TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
  414. }
  415. return nRet;
  416. }
  417. static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  418. {
  419. for(size_t i = 0; i < nLenMap; i++)
  420. {
  421. rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
  422. }
  423. }
  424. ////////////////////////////////////////////////////////////////////////////////////////////////////
  425. //
  426. static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
  427. {
  428. switch(pntf->evt)
  429. {
  430. case NEVT_Log:
  431. if(pntf->log.level == MOSQ_LOG_ERR)
  432. {
  433. TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
  434. g_lf.Error("%s\n", pntf->log.str);
  435. }
  436. break;
  437. case NEVT_Connect:
  438. if(pntf->con.rc == MOSQ_ERR_SUCCESS)
  439. g_bConnected = true;
  440. break;
  441. case NEVT_Disconnect:
  442. break;
  443. case NEVT_Subscribe:
  444. ++g_nSubcribed;
  445. break;
  446. case NEVT_Unsubscribe:
  447. break;
  448. default:
  449. break;
  450. }
  451. }
  452. ////////////////////////////////////////////////////////////////////////////////////////////////////
  453. static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
  454. {
  455. ctrlmsg_t nCtrlMsg;
  456. while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
  457. {
  458. switch(nCtrlMsg)
  459. {
  460. case GFA_APPCTRL_CTRLMSG_STOP:
  461. g_bIntr = true;
  462. g_fPauseImp = false;
  463. g_fPauseCmd = false;
  464. g_fZombie = false;
  465. g_lf.Info("Received Control Message 'Stop'\n");
  466. break;
  467. case GFA_APPCTRL_CTRLMSG_PAUSE:
  468. if(!g_fPauseCmd)
  469. {
  470. g_fPauseCmd = true;
  471. if(!g_fPauseImp)
  472. {
  473. ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
  474. g_lf.Info("Received Control Message 'Pause'\n");
  475. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  476. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  477. }
  478. }
  479. break;
  480. case GFA_APPCTRL_CTRLMSG_RESUME:
  481. if(g_fPauseCmd)
  482. {
  483. g_fPauseCmd = false;
  484. if(!g_fPauseImp)
  485. {
  486. g_lf.Info("Received Control Message 'Resume'\n");
  487. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  488. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  489. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  490. }
  491. }
  492. break;
  493. default:
  494. break;
  495. }
  496. }
  497. }
  498. ////////////////////////////////////////////////////////////////////////////////////////////////////
  499. static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
  500. {
  501. appid_t nAppIdSrc;
  502. bool fOldPaused = g_fPauseImp;
  503. char szDispName[128];
  504. while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
  505. {
  506. GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
  507. GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
  508. TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  509. if(nAppIdSrc & _DEPENDENCIES)
  510. {
  511. if(state == GIAS_Running)
  512. {
  513. g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  514. g_nDepRunning |= nAppIdSrc;
  515. }
  516. else
  517. {
  518. g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  519. g_nDepRunning &= ~nAppIdSrc;
  520. }
  521. }
  522. }
  523. if(!g_bIntr)
  524. {
  525. g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
  526. if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
  527. {
  528. fOldPaused = g_fPauseImp;
  529. GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
  530. ::GfaIpcAppCtrlSetState(hAC, newState);
  531. if(g_fPauseImp)
  532. g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  533. else
  534. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  535. }
  536. }
  537. }
  538. static std::string _GetTopicPrefixString(const CMqttClConfig &cfg)
  539. {
  540. if(cfg.TopicPrefixDisabled())
  541. return "";
  542. return formatString("%s/", cfg.GetTopicPrefix());
  543. }
  544. ////////////////////////////////////////////////////////////////////////////////////////////////////
  545. ////////////////////////////////////////////////////////////////////////////////////////////////////
  546. ////////////////////////////////////////////////////////////////////////////////////////////////////
  547. //
  548. int main(int /*argc*/, char **/*argv*/)
  549. {
  550. int nRet = 0;
  551. CProcessInstance pi;
  552. std::string sDevID;
  553. char szLogFile[PATH_MAX];
  554. std::string strBaseDir;
  555. const char *pszBaseDir = NULL;
  556. HAPPCTRL hAC = NULL;
  557. HAPPINFO hAI;
  558. HSHM hShm = NULL;
  559. void *pShm = NULL;
  560. unsigned long long nUsecWorkTime = 0;
  561. int nTlsMode;
  562. CProcessClock pcWork;
  563. ////////////////////////////////////////////////////////////////////////////////////////////////
  564. // check for multiple instances
  565. if(!pi.LockInstance(UUID_SHM))
  566. {
  567. CLogfile::StdErr("Failed to start instance!\n");
  568. return -1;
  569. }
  570. ////////////////////////////////////////////////////////////////////////////////////////////////
  571. // configure signal handling
  572. struct sigaction sa;
  573. ::sigfillset(&g_set);
  574. sigaddset(&g_set, SIGUSR1);
  575. memset(&sa, 0, sizeof(sa));
  576. sa.sa_handler = _SigHandler;
  577. sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
  578. sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
  579. sigaction(SIGTERM, &sa, NULL); // handles normal termination
  580. sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
  581. sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
  582. sa.sa_handler = SIG_IGN;
  583. sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
  584. sigaction(SIGSTOP, &sa, NULL); // ignores Stop
  585. sigaction(SIGCONT, &sa, NULL); // ignores Continue
  586. sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
  587. sigaction(0, &sa, NULL); // ignores shell termination
  588. do
  589. {
  590. g_fZombie = true;
  591. ////////////////////////////////////////////////////////////////////////////////////////////
  592. // get the base directory for output files
  593. if(!pszBaseDir)
  594. pszBaseDir = _GetBaseDir(strBaseDir);
  595. CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
  596. ////////////////////////////////////////////////////////////////////////////////////////////
  597. // initialize log file
  598. sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
  599. if(!g_lf.Open(szLogFile))
  600. {
  601. CLogfile::StdErr("Failed to create/open log file!\n");
  602. nRet = -1;
  603. break;
  604. }
  605. g_lf.Info("Process started.\n");
  606. ////////////////////////////////////////////////////////////////////////////////////////////
  607. // initialize app control
  608. g_lf.Info("Acquire AppCtrl-Handle.\n");
  609. if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
  610. {
  611. g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
  612. break;
  613. }
  614. ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
  615. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
  616. if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
  617. {
  618. g_lf.Error("Failed to subscribe state event notifications!\n");
  619. break;
  620. }
  621. ////////////////////////////////////////////////////////////////////////////////////////////
  622. // parse the config file
  623. CMqttClConfig cfg(UUID_SHM);
  624. #ifdef MQTTCL_CONFIG_FILE_PATH
  625. std::string strMqttCfg = MQTTCL_CONFIG_FILE_PATH;
  626. #else // MQTTCL_CONFIG_FILE_PATH
  627. char szBaseDir[PATH_MAX];
  628. ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  629. std::string strMqttCfg = formatString("%s/cfg/mqttcl.cfg.json", szBaseDir);
  630. #endif // MQTTCL_CONFIG_FILE_PATH
  631. if(!cfg.LoadCfg(strMqttCfg.c_str(), g_lf))
  632. {
  633. nRet = -1;
  634. break;
  635. }
  636. nTlsMode = cfg.GetTLSMode();
  637. // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
  638. std::string strTopicPrefix = _GetTopicPrefixString(cfg);
  639. ////////////////////////////////////////////////////////////////////////////////////////////
  640. // client control topic map
  641. const SUB_CTRL_TOPIC subCtrlMap[] =
  642. {
  643. formatString("%s%s/#", strTopicPrefix.c_str(), _TOPIC_CMD_CTRL),
  644. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
  645. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
  646. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
  647. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
  648. // formatString("%s%s/#", strTopicPrefix.c_str(), _TOPIC_CMD_STATUS)
  649. };
  650. ////////////////////////////////////////////////////////////////////////////////////////////
  651. if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
  652. {
  653. g_lf.Error("GfaIpcAcquireSHM failed!\n");
  654. break;
  655. }
  656. g_lf.Info("Acquired SHM Handle.\n");
  657. if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
  658. {
  659. g_lf.Error("GfaIpcAcquirePointer failed!\n");
  660. break;
  661. }
  662. g_lf.Info("Acquired SHM Pointer.\n");
  663. ::GfaIpcDumpSHMROT();
  664. memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
  665. ////////////////////////////////////////////////////////////////////////////////////////////
  666. int nErr, nLocked = 0, nNumConn = 0;
  667. bool bReconnect, bConnPending;
  668. std::string strErr;
  669. CMqttVarTable vtbl;
  670. CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
  671. shm.InitPath(NULL, NULL);
  672. shm.CreateMembersTable(vtbl);
  673. #if _DUMP_ENABLED_VARS
  674. vtbl.DumpPubEnabled();
  675. #endif // _DUMP_ENABLED_VARS
  676. ////////////////////////////////////////////////////////////////////////////////////////////
  677. CMqttClient mqttCl(cfg.GetDeviceID());
  678. mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
  679. g_fZombie = false;
  680. g_fRun = true;
  681. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  682. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  683. while(g_fRun)
  684. {
  685. ////////////////////////////////////////////////////////////////////////////////////////
  686. // update app control info
  687. if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
  688. {
  689. _ProcessCtrlMessages(hAC, hAI);
  690. _ProcessStateEvents(hAC, hAI);
  691. }
  692. if(g_fPauseImp || g_fPauseCmd)
  693. {
  694. if(g_cs < CLS_ShutDown)
  695. {
  696. g_csLast = g_cs;
  697. g_cs = CLS_ShutDown;
  698. }
  699. else
  700. {
  701. nUsecWorkTime = 0;
  702. }
  703. }
  704. pcWork.ClockTrigger();
  705. switch(g_cs)
  706. {
  707. case CLS_NotInit:
  708. if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
  709. {
  710. g_cs = CLS_SetTLS;
  711. }
  712. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  713. {
  714. g_csLast = g_cs;
  715. g_cs = CLS_Err;
  716. }
  717. break;
  718. case CLS_SetTLS:
  719. if(nTlsMode == MQTTCL_TLS_MODE_CRT)
  720. {
  721. g_lf.Info("Using TLS with certificates.\n");
  722. if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
  723. {
  724. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  725. if(g_bIntr)
  726. {
  727. g_csLast = g_cs;
  728. g_cs = CLS_ShutDown;
  729. }
  730. else
  731. {
  732. g_csLast = g_cs;
  733. g_cs = CLS_Err;
  734. }
  735. }
  736. else
  737. {
  738. g_cs = CLS_SetLastWill;
  739. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  740. }
  741. }
  742. else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
  743. {
  744. g_lf.Info("Using TLS with PSK.\n");
  745. if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
  746. {
  747. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  748. if(g_bIntr)
  749. {
  750. g_csLast = g_cs;
  751. g_cs = CLS_ShutDown;
  752. }
  753. else
  754. {
  755. g_csLast = g_cs;
  756. g_cs = CLS_Err;
  757. }
  758. }
  759. else
  760. {
  761. g_cs = CLS_SetLastWill;
  762. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  763. }
  764. }
  765. else
  766. {
  767. g_cs = CLS_SetLastWill;
  768. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  769. }
  770. break;
  771. case CLS_SetLastWill:
  772. if(cfg.HasLastWill())
  773. {
  774. std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetLastWillTopic());
  775. if((nErr = mqttCl.will_set(strTopic.c_str(), cfg.GetLastWillMessageLength(), cfg.GetLastWillMessage(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain())) != MOSQ_ERR_SUCCESS)
  776. {
  777. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  778. }
  779. }
  780. g_cs = CLS_Unconnected;
  781. break;
  782. case CLS_Unconnected:
  783. if(g_bConnected)
  784. {
  785. g_bConnected = false;
  786. g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  787. }
  788. if(!nNumConn)
  789. g_cs = CLS_Connect;
  790. else
  791. g_cs = CLS_Reconnect;
  792. break;
  793. case CLS_Connect:
  794. if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort(), cfg.GetKeepAliveTime())) == MOSQ_ERR_SUCCESS)
  795. g_cs = CLS_Connecting;
  796. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  797. {
  798. TRACE("CLS_Connect: %s\n", strErr.c_str());
  799. if(bConnPending)
  800. g_cs = CLS_Connecting;
  801. else if(bReconnect)
  802. {
  803. g_csLast = g_cs;
  804. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  805. }
  806. else if(g_bIntr)
  807. {
  808. g_csLast = g_cs;
  809. g_cs = CLS_ShutDown;
  810. }
  811. else
  812. {
  813. if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
  814. {
  815. g_csLast = g_cs;
  816. g_cs = CLS_Err;
  817. }
  818. else
  819. {
  820. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  821. }
  822. }
  823. }
  824. break;
  825. case CLS_Reconnect:
  826. if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
  827. g_cs = CLS_Connecting;
  828. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  829. {
  830. TRACE("CLS_Reconnect: %s\n", strErr.c_str());
  831. if(bConnPending)
  832. g_cs = CLS_Connecting;
  833. else if(bReconnect)
  834. {
  835. g_csLast = g_cs;
  836. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  837. }
  838. else if(g_bIntr)
  839. {
  840. g_csLast = g_cs;
  841. g_cs = CLS_ShutDown;
  842. }
  843. else
  844. {
  845. if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
  846. {
  847. g_csLast = g_cs;
  848. g_cs = CLS_Err;
  849. }
  850. else
  851. {
  852. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  853. }
  854. }
  855. }
  856. break;
  857. case CLS_Connecting:
  858. g_nErrRetries = 0;
  859. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  860. {
  861. TRACE("CLS_Connecting: %s\n", strErr.c_str());
  862. if(bReconnect)
  863. {
  864. g_csLast = g_cs;
  865. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
  866. }
  867. else if(g_bIntr)
  868. {
  869. g_csLast = g_cs;
  870. g_cs = CLS_ShutDown;
  871. }
  872. else if(!bConnPending)
  873. {
  874. g_csLast = g_cs;
  875. g_cs = CLS_Err;
  876. }
  877. }
  878. else if(g_bConnected)
  879. {
  880. g_cs = CLS_Connected;
  881. }
  882. break;
  883. case CLS_Connected:
  884. TRACE("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  885. g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  886. ++nNumConn;
  887. g_nSubcribed = 0;
  888. g_cs = CLS_Subscribe;
  889. break;
  890. case CLS_Subscribe:
  891. g_lf.Info("Subscribing control-topics ...\n");
  892. if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
  893. g_cs = CLS_Subscribing;
  894. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  895. {
  896. if(bConnPending)
  897. g_cs = CLS_Connecting;
  898. else if(bReconnect)
  899. g_cs = CLS_Unconnected;
  900. else if(g_bIntr)
  901. {
  902. g_csLast = g_cs;
  903. g_cs = CLS_ShutDown;
  904. }
  905. else
  906. {
  907. g_csLast = g_cs;
  908. g_cs = CLS_Err;
  909. }
  910. }
  911. break;
  912. case CLS_Subscribing:
  913. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  914. {
  915. if(bReconnect)
  916. g_cs = CLS_Unconnected;
  917. else if(bConnPending)
  918. g_cs = CLS_Connecting;
  919. else if(g_bIntr)
  920. {
  921. g_csLast = g_cs;
  922. g_cs = CLS_ShutDown;
  923. }
  924. else
  925. {
  926. g_csLast = g_cs;
  927. g_cs = CLS_Err;
  928. }
  929. }
  930. else if(g_nSubcribed == _COUNTOF(subCtrlMap))
  931. {
  932. g_cs = CLS_Subscribed;
  933. }
  934. break;
  935. case CLS_Subscribed:
  936. g_lf.Info("Subscriptions acknowledged.\n");
  937. g_lf.Info("Enter SHM processing loop ...\n");
  938. g_cs = CLS_PublishConnect;
  939. break;
  940. case CLS_PublishConnect:
  941. if(cfg.HasConnectMsg())
  942. {
  943. std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetConnectTopic());
  944. CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetConnectMessage(), cfg.GetConnectMessageLength(), cfg.GetConnectQOS(), cfg.GetConnectRetain());
  945. mqttCl.publish(pMsg);
  946. pMsg->Release();
  947. }
  948. g_cs = CLS_ProcMsg;
  949. break;
  950. case CLS_ProcMsg:
  951. if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  952. {
  953. #if _TRACK_TIMES
  954. std::string s1, s2;
  955. pc_time64_t elapsed;
  956. g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
  957. g_pc.ClockTrigger();
  958. #endif // _TRACK_TIMES
  959. _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
  960. #if _TRACK_TIMES
  961. if(g_nDbgCounter1)
  962. {
  963. elapsed = g_pc.ClockGetElapsed();
  964. s1 = CProcessClock::Interval2String(elapsed);
  965. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
  966. TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
  967. }
  968. g_pc.ClockTrigger();
  969. #endif // _TRACK_TIMES
  970. _SIG_BLOCK(&g_set);
  971. vtbl.CheckShmAndPublish(cfg.GetTopicPrefix(), nullptr, mqttCl.GetMsgQueueSnd(), nLocked);
  972. _SIG_UNBLOCK(&g_set);
  973. #if _TRACK_TIMES
  974. g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
  975. if(g_nDbgCounter2)
  976. {
  977. elapsed = g_pc.ClockGetElapsed();
  978. s1 = CProcessClock::Interval2String(elapsed);
  979. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
  980. TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
  981. g_pc.ClockTrigger();
  982. }
  983. g_nDbgCounter3 =
  984. #endif // _TRACK_TIMES
  985. _ProcessOutgoing(mqttCl);
  986. #if _TRACK_TIMES
  987. if(g_nDbgCounter3)
  988. {
  989. elapsed = g_pc.ClockGetElapsed();
  990. s1 = CProcessClock::Interval2String(elapsed);
  991. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
  992. TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
  993. }
  994. #endif // _TRACK_TIMES
  995. }
  996. else
  997. {
  998. if(bReconnect)
  999. g_cs = CLS_Unconnected;
  1000. else if(bConnPending)
  1001. g_cs = CLS_Connecting;
  1002. else if(g_bIntr)
  1003. {
  1004. g_csLast = g_cs;
  1005. g_cs = CLS_PublishDisconnect;
  1006. }
  1007. else
  1008. {
  1009. g_csLast = g_cs;
  1010. g_cs = CLS_Err;
  1011. }
  1012. }
  1013. break;
  1014. case CLS_Err:
  1015. g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
  1016. TRACE("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
  1017. g_fZombie = true;
  1018. g_cs = CLS_PublishDisconnect;
  1019. break;
  1020. case CLS_PublishDisconnect:
  1021. if(cfg.HasLastWillOnExit())
  1022. {
  1023. std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetLastWillTopic());
  1024. CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetLastWillOnExitMessage(), cfg.GetLastWillOnExitMessageLength(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain());
  1025. mqttCl.publish(pMsg);
  1026. pMsg->Release();
  1027. bool bDummy = false;
  1028. mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bDummy, bDummy, bDummy, strErr);
  1029. }
  1030. g_cs = CLS_Disconnect;
  1031. break;
  1032. case CLS_ShutDown:
  1033. if(g_bIntr && g_nLastSig >= 0)
  1034. {
  1035. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1036. g_nLastSig = -1;
  1037. }
  1038. g_lf.Info("Process shutting down ...\n");
  1039. if(g_csLast >= CLS_Subscribed)
  1040. g_cs = CLS_Unsubscribe;
  1041. else if(g_csLast >= CLS_Connected)
  1042. g_cs = CLS_Disconnect;
  1043. else if(g_csLast > CLS_NotInit)
  1044. g_cs = CLS_Cleanup;
  1045. else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  1046. g_cs = CLS_Paused;
  1047. else
  1048. g_cs = CLS_Exit;
  1049. break;
  1050. case CLS_Unsubscribe:
  1051. g_lf.Info("Unsubscribe control-topics.\n");
  1052. _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
  1053. g_nSubcribed = 0;
  1054. g_cs = CLS_Disconnect;
  1055. break;
  1056. case CLS_Disconnect:
  1057. g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  1058. mqttCl.disconnect();
  1059. g_bConnected = false;
  1060. g_cs = CLS_Cleanup;
  1061. break;
  1062. case CLS_Cleanup:
  1063. g_lf.Info("Mqtt clean up.\n");
  1064. CMqttClient::Cleanup();
  1065. if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  1066. g_cs = CLS_Paused;
  1067. else
  1068. g_cs = CLS_Exit;
  1069. break;
  1070. case CLS_Paused:
  1071. if(!g_fPauseImp && !g_fPauseCmd)
  1072. {
  1073. if(!g_bIntr)
  1074. g_cs = CLS_NotInit;
  1075. else
  1076. g_cs = CLS_Exit;
  1077. }
  1078. else
  1079. {
  1080. usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
  1081. continue;
  1082. }
  1083. break;
  1084. case CLS_Exit:
  1085. g_fRun = false;
  1086. break;
  1087. }
  1088. nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
  1089. }
  1090. }
  1091. while(false);
  1092. ////////////////////////////////////////////////////////////////////////////////////////////////
  1093. if(hShm)
  1094. {
  1095. if(pShm)
  1096. {
  1097. g_lf.Info("Release SHM Pointer.\n");
  1098. ::GfaIpcReleasePointer(hShm, pShm);
  1099. }
  1100. g_lf.Info("Release SHM Handle.\n");
  1101. ::GfaIpcReleaseSHM(hShm);
  1102. }
  1103. if(g_fZombie)
  1104. {
  1105. if(hAC)
  1106. ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
  1107. TRACE("Enter Zombie state ...\n");
  1108. g_lf.Warning("Enter Zombie state ...\n");
  1109. g_lf.Flush();
  1110. pause();
  1111. if(g_nLastSig >= 0)
  1112. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1113. }
  1114. if(hAC)
  1115. {
  1116. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
  1117. ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
  1118. g_lf.Info("Releasing App Control ...\n");
  1119. ::GfaIpcAppCtrlRelease(hAC);
  1120. }
  1121. g_lf.Info("Process exit.\n\n");
  1122. g_lf.Close();
  1123. CLogfile::StdErr("MqttCl exit.\n");
  1124. return nRet;
  1125. }