Support qwen2 vl model (#1721)

Co-authored-by: yizhang2077 <1109276519@qq.com>
Co-authored-by: ispobock <ISPObaoke@163.com>
This commit is contained in:
Yineng Zhang
2024-10-19 21:44:38 -07:00
committed by GitHub
parent 8bee20f80b
commit cbbc82b7b8
15 changed files with 1310 additions and 9 deletions

View File

@@ -177,10 +177,127 @@ class LlavaImageProcessor(BaseImageProcessor):
}
class Qwen2VLImageProcessor(BaseImageProcessor):
def __init__(self, hf_config, server_args, _image_processor):
self.hf_config = hf_config
self._image_processor = _image_processor
self.executor = concurrent.futures.ProcessPoolExecutor(
initializer=init_global_processor,
mp_context=mp.get_context("fork"),
initargs=(server_args,),
max_workers=os.environ.get("SGLANG_CPU_COUNT", os.cpu_count()),
)
@staticmethod
def _process_single_image_task(
image_data: Union[str, bytes],
image_processor=None,
):
image_processor = image_processor or global_processor.image_processor
try:
image, image_size = load_image(image_data)
if image_size is not None:
# It is a video with multiple images
image_hash = hash(image_data)
process_result = image_processor(image)
pixel_values, image_grid_thws = (
process_result["pixel_values"],
process_result["image_grid_thw"][0],
)
for _ in range(len(pixel_values)):
pixel_values[_] = pixel_values[_].astype(np.float16)
pixel_values = np.stack(pixel_values, axis=0)
image_grid_thws = np.stack(image_grid_thws, axis=0)
return pixel_values, image_hash, image_size, image_grid_thws
else:
# It is an image
image_hash = hash(image_data)
process_result = image_processor(image)
pixel_values, image_grid_thws = (
process_result["pixel_values"],
process_result["image_grid_thw"][0],
)
if isinstance(pixel_values, np.ndarray):
pixel_values = pixel_values.astype(np.float16)
return pixel_values, image_hash, image.size, image_grid_thws
except Exception:
logger.error("Exception in TokenizerManager:\n" + get_exception_traceback())
async def _process_single_image(self, image_data: Union[bytes, str]):
if self.executor is not None:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
Qwen2VLImageProcessor._process_single_image_task,
image_data,
)
else:
return self._process_single_image_task(image_data)
async def process_images_async(
self, image_data: List[Union[str, bytes]], request_obj
):
if not image_data:
return None
if isinstance(image_data, list) and len(image_data) > 0:
# Multiple images
if len(image_data) > 1:
pixel_values, image_hashes, image_sizes, image_grid_thws = (
[],
[],
[],
[],
)
res = []
for img_data in image_data:
res.append(self._process_single_image(img_data))
res = await asyncio.gather(*res)
for pixel_v, image_h, image_s, image_thw in res:
pixel_values.append(pixel_v)
image_hashes.append(image_h)
image_sizes.append(image_s)
image_grid_thws.append(image_thw)
if isinstance(pixel_values[0], np.ndarray):
pixel_values = np.concatenate(pixel_values, axis=0)
else:
# A single image
pixel_values, image_hash, image_size, image_grid_thw = (
await self._process_single_image(image_data[0])
)
image_hashes = [image_hash]
image_sizes = [image_size]
image_grid_thws = [image_grid_thw]
elif isinstance(image_data, str):
# A single image
pixel_values, image_hash, image_size, image_grid_thw = (
await self._process_single_image(image_data)
)
image_hashes = [image_hash]
image_sizes = [image_size]
image_grid_thws = [image_grid_thw]
else:
raise ValueError(f"Invalid image data: {image_data}")
return {
"pixel_values": pixel_values,
"image_hashes": image_hashes,
"image_sizes": image_sizes,
"modalities": request_obj.modalities,
"image_grid_thws": image_grid_thws,
}
def get_image_processor(
hf_config, server_args: ServerArgs, _image_processor
) -> BaseImageProcessor:
return LlavaImageProcessor(hf_config, server_args, _image_processor)
if "Qwen2VLForConditionalGeneration" in hf_config.architectures:
return Qwen2VLImageProcessor(hf_config, server_args, _image_processor)
else:
return LlavaImageProcessor(hf_config, server_args, _image_processor)
def get_dummy_image_processor():

View File

@@ -128,6 +128,8 @@ class ImageInputs:
image_embeds: Optional[List[torch.Tensor]] = None
aspect_ratio_ids: Optional[List[torch.Tensor]] = None
aspect_ratio_mask: Optional[List[torch.Tensor]] = None
# QWen2-VL related
image_grid_thws: List[Tuple[int, int, int]] = None
@staticmethod
def from_dict(obj, vocab_size):
@@ -135,6 +137,7 @@ class ImageInputs:
ret = ImageInputs(
pixel_values=obj["pixel_values"],
image_hash=hash(tuple(obj["image_hashes"])),
image_grid_thws=obj.get("image_grid_thws"),
)
image_hash = ret.image_hash
ret.pad_values = [
@@ -236,6 +239,9 @@ class Req:
self.regex_fsm_state: int = 0
self.jump_forward_map: JumpForwardMap = None
# For Qwen2-VL
self.mrope_position_delta = [] # use mutable object
# whether request reached finished condition
def finished(self) -> bool:
return self.finished_reason is not None
@@ -854,6 +860,8 @@ class ScheduleBatch:
global bid
bid += 1
mrope_positions_delta = [req.mrope_position_delta for req in self.reqs]
return ModelWorkerBatch(
bid=bid,
forward_mode=self.forward_mode,
@@ -869,6 +877,7 @@ class ScheduleBatch:
image_inputs=image_inputs,
lora_paths=lora_paths,
sampling_info=self.sampling_info,
mrope_positions_delta=mrope_positions_delta,
)
def copy(self):
@@ -920,6 +929,9 @@ class ModelWorkerBatch:
# Sampling info
sampling_info: SamplingBatchInfo
# For Qwen2-VL
mrope_positions_delta: List[List[int]]
def copy(self):
return ModelWorkerBatch(
bid=self.bid,
@@ -936,4 +948,5 @@ class ModelWorkerBatch:
image_inputs=self.image_inputs,
lora_paths=self.lora_paths,
sampling_info=self.sampling_info.copy(),
mrope_positions_delta=self.mrope_positions_delta,
)