I tried to implement the LMAX Disruptor in Python, handling the data in 4 processes:
import disruptor
import multiprocessing
import random
# Script exactly as posted in the question (indentation was lost in this copy).
# NOTE(review): judging by the AttributeError in the traceback that follows,
# the worker functions below are defined inside the __main__ guard, so a
# spawned child process cannot look them up by name -- confirm against the
# original file's indentation.
if __name__ == '__main__':
# Buffer instance that the worker functions below reference by name.
cb = disruptor.CircularBuffer(5)
# Writer: pushes a random integer in 5..20 into the buffer, forever.
def receiveWriter():
while(True):
n = random.randint(5,20)
cb.receive(n)
# Reader loops: each one calls a single buffer method forever.
def ReplicatorReader():
while(True):
cb.replicator()
def journalerReader():
while(True):
cb.journaler()
def unmarshallerReader():
while(True):
cb.unmarshaller()
# Prints whatever cb.consume() returns, forever.
def consumeReader():
while(True):
print(cb.consume())
# One process per worker function; none of them is joined.
p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p0 = multiprocessing.Process(name="p0",target=receiveWriter)
p0.start()
# NOTE(review): a second process with the same name and target is created
# here and rebinds p1, dropping the reference to the first one.
p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
p1.start()
p2 = multiprocessing.Process(name="p2",target=journalerReader)
p2.start()
p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
p3.start()
p4 = multiprocessing.Process(name="p4",target=consumeReader)
p4.start()
But I get this error when I run my code:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
exitcode = _main(fd, parent_sentinel)
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
Advertisement
Answer
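Your first problem is that the target of a `Process` call cannot be defined within the `if __name__ == '__main__':` block. With the spawn start method (the default on Windows), each child process re-imports your script as the module `__mp_main__`, so the code under the guard never runs there and the target function cannot be found by name. But: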
As I mentioned in an earlier post of yours, the only way I see that you can share an instance of `CircularBuffer` across multiple processes is to implement a managed class, which surprisingly is not all that difficult to do. But when you create a managed class and create an instance of that class, what you have is actually a proxy reference to the object. This has two implications:
- Each method call is more like a remote procedure call to a special server process created by the manager you will start up and therefore has more overhead than a local method call.
- If you print the reference, the class's `__str__` method will not be called; you will be printing a representation of the proxy pointer. You should probably rename the method `__str__` to something like `dump` and call that explicitly whenever you want a representation of the instance.
You should also explicitly wait for the completion of the processes you are creating so that the manager service does not shut down prematurely, which means that each process should be assigned to a unique variable and have a unique name.
import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random
class CircularBufferManager(BaseManager):
    """Manager subclass on which the shared ``CircularBuffer`` type is registered.

    It adds nothing to ``BaseManager``; having a dedicated subclass keeps the
    ``register()`` call from modifying ``BaseManager`` itself.
    """
def receiveWriter(cb):
    """Feed random integers into the buffer forever.

    Args:
        cb: Buffer object (or a manager proxy for one) exposing a
            ``receive(n)`` method.

    This function never returns normally; it stops only if ``cb.receive``
    raises or the process is terminated.
    """
    while True:
        # Each item is a random integer in the inclusive range 5..20.
        cb.receive(random.randint(5, 20))
def ReplicatorReader(cb):
    """Call ``cb.replicator()`` in an endless loop.

    Args:
        cb: Buffer object (or a manager proxy for one) exposing a
            ``replicator()`` method.

    Never returns normally; stops only if ``cb.replicator`` raises or the
    process is terminated.
    """
    while True:
        cb.replicator()
def journalerReader(cb):
    """Call ``cb.journaler()`` in an endless loop.

    Args:
        cb: Buffer object (or a manager proxy for one) exposing a
            ``journaler()`` method.

    Never returns normally; stops only if ``cb.journaler`` raises or the
    process is terminated.
    """
    while True:
        cb.journaler()
def unmarshallerReader(cb):
    """Call ``cb.unmarshaller()`` in an endless loop.

    Args:
        cb: Buffer object (or a manager proxy for one) exposing an
            ``unmarshaller()`` method.

    Never returns normally; stops only if ``cb.unmarshaller`` raises or the
    process is terminated.
    """
    while True:
        cb.unmarshaller()
def consumeReader(cb):
    """Print every value returned by ``cb.consume()``, forever.

    Args:
        cb: Buffer object (or a manager proxy for one) exposing a
            ``consume()`` method.

    Never returns normally; stops only if ``cb.consume`` raises or the
    process is terminated.
    """
    while True:
        print(cb.consume())
if __name__ == '__main__':
    # Register CircularBuffer as a managed class so that instances live in the
    # manager's server process and are reached through proxies.
    CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)

    # Create and start the manager; it is shut down when the ``with`` exits.
    with CircularBufferManager() as manager:
        # ``cb`` is a proxy for the buffer, not the buffer itself.
        cb = manager.CircularBuffer(5)

        # Every process gets its own variable and its own name.
        p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
        p0 = multiprocessing.Process(name="p0", target=receiveWriter, args=(cb,))
        # Second replicator worker, mirroring the duplicate start in the
        # question's script.
        p1a = multiprocessing.Process(name="p1a", target=ReplicatorReader, args=(cb,))
        p2 = multiprocessing.Process(name="p2", target=journalerReader, args=(cb,))
        p3 = multiprocessing.Process(name="p3", target=unmarshallerReader, args=(cb,))
        p4 = multiprocessing.Process(name="p4", target=consumeReader, args=(cb,))

        workers = (p1, p0, p1a, p2, p3, p4)
        for worker in workers:
            worker.start()

        # Join inside the ``with`` block: leaving it shuts the manager down,
        # which would invalidate the proxy the workers are still using.
        for worker in workers:
            worker.join()