# SPDX-License-Identifier: Apache-2.0
from enum import IntEnum
from typing import List, Optional, Union

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PretrainedConfig
from typing_extensions import assert_never

from vllm.config import PoolerConfig
from vllm.model_executor.pooling_metadata import (PoolingMetadata,
                                                  PoolingTensors)
from vllm.sequence import PoolerOutput, PoolingSequenceGroupOutput
from vllm.transformers_utils.config import (
    get_cross_encoder_activation_function)


class PoolingType(IntEnum):
    """Enumeration for different types of pooling methods."""
    LAST = 0
    ALL = 1
    CLS = 2
    STEP = 3
    MEAN = 4
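

# Illustrative sketch: `IntEnum` members can be looked up by name, which is
# how the string-valued `PoolerConfig.pooling_type` is resolved in
# `Pooler.from_config_with_defaults` below. Purely a demo helper.
def _demo_pooling_type_lookup() -> None:
    assert PoolingType["MEAN"] is PoolingType.MEAN
    assert PoolingType.MEAN == 4  # IntEnum members compare equal to ints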


class SimplePooler(nn.Module):
    """A layer that pools specific information from hidden states.

    This layer does the following:
    1. Extracts specific tokens or aggregates data based on pooling method.
    2. Normalizes output if specified.
    3. Returns structured results as `PoolerOutput`.

    Attributes:
        pooling_type: The type of pooling to use.
        normalize: Whether to normalize the pooled data.
    """

    @staticmethod
    def from_pooling_type(
        pooling_type: PoolingType,
        *,
        normalize: bool,
        softmax: bool,
        step_tag_id: Optional[int] = None,
        returned_token_ids: Optional[List[int]] = None,
    ) -> "SimplePooler":
        if pooling_type == PoolingType.LAST:
            assert step_tag_id is None and returned_token_ids is None
            return LastPool(normalize=normalize, softmax=softmax)
        if pooling_type == PoolingType.ALL:
            assert step_tag_id is None and returned_token_ids is None
            return AllPool(normalize=normalize, softmax=softmax)
        if pooling_type == PoolingType.CLS:
            assert step_tag_id is None and returned_token_ids is None
            return CLSPool(normalize=normalize, softmax=softmax)
        if pooling_type == PoolingType.MEAN:
            assert step_tag_id is None and returned_token_ids is None
            return MeanPool(normalize=normalize, softmax=softmax)
        if pooling_type == PoolingType.STEP:
            return StepPool(normalize=normalize,
                            softmax=softmax,
                            step_tag_id=step_tag_id,
                            returned_token_ids=returned_token_ids)

        assert_never(pooling_type)

    def __init__(self, *, normalize: bool, softmax: bool) -> None:
        super().__init__()

        self.head = PoolerHead(normalize=normalize, softmax=softmax)

    def get_prompt_lens(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> torch.Tensor:
        return PoolingTensors.from_pooling_metadata(
            pooling_metadata, hidden_states.device).prompt_lens

    def extract_states(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> Union[list[torch.Tensor], torch.Tensor]:
        raise NotImplementedError

    def build_output(self, data: torch.Tensor) -> PoolingSequenceGroupOutput:
        return PoolingSequenceGroupOutput(data)

    def forward(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
        pooled_data = self.extract_states(hidden_states, pooling_metadata)
        pooled_data = self.head(pooled_data)
        pooled_outputs = [self.build_output(data) for data in pooled_data]
        return PoolerOutput(outputs=pooled_outputs)
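

# Illustrative sketch: the factory dispatches on `PoolingType` and returns
# the matching subclass defined below. The keyword values are arbitrary
# example settings.
def _demo_from_pooling_type() -> None:
    pooler = SimplePooler.from_pooling_type(PoolingType.LAST,
                                            normalize=True,
                                            softmax=False)
    assert isinstance(pooler, LastPool)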


class CLSPool(SimplePooler):

    def extract_states(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> Union[list[torch.Tensor], torch.Tensor]:
        prompt_lens = self.get_prompt_lens(hidden_states, pooling_metadata)

        # The flat index of each sequence's first (CLS) token is the
        # exclusive prefix sum of the prompt lengths.
        first_token_flat_indices = torch.zeros_like(prompt_lens)
        first_token_flat_indices[1:] += torch.cumsum(prompt_lens, dim=0)[:-1]
        return hidden_states[first_token_flat_indices]


class LastPool(SimplePooler):

    def extract_states(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> Union[list[torch.Tensor], torch.Tensor]:
        prompt_lens = self.get_prompt_lens(hidden_states, pooling_metadata)

        # The flat index of each sequence's last token is the inclusive
        # prefix sum of the prompt lengths, shifted back by one.
        last_token_flat_indices = torch.cumsum(prompt_lens, dim=0) - 1
        return hidden_states[last_token_flat_indices]
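

# Illustrative sketch with made-up prompt lengths: for flattened hidden
# states, the boundary indices used by CLSPool and LastPool fall out of
# prefix sums over the prompt lengths.
def _demo_boundary_indices() -> None:
    prompt_lens = torch.tensor([3, 2, 4])
    first = torch.zeros_like(prompt_lens)
    first[1:] += torch.cumsum(prompt_lens, dim=0)[:-1]
    last = torch.cumsum(prompt_lens, dim=0) - 1
    assert first.tolist() == [0, 3, 5]  # sequence starts
    assert last.tolist() == [2, 4, 8]  # sequence ends (inclusive)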


class AllPool(SimplePooler):

    def extract_states(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> Union[list[torch.Tensor], torch.Tensor]:
        prompt_lens = self.get_prompt_lens(hidden_states, pooling_metadata)

        offset = 0
        pooled_data = list[torch.Tensor]()
        for prompt_len in prompt_lens:
            pooled_data.append(hidden_states[offset:offset + prompt_len])
            offset += prompt_len

        return pooled_data


class MeanPool(SimplePooler):

    def extract_states(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> Union[list[torch.Tensor], torch.Tensor]:
        prompt_lens = self.get_prompt_lens(hidden_states, pooling_metadata)

        # Compute all per-sequence sums in one pass via an inclusive prefix
        # sum over the token axis:
        #   sum(start..end-1) = cumsum[end-1] - cumsum[start] + hidden[start]
        cumsum = torch.cumsum(hidden_states, dim=0)
        start_indices = torch.cat([
            torch.tensor([0], device=hidden_states.device),
            torch.cumsum(prompt_lens[:-1], dim=0)
        ])
        end_indices = torch.cumsum(prompt_lens, dim=0)
        return (cumsum[end_indices - 1] - cumsum[start_indices] +
                hidden_states[start_indices]) / prompt_lens.unsqueeze(1)
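

# Illustrative sketch: the prefix-sum formula above matches a naive
# per-sequence mean. The shapes (9 tokens, hidden size 2) are made up.
def _demo_segment_mean() -> None:
    hidden_states = torch.randn(9, 2)
    prompt_lens = torch.tensor([3, 2, 4])
    cumsum = torch.cumsum(hidden_states, dim=0)
    start = torch.cat(
        [torch.tensor([0]), torch.cumsum(prompt_lens[:-1], dim=0)])
    end = torch.cumsum(prompt_lens, dim=0)
    fast = (cumsum[end - 1] - cumsum[start] +
            hidden_states[start]) / prompt_lens.unsqueeze(1)
    naive = torch.stack(
        [hidden_states[s:e].mean(dim=0) for s, e in zip(start, end)])
    assert torch.allclose(fast, naive, atol=1e-6)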


class StepPool(SimplePooler):

    def __init__(
        self,
        *,
        normalize: bool,
        softmax: bool,
        step_tag_id: Optional[int] = None,
        returned_token_ids: Optional[List[int]] = None,
    ):
        super().__init__(normalize=normalize, softmax=softmax)

        self.step_tag_id = step_tag_id
        self.returned_token_ids = returned_token_ids

    def extract_states(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> Union[list[torch.Tensor], torch.Tensor]:
        prompt_lens = self.get_prompt_lens(hidden_states, pooling_metadata)

        returned_token_ids = self.returned_token_ids
        if returned_token_ids is not None and len(returned_token_ids) > 0:
            # Keep only the requested slice of the last dimension
            # (e.g. selected vocabulary entries when pooling logits).
            hidden_states = hidden_states[:, returned_token_ids]

        step_tag_id = self.step_tag_id

        offset = 0
        pooled_data = list[torch.Tensor]()
        for prompt_len, seq_data_i in zip(prompt_lens,
                                          pooling_metadata.seq_data.values()):
            pooled_data_i = hidden_states[offset:offset + prompt_len]
            if step_tag_id is not None:
                # Keep only the positions whose input token is the step tag.
                token_ids = torch.tensor(seq_data_i.prompt_token_ids)
                pooled_data_i = pooled_data_i[token_ids == step_tag_id]

            offset += prompt_len
            pooled_data.append(pooled_data_i)

        return pooled_data
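

# Illustrative sketch: boolean masking against a step tag keeps only the
# matching positions, as StepPool does per sequence. The tag id 99 and the
# token ids are made up.
def _demo_step_tag_mask() -> None:
    hidden_states = torch.randn(5, 2)
    token_ids = torch.tensor([7, 99, 8, 99, 9])
    kept = hidden_states[token_ids == 99]
    assert kept.shape == (2, 2)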


class PoolerHead(nn.Module):

    def __init__(self, *, normalize: bool, softmax: bool) -> None:
        super().__init__()

        self.normalize = normalize
        self.softmax = softmax

    def forward(self, pooled_data: Union[list[torch.Tensor], torch.Tensor]):
        if self.normalize:
            if isinstance(pooled_data, list):
                pooled_data = [
                    F.normalize(data, p=2, dim=1) for data in pooled_data
                ]
            else:
                pooled_data = F.normalize(pooled_data, p=2, dim=1)

        if self.softmax:
            if isinstance(pooled_data, list):
                pooled_data = [F.softmax(data, dim=-1) for data in pooled_data]
            else:
                pooled_data = F.softmax(pooled_data, dim=-1)

        return pooled_data
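

# Illustrative sketch: with `normalize=True`, every pooled row comes out
# unit length. The batch and hidden sizes are arbitrary.
def _demo_pooler_head() -> None:
    head = PoolerHead(normalize=True, softmax=False)
    out = head(torch.randn(4, 8))
    assert torch.allclose(out.norm(p=2, dim=1), torch.ones(4), atol=1e-6)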


class Pooler(nn.Module):

    @classmethod
    def from_config_with_defaults(
        cls,
        pooler_config: PoolerConfig,
        pooling_type: PoolingType,
        normalize: bool,
        softmax: bool,
        step_tag_id: Optional[int] = None,
        returned_token_ids: Optional[List[int]] = None,
    ) -> SimplePooler:
        # Any field set on the PoolerConfig overrides the model-supplied
        # default passed as an argument.
        return SimplePooler.from_pooling_type(
            pooling_type=PoolingType[pooler_config.pooling_type]
            if pooler_config.pooling_type is not None else pooling_type,
            normalize=pooler_config.normalize
            if pooler_config.normalize is not None else normalize,
            softmax=pooler_config.softmax
            if pooler_config.softmax is not None else softmax,
            step_tag_id=pooler_config.step_tag_id
            if pooler_config.step_tag_id is not None else step_tag_id,
            returned_token_ids=pooler_config.returned_token_ids
            if pooler_config.returned_token_ids is not None else
            returned_token_ids,
        )
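

# Illustrative sketch: model code passes its preferred defaults and lets any
# user-set PoolerConfig fields win. `pooler_config` is assumed to come from
# the engine; only the defaults below are made up.
def _demo_config_defaults(pooler_config: PoolerConfig) -> SimplePooler:
    return Pooler.from_config_with_defaults(pooler_config,
                                            pooling_type=PoolingType.CLS,
                                            normalize=True,
                                            softmax=False)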


class CrossEncodingPooler(nn.Module):
    """A layer that scores sentence pairs from hidden states.

    This layer does the following:
    1. Slices out each sequence's hidden states.
    2. Applies the optional pooler and the classifier to obtain raw scores.
    3. Applies the activation function and returns the results as
       `PoolerOutput`.

    Attributes:
        classifier: The classification head that produces the raw scores.
        pooler: An optional pooling module applied before the classifier.
    """

    def __init__(
        self,
        config: PretrainedConfig,
        classifier: nn.Module,
        pooler: Optional[nn.Module] = None,
    ):
        super().__init__()
        self.classifier = classifier
        self.pooler = pooler
        self.default_activation_function = \
            get_cross_encoder_activation_function(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
        """Pools sentence pair scores from the hidden_states."""
        prompt_lens = PoolingTensors.from_pooling_metadata(
            pooling_metadata, hidden_states.device).prompt_lens

        offset = 0
        pooled_data_lst = []
        for prompt_len in prompt_lens:
            pooled_data_i = hidden_states[offset:offset + prompt_len]

            if self.pooler is not None:
                final_shape_tensor = self.pooler(pooled_data_i)
            else:
                final_shape_tensor = self.classifier(pooled_data_i)

            pooled_data_lst.append(final_shape_tensor)
            offset += prompt_len

        pooled_output = torch.stack(pooled_data_lst)

        if self.pooler is not None:
            # apply classifier once on the full batch if possible
            pooled_output = self.classifier(pooled_output)

        scores = self.default_activation_function(pooled_output).squeeze(-1)

        pooled_outputs = [PoolingSequenceGroupOutput(data) for data in scores]
        return PoolerOutput(outputs=pooled_outputs)
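

# Illustrative sketch of the scoring pattern above with plain tensors: pool
# each sequence to one vector, stack, classify the batch once, and squeeze
# the trailing score dimension. The linear head, sigmoid, and shapes are
# made up.
def _demo_cross_encoder_scoring() -> None:
    classifier = nn.Linear(8, 1)
    sequences = [torch.randn(3, 8), torch.randn(5, 8)]
    pooled = torch.stack([seq[0] for seq in sequences])  # CLS-style pooling
    scores = torch.sigmoid(classifier(pooled)).squeeze(-1)
    assert scores.shape == (2, )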