程式碼範例 / 電腦視覺 / 使用 Siamese 網路與三元組損失函數進行圖像相似度估計

使用 Siamese 網路與三元組損失函數進行圖像相似度估計

作者: Hazem EssamSantiago L. Valdarrama
建立日期 2021/03/25
上次修改 2021/03/25
描述: 訓練一個 Siamese 網路,使用三元組損失函數比較圖像的相似度。

ⓘ 這個範例使用 Keras 3

在 Colab 中檢視 GitHub 原始碼


簡介

Siamese 網路是一種網路架構,其中包含兩個或多個相同的子網路,用於為每個輸入生成特徵向量並比較它們。

Siamese 網路可以應用於不同的用例,例如檢測重複項、尋找異常和人臉辨識。

這個範例使用一個具有三個相同子網路的 Siamese 網路。我們將向模型提供三個圖像,其中兩個圖像將相似(錨點正向樣本),第三個將不相關(一個負向範例)。我們的目標是讓模型學習估計圖像之間的相似度。

為了讓網路學習,我們使用三元組損失函數。您可以在 Schroff 等人於 2015 年發表的 FaceNet 論文中找到三元組損失函數的介紹。在這個範例中,我們將三元組損失函數定義如下:

L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

這個範例使用 Totally Looks Like 資料集,由 Rosenfeld 等人於 2018 年發表。


設定

import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from keras import applications
from keras import layers
from keras import losses
from keras import ops
from keras import optimizers
from keras import metrics
from keras import Model
from keras.applications import resnet


target_shape = (200, 200)

載入資料集

我們將載入 Totally Looks Like 資料集,並將其解壓縮到本機環境中的 ~/.keras 目錄中。

資料集由兩個獨立的檔案組成:

  • left.zip 包含我們將用作錨點的圖像。
  • right.zip 包含我們將用作正向樣本的圖像(看起來像錨點的圖像)。
cache_dir = Path(Path.home()) / ".keras"
anchor_images_path = cache_dir / "left"
positive_images_path = cache_dir / "right"
!gdown --id 1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
!gdown --id 1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
!unzip -oq left.zip -d $cache_dir
!unzip -oq right.zip -d $cache_dir
Downloading...
From (uriginal): https://drive.google.com/uc?id=1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
From (redirected): https://drive.google.com/uc?id=1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34&confirm=t&uuid=be98abe4-8be7-4c5f-a8f9-ca95d178fbda
To: /home/scottzhu/keras-io/scripts/tmp_9629511/left.zip
100%|█████████████████████████████████████████| 104M/104M [00:00<00:00, 278MB/s]
/home/scottzhu/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
Downloading...
From (uriginal): https://drive.google.com/uc?id=1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
From (redirected): https://drive.google.com/uc?id=1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW&confirm=t&uuid=0eb1b2e2-beee-462a-a9b8-c0bf21bea257
To: /home/scottzhu/keras-io/scripts/tmp_9629511/right.zip
100%|█████████████████████████████████████████| 104M/104M [00:00<00:00, 285MB/s]

準備資料

我們將使用 tf.data 管道來載入資料並產生訓練 Siamese 網路所需的三元組。

我們將使用包含錨點、正向和負向檔案名稱的壓縮清單來設定管道作為來源。該管道將載入並預處理相應的圖像。

def preprocess_image(filename):
    """
    Load the specified file as a JPEG image, preprocess it and
    resize it to the target shape.
    """

    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, target_shape)
    return image


def preprocess_triplets(anchor, positive, negative):
    """
    Given the filenames corresponding to the three images, load and
    preprocess them.
    """

    return (
        preprocess_image(anchor),
        preprocess_image(positive),
        preprocess_image(negative),
    )

讓我們使用包含錨點、正向和負向圖像檔案名稱的壓縮清單來設定我們的資料管道作為來源。該管道的輸出包含相同的三元組,其中每個圖像都已載入和預處理。

# We need to make sure both the anchor and positive images are loaded in
# sorted order so we can match them together.
anchor_images = sorted(
    [str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)

positive_images = sorted(
    [str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)

image_count = len(anchor_images)

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)

# To generate the list of negative images, let's randomize the list of
# available images and concatenate them together.
rng = np.random.RandomState(seed=42)
rng.shuffle(anchor_images)
rng.shuffle(positive_images)

negative_images = anchor_images + positive_images
np.random.RandomState(seed=32).shuffle(negative_images)

negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)
negative_dataset = negative_dataset.shuffle(buffer_size=4096)

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)

# Let's now split our dataset in train and validation.
train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))

train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)

讓我們看看一些三元組的範例。請注意,前兩個圖像看起來很相似,而第三個圖像總是不同的。

def visualize(anchor, positive, negative):
    """Visualize a few triplets from the supplied batches."""

    def show(ax, image):
        ax.imshow(image)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    fig = plt.figure(figsize=(9, 9))

    axs = fig.subplots(3, 3)
    for i in range(3):
        show(axs[i, 0], anchor[i])
        show(axs[i, 1], positive[i])
        show(axs[i, 2], negative[i])


visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])

png


設定嵌入產生器模型

我們的 Siamese 網路將為三元組中的每個圖像產生嵌入。為此,我們將使用在 ImageNet 上預先訓練的 ResNet50 模型,並將一些 Dense 層連接到它,以便我們可以學習分離這些嵌入。

我們將凍結模型中所有層的權重,直到 conv5_block1_out 層。這對於避免影響模型已經學習到的權重非常重要。我們將保留底部的一些層可訓練,以便我們可以在訓練期間微調它們的權重。

base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=target_shape + (3,), include_top=False
)

flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

設定 Siamese 網路模型

Siamese 網路將接收每個三元組圖像作為輸入,產生嵌入,並輸出錨點與正向嵌入之間的距離,以及錨點與負向嵌入之間的距離。

為了計算距離,我們可以使用自訂層 DistanceLayer,它會將兩個值作為元組傳回。

class DistanceLayer(layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = ops.sum(tf.square(anchor - positive), -1)
        an_distance = ops.sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)


anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

distances = DistanceLayer()(
    embedding(resnet.preprocess_input(anchor_input)),
    embedding(resnet.preprocess_input(positive_input)),
    embedding(resnet.preprocess_input(negative_input)),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

將所有內容整合在一起

我們現在需要使用自訂訓練迴圈來實作一個模型,以便我們可以使用 Siamese 網路產生的三個嵌入來計算三元組損失。

讓我們建立一個 Mean 指標實例,以追蹤訓練過程中的損失。

class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]

訓練

現在我們準備好訓練模型了。

siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=10, validation_data=val_dataset)
Epoch 1/10
   1/151 ━━━━━━━━━━━━━━━━━━━━  1:21:32 33s/step - loss: 1.5020

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1699919378.193493    9680 device_compiler.h:187] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.

 151/151 ━━━━━━━━━━━━━━━━━━━━ 80s 317ms/step - loss: 0.7004 - val_loss: 0.3704
Epoch 2/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 136ms/step - loss: 0.3749 - val_loss: 0.3609
Epoch 3/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 21s 140ms/step - loss: 0.3548 - val_loss: 0.3399
Epoch 4/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.3432 - val_loss: 0.3533
Epoch 5/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.3299 - val_loss: 0.3522
Epoch 6/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.3263 - val_loss: 0.3177
Epoch 7/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.3032 - val_loss: 0.3308
Epoch 8/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.2944 - val_loss: 0.3282
Epoch 9/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.2893 - val_loss: 0.3046
Epoch 10/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.2679 - val_loss: 0.2841

<keras.src.callbacks.history.History at 0x7f6945c08820>

檢視網路學習到的內容

此時,我們可以檢查網路如何學習根據嵌入向量是否屬於相似的圖像來區分它們。

我們可以使用 餘弦相似度 來衡量嵌入向量之間的相似性。

讓我們從資料集中選取一個樣本,以檢查為每個圖像生成的嵌入向量之間的相似性。

sample = next(iter(train_dataset))
visualize(*sample)

anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding(resnet.preprocess_input(anchor)),
    embedding(resnet.preprocess_input(positive)),
    embedding(resnet.preprocess_input(negative)),
)

png

最後,我們可以計算錨點圖像和正向圖像之間的餘弦相似度,並將其與錨點圖像和負向圖像之間的相似度進行比較。

我們應該預期錨點圖像和正向圖像之間的相似度大於錨點圖像和負向圖像之間的相似度。

cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())
Positive similarity: 0.99608964
Negative similarity 0.9941576

總結

  1. tf.data API 使您能夠為您的模型建立有效率的輸入管道。如果您有大型資料集,這特別有用。您可以從 tf.data:建立 TensorFlow 輸入管道 中深入了解 tf.data 管道。

  2. 在這個範例中,我們使用預訓練的 ResNet50 作為子網路的一部分,以生成特徵嵌入向量。透過使用 遷移學習,我們可以顯著縮短訓練時間和資料集大小。

  3. 請注意我們如何 微調 ResNet50 網路最後幾層的權重,但保持其餘層不動。使用指定給每一層的名稱,我們可以將權重凍結到特定點,並保持最後幾層開放。

  4. 我們可以透過建立一個繼承自 tf.keras.layers.Layer 的類別來建立自訂層,就像我們在 DistanceLayer 類別中所做的一樣。

  5. 我們使用餘弦相似度指標來衡量 2 個輸出嵌入向量彼此之間的相似程度。

  6. 您可以透過覆寫 train_step() 方法來實作自訂訓練迴圈。train_step() 使用 tf.GradientTape,它會記錄您在其內部執行的每個操作。在這個範例中,我們使用它來存取傳遞給最佳化器的梯度,以在每個步驟更新模型權重。如需更多詳細資訊,請查看 Keras 研究人員入門從頭開始編寫訓練迴圈