init
This commit is contained in:
0
vllm/core/__init__.py
Normal file
0
vllm/core/__init__.py
Normal file
BIN
vllm/core/__pycache__/__init__.cpython-310.pyc
Normal file
BIN
vllm/core/__pycache__/__init__.cpython-310.pyc
Normal file
Binary file not shown.
BIN
vllm/core/__pycache__/block_manager.cpython-310.pyc
Normal file
BIN
vllm/core/__pycache__/block_manager.cpython-310.pyc
Normal file
Binary file not shown.
BIN
vllm/core/__pycache__/policy.cpython-310.pyc
Normal file
BIN
vllm/core/__pycache__/policy.cpython-310.pyc
Normal file
Binary file not shown.
BIN
vllm/core/__pycache__/scheduler.cpython-310.pyc
Normal file
BIN
vllm/core/__pycache__/scheduler.cpython-310.pyc
Normal file
Binary file not shown.
330
vllm/core/block_manager.py
Normal file
330
vllm/core/block_manager.py
Normal file
@@ -0,0 +1,330 @@
|
||||
"""A block manager that manages token blocks."""
|
||||
import enum
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
||||
from vllm.block import BlockTable, PhysicalTokenBlock
|
||||
from vllm.sequence import Sequence, SequenceGroup, SequenceStatus
|
||||
from vllm.utils import Device
|
||||
|
||||
|
||||
class BlockAllocator:
|
||||
"""Manages free physical token blocks for a device.
|
||||
|
||||
The allocator maintains a list of free blocks and allocates a block when
|
||||
requested. When a block is freed, its reference count is decremented. If
|
||||
the reference count becomes zero, the block is added back to the free list.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device: Device,
|
||||
block_size: int,
|
||||
num_blocks: int,
|
||||
) -> None:
|
||||
self.device = device
|
||||
self.block_size = block_size
|
||||
self.num_blocks = num_blocks
|
||||
|
||||
# Initialize the free blocks.
|
||||
self.free_blocks: BlockTable = []
|
||||
for i in range(num_blocks):
|
||||
block = PhysicalTokenBlock(device=device,
|
||||
block_number=i,
|
||||
block_size=block_size)
|
||||
self.free_blocks.append(block)
|
||||
|
||||
def allocate(self) -> PhysicalTokenBlock:
|
||||
if not self.free_blocks:
|
||||
raise ValueError("Out of memory! No free blocks are available.")
|
||||
block = self.free_blocks.pop()
|
||||
block.ref_count = 1
|
||||
return block
|
||||
|
||||
def free(self, block: PhysicalTokenBlock) -> None:
|
||||
if block.ref_count == 0:
|
||||
raise ValueError(f"Double free! {block} is already freed.")
|
||||
block.ref_count -= 1
|
||||
if block.ref_count == 0:
|
||||
self.free_blocks.append(block)
|
||||
|
||||
def get_num_free_blocks(self) -> int:
|
||||
return len(self.free_blocks)
|
||||
|
||||
|
||||
class AllocStatus(enum.Enum):
|
||||
"""Result for BlockSpaceManager.can_allocate
|
||||
|
||||
1. Ok: seq_group can be allocated now.
|
||||
2. Later: seq_group cannot be allocated.
|
||||
The capacity of allocator is larger than seq_group required.
|
||||
3. Never: seq_group can never be allocated.
|
||||
The seq_group is too large to allocated in GPU.
|
||||
"""
|
||||
OK = enum.auto()
|
||||
LATER = enum.auto()
|
||||
NEVER = enum.auto()
|
||||
|
||||
|
||||
class BlockSpaceManager:
|
||||
"""Manages the mapping between logical and physical token blocks."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
block_size: int,
|
||||
num_gpu_blocks: int,
|
||||
num_cpu_blocks: int,
|
||||
watermark: float = 0.01,
|
||||
sliding_window: Optional[int] = None,
|
||||
) -> None:
|
||||
self.block_size = block_size
|
||||
self.num_total_gpu_blocks = num_gpu_blocks
|
||||
self.num_total_cpu_blocks = num_cpu_blocks
|
||||
|
||||
self.block_sliding_window = None
|
||||
if sliding_window is not None:
|
||||
assert sliding_window % block_size == 0, (sliding_window,
|
||||
block_size)
|
||||
self.block_sliding_window = sliding_window // block_size
|
||||
|
||||
self.watermark = watermark
|
||||
assert watermark >= 0.0
|
||||
|
||||
self.watermark_blocks = int(watermark * num_gpu_blocks)
|
||||
self.gpu_allocator = BlockAllocator(Device.GPU, block_size,
|
||||
num_gpu_blocks)
|
||||
self.cpu_allocator = BlockAllocator(Device.CPU, block_size,
|
||||
num_cpu_blocks)
|
||||
# Mapping: seq_id -> BlockTable.
|
||||
self.block_tables: Dict[int, BlockTable] = {}
|
||||
|
||||
def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
|
||||
# FIXME(woosuk): Here we assume that all sequences in the group share
|
||||
# the same prompt. This may not be true for preempted sequences.
|
||||
seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
|
||||
num_required_blocks = len(seq.logical_token_blocks)
|
||||
|
||||
if seq_group.prefix is not None and seq_group.prefix.allocated:
|
||||
num_required_blocks -= seq_group.prefix.get_num_blocks()
|
||||
|
||||
if self.block_sliding_window is not None:
|
||||
num_required_blocks = min(num_required_blocks,
|
||||
self.block_sliding_window)
|
||||
num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
|
||||
|
||||
# Use watermark to avoid frequent cache eviction.
|
||||
if (self.num_total_gpu_blocks - num_required_blocks <
|
||||
self.watermark_blocks):
|
||||
return AllocStatus.NEVER
|
||||
if num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:
|
||||
return AllocStatus.OK
|
||||
else:
|
||||
return AllocStatus.LATER
|
||||
|
||||
def allocate(self, seq_group: SequenceGroup) -> None:
|
||||
# NOTE: Here we assume that all sequences in the group have the same
|
||||
# prompt.
|
||||
seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
|
||||
|
||||
# Allocate new physical token blocks that will store the prompt tokens.
|
||||
num_prompt_blocks = len(seq.logical_token_blocks)
|
||||
|
||||
block_table: BlockTable = []
|
||||
prefix_block_table: BlockTable = []
|
||||
num_prefix_blocks = 0
|
||||
|
||||
prefix = seq_group.prefix
|
||||
if prefix is not None and prefix.allocated:
|
||||
# Prefix has already been allocated. Use the existing block table.
|
||||
num_prompt_blocks -= prefix.get_num_blocks()
|
||||
for block in prefix.block_table:
|
||||
block.ref_count += seq_group.num_seqs()
|
||||
block_table.append(block)
|
||||
|
||||
for logical_idx in range(num_prompt_blocks):
|
||||
if (self.block_sliding_window is not None
|
||||
and logical_idx >= self.block_sliding_window):
|
||||
block = block_table[logical_idx % self.block_sliding_window]
|
||||
else:
|
||||
block = self.gpu_allocator.allocate()
|
||||
# Set the reference counts of the token blocks.
|
||||
block.ref_count = seq_group.num_seqs()
|
||||
block_table.append(block)
|
||||
|
||||
if prefix is not None and not prefix.allocated:
|
||||
# Allocate blocks for the prefix, we will compute the prefix's
|
||||
# KV cache in this run.
|
||||
num_prefix_blocks = prefix.get_num_blocks()
|
||||
prefix_block_table = block_table[:num_prefix_blocks]
|
||||
for block in prefix_block_table:
|
||||
block.ref_count += 1
|
||||
prefix.set_block_table(prefix_block_table)
|
||||
|
||||
# Assign the block table for each sequence.
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
|
||||
self.block_tables[seq.seq_id] = block_table.copy()
|
||||
|
||||
def can_append_slot(self, seq_group: SequenceGroup) -> bool:
|
||||
# Simple heuristic: If there is at least one free block
|
||||
# for each sequence, we can append.
|
||||
num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
|
||||
num_seqs = seq_group.num_seqs(status=SequenceStatus.RUNNING)
|
||||
return num_seqs <= num_free_gpu_blocks
|
||||
|
||||
def append_slot(self, seq: Sequence) -> Optional[Tuple[int, int]]:
|
||||
"""Allocate a physical slot for a new token."""
|
||||
logical_blocks = seq.logical_token_blocks
|
||||
block_table = self.block_tables[seq.seq_id]
|
||||
|
||||
if len(block_table) < len(logical_blocks):
|
||||
if (self.block_sliding_window
|
||||
and len(block_table) >= self.block_sliding_window):
|
||||
# reuse a block
|
||||
block_table.append(block_table[len(block_table) %
|
||||
self.block_sliding_window])
|
||||
else:
|
||||
# The sequence has a new logical block.
|
||||
# Allocate a new physical block.
|
||||
block = self.gpu_allocator.allocate()
|
||||
block_table.append(block)
|
||||
return None
|
||||
|
||||
# We want to append the token to the last physical block.
|
||||
last_block = block_table[-1]
|
||||
assert last_block.device == Device.GPU
|
||||
if last_block.ref_count == 1:
|
||||
# Not shared with other sequences. Appendable.
|
||||
return None
|
||||
else:
|
||||
# The last block is shared with other sequences.
|
||||
# Copy on Write: Allocate a new block and copy the tokens.
|
||||
new_block = self.gpu_allocator.allocate()
|
||||
block_table[-1] = new_block
|
||||
self.gpu_allocator.free(last_block)
|
||||
return last_block.block_number, new_block.block_number
|
||||
|
||||
def fork(self, parent_seq: Sequence, child_seq: Sequence) -> None:
|
||||
# NOTE: fork does not allocate a new physical block.
|
||||
# Thus, it is always safe from OOM.
|
||||
src_block_table = self.block_tables[parent_seq.seq_id]
|
||||
self.block_tables[child_seq.seq_id] = src_block_table.copy()
|
||||
for block in src_block_table:
|
||||
block.ref_count += 1
|
||||
|
||||
def _get_physical_blocks(
|
||||
self, seq_group: SequenceGroup) -> List[PhysicalTokenBlock]:
|
||||
# NOTE: Here, we assume that the physical blocks are only shared by
|
||||
# the sequences in the same group.
|
||||
blocks: Set[PhysicalTokenBlock] = set()
|
||||
for seq in seq_group.get_seqs():
|
||||
if seq.is_finished():
|
||||
continue
|
||||
blocks.update(self.block_tables[seq.seq_id])
|
||||
return list(blocks)
|
||||
|
||||
def can_swap_in(self, seq_group: SequenceGroup) -> bool:
|
||||
blocks = self._get_physical_blocks(seq_group)
|
||||
num_swapped_seqs = seq_group.num_seqs(status=SequenceStatus.SWAPPED)
|
||||
num_free_blocks = self.gpu_allocator.get_num_free_blocks()
|
||||
# NOTE: Conservatively, we assume that every sequence will allocate
|
||||
# at least one free block right after the swap-in.
|
||||
# NOTE: This should match the logic in can_append_slot().
|
||||
num_required_blocks = len(blocks) + num_swapped_seqs
|
||||
return num_free_blocks - num_required_blocks >= self.watermark_blocks
|
||||
|
||||
def swap_in(self, seq_group: SequenceGroup) -> Dict[int, int]:
|
||||
# CPU block -> GPU block.
|
||||
if seq_group.prefix is not None:
|
||||
# make sure to swap in the prefix first
|
||||
assert seq_group.prefix.allocated and seq_group.prefix.computed
|
||||
|
||||
mapping: Dict[PhysicalTokenBlock, PhysicalTokenBlock] = {}
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
|
||||
new_block_table: BlockTable = []
|
||||
block_table = self.block_tables[seq.seq_id]
|
||||
if seq_group.prefix is not None:
|
||||
for block in seq_group.prefix.block_table:
|
||||
new_block_table.append(block)
|
||||
block.ref_count += 1
|
||||
|
||||
for cpu_block in block_table:
|
||||
if cpu_block in mapping:
|
||||
gpu_block = mapping[cpu_block]
|
||||
gpu_block.ref_count += 1
|
||||
else:
|
||||
gpu_block = self.gpu_allocator.allocate()
|
||||
mapping[cpu_block] = gpu_block
|
||||
new_block_table.append(gpu_block)
|
||||
# Free the CPU block swapped in to GPU.
|
||||
self.cpu_allocator.free(cpu_block)
|
||||
self.block_tables[seq.seq_id] = new_block_table
|
||||
|
||||
block_number_mapping = {
|
||||
cpu_block.block_number: gpu_block.block_number
|
||||
for cpu_block, gpu_block in mapping.items()
|
||||
}
|
||||
return block_number_mapping
|
||||
|
||||
def can_swap_out(self, seq_group: SequenceGroup) -> bool:
|
||||
blocks = self._get_physical_blocks(seq_group)
|
||||
return len(blocks) <= self.cpu_allocator.get_num_free_blocks()
|
||||
|
||||
def swap_out(self, seq_group: SequenceGroup) -> Dict[int, int]:
|
||||
# GPU block -> CPU block.
|
||||
mapping: Dict[PhysicalTokenBlock, PhysicalTokenBlock] = {}
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
|
||||
new_block_table: BlockTable = []
|
||||
block_table = self.block_tables[seq.seq_id]
|
||||
|
||||
for gpu_block in block_table:
|
||||
if (seq_group.prefix is not None
|
||||
and gpu_block in seq_group.prefix.block_table):
|
||||
# NOTE: We do not swap out the prefix blocks for now.
|
||||
self.gpu_allocator.free(gpu_block)
|
||||
continue
|
||||
|
||||
if gpu_block in mapping:
|
||||
cpu_block = mapping[gpu_block]
|
||||
cpu_block.ref_count += 1
|
||||
else:
|
||||
cpu_block = self.cpu_allocator.allocate()
|
||||
mapping[gpu_block] = cpu_block
|
||||
new_block_table.append(cpu_block)
|
||||
# Free the GPU block swapped out to CPU.
|
||||
self.gpu_allocator.free(gpu_block)
|
||||
self.block_tables[seq.seq_id] = new_block_table
|
||||
|
||||
block_number_mapping = {
|
||||
gpu_block.block_number: cpu_block.block_number
|
||||
for gpu_block, cpu_block in mapping.items()
|
||||
}
|
||||
return block_number_mapping
|
||||
|
||||
def _free_block_table(self, block_table: BlockTable) -> None:
|
||||
for block in set(block_table):
|
||||
if block.device == Device.GPU:
|
||||
self.gpu_allocator.free(block)
|
||||
else:
|
||||
self.cpu_allocator.free(block)
|
||||
|
||||
def free(self, seq: Sequence) -> None:
|
||||
if seq.seq_id not in self.block_tables:
|
||||
# Already freed or haven't been scheduled yet.
|
||||
return
|
||||
block_table = self.block_tables[seq.seq_id]
|
||||
self._free_block_table(block_table)
|
||||
del self.block_tables[seq.seq_id]
|
||||
|
||||
def reset(self) -> None:
|
||||
for block_table in self.block_tables.values():
|
||||
self._free_block_table(block_table)
|
||||
self.block_tables.clear()
|
||||
|
||||
def get_block_table(self, seq: Sequence) -> List[int]:
|
||||
block_table = self.block_tables[seq.seq_id]
|
||||
return [block.block_number for block in block_table]
|
||||
|
||||
def get_num_free_gpu_blocks(self) -> int:
|
||||
return self.gpu_allocator.get_num_free_blocks()
|
||||
|
||||
def get_num_free_cpu_blocks(self) -> int:
|
||||
return self.cpu_allocator.get_num_free_blocks()
|
||||
47
vllm/core/policy.py
Normal file
47
vllm/core/policy.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from collections import deque
|
||||
from typing import Deque
|
||||
|
||||
from vllm.sequence import SequenceGroup
|
||||
|
||||
|
||||
class Policy:
|
||||
|
||||
def get_priority(
|
||||
self,
|
||||
now: float,
|
||||
seq_group: SequenceGroup,
|
||||
) -> float:
|
||||
raise NotImplementedError
|
||||
|
||||
def sort_by_priority(
|
||||
self,
|
||||
now: float,
|
||||
seq_groups: Deque[SequenceGroup],
|
||||
) -> Deque[SequenceGroup]:
|
||||
return deque(
|
||||
sorted(
|
||||
seq_groups,
|
||||
key=lambda seq_group: self.get_priority(now, seq_group),
|
||||
reverse=True,
|
||||
))
|
||||
|
||||
|
||||
class FCFS(Policy):
|
||||
|
||||
def get_priority(
|
||||
self,
|
||||
now: float,
|
||||
seq_group: SequenceGroup,
|
||||
) -> float:
|
||||
return now - seq_group.metrics.arrival_time
|
||||
|
||||
|
||||
class PolicyFactory:
|
||||
|
||||
_POLICY_REGISTRY = {
|
||||
'fcfs': FCFS,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_policy(cls, policy_name: str, **kwargs) -> Policy:
|
||||
return cls._POLICY_REGISTRY[policy_name](**kwargs)
|
||||
498
vllm/core/scheduler.py
Normal file
498
vllm/core/scheduler.py
Normal file
@@ -0,0 +1,498 @@
|
||||
from collections import deque
|
||||
import enum
|
||||
import time
|
||||
from typing import Deque, Dict, Iterable, List, Optional, Tuple, Union, Set
|
||||
|
||||
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
|
||||
from vllm.core.block_manager import AllocStatus, BlockSpaceManager
|
||||
from vllm.core.policy import PolicyFactory
|
||||
from vllm.lora.request import LoRARequest
|
||||
from vllm.logger import init_logger
|
||||
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
|
||||
SequenceGroupMetadata, SequenceStatus)
|
||||
from vllm.prefix import PrefixPool
|
||||
|
||||
logger = init_logger(__name__)
|
||||
|
||||
|
||||
class PreemptionMode(enum.Enum):
|
||||
"""Preemption modes.
|
||||
|
||||
1. Swapping: Swap out the blocks of the preempted sequences to CPU memory
|
||||
and swap them back in when the sequences are resumed.
|
||||
2. Recomputation: Discard the blocks of the preempted sequences and
|
||||
recompute them when the sequences are resumed, treating the sequences as
|
||||
new prompts.
|
||||
"""
|
||||
SWAP = enum.auto()
|
||||
RECOMPUTE = enum.auto()
|
||||
|
||||
|
||||
class SchedulerOutputs:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
scheduled_seq_groups: Iterable[SequenceGroup],
|
||||
prompt_run: bool,
|
||||
num_batched_tokens: int,
|
||||
blocks_to_swap_in: Dict[int, int],
|
||||
blocks_to_swap_out: Dict[int, int],
|
||||
blocks_to_copy: Dict[int, List[int]],
|
||||
ignored_seq_groups: List[SequenceGroup],
|
||||
) -> None:
|
||||
self.scheduled_seq_groups = scheduled_seq_groups
|
||||
self.prompt_run = prompt_run
|
||||
self.num_batched_tokens = num_batched_tokens
|
||||
self.blocks_to_swap_in = blocks_to_swap_in
|
||||
self.blocks_to_swap_out = blocks_to_swap_out
|
||||
self.blocks_to_copy = blocks_to_copy
|
||||
# Swap in and swap out should never happen at the same time.
|
||||
assert not (blocks_to_swap_in and blocks_to_swap_out)
|
||||
self.ignored_seq_groups = ignored_seq_groups
|
||||
|
||||
self.num_loras = len(self.lora_requests)
|
||||
if self.num_loras > 0:
|
||||
self._sort_by_lora_ids()
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
# NOTE: We do not consider the ignored sequence groups.
|
||||
return (not self.scheduled_seq_groups and not self.blocks_to_swap_in
|
||||
and not self.blocks_to_swap_out and not self.blocks_to_copy)
|
||||
|
||||
def _sort_by_lora_ids(self) -> bool:
|
||||
self.scheduled_seq_groups = sorted(
|
||||
self.scheduled_seq_groups,
|
||||
key=lambda g: (g.lora_request.lora_int_id
|
||||
if g.lora_request else 0, g.request_id))
|
||||
|
||||
@property
|
||||
def lora_requests(self) -> Set[LoRARequest]:
|
||||
return {g.lora_request for g in self.scheduled_seq_groups}
|
||||
|
||||
|
||||
class Scheduler:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
scheduler_config: SchedulerConfig,
|
||||
cache_config: CacheConfig,
|
||||
lora_config: Optional[LoRAConfig],
|
||||
) -> None:
|
||||
self.scheduler_config = scheduler_config
|
||||
self.cache_config = cache_config
|
||||
# Note for LoRA scheduling: the current policy is extremely
|
||||
# simple and NOT fair. It can lead to starvation of some
|
||||
# LoRAs. This should be improved in the future.
|
||||
self.lora_config = lora_config
|
||||
|
||||
self.prompt_limit = min(self.scheduler_config.max_model_len,
|
||||
self.scheduler_config.max_num_batched_tokens)
|
||||
|
||||
# Instantiate the scheduling policy.
|
||||
self.policy = PolicyFactory.get_policy(policy_name="fcfs")
|
||||
# Create the block space manager.
|
||||
self.block_manager = BlockSpaceManager(
|
||||
block_size=self.cache_config.block_size,
|
||||
num_gpu_blocks=self.cache_config.num_gpu_blocks,
|
||||
num_cpu_blocks=self.cache_config.num_cpu_blocks,
|
||||
sliding_window=self.cache_config.sliding_window)
|
||||
|
||||
# Create the prefix pool to cache the prefixes.
|
||||
self.prefix_pool = PrefixPool(self.cache_config.block_size)
|
||||
|
||||
# Sequence groups in the WAITING state.
|
||||
self.waiting: Deque[SequenceGroup] = deque()
|
||||
# Sequence groups in the RUNNING state.
|
||||
self.running: Deque[SequenceGroup] = deque()
|
||||
# Sequence groups in the SWAPPED state.
|
||||
self.swapped: Deque[SequenceGroup] = deque()
|
||||
|
||||
@property
|
||||
def lora_enabled(self) -> bool:
|
||||
return bool(self.lora_config)
|
||||
|
||||
def add_seq_group(self, seq_group: SequenceGroup) -> None:
|
||||
# Add sequence groups to the waiting queue.
|
||||
self.waiting.append(seq_group)
|
||||
|
||||
def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
|
||||
"""Aborts a sequence group with the given ID.
|
||||
|
||||
Check if the sequence group with the given ID
|
||||
is present in any of the state queue.
|
||||
If present, remove the sequence group from the state queue.
|
||||
Also, if any of the sequences in the sequence group is not finished,
|
||||
free the sequence with status `FINISHED_ABORTED`.
|
||||
Otherwise, do nothing.
|
||||
|
||||
Args:
|
||||
request_id: The ID(s) of the sequence group to abort.
|
||||
"""
|
||||
if isinstance(request_id, str):
|
||||
request_id = (request_id, )
|
||||
request_ids = set(request_id)
|
||||
for state_queue in [self.waiting, self.running, self.swapped]:
|
||||
aborted_groups: List[SequenceGroup] = []
|
||||
for seq_group in state_queue:
|
||||
if not request_ids:
|
||||
# Using 'break' here may add two extra iterations,
|
||||
# but is acceptable to reduce complexity .
|
||||
break
|
||||
if seq_group.request_id in request_ids:
|
||||
# Appending aborted group into pending list.
|
||||
aborted_groups.append(seq_group)
|
||||
request_ids.remove(seq_group.request_id)
|
||||
for aborted_group in aborted_groups:
|
||||
# Remove the sequence group from the state queue.
|
||||
state_queue.remove(aborted_group)
|
||||
for seq in aborted_group.get_seqs():
|
||||
if seq.is_finished():
|
||||
continue
|
||||
seq.status = SequenceStatus.FINISHED_ABORTED
|
||||
self.free_seq(seq)
|
||||
|
||||
def has_unfinished_seqs(self) -> bool:
|
||||
return self.waiting or self.running or self.swapped
|
||||
|
||||
def get_num_unfinished_seq_groups(self) -> int:
|
||||
return len(self.waiting) + len(self.running) + len(self.swapped)
|
||||
|
||||
def _schedule(self) -> SchedulerOutputs:
|
||||
# Blocks that need to be swapped or copied before model execution.
|
||||
blocks_to_swap_in: Dict[int, int] = {}
|
||||
blocks_to_swap_out: Dict[int, int] = {}
|
||||
blocks_to_copy: Dict[int, List[int]] = {}
|
||||
|
||||
# Fix the current time.
|
||||
now = time.monotonic()
|
||||
|
||||
# Join waiting sequences if possible.
|
||||
if not self.swapped:
|
||||
ignored_seq_groups: List[SequenceGroup] = []
|
||||
scheduled: List[SequenceGroup] = []
|
||||
# The total number of sequences on the fly, including the
|
||||
# requests in the generation phase.
|
||||
num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
|
||||
for seq_group in self.running)
|
||||
curr_loras = set(
|
||||
seq_group.lora_int_id
|
||||
for seq_group in self.running) if self.lora_enabled else None
|
||||
seq_lens: List[int] = []
|
||||
|
||||
# Optimization: We do not sort the waiting queue since the preempted
|
||||
# sequence groups are added to the front and the new sequence groups
|
||||
# are added to the back.
|
||||
leftover_waiting_sequences = deque()
|
||||
while self.waiting:
|
||||
seq_group = self.waiting[0]
|
||||
waiting_seqs = seq_group.get_seqs(
|
||||
status=SequenceStatus.WAITING)
|
||||
assert len(waiting_seqs) == 1, (
|
||||
"Waiting sequence group should have only one prompt "
|
||||
"sequence.")
|
||||
num_prompt_tokens = waiting_seqs[0].get_len()
|
||||
if num_prompt_tokens > self.prompt_limit:
|
||||
logger.warning(
|
||||
f"Input prompt ({num_prompt_tokens} tokens) is too long"
|
||||
f" and exceeds limit of {self.prompt_limit}")
|
||||
for seq in waiting_seqs:
|
||||
seq.status = SequenceStatus.FINISHED_IGNORED
|
||||
ignored_seq_groups.append(seq_group)
|
||||
self.waiting.popleft()
|
||||
continue
|
||||
|
||||
# If the sequence group cannot be allocated, stop.
|
||||
can_allocate = self.block_manager.can_allocate(seq_group)
|
||||
if can_allocate == AllocStatus.LATER:
|
||||
break
|
||||
elif can_allocate == AllocStatus.NEVER:
|
||||
logger.warning(
|
||||
f"Input prompt ({num_prompt_tokens} tokens) is too long"
|
||||
f" and exceeds the capacity of block_manager")
|
||||
for seq in waiting_seqs:
|
||||
seq.status = SequenceStatus.FINISHED_IGNORED
|
||||
ignored_seq_groups.append(seq_group)
|
||||
self.waiting.popleft()
|
||||
continue
|
||||
|
||||
lora_int_id = 0
|
||||
if self.lora_enabled:
|
||||
lora_int_id = seq_group.lora_int_id
|
||||
if lora_int_id > 0 and lora_int_id not in curr_loras and len(
|
||||
curr_loras) >= self.lora_config.max_loras:
|
||||
# We don't have a space for another LoRA, so
|
||||
# we ignore this request for now.
|
||||
leftover_waiting_sequences.appendleft(seq_group)
|
||||
self.waiting.popleft()
|
||||
continue
|
||||
|
||||
# If the number of batched tokens exceeds the limit, stop.
|
||||
new_seq_lens = seq_lens + [num_prompt_tokens]
|
||||
num_batched_tokens = len(new_seq_lens) * max(new_seq_lens)
|
||||
if (num_batched_tokens >
|
||||
self.scheduler_config.max_num_batched_tokens):
|
||||
break
|
||||
|
||||
# The total number of sequences in the RUNNING state should not
|
||||
# exceed the maximum number of sequences.
|
||||
num_new_seqs = seq_group.get_max_num_running_seqs()
|
||||
if (num_curr_seqs + num_new_seqs >
|
||||
self.scheduler_config.max_num_seqs):
|
||||
break
|
||||
|
||||
num_paddings = num_batched_tokens - sum(new_seq_lens)
|
||||
if num_paddings > self.scheduler_config.max_paddings:
|
||||
break
|
||||
seq_lens = new_seq_lens
|
||||
|
||||
if lora_int_id > 0:
|
||||
curr_loras.add(lora_int_id)
|
||||
self.waiting.popleft()
|
||||
self._allocate(seq_group)
|
||||
self.running.append(seq_group)
|
||||
num_curr_seqs += num_new_seqs
|
||||
scheduled.append(seq_group)
|
||||
|
||||
self.waiting.extendleft(leftover_waiting_sequences)
|
||||
|
||||
if scheduled or ignored_seq_groups:
|
||||
scheduler_outputs = SchedulerOutputs(
|
||||
scheduled_seq_groups=scheduled,
|
||||
prompt_run=True,
|
||||
num_batched_tokens=len(seq_lens) *
|
||||
max(seq_lens) if seq_lens else 0,
|
||||
blocks_to_swap_in=blocks_to_swap_in,
|
||||
blocks_to_swap_out=blocks_to_swap_out,
|
||||
blocks_to_copy=blocks_to_copy,
|
||||
ignored_seq_groups=ignored_seq_groups,
|
||||
)
|
||||
return scheduler_outputs
|
||||
|
||||
# NOTE(woosuk): Preemption happens only when there is no available slot
|
||||
# to keep all the sequence groups in the RUNNING state.
|
||||
# In this case, the policy is responsible for deciding which sequence
|
||||
# groups to preempt.
|
||||
self.running = self.policy.sort_by_priority(now, self.running)
|
||||
|
||||
# Reserve new token slots for the running sequence groups.
|
||||
running: Deque[SequenceGroup] = deque()
|
||||
preempted: List[SequenceGroup] = []
|
||||
while self.running:
|
||||
seq_group = self.running.popleft()
|
||||
while not self.block_manager.can_append_slot(seq_group):
|
||||
if self.running:
|
||||
# Preempt the lowest-priority sequence groups.
|
||||
victim_seq_group = self.running.pop()
|
||||
self._preempt(victim_seq_group, blocks_to_swap_out)
|
||||
preempted.append(victim_seq_group)
|
||||
else:
|
||||
# No other sequence groups can be preempted.
|
||||
# Preempt the current sequence group.
|
||||
self._preempt(seq_group, blocks_to_swap_out)
|
||||
preempted.append(seq_group)
|
||||
break
|
||||
else:
|
||||
# Append new slots to the sequence group.
|
||||
self._append_slot(seq_group, blocks_to_copy)
|
||||
running.append(seq_group)
|
||||
self.running = running
|
||||
|
||||
# Swap in the sequence groups in the SWAPPED state if possible.
|
||||
self.swapped = self.policy.sort_by_priority(now, self.swapped)
|
||||
if not preempted:
|
||||
num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
|
||||
for seq_group in self.running)
|
||||
curr_loras = set(
|
||||
seq_group.lora_int_id
|
||||
for seq_group in self.running) if self.lora_enabled else None
|
||||
|
||||
leftover_swapped = deque()
|
||||
|
||||
while self.swapped:
|
||||
seq_group = self.swapped[0]
|
||||
lora_int_id = 0
|
||||
if self.lora_enabled:
|
||||
lora_int_id = seq_group.lora_int_id
|
||||
if lora_int_id > 0 and lora_int_id not in curr_loras and len(
|
||||
curr_loras) >= self.lora_config.max_loras:
|
||||
# We don't have a space for another LoRA, so
|
||||
# we ignore this request for now.
|
||||
leftover_swapped.appendleft(seq_group)
|
||||
self.swapped.popleft()
|
||||
continue
|
||||
|
||||
# If the sequence group cannot be swapped in, stop.
|
||||
if not self.block_manager.can_swap_in(seq_group):
|
||||
break
|
||||
|
||||
# The total number of sequences in the RUNNING state should not
|
||||
# exceed the maximum number of sequences.
|
||||
num_new_seqs = seq_group.get_max_num_running_seqs()
|
||||
if (num_curr_seqs + num_new_seqs >
|
||||
self.scheduler_config.max_num_seqs):
|
||||
break
|
||||
|
||||
if lora_int_id > 0:
|
||||
curr_loras.add(lora_int_id)
|
||||
self.swapped.popleft()
|
||||
self._swap_in(seq_group, blocks_to_swap_in)
|
||||
self._append_slot(seq_group, blocks_to_copy)
|
||||
num_curr_seqs += num_new_seqs
|
||||
self.running.append(seq_group)
|
||||
|
||||
self.swapped.extendleft(leftover_swapped)
|
||||
|
||||
# Each sequence in the generation phase only takes one token slot.
|
||||
# Therefore, the number of batched tokens is equal to the number of
|
||||
# sequences in the RUNNING state.
|
||||
num_batched_tokens = sum(
|
||||
seq_group.num_seqs(status=SequenceStatus.RUNNING)
|
||||
for seq_group in self.running)
|
||||
|
||||
scheduler_outputs = SchedulerOutputs(
|
||||
scheduled_seq_groups=self.running,
|
||||
prompt_run=False,
|
||||
num_batched_tokens=num_batched_tokens,
|
||||
blocks_to_swap_in=blocks_to_swap_in,
|
||||
blocks_to_swap_out=blocks_to_swap_out,
|
||||
blocks_to_copy=blocks_to_copy,
|
||||
ignored_seq_groups=[],
|
||||
)
|
||||
return scheduler_outputs
|
||||
|
||||
def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
|
||||
# Schedule sequence groups.
|
||||
# This function call changes the internal states of the scheduler
|
||||
# such as self.running, self.swapped, and self.waiting.
|
||||
scheduler_outputs = self._schedule()
|
||||
now = time.time()
|
||||
|
||||
# Create input data structures.
|
||||
seq_group_metadata_list: List[SequenceGroupMetadata] = []
|
||||
for seq_group in scheduler_outputs.scheduled_seq_groups:
|
||||
seq_group.maybe_set_first_scheduled_time(now)
|
||||
|
||||
seq_data: Dict[int, SequenceData] = {}
|
||||
block_tables: Dict[int, List[int]] = {}
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
|
||||
seq_id = seq.seq_id
|
||||
seq_data[seq_id] = seq.data
|
||||
block_tables[seq_id] = self.block_manager.get_block_table(seq)
|
||||
|
||||
seq_group_metadata = SequenceGroupMetadata(
|
||||
request_id=seq_group.request_id,
|
||||
is_prompt=scheduler_outputs.prompt_run,
|
||||
seq_data=seq_data,
|
||||
sampling_params=seq_group.sampling_params,
|
||||
block_tables=block_tables,
|
||||
lora_request=seq_group.lora_request,
|
||||
prefix=seq_group.prefix,
|
||||
state=seq_group.state,
|
||||
)
|
||||
seq_group_metadata_list.append(seq_group_metadata)
|
||||
return seq_group_metadata_list, scheduler_outputs
|
||||
|
||||
def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
|
||||
self.block_manager.fork(parent_seq, child_seq)
|
||||
|
||||
def free_seq(self, seq: Sequence) -> None:
|
||||
self.block_manager.free(seq)
|
||||
|
||||
def free_finished_seq_groups(self) -> None:
|
||||
self.running = deque(seq_group for seq_group in self.running
|
||||
if not seq_group.is_finished())
|
||||
|
||||
def _allocate(self, seq_group: SequenceGroup) -> None:
|
||||
self.block_manager.allocate(seq_group)
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
|
||||
seq.status = SequenceStatus.RUNNING
|
||||
|
||||
def _append_slot(
|
||||
self,
|
||||
seq_group: SequenceGroup,
|
||||
blocks_to_copy: Dict[int, List[int]],
|
||||
) -> None:
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
|
||||
ret = self.block_manager.append_slot(seq)
|
||||
if ret is not None:
|
||||
src_block, dst_block = ret
|
||||
if src_block in blocks_to_copy:
|
||||
blocks_to_copy[src_block].append(dst_block)
|
||||
else:
|
||||
blocks_to_copy[src_block] = [dst_block]
|
||||
|
||||
def _preempt(
|
||||
self,
|
||||
seq_group: SequenceGroup,
|
||||
blocks_to_swap_out: Dict[int, int],
|
||||
preemption_mode: Optional[PreemptionMode] = None,
|
||||
) -> None:
|
||||
# If preemption mode is not specified, we determine the mode as follows:
|
||||
# We use recomputation by default since it incurs lower overhead than
|
||||
# swapping. However, when the sequence group has multiple sequences
|
||||
# (e.g., beam search), recomputation is not currently supported. In
|
||||
# such a case, we use swapping instead.
|
||||
# FIXME(woosuk): This makes our scheduling policy a bit bizarre.
|
||||
# As swapped sequences are prioritized over waiting sequences,
|
||||
# sequence groups with multiple sequences are implicitly prioritized
|
||||
# over sequence groups with a single sequence.
|
||||
# TODO(woosuk): Support recomputation for sequence groups with multiple
|
||||
# sequences. This may require a more sophisticated CUDA kernel.
|
||||
if preemption_mode is None:
|
||||
if seq_group.get_max_num_running_seqs() == 1:
|
||||
preemption_mode = PreemptionMode.RECOMPUTE
|
||||
else:
|
||||
preemption_mode = PreemptionMode.SWAP
|
||||
if preemption_mode == PreemptionMode.RECOMPUTE:
|
||||
self._preempt_by_recompute(seq_group)
|
||||
elif preemption_mode == PreemptionMode.SWAP:
|
||||
self._preempt_by_swap(seq_group, blocks_to_swap_out)
|
||||
else:
|
||||
raise AssertionError("Invalid preemption mode.")
|
||||
|
||||
def _preempt_by_recompute(
|
||||
self,
|
||||
seq_group: SequenceGroup,
|
||||
) -> None:
|
||||
seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
|
||||
assert len(seqs) == 1
|
||||
for seq in seqs:
|
||||
seq.status = SequenceStatus.WAITING
|
||||
self.block_manager.free(seq)
|
||||
# NOTE: For FCFS, we insert the preempted sequence group to the front
|
||||
# of the waiting queue.
|
||||
self.waiting.appendleft(seq_group)
|
||||
|
||||
def _preempt_by_swap(
|
||||
self,
|
||||
seq_group: SequenceGroup,
|
||||
blocks_to_swap_out: Dict[int, int],
|
||||
) -> None:
|
||||
self._swap_out(seq_group, blocks_to_swap_out)
|
||||
self.swapped.append(seq_group)
|
||||
|
||||
def _swap_in(
|
||||
self,
|
||||
seq_group: SequenceGroup,
|
||||
blocks_to_swap_in: Dict[int, int],
|
||||
) -> None:
|
||||
mapping = self.block_manager.swap_in(seq_group)
|
||||
blocks_to_swap_in.update(mapping)
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
|
||||
seq.status = SequenceStatus.RUNNING
|
||||
|
||||
def _swap_out(
|
||||
self,
|
||||
seq_group: SequenceGroup,
|
||||
blocks_to_swap_out: Dict[int, int],
|
||||
) -> None:
|
||||
if not self.block_manager.can_swap_out(seq_group):
|
||||
# FIXME(woosuk): Abort the sequence group instead of aborting the
|
||||
# entire engine.
|
||||
raise RuntimeError(
|
||||
"Aborted due to the lack of CPU swap space. Please increase "
|
||||
"the swap space to avoid this error.")
|
||||
mapping = self.block_manager.swap_out(seq_group)
|
||||
blocks_to_swap_out.update(mapping)
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
|
||||
seq.status = SequenceStatus.SWAPPED
|
||||
Reference in New Issue
Block a user