processing provides powerful subprocess execution, parallel processing, and shared memory across process boundaries.

Skprocess: Base class for easy, class-based subprocess execution.
tell() and listen() for queue-based communication.

Pool: Parallel batch processing.
map: returns a list, ordered by input
unordered_map: returns an unordered list, fastest items first
imap: returns an iterator, ordered by input
unordered_imap: returns an iterator, unordered
.star() modifier: unpacks tuples as function arguments
modifiers: .timeout(), .background(), .asynced()

Share: Shared memory container that works across processes.
Create a Share and add any objects to it, like a regular class, then pass the Share to your subprocesses.

Pipe: Upgraded multiprocessing pipes for fast parent/child communication.

autoreconnect: Automatically reconnect live resources in child processes.

Uses cucumber for serialization.

from suitkaise import processing
from suitkaise.processing import (
    Skprocess,
    Pool,
    Share,
    Pipe,
    autoreconnect,
    ProcessTimers,
    ProcessError,
    PreRunError,
    RunError,
    PostRunError,
    OnFinishError,
    ResultError,
    ErrorHandlerError,
    ProcessTimeoutError,
    ResultTimeoutError,
)
Skprocess Base class for subprocess execution. Inherit from this class and implement lifecycle methods.
from suitkaise.processing import Skprocess

class MyProcess(Skprocess):
    def __init__(self):
        self.counter = 0
        self.process_config.runs = 10

    def __run__(self):
        self.counter += 1

    def __result__(self):
        return self.counter

process = MyProcess()
process.start()
process.wait()
result = process.result()  # 10
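The example above can be reasoned about with a plain-Python sketch of the driver loop (illustrative only; `drive` and `Counter` are hypothetical names, and the real Skprocess runs this loop in a separate OS process):

```python
# Illustrative driver loop: prerun -> run -> postrun each iteration,
# then onfinish, then result. Only __run__ is required.
def drive(process, runs):
    for _ in range(runs):
        if hasattr(process, "__prerun__"):
            process.__prerun__()
        process.__run__()
        if hasattr(process, "__postrun__"):
            process.__postrun__()
    if hasattr(process, "__onfinish__"):
        process.__onfinish__()
    return process.__result__() if hasattr(process, "__result__") else None

class Counter:
    def __init__(self):
        self.counter = 0
    def __run__(self):
        self.counter += 1
    def __result__(self):
        return self.counter

result = drive(Counter(), 10)  # 10
```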
Define any of these methods in your subclass. All are optional except __run__().

__run__(): main work method
__prerun__(): setup before each iteration (run)
__postrun__(): cleanup after each iteration (run)
__onfinish__(): cleanup/teardown after the process ends
__result__(): return data when the process completes; this is what process.result() returns
__error__(): return data when the process fails
__prerun__() Called before each iteration.

def __prerun__(self):
    self.data = fetch_next_batch()

Use for: per-iteration setup, such as fetching the next batch of data.
__run__() Main work method. Called each iteration.

def __run__(self):
    for item in self.data:
        process(item)
This is where your core logic goes.
__postrun__() Called after each iteration completes.

def __postrun__(self):
    self.results.append(self.batch_result)
    self.batch_result = None

Use for: per-iteration cleanup and collecting results.
__onfinish__() Called when the process ends (stop signal or run limit reached).

def __onfinish__(self):
    self.cleanup_resources()
    self.save_final_state()

Use for: final cleanup and saving state.
__result__() Return data when the process completes. This is what process.result() returns.

def __result__(self):
    return {
        'count': self.counter,
        'results': self.results,
        'status': 'completed'
    }

NOTE: Your process will not return a result unless you define __result__().
__error__() Handle errors when all lives are exhausted. Receives the error via self.error.

def __error__(self):
    log_error(self.error)
    return {'status': 'failed', 'error': str(self.error)}

Default behavior: Returns self.error, which will be raised by process.result().
process_config Configuration object available in your __init__. Set these to control process behavior.
runs Number of run iterations before auto-stopping.
def __init__(self):
    # run 100 iterations, then stop
    self.process_config.runs = 100

int: Run this many iterations
None: Run indefinitely until stop() is called

join_in Maximum total runtime in seconds before auto-stopping.

def __init__(self):
    # stop after 60 seconds
    self.process_config.join_in = 60.0

float: Maximum seconds to run
None: No time limit

lives Number of times to retry after a crash before giving up.

def __init__(self):
    # retry up to 2 times (3 total attempts)
    self.process_config.lives = 3

1: No retries (fail on first error)
n > 1: Retry n - 1 times on error
Each crash decrements lives; when lives reaches 0, __error__() is called.

timeouts Timeout settings for each lifecycle section.
def __init__(self):
    self.process_config.timeouts.prerun = 5.0
    self.process_config.timeouts.run = 30.0
    self.process_config.timeouts.postrun = 5.0
    self.process_config.timeouts.onfinish = 10.0
    self.process_config.timeouts.result = 5.0
    self.process_config.timeouts.error = 5.0

All default to None (no timeout). Set a value to enable a timeout for that section.
If a section times out, ProcessTimeoutError is raised. This counts against lives.
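To make the lives and timeout semantics concrete, here is a plain-Python analogue (a sketch, not the suitkaise implementation; `run_with_lives` is a hypothetical helper) in which both crashes and timeouts cost one life:

```python
import concurrent.futures as cf

def run_with_lives(section, lives, timeout=None):
    # assumes lives >= 1; each failed attempt costs one life
    last_error = None
    for _ in range(lives):
        with cf.ThreadPoolExecutor(max_workers=1) as ex:
            try:
                # .result(timeout=...) raises TimeoutError if too slow
                return ex.submit(section).result(timeout=timeout)
            except Exception as e:  # crash or timeout: one life spent
                last_error = e
    raise last_error  # lives exhausted: __error__() would run here

attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ValueError("crash")
    return "ok"

outcome = run_with_lives(flaky, lives=3)  # "ok" after two crashes
```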
These are all of the methods you use to actually run and control your subprocesses.
start() Start the process in a new subprocess.

process = MyProcess()
process.start()

Returns the Skprocess object.

stop() Signal the process to stop gracefully.

process.stop()

The process still runs __onfinish__() and __result__().
Call wait() after stop() to block until finished.

kill() Forcefully terminate the process immediately.

process.kill()

No __onfinish__() runs and no result is produced.

wait() Wait for the process to finish.

finished = process.wait()  # blocks until done
finished = process.wait(timeout=10.0)  # returns False if timeout

Arguments
timeout: Maximum seconds to wait.
float | None = None
None = wait forever

Returns
bool: True if process finished, False if timeout reached.

Modifiers:

# async
await process.wait.asynced()()

If the process crashes and has lives remaining, wait() continues blocking during the restart.
result() Get the result from the process.
Blocks until the process finishes if not already done.

Returns
Whatever __result__() returned.

data = process.result()  # blocks until result ready

Raises
ProcessError: If the process failed (after exhausting lives).

Modifiers:

# with timeout
data = process.result.timeout(10.0)()

# background - returns Future
future = process.result.background()()
data = future.result()

# async
data = await process.result.asynced()()
run() Start, wait, and return the result in one call.

result = process.run()

Equivalent to:

process.start()
process.wait()
result = process.result()

Returns
Whatever __result__() returned.

Raises
ProcessError: If the process failed (after exhausting lives).

Modifiers:

# with timeout
result = process.run.timeout(30.0)()

# background - returns Future
future = process.run.background()()
# ... do other work ...
result = future.result()

# async
result = await process.run.asynced()()
tell() and listen() Bidirectional communication between parent and subprocess.

tell() Send data to the other side.

# from parent
process.tell({"command": "update_config", "value": 100})

# from subprocess (in lifecycle methods)
def __postrun__(self):
    self.tell({"status": "batch_complete", "count": len(self.batch)})

Arguments
data: Any serializable data to send.
Any

Non-blocking: returns immediately after queuing the data.
listen() Receive data from the other side.

# from parent
data = process.listen()  # blocks until data received
data = process.listen(timeout=5.0)  # returns None if timeout

# from subprocess (in lifecycle methods)
def __prerun__(self):
    command = self.listen(timeout=1.0)
    if command:
        self.handle_command(command)

Arguments
timeout: Maximum seconds to wait.
float | None = None
None = wait forever

Returns
Any | None: Data sent by the other side, or None if timeout.

Modifiers:

# background
future = process.listen.background()(timeout=5.0)

# async
data = await process.listen.asynced()()
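The queue semantics of tell() and listen() can be sketched in a single process with the stdlib (`Channel` is a hypothetical, illustrative class; the real calls move data across a process boundary):

```python
import queue

class Channel:
    def __init__(self):
        self._q = queue.Queue()

    def tell(self, data):
        self._q.put(data)  # non-blocking: returns after queuing

    def listen(self, timeout=None):
        try:
            return self._q.get(timeout=timeout)  # blocks until data
        except queue.Empty:
            return None  # timeout reached

ch = Channel()
ch.tell({"command": "update_config", "value": 100})
msg = ch.listen(timeout=1.0)    # the queued dict
none = ch.listen(timeout=0.01)  # None: nothing else queued
```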
Every lifecycle method is automatically timed.
process.start()
process.wait()
# access timing data
print(process.__run__.timer.mean)
print(process.__prerun__.timer.total_time)
print(process.__postrun__.timer.percentile(95))

# aggregate timer for full iterations (prerun + run + postrun)
print(process.process_timer.mean)
Each timer is an Sktimer with full statistics: mean, median, stdev, min, max, percentiles, ...

current_run: Current run iteration number (0-indexed).
int
is_alive: Whether the subprocess is currently running.
bool
timers: Container with all lifecycle timers.
ProcessTimers
error: The error that caused the process to fail (available in __error__()).
BaseException | None

Pool Process pool for parallel batch processing.
from suitkaise.processing import Pool

pool = Pool(workers=4)

# basic usage
results = pool.map(process_item, items)

Arguments
workers: Maximum concurrent workers.
int | None = None
None = number of CPUs

map Apply function to each item, return a list of results.

results = pool.map(fn, items)

Works with functions and Skprocess classes.

Arguments
fn_or_process: Function or class to apply.
Callable | type[Skprocess]
iterable: Items to process.
Iterable

Returns
list: Results in order.
# star - unpacks tuples as function arguments
results = pool.star().map(fn, [(1, 2), (3, 4)])
# fn(1, 2), fn(3, 4) instead of fn((1, 2)), fn((3, 4))

# with timeout
results = pool.map.timeout(30.0)(fn, items)

# background - returns Future
future = pool.map.background()(fn, items)
results = future.result()

# async
results = await pool.map.asynced()(fn, items)

# combine modifiers
future = pool.map.timeout(30.0).background()(fn, items)
results = await pool.map.asynced().timeout(30.0)(fn, items)

# star composes with all modifiers
results = pool.star().map.timeout(30.0)(fn, args_tuples)
future = pool.star().map.background()(fn, args_tuples)
results = await pool.star().map.asynced()(fn, args_tuples)
unordered_map Apply function to each item, return a list in completion order.

results = pool.unordered_map(fn, items)

Arguments and returns are the same as map, but results arrive fastest-first. For an iterator version, see unordered_imap.
# star - unpacks tuples as function arguments
results = pool.star().unordered_map(fn, [(1, 2), (3, 4)])

# with timeout
results = pool.unordered_map.timeout(30.0)(fn, items)

# background - returns Future
future = pool.unordered_map.background()(fn, items)
results = future.result()

# async
results = await pool.unordered_map.asynced()(fn, items)

# star composes with all modifiers
results = pool.star().unordered_map.timeout(30.0)(fn, args_tuples)
future = pool.star().unordered_map.background()(fn, args_tuples)
results = await pool.star().unordered_map.asynced()(fn, args_tuples)
imap Apply function to each item, return an iterator of results.

for result in pool.imap(fn, items):
    process(result)

Blocks on next() if the next result isn't ready.
Arguments and returns are the same as map, but returns an Iterator instead of a list.
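A stdlib sketch of this input-ordered, lazy behavior (a thread-based analogue, not the suitkaise implementation; `imap_sketch` is a hypothetical name):

```python
import concurrent.futures as cf

def imap_sketch(fn, iterable, workers=4):
    # submit everything up front, yield results in input order
    with cf.ThreadPoolExecutor(max_workers=workers) as ex:
        futures = [ex.submit(fn, item) for item in iterable]
        for fut in futures:
            yield fut.result()  # blocks if this result isn't ready yet

squares = list(imap_sketch(lambda x: x * x, [1, 2, 3, 4]))  # [1, 4, 9, 16]
```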
# star - unpacks tuples as function arguments
for result in pool.star().imap(fn, [(1, 2), (3, 4)]):
    process(result)

# with timeout (per-item)
for result in pool.imap.timeout(10.0)(fn, items):
    process(result)

# background - collects to list
future = pool.imap.background()(fn, items)
results = future.result()  # list

# async - collects to list
results = await pool.imap.asynced()(fn, items)  # list

# star composes with all modifiers
for result in pool.star().imap.timeout(10.0)(fn, args_tuples):
    process(result)

future = pool.star().imap.background()(fn, args_tuples)
results = await pool.star().imap.asynced()(fn, args_tuples)
unordered_imap Apply function to each item, yield results as they complete.

for result in pool.unordered_imap(fn, items):
    process(result)
Arguments and returns same as imap.
# star - unpacks tuples as function arguments
for result in pool.star().unordered_imap(fn, [(1, 2), (3, 4)]):
    process(result)

# with timeout
for result in pool.unordered_imap.timeout(30.0)(fn, items):
    process(result)

# background - collects to list
future = pool.unordered_imap.background()(fn, items)
results = future.result()  # list

# async - collects to list
results = await pool.unordered_imap.asynced()(fn, items)  # list

# star composes with all modifiers
for result in pool.star().unordered_imap.timeout(30.0)(fn, args_tuples):
    process(result)

future = pool.star().unordered_imap.background()(fn, args_tuples)
results = await pool.star().unordered_imap.asynced()(fn, args_tuples)
star() Modifier Unpacks tuples as function arguments.

# without star: fn receives a single tuple argument
pool.map(fn, [(1, 2), (3, 4)])
# fn((1, 2)), fn((3, 4))

# with star: fn receives unpacked arguments
pool.star().map(fn, [(1, 2), (3, 4)])
# fn(1, 2), fn(3, 4)

Works with all methods:

pool.star().map(fn, args_tuples)
pool.star().imap(fn, args_tuples)
pool.star().unordered_imap(fn, args_tuples)
pool.star().unordered_map(fn, args_tuples)

Works with other modifiers:

pool.star().map.timeout(30.0)(fn, args_tuples)
await pool.star().imap.asynced()(fn, args_tuples)
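star() is the pool analogue of the stdlib's itertools.starmap; this self-contained snippet shows the same unpacking without suitkaise:

```python
from itertools import starmap

def add(a, b):
    return a + b

# with unpacking, like pool.star().map(add, ...)
starred = list(starmap(add, [(1, 2), (3, 4)]))  # [3, 7]

# without unpacking the function receives the tuple itself,
# like plain pool.map
def add_pair(pair):
    a, b = pair
    return a + b

plain = list(map(add_pair, [(1, 2), (3, 4)]))  # [3, 7]
```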
Skprocess with Pool

class ProcessItem(Skprocess):
    def __init__(self, item):
        self.item = item
        self.process_config.runs = 1

    def __run__(self):
        self.result_data = heavy_computation(self.item)

    def __result__(self):
        return self.result_data

# Pool creates instances and runs them
results = pool.map(ProcessItem, items)

The pool creates a ProcessItem instance for each item, runs it, and collects its result.

Context manager:

with Pool(workers=4) as pool:
    results = pool.map(fn, items)
# pool is closed on exit

close() and terminate()

pool.close()  # wait for all active processes to finish
pool.terminate()  # forcefully terminate all processes
Share Container for shared memory across process boundaries.
The easiest and greatest way to share data between processes.
Uses cucumber for serialization, so you can easily share almost anything you want.
from suitkaise.processing import Share
from suitkaise.timing import Sktimer

share = Share()
share.timer = Sktimer()
share.counter = 0

share = Share()
share.counter = 0

class IncrementProcess(Skprocess):
    def __init__(self, share):
        self.share = share
        self.process_config.runs = 10

    def __postrun__(self):
        self.share.counter += 1

pool = Pool(workers=4)
pool.map(IncrementProcess, [share] * 10)

print(share.counter)  # 100 (10 processes × 10 runs each)
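The shared-counter pattern above can be sketched with threads and a lock (a same-machine analogue; Share does the equivalent across OS processes via its coordinator, so `SharedCounter` here is purely illustrative):

```python
import threading

class SharedCounter:
    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        with self._lock:  # serialize concurrent updates
            self.value += 1

def worker(counter, runs):
    for _ in range(runs):
        counter.increment()

counter = SharedCounter()
threads = [threading.Thread(target=worker, args=(counter, 10)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = counter.value  # 40 (4 workers × 10 increments each)
```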
Share uses a coordinator-proxy system:

With _shared_meta (suitkaise objects):
Sktimer, Circuit, BreakingCircuit, ...

User classes:
Use Skclass to generate _shared_meta

Primitives:
int, str, float, bool, list, dict, ...

Iterators (enumerate, zip, map, ...):
Handled by cucumber by exhausting remaining values

Not supported:
multiprocessing.* objects (queues, managers, events, shared_memory, connections) - share primitives instead
os.pipe() file handles / pipe-backed io.FileIO

share = Share()  # auto-starts
# stop sharing (frees resources)
share.stop()
# or
share.exit()

# start again
share.start()

While stopped, changes are queued but won't take effect until start() is called.

reconnect_all() Reconnects all Reconnector objects currently stored in the Share and returns a dict of reconnected objects by name.
share = Share()
share.db = sqlite3.connect(":memory:")
# share.db is a Reconnector in Share

reconnected = share.reconnect_all()

# now it's a live connection again
conn = reconnected["db"]

with Share() as share:
    share.counter = 0
    # ... use share ...
# automatically stopped on exit
is_running: Whether the coordinator is running.
bool
has_error: Whether the coordinator encountered an error.
bool
start(): Start the coordinator.
stop(timeout=5.0): Stop the coordinator gracefully. Returns True if stopped cleanly, False if timed out.
exit(timeout=5.0): Alias for stop().
clear(): Clear all shared objects and counters.
Pipe Fast, direct parent/child communication built on multiprocessing pipes.
This is the fastest way to communicate between processes.
from suitkaise.processing import Pipe

# create a pipe pair
anchor, point = Pipe.pair()

# bidirectional (default)
anchor, point = Pipe.pair()

# one-way
anchor, point = Pipe.pair(one_way=True)
For one-way pipes, the anchor is the send-only end (parent), and the point is the receive-only end (child).
Anchor: the parent end of the pipe.
Point: the subprocess end of the pipe.
# from anchor (parent)
anchor.send({"data": [1, 2, 3]})
response = anchor.recv()
# from point (subprocess)
data = point.recv()
point.send({"status": "received"})
send(obj): Serialize with cucumber and send.
recv(): Receive and deserialize with cucumber.
close(): Close the connection.
class PipeProcess(Skprocess):
    def __init__(self, pipe_point):
        self.pipe = pipe_point
        self.process_config.runs = 1

    def __run__(self):
        # receive command
        command = self.pipe.recv()
        # process it
        result = process_command(command)
        # send result back
        self.pipe.send(result)

# parent
anchor, point = Pipe.pair()
process = PipeProcess(point)
process.start()

anchor.send({"action": "compute", "value": 42})
result = anchor.recv()
process.wait()
point.lock() # prevent transfer
point.unlock() # allow transfer
anchor.lock() # always locked
anchor.unlock() # raises PipeEndpointError
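For comparison, the stdlib's multiprocessing.Pipe gives a similar two-endpoint shape (shown same-process here for brevity; suitkaise's Pipe adds cucumber serialization plus the one-way and locking behavior above):

```python
from multiprocessing import Pipe as MpPipe

# stdlib analogue of Pipe.pair(): two connected endpoints
anchor, point = MpPipe()  # duplex by default

anchor.send({"action": "compute", "value": 42})
request = point.recv()            # the dict sent by the parent end
point.send(request["value"] * 2)  # reply from the "child" end
reply = anchor.recv()             # 84

anchor.close()
point.close()
```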
autoreconnect Decorator that automatically reconnects resources (database connections, sockets, ...) when an object is deserialized in the child process.
Since the process is serialized with cucumber, live resources arrive as placeholders that can be reconnected.
Usually, you need to call reconnect_all() to reconnect all resources in an object.
However, with @autoreconnect, you can decorate a class and it will automatically reconnect all resources when the object is deserialized in the child process.
from suitkaise.processing import Skprocess, autoreconnect

@autoreconnect(
    start_threads=True,
    **{
        "psycopg2.Connection": {"*": "secret"},
        "redis.Redis": {"*": "redis_pass"},
    }
)
class MyProcess(Skprocess):
    def __init__(self, db_connection, cache_connection):
        self.db = db_connection
        self.cache = cache_connection

    def __run__(self):
        # db and cache are automatically reconnected
        self.db.execute(...)
        self.cache.get(...)
Arguments
start_threads: If True, auto-start any deserialized threads.
bool = False
**auth: Reconnection parameters keyed by type, then by attribute name.
Use "*" as the attr key for defaults that apply to all instances.

When cucumber deserializes the process:
Live resources arrive as Reconnector objects
@autoreconnect calls reconnect_all() automatically
Each Reconnector is replaced with a live connection using the provided auth

@autoreconnect(**{
    "psycopg2.Connection": {
        "*": "default_password",  # default for all psycopg2 connections
        "analytics_db": "analytics_secret",  # specific override for the analytics_db attr
    },
})
class MyProcess(Skprocess):
    def __init__(self):
        self.main_db = psycopg2.connect(...)  # uses "*" auth
        self.analytics_db = psycopg2.connect(...)  # uses "analytics_db" auth
ProcessTimers Container for timing lifecycle sections.
from suitkaise.processing import ProcessTimers

# usually accessed via process.timers
process.start()
process.wait()

timers = process.timers
print(timers.run.mean)
print(timers.prerun.total_time)
print(timers.full_run.percentile(95))
Each property is an Sktimer:

prerun: Timer for __prerun__() calls.
run: Timer for __run__() calls.
postrun: Timer for __postrun__() calls.
onfinish: Timer for the __onfinish__() call.
result: Timer for the __result__() call.
error: Timer for the __error__() call.
full_run: Aggregate timer for complete iterations (prerun + run + postrun).
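The statistics an Sktimer exposes can be sketched with the stdlib over a list of recorded durations (the `percentile` helper is an illustrative nearest-rank implementation, not necessarily Sktimer's exact method):

```python
import statistics

durations = [0.10, 0.12, 0.11, 0.30, 0.12]  # recorded seconds

mean = statistics.mean(durations)      # ~0.15
median = statistics.median(durations)  # 0.12
stdev = statistics.stdev(durations)
total_time = sum(durations)            # ~0.75

def percentile(samples, p):
    # nearest-rank percentile over the sorted samples
    ordered = sorted(samples)
    k = max(0, min(len(ordered) - 1, round(p / 100 * len(ordered)) - 1))
    return ordered[k]

p95 = percentile(durations, 95)  # 0.30, the slowest sample here
```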
All processing exceptions inherit from ProcessError.
ProcessError Base class for all Process-related errors.
from suitkaise.processing import ProcessError

try:
    result = process.result()
except ProcessError as e:
    print(f"Process failed: {e}")

Properties:
current_run: Run iteration where the error occurred
original_error: The underlying exception

PreRunError Raised when __prerun__() fails.
RunError Raised when __run__() fails.
PostRunError Raised when __postrun__() fails.
OnFinishError Raised when __onfinish__() fails.
ResultError Raised when __result__() fails.
ErrorHandlerError Raised when __error__() fails.
ProcessTimeoutError Raised when a lifecycle section times out.

from suitkaise.processing import ProcessTimeoutError

try:
    result = process.result()
except ProcessTimeoutError as e:
    print(f"Timeout in {e.section} after {e.timeout}s on run {e.current_run}")

Properties:
section: Which lifecycle method timed out
timeout: The timeout value that was exceeded

ResultTimeoutError Raised when result(), run(), or similar calls time out via the .timeout() modifier.
from suitkaise.processing import ResultTimeoutError

try:
    result = process.result.timeout(10.0)()
except ResultTimeoutError as e:
    print("Timed out waiting for result")
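The hierarchy means a single except ProcessError clause covers every failure mode. A minimal sketch of that shape (class bodies here are illustrative stand-ins; use the real suitkaise imports in practice):

```python
class ProcessError(Exception):
    def __init__(self, message, current_run=None, original_error=None):
        super().__init__(message)
        self.current_run = current_run        # run iteration where it failed
        self.original_error = original_error  # the underlying exception

class RunError(ProcessError): ...
class ResultError(ProcessError): ...

try:
    raise RunError("__run__ failed", current_run=3,
                   original_error=ValueError("bad input"))
except ProcessError as e:  # base class catches every subclass
    caught = (type(e).__name__, e.current_run)  # ("RunError", 3)
```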