From: <fli...@li...> - 2019-06-11 13:35:21
|
r-harrison pushed a commit to branch next
in repository simgear.

SF URL: http://sourceforge.net/p/flightgear/simgear/ci/8eb51e813fd62098ba5c06c4746759187a8984f4/

Commit: 8eb51e813fd62098ba5c06c4746759187a8984f4
Author: Richard Harrison
Committer: Richard Harrison
AuthorDate: Mon Jun 3 23:32:34 2019 +0200

    Added Emesary to SimGear Core
---
 simgear/CMakeLists.txt            |   1 +
 simgear/emesary/CMakeLists.txt    |  28 ++++
 simgear/emesary/Emesary.cxx       |  27 ++++
 simgear/emesary/Emesary.hxx       | 325 ++++++++++++++++++++++++++++++++++++++
 simgear/emesary/notifications.hxx |  76 +++++++++
 simgear/emesary/test_emesary.cxx  | 126 +++++++++++++++
 6 files changed, 583 insertions(+)

diff --git a/simgear/CMakeLists.txt b/simgear/CMakeLists.txt
index 6f8d2933..ca26c9d2 100644
--- a/simgear/CMakeLists.txt
+++ b/simgear/CMakeLists.txt
@@ -6,6 +6,7 @@ foreach( mylibfolder
         bvh
         debug
         embedded_resources
+        emesary
         ephemeris
         io
         magvar
diff --git a/simgear/emesary/CMakeLists.txt b/simgear/emesary/CMakeLists.txt
new file mode 100644
index 00000000..1003c6f0
--- /dev/null
+++ b/simgear/emesary/CMakeLists.txt
@@ -0,0 +1,28 @@
+# Emesary component. File names must match the on-disk case (Emesary.hxx / Emesary.cxx) so the build works on case-sensitive file systems.
+
+include (SimGearComponent)
+
+set(HEADERS
+    Emesary.hxx
+    notifications.hxx
+    )
+
+set(SOURCES
+    Emesary.cxx
+    )
+
+simgear_component(emesary emesary "${SOURCES}" "${HEADERS}")
+
+
+if(ENABLE_TESTS)
+
+add_executable(test_emesary test_emesary.cxx)
+
+set_target_properties(test_emesary PROPERTIES
+    COMPILE_DEFINITIONS "SRC_DIR=\"${CMAKE_CURRENT_SOURCE_DIR}\"" )
+
+target_link_libraries(test_emesary ${TEST_LIBS})
+add_test(emesary ${EXECUTABLE_OUTPUT_PATH}/test_emesary)
+
+
+endif(ENABLE_TESTS)
diff --git a/simgear/emesary/Emesary.cxx b/simgear/emesary/Emesary.cxx
new file mode 100644
index 00000000..bf91077c
--- /dev/null
+++ b/simgear/emesary/Emesary.cxx
@@ -0,0 +1,27 @@
+/*---------------------------------------------------------------------------
+*
+* Title : Emesary - class based inter-object communication
+*
+* File Type : Implementation File
+*
+* Description : Templated
version of Emesary
+* :
+* :
+* :
+* :
+*
+* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
+*
+* Author : Richard Harrison (ri...@za...)
+*
+* Creation Date : 18 March 2002
+*
+* Version : $Header: $
+*
+* Copyright © 2002 Richard Harrison All Rights Reserved.
+*
+*---------------------------------------------------------------------------*/
+
+#include "simgear/emesary/Emesary.hxx"
+
+// NOTE(review): this definition is at global scope, so the object it creates is
+// ::GlobalTransmitter. It is a different object from any GlobalTransmitter
+// declared inside namespace simgear::Emesary - confirm which one is intended.
+simgear::Emesary::Transmitter GlobalTransmitter;
diff --git a/simgear/emesary/Emesary.hxx b/simgear/emesary/Emesary.hxx
new file mode 100644
index 00000000..da1f4ff8
--- /dev/null
+++ b/simgear/emesary/Emesary.hxx
@@ -0,0 +1,325 @@
+#pragma once
+/*---------------------------------------------------------------------------
+*
+* Title : Emesary - class based inter-object communication
+*
+* File Type : Header File
+*
+* Description : Provides generic inter-object communication. For an object to receive a message it
+* : must first register with a Transmitter, such as GlobalTransmitter, and implement the
+* : IReceiver interface. That's it.
+* : To send a message use a Transmitter with an object. That's all there is to it.
+*
+* References : http://www.chateau-logic.com/content/class-based-inter-object-communication
+*
+* Author : Richard Harrison (ri...@za...)
+*
+* Creation Date : 18 March 2002, rewrite 2017
+*
+* Version : $Header: $
+*
+* Copyright © 2002 - 2017 Richard Harrison All Rights Reserved.
+* +*---------------------------------------------------------------------------*/ +#include <typeinfo> + +#include <string> +#include <list> +#include <set> +#include <vector> +#include <atomic> +#include <simgear/threads/SGThread.hxx> + + +namespace simgear +{ + namespace Emesary + { + enum ReceiptStatus + { + /// <summary> + /// Processing completed successfully + /// </summary> + ReceiptStatusOK = 0, + + /// <summary> + /// Individual item failure + /// </summary> + ReceiptStatusFail = 1, + + /// <summary> + /// Fatal error; stop processing any further recipieints of this message. Implicitly fail + /// </summary> + ReceiptStatusAbort = 2, + + /// <summary> + /// Definitive completion - do not send message to any further recipieints + /// </summary> + ReceiptStatusFinished = 3, + + /// <summary> + /// Return value when method doesn't process a message. + /// </summary> + ReceiptStatusNotProcessed = 4, + + /// <summary> + /// Message has been sent but the return status cannot be determined as it has not been processed by the recipient. + /// </summary> + /// <notes> + /// For example a queue or outgoing bridge + /// </notes> + ReceiptStatusPending = 5, + + /// <summary> + /// Message has been definitively handled but the return value cannot be determined. The message will not be sent any further + /// </summary> + /// <notes> + /// For example a point to point forwarding bridge + /// </notes> + ReceiptStatusPendingFinished = 6, + }; + + /// <summary> + /// Interface (base class) for all notifications. The value is an opaque pointer that may be used to store anything, although + /// often it is more convenient to + /// </summary> + class INotification + { + public: + virtual const char *GetType() = 0; + }; + /// <summary> + /// Interface (base class) for a recipeint. 
+ /// </summary> + class IReceiver + { + public: + /// <summary> + /// Receive notifiction - must be implemented + /// </summary> + virtual ReceiptStatus Receive(INotification& message) = 0; + + /// <summary> + /// Called when registered at a transmitter + /// </summary> + virtual void OnRegisteredAtTransmitter(class Transmitter *p) + { + } + /// <summary> + /// Called when de-registered at a transmitter + /// </summary> + virtual void OnDeRegisteredAtTransmitter(class Transmitter *p) + { + } + }; + + /// <summary> + /// Interface (base clasee) for a transmitter. + /// Transmits Message derived objects. Each instance of this class provides a + /// databus to which any number of receivers can attach to. + /// </summary> + class ITransmitter + { + public: + /* + * Registers a recipient to receive message from this transmitter + */ + virtual void Register(IReceiver& R) = 0; + /* + * Removes a recipient from from this transmitter + */ + virtual void DeRegister(IReceiver& R) = 0; + + /* + * Notify all registered recipients. Stop when receipt status of abort or finished are received. + * The receipt status from this method will be + * - OK > message handled + * - Fail > message not handled. A status of Abort from a recipient will result in our status + * being fail as Abort means that the message was not and cannot be handled, and + * allows for usages such as access controls. + */ + virtual ReceiptStatus NotifyAll(INotification& M) = 0; + /// <summary> + /// number of recipients + /// </summary> + virtual int Count() = 0; + }; + + + /** + * Description: Transmits Message derived objects. Each instance of this class provides a + * databus to which any number of receivers can attach to. + * + * Messages may be inherited and customised between individual systems. 
+ */ + class Transmitter : public ITransmitter + { + protected: + typedef std::list<IReceiver *> RecipientList; + RecipientList recipient_list; + RecipientList deleted_recipients; + int CurrentRecipientIndex = 0; + SGMutex _lock; + std::atomic<int> receiveDepth; + std::atomic<int> sentMessageCount; + + void UnlockList() + { + _lock.unlock(); + } + void LockList() + { + _lock.lock(); + } + public: + Transmitter() : receiveDepth(0), sentMessageCount(0) + { + } + virtual ~Transmitter() + { + } + /** + * Registers an object to receive messsages from this transmitter. + * This object is added to the top of the list of objects to be notified. This is deliberate as + * the sequence of registration and message receipt can influence the way messages are processing + * when ReceiptStatus of Abort or Finished are encountered. So it was a deliberate decision that the + * most recently registered recipients should process the messages/events first. + */ + virtual void Register(IReceiver& r) + { + LockList(); + recipient_list.push_back(&r); + r.OnRegisteredAtTransmitter(this); + if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &r) != deleted_recipients.end()) + deleted_recipients.remove(&r); + + UnlockList(); + } + + /* + * Removes an object from receving message from this transmitter + */ + virtual void DeRegister(IReceiver& R) + { + LockList(); + //printf("Remove %x\n", &R); + if (recipient_list.size()) + { + if (std::find(recipient_list.begin(), recipient_list.end(), &R) != recipient_list.end()) + { + recipient_list.remove(&R); + R.OnDeRegisteredAtTransmitter(this); + if (std::find(deleted_recipients.begin(), deleted_recipients.end(), &R) == deleted_recipients.end()) + deleted_recipients.push_back(&R); + } + } + UnlockList(); + } + + /* + * Notify all registered recipients. Stop when receipt status of abort or finished are received. + * The receipt status from this method will be + * - OK > message handled + * - Fail > message not handled. 
A status of Abort from a recipient will result in our status + * being fail as Abort means that the message was not and cannot be handled, and + * allows for usages such as access controls. + * NOTE: When I first designed Emesary I always intended to have message routing and the ability + * for each recipient to specify an area of interest to allow performance improvements + * however this has not yet been implemented - but the concept is still there and + * could be implemented by extending the IReceiver interface to allow for this. + */ + virtual ReceiptStatus NotifyAll(INotification& M) + { + ReceiptStatus return_status = ReceiptStatusNotProcessed; + //printf("Begin receive %d : %x\n", (int)receiveDepth, M); + //fflush(stdout); + sentMessageCount++; + try + { + LockList(); + if (receiveDepth == 0) + deleted_recipients.clear(); + receiveDepth++; + std::vector<IReceiver*> temp(recipient_list.size()); + int idx = 0; + for (RecipientList::iterator i = recipient_list.begin(); i != recipient_list.end(); i++) + { + temp[idx++] = *i; + } + UnlockList(); + int tempSize = temp.size(); + for (int index = 0; index < tempSize; index++) + { + IReceiver* R = temp[index]; + LockList(); + if (deleted_recipients.size()) + { + if (std::find(deleted_recipients.begin(), deleted_recipients.end(), R) != deleted_recipients.end()) + { + UnlockList(); + continue; + } + } + UnlockList(); + if (R) + { + ReceiptStatus rstat = R->Receive(M); + switch (rstat) + { + case ReceiptStatusFail: + return_status = ReceiptStatusFail; + break; + case ReceiptStatusPending: + return_status = ReceiptStatusPending; + break; + case ReceiptStatusPendingFinished: + return rstat; + + case ReceiptStatusNotProcessed: + break; + case ReceiptStatusOK: + if (return_status == ReceiptStatusNotProcessed) + return_status = rstat; + break; + + case ReceiptStatusAbort: + return ReceiptStatusAbort; + + case ReceiptStatusFinished: + return ReceiptStatusOK; + } + } + + } + } + catch (...) 
+ { + throw; + // return_status = ReceiptStatusAbort; + } + receiveDepth--; + //printf("End receive %d : %x\n", (int) receiveDepth, M); + return return_status; + } + virtual int Count() + { + LockList(); + return recipient_list.size(); + UnlockList(); + } + int SentMessageCount() + { + return sentMessageCount; + } + static bool Failed(ReceiptStatus receiptStatus) + { + // + // failed is either Fail or Abort. + // NotProcessed isn't a failure because it hasn't been processed. + return receiptStatus == ReceiptStatusFail + || receiptStatus == ReceiptStatusAbort; + } + }; + Transmitter GlobalTransmitter; + } +} \ No newline at end of file diff --git a/simgear/emesary/notifications.hxx b/simgear/emesary/notifications.hxx new file mode 100644 index 00000000..032d9252 --- /dev/null +++ b/simgear/emesary/notifications.hxx @@ -0,0 +1,76 @@ +/*--------------------------------------------------------------------------- +* +* Title : Emesary - class based inter-object communication +* +* File Type : Implementation File +* +* Description : Provides generic inter-object communication. For an object to receive a message it +* : must first register with a Transmitter, such as GlobalTransmitter, and implement the +* : IReceiver interface. That's it. +* : To send a message use a Transmitter with an object. That's all there is to it. +* +* References : http://www.chateau-logic.com/content/class-based-inter-object-communication +* +* Author : Richard Harrison (ri...@za...) +* +* Creation Date : 18 March 2002, rewrite 2017 +* +* Version : $Header: $ +* +* Copyright � 2002 - 2017 Richard Harrison All Rights Reserved. 
+* +*---------------------------------------------------------------------------*/ +#include <typeinfo> + +#include <string> +#include <list> +#include <set> +#include <vector> +#include <Windows.h> +#include <process.h> +#include <atomic> +#include <simgear/emesary/emesary.hxx> + +namespace simgear +{ + namespace Notifications + { + class MainLoopNotification : public simgear::Emesary::INotification + { + public: + enum Type { Started, Stopped, Begin, End }; + MainLoopNotification(Type v) : Type(v) {} + + virtual Type GetValue() { return Type; } + virtual const char *GetType() { return "MainLoop"; } + + protected: + Type Type; + }; + + class NasalGarbageCollectionConfigurationNotification : public simgear::Emesary::INotification + { + public: + NasalGarbageCollectionConfigurationNotification(bool canWait, bool active) : CanWait(canWait), Active(active) {} + + virtual bool GetCanWait() { return CanWait; } + virtual bool GetActive() { return Active; } + virtual const char *GetType() { return "NasalGarbageCollectionConfiguration"; } + virtual bool SetWait(bool wait) { + if (wait == CanWait) + return false; + CanWait = wait; + return true; + } + virtual bool SetActive(bool active) { + if (active == Active) + return false; + Active = active; + return true; + } + public: + bool CanWait; + bool Active; + }; + } +} \ No newline at end of file diff --git a/simgear/emesary/test_emesary.cxx b/simgear/emesary/test_emesary.cxx new file mode 100644 index 00000000..ad9c190f --- /dev/null +++ b/simgear/emesary/test_emesary.cxx @@ -0,0 +1,126 @@ +//////////////////////////////////////////////////////////////////////// +// Test harness for Emesary. 
+//////////////////////////////////////////////////////////////////////// + +#include <simgear_config.h> +#include <simgear/compiler.h> + +#include <iostream> + +#include <simgear/emesary/emesary.hxx> + +using std::cout; +using std::cerr; +using std::endl; + +std::atomic<int> nthread = 0; +std::atomic<int> noperations = 0; +const int MaxIterations = 9999; + +class TestThreadNotification : public simgear::Emesary::INotification +{ +protected: + const char *baseValue; +public: + TestThreadNotification(const char *v) : baseValue(v) {} + + virtual const char* GetType () { return baseValue; } +}; + +class TestThreadRecipient : public simgear::Emesary::IReceiver +{ +public: + TestThreadRecipient() : receiveCount(0) + { + + } + + std::atomic<int> receiveCount; + virtual simgear::Emesary::ReceiptStatus Receive(simgear::Emesary::INotification &n) + { + if (n.GetType() == (const char*)this) + { + TestThreadNotification *tn = dynamic_cast<TestThreadNotification *>(&n); + receiveCount++; + TestThreadNotification onwardNotification("AL"); + simgear::Emesary::GlobalTransmitter.NotifyAll(onwardNotification); + return simgear::Emesary::ReceiptStatusOK; + } + return simgear::Emesary::ReceiptStatusOK; + } +}; + +class EmesaryTestThread : public SGThread +{ +protected: + virtual void run() { + int threadId = nthread.fetch_add(1); + + //System.Threading.Interlocked.Increment(ref nthread); + //var rng = new Random(); + TestThreadRecipient r; + char temp[100]; + sprintf(temp, "Notif %d", threadId); + printf("starting thread %s\n", temp); + TestThreadNotification tn((const char*)&r); + for (int i = 0; i < MaxIterations; i++) + { + simgear::Emesary::GlobalTransmitter.Register(r); + simgear::Emesary::GlobalTransmitter.NotifyAll(tn); + simgear::Emesary::GlobalTransmitter.DeRegister(r); + //System.Threading.Thread.Sleep(rng.Next(MaxSleep)); + noperations++; + } + printf("%s invocations %d\n", temp, (int)r.receiveCount); + printf("finish thread %s\n", temp); + } +}; + +class EmesaryTest +{ 
+public: + + void Emesary_MultiThreadTransmitterTest() + { + int num_threads = 12; + std::list<EmesaryTestThread*> threads; + + for (int i = 0; i < num_threads; i++) + { + EmesaryTestThread *thread = new EmesaryTestThread(); + threads.push_back(thread); + thread->start(); + } + for (std::list<EmesaryTestThread*>::iterator i = threads.begin(); i != threads.end(); i++) + { + (*i)->join(); + } + } +}; + +void testEmesaryThreaded() +{ + TestThreadRecipient r; + TestThreadNotification tn((const char*)&r); + simgear::Emesary::GlobalTransmitter.Register(r); + for (int i = 0; i < MaxIterations*MaxIterations; i++) + { + simgear::Emesary::GlobalTransmitter.NotifyAll(tn); + //System.Threading.Thread.Sleep(rng.Next(MaxSleep)); + noperations++; + } + simgear::Emesary::GlobalTransmitter.DeRegister(r); + printf("invocations %d\n", simgear::Emesary::GlobalTransmitter.SentMessageCount()); + + EmesaryTest t; + t.Emesary_MultiThreadTransmitterTest(); +} + + +int main(int ac, char ** av) +{ + testEmesaryThreaded(); + + std::cout << "all tests passed" << std::endl; + return 0; +} |