使用 Apache Spark 进行搜索的横向扩展

有了新的类 SparkTrials,你可以告诉 Hyperopt 将调优任务分发到 Apache Spark 集群上。该 API 最初在 Databricks 内部开发,现已贡献给 Hyperopt。

超参数调优和模型选择通常涉及训练数百或数千个模型。SparkTrials 在每个 Spark 执行器上并行运行这些训练任务的批次,从而实现大规模的调优扩展。要在 Hyperopt 中使用 SparkTrials,只需将 SparkTrials 对象传递给 Hyperopt 的 fmin() 函数即可。

import hyperopt

best_hyperparameters = hyperopt.fmin(
  fn = training_function,
  space = search_space,
  algo = hyperopt.tpe.suggest,
  max_evals = 64,
  trials = hyperopt.SparkTrials())

在底层,fmin() 会生成新的超参数设置进行测试,并将其传递给 SparkTrials,后者会在集群上异步运行这些任务,具体如下:

  • Hyperopt 的主要逻辑运行在 Spark driver 上,计算新的超参数设置。
  • 当 worker 准备好执行新任务时,Hyperopt 会针对该超参数设置启动一个单任务 Spark job。
  • 在该任务中(运行在一个 Spark executor 上),将执行用户代码来训练和评估新的 ML 模型。
  • 完成后台,Spark 任务会将结果(包括损失值)返回给 driver。

Hyperopt 会使用这些新结果来计算未来任务的更好超参数设置。

由于 SparkTrials 在一个 Spark worker 上拟合和评估每个模型,因此它仅限于调优单机 ML 模型和工作流程,例如 scikit-learn 或单机 TensorFlow。对于分布式 ML 算法,如 Apache Spark MLlib 或 Horovod,你可以使用 Hyperopt 的默认 Trials 类。

SparkTrials API

SparkTrials 可以通过 3 个参数进行配置,所有参数都是可选的:

parallelism 并行评估的最大 trials 数量。更大的并行度允许对更多超参数设置进行横向扩展测试。默认为 Spark SparkContext.defaultParallelism

  • 权衡:parallelism 参数可以与 fmin() 中的 max_evals 参数结合设置。Hyperopt 将测试总共 max_evals 个超参数设置,分批次进行,每批次大小为 parallelism。如果 parallelism = max_evals,则 Hyperopt 将执行随机搜索:它将独立选择所有要测试的超参数设置,然后并行评估它们。如果 parallelism = 1,则 Hyperopt 可以充分利用自适应算法,如 Parzen 树估计器 (TPE),该算法迭代地探索超参数空间:每个新的超参数设置都将根据之前的结果选择。将 parallelism 设置在 1max_evals 之间,可以权衡可伸缩性(更快获得结果)和自适应性(有时获得更好的模型)。
  • 限制:目前并行度有一个硬性上限 128。SparkTrials 还会检查集群配置,以查看 Spark 允许多少并发任务;如果并行度超过此最大值,SparkTrials 会将并行度降低到此最大值。

timeout 允许 fmin() 运行的最大时间(秒),默认为 None。超时提供了一种预算机制,可以限制调优所需的时间。当达到超时时,如果可能,运行会被终止,fmin() 会退出,并返回当前结果集。

spark_sessionSparkTrials 使用的 SparkSession 实例。如果未给出,SparkTrials 将查找现有的 SparkSession

还可以通过调用 help(SparkTrials) 查看 SparkTrials API。

使用 SparkTrials 的工作流程示例

下面,我们给出了一个使用 SparkTrials 调优 scikit-learn 模型的工作流程示例。此示例改编自 scikit-learn 文档示例,用于 MNIST 稀疏逻辑回归。

from sklearn.datasets import fetch_openml
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state

from hyperopt import fmin, hp, tpe
from hyperopt import SparkTrials, STATUS_OK

# Load MNIST data, and preprocess it by standarizing features.
X, y = fetch_openml('mnist_784', version=1, return_X_y=True)

random_state = check_random_state(0)
permutation = random_state.permutation(X.shape[0])
X = X[permutation]
y = y[permutation]
X = X.reshape((X.shape[0], -1))

X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=5000, test_size=10000)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# First, set up the scikit-learn workflow, wrapped within a function.
def train(params):
  """
  This is our main training function which we pass to Hyperopt.
  It takes in hyperparameter settings, fits a model based on those settings,
  evaluates the model, and returns the loss.

  :param params: map specifying the hyperparameter settings to test
  :return: loss for the fitted model
  """
  # We will tune 2 hyperparameters:
  #  regularization and the penalty type (L1 vs L2).
  regParam = float(params['regParam'])
  penalty = params['penalty']

  # Turn up tolerance for faster convergence
  clf = LogisticRegression(C=1.0 / regParam,
                           multi_class='multinomial',
                           penalty=penalty, solver='saga', tol=0.1)
  clf.fit(X_train, y_train)
  score = clf.score(X_test, y_test)

  return {'loss': -score, 'status': STATUS_OK}

# Next, define a search space for Hyperopt.
search_space = {
  'penalty': hp.choice('penalty', ['l1', 'l2']),
  'regParam': hp.loguniform('regParam', -10.0, 0),
}

# Select a search algorithm for Hyperopt to use.
algo=tpe.suggest  # Tree of Parzen Estimators, a Bayesian method

# We can run Hyperopt locally (only on the driver machine)
# by calling `fmin` without an explicit `trials` argument.
best_hyperparameters = fmin(
  fn=train,
  space=search_space,
  algo=algo,
  max_evals=32)
best_hyperparameters

# We can distribute tuning across our Spark cluster
# by calling `fmin` with a `SparkTrials` instance.
spark_trials = SparkTrials()
best_hyperparameters = fmin(
  fn=train,
  space=search_space,
  algo=algo,
  trials=spark_trials,
  max_evals=32)
best_hyperparameters