from __future__ import annotations
import functools
import itertools
import random
from collections import OrderedDict
from collections.abc import Iterable
from copy import deepcopy
import numpy as np
import scipy.spatial
from scipy import interpolate
from sortedcontainers import SortedKeyList
from adaptive.learner.base_learner import BaseLearner, uses_nth_neighbors
from adaptive.learner.triangulation import (
Triangulation,
circumsphere,
fast_det,
point_in_simplex,
simplex_volume_in_embedding,
)
from adaptive.notebook_integration import ensure_holoviews, ensure_plotly
from adaptive.utils import (
assign_defaults,
cache_latest,
partial_function_from_dataframe,
restore,
)
try:
import pandas
with_pandas = True
except ModuleNotFoundError:
with_pandas = False
def to_list(inp):
if isinstance(inp, Iterable):
return list(inp)
return [inp]
def volume(simplex, ys=None):
# Notice the parameter ys is there so you can use this volume method as
# as loss function
matrix = np.subtract(simplex[:-1], simplex[-1], dtype=float)
# See https://www.jstor.org/stable/2315353
dim = len(simplex) - 1
vol = np.abs(fast_det(matrix)) / np.math.factorial(dim)
return vol
def orientation(simplex):
matrix = np.subtract(simplex[:-1], simplex[-1])
# See https://www.jstor.org/stable/2315353
sign, _logdet = np.linalg.slogdet(matrix)
return sign
[docs]def std_loss(simplex, values, value_scale):
"""
Computes the loss of the simplex based on the standard deviation.
Parameters
----------
simplex : list of tuples
Each entry is one point of the simplex.
values : list of values
The scaled function values of each of the simplex points.
value_scale : float
The scale of values, where ``values = function_values * value_scale``.
Returns
-------
loss : float
"""
r = np.linalg.norm(np.std(values, axis=0))
vol = volume(simplex)
dim = len(simplex) - 1
return r.flat * np.power(vol, 1.0 / dim) + vol
[docs]def default_loss(simplex, values, value_scale):
"""
Computes the average of the volumes of the simplex.
Parameters
----------
simplex : list of tuples
Each entry is one point of the simplex.
values : list of values
The scaled function values of each of the simplex points.
value_scale : float
The scale of values, where ``values = function_values * value_scale``.
Returns
-------
loss : float
"""
if isinstance(values[0], Iterable):
pts = [(*x, *y) for x, y in zip(simplex, values)]
else:
pts = [(*x, y) for x, y in zip(simplex, values)]
return simplex_volume_in_embedding(pts)
@uses_nth_neighbors(1)
def triangle_loss(simplex, values, value_scale, neighbors, neighbor_values):
"""
Computes the average of the volumes of the simplex combined with each
neighbouring point.
Parameters
----------
simplex : list of tuples
Each entry is one point of the simplex.
values : list of values
The scaled function values of each of the simplex points.
value_scale : float
The scale of values, where ``values = function_values * value_scale``.
neighbors : list of tuples
The neighboring points of the simplex, ordered such that simplex[0]
exactly opposes neighbors[0], etc.
neighbor_values : list of values
The function values for each of the neighboring points.
Returns
-------
loss : float
"""
neighbors = [n for n in neighbors if n is not None]
neighbor_values = [v for v in neighbor_values if v is not None]
if len(neighbors) == 0:
return 0
s = [(*x, *to_list(y)) for x, y in zip(simplex, values)]
n = [(*x, *to_list(y)) for x, y in zip(neighbors, neighbor_values)]
return sum(simplex_volume_in_embedding([*s, neighbor]) for neighbor in n) / len(
neighbors
)
def curvature_loss_function(exploration=0.05):
# XXX: add doc-string!
@uses_nth_neighbors(1)
def curvature_loss(simplex, values, value_scale, neighbors, neighbor_values):
"""Compute the curvature loss of a simplex.
Parameters
----------
simplex : list of tuples
Each entry is one point of the simplex.
values : list of values
The scaled function values of each of the simplex points.
value_scale : float
The scale of values, where ``values = function_values * value_scale``.
neighbors : list of tuples
The neighboring points of the simplex, ordered such that simplex[0]
exactly opposes neighbors[0], etc.
neighbor_values : list of values
The scaled function values for each of the neighboring points.
Returns
-------
loss : float
"""
dim = len(simplex[0]) # the number of coordinates
loss_input_volume = volume(simplex)
loss_curvature = triangle_loss(
simplex, values, value_scale, neighbors, neighbor_values
)
return (
loss_curvature + exploration * loss_input_volume ** ((2 + dim) / dim)
) ** (1 / (2 + dim))
return curvature_loss
def choose_point_in_simplex(simplex, transform=None):
"""Choose a new point in inside a simplex.
Pick the center of the simplex if the shape is nice (that is, the
circumcenter lies within the simplex). Otherwise take the middle of the
longest edge.
Parameters
----------
simplex : numpy array
The coordinates of a triangle with shape (N+1, N).
transform : N*N matrix
The multiplication to apply to the simplex before choosing
the new point.
Returns
-------
point : numpy array of length N
The coordinates of the suggested new point.
"""
if transform is not None:
simplex = np.dot(simplex, transform)
# choose center if and only if the shape of the simplex is nice,
# otherwise: the center the longest edge
center, _radius = circumsphere(simplex)
if point_in_simplex(center, simplex):
point = np.average(simplex, axis=0)
else:
distances = scipy.spatial.distance.pdist(simplex)
distance_matrix = scipy.spatial.distance.squareform(distances)
i, j = np.unravel_index(np.argmax(distance_matrix), distance_matrix.shape)
point = (simplex[i, :] + simplex[j, :]) / 2
if transform is not None:
point = np.linalg.solve(transform, point) # undo the transform
return point
def _simplex_evaluation_priority(key):
# We round the loss to 8 digits such that losses
# are equal up to numerical precision will be considered
# to be equal. This is needed because we want the learner
# to behave in a deterministic fashion.
loss, simplex, subsimplex = key
return -round(loss, ndigits=8), simplex, subsimplex or (0,)
[docs]class LearnerND(BaseLearner):
"""Learns and predicts a function 'f: โ^N โ โ^M'.
Parameters
----------
func: callable
The function to learn. Must take a tuple of N real
parameters and return a real number or an arraylike of length M.
bounds : list of 2-tuples or `scipy.spatial.ConvexHull`
A list ``[(a_1, b_1), (a_2, b_2), ..., (a_n, b_n)]`` containing bounds,
one pair per dimension.
Or a ConvexHull that defines the boundary of the domain.
loss_per_simplex : callable, optional
A function that returns the loss for a simplex.
If not provided, then a default is used, which uses
the deviation from a linear estimate, as well as
triangle area, to determine the loss.
Attributes
----------
data : dict
Sampled points and values.
points : numpy array
Coordinates of the currently known points
values : numpy array
The values of each of the known points
pending_points : set
Points that still have to be evaluated.
Notes
-----
The sample points are chosen by estimating the point where the
gradient is maximal. This is based on the currently known points.
In practice, this sampling protocol results to sparser sampling of
flat regions, and denser sampling of regions where the function
has a high gradient, which is useful if the function is expensive to
compute.
This sampling procedure is not fast, so to benefit from
it, your function needs to be slow enough to compute.
This class keeps track of all known points. It triangulates these points
and with every simplex it associates a loss. Then if you request points
that you will compute in the future, it will subtriangulate a real simplex
with the pending points inside it and distribute the loss among it's
children based on volume.
"""
def __init__(self, func, bounds, loss_per_simplex=None):
self._vdim = None
self.loss_per_simplex = loss_per_simplex or default_loss
if hasattr(self.loss_per_simplex, "nth_neighbors"):
if self.loss_per_simplex.nth_neighbors > 1:
raise NotImplementedError(
"The provided loss function wants "
"next-nearest neighboring simplices for the loss computation, "
"this feature is not yet implemented, either use "
"nth_neightbors = 0 or 1"
)
self.nth_neighbors = self.loss_per_simplex.nth_neighbors
else:
self.nth_neighbors = 0
self.data = OrderedDict()
self.pending_points = set()
self.bounds = bounds
if isinstance(bounds, scipy.spatial.ConvexHull):
hull_points = bounds.points[bounds.vertices]
self._bounds_points = sorted(map(tuple, hull_points))
self._bbox = tuple(zip(hull_points.min(axis=0), hull_points.max(axis=0)))
self._interior = scipy.spatial.Delaunay(self._bounds_points)
else:
self._bounds_points = sorted(map(tuple, itertools.product(*bounds)))
self._bbox = tuple(tuple(map(float, b)) for b in bounds)
self._interior = None
self.ndim = len(self._bbox)
self.function = func
self._tri = None
self._losses = {}
self._pending_to_simplex = {} # vertex โ simplex
# triangulation of the pending points inside a specific simplex
self._subtriangulations = {} # simplex โ triangulation
# scale to unit hypercube
# for the input
self._transform = np.linalg.inv(np.diag(np.diff(self._bbox).flat))
# for the output
self._min_value = None
self._max_value = None
self._old_scale = None
self._output_multiplier = (
1 # If we do not know anything, do not scale the values
)
self._recompute_losses_factor = 1.1
# create a private random number generator with fixed seed
self._random = random.Random(1)
# all real triangles that have not been subdivided and the pending
# triangles heap of tuples (-loss, real simplex, sub_simplex or None)
# _simplex_queue is a heap of tuples (-loss, real_simplex, sub_simplex)
# It contains all real and pending simplices except for real simplices
# that have been subdivided.
# _simplex_queue may contain simplices that have been deleted, this is
# because deleting those items from the heap is an expensive operation,
# so when popping an item, you should check that the simplex that has
# been returned has not been deleted. This checking is done by
# _pop_highest_existing_simplex
self._simplex_queue = SortedKeyList(key=_simplex_evaluation_priority)
[docs] def new(self) -> LearnerND:
"""Create a new learner with the same function and bounds."""
return LearnerND(self.function, self.bounds, self.loss_per_simplex)
@property
def npoints(self):
"""Number of evaluated points."""
return len(self.data)
@property
def vdim(self):
"""Length of the output of ``learner.function``.
If the output is unsized (when it's a scalar)
then `vdim = 1`.
As long as no data is known `vdim = 1`.
"""
if self._vdim is None and self.data:
try:
value = next(iter(self.data.values()))
self._vdim = len(value)
except TypeError:
self._vdim = 1
return self._vdim if self._vdim is not None else 1
[docs] def to_numpy(self):
"""Data as NumPy array of size ``(npoints, dim+vdim)``, where ``dim`` is the
size of the input dimension and ``vdim`` is the length of the return value
of ``learner.function``."""
return np.array([(*p, *np.atleast_1d(v)) for p, v in sorted(self.data.items())])
[docs] def to_dataframe( # type: ignore[override]
self,
with_default_function_args: bool = True,
function_prefix: str = "function.",
point_names: tuple[str, ...] = ("x", "y", "z"),
value_name: str = "value",
) -> pandas.DataFrame:
"""Return the data as a `pandas.DataFrame`.
Parameters
----------
with_default_function_args : bool, optional
Include the ``learner.function``'s default arguments as a
column, by default True
function_prefix : str, optional
Prefix to the ``learner.function``'s default arguments' names,
by default "function."
point_names : tuple[str, ...], optional
Names of the input points, should be the same length as number
of input parameters by default ("x", "y", "z" )
value_name : str, optional
Name of the output value, by default "value"
Returns
-------
pandas.DataFrame
Raises
------
ImportError
If `pandas` is not installed.
"""
if not with_pandas:
raise ImportError("pandas is not installed.")
if len(point_names) != self.ndim:
raise ValueError(
f"point_names ({point_names}) should have the"
f" same length as learner.ndims ({self.ndim})"
)
data = [(*x, y) for x, y in self.data.items()]
df = pandas.DataFrame(data, columns=[*point_names, value_name])
df.attrs["inputs"] = list(point_names)
df.attrs["output"] = value_name
if with_default_function_args:
assign_defaults(self.function, df, function_prefix)
return df
[docs] def load_dataframe( # type: ignore[override]
self,
df: pandas.DataFrame,
with_default_function_args: bool = True,
function_prefix: str = "function.",
point_names: tuple[str, ...] = ("x", "y", "z"),
value_name: str = "value",
):
"""Load data from a `pandas.DataFrame`.
If ``with_default_function_args`` is True, then ``learner.function``'s
default arguments are set (using `functools.partial`) from the values
in the `pandas.DataFrame`.
Parameters
----------
df : pandas.DataFrame
The data to load.
with_default_function_args : bool, optional
The ``with_default_function_args`` used in ``to_dataframe()``,
by default True
function_prefix : str, optional
The ``function_prefix`` used in ``to_dataframe``, by default "function."
point_names : str, optional
The ``point_names`` used in ``to_dataframe``, by default ("x", "y", "z")
value_name : str, optional
The ``value_name`` used in ``to_dataframe``, by default "value"
"""
self.tell_many(df[list(point_names)].values, df[value_name].values)
if with_default_function_args:
self.function = partial_function_from_dataframe(
self.function, df, function_prefix
)
@property
def bounds_are_done(self):
return all(p in self.data for p in self._bounds_points)
def _ip(self):
"""A `scipy.interpolate.LinearNDInterpolator` instance
containing the learner's data."""
# XXX: take our own triangulation into account when generating the _ip
return interpolate.LinearNDInterpolator(self.points, self.values)
@property
def tri(self):
"""An `adaptive.learner.triangulation.Triangulation` instance
with all the points of the learner."""
if self._tri is not None:
return self._tri
try:
self._tri = Triangulation(self.points)
except ValueError:
# A ValueError is raised if we do not have enough points or
# the provided points are coplanar, so we need more points to
# create a valid triangulation
return None
self._update_losses(set(), self._tri.simplices)
return self._tri
@property
def values(self):
"""Get the values from `data` as a numpy array."""
return np.array(list(self.data.values()), dtype=float)
@property
def points(self):
"""Get the points from `data` as a numpy array."""
return np.array(list(self.data.keys()), dtype=float)
[docs] def tell(self, point, value):
point = tuple(point)
if point in self.data:
return # we already know about the point
if value is None:
return self.tell_pending(point)
self.pending_points.discard(point)
tri = self.tri
self.data[point] = value
if not self.inside_bounds(point):
return
self._update_range(value)
if tri is not None:
simplex = self._pending_to_simplex.get(point)
if simplex is not None and not self._simplex_exists(simplex):
simplex = None
to_delete, to_add = tri.add_point(point, simplex, transform=self._transform)
self._update_losses(to_delete, to_add)
def _simplex_exists(self, simplex):
simplex = tuple(sorted(simplex))
return simplex in self.tri.simplices
[docs] def inside_bounds(self, point):
"""Check whether a point is inside the bounds."""
if self._interior is not None:
return self._interior.find_simplex(point, tol=1e-8) >= 0
else:
eps = 1e-8
return all(
(mn - eps) <= p <= (mx + eps) for p, (mn, mx) in zip(point, self._bbox)
)
[docs] def tell_pending(self, point, *, simplex=None):
point = tuple(point)
if not self.inside_bounds(point):
return
self.pending_points.add(point)
if self.tri is None:
return
simplex = tuple(simplex or self.tri.locate_point(point))
if not simplex:
return
# Simplex is None if pending point is outside the triangulation,
# then you do not have subtriangles
simplex = tuple(simplex)
simplices = [self.tri.vertex_to_simplices[i] for i in simplex]
neighbors = set.union(*simplices)
# Neighbours also includes the simplex itself
for simpl in neighbors:
_, to_add = self._try_adding_pending_point_to_simplex(point, simpl)
if to_add is None:
continue
self._update_subsimplex_losses(simpl, to_add)
def _try_adding_pending_point_to_simplex(self, point, simplex):
# try to insert it
if not self.tri.point_in_simplex(point, simplex):
return None, None
if simplex not in self._subtriangulations:
vertices = self.tri.get_vertices(simplex)
self._subtriangulations[simplex] = Triangulation(vertices)
self._pending_to_simplex[point] = simplex
return self._subtriangulations[simplex].add_point(point)
def _update_subsimplex_losses(self, simplex, new_subsimplices):
loss = self._losses[simplex]
loss_density = loss / self.tri.volume(simplex)
subtriangulation = self._subtriangulations[simplex]
for subsimplex in new_subsimplices:
subloss = subtriangulation.volume(subsimplex) * loss_density
self._simplex_queue.add((subloss, simplex, subsimplex))
def _ask_and_tell_pending(self, n=1):
xs, losses = zip(*(self._ask() for _ in range(n)))
return list(xs), list(losses)
[docs] def ask(self, n, tell_pending=True):
"""Chose points for learners."""
if not tell_pending:
with restore(self):
return self._ask_and_tell_pending(n)
else:
return self._ask_and_tell_pending(n)
def _ask_bound_point(self):
# get the next bound point that is still available
new_point = next(
p
for p in self._bounds_points
if p not in self.data and p not in self.pending_points
)
self.tell_pending(new_point)
return new_point, np.inf
def _ask_point_without_known_simplices(self):
assert not self._bounds_available
# pick a random point inside the bounds
# XXX: change this into picking a point based on volume loss
a = np.diff(self._bbox).flat
b = np.array(self._bbox)[:, 0]
p = None
while p is None or not self.inside_bounds(p):
r = np.array([self._random.random() for _ in range(self.ndim)])
p = r * a + b
p = tuple(p)
self.tell_pending(p)
return p, np.inf
def _pop_highest_existing_simplex(self):
# find the simplex with the highest loss, we do need to check that the
# simplex hasn't been deleted yet
while len(self._simplex_queue):
# XXX: Need to add check that the loss is the most recent computed loss
loss, simplex, subsimplex = self._simplex_queue.pop(0)
if (
subsimplex is None
and simplex in self.tri.simplices
and simplex not in self._subtriangulations
):
return abs(loss), simplex, subsimplex
if (
simplex in self._subtriangulations
and simplex in self.tri.simplices
and subsimplex in self._subtriangulations[simplex].simplices
):
return abs(loss), simplex, subsimplex
# Could not find a simplex, this code should never be reached
assert self.tri is not None
raise AssertionError(
"Could not find a simplex to subdivide. Yet there should always"
" be a simplex available if LearnerND.tri() is not None."
)
def _ask_best_point(self):
assert self.tri is not None
loss, simplex, subsimplex = self._pop_highest_existing_simplex()
if subsimplex is None:
# We found a real simplex and want to subdivide it
points = self.tri.get_vertices(simplex)
else:
# We found a pending simplex and want to subdivide it
subtri = self._subtriangulations[simplex]
points = subtri.get_vertices(subsimplex)
point_new = tuple(choose_point_in_simplex(points, transform=self._transform))
self._pending_to_simplex[point_new] = simplex
self.tell_pending(point_new, simplex=simplex) # O(??)
return point_new, loss
@property
def _bounds_available(self):
return any(
(p not in self.pending_points and p not in self.data)
for p in self._bounds_points
)
def _ask(self):
if self._bounds_available:
return self._ask_bound_point() # O(1)
if self.tri is None:
# All bound points are pending or have been evaluated, but we do not
# have enough evaluated points to construct a triangulation, so we
# pick a random point
return self._ask_point_without_known_simplices() # O(1)
return self._ask_best_point() # O(log N)
def _compute_loss(self, simplex):
# get the loss
vertices = self.tri.get_vertices(simplex)
values = [self.data[tuple(v)] for v in vertices]
# scale them to a cube with sides 1
vertices = vertices @ self._transform
values = self._output_multiplier * np.array(values)
if self.nth_neighbors == 0:
# compute the loss on the scaled simplex
return float(
self.loss_per_simplex(vertices, values, self._output_multiplier)
)
# We do need the neighbors
neighbors = self.tri.get_opposing_vertices(simplex)
neighbor_points = self.tri.get_vertices(neighbors)
neighbor_values = [self.data.get(x, None) for x in neighbor_points]
for i, point in enumerate(neighbor_points):
if point is not None:
neighbor_points[i] = point @ self._transform
for i, value in enumerate(neighbor_values):
if value is not None:
neighbor_values[i] = self._output_multiplier * value
return float(
self.loss_per_simplex(
vertices,
values,
self._output_multiplier,
neighbor_points,
neighbor_values,
)
)
def _update_losses(self, to_delete: set, to_add: set):
# XXX: add the points outside the triangulation to this as well
pending_points_unbound = set()
for simplex in to_delete:
loss = self._losses.pop(simplex, None)
subtri = self._subtriangulations.pop(simplex, None)
if subtri is not None:
pending_points_unbound.update(subtri.vertices)
pending_points_unbound = {
p for p in pending_points_unbound if p not in self.data
}
for simplex in to_add:
loss = self._compute_loss(simplex)
self._losses[simplex] = loss
for p in pending_points_unbound:
self._try_adding_pending_point_to_simplex(p, simplex)
if simplex not in self._subtriangulations:
self._simplex_queue.add((loss, simplex, None))
continue
self._update_subsimplex_losses(
simplex, self._subtriangulations[simplex].simplices
)
if self.nth_neighbors:
points_of_added_simplices = set.union(*[set(s) for s in to_add])
neighbors = (
self.tri.get_simplices_attached_to_points(points_of_added_simplices)
- to_add
)
for simplex in neighbors:
loss = self._compute_loss(simplex)
self._losses[simplex] = loss
if simplex not in self._subtriangulations:
self._simplex_queue.add((loss, simplex, None))
continue
self._update_subsimplex_losses(
simplex, self._subtriangulations[simplex].simplices
)
def _recompute_all_losses(self):
"""Recompute all losses and pending losses."""
# amortized O(N) complexity
if self.tri is None:
return
# reset the _simplex_queue
self._simplex_queue = SortedKeyList(key=_simplex_evaluation_priority)
# recompute all losses
for simplex in self.tri.simplices:
loss = self._compute_loss(simplex)
self._losses[simplex] = loss
# now distribute it around the the children if they are present
if simplex not in self._subtriangulations:
self._simplex_queue.add((loss, simplex, None))
continue
self._update_subsimplex_losses(
simplex, self._subtriangulations[simplex].simplices
)
@property
def _scale(self):
# get the output scale
return self._max_value - self._min_value
def _update_range(self, new_output):
if self._min_value is None or self._max_value is None:
# this is the first point, nothing to do, just set the range
self._min_value = np.min(new_output)
self._max_value = np.max(new_output)
self._old_scale = self._scale or 1
return False
# if range in one or more directions is doubled, then update all losses
self._min_value = min(self._min_value, np.min(new_output))
self._max_value = max(self._max_value, np.max(new_output))
scale_multiplier = 1 / (self._scale or 1)
# the maximum absolute value that is in the range. Because this is the
# largest number, this also has the largest absolute numerical error.
max_absolute_value_in_range = max(abs(self._min_value), abs(self._max_value))
# since a float has a relative error of 1e-15, the absolute error is the value * 1e-15
abs_err = 1e-15 * max_absolute_value_in_range
# when scaling the floats, the error gets increased.
scaled_err = abs_err * scale_multiplier
# do not scale along the axis if the numerical error gets too big
if scaled_err > 1e-2: # allowed_numerical_error = 1e-2
scale_multiplier = 1
self._output_multiplier = scale_multiplier
scale_factor = self._scale / self._old_scale
if scale_factor > self._recompute_losses_factor:
self._old_scale = self._scale
self._recompute_all_losses()
return True
return False
[docs] @cache_latest
def loss(self, real=True):
# XXX: compute pending loss if real == False
losses = self._losses if self.tri is not None else {}
return max(losses.values()) if losses else float("inf")
[docs] def remove_unfinished(self):
# XXX: implement this method
self.pending_points = set()
self._subtriangulations = {}
self._pending_to_simplex = {}
##########################
# Plotting related stuff #
##########################
[docs] def plot(self, n=None, tri_alpha=0):
"""Plot the function we want to learn, only works in 2D.
Parameters
----------
n : int
the number of boxes in the interpolation grid along each axis
tri_alpha : float (0 to 1)
Opacity of triangulation lines
"""
hv = ensure_holoviews()
if self.vdim > 1:
raise NotImplementedError(
"holoviews currently does not support", "3D surface plots in bokeh."
)
if self.ndim != 2:
raise NotImplementedError(
"Only 2D plots are implemented: You can "
"plot a 2D slice with 'plot_slice'."
)
x, y = self._bbox
lbrt = x[0], y[0], x[1], y[1]
if len(self.data) >= 4:
if n is None:
# Calculate how many grid points are needed.
# factor from A=โ3/4 * aยฒ (equilateral triangle)
scale_factor = np.product(np.diag(self._transform))
a_sq = np.sqrt(np.min(self.tri.volumes()) * scale_factor)
n = max(10, int(0.658 / a_sq))
xs = ys = np.linspace(0, 1, n)
xs = xs * (x[1] - x[0]) + x[0]
ys = ys * (y[1] - y[0]) + y[0]
z = self._ip()(xs[:, None], ys[None, :]).squeeze()
im = hv.Image(np.rot90(z), bounds=lbrt)
if tri_alpha:
points = np.array(
[self.tri.get_vertices(s) for s in self.tri.simplices]
)
points = np.pad(
points[:, [0, 1, 2, 0], :],
pad_width=((0, 0), (0, 1), (0, 0)),
mode="constant",
constant_values=np.nan,
).reshape(-1, 2)
tris = hv.EdgePaths([points])
else:
tris = hv.EdgePaths([])
else:
im = hv.Image([], bounds=lbrt)
tris = hv.EdgePaths([])
return im.opts(cmap="viridis") * tris.opts(
line_width=0.5, alpha=tri_alpha, tools=[]
)
[docs] def plot_slice(self, cut_mapping, n=None):
"""Plot a 1D or 2D interpolated slice of a N-dimensional function.
Parameters
----------
cut_mapping : dict (int โ float)
for each fixed dimension the value, the other dimensions
are interpolated. e.g. ``cut_mapping = {0: 1}``, so from
dimension 0 ('x') to value 1.
n : int
the number of boxes in the interpolation grid along each axis
"""
hv = ensure_holoviews()
plot_dim = self.ndim - len(cut_mapping)
if plot_dim == 1:
if not self.data:
return hv.Scatter([]) * hv.Path([])
elif self.vdim > 1:
raise NotImplementedError(
"multidimensional output not yet supported by `plot_slice`"
)
n = n or 201
values = [
cut_mapping.get(i, np.linspace(*self._bbox[i], n))
for i in range(self.ndim)
]
ind = next(i for i in range(self.ndim) if i not in cut_mapping)
x = values[ind]
y = self._ip()(*values)
p = hv.Path((x, y))
# Plot with 5% margins such that the boundary points are visible
margin = 0.05 / self._transform[ind, ind]
plot_bounds = (x.min() - margin, x.max() + margin)
return p.redim(x={"range": plot_bounds})
elif plot_dim == 2:
if self.vdim > 1:
raise NotImplementedError(
"holoviews currently does not support 3D surface plots in bokeh."
)
if n is None:
# Calculate how many grid points are needed.
# factor from A=โ3/4 * aยฒ (equilateral triangle)
scale_factor = np.product(np.diag(self._transform))
a_sq = np.sqrt(np.min(self.tri.volumes()) * scale_factor)
n = max(10, int(0.658 / a_sq))
xs = ys = np.linspace(0, 1, n)
xys = [xs[:, None], ys[None, :]]
values = [
(
cut_mapping[i]
if i in cut_mapping
else xys.pop(0) * (b[1] - b[0]) + b[0]
)
for i, b in enumerate(self._bbox)
]
lbrt = [b for i, b in enumerate(self._bbox) if i not in cut_mapping]
lbrt = np.reshape(lbrt, (2, 2)).T.flatten().tolist()
if len(self.data) >= 4:
z = self._ip()(*values).squeeze()
im = hv.Image(np.rot90(z), bounds=lbrt)
else:
im = hv.Image([], bounds=lbrt)
return im.opts(cmap="viridis")
else:
raise ValueError("Only 1 or 2-dimensional plots can be generated.")
[docs] def plot_3D(self, with_triangulation=False, return_fig=False):
"""Plot the learner's data in 3D using plotly.
Does *not* work with the
`adaptive.notebook_integration.live_plot` functionality.
Parameters
----------
with_triangulation : bool, default: False
Add the verticices to the plot.
return_fig : bool, default: False
Return the `plotly.graph_objs.Figure` object instead of showing
the rendered plot (default).
Returns
-------
plot : `plotly.offline.iplot` object
The 3D plot of ``learner.data``.
"""
plotly = ensure_plotly()
plots = []
vertices = self.tri.vertices
if with_triangulation:
Xe, Ye, Ze = [], [], []
for simplex in self.tri.simplices:
for s in itertools.combinations(simplex, 2):
Xe += [vertices[i][0] for i in s] + [None]
Ye += [vertices[i][1] for i in s] + [None]
Ze += [vertices[i][2] for i in s] + [None]
plots.append(
plotly.graph_objs.Scatter3d(
x=Xe,
y=Ye,
z=Ze,
mode="lines",
line={"color": "rgb(125,125,125)", "width": 1},
hoverinfo="none",
)
)
Xn, Yn, Zn = zip(*vertices)
colors = [self.data[p] for p in self.tri.vertices]
marker = {
"symbol": "circle",
"size": 3,
"color": colors,
"colorscale": "Viridis",
"line": {"color": "rgb(50,50,50)", "width": 0.5},
}
plots.append(
plotly.graph_objs.Scatter3d(
x=Xn,
y=Yn,
z=Zn,
mode="markers",
name="actors",
marker=marker,
hoverinfo="text",
)
)
axis = {
"showbackground": False,
"showline": False,
"zeroline": False,
"showgrid": False,
"showticklabels": False,
"title": "",
}
layout = plotly.graph_objs.Layout(
showlegend=False,
scene={"xaxis": axis, "yaxis": axis, "zaxis": axis},
margin={"t": 100},
hovermode="closest",
)
fig = plotly.graph_objs.Figure(data=plots, layout=layout)
return fig if return_fig else plotly.offline.iplot(fig)
def _get_iso(self, level=0.0, which="surface"):
if which == "surface":
if self.ndim != 3 or self.vdim != 1:
raise Exception(
"Isosurface plotting is only supported"
" for a 3D input and 1D output"
)
get_surface = True
get_line = False
elif which == "line":
if self.ndim != 2 or self.vdim != 1:
raise Exception(
"Isoline plotting is only supported for a 2D input and 1D output"
)
get_surface = False
get_line = True
vertices = [] # index -> (x,y,z)
faces_or_lines = [] # tuple of indices of the corner points
@functools.lru_cache
def _get_vertex_index(a, b):
vertex_a = self.tri.vertices[a]
vertex_b = self.tri.vertices[b]
value_a = self.data[vertex_a]
value_b = self.data[vertex_b]
da = abs(value_a - level)
db = abs(value_b - level)
dab = da + db
new_pt = db / dab * np.array(vertex_a) + da / dab * np.array(vertex_b)
new_index = len(vertices)
vertices.append(new_pt)
return new_index
for simplex in self.tri.simplices:
plane_or_line = []
for a, b in itertools.combinations(simplex, 2):
va = self.data[self.tri.vertices[a]]
vb = self.data[self.tri.vertices[b]]
if min(va, vb) < level <= max(va, vb):
vi = _get_vertex_index(a, b)
should_add = True
for pi in plane_or_line:
if np.allclose(vertices[vi], vertices[pi]):
should_add = False
if should_add:
plane_or_line.append(vi)
if get_surface and len(plane_or_line) == 3:
faces_or_lines.append(plane_or_line)
elif get_surface and len(plane_or_line) == 4:
faces_or_lines.append(plane_or_line[:3])
faces_or_lines.append(plane_or_line[1:])
elif get_line and len(plane_or_line) == 2:
faces_or_lines.append(plane_or_line)
if len(faces_or_lines) == 0:
r_min = min(self.data[v] for v in self.tri.vertices)
r_max = max(self.data[v] for v in self.tri.vertices)
raise ValueError(
f"Could not draw isosurface for level={level}, as"
" this value is not inside the function range. Please choose"
f" a level strictly inside interval ({r_min}, {r_max})"
)
return vertices, faces_or_lines
[docs] def plot_isoline(self, level=0.0, n=None, tri_alpha=0):
"""Plot the isoline at a specific level, only works in 2D.
Parameters
----------
level : float, default: 0
The value of the function at which you would like to see
the isoline.
n : int
The number of boxes in the interpolation grid along each axis.
This is passed to `plot`.
tri_alpha : float
The opacity of the overlaying triangulation. This is passed
to `plot`.
Returns
-------
`holoviews.core.Overlay`
The plot of the isoline(s). This overlays a `plot` with a
`holoviews.element.Path`.
"""
hv = ensure_holoviews()
if n == -1:
plot = hv.Path([])
else:
plot = self.plot(n=n, tri_alpha=tri_alpha)
if isinstance(level, Iterable):
for lvl in level:
plot = plot * self.plot_isoline(level=lvl, n=-1)
return plot
vertices, lines = self._get_iso(level, which="line")
paths = [[vertices[i], vertices[j]] for i, j in lines]
contour = hv.Path(paths).opts(color="black")
return plot * contour
[docs] def plot_isosurface(self, level=0.0, hull_opacity=0.2):
"""Plots a linearly interpolated isosurface.
This is the 3D analog of an isoline. Does *not* work with the
`adaptive.notebook_integration.live_plot` functionality.
Parameters
----------
level : float, default: 0.0
the function value which you are interested in.
hull_opacity : float, default: 0.0
the opacity of the hull of the domain.
Returns
-------
plot : `plotly.offline.iplot` object
The plot object of the isosurface.
"""
plotly = ensure_plotly()
vertices, faces = self._get_iso(level, which="surface")
x, y, z = zip(*vertices)
fig = plotly.figure_factory.create_trisurf(
x=x, y=y, z=z, plot_edges=False, simplices=faces, title="Isosurface"
)
isosurface = fig.data[0]
isosurface.update(
lighting={
"ambient": 1,
"diffuse": 1,
"roughness": 1,
"specular": 0,
"fresnel": 0,
}
)
if hull_opacity < 1e-3:
# Do not compute the hull_mesh.
return plotly.offline.iplot(fig)
hull_mesh = self._get_hull_mesh(opacity=hull_opacity)
return plotly.offline.iplot([isosurface, hull_mesh])
def _get_hull_mesh(self, opacity=0.2):
plotly = ensure_plotly()
hull = scipy.spatial.ConvexHull(self._bounds_points)
# Find the colors of each plane, giving triangles which are coplanar
# the same color, such that a square face has the same color.
color_dict = {}
def _get_plane_color(simplex):
simplex = tuple(simplex)
# If the volume of the two triangles combined is zero then they
# belong to the same plane.
for simplex_key, color in color_dict.items():
points = [hull.points[i] for i in set(simplex_key + simplex)]
points = np.array(points)
if np.linalg.matrix_rank(points[1:] - points[0]) < 3:
return color
if scipy.spatial.ConvexHull(points).volume < 1e-5:
return color
color_dict[simplex] = tuple(random.randint(0, 255) for _ in range(3))
return color_dict[simplex]
colors = [_get_plane_color(simplex) for simplex in hull.simplices]
x, y, z = zip(*self._bounds_points)
i, j, k = hull.simplices.T
lighting = {
"ambient": 1,
"diffuse": 1,
"roughness": 1,
"specular": 0,
"fresnel": 0,
}
return plotly.graph_objs.Mesh3d(
x=x,
y=y,
z=z,
i=i,
j=j,
k=k,
facecolor=colors,
opacity=opacity,
lighting=lighting,
)
def _get_data(self):
return deepcopy(self.__dict__)
def _set_data(self, state):
for k, v in state.items():
setattr(self, k, v)