The sk decorator attaches modifiers and async support to classes and functions without changing how you call them. It also pre-computes _shared_meta for Share compatibility.
┌──────────────────────────────────────────────────────┐
│ @sk decorator / sk() │
│ input: class or function │
└─────────────────────────────┬────────────────────────┘
▼
┌────────────────────────────┐
│ AST analysis (analyzer.py) │
└──────────────┬─────────────┘
│
┌───────────────────────┼────────────────────────┐
▼ ▼ ▼
┌──────────────────┐ ┌──────────────────┐ ┌─────────────────────────┐
│ attribute reads │ │ attribute writes │ │ blocking detection │
│ (self.x reads) │ │ (self.x writes) │ │ (time.sleep, I/O, etc.) │
└────────┬─────────┘ └────────┬─────────┘ └────────────┬────────────┘
└──────────────────────┴──────────────────────────┘
▼
┌─────────────────────────┐
│ attach sk features │
└────────────┬────────────┘
│
┌───────────────┴────────────────┐
▼ ▼
┌──────────────────────────────────────┐ ┌──────────────────────────────────────┐
│ classes │ │ functions │
│ - _shared_meta │ │ - has_blocking_calls │
│ - _blocking_methods │ │ - blocking_calls │
│ - .asynced() │ │ - .asynced() .retry() .timeout() │
│ - method modifiers │ │ - .background() .rate_limit() │
└───────────────────┬──────────────────┘ └───────────────────┬──────────────────┘
└──────────────────────┬──────────────────┘
▼
┌──────────────────────────────────────────┐
│ return original object (enhanced) │
└──────────────────────────────────────────┘
The sk module detects blocking code to decide whether modifiers like .asynced() are allowed.
@blocking decorator: if a function/method has @blocking, it is immediately marked as blocking and AST analysis is skipped (a performance optimization). Without @blocking, the analyzer parses the source code and looks for known blocking patterns. It maintains a set of known blocking calls:
BLOCKING_CALLS = {
# time module
'time.sleep', 'sleep',
# file I/O
'open', 'read', 'write', 'readline', 'readlines',
# subprocess
'subprocess.run', 'subprocess.call', 'subprocess.check_call',
# requests
'requests.get', 'requests.post', 'requests.put', ...
# database connectors
'sqlite3.connect', 'psycopg2.connect', 'pymysql.connect', ...
# for the whole list, see the blocking calls page
}
The analyzer also recognizes method name patterns that typically indicate blocking:
BLOCKING_METHOD_PATTERNS = {
'sleep', 'wait', 'join',
'recv', 'send', 'accept', 'connect',
'read', 'write', 'fetch', 'fetchone', 'fetchall',
'execute', 'commit', 'rollback',
# for the whole list, see the blocking calls page
}
How _BlockingCallVisitor works: it visits ast.Call nodes. For each function/method call in the code, it extracts the dotted call name (e.g. time.sleep, self.db.execute) and checks it against BLOCKING_CALLS and BLOCKING_METHOD_PATTERNS:
class _BlockingCallVisitor(ast.NodeVisitor):
    def __init__(self):
        self.blocking_calls: List[str] = []

    def visit_Call(self, node: ast.Call):
        call_name = self._get_call_name(node)
        if call_name:
            # check exact match
            if call_name.lower() in BLOCKING_CALLS:
                self.blocking_calls.append(call_name)
            # check method name pattern
            elif call_name.split('.')[-1] in BLOCKING_METHOD_PATTERNS:
                self.blocking_calls.append(call_name)
        self.generic_visit(node)
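The excerpt above omits _get_call_name. A runnable sketch with a minimal, assumed implementation of that helper (and abbreviated blocking sets) shows the detection in action:

```python
import ast
from typing import List

# abbreviated versions of the sets shown above
BLOCKING_CALLS = {'time.sleep', 'sleep', 'open', 'subprocess.run'}
BLOCKING_METHOD_PATTERNS = {'sleep', 'read', 'write', 'execute', 'connect'}

class _BlockingCallVisitor(ast.NodeVisitor):
    def __init__(self):
        self.blocking_calls: List[str] = []

    def visit_Call(self, node: ast.Call):
        call_name = self._get_call_name(node)
        if call_name:
            if call_name.lower() in BLOCKING_CALLS:
                self.blocking_calls.append(call_name)
            elif call_name.split('.')[-1] in BLOCKING_METHOD_PATTERNS:
                self.blocking_calls.append(call_name)
        self.generic_visit(node)

    def _get_call_name(self, node):
        # assumed helper: rebuild a dotted name like 'time.sleep' or 'self.db.execute'
        parts = []
        target = node.func
        while isinstance(target, ast.Attribute):
            parts.append(target.attr)
            target = target.value
        if isinstance(target, ast.Name):
            parts.append(target.id)
            return '.'.join(reversed(parts))
        return None

source = """
def fetch(self):
    time.sleep(1)
    return self.db.execute("SELECT 1")
"""
visitor = _BlockingCallVisitor()
visitor.visit(ast.parse(source))
print(visitor.blocking_calls)  # ['time.sleep', 'self.db.execute']
```

time.sleep matches by exact name; self.db.execute matches because its last segment, execute, is a known blocking method pattern.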
_shared_meta Generation
_shared_meta tells which attributes each method reads and writes. This enables efficient synchronization.
How _AttributeVisitor works: it visits every self.something access. An ast.Store context is a write (self.x = 1), an ast.Load context is a read (y = self.x), and self.x += 1 counts as both a read and a write:
class _AttributeVisitor(ast.NodeVisitor):
    def __init__(self):
        self.reads: Set[str] = set()
        self.writes: Set[str] = set()

    def visit_Attribute(self, node: ast.Attribute):
        if isinstance(node.value, ast.Name) and node.value.id == 'self':
            attr_name = node.attr
            if isinstance(node.ctx, ast.Store):
                self.writes.add(attr_name)
            elif isinstance(node.ctx, ast.Load):
                self.reads.add(attr_name)
        self.generic_visit(node)

    def visit_AugAssign(self, node: ast.AugAssign):
        # self.x += 1 is both read and write
        if isinstance(node.target, ast.Attribute):
            if isinstance(node.target.value, ast.Name) and node.target.value.id == 'self':
                attr_name = node.target.attr
                self.reads.add(attr_name)
                self.writes.add(attr_name)
        self.visit(node.value)
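Running the visitor over a small method shows the read/write split. The class is repeated here (verbatim from above) so the snippet is self-contained:

```python
import ast
from typing import Set

class _AttributeVisitor(ast.NodeVisitor):
    def __init__(self):
        self.reads: Set[str] = set()
        self.writes: Set[str] = set()

    def visit_Attribute(self, node: ast.Attribute):
        if isinstance(node.value, ast.Name) and node.value.id == 'self':
            if isinstance(node.ctx, ast.Store):
                self.writes.add(node.attr)
            elif isinstance(node.ctx, ast.Load):
                self.reads.add(node.attr)
        self.generic_visit(node)

    def visit_AugAssign(self, node: ast.AugAssign):
        # self.x += 1 is both read and write
        if isinstance(node.target, ast.Attribute):
            if isinstance(node.target.value, ast.Name) and node.target.value.id == 'self':
                self.reads.add(node.target.attr)
                self.writes.add(node.target.attr)
        self.visit(node.value)

# sample method: one augmented assignment, one pure read
source = """
def increment(self):
    self.counter += 1
    print(self.history)
"""
v = _AttributeVisitor()
v.visit(ast.parse(source))
print(sorted(v.reads))   # ['counter', 'history']
print(sorted(v.writes))  # ['counter']
```

self.counter += 1 lands in both sets via visit_AugAssign; self.history appears only in reads.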
_shared_meta Structure
_shared_meta = {
    'methods': {
        'increment': {'writes': ['counter']},
        'reset': {'writes': ['counter', 'history']},
        'get_value': {'writes': []},
    },
    'properties': {
        'value': {'reads': ['counter']},
        'is_empty': {'reads': ['counter']},
    },
}
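analyze_class, which produces this structure, is used later in this section but never shown. A minimal sketch of its attribute half, under the assumption that it runs an attribute visitor over each method (the real version also returns blocking_methods, and works from the decorated class rather than source text):

```python
import ast

class _AttributeVisitor(ast.NodeVisitor):
    # trimmed version of the visitor shown earlier
    def __init__(self):
        self.reads, self.writes = set(), set()

    def visit_Attribute(self, node):
        if isinstance(node.value, ast.Name) and node.value.id == 'self':
            (self.writes if isinstance(node.ctx, ast.Store) else self.reads).add(node.attr)
        self.generic_visit(node)

def analyze_class_source(source):
    # hypothetical sketch: collect per-method writes from a class's source text
    meta = {'methods': {}}
    cls_node = ast.parse(source).body[0]
    for node in cls_node.body:
        if isinstance(node, ast.FunctionDef):
            v = _AttributeVisitor()
            v.visit(node)
            meta['methods'][node.name] = {'writes': sorted(v.writes)}
    return meta

source = """
class Counter:
    def __init__(self):
        self.counter = 0
    def increment(self):
        self.counter = self.counter + 1
    def get_value(self):
        return self.counter
"""
meta = analyze_class_source(source)
print(meta['methods']['increment'])  # {'writes': ['counter']}
print(meta['methods']['get_value'])  # {'writes': []}
```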
sk on Functions
When you apply sk to a function (as a decorator or function call):
# as decorator
@sk
def slow_fetch(url):
    return requests.get(url).text

# or as function call
def slow_fetch(url):
    return requests.get(url).text

slow_fetch = sk(slow_fetch)
sk checks for @blocking or analyzes the AST, then attaches:
- func.has_blocking_calls - bool
- func.blocking_calls - list of detected calls
- modifier methods, each creating an Skfunction for chaining
def sk(func):
    # detect blocking calls
    blocking_calls = detect_blocking(func)
    # attach attributes
    func.has_blocking_calls = len(blocking_calls) > 0
    func.blocking_calls = blocking_calls
    # attach modifier methods
    func.asynced = lambda: Skfunction(func).asynced()
    func.retry = lambda *args, **kwargs: Skfunction(func).retry(*args, **kwargs)
    func.timeout = lambda seconds: Skfunction(func).timeout(seconds)
    func.background = lambda: Skfunction(func).background()
    func.rate_limit = lambda per_second: Skfunction(func).rate_limit(per_second)
    return func  # return original function
When you call a modifier, it creates an Skfunction wrapper:
slow_fetch.retry(3).timeout(5.0)("https://example.com")
- slow_fetch.retry(3) → creates an Skfunction with retry config
- .timeout(5.0) → returns a new Skfunction with both retry and timeout
- ("https://example.com") → executes with both modifiers applied
sk on Classes
When you apply sk to a class (as a decorator or function call):
# as decorator
@sk
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

# or as function call
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

Counter = sk(Counter)
sk builds _shared_meta and detects blocking calls, then attaches:
- cls._shared_meta - for Share compatibility
- cls._blocking_methods - dict of method → blocking calls
- cls.has_blocking_calls - bool
- cls.asynced() - static method returning an async class
def sk(cls):
    # analyze class
    shared_meta, blocking_methods = analyze_class(cls)
    # attach metadata
    cls._shared_meta = shared_meta
    cls._blocking_methods = blocking_methods
    cls.has_blocking_calls = len(blocking_methods) > 0
    # attach asynced() static method
    def asynced():
        if not blocking_methods:
            raise SkModifierError(f"{cls.__name__} has no blocking calls")
        return create_async_class(cls, blocking_methods)
    cls.asynced = staticmethod(asynced)
    # wrap methods with _ModifiableMethod descriptors
    for name, member in cls.__dict__.items():
        if is_regular_method(member):
            setattr(cls, name, _ModifiableMethod(member))
    return cls  # return original class
Each method is wrapped with _ModifiableMethod, a descriptor that provides modifiers:
class _ModifiableMethod:
    def __init__(self, sync_method, ...):
        self._sync_method = sync_method

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return _ModifiableBoundMethod(obj, self._sync_method, ...)
When you access a method on an instance, you get a _ModifiableBoundMethod that supports:
counter.increment() # direct call
counter.increment.asynced()() # async version using asyncio.to_thread()
counter.increment.retry(3)() # with retry
counter.increment.timeout(5.0)() # with timeout
counter.increment.background()() # returns Future
counter.increment.rate_limit(2.0)() # rate limited
When you call MyClass.asynced(), it creates a new class with blocking methods wrapped.
How create_async_class works: it creates a new _Async{ClassName} subclass, replaces each blocking method with an asyncio.to_thread() wrapper, and copies _shared_meta for Share:
def create_async_class(cls, blocking_methods):
    new_methods = {}
    for name in blocking_methods:
        method = getattr(cls, name)
        # wrap with to_thread; the _method default binds the current method
        async def async_wrapper(self, *args, _method=method, **kwargs):
            return await asyncio.to_thread(_method, self, *args, **kwargs)
        new_methods[name] = async_wrapper
    # create new subclass; non-blocking methods are inherited unchanged
    async_cls = type(f'_Async{cls.__name__}', (cls,), new_methods)
    async_cls._shared_meta = cls._shared_meta
    return async_cls
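A self-contained sketch of the wrapping in action. Worker and do_work are hypothetical, and the _shared_meta copy is omitted since a plain class doesn't carry one:

```python
import asyncio
import time

def create_async_class(cls, blocking_methods):
    # wrap only the blocking methods; everything else is inherited from cls
    new_methods = {}
    for name in blocking_methods:
        method = getattr(cls, name)
        async def async_wrapper(self, *args, _method=method, **kwargs):
            return await asyncio.to_thread(_method, self, *args, **kwargs)
        new_methods[name] = async_wrapper
    return type(f'_Async{cls.__name__}', (cls,), new_methods)

class Worker:
    def do_work(self):
        time.sleep(0.01)  # simulated blocking call
        return 'done'

AsyncWorker = create_async_class(Worker, {'do_work'})
result = asyncio.run(AsyncWorker().do_work())
print(AsyncWorker.__name__, result)  # _AsyncWorker done
```

do_work on the generated class is now a coroutine function: the original sync body runs in a worker thread via asyncio.to_thread() while the event loop stays responsive.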
Modifiers are always applied in a consistent order, regardless of how you chain them.
This means these are equivalent:
fn.retry(3).timeout(5.0)(...)
fn.timeout(5.0).retry(3)(...)
Both will retry up to 3 times, with each individual attempt subject to a 5.0-second timeout.
Making the order consistent means you don't have to worry about the exact chaining order every time.
Skfunction Internals
Skfunction is the internal wrapper that holds modifier configuration and applies it when you call the function.
When you chain modifiers, each one returns a new Skfunction with updated config:
- .retry(3) on an Skfunction returns a copy with retry: {times: 3, ...} added to _config
- .timeout(5.0) on that copy adds timeout: {seconds: 5.0}
- the resulting Skfunction has both retry and timeout in its _config dict
This copy-on-modify pattern is why chaining order doesn't matter: each modifier just adds its config, and execution order is determined at call time.
class Skfunction:
    def __init__(self, func, *, _config=None, _blocking_calls=None):
        self._func = func
        self._config = _config or {}
        self._blocking_calls = _blocking_calls

    def _copy_with(self, **config_updates):
        new_config = {**self._config, **config_updates}
        return Skfunction(self._func, _config=new_config, _blocking_calls=self._blocking_calls)

    def retry(self, times=3, delay=1.0, backoff_factor=1.0, exceptions=(Exception,)):
        return self._copy_with(retry={'times': times, 'delay': delay, 'backoff_factor': backoff_factor, 'exceptions': exceptions})

    def timeout(self, seconds):
        return self._copy_with(timeout={'seconds': seconds})
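The copy-on-modify behavior can be seen by inspecting _config after chaining. This standalone sketch trims the class above to the two modifiers shown:

```python
class Skfunction:
    def __init__(self, func, *, _config=None):
        self._func = func
        self._config = _config or {}

    def _copy_with(self, **config_updates):
        # merge new modifier config into a fresh copy; self is untouched
        return Skfunction(self._func, _config={**self._config, **config_updates})

    def retry(self, times=3, delay=1.0):
        return self._copy_with(retry={'times': times, 'delay': delay})

    def timeout(self, seconds):
        return self._copy_with(timeout={'seconds': seconds})

fn = Skfunction(len)
a = fn.retry(3).timeout(5.0)
b = fn.timeout(5.0).retry(3)
print(a._config == b._config)  # True: chaining order doesn't change the config
print(sorted(a._config))       # ['retry', 'timeout']
```

Both chains end with identical config dicts, which is exactly why the execution order can be fixed at call time.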
When you finally call the Skfunction, __call__ applies the stored config:
- it reads _config for retry, timeout, and rate-limit settings
- timeouts run the function in a ThreadPoolExecutor, with a timeout on future.result()
- retries make up to times attempts, sleeping delay seconds between attempts (multiplied by backoff_factor each time)
def __call__(self, *args, **kwargs):
    func = self._func
    # step 1: read config
    retry_config = self._config.get('retry')
    timeout_config = self._config.get('timeout')
    rate_limit_config = self._config.get('rate_limit')
    # step 2: check rate limit first
    if rate_limit_config:
        rate_limit_config['limiter'].acquire()
    # step 3: build the execution function
    def execute_once():
        if timeout_config:
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(func, *args, **kwargs)
                try:
                    return future.result(timeout=timeout_config['seconds'])
                except TimeoutError:
                    raise FunctionTimeoutError(f"{func.__name__} timed out")
        else:
            return func(*args, **kwargs)
    # step 4: apply retry logic
    if retry_config:
        sleep_time = retry_config['delay']
        for attempt in range(retry_config['times']):
            try:
                return execute_once()
            except retry_config['exceptions'] as e:
                if attempt < retry_config['times'] - 1:
                    time.sleep(sleep_time)
                    sleep_time *= retry_config['backoff_factor']
                else:
                    raise
    else:
        return execute_once()
Timeouts use a ThreadPoolExecutor with a single worker:
- the function runs in the worker thread while the caller blocks on future.result(timeout=seconds), which waits at most the timeout
- on timeout, the error is converted into FunctionTimeoutError
This approach can time out blocking I/O because the main thread simply stops waiting, even if the worker thread continues running.
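The core mechanism can be demonstrated in isolation (the sleep duration and timeout values here are arbitrary):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def slow():
    time.sleep(0.5)  # stands in for blocking I/O
    return 'done'

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow)
    try:
        outcome = future.result(timeout=0.05)  # caller waits at most 50 ms
    except FutureTimeout:
        outcome = 'timed out'
print(outcome)  # timed out
# note: the worker thread keeps running slow() to completion;
# leaving the with-block waits for it during executor shutdown
```

This also illustrates the trade-off: the caller gets control back quickly, but the worker thread (and whatever I/O it holds) is not actually cancelled.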
AsyncSkfunction Internals
AsyncSkfunction is returned by Skfunction.asynced() for async execution.
The async version follows the same modifier pattern, but:
- rate limiting uses await limiter.acquire_async() instead of blocking
- timeouts use asyncio.wait_for() instead of a ThreadPoolExecutor
- the sync function runs via asyncio.to_thread() so the event loop is never blocked
- retry delays use await asyncio.sleep() instead of time.sleep()
async def __call__(self, *args, **kwargs):
    # step 1: read config
    retry_config = self._config.get('retry')
    timeout_config = self._config.get('timeout')
    rate_limit_config = self._config.get('rate_limit')
    # step 2: check rate limit
    if rate_limit_config:
        await rate_limit_config['limiter'].acquire_async()
    # step 3: build async execution
    async def execute_once():
        if timeout_config:
            try:
                return await asyncio.wait_for(
                    asyncio.to_thread(self._func, *args, **kwargs),
                    timeout=timeout_config['seconds'],
                )
            except asyncio.TimeoutError:
                raise FunctionTimeoutError(f"{self._func.__name__} timed out")
        else:
            return await asyncio.to_thread(self._func, *args, **kwargs)
    # step 4: apply retry logic
    if retry_config:
        sleep_time = retry_config['delay']
        for attempt in range(retry_config['times']):
            try:
                return await execute_once()
            except retry_config['exceptions'] as e:
                if attempt < retry_config['times'] - 1:
                    await asyncio.sleep(sleep_time)
                    sleep_time *= retry_config['backoff_factor']
                else:
                    raise
    else:
        return await execute_once()
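The async timeout path can likewise be demonstrated in isolation (durations are arbitrary):

```python
import asyncio
import time

def slow():
    time.sleep(0.5)  # a sync, blocking function

async def main():
    try:
        # run the blocking function in a thread, but wait at most 50 ms
        await asyncio.wait_for(asyncio.to_thread(slow), timeout=0.05)
        return 'finished'
    except asyncio.TimeoutError:
        return 'timed out'

outcome = asyncio.run(main())
print(outcome)  # timed out
```

As in the sync case, the worker thread keeps running after the timeout; asyncio.run waits for it while shutting down the default executor.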
The rate limiter ensures a maximum number of calls per second using a simple interval-based approach.
- acquire(): uses time.sleep() and a threading lock
- acquire_async(): uses await asyncio.sleep() (no lock needed in an async context)
class RateLimiter:
    def __init__(self, per_second: float):
        self._per_second = per_second
        self._interval = 1.0 / per_second  # minimum time between calls
        self._last_call = 0.0
        self._lock = threading.Lock()

    def acquire(self):
        with self._lock:
            now = time.monotonic()
            wait_time = self._interval - (now - self._last_call)
            if wait_time > 0:
                time.sleep(wait_time)
            self._last_call = time.monotonic()

    async def acquire_async(self):
        now = time.monotonic()
        wait_time = self._interval - (now - self._last_call)
        if wait_time > 0:
            await asyncio.sleep(wait_time)
        self._last_call = time.monotonic()
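A quick check of the sync path (the rate is chosen arbitrarily, and the timing assertion is loose to tolerate scheduler jitter):

```python
import threading
import time

class RateLimiter:
    # sync half of the class above
    def __init__(self, per_second: float):
        self._interval = 1.0 / per_second  # minimum time between calls
        self._last_call = 0.0
        self._lock = threading.Lock()

    def acquire(self):
        with self._lock:
            now = time.monotonic()
            wait_time = self._interval - (now - self._last_call)
            if wait_time > 0:
                time.sleep(wait_time)
            self._last_call = time.monotonic()

limiter = RateLimiter(per_second=50.0)  # at most one call every 20 ms
start = time.monotonic()
for _ in range(3):
    limiter.acquire()
elapsed = time.monotonic() - start
print(elapsed >= 0.03)  # True: the second and third calls each waited ~20 ms
```

The first call passes immediately (nothing recorded in _last_call yet); each subsequent call sleeps until the 20 ms interval has elapsed.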
When Share receives an @sk-decorated class, it can use _shared_meta for efficient synchronization.
- share.counter = Counter(): share sees Counter._shared_meta
- when share.counter.increment() is called, share looks up _shared_meta['methods']['increment']['writes'] and synchronizes counter.value
- when you read share.counter.value, share looks up _shared_meta['properties']['value']['reads'] and fetches the current counter.value
This ensures reads see the effects of prior writes.