///////////////////////////////////////////////////////////////////////////// // #include #include "fileutil.h" #include "strutil.h" #include "summarist.h" #include "debug.h" #define _SUMMARIST_TIMESTAMP_FILE_NAME "sum.ts" #define _SECS_PER_DAY 86400 static const unsigned long g_nFactors[] = {1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 16, 18, 20, 24, 25, 30, 36, 40, 45, 48, 50, 60, 72, 75, 80, 90, 100, 120, 144, 150, 180, 200, 225, 240, 300, 360, 400, 450, 600, 720, 900, 1200, 1800, 3600}; ///////////////////////////////////////////////////////////////////////////// // CSummarist::CSummarist(LPCSUMMARIST_PARAMS psup, CLogfile &rlf) : m_rlf(rlf), m_nProcFrequ(1), m_tzo(_SUM_INVALID_TIMESTAMP_VALUE), m_tsBase(_SUM_INVALID_TIMESTAMP_VALUE) { memset(&m_sup, 0, sizeof(m_sup)); if(psup) memcpy(&m_sup, psup, sizeof(m_sup)); if(m_sup.pszBaseDir) strncpy(m_szAppDir, m_sup.pszBaseDir, sizeof(m_szAppDir) - 1); else ::GetAppDirectory(m_szAppDir, sizeof(m_szAppDir)); } CSummarist::~CSummarist(void) { Release(); } ///////////////////////////////////////////////////////////////////////////// // void CSummarist::Release(void) { m_vTimeWnds.clear(); m_vTimeFrameStart.clear(); m_ilTagList.clear(); } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::Initialze(const unsigned long *pTimeWnds, size_t nCntTimeWnds, bool bDropTables) { if(!InitTimeWnd(pTimeWnds, nCntTimeWnds)) return false; if(!InitTables(bDropTables)) return false; return true; } ///////////////////////////////////////////////////////////////////////////// // std::string CSummarist::CreateSumTableName(unsigned long tWnd) const { return formatString("%s_sum_%lu", m_sup.szLogsTable, tWnd); } std::string CSummarist::CreateSumTableName(int nIndex) const { unsigned long tWnd; if((tWnd = GetTimeWindow(nIndex))) return CreateSumTableName(tWnd); return ""; } ///////////////////////////////////////////////////////////////////////////// // time_t CSummarist::GetTimezoneOffset(CMySqlDB &rdb) { time_t tzo = _SUM_INVALID_TIMESTAMP_VALUE; CMySqlResult res = rdb.Query("SELECT TIME_TO_SEC(TIMEDIFF(NOW(), UTC_TIMESTAMP()))"); if(res.error()) { m_rlf.Error("CSummarist::GetTimezoneOffset: DB Error: %s\n", rdb.LastError().c_str()); } else { CMySqlVar valTZ; my_ulonglong nRowCount = res.RowCount(); unsigned int nFldCount = res.FieldCount(); const MYSQL_FIELD *pFields = res.FetchFields(); MYSQL_ROW pRow = res.FetchRow(); if(nRowCount == 1 && nFldCount == 1 && pFields && pRow && valTZ.FromField(pFields[0], pRow[0])) tzo = (time_t)(uint64_t)valTZ; } return tzo; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::CreateTagList(CMySqlDB &rdb) { bool bRet; std::string sSql; m_ilTagList.clear(); sSql = formatString("SET SESSION group_concat_max_len = %u", _SUM_GROUP_CONCAT_MAX_LEN); if(rdb.Query(sSql.c_str()).error()) { m_rlf.Error("CSummarist::CreateTagList: DB Error: %s\n", rdb.LastError().c_str()); return false; } sSql = formatString("SELECT GROUP_CONCAT(`tagid` ORDER BY `tagid` SEPARATOR ',') FROM `%s`", m_sup.szITagsView); CMySqlResult res = rdb.Query(sSql.c_str()); if(!(bRet = !res.error())) { m_rlf.Error("CSummarist::CreateTagList: DB Error: %s\n", rdb.LastError().c_str()); } else { my_ulonglong nRowCount = res.RowCount(); unsigned int nFldCount = res.FieldCount(); const MYSQL_FIELD *pFields = res.FetchFields(); MYSQL_ROW pRow = res.FetchRow(); if((nRowCount == 1) && (nFldCount == 1) && pFields && pRow && pRow[0]) m_ilTagList = pRow[0]; else bRet = false; } return bRet; } ///////////////////////////////////////////////////////////////////////////// // unsigned long CSummarist::GetTimeWindow(size_t nIndex) const { if(nIndex < TimeWndCount()) return m_vTimeWnds[nIndex]; return 0; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::Connect(CMySqlDB &rdb) { if(!rdb.Connect("localhost", m_sup.szDBUser, m_sup.szDBPass, NULL)) return false; else if(rdb.SelectDB(m_sup.szDBName)) { rdb.Close(); return false; } m_tzo = GetTimezoneOffset(rdb); rdb.Query("SET @OLD_FOREIGN_KEY_CHECKS = @@FOREIGN_KEY_CHECKS"); rdb.Query("SET FOREIGN_KEY_CHECKS = 0"); rdb.Query("SET @OLD_TIME_ZONE = @@TIME_ZONE"); return !rdb.Query("SET TIME_ZONE = '+00:00'").error(); } void CSummarist::Close(CMySqlDB &rdb) { rdb.Query("SET FOREIGN_KEY_CHECKS = @OLD_FOREIGN_KEY_CHECKS"); rdb.Query("SET TIME_ZONE = @OLD_TIME_ZONE"); rdb.Close(); } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::LockTables(CMySqlDB &rdb, const char *pszTableName) { std::string sSql = formatString("LOCK TABLES `%s` READ, `%s` WRITE", m_sup.szLogsTable, pszTableName); return !rdb.Query(sSql.c_str()).error(); } bool CSummarist::UnlockTables(CMySqlDB &rdb) { return !rdb.Query("UNLOCK TABLES").error(); } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::DropTables(CMySqlDB &rdb) { bool bError; std::string tbls; std::string sSql = formatString("SHOW TABLES LIKE '%s_sum_%%'", m_sup.szLogsTable); CMySqlResult res = rdb.Query(sSql.c_str()); if((bError = res.error())) { m_rlf.Error("CSummarist::DropTables: DB Error: %s\n", rdb.LastError().c_str()); } else { MYSQL_ROW pRow; if((pRow = res.FetchRow())) tbls = formatString("`%s`", *pRow); while((pRow = res.FetchRow())) tbls += formatString(", `%s`", *pRow); sSql = formatString("DROP TABLE IF EXISTS %s", tbls.c_str()); m_rlf.Warning("Dropping summary tables: %s!\n", tbls.c_str()); CMySqlResult res = rdb.Query(sSql.c_str()); if((bError = res.error())) m_rlf.Error("CSummarist::DropTables: DB Error: %s\n", rdb.LastError().c_str()); } return !bError; } ///////////////////////////////////////////////////////////////////////////// // const char* CSummarist::Timestamp2String(time_t t, char *pszBuffer, size_t nCbBuffer) { static char szTimestamp[128]; if(!pszBuffer) { pszBuffer = szTimestamp; nCbBuffer = sizeof(szTimestamp); } const struct tm * ptm = gmtime(&t); strftime(pszBuffer, nCbBuffer, "%F %T", ptm); return pszBuffer; } ///////////////////////////////////////////////////////////////////////////// // time_t CSummarist::ReadTimestampBase(void) { char szPath[PATH_MAX]; sprintf(szPath, "%s/%s", m_szAppDir, _SUMMARIST_TIMESTAMP_FILE_NAME); time_t ts = _SUM_INVALID_TIMESTAMP_VALUE; FILE *pf = fopen(szPath, "rb"); if(pf) { if(fread(&ts, sizeof(ts), 1, pf) != 1) ts = _SUM_INVALID_TIMESTAMP_VALUE; fclose(pf); } return ts; } ///////////////////////////////////////////////////////////////////////////// // time_t CSummarist::GetBaseTimestamp(time_t tsMinLog) { if(m_tsBase == _SUM_INVALID_TIMESTAMP_VALUE) { m_tsBase = ReadTimestampBase(); if(m_tsBase == _SUM_INVALID_TIMESTAMP_VALUE) { m_tsBase = tsMinLog - (tsMinLog % _SECS_PER_DAY); WriteTimestampBase(m_tsBase); } } return m_tsBase; } ///////////////////////////////////////////////////////////////////////////// // void CSummarist::WriteTimestampBase(time_t ts) { char szPath[PATH_MAX]; sprintf(szPath, "%s/%s", m_szAppDir, _SUMMARIST_TIMESTAMP_FILE_NAME); FILE *pf = fopen(szPath, "wb"); if(pf) { fwrite(&ts, sizeof(ts), 1, pf); fclose(pf); } } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::GetNextTimeFrame(CMySqlDB &rdb, size_t nIndex, time_t tsLast, time_t tsBase, time_t &rtsStart, time_t &rtsEnd) const { time_t tsNext, tWnd = (time_t)GetTimeWindow(nIndex); if(tWnd) { if(tsLast == _SUM_INVALID_TIMESTAMP_VALUE) tsLast = tsBase; if( GetNextTimestampFromLogs(rdb, tsLast, tsNext) && (tsNext != _SUM_INVALID_TIMESTAMP_VALUE)) rtsStart = tsNext - ((tsNext - tsBase) % tWnd); else rtsStart = tsLast - ((tsLast - tsBase) % tWnd) + tWnd; rtsEnd = rtsStart + tWnd; return true; } return false; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::SetNextTimeFrameStart(size_t nIndex, time_t tsStart) { if(nIndex < (size_t)m_vTimeFrameStart.size()) { m_vTimeFrameStart[nIndex] = tsStart; return true; } return false; } ///////////////////////////////////////////////////////////////////////////// // time_t CSummarist::GetNextDueTimeFrameStart(void) const { time_t ts = _SUM_INVALID_TIMESTAMP_VALUE; for(auto i = m_vTimeFrameStart.begin(); i != m_vTimeFrameStart.end(); ++i) { time_t t = *i; if((t != _SUM_INVALID_TIMESTAMP_VALUE) && ((ts == _SUM_INVALID_TIMESTAMP_VALUE) || (ts > t))) ts = t; } return ts; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::InitTables(bool bDropTables) { CMySqlDB db; m_rlf.Info("Connecting to database server @'localhost'.\n"); if(!Connect(db)) { m_rlf.Error("CSummarist::InitTables: DB Error: %s\n", db.LastError().c_str()); return false; } m_rlf.Info("Success!\n"); if(bDropTables) { return DropTables(db); } for(auto i = m_vTimeWnds.begin(); i != m_vTimeWnds.end(); ++i) { unsigned long tWnd = *i; std::string sName = CreateSumTableName(tWnd); std::string sSql = formatString("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`", sName.c_str(), m_sup.szLogsTable); m_rlf.Info("Creating summary table '%s' (if not exists).\n", sName.c_str()); CMySqlResult res = db.Query(sSql.c_str()); bool bError = res.error(); if(bError) { m_rlf.Error("CSummarist::InitTables: DB Error: %s\n", db.LastError().c_str()); return false; } else m_rlf.Info("Success!\n"); } std::string sSql = formatString("CREATE OR REPLACE ALGORITHM = MERGE VIEW `%s` AS SELECT `tagid` FROM `%s` WHERE `logType` IN ('IC', 'IU', 'ICR', 'IUR') ORDER BY `tagid`", m_sup.szITagsView, m_sup.szTagsTable); m_rlf.Info("Creating view '%s' (if not exists).\n", m_sup.szITagsView); CMySqlResult res = db.Query(sSql.c_str()); bool bError = res.error(); if(bError) { m_rlf.Error("CSummarist::InitTables: DB Error: %s\n", db.LastError().c_str()); return false; } else m_rlf.Info("Success!\n"); if(!CreateTagList(db)) return false; return true; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::InitTimeWnd(const unsigned long *pTimeWnds, size_t nCntTimeWnds) { m_nProcFrequ = 1; if(m_sup.nLogIntv % 1000) { m_rlf.Error("Incompatible datalogger log interval: %lu\n", m_sup.nLogIntv); return false; } for(size_t i = 0; i < nCntTimeWnds; i++) { m_vTimeWnds.push_back(pTimeWnds[i]); m_vTimeFrameStart.push_back(_SUM_INVALID_TIMESTAMP_VALUE); } size_t nCountWnds = m_vTimeWnds.size(); if(!nCountWnds) { m_rlf.Error("No time window specified!\n"); return false; } else if(nCountWnds > 1) { // sort time window sizes ascending std::sort(m_vTimeWnds.begin(), m_vTimeWnds.end()); } unsigned long tWndMin = m_vTimeWnds[0]; for(auto i = m_vTimeWnds.begin(); i != m_vTimeWnds.end(); ++i) { unsigned long tWnd = *i; unsigned long nMod = tWnd % _SUM_TIME_WND_BASE; if(tWnd < (2 * m_sup.nLogIntv / 1000)) // a time window must at least contain two logs to not being useless! { m_rlf.Error("Time window length (%lu) must be at least a double of the log interval (%lu)!\n", tWnd, m_sup.nLogIntv / 1000); return false; } else if(nMod && (_SUM_TIME_WND_BASE % nMod)) // time window length must be either an integer factor or a multiple of an integer factor of 3600 { m_rlf.Error("Time window length must be either an integer factor or a multiple of an integer factor of 3600: %lu\n", tWnd); return false; } else if(tWndMin && (tWnd % tWndMin)) { tWndMin = 0; } } if(!tWndMin) { // find the greatest common divisor of all time windows. this will be the processing frequency starting at a full hour boundary. for(int j = (int)(_countof(g_nFactors) - 1); j >= 0; --j) { bool bDiv = true; unsigned long nFac = g_nFactors[j]; for(auto i = m_vTimeWnds.begin(); i != m_vTimeWnds.end(); ++i) { unsigned long tWnd = *i; if((nFac > tWnd) || (tWnd % nFac)) { bDiv = false; break; } } if(bDiv) { m_nProcFrequ = nFac; m_rlf.Info("Processing frequency: %lu\n", m_nProcFrequ); break; } } } else { m_nProcFrequ = tWndMin; m_rlf.Info("Processing frequency: %lu\n", m_nProcFrequ); } return true; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::GetLastSummarizeTimestamp(CMySqlDB &rdb, size_t nIndex, time_t &rtsLast) const { unsigned long tWnd = GetTimeWindow(nIndex); if(tWnd) { std::string sTbl = CreateSumTableName(tWnd); return GetLastSummarizeTimestamp(rdb, sTbl.c_str(), rtsLast); } rtsLast = _SUM_INVALID_TIMESTAMP_VALUE; return false; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::GetLastSummarizeTimestamp(CMySqlDB &rdb, const char *pszTableName, time_t &rtsLast) const { rtsLast = _SUM_INVALID_TIMESTAMP_VALUE; std::string sSql = formatString("SELECT UNIX_TIMESTAMP(MAX(`tslog`)) FROM `%s`", pszTableName); CMySqlResult res = rdb.Query(sSql.c_str()); if(res.error()) { m_rlf.Error("CSummarist::GetLastSummarizeTimestamp: DB Error: %s\n", rdb.LastError().c_str()); } else { do { CMySqlVar val; my_ulonglong nRowCount = res.RowCount(); unsigned int nFldCount = res.FieldCount(); const MYSQL_FIELD *pFields = res.FetchFields(); if(nRowCount != 1 || nFldCount != 1 || !pFields) { break; } MYSQL_ROW pRow = res.FetchRow(); if( !pRow || !val.FromField(pFields[0], pRow[0])) { break; } rtsLast = (time_t)(uint64_t)val; } while(false); return true; } return false; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::GetMinMaxTimestampFromLogs(CMySqlDB &rdb, time_t &rMin, time_t &rMax) const { bool bRet = true; int nCntTs = 0; std::string sSql[2]; CMySqlVar val[2]; rMin = _SUM_INVALID_TIMESTAMP_VALUE; rMax = _SUM_INVALID_TIMESTAMP_VALUE; sSql[0] = formatString("SELECT UNIX_TIMESTAMP(`tslog`) FROM `%s` WHERE (`tagid` IN (%s)) AND (`value` IS NOT NULL) ORDER BY `tslog` ASC LIMIT 1;", m_sup.szLogsTable, m_ilTagList.c_str()); sSql[1] = formatString("SELECT UNIX_TIMESTAMP(`tslog`) FROM `%s` WHERE (`tagid` IN (%s)) AND (`value` IS NOT NULL) ORDER BY `tslog` DESC LIMIT 1;", m_sup.szLogsTable, m_ilTagList.c_str()); for(int i = 0; i < 2; ++i) { CMySqlResult res = rdb.Query(sSql[i].c_str()); if(res.error()) { m_rlf.Error("CSummarist::GetMinMaxTimestampFromLogs: DB Error: %s\n", rdb.LastError().c_str()); bRet = false; break; } else { my_ulonglong nRowCount = res.RowCount(); unsigned int nFldCount = res.FieldCount(); const MYSQL_FIELD *pFields = res.FetchFields(); MYSQL_ROW pRow = res.FetchRow(); if(nRowCount != 1 || nFldCount != 1 || !pFields || !pRow || !val[i].FromField(pFields[0], pRow[0])) break; ++nCntTs; } } if(bRet && nCntTs == 2) { rMin = (time_t)(uint64_t)val[0]; rMax = (time_t)(uint64_t)val[1]; } return bRet; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::GetNextTimestampFromLogs(CMySqlDB &rdb, time_t tsLast, time_t &rNext) const { std::string sSql = formatString("SELECT UNIX_TIMESTAMP(`tslog`) FROM `%s` WHERE (`tslog` > '%s') AND (`value` IS NOT NULL) AND (`tagid` IN (%s)) ORDER BY `tslog` ASC LIMIT 1;", m_sup.szLogsTable, Timestamp2String(tsLast), m_ilTagList.c_str()); CMySqlResult res = rdb.Query(sSql.c_str()); if(res.error()) { m_rlf.Error("CSummarist::GetNextTimestampFromLogs: DB Error: %s\n", rdb.LastError().c_str()); return false; } else { CMySqlVar valNext; my_ulonglong nRowCount = res.RowCount(); unsigned int nFldCount = res.FieldCount(); const MYSQL_FIELD *pFields = res.FetchFields(); MYSQL_ROW pRow = res.FetchRow(); if(nRowCount != 1 || nFldCount != 1 || !pFields) { rNext = _SUM_INVALID_TIMESTAMP_VALUE; } else if(!pRow || !valNext.FromField(pFields[0], pRow[0])) { rNext = _SUM_INVALID_TIMESTAMP_VALUE; } else { rNext = (time_t)(uint64_t)valNext; } return true; } } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::Summarize(CMySqlDB &rdb, size_t nIndex, time_t tsFrom, time_t tsTo) { if(nIndex >= TimeWndCount()) { m_rlf.Error("CSummarist::Summarize: invalid index: %z\n", nIndex); return false; } #define _SUM_FMT_STRING "REPLACE INTO `%s` " \ "SELECT `tagid`, MAX(`tslog`) `tslog`, AVG(`value`) `value`, MIN(`valueMin`) `valueMin`, MAX(`valueMax`) `valueMax` FROM `%s` " \ "WHERE (`tslog` BETWEEN '%s' AND '%s') AND (`value` IS NOT NULL) AND (`%s`.`tagid` IN (%s)) " \ "GROUP BY `%s`.`tagid`" char szFrom[64], szTo[64]; std::string sTName = CreateSumTableName((int)nIndex); Timestamp2String(tsFrom, szFrom, sizeof(szFrom)); Timestamp2String(tsTo - 1, szTo, sizeof(szTo)); std::string sSql = formatString(_SUM_FMT_STRING, sTName.c_str(), m_sup.szLogsTable, szFrom, szTo, m_sup.szLogsTable, m_ilTagList.c_str(), m_sup.szLogsTable); std::string sTbl = formatString("%s, %s", sTName.c_str(), m_sup.szLogsTable); TRACE("%-16s- Summarize: %s <= logs < %s\n", sTName.c_str(), szFrom, Timestamp2String(tsTo)); if(!LockTables(rdb, sTName.c_str())) { m_rlf.Error("CSummarist::Summarize: DB Error: %s\n", rdb.LastError().c_str()); return false; } CMySqlResult res = rdb.Query(sSql.c_str()); if(res.error()) { m_rlf.Error("CSummarist::Summarize: DB Error: %s\n", rdb.LastError().c_str()); return false; } UnlockTables(rdb); return true; } ///////////////////////////////////////////////////////////////////////////// // bool CSummarist::ProcessOutdated(CMySqlDB &rdb, size_t nIndex, time_t tsUpTo) { if(nIndex >= TimeWndCount()) { m_rlf.Error("CSummarist::ProcessOutdated: invalid index: %z\n", nIndex); return false; } char szUpTo[64]; std::string sTName = CreateSumTableName((int)nIndex); Timestamp2String(tsUpTo, szUpTo, sizeof(szUpTo)); std::string sSql = formatString("DELETE FROM `%s` where `tslog` < '%s'", sTName.c_str(), szUpTo); CMySqlResult res = rdb.Query(sSql.c_str()); if(res.error()) { m_rlf.Error("CSummarist::ProcessOutdated: DB Error: %s\n", rdb.LastError().c_str()); return false; } TRACE("%-16s- ProcessOutdated: sumlogs < %s\n", sTName.c_str(), szUpTo); return true; }