Source code for parallelformers.parallelize

# Copyright 2021 TUNiB inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import traceback
from contextlib import suppress
from dataclasses import _is_dataclass_instance

import torch
import torch.multiprocessing as mp
from dacite import Any, Config, Dict, from_dict
from torch import nn
from transformers.file_utils import ModelOutput

from parallelformers.parallel.process import ParallelProcess
from parallelformers.utils import rgetattr, rsetattr

class parallelize(object):
    """
    Parallelformers end-point function

    Args:
        model (nn.Module): Huggingface pre-trained transformer model.
        fp16 (bool): whether use FP16 or not.
        num_gpus (int): number of GPU for parallelization.
        custom_policies: forwarded unchanged to every ``ParallelProcess``
            (default=None)
        master_addr (str): master process address for process communication (default='')
        master_port (int): master process port for process communication (default=29500)
        backend (str): distributed backend (default='nccl')
        verbose (str): logging current gpu states (one of ['detail', 'simple', None])
        init_method (str): multiprocess initialization method.
            (It is safe to set `init_method` to `spawn`.)
        daemon (bool): whether make process daemon or not (default=True)

    Notes:
        We want to use this object as a simple function rather than a class.
        So we broke the PEP8 rules and set class names that start with a lowercase letter.

    Examples:
        >>> # 1. Import Huggingface and Parallelformers modules
        >>> from transformers import AutoModelForCausalLM, AutoTokenizer
        >>> from parallelformers import parallelize

        >>> # 2. Create Huggingface model and tokenizer.
        >>> model = AutoModelForCausalLM.from_pretrained("EleutherAI/gpt-neo-2.7B")
        >>> tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")

        >>> # 3. Parallelize using Parallelformers.
        >>> parallelize(model, num_gpus=4, fp16=True)

        >>> # 4. Do inference as usual.
        >>> inputs = tokenizer("Parallelformers is", return_tensors="pt")
        >>> outputs = model.generate(**inputs, num_beams=5, no_repeat_ngram_size=4)
        >>> print(f"Output: {tokenizer.batch_decode(outputs)[0]}")
        'Output: Parallelformers is an open-source library for parallel programming ...'
    """

    def __init__(
        self,
        model: nn.Module,
        fp16: bool,
        num_gpus: int,
        custom_policies=None,
        master_addr: str = "",
        master_port: int = 29500,
        backend="nccl",
        verbose: str = None,
        init_method="spawn",
        daemon: bool = True,
    ):
        self.init_environments(
            num_gpus,
            master_addr,
            master_port,
        )

        # Using fork often leads to deadlock.
        if mp.get_start_method() != init_method:
            with suppress(Exception):
                mp.set_start_method(init_method, force=True)

        # Inference only: no parameter needs gradients.  (The former
        # `param.detach()` call discarded its result and had no effect.)
        for param in model.parameters():
            param.requires_grad = False

        self.preprocess_for_wav2vec(model)
        self.model = model.half() if fp16 else model
        self.model.eval()

        self.fp16 = fp16
        self.num_gpus = num_gpus
        self.backend = backend
        self.daemon = daemon
        self.verbose = verbose
        self.custom_policies = custom_policies

        self.processes = []
        self.parallel_mutexes = []
        self.inference_mutexes = []
        self.inputs_queues = []
        self.outputs_queues = []
        self.orig_methods = {}

        hijack_methods = ["generate", "forward", "to", "cpu", "cuda"]

        # Remember the original bound methods *before* they are replaced so
        # that `deparallelize` can restore them later.
        for attr in hijack_methods:
            if hasattr(self.model, attr):
                self.orig_methods[attr] = getattr(self.model, attr)

        self.parallelize()

        for attr in hijack_methods:
            if hasattr(self.model, attr):
                self.register_hijack_methods(attr)

        self._memory_logger = ["memory_reserved", "memory_allocated", "memory_cached"]

        for attr in self._memory_logger:
            self.register_memory_methods(attr)

    def preprocess_for_wav2vec(self, model: nn.Module) -> None:
        """
        There is one missing parameter in the Huggingface Wav2Vec model, and the user
        loses control over this parameter, making parallelization impossible.

        To use multiprocessing, if `requires_grad` is `True`, it must be a leaf tensor,
        but this tensor (`conv.weight`) does not follow this rule, so multiprocessing
        becomes impossible. Therefore, we detach this parameter to enable multiprocessing.

        Any exception raised here (including a failing `transformers` import)
        is suppressed, so this step is best-effort.

        Args:
            model (nn.Module): model weight
        """
        with suppress(Exception):
            from transformers.models.hubert.modeling_hubert import (
                HubertPositionalConvEmbedding,
            )
            from transformers.models.wav2vec2.modeling_wav2vec2 import (
                Wav2Vec2PositionalConvEmbedding,
            )

            positional_conv_classes = [
                Wav2Vec2PositionalConvEmbedding,
                HubertPositionalConvEmbedding,
            ]

            for _, layer_object in model.named_children():
                # Exact class match on purpose (subclasses are recursed into).
                if layer_object.__class__ in positional_conv_classes:
                    detached_parameter = rgetattr(layer_object, "conv.weight").detach()
                    rsetattr(layer_object, "conv.weight", detached_parameter)
                else:
                    self.preprocess_for_wav2vec(layer_object)

    def init_environments(
        self,
        num_gpus: int,
        master_addr: str,
        master_port: int,
    ) -> None:
        """
        Initialize environment variables

        Note that this overwrites any existing values of the variables below,
        including `CUDA_VISIBLE_DEVICES`.

        Args:
            num_gpus (int): number of GPU for parallelization.
            master_addr (str): master process address for process communication
            master_port (int): master process port for process communication
        """
        os.environ["TOKENIZERS_PARALLELISM"] = "true"
        os.environ["MKL_SERVICE_FORCE_INTEL"] = "GNU"
        os.environ["MASTER_ADDR"] = str(master_addr)
        os.environ["MASTER_PORT"] = str(master_port)
        os.environ["WORLD_SIZE"] = str(num_gpus)
        os.environ["CUDA_VISIBLE_DEVICES"] = ", ".join(
            str(i) for i in range(num_gpus)
        )

    def register_hijack_methods(self, method: str) -> None:
        """
        Intercept the flow by changing some methods (e.g. forward, generate, ...)
        in the model to `self.hijack` methods.

        Args:
            method (str): name of method
        """
        setattr(
            self.model,
            method,
            lambda *inputs, **kwargs: self.hijack(
                inputs=inputs,
                kwargs=kwargs,
                func=method,
            ),
        )

    def register_memory_methods(self, method: str) -> None:
        """
        Add several methods to check GPU occupancy status of models located in other process.

        Args:
            method (str): name of method
        """
        setattr(
            self.model,
            method,
            # The worker side still expects an (inputs, kwargs, func) triple,
            # so placeholder values are sent along with the method name.
            lambda: self.hijack(
                inputs="dummy",
                kwargs={"dummy": "dummy"},
                func=method,
            ),
        )

    def deparallelize(self) -> None:
        """
        Remove all methods registered in the model and join all GPU processes to main process.

        Every attribute is checked with `hasattr` because this method may be
        called from a failure path before `__init__` has finished.
        """
        if hasattr(self, "orig_methods"):
            for k, v in self.orig_methods.items():
                setattr(self.model, k, v)

        if hasattr(self, "_memory_logger"):
            for method in self._memory_logger:
                setattr(self.model, method, None)

        if hasattr(self, "processes"):
            for process in self.processes:
                process.join()

        torch.cuda.empty_cache()

    @torch.no_grad()
    def parallelize(self) -> None:
        """
        Create processes for model parallelization and parallel inference

        One `ParallelProcess` is started per GPU rank, each with its own pair
        of events and queues; the call then blocks until every rank has set
        its `parallel_mutex`.  On any failure the traceback is printed and
        `deparallelize` is called.
        """
        try:
            for rank in range(self.num_gpus):
                parallel_mutex = mp.Event()
                inference_mutex = mp.Event()
                self.parallel_mutexes.append(parallel_mutex)
                self.inference_mutexes.append(inference_mutex)

                inputs_queue = mp.Queue()
                outputs_queue = mp.Queue()
                self.inputs_queues.append(inputs_queue)
                self.outputs_queues.append(outputs_queue)

                process = ParallelProcess(
                    rank=rank,
                    model=self.model,
                    fp16=self.fp16,
                    num_gpus=self.num_gpus,
                    inputs_queue=inputs_queue,
                    outputs_queue=outputs_queue,
                    parallel_mutex=parallel_mutex,
                    inference_mutex=inference_mutex,
                    backend=self.backend,
                    verbose=self.verbose,
                    custom_policies=self.custom_policies,
                )

                # When the main process is done, all processes should free
                # their resources, so the default is True; change it
                # according to your needs.
                process.daemon = self.daemon
                process.start()
                self.processes.append(process)

            for p_mutex in self.parallel_mutexes:
                p_mutex.wait()

        except BaseException:
            traceback.print_exc()
            self.deparallelize()

    @staticmethod
    def _to_cpu(output: Any) -> Any:
        """Return `output` with tensors (top level or one container level deep) cloned to CPU."""
        if isinstance(output, ModelOutput) or _is_dataclass_instance(output):
            # Structured outputs are handed back untouched.
            return output

        if isinstance(output, (tuple, list)):
            # Tuples come back as lists, as before.
            return [o.clone().to("cpu") if torch.is_tensor(o) else o for o in output]

        if isinstance(output, dict):
            return {
                k: v.clone().to("cpu") if torch.is_tensor(v) else v
                for k, v in output.items()
            }

        if torch.is_tensor(output):
            return output.clone().to("cpu")

        return output

    @torch.no_grad()
    def hijack(
        self,
        inputs: Any,
        kwargs: Dict,
        func: str,
    ) -> Any:
        """
        Transfers the input passed to the main process to another process,
        and transfers the output to the main process and outputs it to the user.

        Args:
            inputs (Any): inputs of model
            kwargs (Dict): arguments of model
            func (str): name of method that hijacked

        Returns:
            Any: outputs of model. For `to`, `cpu` and `cuda` the model itself
            is returned after deparallelizing. If anything fails, the traceback
            is printed, the model is deparallelized and `None` is returned.
        """
        try:
            # producer part: hand the request to every worker and wake it up
            for i_mutex, i_queue in zip(
                self.inference_mutexes,
                self.inputs_queues,
            ):
                i_queue.put((inputs, kwargs, func))
                i_mutex.set()

            if func in ["to", "cpu", "cuda"]:
                # Moving the model ends parallel mode: restore the original
                # methods first, then run the real method on the local model.
                self.deparallelize()

                if func == "cpu":
                    self.model = self.model.cpu(*inputs, **kwargs)
                elif func == "cuda":
                    self.model = self.model.cuda(*inputs, **kwargs)
                else:
                    self.model = self.model.to(*inputs, **kwargs)

                return self.model

            # consumer part: collect one result per worker
            outputs = [self._to_cpu(o_queue.get()) for o_queue in self.outputs_queues]

            if func in self._memory_logger:
                # Each worker output is passed to `dict` as a (key, value) pair.
                final_output = dict(outputs)
            else:
                final_output = outputs[0]

            # non-picklable object to original dataclass
            if (
                isinstance(final_output, dict)
                and "orig_dataclass_type" in final_output
            ):
                orig_dataclass_type = final_output["orig_dataclass_type"]
                del final_output["orig_dataclass_type"]

                final_output = from_dict(
                    orig_dataclass_type,
                    final_output,
                    config=Config(check_types=False),
                )

            return final_output

        except BaseException:
            traceback.print_exc()
            self.deparallelize()