datalogger.cpp 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352
  1. #include <string.h>
  2. #include <malloc.h>
  3. #include <limits.h>
  4. #include <signal.h>
  5. #include <sys/stat.h>
  6. #include "fileutil.h"
  7. #include "strutil.h"
  8. #include "datalogger.h"
  9. #include "processclock.h"
  10. #include "debug.h"
  11. /////////////////////////////////////////////////////////////////////////////
  12. #define _USE_MODIFIED_INDEX 1 // non-zero: CreateLogsTable() uses PRIMARY KEY (`tslog`, `tagid`) + KEY `tagid`; zero: PRIMARY KEY (`tagid`, `tslog`) + KEY `tslog`
  13. #define _TRACK_TIME 1 // NOTE(review): not referenced in the visible part of this file - presumably enables timing diagnostics; confirm
  14. #define _FILE_SIZE_DELETE_MARGIN_PERCENT 10 // NOTE(review): not referenced in the visible part of this file - presumably a size guard delete margin in percent; confirm
  15. #define _SIZE_GUARD_TIMESTAMP_FILE_NAME "sguard.ts" // NOTE(review): presumably the file that persists the last size guard pass; confirm against the size guard code
  16. #define _INVALID_TIMESTAMP_VALUE ((time_t)-1) // returned by GetLastLogTimestamp() when no timestamp could be determined
  17. /////////////////////////////////////////////////////////////////////////////
  18. CDataLogger::CDataLogger(LPCDLPARAMS pdlp, CLogfile &rlf) : m_lf(rlf),
  19. m_tidSGThread(0),
  20. m_bSGHasSizeLimitPrerequisites(false),
  21. m_bSGInProgress(false),
  22. m_bSGConfigured(false),
  23. m_nSGCurPassUTC(0),
  24. m_nSGLastPassUTC(0),
  25. m_condmtx1(PTHREAD_MUTEX_INITIALIZER),
  26. m_cond1(PTHREAD_COND_INITIALIZER),
  27. m_nLastLogTimestamp(0),
  28. m_bBadDateLogsDetected(false)
  29. {
  30. memset(&m_dlp, 0, sizeof(m_dlp));
  31. memset(&m_gv, 0, sizeof(m_gv));
  32. memset(&m_mtx, 0, sizeof(m_mtx));
  33. memset(&m_mutexAttr, 0, sizeof(m_mutexAttr));
  34. if(pdlp)
  35. memcpy(&m_dlp, pdlp, sizeof(m_dlp));
  36. if(!(m_bSGConfigured = !!m_dlp.nMaxSize || !!m_dlp.nMaxAge))
  37. m_lf.Error("Size guard not configured.\n");
  38. if(m_dlp.pszBaseDir)
  39. strncpy(m_szAppDir, m_dlp.pszBaseDir, sizeof(m_szAppDir) - 1);
  40. else
  41. ::GetAppDirectory(m_szAppDir, sizeof(m_szAppDir));
  42. ::pthread_mutexattr_init(&m_mutexAttr);
  43. ::pthread_mutexattr_settype(&m_mutexAttr, PTHREAD_MUTEX_RECURSIVE);
  44. ::pthread_mutex_init(&m_mtx, &m_mutexAttr);
  45. }
  46. /////////////////////////////////////////////////////////////////////////////
  47. CDataLogger::~CDataLogger(void)
  48. {
  49. Release();
  50. ::pthread_mutex_destroy(&m_mtx);
  51. ::pthread_mutexattr_destroy(&m_mutexAttr);
  52. ::pthread_cond_destroy(&m_cond1);
  53. }
  54. /////////////////////////////////////////////////////////////////////////////
  55. void CDataLogger::Release(void)
  56. {
  57. if(m_tidSGThread)
  58. {
  59. ::pthread_mutex_lock(&m_condmtx1);
  60. bool bSgInProg = m_bSGInProgress;
  61. ::pthread_mutex_unlock(&m_condmtx1);
  62. if(bSgInProg)
  63. CLogfile::StdErr("\nDatalogger is terminating. This may take some time ...\nPlease wait, before powering off the device!\n");
  64. Lock();
  65. m_lf.Lock();
  66. ::pthread_cancel(m_tidSGThread);
  67. m_lf.Unlock();
  68. Unlock();
  69. ::pthread_join(m_tidSGThread, NULL);
  70. m_tidSGThread = 0;
  71. }
  72. }
  73. /////////////////////////////////////////////////////////////////////////////
  74. unsigned long CDataLogger::GetTagID(const char *pszVarPath, int nDataType, int nLogType)
  75. {
  76. CMySqlDB db;
  77. const char *pszID = NULL;
  78. char *pszEndptr;
  79. unsigned long nID = ULONG_MAX;
  80. std::string sSql;
  81. sSql = formatString("select `tagid` from `%s` where `path` = '%s' and `dataType` = %d and `logType` = %d", m_dlp.szTagsTable, pszVarPath, nDataType, nLogType);
  82. if(!db.Connect("localhost", m_dlp.szDBUser, m_dlp.szDBPass, m_dlp.szDBName))
  83. {
  84. m_lf.Error("CDataLogger::GetTagID: DB Error: %s\n", db.LastError().c_str());
  85. return nID;
  86. }
  87. CMySqlResult res = db.Query(sSql.c_str());
  88. bool bError = res.error();
  89. if(bError)
  90. m_lf.Error("CDataLogger::GetTagID: DB Error: %s\n", db.LastError().c_str());
  91. else
  92. {
  93. my_ulonglong nCount = res.RowCount();
  94. if(nCount > 0)
  95. {
  96. MYSQL_ROW pRow = res.FetchRow();
  97. pszID = pRow[0];
  98. nID = strtoul(pszID, &pszEndptr, 10);
  99. }
  100. else
  101. {
  102. sSql = formatString("insert into `%s` (`dataType`, `logType`, `path`) values(%d, %d, '%s')", m_dlp.szTagsTable, nDataType, nLogType, pszVarPath);
  103. res = db.Query(sSql.c_str());
  104. bError = res.error();
  105. if(bError)
  106. m_lf.Error("DB Error: %s\n", db.LastError().c_str());
  107. else
  108. {
  109. sSql = formatString("select `tagid` from `%s` where `path`='%s'", m_dlp.szTagsTable, pszVarPath);
  110. CMySqlResult res = db.Query(sSql.c_str());
  111. bool bError = res.error();
  112. if(bError)
  113. m_lf.Error("DB Error: %s\n", db.LastError().c_str());
  114. else if(res.RowCount() > 0)
  115. {
  116. MYSQL_ROW pRow = res.FetchRow();
  117. pszID = pRow[0];
  118. nID = strtoul(pszID, &pszEndptr, 10);
  119. }
  120. }
  121. }
  122. }
  123. return nID;
  124. }
  125. /////////////////////////////////////////////////////////////////////////////
  126. bool CDataLogger::TableFileExists(const char *pszTableName)
  127. {
  128. char szDataFile[PATH_MAX];
  129. sprintf(szDataFile, "%s%s/%s.ibd", m_gv.szDataDir, m_dlp.szDBName, pszTableName);
  130. return !!FileExist(szDataFile);
  131. }
  132. /////////////////////////////////////////////////////////////////////////////
  133. int64_t CDataLogger::TableFileSize(const char *pszTableName)
  134. {
  135. struct stat statbuf;
  136. char szDataFile[PATH_MAX];
  137. sprintf(szDataFile, "%s%s/%s.ibd", m_gv.szDataDir, m_dlp.szDBName, pszTableName);
  138. if(stat(szDataFile, &statbuf) == -1)
  139. return 0;
  140. return (int64_t)statbuf.st_size;
  141. }
  142. /////////////////////////////////////////////////////////////////////////////
  143. bool CDataLogger::QueryServerVariable(CMySqlDB &rdb, const char *pszVarname, CMySqlVar &val)
  144. {
  145. if(!pszVarname || !*pszVarname)
  146. {
  147. m_lf.Error("CDataLogger::QueryServerVariable: No Variable name!\n");
  148. return false;
  149. }
  150. bool bError;
  151. std::string sSql;
  152. sSql = formatString("select @@%s;", pszVarname);
  153. CMySqlResult res = rdb.Query(sSql.c_str());
  154. bError = res.error();
  155. if(!bError)
  156. {
  157. bError = true;
  158. do
  159. {
  160. my_ulonglong nRowCount = res.RowCount();
  161. unsigned int nFldCount = res.FieldCount();
  162. const MYSQL_FIELD *pFields = res.FetchFields();
  163. if(nRowCount != 1 || nFldCount != 1 || !pFields)
  164. break;
  165. MYSQL_ROW pRow = res.FetchRow();
  166. if(!pRow || !*pRow || !**pRow)
  167. break;
  168. bError = !val.FromField(pFields[0], pRow[0]);
  169. }
  170. while(false);
  171. }
  172. return !bError;
  173. }
  174. /////////////////////////////////////////////////////////////////////////////
  175. #if 0
  176. bool CDataLogger::QueryStatusVariable(CMySqlDB &rdb, const char *pszVarname, CMySqlVar &val)
  177. {
  178. if(!pszVarname || !*pszVarname)
  179. {
  180. m_lf.Error("CDataLogger::QueryStatusVariable: No Variable name!\n");
  181. return false;
  182. }
  183. bool bError;
  184. std::string sSql;
  185. sSql = formatString("show session status like '%s'", pszVarname);
  186. CMySqlResult res = rdb.Query(sSql.c_str());
  187. bError = res.error();
  188. if(!bError)
  189. {
  190. bError = true;
  191. do
  192. {
  193. my_ulonglong nRowCount = res.RowCount();
  194. unsigned int nFldCount = res.FieldCount();
  195. const MYSQL_FIELD *pFields = res.FetchFields();
  196. if(nRowCount != 1 || nFldCount != 2 || !pFields)
  197. break;
  198. MYSQL_ROW pRow = res.FetchRow();
  199. if(!pRow || !pRow[1] || !*pRow[1])
  200. break;
  201. bError = !val.FromField(pFields[1], pRow[1]);
  202. }
  203. while(false);
  204. }
  205. return !bError;
  206. }
  207. #endif
  208. /////////////////////////////////////////////////////////////////////////////
  209. bool CDataLogger::ReadGlobalOptions(CMySqlDB &rdb)
  210. {
  211. CMySqlVar val;
  212. memset(&m_gv, 0, sizeof(m_gv));
  213. if(QueryServerVariable(rdb, "datadir", val))
  214. {
  215. memset(m_gv.szDataDir, 0, sizeof(m_gv.szDataDir));
  216. if(!val.IsNull())
  217. val.CopyStrVal(m_gv.szDataDir, sizeof(m_gv.szDataDir) - 1, 0);
  218. }
  219. if(QueryServerVariable(rdb, "innodb_file_per_table", val))
  220. {
  221. if(!val.IsNull())
  222. m_gv.bInnoDbFilePerTable = val;
  223. }
  224. if(QueryServerVariable(rdb, "innodb_strict_mode", val))
  225. {
  226. if(!val.IsNull())
  227. m_gv.bInnoDbIsStrictMode = val;
  228. }
  229. if(QueryServerVariable(rdb, "innodb_file_format", val))
  230. {
  231. memset(m_gv.szInnoDbFileFormat, 0, sizeof(m_gv.szInnoDbFileFormat));
  232. if(!val.IsNull())
  233. {
  234. val.CopyStrVal(m_gv.szInnoDbFileFormat, sizeof(m_gv.szInnoDbFileFormat) - 1, 0);
  235. m_gv.bInnoDbIsBarracuda = !strcasecmp(m_gv.szInnoDbFileFormat, "barracuda");
  236. }
  237. }
  238. return true;
  239. }
  240. /////////////////////////////////////////////////////////////////////////////
  241. void CDataLogger::SizeGuardTrigger(time_t ts)
  242. {
  243. if(m_bSGConfigured && m_tidSGThread)
  244. {
  245. ::pthread_mutex_lock(&m_condmtx1);
  246. if( !m_bSGInProgress &&
  247. !SizeGuardDayWorkDone(ts))
  248. {
  249. unsigned long long nMnUTC = (unsigned long long)_MIDNIGHT_TIMESTAMP_UTC(ts);
  250. if(SizeGuardLastPassRead() != nMnUTC)
  251. {
  252. m_nSGCurPassUTC = ts;
  253. ::pthread_cond_signal(&m_cond1);
  254. }
  255. else
  256. m_nSGLastPassUTC = nMnUTC;
  257. }
  258. ::pthread_mutex_unlock(&m_condmtx1);
  259. }
  260. }
  261. /////////////////////////////////////////////////////////////////////////////
  262. #if 0
  263. bool CDataLogger::QueryTableSizes(CMySqlDB &rdb, const char *pszTableName)
  264. {
  265. bool bError;
  266. char *pszEndptr;
  267. MYSQL_ROW pRow;
  268. uint64_t nDataLength = 0, nDataFree = 0, nIndexLength = 0, nFreeExtents, nTotalExtents;
  269. std::string sSql;
  270. sSql = formatString("select `DATA_LENGTH`, `INDEX_LENGTH`, `DATA_FREE` from `information_schema`.`TABLES` where (`TABLE_SCHEMA` = '%s') AND (`TABLE_NAME` = '%s');", m_dlp.szDBName, pszTableName);
  271. CMySqlResult res = rdb.Query(sSql.c_str());
  272. bError = res.error();
  273. if(bError)
  274. {
  275. m_lf.Error("CDataLogger::QueryTableSizes: DB Error: %s\n", rdb.LastError().c_str());
  276. return false;
  277. }
  278. pRow = res.FetchRow();
  279. nDataLength = strtoull(pRow[0], &pszEndptr, 10);
  280. nIndexLength = strtoull(pRow[1], &pszEndptr, 10);
  281. nDataFree = strtoull(pRow[2], &pszEndptr, 10);
  282. //////////////////////////////////////////////////////////////////////////////////////////////
  283. sSql = formatString("select `FREE_EXTENTS`, `TOTAL_EXTENTS`, `DATA_FREE` from `information_schema`.`FILES` where `FILE_NAME` like '%%demo/%s.ibd';", pszTableName);
  284. CMySqlResult res2 = rdb.Query(sSql.c_str());
  285. bError = res2.error();
  286. if(bError)
  287. {
  288. m_lf.Error("CDataLogger::QueryTableSizes: DB Error: %s\n", rdb.LastError().c_str());
  289. return false;
  290. }
  291. pRow = res2.FetchRow();
  292. nFreeExtents = strtoull(pRow[0], &pszEndptr, 10);
  293. nTotalExtents = strtoull(pRow[1], &pszEndptr, 10);
  294. int64_t nFileSize = TableFileSize(pszTableName);
  295. m_lf.Info("Data length: %ju, Data free: %ju, Total extents: %ju, Free extents: %ju, File size: %ju.\n", nDataLength + nIndexLength, nDataFree, nTotalExtents, nFreeExtents, nFileSize);
  296. TRACE("Data length: %ju, Data free: %ju, Total extents: %ju, Free extents: %ju, File size: %ju.\n", nDataLength + nIndexLength, nDataFree, nTotalExtents, nFreeExtents, nFileSize);
  297. return true;
  298. }
  299. #endif
  300. /////////////////////////////////////////////////////////////////////////////
  301. bool CDataLogger::CheckTable(CMySqlDB &rdb, const char *pszTableName, bool &bExists, bool &bUseResortTable, bool &bIsInnoDB)
  302. {
  303. bool bError;
  304. std::string sSql;
  305. const char *pszEngine = NULL;
  306. uint64_t nDataLength = 0, nIndexLength = 0;
  307. int64_t nFileSize = -1;
  308. bExists = bUseResortTable = false;
  309. bIsInnoDB = false;
  310. sSql = formatString("select `ENGINE`, `DATA_LENGTH`, `INDEX_LENGTH` from `information_schema`.`TABLES` where (`TABLE_SCHEMA` = '%s') AND (`TABLE_NAME` = '%s')", m_dlp.szDBName, pszTableName);
  311. CMySqlResult res1 = rdb.Query(sSql.c_str());
  312. bError = res1.error();
  313. if(bError)
  314. {
  315. m_lf.Error("CDataLogger::CheckTables: DB Error: %s\n", rdb.LastError().c_str());
  316. return false;
  317. }
  318. else
  319. {
  320. my_ulonglong nCount = res1.RowCount();
  321. if(nCount > 0)
  322. {
  323. MYSQL_ROW pRow = res1.FetchRow();
  324. unsigned int nFc = res1.FieldCount();
  325. const MYSQL_FIELD *pFields = res1.FetchFields();
  326. CMySqlVar vals[3];
  327. bExists = true;
  328. for(unsigned int i = 0; i < nFc; i++)
  329. {
  330. vals[i].FromField(pFields[i], pRow[i]);
  331. if(!vals[i].FieldnameCmp("ENGINE"))
  332. {
  333. pszEngine = vals[i].StrVal();
  334. bIsInnoDB = !strcasecmp(pszEngine, "InnoDB");
  335. }
  336. else if(!vals[i].FieldnameCmp("DATA_LENGTH"))
  337. nDataLength = (uint64_t)vals[i];
  338. else if(!vals[i].FieldnameCmp("INDEX_LENGTH"))
  339. nIndexLength = (uint64_t)vals[i];
  340. }
  341. if(bIsInnoDB && m_gv.bInnoDbFilePerTable)
  342. {
  343. if(!TableFileExists(pszTableName))
  344. m_lf.Warning("'innodb_file_per_table' configured, but table '%s' does not have a data file!\n", pszTableName);
  345. else
  346. nFileSize = TableFileSize(pszTableName);
  347. }
  348. if(nFileSize >= 0)
  349. m_lf.Info("Found info on table '%s': Engine: '%s', Data size: %s, Index size: %s, File size: %s\n", pszTableName, pszEngine ? pszEngine : "unknown", strFormatByteSize(nDataLength, 1).c_str(), strFormatByteSize(nIndexLength, 1).c_str(), strFormatByteSize(nFileSize, 1).c_str());
  350. else
  351. m_lf.Info("Found info on table '%s': Engine: '%s', Data size: %s, Index size: %s\n", pszTableName, pszEngine ? pszEngine : "unknown", strFormatByteSize(nDataLength, 1).c_str(), strFormatByteSize(nIndexLength, 1).c_str());
  352. if(!pszEngine)
  353. m_lf.Warning("Failed to determine the Database engine of Table '%s'!\n", pszTableName);
  354. else if(strcmp(pszEngine, _DL_DATABSE_ENGINE_NAME))
  355. m_lf.Warning("Current Database engine of Table '%s' is '%s'! Please consider to migrate to '%s'!\n", pszTableName, pszEngine, _DL_DATABSE_ENGINE_NAME);
  356. sSql = formatString("select * from `%s` limit 0, 10", pszTableName);
  357. CMySqlResult res2 = rdb.Query(sSql.c_str()); // try to query table
  358. if(res2.error()) // if the query results in an error, the table may be corrupt!
  359. {
  360. char szTblName[256];
  361. m_lf.Error("Failed to query table '%s': %s\n", pszTableName, rdb.LastError().c_str());
  362. // if the table is corrupt, try to rename it and create a new instane of the original table.
  363. sprintf(szTblName, "%s_corrupt_%08zX", pszTableName, (size_t)time(NULL));
  364. sSql = formatString("rename table `%s` to `%s`", pszTableName, szTblName);
  365. CMySqlResult res3 = rdb.Query(sSql.c_str()); // try to rename the corrupt table
  366. if(res3.error())
  367. {
  368. // if the renaming fails, try to create a new working table with a different name
  369. m_lf.Error("Failed to rename table '%s': %s\n", pszTableName, rdb.LastError().c_str());
  370. bExists = false;
  371. bUseResortTable = true;
  372. }
  373. else
  374. {
  375. m_lf.Warning("Renamed table '%s' to '%s'\n", pszTableName, szTblName);
  376. bExists = false;
  377. }
  378. }
  379. }
  380. }
  381. return !bError;
  382. }
  383. /////////////////////////////////////////////////////////////////////////////
  384. bool CDataLogger::InitDatabase(bool bEnforceCreate)
  385. {
  386. CMySqlDB db;
  387. time_t nMaxLogTimestamp = m_nLastLogTimestamp = 0;
  388. bool bExists, bUseResortTable, bDummy;
  389. m_lf.Info("Connecting to Database server @ 'localhost'.\n");
  390. if(!db.Connect("localhost", m_dlp.szDBUser, m_dlp.szDBPass, NULL))
  391. {
  392. m_lf.Error("CDataLogger::InitDatabase: DB Error: %s\n", db.LastError().c_str());
  393. return false;
  394. }
  395. m_lf.Info("Success!\n");
  396. if(!ReadGlobalOptions(db))
  397. return false;
  398. if(!m_gv.bInnoDbIsBarracuda || !m_gv.bInnoDbFilePerTable || !m_gv.bInnoDbIsStrictMode)
  399. m_lf.Warning("InnoDB file format: '%s', InnoDB table space: '%s', InnoDB strict mode: %s.\n", m_gv.szInnoDbFileFormat, m_gv.bInnoDbFilePerTable ? "File per table" : "System", m_gv.bInnoDbIsStrictMode ? "yes" : "no");
  400. else
  401. m_lf.Info("InnoDB file format: '%s', InnoDB table space: '%s', InnoDB strict mode: %s.\n", m_gv.szInnoDbFileFormat, m_gv.bInnoDbFilePerTable ? "File per table" : "System", m_gv.bInnoDbIsStrictMode ? "yes" : "no");
  402. if(!CreateDatabase(db, bEnforceCreate))
  403. return false;
  404. m_lf.Info("Opening Database '%s'.\n", m_dlp.szDBName);
  405. if(db.SelectDB(m_dlp.szDBName))
  406. {
  407. m_lf.Error("CDataLogger::InitDatabase: DB Error: %s\n", db.LastError().c_str());
  408. return false;
  409. }
  410. m_lf.Info("Success!\n");
  411. if(!CheckTable(db, m_dlp.szTagsTable, bExists, bUseResortTable, bDummy))
  412. return false;
  413. if(!bExists)
  414. {
  415. if(bUseResortTable)
  416. {
  417. char szOrigTable[_DL_MAX_TABLE_NAME_LENGTH];
  418. memcpy(szOrigTable, m_dlp.szTagsTable, sizeof(szOrigTable));
  419. sprintf(m_dlp.szTagsTable, "%s_resort_%08zX", szOrigTable, (size_t)time(NULL));
  420. m_lf.Warning("Resorting to work-around table '%s'.\n", m_dlp.szTagsTable);
  421. }
  422. if(!CreateTagsTable(db))
  423. return false;
  424. }
  425. if(!AlterTagsTable(db)) // 23.10.2020, extend logtype enum if not up to date
  426. return false;
  427. if(!CheckTable(db, m_dlp.szLogsTable, bExists, bUseResortTable, m_gv.bLogsTblIsInnoDB))
  428. return false;
  429. if(!bExists)
  430. {
  431. if(bUseResortTable)
  432. {
  433. char szOrigTable[_DL_MAX_TABLE_NAME_LENGTH];
  434. memcpy(szOrigTable, m_dlp.szLogsTable, sizeof(szOrigTable));
  435. sprintf(m_dlp.szLogsTable, "%s_resort_%08zX", szOrigTable, (size_t)time(NULL));
  436. m_lf.Warning("Resorting to work-around table '%s'.\n", m_dlp.szLogsTable);
  437. }
  438. if(!CreateLogsTable(db))
  439. return false;
  440. #if _DL_DATABSE_ENGINE == _DL_DATABASE_ENGINE_INNODB
  441. m_gv.bLogsTblIsInnoDB = true;
  442. #endif // _DL_DATABSE_ENGINE == _DL_DATABASE_ENGINE_INNODB
  443. }
  444. if(m_bSGConfigured)
  445. {
  446. if(!(m_bSGHasSizeLimitPrerequisites = m_gv.bLogsTblIsInnoDB && m_gv.bInnoDbFilePerTable && TableFileExists(m_dlp.szLogsTable)))
  447. {
  448. m_lf.Warning("SG: Database configuration: InnoDB: %s, File-per-table tablespace: %s\n", m_gv.bLogsTblIsInnoDB ? "yes" : "no", m_gv.bInnoDbFilePerTable ? "yes" : "no");
  449. m_lf.Warning("SG: The current database configuration does not support monitoring of size limits. Size guard will operate on age limits only!\n");
  450. }
  451. ::pthread_mutex_lock(&m_condmtx1);
  452. if(::pthread_create(&m_tidSGThread, NULL, &CDataLogger::SizeGuardWorker, reinterpret_cast<void*>(this)))
  453. {
  454. m_tidSGThread = 0;
  455. m_lf.Error("SG: Failed to start size monitoring thread!\n");
  456. ::pthread_mutex_unlock(&m_condmtx1);
  457. return false;
  458. }
  459. ::pthread_cond_wait(&m_cond1, &m_condmtx1);
  460. ::pthread_mutex_unlock(&m_condmtx1);
  461. }
  462. if(!CreateLogsBDTable(db))
  463. return false;
  464. if(((nMaxLogTimestamp = GetLastLogTimestamp(db)) != _INVALID_TIMESTAMP_VALUE))
  465. m_nLastLogTimestamp = nMaxLogTimestamp;
  466. m_lf.Info("Database initialization complete.\n");
  467. return true;
  468. }
  469. /////////////////////////////////////////////////////////////////////////////
  470. bool CDataLogger::CreateDatabase(CMySqlDB &rdb, bool bEnforceCreate)
  471. {
  472. bool bError = false;
  473. std::string sSql;
  474. if(bEnforceCreate)
  475. {
  476. sSql = formatString("drop database if exists `%s`", m_dlp.szDBName);
  477. CMySqlResult res = rdb.Query(sSql.c_str());
  478. bError = res.error();
  479. if(bError)
  480. m_lf.Error("CDataLogger::InitDatabase: DB Error: %s\n", rdb.LastError().c_str());
  481. }
  482. if(!bError)
  483. {
  484. sSql = formatString("create database if not exists `%s`", m_dlp.szDBName);
  485. CMySqlResult res = rdb.Query(sSql.c_str());
  486. bError = res.error();
  487. if(bError)
  488. m_lf.Error("CDataLogger::InitDatabase: DB Error: %s\n", rdb.LastError().c_str());
  489. }
  490. return !bError;
  491. }
  492. /////////////////////////////////////////////////////////////////////////////
  493. bool CDataLogger::CreateTagsTable(CMySqlDB &rdb)
  494. {
  495. std::string sSql;
  496. const char *pszFormat =
  497. "CREATE TABLE IF NOT EXISTS `%s` (" \
  498. " `tagid` smallint(6) unsigned NOT NULL AUTO_INCREMENT," \
  499. " `dataType` enum('bool', 'I1', 'UI1', 'I2', 'UI2', 'I4', 'UI4', 'I8', 'UI8', 'float', 'double', 'string') NOT NULL," \
  500. " `logType` enum('IC', 'IU', 'VC', 'VU', 'ICR', 'IUR', 'VCR', 'VUR') NOT NULL," \
  501. " `path` varchar(%u) NOT NULL," \
  502. " PRIMARY KEY (`tagid`, `dataType`, `logType`)," \
  503. " KEY `path` (`path`)" \
  504. ") ENGINE=%s DEFAULT CHARSET=ascii";
  505. m_lf.Info("Creating Table '%s' (if not exists) using engine '%s'.\n", m_dlp.szTagsTable, _DL_DATABSE_ENGINE_NAME);
  506. sSql = formatString(pszFormat, m_dlp.szTagsTable, _DL_MAX_VARPATH_LENGTH, _DL_DATABSE_ENGINE_NAME);
  507. CMySqlResult res = rdb.Query(sSql.c_str());
  508. bool bError = res.error();
  509. if(bError)
  510. m_lf.Error("CDataLogger::CreateTagsTable: DB Error: %s\n", rdb.LastError().c_str());
  511. else
  512. m_lf.Info("Success!\n");
  513. return !bError;
  514. }
  515. bool CDataLogger::AlterTagsTable(CMySqlDB &rdb)
  516. {
  517. std::string sSql = formatString("ALTER TABLE `%s` MODIFY COLUMN `logType` enum('IC','IU','VC','VU','ICR','IUR','VCR','VUR') NOT NULL", m_dlp.szTagsTable);
  518. CMySqlResult res = rdb.Query(sSql.c_str());
  519. bool bError = res.error();
  520. if(bError)
  521. m_lf.Error("CDataLogger::AlterTagsTable: DB Error: %s\n", rdb.LastError().c_str());
  522. return !bError;
  523. }
  524. /////////////////////////////////////////////////////////////////////////////
  525. bool CDataLogger::CreateLogsTable(CMySqlDB &rdb)
  526. {
  527. std::string sSql;
  528. const char *pszFormat =
  529. #if _USE_MODIFIED_INDEX
  530. "CREATE TABLE IF NOT EXISTS `%s` (" \
  531. " `tagid` smallint(6) unsigned NOT NULL," \
  532. " `tslog` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP," \
  533. " `value` double," \
  534. " `valueMin` double DEFAULT NULL," \
  535. " `valueMax` double DEFAULT NULL," \
  536. " PRIMARY KEY (`tslog`, `tagid`)," \
  537. " KEY `tagid` (`tagid`)" \
  538. ") ENGINE=%s DEFAULT CHARSET=ascii ROW_FORMAT=COMPRESSED";
  539. #else // _USE_MODIFIED_INDEX
  540. "CREATE TABLE IF NOT EXISTS `%s` (" \
  541. " `tagid` smallint(6) unsigned NOT NULL," \
  542. " `tslog` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP," \
  543. " `value` double," \
  544. " `valueMin` double DEFAULT NULL," \
  545. " `valueMax` double DEFAULT NULL," \
  546. " PRIMARY KEY (`tagid`, `tslog`)," \
  547. " KEY `tslog` (`tslog`)" \
  548. ") ENGINE=%s DEFAULT CHARSET=ascii ROW_FORMAT=COMPRESSED";
  549. #endif // _USE_MODIFIED_INDEX
  550. m_lf.Info("Creating Table '%s' if not exists using engine '%s'.\n", m_dlp.szLogsTable, _DL_DATABSE_ENGINE_NAME);
  551. sSql = formatString(pszFormat, m_dlp.szLogsTable, _DL_DATABSE_ENGINE_NAME);
  552. CMySqlResult res = rdb.Query(sSql.c_str());
  553. bool bError = res.error();
  554. if(bError)
  555. m_lf.Error("CDataLogger::CreateLogsTable: DB Error: %s\n", rdb.LastError().c_str());
  556. else
  557. m_lf.Info("Success!\n");
  558. return !bError;
  559. }
  560. /////////////////////////////////////////////////////////////////////////////
  561. bool CDataLogger::CreateLogsBDTable(CMySqlDB &rdb)
  562. {
  563. std::string sSql;
  564. const char *pszFormat =
  565. "CREATE TABLE IF NOT EXISTS `%s` (" \
  566. " `id` int unsigned unsigned NOT NULL AUTO_INCREMENT," \
  567. " `tagid` smallint(6) unsigned NOT NULL," \
  568. " `tslog` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP," \
  569. " `value` double," \
  570. " `valueMin` double DEFAULT NULL," \
  571. " `valueMax` double DEFAULT NULL," \
  572. " PRIMARY KEY (`id`)," \
  573. " KEY `tslog` (`tslog`)," \
  574. " KEY `tagid` (`tagid`)" \
  575. ") ENGINE=%s DEFAULT CHARSET=ascii ROW_FORMAT=COMPRESSED";
  576. m_lf.Info("Creating Table '%s' if not exists using engine '%s'.\n", m_dlp.szLogsTableBD, _DL_DATABSE_ENGINE_NAME);
  577. sSql = formatString(pszFormat, m_dlp.szLogsTableBD, _DL_DATABSE_ENGINE_NAME);
  578. CMySqlResult res = rdb.Query(sSql.c_str());
  579. bool bError = res.error();
  580. if(bError)
  581. m_lf.Error("CDataLogger::CreateLogsBDTable: DB Error: %s\n", rdb.LastError().c_str());
  582. else
  583. m_lf.Info("Success!\n");
  584. return !bError;
  585. }
  586. /////////////////////////////////////////////////////////////////////////////
  587. bool CDataLogger::Log(unsigned long nTagID, double fValue, double fMin, double fMax, time_t nTimestamp, int nIndex, LogTypes lt, bool bNull, bool bNoBadDateCheck)
  588. {
  589. DL_LOG_ENTRY log;
  590. log.nTagID = nTagID;
  591. log.nTimestamp = nTimestamp;
  592. log.fValue = fValue;
  593. log.fMin = fMin;
  594. log.fMax = fMax;
  595. log.nIndex = nIndex;
  596. log.lt = lt;
  597. log.bNull = bNull;
  598. if(bNoBadDateCheck || (nTimestamp > m_nLastLogTimestamp))
  599. m_logs.push_back(log);
  600. else
  601. m_logsBD.push_back(log);
  602. return true;
  603. }
  604. /////////////////////////////////////////////////////////////////////////////
  605. time_t CDataLogger::GetLastLogTimestamp(CMySqlDB &rdb)
  606. {
  607. time_t nTs = _INVALID_TIMESTAMP_VALUE;
  608. std::string sSql;
  609. do
  610. {
  611. sSql = formatString("select unix_timestamp(max(`tslog`)) from `%s`", m_dlp.szLogsTable);
  612. CMySqlResult res = rdb.Query(sSql.c_str());
  613. if(res.error())
  614. {
  615. m_lf.Error("CDataLogger::GetLastLogTimestamp: DB Error: %s\n", rdb.LastError().c_str());
  616. break;
  617. }
  618. else
  619. {
  620. CMySqlVar val;
  621. my_ulonglong nRowCount = res.RowCount();
  622. unsigned int nFldCount = res.FieldCount();
  623. const MYSQL_FIELD *pFields = res.FetchFields();
  624. if(nRowCount != 1 || nFldCount != 1 || !pFields)
  625. {
  626. // m_lf.Error("CDataLogger::GetLastLogTimestamp: Unexpected error!\n");
  627. break;
  628. }
  629. MYSQL_ROW pRow = res.FetchRow();
  630. if( !pRow ||
  631. !val.FromField(pFields[0], pRow[0]))
  632. {
  633. // m_lf.Error("CDataLogger::GetLastLogTimestamp: Unexpected error!\n");
  634. break;
  635. }
  636. nTs = (time_t)(uint64_t)val;
  637. }
  638. }
  639. while(false);
  640. return nTs;
  641. }
  642. /////////////////////////////////////////////////////////////////////////////
  643. bool CDataLogger::Flush(time_t nTimestamp)
  644. {
  645. ::pthread_mutex_lock(&m_condmtx1);
  646. bool bGoodBadTransition = false, bSgInProg = m_bSGInProgress;
  647. ::pthread_mutex_unlock(&m_condmtx1);
  648. time_t nMaxLogTimestamp = 0, nFirstGoodTimestamp = 0;
  649. size_t nCountValidDate = m_logs.size();
  650. size_t nCountBadDate = m_logsBD.size();
  651. if(!nCountValidDate && !nCountBadDate)
  652. return true; // nothing to do
  653. TRACE("Trying to flush %zu logs ...\n", nCountValidDate + nCountBadDate);
  654. if(bSgInProg && nCountValidDate)
  655. {
  656. TRACE("Flush: SG in progress - flushing of %zu logs deferred!\n", nCountValidDate);
  657. m_lf.Info("Flush: SG in progress - flushing of %zu logs deferred!\n", nCountValidDate);
  658. nCountValidDate = 0; // size guard operates on the valid-date-logs-table only, so we can safely flush invalid-date-logs
  659. if(!nCountBadDate)
  660. return true; // no error
  661. }
  662. if(!m_bBadDateLogsDetected && nCountBadDate)
  663. {
  664. m_bBadDateLogsDetected = true;
  665. bGoodBadTransition = true;
  666. }
  667. else if(m_bBadDateLogsDetected && !nCountBadDate)
  668. {
  669. m_bBadDateLogsDetected = false;
  670. bGoodBadTransition = true;
  671. }
  672. CMySqlDB db;
  673. std::string strUnlock, strSaveFgnKey, strDisableFgnKey, strRestoreFgnKey, strSaveTZ, strSetTZ, strRestoreTZ;
  674. bool bError = false;
  675. std::string sSql;
  676. if(!db.Connect("localhost", m_dlp.szDBUser, m_dlp.szDBPass, m_dlp.szDBName))
  677. {
  678. m_lf.Error("CDataLogger::Flush: DB Error: %s\n", db.LastError().c_str());
  679. return false;
  680. }
  681. strUnlock = formatString("unlock tables");
  682. strSaveFgnKey = formatString("set @old_foreign_key_checks = @@foreign_key_checks");
  683. strDisableFgnKey = formatString("set foreign_key_checks = 0");
  684. strRestoreFgnKey = formatString("set foreign_key_checks = @old_foreign_key_checks");
  685. strSaveTZ = formatString("set @old_time_zone = @@time_zone");
  686. strSetTZ = formatString("set time_zone = '+00:00'");
  687. strRestoreTZ = formatString("set time_zone = @old_time_zone");
  688. db.Query(strSaveTZ.c_str());
  689. db.Query(strSetTZ.c_str());
  690. db.Query(strSaveFgnKey.c_str());
  691. db.Query(strDisableFgnKey.c_str());
  692. if(nCountValidDate > 0)
  693. {
  694. std::string strSql, strLock;
  695. strLock = formatString("lock tables `%s` write", m_dlp.szLogsTable);
  696. strSql.reserve(130 + 111 * nCountValidDate);
  697. auto itFirst = m_logs.begin();
  698. const DL_LOG_ENTRY &rle0 = *itFirst;
  699. if(nMaxLogTimestamp < rle0.nTimestamp)
  700. nMaxLogTimestamp = rle0.nTimestamp;
  701. nFirstGoodTimestamp = rle0.nTimestamp;
  702. if(m_dlp.bMinMax && _IS_INTERVAL_LOGTYPE(rle0.lt))
  703. sSql = formatString("insert into `%s` (`tagid`, `tslog`, `value`, `valueMin`, `valueMax`) values (%lu, timestamp(from_unixtime(%lu)), %.20g, %.20g, %.20g)", m_dlp.szLogsTable, rle0.nTagID, rle0.nTimestamp, rle0.fValue, rle0.fMin, rle0.fMax);
  704. else
  705. {
  706. if(!rle0.bNull)
  707. sSql = formatString("insert into `%s` (`tagid`, `tslog`, `value`, `valueMin`, `valueMax`) values (%lu, timestamp(from_unixtime(%lu)), %.20g, NULL, NULL)", m_dlp.szLogsTable, rle0.nTagID, rle0.nTimestamp, rle0.fValue);
  708. else
  709. sSql = formatString("insert into `%s` (`tagid`, `tslog`, `value`, `valueMin`, `valueMax`) values (%lu, timestamp(from_unixtime(%lu)), NULL, NULL, NULL)", m_dlp.szLogsTable, rle0.nTagID, rle0.nTimestamp);
  710. }
  711. strSql = sSql;
  712. for(++itFirst; itFirst < m_logs.end(); itFirst++)
  713. {
  714. const DL_LOG_ENTRY &rle = *itFirst;
  715. if(nMaxLogTimestamp < rle.nTimestamp)
  716. nMaxLogTimestamp = rle.nTimestamp;
  717. if(m_dlp.bMinMax && _IS_INTERVAL_LOGTYPE(rle.lt))
  718. sSql = formatString(",(%lu, timestamp(from_unixtime(%lu)), %.20g, %.20g, %.20g)", rle.nTagID, rle.nTimestamp, rle.fValue, rle.fMin, rle.fMax);
  719. else
  720. {
  721. if(!rle.bNull)
  722. sSql = formatString(",(%lu, timestamp(from_unixtime(%lu)), %.20g, NULL, NULL)", rle.nTagID, rle.nTimestamp, rle.fValue);
  723. else
  724. sSql = formatString(",(%lu, timestamp(from_unixtime(%lu)), NULL, NULL, NULL)", rle.nTagID, rle.nTimestamp);
  725. }
  726. strSql += sSql;
  727. }
  728. if(TryLock())
  729. {
  730. db.Query(strLock.c_str());
  731. CMySqlResult res = db.Query(strSql.c_str());
  732. bError = res.error();
  733. db.Query(strUnlock.c_str());
  734. Unlock();
  735. if(bError)
  736. m_lf.Error("CDataLogger::Flush: DB Error: %s\n", db.LastError().c_str());
  737. else
  738. m_nLastLogTimestamp = nMaxLogTimestamp;
  739. m_logs.clear();
  740. }
  741. }
  742. if(bGoodBadTransition)
  743. {
  744. if(m_bBadDateLogsDetected)
  745. {
  746. bool bError;
  747. char szTs[64];
  748. unsigned long long nNextAIVal = 0;
  749. sSql = formatString("select auto_increment from `information_schema`.`TABLES` where `TABLE_SCHEMA` = '%s' and `TABLE_NAME` = '%s';", m_dlp.szDBName, m_dlp.szLogsTableBD);
  750. CMySqlResult res = db.Query(sSql.c_str());
  751. if(!(bError = res.error()))
  752. {
  753. CMySqlVar val;
  754. my_ulonglong nRowCount = res.RowCount();
  755. unsigned int nFldCount = res.FieldCount();
  756. const MYSQL_FIELD *pFields = res.FetchFields();
  757. do
  758. {
  759. if(nRowCount != 1 || nFldCount != 1 || !pFields)
  760. break;
  761. MYSQL_ROW pRow = res.FetchRow();
  762. if(!pRow || !*pRow || !**pRow)
  763. break;
  764. if(!val.FromField(pFields[0], pRow[0]))
  765. break;
  766. nNextAIVal = val;
  767. }
  768. while(false);
  769. }
  770. Timestamp2String(m_nLastLogTimestamp, szTs, sizeof(szTs));
  771. if(nNextAIVal)
  772. {
  773. TRACE("Flush: Transition valid -> invalid date detected! The last Timestamp in Table `%s` was '%s+00:00'. Subsequent logs will be written to table `%s` starting with `id` %llu.\n", m_dlp.szLogsTable, szTs, m_dlp.szLogsTableBD, nNextAIVal);
  774. m_lf.Warning("Flush: Transition valid -> invalid date detected! The last Timestamp in Table `%s` was '%s+00:00'. Subsequent logs will be written to table `%s` starting with `id` %llu.\n", m_dlp.szLogsTable, szTs, m_dlp.szLogsTableBD, nNextAIVal);
  775. }
  776. else
  777. {
  778. TRACE("Flush: Transition valid -> invalid date detected! The last Timestamp in Table `%s` was '%s+00:00'. Subsequent logs will be written to table `%s`.\n", m_dlp.szLogsTable, szTs, m_dlp.szLogsTableBD);
  779. m_lf.Warning("Flush: Transition valid -> invalid date detected! The last Timestamp in Table `%s` was '%s+00:00'. Subsequent logs will be written to table `%s`.\n", m_dlp.szLogsTable, szTs, m_dlp.szLogsTableBD);
  780. }
  781. }
  782. else
  783. {
  784. bool bError;
  785. char szTs[64];
  786. unsigned long long nLastID = 0;
  787. sSql = formatString("select max(`id`) from `%s`.`%s`;", m_dlp.szDBName, m_dlp.szLogsTableBD);
  788. CMySqlResult res = db.Query(sSql.c_str());
  789. if(!(bError = res.error()))
  790. {
  791. CMySqlVar val;
  792. my_ulonglong nRowCount = res.RowCount();
  793. unsigned int nFldCount = res.FieldCount();
  794. const MYSQL_FIELD *pFields = res.FetchFields();
  795. do
  796. {
  797. if(nRowCount != 1 || nFldCount != 1 || !pFields)
  798. break;
  799. MYSQL_ROW pRow = res.FetchRow();
  800. if(!pRow || !*pRow || !**pRow)
  801. break;
  802. if(!val.FromField(pFields[0], pRow[0]))
  803. break;
  804. nLastID = val;
  805. }
  806. while(false);
  807. }
  808. Timestamp2String(nFirstGoodTimestamp, szTs, sizeof(szTs));
  809. if(nLastID)
  810. {
  811. TRACE("Flush: Transition invalid -> valid date detected! The last `id` in table `%s` was %llu. Subsequent logs will be written to table `%s` beginning with timestamp '%s+00:00'.\n", m_dlp.szLogsTableBD, nLastID, m_dlp.szLogsTable, szTs);
  812. m_lf.Warning("Flush: Transition invalid -> valid date detected! The last `id` in table `%s` was %llu. Subsequent logs will be written to table `%s` beginning with timestamp '%s+00:00'.\n", m_dlp.szLogsTableBD, nLastID, m_dlp.szLogsTable, szTs);
  813. }
  814. else
  815. {
  816. TRACE("Flush: Transition invalid -> valid date detected! Subsequent logs will be written to table `%s` beginning with timestamp '%s+00:00'.\n", m_dlp.szLogsTable, szTs);
  817. m_lf.Warning("Flush: Transition invalid -> valid date detected! Subsequent logs will be written to table `%s` beginning with timestamp '%s+00:00'.\n", m_dlp.szLogsTable, szTs);
  818. }
  819. }
  820. }
  821. if(nCountBadDate > 0)
  822. {
  823. std::string strSql, strLock;
  824. strLock = formatString("lock tables `%s` write", m_dlp.szLogsTableBD);
  825. strSql.reserve(130 + 111 * nCountBadDate);
  826. auto itFirst = m_logsBD.begin();
  827. const DL_LOG_ENTRY &rle0 = *itFirst;
  828. if(m_dlp.bMinMax && _IS_INTERVAL_LOGTYPE(rle0.lt))
  829. sSql = formatString("insert into `%s` (`tagid`, `tslog`, `value`, `valueMin`, `valueMax`) values (%lu, timestamp(from_unixtime(%lu)), %.20g, %.20g, %.20g)", m_dlp.szLogsTableBD, rle0.nTagID, rle0.nTimestamp, rle0.fValue, rle0.fMin, rle0.fMax);
  830. else
  831. {
  832. if(!rle0.bNull)
  833. sSql = formatString("insert into `%s` (`tagid`, `tslog`, `value`, `valueMin`, `valueMax`) values (%lu, timestamp(from_unixtime(%lu)), %.20g, NULL, NULL)", m_dlp.szLogsTableBD, rle0.nTagID, rle0.nTimestamp, rle0.fValue);
  834. else
  835. sSql = formatString("insert into `%s` (`tagid`, `tslog`, `value`, `valueMin`, `valueMax`) values (%lu, timestamp(from_unixtime(%lu)), NULL, NULL, NULL)", m_dlp.szLogsTableBD, rle0.nTagID, rle0.nTimestamp);
  836. }
  837. strSql = sSql;
  838. for(++itFirst; itFirst < m_logsBD.end(); itFirst++)
  839. {
  840. const DL_LOG_ENTRY &rle = *itFirst;
  841. if(m_dlp.bMinMax && _IS_INTERVAL_LOGTYPE(rle.lt))
  842. sSql = formatString(",(%lu, timestamp(from_unixtime(%lu)), %.20g, %.20g, %.20g)", rle.nTagID, rle.nTimestamp, rle.fValue, rle.fMin, rle.fMax);
  843. else
  844. {
  845. if(!rle.bNull)
  846. sSql = formatString(",(%lu, timestamp(from_unixtime(%lu)), %.20g, NULL, NULL)", rle.nTagID, rle.nTimestamp, rle.fValue);
  847. else
  848. sSql = formatString(",(%lu, timestamp(from_unixtime(%lu)), NULL, NULL, NULL)", rle.nTagID, rle.nTimestamp);
  849. }
  850. strSql += sSql;
  851. }
  852. db.Query(strLock.c_str());
  853. CMySqlResult res = db.Query(strSql.c_str());
  854. bError = res.error();
  855. db.Query(strUnlock.c_str());
  856. if(bError)
  857. m_lf.Error("CDataLogger::Flush: DB Error: %s\n", db.LastError().c_str());
  858. m_logsBD.clear();
  859. }
  860. db.Query(strRestoreFgnKey.c_str());
  861. db.Query(strRestoreTZ.c_str());
  862. return !bError;
  863. }
  864. /////////////////////////////////////////////////////////////////////////////
  865. unsigned long long CDataLogger::SizeGuardLastPassRead(void)
  866. {
  867. char szPath[PATH_MAX];
  868. sprintf(szPath, "%s/%s", m_szAppDir, _SIZE_GUARD_TIMESTAMP_FILE_NAME);
  869. FILE *pf = fopen(szPath, "rb");
  870. unsigned long long ts = 0;
  871. if(pf)
  872. {
  873. if(fread(&ts, sizeof(ts), 1, pf) != 1)
  874. ts = 0;
  875. fclose(pf);
  876. }
  877. return ts;
  878. }
  879. /////////////////////////////////////////////////////////////////////////////
  880. void CDataLogger::SizeGuardLastPassWrite(unsigned long long ts)
  881. {
  882. char szPath[PATH_MAX];
  883. sprintf(szPath, "%s/%s", m_szAppDir, _SIZE_GUARD_TIMESTAMP_FILE_NAME);
  884. FILE *pf = fopen(szPath, "wb");
  885. if(pf)
  886. {
  887. fwrite(&ts, sizeof(ts), 1, pf);
  888. fclose(pf);
  889. }
  890. }
  891. /////////////////////////////////////////////////////////////////////////////
  892. size_t CDataLogger::Timestamp2String(time_t t, char *pszBuffer, size_t nCbBuffer)
  893. {
  894. const struct tm * ptm = gmtime(&t);
  895. return strftime(pszBuffer, nCbBuffer, "%F %T", ptm);
  896. }
  897. /////////////////////////////////////////////////////////////////////////////
  898. const char* CDataLogger::Ns2String(unsigned long long nNs, char *pszBuffer, size_t nCbBuffer)
  899. {
  900. *pszBuffer = '\0';
  901. if(nNs < 1000)
  902. snprintf(pszBuffer, nCbBuffer, "%llu ns", nNs);
  903. else if(nNs < 1000000)
  904. snprintf(pszBuffer, nCbBuffer, "%.1f us", (double)nNs / 1000.0);
  905. else
  906. return Ms2String((double)nNs / 1000000.0, pszBuffer, nCbBuffer);
  907. return pszBuffer;
  908. }
  909. /////////////////////////////////////////////////////////////////////////////
  910. const char* CDataLogger::Ms2String(double fMs, char *pszBuffer, size_t nCbBuffer)
  911. {
  912. *pszBuffer = '\0';
  913. if(fMs < 1000.0)
  914. snprintf(pszBuffer, nCbBuffer, "%.2f ms", fMs);
  915. else if(fMs < 60000.0)
  916. snprintf(pszBuffer, nCbBuffer, "%.2f sec", fMs / 1000.0);
  917. else if(fMs < 3600000.0)
  918. snprintf(pszBuffer, nCbBuffer, "%.2f min", fMs / 60000.0);
  919. else
  920. snprintf(pszBuffer, nCbBuffer, "%.2f h", fMs / 3600000.0);
  921. return pszBuffer;
  922. }
  923. /////////////////////////////////////////////////////////////////////////////
  924. bool CDataLogger::DoSizeGuard(void)
  925. {
  926. if(!m_bSGConfigured)
  927. return true;
  928. CMySqlDB db;
  929. bool bError;
  930. MYSQL_ROW pRow;
  931. unsigned long long nElapsed;
  932. char szT1[32], szT2[32], szMs[32];
  933. CProcessClock pc;
  934. CMySqlVar vMinTs, vCountDelete;
  935. int64_t nMinTs, nTsCurMidnightUTC, nTsDeleteUpTo;
  936. std::string sSql;
  937. /////////////////////////////////////////////////////////////////////////
  938. nTsCurMidnightUTC = _MIDNIGHT_TIMESTAMP_UTC(m_nSGCurPassUTC);
  939. if(SizeGuardDayWorkDone(nTsCurMidnightUTC))
  940. return true;
  941. m_nSGLastPassUTC = nTsCurMidnightUTC;
  942. if(SizeGuardLastPassRead() == (unsigned long long)(nTsCurMidnightUTC))
  943. return true;
  944. SizeGuardLastPassWrite(nTsCurMidnightUTC);
  945. /////////////////////////////////////////////////////////////////////////
  946. if(!db.Connect("localhost", m_dlp.szDBUser, m_dlp.szDBPass, NULL))
  947. {
  948. m_lf.Error("CDataLogger::DoSizeGuard(%d): - DB Error: %s\n", __LINE__, db.LastError().c_str());
  949. return false;
  950. }
  951. sSql = formatString("select unix_timestamp(min(`tslog`)) from `%s`.`%s`;", m_dlp.szDBName, m_dlp.szLogsTable);
  952. m_lf.Info("SG: \"%s\".\n", sSql.c_str());
  953. pc.ClockTrigger();
  954. CMySqlResult res = db.Query(sSql.c_str());
  955. nElapsed = pc.ClockGetElapsed();
  956. bError = res.error();
  957. if(bError)
  958. {
  959. m_lf.Error("CDataLogger::DoSizeGuard(%d): \"%s\" - DB Error: %s\n", __LINE__, sSql.c_str(), db.LastError().c_str());
  960. return false;
  961. }
  962. if(!(pRow = res.FetchRow()))
  963. {
  964. m_lf.Error("CDataLogger::DoSizeGuard(%d): Unexpected Error!\n", __LINE__);
  965. return false;
  966. }
  967. my_ulonglong nRowCount = res.RowCount();
  968. unsigned int nFldCount = res.FieldCount();
  969. const MYSQL_FIELD *pFields = res.FetchFields();
  970. if(nRowCount != 1 || nFldCount != 1)
  971. {
  972. m_lf.Error("CDataLogger::DoSizeGuard(%d): Unexpected Error!\n", __LINE__);
  973. return false;
  974. }
  975. if(!vMinTs.FromField(pFields[0], pRow[0]))
  976. {
  977. m_lf.Error("CDataLogger::DoSizeGuard(%d): Unexpected Error!\n", __LINE__);
  978. return false;
  979. }
  980. nMinTs = vMinTs;
  981. m_lf.Info("SG: operation completed in %s. Min. timestamp: %ju\n", Ns2String(nElapsed, szMs, sizeof(szMs)), nMinTs);
  982. /////////////////////////////////////////////////////////////////////////
  983. // process max. size
  984. if(m_bSGHasSizeLimitPrerequisites && m_dlp.nMaxSize)
  985. {
  986. unsigned long long nFileSize = TableFileSize(m_dlp.szLogsTable);
  987. unsigned long long nGuardThreshold = m_dlp.nMaxSize * (100 - _FILE_SIZE_DELETE_MARGIN_PERCENT) / 100;
  988. m_lf.Info("SG: File size: %s.\n", strFormatByteSize(nFileSize, 1).c_str());
  989. if(nFileSize > nGuardThreshold)
  990. {
  991. Timestamp2String(nTsCurMidnightUTC - _SECONDS_PER_DAY, szT1, sizeof(szT1));
  992. Timestamp2String(nTsCurMidnightUTC - 1, szT2, sizeof(szT2));
  993. sSql = formatString("select count(*) from `%s`.`%s` where `tslog` between '%s' and '%s';", m_dlp.szDBName, m_dlp.szLogsTable, szT1, szT2);
  994. m_lf.Info("SG: \"%s\"\n", sSql.c_str());
  995. pc.ClockTrigger();
  996. CMySqlResult res = db.Query(sSql.c_str());
  997. bError = res.error();
  998. if(bError)
  999. {
  1000. m_lf.Error("CDataLogger::DoSizeGuard(%d): \"%s\" - DB Error: %s\n", __LINE__, sSql.c_str(), db.LastError().c_str());
  1001. return false;
  1002. }
  1003. else
  1004. {
  1005. nElapsed = pc.ClockGetElapsed();
  1006. nRowCount = res.RowCount();
  1007. nFldCount = res.FieldCount();
  1008. pFields = res.FetchFields();
  1009. pRow = res.FetchRow();
  1010. if(pRow && pFields && nRowCount == 1 && nFldCount == 1 && vCountDelete.FromField(pFields[0], pRow[0]))
  1011. {
  1012. unsigned long long nCountDelete = (uint64_t)vCountDelete;
  1013. m_lf.Info("SG: operation completed in %s. Estimated number of rows to delete: %llu.\n", Ns2String(nElapsed, szMs, sizeof(szMs)), nCountDelete);
  1014. if(nCountDelete > 0)
  1015. {
  1016. #if _USE_MODIFIED_INDEX
  1017. sSql = formatString("delete from `%s`.`%s` limit %llu;", m_dlp.szDBName, m_dlp.szLogsTable, nCountDelete);
  1018. #else // _USE_MODIFIED_INDEX
  1019. sSql = formatString("delete from `%s`.`%s` where `tslog` <= (select `tslog` from (select `tslog` from `demo`.`logs` order by `tslog` asc limit %llu, 1) x);", m_dlp.szDBName, m_dlp.szLogsTable, nCountDelete - 1);
  1020. #endif // _USE_MODIFIED_INDEX
  1021. m_lf.Info("SG: triggered on size limit: \"%s\".\n", sSql.c_str());
  1022. pc.ClockTrigger();
  1023. if(db.Query(sSql.c_str()).error())
  1024. {
  1025. m_lf.Error("CDataLogger::DoSizeGuard(%d): \"%s\" - DB Error: %s\n", __LINE__, sSql.c_str(), db.LastError().c_str());
  1026. return false;
  1027. }
  1028. else
  1029. {
  1030. nElapsed = pc.ClockGetElapsed();
  1031. m_lf.Info("SG: operation completed in %s.\n", Ns2String(nElapsed, szMs, sizeof(szMs)));
  1032. }
  1033. }
  1034. }
  1035. else
  1036. {
  1037. m_lf.Error("CDataLogger::DoSizeGuard(%d): Unexpected Error!\n", __LINE__);
  1038. return false;
  1039. }
  1040. }
  1041. }
  1042. }
  1043. /////////////////////////////////////////////////////////////////////////
  1044. // process max. age entries
  1045. if(m_dlp.nMaxAge)
  1046. {
  1047. nTsDeleteUpTo = nTsCurMidnightUTC - m_dlp.nMaxAge * _SECONDS_PER_DAY;
  1048. if(nMinTs <= nTsDeleteUpTo)
  1049. {
  1050. Timestamp2String(nTsDeleteUpTo, szT1, sizeof(szT1));
  1051. sSql = formatString("delete from `%s`.`%s` where `tslog` < '%s';", m_dlp.szDBName, m_dlp.szLogsTable, szT1);
  1052. m_lf.Info("SG: triggered on age limit: \"%s\".\n", sSql.c_str());
  1053. pc.ClockTrigger();
  1054. CMySqlResult res = db.Query(sSql.c_str());
  1055. nElapsed = pc.ClockGetElapsed();
  1056. bError = res.error();
  1057. if(bError)
  1058. {
  1059. m_lf.Error("CDataLogger::DoSizeGuard(%d): DB Error: %s\n", __LINE__, db.LastError().c_str());
  1060. return false;
  1061. }
  1062. else
  1063. {
  1064. m_lf.Info("SG: operation completed in %s.\n", Ns2String(nElapsed, szMs, sizeof(szMs)));
  1065. }
  1066. }
  1067. }
  1068. return true;
  1069. }
  1070. /////////////////////////////////////////////////////////////////////////////
  1071. /////////////////////////////////////////////////////////////////////////////
  1072. /////////////////////////////////////////////////////////////////////////////
  1073. // size guard worker thread
  1074. void* CDataLogger::SizeGuardWorker(void* pParam)
  1075. {
  1076. unsigned long long nElapsed;
  1077. char szMs[32];
  1078. CProcessClock pc;
  1079. CDataLogger *pThis = reinterpret_cast<CDataLogger*>(pParam);
  1080. ::pthread_mutex_lock(&pThis->m_condmtx1);
  1081. ::pthread_cond_signal(&pThis->m_cond1);
  1082. while(true)
  1083. {
  1084. pThis->m_bSGInProgress = false;
  1085. ::pthread_cond_wait(&pThis->m_cond1, &pThis->m_condmtx1);
  1086. pThis->m_bSGInProgress = true;
  1087. ::pthread_mutex_unlock(&pThis->m_condmtx1);
  1088. TRACE("Size guard start.\n");
  1089. pThis->Lock();
  1090. pc.ClockTrigger();
  1091. pThis->DoSizeGuard();
  1092. nElapsed = pc.ClockGetElapsed();
  1093. pThis->Unlock();
  1094. pThis->m_lf.Info("SG: finished in %s.\n", Ns2String(nElapsed, szMs, sizeof(szMs)));
  1095. TRACE("Size guard end - [%s].\n", szMs);
  1096. ::pthread_yield();
  1097. ::pthread_mutex_lock(&pThis->m_condmtx1);
  1098. }
  1099. return NULL;
  1100. }