Source code for mii.config

# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
import os
import string
from typing import List, Optional, Union, Dict, Any, Literal

from deepspeed.launcher.runner import DLTS_HOSTFILE, fetch_hostfile
from deepspeed.inference import RaggedInferenceEngineConfig
from deepspeed.runtime.config_utils import DeepSpeedConfigModel
from pydantic import Field, model_validator, field_validator

from mii.constants import DeploymentType, TaskType, ModelProvider
from mii.errors import DeploymentNotFoundError
from mii.modeling.tokenizers import MIITokenizerWrapper
from mii.utils import generate_deployment_name, import_score_file

# Sentinel value for ``device_map``. It is the default of ``ModelConfig.device_map``,
# and ``_allocate_devices`` builds a device map from the hostfile when it receives it.
DEVICE_MAP_DEFAULT = "auto"


class GenerateParamsConfig(DeepSpeedConfigModel):
    """
    Options for changing text-generation behavior.
    """

    prompt_length: int
    """ Length of the input prompt. Autopopulated when creating requests, any user-provided values will be ignored."""

    max_length: int = 1024
    """ Maximum length of ``input_tokens`` + ``generated_tokens``. """

    max_new_tokens: Optional[int] = None
    """ Maximum number of new tokens generated. ``max_length`` takes precedent. """

    min_new_tokens: int = 0
    """ Minimum number of new tokens generated. """

    stream: bool = False
    """ Enable streaming output. """

    ignore_eos: bool = False
    """ Ignore EoS token and continue generating text until we reach ``max_length`` or ``max_new_tokens``. """

    return_full_text: bool = False
    """ Prepends the input prompt to the generated text. """

    do_sample: bool = True
    """ When ``False``, do greedy sampling. """

    top_p: float = Field(0.9, gt=0, le=1)
    """ Top P value. """

    top_k: Optional[int] = Field(None, gt=0)
    """ Top K value. """

    temperature: Optional[float] = Field(None, gt=0)
    """ Temperature value. """

    stop: List[str] = []
    """ List of strings to stop generation at."""

    @field_validator("stop", mode="before")
    @classmethod
    def make_stop_string_list(cls, field_value: Union[str, List[str]]) -> List[str]:
        """Wrap a single stop string in a list; pass lists through untouched."""
        if isinstance(field_value, str):
            return [field_value]
        return field_value

    @field_validator("stop")
    @classmethod
    def sort_stop_strings(cls, field_value: List[str]) -> List[str]:
        """Return the stop strings in sorted order."""
        return sorted(field_value)

    @model_validator(mode="after")
    def check_prompt_length(self) -> "GenerateParamsConfig":
        """
        Reject configurations whose prompt already fills ``max_length``.

        Raises ``ValueError`` (surfaced by pydantic as a validation error)
        rather than using ``assert``, so the check survives ``python -O``.
        """
        if self.max_length <= self.prompt_length:
            raise ValueError(
                f"max_length ({self.max_length}) must be greater than prompt_length ({self.prompt_length})"
            )
        return self

    @model_validator(mode="before")
    @classmethod
    def set_max_new_tokens(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Derive ``max_new_tokens`` as ``max_length - prompt_length`` when unset.

        This runs on the raw input, before field defaults are applied, so a
        missing ``max_length`` falls back to the declared field default. If
        ``prompt_length`` is missing nothing is derived; the required-field
        check then reports the real problem instead of a ``TypeError``.
        """
        if values.get("max_new_tokens") is None:
            prompt_length = values.get("prompt_length")
            max_length = values.get("max_length")
            if max_length is None:
                max_length = cls.model_fields["max_length"].default
            if prompt_length is not None:
                values["max_new_tokens"] = max_length - prompt_length
        return values
class ReplicaConfig(DeepSpeedConfigModel):
    """
    Placement and port settings for one model replica.

    ``MIIConfig.generate_replica_configs`` creates one instance per replica
    when the user has not supplied any.
    """

    # Host the replica is placed on (taken from the device map / hostfile).
    hostname: str = ""
    # One port per tensor-parallel rank; generated as a consecutive range.
    tensor_parallel_ports: List[int] = []
    # Torch distributed port for this replica; generated as the model's base
    # ``torch_dist_port`` plus 100 per replica index.
    torch_dist_port: Optional[int] = None
    # GPU indices on ``hostname`` assigned to this replica.
    gpu_indices: List[int] = []
    # ZMQ port for this replica; generated as ``zmq_port_number`` plus the
    # replica index.
    zmq_port: Optional[int] = None
class ModelConfig(DeepSpeedConfigModel):
    """
    Configuration of the model to deploy: what to load, how to shard it
    across GPUs, and how many replicas to run.
    """

    model_name_or_path: str
    """ Name or path of the HuggingFace model to be deployed. """

    tokenizer: Optional[Union[str, MIITokenizerWrapper]] = None
    """ Tokenizer wrapped with `MIITokenizerWrapper`, name or path of the HuggingFace tokenizer to be used. """

    task: Optional[TaskType] = TaskType.TEXT_GENERATION
    """ Name of the task to be performed by the model. """

    tensor_parallel: int = int(os.getenv("WORLD_SIZE", "1"))
    """ Tensor parallelism to use for a model (i.e., how many GPUs to shard a model across). This defaults to the `WORLD_SIZE` environment variable, or a value of 1 if that variable is not set. This value is also propagated to the `inference_engine_config`. """

    quantization_mode: Optional[str] = None
    """ The quantization mode in string format. The supported modes are as follows: - 'wf6af16', weight-only quantization with FP6 weight and FP16 activation. """

    inference_engine_config: RaggedInferenceEngineConfig = {}
    """ DeepSpeed inference engine config. This is automatically generated, but you can provide a set of custom configs. """

    torch_dist_port: int = 29500
    """ Torch distributed port to be used. This also serves as a base port when multiple replicas are deployed. For example, if there are 2 replicas, the first will use port 29500 and the second will use port 29600. """

    zmq_port_number: int = 25555
    """ Port number to use for the ZMQ communication (for broadcasting requests and responses among all ranks in ragged batching). """

    replica_num: int = Field(1, gt=0)
    """ Number of model replicas. Enables easy data parallelism. """

    replica_configs: List[ReplicaConfig] = []
    """ Configuration details for each replica. This will be automatically generated, but you can provide a set of custom configs. """

    device_map: Union[Literal["auto"], Dict[str, List[List[int]]]] = DEVICE_MAP_DEFAULT
    """ GPU indices a model is deployed on. Note that CUDA_VISIBLE_DEVICES does not work with DeepSpeed-MII. """

    max_length: Optional[int] = None
    """ The maximum number of tokens DeepSpeed-Inference can work with, including the input and output tokens. """

    sync_debug: bool = False
    """ Inserts additional synchronization points for debugging purposes. """

    profile_model_time: bool = False
    """ Log performance information about model inference with very little overhead. """

    @property
    def provider(self) -> ModelProvider:
        """Model provider; always ``ModelProvider.HUGGING_FACE``."""
        return ModelProvider.HUGGING_FACE

    @field_validator("device_map", mode="before")
    @classmethod
    def make_device_map_dict(cls, v: Any) -> Any:
        """
        Expand shorthand device maps into ``{"localhost": [[...], ...]}``.

        Accepted shorthands are a single GPU index, a flat list of indices
        (one replica), or a list of lists (one inner list per replica). Any
        other value, including an empty list, is returned unchanged so the
        regular field validation can accept or reject it.
        """
        if isinstance(v, int):
            return {"localhost": [[v]]}
        # Require a non-empty list before peeking at the first element;
        # indexing an empty list would raise IndexError.
        if isinstance(v, list) and v:
            first = v[0]
            if isinstance(first, int):
                return {"localhost": [v]}
            if isinstance(first, list):
                return {"localhost": v}
        return v

    @model_validator(mode="before")
    @classmethod
    def auto_fill_values(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Require ``model_name_or_path`` and fill in dependent defaults.

        The tokenizer defaults to the model name or path.
        """
        if not values.get("model_name_or_path"):
            raise ValueError("model_name_or_path must be provided")
        if not values.get("tokenizer"):
            values["tokenizer"] = values.get("model_name_or_path")
        # NOTE: the task is always forced to text generation; any
        # caller-provided ``task`` value is overwritten here.
        values["task"] = TaskType.TEXT_GENERATION
        return values

    @model_validator(mode="after")
    def propagate_tp_size(self) -> "ModelConfig":
        """Copy ``tensor_parallel`` into the inference engine config."""
        self.inference_engine_config.tensor_parallel.tp_size = self.tensor_parallel
        return self

    @model_validator(mode="after")
    def check_replica_config(self) -> "ModelConfig":
        """Ensure user-supplied replica configs, if any, match ``replica_num``."""
        if self.replica_configs and len(self.replica_configs) != self.replica_num:
            raise ValueError("Number of replica configs must match replica_num")
        return self

    @model_validator(mode="after")
    def propagate_quantization_mode(self) -> "ModelConfig":
        """Copy ``quantization_mode`` into the inference engine config."""
        self.inference_engine_config.quantization.quantization_mode = self.quantization_mode
        return self
class MIIConfig(DeepSpeedConfigModel):
    """
    Top-level deployment configuration: deployment identity and type, the
    model configuration, and load-balancer / RESTful API settings.
    """

    deployment_name: str = ""
    """ Name of the deployment. Used as an identifier for obtaining an inference server client and posting queries. Automatically generated if it is not provided. """

    deployment_type: DeploymentType = DeploymentType.LOCAL
    """ One of the `enum mii.DeploymentTypes:` * `LOCAL` uses a grpc server to create a local deployment. * `AML` will generate the assets necessary to deploy on AML resources. """

    model_conf: ModelConfig = Field(alias="model_config")
    """ Configuration for the deployed model(s). """

    port_number: int = 50050
    """ Port number to use for the load balancer process. """

    enable_restful_api: bool = False
    """ Enables a RESTful API that can be queried via the HTTP POST method. """

    restful_api_host: str = "localhost"
    """ Hostname to use for the RESTful API. """

    restful_api_port: int = 51080
    """ Port number to use for the RESTful API. """

    restful_processes: int = Field(32, ge=1)
    """ Number of processes to use for the RESTful API. """

    hostfile: str = DLTS_HOSTFILE
    """ DeepSpeed hostfile. Will be autogenerated if None is provided. """

    # TODO: Place AML-related configs in subconfig
    version: int = 1
    """ Version number to pass to AML deployments. """

    instance_type: str = "Standard_NC12s_v3"
    """ AML instance type to use when creating AML deployment assets. """

    @model_validator(mode="after")
    def AML_name_valid(self) -> "MIIConfig":
        """
        Restrict AML deployment names to ``a-z``, ``A-Z``, ``0-9`` and ``-``.

        Raises ``ValueError`` (surfaced by pydantic as a validation error)
        rather than using ``assert``, so the check survives ``python -O``.
        """
        if self.deployment_type == DeploymentType.AML:
            allowed_chars = set(string.ascii_lowercase + string.ascii_uppercase +
                                string.digits + "-")
            if not set(self.deployment_name) <= allowed_chars:
                raise ValueError(
                    "AML deployment names can only contain a-z, A-Z, 0-9, and '-'.")
        return self

    @model_validator(mode="before")
    @classmethod
    def check_deployment_name(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate ``deployment_name`` from the model name when none is given.

        The model config may arrive under the ``model_config`` alias or under
        the ``model_conf`` field name, and as either a ``ModelConfig`` object
        or a plain dict. If no model name can be found, the input is returned
        unchanged and regular field validation reports the missing data.
        """
        if not values.get("deployment_name"):
            model_conf = values.get("model_config")
            if model_conf is None:
                model_conf = values.get("model_conf")
            if isinstance(model_conf, dict):
                model_name_or_path = model_conf.get("model_name_or_path")
            else:
                model_name_or_path = getattr(model_conf, "model_name_or_path", None)
            if model_name_or_path:
                values["deployment_name"] = generate_deployment_name(
                    model_name_or_path=model_name_or_path)
        return values

    def generate_replica_configs(self) -> None:
        """
        Populate ``model_conf.replica_configs`` with one entry per replica.

        Does nothing when replica configs were already provided. Otherwise,
        devices are allocated from the hostfile / device map and each replica
        gets its own port assignments.
        """
        if self.model_conf.replica_configs:
            return

        torch_dist_port = self.model_conf.torch_dist_port
        tensor_parallel = self.model_conf.tensor_parallel
        replica_pool = _allocate_devices(self.hostfile,
                                         tensor_parallel,
                                         self.model_conf.replica_num,
                                         self.model_conf.device_map)

        # Reserve a port for a LB proxy when replication is enabled
        port_offset = 1
        replica_configs = []
        for i, (hostname, gpu_indices) in enumerate(replica_pool):
            # Each replica takes ``tensor_parallel`` consecutive ports after
            # the load-balancer port.
            base_port = self.port_number + i * tensor_parallel + port_offset
            tensor_parallel_ports = list(range(base_port, base_port + tensor_parallel))
            # Torch distributed ports are spaced 100 apart per replica.
            replica_torch_dist_port = torch_dist_port + (100 * i)
            replica_configs.append(
                ReplicaConfig(
                    hostname=hostname,
                    tensor_parallel_ports=tensor_parallel_ports,
                    torch_dist_port=replica_torch_dist_port,
                    gpu_indices=gpu_indices,
                    zmq_port=self.model_conf.zmq_port_number + i,
                ))

        self.model_conf.replica_configs = replica_configs
def _allocate_devices(hostfile_path: str,
                      tensor_parallel: int,
                      replica_num: int,
                      device_map: Dict[str,
                                       List[List[int]]] = DEVICE_MAP_DEFAULT):
    """
    Map each requested replica to a host and a list of GPU indices.

    Args:
        hostfile_path: Path of the hostfile passed to ``fetch_hostfile``.
        tensor_parallel: Number of devices each replica needs.
        replica_num: Number of replicas to place.
        device_map: ``{host: [[gpu indices], ...]}`` mapping, or
            ``DEVICE_MAP_DEFAULT`` to generate one from the hostfile.

    Returns:
        A list of ``(host, gpu_indices)`` tuples, one per replica.

    Raises:
        AssertionError: If the hostfile yields no hosts.
        ValueError: If the number of mappings differs from ``replica_num``,
            a mapped host is not in the hostfile, or a mapping does not
            contain exactly ``tensor_parallel`` devices.
    """
    resource_pool = fetch_hostfile(hostfile_path)
    # Kept as an assert so callers see the same exception type as before.
    assert (
        resource_pool is not None and len(resource_pool) > 0
    ), f"No hosts found in {hostfile_path}"

    # If no device map was provided, we generate one based on the resources we find in the hostfile
    if device_map == DEVICE_MAP_DEFAULT:
        device_map = {}
        filled_slots = 0
        for host, slots in resource_pool.items():
            # Place as many replicas as fit on this host, capped by how many
            # replicas are still unplaced.
            slots_to_fill = min(slots // tensor_parallel, replica_num - filled_slots)
            filled_slots += slots_to_fill
            device_map[host] = [
                list(range(i * tensor_parallel,
                           (i + 1) * tensor_parallel)) for i in range(slots_to_fill)
            ]

    # Assert that we have the correct number of mappings
    device_map_slots = sum(len(slots_list) for slots_list in device_map.values())
    if device_map_slots < replica_num:
        raise ValueError(
            f"Only able to place {device_map_slots} replicas, but {replica_num} replicas were requested."
        )
    if device_map_slots > replica_num:
        raise ValueError(
            f"Device map contains {device_map_slots} mappings, but only {replica_num} replicas were requested. There must be a 1:1 mapping."
        )

    replica_pool = []
    # Fill the available slots with replicas
    for host, slots_list in device_map.items():
        if host not in resource_pool:
            raise ValueError(f"Host {host} not found in hostfile")
        for slots in slots_list:
            if len(slots) != tensor_parallel:
                raise ValueError(
                    f"Number of devices must match tensor_parallel. Found {len(slots)} devices for host {host}, but tensor_parallel={tensor_parallel}"
                )
            replica_pool.append((host, slots))

    return replica_pool


def get_mii_config(model_or_deployment_name: str) -> MIIConfig:
    """
    Looks for score file of given model or deployment name, loads the file and
    returns the MIIConfig object.

    The argument is first tried as a deployment name. If that fails, it is
    treated as a model name and the deployment name generated from it is
    tried instead.

    Raises:
        DeploymentNotFoundError: If neither lookup succeeds.
    """
    try:
        deployment_name = model_or_deployment_name
        mii_config = import_score_file(deployment_name, DeploymentType.LOCAL).mii_config
    except Exception:
        # If a deployment name is not given, then one was generated
        # automatically from the model name, so try that.
        # ``except Exception`` (not a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate instead of being swallowed.
        try:
            deployment_name = generate_deployment_name(
                model_name_or_path=model_or_deployment_name)
            mii_config = import_score_file(deployment_name,
                                           DeploymentType.LOCAL).mii_config
        except Exception as err:
            raise DeploymentNotFoundError(
                f"Could not find a deployment named {model_or_deployment_name} or {deployment_name}"
            ) from err
    return MIIConfig(**mii_config)