import numpy as np import tensorflow as tf from tensorflow.keras.layers import Dense, LayerNormalization, Dropout, MultiHeadAttention, Flatten, Input, Lambda from tensorflow.keras.models import Model from tensorflow.keras.optimizers import Adam from tensorflow.keras.utils import Sequence # Generating synthetic time-series data def generate_data(size=1000, sequence_length=10): data = np.sin(np.linspace(0, 10 * np.pi, size)) # Sine wave data sequences = np.array([data[i:i+sequence_length] for i in range(size - sequence_length)]) next_points = data[sequence_length:] return sequences, next_points # Custom dataset class using Keras Sequence class TimeSeriesDataset(Sequence): def __init__(self, sequences, next_points, batch_size=32, shuffle=True): self.sequences = sequences self.next_points = next_points self.batch_size = batch_size self.shuffle = shuffle self.indices = np.arange(len(self.sequences)) self.on_epoch_end() def __len__(self): return int(np.ceil(len(self.sequences) / self.batch_size)) def __getitem__(self, index): batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size] X = self.sequences[batch_indices] y = self.next_points[batch_indices] return X, y def on_epoch_end(self): if self.shuffle: np.random.shuffle(self.indices) # Transformer model definition using Keras def create_transformer_model(sequence_length, d_model=32, num_heads=2, ff_dim=64, num_layers=1, dropout=0.1): inputs = Input(shape=(sequence_length,)) # Wrap tf.expand_dims in a Lambda layer to work with Keras tensors x = Lambda(lambda x: tf.expand_dims(x, axis=-1))(inputs) # Shape: (batch, sequence_length, 1) x = Dense(d_model)(x) for _ in range(num_layers): attn_output = MultiHeadAttention(num_heads=num_heads, key_dim=d_model, dropout=dropout)(x, x) attn_output = Dropout(dropout)(attn_output) x = LayerNormalization(epsilon=1e-6)(x + attn_output) ff = Dense(ff_dim, activation="relu")(x) ff = Dense(d_model)(ff) ff = Dropout(dropout)(ff) x = LayerNormalization(epsilon=1e-6)(x + ff) x = Flatten()(x) outputs = Dense(1)(x) model = Model(inputs=inputs, outputs=outputs) return model # Prepare data sequences, next_points = generate_data() dataset = TimeSeriesDataset(sequences, next_points, batch_size=32) # Create and compile the model model = create_transformer_model(sequence_length=10, d_model=32, num_heads=2, ff_dim=64, num_layers=1) model.compile(optimizer=Adam(learning_rate=0.001), loss='mse') # Train the model model.fit(dataset, epochs=9) # Predict the next point after a sequence test_seq = np.array(sequences[0]) predicted_point = model.predict(np.expand_dims(test_seq, axis=0)) print("Predicted next point:", predicted_point[0, 0])