Tomek Korbak bio photo

Tomek Korbak

PhD student,
University of Sussex

Email Twitter Scholar LinkedIn GitHub

The goal of this blogpost is to present a concise implementation of the Gaussian Mixture Model (GMM) using einsum notation. Along the way, I will also describe the expectation-maximization (EM) algorithm.

Einsum

Einstein notation is a particular convention of writing tensor operations that is implemented in NumPy, PyTorch and other deep learning frameworks. Its essence lies in indicating summations implicitly: the summation is over indices that are skipped or occur repeatedly. For instance, a batched bilinear transformation \(d = aBc\), for \(a \in \mathbb{R}^{B \times I}, B \in \mathbb{R}^{B \times I \times J}, c \in \mathbb{E}^{B \times J}\), can be wrriten as \(d_b = a_i B_{ij} c_j\) or np.einsum('bi,bij,bj->b', a, B, c). As I will show below, it is particularly useful when dealing with mixtures or multivariate Gaussians, which are effectively 3-dimensional tensors.

Tim Rocktäschel has a nice introductory post on einsum in the context of deep learning.

Gaussian mixtures

Let’s assume we have a dataset \(X\) of \(N\) data points in an \(D\)-dimensional space. Here \(D=2\) and \(N=100\).

def create_dataset():
    x1 = np.random.normal(size=(100, 2)) + np.array([-1, -1])
    x2 = np.random.normal(size=(100, 2)) + np.array([1, -1])
    x3 = np.random.normal(size=(100, 2)) + np.array([0, 1])
    return np.vstack((x1, x2, x3))
X = create_dataset()
plt.scatter(X[:, 0], X[:, 1], cmap='gist_rainbow', s=50)
sns.despine()
The dataset $$X \in \mathbb{R}^D$$ for our motivating example
The dataset \(X \in \mathbb{R}^D\) for our motivating example

We would like to discover clusters in the data and assign each point \(x_n \in X\) with a categorical probability distribution over \(K\) clusters. To that end, we will try to model the dataset as a mixture of \(K\) Gaussians.

Gaussian mixtures can be posed as latent variable models of the following form:

\[p_\theta(X,Z) = \prod_{n=1}^N \prod_{k=1}^K \Big[\pi_k \mathcal{N}(x_n|\mu_k, \Sigma_k) \Big]^{z_{nk}},\]

where the first product is over data points and the second over mixture components. \(z_{nk}\) is a latent variable determining whether \(n\)-th data point belongs to \(k\)-th mixture components and the parameters are \(\theta = \{\pi_k, \mu_k, \Sigma_k\}_k\).

Will will implement our GMM as a class GaussianMixture with a scikit-learn-like API:

class GaussianMixture:

    def __init__(self, num_components, input_size, max_iter=10):
        self.num_components = num_components
        self.input_size = input_size
        self.max_iter = max_iter

        # Parameters
        self.pi = np.ones((self.num_components))/self.num_components
        self.mu = np.random.normal(size=(self.num_components, self.input_size))
        self.sigma = np.tile(np.eye(self.input_size), (self.num_components, 1, 1))

When fitting the model, we are interested in maximising the logarithm of \(p_\theta(X,Z)\):

\[\log p(X,Z) = \sum_{n=1}^N \sum_{k=1}^K z_{nk} \Big[\log \pi_k + \log \mathcal{N}(x_n|\mu_k, \Sigma_k) \Big].\]

To make this problem, we instead maximise the following lower bound

\[\mathcal{L}(\theta, q) = \sum_{n=1}^N \sum_{k=1}^K q(z|x_n) \Big[\log \pi_k + \log \mathcal{N}(x_n|\mu_k, \Sigma_k) - \log q(z|x)\Big]\]

where \(q(z \vert x)\) is a variational distribution introduced for convenience: a categorical over \(K\) components. \(\mathcal{L}\) is is known as the ELBo or evidence lower bound as it is always the case that \(\mathcal{L} \leq p_\theta(X)\).

The EM algorithm

EM is an algorithm for maximum likelihood (or maximum a posteriori) estimation of parameters for latent variable models. The algorithm is iterative: we alternate between minimising \(\mathcal{L}(\theta, q)\) with respect to \(\theta\) and updating \(q\) to make the bound tighter, each time fixing one and updating the other. We to that until \(\mathcal{L}\) stops increasing.

def fit(self, X):
    prev_elbo = -np.inf
    for i in range(self.max_iter):
        q = self._expect(X)
        self.pi, self.mu, self.cov = self._maximize(X, q)
        elbo = self.compute_elbo(X, q)
        if np.allclose(elbo, prev_elbo):
            break
        prev_elbo = elbo

E-step

In the E-step, we infer the \(q(z \vert x)\) that inverts \(p_\theta(x \vert z)\) for current \(\theta\) as determined by Bayes’ theorem. For a particular data point \(n\) and a component \(k\), we have:

\[q(z_k|x_n) = \frac{p(z_k)p(x_n|z_k)}{p(x_n)} = \frac{\pi_k \mathcal{N}(x_n|\mu_k, \Sigma_k)}{\sum_{k'=1}^K \pi_{k'} \mathcal{N}(x_n|\mu_{k'}, \Sigma_{k'})}\]
def _expect(self, X):
    n = X.shape[0]
    c = self.num_components
    d = self.input_size
    pi = self.pi.reshape((1, c))
    X = X.reshape((n, 1, d))
    mu = self.mu.reshape((1, c, d))
    inv_sigma = np.linalg.inv(self.sigma)
    det_sigma = np.linalg.det(self.sigma)
    distance =  np.einsum('ncd,cde,nce->nc', X - mu, inv_sigma, X - mu)
    Z = np.sqrt(det_sigma) * (2 * np.pi) ** (d/2)
    p_x_given_t = np.einsum('nc,c->nc', np.exp(-distance/2), 1/Z)
    q_unnormalized = p_x_given_t * pi
    return q_unnormalized/q_unnormalized.sum(axis=1, keepdims=True)

M-step

In the M-step, we maximise \(\mathcal{L}\) with respect to \(\theta\) given \(q(z \vert x)\) from the E-step. There are closed-form solutions for each of the parameters:

\[\pi_k = \frac{1}{N}\sum_{n=1}^N z_{nk},\] \[\mu_k = \frac{1}{N_k} \sum_{n=1}^N q(z_k|x_n)x_n,\] \[\Sigma_k = \frac{1}{N_k} \sum_{n=1}^N q(z_k|x_n) (x_n-\mu_k) (x_n-\mu_k)^{\text T},\]

where \(N_k = \sum_{n=1}^N q(z_k \vert x_n)\) can be interpreted as a pseudo-count of data points assigned to component \(k\).

def _maximize(self, X, q):
    n = X.shape[0]
    c = self.num_components
    d = self.input_size
    nk = q.sum(axis=0).reshape((c, 1))
    mu = np.einsum('nd,nc->cd', X, q)/nk
    error = X.reshape((n, 1, d)) - mu.reshape((1, c, d))
    sigma = np.einsum('ncd,nce,nc->cde', error, error, q)/nk.reshape(c, 1, 1)
    pi = (nk/n).reshape((c,))
    return pi, mu, sigma

Computing the ELBo

Finally, a we also need to compute the ELBo which we use as a stopping criterion (although in principle it could also be used as a validation metric or a debugging tool). Recall that the ELBo is just

\[\mathcal{L}(\theta, q) = \sum_{n=1}^N \sum_{k=1}^K q(z|x_n) \Big[\log \pi_k + \log \mathcal{N}(x_n|\mu_k, \Sigma_k) - \log q(z|x)\Big].\]
def compute_elbo(self, X, q):
    n = X.shape[0]
    c = self.num_components
    d = self.input_size
    q += 10e-20
    pi = self.pi.reshape((1, c))
    X = X.reshape((n, 1, d))
    mu = self.mu.reshape((1, c, d))
    inv_sigma = np.linalg.inv(self.sigma)
    logdet_sigma = np.linalg.slogdet(self.sigma)[1].reshape(1, c)
    distance = np.einsum('ncd,cde,nce->nc', X - mu, inv_sigma, X - mu)
    constants = -d/2*np.log(2*np.pi)
    log_p_x_given_t = constants - logdet_sigma/2 - distance.reshape((n, c))/2
    log_pi = np.log(pi).reshape((1, c))
    log_q = np.log(q)
    return ((log_pi + log_p_x_given_t - log_q) * q).sum()

Using the model

Since our latent \(q(z_k \vert z_n)\) is interpretable as cluster assignments, clustering the data boils down to running the E-step with a trained model:

def predict(self, X):
    return self._expect(X)

Let us illustrate the obtained components by colour-coding soft cluster assignments:

gmm = GaussianMixture(num_components=3, input_size=2)
gmm.fit(X)
y = gmm.predict(X)
plt.scatter(X[:, 0], X[:, 1], c=y, cmap='gist_rainbow', s=50)
sns.despine()
The dataset $$X \in \mathbb{R}^d$$ for out motivating example
The dataset \(X\) with each \(x_n\) displayed in a colour obtained from interpreting the vector \(q(z \vert x_n)\) as an RGB triplet: \(z_{n1}\) codes for the value of the red component, \(z_{n2}\) for green and \(z_{n3}\) for blue

EM and the wake-sleep algorithm

There is a non-trivial connection between the EM algorithm and the wake-sleep algorithm I described earlier. The wake step corresponds to the M-step: minimizing the ELBo w.r.t. \(p_\theta\). The sleep step corresponds to the E-step with an important quirk: the input is not an \(x\) sampled from the dataset \(X\), but a synthetic example sampled from \(p(x \vert z)\), where the \(z\) is in turn sampled from a prior \(p(z)\). Moreover, the \(q(z \vert x)\) is an amortised model: it is parametrised by some weights that are should minimise the ELBo across data points.

A notebook accompanying this blog post is available on GitHub. It also contains some tests of my einsum implementation against a simpler one with an explicit for loop over \(K\) components and calls to scipy.stats.multivariate_normal.