Consider this MWE:
```python
from multiprocessing import RLock

class TheSetup:
    def __init__(self):
        self._hardware_lock = RLock()

    def hold_hardware(self):
        return self._hardware_lock

    def do_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing thing...')

    def do_other_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing other thing...')

if __name__=='__main__':
    from multiprocessing.managers import BaseManager

    class TheSetupManager(BaseManager):
        pass

    the_setup = TheSetup()
    TheSetupManager.register('get_the_setup', callable=lambda:the_setup)
    m = TheSetupManager(address=('', 50000), authkey=b'abracadabra')
    s = m.get_server()
    print('Serving...')
    s.serve_forever()
```
When this script is executed, it instantiates `the_setup` and serves it. Then I want clients to be able to do things like this from other scripts:
```python
from multiprocessing.managers import BaseManager

class TheSetup(BaseManager):
    pass

TheSetup.register('get_the_setup')
m = TheSetup(address=('', 50000), authkey=b'abracadabra')
m.connect()
the_setup = m.get_the_setup()

# Removing this `with` it works fine, but I cannot guarantee to the client that nobody else will use this hardware in the meantime.
with the_setup.hold_hardware():
    the_setup.do_thing_with_hardware()
    the_setup.do_other_thing_with_hardware()
```
However, I get `RuntimeError: RLock objects should only be shared between processes through inheritance`. If the `with the_setup.hold_hardware():` line is removed, it works fine, but then I cannot guarantee that the hardware wasn't used by someone else in the middle.
Is it possible to do what I want? I.e. having `the_setup` running 24/7 and allowing interaction with it at any time from other Python instances. How?
Answer
Update: I have included a patch for `multiprocessing.managers` that lets RLocks work seamlessly inside managers. Scroll down to the section "Patching multiprocessing to work with RLocks".
Multiprocessing does not like it when you pass objects used for synchronization inside other such objects. That means you cannot put semaphores, locks, queues or pipes inside other queues and pipes (those offered by the `multiprocessing` library). When you create a manager using `BaseManager`, it uses connections from `multiprocessing.connection` (pipes or sockets) internally to communicate between the other processes and the manager process, and every return value is pickled before it is sent. So when you call `the_setup.hold_hardware()`, you are essentially attempting to pickle a `multiprocessing.RLock` and pass it through such a connection, which, as discussed, does not work: the lock refuses to be pickled unless a child process is being spawned, hence the `RuntimeError`.
Why workarounds don’t work
The simplest fix one might think of here is to create a manager and use `manager.RLock()`. This uses `threading.RLock` instead of the one available through `multiprocessing` (so the client only ever handles a proxy, which can be shared using queues/pipes), but still works in a multiprocessing environment because access to it is synchronized through the manager's connections.
Hence, this code should at least execute:
server.py
```python
from multiprocessing.managers import SyncManager
# ...
# ...
# ...
    def __init__(self):
        manager = SyncManager(authkey=b'same')
        manager.start()
        self._hardware_lock = manager.RLock()
```
client.py
```python
import multiprocessing
import os
from multiprocessing.managers import BaseManager

class TheSetup(BaseManager):
    pass

TheSetup.register('get_the_setup')
m = TheSetup(address=('localhost', 50000), authkey=b'abracadabra')
m.connect()

multiprocessing.current_process().authkey = b'same'  # Need to setup proper authentication
the_setup = m.get_the_setup()
print(the_setup)

# Removing this `with` it works fine, but I cannot guarantee to the client that nobody else will use this hardware in the meantime.
with the_setup.hold_hardware():
    print('in')
    the_setup.do_thing_with_hardware()
    the_setup.do_other_thing_with_hardware()
```
Note that we need to set the `SyncManager`'s authkey and the client process's authkey to the same value; otherwise there will be an authentication error when attempting to unpickle the lock.
But regardless, even though the code runs, it won't do what you think it should. It will block when trying to run `the_setup.do_thing_with_hardware()`. This is because the `RLock` actually lives inside the manager process, and what you get from `with the_setup.hold_hardware()` is only a proxy of the lock (try `print(type(the_setup.hold_hardware()))`). When you acquire the lock through the proxy, the method name is sent to the manager process and executed on the managed lock by the manager thread that serves your connection. The `TheSetup` server later acquires the same lock through its own proxy, whose connection is served by a different manager thread, so the `RLock` sees a different owner and blocks. Because ownership is tracked per manager thread rather than per client, the reentrancy of `RLock` is useless in this setup.
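The ownership rule behind this deadlock can be seen with a plain `threading.RLock`. In this standard-library sketch, the second thread stands in (as an assumption for illustration) for the separate manager thread that serves `TheSetup`'s own proxy connection: the owning thread can re-enter, but any other thread is refused.

```python
import threading


def rlock_ownership_demo():
    """Show that a threading.RLock is re-entrant only for the thread that owns it."""
    lock = threading.RLock()
    results = {}

    def other_thread():
        # A different thread (like a second manager server thread) cannot acquire it
        results["other_thread"] = lock.acquire(blocking=False)

    def owner_thread():
        results["first"] = lock.acquire()
        # The owning thread may acquire again without blocking
        results["reentrant"] = lock.acquire(blocking=False)
        t = threading.Thread(target=other_thread)
        t.start()
        t.join()
        lock.release()
        lock.release()

    t = threading.Thread(target=owner_thread)
    t.start()
    t.join()
    return results


if __name__ == "__main__":
    print(rlock_ownership_demo())  # {'first': True, 'reentrant': True, 'other_thread': False}
```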
Patching multiprocessing to work with RLocks
If you really want to use RLocks inside managers, then you will need to make your own managers and proxies, by subclassing `BaseManager` and `BaseProxy`, to relay process identifiers (like pids) that can then be used to implement an RLock which tracks its owner by process. Consider the following "patch":
```python
import time
import os, sys
from multiprocessing.managers import BaseProxy, BaseManager, Server, Token, RebuildProxy
from multiprocessing import process
from multiprocessing.context import get_spawning_popen, ProcessError
from _thread import allocate_lock
from traceback import format_exc
from multiprocessing.managers import util, dispatch, convert_to_error, listener_client, public_methods, State
import threading


class PIDProxy(BaseProxy):

    def _callmethod(self, methodname, args=(), kwds={}):
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds, str(os.getpid())))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
            )
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        raise convert_to_error(kind, result)

    def __reduce__(self):
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (MyAutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))


class PIDServer(Server):

    def serve_client(self, conn):
        global pid_registry
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds, pid = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                    )

                function = getattr(obj, methodname)

                try:
                    pid_registry.forwarded_for = pid
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                        )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                          threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)


class ForwardPIDManager(BaseManager):
    _Server = PIDServer

    def get_server(self):
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)


def MyMakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (PIDProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType


def MyAutoProxy(token, serializer, manager=None, authkey=None,
                exposed=None, incref=True, manager_owned=False):
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MyMakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    proxy._isauto = True
    return proxy


def init():
    global pid_registry
    pid_registry = threading.local()
```
Here, `PIDProxy` is identical to `BaseProxy`, except that in addition to the method name and arguments to be called on the managed object, it also sends an extra identifier with the value `str(os.getpid())`. Similarly, `ForwardPIDManager` is identical to `BaseManager`, except that it starts the server process with `PIDServer`, a modified subclass of `multiprocessing.managers.Server`, rather than with the default parent. `PIDServer` modifies its parent to accept the extra variable (the process identifier) when unpacking requests from proxies, and stores its value in the current thread's local storage (created by executing `init` inside the server process; see the `threading.local` documentation for more on thread-local storage). This happens before the requested function is executed, so every method of the managed object has access to this storage. Lastly, `MyAutoProxy` and `MyMakeProxyType` replace the default `AutoProxy` and `MakeProxyType` so that proxies are created from our subclasses instead.
All you need to do now is use `ForwardPIDManager` instead of `BaseManager`, and pass `MyAutoProxy` explicitly as the `proxytype` argument when registering. Every managed object will then have access to the pid of the process that called the function by doing

```python
global pid_registry
pid = pid_registry.forwarded_for
```

from within its methods.
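The core idea of `PIDServer` can be shown without starting a manager. The sketch below is a simplified, hypothetical stand-in for `serve_client` handling a single request: `Hardware`, `handle_request` and the `id_to_obj` dictionary are illustrative names, not part of the patch or of the `multiprocessing` API. It unpacks the five-element request, stores the pid in thread-local storage and only then calls the method, which reads it back.

```python
import os
import threading

# Stands in for the pid_registry created by init() in the server process
pid_registry = threading.local()


class Hardware:
    """Hypothetical managed object whose method reads the forwarded pid."""

    def who_called(self):
        return pid_registry.forwarded_for


def handle_request(request, id_to_obj):
    """Simplified version of what PIDServer.serve_client does with one request."""
    # PIDProxy sends the usual (ident, methodname, args, kwds) plus the caller's pid
    ident, methodname, args, kwds, pid = request
    obj = id_to_obj[ident]
    # Store the pid in this thread's local storage before calling the method
    pid_registry.forwarded_for = pid
    return getattr(obj, methodname)(*args, **kwds)


if __name__ == "__main__":
    objects = {"hw-1": Hardware()}
    request = ("hw-1", "who_called", (), {}, str(os.getpid()))
    print(handle_request(request, objects) == str(os.getpid()))  # True
```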
Creating RLock
Using this, we can create our own implementation of `RLock` that closely mirrors `threading.RLock`, but uses `pid_registry.forwarded_for` instead of `threading.get_ident` to identify the owner:
```python
class ManagerRLock:
    def __init__(self):
        self._block = allocate_lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        global pid_registry
        process = pid_registry.forwarded_for

        if self._owner == process:
            self._count += 1
            return True

        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = process
            self._count = 1
        return rc

    def release(self):
        global pid_registry
        process = pid_registry.forwarded_for

        if self._owner != process:
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()

    def __repr__(self):
        owner = self._owner
        return "<%s %s.%s object owner=%r count=%d at %s>" % (
            "locked" if self._block.locked() else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            owner,
            self._count,
            hex(id(self))
        )

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self.release()
```
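To see the ownership rule of `ManagerRLock` in isolation, here is a hypothetical variant, `KeyedRLock`, that takes the owner key as an explicit argument instead of reading `pid_registry.forwarded_for`. It is a sketch for testing the logic without a manager, not part of the patch; the pid strings in the usage example are made up.

```python
import threading


class KeyedRLock:
    """Hypothetical variant of ManagerRLock that takes the owner key as an
    argument instead of reading pid_registry.forwarded_for."""

    def __init__(self):
        self._block = threading.Lock()  # same primitive as _thread.allocate_lock()
        self._owner = None
        self._count = 0

    def acquire(self, owner, blocking=True, timeout=-1):
        if self._owner == owner:
            # Same owner (e.g. same client pid): re-enter without blocking
            self._count += 1
            return True
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = owner
            self._count = 1
        return rc

    def release(self, owner):
        if self._owner != owner:
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if not self._count:
            self._owner = None
            self._block.release()


if __name__ == "__main__":
    lock = KeyedRLock()
    print(lock.acquire("1001"))                  # True: first acquire
    print(lock.acquire("1001"))                  # True: re-entrant, same pid
    print(lock.acquire("2002", blocking=False))  # False: held by another pid
    lock.release("1001")
    lock.release("1001")
    print(lock.acquire("2002", blocking=False))  # True: now free
```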
Keep in mind that `ManagerRLock` objects are not picklable, and therefore should not be passed by value from the manager to proxies. You can, however, safely expose their methods (`acquire`, `release`) to proxies (see the example in the section below). Also, they are meant to be created directly in the server process, so do not use another manager to create them.
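The reason they are unpicklable is the `_thread.lock` they hold. This standard-library sketch uses a hypothetical `HoldsLock` class that contains the same primitive `ManagerRLock` uses, and shows that `pickle` rejects it with a `TypeError`, while a plain pid string pickles fine.

```python
import pickle
from _thread import allocate_lock


class HoldsLock:
    """Hypothetical class holding the same primitive ManagerRLock uses."""

    def __init__(self):
        self._block = allocate_lock()


def is_picklable(obj):
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    print(is_picklable(HoldsLock()))  # False
    print(is_picklable("12345"))      # True
```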
Example implementation
Using the patch and `ManagerRLock`, our implementation of the `TheSetup` class becomes:
```python
class TheSetup:
    def __init__(self):
        self._hardware_lock = None

    def initialize_lock(self):
        self._hardware_lock = ManagerRLock()

    def hold_hardware(self):
        self._hardware_lock.acquire()

    def release_hardware(self):
        self._hardware_lock.release()

    def do_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing thing...')
            # Mimic work being done, helpful for testing locks
            time.sleep(7)
            print('done')

    def do_other_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing other thing...')

    def return_(self):
        return self


if __name__ == '__main__':
    # Execute init here, since the main module is the server process
    init()

    # You can initialize the lock in the constructor itself; it is only done here
    # for consistency between the example implementations
    the_setup = TheSetup()
    the_setup.initialize_lock()

    ForwardPIDManager.register('get_the_setup', lambda: the_setup, MyAutoProxy)
    m = ForwardPIDManager(address=('localhost', 50000), authkey=b'abracadabra')
    s = m.get_server()
    print('Serving...')
    s.serve_forever()
```
One important thing to notice here is that you are not exposing the whole lock, only its methods (look at `hold_hardware` and `release_hardware`). This is important because `ManagerRLock`, just like `multiprocessing.RLock`, is unpicklable. One side effect is that you can't use the lock directly as a context manager from the client, and you would have to do

```python
the_setup.hold_hardware()
# ...
the_setup.release_hardware()
```

inside client.py instead of using a `with` block. However, if this bothers you, you can create a wrapper for the lock within the client and use that instead (an example is given in the next section).
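Whichever form you use, the release should happen even if a hardware call raises, or the server-side lock stays held by your pid. A minimal sketch using `contextlib.contextmanager` with `try`/`finally`; `holding_hardware` and `FakeSetup` are hypothetical names, and `FakeSetup` only stands in for the proxy returned by `m.get_the_setup()` so the example runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def holding_hardware(setup):
    """Call hold_hardware() on entry and always call release_hardware() on exit."""
    setup.hold_hardware()
    try:
        yield setup
    finally:
        # Runs even if the body raises, so the server-side lock is not left held
        setup.release_hardware()


class FakeSetup:
    """Hypothetical stand-in for the proxy returned by m.get_the_setup()."""

    def __init__(self):
        self.calls = []

    def hold_hardware(self):
        self.calls.append("hold")

    def release_hardware(self):
        self.calls.append("release")

    def do_thing_with_hardware(self):
        self.calls.append("thing")
        raise RuntimeError("simulated hardware fault")


if __name__ == "__main__":
    setup = FakeSetup()
    try:
        with holding_hardware(setup):
            setup.do_thing_with_hardware()
    except RuntimeError:
        pass
    print(setup.calls)  # ['hold', 'thing', 'release']
```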
Lastly, notice that we call `init` from the main module itself. This is because `init` must be executed in the server process, and since we retrieve the server and call `serve_forever` in the main process, we must execute `init` there too. If you instead use the `.start()` method of the manager to run the server in another process, here is the equivalent code:
```python
if __name__ == '__main__':
    the_setup = TheSetup()

    # We cannot use a lambda, since it is not picklable
    ForwardPIDManager.register('get_the_setup', callable=the_setup.return_, proxytype=MyAutoProxy)
    m = ForwardPIDManager(address=('localhost', 50000), authkey=b'abracadabra')

    # Start the server in another process, and tell it to run init as soon as it starts
    m.start(initializer=init)

    # Initialize the lock after transferring the instance data to the manager process
    # (ManagerRLock is not picklable)
    proxy_setup = m.get_the_setup()
    proxy_setup.initialize_lock()

    # Wait for the server process to quit; only for demonstration, do not do this in real code!
    m._process.join()
```
Final solution
Combine the whole patch provided above (classes `PIDProxy`, `PIDServer`, `ForwardPIDManager`, and functions `init`, `MyMakeProxyType`, `MyAutoProxy`), the `ManagerRLock` class, and the code related to your implementation (class `TheSetup` along with the `if __name__...` block) into a single server.py file. Then, an example client.py that uses the RLock can look like this:
client.py
```python
from server import ForwardPIDManager, MyAutoProxy


class TheSetup(ForwardPIDManager):
    pass


class LockWrapper:
    def __init__(self, proxy):
        self.proxy = proxy

    def __enter__(self):
        return self.proxy.hold_hardware()

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self.proxy.release_hardware()


TheSetup.register('get_the_setup', proxytype=MyAutoProxy)
m = TheSetup(address=('localhost', 50000), authkey=b'abracadabra')
m.connect()
the_setup = m.get_the_setup()
the_setup_lock = LockWrapper(the_setup)

with the_setup_lock:
    the_setup.do_thing_with_hardware()
    the_setup.do_other_thing_with_hardware()
```
Notice the use of `LockWrapper` as a way to use the lock with a context manager.
A word about threads…
The solution relies on `threading.local`, so accessing `pid_registry.forwarded_for` from any other thread fails with an `AttributeError`: the value is set only in the manager thread that is handling the request. Hence, if your shared class starts threads of its own, make sure you explicitly pass the pid to each thread when starting it.
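A small standard-library sketch of this pitfall and its fix. `method_that_spawns_threads` is a hypothetical function playing the role of a managed method running in a server thread, and `"12345"` is a made-up pid: a worker thread cannot see the thread-local value, while a worker that receives the pid as an argument can.

```python
import threading

# Stands in for pid_registry; the value "12345" below is a hypothetical pid
pid_registry = threading.local()


def method_that_spawns_threads():
    """Plays the role of a managed method running in a manager server thread."""
    pid_registry.forwarded_for = "12345"
    seen = {}

    def worker_implicit():
        # This thread never set forwarded_for, so the attribute does not exist here
        seen["implicit"] = getattr(pid_registry, "forwarded_for", None)

    def worker_explicit(pid):
        # Passing the pid as an argument works in any thread
        seen["explicit"] = pid

    threads = [
        threading.Thread(target=worker_implicit),
        threading.Thread(target=worker_explicit, args=(pid_registry.forwarded_for,)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


if __name__ == "__main__":
    seen = method_that_spawns_threads()
    print(seen["implicit"], seen["explicit"])  # None 12345
```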
Additionally, `ManagerRLock` assumes that every process accessing it is single-threaded (there can be many such processes). If you run multiple processes that are each also multi-threaded, using `ManagerRLock` might be unsafe (untested): two threads of the same client process share a pid, so the lock treats them as the same owner. In that case, you can extend the patch by sending not only the pid but also the thread identifier (from inside `PIDProxy`) and storing it in `pid_registry` (from inside `PIDServer`). `ManagerRLock` will then have access to both the thread and the pid that sent the request to the manager, and can decide the current owner of the lock based on both.
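One possible shape for that extension, as a hedged sketch rather than a tested change to the patch: a hypothetical `caller_identity()` returns a `(pid, thread id)` tuple that `PIDProxy._callmethod` could send in place of `str(os.getpid())`. Since `ManagerRLock` only compares owners with `==` and `!=`, a tuple works there unchanged. The demo checks that two live threads of one process get the same pid but different keys.

```python
import os
import threading


def caller_identity():
    """Hypothetical owner key: (pid, thread id) instead of the pid alone."""
    return (str(os.getpid()), threading.get_ident())


def identities_from_two_threads():
    keys = []
    guard = threading.Lock()
    # Keep both threads alive until both have recorded their key, since
    # thread identifiers may be reused once a thread has exited
    barrier = threading.Barrier(2)

    def record():
        with guard:
            keys.append(caller_identity())
        barrier.wait()

    threads = [threading.Thread(target=record) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return keys


if __name__ == "__main__":
    a, b = identities_from_two_threads()
    print(a[0] == b[0])  # True: same process
    print(a[1] == b[1])  # False: different threads, so different owners
```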
Edit: A quick note about proxies: if you want to use your own proxy type instead of `MyAutoProxy`, you can do so; just make sure that it subclasses `PIDProxy`.