# Source code for ddg.nets._conversion

# TODO: Remove old functions
from collections.abc import Iterable

import numpy as np

from ddg import nonexact
from ddg.nets._decorators import applicable_to_netcollection
from ddg.nets._domain import (
    DiscreteRectangularDomain,
    EmptyDomain,
    SmoothRectangularDomain,
    _is_bounded_interval,
)
from ddg.nets._net import DiscreteNet, EmptyNet, PointNet


def _linear_sampler(start, stop, count):
    """Return the map sending index 0 to `start` and index `count - 1` to `stop`."""
    if count == 1:
        # A single sample has no spacing; the general formula below would
        # divide by zero, so every index maps to `start`.
        def sampler(x):
            return 0 * x + start

        return sampler

    def sampler(x):
        return x * (stop - start) / (count - 1) + start

    return sampler


def _step_grid(start, stop, step, atol):
    """Return the samples `start, start + step, ...` that do not exceed `stop`.

    The last sample is snapped onto `stop` when `nonexact.isclose` considers
    the two equal.
    """
    grid = np.arange(start, stop + step, step)
    if grid[-1] > stop:
        grid = grid[:-1]
    if nonexact.isclose(grid[-1], stop, atol=atol):
        grid[-1] = stop
    return grid


def sample_interval(interval, sample, option="", atol=None, anchor=None):
    """
    Sample an interval.

    Parameters
    ----------
    interval : tuple
        Pair ``(a, b)`` with the boundaries of the interval. Either boundary
        may be infinite.
    sample : float, int or list
        Stepsize, or the total amount of samples if option 't' is set.
        For compound sampling (option 'c') a list ``[stepsize, amount]``.
        A list with a single entry is unwrapped for all other options.
    option : string (default="")
        | Any combination of the following characters:
        |     't'otal amount of samples given
        |     's'ymmetric sampling: the samples are centered in the interval
        |     'p'eriodic
        |     'c'ompound sampling, both stepsize and amount of samples given
        |
        | 't', 's' and 'c' are mutually exclusive.
        | With 'c', a bounded interval is sampled like 't' using the amount,
        | while an unbounded interval is sampled with the stepsize and cut
        | off after the given amount of samples.
    atol : float or None (default=None)
        Tolerance passed on to :py:mod:`ddg.nonexact` comparisons. Not used
        when 't' is set or the interval is unbounded.
    anchor : float or None (default=None)
        Point that must be one of the samples. Only used for stepsize
        sampling of bounded intervals.

    Returns
    -------
    tuple
        | containing:
        |     [0] list with the (possibly modified) boundaries of the
        |         sampled interval
        |     [1] amount of samples (``np.inf`` if unbounded and no amount
        |         was given)
        |     [2] sampling function mapping a sample index to its value

    Raises
    ------
    NotImplementedError
        If unknown or mutually incompatible options are given, or if an
        anchor is combined with 't', 'c' (bounded interval) or 's'.
    ValueError
        If `sample` does not fit the given options, if 't' is used on an
        unbounded interval, or if the interval is neither bounded nor one of
        the supported unbounded shapes.

    See Also
    --------
    bound_domain
    """
    a, b = interval

    # TODO: Total amounts and anchor given
    valid_options = set(["s", "t", "p", "c"])
    if not set(option).issubset(valid_options):
        raise NotImplementedError(
            "Unkown options found: " + str(set(option).difference(valid_options))
        )
    if "t" in option and "s" in option:
        raise NotImplementedError("Uncompatible options 's' and 't'.")
    if "t" in option and "c" in option:
        raise NotImplementedError("Uncompatible options 'c' and 't'.")
    if "c" in option and "s" in option:
        raise NotImplementedError("Uncompatible options 'c' and 's'.")
    if "c" in option and len(sample) != 2:
        raise ValueError("Compound sampling needs to be [stepsize,samples,c].")
    if "c" not in option and isinstance(sample, Iterable):
        if len(sample) != 1:
            raise ValueError("Only one value supported for non-compound sampling.")
        sample = sample[0]

    if _is_bounded_interval(interval):
        # Compound sampling of a bounded interval is total-amount sampling.
        if "c" in option:
            sample = sample[1]
            option += "t"

        # Total amount of samples given
        if "t" in option:
            # Compare against None explicitly: an anchor of 0 is a real anchor
            # and must not be silently ignored.
            if anchor is not None:
                msg = "Total samples is not compatible with anchor."
                raise NotImplementedError(msg)
            count = sample
            if not isinstance(count, int):
                raise ValueError("Total samples must be an integer.")
            # For a periodic direction the spacing is computed as if there
            # was one more sample, but only `count` samples are reported.
            divisions = count + 1 if "p" in option else count
            return ([a, b], count, _linear_sampler(a, b, divisions))

        # Stepsize given, anchor given
        if anchor is not None:
            if "s" in option:
                msg = "Option 's' is not compatible with anchor."
                raise NotImplementedError(msg)
            _, remainder = divmod(anchor - a, sample)
            # Shift the start so that the anchor lies on the grid. A remainder
            # equal to the stepsize means the anchor already lies on it.
            if not nonexact.isclose(sample, remainder, atol=atol):
                a += remainder
            grid = _step_grid(a, b, sample, atol)
            count = len(grid)
            divisions = count
            if "p" in option and grid[-1] == b:
                divisions += 1
            return ([a, grid[-1]], count, _linear_sampler(a, grid[-1], divisions))

        # Symmetric case
        if "s" in option:
            quotient, remainder = divmod(b - a, sample)
            count = int(np.floor(quotient))
            if nonexact.isclose(remainder, sample, atol=atol):
                # Rounding made divmod report one step too few: the stepsize
                # divides the interval evenly, so nothing is left over and the
                # boundaries must stay where they are.
                count += 2
            else:
                count += 1
                if not nonexact.isclose(remainder, 0, atol=atol, rtol=0.0):
                    # Distribute the left-over length equally on both sides.
                    b = a + remainder / 2 + (count - 1) * sample
                    a += remainder / 2
            return ([a, b], count, _linear_sampler(a, b, count))

        # No options given
        grid = _step_grid(a, b, sample, atol)
        count = len(grid)
        return ([a, grid[-1]], count, _linear_sampler(a, grid[-1], count))

    # Unbounded interval
    total = None
    if "c" in option:
        total = sample[1]
        sample = sample[0]
    if "t" in option:
        msg = "Option 't' is not compatible with unbounded intervals."
        raise ValueError(msg)

    if a == -np.inf and b == np.inf:
        if total:
            # Cut the samples off around 0.
            half, odd = divmod(total, 2)
            b = half * sample
            a = (1 - half - odd) * sample

            def tmp(x, s=sample, a=1 - half - odd):
                return s * (x + a)

        else:

            def tmp(x, s=sample):
                return s * x

    elif a == -np.inf and b < np.inf:
        if total:
            a = b - sample * (total - 1)

            # Index total - 1 maps to b, index 0 to the new lower boundary.
            def tmp(x, s=sample, b=b, last=total - 1):
                return s * (x - last) + b

        else:

            def tmp(x, s=sample, b=b):
                return s * x + b

    elif b == np.inf:

        def tmp(x, s=sample, a=a):
            return s * x + a

        if total:
            b = a + sample * (total - 1)
    else:
        raise ValueError("Strange Interval given")

    if not total:
        total = np.inf
    return ([a, b], total, tmp)
def sampling_decomposer(sampling, atol, anchor, dimension):
    """
    Helper function for sample_smooth_domain.

    Expands a sampling specification into per-direction lists that can be
    zipped with the intervals of a domain.

    Parameters
    ----------
    sampling : list, int or float
        | Stepsize/amount of samples and additional options. Accepted forms:
        |     ``value``                      same value for every direction
        |     ``[value]``                    same as above
        |     ``[value, option]``            same value and option everywhere
        |     ``[step, amount, option]``     same compound sampling everywhere
        |     ``[[value, option]]`` or ``[[step, amount, option]]``
        |                                    same as the two forms above
        |     ``[s_0, s_1, ...]``            one entry per direction, each
        |                                    either a value or a list of one
        |                                    of the forms above
    atol : list, int, float or None
        Tolerance(s) used in the sampling process. A non-iterable value or a
        list with a single entry is repeated for every direction, any other
        list is used as given. `None` is passed through unchanged, i.e. it is
        not replaced by a global default here.
    anchor : list, tuple or None
        Anchor point used in the sampling process. Converted to a numpy
        array; `None` is expanded to one `None` per direction.
    dimension : int
        Dimension of the domain you want to sample.

    Returns
    -------
    list
        | List of the decomposed sampling.
        |     [0] stepsize/samples
        |     [1] options
        |     [2] tolerances
        |     [3] anchor

    See also
    --------
    sample_smooth_domain
    """
    if not isinstance(sampling, Iterable):
        values = [sampling] * dimension
        options = [""] * dimension
    elif len(sampling) == 1 and isinstance(sampling[0], Iterable):
        shared = sampling[0]
        # A compound spec [stepsize, amount, option] carries its option in the
        # last entry as well; only looking at length 2 would drop it.
        has_option = len(shared) == 2 or (
            len(shared) == 3 and isinstance(shared[-1], str)
        )
        values = [shared[:-1]] * dimension
        options = [shared[-1] if has_option else ""] * dimension
    elif len(sampling) == 1:
        values = [sampling[0]] * dimension
        options = [""] * dimension
    elif len(sampling) in [2, 3] and isinstance(sampling[-1], str):
        values = [sampling[:-1]] * dimension
        options = [sampling[-1]] * dimension
    else:
        # One specification per direction.
        values = [s[:-1] if isinstance(s, Iterable) else [s] for s in sampling]
        options = [
            s[-1] if isinstance(s, Iterable) and len(s) in [2, 3] else ""
            for s in sampling
        ]

    if isinstance(atol, Iterable):
        tolerances = [atol[0]] * dimension if len(atol) == 1 else atol
    else:
        tolerances = [atol] * dimension

    if anchor is not None:
        anchors = np.array(anchor)
    else:
        anchors = [None] * dimension

    return [values, options, tolerances, anchors]
def sample_smooth_domain(domain, sampling, anchor=None, atol=1e-8):
    """
    Build the discrete net that samples a smooth rectangular domain.

    Every direction of `domain` is sampled independently with
    `sample_interval`; the returned net maps integer indices to the sampled
    coordinates.

    Parameters
    ----------
    domain : ddg.nets.SmoothDomain
        Domain to sample. An `EmptyDomain` or a domain of dimension 0 yields
        a net over that same domain.
    sampling : list, int or float
        | Sampling specification, decomposed by `sampling_decomposer`.
        | Either a single int/float (stepsize for all directions), a list
        | ``[value, option]`` used for all directions, or a list with one
        | such entry per direction.
        |
        | Options:
        |     ''  : stepsize given, sampling starts at the lower bound.
        |     't' : the value is the total amount of samples (bounded
        |           intervals only).
        |     'c' : compound sampling ``[stepsize, amount, 'c']``; behaves
        |           like 't' for bounded and like '' for unbounded
        |           directions, where only the given amount is returned.
        |     's' : symmetric sampling around the center of the interval.
    anchor : list, tuple or None
        One anchor value per direction; directions whose anchor is None are
        sampled normally, for the others the anchor value is one of the
        samples. Not available together with 's', 't' and 'c'.
    atol : list, int or float
        Tolerance(s) for the sampling, either one value for all directions
        or one per direction. See :py:mod:`ddg.nonexact` for details.

    Returns
    -------
    ddg.nets.DiscreteNet

    Raises
    ------
    TypeError
        If `domain` is neither an `EmptyDomain` nor a
        `SmoothRectangularDomain`.

    See also
    --------
    sampling_decomposer, sample_interval
    """
    if isinstance(domain, EmptyDomain):
        return DiscreteNet(lambda: None, domain)
    if not isinstance(domain, SmoothRectangularDomain):
        raise TypeError(
            "Domain does not support given type: {}".format(domain.__class__.__name__)
        )
    if domain.dimension == 0:
        return DiscreteNet(lambda: None, domain)

    steps, plain_options, tolerances, anchors = sampling_decomposer(
        sampling, atol, anchor, domain.dimension
    )

    # Periodic directions of the domain are flagged for sample_interval.
    direction_options = plain_options.copy()
    for periodic_idx in domain.periodicity:
        direction_options[periodic_idx] += "p"

    sampled = [
        sample_interval(interval, step, opt, tol, anc)
        for interval, step, opt, tol, anc in zip(
            domain, steps, direction_options, tolerances, anchors
        )
    ]
    sampled_bounds = [entry[0] for entry in sampled]
    sample_counts = [entry[1] for entry in sampled]
    coordinate_maps = [entry[2] for entry in sampled]

    def g(*indices):
        return [to_coord(idx) for to_coord, idx in zip(coordinate_maps, indices)]

    def index_range(bounds, count):
        # Translate sampled bounds into the index range of the discrete domain.
        lower, upper = bounds
        open_below = lower == -np.inf
        open_above = upper == np.inf
        if open_below and open_above:
            return (-np.inf, np.inf)
        if open_below:
            return (-np.inf, 0)
        if open_above:
            return (0, np.inf)
        return (0, count - 1)

    index_ranges = [
        index_range(bounds, count)
        for bounds, count in zip(sampled_bounds, sample_counts)
    ]

    # A direction stays periodic only if sampling kept its full interval.
    periodic_flags = []
    for i, (bounds, original, tol) in enumerate(
        zip(sampled_bounds, domain, tolerances)
    ):
        periodic_flags.append(
            (i in domain.periodicity)
            and nonexact.allclose(bounds, original, atol=tol, rtol=0.0)
        )

    ranges_with_periodicity = [
        rng + (flag,) for rng, flag in zip(index_ranges, periodic_flags)
    ]

    sampledomain = DiscreteRectangularDomain(ranges_with_periodicity)
    return DiscreteNet(g, sampledomain, name="SamplingNet")
@applicable_to_netcollection
def sample_smooth_net(net, sampling, anchor=None, atol=1e-8, name=None):
    """
    Samples a smooth net.

    Parameters
    ----------
    net : ddg.nets.SmoothNet
        Smooth net to sample. An `EmptyNet` or `PointNet` is returned
        unchanged.
    sampling : list, int or float
        See sample_smooth_domain.
    anchor : list or None
        Anchor point for sampling process.
    atol : list, int or float
        Tolerance(s) for sampling process.
    name : string
        Name of the sampled net. If None, uses net.name instead.

    Returns
    -------
    sampled_net
        ddg.nets.net.DiscreteNet or
        ddg.nets.net.EmptyNet or
        ddg.nets.net.PointNet

    See Also
    --------
    sample_smooth_domain
    """
    # Nothing to sample: return before building a sampling net that would be
    # thrown away (and whose construction could fail for such nets).
    if isinstance(net, (EmptyNet, PointNet)):
        return net

    samplingnet = sample_smooth_domain(net.domain, sampling, anchor=anchor, atol=atol)

    def discrete_fct(*args, **kwargs):
        # Keyword arguments are accepted but not forwarded to the nets.
        return net(*samplingnet(*args))

    if name is None:
        name = net.name

    return DiscreteNet(discrete_fct, samplingnet.domain, name=name)