{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sampling from a bivariate Gaussian\n",
"\n",
"In this exercise, we will introduce standard sampling procedure with a simple toy distribution: a bivariate Gaussian!\n",
"To this end, we will only use samples from univariate Gaussians.\n",
"Note that the method we will look at are not the best one in order to sample from this distribution, our goal is to implement ancestral sampling and MCMC on a simple example that we can easily visualize.\n",
"\n",
"## Preamble"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import scipy.stats\n",
"import matplotlib.pyplot as plt\n",
"import itertools\n",
"import random\n",
"import math\n",
"import time\n",
"\n",
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following function can be used to plot samples. If you pass two arrays of samples, the first one will be in blue and the second one in red. You can then visualize if your generated samples looks like coming from the same distribution as the target.\n",
"\n",
"Note: this may not be optimal if you have a vision deficiency. Please let me know if this is the case so we can work out a better plotting format."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# samples1-2 shape must be (n samples, 2)\n",
"def plot_samples(samples1, samples2=None):\n",
" fig, ax = plt.subplots()\n",
"\n",
" ax.scatter(samples1[:,0], samples1[:,1], marker=\"x\", color=\"blue\")\n",
" if samples2 is not None:\n",
" ax.scatter(samples2[:,0], samples2[:,1], marker=\"x\", color=\"red\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following class implement an univariate Gaussian distribution.\n",
"Note that we initialize it with the standard deviation instead of the variance.\n",
"We use the numpy function to sample: https://numpy.org/doc/stable/reference/random/generated/numpy.random.normal.html"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class UnivariateGaussian:\n",
" def __init__(self, mean, std):\n",
" self.mean = mean\n",
" self.std = std\n",
" \n",
" def sample(self, n_samples=1):\n",
" return np.random.normal(self.mean, self.std, n_samples)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The bivariate gaussian class takes 5 arguments:\n",
"\n",
"- the mean and standard deviation for the first coordinate;\n",
"- the mean and standard deviation for the second coordinate;\n",
"- the correlation coefficient.\n",
"\n",
"The sample function can be used to directly generate samples: this is the function we will try to recode using ancestral and MCMC sampling. Here we again use the numpy function as a reference.\n",
"\n",
"For ancestral and Gibbs sampling we will need to sample from the conditional distribution, i.e. $p(x_1 | x_2)$ and $(x_2 | x_1)$. They are univariate Gaussians. The first exercise is too code these two functions:\n",
"\n",
"- x1_cond_x2: conditional distribution $p(x_1 | x_2)$\n",
"- x2_cond_x1: conditional distribution $p(x_2 | x_1)$\n",
"\n",
"If you don't know how to compute the mean and standards deviation of these conditional distribution, you can watch the following video: https://www.youtube.com/watch?v=fb8uE4NM2fc\n",
"\n",
"We will also need the marginal distributions $p(x_1)$ and $p(x_2)$. As you can see, they are straighforward to compute. :)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class BivariateGaussian:\n",
" def __init__(self, mean1, std1, mean2, std2, p):\n",
" self.mean1 = mean1\n",
" self.std1 = std1\n",
" self.mean2 = mean2\n",
" self.std2 = std2\n",
" self.p = p\n",
" \n",
" def _get_param_vecs(self):\n",
" mean = np.array([self.mean1, self.mean2])\n",
" cov = np.array(\n",
" [\n",
" [self.std1 ** 2, self.p * self.std1 * self.std2],\n",
" [self.p * self.std1 * self.std2, self.std2 ** 2]\n",
" ]\n",
" )\n",
" \n",
" return mean, cov\n",
" \n",
" def sample(self, n_samples=1):\n",
" mean, cov = self._get_param_vecs()\n",
" return np.random.multivariate_normal(mean, cov, n_samples)\n",
" \n",
" def pdf(self, x):\n",
" mean, cov = self._get_param_vecs()\n",
" return scipy.stats.multivariate_normal.pdf(x, mean=mean, cov=cov)\n",
" \n",
" # Marginal distributions are easy to compute for bivariate Gaussians! :)\n",
" def marginal_x1(self):\n",
" return UnivariateGaussian(self.mean1, self.std1)\n",
" \n",
" def marginal_x2(self):\n",
" return UnivariateGaussian(self.mean2, self.std2)\n",
" \n",
" # should return the distribution p(x_1 | x_2),\n",
" # which is usefull both for ancestral sampling and Gibbs sampling\n",
" def x1_cond_x2(self, x2):\n",
" #TODO\n",
" ...\n",
"\n",
" return UnivariateGaussian(mean, std)\n",
" \n",
" # should return the distribution p(x_2 | x_1)\n",
" def x2_cond_x1(self, x1):\n",
" # TODO\n",
" ...\n",
"\n",
" return UnivariateGaussian(mean, std)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"target_bi_gaussian = BivariateGaussian(1, 1.5, 1, 0.7, -0.9)\n",
"\n",
"plot_samples(target_bi_gaussian.sample(100))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Ancestral sampling\n",
"\n",
"The method consist of sampling from $p(x_1, x_2)$ as follows:\n",
"\n",
"- first sample $x_1$: $x_1 \\sim p(x_1)$\n",
"- then sample $x_2$: $x_2 \\sim p(x_2 | x_1)$\n",
"\n",
"Of course, one can first sample $x_2$ and then $x_1$."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"n_samples = 100\n",
"\n",
"# we fix the shape the we can easily concatenate\n",
"# the two array before calling the plotting function.\n",
"# In the code, use inplace operations to fill these values!\n",
"x = np.empty((n_samples, 1))\n",
"y = np.empty((n_samples, 1))\n",
"\n",
"x[:, 0] = target_bi_gaussian.marginal_x1().sample(n_samples)\n",
"for i in range(n_samples):\n",
" y[i] = target_bi_gaussian.x2_cond_x1(x[i]).sample(1)\n",
"\n",
"samples = np.concatenate([x, y], 1)\n",
"\n",
"# blue points: points sampled via numpy function\n",
"# red points: points sampled via ancestral sampling\n",
"plot_samples(target_bi_gaussian.sample(100), samples)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Gibbs sampling"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def gibbs_sampling(bivariate_gaussian, n_samples, n_burnin=0, n_skip=0):\n",
" # note that we could initialize randomly instead of 0\n",
" point = np.zeros(2)\n",
" \n",
" # array of samples that we will return\n",
" samples = np.empty((n_samples, 2))\n",
" \n",
" # number of generated samples\n",
" # we keep track of it so its easier to implement\n",
" # burn-in and sample skip.\n",
" n_filled = 0 \n",
" \n",
" # just a \"while true\" loop\n",
" for i in itertools.count(start=0):\n",
" # in the loop, we must update the variable point following\n",
" # the gibbs sampling rule!\n",
" # TODO...\n",
" ...\n",
" \n",
" # check if we keep this sample or not\n",
" if i >= n_burnin and (i - n_burnin) % (n_skip + 1) == 0:\n",
" samples[n_filled] = point\n",
" n_filled += 1\n",
" \n",
" # break the loop if we sampled the required number of points\n",
" if n_filled == n_samples:\n",
" break\n",
"\n",
" return samples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"n_samples = 100\n",
"n_burnin = 100\n",
"n_skip = 100\n",
"\n",
"samples = gibbs_sampling(target_bi_gaussian, n_samples, n_burnin, n_skip)\n",
"\n",
"plot_samples(target_bi_gaussian.sample(n_samples), samples)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Metropolis sampling\n",
"\n",
"In the Metropolis sampling algorithm, it is useful to check the acceptance rate.\n",
"A bad acceptance rate is an indication that we are take \"too big steps\" in the sampling procedure. If you have a very small acceptance rate (e.g. < 0.3), many of the samples the function return could be equal. The step magnitude depends on the noise_std argument, which is the standard deviant of the Gaussian you use to propose a new point (which can then be either accepted or rejected)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def metropolis_sampling(bivariate_gaussian, n_samples, noise_std = 1, n_burnin=0, n_skip=0):\n",
" point = np.zeros(2)\n",
" samples = np.empty((n_samples, 2))\n",
" \n",
" n_filled = 0\n",
" accept = 0\n",
" reject = 0\n",
" for i in itertools.count(start=0):\n",
" \n",
" # TODO:\n",
" # remember that you must first sample a new point,\n",
" # and then accept or reject it.\n",
" # You need to use both:\n",
" # - np.random.normal\n",
" # - and random.uniform\n",
" # Note that here you don't use the unnormalized probability,\n",
" # just use the PDF!\n",
" #\n",
" # and don't forget to update variables accept and reject\n",
" # to keep track of how many points have been accepted or rejected\n",
" ...\n",
" \n",
" if i >= n_burnin and (i - n_burnin) % (n_skip + 1) == 0:\n",
" samples[n_filled] = point\n",
" n_filled += 1\n",
" if n_filled == n_samples:\n",
" break\n",
"\n",
" print(\"Acceptance rate: \", accept / (accept + reject))\n",
" return samples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"n_samples = 100\n",
"n_burnin = 100\n",
"n_skip = 100\n",
"noise_std = 1\n",
"\n",
"samples = metropolis_sampling(target_bi_gaussian, n_samples, noise_std, n_burnin, n_skip)\n",
"\n",
"plot_samples(target_bi_gaussian.sample(n_samples), samples)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}