"""
This module contains all routines for evaluating GDML and sGDML models.
"""
# MIT License
#
# Copyright (c) 2018-2022 Stefan Chmiela, Gregory Fonseca
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import print_function
import sys
import logging
import os
import psutil
import multiprocessing as mp
Pool = mp.get_context('fork').Pool
import timeit
from functools import partial
try:
import torch
except ImportError:
_has_torch = False
else:
_has_torch = True
try:
_torch_mps_is_available = torch.backends.mps.is_available()
except (NameError, AttributeError):  # torch not installed or too old
_torch_mps_is_available = False
_torch_mps_is_available = False  # Note: force-disables MPS (overrides the check above).
try:
_torch_cuda_is_available = torch.cuda.is_available()
except (NameError, AttributeError):  # torch not installed or too old
_torch_cuda_is_available = False
import numpy as np
from . import __version__
from .utils.desc import Desc
def share_array(arr_np):
"""
Return a ctypes array allocated from shared memory, initialized with the
data from a NumPy array of type `float` (C double).
Parameters
----------
arr_np : :obj:`numpy.ndarray`
NumPy array.
Returns
-------
tuple
The shared array of C doubles and the shape of the original NumPy array.
"""
arr = mp.RawArray('d', arr_np.ravel())
return arr, arr_np.shape
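# A minimal usage sketch (illustrative only; the names below are hypothetical):
# the shared buffer is re-wrapped as a NumPy array in the worker processes via
# 'np.frombuffer', exactly as done in '_predict_wkr'.
#
#   arr_np = np.arange(6, dtype=np.float64).reshape(2, 3)
#   buf, shape = share_array(arr_np)
#   arr_view = np.frombuffer(buf).reshape(shape)  # view that shares memory with 'buf'
#   assert np.allclose(arr_view, arr_np)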
def _predict_wkr(
r, r_desc_d_desc, lat_and_inv, glob_id, wkr_start_stop=None, chunk_size=None
):
"""
Compute (part of) a prediction.
Every prediction is a linear combination involving the training points used for
this model. This function evaluates that combination for the range specified by
`wkr_start_stop`. This workload can optionally be processed in chunks,
which can be faster as it requires less memory to be allocated.
Note
----
It is sufficient to provide either the parameter `r` or `r_desc_d_desc`.
The other one can be set to `None`.
Parameters
----------
r : :obj:`numpy.ndarray`
An array of size 3N containing the Cartesian
coordinates of each atom in the molecule.
r_desc_d_desc : tuple of :obj:`numpy.ndarray`
A tuple made up of:
(1) An array of size D containing the descriptors
of dimension D for the molecule.
(2) An array of size D x 3N containing the
descriptor Jacobian for the molecule. It has dimension
D with 3N partial derivatives with respect to the 3N
Cartesian coordinates of each atom.
lat_and_inv : tuple of :obj:`numpy.ndarray`
Tuple containing the 3 x 3 matrix of lattice vectors (as columns)
and its inverse.
glob_id : int
Identifier of the global namespace that this
function is supposed to be using (zero if only one
`GDMLPredict` instance exists at the same time).
wkr_start_stop : tuple of int, optional
Range defined by the indices of first and last (exclusive)
sum element. The full prediction is generated if this parameter
is not specified.
chunk_size : int, optional
Chunk size. The whole linear combination is evaluated in a large
vector operation instead of looping over smaller chunks if this
parameter is left unspecified.
Returns
-------
:obj:`numpy.ndarray`
Partial prediction: the energy (stored as the first element of
the array) followed by all force components.
"""
global globs
glob = globs[glob_id]
sig, n_perms = glob['sig'], glob['n_perms']
desc_func = glob['desc_func']
R_desc_perms = np.frombuffer(glob['R_desc_perms']).reshape(
glob['R_desc_perms_shape']
)
R_d_desc_alpha_perms = np.frombuffer(glob['R_d_desc_alpha_perms']).reshape(
glob['R_d_desc_alpha_perms_shape']
)
if 'alphas_E_lin' in glob:
alphas_E_lin = np.frombuffer(glob['alphas_E_lin']).reshape(
glob['alphas_E_lin_shape']
)
r_desc, r_d_desc = r_desc_d_desc or desc_func.from_R(
r, lat_and_inv, max_processes=1
) # no additional forking during parallelization
n_train = int(R_desc_perms.shape[0] / n_perms)
wkr_start, wkr_stop = (0, n_train) if wkr_start_stop is None else wkr_start_stop
if chunk_size is None:
chunk_size = n_train
dim_d = desc_func.dim
dim_i = desc_func.dim_i
dim_c = chunk_size * n_perms
# Pre-allocate memory.
diff_ab_perms = np.empty((dim_c, dim_d))
a_x2 = np.empty((dim_c,))
mat52_base = np.empty((dim_c,))
# avoid divisions (slower)
sig_inv = 1.0 / sig
mat52_base_fact = 5.0 / (3 * sig**3)
diag_scale_fact = 5.0 / sig
sqrt5 = np.sqrt(5.0)
E_F = np.zeros((dim_d + 1,))
F = E_F[1:]
wkr_start *= n_perms
wkr_stop *= n_perms
b_start = wkr_start
for b_stop in list(range(wkr_start + dim_c, wkr_stop, dim_c)) + [wkr_stop]:
rj_desc_perms = R_desc_perms[b_start:b_stop, :]
rj_d_desc_alpha_perms = R_d_desc_alpha_perms[b_start:b_stop, :]
# Resize pre-allocated memory for last iteration, if chunk_size is not a divisor of the training set size.
# Note: It's faster to process equally sized chunks.
c_size = b_stop - b_start
if c_size < dim_c:
diff_ab_perms = diff_ab_perms[:c_size, :]
a_x2 = a_x2[:c_size]
mat52_base = mat52_base[:c_size]
np.subtract(
np.broadcast_to(r_desc, rj_desc_perms.shape),
rj_desc_perms,
out=diff_ab_perms,
)
norm_ab_perms = sqrt5 * np.linalg.norm(diff_ab_perms, axis=1)
np.exp(-norm_ab_perms * sig_inv, out=mat52_base)
mat52_base *= mat52_base_fact
np.einsum(
'ji,ji->j', diff_ab_perms, rj_d_desc_alpha_perms, out=a_x2
) # column-wise dot product
F += (a_x2 * mat52_base).dot(diff_ab_perms) * diag_scale_fact
mat52_base *= norm_ab_perms + sig
F -= mat52_base.dot(rj_d_desc_alpha_perms)
# Note: Energies are automatically predicted with a flipped sign here (because -E are trained, instead of E)
E_F[0] += a_x2.dot(mat52_base)
# Note: Energies are automatically predicted with a flipped sign here (because -E are trained, instead of E)
if 'alphas_E_lin' in glob:
K_fe = diff_ab_perms * mat52_base[:, None]
F += alphas_E_lin[b_start:b_stop].dot(K_fe)
K_ee = (
1 + (norm_ab_perms * sig_inv) * (1 + norm_ab_perms / (3 * sig))
) * np.exp(-norm_ab_perms * sig_inv)
E_F[0] += K_ee.dot(alphas_E_lin[b_start:b_stop])
b_start = b_stop
out = E_F[: dim_i + 1]
# Descriptor has fewer entries than 3N, so the size of the output array needs to be extended.
if dim_d < dim_i:
out = np.empty((dim_i + 1,))
out[0] = E_F[0]
out[1:] = desc_func.vec_dot_d_desc(
r_d_desc,
F,
) # 'r_d_desc.T.dot(F)' for our special representation of 'r_d_desc'
return out
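# For reference: the constants 'mat52_base_fact', 'diag_scale_fact' and 'sqrt5'
# above implement (derivatives of) the Matern 5/2 kernel used by (s)GDML,
#
#   k(d) = (1 + sqrt(5)*d/sig + 5*d**2/(3*sig**2)) * exp(-sqrt(5)*d/sig),
#
# where 'd' is the Euclidean distance between two descriptors. The energy
# constraint kernel 'K_ee' inside the loop is exactly this expression, with
# 'norm_ab_perms' standing in for sqrt(5)*d.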
class GDMLPredict(object):
def __init__(
self,
model,
batch_size=None,
num_workers=None,
max_memory=None,
max_processes=None,
use_torch=False,
log_level=None,
):
"""
Query trained sGDML force fields.
This class is used to load a trained model and make energy and
force predictions for new geometries. GPU support is provided
through PyTorch (requires optional `torch` dependency to be
installed).
Note
----
The parameters `batch_size` and `num_workers` are only
relevant if this code runs on a CPU. Both can be set
automatically via the function `prepare_parallel`.
Note: Running calculations via PyTorch is only
recommended if GPU hardware is available. CPU calculations
are faster with our NumPy implementation.
Parameters
----------
model : :obj:`dict`
Data structure that holds all parameters of the
trained model. This object is the output of
`GDMLTrain.train`
batch_size : int, optional
Chunk size for processing parallel tasks
num_workers : int, optional
Number of parallel workers (in addition to the main
process)
max_memory : int, optional
Limit the max. memory usage [GB]. This is only a
soft limit that cannot always be enforced.
max_processes : int, optional
Limit the max. number of processes. Otherwise
all CPU cores are used. This parameter has no
effect if `use_torch=True`.
use_torch : boolean, optional
Use PyTorch to calculate predictions
log_level : optional
Set custom logging level (e.g. `logging.CRITICAL`)
"""
global globs
if 'globs' not in globals():
globs = []
# Create a personal global space for this model at a new index
# Note: do not delete entries in this list, since 'self.glob_id' is
# static. Instead, setting them to None conserves positions while still
# freeing up memory.
globs.append({})
self.glob_id = len(globs) - 1
glob = globs[self.glob_id]
self.log = logging.getLogger(__name__)
if log_level is not None:
self.log.setLevel(log_level)
total_memory = psutil.virtual_memory().total // 2**30 # bytes to GB
self.max_memory = (
min(max_memory, total_memory) if max_memory is not None else total_memory
)
total_cpus = mp.cpu_count()
self.max_processes = (
min(max_processes, total_cpus) if max_processes is not None else total_cpus
)
if 'type' not in model or not (model['type'] == 'm' or model['type'] == b'm'):
self.log.critical('The provided data structure is not a valid model.')
sys.exit()
self.n_atoms = model['z'].shape[0]
self.desc = Desc(self.n_atoms, max_processes=max_processes)
glob['desc_func'] = self.desc
# Cache for iterative training mode.
self.R_desc = None
self.R_d_desc = None
self.lat_and_inv = (
(model['lattice'], np.linalg.inv(model['lattice']))
if 'lattice' in model
else None
)
self.n_train = model['R_desc'].shape[1]
glob['sig'] = model['sig']
self.std = model['std'] if 'std' in model else 1.0
self.c = model['c']
n_perms = model['perms'].shape[0]
glob['n_perms'] = n_perms
self.tril_perms_lin = model['tril_perms_lin']
self.torch_predict = None
self.use_torch = use_torch
if use_torch:
if not _has_torch:
raise ImportError(
'Optional PyTorch dependency not found! Please run \'pip install sgdml[torch]\' to install it or disable the PyTorch option.'
)
from .torchtools import GDMLTorchPredict
self.torch_predict = GDMLTorchPredict(
model,
self.lat_and_inv,
max_memory=max_memory,
max_processes=max_processes,
log_level=self.log.level,
)
# Enable data parallelism
n_gpu = torch.cuda.device_count()
if n_gpu > 1:
self.torch_predict = torch.nn.DataParallel(self.torch_predict)
# Send model to device
# self.torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'
if _torch_cuda_is_available:
self.torch_device = 'cuda'
elif _torch_mps_is_available:
self.torch_device = 'mps'
else:
self.torch_device = 'cpu'
while True:
try:
self.torch_predict.to(self.torch_device)
except RuntimeError as e:
if 'out of memory' in str(e):
if _torch_cuda_is_available:
torch.cuda.empty_cache()
model = self.torch_predict
if isinstance(self.torch_predict, torch.nn.DataParallel):
model = model.module
if (
model.get_n_perm_batches() == 1
): # model caches the permutations, this could be why it is too large
model.set_n_perm_batches(
model.get_n_perm_batches() + 1
) # uncache
# self.torch_predict.to( # NOTE!
# self.torch_device
# ) # try sending to device again
pass
else:
self.log.critical(
'Not enough memory on device (RAM or GPU memory). There is no hope!'
)
print()
os._exit(1)
else:
raise e
else:
break
else:
# Precompute permuted training descriptors and their first derivatives multiplied with the regression coefficients.
R_desc_perms = (
np.tile(model['R_desc'].T, n_perms)[:, self.tril_perms_lin]
.reshape(self.n_train, n_perms, -1, order='F')
.reshape(self.n_train * n_perms, -1)
)
glob['R_desc_perms'], glob['R_desc_perms_shape'] = share_array(R_desc_perms)
R_d_desc_alpha_perms = (
np.tile(model['R_d_desc_alpha'], n_perms)[:, self.tril_perms_lin]
.reshape(self.n_train, n_perms, -1, order='F')
.reshape(self.n_train * n_perms, -1)
)
(
glob['R_d_desc_alpha_perms'],
glob['R_d_desc_alpha_perms_shape'],
) = share_array(R_d_desc_alpha_perms)
if 'alphas_E' in model:
alphas_E_lin = np.tile(model['alphas_E'][:, None], (1, n_perms)).ravel()
glob['alphas_E_lin'], glob['alphas_E_lin_shape'] = share_array(
alphas_E_lin
)
# Parallel processing configuration
self.bulk_mp = False # Bulk predictions with multiple processes?
self.pool = None
# How many workers in addition to main process?
num_workers = num_workers or (
self.max_processes - 1
) # exclude main process
self._set_num_workers(num_workers, force_reset=True)
# Size of chunks in which each parallel task will be processed (unit: number of training samples)
# This parameter should be as large as possible, but it depends on the size of available memory.
self._set_chunk_size(batch_size)
def __del__(self):
global globs
try:
self.pool.terminate()
self.pool.join()
self.pool = None
except:
pass
if 'globs' in globals() and globs is not None and self.glob_id < len(globs):
globs[self.glob_id] = None
## Public ##
# def set_R(self, R):
# """
# Store a reference to the training geometries.
# This function is used to avoid unnecessary copies of the
# training geometries when evaluating the training error
# (= gradient of the model's loss function).
# This routine is used during iterative model training.
# Parameters
# ----------
# R : :obj:`numpy.ndarray`
# Array containing the geometry for each training point.
# """
# # Add singleton dimension if input is (,3N).
# if R.ndim == 1:
# R = R[None, :]
# self.R = R
# # if self.use_torch:
# # model = self.torch_predict
# # if isinstance(self.torch_predict, torch.nn.DataParallel):
# # model = model.module
# # R_torch = torch.from_numpy(R.reshape(-1, self.n_atoms, 3)).to(self.torch_device)
# # model.set_R(R_torch)
def set_R_desc(self, R_desc):
"""
Store a reference to the training geometry descriptors.
This can accelerate iterative model training.
Parameters
----------
R_desc : :obj:`numpy.ndarray`, optional
A 2D array of size M x D containing the
descriptors of dimension D for M
molecules.
"""
self.R_desc = R_desc
def set_R_d_desc(self, R_d_desc):
"""
Store a reference to the training geometry descriptor Jacobians.
This function must be called before `set_alphas()` can be used.
This routine is used during iterative model training.
Parameters
----------
R_d_desc : :obj:`numpy.ndarray`, optional
An array of size M x D x 3N containing the
descriptor Jacobians for M molecules. The descriptor
has dimension D with 3N partial derivatives with
respect to the 3N Cartesian coordinates of each atom.
"""
self.R_d_desc = R_d_desc
if self.use_torch:
model = self.torch_predict
if isinstance(self.torch_predict, torch.nn.DataParallel):
model = model.module
model.set_R_d_desc(R_d_desc)
def set_alphas(self, alphas_F, alphas_E=None):
"""
Reconfigure the current model with a new set of regression parameters.
`R_d_desc` needs to be set for this function to work.
This routine is used during iterative model training.
Parameters
----------
alphas_F : :obj:`numpy.ndarray`
1D array containing the new model parameters.
alphas_E : :obj:`numpy.ndarray`, optional
1D array containing the additional new model parameters, if
energy constraints are used in the kernel (`use_E_cstr=True`)
"""
if self.use_torch:
model = self.torch_predict
if isinstance(self.torch_predict, torch.nn.DataParallel):
model = model.module
model.set_alphas(alphas_F, alphas_E=alphas_E)
else:
assert self.R_d_desc is not None
global globs
glob = globs[self.glob_id]
dim_i = self.desc.dim_i
R_d_desc_alpha = self.desc.d_desc_dot_vec(
self.R_d_desc, alphas_F.reshape(-1, dim_i)
)
R_d_desc_alpha_perms_new = np.tile(R_d_desc_alpha, glob['n_perms'])[
:, self.tril_perms_lin
].reshape(self.n_train, glob['n_perms'], -1, order='F')
R_d_desc_alpha_perms = np.frombuffer(glob['R_d_desc_alpha_perms'])
np.copyto(R_d_desc_alpha_perms, R_d_desc_alpha_perms_new.ravel())
if alphas_E is not None:
alphas_E_lin_new = np.tile(
alphas_E[:, None], (1, glob['n_perms'])
).ravel()
alphas_E_lin = np.frombuffer(glob['alphas_E_lin'])
np.copyto(alphas_E_lin, alphas_E_lin_new)
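# Usage sketch for the iterative-training hooks above (illustrative; 'gdml' is
# a GDMLPredict instance and 'R_d_desc', 'new_alphas_F' and 'R_train' are
# placeholders for arrays produced by the training code):
#
#   gdml.set_R_d_desc(R_d_desc)        # required once, before set_alphas()
#   gdml.set_alphas(new_alphas_F)      # swap in updated regression parameters
#   E, F = gdml.predict(R_train)       # e.g. to monitor the training error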
def _set_num_workers(
self, num_workers=None, force_reset=False
): # TODO: complain if chunk or worker parameters do not fit training data (this causes issues with the caching)!!
"""
Set number of processes to use during prediction.
If bulk_mp == True, each worker handles the whole generation of a single prediction (this is for querying multiple geometries at once).
If bulk_mp == False, each worker may handle only a part of a prediction (chunks are defined in 'wkr_starts_stops'). In that scenario, multiple processes
are used to distribute the work of generating a single prediction.
This number should not exceed the number of available CPU cores.
Note
----
This parameter can be optimally determined using
`prepare_parallel`.
Parameters
----------
num_workers : int, optional
Number of processes (maximum value is set if `None`).
force_reset : bool, optional
Force applying the new setting.
"""
if force_reset or self.num_workers != num_workers:
if self.pool is not None:
self.pool.terminate()
self.pool.join()
self.pool = None
self.num_workers = 0
if num_workers is None or num_workers > 0:
self.pool = Pool(num_workers)
self.num_workers = (
self.pool._processes
) # number of actual workers (not max_processes)
# Data ranges for processes
if self.bulk_mp or self.num_workers < 2:
# wkr_starts = [self.n_train]
wkr_starts = [0]
else:
wkr_starts = list(
range(
0,
self.n_train,
int(np.ceil(float(self.n_train) / self.num_workers)),
)
)
wkr_stops = wkr_starts[1:] + [self.n_train]
self.wkr_starts_stops = list(zip(wkr_starts, wkr_stops))
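# Worked example of the resulting work split (illustrative): with n_train = 15,
# num_workers = 4 and bulk mode disabled, 'wkr_starts' becomes [0, 4, 8, 12] and
# 'self.wkr_starts_stops' = [(0, 4), (4, 8), (8, 12), (12, 15)], i.e. each worker
# sums over roughly n_train / num_workers training points of a single prediction.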
def _set_chunk_size(self, chunk_size=None):
# TODO: complain if chunk or worker parameters do not fit training data (this causes issues with the caching)!!
"""
Set chunk size for each worker process.
Every prediction is generated as a linear combination of the training
points that the model is composed of. If multiple workers are available
(and bulk mode is disabled), each one processes an (approximately equal)
part of those training points. The chunk size then determines how much of
a process's workload is passed to NumPy's underlying low-level routines at
once. If the chunk size is smaller than the number of points the worker is
supposed to process, it processes them in multiple steps using a loop. This
can sometimes be faster, depending on the available hardware.
Note
----
This parameter can be optimally determined using
`prepare_parallel`.
Parameters
----------
chunk_size : int
Chunk size (maximum value is set if `None`).
"""
if chunk_size is None:
chunk_size = self.n_train
self.chunk_size = chunk_size
def _set_batch_size(self, batch_size=None): # deprecated
"""
Warning
-------
Deprecated! Please use the function `_set_chunk_size` in future projects.
Set chunk size for each worker process. A chunk is a subset
of the training data points whose linear combination needs to
be evaluated in order to generate a prediction.
The chunk size determines how much of a process's workload will
be passed to NumPy's underlying low-level routines at once.
This parameter is highly hardware dependent.
Note
----
This parameter can be optimally determined using
`prepare_parallel`.
Parameters
----------
batch_size : int
Chunk size (maximum value is set if `None`).
"""
self._set_chunk_size(batch_size)
def _set_bulk_mp(self, bulk_mp=False):
"""
Toggles bulk prediction mode.
If bulk prediction is enabled, the prediction is parallelized across
input geometries, i.e. each worker generates the complete prediction for
one query. Otherwise (depending on the number of available CPU cores) the
input geometries are processed sequentially, but every one of them may be
processed by multiple workers at once (in chunks).
Note
----
This parameter can be optimally determined using
`prepare_parallel`.
Parameters
----------
bulk_mp : bool, optional
Enable or disable bulk prediction mode.
"""
bulk_mp = bool(bulk_mp)
if self.bulk_mp is not bulk_mp:
self.bulk_mp = bulk_mp
# Reset data ranges for processes stored in 'wkr_starts_stops'
self._set_num_workers(self.num_workers)
def set_opt_num_workers_and_batch_size_fast(self, n_bulk=1, n_reps=1): # deprecated
"""
Warning
-------
Deprecated! Please use the function `prepare_parallel` in future projects.
Parameters
----------
n_bulk : int, optional
Number of geometries that will be passed to the
`predict` function in each call (performance
will be optimized for that exact use case).
n_reps : int, optional
Number of repetitions (bigger value: more
accurate, but also slower).
Returns
-------
float
Force and energy prediction speed in geometries
per second.
"""
return self.prepare_parallel(n_bulk, n_reps)
def prepare_parallel(
self, n_bulk=1, n_reps=1, return_is_from_cache=False
): # noqa: C901
"""
Find and set the optimal parallelization parameters for the
currently loaded model, running on a particular system. The result
also depends on the number of geometries `n_bulk` that will be
passed at once when calling the `predict` function.
This function runs a benchmark in which the prediction routine is
repeatedly called `n_reps`-times (default: 1) with varying parameter
configurations, while the runtime is measured for each one. The
optimal parameters are then cached for fast retrieval in future
calls of this function.
We recommend calling this function after initialization of this
class, as it will drastically increase the performance of the
`predict` function.
Note
----
Depending on the parameter `n_reps`, this routine may take
some seconds/minutes to complete. However, once a
statistically significant number of benchmark results has
been gathered for a particular configuration, it starts
returning almost instantly.
Parameters
----------
n_bulk : int, optional
Number of geometries that will be passed to the
`predict` function in each call (performance
will be optimized for that exact use case).
n_reps : int, optional
Number of repetitions (bigger value: more
accurate, but also slower).
return_is_from_cache : bool, optional
If enabled, this function returns a second value
indicating if the returned results were obtained
from cache.
Returns
-------
float
Force and energy prediction speed in geometries
per second.
bool, optional
Whether this function obtained the results
from cache (only returned if `return_is_from_cache=True`).
"""
# global globs
# glob = globs[self.glob_id]
# n_perms = glob['n_perms']
# No benchmarking necessary if prediction is running on GPUs.
if self.use_torch:
self.log.info(
'Skipping multi-CPU benchmark, since torch is enabled.'
) # TODO: clarity!
return
# Retrieve cached benchmark results, if available.
bmark_result = self._load_cached_bmark_result(n_bulk)
if bmark_result is not None:
num_workers, chunk_size, bulk_mp, gps = bmark_result
self._set_chunk_size(chunk_size)
self._set_num_workers(num_workers)
self._set_bulk_mp(bulk_mp)
if return_is_from_cache:
is_from_cache = True
return gps, is_from_cache
else:
return gps
warm_up_done = False
best_results = []
last_i = None
best_gps = 0
gps_min = 0.0
best_params = None
r_dummy = np.random.rand(n_bulk, self.n_atoms * 3)
def _dummy_predict():
self.predict(r_dummy)
bulk_mp_rng = [True, False] if n_bulk > 1 else [False]
for bulk_mp in bulk_mp_rng:
self._set_bulk_mp(bulk_mp)
if bulk_mp is False:
last_i = 0
num_workers_rng = list(range(0, self.max_processes))
if bulk_mp:
num_workers_rng.reverse() # benchmark converges faster this way
# num_workers_rng_sizes = [batch_size for batch_size in batch_size_rng if min_batch_size % batch_size == 0]
# for num_workers in range(min_num_workers,self.max_processes+1):
for num_workers in num_workers_rng:
if not bulk_mp and num_workers != 0 and self.n_train % num_workers != 0:
continue
self._set_num_workers(num_workers)
best_gps = 0
gps_rng = (np.inf, 0.0) # min and max per num_workers
min_chunk_size = (
min(self.n_train, n_bulk)
if bulk_mp or num_workers < 2
else int(np.ceil(self.n_train / num_workers))
)
chunk_size_rng = list(range(min_chunk_size, 0, -1))
chunk_size_rng_sizes = [
chunk_size
for chunk_size in chunk_size_rng
if min_chunk_size % chunk_size == 0
]
# print('batch_size_rng_sizes ' + str(bulk_mp))
# print(batch_size_rng_sizes)
i_done = 0
i_dir = 1
i = 0 if last_i is None else last_i
# i = 0
# print(batch_size_rng_sizes)
while i >= 0 and i < len(chunk_size_rng_sizes):
chunk_size = chunk_size_rng_sizes[i]
self._set_chunk_size(chunk_size)
i_done += 1
if warm_up_done == False:
timeit.timeit(_dummy_predict, number=10)
warm_up_done = True
gps = n_bulk * n_reps / timeit.timeit(_dummy_predict, number=n_reps)
# print(
# '{:2d}@{:d} {:d} | {:7.2f} gps'.format(
# num_workers, chunk_size, bulk_mp, gps
# )
# )
gps_rng = (
min(gps_rng[0], gps),
max(gps_rng[1], gps),
) # min and max per num_workers
# gps_min_max = min(gps_min_max[0], gps), max(gps_min_max[1], gps)
# print(' best_gps ' + str(best_gps))
# NEW
# if gps > best_gps and gps > gps_min: # gps is still going up, everything is good
# best_gps = gps
# best_params = num_workers, batch_size, bulk_mp
# else:
# break
# if gps > best_gps: # gps is still going up, everything is good
# best_gps = gps
# best_params = num_workers, batch_size, bulk_mp
# else: # gps did not go up wrt. to previous step
# # can we switch the search direction?
# # did we already?
# # we checked two consecutive configurations
# # are bigger batch sizes possible?
# print(batch_size_rng_sizes)
# turn_search_dir = i_dir > 0 and i_done == 2 and batch_size != batch_size_rng_sizes[1]
# # only turn, if the current gps is not lower than the lowest overall
# if turn_search_dir and gps >= gps_min:
# i -= 2 * i_dir
# i_dir = -1
# print('><')
# continue
# else:
# print('>>break ' + str(i_done))
# break
# NEW
# gps still going up?
# AND: gps not lower than the lowest overall?
# if gps < best_gps and gps >= gps_min:
if gps < best_gps:
if (
i_dir > 0
and i_done == 2
and chunk_size
!= chunk_size_rng_sizes[
1
] # there is no point in turning if this is the second batch size in the range
): # do we turn?
i -= 2 * i_dir
i_dir = -1
# print('><')
continue
else:
if chunk_size == chunk_size_rng_sizes[1]:
i -= 1 * i_dir
# print('>>break ' + str(i_done))
break
else:
best_gps = gps
best_params = num_workers, chunk_size, bulk_mp
if (
not bulk_mp and n_bulk > 1
): # stop search early when multiple cpus are available and the 1 cpu case is tested
if (
gps < gps_min
): # if the batch size run is lower than the lowest overall, stop right here
# print('breaking here')
break
i += 1 * i_dir
last_i = i - 1 * i_dir
i_dir = 1
if len(best_results) > 0:
overall_best_gps = max(best_results, key=lambda x: x[1])[1]
if best_gps < overall_best_gps:
# print('breaking, because best of last test was worse than overall best so far')
break
# if best_gps < gps_min:
# print('breaking here3')
# break
gps_min = gps_rng[0] # FIX me: is this the overall min?
# print ('gps_min ' + str(gps_min))
# print ('best_gps')
# print (best_gps)
best_results.append(
(best_params, best_gps)
) # best results per num_workers
(num_workers, chunk_size, bulk_mp), gps = max(best_results, key=lambda x: x[1])
# Cache benchmark results.
self._save_cached_bmark_result(n_bulk, num_workers, chunk_size, bulk_mp, gps)
self._set_chunk_size(chunk_size)
self._set_num_workers(num_workers)
self._set_bulk_mp(bulk_mp)
if return_is_from_cache:
is_from_cache = False
return gps, is_from_cache
else:
return gps
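# Note on caching (behavior of the two helpers below): benchmark results are
# stored in '_bmark_cache.npz' next to this module, keyed by
# '{n_atoms}-{n_train}-{n_bulk}-{max_processes}', and are only reused once at
# least three runs have been recorded for that key. A typical warm-up might
# therefore look like this (illustrative):
#
#   gps = gdml.prepare_parallel(n_bulk=100)  # benchmarks on the first few calls
#   gps, cached = gdml.prepare_parallel(n_bulk=100, return_is_from_cache=True)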
def _save_cached_bmark_result(self, n_bulk, num_workers, chunk_size, bulk_mp, gps):
pkg_dir = os.path.dirname(os.path.abspath(__file__))
bmark_file = '_bmark_cache.npz'
bmark_path = os.path.join(pkg_dir, bmark_file)
bkey = '{}-{}-{}-{}'.format(
self.n_atoms, self.n_train, n_bulk, self.max_processes
)
if os.path.exists(bmark_path):
with np.load(bmark_path, allow_pickle=True) as bmark:
bmark = dict(bmark)
bmark['runs'] = np.append(bmark['runs'], bkey)
bmark['num_workers'] = np.append(bmark['num_workers'], num_workers)
bmark['batch_size'] = np.append(bmark['batch_size'], chunk_size)
bmark['bulk_mp'] = np.append(bmark['bulk_mp'], bulk_mp)
bmark['gps'] = np.append(bmark['gps'], gps)
else:
bmark = {
'code_version': __version__,
'runs': [bkey],
'gps': [gps],
'num_workers': [num_workers],
'batch_size': [chunk_size],
'bulk_mp': [bulk_mp],
}
np.savez_compressed(bmark_path, **bmark)
def _load_cached_bmark_result(self, n_bulk):
pkg_dir = os.path.dirname(os.path.abspath(__file__))
bmark_file = '_bmark_cache.npz'
bmark_path = os.path.join(pkg_dir, bmark_file)
bkey = '{}-{}-{}-{}'.format(
self.n_atoms, self.n_train, n_bulk, self.max_processes
)
if not os.path.exists(bmark_path):
return None
with np.load(bmark_path, allow_pickle=True) as bmark:
# Keep collecting benchmark runs, until we have at least three.
run_idxs = np.where(bmark['runs'] == bkey)[0]
if len(run_idxs) >= 3:
config_keys = []
for run_idx in run_idxs:
config_keys.append(
'{}-{}-{}'.format(
bmark['num_workers'][run_idx],
bmark['batch_size'][run_idx],
bmark['bulk_mp'][run_idx],
)
)
values, uinverse = np.unique(config_keys, return_index=True)
best_mean = -1
best_gps = 0
for i, config_key in enumerate(zip(values, uinverse)):
mean_gps = np.mean(
bmark['gps'][
np.where(np.array(config_keys) == config_key[0])[0]
]
)
if best_gps == 0 or best_gps < mean_gps:
best_mean = i
best_gps = mean_gps
best_idx = run_idxs[uinverse[best_mean]]
num_workers = bmark['num_workers'][best_idx]
chunk_size = bmark['batch_size'][best_idx]
bulk_mp = bmark['bulk_mp'][best_idx]
return num_workers, chunk_size, bulk_mp, best_gps
return None
def get_GPU_batch(self):
"""
Get batch size used by the GPU implementation to process bulk
predictions (predictions for multiple input geometries at once).
This value is determined on-the-fly depending on the available GPU
memory.
"""
if self.use_torch:
model = self.torch_predict
if isinstance(model, torch.nn.DataParallel):
model = model.module
return model._batch_size()
def predict(self, R=None, return_E=True):
"""
Predict energy and forces for multiple geometries. This function
can run on the GPU, if the optional PyTorch dependency is
installed and `use_torch=True` was specified during
initialization of this class.
Optionally, the descriptors and descriptor Jacobians for the
same geometries can be provided, if already available from some
previous calculations.
Note
----
The order of the atoms in `R` is not arbitrary and must
be the same as used for training the model.
Parameters
----------
R : :obj:`numpy.ndarray`, optional
A 2D array of size M x 3N containing the
Cartesian coordinates of each atom of M
molecules. If this parameter is omitted, the prediction
is evaluated for the stored training geometries. Note that
the training descriptors and Jacobians need to be set
using `set_R_desc()` and `set_R_d_desc()` for this to work.
return_E : boolean, optional
If false (default: true), only the forces are returned.
Returns
-------
:obj:`numpy.ndarray`
Energies stored in a 1D array of size M (unless `return_E == False`).
:obj:`numpy.ndarray`
Forces stored in a 2D array of size M x 3N.
"""
# Add singleton dimension if input is (,3N).
if R is not None and R.ndim == 1:
R = R[None, :]
if self.use_torch: # multi-GPU (or CPU if no GPUs are available)
R_torch = torch.arange(self.n_train)
if R is None:
if self.R_d_desc is None:
self.log.critical(
'A reference to the training geometry descriptors needs to be set (using \'set_R_d_desc()\') for this function to work without arguments (using PyTorch).'
)
print()
os._exit(1)
else:
R_torch = (
torch.from_numpy(R.reshape(-1, self.n_atoms, 3))
.type(torch.float32)
.to(self.torch_device)
)
model = self.torch_predict
if R_torch.shape[0] < torch.cuda.device_count() and isinstance(
model, torch.nn.DataParallel
):
model = self.torch_predict.module
E_torch_F_torch = model.forward(R_torch, return_E=return_E)
if return_E:
E_torch, F_torch = E_torch_F_torch
E = E_torch.cpu().numpy()
else:
(F_torch,) = E_torch_F_torch
F = F_torch.cpu().numpy().reshape(-1, 3 * self.n_atoms)
else: # multi-CPU
# Use precomputed descriptors in training mode.
is_desc_in_cache = self.R_desc is not None and self.R_d_desc is not None
if R is None and not is_desc_in_cache:
self.log.critical(
'A reference to the training geometry descriptors and Jacobians needs to be set for this function to work without arguments.'
)
print()
os._exit(1)
assert is_desc_in_cache or R is not None
dim_i = 3 * self.n_atoms
n_pred = self.R_desc.shape[0] if R is None else R.shape[0]
E_F = np.empty((n_pred, dim_i + 1))
if (
self.bulk_mp and self.num_workers > 0
): # One whole prediction per worker (and multiple workers).
_predict_wo_r_or_desc = partial(
_predict_wkr,
lat_and_inv=self.lat_and_inv,
glob_id=self.glob_id,
wkr_start_stop=None,
chunk_size=self.chunk_size,
)
for i, e_f in enumerate(
self.pool.imap(
partial(_predict_wo_r_or_desc, None)
if is_desc_in_cache
else partial(_predict_wo_r_or_desc, r_desc_d_desc=None),
zip(self.R_desc, self.R_d_desc) if is_desc_in_cache else R,
)
):
E_F[i, :] = e_f
else: # Multiple workers per prediction (or just one worker).
for i in range(n_pred):
if is_desc_in_cache:
r_desc, r_d_desc = self.R_desc[i], self.R_d_desc[i]
else:
r_desc, r_d_desc = self.desc.from_R(R[i], self.lat_and_inv)
_predict_wo_wkr_starts_stops = partial(
_predict_wkr,
None,
(r_desc, r_d_desc),
self.lat_and_inv,
self.glob_id,
chunk_size=self.chunk_size,
)
if self.num_workers == 0:
E_F[i, :] = _predict_wo_wkr_starts_stops()
else:
E_F[i, :] = sum(
self.pool.imap_unordered(
_predict_wo_wkr_starts_stops, self.wkr_starts_stops
)
)
E_F *= self.std
F = E_F[:, 1:]
E = E_F[:, 0] + self.c
ret = (F,)
if return_E:
ret = (E,) + ret
return ret
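# End-to-end usage sketch (illustrative; 'm.npz' is a hypothetical path to a
# model file produced by GDMLTrain):
#
#   import numpy as np
#   from sgdml.predict import GDMLPredict
#
#   model = dict(np.load('m.npz', allow_pickle=True))
#   gdml = GDMLPredict(model)
#   gdml.prepare_parallel(n_bulk=1)           # optional: tune CPU parallelism
#   r = np.random.rand(1, gdml.n_atoms * 3)   # one geometry, shape (M, 3N)
#   E, F = gdml.predict(r)                    # E: shape (M,), F: shape (M, 3N)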