Why you should use processing

TLDR


See it in action

Try sharing a logger, a list, and a counter across 4 parallel processes using standard multiprocessing:

share.counter = 0
share.results = []
share.log = logging.getLogger("worker")

Just kidding, you can't. multiprocessing.Manager doesn't support loggers. pickle will choke. You'd need to redesign everything.

Not with processing.

from suitkaise.processing import Share, Pool, Skprocess
import logging

# put anything on Share — literally anything
share = Share()
share.counter = 0
share.results = []
share.log = logging.getLogger("worker")

class Worker(Skprocess):
    def __init__(self, share, item):
        self.share = share
        self.item = item

    def __run__(self):
        result = self.item * 2
        self.share.results.append(result)       # shared list
        self.share.counter += 1                 # shared counter
        self.share.log.info(f"done: {result}")  # shared logger

pool = Pool(workers=4)
pool.star().map(Worker, [(share, x) for x in range(20)])

print(share.counter)         # 20
print(len(share.results))    # 20
print(share.log.handlers)    # still works

A list, a counter, and a logger — shared across 4 processes, all in sync, all updated like normal Python. No queues, no locks, no Manager, no pickle errors. No redesigning your code around what the serializer can handle.

processing makes parallel code regular code.


Python has a parallel processing problem.

Python uses a Global Interpreter Lock (GIL).

What this means

Hey, that sounds pretty good!

Maybe it doesn't sound that good

Python also has one major issue: no simultaneous bytecode execution.

This means threads can't speed up CPU-bound work, which makes Python essentially single-threaded for computation.

16 cores running 16 threads will not speed up work.
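
To see this concretely, here is a small stdlib-only sketch (timings vary by machine, so treat the numbers as illustrative): summing squares on 4 threads takes about as long as doing the same work serially, because only one thread can execute bytecode at a time.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def cpu_work(n):
    # pure-Python arithmetic: holds the GIL the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(fn):
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start

N = 500_000
serial, t_serial = timed(lambda: [cpu_work(N) for _ in range(4)])

with ThreadPoolExecutor(max_workers=4) as ex:
    threaded, t_threads = timed(lambda: list(ex.map(cpu_work, [N] * 4)))

# identical results either way; with the GIL, the threaded run
# is nowhere near 4x faster
print(f"serial: {t_serial:.2f}s, 4 threads: {t_threads:.2f}s")
```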

In a time where almost all major programs parallelize their code, Python is stuck in the past.

Or is it?

We can spawn multiple processes using Python's multiprocessing module.

True parallelism. Problem solved!

Wrong. The problem is not solved yet.

1. pickle can't serialize your code

You are trying to run something in parallel.

from multiprocessing import Pool

def process_data(items):
    def transform(x):
        return x * 2

    with Pool(4) as pool:
        # transform is defined inside process_data
        return pool.map(transform, items)

This looks like it should work. But it doesn't, because you put a locally-defined function in the pool.

There are a bunch of things pickle can't handle -- lambdas, closures, locally defined functions and classes, generators, open files, locks -- and many of them are things you use in everyday code.

Figuring out what works and what doesn't is a nightmare.
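
A quick stdlib-only probe makes the point. Every one of these common objects is rejected by the stock pickler:

```python
import pickle
import threading

def outer():
    def local_fn(x):
        return x * 2
    return local_fn

def unpicklable(obj):
    """Return True if pickle refuses the object."""
    try:
        pickle.dumps(obj)
        return False
    except Exception:
        return True

samples = {
    "lambda": lambda x: x,
    "local function": outer(),
    "generator": (x for x in range(3)),
    "thread lock": threading.Lock(),
}

failures = [name for name, obj in samples.items() if unpicklable(obj)]
print(failures)  # all four fail under the stock pickler
```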

Just use cloudpickle or dill. They're better than pickle.

cloudpickle and dill are cool, but they only lessen the problem; they don't solve it.

And Python's multiprocessing doesn't use them by default anyway.

The standard library's multiprocessing.Pool is hardcoded to use pickle. To use cloudpickle or dill, you have to:

# option 1: monkey-patch the serializer (risky, affects entire process)
import multiprocessing
import cloudpickle

multiprocessing.reduction.ForkingPickler.dumps = cloudpickle.dumps
multiprocessing.reduction.ForkingPickler.loads = cloudpickle.loads

# let's hope nothing else in your codebase depends on regular pickle
# option 2: use multiprocess (a fork of multiprocessing that uses dill)
# pip install multiprocess
import multiprocess as mp 

with mp.Pool(4) as pool:
    results = pool.map(my_function, items)

# dill is slow
# 2 libraries to keep track of
# option 3: use concurrent.futures with a custom executor
from concurrent.futures import ProcessPoolExecutor
import cloudpickle

# you need to write a custom executor class that overrides the serializer

None of these just work, and all of them waste your time.

And even after all that, you still have limitations. Your code is now more complex, in exchange for... not as many serialization errors?

So what CAN you do?

You could learn every limitation of whatever you use, and tiptoe around objects or patterns that will fail.

You could learn every limitation of whatever you use, and write custom code to handle unsupported objects haphazardly.

Or, you could just use processing.

processing uses cucumber instead of pickle

By default, processing uses cucumber, suitkaise's serialization engine, instead of pickle.

Problem actually solved

cucumber handles everything.

cucumber solves both problems: objects that aren't serializable, and serializers that aren't compatible with multiprocessing.

(For more info, see the cucumber pages)

Pool — parallel mapping without the pickle headaches

With multiprocessing:

from multiprocessing import Pool as MPPool

with MPPool(4) as pool:
    results = pool.map(my_function, items)
    # PicklingError if my_function uses closures, lambdas, or local classes

With processing:

from suitkaise.processing import Pool

pool = Pool(workers=4)
results = pool.map(my_function, items)  # closures, lambdas, anything

Python's multiprocessing module also has problems


Outside of the serialization problem, a large problem still exists with multiprocessing.

2. Using multiprocessing is complicated and not intuitive

multiprocessing is a powerful tool, but it is also a pain in the ass to actually use, especially for complex tasks. Much of that pain comes from having to manage everything yourself.

Python gives us the bare minimum to parallelize code, but outside of that, everything is left to you.

There is a long list of things you need to handle yourself -- timeouts, retries, communication, cleanup, error handling -- to have solid parallel code.

Making this situation worse

Notice what is passed into multiprocessing to run a single parallel process.

import multiprocessing

def process_data(items):
    # process your data
    data = [item * 2 for item in items]
    return data

items = [1, 2, 3]

# run the single process (not even in a Pool)
process = multiprocessing.Process(target=process_data, args=(items,))
process.start()

It's a function.

You have to add all of those things from that list into a single function.

For the case above, passing in a simple function is fine; you just want to compute the data faster in parallel.

But the case above doesn't scratch the surface of what you could do with parallel processing.

Not only does having to pass a function make implementing and debugging difficult, it also goes against the entire point of object-oriented programming -- where you encapsulate your code into class objects -- by forcing you to write one giant god function that does everything.

Making the situation better

What is something in programming that we can use to split up a giant god function into a more manageable set of pieces?

Classes.

Everyone knows how to work with classes. They are the fundamental building block of object-oriented programming.

So why not pass a class into multiprocessing instead of a function?

import multiprocessing

class ProcessData(multiprocessing.Process):
    def __init__(self, items, result_queue):
        self.items = items
        self.result_queue = result_queue  # need a Queue to communicate
        super().__init__()
    
    def run(self):
        # do work
        result = process_items(self.items)
        self.result_queue.put(result)  # send back via queue

queue = multiprocessing.Queue()
process = ProcessData(items, queue)
process.start()
process.join()
result = queue.get()  # retrieve from queue, not process.result

This is a step in the right direction, but it is by no means perfect.

Skprocess

A class is the overall solution that should've been used all along.

But we still have no structure and no lifecycle.

Skprocess is a class that goes above and beyond for you.

Let's make a process that queries a database for user data based on given input from a parent process.

Requirements:

  1. Receive queries from the parent process and send results back
  2. Retry the database connection with backoff
  3. A 30 second timeout per query
  4. Track average query time
  5. Run until told to stop
  6. Clean up the connection on exit

Without it - 92 lines

# comments and whitespace excluded from line count
import multiprocessing
import signal
import time
import psycopg2
from multiprocessing import Queue, Event, Value
from ctypes import c_double

class DatabaseWorker(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, stats_lock,
                 total_time, query_count, stop_event, db_config,
                 timeout=30, max_retries=3):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.stats_lock = stats_lock
        self.total_time = total_time
        self.query_count = query_count
        self.stop_event = stop_event
        self.db_config = db_config
        self.timeout = timeout
        self.max_retries = max_retries
        self.conn = None
    
    def _connect(self):
        # manual retry logic with exponential backoff
        for attempt in range(self.max_retries):
            try:
                self.conn = psycopg2.connect(**self.db_config)
                return
            except psycopg2.OperationalError:
                if attempt == self.max_retries - 1:
                    raise
                time.sleep(2 ** attempt)
    
    def _timeout_handler(self, signum, frame):
        raise TimeoutError("Query timed out")
    
    def run(self):
        # manual connection setup
        self._connect()
        
        # manual signal handling for timeouts
        signal.signal(signal.SIGALRM, self._timeout_handler)
        
        try:
            while not self.stop_event.is_set():
                # check for incoming query (non-blocking)
                try:
                    query_params = self.task_queue.get(timeout=0.1)
                except Exception:
                    # no query received
                    self.result_queue.put({'status': 'no query', 'data': None})
                    continue
                
                # manual timing
                start = time.time()
                signal.alarm(self.timeout)
                
                try:
                    cursor = self.conn.cursor()
                    cursor.execute(query_params['sql'], query_params.get('params'))
                    results = cursor.fetchall()
                    cursor.close()
                    
                    signal.alarm(0)
                    elapsed = time.time() - start
                    
                    # manual stats tracking with locks
                    with self.stats_lock:
                        self.total_time.value += elapsed
                        self.query_count.value += 1
                    
                    # different status based on results
                    if not results:
                        self.result_queue.put({'status': 'error', 'data': None})
                    else:
                        self.result_queue.put({'status': 'ok', 'data': results})
                    
                except TimeoutError:
                    signal.alarm(0)
                    self.result_queue.put({'status': 'error', 'error': 'timeout'})
                except Exception as e:
                    signal.alarm(0)
                    self.result_queue.put({'status': 'error', 'error': str(e)})
        finally:
            # manual cleanup
            if self.conn:
                self.conn.close()


# usage

# connect to the database (credentials stored separately)
db_config = {'host': 'localhost', 'database': 'mydb', 'password': 'secret'}

# create all the shared state machinery
manager = multiprocessing.Manager()
task_queue = Queue()
result_queue = Queue()
stats_lock = manager.Lock()
total_time = Value(c_double, 0.0)
query_count = Value('i', 0)
stop_event = Event()

# init and start the worker process
worker = DatabaseWorker(
    task_queue, result_queue, stats_lock, total_time, 
    query_count, stop_event, db_config,
    timeout=30, max_retries=3
)
worker.start()

# list of queries to run (counted as a single line)
queries = [
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (123,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (456,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (789,)},
    # ...
]

# create a list to store the results
results = []

# send each query to the worker
for query in queries:
    task_queue.put(query)
    result = result_queue.get(timeout=30)  # need manual timeout here too
    results.append(result)

# signal stop and wait for cleanup
stop_event.set()
worker.join()

# manual timing calculation
if query_count.value > 0:
    avg_time = total_time.value / query_count.value
    print(f"Avg query time: {avg_time:.3f}s")

There are a lot of problems here.

  1. 6 import statements
  2. 9 parameters in __init__, most of which just set up infrastructure
  3. super().__init__() has to be called and is easy to forget
  4. Manual retry logic
  5. Manual performance timing
  6. Multiple different timeouts need to be handled manually
  7. Several queues to manage
  8. Have to handle signals, which don't even work on Windows
  9. Awkward .value access for shared state
  10. Have to use a separate event object for stopping
  11. Manager for locks, something else that needs to be coordinated
  12. Manual cleanup in finally
  13. Statistics done by hand
  14. Passing database credentials around as a dict
  15. Uses pickle

This is a simple example: you already have to do this much for this little.

With Skprocess - 40 lines

# comments and whitespace excluded from line count
from suitkaise.processing import Skprocess, autoreconnect
import psycopg2

@autoreconnect(**{"psycopg2.Connection": {"*": "password"}})
class DatabaseWorker(Skprocess):

    def __init__(self, db_connection):
        # this automatically reconnects
        self.db = db_connection

        # built in configuration
        # run indefinitely until stop() is called
        self.process_config.runs = None
        # NOTE: this is the default: here for clarity, not counted in line count

        # 3 lives (2 extra attempts after the first failure)
        self.process_config.lives = 3

        # 30 second timeout per query
        self.process_config.timeouts.run = 30.0
    
    def __prerun__(self):
        # receive query from parent (non-blocking check)
        msg = self.listen(timeout=0.1)
        self.query = msg if msg else None
    
    def __run__(self):
        if not self.query:
            return
        cursor = self.db.cursor()
        cursor.execute(self.query['sql'], self.query.get('params'))
        self.results = cursor.fetchall()
        cursor.close()
    
    def __postrun__(self):

        if self.query:
            if not self.results:
                self.tell({'status': 'error', 'data': None})
            else:
                self.tell({'status': 'ok', 'data': self.results})

        else:
            self.tell({'status': 'no query', 'data': None})
    
    def __onfinish__(self):
        self.db.close()

# usage

# connect to the database
db = psycopg2.connect(host='localhost', database='mydb', password='secret')

# init and start the worker process
worker = DatabaseWorker(db)
worker.start()

# list of queries to run (counted as a single line)
queries = [
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (123,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (456,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (789,)},
    # ...
]

# create a list to store the results
results = []

# send each query to the worker
for query in queries:
    worker.tell(query)
    result = worker.listen(timeout=30)
    results.append(result)

# request stop (join) and wait for it to finish
worker.stop()
worker.wait()

# automatically timed
print(f"Avg query time: {worker.__run__.timer.mean:.3f}s")

Shared state in Python

Sharing state across process boundaries is one of the most functionally important parts of multiprocessing in Python.

There are generally 6 patterns for doing this:

1. Value and Array

Shared memory, but just for primitive types.

Pros: Fast, no serialization

Cons: Only primitives, not even dicts or lists
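
For reference, here is what the Value/Array pattern looks like in practice (a minimal stdlib-only sketch). Note how even two primitives already require explicit locking and fixed C types:

```python
from multiprocessing import Process, Value, Array

def work(counter, arr):
    # Value/Array carry their own lock; you must use it
    # for read-modify-write operations
    with counter.get_lock():
        counter.value += 1
    with arr.get_lock():
        for i in range(len(arr)):
            arr[i] += 1.0

def run_demo():
    counter = Value("i", 0)       # one shared int
    arr = Array("d", [0.0, 0.0])  # fixed-size shared double array
    procs = [Process(target=work, args=(counter, arr)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value, list(arr)

if __name__ == "__main__":
    print(run_demo())  # (4, [4.0, 4.0])
```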

2. multiprocessing.Manager

Proxy objects that wrap Python types.

Pros: Supports dict, list, and other Python types

Cons: Slow -- every access is a round trip to a separate manager process, so it's not truly shared memory; still limited by pickle
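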

3. multiprocessing.shared_memory

Raw shared memory blocks. Only available in Python 3.8+.

Pros: True shared memory, fast

Cons: Manual buffer management, no serialization, have to handle syncing yourself
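
And a minimal sketch of raw shared_memory (Python 3.8+). Each worker writes a single byte; all coordination and interpretation of the bytes is on you:

```python
from multiprocessing import Process, shared_memory

def work(name, index):
    # attach to the existing block by name
    shm = shared_memory.SharedMemory(name=name)
    shm.buf[index] = index * 2  # raw byte write; no locking, no types
    shm.close()

def run_demo():
    shm = shared_memory.SharedMemory(create=True, size=4)
    try:
        procs = [Process(target=work, args=(shm.name, i)) for i in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return list(bytes(shm.buf[:4]))
    finally:
        shm.close()
        shm.unlink()  # you are responsible for freeing the block

if __name__ == "__main__":
    print(run_demo())  # [0, 2, 4, 6]
```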

4. Queues (message passing)

Isn't exactly shared state, but functionally similar.

Pros: Safe, decently fast, works with any serializable object

Cons: Not actually shared — each process has its own copy
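
The queue pattern, sketched with the stdlib. Note that each worker gets its own copy of the item, and results come back as copies too:

```python
from multiprocessing import Process, Queue

def work(q, item):
    q.put(item * 2)  # the result is serialized and copied back

def run_demo():
    q = Queue()
    procs = [Process(target=work, args=(q, i)) for i in range(4)]
    for p in procs:
        p.start()
    results = sorted(q.get() for _ in range(4))  # arrival order varies
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    print(run_demo())  # [0, 2, 4, 6]
```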

5. Files/Databases

Write to disk so that other processes can read.

Pros: Simple and persistent

Cons: Slow IO, race conditions, not real-time

6. External services

Use an external process to hold state, like Redis or Memcached.

Pros: Atomic operations, pub/sub, works across machines

Cons: External dependency, network overhead, more to manage

The problem with all of these

There is no good solution here. Just a lot of average ones.

That's where Share comes in.

Share

Share is the ultimate solution to shared state in Python.

Pros: Works with any object cucumber can serialize, uses normal Python syntax, and stays in sync across processes

Cons: Slower than multiprocessing.Manager (roughly 3x per operation)

Look at this example.

from suitkaise.processing import Skprocess, Share, Pool
import logging

share = Share()
share.counter = 0
share.log = logging.getLogger("ShareLog")

class ShareCounter(Skprocess):

    def __init__(self, share: Share):
        self.share = share
        self.process_config.runs = 10

    def __run__(self):
        self.share.counter += 1
        self.share.log.info(f"{self.share.counter}")

pool = Pool(workers=4)
pool.map(ShareCounter, [share] * 10)

print(share.counter) # 100 (10 processes, 10 runs each)
print(share.log.messages) # ['1', '2', '3', '4', '5', ..., '100'] in order

In 20 lines, I made a counter that works in a parallel pool, where each of 10 processes loops exactly 10 times, logging its progress. No return values are needed: everything was written to shared memory, and that shared memory stayed 100% in sync.

Share is 100% worth the speed cost

Admittedly, it's about 3x slower than multiprocessing.Manager.

But every object works exactly the same, and everything syncs perfectly.

And with cucumber, it works with any object you want.

To add them: assign them to attributes.

share = Share()
share.counter = 0
share.log = logging.getLogger("ShareLog")
share.complex_object = ComplexObject()

Exactly the same as regular variable assignment.

To use them: access and update them as normal.

share.counter += 1
share.log.info(f"{share.counter}")
share.complex_object.do_something()

Exactly the same as how you would use them outside of shared memory.

Why the 3x doesn't matter in practice

The overhead is in the coordinator IPC layer: the per-operation cost of syncing state. For long-running parallel work (the kind where you actually need multiprocessing), that cost is amortized across the real work each task does.

If your process runs for 30 seconds and does 1,000 share operations, the overhead is a few extra milliseconds total. Meanwhile, the alternative is hours of your time debugging.

Share trades microseconds of IPC overhead for the ability to turn your brain off and never write shared state boilerplate again. You create it, assign to it, and read from it -- exactly like you learned in your first programming class.

That's a tradeoff worth making.

processing still has 2 more options for sharing state

processing has 2 high speed options for sharing state.

1. Skprocess.tell() and Skprocess.listen()

The 2 queue-like methods that are a part of Skprocess (and all inheriting classes).

These are automatically 2-way, and use cucumber.

from suitkaise.processing import Skprocess

class MyProcess(Skprocess):
    def __prerun__(self):
        self.command = self.listen(timeout=1.0)

    def __run__(self):
        if self.command == "stop":
            self.stop()
        elif self.command == "print":
            print("hello")
        elif self.command is not None:
            raise ValueError(f"Unknown command: {self.command}")

    def __postrun__(self):
        self.command = None
        self.tell("command received")
        
p = MyProcess()
p.start()
for i in range(10):
    p.tell("print")
    result = p.listen(timeout=1.0)
    if result != "command received":
        break

p.tell("stop")
p.wait()

2. Pipe

The fastest, most direct way to communicate between processes.

from suitkaise.processing import Pipe, Skprocess

anchor, point = Pipe.pair()

class MyProcess(Skprocess):

    def __init__(self, pipe_point: Pipe.Point):
        self.pipe = pipe_point
        self.process_config.runs = 1

    def __run__(self):
        self.pipe.send("hello")
        result = self.pipe.recv()
        print(result)

process = MyProcess(point)
process.start()

anchor.send("hello")

result = anchor.recv()
print(result)

process.wait()

One way pipe:

from suitkaise.processing import Pipe, Skprocess

# one way pipe: only anchor can send data, point can only receive
anchor, point = Pipe.pair(one_way=True)

class MyProcess(Skprocess):
    def __init__(self, pipe_point: Pipe.Point):
        self.pipe = pipe_point
        self.process_config.runs = 1

    def __prerun__(self):
        self.data_to_process = self.pipe.recv()

    def __run__(self):
        # process_data is your own work method
        self.process_data(self.data_to_process)

    def __postrun__(self):
        self.data_to_process = None

process = MyProcess(point)
process.start()

# only the anchor end can send on a one-way pipe
anchor.send([1, 2, 3])

process.wait()
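
For comparison, the standard library's closest analogue is multiprocessing.Pipe, which is duplex by default and supports one-way pairs via duplex=False:

```python
from multiprocessing import Pipe, Process

def child(conn):
    # a duplex connection end can both send and receive
    conn.send("hello from child")
    reply = conn.recv()
    conn.close()

def run_demo():
    parent_conn, child_conn = Pipe()  # Pipe(duplex=False) for one-way
    p = Process(target=child, args=(child_conn,))
    p.start()
    msg = parent_conn.recv()
    parent_conn.send("hello from parent")
    p.join()
    return msg

if __name__ == "__main__":
    print(run_demo())  # hello from child
```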

So, which one should you use?

Most of the time, you should just use Share.

If you want faster, 2-way communication without shared state, use tell() and listen().

And if you need maximum speed, or more manual control, use Pipe.

Putting it all together

Throughout this page, you might have seen something called Pool.

Pool is an upgraded wrapper around multiprocessing.Pool used for parallel batch processing.

What this enables:

  1. Map plain functions or full Skprocess classes across the workers
  2. Pass closures, lambdas, and complex objects freely (cucumber, not pickle)
  3. Use Share inside pooled processes

So, already, Pool is vastly more powerful than multiprocessing.Pool, especially because you can use Share.

Pool is better, but still familiar to users

It has the 4 main map methods, with clearer names.

map: returns a list, ordered by input. Each item gets added to the list in the order it was added to the pool.

list_in_order = pool.map(fn_or_skprocess, items)

unordered_map: returns a list, unordered. Whatever finishes first, gets added to the list first.

unordered_list = pool.unordered_map(fn_or_skprocess, items)

imap: returns an iterator, ordered by input. Each item gets added to the iterator in the order it was added to the pool.

for item in pool.imap(fn_or_skprocess, items):
    print(item)

unordered_imap: returns an iterator, unordered. Whatever finishes first, gets added to the iterator first.

for item in pool.unordered_imap(fn_or_skprocess, items):
    print(item)
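
For comparison, the standard library covers three of these four patterns under its older names (there is no unordered list variant; you wrap imap_unordered yourself):

```python
from multiprocessing import Pool

def double(x):
    return x * 2

def run_demo():
    with Pool(4) as pool:
        ordered = pool.map(double, range(5))                       # like map
        streamed = list(pool.imap(double, range(5)))               # like imap
        unordered = sorted(pool.imap_unordered(double, range(5)))  # like unordered_imap
    return ordered, streamed, unordered

if __name__ == "__main__":
    print(run_demo())
```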

Since you can use Skprocess objects that can stop() themselves (or set a number of runs), you can theoretically keep running the pool and let the processes run until they are done. This opens up a lot of possibilities for complex parallel processing tasks.

Modifiers are what make it reach the next level

sk modifiers are from another suitkaise module, and are available on most suitkaise functions and methods, including Pool.

And, Pool itself has a special modifier, star(), that allows you to unpack tuples into function arguments.

from suitkaise.processing import Pool

pool = Pool(workers=4)

# get a coroutine for map with a timeout
coro = pool.map.timeout(20.0).asynced()
results = await coro(fn_or_skprocess, items)  # awaited inside an async function

# or, run in the background and get a Future,
# unpacking each tuple across the function's arguments
# (instead of passing the whole tuple as a single argument)
future = pool.star().map.background()(fn_or_skprocess, items)

asynced() and background() cannot be combined with each other (they do the same job in different ways), but everything else is combinable.

These modifiers work with all map methods.

For more info on how to use these modifiers, see the sk pages or look at the processing examples.

Works with the rest of suitkaise

processing doesn't exist in a vacuum. It's designed to work with the rest of the suitkaise ecosystem.

Each module is useful on its own, but they were designed together. When you use processing, you get the full benefit of that integration.