I tried to implement LMAX in Python. I tried to handle the data in 4 processes:
```python
import disruptor
import multiprocessing
import random

if __name__ == '__main__':
    cb = disruptor.CircularBuffer(5)

    def receiveWriter():
        while(True):
            n = random.randint(5,20)
            cb.receive(n)

    def ReplicatorReader():
        while(True):
            cb.replicator()

    def journalerReader():
        while(True):
            cb.journaler()

    def unmarshallerReader():
        while(True):
            cb.unmarshaller()

    def consumeReader():
        while(True):
            print(cb.consume())

    p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
    p1.start()
    p0 = multiprocessing.Process(name="p0",target=receiveWriter)
    p0.start()
    p1 = multiprocessing.Process(name="p1",target=ReplicatorReader)
    p1.start()
    p2 = multiprocessing.Process(name="p2",target=journalerReader)
    p2.start()
    p3 = multiprocessing.Process(name="p3",target=unmarshallerReader)
    p3.start()
    p4 = multiprocessing.Process(name="p4",target=consumeReader)
    p4.start()
```
But I get this error in my code:
```
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
    exitcode = _main(fd, parent_sentinel)
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
  File "C:\Program Files\Python39\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'unmarshallerReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
AttributeError: Can't get attribute 'consumeReader' on <module '__mp_main__' from 'd:\python\RunDisruptor.py'>
```
Answer
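Your first problem is that the target of a Process must be defined at module level; it cannot be defined inside the if __name__ == '__main__': block. On Windows each child process is started with the spawn method and re-imports your script as __mp_main__, so it never executes that block, and unpickling the target fails with "Can't get attribute". But there is a second problem: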
As I mentioned in an earlier post of yours, the only way I see to share an instance of CircularBuffer across multiple processes is to implement a managed class, which, surprisingly, is not all that difficult to do. (An ordinary object passed to a child process is copied, so each process would otherwise work on its own private buffer.) But when you create a managed class and create an instance of that class, what you actually have is a proxy reference to the object. This has two implications:
- Each method call is more like a remote procedure call to a special server process created by the manager you will start up, and therefore has more overhead than a local method call.
- If you print the reference, the class's __str__ method will not be called; you will be printing a representation of the proxy pointer. You should probably rename method __str__ to something like dump and call that explicitly whenever you want a representation of the instance.
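To make the two implications concrete, here is a minimal sketch of a managed class. It does not use the disruptor module: Counter and CounterManager are hypothetical names invented for illustration, and dump is the explicit method suggested above as a replacement for __str__.

```python
from multiprocessing.managers import BaseManager


class Counter:
    """Hypothetical stand-in for disruptor.CircularBuffer (not its real API)."""

    def __init__(self, start):
        self.value = start

    def increment(self):
        self.value += 1

    def dump(self):
        # Explicit replacement for __str__; public methods are exposed
        # through the proxy, so this can be called from any process.
        return f"Counter(value={self.value})"


class CounterManager(BaseManager):
    pass


# Registering the class makes manager.Counter(...) available.
CounterManager.register('Counter', Counter)


def demo():
    with CounterManager() as manager:
        counter = manager.Counter(5)   # a proxy, not a Counter instance
        counter.increment()            # forwarded to the manager's server process
        return isinstance(counter, Counter), counter.dump()


if __name__ == '__main__':
    print(demo())
```

The first element of the result is False because the object you hold is a proxy; the real Counter lives in the manager's server process, which is where increment and dump actually run.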
You should also explicitly wait for the completion of the processes you create so that the manager service does not shut down prematurely: leaving the with block stops the manager's server process. This means that each process should be assigned to a unique variable and have a unique name.
```python
import disruptor
import multiprocessing
from multiprocessing.managers import BaseManager
import random

class CircularBufferManager(BaseManager):
    pass

def receiveWriter(cb):
    while True:
        n = random.randint(5, 20)
        cb.receive(n)

def ReplicatorReader(cb):
    while True:
        cb.replicator()

def journalerReader(cb):
    while True:
        cb.journaler()

def unmarshallerReader(cb):
    while True:
        cb.unmarshaller()

def consumeReader(cb):
    while True:
        print(cb.consume())

if __name__ == '__main__':
    # Create managed class
    CircularBufferManager.register('CircularBuffer', disruptor.CircularBuffer)
    # create and start manager:
    with CircularBufferManager() as manager:
        cb = manager.CircularBuffer(5)
        p1 = multiprocessing.Process(name="p1", target=ReplicatorReader, args=(cb,))
        p1.start()
        p0 = multiprocessing.Process(name="p0", target=receiveWriter, args=(cb,))
        p0.start()
        p1a = multiprocessing.Process(name="p1a", target=ReplicatorReader, args=(cb,))
        p1a.start()
        p2 = multiprocessing.Process(name="p2", target=journalerReader, args=(cb,))
        p2.start()
        p3 = multiprocessing.Process(name="p3", target=unmarshallerReader, args=(cb,))
        p3.start()
        p4 = multiprocessing.Process(name="p4", target=consumeReader, args=(cb,))
        p4.start()
        p1.join()
        p0.join()
        p1a.join()
        p2.join()
        p3.join()
        p4.join()
```
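Since the disruptor module is not shown in the question, the listing above cannot be run on its own. If you want to try the pattern in isolation, the following is a rough, self-contained sketch of the same structure. TinyBuffer, TinyBufferManager, writer and run are hypothetical names, TinyBuffer is only a stand-in and not the real CircularBuffer API, and the loops are finite (unlike the while True loops above) so that the joins actually return.

```python
import multiprocessing
from multiprocessing.managers import BaseManager


class TinyBuffer:
    """Hypothetical stand-in for disruptor.CircularBuffer (not its real API)."""

    def __init__(self):
        self.items = []

    def receive(self, n):
        self.items.append(n)

    def snapshot(self):
        # Sorted so the result does not depend on process scheduling.
        return sorted(self.items)


class TinyBufferManager(BaseManager):
    pass


TinyBufferManager.register('TinyBuffer', TinyBuffer)


# Targets are defined at module level so that spawned children can find them.
def writer(buf, start, count):
    for n in range(start, start + count):
        buf.receive(n)   # each call goes to the manager's server process


def run():
    with TinyBufferManager() as manager:
        buf = manager.TinyBuffer()   # proxy shared by all child processes
        procs = [
            multiprocessing.Process(name=f"w{i}", target=writer,
                                    args=(buf, i * 10, 3))
            for i in range(2)
        ]
        for p in procs:
            p.start()
        # Join inside the with block: leaving it shuts the manager down.
        for p in procs:
            p.join()
        return buf.snapshot()


if __name__ == '__main__':
    print(run())
```

Both writers append to the same list held by the manager, so the snapshot contains the values from both processes, which would not be the case if each process had received its own copy of the buffer.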