
I. Installation.
II. Threading primitives.
III. NonBlockingQueue.
IV. ThreadPool.
V. ThreadMaster.
VI. OTS Scheduler.
1. Scheduler implementation.
A. Node class.
a. Initial processing of an update event.
b. Processing of out-of-date Nodes.
B. Exception handling policy.
2. Customization of Scheduler. Interfaces IOrigin and IProxy.
3. Acceptance test for the Scheduler.
VII. Bibliography
Downloads. Index. Contents.

Node class.


The previous section defines theUpdateQueue, which holds the Nodes that are immediately ready for processing. Consider the structure in the picture (Example situation). Suppose we start from a tree in which all objects are up to date and the object (B) is the only one that receives an update event. In this case (B) should be in theUpdateQueue. However, if (A) also receives an update event then (A) should be in theUpdateQueue and (B) should not be, because (B) cannot update until (A) updates. Alternatively, if we start from the up-to-date tree and (A) receives an update event, then neither (B) nor (C) should be in theUpdateQueue until (A) updates, regardless of whether these nodes receive an update event themselves. It follows from these examples that the procedure cannot be implemented without walking through the dependency tree. In addition, for each Node we need to track the number of immediate parents that are waiting to be updated and to mark the Nodes that need updating. A Node that is marked for updating and has no immediate out-of-date parents is placed into theUpdateQueue. The implementation code below reflects these observations.

 1  class Node
 2  {
 3  public:
 4      typedef config::InstanceName::type InstanceName;
 5      typedef config::SynchCounter::type SynchCounter;
 6      typedef NonBlockingQueue<NodeImpl> WaitingQueue;
 7      friend class NodeImpl;
 8  private:
 9      boost::shared_ptr<volatile NodeImpl> theImpl;
10      explicit Node( const boost::shared_ptr<volatile NodeImpl>& impl );
11      friend std::pair<Node,Node> makeNodes(
12          volatile NodeRegistry& registry1,
13          volatile NodeRegistry& registry2,
14          const CreateEvent& event
15      );
16  public:
17      Node( const Node& node ) : theImpl(node.theImpl) {}
18      Node& operator=( const Node& node ) { theImpl=node.theImpl; return *this; }
19      inline void swap( Node& node ) { theImpl.swap(node.theImpl); }
    ...
29  };

Lines 9 and 17-19 show that a Node instance is a smart pointer to the NodeImpl described below. A Node is passed by value.

The constructor, line 10, is private because there are two copies of the tree. Hence, Nodes are created and deleted in pairs via the function makeNodes, lines 11-15.
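The handle-and-factory arrangement described above can be sketched in isolation. This is a minimal illustration and not the Scheduler's code: it uses std::shared_ptr in place of boost::shared_ptr, drops the volatile qualifiers and the registries, and the names Handle, Body and makeHandles are hypothetical stand-ins for Node, NodeImpl and makeNodes.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

class Handle;  // hypothetical stand-in for Node

// Hypothetical stand-in for NodeImpl: a non-copyable body that knows its twin.
class Body {
public:
    explicit Body(std::string name) : name_(std::move(name)) {}
    Body(const Body&) = delete;
    Body& operator=(const Body&) = delete;

    const std::string& name() const { return name_; }
    std::weak_ptr<Body> other() const { return other_; }

private:
    friend std::pair<Handle, Handle> makeHandles(const std::string& name);
    std::string name_;
    std::weak_ptr<Body> other_;  // weak, so the twins do not keep each other alive
};

// Cheap-to-copy handle. Copying a Handle shares the same Body.
class Handle {
public:
    const std::string& name() const { return impl_->name(); }
    bool sameBody(const Handle& h) const { return impl_ == h.impl_; }
    bool hasLiveTwin() const { return !impl_->other().expired(); }

private:
    friend std::pair<Handle, Handle> makeHandles(const std::string& name);
    // Private: the only way to obtain a Handle is through the paired factory.
    explicit Handle(std::shared_ptr<Body> impl) : impl_(std::move(impl)) {}
    std::shared_ptr<Body> impl_;
};

// Creates both twins at once and links them to each other.
std::pair<Handle, Handle> makeHandles(const std::string& name) {
    auto a = std::make_shared<Body>(name);
    auto b = std::make_shared<Body>(name);
    a->other_ = b;
    b->other_ = a;
    return std::make_pair(Handle(a), Handle(b));
}
```

Because the constructor is private and the factory is the only friend, client code cannot create a single unpaired handle by accident.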

32  class NodeImpl : boost::noncopyable
33  {
34  public:
35      typedef config::InstanceName::type InstanceName;
36      typedef NonBlockingQueue<NodeImpl> UpdateQueue;
37      typedef NonBlockingQueue<NodeImpl> WaitingQueue;
38      typedef config::SynchCounter::type SynchCounter;
39      typedef config::RWMutex::type Mutex;
40      typedef std::list<boost::weak_ptr<volatile NodeImpl> > Listeners;
    ...
43      typedef config::UpgradeGuard<NodeImpl,Mutex>::type UpgradeGuard;
44  private:
45      const InstanceName theName;
46      mutable Mutex theMutex;
47      WaitingQueue::Node theWaitingQueueNode;
48      UpdateQueue::Node theUpdateQueueNode;
49      Origin theOrigin;
50      Proxy theProxy;
51      SynchCounter theSynchCounter;
52      unsigned int theNumberOfSourcesOutOfDate;
53      bool theNeedToUpdate;
54      boost::weak_ptr<volatile NodeImpl> theOther;
55      Listeners theListeners;
56  private:
57      friend std::pair<Node,Node> makeNodes(
58          volatile NodeRegistry& registry1,
59          volatile NodeRegistry& registry2,
60          const CreateEvent& event
61      );
62      friend class Node;
63      explicit NodeImpl( const CreateEvent& event );
    ...
72      void onSourceNeedsUpdate() volatile;
73      void onSourceUpdated( volatile UpdateQueue& queue ) volatile;
75      void addToListeners( boost::weak_ptr<volatile NodeImpl> p ) volatile;
    ...
79      boost::weak_ptr<volatile NodeImpl> other() const volatile;
80  public:
81      ~NodeImpl();
82      inline Mutex& mutex() const { return theMutex; }
83      void placeIntoWaitingQueue( volatile WaitingQueue& queue ) volatile;
84      void placeIntoUpdateQueue( volatile UpdateQueue& queue ) volatile;
85      void update( volatile UpdateQueue& queue ) volatile;
86  };

The procedure is built to avoid heap operations as much as possible and to acquire all necessary memory when an object is created. It does not require additional memory when the flow of updates intensifies. For this reason NodeImpl has the fields theWaitingQueueNode and theUpdateQueueNode (lines 47,48). There are only two queues in which a Node may participate, and these fields reserve the memory needed for both.
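The idea of reserving queue memory inside the object can be illustrated with an intrusive queue. The sketch below is single-threaded and is not the NonBlockingQueue of this library; the types QueueLink, Task and IntrusiveQueue are hypothetical and exist only to show that enqueueing needs no allocation when the link storage already lives in the queued object.

```cpp
#include <cassert>
#include <cstddef>

struct Task;

// Link storage that lives inside the queued object itself.
struct QueueLink {
    Task* next = nullptr;
    bool queued = false;
};

// Hypothetical queued object with one embedded link per queue it may join.
struct Task {
    int id;
    QueueLink waitingLink;  // reserved for the waiting queue
    QueueLink updateLink;   // reserved for the update queue
    explicit Task(int i) : id(i) {}
};

// FIFO queue that threads objects through a chosen embedded link.
// Single-threaded for clarity; a real non-blocking queue needs atomics.
class IntrusiveQueue {
public:
    explicit IntrusiveQueue(QueueLink Task::* link) : link_(link) {}

    // Returns false if the task is already in this queue. Never allocates.
    bool push(Task& t) {
        QueueLink& l = t.*link_;
        if (l.queued) return false;
        l.queued = true;
        l.next = nullptr;
        if (tail_) (tail_->*link_).next = &t; else head_ = &t;
        tail_ = &t;
        return true;
    }

    // Returns nullptr when the queue is empty.
    Task* pop() {
        Task* t = head_;
        if (!t) return nullptr;
        QueueLink& l = t->*link_;
        head_ = l.next;
        if (!head_) tail_ = nullptr;
        l.next = nullptr;
        l.queued = false;
        return t;
    }

private:
    QueueLink Task::* link_;  // which embedded link this queue uses
    Task* head_ = nullptr;
    Task* tail_ = nullptr;
};
```

With two embedded links, the same Task can sit in both queues at once, and the memory cost is fixed at construction regardless of how many updates arrive.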

The fields theOrigin and theProxy (lines 49,50) carry the business logic. Customization of these objects enables the Scheduler to do useful tasks. These objects are described in the next section.

Update events may arrive out of order. We need to discard an old update event if the action it requires has already been executed. In addition, senders of events may require confirmation that an event was processed. Such confirmations should carry an ID so that repeated updates can be told apart. For these reasons we need the synchronization counter, line 51.
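One way to read this requirement is sketched below. It assumes that every event carries a counter that grows with each new update, which the text does not state explicitly; the types UpdateEvent, Confirmation and CounterGate are hypothetical and do not appear in the library.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical event: carries the sender's synchronization counter.
struct UpdateEvent {
    std::uint64_t counter;
};

// Hypothetical confirmation: echoes the counter so the sender can match it
// to the event it sent, and reports whether the event caused any action.
struct Confirmation {
    std::uint64_t counter;
    bool applied;
};

// Keeps the highest counter seen so far and rejects anything not newer.
class CounterGate {
public:
    Confirmation accept(const UpdateEvent& e) {
        if (e.counter <= last_) {
            // Stale or repeated event: the action was already executed.
            return Confirmation{e.counter, false};
        }
        last_ = e.counter;
        return Confirmation{e.counter, true};
    }

    std::uint64_t last() const { return last_; }

private:
    std::uint64_t last_ = 0;  // assumes real counters start above zero
};
```

Echoing the counter in the confirmation lets a sender that issued several updates in a row tell which of them was actually processed.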

The roles of theNumberOfSourcesOutOfDate and theNeedToUpdate (lines 52,53) were described at the beginning of this section.

Since Nodes are created and destroyed in pairs, the NodeImpl has to hold the weak_ptr to the other node, line 54.

The field theListeners, line 55, is the list of those Nodes that depend on the current Node. It is a list of weak_ptr, line 40.
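A list of weak_ptr lets a Node notify its dependents without owning them. The sketch below shows the usual pattern of locking each entry before use and dropping entries whose target is gone. It uses std::weak_ptr rather than boost::weak_ptr, omits the mutex and volatile qualifiers, and the types Listener and Source are hypothetical; only the name addToListeners is taken from the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <utility>

// Hypothetical dependent object that counts the notifications it receives.
struct Listener {
    int notified = 0;
};

// Hypothetical source that holds non-owning references to its listeners.
class Source {
public:
    void addToListeners(std::weak_ptr<Listener> p) {
        listeners_.push_back(std::move(p));
    }

    // Notifies every live listener and erases expired entries.
    // Returns the number of listeners that were notified.
    int notifyAll() {
        int n = 0;
        for (auto it = listeners_.begin(); it != listeners_.end();) {
            if (std::shared_ptr<Listener> l = it->lock()) {
                ++l->notified;  // listener is alive for the duration of this call
                ++n;
                ++it;
            } else {
                it = listeners_.erase(it);  // listener was destroyed
            }
        }
        return n;
    }

    std::size_t size() const { return listeners_.size(); }

private:
    std::list<std::weak_ptr<Listener>> listeners_;
};
```

Since the source never owns a listener, destroying a dependent Node does not require telling its parents first; the stale entry is simply skipped and removed on the next pass.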

Most of the implementation logic is hidden inside the functions placeIntoUpdateQueue, onSourceNeedsUpdate and onSourceUpdated (lines 84, 72 and 73). "Source" is equivalent to "Parent", and "Listener" is equivalent to "Child". If the Source is out of date then the Listener is out of date. The Parent (Source) always updates before the Child (Listener).
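The counting rule from the beginning of this section can be tried out in a small single-threaded model. This is a sketch under stated assumptions and not the Scheduler's algorithm, which is described in the following subsections: it ignores locking, it assumes that (A) is the parent of both (B) and (C), and it handles a Node that loses its ready status by skipping the stale queue entry when it is popped. SketchNode, ReadyQueue, markNeedsUpdate, onUpdateEvent, processOne and demoOrder are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

struct SketchNode {
    std::string name;
    unsigned sourcesOutOfDate = 0;  // cf. theNumberOfSourcesOutOfDate
    bool needToUpdate = false;      // cf. theNeedToUpdate
    std::vector<SketchNode*> listeners;
    explicit SketchNode(std::string n) : name(std::move(n)) {}
};

using ReadyQueue = std::deque<SketchNode*>;

// Ready means: marked for updating and no immediate parent is out of date.
bool isReady(const SketchNode& n) {
    return n.needToUpdate && n.sourcesOutOfDate == 0;
}

void onSourceNeedsUpdate(SketchNode& n);

// Marks n and, the first time only, tells each listener that a parent is stale.
void markNeedsUpdate(SketchNode& n) {
    if (n.needToUpdate) return;  // listeners were already counted
    n.needToUpdate = true;
    for (SketchNode* l : n.listeners) onSourceNeedsUpdate(*l);
}

// One more immediate parent of n is out of date, hence n is out of date too.
void onSourceNeedsUpdate(SketchNode& n) {
    ++n.sourcesOutOfDate;
    markNeedsUpdate(n);
}

// Entry point for an external update event.
void onUpdateEvent(SketchNode& n, ReadyQueue& q) {
    markNeedsUpdate(n);
    if (isReady(n)) q.push_back(&n);
}

// Pops one entry. Returns the node that was updated, or nullptr if the
// queue was empty or the entry was stale (no longer ready).
SketchNode* processOne(ReadyQueue& q) {
    if (q.empty()) return nullptr;
    SketchNode* n = q.front();
    q.pop_front();
    if (!isReady(*n)) return nullptr;
    n->needToUpdate = false;  // the business logic would run here
    for (SketchNode* l : n->listeners) {
        --l->sourcesOutOfDate;  // plays the role of onSourceUpdated
        if (isReady(*l)) q.push_back(l);
    }
    return n;
}

// Scenario from the text: (B) gets an event, then (A) does, before anything runs.
std::vector<std::string> demoOrder() {
    SketchNode a("A"), b("B"), c("C");
    a.listeners = {&b, &c};
    ReadyQueue q;
    onUpdateEvent(b, q);  // (B) is ready: it has no out-of-date parent
    onUpdateEvent(a, q);  // now (B) must wait for (A)
    std::vector<std::string> order;
    while (!q.empty())
        if (SketchNode* n = processOne(q)) order.push_back(n->name);
    return order;
}
```

In this model demoOrder() yields A, B, C: the early queue entry for (B) is skipped because (A) became out of date in the meantime, and (B) and (C) are enqueued again only after (A) has updated.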






















Copyright 2007