suitkaise.processing

share.counter = 0 just works across processes. lives=3 and your process auto-retries, no try/except loops. @autoreconnect brings live connections into subprocesses, which is normally impossible. .asynced() when you need it.

Try sharing a logger, a list, and a counter across 4 parallel processes using standard multiprocessing:
share.counter = 0
share.results = []
share.log = logging.getLogger("worker")
Just kidding, you can't. multiprocessing.Manager doesn't support loggers. pickle will choke. You'd need to redesign everything.
Not with suitkaise.
from suitkaise.processing import Share, Pool, Skprocess
import logging
# put anything on Share — literally anything
share = Share()
share.counter = 0
share.results = []
share.log = logging.getLogger("worker")
class Worker(Skprocess):
    def __init__(self, share, item):
        self.share = share
        self.item = item

    def __run__(self):
        result = self.item * 2
        self.share.results.append(result)       # shared list
        self.share.counter += 1                 # shared counter
        self.share.log.info(f"done: {result}")  # shared logger

pool = Pool(workers=4)
pool.star().map(Worker, [(share, x) for x in range(20)])

print(share.counter)       # 20
print(len(share.results))  # 20
print(share.log.handlers)  # still works
A list, a counter, and a logger — shared across 4 processes, all in sync, all updated like normal Python. No queues, no locks, no Manager, no pickle errors. No redesigning your code around what the serializer can handle.
suitkaise makes parallel code regular code.
Python uses a Global Interpreter Lock (GIL).
Hey, that sounds pretty good!
The GIL has one major consequence: no simultaneous bytecode execution.
This means threads don't speed up CPU-bound work, which makes Python essentially single-threaded.
16 cores running 16 threads will not finish the work any faster.
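You can see this for yourself. The sketch below (standard library only; exact timings vary by machine) runs the same CPU-bound function serially and then across four threads. On a standard CPython build with the GIL, the threaded version is not meaningfully faster.

```python
import threading
import time

def count_down(n):
    # pure-Python, CPU-bound work: the GIL is held the whole time
    while n:
        n -= 1

N = 3_000_000

# run the work serially, four times
start = time.perf_counter()
for _ in range(4):
    count_down(N)
serial = time.perf_counter() - start

# run the same total work across four threads
start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

# with the GIL, the threads take turns instead of running in parallel
print(f"serial: {serial:.2f}s, threaded: {threaded:.2f}s")
```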
In a time where almost all major programs parallelize their code, Python is stuck in the past.
We can spawn multiple processes using Python's multiprocessing module.
True parallelism. Problem solved!
Wrong. The problem is not solved yet.
pickle can't serialize your code

You are trying to run something in parallel:
from multiprocessing import Pool

def process_data(items):
    def transform(x):  # locally defined, so pickle can't reach it by name
        return x * 2

    with Pool(4) as pool:
        return pool.map(transform, items)
This looks like it should work. But it doesn't, because you put a locally-defined function in the pool.
There are a bunch of random things that pickle can't handle, many of which are pretty common things you use when writing code.
Figuring out what works and what doesn't is a nightmare.
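Here is the failure in miniature, using nothing but the standard library's pickle (the function names are just illustrative):

```python
import pickle

def outer():
    # a locally-defined function, like `transform` above
    def transform(x):
        return x * 2
    return transform

fn = outer()

# pickle serializes functions by qualified name, and
# 'outer.<locals>.transform' cannot be looked up from outside
try:
    pickle.dumps(fn)
    failed = False
except Exception as e:
    failed = True
    print(type(e).__name__, e)

# lambdas hit the same wall
try:
    pickle.dumps(lambda x: x * 2)
except Exception as e:
    print(type(e).__name__, e)
```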
What about cloudpickle or dill? They're better than pickle, but they lessen this problem rather than solve it.

cloudpickle is fast and has a little more coverage than pickle, but not enough. dill has a lot more coverage than pickle, but is very slow in exchange.

However: Python's multiprocessing doesn't use them by default.
The standard library's multiprocessing is hardcoded to use pickle. To use cloudpickle or dill, you have to:
# option 1: monkey-patch the serializer (risky, affects entire process)
import multiprocessing
import cloudpickle
multiprocessing.reduction.ForkingPickler.dumps = cloudpickle.dumps
multiprocessing.reduction.ForkingPickler.loads = cloudpickle.loads
# let's hope nothing else in your codebase depends on regular pickle
# option 2: use multiprocess (a fork of multiprocessing that uses dill)
# pip install multiprocess
import multiprocess as mp
with mp.Pool(4) as pool:
    results = pool.map(my_function, items)
# dill is slow
# 2 libraries to keep track of
# option 3: use concurrent.futures with a custom executor
from concurrent.futures import ProcessPoolExecutor
import cloudpickle
# you need to write a custom executor class that overrides the serializer
None of these just work, but all of them waste your time.
And even after all that, you still have limitations. Your code is now more complex, in exchange for... not as many serialization errors?
You could learn every limitation of whatever you use, and tiptoe around objects or patterns that will fail.
You could learn every limitation of whatever you use, and write custom code to handle unsupported objects haphazardly.
Or, you could just use suitkaise.
suitkaise.processing uses cucumber instead of pickle

By default, suitkaise.processing uses cucumber, suitkaise's serialization engine, instead of pickle.
cucumber handles everything.
Where pickle, cloudpickle, and dill each fall short, suitkaise.processing actually solves the problem of things not being serializable. And the problem of actually being compatible with multiprocessing.
(For more info, see the cucumber pages.)
Pool — parallel mapping without the pickle headaches

With multiprocessing:
from multiprocessing import Pool as MPPool
with MPPool(4) as pool:
results = pool.map(my_function, items)
# PicklingError if my_function uses closures, lambdas, or local classes
With suitkaise:

from suitkaise.processing import Pool

pool = Pool(workers=4)
results = pool.map(my_function, items)  # closures, lambdas, anything
The multiprocessing module has more problems

Even setting serialization aside, a large problem still exists with multiprocessing.
multiprocessing is complicated and not intuitive

multiprocessing is a powerful tool, but it is also a pain in the ass to actually use, especially for complex tasks. A lot of this is due to the fact that you have to manage everything yourself.
Python gives us the bare minimum to parallelize code, but outside of that, everything is left to you.
There is a long list of things you need in order to have solid parallel code: communication, shared state, timeouts, retries, error handling, and cleanup.
Notice what is passed into multiprocessing to run a single parallel process.
import multiprocessing

def process_data(items):
    # process your data
    return data

# run the single process (not even in a Pool)
process = multiprocessing.Process(target=process_data, args=(items,))
process.start()
It's a function.
You have to add all of those things from that list into a single function.
For the case above, passing in a simple function is fine; you just want to compute the data faster in parallel.
But the case above doesn't scratch the surface of what you could do with parallel processing.
Not only does having to pass a function make implementing and debugging very difficult, but it also goes against the entire point of object-oriented programming -- where you encapsulate your code into different class objects -- by forcing you to make one giant god function that does everything.
What is something in programming that we can use to split up a giant god function into a more manageable set of pieces?
Classes.
Everyone knows how to work with classes. They are the fundamental building block of object-oriented programming.
So why not pass a class into multiprocessing instead of a function?
import multiprocessing

class ProcessData(multiprocessing.Process):
    def __init__(self, items, result_queue):
        self.items = items
        self.result_queue = result_queue  # need a Queue to communicate
        super().__init__()

    def run(self):
        # do work
        result = process_items(self.items)
        self.result_queue.put(result)  # send back via queue

queue = multiprocessing.Queue()
process = ProcessData(items, queue)
process.start()
process.join()
result = queue.get()  # retrieve from queue, not process.result
This is a step in the right direction, but it is by no means perfect.
Arguments still go through pickle, you still have to remember to call super().__init__(), and you still have to implement run() and shuttle every result through a queue yourself.
We still have no structure and no lifecycle.

Skprocess

A class is the overall solution that should've been used all along, and Skprocess is a class that goes above and beyond for you: cucumber serialization, Share support (very important later), no super().__init__() call when inheriting, and built-in Pipe communication.

Requirements for the example below: a database worker that receives queries from the parent, retries its connection, times out long queries, tracks timing stats, and cleans up when stopped.
Without it - 92 lines
# comments and whitespace excluded from line count
import multiprocessing
import signal
import time
import psycopg2
from multiprocessing import Queue, Event, Value
from ctypes import c_double
class DatabaseWorker(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, stats_lock,
                 total_time, query_count, stop_event, db_config):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.stats_lock = stats_lock
        self.total_time = total_time
        self.query_count = query_count
        self.stop_event = stop_event
        self.db_config = db_config
        self.timeout = 30
        self.max_retries = 3
        self.conn = None

    def _connect(self):
        # manual retry logic with exponential backoff
        for attempt in range(self.max_retries):
            try:
                self.conn = psycopg2.connect(**self.db_config)
                return
            except psycopg2.OperationalError:
                if attempt == self.max_retries - 1:
                    raise
                time.sleep(2 ** attempt)

    def _timeout_handler(self, signum, frame):
        raise TimeoutError("Query timed out")

    def run(self):
        # manual connection setup
        self._connect()
        # manual signal handling for timeouts
        signal.signal(signal.SIGALRM, self._timeout_handler)
        try:
            while not self.stop_event.is_set():
                # check for incoming query (non-blocking)
                try:
                    query_params = self.task_queue.get(timeout=0.1)
                except:
                    # no query received
                    self.result_queue.put({'status': 'no query', 'data': None})
                    continue
                # manual timing
                start = time.time()
                signal.alarm(self.timeout)
                try:
                    cursor = self.conn.cursor()
                    cursor.execute(query_params['sql'], query_params.get('params'))
                    results = cursor.fetchall()
                    cursor.close()
                    signal.alarm(0)
                    elapsed = time.time() - start
                    # manual stats tracking with locks
                    with self.stats_lock:
                        self.total_time.value += elapsed
                        self.query_count.value += 1
                    # different status based on results
                    if not results:
                        self.result_queue.put({'status': 'error', 'data': None})
                    else:
                        self.result_queue.put({'status': 'ok', 'data': results})
                except TimeoutError:
                    signal.alarm(0)
                    self.result_queue.put({'status': 'error', 'error': 'timeout'})
                except Exception as e:
                    signal.alarm(0)
                    self.result_queue.put({'status': 'error', 'error': str(e)})
        finally:
            # manual cleanup
            if self.conn:
                self.conn.close()
# usage
# connect to the database (credentials stored separately)
db_config = {'host': 'localhost', 'database': 'mydb', 'password': 'secret'}

# create all the shared state machinery
manager = multiprocessing.Manager()
task_queue = Queue()
result_queue = Queue()
stats_lock = manager.Lock()
total_time = Value(c_double, 0.0)
query_count = Value('i', 0)
stop_event = Event()

# init and start the worker process
worker = DatabaseWorker(
    task_queue, result_queue, stats_lock, total_time,
    query_count, stop_event, db_config,
)
worker.start()

# list of queries to run (counted as a single line)
queries = [
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (123,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (456,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (789,)},
    # ...
]

# create a list to store the results
results = []

# send each query to the worker
for query in queries:
    task_queue.put(query)
    result = result_queue.get(timeout=30)  # need manual timeout here too
    results.append(result)

# signal stop and wait for cleanup
stop_event.set()
worker.join()

# manual timing calculation
if query_count.value > 0:
    avg_time = total_time.value / query_count.value
    print(f"Avg query time: {avg_time:.3f}s")
There are a lot of problems here.
- 7 arguments to __init__, most of which are just setting up infrastructure
- super().__init__() has to be called and is easy to forget
- .value access for shared state
- a Manager just for locks, something else that needs to be coordinated
- cleanup has to happen manually in a finally block
- everything still goes through pickle

This is a simple example: you already have to do this much for this little.
With suitkaise - 40 lines
# comments and whitespace excluded from line count
from suitkaise.processing import Skprocess, autoreconnect
import psycopg2

@autoreconnect(**{"psycopg2.Connection": {"*": "password"}})
class DatabaseWorker(Skprocess):
    def __init__(self, db_connection):
        # this automatically reconnects
        self.db = db_connection
        # built in configuration
        # run indefinitely until stop() is called
        self.process_config.runs = None
        # NOTE: this is the default; here for clarity, not counted in line count
        # 3 lives (2 extra attempts after the first failure)
        self.process_config.lives = 3
        # 30 second timeout per query
        self.process_config.timeouts.run = 30.0

    def __prerun__(self):
        # receive query from parent (non-blocking check)
        msg = self.listen(timeout=0.1)
        self.query = msg if msg else None

    def __run__(self):
        if not self.query:
            return
        cursor = self.db.cursor()
        cursor.execute(self.query['sql'], self.query.get('params'))
        self.results = cursor.fetchall()
        cursor.close()

    def __postrun__(self):
        if self.query:
            if not self.results:
                self.tell({'status': 'error', 'data': None})
            else:
                self.tell({'status': 'ok', 'data': self.results})
        else:
            self.tell({'status': 'no query', 'data': None})

    def __onfinish__(self):
        self.db.close()
# usage
# connect to the database
db = psycopg2.connect(host='localhost', database='mydb', password='secret')

# init and start the worker process
worker = DatabaseWorker(db)
worker.start()

# list of queries to run (counted as a single line)
queries = [
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (123,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (456,)},
    {'sql': 'SELECT * FROM users WHERE id = %s', 'params': (789,)},
    # ...
]

# create a list to store the results
results = []

# send each query to the worker
for query in queries:
    worker.tell(query)
    result = worker.listen(timeout=30)
    results.append(result)

# request stop (join) and wait for it to finish
worker.stop()
worker.wait()
# automatically timed
print(f"Avg query time: {worker.__run__.timer.mean:.3f}s")
Sharing state across process boundaries is one of the most functionally important parts of multiprocessing in Python.
There are generally 6 patterns for doing this:
Value and Array
Shared memory, but just for primitive types.
Pros: Fast, no serialization
Cons: Only primitives, not even dicts or lists
multiprocessing.Manager
Proxy objects that wrap Python types.
Pros: Supports dict, list, and other Python types (uses pickle)
Cons: Slow. Manager is a separate process, so not truly shared memory
multiprocessing.shared_memory
Raw shared memory blocks. Only available in Python 3.8+.
Pros: True shared memory, fast
Cons: Manual buffer management, no serialization, have to handle syncing yourself
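Here is what "manual buffer management" means in practice, sketched with the standard library (the 8-byte size and the "q" format code are arbitrary choices for this example):

```python
import struct
from multiprocessing import shared_memory

# allocate a raw 8-byte block of shared memory
shm = shared_memory.SharedMemory(create=True, size=8)

# no serialization: you pack bytes into the buffer yourself
struct.pack_into("q", shm.buf, 0, 42)

# another process would attach with SharedMemory(name=shm.name)
# and unpack the same bytes back out
(value,) = struct.unpack_from("q", shm.buf, 0)
print(value)  # 42

# manual lifecycle: close in every process, unlink exactly once
shm.close()
shm.unlink()
```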
Queues (multiprocessing.Queue)
Isn't exactly shared state, but functionally similar.
Pros: Safe, decently fast, works with any serializable object
Cons: Not actually shared — each process has its own copy
Files on disk
Write to disk so that other processes can read.
Pros: Simple and persistent
Cons: Slow IO, race conditions, not real-time
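A toy sketch of the file-based pattern (the JSON state file here is illustrative; real code would also need file locking to avoid two writers racing):

```python
import json
import os
import tempfile

# one process writes its state to a file...
state_path = os.path.join(tempfile.mkdtemp(), "state.json")

with open(state_path, "w") as f:
    json.dump({"counter": 1, "results": [2, 4, 6]}, f)

# ...and another process (simulated here in the same process) reads it back
with open(state_path) as f:
    state = json.load(f)

print(state["counter"])  # 1
# nothing here is real-time: readers only see whatever was last flushed
```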
External stores
Use an external process to hold state, like Redis or Memcached.
Pros: Atomic operations, pub/sub, works across machines
Cons: External dependency, network overhead, more to manage
There is no good solution here. Just a lot of average ones.
That's where Share comes in.
Share is the ultimate solution to shared state in Python.
Pros: cucumber for serialization, so all objects work, and usage is exactly like regular Python.

Cons: slower than the raw alternatives (more on that below).
Look at this example.
from suitkaise.processing import Skprocess, Share, Pool
import logging

share = Share()
share.counter = 0
share.log = logging.getLogger("ShareLog")

class ShareCounter(Skprocess):
    def __init__(self, share: Share):
        self.share = share
        self.process_config.runs = 10

    def __run__(self):
        self.share.counter += 1
        self.share.log.info(f"{self.share.counter}")

pool = Pool(workers=4)
pool.map(ShareCounter, [share] * 10)

print(share.counter)       # 100 (10 total workers, 10 runs each)
print(share.log.messages)  # ['1', '2', '3', '4', '5', ..., '100'] in order
In about 20 lines, I made a counter that works in a parallel pool, loops exactly 10 times per worker, and logs its progress. No return values are needed: everything was added to shared memory, and that shared memory stayed 100% in sync.
Share is 100% worth the slower speed

Admittedly, it's about 3x slower than multiprocessing.Manager.
But every object works exactly the same, and everything syncs perfectly.
And with cucumber, it works with any object you want.
share = Share()
share.counter = 0
share.log = logging.getLogger("ShareLog")
share.complex_object = ComplexObject()
Exactly the same as regular variable assignment.
share.counter += 1
share.log.info(f"{share.counter}")
share.complex_object.do_something()
Exactly the same as how you would use them outside of shared memory.
The overhead is on the coordinator IPC layer, the per-operation cost of syncing state. For long-running parallel work (the kind where you actually need multiprocessing), that cost gets diluted as you perform longer running tasks.
If your process runs for 30 seconds and does 1,000 share operations, the overhead is a few extra milliseconds total. Meanwhile, the alternative is hours of your time debugging.
trades microseconds of IPC overhead for the ability to turn your brain off and never write shared state boilerplate again. You create it, assign to it, and read from it -- exactly like you learned in your first programming class.
That's a tradeoff worth making.
suitkaise.processing still has 2 more options for sharing state

suitkaise.processing has 2 high-speed options for sharing state.
Skprocess.tell() and Skprocess.listen()

The 2 queue-like methods that are part of Skprocess (and all inheriting classes).
These are automatically 2-way, and use cucumber.
from suitkaise.processing import Skprocess

class MyProcess(Skprocess):
    def __prerun__(self):
        self.command = self.listen(timeout=1.0)

    def __run__(self):
        if self.command == "stop":
            self.stop()
        elif self.command == "print":
            print("hello")
        else:
            raise ValueError(f"Unknown command: {self.command}")

    def __postrun__(self):
        self.command = None
        self.tell("command received")

p = MyProcess()
p.start()

for i in range(10):
    p.tell("print")
    result = p.listen(timeout=1.0)
    if result != "command received":
        break

p.tell("stop")
p.wait()
Pipe

The fastest, most direct way to communicate between processes.
from suitkaise.processing import Pipe, Skprocess

anchor, point = Pipe.pair()

class MyProcess(Skprocess):
    def __init__(self, pipe_point: Pipe.Point):
        self.pipe = pipe_point
        self.process_config.runs = 1

    def __run__(self):
        self.pipe.send("hello")
        result = self.pipe.recv()
        print(result)

process = MyProcess(point)
process.start()

anchor.send("hello")
result = anchor.recv()
print(result)

process.wait()
One way pipe:
from suitkaise.processing import Pipe, Skprocess

# one way pipe: only anchor can send data, point can only receive
anchor, point = Pipe.pair(one_way=True)

class MyProcess(Skprocess):
    def __init__(self, pipe_point: Pipe.Point):
        self.pipe = pipe_point
        self.process_config.runs = 1

    def __prerun__(self):
        self.data_to_process = self.pipe.recv()

    def __run__(self):
        self.process_data(self.data_to_process)

    def __postrun__(self):
        self.data_to_process = None
Most of the time, you should just use Share.
If you want simpler, faster, 2-way communication without setup, use tell() and listen().
But if you still need more speed, or want more manual control, use Pipe.
Throughout this page, you might have seen something called Pool.
Pool is an upgraded wrapper around multiprocessing.Pool, used for parallel batch processing.
What this enables:

- Share
- cucumber for serialization
- Skprocess class objects
- sk modifiers

So, already, Pool is vastly more powerful than multiprocessing.Pool, especially because you can use Skprocess objects.
Pool is better, but still familiar to users

It has the 4 main map methods, with clearer names.
map: returns a list, ordered by input. Each item gets added to the list in the order it was added to the pool.

list_in_order = Pool.map(fn_or_skprocess, items)

unordered_map: returns a list, unordered. Whatever finishes first gets added to the list first.

unordered_list = Pool.unordered_map(fn_or_skprocess, items)

imap: returns an iterator, ordered by input. Each item gets added to the iterator in the order it was added to the pool.

for item in Pool.imap(fn_or_skprocess, items):
    print(item)

unordered_imap: returns an iterator, unordered. Whatever finishes first gets added to the iterator first.

for item in Pool.unordered_imap(fn_or_skprocess, items):
    print(item)
Since you can use Skprocess objects that run themselves repeatedly (or set a number of runs), you can keep the pool going and let the processes run until they are done. This opens up a lot of possibilities for complex parallel processing tasks.
sk modifiers come from another module, and are available on most functions and methods, including Pool's map methods.
And Pool itself has a special modifier, star(), that allows you to unpack tuples into function arguments. Modifiers like asynced() and background() give you coroutines and Futures:
from suitkaise.processing import Pool
import asyncio

# get a coroutine for map with a timeout
coro = Pool.map.timeout(20.0).asynced()
results = await coro(fn_or_skprocess, items)

# or, run in the background and get a Future,
# unpacking tuples across function arguments
# (instead of adding the whole tuple as a single argument)
future = Pool.star().map.background()(fn_or_skprocess, items)
asynced() and background() do not work with each other (they do the same thing in different ways), but other than that, everything else is combinable.
These modifiers work with all map methods.
For more info on how to use these modifiers, see the sk pages or look at the examples.
suitkaise doesn't exist in a vacuum. It's designed to work with the rest of the suitkaise ecosystem.
- cucumber handles all serialization automatically. You never think about pickle errors.
- timing provides Sktimer objects that work natively inside Share -- aggregate timing statistics across processes without any extra code.
- sk generates _shared_meta for your classes, which tells Share exactly which attributes each method reads and writes. This is what makes Share efficient.
- circuits provides circuit breakers that work inside Share -- one process trips the circuit, every other process sees it immediately. Cross-process fault tolerance with zero setup.
- paths gives you Skpath objects that serialize cleanly through cucumber and work the same on every machine.

Each module is useful on its own, but they were designed together. When you use suitkaise.processing, you get the full benefit of that integration.