Some checks failed
Self-hosted runner (nightly-past-ci-caller) / Get number (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.11 (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.10 (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.9 (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.8 (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.7 (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.6 (push) Has been cancelled
Self-hosted runner (nightly-past-ci-caller) / TensorFlow 2.5 (push) Has been cancelled
Self-hosted runner (benchmark) / Benchmark (aws-g5-4xlarge-cache) (push) Has been cancelled
Build documentation / build (push) Has been cancelled
Build documentation / build_other_lang (push) Has been cancelled
CodeQL Security Analysis / CodeQL Analysis (push) Has been cancelled
New model PR merged notification / Notify new model (push) Has been cancelled
PR CI / pr-ci (push) Has been cancelled
Slow tests on important models (on Push - A10) / Get all modified files (push) Has been cancelled
Secret Leaks / trufflehog (push) Has been cancelled
Update Transformers metadata / build_and_package (push) Has been cancelled
Slow tests on important models (on Push - A10) / Model CI (push) Has been cancelled
Check Tiny Models / Check tiny models (push) Has been cancelled
Self-hosted runner (Intel Gaudi3 scheduled CI caller) / Model CI (push) Has been cancelled
Self-hosted runner (Intel Gaudi3 scheduled CI caller) / Pipeline CI (push) Has been cancelled
Self-hosted runner (Intel Gaudi3 scheduled CI caller) / Example CI (push) Has been cancelled
Self-hosted runner (Intel Gaudi3 scheduled CI caller) / DeepSpeed CI (push) Has been cancelled
Self-hosted runner (Intel Gaudi3 scheduled CI caller) / Trainer/FSDP CI (push) Has been cancelled
Nvidia CI - Flash Attn / Setup (push) Has been cancelled
Nvidia CI - Flash Attn / Model CI (push) Has been cancelled
Nvidia CI / Setup (push) Has been cancelled
Nvidia CI / Model CI (push) Has been cancelled
Nvidia CI / Torch pipeline CI (push) Has been cancelled
Nvidia CI / Example CI (push) Has been cancelled
Nvidia CI / Trainer/FSDP CI (push) Has been cancelled
Nvidia CI / DeepSpeed CI (push) Has been cancelled
Nvidia CI / Quantization CI (push) Has been cancelled
Nvidia CI / Kernels CI (push) Has been cancelled
Doctests / Setup (push) Has been cancelled
Doctests / Call doctest jobs (push) Has been cancelled
Doctests / Send results to webhook (push) Has been cancelled
Extras Smoke Test / Get supported Python versions (push) Has been cancelled
Extras Smoke Test / Test extras on Python ${{ matrix.python-version }} (push) Has been cancelled
Extras Smoke Test / Check Slack token availability (push) Has been cancelled
Extras Smoke Test / Notify failures to Slack (push) Has been cancelled
Self-hosted runner (AMD scheduled CI caller) / Trigger Scheduled AMD CI (push) Has been cancelled
Stale Bot / Close Stale Issues (push) Has been cancelled
181 lines
8.0 KiB
Python
181 lines
8.0 KiB
Python
# Copyright 2020 The HuggingFace Team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Shared constants, helpers, and reusable test logic for distributed trainer tests.
|
|
|
|
This module provides:
|
|
- Path constants for test scripts and accelerate configs.
|
|
- ``TrainerDistributedCommon``, an abstract base class that contains reusable
|
|
test scenarios (training, mixed-precision, gradient accumulation, checkpoint
|
|
resume, evaluation). Framework-specific test files (DDP, FSDP, DeepSpeed)
|
|
subclass it and wire each scenario to parameterized test methods.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from abc import ABC, abstractmethod
|
|
|
|
from transformers import is_torch_available
|
|
from transformers.testing_utils import execute_subprocess_async
|
|
from transformers.trainer_callback import TrainerState
|
|
from transformers.trainer_utils import get_last_checkpoint
|
|
|
|
|
|
if is_torch_available():
|
|
import torch
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Path constants
|
|
# ---------------------------------------------------------------------------
|
|
DISTRIBUTED_DIR = os.path.dirname(__file__)
|
|
CONFIGS_DIR = os.path.join(DISTRIBUTED_DIR, "accelerate_configs")
|
|
SCRIPTS_DIR = os.path.join(DISTRIBUTED_DIR, "scripts")
|
|
TRAIN_SCRIPT = os.path.join(SCRIPTS_DIR, "train.py")
|
|
|
|
|
|
class TrainerDistributedCommon(ABC):
|
|
"""Reusable test scenarios shared across DDP, FSDP, and DeepSpeed.
|
|
|
|
Subclasses must:
|
|
1. Implement ``get_accelerate_cmd`` to build the launch command.
|
|
2. Define the following test methods (parameterized as needed)::
|
|
|
|
test_training → self.check_training(dtype, ...)
|
|
test_training_mixed_precision → self.check_mixed_precision(dtype, ...)
|
|
test_training_with_gradient_accumulation → self.check_gradient_accumulation(...)
|
|
test_training_and_can_resume_normally → self.check_resume(...)
|
|
test_eval → self.check_eval(...)
|
|
|
|
These test methods can't be defined here as ``@abstractmethod`` because
|
|
``@parameterized.expand`` removes the original method name from the
|
|
subclass, which would cause ABC to raise ``TypeError`` at instantiation.
|
|
"""
|
|
|
|
@abstractmethod
|
|
def get_accelerate_cmd(self, script, config_file, launch_args=None, script_args=None, **kwargs):
|
|
"""Build the full ``accelerate launch`` command list.
|
|
|
|
Args:
|
|
script: Path to the Python script to run.
|
|
config_file: Path to the accelerate YAML config (always required).
|
|
launch_args: Extra flags inserted *before* the script
|
|
(e.g. ``--fsdp_sharding_strategy``, ``--offload_optimizer_device``).
|
|
script_args: Extra flags appended *after* the script
|
|
(e.g. ``--output_dir``, ``--bf16``).
|
|
**kwargs: Framework-specific overrides (e.g. ``num_processes``).
|
|
"""
|
|
...
|
|
|
|
# -------------------------------------------------------------------
|
|
# Helpers
|
|
# -------------------------------------------------------------------
|
|
def _get_default_script_args(self, output_dir, num_epochs=1, logging_steps=5, save_steps=None):
|
|
"""Build the baseline CLI arguments shared by all training runs."""
|
|
args = [
|
|
"--output_dir",
|
|
output_dir,
|
|
"--num_train_epochs",
|
|
str(num_epochs),
|
|
"--logging_steps",
|
|
str(logging_steps),
|
|
"--per_device_train_batch_size",
|
|
"4",
|
|
"--learning_rate",
|
|
"5e-5",
|
|
]
|
|
if save_steps is not None:
|
|
args += ["--save_steps", str(save_steps)]
|
|
else:
|
|
args += ["--save_strategy", "no"]
|
|
return args
|
|
|
|
def _train_and_get_log_history(self, cmd, output_dir):
|
|
"""Run a training command and return the log history from the last checkpoint."""
|
|
execute_subprocess_async(cmd, env=self.get_env())
|
|
checkpoint = get_last_checkpoint(output_dir)
|
|
state_file = os.path.join(checkpoint, "trainer_state.json")
|
|
return TrainerState.load_from_json(state_file).log_history
|
|
|
|
# -------------------------------------------------------------------
|
|
# Reusable test scenarios — called from subclass test methods
|
|
# -------------------------------------------------------------------
|
|
def check_training(self, dtype="bf16", **cmd_kwargs):
|
|
"""Verify that training completes with the model loaded in *dtype* (no mixed precision)."""
|
|
output_dir = self.get_auto_remove_tmp_dir()
|
|
args = self._get_default_script_args(output_dir) + ["--model_dtype", dtype]
|
|
execute_subprocess_async(
|
|
self.get_accelerate_cmd(TRAIN_SCRIPT, script_args=args, **cmd_kwargs),
|
|
env=self.get_env(),
|
|
)
|
|
|
|
def check_mixed_precision(self, dtype="bf16", **cmd_kwargs):
|
|
"""Verify mixed-precision training: model in fp32, compute in *dtype*."""
|
|
output_dir = self.get_auto_remove_tmp_dir()
|
|
args = self._get_default_script_args(output_dir) + ["--model_dtype", "fp32", f"--{dtype}"]
|
|
# fp16 requires a non-fused optimizer to avoid nan losses on small models
|
|
if dtype == "fp16":
|
|
args += ["--optim", "adamw_torch"]
|
|
execute_subprocess_async(
|
|
self.get_accelerate_cmd(TRAIN_SCRIPT, script_args=args, **cmd_kwargs),
|
|
env=self.get_env(),
|
|
)
|
|
|
|
def check_gradient_accumulation(self, **cmd_kwargs):
|
|
"""Verify that training with gradient accumulation completes without error."""
|
|
output_dir = self.get_auto_remove_tmp_dir()
|
|
args = self._get_default_script_args(output_dir) + ["--bf16", "--gradient_accumulation_steps", "2"]
|
|
execute_subprocess_async(
|
|
self.get_accelerate_cmd(TRAIN_SCRIPT, script_args=args, **cmd_kwargs),
|
|
env=self.get_env(),
|
|
)
|
|
|
|
def check_resume(self, **cmd_kwargs):
|
|
"""Verify that training can resume from a checkpoint with consistent learning rates."""
|
|
output_dir = self.get_auto_remove_tmp_dir()
|
|
args = self._get_default_script_args(output_dir, num_epochs=2, logging_steps=2, save_steps=2) + ["--bf16"]
|
|
|
|
original_logs = self._train_and_get_log_history(
|
|
self.get_accelerate_cmd(TRAIN_SCRIPT, script_args=args, **cmd_kwargs),
|
|
output_dir,
|
|
)
|
|
|
|
checkpoint = os.path.join(output_dir, "checkpoint-2")
|
|
self.assertTrue(os.path.isdir(checkpoint), f"Checkpoint dir not found: {checkpoint}")
|
|
|
|
resume_args = args + ["--resume_from_checkpoint", checkpoint]
|
|
resumed_logs = self._train_and_get_log_history(
|
|
self.get_accelerate_cmd(TRAIN_SCRIPT, script_args=resume_args, **cmd_kwargs),
|
|
output_dir,
|
|
)
|
|
|
|
for original, resumed in zip(original_logs, resumed_logs):
|
|
if "learning_rate" in original:
|
|
self.assertAlmostEqual(original["learning_rate"], resumed["learning_rate"], delta=1e-5)
|
|
|
|
def check_eval(self, **cmd_kwargs):
|
|
"""Verify that evaluation produces a finite eval loss."""
|
|
output_dir = self.get_auto_remove_tmp_dir()
|
|
eval_output = os.path.join(output_dir, "eval_metrics.json")
|
|
args = self._get_default_script_args(output_dir) + ["--do_eval", "--eval_output_file", eval_output]
|
|
execute_subprocess_async(
|
|
self.get_accelerate_cmd(TRAIN_SCRIPT, script_args=args, **cmd_kwargs),
|
|
env=self.get_env(),
|
|
)
|
|
|
|
with open(eval_output) as f:
|
|
eval_metrics = json.load(f)
|
|
self.assertIn("eval_loss", eval_metrics)
|
|
self.assertTrue(torch.isfinite(torch.tensor(eval_metrics["eval_loss"])))
|