mirror of
https://github.com/pybind/pybind11.git
synced 2026-05-13 17:56:02 +00:00
* fix: segfault when moving `scoped_ostream_redirect`

  The default move constructor left the stream (`std::cout`) pointing at the moved-from `pythonbuf`, whose internal buffer and streambuf pointers were nulled by the move. Any subsequent write through the stream dereferenced null, causing a segfault. Replace `= default` with an explicit move constructor that re-points the stream to the new buffer and disarms the moved-from destructor.

* fix: mark move constructor noexcept to satisfy clang-tidy

* fix: use bool flag instead of nullptr sentinel for moved-from state

  Using `old == nullptr` as the moved-from sentinel was incorrect because nullptr is a valid original rdbuf() value (e.g. `std::ostream os(nullptr)`). Replace with an explicit `active` flag so the destructor correctly restores nullptr buffers. Add tests for the nullptr-rdbuf edge case.

* fix: remove noexcept and propagate active flag from source

  - Remove noexcept: pythonbuf inherits from std::streambuf whose move is not guaranteed nothrow on all implementations. Suppress clang-tidy with NOLINTNEXTLINE instead.
  - Initialize active from other.active so that moving an already moved-from object does not incorrectly re-activate the redirect.
  - Only rebind the stream and disarm the source when active.

* test: add unflushed ostream redirect regression

  Cover the buffered-before-move case for `scoped_ostream_redirect`, which still crashes despite the current move fix. This gives the PR a direct reproducer for the remaining bug path.

  Made-with: Cursor

* fix: disarm moved-from pythonbuf after redirect move

  The redirect guard now survives moves, but buffered output could still remain in the moved-from `pythonbuf` and be flushed during destruction through moved-out Python handles. Rebuild the destination put area from the transferred storage and clear the source put area so unflushed bytes follow the active redirect instead of crashing in the moved-from destructor.

  Made-with: Cursor

---------

Co-authored-by: Ralf W. Grosse-Kunstleve <rgrossekunst@nvidia.com>
330 lines
8.0 KiB
Python
330 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
import sys
|
|
from contextlib import redirect_stderr, redirect_stdout
|
|
from io import StringIO
|
|
|
|
import pytest
|
|
|
|
import env
|
|
from pybind11_tests import iostream as m
|
|
|
|
# Module-level guard: on Windows, skip this whole test module when the OS
# build number has reached the threshold below.
if env.WIN:
    wv_build = sys.getwindowsversion().build
    skip_if_ge = 26100
    if skip_if_ge <= wv_build:
        # allow_module_level=True lets pytest.skip() be called outside a test.
        pytest.skip(
            f"Windows build {wv_build} >= {skip_if_ge}:"
            " Skipping iostream capture (redirection regression needs investigation)",
            allow_module_level=True,
        )
|
|
|
|
|
|
def test_captured(capsys):
    """Check that each capture helper delivers its text to the expected stream.

    ``m.captured_output`` and ``m.captured_output_default`` must show up on
    stdout only; ``m.captured_err`` must show up on stderr only.
    """
    text = "I've been redirected to Python, I hope!"

    m.captured_output(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err

    m.captured_err(text)
    captured = capsys.readouterr()
    assert not captured.out
    assert captured.err == text
|
|
|
|
|
|
def test_captured_large_string(capsys):
    """Round-trip a message that is longer than 1024 characters."""
    # Make this bigger than the buffer used on the C++ side: 1024 chars
    phrase = "I've been redirected to Python, I hope!"
    repeats = 1024 // len(phrase) + 1
    text = phrase * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_2byte_offset0(capsys):
    """Round-trip 1025 copies of U+07FF (2 bytes in UTF-8) with no prefix."""
    char = "\u07ff"
    prefix = ""
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_2byte_offset1(capsys):
    """Round-trip 1025 copies of U+07FF (2 bytes in UTF-8) after a 1-char prefix."""
    char = "\u07ff"
    # NOTE(review): the ASCII prefix presumably shifts the multi-byte
    # sequences by one byte relative to the buffer boundary - confirm.
    prefix = "1"
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_3byte_offset0(capsys):
    """Round-trip 1025 copies of U+FFFF (3 bytes in UTF-8) with no prefix."""
    char = "\uffff"
    prefix = ""
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_3byte_offset1(capsys):
    """Round-trip 1025 copies of U+FFFF (3 bytes in UTF-8) after a 1-char prefix."""
    char = "\uffff"
    # NOTE(review): the ASCII prefix presumably shifts the multi-byte
    # sequences by one byte relative to the buffer boundary - confirm.
    prefix = "1"
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_3byte_offset2(capsys):
    """Round-trip 1025 copies of U+FFFF (3 bytes in UTF-8) after a 2-char prefix."""
    char = "\uffff"
    # NOTE(review): the ASCII prefix presumably shifts the multi-byte
    # sequences by two bytes relative to the buffer boundary - confirm.
    prefix = "12"
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_4byte_offset0(capsys):
    """Round-trip 1025 copies of U+10FFFF (4 bytes in UTF-8) with no prefix."""
    char = "\U0010ffff"
    prefix = ""
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_4byte_offset1(capsys):
    """Round-trip 1025 copies of U+10FFFF (4 bytes in UTF-8) after a 1-char prefix."""
    char = "\U0010ffff"
    # NOTE(review): the ASCII prefix presumably shifts the multi-byte
    # sequences by one byte relative to the buffer boundary - confirm.
    prefix = "1"
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_4byte_offset2(capsys):
    """Round-trip 1025 copies of U+10FFFF (4 bytes in UTF-8) after a 2-char prefix."""
    char = "\U0010ffff"
    # NOTE(review): the ASCII prefix presumably shifts the multi-byte
    # sequences by two bytes relative to the buffer boundary - confirm.
    prefix = "12"
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_captured_utf8_4byte_offset3(capsys):
    """Round-trip 1025 copies of U+10FFFF (4 bytes in UTF-8) after a 3-char prefix."""
    char = "\U0010ffff"
    # NOTE(review): the ASCII prefix presumably shifts the multi-byte
    # sequences by three bytes relative to the buffer boundary - confirm.
    prefix = "123"
    repeats = 1024 // len(char) + 1
    text = prefix + char * repeats

    m.captured_output_default(text)
    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_guard_capture(capsys):
    """Check that text passed to ``m.guard_output`` lands on stdout only."""
    text = "I've been redirected to Python, I hope!"

    m.guard_output(text)

    captured = capsys.readouterr()
    assert captured.out == text
    assert not captured.err
|
|
|
|
|
|
def test_series_captured(capture):
    """Check that two consecutive captured writes are recorded in order."""
    with capture:
        for piece in ("a", "b"):
            m.captured_output(piece)
    assert capture == "ab"
|
|
|
|
|
|
def test_flush(capfd):
    """Check when unflushed text written under ``m.ostream_redirect`` becomes visible.

    Text written with ``flush=False`` must not be visible yet; it appears
    together with the next flushed write, or once the redirect context exits.
    """
    pending = "(not flushed)"
    flushed = "(flushed)"

    with m.ostream_redirect():
        # Nothing is expected on stdout while the text is still unflushed.
        m.noisy_function(pending, flush=False)
        captured = capfd.readouterr()
        assert not captured.out

        # The flushing write is expected to bring the earlier text with it.
        m.noisy_function(flushed, flush=True)
        captured = capfd.readouterr()
        assert captured.out == pending + flushed

        m.noisy_function(pending, flush=False)

    # After leaving the redirect, the last unflushed text must have been emitted.
    captured = capfd.readouterr()
    assert captured.out == pending
|
|
|
|
|
|
def test_not_captured(capfd):
    """Contrast ``m.raw_output`` with ``m.captured_output`` under ``redirect_stdout``.

    ``m.raw_output`` is expected to bypass the Python-level redirect and show
    up on the real stdout, whereas ``m.captured_output`` is expected to end
    up in the redirected Python stream.
    """
    text = "Something that should not show up in log"

    # Phase 1: raw output ignores the Python-level redirect.
    sink = StringIO()
    with redirect_stdout(sink):
        m.raw_output(text)
    captured = capfd.readouterr()
    assert captured.out == text
    assert not captured.err
    assert not sink.getvalue()

    # Phase 2: captured output follows the Python-level redirect.
    sink = StringIO()
    with redirect_stdout(sink):
        m.captured_output(text)
    captured = capfd.readouterr()
    assert not captured.out
    assert not captured.err
    assert sink.getvalue() == text
|
|
|
|
|
|
def test_err(capfd):
    """Contrast ``m.raw_err`` with ``m.captured_err`` under ``redirect_stderr``.

    ``m.raw_err`` is expected to bypass the Python-level redirect and show up
    on the real stderr, whereas ``m.captured_err`` is expected to end up in
    the redirected Python stream.
    """
    text = "Something that should not show up in log"

    # Phase 1: raw error output ignores the Python-level redirect.
    sink = StringIO()
    with redirect_stderr(sink):
        m.raw_err(text)
    captured = capfd.readouterr()
    assert not captured.out
    assert captured.err == text
    assert not sink.getvalue()

    # Phase 2: captured error output follows the Python-level redirect.
    sink = StringIO()
    with redirect_stderr(sink):
        m.captured_err(text)
    captured = capfd.readouterr()
    assert not captured.out
    assert not captured.err
    assert sink.getvalue() == text
|
|
|
|
|
|
def test_multi_captured(capfd):
    """Interleave captured and raw writes and check where each one lands.

    Captured writes ("a", "c") must reach the redirected Python stream and
    raw writes ("b", "d") must reach the real stdout.
    """
    sink = StringIO()
    with redirect_stdout(sink):
        m.captured_output("a")
        m.raw_output("b")
        m.captured_output("c")
        m.raw_output("d")

    captured = capfd.readouterr()
    assert captured.out == "bd"
    assert sink.getvalue() == "ac"
|
|
|
|
|
|
def test_dual(capsys):
    """Check that ``m.captured_dual`` sends its first argument to stdout and its second to stderr."""
    m.captured_dual("a", "b")

    captured = capsys.readouterr()
    assert captured.out == "a"
    assert captured.err == "b"
|
|
|
|
|
|
def test_redirect(capfd):
    """Check ``m.raw_output`` routing before, during, and after ``m.ostream_redirect``.

    Without the redirect the text goes to the real stdout; with it the text
    goes to the Python-level stream; afterwards it goes to the real stdout
    again.
    """
    text = "Should not be in log!"

    # Before: no ostream redirect, so the Python-level sink stays empty.
    sink = StringIO()
    with redirect_stdout(sink):
        m.raw_output(text)
    captured = capfd.readouterr()
    assert captured.out == text
    assert not sink.getvalue()

    # During: the ostream redirect sends the text to the Python-level sink.
    sink = StringIO()
    with redirect_stdout(sink), m.ostream_redirect():
        m.raw_output(text)
    captured = capfd.readouterr()
    assert not captured.out
    assert sink.getvalue() == text

    # After: the redirect is gone and the original routing is back.
    sink = StringIO()
    with redirect_stdout(sink):
        m.raw_output(text)
    captured = capfd.readouterr()
    assert captured.out == text
    assert not sink.getvalue()
|
|
|
|
|
|
def test_redirect_err(capfd):
    """Check ``m.ostream_redirect(stdout=False)``: stderr is redirected, stdout is not."""
    out_text = "StdOut"
    err_text = "StdErr"

    err_sink = StringIO()
    with redirect_stderr(err_sink), m.ostream_redirect(stdout=False):
        m.raw_output(out_text)
        m.raw_err(err_text)

    captured = capfd.readouterr()
    # stdout was left alone, so its text is on the real stdout.
    assert captured.out == out_text
    # stderr text went to the Python-level sink instead of the real stderr.
    assert not captured.err
    assert err_sink.getvalue() == err_text
|
|
|
|
|
|
def test_redirect_both(capfd):
    """Check that a default ``m.ostream_redirect()`` redirects both stdout and stderr."""
    out_text = "StdOut"
    err_text = "StdErr"

    out_sink = StringIO()
    err_sink = StringIO()
    with redirect_stdout(out_sink), redirect_stderr(err_sink), m.ostream_redirect():
        m.raw_output(out_text)
        m.raw_err(err_text)

    captured = capfd.readouterr()
    # Nothing may reach the real streams; each sink gets its own text.
    assert not captured.out
    assert not captured.err
    assert out_sink.getvalue() == out_text
    assert err_sink.getvalue() == err_text
|
|
|
|
|
|
def test_move_redirect(capsys):
    """Check that ``m.move_redirect_output`` emits both of its texts, in order, on stdout."""
    m.move_redirect_output("before_move", "after_move")

    captured = capsys.readouterr()
    assert captured.out == "before_moveafter_move"
    assert not captured.err
|
|
|
|
|
|
def test_move_redirect_unflushed(capsys):
    """Check that ``m.move_redirect_output_unflushed`` emits both texts, in order, on stdout."""
    m.move_redirect_output_unflushed("before_move", "after_move")

    captured = capsys.readouterr()
    assert captured.out == "before_moveafter_move"
    assert not captured.err
|
|
|
|
|
|
def test_move_redirect_null_rdbuf(capsys):
    """Check that ``m.move_redirect_null_rdbuf("hello")`` yields the text twice on stdout."""
    # NOTE(review): the helper presumably writes the text once before and
    # once after moving the redirect - confirm against the C++ test module.
    m.move_redirect_null_rdbuf("hello")

    captured = capsys.readouterr()
    assert captured.out == "hellohello"
    assert not captured.err
|
|
|
|
|
|
def test_null_rdbuf_restored():
    """Check that ``m.get_null_rdbuf_restored`` reports a truthy result."""
    restored = m.get_null_rdbuf_restored("test")
    assert restored
|
|
|
|
|
|
@pytest.mark.skipif(sys.platform.startswith("emscripten"), reason="Requires threads")
def test_threading():
    """Run 20 ``m.TestThread`` objects inside an ostream redirect, then stop and join them."""
    with m.ostream_redirect(stdout=True, stderr=False):
        # start some threads
        workers = [m.TestThread() for _ in range(20)]

        # give the threads some time to fail
        workers[0].sleep()

        # stop all the threads
        for worker in workers:
            worker.stop()

        for worker in workers:
            worker.join()

        # if a thread segfaults, we don't get here
        assert True
|