The module is built around subprocess execution with several key components:
┌────────────────────────────────────────────────────────────────────────────┐
│ Parent Process │
│ │
│ ┌──────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ Skprocess │ │ Pool │ │ Share │ │ Pipe │ │
│ │ (instance) │ │ (workers) │ │(coordinator)│ │(anchor/point) │ │
│ └──────┬───────┘ └──────┬──────┘ └──────┬──────┘ └───────┬───────┘ │
│ │ │ │ │ │
│ │ cucumber │ cucumber │ manager │ pickle │
│ │ serialize │ serialize │ proxies │ handles │
│ ▼ ▼ ▼ ▼ │
└─────────┼──────────────────┼─────────────────┼──────────────────┼──────────┘
│ │ │ │
│ │ │ │
┌─────────┼──────────────────┼─────────────────┼──────────────────┼──────────┐
│ ▼ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────────┐ │
│ │ Engine │ │ Worker │ │ Coordinator │ │ Point │ │
│ │ (lifecycle) │ │ (inline) │ │ (process) │ │ (endpoint) │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ └───────────────┘ │
│ │
│ Subprocess(es) │
└────────────────────────────────────────────────────────────────────────────┘
When you define a subclass of Skprocess:

class MyProcess(Skprocess):
    def __init__(self):
        self.counter = 0
The __init_subclass__ hook runs automatically when your class is defined.
Specifically, __init_subclass__ on the parent class (Skprocess):
- wraps your __init__ method to call _setup() first
- generates __serialize__ and __deserialize__ methods for cucumber to use
- captures your lifecycle methods (__run__, etc.)

# what happens under the hood
def __init_subclass__(cls, **kwargs):
    super().__init_subclass__(**kwargs)
    # wrap __init__ if defined
    if '__init__' in cls.__dict__:
        original_init = cls.__dict__['__init__']
        def wrapped_init(self, *args, **kwargs):
            Skprocess._setup(self)                # parent setup first
            original_init(self, *args, **kwargs)  # then user's __init__
        cls.__init__ = wrapped_init
    # set up serialization methods (user_serialize/user_deserialize are
    # the user-defined hooks, if any)
    cls.__serialize__ = make_serialize(user_serialize)
    cls.__deserialize__ = make_deserialize(user_deserialize)
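The hook-and-wrap pattern above can be exercised standalone. This is a minimal sketch using toy names (Base, Child, ready) rather than the real Skprocess machinery:

```python
# Toy version of the __init_subclass__ wrapping pattern.
class Base:
    def _setup(self):
        self.ready = True  # internal state exists before the user's __init__

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if '__init__' in cls.__dict__:
            user_init = cls.__dict__['__init__']

            def wrapped_init(self, *args, **kw):
                Base._setup(self)             # parent setup first
                user_init(self, *args, **kw)  # then the user's __init__

            cls.__init__ = wrapped_init

class Child(Base):
    def __init__(self):
        # self.ready was already set by _setup() before this line runs
        self.counter = 0 if self.ready else -1

c = Child()
```

Note that __init_subclass__ is an implicit classmethod, so it fires once per subclass definition, not per instance.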
When class MyProcess(Skprocess) is parsed:
- Python triggers Skprocess.__init_subclass__
- your __init__ is saved and replaced with a wrapper
- _setup() runs before your code, initializing all internal state
- __serialize__/__deserialize__ methods are generated that know how to capture your specific lifecycle methods

The parent setup (_setup) initializes all internal state before your __init__ runs. _setup() creates:
- process_config, which holds runs, join_in, lives, and timeouts
- a timers container (created lazily when first needed)
- the _current_run counter and _start_time timestamp
- an error attribute for __error__ to access
- _tell_queue (parent→child) and _listen_queue (child→parent)
- _subprocess, which holds the multiprocessing.Process object
- _result and _has_result for retrieving the final value

def _setup(instance):
    # configuration with defaults
    instance.process_config = ProcessConfig()
    # timers container (created when needed)
    instance.timers = None
    # runtime state
    instance._current_run = 0
    instance._start_time = None
    # error state (set when error occurs)
    instance.error = None
    # communication primitives (created on start)
    instance._stop_event = None
    instance._result_queue = None
    instance._tell_queue = None    # Parent → Child
    instance._listen_queue = None  # Child → Parent
    # subprocess handle
    instance._subprocess = None
    # result storage
    instance._result = None
    instance._has_result = False
    # set up TimedMethod wrappers
    Skprocess._setup_timed_methods(instance)
Each lifecycle method is wrapped in a TimedMethod to enable timer access.
How process.__run__.timer works: you use the process.__run__.timer syntax to get the timer for __run__, while process.__run__() still calls the method. TimedMethod:
- stores the method, the owning process, and a timer name ("run" for __run__)
- on .timer access, looks up the Sktimer from process.timers

class TimedMethod:
    def __init__(self, method, process, timer_name):
        self._method = method
        self._process = process
        self._timer_name = timer_name

    def __call__(self, *args, **kwargs):
        return self._method(*args, **kwargs)

    @property
    def timer(self):
        # returns the Sktimer for this method
        if self._process.timers is None:
            return None
        return getattr(self._process.timers, self._timer_name, None)
This enables the access pattern:
process.run()
print(process.__run__.timer.elapsed)     # Get timing for __run__ method
print(process.__prerun__.timer.elapsed)  # Get timing for __prerun__ method
When start() is called, the entire object must be transferred to the subprocess. This uses serialization. Serialization:
- skips the TimedMethod wrappers (which aren't serializable)
- captures the lifecycle methods (__run__, __prerun__, etc.) as actual function objects cucumber can serialize

def __serialize__(self):
    cls = type(self)
    return {
        'instance_dict': {k: v for k, v in self.__dict__.items()
                          if not isinstance(v, TimedMethod)},
        'class_name': cls.__name__,
        'lifecycle_methods': {
            name: cls.__dict__[name]
            for name in ['__prerun__', '__run__', '__postrun__',
                         '__onfinish__', '__result__', '__error__']
            if name in cls.__dict__
        },
        'class_attrs': {...},
    }
Deserialization on the subprocess side:
- rebuilds the class with type(), using the saved lifecycle methods
- uses object.__new__() to skip __init__ (state was already captured)
- re-wraps the lifecycle methods in TimedMethod for timer access
- if @autoreconnect was used, calls reconnect_all() to restore live connections

@staticmethod
def __deserialize__(state):
    # rebuild class with lifecycle methods
    new_class = type(
        state['class_name'],
        (Skprocess,),
        state['lifecycle_methods'] | state['class_attrs']
    )
    # create instance without calling __init__
    obj = object.__new__(new_class)
    obj.__dict__.update(state['instance_dict'])
    # set up timed methods
    Skprocess._setup_timed_methods(obj)
    # handle @autoreconnect
    if getattr(new_class, '_auto_reconnect_enabled', False):
        reconnect_kwargs = getattr(new_class, '_auto_reconnect_kwargs', {})
        obj = reconnect_all(obj, **reconnect_kwargs)
    return obj
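The two core tricks here - rebuilding a class with type() and skipping __init__ with object.__new__() - can be demonstrated in isolation. The names (Job, rebuild, the state dict keys) are illustrative, not the real API:

```python
# Sketch: rebuild a class from saved methods, then an instance without __init__.
def rebuild(state):
    # recreate the class; its methods come from the saved dict
    new_class = type(state['class_name'], (object,), state['methods'])
    obj = object.__new__(new_class)  # __init__ is NOT called
    obj.__dict__.update(state['instance_dict'])  # restore captured state
    return obj

state = {
    'class_name': 'Job',
    'methods': {'double': lambda self: self.x * 2},
    'instance_dict': {'x': 21},
}
job = rebuild(state)
```

Skipping __init__ matters because the saved instance_dict already contains the post-__init__ state; running __init__ again would clobber it.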
The start() flow. process.start():
- creates ProcessTimers if not already present
- serializes the current state with cucumber
- creates _stop_event (signal to tell the subprocess to stop), _result_queue (the subprocess sends the final result/error here), _tell_queue (parent sends messages to child), and _listen_queue (child sends messages to parent)
- records the start time for join_in time limit checking
- spawns a multiprocessing.Process targeting the engine
- returns immediately, without waiting until result() completes

def start(self):
    from .engine import _engine_main
    from suitkaise import cucumber
    # ensure timers exist
    if self.timers is None:
        self.timers = ProcessTimers()
    # serialize current state
    serialized = cucumber.serialize(self)
    # create communication primitives (manager-backed to avoid SemLock issues)
    manager = _get_ipc_manager()  # shared manager for all Skprocess instances
    self._stop_event = manager.Event()
    self._result_queue = manager.Queue()
    self._tell_queue = manager.Queue()    # Parent → Child
    self._listen_queue = manager.Queue()  # Child → Parent
    # record start time
    from suitkaise import timing
    self._start_time = timing.time()
    # spawn subprocess (serialized state is passed twice: once to run,
    # once as the original state)
    self._subprocess = multiprocessing.Process(
        target=_engine_main,
        args=(serialized, self._stop_event, self._result_queue,
              serialized, self._tell_queue, self._listen_queue)
    )
    self._subprocess.start()
After start() returns, the parent continues immediately while the subprocess runs.
The engine runs in the subprocess and orchestrates the lifecycle.
The engine:
- deserializes the Skprocess object from bytes
- ensures ProcessTimers exists
- reads lives from the config for retry tracking
- records the start time for join_in tracking
- loops through __prerun__ (timed, with configured timeout), __run__ (your main work, timed, with configured timeout), and __postrun__ (cleanup after each run, timed)
- on success, runs the finish sequence (__onfinish__ → __result__)
- on failure, decrements lives_remaining; when no lives remain, runs __error__ and sends the error to the parent

def _engine_main_inner(serialized_process, stop_event, result_queue,
                       original_state, tell_queue, listen_queue):
    # deserialize the process
    process = cucumber.deserialize(serialized_process)
    # ensure timers exist
    if process.timers is None:
        process.timers = ProcessTimers()
    # track lives
    lives_remaining = process.process_config.lives
    # set up communication (SWAPPED for symmetric API)
    process._stop_event = stop_event
    process._tell_queue = listen_queue   # subprocess tell() → parent listen()
    process._listen_queue = tell_queue   # parent tell() → subprocess listen()
    process._start_time = timing.time()
    while lives_remaining > 0:
        try:
            # main execution loop
            while _should_continue(process, stop_event):
                _run_section_timed(process, '__prerun__', 'prerun', PreRunError, stop_event)
                if stop_event.is_set(): break
                _run_section_timed(process, '__run__', 'run', RunError, stop_event)
                if stop_event.is_set(): break
                _run_section_timed(process, '__postrun__', 'postrun', PostRunError, stop_event)
                process._current_run += 1
                process.timers._update_full_run()
            # success - run finish sequence
            _run_finish_sequence(process, stop_event, result_queue)
            return
        except (PreRunError, RunError, PostRunError, ProcessTimeoutError) as e:
            lives_remaining -= 1
            if lives_remaining > 0:
                # retry with current state
                process.process_config.lives = lives_remaining
                continue
            else:
                # no lives left - send error
                _send_error(process, e, result_queue)
                return
The tell/listen queues are swapped in the subprocess to create a symmetric API.
Without swapping:
- the parent creates tell_queue and listen_queue
- the parent's tell() writes to tell_queue, and its listen() reads from listen_queue
- the child's tell() would write to... tell_queue (same as the parent!)

So the queues are swapped in the subprocess to maintain symmetry.
Parent Process:
process._tell_queue = Queue() # Parent → Child (parent writes here)
process._listen_queue = Queue() # Child → Parent (parent reads from here)
Subprocess (after deserialization):
process._tell_queue = listen_queue # Child writes here → Parent reads
process._listen_queue = tell_queue # Child reads from here ← Parent writes
Result - symmetric API.
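The swap can be sketched with plain queues in a single process. Endpoint here is a toy stand-in for the Skprocess communication methods:

```python
# Both sides get a tell/listen pair; the child's pair is constructed swapped.
from queue import Queue

tell_queue = Queue()    # parent -> child
listen_queue = Queue()  # child -> parent

class Endpoint:
    def __init__(self, tell_q, listen_q):
        self._tell_q = tell_q      # tell() writes here
        self._listen_q = listen_q  # listen() reads here

    def tell(self, msg):
        self._tell_q.put(msg)

    def listen(self):
        return self._listen_q.get()

parent = Endpoint(tell_queue, listen_queue)
child = Endpoint(listen_queue, tell_queue)  # SWAPPED on the child side

parent.tell("ping")
assert child.listen() == "ping"
child.tell("pong")
assert parent.listen() == "pong"
```

Each side writes to its own tell queue and reads from its own listen queue; the swap is what makes those point at opposite underlying queues.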
This means both sides use the same mental model:
- tell() always sends TO the other side
- listen() always receives FROM the other side

When should the loop keep going? _should_continue checks:
- Did the parent call stop()? Check the multiprocessing event.
- Have we completed process_config.runs iterations? (If runs=None, skip this check - run indefinitely.)
- Have we been running longer than process_config.join_in seconds? (If join_in=None, skip this check.)

Evaluation order matters:
def _should_continue(process, stop_event):
    # check stop signal
    if stop_event.is_set():
        return False
    # check run count limit
    if process.process_config.runs is not None:
        if process._current_run >= process.process_config.runs:
            return False
    # check time limit (join_in)
    if process.process_config.join_in is not None:
        elapsed = timing.elapsed(process._start_time)
        if elapsed >= process.process_config.join_in:
            return False
    return True
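A standalone version of the same checks, with time.monotonic() and SimpleNamespace stubs standing in for the timing module and the process object:

```python
# Stop signal first, then run count, then wall-clock limit.
import time
from types import SimpleNamespace

def should_continue(process, stop_set):
    if stop_set:                      # 1. explicit stop wins
        return False
    cfg = process.process_config
    if cfg.runs is not None and process._current_run >= cfg.runs:
        return False                  # 2. run count exhausted
    if cfg.join_in is not None:
        if time.monotonic() - process._start_time >= cfg.join_in:
            return False              # 3. time budget exhausted
    return True

proc = SimpleNamespace(
    process_config=SimpleNamespace(runs=3, join_in=None),
    _current_run=0,
    _start_time=time.monotonic(),
)

assert should_continue(proc, False) is True   # fresh process keeps going
assert should_continue(proc, True) is False   # stop signal halts immediately
proc._current_run = 3
assert should_continue(proc, False) is False  # run limit reached
```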
Each lifecycle section is timed individually.
_run_section_timed:
- unwraps the TimedMethod if necessary, to get the raw function
- looks up the configured timeout for the section (e.g. timeouts.run)
- ensures an Sktimer exists for this section
- on timeout, raises ProcessTimeoutError; on any other exception, wraps it in the section's error class (e.g. RunError)

def _run_section_timed(process, method_name, timer_name, error_class, stop_event):
    # get method (unwrap TimedMethod if needed)
    method_attr = getattr(process, method_name)
    method = method_attr._method if hasattr(method_attr, '_method') else method_attr
    # get timeout
    timeout = getattr(process.process_config.timeouts, timer_name, None)
    # get or create timer
    timer = process.timers._ensure_timer(timer_name)
    timer.start()
    try:
        run_with_timeout(method, timeout, method_name, process._current_run)
        timer.stop()
    except ProcessTimeoutError:
        timer.discard()  # don't record failed timing
        raise
    except Exception as e:
        timer.discard()
        raise error_class(process._current_run, e) from e
Platform-specific timeout handling.
On Unix, a SIGALRM handler is installed that raises ProcessTimeoutError after timeout seconds. This approach can interrupt any code, including blocking I/O.
def _signal_based_timeout(func, timeout, section, current_run):
    if timeout is None:
        return func()
    def handler(signum, frame):
        raise ProcessTimeoutError(section, timeout, current_run)
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(int(timeout) + 1)
    try:
        return func()
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)
Where SIGALRM is unavailable, a thread-based fallback detects the timeout and raises ProcessTimeoutError. Limitation: the thread-based approach cannot interrupt blocking code. The thread continues running as a "zombie", but the timeout is still detected and raised.
def _thread_based_timeout(func, timeout, section, current_run):
    if timeout is None:
        return func()
    result = [None]
    exception = [None]
    completed = threading.Event()
    def wrapper():
        try:
            result[0] = func()
        except BaseException as e:
            exception[0] = e
        finally:
            completed.set()
    thread = threading.Thread(target=wrapper, daemon=True)
    thread.start()
    finished = completed.wait(timeout=timeout)
    if not finished:
        raise ProcessTimeoutError(section, timeout, current_run)
    if exception[0] is not None:
        raise exception[0]
    return result[0]
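A trimmed, runnable copy of the fallback (with a stand-in ProcessTimeoutError) shows both outcomes:

```python
import threading
import time

class ProcessTimeoutError(Exception):  # stand-in for the real error class
    pass

def thread_based_timeout(func, timeout):
    result, exception = [None], [None]
    completed = threading.Event()

    def wrapper():
        try:
            result[0] = func()
        except BaseException as e:
            exception[0] = e
        finally:
            completed.set()

    threading.Thread(target=wrapper, daemon=True).start()
    if not completed.wait(timeout=timeout):
        raise ProcessTimeoutError("timed out")  # zombie thread keeps running
    if exception[0] is not None:
        raise exception[0]
    return result[0]

assert thread_based_timeout(lambda: 42, timeout=1.0) == 42
try:
    thread_based_timeout(lambda: time.sleep(0.5), timeout=0.05)
except ProcessTimeoutError:
    pass  # the slow call is detected even though its thread keeps sleeping
```

The daemon flag is what keeps the zombie thread from blocking interpreter shutdown.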
The finish sequence:
- __onfinish__ - final cleanup, timed, with the configured timeout; on failure, send OnFinishError to the parent and abort
- __result__ - extract the return value, timed; on failure, send ResultError to the parent and abort
- on success, put a {"type": "result", ...} message on the result queue

def _run_finish_sequence(process, stop_event, result_queue):
    # run __onfinish__
    method = unwrap(process.__onfinish__)
    timeout = process.process_config.timeouts.onfinish
    timer = process.timers._ensure_timer('onfinish')
    timer.start()
    try:
        run_with_timeout(method, timeout, '__onfinish__', process._current_run)
    except Exception as e:
        _send_error(process, OnFinishError(process._current_run, e), result_queue)
        return
    finally:
        timer.stop()
    # run __result__
    result_method = unwrap(process.__result__)
    result_timeout = process.process_config.timeouts.result
    result_timer = process.timers._ensure_timer('result')
    result_timer.start()
    try:
        result = run_with_timeout(result_method, result_timeout, '__result__', process._current_run)
    except Exception as e:
        _send_error(process, ResultError(process._current_run, e), result_queue)
        return
    finally:
        result_timer.stop()
    # send success result with timers
    result_queue.put({
        "type": "result",
        "data": cucumber.serialize(result),
        "timers": cucumber.serialize(process.timers)
    })
The error path:
- sets process.error to make the error accessible to the __error__ method
- runs __error__, giving the user a chance to handle or transform the error
- if __error__ itself fails, falls back to the original error
- puts a {"type": "error", ...} message (whatever __error__ returned, or the original error) on the result queue

def _send_error(process, error, result_queue):
    # set error for __error__ to access
    process.error = error
    error_method = unwrap(process.__error__)
    error_timeout = process.process_config.timeouts.error
    error_timer = process.timers._ensure_timer('error')
    error_timer.start()
    try:
        error_result = run_with_timeout(error_method, error_timeout, '__error__', process._current_run)
    except Exception:
        # if __error__ fails, send original error
        error_result = error
    finally:
        error_timer.stop()
    # send error result
    result_queue.put({
        "type": "error",
        "data": cucumber.serialize(error_result),
        "timers": cucumber.serialize(process.timers)
    })
The parent must drain the result queue BEFORE joining the subprocess to avoid deadlock.
Waiting for the result:
- join() waits for the subprocess to exit
- _sync_wait() drains the result queue first, which avoids deadlock

How _drain_result_queue() works:
- tries get_nowait() first
- falls back to a blocking get(timeout=0.5)
- sets _has_result = True so subsequent calls skip the work

def _sync_wait(self, timeout=None):
    if self._subprocess is None:
        return True
    # MUST drain result queue BEFORE waiting
    # otherwise: subprocess can't exit until queue is drained,
    # but we can't drain until subprocess exits = deadlock
    self._drain_result_queue()
    self._subprocess.join(timeout=timeout)
    self._drain_result_queue()
    return not self._subprocess.is_alive()
def _drain_result_queue(self):
    if self._has_result or self._result_queue is None:
        return
    try:
        message = self._result_queue.get_nowait()
    except queue.Empty:
        # fall back to a short blocking get; give up if still empty
        try:
            message = self._result_queue.get(timeout=0.5)
        except queue.Empty:
            return
    # update timers from subprocess
    if message.get('timers'):
        self.timers = cucumber.deserialize(message['timers'])
        Skprocess._setup_timed_methods(self)
    # result and error payloads are stored the same way; the "type"
    # field distinguishes them when the value is retrieved
    self._result = cucumber.deserialize(message["data"])
    self._has_result = True
Pool

class Pool:
    def __init__(self, workers=None):
        self._workers = workers or multiprocessing.cpu_count()
        self._active_processes = []
        self._mp_pool = multiprocessing.Pool(processes=self._workers)
_map_impl:
- with no timeout, uses the built-in multiprocessing.Pool.map() for efficiency, distributing (serialized_fn, serialized_item, is_star) tuples to workers
- with a timeout, manages workers manually, keeping at most self._workers alive at once, and raises TimeoutError when a worker exceeds the limit

def _map_impl(self, fn_or_process, iterable, is_star, timeout=None):
    items = list(iterable)
    if not items:
        return []
    # serialize function once
    serialized_fn = cucumber.serialize(fn_or_process)
    # use built-in multiprocessing.Pool for efficiency when no timeout
    if timeout is None and self._mp_pool is not None:
        args = [
            (serialized_fn, cucumber.serialize(item), is_star)
            for item in items
        ]
        messages = self._mp_pool.map(_pool_worker_bytes_args, args)
        results = []
        for message in messages:
            if message["type"] == "error":
                raise cucumber.deserialize(message["data"])
            results.append(cucumber.deserialize(message["data"]))
        return results
    # manual worker management with timeout
    results = [None] * len(items)
    active = []
    next_index = 0
    while active or next_index < len(items):
        # start workers up to limit
        while next_index < len(items) and len(active) < self._workers:
            serialized_item = cucumber.serialize(items[next_index])
            queue, worker = self._spawn_worker(serialized_fn, serialized_item, is_star)
            active.append((next_index, queue, worker))
            next_index += 1
        # collect finished workers
        for idx, queue, worker in list(active):
            worker.join(timeout=timeout)
            if worker.is_alive():
                worker.terminate()
                raise TimeoutError(f"Worker {idx} timed out")
            message = queue.get()
            if message["type"] == "error":
                raise cucumber.deserialize(message["data"])
            results[idx] = cucumber.deserialize(message["data"])
            active.remove((idx, queue, worker))
            break
    return results
The pool worker:
- if is_star=True, unpacks a tuple item as positional args
- detects whether fn_or_process is an Skprocess subclass and, if so, runs it inline

def _pool_worker(serialized_fn, serialized_item, is_star, result_queue):
    try:
        fn_or_process = cucumber.deserialize(serialized_fn)
        item = cucumber.deserialize(serialized_item)
        # unpack if star mode
        if is_star:
            args = item if isinstance(item, tuple) else (item,)
        else:
            args = (item,)
        # check if Skprocess class
        if isinstance(fn_or_process, type) and issubclass(fn_or_process, Skprocess):
            process_instance = fn_or_process(*args)
            result = _run_process_inline(process_instance)
        else:
            result = fn_or_process(*args) if is_star else fn_or_process(item)
        result_queue.put({
            "type": "result",
            "data": cucumber.serialize(result)
        })
    except Exception as e:
        result_queue.put({
            "type": "error",
            "data": cucumber.serialize(e)
        })
When a Pool worker runs an Skprocess, it runs inline since it's already in a subprocess - there's no need to spawn another one. Inline Skprocess execution:
- uses a threading.Event (instead of a multiprocessing event) for potential early termination
- creates ProcessTimers if needed
- runs the same __prerun__ → __run__ → __postrun__ cycle
- on success, runs __onfinish__ → __result__ and returns the result
- on failure with no lives left, runs __error__

def _run_process_inline(process):
    # ensure timers exist
    if process.timers is None:
        process.timers = ProcessTimers()
    # initialize state
    process._current_run = 0
    process._start_time = timing.time()
    # create threading.Event (not multiprocessing.Event - already in subprocess)
    stop_event = threading.Event()
    process._stop_event = stop_event
    lives_remaining = process.process_config.lives
    while lives_remaining > 0:
        try:
            while _should_continue_inline(process, stop_event):
                _run_section_timed(process, '__prerun__', 'prerun', PreRunError, stop_event)
                if stop_event.is_set(): break
                _run_section_timed(process, '__run__', 'run', RunError, stop_event)
                if stop_event.is_set(): break
                _run_section_timed(process, '__postrun__', 'postrun', PostRunError, stop_event)
                process._current_run += 1
                process.timers._update_full_run()
            return _run_finish_sequence_inline(process)
        except (PreRunError, RunError, PostRunError, ProcessTimeoutError) as e:
            lives_remaining -= 1
            if lives_remaining > 0:
                continue
            else:
                return _run_error_sequence_inline(process, e)
Pool methods return modifier objects that allow chaining.
- pool.map returns a _PoolMapModifier instance; calling it directly (pool.map(fn, items)) runs synchronously
- .timeout(30) returns a _PoolMapTimeoutModifier with the timeout stored
- .background() returns a _PoolMapBackgroundModifier that returns a Future
- .asynced() returns a _PoolMapAsyncModifier that returns a coroutine

pool.map                 → _PoolMapModifier           → sync execution
pool.map.timeout(30)     → _PoolMapTimeoutModifier    → sync with timeout
pool.map.background()    → _PoolMapBackgroundModifier → returns Future
pool.map.asynced()       → _PoolMapAsyncModifier      → returns coroutine
class _PoolMapModifier:
    def __init__(self, pool, is_star=False):
        self._pool = pool
        self._is_star = is_star

    def __call__(self, fn_or_process, iterable):
        return self._pool._map_impl(fn_or_process, iterable, self._is_star)

    def timeout(self, seconds):
        return _PoolMapTimeoutModifier(self._pool, self._is_star, seconds)

    def background(self):
        return _PoolMapBackgroundModifier(self._pool, self._is_star)

    def asynced(self):
        return _PoolMapAsyncModifier(self._pool, self._is_star)
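The chaining idea reduces to "each attribute returns a configured callable". A toy two-step version (names here are illustrative, not the real modifier classes):

```python
# Each modifier is callable; configuration methods return a new modifier
# with the option stored, so chaining never mutates the original.
class MapModifier:
    def __init__(self, timeout=None):
        self._timeout = timeout

    def __call__(self, fn, items):
        # a real pool would distribute this across workers; here we just map
        return [fn(x) for x in items]

    def timeout(self, seconds):
        return MapModifier(timeout=seconds)

m = MapModifier()
assert m(lambda x: x * 2, [1, 2, 3]) == [2, 4, 6]  # direct call: sync execution
assert m.timeout(30)._timeout == 30                # config stored, still callable
assert m._timeout is None                          # original is untouched
```

Returning a fresh object per configuration step is what lets pool.map, pool.map.timeout(30), and pool.map.background() coexist without shared state.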
The star modifier returns a StarModifier that configures is_star=True for all methods. This makes each method unpack tuples as positional arguments.
class StarModifier:
    def __init__(self, pool):
        self._pool = pool

    @property
    def map(self):
        return _PoolMapModifier(self._pool, is_star=True)

    @property
    def imap(self):
        return _PoolImapModifier(self._pool, is_star=True)

    @property
    def unordered_imap(self):
        return _PoolUnorderedImapModifier(self._pool, is_star=True)

    @property
    def unordered_map(self):
        return _PoolUnorderedMapModifier(self._pool, is_star=True)
Share uses a coordinator-proxy system to enable safe concurrent access to shared objects.
All writes go through a single queue to a coordinator process, ensuring serialized (one-at-a-time) execution. Reads wait for pending writes to complete before fetching.
┌─────────────────────────────────────────────────────────────────────────┐
│ Share Container │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ timer │ │ counter │ │ config │ (user objects) │
│ │ (proxy) │ │ (proxy) │ │ (direct) │ │
│ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │
│ │ │ │ │
│ │ getattr() │ setattr() │ fetch │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Coordinator │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ Command Q │ │ Counters │ │ Source Store │ │ │
│ │ │ (Manager) │ │ (Atomic) │ │ (Manager) │ │ │
│ │ └─────────────┘ └─────────────┘ └──────────────┘ │ │
│ │ │ │
│ │ Background Process: │ │
│ │ - Consumes commands │ │
│ │ - Executes on mirrors │ │
│ │ - Commits to source │ │
│ │ - Updates counters │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
When you assign an object to the share (e.g. share.timer = Sktimer()):
- internal attributes (those in _SHARE_ATTRS) are set directly
- otherwise, Share checks for _shared_meta - does the object's class have sharing metadata?
- user classes without metadata are auto-wrapped with Skclass() to generate it
- from _shared_meta, Share determines which attributes each method reads and writes
- the object is registered with the coordinator and wrapped in an _ObjectProxy to intercept access

Note: When a Share is deserialized inside a subprocess, it is reconstructed in client mode from a snapshot. In that mode it does not re-register objects or allocate new counters; it simply restores the serialized snapshots and proxies so reads/writes go through the existing coordinator.
Primitive shared values (int, float, bool, str, bytes, tuple, frozenset, complex) are handled with a lightweight primitive proxy. This gives regular assignment and augmented operators (+=, -=, *=, etc.) the same cross-process safety guarantees as other shared writes.
This keeps common patterns like:
share.counter += 1
share.log.info(f"counter: {share.counter}")
consistent with user expectations under concurrency.
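A toy, lock-based sketch of write routing (ToyShare is illustrative; the real primitive proxy routes through the coordinator, not a local lock):

```python
import threading

class ToyShare:
    # a lock-guarded dict standing in for the coordinator
    def __init__(self):
        object.__setattr__(self, '_lock', threading.Lock())
        object.__setattr__(self, '_values', {})

    def __getattr__(self, name):
        # reads go through the same lock as writes
        with self._lock:
            return self._values[name]

    def __setattr__(self, name, value):
        # every assignment (including the write half of +=) is guarded
        with self._lock:
            self._values[name] = value

share = ToyShare()
share.counter = 0
share.counter += 1  # expands to: share.counter = share.counter + 1
```

Note that += in this sketch is still a separate read followed by a write; the point of the real primitive proxy is to route the augmented operator itself through the coordinator so the whole update is serialized with other writes.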
def __setattr__(self, name, value):
    if name in self._SHARE_ATTRS:
        object.__setattr__(self, name, value)
        return
    # check for _shared_meta (suitkaise objects)
    has_meta = hasattr(type(value), '_shared_meta')
    # auto-wrap user classes
    if not has_meta and self._is_user_class_instance(value):
        Skclass(type(value))  # generates _shared_meta
        has_meta = True
    # extract read/write attrs from _shared_meta
    attrs = set()
    if has_meta:
        meta = getattr(type(value), '_shared_meta', {})
        for method_meta in meta.get('methods', {}).values():
            attrs.update(method_meta.get('writes', []))
            attrs.update(method_meta.get('reads', []))
        for prop_meta in meta.get('properties', {}).values():
            attrs.update(prop_meta.get('reads', []))
            attrs.update(prop_meta.get('writes', []))
    # register with coordinator
    self._coordinator.register_object(name, value, attrs=attrs)
    # create proxy or direct reference
    if has_meta:
        proxy = _ObjectProxy(name, self._coordinator, type(value))
        self._proxies[name] = proxy
    else:
        self._proxies[name] = None  # fetch directly
The coordinator is a background process that serializes all writes.
class _Coordinator:
    def __init__(self, manager=None):
        self._manager = manager or Manager()
        self._command_queue = self._manager.Queue()
        self._counter_registry = _AtomicCounterRegistry(self._manager)
        self._source_store = self._manager.dict()
        self._source_lock = self._manager.Lock()
        self._object_names = self._manager.list()
All writes go through the command queue.
def queue_command(self, object_name, method_name, args=(), kwargs=None, written_attrs=None):
    serialized_args = cucumber.serialize(args)
    serialized_kwargs = cucumber.serialize(kwargs or {})
    command = (object_name, method_name, serialized_args, serialized_kwargs, written_attrs or [])
    self._command_queue.put(command)
The counter system ensures reads see consistent state by tracking pending writes.
For example, if pending = 5 and completed = 3, a read records target = pending = 5 and waits until completed >= 5. Once the coordinator finishes the outstanding writes, completed = 5 and the read proceeds.

def increment_pending(self, key):
    return self._counter_registry.increment_pending(key)

def get_read_target(self, key):
    targets = self._counter_registry.get_read_targets([key])
    return targets.get(key, 0)

def wait_for_read(self, keys, timeout=1.0):
    return self._counter_registry.wait_for_read(keys, timeout=timeout)
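The pending/completed handshake can be modeled with a threading.Condition. This is a single-key toy version of the counter registry, not the manager-backed original:

```python
import threading

class Counter:
    # minimal pending/completed pair guarding reads
    def __init__(self):
        self.pending = 0
        self.completed = 0
        self._cond = threading.Condition()

    def increment_pending(self):
        # a writer announces an upcoming write; readers will wait on it
        with self._cond:
            self.pending += 1
            return self.pending

    def mark_completed(self):
        # the coordinator finished applying a write
        with self._cond:
            self.completed += 1
            self._cond.notify_all()

    def wait_for_read(self, target, timeout=1.0):
        # a reader waits until every write it observed has landed
        with self._cond:
            return self._cond.wait_for(
                lambda: self.completed >= target, timeout=timeout)

c = Counter()
target = c.increment_pending()                          # one write outstanding
assert c.wait_for_read(target, timeout=0.05) is False   # read would block
c.mark_completed()                                      # write landed
assert c.wait_for_read(target) is True                  # read may proceed
```

Capturing the target at read time (rather than re-reading pending) is what keeps a reader from waiting forever on writes issued after it started.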
def _coordinator_main(command_queue, counter_registry, source_store,
                      source_lock, stop_event, error_event, poll_timeout):
    mirrors = {}  # local cache of deserialized objects
    while not stop_event.is_set():
        try:
            command = command_queue.get(timeout=poll_timeout)
        except queue.Empty:
            continue
        object_name, method_name, ser_args, ser_kwargs, written_attrs = command
        # special commands
        if object_name == "__clear__":
            mirrors.clear()
            continue
        if object_name == "__remove__":
            mirrors.pop(method_name, None)
            continue
        # deserialize args
        args = cucumber.deserialize(ser_args)
        kwargs = cucumber.deserialize(ser_kwargs)
        # get mirror (from cache or source)
        mirror = mirrors.get(object_name)
        if mirror is None:
            with source_lock:
                serialized = source_store.get(object_name)
                if serialized:
                    mirror = cucumber.deserialize(serialized)
                    mirrors[object_name] = mirror
        if mirror is None:
            _update_counters_after_write(counter_registry, object_name, written_attrs)
            continue
        # execute method on mirror
        try:
            method = getattr(mirror, method_name)
            method(*args, **kwargs)
        except Exception:
            traceback.print_exc()
        # commit to source of truth
        with source_lock:
            serialized = cucumber.serialize(mirror)
            source_store[object_name] = serialized
        # update counters
        _update_counters_after_write(counter_registry, object_name, written_attrs)
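One iteration of this loop can be exercised standalone, with pickle standing in for cucumber and plain dicts for the manager-backed stores (Tally and coordinator_step are illustrative names):

```python
import pickle
import queue

class Tally:
    def __init__(self):
        self.total = 0

    def add(self, n):
        self.total += n

def coordinator_step(command_queue, mirrors, source_store):
    # take one command off the queue
    name, method_name, args = command_queue.get_nowait()
    mirror = mirrors.get(name)
    if mirror is None:  # first touch: load the mirror from the source store
        mirror = pickle.loads(source_store[name])
        mirrors[name] = mirror
    getattr(mirror, method_name)(*args)        # execute on the local mirror
    source_store[name] = pickle.dumps(mirror)  # commit back to source of truth

commands = queue.Queue()
mirrors, source = {}, {"tally": pickle.dumps(Tally())}
commands.put(("tally", "add", (5,)))
coordinator_step(commands, mirrors, source)
```

Keeping the mirror cached while committing serialized snapshots is the key move: the mirror gives cheap repeated method execution, while the snapshot in the store is what readers fetch.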
The proxy intercepts all attribute access and routes it through the coordinator.
The proxy:
- on method access (e.g. share.timer.start()), if the name is in _shared_meta['methods'], returns a _MethodProxy
- on property access, if the name is in _shared_meta['properties'], waits for pending writes and then fetches
- on attribute assignment (e.g. share.timer.count = 5), queues a __setattr__ command

class _ObjectProxy:
    _PROXY_ATTRS = frozenset({'_object_name', '_coordinator', '_wrapped_class', '_shared_meta'})

    def __init__(self, object_name, coordinator, wrapped_class):
        object.__setattr__(self, '_object_name', object_name)
        object.__setattr__(self, '_coordinator', coordinator)
        object.__setattr__(self, '_wrapped_class', wrapped_class)
        object.__setattr__(self, '_shared_meta', getattr(wrapped_class, '_shared_meta', None))

    def __getattr__(self, name):
        # method calls -> return callable that queues commands
        if self._shared_meta and name in self._shared_meta.get('methods', {}):
            return _MethodProxy(self, name)
        # properties -> wait for writes, then fetch
        if self._shared_meta and name in self._shared_meta.get('properties', {}):
            return self._read_property(name)
        # fallback -> fetch and get attr
        return self._read_attr(name)

    def __setattr__(self, name, value):
        if name in self._PROXY_ATTRS:
            object.__setattr__(self, name, value)
            return
        # queue setattr command
        self._coordinator.increment_pending(f"{self._object_name}.{name}")
        self._coordinator.queue_command(
            self._object_name,
            '__setattr__',
            (name, value),
            {},
            [name]
        )
When you call a proxied method (e.g. share.timer.start()), the method proxy:
- looks up, from _shared_meta, which attributes the method will modify
- increments the pending counter for each of them
- queues the command

This is why writes are "fire-and-forget" - the method call returns before the coordinator processes it.

class _MethodProxy:
    def __init__(self, object_proxy, method_name):
        self._object_proxy = object_proxy
        self._method_name = method_name

    def __call__(self, *args, **kwargs):
        proxy = self._object_proxy
        meta = proxy._shared_meta
        # get write attrs from _shared_meta
        method_meta = meta['methods'].get(self._method_name, {})
        write_attrs = method_meta.get('writes', [])
        # increment pending counters
        for attr in write_attrs:
            key = f"{proxy._object_name}.{attr}"
            proxy._coordinator.increment_pending(key)
        # queue command (fire-and-forget)
        proxy._coordinator.queue_command(
            proxy._object_name,
            self._method_name,
            args,
            kwargs,
            write_attrs
        )
When you read a proxied property (e.g. share.timer.elapsed), the proxy:
- looks up, from _shared_meta, which attributes the property depends on
- builds an "object_name.attr_name" key for each
- waits for all pending writes on those keys to complete
- fetches a fresh snapshot and reads the property

This is why reads block on pending writes - to ensure you see consistent state.

def _read_property(self, name):
    # get read dependencies from _shared_meta
    prop_meta = self._shared_meta['properties'].get(name, {})
    read_attrs = prop_meta.get('reads', [])
    keys = [f"{self._object_name}.{attr}" for attr in read_attrs]
    # wait for all writes to complete
    if keys:
        self._coordinator.wait_for_read(keys, timeout=10.0)
    # fetch fresh snapshot
    obj = self._coordinator.get_object(self._object_name)
    return getattr(obj, name)
Pipe

A pipe endpoint:
- _conn is a multiprocessing.Pipe connection object
- _locked prevents serialization (transfer to a subprocess)
- _role is "anchor" (the parent side) or "point" (the transferable side)
- send() serializes with cucumber; recv() deserializes with cucumber and returns the object
- the raw connection handle itself is pickled with multiprocessing.reduction.ForkingPickler

@dataclass
class _PipeEndpoint:
    _conn: Optional[Any]
    _locked: bool = False
    _role: str = "point"

    def send(self, obj):
        conn = self._ensure_conn()
        conn.send_bytes(cucumber.serialize(obj))

    def recv(self):
        conn = self._ensure_conn()
        data = conn.recv_bytes()
        return cucumber.deserialize(data)

    def __serialize__(self):
        if self._locked:
            raise PipeEndpointError("Locked endpoint cannot be serialized")
        # pickle the connection handle for multiprocessing
        payload = ForkingPickler.dumps(self._conn)
        return {
            "conn_pickle": payload,
            "locked": self._locked,
            "role": self._role
        }
Anchor vs Point:
- the anchor stays in the parent and is always locked
- the point starts unlocked and ready to transfer; calling lock() on a point after transfer prevents re-transfer

How pair() works:
- creates a multiprocessing.Pipe with two connection objects
- if one_way=True, ensures the anchor is the send-only end and the point is the recv-only end
- returns the Anchor (automatically locked) and the Point (unlocked, ready to transfer)

class Pipe:
    class Anchor(_PipeEndpoint):
        def __init__(self, conn, locked=True, role="anchor"):
            super().__init__(conn, True, role)  # always locked

        def unlock(self):
            raise PipeEndpointError("Anchor endpoints are always locked")

    class Point(_PipeEndpoint):
        pass

    @staticmethod
    def pair(one_way=False):
        conn1, conn2 = multiprocessing.Pipe(duplex=not one_way)
        if one_way:
            # conn1 is recv-only, conn2 is send-only
            anchor = Pipe.Anchor(conn2)
            point = Pipe.Point(conn1, False, "point")
        else:
            anchor = Pipe.Anchor(conn1)
            point = Pipe.Point(conn2, False, "point")
        return anchor, point
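The underlying connection semantics that pair() relies on can be checked in a single process with the standard library alone:

```python
import multiprocessing

# duplex pipe: both ends can send and receive
a, b = multiprocessing.Pipe(duplex=True)
a.send({"msg": "hello"})
assert b.recv() == {"msg": "hello"}
b.send("reply")
assert a.recv() == "reply"

# one-way pipe: Pipe(duplex=False) returns (recv_end, send_end)
recv_end, send_end = multiprocessing.Pipe(duplex=False)
send_end.send(42)
assert recv_end.recv() == 42
```

The ordering of the duplex=False return value is why pair(one_way=True) hands conn2 (the send-only end) to the Anchor and conn1 (the recv-only end) to the Point.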
ProcessConfig

@dataclass
class TimeoutConfig:
    prerun: float | None = None
    run: float | None = None
    postrun: float | None = None
    onfinish: float | None = None
    result: float | None = None
    error: float | None = None

@dataclass
class ProcessConfig:
    runs: int | None = None       # None = indefinite
    join_in: float | None = None  # None = no time limit
    lives: int = 1                # 1 = no retries
    timeouts: TimeoutConfig = field(default_factory=TimeoutConfig)
ProcessTimers

class ProcessTimers:
    def __init__(self):
        # individual section timers (created lazily)
        self.prerun: Sktimer | None = None
        self.run: Sktimer | None = None
        self.postrun: Sktimer | None = None
        self.onfinish: Sktimer | None = None
        self.result: Sktimer | None = None
        self.error: Sktimer | None = None
        # aggregate for full iterations
        self.full_run: Sktimer = Sktimer()

    def _ensure_timer(self, section):
        current = getattr(self, section, None)
        if current is None:
            new_timer = Sktimer()
            setattr(self, section, new_timer)
            return new_timer
        return current

    def _update_full_run(self):
        total = 0.0
        for timer in [self.prerun, self.run, self.postrun]:
            if timer and timer.num_times > 0 and timer.most_recent:
                total += timer.most_recent
        if total > 0:
            self.full_run.add_time(total)
The autoreconnect() decorator

What @autoreconnect does at class definition time: it only marks the class (sets _auto_reconnect_enabled = True and stores the auth kwargs). The decorator does NOT reconnect anything - it just marks the class so deserialization knows to reconnect.

def autoreconnect(*, start_threads=False, **auth):
    def decorator(cls):
        # mark class for reconnect on deserialize
        cls._auto_reconnect_enabled = True
        cls._auto_reconnect_kwargs = dict(auth) if auth else {}
        cls._auto_reconnect_start_threads = bool(start_threads)
        return cls
    return decorator
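The decorator above can be exercised standalone to confirm it only marks the class (MyWorker and the auth kwarg are illustrative):

```python
# Same decorator body as above: it attaches flags, nothing more.
def autoreconnect(*, start_threads=False, **auth):
    def decorator(cls):
        cls._auto_reconnect_enabled = True
        cls._auto_reconnect_kwargs = dict(auth) if auth else {}
        cls._auto_reconnect_start_threads = bool(start_threads)
        return cls
    return decorator

@autoreconnect(password="secret", start_threads=True)
class MyWorker:
    pass

assert MyWorker._auto_reconnect_enabled is True
assert MyWorker._auto_reconnect_kwargs == {"password": "secret"}
assert MyWorker._auto_reconnect_start_threads is True
```

No connection exists at this point; the flags are only consumed later, inside __deserialize__.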
At deserialization time, if _auto_reconnect_enabled is True:
- reconnect_all() recursively finds Reconnector objects
- each Reconnector's reconnect(auth) method is called to restore the live connection
- if start_threads=True, all Thread objects are found and started

Example flow for an Skprocess serialized with a database connection:
- cucumber converts the connection to a PostgresReconnector (which stores the connection params)
- __deserialize__ sees _auto_reconnect_enabled
- reconnect_all() finds the PostgresReconnector
- reconnector.reconnect("password") returns a new live connection

# in Skprocess.__deserialize__
if getattr(new_class, '_auto_reconnect_enabled', False):
    reconnect_kwargs = getattr(new_class, '_auto_reconnect_kwargs', {})
    start_threads = getattr(new_class, '_auto_reconnect_start_threads', False)
    obj = reconnect_all(obj, **reconnect_kwargs)
    if start_threads:
        # recursively find and start Thread objects
        _start_threads(obj)
ProcessError (base)
├── PreRunError
├── RunError
├── PostRunError
├── OnFinishError
├── ResultError
├── ErrorHandlerError
├── ProcessTimeoutError
├── ResultTimeoutError
└── DuplicateTimeoutError
class ProcessError(Exception):
    def __init__(self, message, current_run=0, original_error=None):
        self.current_run = current_run
        self.original_error = original_error
        super().__init__(message)

class PreRunError(ProcessError):
    def __init__(self, current_run, original_error=None):
        super().__init__(
            f"Error in __prerun__ on run {current_run}",
            current_run,
            original_error
        )

class ProcessTimeoutError(ProcessError):
    def __init__(self, section, timeout, current_run):
        self.section = section
        self.timeout = timeout
        super().__init__(
            f"Timeout in {section} after {timeout}s on run {current_run}",
            current_run,
            None
        )
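The base/subclass pattern can be exercised standalone. This is a trimmed copy of ProcessError plus a RunError whose message mirrors the PreRunError pattern shown above (an assumption, since RunError's body isn't listed):

```python
class ProcessError(Exception):
    def __init__(self, message, current_run=0, original_error=None):
        self.current_run = current_run
        self.original_error = original_error
        super().__init__(message)

class RunError(ProcessError):
    def __init__(self, current_run, original_error=None):
        super().__init__(f"Error in __run__ on run {current_run}",
                         current_run, original_error)

try:
    try:
        raise ValueError("bad input")
    except ValueError as e:
        # sections wrap user exceptions, preserving the run and the cause
        raise RunError(3, e) from e
except ProcessError as err:  # catching the base class catches every section
    assert err.current_run == 3
    assert isinstance(err.original_error, ValueError)
```

Carrying current_run and original_error on the base class is what lets the engine's single `except (PreRunError, RunError, ...)` clause retry uniformly.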
Each Skprocess runs in its own subprocess, providing process isolation. The parent uses a multiprocessing.Event for parent→child stop signaling. Within the subprocess:
- a threading.Event is used for stop signaling in Pool inline execution
- a multiprocessing.Event is used for cross-process signaling

Pool thread safety:
- delegates to multiprocessing.Pool, which handles worker management internally
- manual workers are tracked in the _active_processes list

Share thread safety:
- Manager.dict() and Manager.Queue() handle inter-process synchronization
- reads wait for pending writes to complete before fetching, ensuring you see the effects of prior writes from the same logical sequence
All cross-process communication uses cucumber for serialization:
- Skprocess - full state + lifecycle methods
- Pool - the function/class + each item
- Share - registered objects
- Pipe - any object via send()

Why cucumber instead of pickle: pickle fails on <locals> classes, while cucumber reconstructs them, and live resources become Reconnector objects that can restore themselves. Everything goes through cucumber.serialize() and cucumber.deserialize().