diff --git a/.gitignore b/.gitignore index 9ba131a..2719f45 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ middleman middleman_runs.log stop quit +slack_web_hook.txt diff --git a/JsonParser.cpp b/JsonParser.cpp index a944309..5c18793 100644 --- a/JsonParser.cpp +++ b/JsonParser.cpp @@ -132,6 +132,12 @@ bool JSONP::ScanJsonArray(const std::string& thejson, JsonParserResult& result){ return true; } + // empty array + if(thejson.find_first_not_of(" ,\n")==std::string::npos){ + result.type=JsonParserResultType::empty; + return true; + } + // json arrays are annoyingly flexible; they can be homogeneous, // containing ints, floats, strings, bools, nulls or objects, // but they can also be inhomogeneous combining elements of all these different types @@ -183,22 +189,22 @@ bool JSONP::ScanJsonArray(const std::string& thejson, JsonParserResult& result){ // could be floats or strings, or arrays or objects } // rule out integers by anything other than digits and signs - if(thejson.find_first_not_of("01234567890+-, ")!=std::string::npos){ + if(thejson.find_first_not_of("01234567890+-, \n")!=std::string::npos){ if(verbose) std::cout<<"found something other than digits: can't be ints"< delimiters; - if(verbose) std::cout<<"scanning remaining string: " + if(verbose) std::cout<<"scanning remaining string: " <0) std::cout<<", "; std::cout<0) std::cout<<", "; std::cout< + +int main(int argc, const char** argv){ +// std::string testjson="{\"not true\": [0, false], \"true\": true, \"not null\": [0, 1, false, true, {\"obj\": null}, \"a string\"] }"; +// std::string testjson="{}"; +// std::string testjson="[0]"; +// std::string testjson="{\"Sally\": \"Father McKenzie, I thought you were DEAD!\", \"McKenzie\": \"I vas!\"}"; +// std::string testjson="[{ \"why\":null} ]"; +// std::string testjson="{ \"xvals\":[\"2022-09-23 02:04:55\",\"2022-09-23 01:48:08\",\"2022-09-23 01:31:45\"], \"yvals\":[0.497962,0.502168,0.498470] }"; +// std::string testjson="{\"purefit\":1, \"fits\":[ {\"method\":\"raw\", \"absfit\":1, \"peakdiff\":0.135514, \"gdconc\":0.055772 }, {\"method\":\"simple\", \"absfit\":1, \"peakdiff\":0.118749, \"gdconc\":0.055036 }, {\"method\":\"complex\", \"absfit\":1, \"peakdiff\":0.120748, \"gdconc\":0.055113 } ] }"; +// std::string testjson="{\"runnum\":128,\"start\":\"2022-09-09T00:21:09.356056\",\"stop\":null,\"runconfig\":1,\"notes\":\"\nfirst in-situ run with database, marcusanalysis, transparency and web page updating.\",\"git_tag\":\"V2.1.2-76-g2b4c532\", \"output_file\": \"05SeptEGADS\" }"; +// std::string testjson="[ 1, 2, 3.4 ]"; // returns std::vector +// all the following convert to std::vector +// std::string testjson="[ 1, true ]"; +// std::string testjson="[ 1, 1.1, true ]"; +// std::string testjson="[ 1, 2.2, \"three\"]"; +// std::string testjson="[ 1, null ]"; +// std::string testjson="[ 1, true, null ]"; +// std::string testjson="[ 1, true, null, \"potato\" ]"; +// std::string testjson="[ [1,2,3], [4.4,5.5,6.6] ]"; +// std::string testjson="{\"VME\": 1, \"PGTool\": 2, \"SlackBot\": 3, \"RunControl\": 4}"; + +// // adapted from https://github.com/briandfoy/json-acceptance-tests, pass1 + std::string testjson= + "{\n" +// "\"integer\": 1234567890,\n" +// "\"real\": -9876.543210,\n" +// "\"e\": 0.123456789e-12,\n" +// "\"E\": 1.234567890E+34,\n" +// "\"\": 23456789012E66,\n" +// "\"zero\": 0,\n" +// "\"one\": 1,\n" +// "\"space\": \" \",\n" +// "\"quote\": \"\\\"\",\n" +// "\"backslash\": \"\\\\\",\n" +// "\"controls\": \"\\b\\f\\n\\r\\t\",\n" +// "\"slash\": \"/ & \\/\",\n" +// "\"alpha\": \"abcdefghijklmnopqrstuvwyz\",\n" +// "\"no value\":,\n" +// "\"ALPHA\": \"ABCDEFGHIJKLMNOPQRSTUVWYZ\",\n" +// "\"digit\": \"0123456789\",\n" +// "\"0123456789\": \"digit\",\n" +// "\"special\": \"`1~!@#$%^&*()_+-={':[,]}|;.?\",\n" +// "\"hex\": \"\\u0123\\u4567\\u89AB\\uCDEF\\uabcd\\uef4A\",\n" +// "\"true\": true,\n" +// "\"false\": false,\n" +// "\"null\": null,\n" +// "\"array\":[ ],\n" +// "\"object\":{ },\n" + "\"address\": \"50 St. James Street\",\n" +// "\"url\": \"http://www.JSON.org/\",\n" + "\"comment\": \"// /* */\": \" \",\n" +// "\" s p a c e d \" :[1,2 , 3\n" +// "\n" +// ",\n" +// "\n" +// "4 , 5 , 6 ,7 ],\"compact\":[1,2,3,4,5,6,7,],\n" +// "\"jsontext\": \"{\\\"object with 1 member\\\":[\\\"array with 1 element\\\"]}\",\n" +// "\"quotes\": \"" \\u0022 %22 0x22 034 "\",\n" +// "\"\\/\\\\\\\"\\uCAFE\\uBABE\\uAB98\\uFCDE\\ubcda\\uef4A\\b\\f\\n\\r\\t`1~!@#$%^&*()_+-=[]{}|;:',./<>?\"\n" +// ": \"A key can be any string\"\n" + "}"; + + BStore store{}; + JSONP parser{}; +// parser.SetVerbose(true); + bool ok = parser.Parse(testjson, store); + std::cout<<"parser ok? "<1 && std::string(argv[1])=="1") store.Print(true); + else store.Print(false); + std::cout<<"================"< v; + BStore tmpstore; +// ok=store.Get("true",boolean); +// if(ok){ +// std::cout<<"true => "< "< '"< "< "< "< "< "< "< "< '"< '"< '"< '"< */", a_str); + std::cout<<"# -- --> */ => '"< ["; +// for(int i=0; i ["; +// for(int i=0; i ["; +// for(int i=0; i?",a_str); +// if(ok){ +// std::cout<<"\"\\/\\\\\\\"\\uCAFE\\uBABE\\uAB98\\uFCDE\\ubcda\\uef4A\\b\\f\\n\\r\\t`1~!@#$%^&*()_+-=[]{}|;:',./<>?\" => " +// < "; // its empty so this prints nothing +// tmpstore.Print(); +// } + + /* + std::cout<<"store.Has(\"SlackBot\") = "< connect_errs{EINVAL,EPROTONOSUPPORT,ENOCOMPATPROTO,ETERM,ENOTSOCK,EMTHREAD}; -int MMUtilities::ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map &readrep_conns, int read_port_num, zmq::socket_t* write_sock, std::map &write_conns, int write_port_num, zmq::socket_t* mm_sock, std::map &mm_conns, int mm_port_num){ +int MMUtilities::ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map &readrep_conns, int read_port_num, std::mutex& readrep_mtx, zmq::socket_t* write_sock, std::map &write_conns, int write_port_num, std::mutex& write_mtx, zmq::socket_t* mm_sock, std::map &mm_conns, int mm_port_num, std::mutex& mm_mtx){ // it's like UpdateConnections, but rather than connecting to specifically named endpoints, // we find all services that aren't middlemen and assume they have associated postgres client endpoints // for middlemen, we likewise find their Control service and assume they have a inter-middlemen comms point // since we connect to hidden endpoints, we need to already know their port numbers + if(readrep_sock==nullptr || write_sock==nullptr || mm_sock==nullptr){ + std::cerr<<"ConnectToEndpoints with null sockets: "<>size; int num_new_connections=0; + + std::set active_endpoints; for(int i=0;iJsonParser(ss.str()); std::string type; - std::string uuid; std::string ip; std::string store_port=""; service->Get("msg_value",type); - service->Get("uuid",uuid); service->Get("ip",ip); service->Get("remote_port",store_port); std::string tmp; bool registered=false; + active_endpoints.emplace(ip); + if(type.substr(0,9)!="middleman"){ // if this isn't a middleman, assume it's a service with a PGClient // try to connect to the standard PGClient ports @@ -75,22 +78,15 @@ int MMUtilities::ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map:( - // if it fails it should set errno to say why. Now it is poor practice to use - // errno unless you know it should be checked, because its value may be set by - // intervening calls that do not indicate an actual error. But its the best we have. - // at the least we should check the currently set value represents one that - // will be set by zmq::socket_t::connect in the event of an error - errno=0; - readrep_sock->connect(tmp.c_str()); - if(errno!=0 && connect_errs.count(errno)){ - std::cerr<<"MMUtilities::ConnectToEndpoints error connecting to read socket " - <connect(tmp.c_str()); + readrep_mtx.unlock(); std::cout<<"MMUtilities::ConnectToEndpoints new connection to read socket "<connect(tmp.c_str()); - if(errno!=0 && connect_errs.count(errno)){ - std::cerr<<"MMUtilities::ConnectToEndpoints error connecting to write socket " - <connect(tmp.c_str()); // no return value but will throw instead! + write_mtx.unlock(); std::cout<<"MMUtilities::ConnectToEndpoints new connection to write socket "<connect(tmp.c_str()); - ++num_new_connections; + try { + mm_mtx.lock(); + mm_sock->connect(tmp.c_str()); + mm_mtx.unlock(); + std::cout<<"MMUtilities::ConnectToEndpoints new connection to middleman "<::iterator> to_erase; + for(std::map::iterator it=readrep_conns.begin(); it!=readrep_conns.end(); ++it){ + if(active_endpoints.count(it->first)==0){ + std::cout<<"Booting inactive endpoint "<first<second->Get("remote_port",store_port); + std::string tmp="tcp://"+ it->first + ":" + store_port; + try { + readrep_sock->disconnect(tmp.c_str()); + write_sock->disconnect(tmp.c_str()); + } catch(zmq::error_t& err){ + std::cerr<<"MMUtilities::ConnectToEndpoints error disconnecting from stale socket "<second; + to_erase.push_back(it); + } } + for(std::map::iterator it : to_erase){ + readrep_conns.erase(it); + } + */ return num_new_connections; } - +bool MMUtilities::ClearConnections(zmq::socket_t* sock, std::map &conns, std::mutex& sock_mtx){ + // prune and disconnect from all endpoints + bool ok=true; + for(std::map::iterator it=conns.begin(); it!=conns.end(); ++it){ + std::string store_port=""; + it->second->Get("remote_port",store_port); + std::string tmp="tcp://"+ it->first + ":" + store_port; + try { + sock_mtx.lock(); + sock->disconnect(tmp.c_str()); + sock_mtx.unlock(); + } catch(zmq::error_t& err){ + std::cerr<<"MMUtilities::ClearConnections error disconnecting from client "<second; + } + conns.clear(); + return ok; +} diff --git a/MMUtilities.h b/MMUtilities.h index d9d3bd7..37638fd 100644 --- a/MMUtilities.h +++ b/MMUtilities.h @@ -4,6 +4,7 @@ #include #include // for errno #include +#include using namespace ToolFramework; @@ -13,7 +14,9 @@ class MMUtilities : public DAQUtilities { MMUtilities(zmq::context_t* zmqcontext); - int ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map &readrep_conns, int read_port_num, zmq::socket_t* write_sock, std::map &write_conns, int write_port_num, zmq::socket_t* mm_sock, std::map &mm_conns, int mm_port_num); ///< Add to standard ports assumed to be associated with all found services. + int ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map &readrep_conns, int read_port_num, std::mutex& readrep_mtx, zmq::socket_t* write_sock, std::map &write_conns, int write_port_num, std::mutex& write_mtx, zmq::socket_t* mm_sock, std::map &mm_conns, int mm_port_num, std::mutex& mm_mtx); ///< Add to standard ports assumed to be associated with all found services. + + bool ClearConnections(zmq::socket_t* sock, std::map &conns, std::mutex& sock_mtx); private: diff --git a/Makefile b/Makefile index bf3a10e..0736393 100755 --- a/Makefile +++ b/Makefile @@ -16,8 +16,8 @@ ToolFrameworkInclude= -I $(Dependencies)/ToolFrameworkCore/include ToolDAQFrameworkLib= -L $(Dependencies)/ToolDAQFramework/lib -lDAQStore -lDAQDataModelBase -lServiceDiscovery ToolDAQFrameworkInclude= -I $(Dependencies)/ToolDAQFramework/include -#CXXFLAGS= -g -O0 -fno-omit-frame-pointer -fdiagnostics-color=always -Wno-attributes -CXXFLAGS= -fdiagnostics-color=always -Wno-attributes -O3 +#CXXFLAGS= -g -O0 -fno-omit-frame-pointer -fdiagnostics-color=always -Wno-attributes -fsanitize=address +CXXFLAGS= -g -fdiagnostics-color=always -Wno-attributes -O3 all: middleman diff --git a/MessageMiddleman/Makefile b/MessageMiddleman/Makefile new file mode 100644 index 0000000..9412078 --- /dev/null +++ b/MessageMiddleman/Makefile @@ -0,0 +1,25 @@ +# minimal middleman tester +Dependencies=/opt + +ZMQLib= -L $(Dependencies)/zeromq-4.0.7/lib -lzmq +ZMQInclude= -I $(Dependencies)/zeromq-4.0.7/include/ + +BoostLib= -L $(Dependencies)/boost_1_66_0/install/lib -lboost_date_time -lboost_serialization -lboost_iostreams +BoostInclude= -I $(Dependencies)/boost_1_66_0/install/include + +ToolDAQLib= -L $(Dependencies)/ToolDAQFramework/lib -lDAQDataModelBase -lServiceDiscovery -lDAQStore +ToolDAQInclude= -I $(Dependencies)/ToolDAQFramework/include + +ToolFrameworkLib= -L $(Dependencies)/ToolFrameworkCore/lib -lStore -lDataModelBase +ToolFrameworkInclude= -I $(Dependencies)/ToolFrameworkCore/include + +all: MessageMiddleman SendAlert ReceiveAlert + +MessageMiddleman: MessageMiddleman.cpp lib/libDAQInterface.so + g++ -O3 -Wpedantic -std=c++11 $< -o $@ -I ./include/ -L lib/ -lDAQInterface -lpthread $(ToolDAQInclude) $(ToolDAQLib) $(ToolFrameworkInclude) $(ToolFrameworkLib) $(BoostInclude) $(ZMQInclude) $(ZMQLib) $(ToolDAQLib) $(BoostLib) $(ToolDAQLib) + +SendAlert: SendAlert.cpp lib/libDAQInterface.so + g++ -O3 -Wpedantic -std=c++11 $< -o $@ -I ./include/ -L lib/ -lDAQInterface -lpthread $(ToolDAQInclude) $(ToolDAQLib) $(ToolFrameworkInclude) $(ToolFrameworkLib) $(BoostInclude) $(ZMQInclude) $(ZMQLib) $(ToolDAQLib) $(BoostLib) $(ToolDAQLib) + +ReceiveAlert: ReceiveAlert.cpp lib/libDAQInterface.so + g++ -O3 -Wpedantic -std=c++11 $< -o $@ -I ./include/ -L lib/ -lDAQInterface -lpthread $(ToolDAQInclude) $(ToolDAQLib) $(ToolFrameworkInclude) $(ToolFrameworkLib) $(BoostInclude) $(ZMQInclude) $(ZMQLib) $(ToolDAQLib) $(BoostLib) $(ToolDAQLib) diff --git a/MessageMiddleman/MessageMiddleman.cpp b/MessageMiddleman/MessageMiddleman.cpp new file mode 100644 index 0000000..38b3efd --- /dev/null +++ b/MessageMiddleman/MessageMiddleman.cpp @@ -0,0 +1,61 @@ +#include +#include +#include + +using namespace ToolFramework; + +int main(int argc, const char* argv[]){ + + if(argc<3){ + std::cout<<"usage: "<SetValue("Initialising"); //setting status message + + if(topic=="log"){ + // send logging multicast message + std::cout<<"logging '"< +#include +#include +#include +#include + +using namespace ToolFramework; + +bool running = false; +void stopSignalHandler(int _ignored){ + running = false; +} + +std::ofstream outfile; + +// class for automating functions from slowcontrol +class AutomatedFunctions { + + public: + AutomatedFunctions(DAQInterface* in_DAQ_inter){ + DAQ_inter=in_DAQ_inter; + }; + + DAQInterface* DAQ_inter; + void TestAlert_func(const char* alert, const char* payload){ + time_t now = std::chrono::high_resolution_clock::to_time_t(std::chrono::high_resolution_clock::now()); + std::string payloadstr = ""; + if(payload!=nullptr) payloadstr = payload; + std::cout<<"recevied a '"<SetValue("Initialising"); //setting status message + + std::cout<<"Registering callback function 'AutomatedFunctions::TestAlert_func' to be invoked on alert 'TestAlert'..."< alertnames{"TestAlert","RunStart","ChangeConfig","RunStop","LEDTrigger","SoftTrigger"}; + for(std::string& alert_name : alertnames){ + DAQ_inter.AlertSubscribe(alert_name, std::bind(&AutomatedFunctions::TestAlert_func, automated_functions, std::placeholders::_1, std::placeholders::_2)); + } + std::cout<<"Done"<SetValue("Ready"); + + if(verbose) std::cout<<"Registering 'Quit' button..."<SetValue(false); + if(verbose) std::cout<<"Done"<GetValue()); + + usleep(1000); + + } // end of program loop + + + std::cout<<"Application terminated"<SetValue("Terminated"); + + outfile.close(); + + return 0; + +} diff --git a/MessageMiddleman/SendAlert.cpp b/MessageMiddleman/SendAlert.cpp new file mode 100644 index 0000000..b5528c9 --- /dev/null +++ b/MessageMiddleman/SendAlert.cpp @@ -0,0 +1,25 @@ +#include +#include + +using namespace ToolFramework; + +int main(){ + + int verbose=1; + + ////////////////////////////// setup ///////////////////////////////// + + std::string Interface_configfile = "./InterfaceConfig"; + //std::string database_name = "daq"; + + std::cout<<"Constructing DAQInterface"<v_debug) std::cout<<"Opening Connection"<is_open()){ - if(verbosity>v_debug) std::cout<<"Connection already open"<is_open()){ std::cerr<<"Failed to connect to the database! Connection string was: '" <is_open()){ + if(verbosity>v_debug) std::cout<<"Connection already open"<v_debug){ std::cout<<"Closing connection"< abort return false; } @@ -174,8 +210,9 @@ bool Postgres::Query(std::string query, int nret, pqxx::result* res, pqxx::row* catch (const pqxx::broken_connection &e){ // if our connection is broken after all, disconnect, reconnect and retry if(tries==0){ - CloseConnection(); - delete conn; conn=nullptr; + CloseConnection(conn); + delete conn; + conn = OpenConnection(); continue; } else { std::cerr<<"Postgres::Query error - broken connection, failed to re-establish it"< *results, char row_or_col, std::string* err){ +bool Postgres::QueryAsStrings(pqxx::connection* conn, std::string query, std::vector *results, char row_or_col, std::string* err){ // generically run a query, without knowing how many returns are expected. // we'll need to get the results in a generic pqxx::result, and specify the number // of returned rows is >1. If there's fewer, it'll just return an empty container. pqxx::result res; - get_ok = Query(query, 2, &res, nullptr, err); + get_ok = Query(conn, query, 2, &res, nullptr, err); // if the query failed, the user didn't provide means for a return, or the query had no return, // then we have no need to parse the response and we're done. if(not get_ok || results==nullptr || res.size()==0) return get_ok; @@ -236,13 +273,13 @@ bool Postgres::QueryAsStrings(std::string query, std::vector *resul return true; } -bool Postgres::QueryAsJsons(std::string query, std::vector *results, std::string* err){ +bool Postgres::QueryAsJsons(pqxx::connection* conn, std::string query, std::vector *results, std::string* err){ // generically run a query, without knowing how many returns are expected. // we'll need to get the results in a generic pqxx::result, and specify the number // of returned rows is >1. If there's fewer, it'll just return an empty container. //printf("QueryAsJsons running '%s'\n",query.c_str()); pqxx::result res; - get_ok = Query(query, 2, &res, nullptr, err); + get_ok = Query(conn, query, 2, &res, nullptr, err); // if the query failed, the user didn't provide means for a return, or the query had no return, // then we have no need to parse the response and we're done. if(not get_ok || results==nullptr || res.size()==0) return get_ok; @@ -257,8 +294,7 @@ bool Postgres::QueryAsJsons(std::string query, std::vector *results // but to convert this into JSON, strings should be quoted: // i.e. { "field1":3, "field2":"cat", "field3":{"iam":"ajson"} } // this means we need to add enclosing quotes *only* for string fields - if((it->type()==18) || (it->type()==25) || - (it->type()==1042) || (it->type()==1043)){ + if((it->type()==18) || (it->type()==25) || (it->type()==1042) || (it->type()==1043)){ tmpval = "\""+tmpval+"\""; } tmp << "\"" << it->name() << "\":"<< tmpval; @@ -271,14 +307,14 @@ bool Postgres::QueryAsJsons(std::string query, std::vector *results return true; } -bool Postgres::Promote(int wait_seconds, std::string* err){ +bool Postgres::Promote(pqxx::connection* conn, int wait_seconds, std::string* err){ std::string promote_query = "pg_promote(TRUE,"+std::to_string(wait_seconds)+")"; // TRUE says to wait; otherwise we don't know whether the promotion succeeded. // wait is the time we wait for the promotion to succeed before aborting. - return Query(promote_query, 0, nullptr, nullptr, err); + return Query(conn, promote_query, 0, nullptr, nullptr, err); } -bool Postgres::Demote(int wait_seconds, std::string* err){ +bool Postgres::Demote(pqxx::connection* conn, int wait_seconds, std::string* err){ /* FIXME implement using repmgr or pgbackrest */ // as a minimal start, we can just make the standby.signal file and issue `pg_ctl restart` // however, if there are inconsistencies in this instance and the new master, startup may fail! @@ -287,7 +323,7 @@ bool Postgres::Demote(int wait_seconds, std::string* err){ // // get db name // std::string query_string = "SELECT current_database()"; // std::string dbname; -// get_ok = ExecuteQuery(query_string, dbname); +// get_ok = ExecuteQuery(conn, query_string, dbname); // if(not get_ok){ // std::string msg = "Failed to get name of current database in Postgres::Demote!"; // std::cerr<quote_name(in); return true; @@ -444,12 +480,12 @@ bool Postgres::pqxx_quote_name(const std::string& in, std::string& out, std::str // quote values (nominally, use single quotes) // TODO error signalling here sucks; shouldn't just return empty on failure! -bool Postgres::pqxx_quote(const std::string& in, std::string& out, std::string* err){ +bool Postgres::pqxx_quote(pqxx::connection* conn, const std::string& in, std::string& out, std::string* err){ out = in; //return pqxx::nullconnection{}.quote(string); // annoyingly we can't use a null connection just to get libpqxx to quote things for us; // we must have a valid connection to a real database :/ - if(OpenConnection(err)==nullptr) return false; + if(!CheckConnection(conn, err)) return false; try { out = conn->quote(in); return true; diff --git a/Postgres.h b/Postgres.h index 61d73a9..20a4b4f 100755 --- a/Postgres.h +++ b/Postgres.h @@ -22,23 +22,28 @@ class Postgres { // open a connection to the database pqxx::connection* OpenConnection(std::string* err=nullptr); // close the connection to the database. returns success, for whatever failure implies. - bool CloseConnection(std::string* err=nullptr); + bool CloseConnection(pqxx::connection* conn, std::string* err=nullptr); + // check a connection to the database is open. if passed pointer is null, + // will try to fall back to internal connection m_conn + bool CheckConnection(pqxx::connection*& conn, std::string* err=nullptr); + // update internal connection + bool SetConnection(pqxx::connection* conn); // wrapper around exec since we handle the transaction and connection. // nret specifies the expected number of returned rows from the query. // res and row are outputs. return value is success. - bool Query(std::string query, int nret=0, pqxx::result* res=nullptr, pqxx::row* row=nullptr, std::string* err=nullptr); + bool Query(pqxx::connection* conn, std::string query, int nret=0, pqxx::result* res=nullptr, pqxx::row* row=nullptr, std::string* err=nullptr); // one that returns the value of many fields from one row as strings in a vector (row_or_col='r') // or the value of one field from many rows (row_or_col='c') - bool QueryAsStrings(std::string query, std::vector *results, char row_or_col, std::string* err=nullptr); + bool QueryAsStrings(pqxx::connection* conn, std::string query, std::vector *results, char row_or_col, std::string* err=nullptr); // one that returns rows as json strings representing maps of fieldname:value // we uhhhhh need to check this works ok with nested strings and stuff - bool QueryAsJsons(std::string query, std::vector *results, std::string* err=nullptr); + bool QueryAsJsons(pqxx::connection* conn, std::string query, std::vector *results, std::string* err=nullptr); - bool Promote(int wait_seconds=60, std::string* err=nullptr); - bool Demote(int wait_seconds=60, std::string* err=nullptr); + bool Promote(pqxx::connection* conn, int wait_seconds=60, std::string* err=nullptr); + bool Demote(pqxx::connection* conn, int wait_seconds=60, std::string* err=nullptr); private: int verbosity=1; @@ -48,7 +53,7 @@ class Postgres { int v_debug=3; std::string logmessage; int get_ok; - pqxx::connection* conn=nullptr; + pqxx::connection* m_conn=nullptr; // default connection details std::string dbname=""; @@ -73,12 +78,12 @@ class Postgres { // } template - bool ExecuteQuery(std::string query_string, Ts&&... rets){ + bool ExecuteQuery(pqxx::connection* conn, std::string query_string, Ts&&... rets){ // run an SQL query and try to pass the results // into a parameter pack. the passed arguments // must be compatible with the returned columns pqxx::row local_row; - bool success = Query(query_string, 1, nullptr, &local_row); + bool success = Query(conn, query_string, 1, nullptr, &local_row); if(not success) return false; // query failed success = ExpandRow::expand(local_row, std::forward(rets)...); @@ -162,11 +167,11 @@ class Postgres { //////// // helper function for insertions template - bool Insert(std::string tablename, std::vector &fields, std::string* err, Rest... args){ - // maybe this is redundant since OpenConnection will check is_open (against recommendations) + bool Insert(pqxx::connection*& conn, std::string tablename, std::vector &fields, std::string* err, Rest... args){ + // maybe this is redundant since CheckConnection will check is_open (against recommendations) for(int tries=0; tries<2; ++tries){ // ensure we have a connection to work with - if(OpenConnection(err)==nullptr){ + if(CheckConnection(conn, err)){ // no connection to batabase -> abort return false; } @@ -194,8 +199,10 @@ class Postgres { catch (const pqxx::broken_connection &e){ // if our connection is broken after all, disconnect, reconnect and retry if(tries==0){ - CloseConnection(); - delete conn; conn=nullptr; + CloseConnection(conn); + delete conn; + conn=nullptr; + conn=OpenConnection(); continue; } else { std::cerr<<"Postgres::Query error - broken connection, failed to re-establish it"< struct Query{ - Query(zmq::message_t& client_id_in, zmq::message_t& msg_id_in, const std::string& database_in, const std::string& query_in, uint32_t query_ok_in=0, std::string response_in="NULL"); + Query(zmq::message_t& client_id_in, zmq::message_t& msg_id_in, const std::string& database_in, const std::string& topic_in, const std::string& query_in, uint32_t query_ok_in=0, std::string response_in="NULL"); Query(const Query& in); void Print(); @@ -14,6 +14,7 @@ struct Query{ zmq::message_t message_id; std::string database; std::string query; + std::string topic; uint32_t query_ok; std::vector response; int retries; diff --git a/ReceiveSQL.cpp b/ReceiveSQL.cpp index dac7e47..d5a1443 100755 --- a/ReceiveSQL.cpp +++ b/ReceiveSQL.cpp @@ -57,11 +57,18 @@ bool ReceiveSQL::Initialise(const std::string& configfile){ bool ReceiveSQL::Execute(){ Log("ReceiveSQL Executing...",21); + auto loop_start = std::chrono::high_resolution_clock::now(); + auto last = std::chrono::high_resolution_clock::now(); // find new clients + /* now handled by background thread Log("Finding new clients",20); get_ok = FindNewClients_v2(); + timers["Finding Clients"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + */ // poll the input sockets for messages Log("Polling input sockets",20); @@ -75,32 +82,24 @@ bool ReceiveSQL::Execute(){ Log("Warning! ReceiveSQL error polling input sockets; have they closed?",0); } + timers["Poll input sockets"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + // debug print std::string pollsmsg; for(int i=0; i(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + Log("Getting Client Read Queries",20); get_ok = GetClientReadQueries(); + timers["Get Read Queries"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + + /* + // moved to thread if(am_master){ Log("Getting Client Multicast Messages",20); get_ok = GetMulticastMessages(); } + timers["Get Multicasts"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + */ + Log("Getting Middleman Checkin",20); get_ok = GetMiddlemanCheckin(); + timers["Get MM check-in"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + Log("Checking Master Status",20); get_ok = CheckMasterStatus(); + timers["Get MM status"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + if(am_master){ Log("Running Next Write Query",20); get_ok = RunNextWriteQuery(); } + timers["Run Write Query"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + Log("Running Next Read Query",20); get_ok = RunNextReadQuery(); + timers["Run Read Query"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + + /* + // now handled by ReceiveSQL::MulticastWorker, which gets invoked by Utilities and does not need to be called explicitly if(am_master){ Log("Running Next Fire-and-Forget Message",20); get_ok = RunNextMulticastMsg(); } + timers["Run Multicast"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + */ // poll the output sockets for listeners Log("Polling output sockets",20); @@ -152,14 +188,28 @@ bool ReceiveSQL::Execute(){ if(get_ok<0){ Log("Warning! ReceiveSQL error polling output sockets; have they closed?",0); } + timers["Poll output sockets"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); // send outputs Log("Sending Next Client Response",20); get_ok = SendNextReply(); + timers["Send Reply"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + Log("Sending Next Log Message",20); get_ok = SendNextMulticast(); + timers["Send Multicast"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); + Log("Broadcasting Presence",20); get_ok = BroadcastPresence(); + timers["SD Broadcast"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); // Maintenance Log("Trimming Write Queue",20); @@ -168,30 +218,42 @@ bool ReceiveSQL::Execute(){ get_ok = TrimQueue("rd_txn_queue"); Log("Trimming Ack Queue",20); get_ok = TrimQueue("response_queue"); - Log("Trimming In Multicast Deque",20); - get_ok = TrimDequeue("in_multicast_queue"); Log("Trimming Out Logging Deque",20); get_ok = TrimDequeue("out_multicast_queue"); Log("Trimming Cache",20); get_ok = TrimCache(); Log("Cleaning Up Old Cache Messages",20); get_ok = CleanupCache(); + timers["Cleanup"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); // Monitoring and Logging Log("Tracking Stats",20); if(!stats_period.is_negative()) get_ok = TrackStats(); + timers["Stats"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); // Check for any commands from remote control port Log("Checking Controls",20); get_ok = UpdateControls(); + timers["Update Controls"] = + std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last).count(); + last = std::chrono::high_resolution_clock::now(); Log("Loop Iteration Done",20); auto loop_end = std::chrono::high_resolution_clock::now(); // tracking loop rate int loop_ms = std::chrono::duration_cast(loop_end - loop_start).count(); - if(loop_ms>3000){ + if(loop_ms>100){ Log("Warning: Middleman Execute took "+std::to_string(loop_ms)+"ms!",v_warning); + std::string msg="Timers were: "; + for(auto&& atimer : timers){ + msg+= atimer.first + ": "+std::to_string(atimer.second)+" ms\n"; + } + Log(msg,v_warning); } if(loop_msmax_loop_ms) max_loop_ms=loop_ms; @@ -208,9 +270,43 @@ bool ReceiveSQL::Finalise(){ Log("Removing Discoverable Services",3); if(utilities) utilities->RemoveService("middleman"); - Log("Closing multicast socket",3); - close(log_socket); - close(mon_socket); + Log("Killing FindClients thread",3); + // signal background thread connecting to new clients to stop + m_util.KillThread(findclients_args); + + // signal background receiver thread to stop + Log("Telling multicast listener thread to stop",3); + multicast_listener_args->running=false; + m_util.KillThread(multicast_listener_args); + + // signal background workers to stop + Log("Telling multicast worker threads to stop",3); + for(MulticastWorker_args* thread_args : multicast_worker_args){ + thread_args->running=false; + } + // wait for it to finish processing any multicast queries in its buffer + // it'll close the socket and tell us when it's done + int secs_to_wait=60; + Log("Waiting for multicast workers to finish...",3); + for(int i=0; ifinished){ + all_done=false; + break; + } + } + if(all_done) break; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + Log("Killing multicast worker threads",3); + for(MulticastWorker_args* thread_args : multicast_worker_args){ + m_util.KillThread(thread_args); + delete thread_args; + thread_args=0; + } + multicast_worker_args.clear(); Log("Deleting Utilities",3); if(utilities){ delete utilities; utilities=nullptr; } @@ -218,17 +314,14 @@ bool ReceiveSQL::Finalise(){ Log("Deleting ServiceDiscovery",3); if(service_discovery){ delete service_discovery; service_discovery=nullptr; } - Log("Clearing known connections",3); - clt_rtr_connections.clear(); - mm_rcv_connections.clear(); - clt_sub_connections.clear(); - // delete sockets Log("Deleting sockets",3); if(clt_sub_socket){ delete clt_sub_socket; clt_sub_socket=nullptr; } if(clt_rtr_socket){ delete clt_rtr_socket; clt_rtr_socket=nullptr; } if(mm_rcv_socket) { delete mm_rcv_socket; mm_rcv_socket=nullptr; } if(mm_snd_socket) { delete mm_snd_socket; mm_snd_socket=nullptr; } + close(mon_socket); + close(log_socket); // delete zmq context // clear all message queues @@ -237,7 +330,8 @@ bool ReceiveSQL::Finalise(){ rd_txn_queue.clear(); resp_queue.clear(); cache.clear(); - in_multicast_queue.clear(); + in_log_queue.clear(); + in_mon_queue.clear(); out_multicast_queue.clear(); Log("Deleting context",3); @@ -283,7 +377,6 @@ bool ReceiveSQL::InitPostgres(Store& m_variables, const std::string& dbname){ // ########################################################################## // pass connection details to the postgres interface class - Postgres& m_database = m_databases[dbname]; m_database.Init(dbhostname, dbhostaddr, dbport, @@ -291,11 +384,13 @@ bool ReceiveSQL::InitPostgres(Store& m_variables, const std::string& dbname){ dbpasswd, dbname); - // try to open a connection to ensure we can do, or else bail out now. - if(!m_database.OpenConnection()){ + // try to open a connection to ensure we can do, or else bail out now + pqxx::connection* conn = m_database.OpenConnection(); + if(conn==nullptr){ Log(Concat("Error! Failed to open connection to the ",dbname," database!"),0); return false; } + m_database.SetConnection(conn); // yeah this seems circular, urgh. return true; } @@ -314,6 +409,10 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ m_variables.Get("log_port",log_port); m_variables.Get("mon_port",mon_port); m_variables.Get("multicast_address",multicast_address); + int num_multicast_threads=10; + m_variables.Get("num_multicast_threads",num_multicast_threads); + m_variables.Get("batch_size",batch_size); + m_variables.Get("max_hold_ms",max_hold_ms); // set up multicast socket for sending logging & monitoring data log_socket = socket(AF_INET, SOCK_DGRAM, 0); @@ -348,6 +447,7 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ get_ok = fcntl(log_socket, F_SETFL, O_NONBLOCK); get_ok = get_ok || fcntl(mon_socket, F_SETFL, O_NONBLOCK); if(get_ok!=0){ + // maybe this is not essential..? XXX Log(std::string{"Failed to set multicast socket to non-blocking with error "}+strerror(errno),v_error); return false; } @@ -370,7 +470,7 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ // used in send & receive; will be the same for both log & mon multicast_addrlen = sizeof(log_addr); - + /* // for two-way comms, we should bind to INADDR_ANY, not a specific multicast address.... maybe? struct sockaddr_in multicast_addr2; @@ -403,9 +503,56 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ return false; } - // we can poll with zmq... - in_polls.emplace_back(zmq::pollitem_t{NULL, log_socket, ZMQ_POLLIN, 0}); - in_polls.emplace_back(zmq::pollitem_t{NULL, mon_socket, ZMQ_POLLIN, 0}); + multicast_listener_args = new MulticastReceive_args; + multicast_listener_args->type=multicast_type::log; + multicast_listener_args->parent = this; + multicast_listener_args->running = true; + multicast_listener_args->finished = false; + multicast_listener_args->socket = log_socket; + multicast_listener_args->addr = &log_addr; + multicast_listener_args->addrlen = multicast_addrlen; + multicast_listener_args->poll = zmq::pollitem_t{NULL, log_socket, ZMQ_POLLIN, 0}; + m_util.CreateThread("log_receiver", &GetMulticastMessages, multicast_listener_args); + + multicast_listener_args = new MulticastReceive_args; + multicast_listener_args->type=multicast_type::mon; + multicast_listener_args->parent = this; + multicast_listener_args->running = true; + multicast_listener_args->finished = false; + multicast_listener_args->socket = mon_socket; + multicast_listener_args->addr = &mon_addr; + multicast_listener_args->addrlen = multicast_addrlen; + multicast_listener_args->poll = zmq::pollitem_t{NULL, mon_socket, ZMQ_POLLIN, 0}; + m_util.CreateThread("mon_receiver", &GetMulticastMessages, multicast_listener_args); + + // the majority of our messages come via multicast, and each may be quite large, + // with the corresponding queries taking a while (order seconds). + // We need to make sure these don't delay responses on Read/Write sockets + // which have a timeout, so let's handle multicasts in a separate thread + // FIXME take number of threads for each independently rather than assuming 50:50 split + for(int i=0; isetsockopt(ZMQ_LINGER, 10); clt_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0); + clt_sub_socket->setsockopt(ZMQ_RCVHWM,10000); + clt_sub_socket->setsockopt(ZMQ_BACKLOG,1000); // we will connect this socket to clients with the utilities class } @@ -508,6 +657,8 @@ bool ReceiveSQL::InitZMQ(Store& m_variables){ clt_rtr_socket = new zmq::socket_t(*context, ZMQ_ROUTER); clt_rtr_socket->setsockopt(ZMQ_SNDTIMEO, clt_rtr_socket_timeout); clt_rtr_socket->setsockopt(ZMQ_RCVTIMEO, clt_rtr_socket_timeout); + clt_rtr_socket->setsockopt(ZMQ_RCVHWM,10000); + clt_rtr_socket->setsockopt(ZMQ_BACKLOG,1000); // make router transfer connections with an already seen ZMQ_IDENTITY to a new connection // rather than rejecting the new connection attempt. /* @@ -644,6 +795,14 @@ bool ReceiveSQL::InitServiceDiscovery(Store& m_variables){ // note that it is not necessary to register the RemoteControl service, // this is automatically done by the ServiceDiscovery class. + + // this will be done in a bg thread + findclients_args = new FindClients_args; + findclients_args->parent = this; + findclients_args->utilities = utilities; + + m_util.CreateThread("findclients", &FindNewClients_v2, findclients_args); + return true; } @@ -684,6 +843,9 @@ bool ReceiveSQL::InitControls(Store& m_variables){ SC_vars.Add("Clients",SlowControlElementType(INFO)); SC_vars["Clients"]->SetValue("None"); + SC_vars.Add("ClearConnections",SlowControlElementType(BUTTON)); + SC_vars["ClearConnections"]->SetValue(false); + // the SlowControlCollection thread listens for requests to get or set the registered controls, // and responds with or updates its internal control values. Note that (for now) it does not // take any action to effect the state of things outside it, so we will need to manually keep @@ -775,6 +937,7 @@ bool ReceiveSQL::InitMessaging(Store& m_variables){ // ««-------------- ≪ °◇◆◇° ≫ --------------»» +/* bool ReceiveSQL::FindNewClients(){ int new_connections=0; @@ -801,37 +964,36 @@ bool ReceiveSQL::FindNewClients(){ Log("No new clients found",21); } - /* needs fixing to uncomment - std::cout<<"We have: "<Get("msg_value",service); - std::cout<(arg); + ReceiveSQL& p = *m_args->parent; - int clt_rtr_conns = clt_rtr_connections.size(); - int clt_sub_conns = clt_sub_connections.size(); - int mm_conns = mm_rcv_connections.size(); + int clt_rtr_conns = m_args->clt_rtr_connections.size(); + int clt_sub_conns = m_args->clt_sub_connections.size(); + int mm_conns = m_args->mm_rcv_connections.size(); - int new_connections = utilities->ConnectToEndpoints(clt_rtr_socket, clt_rtr_connections, clt_rtr_port, clt_sub_socket, clt_sub_connections, clt_sub_port, mm_rcv_socket, mm_rcv_connections, mm_rcv_port); + p.Log("checking for new clients",22); + int new_connections = m_args->utilities->ConnectToEndpoints(p.clt_rtr_socket, m_args->clt_rtr_connections, p.clt_rtr_port, p.clt_rtr_mtx, + p.clt_sub_socket, m_args->clt_sub_connections, p.clt_sub_port, p.clt_sub_mtx, + p.mm_rcv_socket, m_args->mm_rcv_connections, p.mm_rcv_port, p.mm_rcv_mtx); if(new_connections>0){ - Log("Made "+std::to_string(new_connections)+" new connections!",2); - Log("Made "+std::to_string(clt_rtr_connections.size()-clt_rtr_conns) + p.Log("Made "+std::to_string(new_connections)+" new connections!",2); + p.Log("Made "+std::to_string(m_args->clt_rtr_connections.size()-clt_rtr_conns) +" new read/reply socket connections",3); - Log("Made "+std::to_string(clt_sub_connections.size()-clt_sub_conns) + p.Log("Made "+std::to_string(m_args->clt_sub_connections.size()-clt_sub_conns) +" new write socket connections",3); - Log("Made "+std::to_string(mm_rcv_connections.size()-mm_conns) + p.Log("Made "+std::to_string(m_args->mm_rcv_connections.size()-mm_conns) +" new middleman socket connections",3); + // FIXME only print out new elements /* for(auto&& acon : clt_rtr_connections){ @@ -848,10 +1010,10 @@ bool ReceiveSQL::FindNewClients_v2(){ // update the list of clients so they can be queried std::map clientsmap; - for(std::pair& aservice : clt_rtr_connections){ + for(std::pair& aservice : m_args->clt_rtr_connections){ clientsmap.emplace(aservice.first,"R"); } - for(std::pair& aservice : clt_sub_connections){ + for(std::pair& aservice : m_args->clt_sub_connections){ if(!clientsmap.count(aservice.first)){ clientsmap.emplace(aservice.first,"W"); } else { @@ -860,19 +1022,22 @@ bool ReceiveSQL::FindNewClients_v2(){ } std::string clientlist; for(std::pair& aclient : clientsmap){ - clientlist += aclient.first+": "+aclient.second+","; + if(!clientlist.empty()) clientlist+=", "; + clientlist += aclient.first+": "+aclient.second; } if(clientlist.size()>0){ // if middleman is only connection, client list will be empty clientlist.pop_back(); // remove trailing ',' - SC_vars["Clients"]->SetValue(clientlist); + p.SC_vars["Clients"]->SetValue(clientlist); } } else { - Log("No new clients found",21); + p.Log("No new clients found",22); } - return true; + usleep(100000); + + return; } // ««-------------- ≪ °◇◆◇° ≫ --------------»» @@ -880,14 +1045,16 @@ bool ReceiveSQL::FindNewClients_v2(){ bool ReceiveSQL::GetClientWriteQueries(){ // see if we had any write requests from clients - if(in_polls.at(4).revents & ZMQ_POLLIN){ + if(in_polls.at(2).revents & ZMQ_POLLIN){ Log(">>> got a write query from client",3); ++write_queries_recvd; // we did. receive next message. std::vector outputs; + clt_sub_mtx.lock(); get_ok = Receive(clt_sub_socket, outputs); + clt_sub_mtx.unlock(); if(not get_ok){ Log(Concat("error receiving part ",outputs.size()+1," of Write query from client"),1); @@ -994,7 +1161,7 @@ bool ReceiveSQL::GetClientWriteQueries(){ } // construct a Query object to encapsulate the query and enqueue it. - Query qry{outputs.at(1-o),outputs.at(2-o), database, query}; + Query qry{outputs.at(1-o),outputs.at(2-o), database, topic, query}; Log(Concat("QUERY WAS: '",qry.query,"'"),4); wrt_txn_queue.emplace(key, qry); @@ -1014,8 +1181,6 @@ std::cout<<"no write queries at input port"<>> got a read query from client",3); ++read_queries_recvd; // We do. receive the next query std::vector outputs; + clt_rtr_mtx.lock(); get_ok = Receive(clt_rtr_socket, outputs); + clt_rtr_mtx.unlock(); if(not get_ok){ Log(Concat("error receiving part ",outputs.size()+1," of Read query from client"),1); @@ -1430,7 +1597,7 @@ bool ReceiveSQL::GetClientReadQueries(){ } // construct a Query object to encapsulate the query and enqueue it. - Query qry{outputs.at(0),outputs.at(2), database, query}; + Query qry{outputs.at(0),outputs.at(2), database, topic, query}; rd_txn_queue.emplace(key, qry); } // else we've already got this message queued, ignore it. @@ -1461,8 +1628,6 @@ bool ReceiveSQL::ReadQueryToQuery(const std::string& message, BStore& request, s // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::ReadDeviceConfigToQuery(const std::string& message, BStore& request, std::string& db_out, std::string& sql_out){ - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); // get a new device config entry std::string device; @@ -1476,7 +1641,7 @@ bool ReceiveSQL::ReadDeviceConfigToQuery(const std::string& message, BStore& req int version = *reinterpret_cast(&version_u); // SQL sanitization - get_ok = a_database.pqxx_quote(device, device); + get_ok = m_database.pqxx_quote(nullptr, device, device); if(!get_ok){ Log("ReadDeviceConfigToQuery: error quoting fields in message '"+message+"'",v_error); return false; @@ -1494,7 +1659,12 @@ bool ReceiveSQL::ReadDeviceConfigToQuery(const std::string& message, BStore& req + device + " AND version=" + versionstring+";"; - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),4); + Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),12); + + unsigned long timestamp = time(nullptr); + std::string timestring; + TimeStringFromUnixSec(timestamp, timestring); + Log(Concat("Received config request '",sql_out,"' received at ",timestring),0); return true; } @@ -1502,8 +1672,6 @@ bool ReceiveSQL::ReadDeviceConfigToQuery(const std::string& message, BStore& req // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::ReadRunConfigToQuery(const std::string& message, BStore& request, std::string& db_out, std::string& sql_out){ - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); // get a run config entry // the configurations table has fields 'config_id', 'name' and 'version' amongst others. @@ -1537,7 +1705,7 @@ bool ReceiveSQL::ReadRunConfigToQuery(const std::string& message, BStore& reques } // SQL sanitization - get_ok = a_database.pqxx_quote(config_name, config_name); + get_ok = m_database.pqxx_quote(nullptr, config_name, config_name); if(!get_ok){ Log("ReadRunConfigToQuery: error quoting fields in message '"+message+"'",v_error); return false; @@ -1557,7 +1725,7 @@ bool ReceiveSQL::ReadRunConfigToQuery(const std::string& message, BStore& reques } - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),4); + Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),12); return true; } @@ -1565,8 +1733,6 @@ bool ReceiveSQL::ReadRunConfigToQuery(const std::string& message, BStore& reques // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::ReadCalibrationToQuery(const std::string& message, BStore& request, std::string& db_out, std::string& sql_out){ - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); // get a calibration data entry std::string device; @@ -1579,7 +1745,7 @@ bool ReceiveSQL::ReadCalibrationToQuery(const std::string& message, BStore& requ } // SQL sanitization - get_ok = a_database.pqxx_quote(device, device); + get_ok = m_database.pqxx_quote(nullptr, device, device); if(!get_ok){ Log("ReadCalibrationToQuery: error quoting fields in message '"+message+"'",v_error); return false; @@ -1598,7 +1764,7 @@ bool ReceiveSQL::ReadCalibrationToQuery(const std::string& message, BStore& requ + device + " AND version=" + versionstring+";"; - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),4); + Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),12); return true; @@ -1607,8 +1773,6 @@ bool ReceiveSQL::ReadCalibrationToQuery(const std::string& message, BStore& requ // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::ReadRootPlotToQuery(const std::string& message, BStore& request, std::string& db_out, std::string& sql_out){ - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); // get a ROOT plot entry std::string plot_name; @@ -1621,7 +1785,7 @@ bool ReceiveSQL::ReadRootPlotToQuery(const std::string& message, BStore& request } // SQL sanitization - get_ok = a_database.pqxx_quote(plot_name, plot_name); + get_ok = m_database.pqxx_quote(nullptr, plot_name, plot_name); if(!get_ok){ Log("ReadRootPlotToQuery: error quoting fields in message '"+message+"'",v_error); return false; @@ -1636,7 +1800,7 @@ bool ReceiveSQL::ReadRootPlotToQuery(const std::string& message, BStore& request sql_out += " AND version=" + std::to_string(version); } - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),4); + Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"'"),40); return true; @@ -1645,8 +1809,6 @@ bool ReceiveSQL::ReadRootPlotToQuery(const std::string& message, BStore& request // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::ReadPlotlyPlotToQuery(const std::string& message, BStore& request, std::string& db_out, std::string& sql_out) { - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); std::string name; uint64_t version_u = -1; @@ -1658,7 +1820,7 @@ bool ReceiveSQL::ReadPlotlyPlotToQuery(const std::string& message, BStore& reque } // SQL sanitization - get_ok = a_database.pqxx_quote(name, name); + get_ok = m_database.pqxx_quote(nullptr, name, name); if (!get_ok) { Log("ReadPlotlyPlotToQuery: error quoting fields in message '" + message + "'", v_error); return false; @@ -1713,93 +1875,90 @@ bool ReceiveSQL::ReadMessageToQuery(const std::string& topic, const std::string& // ««-------------- ≪ °◇◆◇° ≫ --------------»» -bool ReceiveSQL::GetMulticastMessages(){ +void ReceiveSQL::GetMulticastMessages(Thread_args* arg){ // check for incoming message + MulticastReceive_args* m_args=reinterpret_cast(arg); + ReceiveSQL& p = *m_args->parent; + + if(m_args->finished){ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // throttler.... probably unneeded + return; + } + + // check if parent has told us to finalise + if(!m_args->running){ + // parent has told us to finalise + m_args->finished=true; + return; + } + + // poll multicast socket + zmq::poll(&m_args->poll, 1, 500); // see if we had any multicast messages - // logging and monitoring come on different sockets (different ports) - // but code to handle them is the same - std::vector sockets{log_socket,mon_socket}; - for(size_t i=0; i<2; ++i){ - if(in_polls.at(i).revents & ZMQ_POLLIN){ - Log(">>> got a multicast message from client",10); - (i==0) ? ++logs_recvd : ++mons_recvd; - int multicast_socket = sockets[i]; - struct sockaddr_in* multicast_addr = (i==0) ? &log_addr : &mon_addr; - - // read the messge - //char buf[655355]; // theoretical maximum UDP buffer size. move to member - int cnt = recvfrom(multicast_socket, buf, sizeof(buf), 0, (struct sockaddr*)multicast_addr, &multicast_addrlen); - if(cnt <= 0){ - Log(std::string{"Failed to receive on multicast socket with error '"}+strerror(errno)+"'",v_error); - (i==0) ? ++log_recv_fails : ++mon_recv_fails; - return false; - } - - Log("Received multicast message from "+std::string{inet_ntoa(multicast_addr->sin_addr)} - +": '"+std::string{buf}+"'",12); - - std::string database; - std::string query; - std::string topic= (i==0) ? "logging" : "monitoring"; - get_ok = MulticastMessageToQuery(buf, topic, database, query); + if(m_args->poll.revents & ZMQ_POLLIN){ + p.Log(">>> got a multicast message from client",10); + (m_args->type==multicast_type::log) ? ++p.logs_recvd : ++p.mons_recvd; + + // read the messge + m_args->get_ok = recvfrom(m_args->socket, m_args->message, sizeof(m_args->message), 0, (struct sockaddr*)m_args->addr, &m_args->addrlen); + if(m_args->get_ok <= 0){ + p.Log(std::string{"Failed to receive on multicast socket with error '"}+strerror(errno)+"'",p.v_error); + (m_args->type==multicast_type::log) ? ++p.log_recv_fails : ++p.mon_recv_fails; + } else { - if(!get_ok){ - (i==0) ? ++log_recv_fails : ++mon_recv_fails; - return false; - } + p.Log("Received multicast message from "+std::string{inet_ntoa(m_args->addr->sin_addr)}+": '"+std::string{m_args->message}+"'",12); - // FIXME for now all messages go to daq database, - // probably need to make this a pair at least with first element a DB connection or name - if(topic=="logging" || topic=="monitoring" || topic=="rootplot"){ - in_multicast_queue.emplace_back(query); - Log("Put "+topic+" msg in queue: '"+query+"'",12); - + if(m_args->type==multicast_type::log){ + p.in_log_queue_mtx.lock(); + p.in_log_queue.emplace_back(m_args->message); + p.in_log_queue_mtx.unlock(); } else { - // could not determine multicast type - Log(std::string{"Unrecognised topic '"}+topic+"' in multicast message '"+buf+"'",v_error); - (i==0) ? ++log_recv_fails : ++mon_recv_fails; - return false; - + p.in_mon_queue_mtx.lock(); + p.in_mon_queue.emplace_back(m_args->message); + p.in_mon_queue_mtx.unlock(); } + p.Log("Put multicast msg in queue: '"+std::string(m_args->message)+"'",12); - } /*else { std::cout<<"no multicast messages"<type==multicast_type::log) m_args->get_ok = p.TrimVector("in_log_queue"); + else m_args->get_ok = p.TrimVector("in_mon_queue"); + + return; } // ««-------------- ≪ °◇◆◇° ≫ --------------»» -// FIXME refactor to make code DRY -bool ReceiveSQL::MulticastMessageToQuery(const std::string& message, std::string& topic_out, std::string& db_out, std::string& sql_out){ - Log(Concat("Forming SQL for logging message: '",message,"'"),12); +// FIXME refactor to make code DRY, FIXME split logging and mon methods as we can...? +bool ReceiveSQL::MulticastMessageToQuery(pqxx::connection* conn, const std::string& message, std::string& sql_out, BStore& tmp, JSONP& jsonp){ + Log(Concat("Forming SQL for multicast message: '",message,"'"),12); // write queries received on the pub port are JSON messages that we need to convert to SQL. - BStore tmp; - get_ok = parser.Parse(message, tmp); - if(!get_ok){ + tmp.Delete(); + int ok = jsonp.Parse(message, tmp); + if(!ok){ Log("MulticastMessageToQuery error parsing message json '"+message+"'",v_error); - (topic_out=="logging") ? ++log_recv_fails : mon_recv_fails; return false; } // 'logging'/'monitoring' topics are redundant now as they arrive on different ports // however topic field provides other, more specific values so we still need it - get_ok = tmp.Get("topic",topic_out); - if(!get_ok){ - (topic_out=="logging") ? ++log_recv_fails : mon_recv_fails; + std::string topic; + ok = tmp.Get("topic",topic); + if(!ok){ + (topic=="logging") ? ++log_recv_fails : ++mon_recv_fails; Log("MulticastMessageToQuery error, no topic in message '"+message+"'",v_error); return false; } // the JSON fields depend on the kind of data. Unlike writes we have no topic here, // so we just need to use the JSON keys to identify what kind of data this is. - if(topic_out=="logging"){ - - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); + if(topic=="logging"){ // logging message std::string device; @@ -1807,20 +1966,20 @@ bool ReceiveSQL::MulticastMessageToQuery(const std::string& message, std::string uint64_t timestamp{0}; uint64_t severity; std::string msg; - get_ok = tmp.Get("time",timestamp); // optional - get_ok = tmp.Get("device",device); - get_ok &= tmp.Get("severity",severity); - get_ok &= tmp.Get("message",msg); - if(!get_ok){ + ok = tmp.Get("time",timestamp); // optional + ok = tmp.Get("device",device); + ok &= tmp.Get("severity",severity); + ok &= tmp.Get("message",msg); + if(!ok){ Log("MulticastMessageToQuery: missing fields in message '"+message+"'",v_error); ++log_recv_fails; return false; } // SQL sanitization - get_ok = a_database.pqxx_quote(device, device); - get_ok &= a_database.pqxx_quote(msg, msg); - if(!get_ok){ + ok = m_database.pqxx_quote(conn, device, device); + ok &= m_database.pqxx_quote(conn, msg, msg); + if(!ok){ Log("MulticastMessageToQuery: error quoting fields in message '"+message+"'",v_error); ++log_recv_fails; return false; @@ -1830,24 +1989,22 @@ bool ReceiveSQL::MulticastMessageToQuery(const std::string& message, std::string // build an ISO 8601 timestamp ("2015-10-02 11:16:34+0100") // (the trailing "+0100" is number of [hours][mins] in local timezone relative to UTC) std::string timestring; - get_ok = TimeStringFromUnixMs(timestamp, timestring); - if(!get_ok) timestring="now()"; // since multicast doesn't propagate back an error, assume now + ok = TimeStringFromUnixMs(timestamp, timestring); + if(!ok) timestring="now()"; // since multicast doesn't propagate back an error, assume now // form into a suitable SQL query - sql_out = "INSERT INTO logging ( time, device, severity, message ) VALUES ( '" + //sql_out = "INSERT INTO logging ( time, device, severity, message ) VALUES ( '" + sql_out = "( '" // XXX PARTIAL SQL FOR BATCH INSERTION!!! + timestring + "'," + device + "," + std::to_string(severity) + "," - + msg + ");"; + + msg + ")"; - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"', topic: ",topic_out),12); + Log(Concat("Resulting SQL: '",sql_out,"', topic: ",topic),100); return true; - } else if(topic_out=="monitoring"){ - - db_out = "daq"; - Postgres& a_database = m_databases.at(db_out); + } else if(topic=="monitoring"){ // monitoring data std::string device; @@ -1856,44 +2013,42 @@ bool ReceiveSQL::MulticastMessageToQuery(const std::string& message, std::string uint64_t timestamp{0}; std::string data; tmp.Get("time",timestamp); // optional - get_ok = tmp.Get("device",device); - get_ok &= tmp.Get("subject",subject); - get_ok &= tmp.Get("data",data); - if(!get_ok){ + ok = tmp.Get("device",device); + ok &= tmp.Get("subject",subject); + ok &= tmp.Get("data",data); + if(!ok){ Log("MulticastMessageToQuery: missing fields in message '"+message+"'",v_error); ++mon_recv_fails; return false; } // SQL sanitization - get_ok = a_database.pqxx_quote(device, device); - get_ok &= a_database.pqxx_quote(subject, subject); - get_ok &= a_database.pqxx_quote(data, data); - if(!get_ok){ + ok = m_database.pqxx_quote(conn, device, device); + ok &= m_database.pqxx_quote(conn, subject, subject); + ok &= m_database.pqxx_quote(conn, data, data); + if(!ok){ Log("MulticastMessageToQuery: error quoting fields in message '"+message+"'",v_error); ++mon_recv_fails; return false; } std::string timestring; - get_ok = TimeStringFromUnixMs(timestamp, timestring); - if(!get_ok) timestring="now()"; // since multicast doesn't propagate back an error, assume now + ok = TimeStringFromUnixMs(timestamp, timestring); + if(!ok) timestring="now()"; // since multicast doesn't propagate back an error, assume now // form into a suitable SQL query - sql_out = "INSERT INTO monitoring ( time, device, subject, data ) VALUES ( '" + //sql_out = "INSERT INTO monitoring ( time, device, subject, data ) VALUES ( '" + sql_out = "( '" // XXX PARTIAL SQL FOR BATCH INSERTION!!! + timestring + "'," + device + "," + subject + "," - + data + ");"; + + data + ")"; - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"', topic: ",topic_out),12); + Log(Concat("Resulting SQL: '",sql_out,"', topic: ",topic),50); return true; - } else if(topic_out=="rootplot"){ - - db_out = "daq"; // FIXME move to monitoring db - Postgres& a_database = m_databases.at(db_out); + } else if(topic=="rootplot"){ // root plot in json std::string plot_name; @@ -1902,28 +2057,28 @@ bool ReceiveSQL::MulticastMessageToQuery(const std::string& message, std::string std::string data; std::string draw_options; tmp.Get("time",timestamp); // optional - get_ok = tmp.Get("plot_name",plot_name); - get_ok &= tmp.Get("data",data); - get_ok &= tmp.Get("draw_options",draw_options); - if(!get_ok){ + ok = tmp.Get("plot_name",plot_name); + ok &= tmp.Get("data",data); + ok &= tmp.Get("draw_options",draw_options); + if(!ok){ Log("MulticastMessageToQuery: missing fields in message '"+message+"'",v_error); ++mon_recv_fails; return false; } // SQL sanitization - get_ok = a_database.pqxx_quote(plot_name, plot_name); - get_ok &= a_database.pqxx_quote(draw_options, draw_options); - get_ok &= a_database.pqxx_quote(data, data); - if(!get_ok){ + ok = m_database.pqxx_quote(conn, plot_name, plot_name); + ok &= m_database.pqxx_quote(conn, draw_options, draw_options); + ok &= m_database.pqxx_quote(conn, data, data); + if(!ok){ Log("MulticastMessageToQuery: error quoting fields in message '"+message+"'",v_error); ++mon_recv_fails; return false; } std::string timestring; - get_ok = TimeStringFromUnixMs(timestamp, timestring); - if(!get_ok) timestring="now()"; // since multicast doesn't propagate back an error, assume now + ok = TimeStringFromUnixMs(timestamp, timestring); + if(!ok) timestring="now()"; // since multicast doesn't propagate back an error, assume now // form into a suitable SQL query // FIXME this needs to insert into a temporary root plots table @@ -1934,7 +2089,7 @@ bool ReceiveSQL::MulticastMessageToQuery(const std::string& message, std::string + draw_options + "," + data + ");"; - Log(Concat("Resulting SQL: '",sql_out,"', database: '",db_out,"', topic: ",topic_out),12); + Log(Concat("Resulting SQL: '",sql_out,"', topic: ",topic),50); return true; @@ -1963,11 +2118,13 @@ bool ReceiveSQL::GetMiddlemanCheckin(){ // keep reading from the SUB socket until there are no more waiting messages. // it's important we don't take any action until we've read out everything, // to ensure we don't start negotiation based on old, stale requests. - while(in_polls.at(3).revents & ZMQ_POLLIN){ + while(in_polls.at(1).revents & ZMQ_POLLIN){ // We do. Receive it. std::vector outputs; + mm_rcv_mtx.lock(); get_ok = Receive(mm_rcv_socket, outputs); + mm_rcv_mtx.unlock(); if(not get_ok){ Log(Concat("error receiving message part ",outputs.size()+1," of message from middleman"),0); ++mm_broadcast_recv_fails; // FIXME this includes negotiation requests and our own broadcasts @@ -2055,7 +2212,7 @@ bool ReceiveSQL::GetMiddlemanCheckin(){ // but perhaps this message is stale: // re-poll the socket and see if there is another message in the buffer try { - get_ok = zmq::poll(&in_polls.at(3), 1, 0); + get_ok = zmq::poll(&in_polls.at(1), 1, 0); } catch (zmq::error_t& err){ std::cerr<<"ReceiveSQL::GetMiddlemanCheckin poller caught "<second; - std::string& db = next_msg.database; - if(m_databases.count(db)==0){ - Log("Write query to unknown database '"+db+"'",v_error); - return false; - } std::string err; - next_msg.query_ok = m_databases.at(db).QueryAsJsons(next_msg.query, &next_msg.response, &err); + next_msg.query_ok = m_database.QueryAsJsons(nullptr, next_msg.query, &next_msg.response, &err); if(not next_msg.query_ok){ - Log(Concat("Write query failed! Query was: \"",next_msg.query,"\", error was: '",err,"'"),1); + Log(Concat("Write query failed! Query was: \"",next_msg.query,"\", error was: '",err,"'"),0); ++write_queries_failed; next_msg.response = std::vector{err}; } @@ -2158,18 +2310,23 @@ bool ReceiveSQL::RunNextWriteQuery(){ bool ReceiveSQL::RunNextReadQuery(){ // run our next postgres query, if we have one - if(rd_txn_queue.size()){ + //if(rd_txn_queue.size()){ + while(rd_txn_queue.size()){ Query& next_msg = rd_txn_queue.begin()->second; - std::string& db = next_msg.database; - if(m_databases.count(db)==0){ - Log("Read query to unknown database '"+db+"'",v_error); - return false; - } std::string err; - next_msg.query_ok = m_databases.at(db).QueryAsJsons(next_msg.query, &next_msg.response, &err); + next_msg.query_ok = m_database.QueryAsJsons(nullptr, next_msg.query, &next_msg.response, &err); + + // XXX debug + if(next_msg.topic=="DEVCONFIG"){ // FINDME + unsigned long timestamp = time(nullptr); + std::string timestring; + TimeStringFromUnixSec(timestamp, timestring); + Log(Concat("Ran config request query '",next_msg.query,"' at ",timestring," result: ",next_msg.query_ok),0); + } + if(not next_msg.query_ok){ - Log(Concat("Read query failed! Query was: \"",next_msg.query,"\", error was: '",err,"'"),1); + Log(Concat("Read query failed! Query was: \"",next_msg.query,"\", error was: '",err,"'"),0); ++read_queries_failed; next_msg.response = std::vector{err}; } @@ -2186,31 +2343,100 @@ bool ReceiveSQL::RunNextReadQuery(){ // ««-------------- ≪ °◇◆◇° ≫ --------------»» -bool ReceiveSQL::RunNextMulticastMsg(){ +void ReceiveSQL::MulticastWorker(Thread_args* arg){ - // insert our next fire-and-forget message, if we have one - if(in_multicast_queue.size()){ - Log("Inserting next multicast message to DB: we have "+std::to_string(in_multicast_queue.size()) - +" messages to process",13); - - std::string next_msg = in_multicast_queue.front(); - get_ok = m_databases.at("daq").Query(next_msg); // FIXME hard-coded db name + MulticastWorker_args* m_args=reinterpret_cast(arg); + ReceiveSQL& p = *m_args->parent; + + if(m_args->finished){ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // throttler.... probably unneeded + return; + } + + // check if parent has new messages for us + m_args->in_queue_mtx->lock(); + if(m_args->in_queue->empty()){ + m_args->in_queue_mtx->unlock(); - if(not get_ok){ - // something went wrong - std::cerr<<"Error inserting logmessage '"<running){ + m_args->finished=true; } - // remove the message from the queue - in_multicast_queue.pop_front(); + // don't keep locking it too often + std::this_thread::sleep_for(std::chrono::microseconds(100)); + return; + } + + // parent has messages for us. grab the message queue, swapping to give them a new one + std::swap(*m_args->in_queue, m_args->msg_queue); + m_args->in_queue_mtx->unlock(); + + if(m_args->type==multicast_type::log) p.logs_waiting += m_args->msg_queue.size(); + else p.mons_waiting += m_args->msg_queue.size(); + p.Log("Inserting next multicast message to DB: we have "+std::to_string(m_args->msg_queue.size())+" messages to process",13); + + // loop over messages, parse each into an SQL query. + // FIXME replace with prepared statement? is that persistent enough? + for(std::string& next_msg : m_args->msg_queue){ + + m_args->get_ok = p.MulticastMessageToQuery(m_args->conn, next_msg, m_args->vals_string, m_args->tmp, m_args->jsonp); + + if(m_args->get_ok){ + if(!m_args->first_vals) m_args->query += ", "; + m_args->query += m_args->vals_string; + m_args->first_vals = false; + ++m_args->n_queries; + // FIXME errors should never silently disappear + } /*else { printf("MC parse error\n"); } */ - } // else no log messages for now + } + m_args->msg_queue.resize(0); - return true; + // see if we have enough messages for a transaction, + // or it's been long enough to force one to prevent messages sitting too long without insertion + boost::posix_time::time_duration lapse = m_args->last_insert - boost::posix_time::microsec_clock::universal_time(); + if(m_args->n_queries < p.batch_size && lapse.total_milliseconds() < p.max_hold_ms) return; + + // merge all messages and do batch insertion + m_args->query += ";"; + + //auto t_start = std::chrono::high_resolution_clock::now(); + + m_args->get_ok = p.m_database.Query(m_args->conn, m_args->query); + + /* + auto t_end = std::chrono::high_resolution_clock::now(); + if(std::chrono::duration_cast(t_end-t_start).count()>1000){ + //p.Log("Slow multicast query: '"+m_args->query+"' took "+ + p.Log("Slow multicast query took "+ + std::to_string(std::chrono::duration_cast(t_end-t_start).count())+" ms",p.v_warning); + } + */ + + if(not m_args->get_ok){ + // something went wrong + // can't use Log or we end up in a circular loop since Log is a multicast query itself. + // FIXME printouts for error not great + std::cerr<<"Error inserting logmessage '"<query<<"' into database"<type==multicast_type::log) p.log_queries_failed+=m_args->n_queries; + else p.mon_queries_failed+=m_args->n_queries; + } else { + p.Log("batch insert of "+std::to_string(m_args->n_queries)+" multicasts",2); + } + + // FIXME if the insert failed due to some transient issue (e.g. lost connection to DB) + // we should re-insert. Alternatively, if just one query was bad, we could try to de-batch + // and insert one-by-one, rather than losing the whole lot... + // FIXME FIXME or perhaps we just dump this to file? it ought to be recorded *somewhere* what failed. + // for now, we just drop them regardless of success or failure. + m_args->last_insert = boost::posix_time::microsec_clock::universal_time(); + if(m_args->type==multicast_type::log) p.logs_waiting-=m_args->n_queries; + else p.mons_waiting-=m_args->n_queries; + m_args->query = m_args->query_base; + m_args->first_vals=true; + + return; } // ««-------------- ≪ °◇◆◇° ≫ --------------»» @@ -2235,13 +2461,14 @@ bool ReceiveSQL::SendNextReply(){ std::string client_str(reinterpret_cast(next_msg.client_id.data())); uint32_t* msgID = reinterpret_cast(next_msg.message_id.data()); - Log("Sending next reply to ZMQ IDENTITY '"+client_str+"' for msg "+std::to_string(*msgID),1); + Log("Sending next reply to ZMQ IDENTITY '"+client_str+"' for msg "+std::to_string(*msgID),12); // as soon as we send a zmq::message_t (i.e. client_id and message_id), they are "used up": // the 'message.size()' becomes 0 and they strictly no longer retain their contents. // to keep a copy cached for re-sending we need to explicitly make a copy now. Query qrycpy(next_msg); // (the Query copy-constructor invokes zmq::message_t->copy on members) + clt_rtr_mtx.lock(); if(next_msg.response.size()==0){ try{ get_ok = Send(clt_rtr_socket, @@ -2266,6 +2493,7 @@ bool ReceiveSQL::SendNextReply(){ get_ok=false; } } + clt_rtr_mtx.unlock(); if(get_ok){ // all parts sent successfully, add to the sent cache @@ -2286,6 +2514,13 @@ bool ReceiveSQL::SendNextReply(){ } } // end send ok check + if(qrycpy.topic=="DEVCONFIG"){ + unsigned long timestamp = time(nullptr); + std::string timestring; + TimeStringFromUnixSec(timestamp, timestring); + Log(Concat("Replied config request '",qrycpy.query,"' at ",timestring," result: ",get_ok),0); + } + } // else no available listeners } // else no responses to send @@ -2307,11 +2542,12 @@ bool ReceiveSQL::SendNextMulticast(){ // Get the message std::string& message = out_multicast_queue.front(); + // FIXME FIXME FIXME this assumes log, but our monitoring data is also put in this queue int cnt = sendto(log_socket, message.c_str(), message.length()+1, 0, (struct sockaddr*)&log_addr, multicast_addrlen); if(cnt < 0){ std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; Log(errmsg,v_error); - out_multicast_queue.pop_front(); + out_multicast_queue.pop_front(); // discard it anyway ++multicast_send_fails; return false; @@ -2428,15 +2664,12 @@ bool ReceiveSQL::TrimQueue(const std::string& queuename){ bool ReceiveSQL::TrimDequeue(const std::string& queuename){ - // check in or out log message queue size and do the same + // check message dequeue size and do the same std::deque* queue; unsigned long* drop_count; // check which queue we're managing - if(queuename=="in_multicast_queue"){ - queue = &in_multicast_queue; - drop_count = &dropped_multicast_in; - } else if(queuename=="out_multicast_queue"){ + if(queuename=="out_multicast_queue"){ queue = &out_multicast_queue; drop_count = &dropped_logs_out; } else { @@ -2464,6 +2697,58 @@ bool ReceiveSQL::TrimDequeue(const std::string& queuename){ // ««-------------- ≪ °◇◆◇° ≫ --------------»» +bool ReceiveSQL::TrimVector(const std::string& queuename){ + + // check message vector size and do the same + std::vector* a_queue; + unsigned long* drop_count; + std::mutex* mtx; + + // check which queue we're managing + if(queuename=="in_log_queue"){ + a_queue = &in_log_queue; + mtx = &in_log_queue_mtx; + drop_count = &dropped_log_in; + } else if(queuename=="in_mon_queue"){ + a_queue = &in_mon_queue; + mtx = &in_mon_queue_mtx; + drop_count = &dropped_mon_in; + } else { + Log(Concat("TrimVector called with unknown message queue '",queuename,"'"),0); + return false; + } + + // FIXME this trimming process needs to be done better for background threads + // since we have some messages in the thread and some in the parent, + // so we need to trim in two places and use mutexes when handling parent one.... + mtx->lock(); + size_t nelems = a_queue->size(); + + // FIXME better than queue size would be to look at messages older than a certain time + // and trim based on that, since how well we're keeping up is more about how long + // each query takes, not about how many we have waiting. + if(nelems > drop_limit){ + int to_drop = nelems - drop_limit; + a_queue->erase(a_queue->begin(), a_queue->begin()+to_drop); + mtx->unlock(); + + Log(Concat("Warning! Number of waiting elements in queue ",queuename," (",nelems, + ") is over limit (",drop_limit,")! Dropping ",to_drop," messages!"),0); + *drop_count += to_drop; + + // check if we need to warn about being close to the limit + } else if(nelems > warn_limit){ + Log(Concat("Warning! Number of waiting elements in ",queuename," (",nelems, + ") is approaching drop limit (",drop_limit,") !", + "Is the network down, or responding slowly?"),1); + } + mtx->unlock(); + + return true; +} + +// ««-------------- ≪ °◇◆◇° ≫ --------------»» + bool ReceiveSQL::TrimCache(){ // check cache size and do the same @@ -2514,6 +2799,10 @@ bool ReceiveSQL::UpdateControls(){ SC_vars["ResetStats"]->GetValue(reset); if(reset) ResetStats(reset); + bool clearconnections=false; + SC_vars["ClearConnections"]->GetValue(clearconnections); + if(clearconnections) ClearConnections(clearconnections); + return true; } @@ -2530,8 +2819,12 @@ bool ReceiveSQL::TrackStats(){ // of reads/writes since last time. So get the last values unsigned long last_write_query_count; unsigned long last_read_query_count; + unsigned long last_log_count; + unsigned long last_mon_count; MonitoringStore.Get("write_queries_recvd", last_write_query_count); MonitoringStore.Get("read_queries_recvd", last_read_query_count); + MonitoringStore.Get("logs_recvd", last_log_count); + MonitoringStore.Get("mons_recvd", last_mon_count); // calculate rates are per minute elapsed_time = boost::posix_time::microsec_clock::universal_time() - last_stats_calc; @@ -2540,6 +2833,10 @@ bool ReceiveSQL::TrackStats(){ ((read_queries_recvd - last_read_query_count) * 60.) / elapsed_time.total_seconds(); float write_query_rate = (elapsed_time.total_seconds()==0) ? 0 : ((write_queries_recvd - last_write_query_count) * 60.) / elapsed_time.total_seconds(); + float log_rate = (elapsed_time.total_seconds()==0) ? 0 : + ((logs_recvd - last_log_count) * 60.) / elapsed_time.total_seconds(); + float mon_rate = (elapsed_time.total_seconds()==0) ? 0 : + ((mons_recvd - last_mon_count) * 60.) / elapsed_time.total_seconds(); // dump all stats into a Store. MonitoringStore.Set("min_loop_time",min_loop_ms); @@ -2549,21 +2846,23 @@ bool ReceiveSQL::TrackStats(){ MonitoringStore.Set("write_queries_waiting",wrt_txn_queue.size()); MonitoringStore.Set("read_queries_waiting",rd_txn_queue.size()); MonitoringStore.Set("replies_waiting",resp_queue.size()); - MonitoringStore.Set("incoming_multicastmsgs_waiting",in_multicast_queue.size()); + MonitoringStore.Set("incoming_logs_waiting",in_log_queue.size()); + MonitoringStore.Set("incoming_mons_waiting",in_mon_queue.size()); MonitoringStore.Set("out_multicasts_waiting",out_multicast_queue.size()); MonitoringStore.Set("cached_queries",cache.size()); MonitoringStore.Set("write_queries_recvd", write_queries_recvd); MonitoringStore.Set("write_query_recv_fails", write_query_recv_fails); MonitoringStore.Set("read_queries_recvd", read_queries_recvd); MonitoringStore.Set("read_query_recv_fails", read_query_recv_fails); - MonitoringStore.Set("logs_recvd", logs_recvd); - MonitoringStore.Set("mons_recvd", mons_recvd); - MonitoringStore.Set("log_recv_fails", log_recv_fails); - MonitoringStore.Set("mon_recv_fails", mon_recv_fails); + MonitoringStore.Set("logs_recvd", logs_recvd.load()); + MonitoringStore.Set("mons_recvd", mons_recvd.load()); + MonitoringStore.Set("log_recv_fails", log_recv_fails.load()); + MonitoringStore.Set("mon_recv_fails", mon_recv_fails.load()); MonitoringStore.Set("mm_broadcasts_recvd", mm_broadcasts_recvd); MonitoringStore.Set("mm_broadcast_recv_fails", mm_broadcast_recv_fails); MonitoringStore.Set("write_queries_failed", write_queries_failed); - MonitoringStore.Set("multicast_queries_failed", multicast_queries_failed); + MonitoringStore.Set("log_queries_failed", log_queries_failed.load()); + MonitoringStore.Set("mon_queries_failed", mon_queries_failed.load()); MonitoringStore.Set("read_queries_failed", read_queries_failed); MonitoringStore.Set("reps_sent", reps_sent); MonitoringStore.Set("rep_send_fails", rep_send_fails); @@ -2584,7 +2883,8 @@ bool ReceiveSQL::TrackStats(){ MonitoringStore.Set("dropped_writes", dropped_writes); MonitoringStore.Set("dropped_reads", dropped_reads); MonitoringStore.Set("dropped_resps", dropped_resps); - MonitoringStore.Set("dropped_multicast_in", dropped_multicast_in); + MonitoringStore.Set("dropped_log_in", dropped_log_in); + MonitoringStore.Set("dropped_mon_in", dropped_mon_in); MonitoringStore.Set("dropped_logs_out", dropped_logs_out); MonitoringStore.Set("dropped_monitoring_out", dropped_monitoring_out); MonitoringStore.Set("read_query_rate", read_query_rate); @@ -2599,10 +2899,10 @@ bool ReceiveSQL::TrackStats(){ std::stringstream status; status << " read qrys (rcvd/rcv errs/qry errs):["<SetValue(status.str()); @@ -2613,17 +2913,24 @@ bool ReceiveSQL::TrackStats(){ Log(Concat("Monitoring Stats:",json_stats),15); db_verbosity = db_verbosity_tmp; */ + + /* + std::string sql_qry = "INSERT INTO monitoring ( time, device, subject, data ) VALUES ( 'now()', '" + + my_id+"','stats','"+json_stats+"' );"; + */ + + std::string multicast_msg = "{ \"topic\":\"monitoring\"" + ", \"subject\":\"stats\"" + ", \"device\":\""+escape_json(my_id)+"\"" + + ", \"time\":"+std::to_string(time(nullptr)*1000) // ms since unix epoch + + ", \"data\":\""+json_stats+"\" }"; + if(am_master){ - std::string sql_qry = "INSERT INTO monitoring ( time, device, subject, data ) VALUES ( 'now()', '" - + my_id+"','stats','"+json_stats+"' );"; - in_multicast_queue.push_back(sql_qry); + in_mon_queue_mtx.lock(); + in_mon_queue.push_back(multicast_msg); + in_mon_queue_mtx.unlock(); } else { - std::string multicast_msg = "{ \"topic\":\"monitoring\"" - ", \"subject\":\"stats\"" - ", \"device\":\""+escape_json(my_id)+"\", " - + ", \"time\":"+std::to_string(time(nullptr)*1000) // ms since unix epoch - + ", \"data\":\""+json_stats+"\" }"; - out_multicast_queue.push_back(multicast_msg); + out_multicast_queue.push_back(multicast_msg); // FIXME FIXME FIXME needs to go to mon port } // reset counters @@ -2717,7 +3024,9 @@ bool ReceiveSQL::NegotiationRequest(){ // receive the other middleman's response std::vector messages; - int ret = PollAndReceive(mm_rcv_socket, in_polls.at(3), inpoll_timeout, messages); + mm_rcv_mtx.lock(); + int ret = PollAndReceive(mm_rcv_socket, in_polls.at(1), inpoll_timeout, messages); + mm_rcv_mtx.unlock(); // chech for errors if(ret==-3) Log("Error polling in socket in NegotiateMaster() call!",0); @@ -2924,11 +3233,13 @@ bool ReceiveSQL::UpdateRole(){ // promote the database out of recovery mode. 60s timeout. std::string err; - get_ok = m_databases.begin()->second.Promote(60,&err); // FIXME multiple dbs? + get_ok = m_database.Promote(nullptr, 60,&err); // should we also stop broadcasting ourself as a source of logging messages? utilities->RemoveService("logging"); + // FIXME aghhh we need to start the multicast threads too FIXME FIXME FIXME + // check for errors if(get_ok){ Log("Promotion success",1); @@ -2958,7 +3269,7 @@ bool ReceiveSQL::UpdateRole(){ std::string err; // demote the database to standby. 60s timeout. - get_ok = m_databases.begin()->second.Demote(60,&err); // FIXME multiple dbs? + get_ok = m_database.Demote(nullptr, 60,&err); // check for errors if(get_ok){ @@ -2971,7 +3282,9 @@ bool ReceiveSQL::UpdateRole(){ // drop any outstanding write messages wrt_txn_queue.clear(); - in_multicast_queue.clear(); + // FIXME aghhh we need to stop the threads too FIXME FIXME FIXME + in_mon_queue.clear(); + in_log_queue.clear(); } else { @@ -3067,7 +3380,7 @@ bool ReceiveSQL::GetLastUpdateTime(std::string& our_timestamp){ std::string err; std::vector results; - bool query_ok = m_databases.begin()->second.QueryAsStrings(query, &results, 'r', &err); // FIXME multiple dbs? + bool query_ok = m_database.QueryAsStrings(nullptr, query, &results, 'r', &err); if(not query_ok || results.size()==0){ Log(Concat("Error getting last commit timestamp in negotiation! ", @@ -3083,13 +3396,13 @@ bool ReceiveSQL::GetLastUpdateTime(std::string& our_timestamp){ // ««-------------- ≪ °◇◆◇° ≫ --------------»» -bool ReceiveSQL::TimeStringFromUnixSec(uint64_t timestamp, std::string& timestring){ +bool ReceiveSQL::TimeStringFromUnixSec(const uint64_t& timestamp, std::string& timestring){ return TimeStringFromUnixMs(timestamp*1000, timestring); } // ««-------------- ≪ °◇◆◇° ≫ --------------»» -bool ReceiveSQL::TimeStringFromUnixMs(uint64_t timestamp, std::string& timestring){ +bool ReceiveSQL::TimeStringFromUnixMs(const uint64_t& timestamp, std::string& timestring){ if(timestamp==0){ timestring="now()"; @@ -3097,7 +3410,7 @@ bool ReceiveSQL::TimeStringFromUnixMs(uint64_t timestamp, std::string& timestrin //std::cout<<"converting time "<second.pqxx_quote(message, msg); + get_ok = m_database.pqxx_quote(nullptr, message, msg); if(!get_ok){ std::cerr<<"Error sanitizing log message '"<ClearConnections(clt_rtr_socket, clt_rtr_connections); + ok = ok && utilities->ClearConnections(clt_sub_socket, clt_sub_connections); + ok = ok && utilities->ClearConnections(mm_rcv_socket, mm_rcv_connections); + */ + return ok; +} + //https://wiki.postgresql.org/wiki/What%27s_new_in_PostgreSQL_9.5#Commit_timestamp_tracking //https://stackoverflow.com/questions/33943524/atomically-set-serial-value-when-committing-transaction/33944402#33944402 //https://dba.stackexchange.com/questions/199290/get-last-modified-date-of-table-in-postgresql @@ -3401,7 +3740,8 @@ bool ReceiveSQL::ResetStats(bool reset){ mm_broadcasts_recvd=0; mm_broadcast_recv_fails=0; write_queries_failed=0; - multicast_queries_failed=0; + log_queries_failed=0; + mon_queries_failed=0; read_queries_failed=0; reps_sent=0; rep_send_fails=0; @@ -3422,7 +3762,8 @@ bool ReceiveSQL::ResetStats(bool reset){ dropped_writes=0; dropped_reads=0; dropped_resps=0; - dropped_multicast_in=0; + dropped_log_in=0; + dropped_mon_in=0; dropped_logs_out=0; dropped_monitoring_out=0; diff --git a/ReceiveSQL.h b/ReceiveSQL.h index 621c401..4773d17 100755 --- a/ReceiveSQL.h +++ b/ReceiveSQL.h @@ -10,6 +10,8 @@ // for finding clients #include "ServiceDiscovery.h" #include "MMUtilities.h" +// for background thread +#include "Utilities.h" // for slow control over zmq SD port #include "SlowControlCollection.h" // for databse interaction @@ -26,12 +28,20 @@ #include #include #include +//#include +#include // multicast #include #include #include #include #include +// for thread safety +#include + +struct MulticastReceive_args; +struct MulticastWorker_args; +struct FindClients_args; class ReceiveSQL{ public: @@ -48,29 +58,39 @@ class ReceiveSQL{ bool Execute(); bool FindNewClients(); - bool FindNewClients_v2(); + static void FindNewClients_v2(Thread_args* args); bool GetClientWriteQueries(); bool WriteMessageToQuery(const std::string& topic, const std::string& message, std::string& db_out, std::string& sql_out); bool GetClientReadQueries(); bool ReadMessageToQuery(const std::string& topic, const std::string& message, std::string& db_out, std::string& sql_out); - bool GetMulticastMessages(); - bool MulticastMessageToQuery(const std::string& message, std::string& topic_out, std::string& db_out, std::string& sql_out); + //bool GetMulticastMessages(); + static void GetMulticastMessages(Thread_args* args); // run multicast queries + bool MulticastMessageToQuery(pqxx::connection* conn, const std::string& message, std::string& sql_out, BStore& tmp, JSONP& jsonp); bool GetMiddlemanCheckin(); bool CheckMasterStatus(); bool RunNextWriteQuery(); bool RunNextReadQuery(); - bool RunNextMulticastMsg(); + //bool RunNextMulticastMsg(); + static void MulticastWorker(Thread_args* args); // receive multicast messages + std::vector multicast_worker_args; + MulticastReceive_args* multicast_listener_args=nullptr; + FindClients_args* findclients_args=nullptr; + size_t batch_size = 100; // minimum number of multicast messages to buffer before insert + size_t max_hold_ms = 500; // or wait at most this long before inserting any new multicast messages bool SendNextReply(); bool SendNextMulticast(); std::string escape_json(std::string s); + void TrimWS(std::string&); bool BroadcastPresence(); bool CleanupCache(); bool TrimQueue(const std::string& queuename); bool TrimDequeue(const std::string& queuename); + bool TrimVector(const std::string& queuename); bool TrimCache(); bool UpdateControls(); bool DoStop(bool stop); bool DoQuit(bool quit); + bool ClearConnections(bool clearconnections); bool TrackStats(); bool ResetStats(bool reset); @@ -82,8 +102,8 @@ class ReceiveSQL{ bool UpdateRole(); boost::posix_time::ptime ToTimestamp(const std::string& timestring); std::string ToTimestring(boost::posix_time::ptime); - bool TimeStringFromUnixMs(uint64_t timestamp, std::string& timestring); - bool TimeStringFromUnixSec(uint64_t timestamp, std::string& timestring); + bool TimeStringFromUnixMs(const uint64_t& timestamp, std::string& timestring); + bool TimeStringFromUnixSec(const uint64_t& timestamp, std::string& timestring); bool GetLastUpdateTime(std::string& our_timestamp); // Logging functions @@ -95,7 +115,7 @@ class ReceiveSQL{ private: // an instance of the postgres interface class to communicate with the database(s) - std::map m_databases; + Postgres m_database; SlowControlCollection SC_vars; std::string stopfile="stop"; @@ -115,6 +135,10 @@ class ReceiveSQL{ zmq::socket_t* clt_rtr_socket=nullptr; // receives read queries from client dealers zmq::socket_t* clt_sub_socket=nullptr; // receives write queries from client publishers zmq::socket_t* mm_rcv_socket=nullptr; // receives connections from other middlemen + // mutexes for locking them while background service discovery thread makes new connections + std::mutex clt_rtr_mtx; + std::mutex clt_sub_mtx; + std::mutex mm_rcv_mtx; // these sockets will bind, they advertise our services zmq::socket_t* mm_snd_socket=nullptr; // we will advertise our presence as a middleman to other middlemen @@ -123,12 +147,9 @@ class ReceiveSQL{ // and connect us to those sockets ServiceDiscovery* service_discovery = nullptr; MMUtilities* utilities = nullptr; - // required by the Utilities class to keep track of connections to clients - // we should have one map per zmq_socket managed by the Utilities class; - // it uses this to determine if we are connected to a given client already - std::map clt_rtr_connections; - std::map mm_rcv_connections; - std::map clt_sub_connections; + + // for background thread creation + Utilities m_util; // multicast socket file descriptor int log_socket=-1; @@ -137,9 +158,6 @@ class ReceiveSQL{ struct sockaddr_in log_addr; struct sockaddr_in mon_addr; socklen_t multicast_addrlen; - // apparently works with zmq poller? - zmq::pollitem_t multicast_poller; - char buf[655355]; // buffer for multicast messages // poll timeouts int inpoll_timeout; @@ -192,7 +210,16 @@ class ReceiveSQL{ std::map, Query> wrt_txn_queue; std::map, Query> rd_txn_queue; std::map, Query> resp_queue; - std::deque in_multicast_queue; + + std::vector in_log_queue; + std::vector in_mon_queue; + std::mutex in_log_queue_mtx; + std::mutex in_mon_queue_mtx; + + std::atomic logs_waiting; // num log messages received but not yet written to DB + std::atomic mons_waiting; // num mon messages received but not yet written to DB + + // FIXME needs splitting std::deque out_multicast_queue; // we'll cache a set of recent responses send to each client, // then if a client that misses their acknowledgement and resends the query, @@ -231,10 +258,10 @@ class ReceiveSQL{ unsigned long write_query_recv_fails = 0; unsigned long read_queries_recvd = 0; unsigned long read_query_recv_fails = 0; - unsigned long logs_recvd = 0; - unsigned long mons_recvd = 0; - unsigned long log_recv_fails = 0; - unsigned long mon_recv_fails = 0; + std::atomic logs_recvd = 0; + std::atomic mons_recvd = 0; + std::atomic log_recv_fails = 0; + std::atomic mon_recv_fails = 0; unsigned long mm_broadcasts_recvd = 0; unsigned long mm_broadcast_recv_fails = 0; @@ -243,7 +270,8 @@ class ReceiveSQL{ unsigned long read_queries_failed = 0; // number of postgres multicast message insertions that failed - unsigned long multicast_queries_failed = 0; + std::atomic log_queries_failed = 0; + std::atomic mon_queries_failed = 0; // number of messages sent over zmq sockets, and how many failed. unsigned long reps_sent = 0; @@ -274,7 +302,8 @@ class ReceiveSQL{ unsigned long dropped_reads = 0; unsigned long dropped_resps = 0; // number of log messages we've fropped from queues due to overflow - unsigned long dropped_multicast_in = 0; + unsigned long dropped_log_in = 0; + unsigned long dropped_mon_in = 0; unsigned long dropped_logs_out = 0; unsigned long dropped_monitoring_out = 0; @@ -291,6 +320,9 @@ class ReceiveSQL{ // for holding stats variables and turning them into a json Store MonitoringStore; + // tracking where we spend our time + std::map timers; // function -> time in ms + //////////// // variadic templates: our excuse to use c++11 ;) @@ -415,4 +447,77 @@ class ReceiveSQL{ bool ReadPlotlyPlotToQuery(const std::string& message, BStore& request, std::string& db_out, std::string& sql_out); }; +enum class multicast_type { log, mon }; + +// POD class for things passed to thread to connect to new client +struct FindClients_args : public Thread_args { + ReceiveSQL* parent; + MMUtilities* utilities; + // required by the Utilities class to keep track of connections to clients + // we should have one map per zmq_socket managed by the Utilities class; + // it uses this to determine if we are connected to a given client already + std::map clt_rtr_connections; + std::map mm_rcv_connections; + std::map clt_sub_connections; +}; + +// POD class for things passed to multicast worker threads +struct MulticastWorker_args : public Thread_args { + short id; + ReceiveSQL* parent; + bool running; + bool finished; + // thread must maintain its own connection to database + pqxx::connection* conn; + BStore tmp; + JSONP jsonp; + std::string vals_string; + std::string_view query_base; + std::string query; + int get_ok; + bool first_vals; + size_t n_queries; + boost::posix_time::ptime last_insert; + std::vector msg_queue; + std::vector* in_queue; + std::mutex* in_queue_mtx; + multicast_type type; + + MulticastWorker_args(ReceiveSQL* i_parent, pqxx::connection* i_conn, multicast_type worker_type, std::vector* msg_queue, std::mutex* msg_queue_mtx, short iid){ + id=iid; + parent = i_parent; + conn = i_conn; + type = worker_type; + in_queue = msg_queue; + in_queue_mtx = msg_queue_mtx; + if(worker_type==multicast_type::log){ + query_base = "INSERT INTO logging ( time, device, severity, message ) VALUES "; + } else { + query_base = "INSERT INTO monitoring ( time, device, subject, data ) VALUES "; + } + query = query_base; + first_vals = true; + n_queries=0; + running=true; + finished=false; + last_insert = boost::posix_time::microsec_clock::universal_time(); + + } + +}; + +// POD class for things passed to multicast listener thread +struct MulticastReceive_args : public Thread_args { + ReceiveSQL* parent; + multicast_type type; + bool running; + bool finished; + socklen_t addrlen; + struct sockaddr_in* addr; + int socket; + zmq::pollitem_t poll; + char message[655355]; // theoretical maximum UDP buffer size + int get_ok; +}; + #endif diff --git a/ReceiveSQLConfig b/ReceiveSQLConfig index 0ec2b18..7333e24 100644 --- a/ReceiveSQLConfig +++ b/ReceiveSQLConfig @@ -10,7 +10,7 @@ port 5432 dbname daq #user tooldaq #passwd tooldaq19 -#context_io_threads 1 +context_io_threads 3 clt_sub_port 55556 clt_rtr_port 55555 mm_snd_port 55597 @@ -18,6 +18,7 @@ log_port 55554 mon_port 55553 multicast_address 239.192.1.1 remote_control_port 24011 +num_multicast_threads 10 # all timeouts in ms clt_sub_socket_timeout 500 @@ -41,8 +42,8 @@ dont_promote 0 max_send_attempts 3 # how many postgres queries in the queue before we warn about being full warn_limit 700 -# how many postgres queries before we start dropping them -drop_limit 1000 +# how many postgres queries before we start dropping them +drop_limit 5000 # how long to cache received messages so we don't get duplicates cache_period_ms 10000 # if writes are sent to the read-only port, but we're master, should we accept them? diff --git a/Setup.sh b/Setup.sh index f127a21..a66967f 100755 --- a/Setup.sh +++ b/Setup.sh @@ -1,5 +1,7 @@ #!/bin/bash Dependencies=/opt +#export PS1='${debian_chroot:+($debian_chroot)}\[\033[35;2m\]\u@\h\[\033[02;31;22m\][\t]\[\033[00m\]:\[\033[00;36m\]\w\[\033[00m\]\$ ' +export PS1='${debian_chroot:+($debian_chroot)}\[\033[35;2;1m\]\u@\h\[\033[00m\]:\[\033[00;36m\]\w\[\033[00m\]\$ ' # add Dependencies from ToolFramework export LD_LIBRARY_PATH=${Dependencies}/zeromq-4.0.7/lib:${Dependencies}/boost_1_66_0/install/lib:${Dependencies}/libpqxx-6.4.5/install/lib:${Dependencies}/ToolFrameworkCore/lib:${Dependencies}/ToolDAQFramework/lib:$LD_LIBRARY_PATH diff --git a/middleman.service b/middleman.service new file mode 100644 index 0000000..ba7eaf2 --- /dev/null +++ b/middleman.service @@ -0,0 +1,23 @@ +[Unit] +Description=middleman application that interfaces with the database via zmq +After=postgresql.service +Requires=postgresql.service +OnFailure=notify-slack@%i.service + +[Service] +Type=simple +#Environment="PGHOST=/var/run/psql" # some debian-based distros use this, though it can be configured in postgresql.conf +Environment="PGHOST=/tmp" +Environment="PGUSER=root" +Environment="PGPORT=5432" +Environment="PGDATABASE=daq" +Environment="PGDATA=/var/lib/pgsql/data" +Environment="LD_LIBRARY_PATH=/opt/zeromq-4.0.7/lib:/opt/boost_1_66_0/install/lib:/opt/libpqxx-6.4.5/install/lib:/opt/ToolFrameworkCore/lib:/opt/ToolDAQFramework/lib:$LD_LIBRARY_PATH" +Environment="ASAN_OPTIONS=allocator_may_return_null=1" +WorkingDirectory=/opt/middleman +ExecStart=/opt/middleman/middleman /opt/middleman/ReceiveSQLConfig +# if the middleman stops without the user doing so via systemctl, restart it automatically +Restart=always + +[Install] +WantedBy=default.target diff --git a/notify-service@.service b/notify-service@.service new file mode 100644 index 0000000..f88be2c --- /dev/null +++ b/notify-service@.service @@ -0,0 +1,11 @@ +[Unit] +Description=Send Notification of systemd unit Failure + +[Service] +Type=oneshot +#ExecStart=echo 'Notification triggered for service %i' +ExecStart=/opt/middleman/slackNotify.sh -s %i + +[Install] +WantedBy=default.target +EOF diff --git a/run_maintenance.sh b/run_maintenance.sh new file mode 100755 index 0000000..3d00d73 --- /dev/null +++ b/run_maintenance.sh @@ -0,0 +1,14 @@ +#!/bin/sh +set -x +while [ /bin/true ]; do + psql -c "CALL partman.partition_data_proc('public.monitoring', p_batch := 1);" + NDEFAULT=$(psql -At -c "SELECT * FROM partman.check_default();" | cut -d'|' -f 2 ); + #if [ $? -ne 0 ]; then + if [ ! -z "${NDEFAULT}" ] && [ ${NDEFAULT} -eq 0 ]; then + break; + else + sleep 1200 + fi +done +sleep 1200 +#psql -c "VACUUM ANALYZE monitoring;" diff --git a/show_logs.sh b/show_logs.sh new file mode 100755 index 0000000..af836ea --- /dev/null +++ b/show_logs.sh @@ -0,0 +1,7 @@ +#!/bin/bash +if [ $# -gt 0 ]; then + if [ "$1" == "-f" ]; then + journalctl -f _SYSTEMD_INVOCATION_ID=`systemctl show --value -p InvocationID middleman` + fi +fi +journalctl _SYSTEMD_INVOCATION_ID=`systemctl show --value -p InvocationID middleman` | less diff --git a/slack_notify.sh b/slack_notify.sh new file mode 100755 index 0000000..5eb9fe7 --- /dev/null +++ b/slack_notify.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# Bash script to send systemd notifications to Slack + +function usage { + programName=$0 + echo "description: use this script to post systemd service failure message to Slack channel" + echo "usage: $programName -s \"service name\"" + echo " -s the systemd service name e.g. nginx" + exit 1 +} + +# Get service name from options +while getopts ":s:" opt; do + case $opt in + s) + SERVICE_NAME=$OPTARG + ;; + \?) + echo "Invalid option: -$OPTARG" >&2 + exit 1 + ;; + :) + echo "Option -$OPTARG requires an argument." >&2 + exit 1 + ;; + esac +done + +#echo "service name is '${SERVICE_NAME}'" + +if [[ ! "${SERVICE_NAME}" ]]; then + echo "Service name is required" + usage +fi + +# Edit the following variables to match your requirements +SLACK_HOOK_FILE=/opt/middleman/slack_web_hook.txt +SLACK_WEBHOOK_URL=$(cat ${SLACK_HOOK_FILE}) +SLACK_CHANNEL="#general" +SLACK_USERNAME="Notification Bot" +SLACK_ICON=":zap:" +SLACK_COLOR="danger" +SLACK_TITLE="Service $SERVICE_NAME failed on $(hostname)" +SLACK_PRETEXT="Service $SERVICE_NAME failed" +SLACK_TEXT="$(systemctl status $SERVICE_NAME)" +#SLACK_TEXT="testing service \"potato\"" +SLACK_FOOTER="Notification Bot at $(hostname) on $(date)" +# End of variables + +SLACK_ATTACHMENT='[{"fallback": "'"$SLACK_MESSAGE"'", "color": "'"$SLACK_COLOR"'", "title": "'"$SLACK_TITLE"'", "title_link": "'"$SLACK_TITLE_LINK"'", "pretext": "'"$SLACK_PRETEXT"'", "text": "'"$SLACK_TEXT"'", "footer": "'"$SLACK_FOOTER"'", "footer_icon": "'"$SLACK_FOOTER_ICON"'"}]' + +# Send notification to Slack +curl -X POST --data-urlencode 'payload={"channel": "'"$SLACK_CHANNEL"'", "username": "'"$SLACK_USERNAME"'", "icon_emoji": "'"$SLACK_ICON"'", "attachments": '"$SLACK_ATTACHMENT"'}' $SLACK_WEBHOOK_URL + +# Exit with success code +exit 0