# battery_transformer.py
import tensorflow as tf
from tensorflow.keras.layers import MultiHeadAttention, Dense, Input, Dropout, BatchNormalization
import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from global_dataclass import G
from transformer_helper_dc import *  # provides positional_encoding and create_look_ahead_mask
# Dense layers
def FullyConnected():
    return tf.keras.Sequential([
        tf.keras.layers.Dense(G.dense_dim, activation='relu',
                              kernel_initializer=tf.keras.initializers.HeNormal(),
                              bias_initializer=tf.keras.initializers.RandomUniform(minval=0.005, maxval=0.08)),
        # (G.batch_size, G.window_size, G.dense_dim)
        tf.keras.layers.BatchNormalization(momentum=0.98, epsilon=5e-4),
        tf.keras.layers.Dense(G.dense_dim, activation='relu',
                              kernel_initializer=tf.keras.initializers.HeNormal(),
                              bias_initializer=tf.keras.initializers.RandomUniform(minval=0.001, maxval=0.01)),
        # (G.batch_size, G.window_size, G.dense_dim)
        tf.keras.layers.BatchNormalization(momentum=0.95, epsilon=5e-4)
    ])

# Encoder layer
class EncoderLayer(tf.keras.layers.Layer):
    """
    The encoder layer is composed of a multi-head self-attention mechanism,
    followed by a simple, position-wise fully connected feed-forward network.
    This architecture includes a residual connection around each of the two
    sub-layers, followed by batch normalization.
    """
    def __init__(self,
                 num_heads,
                 num_features,
                 dense_dim,
                 dropout_rate,
                 batchnorm_eps):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(
            num_heads=num_heads,
            key_dim=dense_dim,
            dropout=dropout_rate,
            kernel_initializer=tf.keras.initializers.HeNormal(),
            kernel_regularizer=tf.keras.regularizers.L2(1e-4),
            bias_initializer=tf.keras.initializers.RandomUniform(minval=0.001, maxval=0.01)
        )
        # feed-forward network
        self.ffn = FullyConnected()
        self.batchnorm1 = BatchNormalization(momentum=0.95, epsilon=batchnorm_eps)
        self.batchnorm2 = BatchNormalization(momentum=0.95, epsilon=batchnorm_eps)
        self.dropout_ffn = Dropout(dropout_rate)

    def call(self, x, training):
        """
        Forward pass for the encoder layer.

        Arguments:
            x -- Tensor of shape (G.batch_size, G.src_len, G.dense_dim)
            training -- Boolean, set to True to activate
                        the training mode for dropout layers
        Returns:
            encoder_layer_out -- Tensor of shape (G.batch_size, G.src_len, G.dense_dim)
        """
        # The attention layer applies its internal dropout only when training=True
        attn_output = self.mha(query=x, value=x, training=training)  # self-attention
        out1 = self.batchnorm1(tf.add(x, attn_output), training=training)  # (G.batch_size, G.src_len, G.dense_dim)

        ffn_output = self.ffn(out1, training=training)
        ffn_output = self.dropout_ffn(ffn_output, training=training)  # (G.batch_size, G.src_len, G.dense_dim)
        encoder_layer_out = self.batchnorm2(tf.add(ffn_output, out1), training=training)
        # (G.batch_size, G.src_len, G.dense_dim)
        return encoder_layer_out
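
# A minimal smoke test for a single EncoderLayer (a sketch, assuming the fields on G such as
# G.num_heads, G.dense_dim, G.batch_size and G.src_len are populated by global_dataclass;
# kept as a comment so nothing runs at import time):
#
#   layer = EncoderLayer(num_heads=G.num_heads, num_features=G.num_features,
#                        dense_dim=G.dense_dim, dropout_rate=0.15, batchnorm_eps=1e-4)
#   dummy = tf.random.uniform((G.batch_size, G.src_len, G.dense_dim))
#   out = layer(dummy, training=False)
#   assert out.shape == (G.batch_size, G.src_len, G.dense_dim)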

# Encoder itself
class Encoder(tf.keras.layers.Layer):
    """
    The full Encoder projects the input through a linear (Dense) layer,
    adds positional encoding, and then passes the result through a stack
    of encoder layers.
    """
    def __init__(self,
                 num_layers=G.num_layers,
                 num_heads=G.num_heads,
                 num_features=G.num_features,
                 dense_dim=G.dense_dim,
                 maximum_position_encoding=G.src_len,
                 dropout_rate=0.15,
                 batchnorm_eps=1e-4):
        super(Encoder, self).__init__()
        self.num_layers = num_layers
        # linear input layer
        self.lin_input = tf.keras.layers.Dense(dense_dim, activation="relu")
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                dense_dim)
        self.enc_layers = [EncoderLayer(num_heads=num_heads,
                                        num_features=num_features,
                                        dense_dim=dense_dim,
                                        dropout_rate=dropout_rate,
                                        batchnorm_eps=batchnorm_eps)
                           for _ in range(self.num_layers)]

    def call(self, x, training):
        """
        Forward pass for the Encoder.

        Arguments:
            x -- Tensor of shape (G.batch_size, G.src_len, G.num_features)
            training -- Boolean, set to True to activate
                        the training mode for dropout layers
        Returns:
            Tensor of shape (G.batch_size, G.src_len, G.dense_dim)
        """
        x = self.lin_input(x)                   # project the features to dense_dim
        seq_len = tf.shape(x)[1]
        x += self.pos_encoding[:, :seq_len, :]  # add positional encoding
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training)
        return x  # (G.batch_size, G.src_len, G.dense_dim)
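
# Illustrative shape walk-through for the Encoder (the concrete values come from global_dataclass):
#   input x:                                      (G.batch_size, G.src_len, G.num_features)
#   after lin_input + positional encoding:        (G.batch_size, G.src_len, G.dense_dim)
#   after the stack of num_layers EncoderLayers:  (G.batch_size, G.src_len, G.dense_dim)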

# Decoder layer
class DecoderLayer(tf.keras.layers.Layer):
    """
    The decoder layer is composed of two multi-head attention blocks:
    one that applies self-attention to the new input, and one that
    combines it with the output of the encoder, followed by a
    fully connected block.
    """
    def __init__(self,
                 num_heads,
                 num_features,
                 dense_dim,
                 dropout_rate,
                 batchnorm_eps):
        super(DecoderLayer, self).__init__()
        self.mha1 = MultiHeadAttention(
            num_heads=num_heads,
            key_dim=dense_dim,
            dropout=dropout_rate,
            kernel_initializer=tf.keras.initializers.HeNormal(),
            kernel_regularizer=tf.keras.regularizers.L2(1e-4),
            bias_initializer=tf.keras.initializers.RandomUniform(minval=0.001, maxval=0.01)
        )
        self.mha2 = MultiHeadAttention(
            num_heads=num_heads,
            key_dim=dense_dim,
            dropout=dropout_rate,
            kernel_initializer=tf.keras.initializers.HeNormal(),
            kernel_regularizer=tf.keras.regularizers.L2(1e-4),
            bias_initializer=tf.keras.initializers.RandomUniform(minval=0.001, maxval=0.01)
        )
        self.ffn = FullyConnected()
        self.batchnorm1 = BatchNormalization(momentum=0.95, epsilon=batchnorm_eps)
        self.batchnorm2 = BatchNormalization(momentum=0.95, epsilon=batchnorm_eps)
        self.batchnorm3 = BatchNormalization(momentum=0.95, epsilon=batchnorm_eps)
        self.dropout_ffn = Dropout(dropout_rate)

    def call(self, y, enc_output, dec_ahead_mask, enc_memory_mask, training):
        """
        Forward pass for the decoder layer.

        Arguments:
            y -- Tensor of shape (G.batch_size, G.tgt_len, G.dense_dim), the projected SOC values
            enc_output -- Tensor of shape (G.batch_size, G.src_len, G.dense_dim)
            dec_ahead_mask -- look-ahead mask for the self-attention block
            enc_memory_mask -- mask for the encoder-decoder attention block
            training -- Boolean, set to True to activate
                        the training mode for dropout and batchnorm layers
        Returns:
            out3 -- Tensor of shape (G.batch_size, G.tgt_len, G.dense_dim)
        """
        # BLOCK 1: masked self-attention on the decoder input
        # (the attention layer applies its internal dropout only when training=True)
        mult_attn_out1 = self.mha1(query=y,
                                   value=y,
                                   attention_mask=dec_ahead_mask,
                                   return_attention_scores=False,
                                   training=training)
        # (G.batch_size, G.tgt_len, G.dense_dim)
        Q1 = self.batchnorm1(tf.add(y, mult_attn_out1), training=training)

        # BLOCK 2: cross-attention using Q from the first block and K and V from the encoder output
        mult_attn_out2 = self.mha2(query=Q1,
                                   value=enc_output,
                                   key=enc_output,
                                   attention_mask=enc_memory_mask,
                                   return_attention_scores=False,
                                   training=training)
        mult_attn_out2 = self.batchnorm2(tf.add(mult_attn_out1, mult_attn_out2), training=training)

        # BLOCK 3: pass the output of the second block through the feed-forward network
        ffn_output = self.ffn(mult_attn_out2, training=training)
        # apply a dropout layer to the ffn output
        ffn_output = self.dropout_ffn(ffn_output, training=training)
        out3 = self.batchnorm3(tf.add(ffn_output, mult_attn_out2), training=training)
        return out3

# Decoder itself
class Decoder(tf.keras.layers.Layer):
    """
    The full Decoder projects the target SOC sequence through a linear (Dense)
    layer, adds positional encoding, and then passes the result through a
    stack of decoder layers together with the encoder output.
    """
    def __init__(self,
                 num_layers=G.num_layers,
                 num_heads=G.num_heads,
                 num_features=G.num_features,
                 dense_dim=G.dense_dim,
                 target_size=G.num_features,
                 maximum_position_encoding=G.tgt_len,
                 dropout_rate=0.15,
                 batchnorm_eps=1e-5):
        super(Decoder, self).__init__()
        self.num_layers = num_layers
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                dense_dim)
        # linear input layer
        self.lin_input = tf.keras.layers.Dense(dense_dim, activation="relu")
        self.dec_layers = [DecoderLayer(num_heads,
                                        num_features,
                                        dense_dim,
                                        dropout_rate,
                                        batchnorm_eps)
                           for _ in range(self.num_layers)]
        # look-ahead masks for the decoder:
        self.dec_ahead_mask = create_look_ahead_mask(G.tgt_len, G.tgt_len)
        self.enc_memory_mask = create_look_ahead_mask(G.tgt_len, G.src_len)

    def call(self, y, enc_output, training):
        """
        Forward pass for the Decoder.

        Arguments:
            y -- Tensor of shape (G.batch_size, G.tgt_len, 1), the SOC values for the batches
            enc_output -- Tensor of shape (G.batch_size, G.src_len, G.dense_dim)
            training -- Boolean, set to True to activate
                        the training mode for dropout layers
        Returns:
            y -- Tensor of shape (G.batch_size, G.tgt_len, G.dense_dim)
        """
        y = self.lin_input(y)  # maps to dense_dim, the dimension of all the sub-layer outputs
        seq_len = tf.shape(y)[1]
        y += self.pos_encoding[:, :seq_len, :]
        # pass y and the encoder output through the stack of decoder layers
        for i in range(self.num_layers):
            y = self.dec_layers[i](y,
                                   enc_output,
                                   self.dec_ahead_mask,
                                   self.enc_memory_mask,
                                   training)
        return y
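
# Illustrative shape walk-through for the Decoder (the concrete values come from global_dataclass):
#   input y (SOC channel only):                   (G.batch_size, G.tgt_len, 1)
#   after lin_input + positional encoding:        (G.batch_size, G.tgt_len, G.dense_dim)
#   after the stack of num_layers DecoderLayers:  (G.batch_size, G.tgt_len, G.dense_dim)
# dec_ahead_mask (built by create_look_ahead_mask from transformer_helper_dc) blocks each target
# step from attending to later SOC values in the self-attention block, while enc_memory_mask
# plays the same role for the encoder-decoder attention over the source window.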

# Transformer
class Transformer(tf.keras.Model):
    """
    Complete transformer with an Encoder and a Decoder.
    """
    def __init__(self,
                 num_layers=G.num_layers,
                 num_heads=G.num_heads,
                 dense_dim=G.dense_dim,
                 src_len=G.src_len,
                 tgt_len=G.tgt_len,
                 max_positional_encoding_input=G.src_len,
                 max_positional_encoding_target=G.tgt_len):
        super(Transformer, self).__init__()
        self.tgt_len = tgt_len
        self.src_len = src_len
        self.encoder = Encoder()
        self.decoder = Decoder()
        # maps the decoder output to a single SOC value per time step
        self.linear_map = tf.keras.Sequential([
            tf.keras.layers.Dense(
                dense_dim, activation="relu",
                kernel_initializer=tf.keras.initializers.HeNormal(),
                bias_initializer=tf.keras.initializers.RandomUniform(minval=0.001, maxval=0.02)
            ),
            tf.keras.layers.BatchNormalization(momentum=0.97, epsilon=5e-4),
            tf.keras.layers.Dense(
                1, activation="sigmoid",
                bias_initializer=tf.keras.initializers.RandomUniform(minval=0.001, maxval=0.005)
            )
        ])

    def call(self, x, training):
        """
        Forward pass for the entire Transformer.

        Arguments:
            x -- Tensor of shape (G.batch_size, G.window_size, G.num_features),
                 an array of the windowed voltage, current and SOC data
            training -- Boolean, set to True to activate
                        the training mode for dropout and batchnorm layers
        Returns:
            final_output -- Tensor of shape (G.batch_size, G.tgt_len, 1),
                            the predicted SOC values
        """
        enc_input = x[:, :self.src_len, :]
        dec_input = x[:, -self.tgt_len:, -1:]  # keep only the SOC channel, hence the -1:
        enc_output = self.encoder(enc_input, training)               # (G.batch_size, G.src_len, G.dense_dim)
        dec_output = self.decoder(dec_input, enc_output, training)   # (G.batch_size, G.tgt_len, G.dense_dim)
        final_output = self.linear_map(dec_output)                   # (G.batch_size, G.tgt_len, 1)
        return final_output
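
# End-to-end usage sketch (an illustration, not the project's training script: the dummy
# tensors, optimizer choice and callback settings below are assumptions, and x_train / y_train
# are hypothetical placeholders for the windowed battery data prepared elsewhere in the repo).
if __name__ == "__main__":
    model = Transformer()

    # Forward-pass smoke test on random data with the shapes the model expects.
    dummy_windows = tf.random.uniform((G.batch_size, G.window_size, G.num_features))
    soc_pred = model(dummy_windows, training=False)
    print("prediction shape:", soc_pred.shape)  # (G.batch_size, G.tgt_len, 1)

    # A hedged training sketch: mean-squared error on the SOC targets with early stopping
    # and checkpointing via the callbacks imported at the top of this module.
    # model.compile(optimizer=tf.keras.optimizers.Adam(1e-4), loss="mse")
    # callbacks = [EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True),
    #              ModelCheckpoint("battery_transformer.weights.h5", save_best_only=True,
    #                              save_weights_only=True)]
    # model.fit(x_train, y_train, validation_split=0.1,
    #           batch_size=G.batch_size, epochs=100, callbacks=callbacks)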