Source code for adaptive.learner.learner1D

from __future__ import annotations

import collections.abc
import itertools
import math
import sys
from collections.abc import Sequence
from copy import copy, deepcopy
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

import cloudpickle
import numpy as np
from sortedcollections.recipes import ItemSortedDict
from sortedcontainers.sorteddict import SortedDict

from adaptive.learner.base_learner import BaseLearner, uses_nth_neighbors
from adaptive.learner.learnerND import volume
from adaptive.learner.triangulation import simplex_volume_in_embedding
from adaptive.notebook_integration import ensure_holoviews
from adaptive.types import Float, Int, Real
from adaptive.utils import (
    assign_defaults,
    cache_latest,
    partial_function_from_dataframe,
)

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias


try:
    import pandas

    with_pandas = True

except ModuleNotFoundError:
    with_pandas = False

if TYPE_CHECKING:
    # -- types --

    # Commonly used types
    Interval: TypeAlias = Union[tuple[float, float], tuple[float, float, int]]
    NeighborsType: TypeAlias = SortedDict[float, list[Optional[float]]]

    # Types for loss_per_interval functions
    XsType0: TypeAlias = tuple[float, float]
    YsType0: TypeAlias = Union[tuple[float, float], tuple[np.ndarray, np.ndarray]]
    XsType1: TypeAlias = tuple[
        Optional[float], Optional[float], Optional[float], Optional[float]
    ]
    YsType1: TypeAlias = Union[
        tuple[Optional[float], Optional[float], Optional[float], Optional[float]],
        tuple[
            Optional[np.ndarray],
            Optional[np.ndarray],
            Optional[np.ndarray],
            Optional[np.ndarray],
        ],
    ]
    XsTypeN: TypeAlias = tuple[Optional[float], ...]
    YsTypeN: TypeAlias = Union[
        tuple[Optional[float], ...], tuple[Optional[np.ndarray], ...]
    ]


__all__ = [
    "uniform_loss",
    "default_loss",
    "abs_min_log_loss",
    "triangle_loss",
    "resolution_loss_function",
    "curvature_loss_function",
    "Learner1D",
]


@uses_nth_neighbors(0)
def uniform_loss(xs: XsType0, ys: YsType0) -> Float:
    """Loss function that samples the domain uniformly.

    Works with `~adaptive.Learner1D` only.

    Examples
    --------
    >>> def f(x):
    ...     return x**2
    >>>
    >>> learner = adaptive.Learner1D(f,
    ...                              bounds=(-1, 1),
    ...                              loss_per_interval=uniform_loss)
    >>>
    """
    dx = xs[1] - xs[0]
    return dx
@uses_nth_neighbors(0)
def default_loss(xs: XsType0, ys: YsType0) -> Float:
    """Calculate loss on a single interval.

    Currently returns the rescaled length of the interval. If one of the
    y-values is missing, returns 0 (so intervals with missing data are
    never touched; this behavior should be improved later).
    """
    dx = xs[1] - xs[0]
    if isinstance(ys[0], collections.abc.Iterable):
        dy_vec = np.array([abs(a - b) for a, b in zip(*ys)])
        return np.hypot(dx, dy_vec).max()
    else:
        dy = ys[1] - ys[0]
        return np.hypot(dx, dy)
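# Hedged worked example (not part of the original module): for the scaled
# interval (0.0, 0.5) with scalar values (0.0, 1.0), `default_loss` is the
# Euclidean length of the segment, hypot(0.5, 1.0) ~= 1.118.
def _example_default_loss() -> Float:  # hypothetical helper, illustration only
    return default_loss((0.0, 0.5), (0.0, 1.0))  # == np.hypot(0.5, 1.0)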
@uses_nth_neighbors(0)
def abs_min_log_loss(xs: XsType0, ys: YsType0) -> Float:
    """Calculate loss of a single interval that prioritizes the absolute minimum."""
    ys_log: YsType0 = tuple(np.log(np.abs(y).min()) for y in ys)  # type: ignore[assignment]
    return default_loss(xs, ys_log)
@uses_nth_neighbors(1)
def triangle_loss(xs: XsType1, ys: YsType1) -> Float:
    assert len(xs) == 4
    xs = [x for x in xs if x is not None]  # type: ignore[assignment]
    ys = [y for y in ys if y is not None]  # type: ignore[assignment]

    if len(xs) == 2:  # we do not have enough points for a triangle
        return xs[1] - xs[0]  # type: ignore[operator]

    N = len(xs) - 2  # number of constructed triangles
    if isinstance(ys[0], collections.abc.Iterable):
        pts = [(x, *y) for x, y in zip(xs, ys)]  # type: ignore[misc]
        vol = simplex_volume_in_embedding
    else:
        pts = list(zip(xs, ys))
        vol = volume
    return sum(vol(pts[i : i + 3]) for i in range(N)) / N
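# Hedged worked example (not part of the original module): with four scalar
# points the loss is the mean area of the two triangles formed by consecutive
# point triples; collinear points give zero area, hence zero loss.
def _example_triangle_loss() -> Float:  # hypothetical helper, illustration only
    xs = (0.0, 1.0, 2.0, 3.0)
    ys = (0.0, 1.0, 2.0, 3.0)  # points on a straight line -> both areas are 0
    return triangle_loss(xs, ys)  # == 0.0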
def resolution_loss_function(
    min_length: Real = 0, max_length: Real = 1
) -> Callable[[XsType0, YsType0], Float]:
    """Loss function that is similar to the `default_loss` function, but you
    can set the maximum and minimum size of an interval.

    Works with `~adaptive.Learner1D` only.

    The arguments `min_length` and `max_length` should be between 0 and 1
    because the total size is normalized to 1.

    Returns
    -------
    loss_function : callable

    Examples
    --------
    >>> def f(x):
    ...     return x**2
    >>>
    >>> loss = resolution_loss_function(min_length=0.01, max_length=1)
    >>> learner = adaptive.Learner1D(f, bounds=(-1, 1), loss_per_interval=loss)
    """

    @uses_nth_neighbors(0)
    def resolution_loss(xs: XsType0, ys: YsType0) -> Float:
        loss = uniform_loss(xs, ys)
        if loss < min_length:
            # Return zero such that this interval won't be chosen again
            return 0
        if loss > max_length:
            # Return infinite such that this interval will be picked
            return np.inf
        loss = default_loss(xs, ys)
        return loss

    return resolution_loss
def curvature_loss_function(
    area_factor: Real = 1, euclid_factor: Real = 0.02, horizontal_factor: Real = 0.02
) -> Callable[[XsType1, YsType1], Float]:
    # XXX: add a doc-string

    @uses_nth_neighbors(1)
    def curvature_loss(xs: XsType1, ys: YsType1) -> Float:
        xs_middle = xs[1:3]
        ys_middle = ys[1:3]

        triangle_loss_ = triangle_loss(xs, ys)
        default_loss_ = default_loss(xs_middle, ys_middle)
        dx = xs_middle[1] - xs_middle[0]  # type: ignore[operator]
        return (
            area_factor * (triangle_loss_**0.5)
            + euclid_factor * default_loss_
            + horizontal_factor * dx
        )

    return curvature_loss
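# Hedged usage sketch (not part of the original module): the returned loss
# combines the triangle area, the Euclidean length, and the horizontal width
# of the middle interval, weighted by the three factors.
def _example_curvature_loss():  # hypothetical helper, illustration only
    loss = curvature_loss_function()  # uses the default factors above
    # `Learner1D` is defined later in this module; resolved at call time.
    return Learner1D(np.sin, bounds=(-1, 1), loss_per_interval=loss)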
def linspace(x_left: Real, x_right: Real, n: Int) -> list[Float]:
    """This is equivalent to
    'np.linspace(x_left, x_right, n, endpoint=False)[1:]',
    but it is 15-30 times faster for small 'n'."""
    if n == 1:
        # This is just an optimization
        return []
    else:
        step = (x_right - x_left) / n
        return [x_left + step * i for i in range(1, n)]


def _get_neighbors_from_array(xs: np.ndarray) -> NeighborsType:
    xs = np.sort(xs)
    xs_left = np.roll(xs, 1).tolist()
    xs_right = np.roll(xs, -1).tolist()
    xs_left[0] = None
    xs_right[-1] = None
    neighbors = {x: [x_L, x_R] for x, x_L, x_R in zip(xs, xs_left, xs_right)}
    return SortedDict(neighbors)


def _get_intervals(
    x: float, neighbors: NeighborsType, nth_neighbors: int
) -> list[tuple[float, float]]:
    nn = nth_neighbors
    i = neighbors.index(x)
    start = max(0, i - nn - 1)
    end = min(len(neighbors), i + nn + 2)
    points = neighbors.keys()[start:end]
    return list(zip(points, points[1:]))
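# Hedged worked example (not part of the original module): the neighbor
# structure maps each x to [left, right], with None at the domain edges.
def _example_neighbors():  # hypothetical helper, illustration only
    neighbors = _get_neighbors_from_array(np.array([0.0, 1.0, 2.0]))
    # neighbors == SortedDict({0.0: [None, 1.0], 1.0: [0.0, 2.0], 2.0: [1.0, None]})
    # With nth_neighbors=0 only the two intervals touching x are returned:
    return _get_intervals(1.0, neighbors, nth_neighbors=0)  # [(0.0, 1.0), (1.0, 2.0)]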
class Learner1D(BaseLearner):
    """Learns and predicts a function 'f:ℝ → ℝ^N'.

    Parameters
    ----------
    function : callable
        The function to learn. Must take a single real parameter and
        return a real number or 1D array.
    bounds : pair of reals
        The bounds of the interval on which to learn 'function'.
    loss_per_interval: callable, optional
        A function that returns the loss for a single interval of the domain.
        If not provided, then a default is used, which uses the scaled
        distance in the x-y plane as the loss. See the notes for more details.

    Attributes
    ----------
    data : dict
        Sampled points and values.
    pending_points : set
        Points that still have to be evaluated.

    Notes
    -----
    `loss_per_interval` takes 2 parameters: ``xs`` and ``ys``, and returns a
    scalar: the loss over the interval.

    xs : tuple of floats
        The x values of the interval; if `nth_neighbors` is greater than
        zero, it also contains the x-values of the neighbors of the interval,
        in ascending order. The interval we want to know the loss of is then
        the middle interval. If no neighbor is available (at the edges of the
        domain), then `None` will take the place of the x-value of the
        neighbor.
    ys : tuple of function values
        The output values of the function when evaluated at the `xs`. This is
        either a float or a tuple of floats in the case of vector output.

    The `loss_per_interval` function may also have an attribute
    `nth_neighbors` that indicates how many of the neighboring intervals to
    `interval` are used. If `loss_per_interval` doesn't have such an
    attribute, it's assumed that it uses **no** neighboring intervals. Also
    see the `uses_nth_neighbors` decorator for more information.
    """

    def __init__(
        self,
        function: Callable[[Real], Float | np.ndarray],
        bounds: tuple[Real, Real],
        loss_per_interval: Callable[[XsTypeN, YsTypeN], Float] | None = None,
    ):
        self.function = function  # type: ignore

        if loss_per_interval is not None and hasattr(
            loss_per_interval, "nth_neighbors"
        ):
            self.nth_neighbors = loss_per_interval.nth_neighbors
        else:
            self.nth_neighbors = 0

        self.loss_per_interval = loss_per_interval or default_loss

        # When the scale changes by a factor 2, the losses are
        # recomputed. This is tunable such that we can test
        # the learner's behavior in the tests.
        self._recompute_losses_factor = 2

        self.data: dict[Real, Real] = {}
        self.pending_points: set[Real] = set()

        # A dict {x_n: [x_{n-1}, x_{n+1}]} for quick checking of local
        # properties.
        self.neighbors: NeighborsType = SortedDict()
        self.neighbors_combined: NeighborsType = SortedDict()

        # Bounding box [[minx, maxx], [miny, maxy]].
        self._bbox = [list(bounds), [np.inf, -np.inf]]

        # Data scale (maxx - minx), (maxy - miny)
        self._scale = [bounds[1] - bounds[0], 0]
        self._oldscale = deepcopy(self._scale)

        # A LossManager storing the loss function for each interval x_n.
        self.losses = loss_manager(self._scale[0])
        self.losses_combined = loss_manager(self._scale[0])

        # The precision in 'x' below which we set losses to 0.
        self._dx_eps = 2 * max(np.abs(bounds)) * np.finfo(float).eps

        self.bounds: tuple[float, float] = (float(bounds[0]), float(bounds[1]))
        self.__missing_bounds = set(self.bounds)  # cache of missing bounds

        self._vdim: int | None = None
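    # Hedged example (comment only, not part of the original class): a custom
    # `loss_per_interval` following the protocol described in the Notes above;
    # with @uses_nth_neighbors(0) it receives only the interval's endpoints:
    # >>> @uses_nth_neighbors(0)
    # ... def width_loss(xs, ys):  # hypothetical example loss
    # ...     return abs(xs[1] - xs[0])  # equivalent to uniform_loss
    # >>> learner = Learner1D(np.sin, (-1, 1), loss_per_interval=width_loss)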
    def new(self) -> Learner1D:
        """Create a copy of `~adaptive.Learner1D` without the data."""
        return Learner1D(self.function, self.bounds, self.loss_per_interval)
    @property
    def vdim(self) -> int:
        """Length of the output of ``learner.function``.

        If the output is unsized (when it's a scalar)
        then `vdim = 1`.

        As long as no data is known `vdim = 1`.
        """
        if self._vdim is None:
            if self.data:
                y = next(iter(self.data.values()))
                try:
                    self._vdim = len(np.squeeze(y))
                except TypeError:
                    # Means we are taking the length of a float
                    self._vdim = 1
            else:
                return 1
        return self._vdim
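    # Hedged example (comment only, not part of the original class): for a
    # vector-valued function, `vdim` is inferred from the first data point:
    # >>> learner = Learner1D(lambda x: (np.sin(x), np.cos(x)), bounds=(-1, 1))
    # >>> learner.vdim  # no data yet
    # 1
    # >>> learner.tell(0.0, (0.0, 1.0))
    # >>> learner.vdim
    # 2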
    def to_numpy(self):
        """Data as NumPy array of size ``(npoints, 2)`` if ``learner.function``
        returns a scalar and ``(npoints, 1+vdim)`` if ``learner.function``
        returns a vector of length ``vdim``.
        """
        return np.array([(x, *np.atleast_1d(y)) for x, y in sorted(self.data.items())])
    def to_dataframe(  # type: ignore[override]
        self,
        with_default_function_args: bool = True,
        function_prefix: str = "function.",
        x_name: str = "x",
        y_name: str = "y",
    ) -> pandas.DataFrame:
        """Return the data as a `pandas.DataFrame`.

        Parameters
        ----------
        with_default_function_args : bool, optional
            Include the ``learner.function``'s default arguments as a
            column, by default True
        function_prefix : str, optional
            Prefix to the ``learner.function``'s default arguments' names,
            by default "function."
        x_name : str, optional
            Name of the input value, by default "x"
        y_name : str, optional
            Name of the output value, by default "y"

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        ImportError
            If `pandas` is not installed.
        """
        if not with_pandas:
            raise ImportError("pandas is not installed.")
        xs, ys = zip(*sorted(self.data.items())) if self.data else ([], [])
        df = pandas.DataFrame(xs, columns=[x_name])
        df[y_name] = ys
        df.attrs["inputs"] = [x_name]
        df.attrs["output"] = y_name
        if with_default_function_args:
            assign_defaults(self.function, df, function_prefix)
        return df
    def load_dataframe(  # type: ignore[override]
        self,
        df: pandas.DataFrame,
        with_default_function_args: bool = True,
        function_prefix: str = "function.",
        x_name: str = "x",
        y_name: str = "y",
    ) -> None:
        """Load data from a `pandas.DataFrame`.

        If ``with_default_function_args`` is True, then ``learner.function``'s
        default arguments are set (using `functools.partial`) from the values
        in the `pandas.DataFrame`.

        Parameters
        ----------
        df : pandas.DataFrame
            The data to load.
        with_default_function_args : bool, optional
            The ``with_default_function_args`` used in ``to_dataframe()``,
            by default True
        function_prefix : str, optional
            The ``function_prefix`` used in ``to_dataframe``, by default "function."
        x_name : str, optional
            The ``x_name`` used in ``to_dataframe``, by default "x"
        y_name : str, optional
            The ``y_name`` used in ``to_dataframe``, by default "y"
        """
        self.tell_many(df[x_name].values, df[y_name].values)
        if with_default_function_args:
            self.function = partial_function_from_dataframe(
                self.function, df, function_prefix
            )
    @property
    def npoints(self) -> int:  # type: ignore[override]
        """Number of evaluated points."""
        return len(self.data)
    @cache_latest
    def loss(self, real: bool = True) -> float:
        if self._missing_bounds():
            return np.inf
        losses = self.losses if real else self.losses_combined
        if not losses:
            return np.inf
        max_interval, max_loss = losses.peekitem(0)
        return max_loss
    def _scale_x(self, x: Float | None) -> Float | None:
        if x is None:
            return None
        return x / self._scale[0]

    def _scale_y(self, y: Float | np.ndarray | None) -> Float | np.ndarray | None:
        if y is None:
            return None
        y_scale = self._scale[1] or 1
        return y / y_scale

    def _get_point_by_index(self, ind: int) -> float | None:
        if ind < 0 or ind >= len(self.neighbors):
            return None
        return self.neighbors.keys()[ind]

    def _get_loss_in_interval(self, x_left: float, x_right: float) -> float:
        assert x_left is not None and x_right is not None

        if x_right - x_left < self._dx_eps:
            return 0

        nn = self.nth_neighbors
        i = self.neighbors.index(x_left)
        start = i - nn
        end = i + nn + 2

        xs = [self._get_point_by_index(i) for i in range(start, end)]
        ys = [self.data.get(x, None) for x in xs]

        xs_scaled = tuple(self._scale_x(x) for x in xs)
        ys_scaled = tuple(self._scale_y(y) for y in ys)

        # we need to compute the loss for this interval
        return self.loss_per_interval(xs_scaled, ys_scaled)

    def _update_interpolated_loss_in_interval(
        self, x_left: float, x_right: float
    ) -> None:
        if x_left is None or x_right is None:
            return

        loss = self._get_loss_in_interval(x_left, x_right)
        self.losses[x_left, x_right] = loss

        # Iterate over all interpolated intervals in between
        # x_left and x_right and set the newly interpolated loss.
        a, b = x_left, None
        dx = x_right - x_left
        while b != x_right:
            b = self.neighbors_combined[a][1]
            self.losses_combined[a, b] = (b - a) * loss / dx
            a = b

    def _update_losses(self, x: float, real: bool = True) -> None:
        """Update all losses that depend on x"""
        # When we add a new point x, we should update the losses.
        # (x_left, x_right) are the "real" neighbors of 'x'.
        x_left, x_right = self._find_neighbors(x, self.neighbors)
        # (a, b) are the neighbors of the combined interpolated
        # and "real" intervals.
        a, b = self._find_neighbors(x, self.neighbors_combined)

        # (a, b) is split into (a, x) and (x, b) so if (a, b) exists
        self.losses_combined.pop((a, b), None)  # we get rid of (a, b).

        if real:
            # We need to update all interpolated losses in the intervals
            # (x_left, x), (x, x_right) and the nth_neighbors nearest
            # neighboring intervals, since the addition of the
            # point 'x' could change their loss.
            for ival in _get_intervals(x, self.neighbors, self.nth_neighbors):
                self._update_interpolated_loss_in_interval(*ival)

            # Since 'x' is in between (x_left, x_right),
            # we get rid of the interval.
            self.losses.pop((x_left, x_right), None)
            self.losses_combined.pop((x_left, x_right), None)
        elif x_left is not None and x_right is not None:
            # 'x' happens to be in between two real points,
            # so we can interpolate the losses.
            dx = x_right - x_left
            loss = self.losses[x_left, x_right]
            self.losses_combined[a, x] = (x - a) * loss / dx
            self.losses_combined[x, b] = (b - x) * loss / dx

        # (no real point left of x) or (no real point right of a)
        left_loss_is_unknown = (x_left is None) or (not real and x_right is None)
        if (a is not None) and left_loss_is_unknown:
            self.losses_combined[a, x] = float("inf")

        # (no real point right of x) or (no real point left of b)
        right_loss_is_unknown = (x_right is None) or (not real and x_left is None)
        if (b is not None) and right_loss_is_unknown:
            self.losses_combined[x, b] = float("inf")

    @staticmethod
    def _find_neighbors(x: float, neighbors: NeighborsType) -> Any:
        if x in neighbors:
            return neighbors[x]
        pos = neighbors.bisect_left(x)
        keys = neighbors.keys()
        x_left = keys[pos - 1] if pos != 0 else None
        x_right = keys[pos] if pos != len(neighbors) else None
        return x_left, x_right

    def _update_neighbors(self, x: float, neighbors: NeighborsType) -> None:
        if x not in neighbors:  # The point is new
            x_left, x_right = self._find_neighbors(x, neighbors)
            neighbors[x] = [x_left, x_right]
            neighbors.get(x_left, [None, None])[1] = x
            neighbors.get(x_right, [None, None])[0] = x

    def _update_scale(self, x: float, y: Float | np.ndarray) -> None:
        """Update the scale with which the x and y-values are scaled.

        For a learner where the function returns a single scalar, the scale
        is determined by the peak-to-peak value of the x and y-values.

        When the function returns a vector, the learner's y-scale is set by
        the level with the largest peak-to-peak value.
        """
        self._bbox[0][0] = min(self._bbox[0][0], x)
        self._bbox[0][1] = max(self._bbox[0][1], x)
        self._scale[0] = self._bbox[0][1] - self._bbox[0][0]
        if y is not None:
            if self.vdim > 1:
                try:
                    y_min = np.nanmin([self._bbox[1][0], y], axis=0)
                    y_max = np.nanmax([self._bbox[1][1], y], axis=0)
                except ValueError:
                    # Happens when `_bbox[1]` is a float and `y` a vector.
                    y_min = y_max = y
                self._bbox[1] = [y_min, y_max]
                self._scale[1] = np.max(y_max - y_min)
            else:
                self._bbox[1][0] = min(self._bbox[1][0], y)
                self._bbox[1][1] = max(self._bbox[1][1], y)
                self._scale[1] = self._bbox[1][1] - self._bbox[1][0]
    def tell(self, x: float, y: Float | Sequence[Float] | np.ndarray) -> None:
        if x in self.data:
            # The point is already evaluated before
            return
        if y is None:
            raise TypeError(
                "Y-value may not be None, use learner.tell_pending(x) "
                "to indicate that this value is currently being calculated"
            )

        # either it is a float/int, if not, try casting to a np.array
        if not isinstance(y, (float, int)):
            y = np.asarray(y, dtype=float)

        # Add point to the real data dict
        self.data[x] = y

        # remove from set of pending points
        self.pending_points.discard(x)

        if not self.bounds[0] <= x <= self.bounds[1]:
            return

        self._update_neighbors(x, self.neighbors_combined)
        self._update_neighbors(x, self.neighbors)
        self._update_scale(x, y)
        self._update_losses(x, real=True)

        # If the scale has increased enough, recompute all losses.
        if self._scale[1] > self._recompute_losses_factor * self._oldscale[1]:
            for interval in reversed(self.losses):
                self._update_interpolated_loss_in_interval(*interval)

            self._oldscale = deepcopy(self._scale)
    def tell_pending(self, x: float) -> None:
        if x in self.data:
            # The point is already evaluated before
            return
        self.pending_points.add(x)
        self._update_neighbors(x, self.neighbors_combined)
        self._update_losses(x, real=False)
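    # Hedged usage sketch (comment only, not part of the original class):
    # `tell_pending` marks a point as in-flight so `ask` won't suggest it
    # again, while `tell` commits the evaluated value:
    # >>> learner = Learner1D(np.sin, bounds=(-1, 1))
    # >>> learner.tell_pending(0.5)       # 0.5 is being computed elsewhere
    # >>> learner.tell(0.5, np.sin(0.5))  # commit the result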
    def tell_many(
        self,
        xs: Sequence[Float] | np.ndarray,
        ys: (
            Sequence[Float]
            | Sequence[Sequence[Float]]
            | Sequence[np.ndarray]
            | np.ndarray
        ),
        *,
        force: bool = False,
    ) -> None:
        if not force and not (len(xs) > 0.5 * len(self.data) and len(xs) > 2):
            # Only run this more efficient method if there are
            # at least 2 points and the amount of points added are
            # at least half of the number of points already in 'data'.
            # These "magic numbers" are somewhat arbitrary.
            super().tell_many(xs, ys)
            return

        # Add data points
        self.data.update(zip(xs, ys))
        self.pending_points.difference_update(xs)

        # Get all data as numpy arrays
        points = np.array(list(self.data.keys()))
        values = np.array(list(self.data.values()))
        points_pending = np.array(list(self.pending_points))
        points_combined = np.hstack([points_pending, points])

        # Generate neighbors
        self.neighbors = _get_neighbors_from_array(points)
        self.neighbors_combined = _get_neighbors_from_array(points_combined)

        # Update scale
        self._bbox[0] = [points_combined.min(), points_combined.max()]
        self._bbox[1] = [values.min(axis=0), values.max(axis=0)]
        self._scale[0] = self._bbox[0][1] - self._bbox[0][0]
        self._scale[1] = np.max(self._bbox[1][1] - self._bbox[1][0])
        self._oldscale = deepcopy(self._scale)

        # Find the intervals for which the losses should be calculated.
        intervals, intervals_combined = (
            [(x_m, x_r) for x_m, (x_l, x_r) in neighbors.items()][:-1]
            for neighbors in (self.neighbors, self.neighbors_combined)
        )

        # Compute the losses for the "real" intervals.
        self.losses = loss_manager(self._scale[0])
        for ival in intervals:
            self.losses[ival] = self._get_loss_in_interval(*ival)

        # List with "real" intervals that have interpolated intervals inside
        to_interpolate: list[tuple[Real, Real]] = []

        self.losses_combined = loss_manager(self._scale[0])
        for ival in intervals_combined:
            # If this interval exists in 'losses' then copy it otherwise
            # calculate it.
            if ival in reversed(self.losses):
                self.losses_combined[ival] = self.losses[ival]
            else:
                # Set all losses to inf now, later they might be updated if
                # the interval appears to be inside a real interval.
                self.losses_combined[ival] = np.inf
                x_left, x_right = ival
                a, b = to_interpolate[-1] if to_interpolate else (None, None)
                if b == x_left and (a, b) not in self.losses:
                    # join (a, b) and (x_left, x_right) → (a, x_right)
                    to_interpolate[-1] = (a, x_right)
                else:
                    to_interpolate.append((x_left, x_right))

        for ival in to_interpolate:
            if ival in reversed(self.losses):
                # If this interval does not exist it should already
                # have an inf loss.
                self._update_interpolated_loss_in_interval(*ival)
    def ask(self, n: int, tell_pending: bool = True) -> tuple[list[float], list[float]]:
        """Return 'n' points that are expected to maximally reduce the loss."""
        points, loss_improvements = self._ask_points_without_adding(n)
        if tell_pending:
            for p in points:
                self.tell_pending(p)

        return points, loss_improvements
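    # Hedged usage sketch (comment only, not part of the original class): a
    # minimal serial run loop built on `ask` and `tell`:
    # >>> learner = Learner1D(np.sin, bounds=(-1, 1))
    # >>> while learner.loss() > 0.01:
    # ...     xs, _ = learner.ask(1)
    # ...     for x in xs:
    # ...         learner.tell(x, np.sin(x))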
    def _missing_bounds(self) -> list[Real]:
        missing_bounds = []
        for b in copy(self.__missing_bounds):
            if b in self.data:
                self.__missing_bounds.remove(b)
            elif b not in self.pending_points:
                missing_bounds.append(b)
        return sorted(missing_bounds)

    def _ask_points_without_adding(self, n: int) -> tuple[list[float], list[float]]:
        """Return 'n' points that are expected to maximally reduce the loss,
        without altering the state of the learner."""
        # Find out how to divide the n points over the intervals
        # by finding positive integer n_i that minimize max(L_i / n_i) subject
        # to a constraint that sum(n_i) = n + N, with N the total number of
        # intervals.
        # Return equally spaced points within each interval to which points
        # will be added.

        # XXX: when is this used and could we safely remove it without
        # impacting performance?
        if n == 0:
            return [], []

        # If the bounds have not been chosen yet, we choose them first.
        missing_bounds = self._missing_bounds()
        if len(missing_bounds) >= n:
            return missing_bounds[:n], [np.inf] * n

        # Add bound intervals to quals if bounds were missing.
        if len(self.data) + len(self.pending_points) == 0:
            # We don't have any points, so return a linspace with 'n' points.
            a, b = self.bounds
            return np.linspace(a, b, n).tolist(), [np.inf] * n

        quals = loss_manager(self._scale[0])
        if len(missing_bounds) > 0:
            # There is at least one point in between the bounds.
            all_points = list(self.data.keys()) + list(self.pending_points)
            intervals = [
                (self.bounds[0], min(all_points)),
                (max(all_points), self.bounds[1]),
            ]
            for interval, bound in zip(intervals, self.bounds):
                if bound in missing_bounds:
                    quals[(*interval, 1)] = np.inf

        points_to_go = n - len(missing_bounds)

        # Calculate how many points belong to each interval.
        i, i_max = 0, len(self.losses_combined)
        for _ in range(points_to_go):
            qual, loss_qual = quals.peekitem(0) if quals else (None, 0)
            ival, loss_ival = (
                self.losses_combined.peekitem(i) if i < i_max else (None, 0)
            )

            if qual is None or (
                ival is not None
                and self._loss(self.losses_combined, ival)
                >= self._loss(quals, qual)
            ):
                i += 1
                quals[(*ival, 2)] = loss_ival / 2
            else:
                quals.pop(qual, None)
                *xs, n = qual
                quals[(*xs, n + 1)] = loss_qual * n / (n + 1)

        points = list(
            itertools.chain.from_iterable(
                linspace(x_l, x_r, n) for (x_l, x_r, n) in quals
            )
        )

        loss_improvements = list(
            itertools.chain.from_iterable(
                itertools.repeat(quals[x0, x1, n], n - 1) for (x0, x1, n) in quals
            )
        )

        # add the missing bounds
        points = missing_bounds + points
        loss_improvements = [np.inf] * len(missing_bounds) + loss_improvements

        return points, loss_improvements

    def _loss(
        self, mapping: dict[Interval, float], ival: Interval
    ) -> tuple[float, Interval]:
        loss = mapping[ival]
        return finite_loss(ival, loss, self._scale[0])
    def plot(self, *, scatter_or_line: str = "scatter"):
        """Returns a plot of the evaluated data.

        Parameters
        ----------
        scatter_or_line : str, default: "scatter"
            Plot as a scatter plot ("scatter") or a line plot ("line").

        Returns
        -------
        plot : `holoviews.Overlay`
            Plot of the evaluated data.
        """
        if scatter_or_line not in ("scatter", "line"):
            raise ValueError("scatter_or_line must be 'scatter' or 'line'")
        hv = ensure_holoviews()

        xs, ys = zip(*sorted(self.data.items())) if self.data else ([], [])
        if scatter_or_line == "scatter":
            if self.vdim == 1:
                plots = [hv.Scatter((xs, ys))]
            else:
                plots = [hv.Scatter((xs, _ys)) for _ys in np.transpose(ys)]
        else:
            plots = [hv.Path((xs, ys))]

        # Put all plots in an Overlay because a DynamicMap can't handle
        # changing datatypes, e.g. when `vdim` isn't yet known and the
        # live_plot is running.
        p = hv.Overlay(plots)
        # Plot with 5% empty margins such that the boundary points are visible
        margin = 0.05 * (self.bounds[1] - self.bounds[0])
        plot_bounds = (self.bounds[0] - margin, self.bounds[1] + margin)

        return p.redim(x={"range": plot_bounds})
    def remove_unfinished(self) -> None:
        self.pending_points = set()
        self.losses_combined = deepcopy(self.losses)
        self.neighbors_combined = deepcopy(self.neighbors)
    def _get_data(self) -> dict[float, float]:
        return self.data

    def _set_data(self, data: dict[float, float]) -> None:
        if data:
            xs, ys = zip(*data.items())
            self.tell_many(xs, ys)

    def __getstate__(self):
        return (
            cloudpickle.dumps(self.function),
            tuple(self.bounds),
            self.loss_per_interval,
            dict(self.losses),  # SortedDict cannot be pickled
            dict(self.losses_combined),  # ItemSortedDict cannot be pickled
            self._get_data(),
        )

    def __setstate__(self, state):
        function, bounds, loss_per_interval, losses, losses_combined, data = state
        function = cloudpickle.loads(function)
        self.__init__(function, bounds, loss_per_interval)
        self._set_data(data)
        self.losses.update(losses)
        self.losses_combined.update(losses_combined)
def loss_manager(x_scale: float) -> ItemSortedDict[Interval, float]:
    def sort_key(ival, loss):
        loss, ival = finite_loss(ival, loss, x_scale)
        return -loss, ival

    sorted_dict = ItemSortedDict(sort_key)
    return sorted_dict


def finite_loss(ival: Interval, loss: float, x_scale: float) -> tuple[float, Interval]:
    """Get the so-called finite_loss of an interval in order to be able to
    sort intervals that have infinite loss."""
    # If the loss is infinite we return the
    # distance between the two points.
    if math.isinf(loss) or math.isnan(loss):
        loss = (ival[1] - ival[0]) / x_scale
        if len(ival) == 3:
            # Used when constructing quals. Last item is
            # the number of points inside the qual.
            loss /= ival[2]  # type: ignore[misc]

    # We round the loss to 12 digits such that losses that are equal up to
    # numerical precision will be considered equal. This is 3.5x faster
    # than using the `round` function.
    round_fac = 1e12
    loss = int(loss * round_fac + 0.5) / round_fac
    return loss, ival
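# Hedged worked example (not part of the original module): an infinite loss on
# the interval (0.0, 0.5) with x_scale 1.0 is replaced by the interval's
# scaled width, so it can be ordered against other infinite-loss intervals.
def _example_finite_loss():  # hypothetical helper, illustration only
    return finite_loss((0.0, 0.5), np.inf, 1.0)  # == (0.5, (0.0, 0.5))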