Skip to content

context_aware_warnings isn't thread-safe when showwarning is patched #151149

@seamuswn

Description

@seamuswn

Bug report

Bug description:

When showwarning is patched (most commonly by calling logging.captureWarnings(True)), the warnings.catch_warnings context manager still isn't thread-safe, even with context_aware_warnings enabled.

In Python 3.14 even with context_aware_warnings flag enabled, there is a bug in asyncio / multithread contexts when using warnings.catch_warnings and logging.captureWarnings(True) since both attempt to patch warnings.showwarning. Write me a minimal reproducible example of this issue to share with the cpython team.
"""
MRE: catch_warnings + captureWarnings race on warnings.showwarning

Python 3.14+ with context_aware_warnings enabled.

Both warnings.catch_warnings() and logging.captureWarnings(True) save/restore
warnings.showwarning by *value at call time* — a plain reference copy.  Under
concurrent execution the save/restore pairs interleave, leaking the wrong
showwarning into the global module state.

Scenario that demonstrates the interleaving:

    T1 catch_warnings.__enter__   → saved = showwarning      (= warn_orig)
    T1 captureWarnings(True)      → saved = showwarning      (= warn_orig)
                                   showwarning = log_showwarning
    T2 catch_warnings.__enter__   → saved = showwarning      (= log_showwarning ← T1's patch!)
    T2 captureWarnings(True)      → showwarning = log_showwarning
    T1 captureWarnings(False)     → showwarning = warn_orig  (correct)
    T1 catch_warnings.__exit__    → showwarning = warn_orig  (correct, saved in step 1)
    T2 captureWarnings(False)     → no-op: logging keeps one saved slot, already cleared by T1
    T2 catch_warnings.__exit__    → showwarning = log_showwarning  (BUG — persists)

After all threads exit, warnings.showwarning is still logging's replacement hook
(logging._showwarning) even though every thread called captureWarnings(False).

Two reproductions:
  1. ThreadPoolExecutor (preemptive — natural race)
  2. asyncio with explicit yield points (cooperative — deterministic)
"""

import asyncio
import concurrent.futures
import logging
import threading
import warnings
import sys

# context_aware_warnings is a startup-only interpreter option.  It is enabled
# with ``-X context_aware_warnings=1`` or ``PYTHON_CONTEXT_AWARE_WARNINGS=1``
# and exposed as the read-only ``sys.flags.context_aware_warnings``.  Assigning
# an ad-hoc attribute on the warnings module neither enables it nor can ever
# raise AttributeError, so the real flag is checked instead.
if not hasattr(sys.flags, "context_aware_warnings"):
    sys.exit("Python 3.14+ required for context_aware_warnings")
if not sys.flags.context_aware_warnings:
    sys.exit(
        "context_aware_warnings is disabled; re-run with "
        "`python -X context_aware_warnings=1 <script>` "
        "or set PYTHON_CONTEXT_AWARE_WARNINGS=1"
    )

# force=True replaces any handlers already attached to the root logger so the
# captured warnings are emitted through a known configuration.
logging.basicConfig(level=logging.WARNING, force=True)

# Snapshot of the hook before any patching; every leak check compares against
# this object by identity.
ORIGINAL_SHOWWARNING = warnings.showwarning


# ── Reproduction 1: threads (preemptive) ──────────────────────────────

# Shared state for the thread reproduction: the lock serialises appends to the
# list of leak descriptions recorded by the workers.
_thread_lock = threading.Lock()
_thread_errors: list[str] = list()


def _thread_worker(n: int) -> None:
    """Run one capture/release cycle under catch_warnings and record any leak.

    Inside a ``catch_warnings`` block the worker turns logging capture on,
    emits a ``UserWarning`` tagged with ``n``, and turns capture off again.
    After the block exits, ``warnings.showwarning`` is compared by identity
    with ``ORIGINAL_SHOWWARNING``; a mismatch is appended to ``_thread_errors``.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        logging.captureWarnings(True)
        warnings.warn(f"thread worker {n}", category=UserWarning)
        logging.captureWarnings(False)

    # Nothing to record when the hook is back to the pre-patch object.
    if warnings.showwarning is ORIGINAL_SHOWWARNING:
        return

    with _thread_lock:
        # The hook is re-read while holding the lock, as the message is built.
        description = (
            f"worker {n}: showwarning is {warnings.showwarning!r} "
            f"instead of {ORIGINAL_SHOWWARNING!r}"
        )
        _thread_errors.append(description)


def reproduce_with_threads() -> None:
    """Hammer ``_thread_worker`` from a thread pool to provoke the race.

    Runs 200 rounds; each round creates a fresh 8-thread pool and maps the
    worker over the ids 0..31, waiting for every result before the next round.
    """
    rounds_left = 200
    while rounds_left:
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            # Drain the result iterator so any worker exception is re-raised here.
            for _ in executor.map(_thread_worker, range(32)):
                pass
        rounds_left -= 1


# ── Reproduction 2: asyncio (cooperative with explicit yield points) ──

# Shared state for the asyncio reproduction: leak descriptions and the lock
# used around appends.
_async_lock = threading.Lock()
_async_errors: list[str] = list()

# Events that force the two tasks into the interleaving described in the
# module docstring:
#   _barrier_1 — task B waits on it until task A has called captureWarnings(True)
#   _barrier_2 — task A waits on it until task B has entered catch_warnings
#   _barrier_3 — task B waits on it until task A has left catch_warnings
_barrier_1, _barrier_2, _barrier_3 = (asyncio.Event() for _ in range(3))


async def _async_worker_a(n: int) -> None:
    """Task A: the "early" task that turns on captureWarnings before B enters.

    Sequence: enter ``catch_warnings``, call ``captureWarnings(True)``, set
    ``_barrier_1`` to release task B, wait on ``_barrier_2`` until B is inside
    its own ``catch_warnings``, then call ``captureWarnings(False)``, leave the
    context manager and set ``_barrier_3``.

    ``n`` is not used by the body.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("always")
        logging.captureWarnings(True)
        _barrier_1.set()           # A's captureWarnings(True) is done; B may proceed
        await asyncio.sleep(0)     # yield so B can enter catch_warnings

        await _barrier_2.wait()    # continue only after B has set _barrier_2 from inside its catch_warnings
        logging.captureWarnings(False)
        # __exit__ puts back whatever showwarning was at __enter__
        # (the pre-patch hook on a clean first run).
    _barrier_3.set()               # A has left catch_warnings; signal B to continue


async def _async_worker_b(n: int) -> None:
    """Task B: enters catch_warnings while A's captureWarnings(True) is active.

    Sequence: wait on ``_barrier_1``, enter ``catch_warnings`` (which saves the
    currently installed hook), set ``_barrier_2``, wait on ``_barrier_3`` until
    task A has left its own context manager, run one capture on/off cycle, and
    leave ``catch_warnings``.  Afterwards ``warnings.showwarning`` is compared
    by identity with ``ORIGINAL_SHOWWARNING`` and any mismatch is appended to
    ``_async_errors``.

    ``n`` is not used by the body.
    """
    await _barrier_1.wait()        # wait for A's captureWarnings(True)
    with warnings.catch_warnings():
        # __enter__ saves showwarning — which is currently A's logging hook!
        _barrier_2.set()           # signal A to continue
        await asyncio.sleep(0)     # yield so A calls captureWarnings(False) + __exit__

        await _barrier_3.wait()    # A is done
        logging.captureWarnings(True)
        await asyncio.sleep(0)
        logging.captureWarnings(False)
        # __exit__ restores showwarning to what it was on __enter__ (= logging's hook)
    # At this point showwarning is logging's hook — even though
    # captureWarnings(False) was called.  This is wrong.

    if warnings.showwarning is not ORIGINAL_SHOWWARNING:
        # _async_lock is a threading.Lock, which only implements the synchronous
        # context-manager protocol; ``async with`` on it raises instead of
        # recording the leak.  No await happens while it is held, so a plain
        # ``with`` cannot stall the event loop.
        with _async_lock:
            _async_errors.append(
                f"async scenario: showwarning is {warnings.showwarning!r} "
                f"instead of {ORIGINAL_SHOWWARNING!r}"
            )


async def reproduce_with_asyncio() -> None:
    """Run one A/B interleaving on the current event loop.

    Clears the three barrier events, then runs task A and task B concurrently
    and waits for both to finish.
    """
    for barrier in (_barrier_1, _barrier_2, _barrier_3):
        barrier.clear()

    early_task = _async_worker_a(0)
    late_task = _async_worker_b(0)
    await asyncio.gather(early_task, late_task)


# ── Report ────────────────────────────────────────────────────────────

def _report(backend: str, errors: list[str]) -> None:
    """Print the outcome of one reproduction backend to stdout.

    Args:
        backend: Label printed in the section header.
        errors: Leak descriptions collected by that backend; an empty list
            prints the "no bug detected" line instead.

    The current ``warnings.showwarning`` is also printed next to
    ``ORIGINAL_SHOWWARNING`` with a match/leak verdict.
    """
    rule = "─" * 70
    print(f"\n{rule}")
    print(f"[{backend}]")
    print(f"{rule}")

    if not errors:
        print("  No bug detected (race is timing-dependent; try again)")
    else:
        print(f"  BUG REPRODUCED — {len(errors)} occurrence(s):")
        for description in errors:
            print(f"    • {description}")

    current = warnings.showwarning
    print(f"  warnings.showwarning: original={ORIGINAL_SHOWWARNING!r}, current={current!r}")
    if current is ORIGINAL_SHOWWARNING:
        verdict = "✓ match"
    else:
        verdict = "✗ LEAKED"
    print(f"  {verdict}")


def main() -> None:
    """Run both reproductions in turn and print a report after each."""
    # Phase 1: preemptive threads.
    reproduce_with_threads()
    _report("ThreadPoolExecutor (200×32 workers)", _thread_errors)

    # Phase 2: cooperative asyncio tasks, each run on a fresh event loop.
    _async_errors.clear()
    runs_left = 20
    while runs_left > 0:
        asyncio.run(reproduce_with_asyncio())
        runs_left -= 1
    _report("asyncio (20 deterministic interleavings)", _async_errors)


if __name__ == "__main__":
    main()

CPython versions tested on:

3.14

Operating systems tested on:

macOS

Metadata

Metadata

Assignees

No one assigned

    Labels

    type-bug: An unexpected behavior, bug, or error
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions