processing Examples

Skprocess
Pool
Share
Pipe
Skprocess.tell() and Skprocess.listen()
autoreconnect

Full-on distributed task queue

Goal: Build a production-ready task processing system that can handle thousands of jobs with automatic retries, failure tracking, and performance monitoring.

Say you have a batch of data transformation tasks (processing uploaded files, running ML inference on images, generating reports) that need to run in parallel with automatic retries, failure tracking, and performance monitoring.

What this script does

  1. Takes a list of 50 tasks, each with an ID and data payload
  2. Distributes them across 4 parallel workers
  3. Each task computes a cryptographic hash chain (this represents real CPU work)
  4. Some tasks fail deterministically based on content
  5. Failed tasks are automatically retried up to 3 times
  6. All timing and success/failure stats are aggregated across workers
  7. Prints a summary report with task statistics and performance metrics
"""
A complete example of a distributed task queue using processing.

Features used:
- Pool for parallel worker management
- Share for tracking global state across processes
- Skprocess for structured task execution with lifecycle hooks
- Timing for performance metrics collection
- lives for automatic retry on failure
"""

from suitkaise.processing import Pool, Share, Skprocess
from suitkaise.timing import Sktimer
from suitkaise import timing
import hashlib


class TaskStats:
    """
    Aggregates task outcome counters shared across worker processes.

    Instances are placed on a Share, which auto-wraps them with Skclass
    so every worker mutates the same counters.
    """

    def __init__(self):
        # all counters start at zero
        self.total_tasks = 0
        self.completed = 0
        self.failed = 0
        self.retried = 0

    def record_complete(self):
        """Count one task that finished successfully."""
        self.total_tasks += 1
        self.completed += 1

    def record_fail(self):
        """Count one task that exhausted every retry."""
        self.total_tasks += 1
        self.failed += 1

    def record_retry(self):
        """Count one retry attempt (does not affect total_tasks)."""
        self.retried += 1


class TaskWorker(Skprocess):
    """
    A worker process that executes a single task from the queue.

    Lifecycle (driven by Skprocess):
    - __prerun__ runs before each attempt and counts retries.
    - __run__ does the hashing work and raises deterministically for
      some task contents to simulate failures.
    - __error__ runs once all lives are exhausted and builds the
      failure record.
    - __result__ returns whatever __run__ stored on success.

    Shared state (via Share):
    - shared.stats: TaskStats counters updated on success/failure/retry.
    - shared.timer: Sktimer collecting per-attempt elapsed times.
    """
    
    def __init__(self, shared: Share, task: dict):
        # store references to the shared state and this worker's task
        self.shared = shared
        self.task = task
        
        # configure process lifecycle
        self.process_config.runs = 1      # one run per task
        self.process_config.lives = 3     # NOTE(review): prose above says "retried up to 3 times"; confirm whether lives counts total attempts or retries
        self.process_config.timeouts.run = 5.0  # abort a single attempt after 5 seconds
        
        # result storage
        self.result_data = None  # set by __run__ on success
        self.attempts = 0        # bumped in __prerun__ on every attempt
    
    def __prerun__(self):
        # track retry attempts; any attempt after the first is a retry
        self.attempts += 1
        if self.attempts > 1:
            # this is a retry
            self.shared.stats.record_retry()
    
    def __run__(self):
        # time this attempt, success or failure
        start = timing.time()
        
        try:
            # real work - hash-chain length derived deterministically
            # from the task id (500..1999 iterations)
            iterations = 500 + (self.task['id'] * 37 % 1500)
            data = self.task['data'].encode()
            for _ in range(iterations):
                data = hashlib.sha256(data).digest()
            
            # deterministic failure based on task content: payloads whose
            # sha256 first byte is divisible by 10 always fail
            checksum = hashlib.sha256(self.task['data'].encode()).digest()
            if checksum[0] % 10 == 0:
                raise RuntimeError(f"Task {self.task['id']} failed (content check)")
            
            # build the success record
            self.result_data = {
                'task_id': self.task['id'],
                'input': self.task['data'],
                'output': hashlib.sha256(data).hexdigest()[:32],
                'iterations': iterations,
                'attempts': self.attempts,
                'status': 'success'
            }
            
            # record success in the shared counters
            self.shared.stats.record_complete()
            
        finally:
            # always record timing, even when the content check raised
            # (the exception still propagates so Skprocess can retry)
            elapsed = timing.elapsed(start)
            self.shared.timer.add_time(elapsed)
    
    def __error__(self):
        # all retries exhausted -- count the permanent failure
        self.shared.stats.record_fail()
        
        # failure record mirrors the shape of the success record
        return {
            'task_id': self.task['id'],
            'input': self.task['data'],
            'output': None,
            'attempts': self.attempts,
            'status': 'failed',
            'error': str(self.error)
        }
    
    def __result__(self):
        # success payload built in __run__ (None if the task never succeeded)
        return self.result_data


def run_task_queue(tasks: list[dict], workers: int = 4):
    """
    Run every task through a pool of parallel TaskWorker processes.

    Args:
        tasks: Task dicts, each with 'id' and 'data' keys
        workers: Number of parallel workers
    Returns:
        Dict with results and statistics
    """

    # shared state visible to every worker process
    with Share() as share:
        share.stats = TaskStats()
        share.timer = Sktimer()

        # star() expects one argument tuple per worker invocation
        worker_args = [(share, one_task) for one_task in tasks]

        # fan the tasks out across the pool
        with Pool(workers=workers) as pool:
            results = pool.star().map(TaskWorker, worker_args)

        # snapshot timing figures first, then fold into the stats dict
        timing_summary = {
            'total': share.timer.total_time,
            'mean': share.timer.mean,
            'min': share.timer.min,
            'max': share.timer.max,
            'p95': share.timer.percentile(95),
        }
        stats = {
            'total_tasks': share.stats.total_tasks,
            'completed': share.stats.completed,
            'failed': share.stats.failed,
            'retried': share.stats.retried,
            'timing': timing_summary,
        }

    return {
        'results': results,
        'stats': stats
    }


# example usage
if __name__ == "__main__":
    # build 50 synthetic tasks to push through the queue
    task_batch = [{'id': n, 'data': f'task_data_{n}'} for n in range(50)]
    print(f"Processing {len(task_batch)} tasks...")

    t0 = timing.time()

    # run the queue and measure wall-clock duration
    output = run_task_queue(task_batch, workers=4)

    wall = timing.elapsed(t0)

    # summary report
    stats = output['stats']
    banner = '=' * 50
    print(f"\n{banner}")
    print("TASK QUEUE RESULTS")
    print(f"{banner}")
    print(f"Total time: {wall:.2f}s")
    print("\nTask Statistics:")
    print(f"  Total processed: {stats['total_tasks']}")
    print(f"  Completed:       {stats['completed']}")
    print(f"  Failed:          {stats['failed']}")
    print(f"  Retried:         {stats['retried']}")
    print("\nTiming Statistics:")
    print(f"  Total work time: {stats['timing']['total']:.2f}s")
    print(f"  Mean per task:   {stats['timing']['mean']:.4f}s")
    print(f"  Min:             {stats['timing']['min']:.4f}s")
    print(f"  Max:             {stats['timing']['max']:.4f}s")
    print(f"  P95:             {stats['timing']['p95']:.4f}s")

    # show a handful of individual task outcomes
    print("\nSample Results:")
    for entry in output['results'][:5]:
        print(f"  Task {entry['task_id']}: {entry['status']} (attempts: {entry['attempts']})")

Full-on data streaming pipeline

Goal: Build a streaming data processor that can handle a continuous flow of incoming data items, distribute them across multiple workers, and collect results in real-time.

Say you're building a system that processes a stream of events (log entries, sensor readings, user actions, webhook payloads) where items must be distributed to workers as they arrive and results collected in real time.

What this script does

  1. Starts 3 worker processes that run indefinitely
  2. Generates a stream of 100 data items ("item_0", "item_1", etc.)
  3. Distributes items to workers in round-robin fashion via tell()
  4. Each worker computes a hash transformation on received items
  5. Workers report their status periodically via tell() back to parent
  6. Results accumulate in shared state accessible from all processes
  7. After stream ends, sends stop signal to all workers
  8. Collects final statistics and prints summary
"""
A real-time data pipeline using processing.

Features used:
- Indefinite process with stop signal (runs=None)
- tell/listen for real-time bidirectional communication
- Share for accumulating results across processes
- Graceful shutdown with __onfinish__
"""

from suitkaise.processing import Skprocess, Share
from suitkaise.timing import Sktimer, TimeThis
from suitkaise import timing
import hashlib


class Results:
    """Shared accumulator for processed pipeline items."""

    def __init__(self):
        self.items = []  # every processed result, in arrival order
        self.count = 0   # running total, kept alongside len(items)

    def add(self, item):
        """Append one processed item and bump the counter."""
        self.items.append(item)
        self.count += 1


class DataPipelineWorker(Skprocess):
    """
    A worker that processes streaming data.
    
    - Runs indefinitely (runs=None) until the parent sends {'action': 'stop'}
    - Receives data items via listen() in __prerun__
    - Processes items in __run__ and stores results in the Share
    - Sends periodic status updates back to the parent via tell()
    """
    
    def __init__(self, shared: Share, worker_id: int):
        self.shared = shared        # Share holding results + timer
        self.worker_id = worker_id  # stable id used in status messages
        
        # run indefinitely; the parent's stop message ends the loop
        self.process_config.runs = None
        
        self.processed = 0  # items handled by this worker so far
    
    def __prerun__(self):
        # check for a stop signal or the next data item; the short timeout
        # keeps the loop responsive when no message is pending
        msg = self.listen(timeout=0.1)
        
        if msg is not None:
            if msg.get('action') == 'stop':
                # graceful shutdown requested by the parent
                self.stop()
            elif msg.get('action') == 'data':
                # stash the payload for __run__ to process
                self._pending_data = msg['payload']
            else:
                # unknown action: treat as no work this cycle
                self._pending_data = None
        else:
            # no message arrived this cycle
            self._pending_data = None
    
    def __run__(self):
        # NOTE(review): _pending_data is only ever assigned in __prerun__;
        # this relies on Skprocess always calling __prerun__ before
        # __run__ -- confirm against the framework docs
        if self._pending_data is None:
            # no data to process this cycle
            return
        
        # process the data - real work, timed for the shared Sktimer
        with TimeThis() as run_timer:
            data = self._pending_data
            
            # transform the data - compute hash and transform
            if isinstance(data, str):
                data_bytes = data.encode()
                # compute a 500-round hash chain
                for _ in range(500):
                    data_bytes = hashlib.sha256(data_bytes).digest()
                output = hashlib.sha256(data_bytes).hexdigest()[:16]
            else:
                # non-string payloads get a trivial doubling transform
                output = data * 2
            
            result = {
                'worker': self.worker_id,
                'input': data,
                'output': output,
                'timestamp': timing.time()
            }
        
        
        # store result in shared state (visible to parent and siblings)
        self.shared.results.add(result)
        self.shared.timer.add_time(run_timer.most_recent)
        
        self.processed += 1
        self._pending_data = None  # item consumed
    
    def __postrun__(self):
        # send a status update to the parent every 10 processed items
        if self.processed > 0 and self.processed % 10 == 0:
            self.tell({
                'worker': self.worker_id,
                'processed': self.processed,
                'status': 'running'
            })
    
    def __onfinish__(self):
        # final status message sent during graceful shutdown
        self.tell({
            'worker': self.worker_id,
            'processed': self.processed,
            'status': 'finished'
        })
    
    def __result__(self):
        # summary returned to the parent via worker.result()
        return {
            'worker_id': self.worker_id,
            'total_processed': self.processed
        }

def run_pipeline(data_stream, num_workers: int = 2, timeout: float = 5.0):
    """
    Run a data pipeline with multiple workers.
    
    Args:
        data_stream: Iterator of data items to process
        num_workers: Number of parallel workers
        timeout: Maximum time (seconds) to spend feeding the stream
    
    Returns:
        Dict with results, counts, per-worker stats, timing, and the
        status messages received from the workers
    """
    
    with Share() as share:
        # shared state every worker writes into
        share.results = Results()
        share.timer = Sktimer()
        
        # start workers (they run indefinitely until told to stop)
        workers = []
        for i in range(num_workers):
            worker = DataPipelineWorker(share, worker_id=i)
            worker.start()
            workers.append(worker)
        
        # distribute data to workers
        start_time = timing.time()
        worker_idx = 0
        
        for item in data_stream:
            # stop feeding once the time budget is spent
            if timing.elapsed(start_time) > timeout:
                break
            
            # round-robin to workers (checksum computed in the parent);
            # hashlib is imported at module level -- the previous per-item
            # `import hashlib` inside this loop was redundant
            checksum = hashlib.sha256(str(item).encode()).hexdigest()[:8]
            workers[worker_idx].tell({
                'action': 'data',
                'payload': item,
                'checksum': checksum,
            })
            worker_idx = (worker_idx + 1) % num_workers
        
        # signal workers to stop
        for worker in workers:
            worker.tell({'action': 'stop'})
        
        # drain any pending status messages from each worker
        statuses = []
        for worker in workers:
            while True:
                msg = worker.listen(timeout=0.5)
                if msg is None:
                    break
                statuses.append(msg)
        
        # wait for all workers to finish their shutdown
        for worker in workers:
            worker.wait()
        
        # collect final per-worker summaries
        worker_results = [worker.result() for worker in workers]
        
        return {
            'results': share.results.items,
            'count': share.results.count,
            'worker_stats': worker_results,
            'timing': {
                'total': share.timer.total_time,
                # guard against division inside mean when nothing was timed
                'mean': share.timer.mean if share.timer.num_times > 0 else 0,
            },
            'statuses': statuses
        }


# example usage
if __name__ == "__main__":
    # lazily generate the input stream: item_0, item_1, ...
    def generate_data(n):
        for idx in range(n):
            yield f"item_{idx}"

    print("Starting pipeline...")
    output = run_pipeline(generate_data(100), num_workers=3, timeout=10.0)

    # aggregate figures
    print("\nPipeline Results:")
    print(f"  Total processed: {output['count']}")
    print(f"  Total time: {output['timing']['total']:.2f}s")
    print(f"  Mean per item: {output['timing']['mean']:.4f}s")

    # per-worker throughput
    print("\nWorker Stats:")
    for stat in output['worker_stats']:
        print(f"  Worker {stat['worker_id']}: {stat['total_processed']} items")

    # a handful of individual transformations
    print("\nSample Results:")
    for rec in output['results'][:5]:
        print(f"  {rec['input']} -> {rec['output']} (worker {rec['worker']})")