Keras 3 API 文件 / 資料載入 / 時間序列資料載入

時間序列資料載入

[原始碼]

timeseries_dataset_from_array 函數

keras.utils.timeseries_dataset_from_array(
    data,
    targets,
    sequence_length,
    sequence_stride=1,
    sampling_rate=1,
    batch_size=128,
    shuffle=False,
    seed=None,
    start_index=None,
    end_index=None,
)

從陣列提供的時間序列建立滑動視窗的資料集。

此函數接收以相等間隔收集的資料點序列,以及時間序列參數,例如序列/視窗的長度、兩個序列/視窗之間的間距等,以產生時間序列輸入和目標的批次。

引數

  • data:包含連續資料點(時間步)的 Numpy 陣列或 Eager 張量。軸 0 預期為時間維度。
  • targets:對應於 data 中時間步的目標。targets[i] 應為對應於從索引 i 開始的視窗的目標(請參閱下方的範例 2)。如果您沒有目標資料,請傳遞 None(在這種情況下,資料集將僅產生輸入資料)。
  • sequence_length:輸出序列的長度(以時間步數為單位)。
  • sequence_stride:連續輸出序列之間的週期。對於步幅 s,輸出樣本將從索引 data[i]data[i + s]data[i + 2 * s] 等開始。
  • sampling_rate:序列內連續個別時間步之間的週期。對於速率 r,時間步 data[i], data[i + r], ... data[i + sequence_length] 用於建立樣本序列。
  • batch_size:每個批次中的時間序列樣本數(最後一個批次可能除外)。如果為 None,則資料將不會批次處理(資料集將產生個別樣本)。
  • shuffle:是否打亂輸出樣本,或改為按時間順序繪製它們。
  • seed:選用整數;用於打亂的隨機種子。
  • start_index:選用整數;早於(不包含)start_index 的資料點將不會用於輸出序列。這對於保留部分資料以進行測試或驗證非常有用。
  • end_index:選用整數;晚於(不包含)end_index 的資料點將不會用於輸出序列。這對於保留部分資料以進行測試或驗證非常有用。

返回

一個 tf.data.Dataset 實例。如果傳遞了 targets,則資料集會產生元組 (batch_of_sequences, batch_of_targets)。否則,資料集僅產生 batch_of_sequences

範例 1

考慮索引 [0, 1, ... 98]。使用 sequence_length=10, sampling_rate=2, sequence_stride=3, shuffle=False,資料集將產生由以下索引組成的序列批次

First sequence:  [0  2  4  6  8 10 12 14 16 18]
Second sequence: [3  5  7  9 11 13 15 17 19 21]
Third sequence:  [6  8 10 12 14 16 18 20 22 24]
...
Last sequence:   [78 80 82 84 86 88 90 92 94 96]

在這種情況下,最後 2 個資料點被捨棄,因為無法產生包含它們的完整序列(下一個序列將從索引 81 開始,因此其最後一步將超過 98)。

範例 2:時間迴歸。

考慮形狀為 (steps,) 的純量值陣列 data。若要產生使用過去 10 個時間步來預測下一個時間步的資料集,您可以使用

input_data = data[:-10]
targets = data[10:]
dataset = timeseries_dataset_from_array(
    input_data, targets, sequence_length=10)
for batch in dataset:
  inputs, targets = batch
  assert np.array_equal(inputs[0], data[:10])  # First sequence: steps [0-9]
  # Corresponding target: step 10
  assert np.array_equal(targets[0], data[10])
  break

範例 3:多對多架構的時間迴歸。

考慮兩個純量值陣列 XY,兩者的形狀均為 (100,)。產生的資料集應包含每個樣本 20 個時間戳記。樣本不應重疊。若要產生使用目前時間戳記來預測對應目標時間步的資料集,您可以使用

X = np.arange(100)
Y = X*2

sample_length = 20
input_dataset = timeseries_dataset_from_array(
    X, None, sequence_length=sample_length, sequence_stride=sample_length)
target_dataset = timeseries_dataset_from_array(
    Y, None, sequence_length=sample_length, sequence_stride=sample_length)

for batch in zip(input_dataset, target_dataset):
    inputs, targets = batch
    assert np.array_equal(inputs[0], X[:sample_length])

    # second sample equals output timestamps 20-40
    assert np.array_equal(targets[1], Y[sample_length:2*sample_length])
    break

[原始碼]

pad_sequences 函數

keras.utils.pad_sequences(
    sequences, maxlen=None, dtype="int32", padding="pre", truncating="pre", value=0.0
)

將序列填充到相同長度。

此函數將序列列表(長度為 num_samples)(整數列表)轉換為形狀為 (num_samples, num_timesteps) 的 2D NumPy 陣列。num_timesteps 要么是提供的 maxlen 引數,要么是列表中最長序列的長度。

短於 num_timesteps 的序列會使用 value 填充,直到它們達到 num_timesteps 長度。

長於 num_timesteps 的序列會被截斷,以便符合所需的長度。

填充或截斷發生的位置由引數 paddingtruncating 分別決定。從序列的開頭進行預先填充或移除值是預設行為。

>>> sequence = [[1], [2, 3], [4, 5, 6]]
>>> keras.utils.pad_sequences(sequence)
array([[0, 0, 1],
       [0, 2, 3],
       [4, 5, 6]], dtype=int32)
>>> keras.utils.pad_sequences(sequence, value=-1)
array([[-1, -1,  1],
       [-1,  2,  3],
       [ 4,  5,  6]], dtype=int32)
>>> keras.utils.pad_sequences(sequence, padding='post')
array([[1, 0, 0],
       [2, 3, 0],
       [4, 5, 6]], dtype=int32)
>>> keras.utils.pad_sequences(sequence, maxlen=2)
array([[0, 1],
       [2, 3],
       [5, 6]], dtype=int32)

引數

  • sequences:序列列表(每個序列都是整數列表)。
  • maxlen:選用整數,所有序列的最大長度。如果未提供,序列將填充到最長個別序列的長度。
  • dtype:(選用,預設為 "int32")。輸出序列的類型。若要使用可變長度字串填充序列,您可以使用 object
  • padding:字串,"pre" 或 "post"(選用,預設為 "pre"):在每個序列之前或之後填充。
  • truncating:字串,"pre" 或 "post"(選用,預設為 "pre"):從大於 maxlen 的序列中移除值,在序列的開頭或結尾。
  • value:浮點數或字串,填充值。(選用,預設為 0.

返回

形狀為 (len(sequences), maxlen) 的 NumPy 陣列