Files
enginex-mthreads-vllm/tests/utils_/test_async_utils.py

43 lines
1.3 KiB
Python
Raw Permalink Normal View History

2026-01-19 10:38:50 +08:00
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
2026-01-09 13:34:11 +08:00
import asyncio
2026-01-19 10:38:50 +08:00
from collections.abc import AsyncIterator
2026-01-09 13:34:11 +08:00
import pytest
2026-01-19 10:38:50 +08:00
from vllm.utils.async_utils import merge_async_iterators
2026-01-09 13:34:11 +08:00
2026-01-19 10:38:50 +08:00
async def _mock_async_iterator(idx: int):
    """Endless async generator used as a cancellable source in the tests.

    Produces the same labelled string over and over, sleeping 0.1 s after
    each item. An ``asyncio.CancelledError`` raised inside the loop is caught
    and reported with ``print`` rather than re-raised, so the generator then
    simply finishes.

    Args:
        idx: Number embedded in every yielded item and in the cancel message.
    """
    # Both values are loop-invariant, so compute them once up front.
    item = f"item from iterator {idx}"
    pause_seconds = 0.1
    try:
        while True:
            yield item
            await asyncio.sleep(pause_seconds)
    except asyncio.CancelledError:
        print(f"iterator {idx} cancelled")
2026-01-09 13:34:11 +08:00
2026-01-19 10:38:50 +08:00
@pytest.mark.asyncio
async def test_merge_async_iterators():
    """Cancelling the consumer of a merged stream must finish every source.

    Three endless mock iterators are merged and drained by a background task.
    The task is cancelled after 0.5 s, and the test then requires that each
    source iterator is exhausted, i.e. pulling from it raises
    ``StopAsyncIteration``.

    Raises:
        AssertionError: If a source iterator still yields an item, times out,
            or fails with any other exception after the cancellation.
    """
    iterators = [_mock_async_iterator(i) for i in range(3)]
    merged_iterator = merge_async_iterators(*iterators)

    async def stream_output(generator: AsyncIterator[tuple[int, str]]):
        # Drain the merged stream; each element unpacks to (index, item).
        async for idx, output in generator:
            print(f"idx: {idx}, output: {output}")

    task = asyncio.create_task(stream_output(merged_iterator))
    # Let the consumer pull a few items before interrupting it.
    await asyncio.sleep(0.5)
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task

    for iterator in iterators:
        try:
            leftover = await asyncio.wait_for(anext(iterator), 1)
        except StopAsyncIteration:
            # All iterators should be cancelled and print this message.
            print("Iterator was cancelled normally")
        except (Exception, asyncio.CancelledError) as e:
            raise AssertionError() from e
        else:
            # A successful pull means this source is still alive; without
            # this branch the test would pass without checking anything.
            raise AssertionError(
                f"iterator still yielded {leftover!r} after the merged "
                "stream was cancelled"
            )