sk adds .retry(), .timeout(), .background(), .rate_limit(), and .asynced() to any function or class. Modifiers chain in any order: fn.retry(3).timeout(5.0) and fn.timeout(5.0).retry(3) do the same thing. The kicker is _shared_meta generation, which makes your classes work efficiently inside Share.

You probably already use something for retry logic. Maybe tenacity. Maybe a hand-rolled decorator.
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def fetch_data(url):
    return requests.get(url).json()
This works. But now your function always retries. Every call. Every time.
What if you want to retry in production but not in tests? What if one call site needs a timeout but another doesn't? What if you want to run it in the background just this once?
You end up with multiple wrapped versions of the same function, or you start passing flags and config around.
sk takes a different approach: modify at the call site, not the definition. You define your function once, cleanly. Then you decide how to call it each time.
from suitkaise import sk
@sk
def fetch_data(url):
    return requests.get(url).json()
The function works exactly like before:
data = fetch_data("https://api.example.com")
But now you have modifiers available at every call site:
# retry 3 times with exponential backoff
data = fetch_data.retry(times=3, delay=1.0, backoff_factor=2.0)("https://api.example.com")

# timeout after 5 seconds
data = fetch_data.timeout(5.0)("https://api.example.com")

# run in background, get a Future
future = fetch_data.background()("https://api.example.com")
result = future.result()

# rate limit to 2 calls per second
data = fetch_data.rate_limit(2.0)("https://api.example.com")

# make it async
data = await fetch_data.asynced()("https://api.example.com")
The function definition stays clean. The call site says exactly what's happening. No wrapper functions, no config objects, no multiple versions.
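The retry parameters shown above follow standard exponential-backoff arithmetic: waits of delay, then delay * backoff_factor, then delay * backoff_factor**2, and so on between attempts. A minimal sketch of that behavior (retry_call is a hypothetical helper, not suitkaise's implementation):

```python
import time

def retry_call(fn, *args, times=3, delay=1.0, backoff_factor=2.0, sleep=time.sleep):
    """Call fn, retrying up to `times` attempts with exponential backoff.

    Waits delay, delay*backoff_factor, delay*backoff_factor**2, ...
    between attempts. `sleep` is injectable so tests don't actually wait.
    """
    last_exc = None
    for attempt in range(times):
        try:
            return fn(*args)
        except Exception as exc:
            last_exc = exc
            if attempt < times - 1:
                sleep(delay * backoff_factor ** attempt)
    raise last_exc
```

With times=3, delay=1.0, backoff_factor=2.0, a function that fails twice and then succeeds waits 1.0s and then 2.0s before the successful third attempt.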
Modifiers can be chained in any order:
# retry 3 times, with a 5-second timeout per attempt
data = fetch_data.retry(3).timeout(5.0)("https://api.example.com")

# same thing, different order — identical behavior
data = fetch_data.timeout(5.0).retry(3)("https://api.example.com")
The execution order is always consistent regardless of how you chain the modifiers.
This means you don't have to think about ordering. Just add what you need.
# several modifiers, chained
result = await (
    fetch_data.asynced()
    .retry(times=3, delay=0.5)
    .timeout(10.0)
    .rate_limit(5.0)
)("https://api.example.com")
The double parentheses may look odd at first, but the pattern is simple and intentional: the actual function arguments always come at the end of the chain:
fetch_data.retry(3).timeout(5.0)("https://api.example.com")
#          ^^^^^^^^ ^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^
#          modifier modifier     actual function args
You might notice the pattern: fn.modifier()("args"). The first call sets up the modifier. The second call runs the function.
Once you see it, it's easy to read: everything before the last parentheses is configuration, the last parentheses are the call.
And when reviewing code, you can see at a glance how a call is being modified, without sifting through five extra configuration arguments in the main function call.
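The whole fn.modifier()("args") shape can be pictured with a small sketch. Everything here (Modified, sk_sketch) is hypothetical and only illustrates the structure of such an API, not suitkaise's internals:

```python
# Hypothetical sketch of the call-site modifier pattern: each modifier
# returns a new wrapper carrying accumulated config, and the final
# parentheses invoke the underlying function.
class Modified:
    def __init__(self, fn, config=None):
        self._fn = fn
        self._config = dict(config or {})

    def retry(self, times):
        return Modified(self._fn, {**self._config, "retry": times})

    def timeout(self, seconds):
        return Modified(self._fn, {**self._config, "timeout": seconds})

    def __call__(self, *args, **kwargs):
        # A real implementation would apply the accumulated config here;
        # this sketch just returns it alongside the result.
        return self._fn(*args, **kwargs), self._config

def sk_sketch(fn):
    return Modified(fn)

@sk_sketch
def double(x):
    return x * 2
```

Because each modifier only records configuration, double.retry(3).timeout(5.0)(21) and double.timeout(5.0).retry(3)(21) end up with identical config, which mirrors the order-independence described above.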
sk isn't just for functions. Put it on a class and every method gets modifiers:
import json

@sk
class DataProcessor:
    def __init__(self, config):
        self.config = config
        self.results = []

    def process(self, data):
        return transform(data)  # transform: your own processing logic

    def save(self, path):
        with open(path, 'w') as f:
            f.write(json.dumps(self.results))
processor = DataProcessor(config)

# normal call
processor.process(data)

# with timeout
processor.save.timeout(10.0)("output.json")

# with retry
processor.process.retry(3)(data)

# in background
future = processor.save.background()("output.json")
You can even get an async version of the entire class:
AsyncProcessor = DataProcessor.asynced()
processor = AsyncProcessor(config)
# all blocking methods are now async
await processor.process(data)
await processor.save("output.json")
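One plausible way to picture the async conversion is a thin wrapper over asyncio.to_thread, which runs the blocking body in a worker thread; asynced_sketch below is a hypothetical stand-in, not the real API:

```python
import asyncio
import functools
import time

def asynced_sketch(fn):
    """Hypothetical stand-in for .asynced(): run a blocking function in a
    worker thread so it can be awaited without blocking the event loop."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(fn, *args, **kwargs)
    return wrapper

def slow_add(a, b):
    time.sleep(0.01)  # stand-in for blocking I/O
    return a + b

result = asyncio.run(asynced_sketch(slow_add)(2, 3))
```

While slow_add runs in its thread, the event loop stays free to schedule other coroutines.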
sk uses AST analysis to inspect your function's source code and detect blocking patterns: time.sleep(), requests.get(), file I/O, database calls, subprocess calls, and many more.
@sk
def slow_fetch(url):
    return requests.get(url).text
slow_fetch.has_blocking_calls # True
slow_fetch.blocking_calls # ['requests.get']
This detection controls which modifiers are available. .asynced() and .background() are only allowed on functions that actually block, preventing you from wrapping pure CPU code in asyncio.to_thread() where it wouldn't help.
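A minimal sketch of how this kind of AST-based detection can work, using only the standard ast module; the BLOCKING_CALLS sample set and find_blocking_calls are hypothetical, and the real pattern list is much larger:

```python
import ast
import textwrap

# Hypothetical sample of known-blocking dotted names.
BLOCKING_CALLS = {"time.sleep", "requests.get", "requests.post", "open"}

def dotted_name(node):
    """Rebuild 'requests.get' from an ast.Attribute / ast.Name chain."""
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        base = dotted_name(node.value)
        return f"{base}.{node.attr}" if base else None
    return None

def find_blocking_calls(source):
    """Return dotted names of known-blocking calls found in `source`."""
    tree = ast.parse(textwrap.dedent(source))
    return [
        name
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and (name := dotted_name(node.func)) in BLOCKING_CALLS
    ]
```

Running it over the slow_fetch source above yields ['requests.get']: the call node is found, its dotted name is rebuilt, and the attribute access .text on the result is correctly ignored because it is not a call.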
If the AST can't detect your blocking code (C extensions, custom blocking functions, tight CPU loops), use @blocking to explicitly mark it:
@sk
@blocking
def heavy_computation():
    return sum(x**2 for x in range(10_000_000))

# now .asynced() and .background() are available
result = await heavy_computation.asynced()()
_shared_meta

This is what makes sk essential to the rest of the suitkaise ecosystem.
When you put sk on a class, it analyzes every method's AST to figure out which instance attributes each method reads and writes. It stores this as _shared_meta:
@sk
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

print(Counter._shared_meta)
# {
#     'methods': {
#         'increment': {'reads': ['value'], 'writes': ['value']}
#     },
#     'properties': {}
# }
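The reads/writes analysis can be approximated with the standard ast module; analyze_method below is a hypothetical sketch that reproduces the increment example, not the real analyzer (note that self.value += 1 counts as both a read and a write):

```python
import ast
import textwrap

def analyze_method(source):
    """Detect which `self.<attr>` names a method reads and writes."""
    tree = ast.parse(textwrap.dedent(source))
    reads, writes = set(), set()

    def self_attr(node):
        # Return the attribute name for `self.<attr>` nodes, else None.
        if (isinstance(node, ast.Attribute)
                and isinstance(node.value, ast.Name)
                and node.value.id == "self"):
            return node.attr
        return None

    for node in ast.walk(tree):
        if isinstance(node, ast.AugAssign):
            attr = self_attr(node.target)
            if attr:  # self.x += ... both reads and writes x
                reads.add(attr)
                writes.add(attr)
        elif isinstance(node, ast.Attribute):
            attr = self_attr(node)
            if attr:
                if isinstance(node.ctx, ast.Store):
                    writes.add(attr)
                elif isinstance(node.ctx, ast.Load):
                    reads.add(attr)
    return {"reads": sorted(reads), "writes": sorted(writes)}
```

Feeding it the increment method's source gives {'reads': ['value'], 'writes': ['value']}, matching the _shared_meta entry shown above.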
Why does this matter? Because Share uses _shared_meta to know exactly which attributes to sync after each method call.
Without _shared_meta, Share would have to sync everything after every operation: slow and wasteful.
With _shared_meta, Share only syncs the attributes that actually changed. This is what makes Share practical at scale: the overhead is proportional to what you actually touch, not to the total size of the shared object.
from suitkaise.processing import Share

@sk
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

share = Share()
share.counter = Counter()

# works across processes — Share knows to sync only 'value' after increment()
share.counter.increment()
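The selective-sync idea can be illustrated with a toy in-process proxy; SyncingProxy and its dict-backed store are hypothetical stand-ins, since the real Share synchronizes across processes:

```python
class SyncingProxy:
    """Toy illustration of metadata-driven syncing: after each method
    call, copy out only the attributes that method is known to write."""
    def __init__(self, obj, shared_meta, store):
        self._obj = obj
        self._meta = shared_meta
        self._store = store  # stand-in for a cross-process store

    def call(self, method_name, *args):
        result = getattr(self._obj, method_name)(*args)
        for attr in self._meta["methods"][method_name]["writes"]:
            self._store[attr] = getattr(self._obj, attr)  # sync only what changed
        return result

class Counter:
    def __init__(self):
        self.value = 0
        self.history = []  # never written by increment(), so never synced

    def increment(self):
        self.value += 1

meta = {"methods": {"increment": {"reads": ["value"], "writes": ["value"]}}}
store = {}
proxy = SyncingProxy(Counter(), meta, store)
proxy.call("increment")
```

After the call, the store holds only 'value'; the untouched history attribute was never copied, which is the proportional-overhead property described above.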
If you're using Share with custom classes, sk is what makes it efficient. Without it, Share still works, but you lose time whenever Share needs to calculate _shared_meta for each object of that class.
tenacity

Tenacity is a great retry library with more retry strategies and conditions than sk.
But tenacity bakes retry config into the function definition:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential())
def fetch_data(url):
    return requests.get(url).json()
# every call retries. always. even in tests.
# want a timeout too? add another library or wrap it yourself.
With sk, you decide per call site:
from suitkaise import sk
@sk
def fetch_data(url):
    return requests.get(url).json()
# production: retry with timeout
data = fetch_data.retry(3).timeout(5.0)("https://api.example.com")
# tests: no retry, no timeout — just a normal call
data = fetch_data("https://api.example.com")
# one-off background fetch
future = fetch_data.background()("https://api.example.com")
Tenacity only does retry. sk gives you retry + timeout + background + rate_limit + async in one decorator, and lets you choose per call site.
asyncio.to_thread

What sk uses under the hood. sk wraps it in a consistent API and prevents you from using it on non-blocking code.
concurrent.futures

What sk uses under the hood. sk wraps it in the same chaining API as everything else.
You could absolutely implement retry + timeout + background manually. The value of sk is that all five modifiers share a consistent interface, chain naturally, and, most importantly, generate _shared_meta for Share compatibility, which you would never build yourself.
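For a sense of that plumbing, here is a hand-rolled timeout on top of concurrent.futures, the same building block named above; call_with_timeout is a hypothetical helper, not suitkaise code:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def call_with_timeout(fn, *args, timeout=5.0):
    """Run fn in a worker thread and give up after `timeout` seconds.

    Caveat: the worker thread keeps running after a timeout; the pool
    only fully shuts down once fn actually returns.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn, *args)
        return future.result(timeout=timeout)
```

That's one modifier's worth of plumbing, without chaining, without retry, and without any blocking-call detection.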
suitkaise

- processing — Pool methods use sk modifiers. Pool.map.timeout(20).asynced() works because of sk.
- Share — _shared_meta from sk is what makes Share efficient with custom classes.
- circuits — Circuit.short() has .asynced() because circuits uses sk internally.
- timing — timing.sleep has .asynced() via sk.
- paths — @autopath() can be combined with @sk on the same function.

sk is the glue. All modules use it internally when applicable, and now your own code benefits from the same modifier system.