circuits provides two circuit breaker classes that help you manage failures in your code.
Circuit - auto-resetting circuit that sleeps and continues
BreakingCircuit - stays broken until manually reset
Both classes support exponential backoff, jitter, thread-safe state access, and async usage.
Circuit is an auto-resetting circuit that sleeps, and then automatically resets to continue.
When the short counter reaches the num_shorts_to_trip threshold, the circuit trips.
┌─────────┐    ┌───────────────────┐    ┌──────────────────────────────────┐
│ short() │───▶│ increment counter │───▶│ shorts >= num_shorts_to_trip ?   │
└─────────┘    └───────────────────┘    └───────────────┬──────────────────┘
                                                        │
                                  ┌─────────────────────┴────────────────────┐
                                  │                                          │
                               yes│                                          │no
                                  ▼                                          ▼
                             ┌──────────┐                               ┌──────────┐
                             │   trip   │                               │ continue │
                             └────┬─────┘                               └──────────┘
                                  │
                                  ▼
                          ┌────────────────┐
                          │ sleep + reset  │
                          └────────────────┘
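The flow in the diagram can be sketched in a few lines of Python. This is a simplified illustration, not the library's Circuit class: `SketchCircuit` and its injected `sleep` callable are hypothetical names, and locking, backoff, and jitter are left out.

```python
from typing import Callable


class SketchCircuit:
    """Toy stand-in for the flow above; not the library's Circuit class."""

    def __init__(self, num_shorts_to_trip: int, sleep: Callable[[float], None],
                 sleep_time_after_trip: float = 1.0) -> None:
        self.num_shorts_to_trip = num_shorts_to_trip
        self.sleep_time_after_trip = sleep_time_after_trip
        self.times_shorted = 0
        self.total_trips = 0
        self._sleep = sleep  # injected so the example never really blocks

    def short(self) -> bool:
        # Increment the counter, then compare it with the threshold.
        self.times_shorted += 1
        if self.times_shorted >= self.num_shorts_to_trip:
            self._trip()
            return True   # tripped and slept
        return False      # below the threshold: continue

    def _trip(self) -> None:
        # Trip: count it, reset the short counter, then sleep.
        self.total_trips += 1
        self.times_shorted = 0
        self._sleep(self.sleep_time_after_trip)


slept: list[float] = []
circ = SketchCircuit(num_shorts_to_trip=3, sleep=slept.append)
results = [circ.short() for _ in range(3)]
```

Only the third call reaches the threshold, so it is the only one that trips and "sleeps" (here, by recording the duration in `slept`).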
Circuit uses a threading.RLock to ensure that internal state is thread-safe.
_num_shorts_to_trip: int Number of shorts required before the circuit trips. Set at initialization.
_times_shorted: int Counter tracking shorts since the last trip. Resets to 0 after each trip.
_total_trips: int Lifetime count of all trips. Never resets.
_current_sleep_time: float Current sleep duration after backoff is applied. Starts at sleep_time_after_trip and grows with each trip.
_lock: threading.RLock Reentrant lock for thread-safe state access.
num_shorts_to_trip: int Number of shorts required before the circuit trips.
times_shorted: int Counter tracking shorts since the last trip. Resets to 0 after each trip.
total_trips: int Lifetime count of all trips. Never resets.
current_sleep_time: float Current sleep duration after backoff is applied. Starts at sleep_time_after_trip and grows with each trip.

short(custom_sleep: float | None = None) -> bool
Increments _times_shorted by 1, calling _trip_circuit() if the num_shorts_to_trip threshold is reached.
If custom_sleep is provided, it will be used instead of _current_sleep_time.
1. Acquires self._lock
2. Increments _times_shorted by 1
3. Checks if _times_shorted >= num_shorts_to_trip
4. Releases self._lock
5. If _times_shorted >= num_shorts_to_trip, calls _trip_circuit():
   - Acquires self._lock
   - Determines sleep_duration (custom_sleep or _current_sleep_time)
   - Increments _total_trips by 1
   - Resets _times_shorted to 0
   - Applies backoff to _current_sleep_time (if backoff_factor != 1.0)
   - Releases self._lock
   - Applies jitter to sleep_duration
   - Sleeps for sleep_duration
6. Returns True if slept, False otherwise

trip(custom_sleep: float | None = None) -> bool
Immediately triggers the circuit, bypassing the short counter.
If custom_sleep is provided, it will be used instead of _current_sleep_time.
1. Calls _trip_circuit():
   - Acquires self._lock
   - Determines sleep_duration (custom_sleep or _current_sleep_time)
   - Increments _total_trips by 1
   - Resets _times_shorted to 0
   - Applies backoff to _current_sleep_time (if backoff_factor != 1.0)
   - Releases self._lock
   - Applies jitter to sleep_duration
   - Sleeps for sleep_duration
2. Returns True (always sleeps)

reset_backoff() -> None
Restores the original sleep time to sleep_time_after_trip.
1. Acquires self._lock
2. Sets _current_sleep_time to sleep_time_after_trip
3. Releases self._lock
4. Returns None

Circuit supports exponential backoff to progressively increase sleep time after repeated trips.
circ = Circuit(
    num_shorts_to_trip=5,
    sleep_time_after_trip=1.0,  # initial sleep time == 1.0s
    backoff_factor=2.0,         # double it each time it is tripped
    max_sleep_time=30.0         # sleep time is capped at 30s
)
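With the above parameters, successive trips sleep (before jitter) for 1.0s, 2.0s, 4.0s, 8.0s, 16.0s, and then 30.0s (the cap) on every later trip.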
Formula for calculating the next sleep time:
_current_sleep_time = min(
    _current_sleep_time * backoff_factor,
    max_sleep_time
)
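To see how the formula plays out over several trips, the sketch below applies it repeatedly. `sleep_schedule` is a hypothetical helper written for this illustration, not part of the library. It assumes each trip sleeps for the current value before backoff is applied, and it ignores jitter.

```python
def sleep_schedule(initial: float, backoff_factor: float,
                   max_sleep_time: float, trips: int) -> list[float]:
    """Sleep time used by each successive trip (hypothetical helper)."""
    schedule = []
    current = initial
    for _ in range(trips):
        schedule.append(current)  # this trip sleeps for the current value
        # Same formula as above: grow, but never beyond the cap.
        current = min(current * backoff_factor, max_sleep_time)
    return schedule


schedule = sleep_schedule(1.0, 2.0, 30.0, trips=7)
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```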
Jitter adds randomness to sleep durations.
When multiple processes trip their circuits at the same time, they all wake up at the same time, which puts pressure on the system. Jitter spreads out the wake-up times to prevent this.
The jitter parameter is a decimal (0.2), not a percentage (20).
Values are clamped to [0.0, 1.0].
For a jitter of 0.2 and a sleep_duration of 1.0, the range is [0.8, 1.2].
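One way to get the range described above is to scale the duration by a random factor between 1 - jitter and 1 + jitter. The sketch below does that. `apply_jitter` is a hypothetical stand-alone function, and the library's own `_apply_jitter` may be implemented differently.

```python
import random


def apply_jitter(sleep_duration: float, jitter: float) -> float:
    """Hypothetical helper: scale by a random factor in [1 - jitter, 1 + jitter]."""
    jitter = max(0.0, min(1.0, jitter))  # clamp to [0.0, 1.0]
    if jitter == 0.0:
        return sleep_duration            # no randomness requested
    return sleep_duration * random.uniform(1.0 - jitter, 1.0 + jitter)


# With jitter=0.2 and a 1.0s sleep, every sample lands in roughly [0.8, 1.2].
samples = [apply_jitter(1.0, 0.2) for _ in range(1000)]
```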
All state access is protected by a reentrant lock (threading.RLock).
A reentrant lock is needed because _trip_circuit() may be called from short() and trip(), and both need lock access.
The sleep operation itself happens outside the lock to avoid blocking other threads during the sleep.
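The pattern of updating state under a reentrant lock and sleeping outside it can be sketched as follows. `LockedCounter` is a hypothetical toy class. Unlike the steps described for the real class, it keeps the lock held across the nested call, purely to show why reentrancy matters.

```python
import threading
import time


class LockedCounter:
    """Toy example of the locking pattern; names are illustrative only."""

    def __init__(self, threshold: int, sleep_time: float) -> None:
        self._lock = threading.RLock()
        self._threshold = threshold
        self._sleep_time = sleep_time
        self._times_shorted = 0
        self._total_trips = 0

    def short(self) -> bool:
        with self._lock:
            self._times_shorted += 1
            should_trip = self._times_shorted >= self._threshold
            if should_trip:
                self._trip_state()
        if should_trip:
            time.sleep(self._sleep_time)  # outside the lock: others keep running
        return should_trip

    def _trip_state(self) -> None:
        # Re-acquires the lock already held by short(); a plain Lock would deadlock.
        with self._lock:
            self._total_trips += 1
            self._times_shorted = 0

    @property
    def total_trips(self) -> int:
        with self._lock:
            return self._total_trips


counter = LockedCounter(threshold=5, sleep_time=0.001)
threads = [
    threading.Thread(target=lambda: [counter.short() for _ in range(25)])
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Four threads make 100 calls in total. Because the increment and the reset happen atomically under the lock, the counter trips exactly once per five shorts.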
All public properties acquire the lock for reads.
@property
def times_shorted(self) -> int:
    with self._lock:
        return self._times_shorted
This ensures that all reads are consistent and thread-safe.
Circuit supports async usage via the _AsyncableMethod pattern.
_AsyncableMethod wraps a sync method and an async method into a single attribute that can be called either way.
short = _AsyncableMethod(_sync_short, _async_short)
Usage:
# sync usage
circ.short()
# async usage
await circ.short.asynced()()
The async versions use asyncio.sleep() instead of blocking time.sleep():
async def _async_trip_circuit(self, custom_sleep: float | None = None) -> bool:
    with self._lock:
        sleep_duration = custom_sleep if custom_sleep is not None else self._current_sleep_time
        self._total_trips += 1
        self._times_shorted = 0
        if self.backoff_factor != 1.0:
            self._current_sleep_time = min(
                self._current_sleep_time * self.backoff_factor,
                self.max_sleep_time
            )
    sleep_duration = self._apply_jitter(sleep_duration)
    if sleep_duration > 0:
        await asyncio.sleep(sleep_duration)
    return True
The lock usage is the same - only the sleep call differs.
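To make the `.asynced()()` call shape less mysterious, here is one way such a sync/async pairing could be built with a descriptor. This is a guess at the general idea, not the library's `_AsyncableMethod`. `AsyncableSketch`, `_Bound`, and `Sleeper` are hypothetical names.

```python
import asyncio
import functools
import time


class _Bound:
    """Callable bound to one instance; .asynced() exposes the async variant."""

    def __init__(self, instance, sync_fn, async_fn):
        self._sync = functools.partial(sync_fn, instance)
        self._async = functools.partial(async_fn, instance)

    def __call__(self, *args, **kwargs):
        return self._sync(*args, **kwargs)

    def asynced(self):
        return self._async


class AsyncableSketch:
    """Hypothetical descriptor pairing a sync and an async implementation."""

    def __init__(self, sync_fn, async_fn):
        self._sync_fn = sync_fn
        self._async_fn = async_fn

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return _Bound(instance, self._sync_fn, self._async_fn)


class Sleeper:
    def _sync_nap(self, seconds: float) -> str:
        time.sleep(seconds)           # blocking sleep
        return "sync"

    async def _async_nap(self, seconds: float) -> str:
        await asyncio.sleep(seconds)  # cooperative sleep
        return "async"

    nap = AsyncableSketch(_sync_nap, _async_nap)


sleeper = Sleeper()
sync_result = sleeper.nap(0.0)


async def main() -> str:
    return await sleeper.nap.asynced()(0.0)


async_result = asyncio.run(main())
```

The first pair of parentheses in `nap.asynced()` selects the async implementation, and the second pair calls it, which is why the usage reads `asynced()()`.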
Methods with async support:
short() - via short.asynced()()
trip() - via trip.asynced()()
Methods like reset_backoff() and properties do not need async versions because they don't sleep.
Circuit includes _shared_meta so that its state can be shared across processes.
_shared_meta is a dictionary that declares which attributes each method/property reads from or writes to. The class that wraps the circuit uses this metadata to synchronize state across processes.
_shared_meta = {
    'methods': {
        'short': {'writes': ['_times_shorted', '_total_trips', '_current_sleep_time']},
        'trip': {'writes': ['_times_shorted', '_total_trips', '_current_sleep_time']},
        'reset_backoff': {'writes': ['_current_sleep_time']},
    },
    'properties': {
        'times_shorted': {'reads': ['_times_shorted']},
        'total_trips': {'reads': ['_total_trips']},
        'current_sleep_time': {'reads': ['_current_sleep_time']},
    }
}
This allows a sharing wrapper to wrap a circuit and automatically synchronize state across multiple processes.
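As a rough illustration of how such metadata could be consumed, the sketch below looks up which attributes a method declares it writes and collects their current values. `attrs_to_publish` and `FakeCircuit` are hypothetical and say nothing about how the library actually performs synchronization.

```python
from typing import Any

# A subset of the _shared_meta shown above, copied here for the example.
SHARED_META: dict[str, dict[str, dict[str, list[str]]]] = {
    'methods': {
        'short': {'writes': ['_times_shorted', '_total_trips', '_current_sleep_time']},
        'reset_backoff': {'writes': ['_current_sleep_time']},
    },
    'properties': {
        'times_shorted': {'reads': ['_times_shorted']},
    },
}


def attrs_to_publish(meta: dict, method_name: str, obj: Any) -> dict[str, Any]:
    """Hypothetical helper: collect the attributes a method declares it writes."""
    writes = meta['methods'].get(method_name, {}).get('writes', [])
    return {name: getattr(obj, name) for name in writes}


class FakeCircuit:
    # Stand-in object holding only the state attributes named in the metadata.
    _times_shorted = 2
    _total_trips = 1
    _current_sleep_time = 4.0


update = attrs_to_publish(SHARED_META, 'reset_backoff', FakeCircuit())
```

After a call to `reset_backoff`, only `_current_sleep_time` would need to be propagated, so `update` holds just that one attribute.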
Circuit uses timing.sleep for blocking sleeps:
from suitkaise.timing import api as timing
# in _trip_circuit():
timing.sleep(sleep_duration)
This uses the timing module's sleep implementation, which provides consistent behavior across different environments.
BreakingCircuit is a breaking circuit that stops when the failure threshold is reached.
Unlike Circuit, it stays broken until you manually reset it. Use this for stopping after a threshold is reached and deciding what to do next.
When the short counter reaches the num_shorts_to_trip threshold, the circuit breaks.
┌─────────┐    ┌───────────────────┐    ┌──────────────────────────────────┐
│ short() │───▶│ increment counter │───▶│ shorts >= num_shorts_to_trip ?   │
└─────────┘    └───────────────────┘    └───────────────┬──────────────────┘
                                                        │
                                  ┌─────────────────────┴────────────────────┐
                                  │                                          │
                               yes│                                          │no
                                  ▼                                          ▼
                             ┌──────────┐                               ┌──────────┐
                             │  break   │                               │ continue │
                             └────┬─────┘                               └──────────┘
                                  │
                                  ▼
                      ┌───────────────────────┐
                      │  sleep (stay broken)  │
                      └───────────────────────┘
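A typical use of this flow is a loop that stops once the circuit breaks. The sketch below uses a toy class, `SketchBreakingCircuit`, and a made-up `flaky` function. Both are hypothetical, and the sleep step is omitted so the example runs instantly.

```python
class SketchBreakingCircuit:
    """Toy stand-in for the flow above; not the library's BreakingCircuit."""

    def __init__(self, num_shorts_to_trip: int) -> None:
        self.num_shorts_to_trip = num_shorts_to_trip
        self.times_shorted = 0
        self.broken = False

    def short(self) -> None:
        self.times_shorted += 1
        if self.times_shorted >= self.num_shorts_to_trip:
            self.broken = True       # stays True until reset() is called
            self.times_shorted = 0

    def reset(self) -> None:
        self.broken = False
        self.times_shorted = 0


def flaky(item: int) -> None:
    if item % 2:                     # hypothetical failure: odd items fail
        raise ValueError(item)


circ = SketchBreakingCircuit(num_shorts_to_trip=2)
processed = []
for item in range(10):
    if circ.broken:
        break                        # threshold reached: stop and decide what to do
    try:
        flaky(item)
        processed.append(item)
    except ValueError:
        circ.short()
```

Items 1 and 3 fail, so the second failure breaks the circuit and the loop stops before item 4. Nothing resumes until the caller chooses to call `reset()`.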
BreakingCircuit uses a threading.RLock to ensure that internal state is thread-safe.
_num_shorts_to_trip: int Number of shorts required before the circuit breaks. Set at initialization.
_times_shorted: int Counter tracking shorts since the last trip/reset. Resets to 0 after each trip or reset.
_total_trips: int Lifetime count of all trips. Incremented on every short() and trip() call, never resets.
_current_sleep_time: float Current sleep duration after backoff is applied. Starts at sleep_time_after_trip and grows with each reset().
_broken: bool Whether the circuit is currently broken. Set to True on trip, cleared by reset().
_lock: threading.RLock Reentrant lock for thread-safe state access.
num_shorts_to_trip: int Number of shorts required before the circuit breaks.
broken: bool Whether the circuit is currently broken.
times_shorted: int Counter tracking shorts since the last trip/reset. Resets to 0 after each trip or reset.
total_trips: int Lifetime count of all trips. Never resets.
current_sleep_time: float Current sleep duration after backoff is applied. Starts at sleep_time_after_trip and grows with each reset().

short(custom_sleep: float | None = None) -> None
Increments _times_shorted and _total_trips by 1, calling _break_circuit() if the num_shorts_to_trip threshold is reached.
If custom_sleep is provided, it will be used instead of _current_sleep_time.
1. Determines sleep_duration (custom_sleep or _current_sleep_time)
2. Acquires self._lock
3. Increments _times_shorted by 1
4. Increments _total_trips by 1
5. Checks if _times_shorted >= num_shorts_to_trip
6. Releases self._lock
7. If _times_shorted >= num_shorts_to_trip, calls _break_circuit():
   - Acquires self._lock
   - Sets _broken to True
   - Resets _times_shorted to 0
   - Releases self._lock
   - Applies jitter to sleep_duration
   - Sleeps for sleep_duration
8. Returns None
Note: Unlike Circuit.short(), BreakingCircuit.short() increments _total_trips on every call, not just when the circuit trips.

trip(custom_sleep: float | None = None) -> None
Immediately breaks the circuit, bypassing the short counter.
If custom_sleep is provided, it will be used instead of _current_sleep_time.
1. Acquires self._lock
2. Increments _total_trips by 1
3. Releases self._lock
4. Determines sleep_duration (custom_sleep or _current_sleep_time)
5. Calls _break_circuit(sleep_duration):
   - Acquires self._lock
   - Sets _broken to True
   - Resets _times_shorted to 0
   - Releases self._lock
   - Applies jitter to sleep_duration
   - Sleeps for sleep_duration
6. Returns None

reset() -> None
Resets the circuit to operational state and applies exponential backoff.
1. Acquires self._lock
2. Sets _broken to False
3. Resets _times_shorted to 0
4. If backoff_factor != 1.0: sets _current_sleep_time to min(_current_sleep_time * backoff_factor, max_sleep_time)
5. Releases self._lock
6. Returns None
Note: Unlike Circuit, which applies backoff on trip, BreakingCircuit applies backoff on reset(). This means the next trip will use the increased sleep time.

reset_backoff() -> None
Restores the original sleep time to sleep_time_after_trip.
1. Acquires self._lock
2. Sets _current_sleep_time to sleep_time_after_trip
3. Releases self._lock
4. Returns None
Note: Does NOT reset the broken state - use reset() for that.

BreakingCircuit supports exponential backoff to progressively increase sleep time after repeated resets.
circ = BreakingCircuit(
    num_shorts_to_trip=5,
    sleep_time_after_trip=1.0,  # initial sleep time == 1.0s
    backoff_factor=2.0,         # double it each time reset() is called
    max_sleep_time=30.0         # sleep time is capped at 30s
)
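With the above parameters, the sleep time used by a trip starts at 1.0s and doubles with each reset(): 2.0s, 4.0s, 8.0s, 16.0s, then 30.0s (the cap).
Formula for calculating the next sleep time (applied on reset()):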
_current_sleep_time = min(
    _current_sleep_time * backoff_factor,
    max_sleep_time
)
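Because the formula runs on reset rather than on trip, the sleep time only grows when the circuit is reset. The sketch below tracks just that piece of state. `ResetBackoff` is a hypothetical helper class for illustration, not part of the library.

```python
class ResetBackoff:
    """Hypothetical helper tracking only the backoff state described above."""

    def __init__(self, sleep_time_after_trip: float, backoff_factor: float,
                 max_sleep_time: float) -> None:
        self.sleep_time_after_trip = sleep_time_after_trip
        self.backoff_factor = backoff_factor
        self.max_sleep_time = max_sleep_time
        self.current_sleep_time = sleep_time_after_trip

    def reset(self) -> None:
        # Backoff is applied here, not when the circuit trips.
        if self.backoff_factor != 1.0:
            self.current_sleep_time = min(
                self.current_sleep_time * self.backoff_factor,
                self.max_sleep_time,
            )

    def reset_backoff(self) -> None:
        # Restore the original sleep time.
        self.current_sleep_time = self.sleep_time_after_trip


state = ResetBackoff(1.0, 2.0, 30.0)
seen = []
for _ in range(6):
    seen.append(state.current_sleep_time)  # value the next trip would use
    state.reset()
state.reset_backoff()
```

Here `seen` ends up as 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, and the final `reset_backoff()` brings the sleep time back to 1.0.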
Jitter adds randomness to sleep durations.
When multiple processes trip their circuits at the same time, they all wake up at the same time, which puts pressure on the system. Jitter spreads out the wake-up times to prevent this.
The jitter parameter is a decimal (0.2), not a percentage (20).
Values are clamped to [0.0, 1.0].
For a jitter of 0.2 and a sleep_duration of 1.0, the range is [0.8, 1.2].
All state access is protected by a reentrant lock (threading.RLock).
A reentrant lock is needed because _break_circuit() may be called from short() and trip(), and both need lock access.
The sleep operation itself happens outside the lock to avoid blocking other threads during the sleep.
All public properties acquire the lock for reads.
@property
def broken(self) -> bool:
    with self._lock:
        return self._broken
This ensures that all reads are consistent and thread-safe.
BreakingCircuit supports async usage via the _AsyncableMethod pattern.
_AsyncableMethod wraps a sync method and an async method into a single attribute that can be called either way.
short = _AsyncableMethod(_sync_short, _async_short)
Usage:
# sync usage
circ.short()
# async usage
await circ.short.asynced()()
The async versions use asyncio.sleep() instead of blocking time.sleep():
async def _async_break_circuit(self, sleep_duration: float) -> None:
    with self._lock:
        self._broken = True
        self._times_shorted = 0
    sleep_duration = self._apply_jitter(sleep_duration)
    if sleep_duration > 0:
        await asyncio.sleep(sleep_duration)
The lock usage is the same - only the sleep call differs.
Methods with async support:
short() - via short.asynced()()
trip() - via trip.asynced()()
Methods like reset(), reset_backoff(), and properties do not need async versions because they don't sleep.
BreakingCircuit includes _shared_meta so that its state can be shared across processes.
_shared_meta is a dictionary that declares which attributes each method/property reads from or writes to. The class that wraps the circuit uses this metadata to synchronize state across processes.
_shared_meta = {
    'methods': {
        'short': {'writes': ['_times_shorted', '_total_trips', '_broken']},
        'trip': {'writes': ['_total_trips', '_broken', '_times_shorted']},
        'reset': {'writes': ['_broken', '_times_shorted', '_current_sleep_time']},
        'reset_backoff': {'writes': ['_current_sleep_time']},
    },
    'properties': {
        'broken': {'reads': ['_broken']},
        'times_shorted': {'reads': ['_times_shorted']},
        'total_trips': {'reads': ['_total_trips']},
        'current_sleep_time': {'reads': ['_current_sleep_time']},
    }
}
This allows a sharing wrapper to wrap a circuit and automatically synchronize state across multiple processes.
BreakingCircuit uses timing.sleep for blocking sleeps:
from suitkaise.timing import api as timing
# in _break_circuit():
timing.sleep(sleep_duration)
This uses the timing module's sleep implementation, which provides consistent behavior across different environments.