Source code for multineas.multimin

import numpy as np
from matplotlib import pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import itertools
import math
import string
from time import time,strftime
from scipy.stats import multivariate_normal as multinorm
from scipy.optimize import minimize
import pickle
from hashlib import md5
import os

from .util import Util, Stats
from .plot import CornerPlot, multineas_watermark

[docs] class ComposedMultiVariateNormal(object): """ A linear combination of multivariate normal distribution (MND) with special methods for specifying the parameters of the distributions. Attributes ---------- Ngauss : int Number of composed MND. Nvars : int Number of random variables. mus : numpy.ndarray Array with average (mu) of random variables (Ngauss x Nvars). weights : numpy.ndarray Array with weights of each MND (Ngauss). NOTE: These weights are normalized at the end. sigmas : numpy.ndarray Standard deviation of each variable(Ngauss x Nvars). rhos : numpy.ndarray Elements of the upper triangle of the correlation matrix (Ngauss x Nvars x (Nvars-1)/2). Sigmas : numpy.ndarray Array with covariance matrices for each MND (Ngauss x Nvars x Nvars). params : numpy.ndarray Parameters of the distribution in flatten form including symmetric elements of the covariance matrix (Ngauss*(1+Nvars+Nvars*(Nvars+1)/2)). stdcorr : numpy.ndarray Parameters of the distribution in flatten form, including upper triangle of the correlation matrix (Ngauss*(1+Nvars+Nvars*(Nvars+1)/2)). Notes ----- There are several ways of initialize a CMND: 1. Providing: Ngauss and Nvars In this case the class is instantiated with zero means, unitary dispersion and covariance matrix equal to Ngasus identity matrices Nvars x Nvars. 2. Providing: params, Nvars In this case you have a flatted version of the parametes (weights, mus, Sigmas) and want to instantiate the system. All parameters are set and no other action is required. 3. Providing: stdcorr, Nvars In this case you have a flatted version of the parametes (weights, mus, sigmas, rhos) and want to instantiate the system. All parameters are set and no other action is required. 4. Providing: weights, mus, Sigmas (optional) In this case the basic properties of the CMND are set. Examples -------- >>> mus = [[0, 0], [1, 1]] >>> weights = [0.1, 0] >>> MND1 = ComposedMultiVariateNormal(mus=mus, weights=weights) >>> MND1.set_sigmas([[[1, 0.2], [0, 1]], [[1, 0], [0, 1]]]) >>> print(MND1) >>> params = [0.1, 0.9, 0, 0, 1, 1, 1, 0.2, 0.2, 1, 1, 0, 0, 1] >>> MND2 = ComposedMultiVariateNormal(params=params, Nvars=2) >>> print(MND2) """ #Control behavior _ignoreWarnings=True def __init__(self, Ngauss=0, Nvars=0, params=None, stdcorr=None, weights=None,mus=None,Sigmas=None): #Method 1: initialize a simple instance if Ngauss>0: mus=[[0]*Nvars]*Ngauss weights=[1/Ngauss]*Ngauss Sigmas=[np.eye(Nvars)]*Ngauss self.__init__(mus=mus,weights=weights,Sigmas=Sigmas) #Method 2: initialize from flatten parameters elif params is not None: self.set_params(params,Nvars) #Method 3: initialize from flatten parameters elif stdcorr is not None: self.set_stdcorr(stdcorr,Nvars) #Method 4: initialize from explicit arrays else: #Basic attributes mus=np.array(mus) try: mus[0,0] except Exception as e: Util.error_msg(e,"Parameter 'mus' must be a matrix, eg. mus=[[0,0]]") raise self.mus=mus #Number of variables self.Ngauss=len(mus) self.Nvars=len(mus[0]) #Weights and normalization if weights is None: self.weights=[1]+(self.Ngauss-1)*[0] elif len(weights)!=self.Ngauss: raise ValueError(f"Length of weights array ({len(weights)}) must be equal to number of MND ({self.Ngauss})") else: self._normalize_weights(weights) #Secondary attributes if Sigmas is None: self.Sigmas=None self.params=None else: self.set_sigmas(Sigmas) self._nerror=0
[docs] def set_sigmas(self,Sigmas): """ Set the value of list of covariance matrices. After setting Sigmas it update params and stdcorr. Parameters ---------- Sigmas : list or numpy.ndarray Array of covariance matrices. Attr. [HC] """ self.Sigmas=np.array(Sigmas) self._check_sigmas() self._flatten_params() self._flatten_stdcorr()
[docs] def set_params(self,params,Nvars): """ Set the properties of the CMND from flatten params. After setting it generate flattend stdcorr and normalize weights. Parameters ---------- params : list or numpy.ndarray Flattened parameters. Nvars : int Number of variables. Attr. [HC] """ if Nvars==0 or len(params)==0: raise ValueError(f"When setting from flat params, Nvars ({Nvars}) cannot be zero") self._unflatten_params(params,Nvars) self._normalize_weights(self.weights) return
[docs] def set_stdcorr(self,stdcorr,Nvars): """ Set the properties of the CMND from flatten stdcorr. After setting it generate flattened params and normalize weights. Parameters ---------- stdcorr : list or numpy.ndarray Flattened standard deviations and correlations. Nvars : int Number of variables. Attr. [HC] """ if Nvars==0 or len(stdcorr)==0: raise ValueError(f"When setting from flat params, Nvars ({Nvars}) cannot be zero") self._unflatten_stdcorr(stdcorr,Nvars) self._normalize_weights(self.weights) return
def _normalize_weights(self,weights): """ Normalize weights in such a way that sum(weights)=1 Attr. [HC] """ self.weights=np.array(weights)/sum(np.array(weights)) def _flatten_params(self): """ Flatten params Attr. [HC] """ self._check_params(self.Sigmas) #Flatten covariance matrix SF=[Stats.flatten_symmetric_matrix(self.Sigmas[i]).tolist() for i in range(self.Ngauss)] self.params=np.concatenate((self.weights.flatten(), self.mus.flatten(), list(itertools.chain(*SF)))) self.Npars=len(self.params) #Ngauss*(1+Nvars+Nvar*(Nvars+1)/2) def _flatten_stdcorr(self): """ Flatten stdcorr Attr. [HC] """ self._check_params(self.sigmas) #Flatten stds. and correlations self.stdcorr=np.concatenate((self.weights.flatten(), self.mus.flatten(), self.sigmas.flatten(), self.rhos.flatten() )) self.Ncor=len(self.stdcorr) def _unflatten_params(self,params,Nvars): """ Unflatten properties from params Attr. [HC] """ self.params=np.array(params) self.Npars=len(self.params) factor=int(1+Nvars+Nvars*(Nvars+1)/2) if (self.Npars%factor)!=0: raise AssertionError(f"The number of parameters {self.Npars} is incompatible with the provided number of variables ({Nvars})") #Number of gaussian functions Ngauss=int(self.Npars/factor) #Get the weights i=0 weights=self.params[i:Ngauss] i+=Ngauss #Get the mus mus=self.params[i:i+Ngauss*Nvars].reshape(Ngauss,Nvars) i+=Ngauss*Nvars #Get the sigmas Nsym=int(Nvars*(Nvars+1)/2) Sigmas=np.zeros((Ngauss,Nvars,Nvars)) [Stats.unflatten_symmetric_matrix(F,Sigmas[i]) for i,F in enumerate(self.params[i:i+Ngauss*Nsym].reshape(Ngauss,Nsym))] #Normalize weights self._normalize_weights(weights) #Check Sigmas self.Nvars=Nvars self.Ngauss=Ngauss self.weights=weights self.mus=mus self.Sigmas=Sigmas self._check_sigmas() #Flatten correlations self._flatten_stdcorr() def _unflatten_stdcorr(self,stdcorr,Nvars): """ Unflatten properties from stdcorr Attr. [HC] """ self.stdcorr=np.array(stdcorr) self.Ncor=len(self.stdcorr) factor=int(1+Nvars+Nvars*(Nvars+1)/2) if (self.Ncor%factor)!=0: raise AssertionError(f"The number of parameters {self.Ncor} is incompatible with the provided number of variables ({Nvars})") #Number of gaussian functions Ngauss=int(self.Ncor/factor) #Get the weights i=0 weights=self.stdcorr[i:Ngauss] i+=Ngauss #Get the mus mus=self.stdcorr[i:i+Ngauss*Nvars].reshape(Ngauss,Nvars) i+=Ngauss*Nvars #Get the sigmas sigmas=self.stdcorr[i:i+Ngauss*Nvars].reshape(Ngauss,Nvars) i+=Ngauss*Nvars #Get the rhos Noff=int(Nvars*(Nvars-1)/2) rhos=self.stdcorr[i:i+Ngauss*Noff].reshape(Ngauss,Noff) #Normalize weights self._normalize_weights(weights) #Set properties self.Nvars=Nvars self.Ngauss=Ngauss self.weights=weights self.mus=mus self.sigmas=sigmas self.rhos=rhos #Generate Sigma self.Sigmas=Stats.calc_covariance_from_correlations(self.sigmas,self.rhos) self._check_sigmas() #Flatten params self._flatten_params() def _check_sigmas(self): """ Check value of sigmas Attr. [HC] """ self._check_params(self.Sigmas) #Check matrix if len(self.Sigmas)!=self.Ngauss: raise ValueError(f"You provided {len(self.Sigmas)} matrix, but Ngauss={self.Ngauss} are required") elif self.Sigmas[0].shape!=(self.Nvars,self.Nvars): raise ValueError(f"Matrices have wrong dimensions ({self.Sigmas[0].shape}). It should be {self.Nvars}x{self.Nvars}") #Symmetrize for i in range(self.Ngauss): self.Sigmas[i]=np.triu(self.Sigmas[i])+np.tril(self.Sigmas[i].T,-1) """ #This check can be done, but it can be a problem when fitting if not np.all(np.linalg.eigvals(self.Sigmas[i])>0): raise ValueError(f"Matrix {i+1}, {self.Sigmas[i].tolist()} is not positive semidefinite.") """ #Get sigmas and correlations self.sigmas,self.rhos=Stats.calc_correlations_from_covariances(self.Sigmas) def _check_params(self,checkvar=None): """ Check if parameters are set. Attr. [HC] """ if checkvar is None: raise AssertionError("You must first set the parameters (Sigmas, mus, etc.)")
[docs] def pdf(self,X): """ Compute the PDF. Parameters ---------- X : numpy.ndarray Point in the Nvar-dimensional space (Nvar). Returns ------- p : float PDF value at X. Attr. [HC] """ self._check_params(self.params) self._nerror=0 value=0 for w,muvec,Sigma in zip(self.weights,self.mus,self.Sigmas): try: value+=w*multinorm.pdf(X,muvec,Sigma) except Exception as error: if not self._ignoreWarnings: print(f"Error: {error}, params = {self.params.tolist()}, stdcorr = {self.params.tolist()}") self._nerror+=1 value+=0 return value
[docs] def rvs(self,Nsam=1): """ Generate a random sample of points following this Multivariate distribution. Parameters ---------- Nsam : int, optional Number of samples (default 1). Returns ------- rs : numpy.ndarray Samples (Nsam x Nvars). Attr. [HC] """ self._check_params(self.params) Xs=np.zeros((Nsam,self.Nvars)) for i in range(Nsam): n=Stats.gen_index(self.weights) Xs[i]=multinorm.rvs(self.mus[n],self.Sigmas[n]) return Xs
[docs] def sample_cmnd_likelihood(self,uparams,data=None,pmap=None,tset="stdcorr",scales=[],verbose=0): """ Compute the negative value of the logarithm of the likelihood of a sample. Parameters ---------- uparams : numpy.ndarray Minimization parameters (unbound). data : numpy.ndarray, optional Data for which log_l is computed. pmap : function, optional Routine to map from minparams to params or stdcorr. Example: >>> def pmap(minparams): ... stdcorr = np.array([1] + list(minparams)) ... stdcorr[-1:] -= 1 ... return stdcorr tset : str, optional Type of minimization parameters. Values "params", "stdcorr" (default "stdcorr"). scales : list, optional List of scales for transforming uparams (unbound) in minparams (natural scale). verbose : int, optional Verbosity level (0, none, 1: input parameters, 2: full definition of the CMND) (default 0). Returns ------- log_l : float Negative log-likelihood. Attr. [HC] """ #Map unbound minimization parameters into their right range minparams=np.array(Util.t_if(uparams,scales,Util.u2f)) #Map minimizaiton parameters into CMND parameters params=np.array(pmap(minparams)) if verbose>=1: print("*"*80) print(f"Minimization parameters: {minparams.tolist()}") print(f"CMND parameters: {params.tolist()}") #Update CMND parameters according to type of minimization parameters if tset=="params":self.set_params(params,self.Nvars) else:self.set_stdcorr(params,self.Nvars) if verbose>=2: print("CMND:") print(self) #Compute PDF for each point in data and sum log_l=-np.log(self.pdf(data)).sum() if verbose>=1: print(f"-log_l = {log_l:e}") return log_l
[docs] def plot_sample(self,data=None, N=10000, props=None,ranges=None, figsize=2,sargs=dict(),hargs=None): """ Plot a sample of the CMND. Parameters ---------- data : numpy.ndarray, optional Data to plot. If None it generate a sample. N : int, optional Number of points to generate the sample (default 10000). props : list, optional Array with the name of the properties. Ex. props=["x","y"]. ranges : list, optional Array of ranges of the properties. Ex. ranges=[-3,3],[-5,5]. figsize : int, optional Size of each axis (default 2). sargs : dict, optional Dictionary with options for the scatter plot. Ex. sargs=dict(color='r'). hargs : dict, optional Dictionary with options for the hist2d function. Ex. hargs=dict(bins=50). Returns ------- G : matplotlib.figure.Figure or CornerPlot Graphic handle. If Nvars = 2, it is a figure object, otherwise is a CornerPlot instance. Examples -------- >>> G = CMND.plot_sample(N=10000, sargs=dict(s=1, c='r')) >>> G = CMND.plot_sample(N=1000, sargs=dict(s=1, c='r'), hargs=dict(bins=20)) >>> CMND = ComposedMultiVariateNormal(Ngauss=1, Nvars=2) >>> fig = CMND.plot_sample(N=1000, hargs=dict(bins=20), sargs=dict(s=1, c='r')) >>> CMND = ComposedMultiVariateNormal(Ngauss=2, Nvars=3) >>> print(CMND) >>> mus = [[0, 0], [1, 1]] >>> weights = [0.1, 0.9] >>> Sigmas = [[[1, 0.2], [0, 1]], [[1, 0], [0, 1]]] >>> MND1 = ComposedMultiVariateNormal(mus=mus, weights=weights, Sigmas=Sigmas) >>> #MND1=ComposedMultiVariateNormal(mus=mus,weights=weights);MND1.set_sigmas(Sigmas) >>> print(MND1) >>> print(MND1.pdf([1, 1])) >>> params = [0.1, 0.9, 0.0, 0.0, 1.0, 1.0, 1.0, 0.2, 1.0, 1.0, 0.0, 1.0] >>> MND2 = ComposedMultiVariateNormal(params=params, Nvars=2) >>> print(MND2) >>> print(MND2.pdf([1, 1])) Attr. [HC] """ if data is None: self.data=self.rvs(N) else: self.data=np.copy(data) properties=dict() for i in range(self.Nvars): symbol=string.ascii_letters[i] if props is None else props[i] rang=None if ranges is None else ranges[i] properties[symbol]=dict(label=f"${symbol}$",range=rang) if self.Nvars>2: G=CornerPlot(properties,figsize=figsize) if hargs is not None: G.plot_hist(self.data,**hargs) G.scatter_plot(self.data,**sargs); G.fig.tight_layout() return G else: keys=list(properties.keys()) fig=plt.figure(figsize=(5,5)) ax=fig.gca() if hargs is not None: ax.hist2d(self.data[:,0],self.data[:,1],**hargs) #Experimental #sns.kdeplot(x=data[:,0],y=data[:,1],shade=True,ax=ax) ax.scatter(self.data[:,0],self.data[:,1],**sargs) ax.set_xlabel(properties[keys[0]]["label"]) ax.set_ylabel(properties[keys[1]]["label"]) multineas_watermark(ax) fig.tight_layout() return fig
def _str_params(self): """ Generate strings explaining which quantities are stored in the flatten arrays params and stdcorr. It also generate and aray with the bounds applicable to the stdcorr parameters for purposes of minimization. Returns ------- None Notes ----- Set the value of: - str_params: list of properties in params, str - str_stdcorr: list of properties in stdcorr, str. - bnd_stdcorr: bounds of properties in stdcorr applicable for transforming to unbound. """ str_params="[" bnd_stdcorr="[" #Probabilities for n in range(self.Ngauss): str_params+=f"p{n+1}," bnd_stdcorr+=f"1," #Mus for n in range(self.Ngauss): for i in range(self.Nvars): str_params+=f{n+1}_{i+1}," bnd_stdcorr+=f"0," str_stdcorr=str_params #Std. devs for n in range(self.Ngauss): for i in range(self.Nvars): str_stdcorr+=f{n+1}_{i+1}," bnd_stdcorr+=f"10," #Sigmas for n in range(self.Ngauss): for i in range(self.Nvars): for j in range(self.Nvars): if j>=i: str_params+=f{n+1}_{i+1}{j+1}," if j>i: str_stdcorr+=f{n+1}_{i+1}{j+1}," bnd_stdcorr+=f"2," self.str_params=str_params.strip(",")+"]" self.str_stdcorr=str_stdcorr.strip(",")+"]" self.bnd_stdcorr=bnd_stdcorr.strip(",")+"]" def __str__(self): """ Generates a string version of the object """ self.Npars=len(self.params) self.Ncor=len(self.stdcorr) self._str_params() msg=f"""Composition of Ngauss = {self.Ngauss} gaussian multivariates of Nvars = {self.Nvars} random variables: Weights: {self.weights.tolist()} Number of variables: {self.Nvars} Averages (μ): {self.mus.tolist()} """ if self.Sigmas is None: msg+=f""" Sigmas: (Not defined yet) Params: (Not defined yet) """ else: msg+=f""" Standard deviations (σ): {self.sigmas.tolist()} Correlation coefficients (ρ): {self.rhos.tolist()} Covariant matrices (Σ): {self.Sigmas.tolist()} Flatten parameters: With covariance matrix ({self.Npars}): {self.str_params} {self.params.tolist()} With std. and correlations ({self.Ncor}): {self.str_stdcorr} {self.stdcorr.tolist()}""" return msg
[docs] class FitCMND(): """ CMND Fitting handler. Attributes ---------- Ngauss : int Number of fitting MND. Nvars : int Number of variables in each MND. cmnd : ComposedMultiVariateNormal Fitting object of the class CMND. This object will have the result of the fitting procedure. solution : scipy.optimize.OptimizeResult Once the fitting is completed the solution object is returned. Attributes include: - fun: value of the function in the minimum. - x: value of the minimization parameters in the minimum. - nit: number of iterations of the minimization algorithm. - nfev: number of evaluations of the function. - success: if True it implies that the minimization fullfills all the conditions. Ndim : int Number of mus. Ncorr : int Number of correlations. minparams : numpy.ndarray Array of minimization parameters at any stage in minimization process. scales : numpy.ndarray Array of scales to convert minparams to uparams (unbound) and viceversa. uparams : numpy.ndarray Array of unbound minimization parameters. Notes ----- Objects of this class must be always initialized with the number of gaussians and the number of random variables. Examples -------- >>> np.random.seed(1) >>> weights = [[1.0]] >>> mus = [[1.0, 0.5, -0.5]] >>> sigmas = [[1, 1.2, 2.3]] >>> angles = [[10*Angle.Deg, 30*Angle.Deg, 20*Angle.Deg]] >>> Sigmas = Stats.calc_covariance_from_rotation(sigmas, angles) >>> CMND = ComposedMultiVariateNormal(mus=mus, weights=weights, Sigmas=Sigmas) >>> data = CMND.rvs(10000) >>> F = FitCMND(Ngauss=CMND.Ngauss, Nvars=CMND.Nvars) >>> F.cmnd._fevfreq = 200 >>> bounds = None >>> # bounds = F.set_bounds(boundw=(0.1, 0.9)) >>> # bounds = F.set_bounds(boundr=(-0.9, 0.9)) >>> # bounds = F.set_bounds(bounds=(0.1, 0.9*F._sigmax)) >>> # bounds = F.set_bounds(boundsm=((-3, 3), (-2, 2), (-2, 2)), boundw=(0.1, 0.9), bounds=(0.1, 0.9*F._sigmax), boundr=(-0.9, 0.9)) >>> print(bounds) >>> Util.el_time(0) >>> # F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True), bounds=bounds) >>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True), method=None, bounds=bounds) >>> T = Util.el_time() >>> print(F.cmnd) >>> G = F.plot_fit(figsize=3, hargs=dict(bins=30, cmap='YlGn'), sargs=dict(s=0.5, edgecolor='None', color='r')) >>> F.save_fit("/tmp/fit.pkl", useprefix=False) >>> F._load_fit("/tmp/fit.pkl") >>> F.save_fit("/tmp/nuevo.pkl", useprefix=True, myprefix="test") """ #Constants #Maximum value of sigma _sigmax=10 _ignoreWarnings=True def __init__(self,objfile=None,Ngauss=1,Nvars=2): """ Initialize FitCMND object. Attribution ----------- [HC] This method was mostly developed by human intelligences. """ if objfile is not None: self._load_fit(objfile) else: #Basic attributes self.Ngauss=Ngauss self.Nvars=Nvars self.Ndim=Ngauss*Nvars self.Ncorr=int(Nvars*(Nvars-1)/2) #Define the model cmnds self.cmnd=ComposedMultiVariateNormal(Ngauss=Ngauss,Nvars=Nvars) #Set parameters self.set_params() #Other self.fig=None self.prefix=""
[docs] def set_params(self,mu=0.5,sigma=1.0,rho=0.5): """ Set the value of the basic params (minparams, scales, etc.). It updates minparams, scales and uprams. Parameters ---------- mu : float, optional Value of all initial mus (default 0.5). sigma : float, optional Value of all initial sigmas (default 1.0). rho : float, optional Value of all initial rhos (default 0.5). Returns ------- None """ #Define the initial parameters # mus sigmas correlations minparams=[mu]*self.Ndim+[sigma]*self.Ndim+[1+rho]*self.Ngauss*self.Ncorr scales=[0]*self.Ndim+[self._sigmax]*self.Ndim+[2]*self.Ngauss*self.Ncorr if self.Ngauss>1: self.extrap=[] minparams=[1/self.Ngauss]*self.Ngauss+minparams scales=[1]*self.Ngauss+scales else: self.extrap=[1] self.minparams=np.array(minparams) self.scales=np.array(scales) self.uparams=Util.t_if(self.minparams,self.scales,Util.f2u)
[docs] def pmap(self,minparams): """ Mapping routine used in sample_cmnd_likelihood. Mapping may change depending on the complexity of the parameters to be minimized. Here we assume that all parameters in the stdcorr vector is susceptible to be minimized (with the exception of weights in the case of Ngauss=1 when this parameter should not be included). Parameters ---------- minparams : numpy.ndarray Minimization parameters. Returns ------- stdcorr : numpy.ndarray Flatten parameters with correlations. """ stdcorr=np.array(self.extrap+list(minparams)) stdcorr[-self.Ngauss*self.Ncorr:]-=1 return stdcorr
[docs] def log_l(self,data): """ Value of the -log(Likelihood). Parameters ---------- data : numpy.ndarray Array with data (Nsam x Nvars). Returns ------- log_l : float Value of the -log(Likelihood). """ log_l=self.cmnd.sample_cmnd_likelihood(self.uparams, data=data, pmap=self.pmap, tset="stdcorr", scales=self.scales) return log_l
[docs] def fit_data(self,data,verbose=0,advance=0,**args): """ Minimization procedure. It updates the solution attribute. Parameters ---------- data : numpy.ndarray Array with data (Nsam x Nvars). verbose : int, optional Verbosity level for the sample_cmnd_likelihood routine (default 0). advance : int, optional If larger than 0 show advance each "advance" iterations (default 0). **args : dict Options of the minimize routine (eg. tol=1e-6). A particularly interesting parameter is the minimization method. Available methods: * Slow but sure: Powell * Fast but unsure: CG, BFGS, COBYLA, SLSQP Returns ------- None Examples -------- >>> F = FitCMND(1, 3) >>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True)) """ if advance: advance=int(advance) self.neval=0 def _advance(X,show=False): if self.neval==0: print(f"Iterations:") if self.neval%advance==0 or show: vars = np.array2string(X, separator=', ', precision=4, max_line_width=np.inf, formatter={'float_kind':lambda x: f"{x:.2g}"}) fun = self.cmnd.sample_cmnd_likelihood(X,data,self.pmap,"stdcorr",self.scales,verbose) print(f"Iter {self.neval}:\n\tVars: {vars}\n\tLogL/N: {fun/len(data)}") self.neval+=1 else: _advance = None self.data=np.copy(data) self.cmnd._ignoreWarnings=self._ignoreWarnings self.minargs=dict(method="Powell") self.minargs.update(args) self.solution=minimize(self.cmnd.sample_cmnd_likelihood, self.minparams, callback=_advance, args=(data,self.pmap,"stdcorr",self.scales,verbose), **self.minargs) if advance: _advance(self.solution.x,show=True) self.uparams=self.solution.x #Set the new params #Set the new params self.minparams=Util.t_if(self.uparams,self.scales,Util.u2f) params=self.pmap(self.minparams) self.cmnd.set_stdcorr(params,self.Nvars) self._update_prefix()
def _load_fit(self,objfile): """ Load a fit object from file. Attribution ----------- [HC] This method was mostly developed by human intelligences. """ F=pickle.load(open(objfile,"rb")) for k in F.__dict__.keys(): setattr(self,k,getattr(F,k)) self._update_prefix()
[docs] def plot_fit(self,N=10000,figsize=2,props=None,ranges=None,hargs=dict(),sargs=dict()): """ Plot the result of the fitting procedure. Parameters ---------- N : int, optional number of points used to build a representation of the marginal distributions (default 10000). figsize : int, optional Size of each axis (default 2). props : list, optional Array with the name of the properties. Ex. props=["x","y"]. ranges : list, optional Array of ranges of the properties. Ex. ranges=[-3,3],[-5,5]. hargs : dict, optional Dictionary with options for the hist2d function. Ex. hargs=dict(bins=50). sargs : dict, optional Dictionary with options for the scatter plot. Ex. sargs=dict(color='r'). Returns ------- G : matplotlib.figure.Figure or CornerPlot Graphic handle. Examples -------- >>> F = FitCMND(1, 3) >>> F.fit_data(data, verbose=0, tol=1e-3, options=dict(maxiter=100, disp=True)) >>> G = F.plot_fit(figsize=3, hargs=dict(bins=30, cmap='YlGn'), sargs=dict(s=0.5, edgecolor='None', color='r')) """ Xfits=self.cmnd.rvs(N) properties=dict() for i in range(self.Nvars): symbol=string.ascii_letters[i] if props is None else props[i] if ranges is not None: rang=ranges[i] else: rang=None properties[symbol]=dict(label=f"${symbol}$",range=rang) properties[symbol]=dict(label=f"${symbol}$",range=rang) if self.Nvars>2: G=CornerPlot(properties,figsize=figsize) G.plot_hist(Xfits,**hargs) G.scatter_plot(self.data,**sargs); G.fig.tight_layout() self.fig=G.fig return G else: keys=list(properties.keys()) fig=plt.figure(figsize=(5,5)) ax=fig.gca() ax.hist2d(Xfits[:,0],Xfits[:,1],**hargs) ax.scatter(self.data[:,0],self.data[:,1],**sargs) ax.grid() ax.set_xlabel(properties[keys[0]]["label"]) ax.set_ylabel(properties[keys[1]]["label"]) multineas_watermark(ax) fig.tight_layout() self.fig=fig return fig
def _inv_params(self,stdcorr): """ Invert stdcorr to minparams. Attribution ----------- [HC] This method was mostly developed by human intelligences. """ minparams=np.copy(stdcorr) minparams[-self.Ngauss*self.Ncorr:]+=1 self.minparams=minparams[1:] if self.Ngauss==1 else minparams def _update_prefix(self,myprefix=None): """ Update prefix of fit. Prefix has two parts: the number of gaussians used and a hash computed from the object. Prefix change if: - Data change. - Initial minimization parameters change (e.g. if the fit is ran twice) - Minimization parameters are changed. - Bounds are changed. Alternative prefix: >>> self.hash = md5(pickle.dumps([self.Ngauss, self.data])).hexdigest()[:5] >>> self.hash = md5(pickle.dumps(self.__dict__)).hexdigest()[:5] >>> self.hash = md5(pickle.dumps(self.minparams)).hexdigest()[:5] >>> self.hash = md5(pickle.dumps(self.cmnd)).hexdigest()[:5] """ self.hash=md5(pickle.dumps(self)).hexdigest()[:5] if myprefix is not None: myprefix=f"_{myprefix}" self.prefix=f"{self.Ngauss}cmnd{myprefix}_{self.hash}"
[docs] def save_fit(self,objfile=None,useprefix=True,myprefix=None): """ Pickle the result of a fit. Parameters ---------- objfile : str, optional Name of the file where the fit will be stored. If None, the name is set by the routine as FitCMND.pkl. useprefix : bool, optional Use a prefix in the filename of the pickle file (default True). The prefix is normally {Ngauss}cmnd_{hash}. myprefix : str, optional Custom prefix. Examples -------- If objfile="fit.pkl", the final filename will be fit-1mnd_asa33.pkl """ self.fig=None self._update_prefix(myprefix) if objfile is None: objfile=f"/tmp/FitCMND.pkl" if useprefix: parts=os.path.splitext(objfile) objfile=f"{parts[0]}-{self.prefix}{parts[1]}" pickle.dump(self,open(objfile,"wb"))
[docs] def set_bounds(self,boundw=None,bounds=None,boundr=None,boundsm=None): """ Set the minimization parameters. Parameters ---------- boundw : tuple, optional Bound of weights (default (-np.inf, np.inf)). bounds : tuple or list, optional Bounds of weights, mus, sigmas and rhos of each variable. boundr : tuple, optional Bound of rhos (default (-np.inf, np.inf)). boundsm : tuple of tuples, optional Bounds of averages (default (-np.inf, np.inf)). Normally the bounds on averages must be expressed in this way: boundsm = ((-min_1, max_1), (-min_2, max_2), ...) Example for Nvars = 2: boundsm = ((-2, 1), (-3, 0)) Returns ------- bounds : tuple Formatted bounds for minimization. """ if boundsm is None: boundsm=((-np.inf,np.inf),)*self.Nvars # Regular bounds if boundw is None: boundw=(-np.inf,np.inf) else: boundw=tuple([Util.f2u(bw,1) for bw in boundw]) if bounds is None: bounds=(-np.inf,np.inf) else: bounds=tuple([Util.f2u(bs,self._sigmax) for bs in bounds]) if boundr is None: boundr=(-np.inf,np.inf) else: boundr=tuple([Util.f2u(1+br,2) for br in boundr]) bounds=(*((boundw,)*self.Ngauss), *(boundsm*self.Ngauss), *((bounds,)*self.Nvars*self.Ngauss), *((boundr,)*self.Ngauss*int(self.Nvars*(self.Nvars-1)/2))) self.bounds=bounds if self.Ngauss==1: bounds=bounds[1:] self.cmnd._str_params() return bounds