ython has GIL. Hence, we cannot do a fully multithreaded test using the API
presented in the previous section. The native Python multiprocessor libraries
rely on processes and serialization to do multi-CPU processing and
communication between jobs. This is not what we want to do here. Therefore, we
need to start working threads from C++. We add some code, see below.
1\#include <boost/python.hpp>
2\#include "PythonNonBlockingQueue.hpp"
3\#include "NonBlockingQueue.hpp"
4\#include <ots/otsConfig.hpp>
5\#include <ots/utils/toString.hpp>
6\#include <boost/function.hpp>
7\#include <boost/bind.hpp>
8\#include <ots/math/random/uniform.hpp>
9\
10\namespace ots { namespace scheduler { namespace
testNonBlockingQueue {
11\
12\
template
<class Data> class WrapperTmpl : boost::noncopyable
13\
{
14\
public:
15\
explicit
WrapperTmpl(const Data& d) : theData(d) {}
16\
const
Data theData;
17\
std::string
toString() const
18\
{
19\
std::stringstream
os;
20\
os<<theData<<'\0';
21\
return
os.str();
22\
}
23\
};
24\
25\
typedef
std::string Data;
26\
typedef
WrapperTmpl<Data> Wrapper;
27\
typedef
ots::scheduler::NonBlockingQueue<Wrapper> Queue;
28\
typedef
Queue::Node Node;
29\
41\
std::pair<bool,std::string>
wrapperToString( volatile Wrapper* w )
42\
{
43\
const
Wrapper& ww=*const_cast<Wrapper*>(w);
44\
return
std::pair<bool,std::string>(true,ww.toString());
45\
}
46\
std::string
queueToString( const Queue& queue )
47\
{
48\
return
queue.toString(wrapperToString);
49\
}
50\
std::string
nodeToString( const Node& node )
51\
{
52\
return
node.toString(wrapperToString);
53\
}
54\
55\
Node*
makeNode( Wrapper& w )
56\
{
57\
return
new Node(w);
58\
}
59\
60\
struct
StaticQueue
61\
{
62\
Queue&
queue()
63\
{
64\
static
Queue q;
65\
return
q;
66\
}
67\
};
68\
69\
class
TestThread
70\
{
71\
private:
72\
boost::shared_ptr<std::vector<Node*>
> theNodes;
73\
public:
74\
TestThread(
const std::string& name, int N, int times )
75\
:
theNodes(new std::vector<Node*>)
76\
{
77\
for(
int i=0; i<N; ++i )
78\
{
79\
Data*
d=new Data(name+boost::lexical_cast<std::string>(i));
80\
Wrapper*
w=new Wrapper(*d);
81\
Node*
n=new Node(*w);
82\
theNodes->push_back(n);
83\
}
84\
boost::thread(boost::bind(&TestThread::run,*this,times));
85\
}
86\
TestThread(
const TestThread& th ) : theNodes(th.theNodes) {}
87\
TestThread&
operator=( const TestThread& th ) { theNodes=th.theNodes; return *this;
}
88\
void
run( int times )
89\
{
90\
std::size_t
N=theNodes->size();
91\
for(
int i=0; i<times; ++i )
92\
{
93\
std::vector<Node*>::const_iterator
e=theNodes->end();
94\
for(
std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j
)
95\
{
96\
double
u=ots::random::uniform();
97\
if(
u<0.5)
98\
StaticQueue().queue().push(**j);
99\
}
100\
for(
std::vector<Node*>::const_iterator j=theNodes->begin(); j!=e; ++j
)
101\
{
102\
double
u=ots::random::uniform();
103\
if(
u<0.5 )
104\
(*j)->remove();
105\
}
106\
double
u=ots::random::uniform();
107\
if(
u<0.5 )
108\
ots::threading::SmallRandomDelay();
109\
}
110\
}
111\
};
112\
113\
void
pythonNonBlockingQueue()
114\
{
115\
using
namespace boost::python;
116\
def("simpleTest",&simpleTest);
117\
class_<Wrapper,boost::noncopyable>("Wrapper",init<std::string>())
118\
.def("__str__",&Wrapper::toString)
119\
;
120\
class_<Queue,boost::noncopyable>("Queue")
121\
.def("__str__",&queueToString)
122\
.def("pop",
123\
&Queue::boostPythonPop,
124\
"Returning
reference to something that was previously pushed.\n"
125\
"Relies
on existence of the object\n",
126\
return_internal_reference<>()
127\
)
128\
.def("push",&Queue::push)
129\
;
130\
class_<StaticQueue>("StaticQueue")
131\
.def("queue",
132\
&StaticQueue::queue,
133\
"returns
a static Queue object",
134\
return_internal_reference<>()
135\
)
136\
;
137\
class_<Node,boost::noncopyable>("Node")
138\
.def("__str__",&nodeToString)
139\
.def("remove",&Node::remove)
140\
;
141\
def("makeNode",&makeNode,return_value_policy<manage_new_object>());
142\
class_<ots::threading::SmallRandomDelay,boost::noncopyable>("SmallRandomDelay")
143\
;
144\
class_<TestThread>("TestThread",init<std::string,int,int>())
146\
;
147\
}
148\
149\}}} //namespace ots,scheduler
The code is based on the previous example. The additions are explained below.
Lines 60-67. We need a static Queue to share among working threads. The
introduction of StaticQueue class is a workaround a boost::python problem. For
some reason, making queue() a stand-alone function does not go well with
return_internal_reference<>() template, line 134.
Lines 69-111. This is the working thread. We plan to instantiate several. In
the constructor 74-85 we create std::string,Wrapper,Node objects and store
them in the std::vector<Node*>.
Line 72. We use shared_ptr because we want TestThread to have pass-by-value
semantics implemented in lines 86,87 and used in the line 84.
Line 84. We start the working thread here. See boost::thread and boost::bind
manuals for explanation of syntax.
Lines 88-109. We push and pop Nodes repeatedly with some intentionally random
behavior. The ots::random::uniform() returns a simulated uniform [0,1] random
variable.
Once we have a binary library "test.pyd" containing the above code we may
start Python interpreter and perform the following session.
In [1]: import test
In [2]: test.TestThread("a",100,1000000)
Out[2]: <test.TestThread object at
0x01CBFAB0>
In [3]: test.TestThread("b",100,1000000)
Out[3]: <test.TestThread object at
0x0217EFC0>
In [4]: test.TestThread("c",100,1000000)
Out[4]: <test.TestThread object at
0x0217EF90>
In [5]: test.TestThread("d",100,1000000)
Out[5]: <test.TestThread object at
0x01D98540>
In [6]: test.TestThread("e",100,1000000)
Out[6]: <test.TestThread object at
0x0218D1B0>
In [7]: test.TestThread("f",100,1000000)
Out[7]: <test.TestThread object at
0x0218D240>
In [8]:
At this point there should be a visible workload on the CPU. The static Queue
receives heavy flow of requests. We may simultaneously experiment with it.
In [8]: q=test.StaticQueue().queue()
In [9]: print q
<Output is removed for brevity>
In [10]: s1="TestString1"
In [11]: w1=test.Wrapper(s1)
In [12]: n1=test.makeNode(w1)
In [13]: print n1
Node( TestString1 )
In [14]: q.push(n1)
In [15]: print q
At this point the TestString1 should be visible among contents of the Queue.
We may also apply pop operation, since it is not tested in the C++ code:
In [16]: L=[q.pop() for x in range(0,100000)]
In [17]: print q
The pop() operation may or may not remove TestString1 from the Queue.
|