"""
Utility classes and functions for SKLL learners.
:author: Nitin Madnani (nmadnani@ets.org)
:author: Michael Heilman (mheilman@ets.org)
:author: Dan Blanchard (dblanchard@ets.org)
:author: Aoife Cahill (acahill@ets.org)
:organization: ETS
"""
from __future__ import annotations
import inspect
import logging
import sys
import time
from collections import Counter, defaultdict
from csv import DictWriter, excel_tab
from functools import wraps
from importlib import import_module
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
Type,
Union,
)
import joblib
import numpy as np
import scipy.sparse as sp
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection import SelectKBest
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
precision_recall_fscore_support,
)
from sklearn.model_selection import (
KFold,
LeaveOneGroupOut,
ShuffleSplit,
StratifiedKFold,
)
from skll.data import FeatureSet
from skll.metrics import _CUSTOM_METRICS, _PREDEFINED_CUSTOM_METRICS, use_score_func
from skll.types import (
ComputeEvalMetricsResults,
ConfusionMatrix,
FeaturesetIterator,
FoldMapping,
IdType,
IndexIterator,
LabelType,
PathOrStr,
SparseFeatureMatrix,
)
from skll.utils.constants import (
CLASSIFICATION_ONLY_METRICS,
CORRELATION_METRICS,
REGRESSION_ONLY_METRICS,
UNWEIGHTED_KAPPA_METRICS,
WEIGHTED_KAPPA_METRICS,
)
from skll.version import VERSION
# import classes that we only need for type checking and not
# otherwise since they would cause circular import issues
if TYPE_CHECKING:
import skll.learner
import skll.learner.voting
class Densifier(BaseEstimator, TransformerMixin):
"""
Custom pipeline stage for handling dense feature arrays.
A custom pipeline stage that will be inserted into the
learner pipeline attribute to accommodate the situation
when SKLL needs to manually convert feature arrays from
sparse to dense. For example, when features are being hashed
but we are also doing centering using the feature means.
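Examples
--------
A minimal sketch of the ``transform()`` behavior on a toy sparse matrix:
>>> import scipy.sparse as sp
>>> X = sp.csr_matrix([[0.0, 1.0], [2.0, 0.0]])
>>> Densifier().fit(X).transform(X)
array([[0., 1.],
       [2., 0.]])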
"""
def fit(self, X, y=None):
"""Fit the estimator."""
return self
def fit_transform(self, X, y=None):
"""Fit the estimator and transform the input."""
return self
def transform(self, X):
"""Transform the input using already fit estimator."""
return X.toarray()
class FilteredLeaveOneGroupOut(LeaveOneGroupOut):
"""
Custom version of the ``LeaveOneGroupOut`` cross-validation iterator.
This version only outputs indices of instances with IDs in a prespecified set.
Parameters
----------
keep : Iterable[IdType]
A set of IDs to keep.
example_ids : numpy.ndarray, of length n_samples
A list of example IDs.
logger : Optional[logging.Logger], default=None
A logger instance.
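Examples
--------
A hedged sketch with hypothetical example IDs and fold groups; "ex4" is
dropped from the yielded indices because it is not in ``keep``:
>>> import numpy as np
>>> ids = np.array(["ex1", "ex2", "ex3", "ex4"])
>>> splitter = FilteredLeaveOneGroupOut({"ex1", "ex2", "ex3"}, ids)
>>> X, y = np.zeros((4, 2)), np.array([0, 1, 0, 1])
>>> for train, test in splitter.split(X, y, ["a", "a", "b", "b"]):  # doctest: +SKIP
...     print(train, test)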
"""
def __init__(
self,
keep: Iterable[IdType],
example_ids: np.ndarray,
logger: Optional[logging.Logger] = None,
):
"""Initialize the model."""
super(FilteredLeaveOneGroupOut, self).__init__()
self.keep = keep
self.example_ids = example_ids
self._warned = False
self.logger = logger if logger else logging.getLogger(__name__)
def split(
self, X: SparseFeatureMatrix, y: np.ndarray, groups: Optional[List[str]]
) -> IndexIterator:
"""
Generate indices to split data into training and test set.
Parameters
----------
X : numpy.ndarray, with shape (n_samples, n_features)
Training data, where n_samples is the number of samples
and n_features is the number of features.
y : numpy.ndarray, of length n_samples
The target variable for supervised learning problems.
groups : List[str]
Group labels for the samples used while splitting the dataset into
train/test set.
Yields
------
train_index : numpy.ndarray
The training set indices for that split.
test_index : numpy.ndarray
The testing set indices for that split.
"""
for train_index, test_index in super(FilteredLeaveOneGroupOut, self).split(X, y, groups):
train_len = len(train_index)
test_len = len(test_index)
train_index = [i for i in train_index if self.example_ids[i] in self.keep]
test_index = [i for i in test_index if self.example_ids[i] in self.keep]
if not self._warned and (train_len != len(train_index) or test_len != len(test_index)):
self.logger.warning(
"Feature set contains IDs that are not "
"in folds dictionary. Skipping those IDs."
)
self._warned = True
yield train_index, test_index
class SelectByMinCount(SelectKBest):
"""
Select or discard features based on how often they occur in the data.
Select features that occur in at least a specified minimum number of
examples in the training data (or a CV training fold).
Parameters
----------
min_count : int, default=1
The minimum feature count to select.
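Examples
--------
A minimal sketch keeping only features that occur in at least two examples:
>>> import numpy as np
>>> X = np.array([[1, 0, 0], [1, 2, 0], [0, 3, 0]])
>>> SelectByMinCount(min_count=2).fit(X).scores_
array([2, 2, 0])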
"""
def __init__(self, min_count: int = 1):
"""Initialize the model."""
self.min_count = min_count
self.scores_: Optional[np.ndarray] = None
def fit(self, X, y=None):
"""
Fit the SelectByMinCount model.
Parameters
----------
X : numpy.ndarray, with shape (n_samples, n_features)
The training data to fit.
y : Ignored
Not used.
Returns
-------
self
"""
# initialize a list of counts of times each feature appears
col_counts = [0 for _ in range(X.shape[1])]
if sp.issparse(X):
# find() is scipy.sparse's equivalent of nonzero()
_, col_indices, _ = sp.find(X)
else:
# assume it's a numpy array (not a numpy matrix)
col_indices = X.nonzero()[1].tolist()
for i in col_indices:
col_counts[i] += 1
self.scores_ = np.array(col_counts)
return self
def _get_support_mask(self):
"""
Return a mask indicating which features to keep.
Adapted from ``SelectKBest``.
Returns
-------
mask : numpy.ndarray
The mask with features to keep set to True.
"""
mask = np.zeros(self.scores_.shape, dtype=bool)
mask[self.scores_ >= self.min_count] = True
return mask
def add_unseen_labels(
train_label_dict: Dict[LabelType, int], test_label_list: List[LabelType]
) -> Dict[LabelType, int]:
"""
Merge test set labels that are not seen in the training data with seen ones.
Parameters
----------
train_label_dict : Dict[:class:`skll.types.LabelType`, int]
Dictionary mapping training set class labels to class indices.
test_label_list : List[:class:`skll.types.LabelType`]
List containing labels in the test set.
Returns
-------
Dict[:class:`skll.types.LabelType`, int]
Dictionary mapping merged labels from both the training and test sets
to indices.
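Examples
--------
A minimal sketch with one label ("fish") that appears only in the test set:
>>> add_unseen_labels({"cat": 0, "dog": 1}, ["dog", "fish"])
{'cat': 0, 'dog': 1, 'fish': 2}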
"""
# get the list of labels that were in the training set
train_label_list = list(train_label_dict.keys())
# identify any unseen labels in the test set
unseen_test_label_list = [label for label in test_label_list if label not in train_label_list]
# create a new dictionary for these unseen labels with label indices
# for them starting _after_ those for the training set labels
unseen_label_dict = {
label: i for i, label in enumerate(unseen_test_label_list, start=len(train_label_list))
}
# combine the train label dictionary with this unseen label one & return
train_and_test_label_dict = train_label_dict.copy()
train_and_test_label_dict.update(unseen_label_dict)
return train_and_test_label_dict
def compute_evaluation_metrics(
metrics: List[str],
labels: np.ndarray,
predictions: np.ndarray,
model_type: str,
label_dict: Optional[Dict[LabelType, int]] = None,
grid_objective: Optional[str] = None,
probability: bool = False,
logger: Optional[logging.Logger] = None,
) -> ComputeEvalMetricsResults:
"""
Compute given evaluation metrics.
Compute the given metrics to evaluate the predictions generated by the
given type of estimator against the true labels.
Parameters
----------
metrics : List[str]
List of metrics to compute.
labels : numpy.ndarray
True labels to be used for computing the metrics.
predictions : numpy.ndarray
The predictions to be used for computing the metrics.
model_type : str
One of "classifier" or "regressor".
label_dict : Optional[Dict[LabelType, int]], default=None
Dictionary mapping class labels to indices for classification.
grid_objective : Optional[str], default=None
The objective used for tuning the hyper-parameters of the model
that generated the predictions. If ``None``, it means that no
grid search was done.
probability : bool, default=False
Does the model output class probabilities?
logger : Optional[logging.Logger], default=None
A logger instance to use for logging messages and warnings.
If ``None``, a new one is created.
Returns
-------
:class:`skll.types.ComputeEvalMetricsResults`
5-tuple including the confusion matrix, the overall accuracy, the
per-label PRFs, the grid search objective function score, and the
additional evaluation metrics, if any. For regressors, the
first two elements are ``None``.
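Examples
--------
A hedged classification sketch with hypothetical class indices and no
grid objective:
>>> import numpy as np
>>> conf_mat, acc, prf, obj_score, extra = compute_evaluation_metrics(
...     ["accuracy"], np.array([0, 1, 1, 0]), np.array([0, 1, 0, 0]),
...     "classifier", label_dict={"no": 0, "yes": 1},
... )  # doctest: +SKIP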
"""
# set up the logger
logger = logger if logger else logging.getLogger(__name__)
# warn if grid objective was also specified in metrics
if len(metrics) > 0 and grid_objective in metrics:
logger.warning(
f"The grid objective '{grid_objective}' is also "
"specified as an evaluation metric. Since its value "
"is already included in the results as the objective "
"score, it will not be printed again in the list of "
"metrics."
)
metrics = [metric for metric in metrics if metric != grid_objective]
# initialize a dictionary that will hold all of the metric scores
metric_scores: Dict[str, Optional[float]] = {metric: None for metric in metrics}
# if we are doing classification and have a probabilistic
# learner or a soft-voting meta learner, then `predictions`
# contains class probabilities, so we need to compute the
# class indices separately and save them too
if model_type == "classifier" and probability:
class_probs = predictions
predictions = np.argmax(class_probs, axis=1)
# if we are a regressor or classifier not in probability
# mode, then we have the class indices already and there
# are no probabilities
else:
class_probs = None
# make a single list of metrics including the grid objective
# since it's easier to compute everything together
metrics_to_compute = [grid_objective] + metrics
for metric in metrics_to_compute:
# skip the None if we are not doing grid search
if not metric:
continue
# declare types
preds_for_metric: Optional[np.ndarray]
# CASE 1: in probability mode for classification which means we
# need to either use the probabilities directly or infer the labels
# from them depending on the metric
if probability and label_dict and class_probs is not None:
# there are three possible cases here:
# (a) if we are using a correlation metric or
# `average_precision` or `roc_auc` in a binary
# classification scenario, then we need to explicitly
# pass in the probabilities of the positive class.
# (b) if we are using `neg_log_loss`, then we
# just pass in the full probability array
# (c) we compute the most likely labels from the
# probabilities via argmax and use those
# for all other metrics
if (
len(label_dict) == 2
and (metric in CORRELATION_METRICS or metric in ["average_precision", "roc_auc"])
and metric != grid_objective
):
logger.info(
"using probabilities for the positive class to "
f"compute '{metric}' for evaluation."
)
preds_for_metric = class_probs[:, 1]
elif metric == "neg_log_loss":
preds_for_metric = class_probs
else:
preds_for_metric = predictions
# CASE 2: no probability mode for classifier or regressor
# in which case we just use the predictions as they are
else:
preds_for_metric = predictions
try:
metric_scores[metric] = use_score_func(metric, labels, preds_for_metric)
except ValueError:
metric_scores[metric] = float("NaN")
# now separate out the grid objective score from the additional metric scores
# if a grid objective was actually passed in. If no objective was passed in
# then that score should just be none.
objective_score = None
additional_scores = metric_scores.copy()
if grid_objective:
objective_score = metric_scores[grid_objective]
del additional_scores[grid_objective]
# declare the type for the results
res: ComputeEvalMetricsResults
# compute some basic statistics for regressors
if model_type == "regressor":
regressor_result_dict: Dict[LabelType, Any] = {"descriptive": defaultdict(dict)}
for table_label, y in zip(["actual", "predicted"], [labels, predictions]):
regressor_result_dict["descriptive"][table_label]["min"] = min(y)
regressor_result_dict["descriptive"][table_label]["max"] = max(y)
regressor_result_dict["descriptive"][table_label]["avg"] = np.mean(y)
regressor_result_dict["descriptive"][table_label]["std"] = np.std(y)
regressor_result_dict["pearson"] = use_score_func("pearson", labels, predictions)
res = (None, None, regressor_result_dict, objective_score, additional_scores)
elif label_dict:
# compute the confusion matrix and precision/recall/f1
# note that we are using the class indices here
# and not the actual class labels themselves
num_labels = len(label_dict)
conf_mat: ConfusionMatrix = confusion_matrix(
labels, predictions, labels=list(range(num_labels))
).tolist()
# Calculate metrics
overall_accuracy: float = accuracy_score(labels, predictions)
result_matrix = precision_recall_fscore_support(
labels, predictions, labels=list(range(num_labels)), average=None
)
# Store results
classifier_result_dict: Dict[LabelType, Any] = defaultdict(dict)
for actual_label in sorted(label_dict):
col = label_dict[actual_label]
classifier_result_dict[actual_label]["Precision"] = result_matrix[0][col]
classifier_result_dict[actual_label]["Recall"] = result_matrix[1][col]
classifier_result_dict[actual_label]["F-measure"] = result_matrix[2][col]
res = (
conf_mat,
overall_accuracy,
classifier_result_dict,
objective_score,
additional_scores,
)
return res
def compute_num_folds_from_example_counts(
cv_folds: int,
labels: Optional[np.ndarray],
model_type: str,
logger: Optional[logging.Logger] = None,
) -> int:
"""
Calculate the number of cross-validation folds based on the number of examples per label.
Parameters
----------
cv_folds : int
The number of cross-validation folds.
labels : Optional[numpy.ndarray]
The example labels; may be ``None`` for regressors.
model_type : str
One of "classifier" or "regressor".
logger : Optional[logging.Logger], default=None
A logger instance to use for logging messages and warnings.
If ``None``, a new one is created.
Returns
-------
int
The number of folds to use, based on the number of examples
for each label.
Raises
------
ValueError
If ``cv_folds`` is not an integer or if the training set has
fewer than 2 examples associated with a label (for classification).
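Examples
--------
A minimal sketch where the rarest label has only three examples, so the
requested 10 folds are reduced to 3:
>>> import numpy as np
>>> labels = np.array([0, 0, 0, 1, 1, 1])
>>> compute_num_folds_from_example_counts(10, labels, "classifier")
3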
"""
# get a logger if not provided
logger = logger if logger else logging.getLogger(__name__)
try:
assert isinstance(cv_folds, int)
except AssertionError:
raise ValueError("`cv_folds` must be an integer.")
# For regression models, we can just return the current cv_folds
if model_type == "regressor":
return cv_folds
min_examples_per_label = min(Counter(labels).values())
if min_examples_per_label <= 1:
raise ValueError(
f"The training set has only {min_examples_per_label}" " example for a label."
)
if min_examples_per_label < cv_folds:
logger.warning(
"The minimum number of examples per label was "
f"{min_examples_per_label}. Setting the number of "
"cross-validation folds to that value."
)
cv_folds = min_examples_per_label
return cv_folds
def contiguous_ints_or_floats(numbers: np.ndarray) -> bool:
"""
Check for continuity in the given list of numbers.
Check whether the given list of numbers contains
contiguous integers or contiguous integer-like
floats. For example, [1, 2, 3] or [4.0, 5.0, 6.0]
are both contiguous but [1.1, 1.2, 1.3] is not.
Parameters
----------
numbers : numpy.ndarray
The numbers we want to check.
Returns
-------
bool
``True`` if the numbers are contiguous integers
or contiguous integer-like floats (1.0, 2.0, etc.),
``False`` otherwise.
Raises
------
TypeError
If ``numbers`` does not contain integers or floating point values.
ValueError
If ``numbers`` is empty.
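Examples
--------
A minimal sketch mirroring the examples above:
>>> import numpy as np
>>> contiguous_ints_or_floats(np.array([4.0, 5.0, 6.0]))
True
>>> contiguous_ints_or_floats(np.array([1.1, 1.2, 1.3]))
False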
"""
try:
# make sure that `numbers` is not empty
assert len(numbers) > 0
# first check that the numbers are all integers
# or integer-like floats (e.g., 1.0, 2.0 etc.)
ints_or_int_like_floats = np.all(np.mod(numbers, 1) == 0)
# next check that the successive differences between
# the numbers are all 1, i.e., they are numerically contiguous
contiguous = np.all(np.diff(numbers) == 1)
except AssertionError:
raise ValueError("Input cannot be empty.")
except TypeError:
raise TypeError("Input should only contain numbers.")
# we need both conditions to be true and we want to return
# a regular Python `bool`, not a `numpy.bool_`
return bool(ints_or_int_like_floats and contiguous)
def get_acceptable_classification_metrics(label_array: np.ndarray) -> Set[str]:
"""
Return acceptable metrics given the unique set of labels being classified.
Parameters
----------
label_array : numpy.ndarray
A sorted numpy array containing the unique labels
that we are trying to predict.
Returns
-------
acceptable_metrics : Set[str]
A set of metric names that are acceptable
for the given classification scenario.
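Examples
--------
A hedged sketch: contiguous integer labels additionally admit correlation
and weighted-kappa metrics (the exact set contents depend on the SKLL version):
>>> import numpy as np
>>> metrics = get_acceptable_classification_metrics(np.array([1, 2, 3]))
>>> "unweighted_kappa" in metrics  # doctest: +SKIP
True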
"""
# this is a classifier so the acceptable objective
# functions definitely include those metrics that
# are specifically for classification and also
# the unweighted kappa metrics
acceptable_metrics = CLASSIFICATION_ONLY_METRICS | UNWEIGHTED_KAPPA_METRICS
# now let us consider which other metrics may also
# be acceptable depending on whether the labels
# are strings or (contiguous) integers/floats
label_type = label_array.dtype.type
# CASE 1: labels are strings, then no other metrics
# are acceptable
if issubclass(label_type, (np.object_, str)):
pass
# CASE 2: labels are integers or floats; the way
# it works in SKLL, it's guaranteed that
# class indices will be sorted in the same order
as the class labels; therefore, ranking metrics
such as the various correlations should work fine.
elif issubclass(label_type, (int, np.int32, np.int64, float, np.float32, np.float64)):
acceptable_metrics.update(CORRELATION_METRICS)
# CASE 3: labels are numerically contiguous integers
# this is a special sub-case of CASE 2 which
# represents ordinal classification. Only in this
# case, weighted kappas -- where the distance
# between the class labels has a special
# meaning -- can be allowed. This is because
# class indices are always contiguous and all
# metrics in SKLL are computed in the index
# space, not the label space. Note that floating
# point numbers that are equivalent to integers
# (e.g., [1.0, 2.0, 3.0]) are also acceptable.
if contiguous_ints_or_floats(label_array):
acceptable_metrics.update(WEIGHTED_KAPPA_METRICS)
# if there are any user-defined custom metrics registered, include them too
user_defined_metrics = set(_CUSTOM_METRICS) - set(_PREDEFINED_CUSTOM_METRICS)
if len(user_defined_metrics) > 0:
acceptable_metrics.update(user_defined_metrics)
return acceptable_metrics
def get_acceptable_regression_metrics() -> Set[str]:
"""Return the set of metrics that are acceptable for regression."""
# it's fairly straightforward for regression since
# we do not have to check the labels
acceptable_metrics = (
REGRESSION_ONLY_METRICS
| UNWEIGHTED_KAPPA_METRICS
| WEIGHTED_KAPPA_METRICS
| CORRELATION_METRICS
)
# if there are any user-defined custom metrics registered, include them too
user_defined_metrics = set(_CUSTOM_METRICS) - set(_PREDEFINED_CUSTOM_METRICS)
if len(user_defined_metrics) > 0:
acceptable_metrics.update(user_defined_metrics)
return acceptable_metrics
def load_custom_learner(
custom_learner_path: Optional[PathOrStr], custom_learner_name: str
) -> "skll.learner.Learner":
"""
Import and load the custom learner object from the given path.
Parameters
----------
custom_learner_path : :class:`skll.types.PathOrStr`
The path to a custom learner.
custom_learner_name : str
The name of a custom learner.
Returns
-------
:class:`skll.learner.Learner`
The SKLL learner object loaded from the given path.
Raises
------
ValueError
If the custom learner path does not end in '.py'.
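Examples
--------
A hypothetical sketch; ``custom/mylearner.py`` and ``MyLearner`` are
placeholder names for illustration only:
>>> learner_class = load_custom_learner("custom/mylearner.py", "MyLearner")  # doctest: +SKIP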
"""
if not custom_learner_path:
raise ValueError(
"custom_learner_path was not set and learner " f"{custom_learner_name} was not found."
)
# convert to a Path object
custom_learner_path = Path(custom_learner_path)
if custom_learner_path.suffix != ".py":
raise ValueError("custom_learner_path must end in .py " f"({custom_learner_path})")
custom_learner_module_name = custom_learner_path.stem
sys.path.append(str(custom_learner_path.resolve().parent))
import_module(custom_learner_module_name)
return getattr(sys.modules[custom_learner_module_name], custom_learner_name)
def get_predictions(
learner: Union["skll.learner.Learner", "skll.learner.voting.VotingLearner"], xtest: np.ndarray
) -> Dict[str, Any]:
"""
Get predictions from the given learner (or meta-learner) for given features.
The various types of predictions include:
- "raw" predictions which are self-explanatory for regressors; for
classifiers, these are the indices of the class labels, not the labels
themselves.
- "labels": for classifiers, these are the class labels; for regressors
they are not applicable and represented as ``None``.
- "probabilities": for classifiers, these are the class probabilities; for
non-probabilistic classifiers or regressors, they are not applicable and
represented as ``None``.
Parameters
----------
learner : Union[:class:`skll.learner.Learner`, :class:`skll.learner.voting.VotingLearner`]
The already-trained ``Learner`` or ``VotingLearner`` instance that is
used to generate the predictions.
xtest : numpy.ndarray
Numpy array of features on which the predictions are to be made.
Returns
-------
prediction_dict : Dict[str, Any]
Dictionary containing the three types of predictions as the keys
and either ``None`` or a numpy array as the value.
Raises
------
NotImplementedError
If the scikit-learn model does not implement ``predict_proba()`` to
get the class probabilities.
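Examples
--------
A hedged sketch, assuming an already-trained SKLL ``Learner`` named ``clf``
and a test feature matrix ``X_test``:
>>> preds = get_predictions(clf, X_test)  # doctest: +SKIP
>>> sorted(preds)  # doctest: +SKIP
['labels', 'probabilities', 'raw']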
"""
# deferred import to avoid circular dependencies
from skll.learner.voting import VotingLearner
# initialize the prediction dictionary
prediction_dict: Dict[str, Any] = {"raw": None, "labels": None, "probabilities": None}
# first get the raw predictions from the underlying scikit-learn model
# this works for both classifiers and regressors
yhat = learner.model.predict(xtest)
prediction_dict["raw"] = yhat
# next, if it's a classifier ...
if learner.model_type._estimator_type == "classifier":
# get the list of labels from the learner (or meta-learner)
if isinstance(learner, VotingLearner):
label_list = learner.learners[0].label_list
else:
label_list = learner.label_list
# get the predicted class labels
class_labels = np.array([label_list[int(pred)] for pred in yhat])
prediction_dict["labels"] = class_labels
# then get the class probabilities too if the learner
# (or meta-learner) is probabilistic
if (hasattr(learner, "probability") and learner.probability) or (
hasattr(learner, "voting") and learner.voting == "soft"
):
try:
yhat_probs = learner.model.predict_proba(xtest)
except NotImplementedError as e:
learner.logger.error(
f"Model type: {learner.model_type.__name__}\n" f"Model: {learner.model}\n"
)
raise e
else:
prediction_dict["probabilities"] = yhat_probs
return prediction_dict
def rescaled(cls):
"""
Create regressors that rescale their predictions.
This decorator creates regressors that store a min and a max for the training
data and make sure that predictions fall within that range. They also store
the means and SDs of the gold standard and the predictions on the training
set to rescale the predictions (e.g., as in e-rater).
Parameters
----------
cls : BaseEstimator
An estimator class to add rescaling to.
Returns
-------
cls : BaseEstimator
Modified version of estimator class with rescaled functions added.
Raises
------
ValueError
If the estimator cannot be rescaled (i.e., it is not a regressor).
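Examples
--------
A minimal sketch applying the decorator to a scikit-learn regressor:
>>> from sklearn.linear_model import LinearRegression
>>> RescaledLinearRegression = rescaled(LinearRegression)  # doctest: +SKIP
>>> model = RescaledLinearRegression(constrain=True, rescale=True)  # doctest: +SKIP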
"""
# If this class has already been run through the decorator, return it
if hasattr(cls, "rescale"):
return cls
# Save original versions of functions to use later.
orig_init = cls.__init__
orig_fit = cls.fit
orig_predict = cls.predict
if cls._estimator_type == "classifier":
raise ValueError("Classifiers cannot be rescaled. Only regressors " "can.")
# Define all new versions of functions
@wraps(cls.fit)
def fit(self, X: np.ndarray, y=None): # noqa: D417
"""
Fit a model.
Also store the mean, SD, max and min of the training set
and the mean and SD of the predictions on the training set.
Parameters
----------
X : numpy.ndarray, with shape (n_samples, n_features)
The data to fit.
y : numpy.ndarray, of length n_samples
The target values used to fit the model and to compute the rescaling
statistics.
Returns
-------
self
"""
# fit a regular regression model
orig_fit(self, X, y=y)
if self.constrain:
# also record the training data min and max
self.y_min = np.min(y)
self.y_max = np.max(y)
if self.rescale:
# also record the means and SDs for the training set
y_hat = orig_predict(self, X)
self.yhat_mean = np.mean(y_hat)
self.yhat_sd = np.std(y_hat)
self.y_mean = np.mean(y)
self.y_sd = np.std(y)
return self
@wraps(cls.predict)
def predict(self, X: np.ndarray) -> np.ndarray:
"""
Predict with regressor and rescale.
Make predictions with the super class, and then adjust them using the
stored min, max, means, and standard deviations.
Parameters
----------
X : numpy.ndarray, with shape (n_samples, n_features)
The data to predict on.
Returns
-------
numpy.ndarray
The prediction results.
"""
# get the unconstrained predictions
res = orig_predict(self, X)
if self.rescale:
# convert the predictions to z-scores,
# then rescale to match the training set distribution
res = (((res - self.yhat_mean) / self.yhat_sd) * self.y_sd) + self.y_mean
if self.constrain:
# apply min and max constraints
res = np.array([max(self.y_min, min(self.y_max, pred)) for pred in res])
return res
@classmethod
@wraps(cls._get_param_names)
def _get_param_names(class_x):
"""
Get kwargs for superclass and add new kwargs.
This is adapted from scikit-learn's ``BaseEstimator`` class.
It gets the kwargs for the superclass's init method and adds the
kwargs for newly added ``__init__()`` method.
Parameters
----------
class_x
The class whose ``__init__()`` parameter names are to be retrieved.
Returns
-------
List[str]
A list of parameter names for the class's init method.
Raises
------
RuntimeError
If `varargs` exist in the scikit-learn estimator.
"""
# initialize the empty list of parameter names
args = []
try:
# get signature of the original init method
init = getattr(orig_init, "deprecated_original", orig_init)
init_signature = inspect.signature(init)
# get all parameters excluding 'self'
original_parameters = [
p
for p in init_signature.parameters.values()
if p.name != "self" and p.kind != p.VAR_KEYWORD
]
# there should be no varargs
for parameter in original_parameters:
if parameter.kind == parameter.VAR_POSITIONAL:
raise RuntimeError(
"scikit-learn estimators should always specify their "
"parameters in the signature of their __init__ (no "
f"varargs). {cls} with constructor {init_signature} "
"doesn't follow this convention."
)
else:
args.append(parameter.name)
except TypeError:
pass
# now get the additional rescaling arguments
rescale_args = list(inspect.signature(class_x.__init__).parameters.keys())
# Remove 'self'
rescale_args.remove("self")
# add the rescaling arguments to the original arguments and sort
args += rescale_args
args.sort()
return args
@wraps(cls.__init__)
def init(self, constrain: bool = True, rescale: bool = True, **kwargs): # noqa: D417
"""
Initialize things in the right order.
Parameters
----------
constrain : bool, default=True
Whether to constrain predictions within min and max values.
rescale : bool, default=True
Whether to rescale prediction values using z-scores.
kwargs : Dict[str, Any]
Keyword arguments for base class.
"""
# pylint: disable=W0201
self.constrain = constrain
self.rescale = rescale
self.y_min = None
self.y_max = None
self.yhat_mean = None
self.yhat_sd = None
self.y_mean = None
self.y_sd = None
orig_init(self, **kwargs)
# Override original functions with new ones
cls.__init__ = init
cls.fit = fit
cls.predict = predict
cls._get_param_names = _get_param_names
cls.rescale = True
# Return modified class
return cls
def setup_cv_fold_iterator(
cv_folds: Union[int, FoldMapping],
examples: FeatureSet,
model_type: str,
stratified: bool = False,
logger: Optional[logging.Logger] = None,
) -> Tuple[Union[FilteredLeaveOneGroupOut, KFold, StratifiedKFold], Optional[List[str]]]:
"""
Set up a cross-validation fold iterator for the given ``FeatureSet``.
Parameters
----------
cv_folds : Union[int, :class:`skll.types.FoldMapping`]
The number of folds to use for cross-validation, or
a mapping from example IDs to folds.
examples : :class:`skll.data.featureset.FeatureSet`
The ``FeatureSet`` instance for which the CV iterator is to be computed.
model_type : str
One of "classifier" or "regressor".
stratified : bool, default=False
Should the cross-validation iterator be set up in a stratified fashion?
logger : Optional[logging.Logger], default=None
A logger instance to use for logging messages and warnings.
If ``None``, a new one is created.
Returns
-------
Union[FilteredLeaveOneGroupOut, KFold, StratifiedKFold]
The k-fold cross-validation iterator.
Optional[List[str]]
The list of cross-validation groups, if any.
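Examples
--------
A hedged sketch, assuming ``fs`` is an existing :class:`skll.data.FeatureSet`:
>>> kfold, cv_groups = setup_cv_fold_iterator(5, fs, "classifier")  # doctest: +SKIP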
"""
# explicitly declare the return types
kfold: Union[FilteredLeaveOneGroupOut, KFold, StratifiedKFold]
# Set up the cross-validation iterator.
if isinstance(cv_folds, int):
cv_folds = compute_num_folds_from_example_counts(
cv_folds, examples.labels, model_type, logger=logger
)
stratified = stratified and model_type == "classifier"
if stratified:
kfold = StratifiedKFold(n_splits=cv_folds)
cv_groups = None
else:
kfold = KFold(n_splits=cv_folds)
cv_groups = None
# Otherwise cv_folds is a dict
else:
# if we have a mapping from IDs to folds, use it for the overall
# cross-validation as well as the grid search within each
# training fold. Note that this means that the grid search
# will use K-1 folds because the Kth will be the test fold for
# the outer cross-validation.
dummy_label = next(iter(cv_folds.values()))
fold_groups = [cv_folds.get(curr_id, dummy_label) for curr_id in examples.ids]
# Only retain IDs within folds if they're in cv_folds
kfold = FilteredLeaveOneGroupOut(cv_folds, examples.ids, logger=logger)
cv_groups = fold_groups
return kfold, cv_groups
def setup_cv_split_iterator(
cv_folds: Union[int, FoldMapping], examples: FeatureSet
) -> Tuple[FeaturesetIterator, int]:
"""
Set up a cross-validation split iterator over the given ``FeatureSet``.
Parameters
----------
cv_folds : Union[int, :class:`skll.types.FoldMapping`]
The number of folds to use for cross-validation, or
a mapping from example IDs to folds.
examples : :class:`skll.data.featureset.FeatureSet`
The given featureset which is to be split.
Returns
-------
:class:`skll.types.FeaturesetIterator`
Iterator over the train/test featuresets
int
The maximum number of training samples available.
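Examples
--------
A hedged sketch, assuming ``fs`` is an existing :class:`skll.data.FeatureSet`:
>>> fs_iter, n_max_train = setup_cv_split_iterator(10, fs)  # doctest: +SKIP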
"""
# seed the random number generator for replicability
random_state = np.random.RandomState(123456789)
# set up the cross-validation split iterator with 20% of
# the data always reserved for testing
cv = ShuffleSplit(n_splits=cv_folds, test_size=0.2, random_state=random_state)
cv_iter = list(cv.split(examples.features, examples.labels, None))
n_max_training_samples = len(cv_iter[0][0])
# create an iterator over train/test featuresets based on the
# cross-validation index iterator
featureset_iter = (FeatureSet.split(examples, train, test) for train, test in cv_iter)
return featureset_iter, n_max_training_samples
def train_and_score(
learner: "skll.learner.Learner",
train_examples: FeatureSet,
test_examples: FeatureSet,
metric: str,
) -> Tuple[float, float, float]:
"""
Train learner, generate predictions, and evaluate predictions.
A utility method to train a given learner instance on the given
training examples, generate predictions on the training set itself
and also the given test set, and score those predictions using the
given metric. It returns the train score, the test score, and the time
taken to fit the model.
If the learner has its ``probability`` attribute set to ``True``, it will
produce probability values as predictions rather than class indices.
In this case, this function will compute the argmax over the probability
values to find the most likely class index and use that.
Note that this method needs to be a top-level function since it is
called from within ``joblib.Parallel()`` and, therefore, needs to be
picklable which it would not be as an instancemethod of the ``Learner``
class.
Parameters
----------
learner : :class:`skll.learner.Learner`
A SKLL ``Learner`` instance.
train_examples : :class:`skll.data.featureset.FeatureSet`
The training examples.
test_examples : :class:`skll.data.featureset.FeatureSet`
The test examples.
metric : str
The scoring function passed to ``use_score_func()``.
Returns
-------
float
Output of the score function applied to predictions of
``learner`` on ``train_examples``.
float
Output of the score function applied to predictions of
``learner`` on ``test_examples``.
float
The time taken in seconds to fit the ``learner`` on
``train_examples``.
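Examples
--------
A hedged sketch, assuming a SKLL ``Learner`` and two ``FeatureSet`` instances:
>>> train_score, test_score, fit_time = train_and_score(
...     learner, train_fs, test_fs, "accuracy"
... )  # doctest: +SKIP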
"""
# capture the time before we train the model
start_time = time.time()
_ = learner.train(train_examples, grid_search=False, shuffle=False)
# compute the time it took to train the model
fit_time = time.time() - start_time
# get the train and test class probabilities or indices (not labels)
train_predictions = learner.predict(train_examples, class_labels=False)
test_predictions = learner.predict(test_examples, class_labels=False)
# recall that voting learners return a tuple from `predict()`
if isinstance(train_predictions, tuple):
train_predictions = train_predictions[0]
if isinstance(test_predictions, tuple):
test_predictions = test_predictions[0]
# if we got probabilities, then we need to run argmax over them
# to convert them into indices; this needs to handle both
# regular learners as well as voting learners
if (hasattr(learner, "probability") and learner.probability) or (
hasattr(learner, "voting") and learner.voting == "soft"
):
train_predictions = np.argmax(train_predictions, axis=1)
test_predictions = np.argmax(test_predictions, axis=1)
# now get the training and test labels and convert them to indices
# but make sure to include any unseen labels in the test data
if train_examples.labels is not None and test_examples.labels is not None:
if learner.model_type._estimator_type == "classifier":
test_label_list = np.unique(test_examples.labels).tolist()
train_and_test_label_dict = add_unseen_labels(learner.label_dict, test_label_list)
train_labels = np.array(
[train_and_test_label_dict[label] for label in train_examples.labels]
)
test_labels = np.array(
[train_and_test_label_dict[label] for label in test_examples.labels]
)
else:
train_labels = train_examples.labels
test_labels = test_examples.labels
# now compute and return the scores
train_score = use_score_func(metric, train_labels, train_predictions)
test_score = use_score_func(metric, test_labels, test_predictions)
return train_score, test_score, fit_time
def write_predictions(
example_ids: np.ndarray,
predictions_to_write: np.ndarray,
file_prefix: str,
model_type: str,
label_list: List[LabelType],
append: bool = False,
):
"""
Write example IDs and predictions to a tab-separated file with given prefix.
Parameters
----------
example_ids : numpy.ndarray
The IDs of the examples for which the predictions have been generated.
predictions_to_write : numpy.ndarray
The predictions to write out to the file.
file_prefix : str
The prefix for the output file. The output file will be named
"<file_prefix>_predictions.tsv".
model_type : str
One of "classifier" or "regressor".
label_list : List[:class:`skll.types.LabelType`]
List of class labels; required when the predictions being written are
class probabilities (i.e., a two-dimensional array).
append : bool, default=False
Should we append the current predictions to the file if it exists?
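Examples
--------
A minimal sketch that would write regressor predictions to
``out_predictions.tsv`` (skipped here since it writes a file):
>>> import numpy as np
>>> write_predictions(
...     np.array(["ex1", "ex2"]), np.array([1.5, 2.5]), "out", "regressor", []
... )  # doctest: +SKIP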
"""
# create a new file starting with the given prefix
prediction_file = f"{file_prefix}_predictions.tsv"
with open(prediction_file, mode="w" if not append else "a", newline="") as predictionfh:
# create a DictWriter with the appropriate field names
if predictions_to_write.ndim > 1 and label_list:
fieldnames = ["id"] + [label for label in label_list]
else:
fieldnames = ["id", "prediction"]
writer = DictWriter(predictionfh, fieldnames=fieldnames, dialect=excel_tab)
# write out the header unless we are appending
if not append:
writer.writeheader()
# explicitly declare some types
row: Dict[LabelType, Any]
for example_id, pred in zip(example_ids, predictions_to_write):
# for regressors, we just write out the prediction as-is
if model_type == "regressor":
row = {"id": example_id, "prediction": pred}
# if we have an array as a prediction, it must be
# a list of probabilities and if not, then it's
# either a class label or an index
else:
if isinstance(pred, np.ndarray):
row = {"id": example_id}
row.update(dict(zip(label_list, pred))) # type: ignore
else:
row = {"id": example_id, "prediction": pred}
# write out the row
writer.writerow(row)
def _save_learner_to_disk(
learner: Union["skll.learner.Learner", "skll.learner.voting.VotingLearner"], filepath: PathOrStr
) -> None:
"""
Save the given SKLL learner instance to disk.
NOTE: This function should only be used by the ``save()`` methods
for the various learner classes in SKLL.
Parameters
----------
learner : Union[:class:`skll.learner.Learner`, :class:`skll.learner.voting.VotingLearner`]
A ``Learner`` or ``VotingLearner`` instance to save to disk.
filepath : :class:`skll.types.PathOrStr`
The path to save the learner instance to.
"""
# create the directory if it doesn't exist
learner_dir = Path(filepath).parent
if not learner_dir.exists():
learner_dir.mkdir(parents=True)
# write out the learner to disk
joblib.dump((VERSION, learner), filepath)
def _load_learner_from_disk(
learner_type: Union[Type["skll.learner.Learner"], Type["skll.learner.voting.VotingLearner"]],
filepath: PathOrStr,
logger: logging.Logger,
) -> Union["skll.learner.Learner", "skll.learner.voting.VotingLearner"]:
"""
Load a saved instance of the given type from disk.
NOTE: This function should only be used by the ``from_file()``
methods for the various learner classes in SKLL.
Parameters
----------
learner_type : Union[Type[:class:`skll.learner.Learner`], Type[:class:`skll.learner.voting.VotingLearner`]]
The type of learner instance to load from disk.
filepath : :class:`skll.types.PathOrStr`
The path to a saved ``Learner`` or ``VotingLearner`` file.
logger : logging.Logger
A logging object.
Returns
-------
learner : Union[:class:`skll.learner.Learner`, :class:`skll.learner.voting.VotingLearner`]
The ``Learner`` or ``VotingLearner`` instance loaded from the file.
Raises
------
ValueError
If the pickled version of the ``Learner`` instance is out of date.
"""
skll_version, learner = joblib.load(filepath)
# Check that we've actually loaded an instance of the requested type
if not isinstance(learner, learner_type):
raise ValueError(
f"'{filepath}' does not contain an object " f"of type '{learner_type.__name__}'."
)
# check that versions are compatible
elif skll_version < (2, 5, 0):
model_version_str = ".".join(map(str, skll_version))
current_version_str = ".".join(map(str, VERSION))
raise ValueError(
f"The learner stored in '{filepath}' was "
f"created with v{model_version_str} of SKLL, "
"which is incompatible with the current "
f"v{current_version_str}."
)
else:
if not hasattr(learner, "sampler"):
learner.sampler = None
# For backward compatibility, convert string model types to actual classes
if isinstance(learner._model_type, str):
learner._model_type = globals()[learner._model_type]
# set the learner logger attribute to the logger that's passed in
learner.logger = logger
# if the loaded learner is a `VotingLearner` then we need to attach
# the same logger to the underlying learners as well
if learner_type.__name__ == "VotingLearner":
for underlying_learner in learner._learners:
underlying_learner.logger = logger
return learner