timing actually works provides simple and powerful timing utilities for measuring execution time, collecting statistics, and analyzing performance.
Sktimer - statistical timer with thread-safe concurrent sessionsTimerStats - frozen snapshot of timer statisticsTimeThis - context manager for timing code blocks@timethis - decorator for timing function executionstime() , sleep() , elapsed ()time() Get the current Unix timestamp.
def time() -> float:
return time.time()
Returns
float: Current Unix timestamp.
Directly wraps Python's time.time().
sleep() Sleep the current thread.
def _sync_sleep(seconds: float) -> float:
time.sleep(seconds)
return time.time()
async def _async_sleep(seconds: float) -> float:
await asyncio.sleep(seconds)
return time.time()
_sleep_impl = _AsyncableFunction(_sync_sleep, _async_sleep, name='sleep')
def sleep(seconds: float) -> float:
return _sleep_impl(seconds)
sleep.asynced = _sleep_impl.asynced
Arguments
seconds: Number of seconds to sleep.
floatReturns
float: Current time after sleeping.
The function uses _AsyncableFunction to provide both sync and async implementations:
time.sleep() internallyasyncio.sleep() internally via .asynced ()elapsed ()Calculate elapsed time between timestamps.
def _elapsed_time(time1: float, time2: Optional[float] = None) -> float:
if time2 is None:
time2 = time.time()
# return absolute difference so order doesn't matter
return fabs(time2 - time1)
Arguments
time1: First timestamp.
floattime2: Second timestamp.
float | None = NoneReturns
float: Absolute elapsed time in seconds.
Uses math.fabs() to always return positive value regardless of argument order.
Sktimer Statistical timer for collecting and analyzing execution times.
Arguments
max_times: Rolling window size.
int | None = NoneNone, keeps all measurementsReturns
: A new timer instance.
original_start_time: float | None Earliest start time across all sessions. Set on first call.
times: list[float] Aggregated list of all recorded measurements across all sessions.
_paused_durations: list[float] Parallel list of paused durations for each recorded measurement.
_max_times: int | None Rolling window size. None means keep all.
_lock: threading.RLock Thread safety lock for manager state.
_sessions: dict[int, TimerSession] Per-thread timing sessions, keyed by thread ident.
uses a session-per-thread model for concurrent safety.
Thread 1 Thread 2
│ │
├─ start() ──┐ ├─ start() ──┐
│ │ │ │
│ ┌──────────────────┐ │ ┌──────────────────┐
│ │ TimerSession 1 │ │ │ TimerSession 2 │
│ │ frames: [f1] │ │ │ frames: [f2] │
│ └──────────────────┘ │ └──────────────────┘
│ │ │ │
├─ stop() ───┘ ├─ stop() ───┘
│ │
└──────────────────────────────┬──────────────────────────────→ times[]
│
aggregate under _lock
Each thread gets its own TimerSession. Results aggregate into shared times list protected by _lock.
start ()Start timing a new measurement.
def start (self) -> float:
# warn if there's already an active frame
if self._has_active_frame():
warnings.warn(
"Sktimer.start() called while timing is already in progress. "
"This creates a nested timing frame.",
UserWarning,
stacklevel=2
)
# get or create session for current thread
sess = self._get_or_create_session()
started = sess.start()
with self._lock:
if self.original_start_time is None:
self.original_start_time = started
return started
TimerSession for current threadsess.start() to push a new frameoriginal_start_time if this is the first ever startReturns
float: Start timestamp from perf_counter().
stop ()Stop timing and record the measurement.
def stop (self) -> float:
sess = self._get_or_create_session()
elapsed, paused_total = sess.stop()
with self._lock:
self.times .append(elapsed)
self._paused_durations.append(paused_total)
self._trim_to_max()
return elapsed
sess.stop() to pop frame and get elapsed timetimes and _paused_durationsReturns
float: Elapsed time for this measurement.
Raises
RuntimeError: If timer was not started.
discard ()Stop timing but do NOT record.
def discard (self) -> float:
sess = self._get_or_create_session()
elapsed, _ = sess.stop()
# intentionally NOT appending to times or _paused_durations
return elapsed
sess.stop() to pop frameReturns
float: Elapsed time that was discarded.
lap ()Record a lap time (stop + start in one call).
def lap (self) -> float:
sess = self._get_or_create_session()
elapsed, paused_total = sess.lap()
with self._lock:
self.times .append(elapsed)
self._paused_durations.append(paused_total)
self._trim_to_max()
return elapsed
sess.lap() which records elapsed and restarts frameReturns
float: Elapsed time for this lap.
pause () / resume ()Pause and resume the current timing measurement.
def pause (self) -> None:
sess = self._get_or_create_session()
sess.pause()
def resume (self) -> None:
sess = self._get_or_create_session()
sess.resume()
Delegates to session which tracks pause state in the current frame.
add_time ()Manually add a timing measurement.
def add_time (self, elapsed_time: float) -> None:
with self._lock:
self.times .append(elapsed_time)
self._paused_durations.append(0.0)
self._trim_to_max()
Directly appends to times with zero paused duration.
set_max_times ()Set the rolling window size.
def set_max_times (self, max_times: Optional[int]) -> None:
if max_times is not None and max_times <= 0:
raise ValueError("max_times must be a positive integer or None")
with self._lock:
self._max_times = max_times
self._trim_to_max()
def _trim_to_max(self) -> None:
if self._max_times is None:
return
excess = len(self.times ) - self._max_times
if excess <= 0:
return
del self.times [:excess]
del self._paused_durations[:excess]
When max_times is set, oldest measurements are discarded to keep only the most recent N.
reset ()Clear all timing measurements.
def reset (self) -> None:
with self._lock:
self.times .clear()
self.original_start_time = None
self._sessions.clear()
self._paused_durations.clear()
Resets all state including per-thread sessions.
All statistics properties are computed live from times list under lock.
@property
def mean (self) -> Optional[float]:
with self._lock:
return statistics.mean(self.times ) if self.times else None
@property
def stdev (self) -> Optional[float]:
with self._lock:
if len(self.times ) <= 1:
return None
return statistics.stdev(self.times )
Available properties:
num_times, most_recent, result, most_recent_indextotal_time, total_time_pausedmean, median, min, maxfastest_time, slowest_time, fastest_index, slowest_indexstdev, variance, max_timespercentile ()Calculate any percentile using linear interpolation.
def percentile (self, percent: float) -> Optional[float]:
with self._lock:
if not self.times :
return None
if not 0 <= percent <= 100:
raise ValueError("Percentile must be between 0 and 100")
sorted_times = sorted(self.times )
index = (percent / 100) * (len(sorted_times) - 1)
if index == int(index):
return sorted_times[int(index)]
# linear interpolation
lower_index = int(index)
upper_index = lower_index + 1
weight = index - lower_index
return (sorted_times[lower_index] * (1 - weight) +
sorted_times[upper_index] * weight)
get_statistics() / get_stats()Get a frozen snapshot.
def get_statistics(self) -> Optional[TimerStats]:
with self._lock:
if not self.times :
return None
return TimerStats(self.times , self.original_start_time, self._paused_durations)
Returns
A TimerStats object with copied data that won't change.
defines _shared_meta for use with :
_shared_meta = {
'methods': {
'start': {'writes': ['_sessions', 'original_start_time']},
'stop': {'writes': ['times', '_paused_durations']},
'discard': {'writes': []},
'lap': {'writes': ['times', '_paused_durations']},
'pause': {'writes': ['_sessions']},
'resume': {'writes': ['_sessions']},
'add_time': {'writes': ['times', '_paused_durations']},
'set_max_times': {'writes': ['times', '_paused_durations', '_max_times']},
'reset': {'writes': ['times', '_sessions', '_paused_durations', 'original_start_time']},
'get_statistics': {'writes': []},
'get_stats': {'writes': []},
'get_time': {'writes': []},
'percentile': {'writes': []},
},
'properties': {
'num_times': {'reads': ['times']},
'most_recent': {'reads': ['times']},
# ... etc
}
}
This metadata declares which attributes each method/property reads or writes, enabling the to coordinate synchronization.
TimerSessionPer-thread timing session supporting nested frames.
class TimerSession:
def __init__(self, manager: Sktimer ):
self._manager = manager
self._frames: Deque[Dict[str, Any]] = deque()
self._lock = threading.RLock()
Each timing frame is a dict:
frame = {
'start_time': float, # perf_counter() at start
'paused': bool, # currently paused?
'pause_started_at': float, # when pause began (or None)
'total_paused': float, # accumulated paused time
}
start ()Push a new frame onto the stack.
def start (self) -> float:
with self._lock:
frame = {
'start_time': self._now(),
'paused': False,
'pause_started_at': None,
'total_paused': 0.0,
}
self._frames.append(frame)
return frame['start_time']
Uses perf_counter() for high-resolution monotonic timing.
stop ()Pop the top frame and return elapsed time.
def stop (self) -> tuple[float, float]:
with self._lock:
frame = self._top()
elapsed = self._elapsed_from_frame(frame)
paused_total = self._paused_total_from_frame(frame)
self._frames.pop()
return elapsed, paused_total
lap ()Record elapsed time and restart the frame.
def lap (self) -> tuple[float, float]:
with self._lock:
frame = self._top()
elapsed = self._elapsed_from_frame(frame)
paused_total = self._paused_total_from_frame(frame)
# restart frame
frame['start_time'] = self._now()
frame['total_paused'] = 0.0
frame['paused'] = False
frame['pause_started_at'] = None
return elapsed, paused_total
Keeps the frame but resets its timing state.
pause () / resume ()def pause (self) -> None:
with self._lock:
frame = self._top()
if frame['paused']:
warnings.warn("Sktimer is already paused.", UserWarning, stacklevel=2)
return
frame['paused'] = True
frame['pause_started_at'] = self._now()
def resume (self) -> None:
with self._lock:
frame = self._top()
if not frame['paused']:
warnings.warn("Sktimer is not paused.", UserWarning, stacklevel=2)
return
pause_duration = self._now() - frame['pause_started_at']
frame['total_paused'] += pause_duration
frame['paused'] = False
frame['pause_started_at'] = None
def _elapsed_from_frame(self, frame: Dict[str, Any]) -> float:
end = self._now()
paused_extra = 0.0
# if currently paused, add time since pause started
if frame['paused'] and frame['pause_started_at'] is not None:
paused_extra = end - frame['pause_started_at']
return (end - frame['start_time']) - (frame['total_paused'] + paused_extra)
Total elapsed = (end - start) - total paused time.
TimerStatsFrozen snapshot of timer statistics.
class TimerStats:
def __init__(self, times: List[float], original_start_time: Optional[float], paused_durations: List[float]):
self.times = times.copy() # copy for immutability
self.original_start_time = original_start_time
self.num_times = len(times)
self.most_recent = times[-1] if times else None
self.most_recent_index = len(times) - 1 if times else None
self.total_time = sum(times) if times else None
self.total_time_paused = sum(paused_durations) if paused_durations else None
self.mean = statistics.mean(times) if times else None
self.median = statistics.median(times) if times else None
self.min = min(times) if times else None
self.max = max(times) if times else None
self.stdev = statistics.stdev(times) if len(times) > 1 else None
self.variance = statistics.variance(times) if len(times) > 1 else None
# ... etc
All values are computed once at construction time and stored as attributes.
percentile ()Same algorithm as but operates on the frozen times copy.
TimeThis Context ManagerContext manager wrapper around .
class TimeThis :
def __init__(self, timer: Optional[Sktimer ] = None, threshold: float = 0.0):
self.timer = timer or Sktimer( )
self.threshold = threshold
def __enter__(self):
self.timer .start ()
return self.timer
def __exit__(self, exc_type, exc_val, exc_tb):
# Get elapsed time without recording
elapsed = self.timer .discard ()
# Only record if above threshold
if elapsed >= self.threshold:
self.timer .add_time (elapsed)
__enter__: Call timer .start (), return timer for as clause__exit__: Call timer .discard () to get elapsed without recordingadd_time () if above thresholdDelegates to the underlying timer:
def pause (self):
self.timer .pause ()
def resume (self):
self.timer .resume ()
def lap (self):
self.timer .lap ()
timethis DecoratorDecorator that times function executions.
def timethis (
timer: Optional[Sktimer ] = None,
threshold: float = 0.0,
max_times: Optional[int] = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
if timer is not None:
if max_times is not None:
timer .set_max_times (max_times)
wrapper = _timethis_decorator(timer, threshold)(func)
# ...
return wrapper
Uses provided timer directly.
def decorator(func: Callable) -> Callable:
# ...
else:
# extract module name
frame = inspect.currentframe()
module_name = frame.f_back.f_globals.get('__name__', 'unknown')
if '.' in module_name:
module_name = module_name.split('.')[-1]
# build timer name from function qualname
func_qualname = func.__qualname__
if '.' in func_qualname:
class_name, func_name = func_qualname.rsplit('.', 1)
timer_name = f"{module_name}_{class_name}_{func_name}_timer"
else:
timer_name = f"{module_name}_{func_qualname}_timer"
# get or create global timer (thread-safe)
if not hasattr(timethis , '_global_timers'):
setattr(timethis , '_global_timers', {})
setattr(timethis , '_timers_lock', threading.RLock())
lock = getattr(timethis , '_timers_lock')
with lock:
global_timers = getattr(timethis , '_global_timers')
if timer_name not in global_timers:
global_timers[timer_name] = Sktimer( max_times=max_times)
wrapper = _timethis_decorator(global_timers[timer_name], threshold)(func)
setattr(wrapper, 'timer', global_timers[timer_name])
return wrapper
__qualname__.timer foo() in mymodule.py: mymodule_foo_timerBar.baz() in mymodule.py: mymodule_Bar_baz_timer_timethis_decorator()The actual timing wrapper:
def _timethis_decorator(timer: Sktimer , threshold: float = 0.0):
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
# avoid nested timing frames on the same timer
if timer ._has_active_frame():
start = perf_counter()
try:
return func(*args, **kwargs)
finally:
elapsed = perf_counter() - start
if elapsed >= threshold:
timer .add_time (elapsed)
else:
timer .start ()
try:
result = func(*args, **kwargs)
return result
finally:
elapsed = timer .discard ()
if elapsed >= threshold:
timer .add_time (elapsed)
return wrapper
return decorator
Two paths:
perf_counter() directly to avoid nested framesstart ()/discard ()/add_time () flowBoth paths only record if elapsed >= threshold.
clear_global_timers ()Clear auto-created timers.
def clear_global_timers () -> None:
if hasattr(timethis , '_timers_lock') and hasattr(timethis, '_global_timers'):
lock = getattr(timethis , '_timers_lock')
with lock:
timers = getattr(timethis , '_global_timers')
timers.clear()
Thread-safe clearing of the global timer registry.
is fully thread-safe:
_lock): Protects times, _paused_durations, _sessionsTimerSession has its own lock for frame operationstimethis ._timers_lock protects auto-created timer registrythreading.RLock (reentrant lock) is used because operations may call each other.