開發者指南 / 透過子類別化建立新的層與模型

透過子類別化建立新的層和模型

作者: fchollet
建立日期 2019/03/01
上次修改日期 2023/06/25
描述: 從頭開始編寫 LayerModel 物件的完整指南。

在 Colab 中檢視 GitHub 來源


簡介

本指南將涵蓋您建立自己的子類別化層和模型所需的一切知識。特別是,您將了解以下功能

  • Layer 類別
  • add_weight() 方法
  • 可訓練和不可訓練的權重
  • build() 方法
  • 確保您的層可以在任何後端中使用
  • add_loss() 方法
  • call() 中的 training 引數
  • call() 中的 mask 引數
  • 確保您的層可以序列化

讓我們開始吧。


設定

import numpy as np
import keras
from keras import ops
from keras import layers

Layer 類別:狀態(權重)和某些計算的組合

Keras 中最重要的抽象概念之一是 Layer 類別。一個層封裝了一個狀態(層的「權重」)和從輸入到輸出的轉換(一個「呼叫」,層的前向傳遞)。

這是一個密集連接層。它有兩個狀態變數:變數 wb

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

您可以像 Python 函式一樣,透過在某些張量輸入上呼叫它來使用層。

x = ops.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
[[ 0.085416   -0.06821361 -0.00741937 -0.03429271]
 [ 0.085416   -0.06821361 -0.00741937 -0.03429271]]

請注意,權重 wb 在設定為層屬性後會自動由層追蹤

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

層可以有不可訓練的權重

除了可訓練的權重之外,您也可以在層中新增不可訓練的權重。當您訓練層時,這些權重不會被納入反向傳播的考量。

以下是如何新增和使用不可訓練的權重

class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super().__init__()
        self.total = self.add_weight(
            initializer="zeros", shape=(input_dim,), trainable=False
        )

    def call(self, inputs):
        self.total.assign_add(ops.sum(inputs, axis=0))
        return self.total


x = ops.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]

它是 layer.weights 的一部分,但它被歸類為不可訓練的權重

print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

最佳實務:將權重建立延遲到已知輸入形狀

我們上面的 Linear 層採用了一個 input_dim 引數,該引數用於計算 __init__() 中的權重 wb 的形狀

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

在許多情況下,您可能無法預先知道輸入的大小,並且您希望在實例化層之後的某個時間點,當該值已知時,才懶惰地建立權重。

在 Keras API 中,我們建議在您的層的 build(self, inputs_shape) 方法中建立層權重。像這樣

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

您的層的 __call__() 方法會在第一次呼叫時自動執行建構。您現在有一個懶惰且因此更容易使用的層

# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)

如上所示,單獨實作 build() 可以很好地將只建立一次權重與每次呼叫中使用權重分開。


層可以遞迴組合

如果您將一個 Layer 實例指定為另一個 Layer 的屬性,則外部層將開始追蹤內部層建立的權重。

我們建議在 __init__() 方法中建立此類子層,並讓第一次 __call__() 觸發建構它們的權重。

class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = keras.activations.relu(x)
        x = self.linear_2(x)
        x = keras.activations.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(ops.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
weights: 6
trainable weights: 6

與後端無關的層和特定於後端的層

只要一個層僅使用 keras.ops 命名空間(或其他 Keras 命名空間,例如 keras.activationskeras.randomkeras.layers)中的 API,它就可以與任何後端(TensorFlow、JAX 或 PyTorch)一起使用。

您在本指南中到目前為止看到的所有層都可以與所有 Keras 後端一起使用。

keras.ops 命名空間可讓您存取

  • NumPy API,例如 ops.matmulops.sumops.reshapeops.stack 等。
  • 特定於神經網路的 API,例如 ops.softmaxops.convops.binary_crossentropyops.relu` 等。

您也可以在您的層中使用後端原生 API(例如 tf.nn 函式),但是如果您這樣做,則您的層將僅能與相關的後端一起使用。例如,您可以使用 jax.numpy 編寫以下特定於 JAX 的層

import jax

class Linear(keras.layers.Layer):
    ...

    def call(self, inputs):
        return jax.numpy.matmul(inputs, self.w) + self.b

這將是等效的特定於 TensorFlow 的層

import tensorflow as tf

class Linear(keras.layers.Layer):
    ...

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

這將是等效的特定於 PyTorch 的層

import torch

class Linear(keras.layers.Layer):
    ...

    def call(self, inputs):
        return torch.matmul(inputs, self.w) + self.b

由於跨後端相容性是非常有用的屬性,我們強烈建議您盡可能透過僅利用 Keras API 來使您的層與後端無關。


add_loss() 方法

當編寫層的 call() 方法時,您可以建立您稍後在編寫訓練迴圈時想要使用的損失張量。這可以透過呼叫 self.add_loss(value) 來實現

# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * ops.mean(inputs))
        return inputs

這些損失(包括任何內部層建立的損失)可以透過 layer.losses 檢索。此屬性會在每次呼叫最上層層的 __call__() 開始時重設,以便 layer.losses 始終包含在上次前向傳遞期間建立的損失值。

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called

_ = layer(ops.zeros((1, 1)))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(ops.zeros((1, 1)))
assert len(layer.losses) == 1  # This is the loss created during the call above

此外,loss 屬性還包含為任何內部層的權重建立的正規化損失

class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=keras.regularizers.l2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(ops.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[Array(0.00217911, dtype=float32)]

這些損失旨在在編寫自訂訓練迴圈時加以考慮。

它們也可以與 fit() 無縫協作(它們會自動加總並新增到主要損失(如果有的話))

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 60ms/step - loss: 0.2650
 1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 15ms/step - loss: 0.0050

<keras.src.callbacks.history.History at 0x146f71960>

您可以選擇在您的層上啟用序列化

如果您需要您的自訂層可序列化為 函數式模型 的一部分,您可以選擇實作 get_config() 方法

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {"units": self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

請注意,基底 Layer 類別的 __init__() 方法接受一些關鍵字引數,特別是 namedtype。最佳實務是在 __init__() 中將這些引數傳遞給父類別,並將它們包含在層設定中

class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return ops.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_7', 'trainable': True, 'dtype': 'float32', 'units': 64}

如果您在從其設定反序列化層時需要更高的彈性,您也可以覆寫 from_config() 類別方法。這是 from_config() 的基底實作

def from_config(cls, config):
    return cls(**config)

若要深入了解序列化和儲存,請參閱完整的 模型儲存和序列化指南


call() 方法中的特權 training 引數

某些層,特別是 BatchNormalization 層和 Dropout 層,在訓練和推論期間具有不同的行為。對於此類層,標準實務是在 call() 方法中公開 training(布林值)引數。

透過在 call() 中公開此引數,您可以讓內建的訓練和評估迴圈(例如 fit())在訓練和推論中正確使用該層。

class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs, training=None):
        if training:
            return keras.random.dropout(
                inputs, rate=self.rate, seed=self.seed_generator
            )
        return inputs

call() 方法中的特權 mask 引數

call() 支援的另一個特權引數是 mask 引數。

您會在所有 Keras RNN 層中找到它。遮罩是一個布林張量(輸入中的每個時間步一個布林值),用於在處理時間序列資料時跳過某些輸入時間步。

當先前的層產生遮罩時,Keras 會自動將正確的 mask 引數傳遞給支援它的層的 __call__()。產生遮罩的層是配置了 mask_zero=TrueEmbedding 層和 Masking 層。


Model 類別

一般而言,您將使用 Layer 類別來定義內部計算區塊,並將使用 Model 類別來定義外部模型 - 您將訓練的物件。

例如,在 ResNet50 模型中,您將有幾個子類別化 Layer 的 ResNet 區塊,以及一個包含整個 ResNet50 網路的單一 Model

Model 類別具有與 Layer 相同的 API,但有以下差異

  • 它公開了內建的訓練、評估和預測迴圈(model.fit()model.evaluate()model.predict())。
  • 它透過 model.layers 屬性公開其內部層的清單。
  • 它公開了儲存和序列化 API (save(), save_weights()...)

實際上,Layer 類別對應於我們在文獻中稱為「層」(如「捲積層」或「遞迴層」)或「區塊」(如「ResNet 區塊」或「Inception 區塊」)的內容。

同時,Model 類別對應於文獻中稱為「模型」(如「深度學習模型」)或「網路」(如「深度神經網路」)的內容。

因此,如果您想知道,「我應該使用 Layer 類別還是 Model 類別?」,請自問:我是否需要在它上面呼叫 fit()?我是否需要在它上面呼叫 save()?如果是,請選擇 Model。如果不是(因為您的類別只是較大系統中的一個區塊,或者因為您自己編寫訓練和儲存程式碼),請使用 Layer

例如,我們可以採用上面我們的小型 resnet 範例,並使用它來建構一個可以使用 fit() 訓練並且可以使用 save_weights() 儲存的 Model

class ResNet(keras.Model):

    def __init__(self, num_classes=1000):
        super().__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath.keras)

將它們整合在一起:一個端對端範例

以下是您到目前為止所學到的內容

  • Layer 封裝了一個狀態(在 __init__()build() 中建立)和一些計算(在 call() 中定義)。
  • 可以遞迴巢狀層來建立新的、更大的計算區塊。
  • 只要層僅使用 Keras API,它們就與後端無關。您可以使用後端原生 API(例如 jax.numpytorch.nntf.nn),但隨後您的層將僅能與該特定後端一起使用。
  • 層可以透過 add_loss() 建立和追蹤損失(通常是正規化損失)。
  • 外部容器,也就是您想要訓練的東西,是一個 ModelModel 就像一個 Layer,但增加了訓練和序列化公用程式。

讓我們將所有這些東西整合到一個端對端範例中:我們將以與後端無關的方式實作一個變分自編碼器 (Variational AutoEncoder, VAE) – 這樣它就能在 TensorFlow、JAX 和 PyTorch 上以相同的方式執行。我們將在 MNIST 數字集上訓練它。

我們的 VAE 將會是 Model 的子類別,構建成 Layer 子類別的巢狀層組合。它將具有一個正規化損失(KL 散度)。

class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = ops.shape(z_mean)[0]
        dim = ops.shape(z_mean)[1]
        epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super().__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * ops.mean(
            z_log_var - ops.square(z_mean) - ops.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

讓我們使用 fit() API 在 MNIST 上訓練它

(x_train, _), _ = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

original_dim = 784
vae = VariationalAutoEncoder(784, 64, 32)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=keras.losses.MeanSquaredError())

vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
 938/938 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0942
Epoch 2/2
 938/938 ━━━━━━━━━━━━━━━━━━━━ 1s 859us/step - loss: 0.0677

<keras.src.callbacks.history.History at 0x146fe62f0>