Multithreaded test of NonBlockingQueue.


Python has a GIL (Global Interpreter Lock). Hence, we cannot run a fully multithreaded test using the API presented in the previous section. The native Python multiprocessing libraries rely on processes and serialization for multi-CPU processing and communication between jobs, which is not what we want here. Therefore, we need to start the working threads from C++. We add some code, see below.
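Before the full listing, the core idea of starting workers on the C++ side can be sketched in isolation. This is a minimal illustration and not the OTS code: it assumes C++11 std::thread and std::atomic in place of boost::thread, and runNativeWorkers is a hypothetical name. Threads started this way never execute Python bytecode, so they do not need the GIL and can occupy several cores at once.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper (not part of OTS): start `numThreads` native threads,
// each of which increments a shared counter `iterations` times.
// Returns the final counter value after all threads have been joined.
long runNativeWorkers(int numThreads, int iterations)
{
    std::atomic<long> counter(0);
    std::vector<std::thread> workers;
    for (int t = 0; t < numThreads; ++t)
    {
        // Each worker runs entirely in native code.
        workers.emplace_back([&counter, iterations]() {
            for (int i = 0; i < iterations; ++i)
                counter.fetch_add(1, std::memory_order_relaxed);
        });
    }
    for (std::thread& w : workers)
        w.join();
    return counter.load();
}
```

The listing that follows applies the same idea with boost::thread, with the NonBlockingQueue as the shared object instead of a counter.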

      1  #include <boost/python.hpp>
      2  #include "PythonNonBlockingQueue.hpp"
      3  #include "NonBlockingQueue.hpp"
      4  #include <ots/otsConfig.hpp>
      5  #include <ots/utils/toString.hpp>
      6  #include <boost/function.hpp>
      7  #include <boost/bind.hpp>
      8  #include <ots/math/random/uniform.hpp>
      9
     10  namespace ots { namespace scheduler { namespace testNonBlockingQueue {
     11
     12      template <class Data> class WrapperTmpl : boost::noncopyable
     13      {
     14      public:
     15          explicit WrapperTmpl(const Data& d) : theData(d) {}
     16          const Data theData;
     17          std::string toString() const
     18          {
     19              std::stringstream os;
     20              os<<theData<<'\0';
     21              return os.str();
     22          }
     23      };
     24
     25      typedef std::string Data;
     26      typedef WrapperTmpl<Data> Wrapper;
     27      typedef ots::scheduler::NonBlockingQueue<Wrapper> Queue;
     28      typedef Queue::Node Node;
     29
     41      std::pair<bool,std::string> wrapperToString( volatile Wrapper* w )
     42      {
     43          const Wrapper& ww=*const_cast<Wrapper*>(w);
     44          return std::pair<bool,std::string>(true,ww.toString());
     45      }
     46      std::string queueToString( const Queue& queue )
     47      {
     48          return queue.toString(wrapperToString);
     49      }
     50      std::string nodeToString( const Node& node )
     51      {
     52          return node.toString(wrapperToString);
     53      }
     54
     55      Node* makeNode( Wrapper& w )
     56      {
     57          return new Node(w);
     58      }
     59
     60      struct StaticQueue
     61      {
     62          Queue& queue()
     63          {
     64              static Queue q;
     65              return q;
     66          }
     67      };
     68
     69      class TestThread
     70      {
     71      private:
     72          boost::shared_ptr<std::vector<Node*> > theNodes;
     73      public:
     74          TestThread( const std::string& name, int N, int times )
     75              : theNodes(new std::vector<Node*>)
     76          {
     77              for( int i=0; i<N; ++i )
     78              {
     79                  Data* d=new Data(name+boost::lexical_cast<std::string>(i));
     80                  Wrapper* w=new Wrapper(*d);
     81                  Node* n=new Node(*w);
     82                  theNodes->push_back(n);
     83              }
     84              boost::thread(boost::bind(&TestThread::run,*this,times));
     85          }
     86          TestThread( const TestThread& th ) : theNodes(th.theNodes) {}
     87          TestThread& operator=( const TestThread& th ) { theNodes=th.theNodes; return *this; }
     88          void run( int times )
     89          {
     90              std::size_t N=theNodes->size();
     91              for( int i=0; i<times; ++i )
     92              {
     93                  std::vector<Node*>::const_iterator e=theNodes->end();
     94                  for( std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j )
     95                  {
     96                      double u=ots::random::uniform();
     97                      if( u<0.5)
     98                          StaticQueue().queue().push(**j);
     99                  }
    100                  for( std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j )
    101                  {
    102                      double u=ots::random::uniform();
    103                      if( u<0.5 )
    104                          (*j)->remove();
    105                  }
    106                  double u=ots::random::uniform();
    107                  if( u<0.5 )
    108                      ots::threading::SmallRandomDelay();
    109              }
    110          }
    111      };
    112
    113      void pythonNonBlockingQueue()
    114      {
    115          using namespace boost::python;
    116          def("simpleTest",&simpleTest);
    117          class_<Wrapper,boost::noncopyable>("Wrapper",init<std::string>())
    118              .def("__str__",&Wrapper::toString)
    119          ;
    120          class_<Queue,boost::noncopyable>("Queue")
    121              .def("__str__",&queueToString)
    122              .def("pop",
    123                  &Queue::boostPythonPop,
    124                  "Returning reference to something that was previously pushed.\n"
    125                  "Relies on existence of the object\n",
    126                  return_internal_reference<>()
    127              )
    128              .def("push",&Queue::push)
    129          ;
    130          class_<StaticQueue>("StaticQueue")
    131              .def("queue",
    132                  &StaticQueue::queue,
    133                  "returns a static Queue object",
    134                  return_internal_reference<>()
    135              )
    136          ;
    137          class_<Node,boost::noncopyable>("Node")
    138              .def("__str__",&nodeToString)
    139              .def("remove",&Node::remove)
    140          ;
    141          def("makeNode",&makeNode,return_value_policy<manage_new_object>());
    142          class_<ots::threading::SmallRandomDelay,boost::noncopyable>("SmallRandomDelay")
    143          ;
    144          class_<TestThread>("TestThread",init<std::string,int,int>())
    146          ;
    147      }
    148
    149  }}} //namespace ots,scheduler

The code is based on the previous example. The additions are explained below.

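Lines 60-67. We need a static Queue to share among the working threads. The StaticQueue class is a workaround for a boost::python problem: making queue() a stand-alone function does not work well with the return_internal_reference<>() policy used in line 134. A likely reason is that this policy ties the lifetime of the returned reference to the first argument of the wrapped function, and a stand-alone queue() has no arguments.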
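The accessor in lines 60-67 relies on a function-local static: the object is constructed on the first call, and every later call returns a reference to that same object, no matter which StaticQueue instance is used. A minimal sketch of the pattern, with a std::vector<int> standing in for the Queue; the names StaticStore and pushAndCount are hypothetical:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for StaticQueue; a vector replaces the Queue.
struct StaticStore
{
    // The vector is created on the first call and lives until program exit.
    std::vector<int>& store()
    {
        static std::vector<int> s;
        return s;
    }
};

// Push through one temporary StaticStore, read the size through another.
// Both temporaries reach the same underlying vector.
std::size_t pushAndCount(int value)
{
    StaticStore().store().push_back(value);
    return StaticStore().store().size();
}
```

Since C++11 the initialization of a function-local static is guaranteed to be thread-safe. Compilers contemporary with the listing did not necessarily give that guarantee, so with such a compiler it would be prudent to call the accessor once before starting any threads.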

Lines 69-111. This is the working thread class; we plan to instantiate several. In the constructor (lines 74-85) we create std::string, Wrapper and Node objects and store pointers to the Nodes in the std::vector<Node*>.

Line 72. We use shared_ptr because we want TestThread to have pass-by-value semantics, implemented in lines 86-87 and used in line 84: boost::bind stores a copy of *this, and the copy shares the same vector of Nodes.

Line 84. We start the working thread here. See the boost::thread and boost::bind manuals for an explanation of the syntax.
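The interplay of lines 72 and 84 can be sketched with standard-library equivalents. This is an illustration under assumptions, not the OTS code: std::shared_ptr, std::bind and std::thread replace their boost counterparts, and the Worker class with its start() and sum() members is hypothetical. Binding *this by value hands the new thread its own copy of the object, while the shared_ptr makes that copy operate on the same underlying vector.

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical analogue of TestThread with integers instead of Nodes.
class Worker
{
    // Copies of a Worker share one vector through this pointer.
    std::shared_ptr<std::vector<int> > theItems;
public:
    explicit Worker(std::size_t n)
        : theItems(std::make_shared<std::vector<int> >(n, 0)) {}

    // Adds `times` to every shared item.
    void run(int times)
    {
        for (std::size_t i = 0; i < theItems->size(); ++i)
            (*theItems)[i] += times;
    }

    // std::bind stores a copy of *this; the new thread calls run() on the copy.
    std::thread start(int times)
    {
        return std::thread(std::bind(&Worker::run, *this, times));
    }

    // Sum of the shared items; call only after the thread has been joined.
    int sum() const
    {
        int s = 0;
        for (std::size_t i = 0; i < theItems->size(); ++i)
            s += (*theItems)[i];
        return s;
    }
};
```

A caller would write `Worker w(3); std::thread t = w.start(5); t.join();` and then observe the thread's updates through `w.sum()`, even though the thread worked on a copy. The sketch returns the thread so that it can be joined, because destroying a joinable std::thread terminates the program; the listing instead discards the boost::thread temporary and lets the thread run on its own.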

Lines 88-109. We push and remove Nodes repeatedly with intentionally random behavior. The function ots::random::uniform() returns a simulated random variable uniformly distributed on [0,1].
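The shape of this randomized loop can be tried without the OTS library. The sketch below makes several substitutions that should be read as assumptions: a mutex-guarded std::set (the hypothetical LockedSet) stands in for the NonBlockingQueue and is not lock-free, std::bernoulli_distribution replaces the test `ots::random::uniform() < 0.5`, and the small random delay is omitted. Each worker owns a disjoint range of ids, as each TestThread owns its own Nodes.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <random>
#include <set>
#include <thread>
#include <vector>

// Stand-in for the shared queue: a std::set guarded by a mutex.
// It only mimics the push/remove interface and is NOT lock-free.
class LockedSet
{
    std::mutex theMutex;
    std::set<int> theItems;
public:
    void push(int id)   { std::lock_guard<std::mutex> g(theMutex); theItems.insert(id); }
    void remove(int id) { std::lock_guard<std::mutex> g(theMutex); theItems.erase(id); }
    std::size_t size()  { std::lock_guard<std::mutex> g(theMutex); return theItems.size(); }
};

// One worker: owns ids [first, first+n) and, `times` times over, pushes
// each id with probability 1/2 and then removes each id with probability 1/2.
void churn(LockedSet& shared, int first, int n, int times, unsigned seed)
{
    std::mt19937 gen(seed);
    std::bernoulli_distribution coin(0.5);
    for (int i = 0; i < times; ++i)
    {
        for (int id = first; id < first + n; ++id)
            if (coin(gen)) shared.push(id);
        for (int id = first; id < first + n; ++id)
            if (coin(gen)) shared.remove(id);
    }
}

// Hypothetical driver: run several workers concurrently and
// return how many ids remain in the shared container afterwards.
std::size_t stress(int numThreads, int n, int times)
{
    LockedSet shared;
    std::vector<std::thread> workers;
    for (int t = 0; t < numThreads; ++t)
        workers.emplace_back(churn, std::ref(shared), t * n, n, times, 1000u + t);
    for (std::thread& w : workers)
        w.join();
    return shared.size();
}
```

Whatever the interleaving, the number of ids left over can never exceed numThreads * n, which gives a simple sanity check for a run such as `stress(4, 10, 100)`.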

Once we have a binary library "test.pyd" containing the above code, we may start the Python interpreter and perform the following session.

In [1]: import test

In [2]: test.TestThread("a",100,1000000)

Out[2]: <test.TestThread object at 0x01CBFAB0>

In [3]: test.TestThread("b",100,1000000)

Out[3]: <test.TestThread object at 0x0217EFC0>

In [4]: test.TestThread("c",100,1000000)

Out[4]: <test.TestThread object at 0x0217EF90>

In [5]: test.TestThread("d",100,1000000)

Out[5]: <test.TestThread object at 0x01D98540>

In [6]: test.TestThread("e",100,1000000)

Out[6]: <test.TestThread object at 0x0218D1B0>

In [7]: test.TestThread("f",100,1000000)

Out[7]: <test.TestThread object at 0x0218D240>

In [8]:

At this point there should be a visible workload on the CPU. The static Queue receives a heavy flow of requests. We may simultaneously experiment with it from the interpreter.

In [8]: q=test.StaticQueue().queue()

In [9]: print q

<Output is removed for brevity>

In [10]: s1="TestString1"

In [11]: w1=test.Wrapper(s1)

In [12]: n1=test.makeNode(w1)

In [13]: print n1

Node( TestString1 )

In [14]: q.push(n1)

In [15]: print q

At this point TestString1 should be visible among the contents of the Queue.

We may also apply the pop operation, since it is not tested in the C++ code:

In [16]: L=[q.pop() for x in range(0,100000)]

In [17]: print q

The pop() operation may or may not remove TestString1 from the Queue.























Copyright 2007