
Sharing instance of proxy object across processes results in pickle errors

I’m trying to implement a simple shared object system in Python between several processes. I’m doing the following:

import os
from multiprocessing.managers import SyncManager

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()
    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)
    for i in range(1000):
        d[i] = i

    child_id = os.fork()

    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            d[i % 1000] = i % 3434
            i += 1
    else:
        # in child

        # connect to the manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()

        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])

        # read from the proxy object forever
        while True:
            print(list(proxy_obj.values())[:10])

However, on Python 3.9 this keeps failing with various pickle errors, such as _pickle.UnpicklingError: invalid load key, '\x0a'.

Am I doing something incorrect here? AFAIK it should be possible to read from and write to a Manager object concurrently from several processes. (FYI, I’ve also opened an issue on the Python tracker: https://github.com/python/cpython/issues/101320, but there has been no answer yet.)


Answer

The code I present below will probably be very familiar to you, so I may be telling you something you already know. There is not much harm in that beyond the five minutes you might have “wasted” reading this, and perhaps you will still find something of value.

I really see no need for you to be using the low-level calls __reduce__ and fork, so this answer does not attempt to get those calls to work for you. However, it does address what I understand to be your principal objective, i.e. “… trying to implement a simple shared object system in Python between several processes”, by showing how a multi-platform solution can be achieved.

The following code creates a new process in a platform-independent way and relies on Python knowing how to serialize a proxy object. I have also modified your code so that it does not run indefinitely:

from multiprocessing.managers import SyncManager
from multiprocessing import Process

def child_process(address, proxy_obj):
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()

    print(list(proxy_obj.values())[:10])

if __name__ == '__main__':
    with SyncManager(authkey=b'test') as manager:
        address = manager.address
        # Instead of making 1000 slow calls on a proxy,
        # do this instead:
        d = manager.dict({i: i for i in range(1000)})
        p = Process(target=child_process, args=(address, d))
        p.start()
        p.join()

Prints:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Note:

You say, “… it should be possible to read/write concurrently (several processes) from/to a Manager object”, and it is! Every call on a proxy object results in the call and its arguments being serialized and sent to the manager’s process, where they are de-serialized and the call is executed by a thread. So as long as the calls are thread-safe there should be no issue, and that is the case with a managed dictionary, whose underlying implementation is a standard dictionary.

But consider the case where your client processes are executing code such as the following:

proxy_obj[i] += 1

If it is possible for multiple processes to be trying to increment the same value, then the increment needs to be done under control of a multiprocessing.Lock instance, because incrementing a dictionary value requires two successive calls on the dictionary: one to retrieve the current value and one to store the incremented value back. So whenever your updates are not atomic, you need to treat each update as a critical section whose execution must be serialized. For example:

from multiprocessing.managers import SyncManager
from multiprocessing import Process, Lock

def child_process(address, proxy_obj, lock):
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()

    for i in range(10):
        with lock:
            proxy_obj[i] += 1

if __name__ == '__main__':
    with SyncManager(authkey=b'test') as manager:
        address = manager.address
        d = manager.dict({i: i for i in range(10)})
        lock = Lock()
        p = Process(target=child_process, args=(address, d, lock))
        p.start()
        for i in range(10):
            with lock:
                d[i] += 1
        p.join()
        print(list(d.values()))

Prints:

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
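
By contrast, here is a minimal sketch of the lock-free case described above, assuming a hypothetical writer helper of my own; it follows the same connect-then-use pattern as child_process. Because each d[key] = value assignment is a single proxy call executed by a thread in the manager’s process, the writers need no explicit lock as long as they write to distinct keys:

from multiprocessing.managers import SyncManager
from multiprocessing import Process

def writer(address, proxy_obj, offset):
    # Re-establish the authentication context in the child before using
    # the unpickled proxy (same pattern as child_process above).
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()

    # Each assignment is one proxy call, executed by a thread in the
    # manager's process, so no lock is needed for these writes.
    for i in range(100):
        proxy_obj[offset + i] = i

if __name__ == '__main__':
    with SyncManager(authkey=b'test') as manager:
        address = manager.address
        d = manager.dict()
        workers = [
            Process(target=writer, args=(address, d, n * 100))
            for n in range(4)
        ]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        # Expect 400 entries, one per distinct key written by the workers.
        print(len(d))

Prints:

400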