
I am implementing the forward algorithm for Hidden Markov Models (see below for the algorithm). To prevent over/underflow, I work with log-probabilities instead, and use the log-sum-exp trick to compute each forward coefficient.

I plotted the computed forward coefficients and compared them with the states I used to simulate my data. As shown in the plot below, the general shape looks correct: the forward coefficients spike in the same places as the states. The problem is that forward coefficients are probabilities, so their logs should never exceed 0. However, in the plot I see a gradual upward drift, and the log coefficients clearly exceed zero, which I suspect is due to accumulated numerical error. (Note: in my notation, g_j(z_j) denotes the log of the forward coefficient at time j, for state z_j = 1 or 2.)

[Plot: log forward coefficients g_j(z_j) over time, compared with the simulated states]

I have already used the log-sum-exp trick, so I am wondering what else I can do to fix this issue? (Prevent the log probabilities from exceeding 0, and remove this gradual upwards drift).

The relevant part of my code is given below:

    def log_sum_exp(self, sequence):
        '''
        Returns np.log( np.sum(sequence) ) without under/overflow.
        '''
        sequence = np.array(sequence)
        if np.abs(np.min(sequence)) > np.abs(np.max(sequence)):
            b = np.min(sequence)
        else:
            b = np.max(sequence)

        return b + np.log(np.sum(np.exp(sequence-b)))

    def g_j_z(self, j, z_j):
        '''
        Returns g_j(z_j).
        j: (int) time index. zero-indexed 0, 1, 2, ... n-1
        z_j: (int) state index. zero-indexed. 0, 1, 2, ... K-1
        '''
        
        if j == 0:
            return np.log(self.p_init[z_j]) + self.log_distributions[z_j](self.pre_x+[self.x[0]], self.pre_exog+[self.exog[0]])

        if (j, z_j) in self.g_cache:
            return self.g_cache[(j, z_j)]

        temp = []
        for state in range(self.K):
            temp.append(
                self.g_j_z(j-1, state) + np.log(self.p_transition[state][z_j]) 
            )

        self.g_cache[(j, z_j)] = self.log_sum_exp(temp) + self.log_distributions[z_j](self.pre_x+self.x[0:j+1], self.pre_exog+self.exog[0:j+1])  

        return self.g_cache[(j, z_j)]

Explanation of the variables:

self.g_cache is a dictionary that maps the tuple (j, z_j) (the time and state) to the log coefficient g_j(z_j). This is used to avoid repeated computation.

self.p_init is a list. self.p_init[i] contains the initial probability of being in state i.

self.p_transition is a matrix. self.p_transition[i][j] contains the probability to transition from state i to state j.

self.log_distributions is a list of functions. self.log_distributions[i] is the log probability distribution for state i: a function that takes the history of observations and exogenous variables as input and returns the log-probability of the latest observation. For example, for an AR(1) process, the log distribution is implemented as follows:

import numpy as np
from scipy.stats import norm

def log_pdf1(x, exog, params=regime1_params):
    '''
    x: list of the full history of x up to the current point
    exog: list of the full history of the exogenous variable up to the current point
    '''
    # AR(1) process with exogenous mean
    alpha, dt, sigma = params[0], params[1], params[2]
    mu = x[-2] + alpha*(exog[-1] - x[-2])*dt
    std = sigma*np.sqrt(dt)
    return norm.logpdf(x[-1], loc=mu, scale=std)

The algorithm I am implementing is the standard forward recursion:

    alpha_0(z_0) = p(z_0) * p(x_0 | z_0)
    alpha_j(z_j) = p(x_j | z_j, x_{0:j-1}) * sum over z_{j-1} of alpha_{j-1}(z_{j-1}) * p(z_j | z_{j-1})

However, I am instead computing the log of the coefficients using the log-sum-exp trick to avoid over/underflow:

    g_0(z_0) = log p(z_0) + log p(x_0 | z_0)
    g_j(z_j) = log p(x_j | z_j, x_{0:j-1}) + log_sum_exp over z_{j-1} of ( g_{j-1}(z_{j-1}) + log p(z_j | z_{j-1}) )

Thank you very much for the help!

1 Answer

Your current implementation of the log_sum_exp function is not robust enough for the edge cases: choosing b as the element with the largest absolute value means you can end up subtracting the minimum (so np.exp can still overflow), and when every entry is -inf the subtraction produces NaN. You can try the updated log_sum_exp function below:

def log_sum_exp(self, sequence):
    '''
    Returns np.log( np.sum(sequence) ) without under/overflow.
    '''
    sequence = np.array(sequence)
    max_val = np.max(sequence)

    # return early if the maximum is negative infinity
    if np.isneginf(max_val):
        return -np.inf

    # calculate log_sum_exp
    return max_val + np.log(np.sum(np.exp(sequence - max_val)))
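
As a quick sanity check (with made-up inputs), the early return matters when every entry is -inf, e.g. after taking the log of a zero transition probability, and the max-subtraction keeps very negative inputs from underflowing:

    import numpy as np

    # standalone copy of the helper above, just for a quick check
    def log_sum_exp(sequence):
        sequence = np.array(sequence)
        max_val = np.max(sequence)
        if np.isneginf(max_val):
            return -np.inf
        return max_val + np.log(np.sum(np.exp(sequence - max_val)))

    print(log_sum_exp([-1000.0, -1001.0]))  # about -999.69, instead of log(0) = -inf naively
    print(log_sum_exp([-np.inf, -np.inf]))  # -inf, instead of nan from exp(-inf - (-inf))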

To prevent the drift you can rescale your forward coefficients at each time step. In log space this means subtracting a per-time-step constant from every g_j(z_j) at time j (in the code below, the first coefficient computed at that step) and recording the constants so the true log-likelihood can be recovered at the end.

def g_j_z(self, j, z_j):
    '''
    Returns g_j(z_j).
    j: (int) time index. zero-indexed 0, 1, 2, ... n-1
    z_j: (int) state index. zero-indexed. 0, 1, 2, ... K-1
    '''
    
    if j == 0:
        if (0, z_j) in self.g_cache:
            return self.g_cache[(0, z_j)]
        initial_prob = np.log(self.p_init[z_j]) + self.log_distributions[z_j](self.pre_x + [self.x[0]], self.pre_exog + [self.exog[0]])
        # set the scaling constant for time 0 only once, so every state at j = 0 is shifted by the same amount
        if 0 not in self.C:
            self.C[0] = -initial_prob
        self.g_cache[(0, z_j)] = initial_prob + self.C[0]
        return self.g_cache[(0, z_j)]

    if (j, z_j) in self.g_cache:
        return self.g_cache[(j, z_j)]

    temp = []
    for state in range(self.K):
        temp.append(
            self.g_j_z(j-1, state) + np.log(self.p_transition[state][z_j])
        )

    # use the log_sum_exp function
    g_j_value = self.log_sum_exp(temp) + self.log_distributions[z_j](self.pre_x + self.x[0:j+1], self.pre_exog + self.exog[0:j+1])
    
    # normalize to prevent numerical drift
    if j not in self.C:
        self.C[j] = -g_j_value
    g_j_value += self.C[j]

    self.g_cache[(j, z_j)] = g_j_value
    return g_j_value
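
This assumes self.C is a plain dictionary mapping the time index j to its scaling constant, initialized alongside self.g_cache and cleared whenever you rerun the forward pass, e.g. in __init__ (a minimal sketch, attribute names as used above):

    self.g_cache = {}  # (j, z_j) -> scaled log forward coefficient
    self.C = {}        # j -> per-time-step scaling constant (log space)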

We also need a function to calculate the final probability (the log-likelihood of the full observation sequence), which undoes the accumulated scaling:

def compute_final_probability(self):
    # sum over the last time step's forward coefficients
    final_probs = [self.g_j_z(len(self.x) - 1, state) for state in range(self.K)]
    final_log_prob = self.log_sum_exp(final_probs)
    
    # adjust by the cumulative scaling factors
    cumulative_scaling = sum(self.C.values())
    
    return final_log_prob - cumulative_scaling
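
To see why subtracting the cumulative scaling works: each constant C[j] is added to every state's coefficient at time j, so by induction the scaled coefficients equal the true ones plus sum(C[0..j]), and the final log_sum_exp equals the true log-likelihood plus the sum of all constants. Below is a self-contained toy check of that bookkeeping, using a made-up 2-state Gaussian model (not your AR(1) setup); the parameters and helper names are just for illustration:

    import numpy as np
    from scipy.stats import norm

    def log_sum_exp(seq):
        seq = np.asarray(seq, dtype=float)
        m = np.max(seq)
        if np.isneginf(m):
            return -np.inf
        return m + np.log(np.sum(np.exp(seq - m)))

    # made-up 2-state model with simple Gaussian emissions
    log_p_init = np.log([0.6, 0.4])
    log_p_trans = np.log([[0.9, 0.1], [0.2, 0.8]])
    x = [0.1, -0.3, 0.5, 0.2]
    means = [0.0, 1.0]

    def log_emit(z, obs):
        return norm.logpdf(obs, loc=means[z], scale=1.0)

    def forward_loglik(scale):
        # g holds the (possibly rescaled) log forward coefficients at the current time step
        g = [log_p_init[z] + log_emit(z, x[0]) for z in range(2)]
        C = []
        for t in range(len(x)):
            if t > 0:
                g = [log_sum_exp([g[s] + log_p_trans[s][z] for s in range(2)]) + log_emit(z, x[t])
                     for z in range(2)]
            if scale:
                c = -g[0]              # per-step constant: minus the first coefficient at time t
                g = [v + c for v in g]
                C.append(c)
        return log_sum_exp(g) - sum(C)  # sum(C) is 0 when scale is False

    print(forward_loglik(scale=False))  # plain log-likelihood
    print(forward_loglik(scale=True))   # identical: the per-step constants cancel exactly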

4 Comments

By performing the normalization, would this not impact the value and interpretation of the final step of the forward algorithm? That is, in the final step we are required to sum the probabilities over all states at the final time step.
Yes, you are right. Performing the normalization at each time step to prevent numerical instability does affect the scale of the computed forward probabilities. The way to resolve this is to keep track of the scaling factors used during normalization and incorporate them back when computing the final result. Check the updated code above, which makes this workable.
From my understanding, you are just subtracting a single log coefficient (whichever one happens to be computed first) from all the log coefficients at that time step. Why does a simple readjustment at the end, as you do in final_log_prob - cumulative_scaling, work? The normalization is applied to each coefficient, and the modified coefficient is then used to compute the next coefficient, and so on recursively, so I am struggling to understand why just subtracting the normalizations at the final step works.
Normalization helps prevent numerical instability but does not change the relative importance of each state. The cumulative adjustment at the end works because each scaling factor is applied uniformly across all states at each time step. And finally, log_sum_exp ensures that even though we normalize, the relative contributions of the different states are preserved.
