Author: Victor Basu
Date created: 2022/03/10
Last modified: 2024/12/17
Description: Implementation of a Convolutional Variational AutoEncoder (VAE) for Drug Discovery.
In this example, we use a Variational Autoencoder to generate molecules for drug discovery. We use the research papers Automatic chemical design using a data-driven continuous representation of molecules and MolGAN: An implicit generative model for small molecular graphs as references.

The model described in the paper **Automatic chemical design using a data-driven continuous representation of molecules** generates new molecules via efficient exploration of open-ended spaces of chemical compounds. The model consists of three components: an encoder, a decoder, and a predictor. The encoder converts the discrete representation of a molecule into a real-valued continuous vector, and the decoder converts these continuous vectors back to discrete molecule representations. The predictor estimates chemical properties from the latent continuous vector representation of the molecule. Continuous representations allow the use of gradient-based optimization to efficiently guide the search for optimized functional compounds.
Figure (a) - A diagram of the autoencoder used for molecule design, including the joint property prediction model. Starting from a discrete molecule representation, such as a SMILES string, the encoder network converts each molecule into a vector in the latent space, which is effectively a continuous molecule representation. Given a point in the latent space, the decoder network produces a corresponding SMILES string. A multilayer perceptron network estimates the value of target properties associated with each molecule.

Figure (b) - Gradient-based optimization in continuous latent space. After training a surrogate model `f(z)` to predict the properties of molecules based on their latent representation `z`, we can optimize `f(z)` with respect to `z` to find new latent representations expected to match specific desired properties. These new latent representations can then be decoded into SMILES strings, at which point their properties can be tested empirically.
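To make this gradient-based search concrete, here is a minimal sketch (not part of this example's code): we assume a hypothetical differentiable surrogate `property_model` that maps latent vectors to a property score, and take a few gradient-ascent steps on `z` with TensorFlow.

```python
import tensorflow as tf


def optimize_latent(property_model, z_init, steps=100, lr=0.01):
    # Gradient ascent on z to maximize the surrogate property f(z)
    z = tf.Variable(z_init, dtype=tf.float32)
    for _ in range(steps):
        with tf.GradientTape() as tape:
            score = property_model(z)  # surrogate prediction f(z)
        grads = tape.gradient(score, z)
        z.assign_add(lr * grads)  # step towards higher predicted property
    return z.numpy()  # decode with the trained decoder afterwards
```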
For an explanation and implementation of MolGAN, refer to the Keras example WGAN-GP with R-GCN for the generation of small molecular graphs by Alexander Kensert. Many of the functions used in the present example are from the above Keras example.
RDKit is an open source toolkit for cheminformatics and machine learning. This toolkit comes in handy if one is into drug discovery. In this example, RDKit is used to conveniently and efficiently transform SMILES into molecule objects, and then obtain sets of atoms and bonds from those objects.

Quoting from WGAN-GP with R-GCN for the generation of small molecular graphs:

"SMILES expresses the structure of a given molecule in the form of an ASCII string. The SMILES string is a compact encoding which, for smaller molecules, is relatively human-readable. Encoding molecules as strings both alleviates and facilitates database and/or web searching of a given molecule. RDKit uses algorithms to accurately transform a given SMILES into a molecule object, which can then be used to compute a great number of molecular properties/features."
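As a quick illustration of this workflow (a minimal sketch, not part of the example pipeline), RDKit can parse a SMILES string into a molecule object and compute descriptors such as logP and QED:

```python
from rdkit import Chem
from rdkit.Chem import Descriptors, QED

mol = Chem.MolFromSmiles("CCO")  # ethanol
if mol is not None:  # MolFromSmiles returns None for invalid SMILES
    print("Atoms:", mol.GetNumAtoms())
    print("logP:", Descriptors.MolLogP(mol))
    print("QED:", QED.qed(mol))
```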
```python
!pip -q install rdkit-pypi==2021.9.4
```

```python
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import ast
import pandas as pd
import numpy as np
import tensorflow as tf
import keras
from keras import layers
from keras import ops
import matplotlib.pyplot as plt
from rdkit import Chem, RDLogger
from rdkit.Chem import BondType
from rdkit.Chem.Draw import MolsToGridImage

RDLogger.DisableLog("rdApp.*")
```
We use the ZINC – A Free Database of Commercially Available Compounds for Virtual Screening dataset. The dataset comes with molecule formulas in SMILES representation, along with their respective molecular properties such as logP (water–octanol partition coefficient), SAS (synthetic accessibility score), and QED (Qualitative Estimate of Drug-likeness).
```python
csv_path = keras.utils.get_file(
    "250k_rndm_zinc_drugs_clean_3.csv",
    "https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv",
)

df = pd.read_csv(csv_path)
df["smiles"] = df["smiles"].apply(lambda s: s.replace("\n", ""))
df.head()
```
| | smiles | logP | qed | SAS |
|---|---|---|---|---|
| 0 | CC(C)(C)c1ccc2occ(CC(=O)Nc3ccccc3F)c2c1 | 5.05060 | 0.702012 | 2.084095 |
| 1 | C[C@@H]1CC(Nc2cncc(-c3nncn3C)c2)C[C@@H](C)C1 | 3.11370 | 0.928975 | 3.432004 |
| 2 | N#Cc1ccc(-c2ccc(O[C@@H](C(=O)N3CCCC3)c3ccccc3)... | 4.96778 | 0.599682 | 2.470633 |
| 3 | CCOC(=O)[C@@H]1CCCN(C(=O)c2nc(-c3ccc(C)cc3)n3c... | 4.00022 | 0.690944 | 2.822753 |
| 4 | N#CC1=C(SCC(=O)Nc2cccc(Cl)c2)N=C([O-])[C@H](C#... | 3.60956 | 0.789027 | 4.035182 |
---
## Hyperparameters
```python
SMILE_CHARSET = '["C", "B", "F", "I", "H", "O", "N", "S", "P", "Cl", "Br"]'
bond_mapping = {"SINGLE": 0, "DOUBLE": 1, "TRIPLE": 2, "AROMATIC": 3}
bond_mapping.update(
{0: BondType.SINGLE, 1: BondType.DOUBLE, 2: BondType.TRIPLE, 3: BondType.AROMATIC}
)
SMILE_CHARSET = ast.literal_eval(SMILE_CHARSET)
MAX_MOLSIZE = max(df["smiles"].str.len())
SMILE_to_index = dict((c, i) for i, c in enumerate(SMILE_CHARSET))
index_to_SMILE = dict((i, c) for i, c in enumerate(SMILE_CHARSET))
atom_mapping = dict(SMILE_to_index)
atom_mapping.update(index_to_SMILE)
BATCH_SIZE = 100
EPOCHS = 10
VAE_LR = 5e-4
NUM_ATOMS = 120 # Maximum number of atoms
ATOM_DIM = len(SMILE_CHARSET) # Number of atom types
BOND_DIM = 4 + 1 # Number of bond types
LATENT_DIM = 435 # Size of the latent space
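A quick sanity check on these mappings (a minimal sketch): `atom_mapping` translates atom symbols to indices and back, and `bond_mapping` does the same for bond types.

```python
print(atom_mapping["C"], atom_mapping[0])       # 0 "C"
print(bond_mapping["DOUBLE"], bond_mapping[1])  # 1 BondType.DOUBLE
```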
```python
def smiles_to_graph(smiles):
    # Converts SMILES to molecule object
    molecule = Chem.MolFromSmiles(smiles)

    # Initialize adjacency and feature tensor
    adjacency = np.zeros((BOND_DIM, NUM_ATOMS, NUM_ATOMS), "float32")
    features = np.zeros((NUM_ATOMS, ATOM_DIM), "float32")

    # loop over each atom in molecule
    for atom in molecule.GetAtoms():
        i = atom.GetIdx()
        atom_type = atom_mapping[atom.GetSymbol()]
        features[i] = np.eye(ATOM_DIM)[atom_type]
        # loop over one-hop neighbors
        for neighbor in atom.GetNeighbors():
            j = neighbor.GetIdx()
            bond = molecule.GetBondBetweenAtoms(i, j)
            bond_type_idx = bond_mapping[bond.GetBondType().name]
            adjacency[bond_type_idx, [i, j], [j, i]] = 1

    # Where no bond, add 1 to last channel (indicating "non-bond")
    # Notice: channels-first
    adjacency[-1, np.sum(adjacency, axis=0) == 0] = 1

    # Where no atom, add 1 to last column (indicating "non-atom")
    features[np.where(np.sum(features, axis=1) == 0)[0], -1] = 1

    return adjacency, features
```
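As a quick shape check (a minimal sketch), we can convert the first SMILES string in the dataset and inspect the resulting tensors:

```python
adjacency, features = smiles_to_graph(df["smiles"][0])
print(adjacency.shape)  # (BOND_DIM, NUM_ATOMS, NUM_ATOMS) == (5, 120, 120)
print(features.shape)   # (NUM_ATOMS, ATOM_DIM) == (120, 11)
```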
```python
def graph_to_molecule(graph):
    # Unpack graph
    adjacency, features = graph

    # RWMol is a molecule object intended to be edited
    molecule = Chem.RWMol()

    # Remove "no atoms" & atoms with no bonds
    keep_idx = np.where(
        (np.argmax(features, axis=1) != ATOM_DIM - 1)
        & (np.sum(adjacency[:-1], axis=(0, 1)) != 0)
    )[0]
    features = features[keep_idx]
    adjacency = adjacency[:, keep_idx, :][:, :, keep_idx]

    # Add atoms to molecule
    for atom_type_idx in np.argmax(features, axis=1):
        atom = Chem.Atom(atom_mapping[atom_type_idx])
        _ = molecule.AddAtom(atom)

    # Add bonds between atoms in molecule; based on the upper triangles
    # of the [symmetric] adjacency tensor
    (bonds_ij, atoms_i, atoms_j) = np.where(np.triu(adjacency) == 1)
    for bond_ij, atom_i, atom_j in zip(bonds_ij, atoms_i, atoms_j):
        if atom_i == atom_j or bond_ij == BOND_DIM - 1:
            continue
        bond_type = bond_mapping[bond_ij]
        molecule.AddBond(int(atom_i), int(atom_j), bond_type)

    # Sanitize the molecule; for more information on sanitization, see
    # https://www.rdkit.org/docs/RDKit_Book.html#molecular-sanitization
    flag = Chem.SanitizeMol(molecule, catchErrors=True)
    # Let's be strict. If sanitization fails, return None
    if flag != Chem.SanitizeFlags.SANITIZE_NONE:
        return None

    return molecule
```
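To check that the two helpers are inverses of each other (a minimal sketch), we can round-trip a molecule and compare canonical SMILES. Note that formal charges are not encoded in the feature tensor, so charged molecules will not round-trip exactly:

```python
smiles = df["smiles"][100]
molecule = graph_to_molecule(smiles_to_graph(smiles))
if molecule is not None:  # sanitization may fail for some graphs
    # Canonicalize both sides before comparing
    print(Chem.MolToSmiles(Chem.MolFromSmiles(smiles)))
    print(Chem.MolToSmiles(molecule))
```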
```python
train_df = df.sample(frac=0.75, random_state=42)  # random state is a seed value
train_df.reset_index(drop=True, inplace=True)

adjacency_tensor, feature_tensor, qed_tensor = [], [], []
for idx in range(8000):
    adjacency, features = smiles_to_graph(train_df.loc[idx]["smiles"])
    qed = train_df.loc[idx]["qed"]
    adjacency_tensor.append(adjacency)
    feature_tensor.append(features)
    qed_tensor.append(qed)

adjacency_tensor = np.array(adjacency_tensor)
feature_tensor = np.array(feature_tensor)
qed_tensor = np.array(qed_tensor)
```
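The stacked tensors now carry a leading sample dimension:

```python
print(adjacency_tensor.shape)  # (8000, 5, 120, 120)
print(feature_tensor.shape)    # (8000, 120, 11)
print(qed_tensor.shape)        # (8000,)
```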
```python
class RelationalGraphConvLayer(keras.layers.Layer):
    def __init__(
        self,
        units=128,
        activation="relu",
        use_bias=False,
        kernel_initializer="glorot_uniform",
        bias_initializer="zeros",
        kernel_regularizer=None,
        bias_regularizer=None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
        self.use_bias = use_bias
        self.kernel_initializer = keras.initializers.get(kernel_initializer)
        self.bias_initializer = keras.initializers.get(bias_initializer)
        self.kernel_regularizer = keras.regularizers.get(kernel_regularizer)
        self.bias_regularizer = keras.regularizers.get(bias_regularizer)

    def build(self, input_shape):
        bond_dim = input_shape[0][1]
        atom_dim = input_shape[1][2]

        self.kernel = self.add_weight(
            shape=(bond_dim, atom_dim, self.units),
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            trainable=True,
            name="W",
            dtype="float32",
        )

        if self.use_bias:
            self.bias = self.add_weight(
                shape=(bond_dim, 1, self.units),
                initializer=self.bias_initializer,
                regularizer=self.bias_regularizer,
                trainable=True,
                name="b",
                dtype="float32",
            )

        self.built = True

    def call(self, inputs, training=False):
        adjacency, features = inputs
        # Aggregate information from neighbors
        x = ops.matmul(adjacency, features[:, None])
        # Apply linear transformation
        x = ops.matmul(x, self.kernel)
        if self.use_bias:
            x += self.bias
        # Reduce bond types dim
        x_reduced = ops.sum(x, axis=1)
        # Apply non-linear transformation
        return self.activation(x_reduced)
```
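To see how the layer transforms shapes (a minimal sketch on random tensors), a single call aggregates over the bond dimension and maps each atom's features to `units` channels:

```python
layer = RelationalGraphConvLayer(units=64)
adj = np.random.rand(2, BOND_DIM, NUM_ATOMS, NUM_ATOMS).astype("float32")
feat = np.random.rand(2, NUM_ATOMS, ATOM_DIM).astype("float32")
print(layer([adj, feat]).shape)  # (2, 120, 64): one 64-dim vector per atom
```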
The encoder takes a molecule's graph adjacency matrix and feature matrix as input. These features are processed via Graph Convolution layers, then flattened and processed by several Dense layers to derive `z_mean` and `log_var`, the latent-space representation of the molecule.
Graph convolutional layer: The relational graph convolutional layer implements non-linearly transformed neighborhood aggregation. We can define these layers as follows:

`H_hat**(l+1) = σ(D_hat**(-1) * A_hat * H_hat**(l) * W_hat**(l))`

where `σ` denotes the non-linear transformation (commonly a ReLU activation), `A_hat` the adjacency tensor, `H_hat**(l)` the feature tensor at the `l`-th layer, `D_hat**(-1)` the inverse diagonal degree tensor of `A_hat`, and `W_hat**(l)` the trainable weight tensor at the `l`-th layer. Specifically, for each bond type (relation), the degree tensor expresses, in the diagonal, the number of bonds attached to each atom.

Source: WGAN-GP with R-GCN for the generation of small molecular graphs
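For intuition, here is a minimal NumPy sketch of this formula for a single bond type, with small hypothetical dimensions; the degree normalization averages each atom's aggregated neighbor features before the linear transform:

```python
N, F_in, F_out = 4, 3, 2  # atoms, input features, output features
A_hat = np.random.randint(0, 2, size=(N, N)).astype("float32")  # one bond type
H = np.random.rand(N, F_in).astype("float32")
W = np.random.rand(F_in, F_out).astype("float32")

degree = A_hat.sum(axis=1)                       # number of bonds per atom
D_inv = np.diag(1.0 / np.maximum(degree, 1.0))   # inverse degree; guard zeros
H_next = np.maximum(D_inv @ A_hat @ H @ W, 0.0)  # sigma = ReLU
print(H_next.shape)  # (4, 2)
```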
The decoder takes the latent-space representation as input and predicts the graph adjacency matrix and feature matrix of the corresponding molecule.
```python
def get_encoder(
    gconv_units, latent_dim, adjacency_shape, feature_shape, dense_units, dropout_rate
):
    adjacency = layers.Input(shape=adjacency_shape)
    features = layers.Input(shape=feature_shape)

    # Propagate through one or more graph convolutional layers
    features_transformed = features
    for units in gconv_units:
        features_transformed = RelationalGraphConvLayer(units)(
            [adjacency, features_transformed]
        )
    # Reduce 2-D representation of molecule to 1-D
    x = layers.GlobalAveragePooling1D()(features_transformed)

    # Propagate through one or more densely connected layers
    for units in dense_units:
        x = layers.Dense(units, activation="relu")(x)
        x = layers.Dropout(dropout_rate)(x)

    z_mean = layers.Dense(latent_dim, dtype="float32", name="z_mean")(x)
    log_var = layers.Dense(latent_dim, dtype="float32", name="log_var")(x)

    encoder = keras.Model([adjacency, features], [z_mean, log_var], name="encoder")

    return encoder


def get_decoder(dense_units, dropout_rate, latent_dim, adjacency_shape, feature_shape):
    latent_inputs = keras.Input(shape=(latent_dim,))

    x = latent_inputs
    for units in dense_units:
        x = layers.Dense(units, activation="tanh")(x)
        x = layers.Dropout(dropout_rate)(x)

    # Map outputs of previous layer (x) to [continuous] adjacency tensors (x_adjacency)
    x_adjacency = layers.Dense(np.prod(adjacency_shape))(x)
    x_adjacency = layers.Reshape(adjacency_shape)(x_adjacency)
    # Symmetrify tensors in the last two dimensions
    x_adjacency = (x_adjacency + ops.transpose(x_adjacency, (0, 1, 3, 2))) / 2
    x_adjacency = layers.Softmax(axis=1)(x_adjacency)

    # Map outputs of previous layer (x) to [continuous] feature tensors (x_features)
    x_features = layers.Dense(np.prod(feature_shape))(x)
    x_features = layers.Reshape(feature_shape)(x_features)
    x_features = layers.Softmax(axis=2)(x_features)

    decoder = keras.Model(
        latent_inputs, outputs=[x_adjacency, x_features], name="decoder"
    )

    return decoder
```
```python
class Sampling(layers.Layer):
    def __init__(self, seed=None, **kwargs):
        super().__init__(**kwargs)
        self.seed_generator = keras.random.SeedGenerator(seed)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch, dim = ops.shape(z_log_var)
        epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon
```
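The `Sampling` layer implements the reparameterization trick: `z = z_mean + exp(0.5 * z_log_var) * epsilon` with `epsilon ~ N(0, I)`, which keeps the sampling step differentiable with respect to `z_mean` and `z_log_var`. A quick check (a minimal sketch):

```python
sampling = Sampling(seed=0)
z_mean = np.zeros((2, LATENT_DIM), dtype="float32")
z_log_var = np.zeros((2, LATENT_DIM), dtype="float32")
print(sampling([z_mean, z_log_var]).shape)  # (2, 435): samples from N(0, I)
```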
This model is trained to optimize four losses:

* Categorical crossentropy
* KL divergence loss
* Property prediction loss
* Graph loss (gradient penalty)

The categorical crossentropy loss function measures the model's reconstruction accuracy. The property prediction loss estimates the mean squared error between the predicted and actual properties after running the latent representation through a property prediction model. The property prediction of the model is optimized via binary crossentropy. The gradient penalty is further guided by the model's property (QED) predictions.

The gradient penalty is an alternative soft constraint on the 1-Lipschitz continuity, as an improvement upon the gradient clipping scheme from the original neural network ("1-Lipschitz continuity" means that the norm of the gradient is at most 1 at every single point of the function). It adds a regularization term to the loss function.
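As a toy numeric illustration (a minimal sketch, separate from the `_gradient_penalty` method below), the penalty averages `(1 - ||grad||)**2` over the interpolated inputs, and is zero only when every gradient norm is exactly 1:

```python
grad_norms = np.array([0.5, 1.0, 1.5])  # hypothetical gradient norms
penalty = np.mean((1.0 - grad_norms) ** 2)
print(penalty)  # 0.1667
```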
```python
class MoleculeGenerator(keras.Model):
    def __init__(self, encoder, decoder, max_len, seed=None, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.property_prediction_layer = layers.Dense(1)
        self.max_len = max_len
        self.seed_generator = keras.random.SeedGenerator(seed)
        self.sampling_layer = Sampling(seed=seed)

        self.train_total_loss_tracker = keras.metrics.Mean(name="train_total_loss")
        self.val_total_loss_tracker = keras.metrics.Mean(name="val_total_loss")

    def train_step(self, data):
        adjacency_tensor, feature_tensor, qed_tensor = data[0]
        graph_real = [adjacency_tensor, feature_tensor]
        self.batch_size = ops.shape(qed_tensor)[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, qed_pred, gen_adjacency, gen_features = self(
                graph_real, training=True
            )
            graph_generated = [gen_adjacency, gen_features]
            total_loss = self._compute_loss(
                z_log_var, z_mean, qed_tensor, qed_pred, graph_real, graph_generated
            )

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.train_total_loss_tracker.update_state(total_loss)
        return {"loss": self.train_total_loss_tracker.result()}

    def _compute_loss(
        self, z_log_var, z_mean, qed_true, qed_pred, graph_real, graph_generated
    ):
        adjacency_real, features_real = graph_real
        adjacency_gen, features_gen = graph_generated

        adjacency_loss = ops.mean(
            ops.sum(
                keras.losses.categorical_crossentropy(
                    adjacency_real, adjacency_gen, axis=1
                ),
                axis=(1, 2),
            )
        )
        features_loss = ops.mean(
            ops.sum(
                keras.losses.categorical_crossentropy(features_real, features_gen),
                axis=(1),
            )
        )
        kl_loss = -0.5 * ops.sum(
            1 + z_log_var - z_mean**2 - ops.minimum(ops.exp(z_log_var), 1e6), 1
        )
        kl_loss = ops.mean(kl_loss)

        property_loss = ops.mean(
            keras.losses.binary_crossentropy(qed_true, ops.squeeze(qed_pred, axis=1))
        )

        graph_loss = self._gradient_penalty(graph_real, graph_generated)

        return kl_loss + property_loss + graph_loss + adjacency_loss + features_loss

    def _gradient_penalty(self, graph_real, graph_generated):
        # Unpack graphs
        adjacency_real, features_real = graph_real
        adjacency_generated, features_generated = graph_generated

        # Generate interpolated graphs (adjacency_interp and features_interp)
        alpha = keras.random.uniform(shape=(self.batch_size,), seed=self.seed_generator)
        alpha = ops.reshape(alpha, (self.batch_size, 1, 1, 1))
        adjacency_interp = (adjacency_real * alpha) + (
            1.0 - alpha
        ) * adjacency_generated
        alpha = ops.reshape(alpha, (self.batch_size, 1, 1))
        features_interp = (features_real * alpha) + (1.0 - alpha) * features_generated

        # Compute the logits of interpolated graphs
        with tf.GradientTape() as tape:
            tape.watch(adjacency_interp)
            tape.watch(features_interp)
            _, _, logits, _, _ = self(
                [adjacency_interp, features_interp], training=True
            )

        # Compute the gradients with respect to the interpolated graphs
        grads = tape.gradient(logits, [adjacency_interp, features_interp])
        # Compute the gradient penalty
        grads_adjacency_penalty = (1 - ops.norm(grads[0], axis=1)) ** 2
        grads_features_penalty = (1 - ops.norm(grads[1], axis=2)) ** 2
        return ops.mean(
            ops.mean(grads_adjacency_penalty, axis=(-2, -1))
            + ops.mean(grads_features_penalty, axis=(-1))
        )

    def inference(self, batch_size):
        z = keras.random.normal(
            shape=(batch_size, LATENT_DIM), seed=self.seed_generator
        )
        reconstruction_adjacency, reconstruction_features = self.decoder.predict(z)
        # obtain one-hot encoded adjacency tensor
        adjacency = ops.argmax(reconstruction_adjacency, axis=1)
        adjacency = ops.one_hot(adjacency, num_classes=BOND_DIM, axis=1)
        # Remove potential self-loops from adjacency
        adjacency = adjacency * (1.0 - ops.eye(NUM_ATOMS, dtype="float32")[None, None])
        # obtain one-hot encoded feature tensor
        features = ops.argmax(reconstruction_features, axis=2)
        features = ops.one_hot(features, num_classes=ATOM_DIM, axis=2)
        return [
            graph_to_molecule([adjacency[i].numpy(), features[i].numpy()])
            for i in range(batch_size)
        ]

    def call(self, inputs):
        z_mean, log_var = self.encoder(inputs)
        z = self.sampling_layer([z_mean, log_var])

        gen_adjacency, gen_features = self.decoder(z)

        property_pred = self.property_prediction_layer(z_mean)

        return z_mean, log_var, property_pred, gen_adjacency, gen_features
```
```python
vae_optimizer = keras.optimizers.Adam(learning_rate=VAE_LR)

encoder = get_encoder(
    gconv_units=[9],
    adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS),
    feature_shape=(NUM_ATOMS, ATOM_DIM),
    latent_dim=LATENT_DIM,
    dense_units=[512],
    dropout_rate=0.0,
)
decoder = get_decoder(
    dense_units=[128, 256, 512],
    dropout_rate=0.2,
    latent_dim=LATENT_DIM,
    adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS),
    feature_shape=(NUM_ATOMS, ATOM_DIM),
)

model = MoleculeGenerator(encoder, decoder, MAX_MOLSIZE)

model.compile(vae_optimizer)
history = model.fit([adjacency_tensor, feature_tensor, qed_tensor], epochs=EPOCHS)
```
```
Epoch 1/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 125s 481ms/step - loss: 2841.6440
Epoch 2/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 113s 451ms/step - loss: 197.5607
Epoch 3/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 115s 460ms/step - loss: 220.5820
Epoch 4/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 109s 434ms/step - loss: 394.0200
Epoch 5/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 109s 436ms/step - loss: 388.5954
Epoch 6/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 108s 431ms/step - loss: 323.4093
Epoch 7/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 108s 432ms/step - loss: 278.2234
Epoch 8/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 110s 439ms/step - loss: 393.4183
Epoch 9/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 114s 456ms/step - loss: 523.3671
Epoch 10/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 111s 445ms/step - loss: 223.5443
```
---
## Inference
We use our model to generate new valid molecules from different points of the latent space.
### Generate unique Molecules with the model
```python
molecules = model.inference(1000)

MolsToGridImage(
    [m for m in molecules if m is not None][:1000], molsPerRow=5, subImgSize=(260, 160)
)
```
```python
def plot_latent(vae, data, labels):
    # display a 2D plot of the property in the latent space
    z_mean, _ = vae.encoder.predict(data)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()


plot_latent(model, [adjacency_tensor[:8000], feature_tensor[:8000]], qed_tensor[:8000])
```
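Since the latent space is continuous, we can also decode a single hand-picked latent vector back into a molecule (a minimal sketch reusing `graph_to_molecule`; the result may be `None` when sanitization fails):

```python
z = np.random.normal(size=(1, LATENT_DIM)).astype("float32")
adj_pred, feat_pred = model.decoder.predict(z)
# Discretize the continuous decoder outputs into one-hot tensors
adjacency = ops.one_hot(ops.argmax(adj_pred, axis=1), num_classes=BOND_DIM, axis=1)
features = ops.one_hot(ops.argmax(feat_pred, axis=2), num_classes=ATOM_DIM, axis=2)
mol = graph_to_molecule([np.array(adjacency[0]), np.array(features[0])])
print(mol if mol is None else Chem.MolToSmiles(mol))
```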