339 lines
14 KiB
Python
339 lines
14 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
import multiprocessing
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
from typing import TYPE_CHECKING
|
|
|
|
from vllm.config import VllmConfig
|
|
from vllm.logger import init_logger
|
|
from vllm.reasoning import ReasoningParserManager
|
|
from vllm.transformers_utils.tokenizer import init_tokenizer_from_configs
|
|
from vllm.utils.import_utils import LazyLoader
|
|
from vllm.v1.structured_output.backend_guidance import GuidanceBackend
|
|
from vllm.v1.structured_output.backend_types import (
|
|
StructuredOutputBackend,
|
|
StructuredOutputGrammar,
|
|
)
|
|
from vllm.v1.structured_output.backend_xgrammar import XgrammarBackend
|
|
|
|
if TYPE_CHECKING:
|
|
import numpy as np
|
|
import numpy.typing as npt
|
|
import torch
|
|
|
|
from vllm.reasoning import ReasoningParser
|
|
from vllm.v1.request import Request
|
|
else:
|
|
torch = LazyLoader("torch", globals(), "torch")
|
|
|
|
ReasoningParser = object
|
|
Request = object
|
|
|
|
logger = init_logger(__name__)
|
|
|
|
|
|
class StructuredOutputManager:
|
|
"""Engine-level manager for structured output requests."""
|
|
|
|
def __init__(self, vllm_config: VllmConfig):
|
|
self.backend: StructuredOutputBackend | None = None
|
|
self.reasoner: ReasoningParser | None = None
|
|
self.vllm_config = vllm_config
|
|
|
|
self._grammar_bitmask: torch.Tensor | None = None
|
|
self._full_mask = torch.tensor(-1, dtype=torch.int32)
|
|
|
|
max_batch_size = self.vllm_config.scheduler_config.max_num_seqs
|
|
self.fill_bitmask_parallel_threshold = 128
|
|
if self.fill_bitmask_parallel_threshold < max_batch_size:
|
|
self.fill_bitmask_parallel_batch_size = 16
|
|
# Use:
|
|
# - at least 1 CPU
|
|
# - at most half the number of CPUs or 8, whichever is less
|
|
max_workers = max(1, min(multiprocessing.cpu_count() // 2, 8))
|
|
self.executor_for_fillmask = ThreadPoolExecutor(max_workers=max_workers)
|
|
|
|
if not self.vllm_config.model_config.skip_tokenizer_init:
|
|
# The default max_workers if not specified is the number of
|
|
# CPUs * 5, which is way too high since these tasks are CPU-bound,
|
|
# not I/O bound. We also know we would never dominate CPU usage
|
|
# with just grammar compilation, so we set it to half the number
|
|
# of CPUs.
|
|
max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
|
|
self.executor = ThreadPoolExecutor(max_workers=max_workers)
|
|
self.tokenizer = init_tokenizer_from_configs(
|
|
model_config=self.vllm_config.model_config
|
|
)
|
|
reasoning_parser = (
|
|
self.vllm_config.structured_outputs_config.reasoning_parser
|
|
)
|
|
reasoning_parser_plugin = (
|
|
self.vllm_config.structured_outputs_config.reasoning_parser_plugin
|
|
)
|
|
if reasoning_parser_plugin and len(reasoning_parser_plugin) > 3:
|
|
ReasoningParserManager.import_reasoning_parser(reasoning_parser_plugin)
|
|
|
|
reasoning_parser = (
|
|
self.vllm_config.structured_outputs_config.reasoning_parser
|
|
)
|
|
if reasoning_parser:
|
|
reasoner_cls = ReasoningParserManager.get_reasoning_parser(
|
|
reasoning_parser
|
|
)
|
|
self.reasoner = reasoner_cls(tokenizer=self.tokenizer)
|
|
|
|
self.enable_in_reasoning = (
|
|
self.vllm_config.structured_outputs_config.enable_in_reasoning
|
|
)
|
|
|
|
def grammar_init(self, request: Request) -> None:
|
|
if request.structured_output_request is None:
|
|
return
|
|
|
|
if TYPE_CHECKING:
|
|
assert (
|
|
request.sampling_params is not None
|
|
and request.sampling_params.structured_outputs is not None
|
|
)
|
|
|
|
# Initialize the backend the first time it is needed.
|
|
#
|
|
# NOTE: We only support a single backend. We do NOT support different
|
|
# backends on a per-request basis in V1 (for now, anyway...).
|
|
# _backend is set in Processor._validate_structured_output
|
|
if self.backend is None:
|
|
assert request.sampling_params is not None
|
|
backend = request.sampling_params.structured_outputs._backend
|
|
vocab_size = self.vllm_config.model_config.get_vocab_size()
|
|
if backend == "xgrammar":
|
|
self.backend = XgrammarBackend(
|
|
self.vllm_config,
|
|
tokenizer=self.tokenizer,
|
|
vocab_size=vocab_size,
|
|
)
|
|
elif backend == "guidance":
|
|
self.backend = GuidanceBackend(
|
|
self.vllm_config,
|
|
tokenizer=self.tokenizer,
|
|
vocab_size=vocab_size,
|
|
)
|
|
elif backend == "outlines":
|
|
from vllm.v1.structured_output.backend_outlines import OutlinesBackend
|
|
|
|
self.backend = OutlinesBackend(
|
|
self.vllm_config,
|
|
tokenizer=self.tokenizer,
|
|
vocab_size=vocab_size,
|
|
)
|
|
elif backend == "lm-format-enforcer":
|
|
from vllm.v1.structured_output.backend_lm_format_enforcer import ( # noqa: E501
|
|
LMFormatEnforcerBackend,
|
|
)
|
|
|
|
self.backend = LMFormatEnforcerBackend(
|
|
self.vllm_config,
|
|
tokenizer=self.tokenizer,
|
|
vocab_size=vocab_size,
|
|
)
|
|
else:
|
|
raise ValueError(f"Unsupported structured output backend: {backend}")
|
|
|
|
grammar = self.executor.submit(self._async_create_grammar, request)
|
|
request.structured_output_request.grammar = grammar # type: ignore[assignment]
|
|
|
|
def _async_create_grammar(
|
|
self,
|
|
request: Request,
|
|
) -> StructuredOutputGrammar:
|
|
key = request.structured_output_request.structured_output_key # type: ignore[union-attr]
|
|
|
|
# Note that the request was validated in the engine core client,
|
|
# so at this point we know it is a supported type of request.
|
|
#
|
|
# TODO: we still need to handle xgrammar compilation failures,
|
|
# though it should be unlikely as we test that up front as well.
|
|
request_type, grammar_spec = key
|
|
|
|
assert self.backend is not None
|
|
return self.backend.compile_grammar(request_type, grammar_spec)
|
|
|
|
def _fill_bitmasks(
|
|
self,
|
|
batch: list[tuple[StructuredOutputGrammar, int, bool]],
|
|
) -> None:
|
|
assert self._grammar_bitmask is not None
|
|
for grammar, index, apply_bitmask in batch:
|
|
if apply_bitmask and not grammar.is_terminated():
|
|
grammar.fill_bitmask(self._grammar_bitmask, index)
|
|
else:
|
|
# Note that for thinking support, we will need to
|
|
# reset the relevant part of the bitmask for consequent
|
|
# requests here.
|
|
self._grammar_bitmask[index].fill_(self._full_mask)
|
|
|
|
def _async_submit_fill_bitmask(
|
|
self,
|
|
batch: list[tuple[StructuredOutputGrammar, int, bool]],
|
|
) -> Future:
|
|
return self.executor_for_fillmask.submit(self._fill_bitmasks, batch)
|
|
|
|
def grammar_bitmask(
|
|
self,
|
|
requests: dict[str, Request],
|
|
structured_output_request_ids: list[str],
|
|
scheduled_spec_decode_tokens: dict[str, list[int]],
|
|
) -> "npt.NDArray[np.int32] | None":
|
|
# Prepare the structured output bitmask for this batch.
|
|
if not structured_output_request_ids:
|
|
return None
|
|
|
|
max_num_spec_tokens = 0
|
|
if self.vllm_config.speculative_config is not None:
|
|
max_num_spec_tokens = (
|
|
self.vllm_config.speculative_config.num_speculative_tokens
|
|
)
|
|
|
|
if self._grammar_bitmask is None:
|
|
assert self.backend is not None
|
|
max_batch_size = self.vllm_config.scheduler_config.max_num_seqs
|
|
|
|
# Allocate a bitmask for each token needing to be checked:
|
|
# one for each speculative position, and one more for the
|
|
# bonus token / non-speculative token.
|
|
self._grammar_bitmask = self.backend.allocate_token_bitmask(
|
|
max_batch_size * (1 + max_num_spec_tokens)
|
|
)
|
|
|
|
# Generate a batched bitmask for all structured output requests.
|
|
# When speculative decoding is enabled, we need to include multiple
|
|
# masks for each request, one for each possible bonus token position.
|
|
# These are stored inline in the tensor and unpacked by the gpu runner.
|
|
cumulative_index = 0
|
|
|
|
# Optimized parallel filling of bitmasks for
|
|
# non-spec, large-batch-size cases
|
|
if (
|
|
len(structured_output_request_ids) > self.fill_bitmask_parallel_threshold
|
|
and max_num_spec_tokens == 0
|
|
):
|
|
promises = []
|
|
batch = []
|
|
for req_id in structured_output_request_ids:
|
|
request = requests[req_id]
|
|
structured_output_request = request.structured_output_request
|
|
if TYPE_CHECKING:
|
|
assert structured_output_request is not None
|
|
assert structured_output_request.grammar is not None
|
|
|
|
apply_bitmask = self.should_fill_bitmask(request)
|
|
batch.append(
|
|
(structured_output_request.grammar, cumulative_index, apply_bitmask)
|
|
)
|
|
if len(batch) == self.fill_bitmask_parallel_batch_size:
|
|
promises.append(self._async_submit_fill_bitmask(batch))
|
|
batch = []
|
|
|
|
cumulative_index += 1
|
|
if batch:
|
|
promises.append(self._async_submit_fill_bitmask(batch))
|
|
|
|
# Wait for all bitmask filling tasks to complete.
|
|
for promise in promises:
|
|
promise.result()
|
|
else:
|
|
# Fallback to serial filling of bitmasks for small-batch-size cases
|
|
for req_id in structured_output_request_ids:
|
|
request = requests[req_id]
|
|
structured_output_request = request.structured_output_request
|
|
|
|
if TYPE_CHECKING:
|
|
assert structured_output_request is not None
|
|
assert structured_output_request.grammar is not None
|
|
apply_bitmask = self.should_fill_bitmask(request)
|
|
|
|
state_advancements = 0
|
|
req_tokens = scheduled_spec_decode_tokens.get(req_id, [])
|
|
for i, token in enumerate(req_tokens + [None]):
|
|
self._fill_bitmasks(
|
|
[
|
|
(
|
|
structured_output_request.grammar,
|
|
cumulative_index,
|
|
apply_bitmask,
|
|
)
|
|
]
|
|
)
|
|
|
|
if (
|
|
apply_bitmask
|
|
and token is not None
|
|
and not structured_output_request.grammar.is_terminated()
|
|
):
|
|
accepted = structured_output_request.grammar.accept_tokens(
|
|
req_id, [token]
|
|
)
|
|
assert accepted, (token, req_id, scheduled_spec_decode_tokens)
|
|
state_advancements += 1
|
|
cumulative_index += 1
|
|
if state_advancements > 0:
|
|
structured_output_request.grammar.rollback(state_advancements)
|
|
|
|
bitmask_tensor = self._grammar_bitmask
|
|
if cumulative_index < bitmask_tensor.shape[0]:
|
|
bitmask_tensor = bitmask_tensor[:cumulative_index]
|
|
|
|
# After finishing with the xgrammar operations, we convert to
|
|
# np.ndarray, because that is much more efficient for serialization
|
|
# and deserialization when sending this to the GPU workers.
|
|
return bitmask_tensor.numpy()
|
|
|
|
def should_fill_bitmask(self, request: Request) -> bool:
|
|
# NOTE (Hanchen) if enable_in_reasoning is True, it means that
|
|
# the model needs to be constrained in reasoning. So we should always
|
|
# enable the bitmask filling.
|
|
|
|
if self.reasoner is not None:
|
|
if self.enable_in_reasoning:
|
|
return True
|
|
assert request.structured_output_request is not None
|
|
if request.structured_output_request.reasoning_ended is None:
|
|
request.structured_output_request.reasoning_ended = (
|
|
self.reasoner.is_reasoning_end(request.prompt_token_ids)
|
|
)
|
|
return request.structured_output_request.reasoning_ended
|
|
return True
|
|
|
|
def should_advance(self, request: Request) -> bool:
|
|
if not request.use_structured_output:
|
|
return False
|
|
|
|
# To determine whether we can advance the FSM.
|
|
# Supports thinking usage where we skip the reasoning components.
|
|
if TYPE_CHECKING:
|
|
assert request.structured_output_request is not None
|
|
assert request.structured_output_request.grammar is not None
|
|
# by default, we should always advance
|
|
# for cases that don't use thinking mode.
|
|
if self.reasoner is None:
|
|
return True
|
|
|
|
# if the model needs structured in reasoning, we should advance
|
|
if self.enable_in_reasoning:
|
|
return True
|
|
|
|
structured_req = request.structured_output_request
|
|
if structured_req.reasoning_ended:
|
|
return True
|
|
|
|
# Check if reasoning ends in *this* step
|
|
if self.reasoner.is_reasoning_end(request.all_token_ids):
|
|
# Reasoning just ended, so we shouldn't advance til
|
|
# next pass
|
|
structured_req.reasoning_ended = True
|
|
|
|
return False
|
|
|
|
def clear_backend(self) -> None:
|
|
if self.backend is not None:
|
|
self.backend.destroy()
|