Source code for kiwi.lib.train

#  OpenKiwi: Open-Source Machine Translation Quality Estimation
#  Copyright (C) 2019 Unbabel <openkiwi@unbabel.com>
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Affero General Public License as published
#  by the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Affero General Public License for more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

import logging
from pathlib import Path
from pprint import pformat

import torch

from kiwi import constants as const
from kiwi.cli.pipelines.train import build_parser
from kiwi.data import builders, utils
from kiwi.data.iterators import build_bucket_iterator
from kiwi.data.utils import (
    save_training_datasets,
    save_vocabularies_from_datasets,
)
from kiwi.lib.utils import (
    configure_logging,
    configure_seed,
    merge_namespaces,
    save_args_to_file,
    setup_output_directory,
)
from kiwi.loggers import tracking_logger
from kiwi.models.linear_word_qe_classifier import LinearWordQEClassifier
from kiwi.models.model import Model
from kiwi.trainers.callbacks import Checkpoint
from kiwi.trainers.linear_word_qe_trainer import LinearWordQETrainer
from kiwi.trainers.trainer import Trainer
from kiwi.trainers.utils import optimizer_class

logger = logging.getLogger(__name__)


class TrainRunInfo:
    """
    Encapsulates relevant information on training runs.

    Can be instantiated with a trainer object.

    Attributes:
        stats: Stats of the best model so far
        model_path: Path of the best model so far
        run_uuid: Unique identifier of the current run
    """

    def __init__(self, trainer):
        # FIXME: linear trainer not yet supported here
        #  (no full support to checkpointer)
        self.stats = trainer.checkpointer.best_stats()
        self.model_path = trainer.checkpointer.best_model_path()
        self.run_uuid = tracking_logger.run_uuid

def train_from_file(filename):
    """
    Loads options from a config file and calls the training procedure.

    Args:
        filename (str): filename of the configuration file
    """
    parser = build_parser()
    options = parser.parse_config_file(filename)
    return train_from_options(options)
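
# Example (hypothetical usage sketch, not part of the original module): launching
# training from a YAML configuration file and inspecting the returned
# TrainRunInfo. The config path is an assumption; any training config accepted
# by build_parser() would work.
#
#     from kiwi.lib.train import train_from_file
#     train_info = train_from_file("experiments/train_config.yml")  # hypothetical path
#     print(train_info.model_path)  # path of the best model saved so far
#     print(train_info.run_uuid)    # id of this tracking run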

def train_from_options(options):
    """
    Runs the entire training pipeline using the configuration options
    received.

    These options include the pipeline and model options plus the model's API.

    Args:
        options (Namespace): All the configuration options retrieved from
            either a config file or input flags and the model being used.
    """
    if options is None:
        return

    pipeline_options = options.pipeline
    model_options = options.model
    ModelClass = options.model_api

    tracking_run = tracking_logger.configure(
        run_uuid=pipeline_options.run_uuid,
        experiment_name=pipeline_options.experiment_name,
        run_name=pipeline_options.run_name,
        tracking_uri=pipeline_options.mlflow_tracking_uri,
        always_log_artifacts=pipeline_options.mlflow_always_log_artifacts,
    )

    with tracking_run:
        output_dir = setup(
            output_dir=pipeline_options.output_dir,
            seed=pipeline_options.seed,
            gpu_id=pipeline_options.gpu_id,
            debug=pipeline_options.debug,
            quiet=pipeline_options.quiet,
        )

        all_options = merge_namespaces(pipeline_options, model_options)
        log(
            output_dir,
            config_options=vars(all_options),
            save_config=pipeline_options.save_config,
        )

        trainer = run(ModelClass, output_dir, pipeline_options, model_options)
        train_info = TrainRunInfo(trainer)

    teardown(pipeline_options)

    return train_info
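
# Example (hypothetical sketch): the same pipeline can be driven from an options
# Namespace built elsewhere, e.g. by parsing a config file with the same parser
# used in train_from_file above. The file name is illustrative.
#
#     parser = build_parser()
#     options = parser.parse_config_file("train.yml")  # hypothetical file
#     train_info = train_from_options(options)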

def run(ModelClass, output_dir, pipeline_options, model_options):
    """
    Implements the main logic of the training module.

    Instantiates the dataset and the model class and sets their attributes
    according to the pipeline options received. Loads or creates a trainer
    and runs it.

    Args:
        ModelClass (Model): Python Type of the Model to train
        output_dir: Directory to save models
        pipeline_options (Namespace): Generic Train Options
            load_model: load pre-trained predictor model
            resume: load trainer state and resume training
            gpu_id: Set to non-negative integer to train on GPU
            train_batch_size: Batch size for training
            valid_batch_size: Batch size for validation
        model_options (Namespace): Model specific options

    Returns:
        The trainer object
    """
    model_name = getattr(ModelClass, "title", ModelClass.__name__)
    logger.info("Training the {} model".format(model_name))

    # FIXME: make sure all places use output_dir
    # del pipeline_options.output_dir
    pipeline_options.output_dir = None

    # Data step
    fieldset = ModelClass.fieldset(
        wmt18_format=model_options.__dict__.get("wmt18_format")
    )
    datasets = retrieve_datasets(
        fieldset, pipeline_options, model_options, output_dir
    )
    save_vocabularies_from_datasets(output_dir, *datasets)
    if pipeline_options.save_data:
        save_training_datasets(pipeline_options.save_data, *datasets)

    # Trainer step
    device_id = None
    if pipeline_options.gpu_id is not None and pipeline_options.gpu_id >= 0:
        device_id = pipeline_options.gpu_id

    vocabs = utils.fields_to_vocabs(datasets[0].fields)

    trainer = retrieve_trainer(
        ModelClass,
        pipeline_options,
        model_options,
        vocabs,
        output_dir,
        device_id,
    )

    logger.info(str(trainer.model))
    logger.info("{} parameters".format(trainer.model.num_parameters()))
    tracking_logger.log_param(
        "model_parameters", trainer.model.num_parameters()
    )

    # Dataset iterators
    train_iter = build_bucket_iterator(
        datasets[0],
        batch_size=pipeline_options.train_batch_size,
        is_train=True,
        device=device_id,
    )
    valid_iter = build_bucket_iterator(
        datasets[1],
        batch_size=pipeline_options.valid_batch_size,
        is_train=False,
        device=device_id,
    )

    trainer.run(train_iter, valid_iter, epochs=pipeline_options.epochs)

    return trainer

def retrieve_trainer(
    ModelClass, pipeline_options, model_options, vocabs, output_dir, device_id
):
    """
    Creates a Trainer object with an associated model.

    This object encapsulates the logic behind training the model and
    checkpointing. This method uses the received pipeline options to
    instantiate a Trainer object with the requested model and hyperparameters.

    Args:
        ModelClass
        pipeline_options (Namespace): Generic training options
            resume (bool): Set to true if resuming an existing run.
            load_model (str): Directory containing model.torch for loading a
                pre-created model.
            checkpoint_save (bool): Boolean indicating if snapshots should be
                saved after validation runs. Warning: if false, the model will
                never be saved.
            checkpoint_keep_only_best (int): Tells kiwi to keep only the best
                `n` models.
            checkpoint_early_stop_patience (int): Stops training if metrics
                don't improve after `n` validation runs.
            checkpoint_validation_steps (int): Perform validation every `n`
                training steps.
            optimizer (str): The optimizer to be used in training.
            learning_rate (float): Starting learning rate.
            learning_rate_decay (float): Factor of learning rate decay.
            learning_rate_decay_start (int): Start decay after epoch `x`.
            log_interval (int): Log after `k` batches.
        model_options (Namespace): Model specific options.
        vocabs (dict): Vocab dictionary.
        output_dir (str or Path): Output directory for models and stats
            concerning training.
        device_id (int): The GPU id to be used in training. Set to negative
            to use the CPU.

    Returns:
        Trainer
    """
    if pipeline_options.resume:
        return Trainer.resume(local_path=output_dir, device_id=device_id)

    if pipeline_options.load_model:
        model = Model.create_from_file(pipeline_options.load_model)
    else:
        model = ModelClass.from_options(vocabs=vocabs, opts=model_options)

    checkpointer = Checkpoint(
        output_dir,
        pipeline_options.checkpoint_save,
        pipeline_options.checkpoint_keep_only_best,
        pipeline_options.checkpoint_early_stop_patience,
        pipeline_options.checkpoint_validation_steps,
    )

    if isinstance(model, LinearWordQEClassifier):
        trainer = LinearWordQETrainer(
            model,
            model_options.training_algorithm,
            model_options.regularization_constant,
            checkpointer,
        )
    else:
        # Set GPU or CPU; has to be before instantiating the optimizer
        model.to(device_id)

        # Optimizer
        OptimizerClass = optimizer_class(pipeline_options.optimizer)
        optimizer = OptimizerClass(
            model.parameters(), lr=pipeline_options.learning_rate
        )

        # Learning rate scheduler
        scheduler = None
        if 0.0 < pipeline_options.learning_rate_decay < 1.0:
            scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
                optimizer,
                factor=pipeline_options.learning_rate_decay,
                patience=pipeline_options.learning_rate_decay_start,
                verbose=True,
                mode="max",
            )

        trainer = Trainer(
            model,
            optimizer,
            checkpointer,
            log_interval=pipeline_options.log_interval,
            scheduler=scheduler,
        )
    return trainer

def retrieve_datasets(fieldset, pipeline_options, model_options, output_dir):
    """
    Creates `Dataset` objects for the training and validation sets.

    Parses files according to pipeline and model options.

    Args:
        fieldset
        pipeline_options (Namespace): Generic training options
            load_data (str): Input directory for loading preprocessed data
                files.
            load_model (str): Directory containing model.torch for loading a
                pre-created model.
            resume (bool): Indicates if training should resume from a
                previous run.
            load_vocab (str): Directory containing vocab.torch file to be
                loaded.
        model_options (Namespace): Model specific options.
        output_dir (str): Path to directory where experiment files should be
            saved.

    Returns:
        datasets (Dataset): Training and validation datasets
    """
    if pipeline_options.load_data:
        datasets = utils.load_training_datasets(
            pipeline_options.load_data, fieldset
        )
    else:
        load_vocab = None
        if pipeline_options.resume:
            load_vocab = Path(output_dir, const.VOCAB_FILE)
        elif pipeline_options.load_model:
            load_vocab = pipeline_options.load_model
        elif model_options.__dict__.get("load_pred_source"):
            load_vocab = model_options.load_pred_source
        elif model_options.__dict__.get("load_pred_target"):
            load_vocab = model_options.load_pred_target
        elif pipeline_options.load_vocab:
            load_vocab = pipeline_options.load_vocab

        datasets = builders.build_training_datasets(
            fieldset, load_vocab=load_vocab, **vars(model_options)
        )
    return datasets

def setup(output_dir, seed=42, gpu_id=None, debug=False, quiet=False):
    """
    Analyzes pipeline options and sets up requirements for running the
    training pipeline.

    This includes setting up the output directory, random seeds and the
    device(s) where training is run.

    Args:
        output_dir: Path to directory to use or None, in which case one is
            created automatically.
        seed (int): Random seed for all random engines (Python, PyTorch,
            NumPy).
        gpu_id (int): GPU number to use or `None` to use the CPU.
        debug (bool): Whether to increase the verbosity of output messages.
        quiet (bool): Whether to decrease the verbosity of output messages.
            Takes precedence over `debug`.

    Returns:
        output_dir (str): Path to output directory
    """
    output_dir = setup_output_directory(
        output_dir,
        tracking_logger.run_uuid,
        tracking_logger.experiment_id,
        create=True,
    )
    configure_logging(output_dir=output_dir, debug=debug, quiet=quiet)
    configure_seed(seed)

    logging.info("This is run ID: {}".format(tracking_logger.run_uuid))
    logging.info(
        "Inside experiment ID: {} ({})".format(
            tracking_logger.experiment_id, tracking_logger.experiment_name
        )
    )
    logging.info("Local output directory is: {}".format(output_dir))
    logging.info(
        "Logging execution to MLflow at: {}".format(
            tracking_logger.get_tracking_uri()
        )
    )

    if gpu_id is not None and gpu_id >= 0:
        torch.cuda.set_device(gpu_id)
        logging.info("Using GPU: {}".format(gpu_id))
    else:
        logging.info("Using CPU")

    logging.info(
        "Artifacts location: {}".format(tracking_logger.get_artifact_uri())
    )

    return output_dir
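
# Example (hypothetical sketch): setup() can also be called directly when
# embedding the pipeline in another script; the values below are illustrative.
# Passing output_dir=None lets the output directory be created automatically.
#
#     out_dir = setup(output_dir=None, seed=42, gpu_id=0, debug=True, quiet=False)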

def teardown(options):
    """
    Tears down after executing the training pipeline.

    Args:
        options (Namespace): Pipeline specific options
    """
    pass

def log(
    output_dir,
    config_options,
    config_file_name="train_config.yml",
    save_config=None,
):
    """
    Logs configuration options for the current training run.

    Args:
        output_dir (str): Path to directory where experiment files should be
            saved.
        config_options (dict): Dictionary with all configuration options.
        config_file_name (str): Filename of the config file.
        save_config (str or Path): If set, path where a copy of the
            configuration options should be saved.
    """
    logging.debug(pformat(config_options))

    config_file_copy = Path(output_dir, config_file_name)
    save_args_to_file(config_file_copy, **config_options)
    if tracking_logger.should_log_artifacts():
        tracking_logger.log_artifact(str(config_file_copy))
    if save_config:
        save_args_to_file(save_config, output_dir=output_dir, **config_options)

    # Log parameters
    tracking_logger.log_param("output_dir", output_dir)
    tracking_logger.log_param("save_config", save_config)
    for param, value in config_options.items():
        tracking_logger.log_param(param, value)