Séance 3 : Simuler l'ouverture de canaux ioniques grâce aux chaînes de Markov

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats
%matplotlib inline

Modélisation

Nous allons utiliser une Chaîne de Markov pour modéliser l'ouverture d'un canal ionique :

  • avec comme espace d'état (de cardinal fini) l'ensemble {Fermé, Ouvert} ou $\{0,1\}$.
  • avec comme condition initiale $X_0$, qui peut être une variable aléatoire (Sa loi est appelée mesure initiale de la chaîne) ou une valeur déterministe (par exemple Ouvert, ce qui revient à avoir comme mesure initiale $P(X_0=1)=1$).
  • avec les probabilités de transitions: $a_{ij} = \mathbb P (X_{n+1} = j \mid X_n=i)$

On note la matrice de transition suivante: $$ A = \begin{pmatrix} 1-p & p \\ q & 1-q \\ \end{pmatrix} $$

Exercice 1 :

1) Quelles sont les propriétés d'une matrice de transition si $p \notin \{0,1\} $ and $q \notin \{0,1\}$ ?

2) Écrire une fonction qui prend une matrice comme paramètre et lève une erreur si ce n'est pas une matrice stochastique. (Indice: Utiliser assert, numpy.testing.assert_allclose).

Correction Exercice 1 - Question 1 :

Si $p \notin \{0,1\} $ and $q \notin \{0,1\}$, Cette chaîne de markov est irréductible car elle comporte une seule classe d'état qui communiquent. Cette classe est récurrente. Comme la chaîne a un espace d'état fini, cette récurrence est nécessairement positive.

In [8]:
# Correction Exercice 1 - Question 2 :

def assert_transition_matrix(P):
    """Lève une erreur si la matrice n'est pas stochastique"""
    assert P.shape[0] == P.shape[1], "La matrice n'est pas carrée"
    np.testing.assert_allclose(np.sum(P,1), [1]*P.shape[0], err_msg='Les lignes ne somment pas à 1.')

P = np.array([[0.9,.1],[.2,.8]])
assert_transition_matrix(P)

Partie I : Simulation d'une chaîne de Markov

A) Chaînes à 2 états :

In [3]:
## Réaliser des tirages de nombres aléatoires à l'aide d'un ordinateur. 

# Fixe une graine
np.random.seed(111)

# Génère un nombre pseudo aléatoire
assert np.random.random() - 0.6121701756176187 < 1e-10

Exercice 2 :

1) Écrire une fonction mc_trajectoire(mu, P, Tmax) qui échantillonne, du temps $0$ à $Tmax$, la chaîne de Markov ($mu$, $P$) où $P$ est une matrice de transition 2x2 et $mu$ la mesure initiale. (Indice : Utiliser np.random.random).

2) Représenter la trajectoire de la chaîne (fonction trace_mc_trajectoire) et appliquer aux chaînes suivantes :

P = np.array([[0.9,.1],
              [0.4,0.6]])

P3 = np.array([[0,1],
               [1,0]])

mu = np.array([.3,.7])
mu3 = np.array([.1,.9])
In [14]:
# Correction Exercice 2

def mc_trajectoire(mu, P, Tmax):
    """Retourne la trajectoire de la chaîne mu,P sur 1...Tmax"""
    trajectoire = np.zeros(Tmax, dtype=int)
    
    # Condition initiales: 
    x0 = int(np.random.random() < mu[1])
    trajectoire[0]=x0
    
    # Chaque pas de temps
    for t in range(Tmax-1):
        x = trajectoire[t]
        rd = np.random.random()
        if x == 0: 
            if rd < P[0,1]:
                new_x = 1 
            else:
                new_x = 0 
        else: 
            new_x = int(rd < P[1,1])
        trajectoire[t+1] = new_x
    
    return trajectoire


def trace_mc_trajectoire(traj, noms_etats=None, ax=None):
    """Trace la trajectoire d'une chaîne de markov.
    
    Args:
        traj (np.array): traj[i] est l'état de la chaîne à l'instant i.
        noms_etats (iterable): nom associé à chaque état
        ax (matplotlib.axis): axis on which the trajectory is drawn
    """
    if ax is None:
        fig, ax = plt.subplots(figsize=(15,2))
    ticks = np.arange(traj.max()+1)
    if noms_etats is None:
        noms_etats = ticks
    ax.plot(np.arange(len(traj)), traj,color='k')
    ax.set(xlabel='Temps', ylabel='État', yticks=ticks, yticklabels=noms_etats)
    ax.scatter(np.arange(len(traj)), traj)
In [20]:
P = np.array([[0.9,.1],
              [0.4,0.6]])
P3 = np.array([[0,1],
               [1,0]])

mu = np.array([.3,.7])
mu3 = np.array([.1,.9])


trace_mc_trajectoire(mc_trajectoire(mu, P, Tmax=30))
plt.savefig('markov_canaux_1.pdf')

trace_mc_trajectoire(mc_trajectoire(mu3, P3, Tmax=30))
plt.savefig('markov_canaux_2.pdf')

B) Rajout d'un troisième état :

svg image

Exercice 3 :

1) Écrire une fonction sample_cdf qui permet d'échantillonner une loi de probabilité discrète arbitraire. (Indice : Utiliser np.random, np.cumsum, np.sum).

2) Ré-écrire la fonction mc_trajectoire pour prendre en compte des matrices stochastique de taille arbitraire. Utiliser votre fonction sample_cdf ou np.random.choice.

L'appliquer pour la chaîne suivante :

P2 = np.array([[0.8,.1,.1],
              [0.4,0.3,0.3],
              [0.2,0.6,0.2]])

mu2 = np.array([.3,.7,0])
In [48]:
### Correction Exercice 3 - Question 1 

def sample_cdf(p):
    ''' échantillonne une distribution discrète. 
    Args:
        p (np.array de longueur 4): p[i] probabilités de X=i (pas necessairement normalisées pour sommer à 1)
    Return:
        i \in 0...3. '''
    
    # Normaliser si necessaire. 
    if p.sum() != 1:
        p /= p.sum()
    
    # Tirage. 
    rdm = np.random.random()
    i=0
    for csum in np.cumsum(p):
        if rdm < csum:
           return i
        i=i+1
In [49]:
### Correction Exercice 3 - Question 2

def mc_trajectoire(mu, P, Tmax):
    '''Simule une chaîne de markov discrète de matrice de transition P (de dimension finie)
       et de mesure initiale mu sur Tmax pas de temps'''
    
    # Tester si la matrice de transition est valide. 
    assert_transition_matrix(P) 
    assert mu.sum() - 1 < 1e-6, 'Mesure initiale invalide'
    
    # Initialisation
    traj = np.zeros(Tmax, dtype=int)
    choices = np.arange(P.shape[0], dtype=int)
    traj[0] = np.random.choice(choices, 1, p=mu)

    # Simulation 
    for i in range(Tmax-1):
        traj[i+1] = np.random.choice(choices,1,p=P[traj[i],:])
    return traj

def trace_mc_trajectoire(traj, noms_etats=None, ax=None):
    """Trace la trajectoire d'une chaîne de markov.
    
    Args:
        traj (np.array): traj[i] est l'état de la chaîne à l'instant i.
        noms_etats (iterable): nom associé à chaque état
        ax (matplotlib.axis): axis on which the trajectory is drawn
    """
    if ax is None:
        fig, ax = plt.subplots(figsize=(15,2))
    ticks = np.arange(traj.max()+1)
    if noms_etats is None:
        noms_etats = ticks
    ax.plot(np.arange(len(traj)), traj,color='k')
    ax.set(xlabel='Temps', ylabel='État', yticks=ticks, yticklabels=noms_etats)
    ax.scatter(np.arange(len(traj)), traj)
In [46]:
P = np.array([[0.9,.1],
              [0.4,0.6]])
mu = np.array([.3,.7])

traj = mc_trajectoire(mu, P, 100)
trace_mc_trajectoire(traj, ['Fermé','Ouvert'])
In [47]:
P2 = np.array([[0.8,.1,.1],
              [0.4,0.3,0.3],
              [0.2,0.6,0.2]])
mu2 = np.array([.3,.7,0])

traj = mc_trajectoire(mu2, P2, 100)
trace_mc_trajectoire(traj, ['Fermé','Ouvert','Inactivé'])

plt.savefig('markov_canaux_3.pdf')
In [50]:
P3 = np.array([[0,1],
               [1,0]])
mu3 = np.array([.1,.9])

traj = mc_trajectoire(mu3, P3, 100)
trace_mc_trajectoire(traj, ['Fermé','Ouvert','Inactivé'])

Partie II : Mesure invariante et théorème ergodique :

Exercice 4 :

Écrire une fonction qui renvoie la mesure invariante d'une matrice stochastique $P$. (Indice : Utiliser np.linalg.eig et np.argmax).

In [55]:
def trouver_mesure_invariante(P):
    """Retourne la mesure invariante d'une matrice stochastique P"""
       
    # Tester si la matrice est stochastique. 
    assert_transition_matrix(P) 
    
    # Trouver le vecteur propre à gauche 
    # /!\ linalg.eig renvoie les vecteurs propres à droite,
    # il faut transposer la matrice de départ pour avoir
    # ceux à gauche.
    valp, vecp = np.linalg.eig(P.transpose())
    
    # Trouver la valeur propre dominante
    # On sait qu'elle existe graçe au théorème de Perron-Frobenius. 
    pos_valp_dom = np.argmax(valp)
    # Trouver le vecteur propre dominant 
    vecp_dom = vecp[:, pos_valp_dom]
        
    # Vérifier que la valeur propre dominante est 1. 
    assert abs(np.max(valp) - 1) < 1e-10
    
    # Remettre le vecteur propre dominant à l'échelle pour que la somme 
    # de ses éléments soit 1. 
    vecp_dom = vecp_dom/np.sum(vecp_dom)
    
    return vecp_dom
In [56]:
mesure_inv = trouver_mesure_invariante(P2)
print(mesure_inv)
np.testing.assert_allclose(mesure_inv, mesure_inv @ P2)
[0.61290323 0.22580645 0.16129032]

A) Proportion du temps passé en un état donné :

Exercice 5 :

1) Écrire une fonction trace_proportion_temps_passe qui affiche la proportion du temps passé dans chaque état pour une trajectoire. L'appliquer sur la matrice $P2$. Tester avec 100 trajectoires de 50 pas de temps, puis 1000 trajectoires de 500 pas de temps. Vérifier la convergence vers la mesure invariante. (Indice : Utiliser plt.hist et plt.hlines).

2) Représenter pour une chaîne la convergence vers la mesure invariante au cours du temps.

In [62]:
# Correction Exercice 5 - Question 1

# Faire de nombreuses réalisations

R = 100 #Nombre de trajectoires
T = 50 #Longueur d'une trajectoire

R = 10000 #Nombre de trajectoires
T = 500 #Longueur d'une trajectoire

state_space = np.arange(P2.shape[0])
names = ('Fermé', 'Ouvert', 'Inactivé')

# Simuler une mesure initiale
mu = np.random.random(P2.shape[0])
mu /= mu.sum()
print(mu)

trajs =  np.array([mc_trajectoire(mu,P2,T) for _ in range(R)])
print(trajs.shape)

mesure_inv = trouver_mesure_invariante(P2)
print(mesure_inv)
[0.20234595 0.42975998 0.36789407]
(10000, 500)
[0.61290323 0.22580645 0.16129032]
In [63]:
def trace_proportion_temps_passe(espace_detat, trajs, mesure_inv=None, noms_etats=None, ax=None):
    if ax is None:
        fig, ax = plt.subplots(figsize=(7,5))
    ticks = espace_detat
    if noms_etats is None:
        noms_etats = espace_detat
    plt.hist(trajs.flat, density=True, bins=[-0.5,0.5,1.5,2.5],          label = r'$\frac{1}{T}\sum^T 1_{(X_k = i)}$')

    if mesure_inv is not None:
        ax.hlines(mesure_inv,
                  -0.5,len(state_space)-0.5,
                  label='Mesure invariante')
    ax.set(xlabel='État',
           ylabel='Proportion du temps passé',
           title=r'$\frac{1}{T}\sum^T 1_{(X_k = i)}$',
           xticks=state_space,
           xticklabels=noms_etats)
    plt.legend()
In [64]:
proportions = np.array([  np.sum(trajs==i) for i in state_space])  /trajs.size 
trace_proportion_temps_passe(state_space,
                             trajs,
                             mesure_inv=mesure_inv, 
                             noms_etats=names)

plt.savefig('markov_mesure.pdf')
In [75]:
# Correction Exercice 5 - Question 2
print(trajs.shape)

def trace_convergence_temps_passe(trajs, noms,  mesure_inv, mu,ax=None):
    proportions = np.array([np.array([(trajs[0,:t]==i).sum()/(trajs[0,:t]).size for i in state_space])
                            for t in range(1,trajs.shape[1])])
    if ax is None:
        fig, ax = plt.subplots(figsize=(16,4))
    for i,name in enumerate(names):
        plt.plot(proportions[:,i],label=name)
    ax.hlines(mesure_inv,
              0,trajs.shape[1],
              linestyle=':',
              color=('C0','C1','C2'),
              label='Mesure invariante')
    ax.hlines(mu,
              0,trajs.shape[1],
              linestyle='-.',
              linewidth=1,
              color=('C0','C1','C2'),
              label='Mesure initiale')
    ax.set(xlabel='Temps',
           ylabel='Proportion du temps passé',
           ylim=(0,1),
           xlim=(0,trajs.shape[1]),
           title=r'Convergence de $\frac{1}{T}\sum^T 1_{(X_k = i)}$')
    ax.legend()
    
trace_convergence_temps_passe(trajs,names, mesure_inv, mu)

plt.savefig('markov_temps_passe.pdf')
(10000, 500)

B) Probabilité d'être dans un état donné à un temps donné :

Exercice 6 :

1) Écrire une fonction qui estime numériquement $P(X_t)$ à partir de plusieurs trajectoires de $P2$.

2) Que se passe-t-il pour $P3$ ? Comment l'expliquer ?

In [80]:
# Correction Exercice 6 - Question 1

def trace_convergence_loi(trajs, noms, mesure_inv, mu, ax=None):
    proportions = np.array([(trajs==n).sum(0)/trajs.shape[0] for n,name in enumerate(noms)])

    if ax is None:
            fig, ax = plt.subplots(figsize=(16,4))
    for i,name in enumerate(names):
        plt.plot(proportions[i,:],label=name)
    ax.hlines(mesure_inv,
              0,trajs.shape[1],
              linestyle=':',
              color=('C0','C1','C2'),
              label='Mesure invariante')
    ax.hlines(mu,
              0,trajs.shape[1],
              linestyle='-.',
              linewidth=1,
              color=('C0','C1','C2'),
              label='Mesure initiale')
    ax.set(xlabel='Temps',
           ylabel='Probabilité',
           ylim=(0,1),
           xlim=(0,trajs.shape[1]),
           title=r'Convergence de $P(X_n = i)$')
    ax.legend()
trace_convergence_loi(trajs[:100,:],names, mesure_inv, mu)

plt.savefig('markov_proba_1.pdf')
In [81]:
# Correction Exercice 6 - Question 2

# Faire de nombreuses réalisations,
R = 1000 # Nombre de trajectoires
T = 100 #Longueur d'une trajectoire

state_space = np.arange(P3.shape[0])
names = ('Fermé', 'Ouvert')

trajs =  np.array([mc_trajectoire(mu3,P3,T) for _ in range(R)])

mesure_inv3 = trouver_mesure_invariante(P3)

trace_convergence_temps_passe(trajs,names,mesure_inv3,mu3)
plt.show()
trace_convergence_loi(trajs,names,mesure_inv3,mu3)
plt.show()

III) Inférence :

Exercice 7 :

Les fichiers ion_channel_1 et ion_channel_2 contiennent des séries temporelles de mesure de patch clamp qui donnent l'état ouvert ou fermé d'un canal au cours du temps.

Écrire une fonction estimate_P(trajectoire) qui estime une matrice de transition de taille 2x2 à partir de la trajectoire. (Indice : Utiliser np.fromfile et zip).

In [91]:
def estimate_P(traj):
    """Estime la matrice de transition de la chaine 2x2 à partir d'une réalisation de la trajectoire.
    Args:
        traj (np.array): réalisation de la trajectoire.  
    Returns:
        (np.array) Une matrice stochastique 2x2 .
    """
    
    # On vérifie que les données sont conformes. 
    assert all(np.logical_or(traj==0, traj==1))
    
    # Créer une matrice vide. 
    P = np.zeros((2,2))
    
    # Compter le nombre de transition entre chaque paire d'état

    for pn,pnm1 in zip(traj[1:], traj[:-1]):
        P[pnm1,pn] += 1
    
    # Normaliser pour que la matrice soit stochastique. 
    P /= np.hstack([P.sum(1)[:,np.newaxis]]*2)
    
    return P
In [83]:
ion_channel_1 = np.fromfile('ion_channel_1', sep=',', dtype=int)
ion_channel_2 = np.fromfile('ion_channel_2', sep=',', dtype=int)
In [84]:
fig, ax = plt.subplots(2,1,figsize=(20,3))
trace_mc_trajectoire(ion_channel_1[:100], ['Fermé','Ouvert'], ax[0])
trace_mc_trajectoire(ion_channel_2[:100], ['Fermé','Ouvert'], ax[1])
plt.savefig('markov_channel_1.pdf')
In [92]:
estimate_P(ion_channel_1)
Out[92]:
array([[0.79979838, 0.20020162],
       [0.30676121, 0.69323879]])
In [93]:
estimate_P(ion_channel_2)
Out[93]:
array([[0.79834154, 0.20165846],
       [0.30936874, 0.69063126]])

Exercice 8 :

1) Écrire une fonction qui renvoie les liste des durées pendant lequel le canal reste ouvert (resp. fermé). Tracer les deux histogrames pour ion_channel_1 (ouvert/fermé) et ion_channel_2 (ouvert/fermé). (Indice : Utiliser plt.hist).

2) Quelle est la distribution théorique de ces temps d'ouverture/fermeture si le phénomène suit la distribution d'une chaîne de Markov de taille 2x2 ? La tracer par dessus les histogrammes. (Indice : Utiliser: scipy.stats.xxx.pmfxxx est le nom de la loi discrète parmi: https://docs.scipy.org/doc/scipy/reference/stats.html#discrete-distributions )

In [96]:
# Correction Exercice 8

def measure_residence_times(traj):
    """Estime les temps d'attente de la chaîne 2x2 à partir d'une réalisation de la trajectoire.
    """
    
    # Vérifie qu'il s'agit bien d'une trajectoire de chaîne à deux état. 
    assert all(np.logical_or(traj==0, traj==1)), 'Invalid trajectory'

    # Initialisation
    stretch = ([], [])
    current = 1

    # Pour chaque transition: 
    # si l'état a changé on enregistre la durée du séjour dans l'état précédent,
    # si non on ajoute 1 à la durée du séjour. 
    for pn,pnm1 in zip(traj[1:], traj[:-1]):
        if pn != pnm1:
            stretch[pnm1].append(current)
            current = 1
        else:
            current += 1
            
    stretch[traj[-1]].append(current)
    
    return stretch

assert measure_residence_times(np.array([0,0,0,1,1,1,0,0,1,0,0])) == ([3,2,2],[3,1])

def display_residence_times(rtimes):
    """Affiche le résultat de measure_residence_time"""
    fig, axes = plt.subplots(1,2, figsize=(12,4))
    for i,(val,ax) in enumerate(zip(rtimes,axes)):
        x = np.arange(np.max(val))
        ax.step(x+1, scipy.stats.geom.pmf(x, 1/np.mean(val)),label='Loi géométrique')
        h,b,_ = ax.hist(val, density=True, bins=x, label='Empirique')
        ax.set(xlabel='durée', ylabel='densité', title="Temps de séjour dans l'état {}".format(i))
        ax.legend()
In [97]:
display_residence_times(measure_residence_times(ion_channel_1))

plt.savefig('markov_inference_1.pdf')
In [39]:
display_residence_times(measure_residence_times(ion_channel_2))
plt.savefig('markov_inference_2.pdf')

Les deux jeux de données donnent la même matrice de transition mais le second n'est pas issu d'une chaîne de Markov à deux états ! La méthode pour étudier ce genre de sytème est de faire un modèle de mélange (statistique) pour voir si le temps de séjour n'est pas la composition de plusieurs loi géométriques, ce qui donne un indice sur le nombre d'états cachés qui existent. On peut ensuite paramétriser un modèle de Markov Caché.

In [27]:
# Code pour générer le jeu de données... 
P2 = np.array([[0.8,.1,.1],
              [0.9,0,0.1],
              [0.2,0,.8]])
def collapse(P):
    o_to_c = P[0,1] + P[0,2]
    pi = trouver_mesure_invariante(P)
    c_to_o = (pi[1] * P[1,0] + pi[2] * P[2,0]) / (pi[1]+pi[2])
    return np.array([[1-o_to_c, o_to_c],
                     [c_to_o, 1-c_to_o]])
P1 = collapse(P2)
print(P1)
ion_channel_2 = mc_trajectoire(mu2, P2, 100000)
ion_channel_1 = mc_trajectoire(trouver_mesure_invariante(P1), P1, 100000)
ion_channel_2 = np.int_(np.bool_(ion_channel_2))
ion_channel_1.tofile('ion_channel_1', sep=',')
ion_channel_2.tofile('ion_channel_2', sep=',')
[[0.8        0.2       ]
 [0.30769231 0.69230769]]
In [ ]: