
Share RLock between multiple instances of Python with multiprocessing

Consider this MWE:

from multiprocessing import RLock

class TheSetup:
    def __init__(self):
        self._hardware_lock = RLock()
    
    def hold_hardware(self):
        return self._hardware_lock
    
    def do_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing thing...')
    
    def do_other_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing other thing...')

if __name__=='__main__':
    from multiprocessing.managers import BaseManager
    
    class TheSetupManager(BaseManager):
        pass
    
    the_setup = TheSetup()
    
    TheSetupManager.register('get_the_setup', callable=lambda:the_setup)
    m = TheSetupManager(address=('', 50000), authkey=b'abracadabra')
    s = m.get_server()
    print('Serving...')
    s.serve_forever()

When this script is executed, it instantiates the_setup and serves it. Then I want clients to be able to do things like this from other scripts:

from multiprocessing.managers import BaseManager

class TheSetup(BaseManager):
    pass

TheSetup.register('get_the_setup')
m = TheSetup(address=('', 50000), authkey=b'abracadabra')
m.connect()
the_setup = m.get_the_setup()

with the_setup.hold_hardware(): # Removing this `with` it works fine, but I cannot guarantee to the client that nobody else will use this hardware in the meantime.
    the_setup.do_thing_with_hardware()
    the_setup.do_other_thing_with_hardware()

However, I get RuntimeError: RLock objects should only be shared between processes through inheritance. If the with the_setup.hold_hardware(): is removed, it works fine but then I cannot guarantee that the hardware wasn’t used by someone else in the middle.

Is it possible to do what I want? I.e. having the_setup running 24/7 and allowing interaction with it at any time from other Python instances. How?


Answer

Update: I have included a patch for multiprocessing.managers that lets it work with RLocks seamlessly. See the section “Patching multiprocessing to work with RLocks” below.

Multiprocessing does not allow synchronization objects to be passed inside other such objects. That means you cannot put semaphores, locks, queues, or pipes inside other queues and pipes (those offered by the multiprocessing library). When you create a manager using BaseManager, it uses pipes internally to communicate between the client processes and the manager process. So when you call the_setup.hold_hardware(), you are essentially attempting to pass an RLock through a pipe, which, as discussed, does not work.
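
The error from the question can be reproduced without any manager. The following is a minimal sketch that simply tries to pickle a multiprocessing.RLock, which is what sending it through a pipe or queue would require. The helper name try_pickle_lock is made up for this illustration.

```python
import pickle
from multiprocessing import RLock


def try_pickle_lock():
    """Return the error message raised when pickling a multiprocessing.RLock."""
    lock = RLock()
    try:
        # Sending an object through a pipe or queue pickles it first.
        pickle.dumps(lock)
    except RuntimeError as exc:
        return str(exc)
    return None  # no error was raised


message = try_pickle_lock()
print(message)
```

The message printed here should match the RuntimeError quoted in the question, since the lock is being pickled outside of process spawning.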

Why workarounds don’t work

The simplest fix one might think of is to create a manager and use manager.RLock. This uses threading.RLock instead of the one from multiprocessing (and can therefore be shared through queues and pipes), yet it works in a multiprocessing environment because access to it is synchronized through pipes (again, it is using a manager).

Hence, this code should at least execute:

server.py

from multiprocessing.managers import SyncManager
.
.
.
def __init__(self):
    manager = SyncManager(authkey=b'same')
    manager.start()
    self._hardware_lock = manager.RLock()

client.py

import multiprocessing
import os
from multiprocessing.managers import BaseManager

class TheSetup(BaseManager):
    pass

TheSetup.register('get_the_setup')
m = TheSetup(address=('localhost', 50000), authkey=b'abracadabra')
m.connect()

multiprocessing.current_process().authkey = b'same'  # Needed to set up proper authentication

the_setup = m.get_the_setup()
print(the_setup)
with the_setup.hold_hardware(): # Removing this `with` it works fine, but I cannot guarantee to the client that nobody else will use this hardware in the meantime.
    print('in')
    the_setup.do_thing_with_hardware()
    the_setup.do_other_thing_with_hardware()

Note that we need to set the manager’s and the client’s authkey to the same value; otherwise there will be an authentication error when attempting to unpickle the lock.

But even though the code will run, it won’t do what you expect: it will block when trying to run the_setup.do_thing_with_hardware(). This is because the RLock actually lives inside the manager process, and what with the_setup.hold_hardware() gives you is a proxy of the lock (try print(type(the_setup.hold_hardware()))). When you acquire the lock through the proxy, the method name is sent to the manager process and executed on the managed object by a server thread. The RLock therefore records that server thread as its owner, not the process that made the request. This defeats the whole purpose of an RLock and makes the RLock offered by managers useless here.
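
The thread-based ownership of threading.RLock can be seen with plain threads, no manager involved. This is only a sketch of the principle, not of what the manager does internally; the names outcome and other_thread_attempt are made up for the example.

```python
import threading

lock = threading.RLock()
outcome = {}


def other_thread_attempt():
    # A different thread is not the owner, so a non-blocking acquire fails.
    got = lock.acquire(blocking=False)
    outcome["other_thread"] = got
    if got:
        lock.release()


lock.acquire()
# The owning thread may re-acquire the lock: this is the reentrant part.
outcome["same_thread"] = lock.acquire(blocking=False)

worker = threading.Thread(target=other_thread_attempt)
worker.start()
worker.join()

# Release once per successful acquire made by the main thread.
lock.release()
lock.release()
print(outcome)
```

In the manager case, requests arriving over different connections are served by different threads, so a second request is in the position of other_thread_attempt even when it is made on behalf of the client that holds the lock.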

Patching multiprocessing to work with RLocks

If you really want to use RLocks inside managers, then you will need to make your own managers and proxies, by subclassing BaseManager and BaseProxy, to relay process identifiers (like pids) which can then be used to create RLocks. Consider the below “patch”:

import time
import os, sys
from multiprocessing.managers import BaseProxy, BaseManager, Server, Token, RebuildProxy
from multiprocessing import process
from multiprocessing.context import get_spawning_popen, ProcessError
from _thread import allocate_lock
from traceback import format_exc

from multiprocessing.managers import util, dispatch, convert_to_error, listener_client, public_methods, State
import threading


class PIDProxy(BaseProxy):


    def _callmethod(self, methodname, args=(), kwds={}):

        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds, str(os.getpid())))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
            )
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        raise convert_to_error(kind, result)

    def __reduce__(self):
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (MyAutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))


class PIDServer(Server):

    def serve_client(self, conn):

        global pid_registry
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds, pid = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    pid_registry.forwarded_for = pid
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)


class ForwardPIDManager(BaseManager):
    _Server = PIDServer

    def get_server(self):

        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self._Server(self._registry, self._address,
                      self._authkey, self._serializer)


def MyMakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (PIDProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType


def MyAutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):

    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MyMakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    proxy._isauto = True
    return proxy

def init():
    global pid_registry
    pid_registry = threading.local()

Here, PIDProxy is identical to BaseProxy, except that in addition to the method name and arguments to be called on the managed object, it sends an extra identifier with the value str(os.getpid()). Similarly, ForwardPIDManager is identical to BaseManager, except that it uses a modified subclass of multiprocessing.managers.Server, rather than the default parent, to start the server process. PIDServer modifies its parent to accept an extra variable (the process identifier) when unpacking requests from proxies, and stores its value inside the current thread’s local storage (created by executing init inside the server process; see the documentation for threading.local for more on thread-local storage). This happens before the requested function is executed, meaning that all methods of the managed object will have access to this storage. Lastly, MyAutoProxy and MyMakeProxyType override the default functions so that proxies are created using our subclasses instead.

All you need to do now is to use ForwardPIDManager instead of BaseManager, and specify the proxytype kwarg explicitly as MyAutoProxy. All managed objects will then have access to the pid of the process that called the function by doing

global pid_registry
pid = pid_registry.forwarded_for

from within the functions on the managed object.
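
To illustrate why thread-local storage is a fit here, the sketch below imitates two requests being served by two server threads. It is not the patch itself: serve_request and managed_method are hypothetical stand-ins for PIDServer.serve_client and a method on the managed object, and the pids are made-up strings.

```python
import threading

pid_registry = threading.local()


def managed_method():
    # Stand-in for a method on the managed object reading the caller's pid.
    return pid_registry.forwarded_for


def serve_request(pid, results):
    # Stand-in for the server thread: store the pid, then call the method.
    pid_registry.forwarded_for = pid
    results[pid] = managed_method()


results = {}
threads = [
    threading.Thread(target=serve_request, args=(pid, results))
    for pid in ("101", "202")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never stored a pid, so the attribute does not exist for it.
main_thread_sees_pid = hasattr(pid_registry, "forwarded_for")
print(results, main_thread_sees_pid)
```

Each thread only ever sees the value it stored itself, so concurrent requests from different clients cannot overwrite each other’s pid.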

Creating RLock

Using this, we can create our own implementation of RLocks, which closely mirrors that of threading.RLock, but uses this pid_registry.forwarded_for to verify owners instead of threading.get_ident:

class ManagerRLock:

    def __init__(self):
        self._block = allocate_lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        global pid_registry
        process = pid_registry.forwarded_for
        if self._owner == process:
            self._count += 1
            return True

        rc = self._block.acquire(blocking, timeout)

        if rc:
            self._owner = process
            self._count = 1
        return rc

    def release(self):
        global pid_registry
        process = pid_registry.forwarded_for
        if self._owner != process:
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()

    def __repr__(self):
        owner = self._owner
        return "<%s %s.%s object owner=%r count=%d at %s>" % (
            "locked" if self._block.locked() else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            owner,
            self._count,
            hex(id(self))
        )

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self.release()

Keep in mind that ManagerRLock objects are not picklable, and therefore should not be passed by value from the manager to proxies. You can, however, expose its methods (acquire, release) to proxies without risk (example in the section below). Also, these objects are meant to be created directly, so do not use another manager to create them.
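
The ownership logic can be tried out without a manager by passing the owner explicitly instead of reading it from pid_registry. The class below is a hypothetical variant written only for this demonstration (OwnerKeyedRLock and the "pid-1"/"pid-2" keys are assumptions, not part of the patch).

```python
import threading


class OwnerKeyedRLock:
    """Reentrant lock whose owner is an explicit key (hypothetical helper)."""

    def __init__(self):
        self._block = threading.Lock()
        self._owner = None
        self._count = 0

    def acquire(self, owner, blocking=True, timeout=-1):
        if self._owner == owner:
            # Same owner again: just bump the recursion count.
            self._count += 1
            return True
        got = self._block.acquire(blocking, timeout)
        if got:
            self._owner = owner
            self._count = 1
        return got

    def release(self, owner):
        if self._owner != owner:
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:
            self._owner = None
            self._block.release()


lock = OwnerKeyedRLock()
first = lock.acquire("pid-1")
nested = lock.acquire("pid-1")                   # re-entry by the same owner
blocked = lock.acquire("pid-2", blocking=False)  # another owner is refused
lock.release("pid-1")
lock.release("pid-1")
after_release = lock.acquire("pid-2", blocking=False)
lock.release("pid-2")
print(first, nested, blocked, after_release)
```

The second owner only gets the lock once the first owner has released it as many times as it acquired it, which is the behaviour the client needs from hold_hardware.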

Example implementation

Using our patch and ManagerRLock, our implementation of TheSetup class becomes like this:

class TheSetup:
    def __init__(self):
        self._hardware_lock = None

    def initialize_lock(self):
        self._hardware_lock = ManagerRLock()

    def hold_hardware(self):
        self._hardware_lock.acquire()

    def release_hardware(self):
        self._hardware_lock.release()

    def do_thing_with_hardware(self):

        with self._hardware_lock:
            print('Doing thing...')
            # Mimic work being done, helpful for testing locks
            time.sleep(7)
            print('done')

    def do_other_thing_with_hardware(self):
        with self._hardware_lock:
            print('Doing other thing...')

    def return_(self):
        return self


if __name__ == '__main__':
    
    # Execute init here since the main module is the server process
    init()
    
    # You can initialize lock in the constructor itself, this is only here for consistency between 
    # example implementations
    the_setup = TheSetup()
    the_setup.initialize_lock()
    
    ForwardPIDManager.register('get_the_setup', lambda: the_setup, MyAutoProxy)
    m = ForwardPIDManager(address=('localhost', 50000), authkey=b'abracadabra')

    s = m.get_server()
    print('Serving...')
    s.serve_forever()

One important thing to notice here is that you are not exposing the whole lock, only its methods (look at hold_hardware and release_hardware). This is important because ManagerRLock, just like multiprocessing.RLock, is unpicklable. One side effect is that you cannot use the lock directly as a context manager from the client; you would have to do

the_setup.hold_hardware()
.
.
.
the_setup.release_hardware()

inside client.py instead of using a with block. However, if this bothers you, you can create a wrapper for the lock on the client side and use that instead (example given in the next section).

Lastly, also notice how we call init from inside the main module itself. This is because init must be executed in the server process, and since we are retrieving the server and calling serve_forever inside the main process itself, we must execute init there too. If instead you are using the .start() method of managers to create the server in another process, here is the equivalent code:

if __name__ == '__main__':
    the_setup = TheSetup()
    
    # We cannot use lambda since that's not picklable
    ForwardPIDManager.register('get_the_setup', callable=the_setup.return_, proxytype=MyAutoProxy)
    m = ForwardPIDManager(address=('localhost', 50000), authkey=b'abracadabra')
    
    # Start the server in another process, and tell it to run init as soon as it starts
    m.start(initializer=init)
    
    # Initialize lock after transferring the instance data to the manager process (ManagerRLock is not picklable)    
    proxy_setup = m.get_the_setup()
    proxy_setup.initialize_lock()
    
    # Wait for the server process to quit. This is only for demonstration; do not do this in real code!
    m._process.join()

Final solution

Combine the whole patch provided above (classes PIDProxy, PIDServer, ForwardPIDManager, and functions init, MyMakeProxyType, MyAutoProxy), the ManagerRLock class, and the code related to your implementation (class TheSetup along with the if __name__... block) into a single server.py file. Then an example client.py which uses the RLock could look like this:

client.py

from server import ForwardPIDManager, MyAutoProxy


class TheSetup(ForwardPIDManager):
    pass


class LockWrapper:

    def __init__(self, proxy):
        self.proxy = proxy

    def __enter__(self):
        return self.proxy.hold_hardware()

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self.proxy.release_hardware()


TheSetup.register('get_the_setup', proxytype=MyAutoProxy)
m = TheSetup(address=('localhost', 50000), authkey=b'abracadabra')
m.connect()

the_setup = m.get_the_setup()
the_setup_lock = LockWrapper(the_setup)

with the_setup_lock:

    the_setup.do_thing_with_hardware()
    the_setup.do_other_thing_with_hardware()

Notice the use of LockWrapper as a way to use the lock with a context manager.
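
If you prefer a function over a class, the same idea could be written with contextlib.contextmanager. This is only a sketch: hardware_held is a hypothetical name, and FakeSetup is a stand-in for the real proxy so the example runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def hardware_held(proxy):
    # Acquire on entry; always release on exit, even if the body raises.
    proxy.hold_hardware()
    try:
        yield proxy
    finally:
        proxy.release_hardware()


class FakeSetup:
    """Stand-in for the the_setup proxy; it only records the calls made."""

    def __init__(self):
        self.calls = []

    def hold_hardware(self):
        self.calls.append("hold")

    def release_hardware(self):
        self.calls.append("release")

    def do_thing_with_hardware(self):
        self.calls.append("thing")


fake = FakeSetup()
with hardware_held(fake) as setup:
    setup.do_thing_with_hardware()
print(fake.calls)
```

With the real proxy you would pass the_setup in place of fake.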

A word about threads…

The solution relies on threading.local, so trying to access pid_registry.forwarded_for from inside another thread will fail. Hence, if you use threads inside your shared class, make sure you explicitly pass the pid to each thread when starting it.

Additionally, ManagerRLock expects the processes that access it to be single-threaded (there can be more than one process). This means that if you run multiple processes that are each multi-threaded, using ManagerRLock might be unsafe (untested). In that case, however, you can trivially extend the patch by passing not only the pid but also the thread identifier (from inside PIDProxy) and storing both inside pid_registry (from inside PIDServer). ManagerRLock will then have access to both the pid and the thread that sent the request to the manager, and can decide the current owner of the lock based on both.
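
As a rough idea of what such a combined identifier could look like, the sketch below builds a (pid, thread id) pair on the calling side. The function name caller_identity is hypothetical, and wiring it into PIDProxy and PIDServer is left out.

```python
import os
import threading


def caller_identity():
    # Pair of process id and thread id; unique per live thread across processes.
    return (os.getpid(), threading.get_ident())


identities = [caller_identity()]  # identity of the main thread


def record():
    identities.append(caller_identity())


worker = threading.Thread(target=record)
worker.start()
worker.join()
print(identities)
```

Both entries share the pid but differ in the thread part, which is the distinction a pid alone cannot make. Note that thread identifiers may be reused after a thread exits, so an owner key like this is only meaningful while the owning thread is alive.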

Edit: A quick note about proxies: if you want to use your own proxy type instead of MyAutoProxy, you can; just make sure that it subclasses PIDProxy.
