In this exercise, we will introduce standard sampling procedure with a simple toy distribution: a bivariate Gaussian! To this end, we will only use samples from univariate Gaussians. Note that the method we will look at are not the best one in order to sample from this distribution, our goal is to implement ancestral sampling and MCMC on a simple example that we can easily visualize.
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt
import itertools
import random
import math
import time
%matplotlib inline
The following function can be used to plot samples. If you pass two arrays of samples, the first one will be in blue and the second one in red. You can then visualize if your generated samples looks like coming from the same distribution as the target.
Note: this may not be optimal if you have a vision deficiency. Please let me know if this is the case so we can work out a better plotting format.
# samples1-2 shape must be (n samples, 2)
def plot_samples(samples1, samples2=None):
fig, ax = plt.subplots()
ax.scatter(samples1[:,0], samples1[:,1], marker="x", color="blue")
if samples2 is not None:
ax.scatter(samples2[:,0], samples2[:,1], marker="x", color="red")
The following class implement an univariate Gaussian distribution. Note that we initialize it with the standard deviation instead of the variance. We use the numpy function to sample: https://numpy.org/doc/stable/reference/random/generated/numpy.random.normal.html
class UnivariateGaussian:
def __init__(self, mean, std):
self.mean = mean
self.std = std
def sample(self, n_samples=1):
return np.random.normal(self.mean, self.std, n_samples)
The bivariate gaussian class takes 5 arguments:
The sample function can be used to directly generate samples: this is the function we will try to recode using ancestral and MCMC sampling. Here we again use the numpy function as a reference.
For ancestral and Gibbs sampling we will need to sample from the conditional distribution, i.e. $p(x_1 | x_2)$ and $(x_2 | x_1)$. They are univariate Gaussians. The first exercise is too code these two functions:
If you don't know how to compute the mean and standards deviation of these conditional distribution, you can watch the following video: https://www.youtube.com/watch?v=fb8uE4NM2fc
We will also need the marginal distributions $p(x_1)$ and $p(x_2)$. As you can see, they are straighforward to compute. :)
class BivariateGaussian:
def __init__(self, mean1, std1, mean2, std2, p):
self.mean1 = mean1
self.std1 = std1
self.mean2 = mean2
self.std2 = std2
self.p = p
def _get_param_vecs(self):
mean = np.array([self.mean1, self.mean2])
cov = np.array(
[
[self.std1 ** 2, self.p * self.std1 * self.std2],
[self.p * self.std1 * self.std2, self.std2 ** 2]
]
)
return mean, cov
def sample(self, n_samples=1):
mean, cov = self._get_param_vecs()
return np.random.multivariate_normal(mean, cov, n_samples)
def pdf(self, x):
mean, cov = self._get_param_vecs()
return scipy.stats.multivariate_normal.pdf(x, mean=mean, cov=cov)
# Marginal distributions are easy to compute for bivariate Gaussians! :)
def marginal_x1(self):
return UnivariateGaussian(self.mean1, self.std1)
def marginal_x2(self):
return UnivariateGaussian(self.mean2, self.std2)
# should return the distribution p(x_1 | x_2),
# which is usefull both for ancestral sampling and Gibbs sampling
def x1_cond_x2(self, x2):
#TODO
...
return UnivariateGaussian(mean, std)
# should return the distribution p(x_2 | x_1)
def x2_cond_x1(self, x1):
# TODO
...
return UnivariateGaussian(mean, std)
target_bi_gaussian = BivariateGaussian(1, 1.5, 1, 0.7, -0.9)
plot_samples(target_bi_gaussian.sample(100))
The method consist of sampling from $p(x_1, x_2)$ as follows:
Of course, one can first sample $x_2$ and then $x_1$.
n_samples = 100
# we fix the shape the we can easily concatenate
# the two array before calling the plotting function.
# In the code, use inplace operations to fill these values!
x = np.empty((n_samples, 1))
y = np.empty((n_samples, 1))
x[:, 0] = target_bi_gaussian.marginal_x1().sample(n_samples)
for i in range(n_samples):
y[i] = target_bi_gaussian.x2_cond_x1(x[i]).sample(1)
samples = np.concatenate([x, y], 1)
# blue points: points sampled via numpy function
# red points: points sampled via ancestral sampling
plot_samples(target_bi_gaussian.sample(100), samples)
def gibbs_sampling(bivariate_gaussian, n_samples, n_burnin=0, n_skip=0):
# note that we could initialize randomly instead of 0
point = np.zeros(2)
# array of samples that we will return
samples = np.empty((n_samples, 2))
# number of generated samples
# we keep track of it so its easier to implement
# burn-in and sample skip.
n_filled = 0
# just a "while true" loop
for i in itertools.count(start=0):
# in the loop, we must update the variable point following
# the gibbs sampling rule!
# TODO...
...
# check if we keep this sample or not
if i >= n_burnin and (i - n_burnin) % (n_skip + 1) == 0:
samples[n_filled] = point
n_filled += 1
# break the loop if we sampled the required number of points
if n_filled == n_samples:
break
return samples
n_samples = 100
n_burnin = 100
n_skip = 100
samples = gibbs_sampling(target_bi_gaussian, n_samples, n_burnin, n_skip)
plot_samples(target_bi_gaussian.sample(n_samples), samples)
In the Metropolis sampling algorithm, it is useful to check the acceptance rate. A bad acceptance rate is an indication that we are take "too big steps" in the sampling procedure. If you have a very small acceptance rate (e.g. < 0.3), many of the samples the function return could be equal. The step magnitude depends on the noise_std argument, which is the standard deviant of the Gaussian you use to propose a new point (which can then be either accepted or rejected)
def metropolis_sampling(bivariate_gaussian, n_samples, noise_std = 1, n_burnin=0, n_skip=0):
point = np.zeros(2)
samples = np.empty((n_samples, 2))
n_filled = 0
accept = 0
reject = 0
for i in itertools.count(start=0):
# TODO:
# remember that you must first sample a new point,
# and then accept or reject it.
# You need to use both:
# - np.random.normal
# - and random.uniform
# Note that here you don't use the unnormalized probability,
# just use the PDF!
#
# and don't forget to update variables accept and reject
# to keep track of how many points have been accepted or rejected
...
if i >= n_burnin and (i - n_burnin) % (n_skip + 1) == 0:
samples[n_filled] = point
n_filled += 1
if n_filled == n_samples:
break
print("Acceptance rate: ", accept / (accept + reject))
return samples
n_samples = 100
n_burnin = 100
n_skip = 100
noise_std = 1
samples = metropolis_sampling(target_bi_gaussian, n_samples, noise_std, n_burnin, n_skip)
plot_samples(target_bi_gaussian.sample(n_samples), samples)