Multithreaded test of NonBlockingQueue.

ython has GIL. Hence, we cannot do a fully multithreaded test using the API presented in the previous section. The native Python multiprocessor libraries rely on processes and serialization to do multi-CPU processing and communication between jobs. This is not what we want to do here. Therefore, we need to start working threads from C++. We add some code, see below.

1\#include <boost/python.hpp>

2\#include "PythonNonBlockingQueue.hpp"

3\#include "NonBlockingQueue.hpp"

4\#include <ots/otsConfig.hpp>

5\#include <ots/utils/toString.hpp>

6\#include <boost/function.hpp>

7\#include <boost/bind.hpp>

8\#include <ots/math/random/uniform.hpp>


10\namespace ots { namespace scheduler { namespace testNonBlockingQueue {


12\ template <class Data> class WrapperTmpl : boost::noncopyable

13\ {

14\ public:

15\ explicit WrapperTmpl(const Data& d) : theData(d) {}

16\ const Data theData;

17\ std::string toString() const

18\ {

19\ std::stringstream os;

20\ os<<theData<<'\0';

21\ return os.str();

22\ }

23\ };


25\ typedef std::string Data;

26\ typedef WrapperTmpl<Data> Wrapper;

27\ typedef ots::scheduler::NonBlockingQueue<Wrapper> Queue;

28\ typedef Queue::Node Node;


41\ std::pair<bool,std::string> wrapperToString( volatile Wrapper* w )

42\ {

43\ const Wrapper& ww=*const_cast<Wrapper*>(w);

44\ return std::pair<bool,std::string>(true,ww.toString());

45\ }

46\ std::string queueToString( const Queue& queue )

47\ {

48\ return queue.toString(wrapperToString);

49\ }

50\ std::string nodeToString( const Node& node )

51\ {

52\ return node.toString(wrapperToString);

53\ }


55\ Node* makeNode( Wrapper& w )

56\ {

57\ return new Node(w);

58\ }


60\ struct StaticQueue

61\ {

62\ Queue& queue()

63\ {

64\ static Queue q;

65\ return q;

66\ }

67\ };


69\ class TestThread

70\ {

71\ private:

72\ boost::shared_ptr<std::vector<Node*> > theNodes;

73\ public:

74\ TestThread( const std::string& name, int N, int times )

75\ : theNodes(new std::vector<Node*>)

76\ {

77\ for( int i=0; i<N; ++i )

78\ {

79\ Data* d=new Data(name+boost::lexical_cast<std::string>(i));

80\ Wrapper* w=new Wrapper(*d);

81\ Node* n=new Node(*w);

82\ theNodes->push_back(n);

83\ }

84\ boost::thread(boost::bind(&TestThread::run,*this,times));

85\ }

86\ TestThread( const TestThread& th ) : theNodes(th.theNodes) {}

87\ TestThread& operator=( const TestThread& th ) { theNodes=th.theNodes; return *this; }

88\ void run( int times )

89\ {

90\ std::size_t N=theNodes->size();

91\ for( int i=0; i<times; ++i )

92\ {

93\ std::vector<Node*>::const_iterator e=theNodes->end();

94\ for( std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j )

95\ {

96\ double u=ots::random::uniform();

97\ if( u<0.5)

98\ StaticQueue().queue().push(**j);

99\ }

100\ for( std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j )

101\ {

102\ double u=ots::random::uniform();

103\ if( u<0.5 )

104\ (*j)->remove();

105\ }

106\ double u=ots::random::uniform();

107\ if( u<0.5 )

108\ ots::threading::SmallRandomDelay();

109\ }

110\ }

111\ };


113\ void pythonNonBlockingQueue()

114\ {

115\ using namespace boost::python;

116\ def("simpleTest",&simpleTest);

117\ class_<Wrapper,boost::noncopyable>("Wrapper",init<std::string>())

118\ .def("__str__",&Wrapper::toString)

119\ ;

120\ class_<Queue,boost::noncopyable>("Queue")

121\ .def("__str__",&queueToString)

122\ .def("pop",

123\ &Queue::boostPythonPop,

124\ "Returning reference to something that was previously pushed.\n"

125\ "Relies on existence of the object\n",

126\ return_internal_reference<>()

127\ )

128\ .def("push",&Queue::push)

129\ ;

130\ class_<StaticQueue>("StaticQueue")

131\ .def("queue",

132\ &StaticQueue::queue,

133\ "returns a static Queue object",

134\ return_internal_reference<>()

135\ )

136\ ;

137\ class_<Node,boost::noncopyable>("Node")

138\ .def("__str__",&nodeToString)

139\ .def("remove",&Node::remove)

140\ ;

141\ def("makeNode",&makeNode,return_value_policy<manage_new_object>());

142\ class_<ots::threading::SmallRandomDelay,boost::noncopyable>("SmallRandomDelay")

143\ ;

144\ class_<TestThread>("TestThread",init<std::string,int,int>())

146\ ;

147\ }


149\}}} //namespace ots,scheduler

The code is based on the previous example. The additions are explained below.

Lines 60-67. We need a static Queue to share among working threads. The introduction of StaticQueue class is a workaround a boost::python problem. For some reason, making queue() a stand-alone function does not go well with return_internal_reference<>() template, line 134.

Lines 69-111. This is the working thread. We plan to instantiate several. In the constructor 74-85 we create std::string,Wrapper,Node objects and store them in the std::vector<Node*>.

Line 72. We use shared_ptr because we want TestThread to have pass-by-value semantics implemented in lines 86,87 and used in the line 84.

Line 84. We start the working thread here. See boost::thread and boost::bind manuals for explanation of syntax.

Lines 88-109. We push and pop Nodes repeatedly with some intentionally random behavior. The ots::random::uniform() returns a simulated uniform [0,1] random variable.

Once we have a binary library "test.pyd" containing the above code we may start Python interpreter and perform the following session.

In [1]: import test

In [2]: test.TestThread("a",100,1000000)

Out[2]: <test.TestThread object at 0x01CBFAB0>

In [3]: test.TestThread("b",100,1000000)

Out[3]: <test.TestThread object at 0x0217EFC0>

In [4]: test.TestThread("c",100,1000000)

Out[4]: <test.TestThread object at 0x0217EF90>

In [5]: test.TestThread("d",100,1000000)

Out[5]: <test.TestThread object at 0x01D98540>

In [6]: test.TestThread("e",100,1000000)

Out[6]: <test.TestThread object at 0x0218D1B0>

In [7]: test.TestThread("f",100,1000000)

Out[7]: <test.TestThread object at 0x0218D240>

In [8]:

At this point there should be a visible workload on the CPU. The static Queue receives heavy flow of requests. We may simultaneously experiment with it.

In [8]: q=test.StaticQueue().queue()

In [9]: print q

<Output is removed for brevity>

In [10]: s1="TestString1"

In [11]: w1=test.Wrapper(s1)

In [12]: n1=test.makeNode(w1)

In [13]: print n1

Node( TestString1 )

In [14]: q.push(n1)

In [15]: print q

At this point the TestString1 should be visible among contents of the Queue.

We may also apply pop operation, since it is not tested in the C++ code:

In [16]: L=[q.pop() for x in range(0,100000)]

In [17]: print q

The pop() operation may or may not remove TestString1 from the Queue.

