使用 Apache Spark 进行搜索的横向扩展
有了新的类 SparkTrials
,你可以告诉 Hyperopt 将调优任务分发到 Apache Spark 集群上。该 API 最初在 Databricks 内部开发,现已贡献给 Hyperopt。
超参数调优和模型选择通常涉及训练数百或数千个模型。SparkTrials
在每个 Spark 执行器上并行运行这些训练任务的批次,从而实现大规模的调优扩展。要在 Hyperopt 中使用 SparkTrials
,只需将 SparkTrials
对象传递给 Hyperopt 的 fmin()
函数即可。
import hyperopt
best_hyperparameters = hyperopt.fmin(
fn = training_function,
space = search_space,
algo = hyperopt.tpe.suggest,
max_evals = 64,
trials = hyperopt.SparkTrials())
在底层,fmin()
会生成新的超参数设置进行测试,并将其传递给 SparkTrials
,后者会在集群上异步运行这些任务,具体如下:
- Hyperopt 的主要逻辑运行在 Spark driver 上,计算新的超参数设置。
- 当 worker 准备好执行新任务时,Hyperopt 会针对该超参数设置启动一个单任务 Spark job。
- 在该任务中(运行在一个 Spark executor 上),将执行用户代码来训练和评估新的 ML 模型。
- 完成后台,Spark 任务会将结果(包括损失值)返回给 driver。
Hyperopt 会使用这些新结果来计算未来任务的更好超参数设置。
由于 SparkTrials
在一个 Spark worker 上拟合和评估每个模型,因此它仅限于调优单机 ML 模型和工作流程,例如 scikit-learn 或单机 TensorFlow。对于分布式 ML 算法,如 Apache Spark MLlib 或 Horovod,你可以使用 Hyperopt 的默认 Trials 类。
SparkTrials API
SparkTrials
可以通过 3 个参数进行配置,所有参数都是可选的:
parallelism
并行评估的最大 trials 数量。更大的并行度允许对更多超参数设置进行横向扩展测试。默认为 Spark SparkContext.defaultParallelism
。
- 权衡:
parallelism
参数可以与fmin()
中的max_evals
参数结合设置。Hyperopt 将测试总共max_evals
个超参数设置,分批次进行,每批次大小为parallelism
。如果parallelism = max_evals
,则 Hyperopt 将执行随机搜索:它将独立选择所有要测试的超参数设置,然后并行评估它们。如果parallelism = 1
,则 Hyperopt 可以充分利用自适应算法,如 Parzen 树估计器 (TPE),该算法迭代地探索超参数空间:每个新的超参数设置都将根据之前的结果选择。将parallelism
设置在1
和max_evals
之间,可以权衡可伸缩性(更快获得结果)和自适应性(有时获得更好的模型)。 - 限制:目前并行度有一个硬性上限 128。
SparkTrials
还会检查集群配置,以查看 Spark 允许多少并发任务;如果并行度超过此最大值,SparkTrials
会将并行度降低到此最大值。
timeout
允许 fmin()
运行的最大时间(秒),默认为 None。超时提供了一种预算机制,可以限制调优所需的时间。当达到超时时,如果可能,运行会被终止,fmin()
会退出,并返回当前结果集。
spark_session
供 SparkTrials
使用的 SparkSession
实例。如果未给出,SparkTrials
将查找现有的 SparkSession
。
还可以通过调用 help(SparkTrials)
查看 SparkTrials
API。
使用 SparkTrials 的工作流程示例
下面,我们给出了一个使用 SparkTrials 调优 scikit-learn 模型的工作流程示例。此示例改编自 scikit-learn 文档示例,用于 MNIST 稀疏逻辑回归。
from sklearn.datasets import fetch_openml
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
from hyperopt import fmin, hp, tpe
from hyperopt import SparkTrials, STATUS_OK
# Load MNIST data, and preprocess it by standarizing features.
X, y = fetch_openml('mnist_784', version=1, return_X_y=True)
random_state = check_random_state(0)
permutation = random_state.permutation(X.shape[0])
X = X[permutation]
y = y[permutation]
X = X.reshape((X.shape[0], -1))
X_train, X_test, y_train, y_test = train_test_split(
X, y, train_size=5000, test_size=10000)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# First, set up the scikit-learn workflow, wrapped within a function.
def train(params):
"""
This is our main training function which we pass to Hyperopt.
It takes in hyperparameter settings, fits a model based on those settings,
evaluates the model, and returns the loss.
:param params: map specifying the hyperparameter settings to test
:return: loss for the fitted model
"""
# We will tune 2 hyperparameters:
# regularization and the penalty type (L1 vs L2).
regParam = float(params['regParam'])
penalty = params['penalty']
# Turn up tolerance for faster convergence
clf = LogisticRegression(C=1.0 / regParam,
multi_class='multinomial',
penalty=penalty, solver='saga', tol=0.1)
clf.fit(X_train, y_train)
score = clf.score(X_test, y_test)
return {'loss': -score, 'status': STATUS_OK}
# Next, define a search space for Hyperopt.
search_space = {
'penalty': hp.choice('penalty', ['l1', 'l2']),
'regParam': hp.loguniform('regParam', -10.0, 0),
}
# Select a search algorithm for Hyperopt to use.
algo=tpe.suggest # Tree of Parzen Estimators, a Bayesian method
# We can run Hyperopt locally (only on the driver machine)
# by calling `fmin` without an explicit `trials` argument.
best_hyperparameters = fmin(
fn=train,
space=search_space,
algo=algo,
max_evals=32)
best_hyperparameters
# We can distribute tuning across our Spark cluster
# by calling `fmin` with a `SparkTrials` instance.
spark_trials = SparkTrials()
best_hyperparameters = fmin(
fn=train,
space=search_space,
algo=algo,
trials=spark_trials,
max_evals=32)
best_hyperparameters