# Assorted NumPy idioms used throughout the exercises (X, T, w, a, b, size,
# mu, sigma, x_1, y_pred, pred etc. are defined in the surrounding notebook).
numpy.ones((X.shape[0], 1))  # column of ones (bias feature)
numpy.negative(numpy.ones(size))  # vector of -1s
numpy.vstack((a, b))  # stack row-wise
numpy.hstack((a, b))  # stack column-wise
numpy.all(numpy.dot(X, w) * T > 0)  # True iff every sample is classified correctly (perceptron criterion)
numpy.any(y_pred == pred)  # True if at least one prediction matches
numpy.abs(a - b)  # element-wise absolute difference
numpy.linspace(start=-8, stop=8)  # 50 evenly spaced values by default
numpy.argmax(X) # returns index
numpy.max(X) # returns max value
numpy.atleast_2d(x)  # promote to at least 2 dimensions
x.flatten()  # copy collapsed to 1-D
X[:, 1:] # excludes first column
X[1:, :] # excludes first row
numpy.random.normal(loc=mu, scale=sigma, size=(number_of_samples, len_of_vector))
numpy.linalg.lstsq(X, T, rcond=None)[0]  # least-squares weight vector
numpy.sign(X)
1 if numpy.dot(x, w) >= 0 else -1  # perceptron decision rule
numpy.random.uniform(low=-10, high=10, size=2)
# Decision-boundary line x_2 = beta * x_1 + gamma derived from w = (w0, w1, w2):
beta = -w[1]/w[2]
gamma = -w[0]/w[2]
x_2 = beta * x_1 + gamma
pyplot.plot(X[T > 0, 1], X[T > 0, 2], "g.", label="positive data")
def get_updated_weights(w, eta, grad):
    """Take one gradient step; return (updated weights, copy of the previous weights)."""
    previous = w.copy()
    updated = w - eta * grad
    return updated, previous
def gradient_descent(w, eta=0.01, tolerance_value=1e-8, num_iterations=1000):
    """Minimize the module-level ``loss`` via fixed-step gradient descent.

    Stops when the iteration budget is used up, when the loss change between
    consecutive steps is smaller than ``tolerance_value``, or when the
    gradient norm drops below ``tolerance_value``.  Relies on module-level
    ``loss``, ``gradient`` and ``get_updated_weights``.
    """
    w_star = w.copy()
    counter = 0
    w_star, w_old = get_updated_weights(w_star, eta, gradient(w_star))
    # BUG FIX: loss(new) - loss(old) is negative whenever the step improves
    # the loss, so the original `> tolerance_value` test terminated the loop
    # after a single successful step; compare the magnitude of the change.
    loss_difference = abs(loss(w_star) - loss(w_old))
    while counter < num_iterations and \
          loss_difference > tolerance_value and \
          numpy.linalg.norm(gradient(w_star)) > tolerance_value:
        counter += 1
        w_star, w_old = get_updated_weights(w_star, eta, gradient(w_star))
        loss_difference = abs(loss(w_star) - loss(w_old))
    return w_star
def network(x, Theta):
    """Two-layer network forward pass for a single sample.

    Theta = (W1, w2): hidden weight matrix and output weight vector.
    Returns the scalar output y and the bias-augmented hidden vector h.
    Relies on a module-level ``logistic`` activation function.
    """
    W1, w2 = Theta
    a_ = numpy.dot(W1, x)  # hidden pre-activations
    h_ = logistic(a_)  # hidden activations
    h = numpy.insert(h_, 0, 1)  # prepend the hidden bias value h_0 = 1
    y = numpy.dot(w2, h)
    return y, h
def gradient(X, Theta):
    """Mean-squared-error gradient for the two-layer network, accumulated
    over a dataset.

    NOTE(review): X is iterated as (x, t) pairs, so despite its name it is
    assumed to be a sequence of (input, target) tuples -- confirm against
    the caller.  Relies on the module-level ``network`` function.
    """
    W1, w2 = Theta
    dW1 = numpy.zeros((w2.shape[0], W1.shape[1])) # dimensions (K+1, D+1)
    dw2 = numpy.zeros(w2.shape[0])
    N = len(X)
    for x, t in X:
        y, h = network(x, Theta)
        loss = y - t  # residual of this sample
        # Backprop through the logistic hidden layer; h * (1 - h) is its derivative.
        dW1 += numpy.outer(loss * w2 * h * (1 - h), x)
        dw2 += loss * h
    dW1 = dW1 * 2 / N
    dw2 = dw2 * 2 / N
    return dW1[1:, :], dw2 # ignore W1_0 first column
def gradient_descent(X, Theta, eta):
    """Plain gradient descent over a fixed number of epochs.

    W1/w2 are updated in place, so the entries of Theta stay in sync with
    the returned arrays.  Relies on module-level ``gradient`` and ``tqdm``.
    """
    epochs = 10000
    W1, w2 = Theta
    for _ in tqdm(range(epochs)):
        dW1, dw2 = gradient(X, Theta)
        W1 -= eta * dW1
        w2 -= eta * dw2
    return W1, w2
def initialize_theta(k, d, lower=-1, upper=1):
    """Draw initial parameters Theta = (W1, w2) uniformly from [lower, upper].

    W1 has shape (k, d + 1) (hidden weights incl. input bias) and w2 has
    shape (k + 1,) (output weights incl. hidden bias).
    """
    hidden_weights = numpy.random.uniform(lower, upper, (k, d + 1))
    output_weights = numpy.random.uniform(lower, upper, k + 1)
    return hidden_weights, output_weights
# Per-feature minimum/maximum over the sample columns, skipping the bias
# row 0; used by normalize() below to scale every feature into [0, 1].
min_val = numpy.min(X_orig[1:, :], axis=1)
max_val = numpy.max(X_orig[1:, :], axis=1)
def normalize(x, min_val, max_val):
    """Min-max scale the non-bias rows of x (features are rows, samples columns).

    Row 0 (the bias row) is left untouched; x is modified in place and returned.
    """
    span = numpy.subtract(max_val, min_val)
    shifted = numpy.subtract(x[1:, :].T, min_val)
    x[1:, :] = numpy.divide(shifted, span).T
    return x
X = normalize(X_orig, min_val, max_val)
# Shuffle the sample columns with one shared permutation so X and T stay aligned.
permutation = numpy.random.permutation(X.shape[1])
X = X[:, permutation]
T = T[:, permutation]
# Body of a mini-batch generator (its `def` line is outside this excerpt):
# yields consecutive column slices of size batch_size together with a flag
# that marks the first batch of each epoch.
for i in range(num_batches):
    end_index = batch_size * (i + 1) # update end_index (16 * 1, 16 * 2, ...)
    yield X[:, start_index:end_index], T[:, start_index:end_index], start_of_epoch
    start_index = end_index
    start_of_epoch = False
def network(X, Theta):
    """Batched forward pass; X holds samples as columns (incl. bias row).

    Theta = (W1, W2).  Returns the output matrix Y and the bias-augmented
    hidden activations H.
    """
    W1, W2 = Theta
    pre_activation = numpy.dot(W1, X)
    hidden = 1.0 / (1.0 + numpy.exp(-pre_activation))  # logistic sigmoid
    H = numpy.insert(hidden, 0, 1, axis=0)  # prepend hidden bias row H_0 = 1
    Y = numpy.dot(W2, H)
    return Y, H
def gradient(X, T, Y, H, Theta):
W1, W2 = Theta
B = X.shape[1]
g1 = numpy.multiply((2/B), (numpy.dot(numpy.multiply(numpy.dot(W2.T, numpy.subtract(Y, T)), numpy.multiply(H, 1 - H)), X.T)))
g2 = numpy.multiply((2/B), (numpy.dot(numpy.subtract(Y, T), H.T)))
return g1[1:,:], g2
def gradient_descent(X, T, Theta, B, eta=0.001, mu=None):
    """Mini-batch gradient descent on Theta = [W1, W2], optionally with
    heavy-ball momentum.

    W1/W2 are updated in place so the entries of Theta stay aliased.
    Relies on module-level ``batch``, ``network``, ``gradient``, ``loss``
    and ``tqdm``.  Returns the recorded per-epoch loss values.
    """
    loss_values = []
    W1, W2 = Theta
    max_epochs = 10000
    # Previous-iteration weights for the momentum term (zero velocity at start).
    W1_prev, W2_prev = W1.copy(), W2.copy()
    for _, (x, t, e) in tqdm(enumerate(batch(X, T, batch_size=B, epochs=max_epochs))):
        Y, H = network(x, Theta)
        if e:  # first batch of an epoch -> record the loss once per epoch
            loss_values.append(loss(Y, t))
        dW1, dW2 = gradient(x, t, Y, H, Theta)
        W1_old, W2_old = W1.copy(), W2.copy()
        # BUG FIX: the original applied the plain step and then, when mu was
        # set, subtracted eta * dW a second time plus a "momentum" term built
        # from this same iteration's weights (always -eta * dW), i.e. a
        # double step with a bogus velocity.  Apply one step and add the
        # classical heavy-ball term mu * (difference of the previous iterates).
        W1 -= eta * dW1
        W2 -= eta * dW2
        if mu:
            W1 += mu * (W1_old - W1_prev)
            W2 += mu * (W2_old - W2_prev)
        W1_prev, W2_prev = W1_old, W2_old
    return loss_values
# Network dimensions: D input features (bias row excluded), O outputs; the
# hidden size K is defined elsewhere in the notebook.
D = X.shape[0] - 1
O = T.shape[0]
# Xavier-style uniform initialization bounds 1/sqrt(fan_in).
s_D = 1 / numpy.sqrt(D)
s_K = 1 / numpy.sqrt(K)
W1 = numpy.random.uniform(-s_D, s_D, (K, D + 1))
W2 = numpy.random.uniform(-s_K, s_K, (O, K + 1))
Theta = [W1, W2]
def dataset(dataset_file="winequality-red.csv", delimiter=";"):
    """Load a delimiter-separated CSV into torch tensors.

    Returns (X, T): X holds all columns but the last; the last column is the
    target.  For the wine-quality file the labels 3..8 are shifted to a
    0-based range and returned as class indices (long); otherwise T is
    returned as floats.
    """
    data = []
    with open(dataset_file, 'r') as f:
        csv_reader = csv.reader(f, delimiter=delimiter)
        next(csv_reader)  # skip header line
        for sample in csv_reader:
            # SECURITY/BUG FIX: the original called eval() on every CSV
            # field, executing arbitrary expressions from the file.  float()
            # parses the numbers safely and also guarantees a float tensor
            # (all-int rows made eval() produce an int64 tensor, on which
            # the in-place `-= 3.0` below raises).
            data.append([float(value) for value in sample])
    data = torch.tensor(data)
    X = data[:, :-1]
    if dataset_file == "winequality-red.csv":
        data[:, -1:] -= 3.0  # wine quality labels start at 3 -> shift to 0-based
        T = data[:, -1:].squeeze().long()
    else:
        T = data[:, -1:].float()
    return X, T
def split_training_data(X, T, train_percentage=0.8):
    """Split (X, T) row-wise into a training and a validation portion."""
    cutoff = int(train_percentage * X.shape[0])  # number of training samples
    return X[:cutoff], T[:cutoff], X[cutoff:], T[cutoff:]
def standardize(X_train, X_val):
    """Standardize both sets using the training set's per-feature mean/std."""
    center = torch.mean(X_train, dim=0)
    spread = torch.std(X_train, dim=0)
    return (X_train - center) / spread, (X_val - center) / spread
def Network(D, K, O):
    """One-hidden-layer fully-connected network with Tanh activation."""
    hidden = torch.nn.Linear(D, K)   # input D -> hidden K
    output = torch.nn.Linear(K, O)   # hidden K -> output O
    return torch.nn.Sequential(hidden, torch.nn.Tanh(), output)
Network(X.shape[1], 30, len(T.unique()))
def accuracy(Z, T):
    """Fraction of correct predictions for logits Z and targets T.

    NOTE(review): in the binary branch Z has shape (N, 1); this assumes T is
    also (N, 1) (as used with BCEWithLogitsLoss) -- with a flat (N,) T the
    comparison would broadcast to (N, N).  Confirm against the callers.
    N is computed but unused in the visible body.
    """
    N = T.shape[0]
    if Z.shape[1] == 1: # binary classification
        return torch.mean((T == (Z >= 0)).float())
    else:
        return torch.mean((T == torch.argmax(Z, dim=1)).float()) # categorical classification
def train(X_train, T_train, X_val, T_val, loss, network, learning_rate=0.01, mu=0, epochs=1000):
    """Full-batch SGD training loop.

    Records per-epoch training/validation loss and accuracy and returns the
    four lists.  Relies on the module-level ``accuracy`` and ``tqdm``.
    """
    optimizer = torch.optim.SGD(
        params=network.parameters(),
        lr=learning_rate,
        momentum=mu
    )
    train_loss, train_acc, val_loss, val_acc = [], [], [], []
    for _ in tqdm(range(epochs)):
        optimizer.zero_grad()
        Z = network(X_train)
        J = loss(Z, T_train)
        J.backward()
        optimizer.step()
        train_loss.append(J.item())
        # FIX: store plain floats; the original appended 0-dim tensors here,
        # inconsistent with the .item() used for the losses.
        train_acc.append(accuracy(Z, T_train).item())
        with torch.no_grad():
            Z_val = network(X_val)
            J_val = loss(Z_val, T_val)
            val_loss.append(J_val.item())
            val_acc.append(accuracy(Z_val, T_val).item())
    return train_loss, train_acc, val_loss, val_acc
# Common loss functions: binary / multi-class / regression.
torch.nn.BCEWithLogitsLoss()
torch.nn.CrossEntropyLoss()
torch.nn.MSELoss()
# SGD with (optional) momentum.
optimizer = torch.optim.SGD(
    params=network.parameters(),
    lr=eta,
    momentum=momentum
)
# Adam: adaptive per-parameter learning rates.
optimizer = torch.optim.Adam(
    params=network.parameters(),
    lr=eta
)
# Use a GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def datasets(transform):
    """Download (if needed) and return the FashionMNIST train/test splits."""
    trainset = torchvision.datasets.FashionMNIST(root="/temp/FashionMNIST", train=True, download=True, transform=transform)
    testset = torchvision.datasets.FashionMNIST(root="/temp/FashionMNIST", train=False, download=True, transform=transform)
    return trainset, testset
transform = torchvision.transforms.ToTensor()
trainset, testset = datasets(transform=transform)
B = 512
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, batch_size=B)
# Evaluation loader: no shuffling, half the training batch size.
testloader = torch.utils.data.DataLoader(testset, shuffle=False, batch_size=B//2)
Parameters
- $I$ stands for the input dimensions
- $K$ stands for the size of the kernel
- $P$ stands for the padding
- $S$ stands for the stride, and
- $Q$ stands for the number of channels
Convolutional Output Dimension Formula: $O = \left\lfloor \frac{I - K + 2P}{S} \right\rfloor + 1$
Pooling Output Dimension Formula: $O = \left\lfloor \frac{I - K}{S} \right\rfloor + 1$
Computation
First Convolution:
First Pooling:
Second Convolution:
Second Pooling:
It's common practice to round down the result to ensure that the output feature map has consistent dimensions and that no information is lost.
def fully_connected(D, K1, K2, O):
    """Three-layer fully-connected network with sigmoid activations."""
    layers = [
        torch.nn.Flatten(),
        torch.nn.Linear(D, K1),
        torch.nn.Sigmoid(),
        torch.nn.Linear(K1, K2),
        torch.nn.Sigmoid(),
        torch.nn.Linear(K2, O),
    ]
    return torch.nn.Sequential(*layers)
def convolutional(C, Q1, Q2, O):
    """Two conv/pool/sigmoid stages followed by one fully-connected layer.

    For 28x28 inputs the feature maps shrink 28 -> 22 -> 11 -> 11 -> 5,
    hence the 5*5*Q2 inputs of the final linear layer.
    """
    first_stage = [
        torch.nn.Conv2d(in_channels=C, out_channels=Q1, kernel_size=(7, 7), stride=1, padding=0),
        torch.nn.MaxPool2d(kernel_size=(2, 2), stride=2),
        torch.nn.Sigmoid(),
    ]
    second_stage = [
        torch.nn.Conv2d(in_channels=Q1, out_channels=Q2, kernel_size=(5, 5), stride=1, padding=2),
        torch.nn.MaxPool2d(kernel_size=(2, 2), stride=2),
        torch.nn.Sigmoid(),
    ]
    head = [torch.nn.Flatten(), torch.nn.Linear((5**2) * Q2, O)]
    return torch.nn.Sequential(*first_stage, *second_stage, *head)
Fully-connected Network
For every fully-connected layer the number of weights and biases is computed as follows:
- first fully-connected layer:
$D \cdot K_1 + K_1 = 28^2 \cdot 128 + 128 = 100'480$ - second fully-connected layer:
$K_1 \cdot K_2 + K_2 = 128 \cdot 64 + 64 = 8'256$ - third fully-connected layer:
$K_2 \cdot O + O = 64 \cdot 10 + 10 = 650$ - total:
$100'480 + 8'256 + 650 = 109'386$
Convolutional Network
For every convolutional layer the number of weights and biases is computed as follows:
- first convolutional layer:
$C \cdot Q_1 \cdot K + Q_1 = 1 \cdot 16 \cdot 7^2 + 16 = 800$ - second convolutional layer:
$Q_1 \cdot Q_2 \cdot K + Q_2 = 16 \cdot 16 \cdot 5^2 + 16 = 6'416$ - fully-connected layer:
$K \cdot Q_2 \cdot O + O = 5^2 \cdot 16 \cdot 10 + 10 = 4'010$ - total:
$800 + 6'416 + 4'010 = 11'226$
def parameter_count(network):
    """Total number of parameters (trainable and frozen) in the network."""
    total = 0
    for parameter in network.parameters():
        total += parameter.numel()
    return total
# Standard ImageNet preprocessing: resize, center-crop to 224x224, then
# normalize with the ImageNet channel means/stds.
imagenet_transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(224),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.485,0.456,0.406), (0.229,0.224,0.225))
])
from torchvision.datasets import ImageFolder
#Path to your training and test data (If different, change the path accordingly)
train_dir = './intel-image-classification/seg_train/seg_train/'
test_dir = './intel-image-classification/seg_test/seg_test/'
trainset = ImageFolder(root=train_dir, transform=imagenet_transform)
testset = ImageFolder(root=test_dir, transform=imagenet_transform)
B = 32
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True,batch_size=B)
testloader = torch.utils.data.DataLoader(testset, shuffle=False, batch_size=B)
# network_1: frozen feature extractor -- only a newly attached head will train.
network_1 = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.DEFAULT)
for param in network_1.parameters():
    param.requires_grad = False # freeze layers of network
# network_2: fully fine-tuned copy for comparison.
network_2 = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.DEFAULT)
def replace_last_layer(network, O=6):
    """Swap the network's final fully-connected layer for a fresh one with O outputs."""
    feature_dim = network.fc.in_features
    network.fc = torch.nn.Linear(feature_dim, O)
    return network
def train_eval(model, dataloaders, epochs=1000, eta=0.01, momentum=0):
    """Train `model` with SGD + cross-entropy and evaluate it each epoch.

    Prints per-epoch statistics and returns (pred, target): the stacked test
    logits and labels collected during the final epoch.  Relies on the
    module-level ``tqdm`` and ``numpy``.
    """
    trainloader, testloader = dataloaders
    optimizer = torch.optim.SGD(
        params=model.parameters(),
        lr=eta,
        momentum=momentum
    )
    loss = torch.nn.CrossEntropyLoss()
    # FIX: the original hard-coded torch.device("cuda") and crashed on
    # CPU-only machines; fall back like the rest of the file does.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    train_loss, train_acc, val_loss, val_acc, curr_pred, curr_target = [], [], [], [], [], []
    for epoch in tqdm(range(epochs), desc='epoch'):
        model.train()
        train_loss_epoch = 0
        train_number_correct_pred = 0
        for x, t in tqdm(trainloader, desc='training', colour='purple', leave=False):
            optimizer.zero_grad()
            x, t = x.to(device), t.to(device)
            z = model(x)
            J = loss(z, t)
            J.backward()
            optimizer.step()
            # FIX: weight the mean batch loss by the actual batch size; the
            # last batch may be smaller than trainloader.batch_size.
            train_loss_epoch += J.item() * x.size(0)
            train_number_correct_pred += (torch.argmax(z, dim=1) == t).sum().item()
        train_loss.append(train_loss_epoch / len(trainloader.dataset))
        train_acc.append(train_number_correct_pred / len(trainloader.dataset))
        model.eval()
        with torch.no_grad():
            val_loss_epoch = 0
            val_number_correct_pred = 0
            for x_val, t_val in tqdm(testloader, desc='testing', colour='orange', leave=False):
                x_val, t_val = x_val.to(device), t_val.to(device)
                z_val = model(x_val)
                J_val = loss(z_val, t_val)
                val_loss_epoch += J_val.item() * x_val.size(0)
                val_number_correct_pred += (torch.argmax(z_val, dim=1) == t_val).sum().item()
                if epoch == epochs - 1:  # keep last-epoch outputs for later analysis
                    curr_pred.append(z_val.detach().cpu().numpy())
                    curr_target.append(t_val.detach().cpu().numpy())
            val_loss.append(val_loss_epoch / len(testloader.dataset))
            val_acc.append(val_number_correct_pred / len(testloader.dataset))
        print(f"Epoch {epoch + 1}/{epochs}:")
        print(f"  Training Loss: {train_loss[-1]:.4f}")
        print(f"  Training Accuracy: {train_acc[-1]:.4f}")
        print(f"  Validation Loss: {val_loss[-1]:.4f}")
        print(f"  Validation Accuracy: {val_acc[-1]:.4f}")
    pred, target = numpy.concatenate(curr_pred), numpy.concatenate(curr_target)
    return pred, target
# use numpy.argmax(pred, axis=1) for average predictions
# Open-set split of the 10 MNIST digits:
known_classes = (1, 4, 5, 8)      # trained with one-hot targets
negative_classes = (0, 2, 3, 7)   # trained as "unknown" with uniform targets
unknown_classes = (6, 9)          # only seen at test time
O = len(known_classes)
labels_known = torch.eye(O)  # one-hot target per known class
label_unknown = torch.full((4,), 0.25)  # uniform target 1/O for unknowns
# Row d of labels_combined is the target vector for digit d.
labels_combined = torch.full((10, 4), 0.25)
for i, idx in enumerate(known_classes):
    labels_combined[idx] = labels_known[i]
def target_vector(index):
    """Map a digit label to its open-set target vector (one-hot or uniform)."""
    return labels_combined[index]
class DataSet(torchvision.datasets.MNIST):
    """MNIST filtered for the open-set experiment.

    Targets are transformed through the module-level ``target_vector``
    (one-hot for known classes, uniform otherwise).  Test data keeps
    known + unknown classes; any other purpose keeps known + negative.
    """
    def __init__(self, purpose="train"):
        super(DataSet, self).__init__(
            root="/temp/MNIST",
            train=(purpose == "train"),
            download=True,
            transform=torchvision.transforms.ToTensor(),
            target_transform=target_vector
        )
        if purpose == "test":
            valid_classes = known_classes + unknown_classes
        else:
            valid_classes = known_classes + negative_classes
        # Keep only the samples whose label belongs to the selected classes.
        mask = torch.tensor([sample in valid_classes for sample in self.targets])
        self.data = self.data[mask]
        self.targets = self.targets[mask]
batch_size = 256
train_set = DataSet(purpose="train")
train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, batch_size=batch_size)
# NOTE(review): purpose="validation" loads MNIST with train=False but keeps
# the known + negative classes (the non-"test" branch) -- confirm intended.
validation_set = DataSet(purpose="validation")
validation_loader = torch.utils.data.DataLoader(validation_set, shuffle=False, batch_size=batch_size)
test_set = DataSet(purpose="test")
test_loader = torch.utils.data.DataLoader(test_set, shuffle=False, batch_size=batch_size)
def split_known_unknown(batch, targets):
    """Split a batch by target type.

    Known samples carry one-hot targets (some entry == 1); unknown samples
    carry the uniform target 1/O in every entry (O is module-level).
    Returns (known samples, their targets, unknown samples).
    """
    # BUG FIX: the original called the non-existent torch.amy(); torch.any()
    # is the intended row-wise reduction.
    known = torch.any(targets == 1, dim=1)
    unknown = torch.all(targets == 1/O, dim=1)
    return batch[known], targets[known], batch[unknown]
class AdaptedSoftMax(torch.autograd.Function):
    """Cross-entropy with arbitrary (soft) target vectors and a manual
    backward pass."""
    @staticmethod
    def forward(ctx, logits, targets):
        # Log-softmax for numerical stability.
        log_probs = torch.nn.functional.log_softmax(logits, dim=1)
        ctx.save_for_backward(log_probs, targets)
        loss = - targets * log_probs
        return torch.sum(loss)  # summed (not averaged) over the batch
    @staticmethod
    def backward(ctx, result):
        log_probs, targets = ctx.saved_tensors
        y = torch.exp(log_probs)  # recover the softmax probabilities
        # d(sum loss)/d(logits) = y - targets, scaled by the incoming gradient.
        dJ_dz = result * (y - targets)
        return dJ_dz, None  # no gradient w.r.t. the targets
adapted_softmax = AdaptedSoftMax.apply  # callable used like a loss function
def adapted_softmax_alt(logits, targets):
    """Closed-form equivalent of the adapted softmax loss (summed over the batch)."""
    data_term = torch.sum(targets * logits)
    regularizer = torch.logsumexp(logits, dim=1).sum() / logits.size(1)
    return regularizer - data_term
def confidence(logits, targets):
    """Summed open-set confidence of a batch.

    Known samples contribute the softmax probability of their true class;
    unknown samples contribute 1 - max probability + 1/O (equal to 1 when
    the posterior is uniform).  Relies on the module-level
    ``split_known_unknown`` and ``device``.
    """
    softmax_probs = torch.nn.functional.softmax(logits, dim=1).to(device)
    batch_known, targets_known, batch_unknown = split_known_unknown(softmax_probs, targets)
    # targets_known is one-hot, so the product selects the true-class probability.
    conf_known = torch.sum(batch_known * targets_known)
    conf_unknown = torch.sum(1 - torch.max(batch_unknown, dim=1).values + (1 / logits.size(1)))
    return conf_known + conf_unknown
class Network(torch.nn.Module):
    """Small CNN exposing both logits and the penultimate deep features."""

    def __init__(self, Q1=32, Q2=32, K=20, O=4):
        super(Network, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=1, out_channels=Q1, kernel_size=(7, 7), stride=1, padding=0)
        self.conv2 = torch.nn.Conv2d(in_channels=Q1, out_channels=Q2, kernel_size=(5, 5), stride=1, padding=2)
        self.pool = torch.nn.MaxPool2d(kernel_size=(2, 2), stride=2)
        self.act = torch.nn.PReLU()
        self.flatten = torch.nn.Flatten()
        self.fc1 = torch.nn.Linear((5**2) * Q2, K)
        self.fc2 = torch.nn.Linear(K, O)

    def forward(self, x):
        # 28x28 -> conv(7) 22x22 -> pool 11x11 -> conv(5, pad 2) 11x11 -> pool 5x5
        hidden = self.act(self.pool(self.conv1(x)))
        hidden = self.act(self.pool(self.conv2(hidden)))
        deep_features = self.fc1(self.flatten(hidden))
        logits = self.fc2(deep_features)
        return logits, deep_features
def train(network, loss_function, epochs=1000, eta=0.01, momentum=0):
    """Train the open-set network with SGD, printing the mean confidence on
    the training and validation sets after each epoch.

    Relies on module-level ``device``, ``train_loader``, ``validation_loader``,
    ``train_set``, ``validation_set``, ``confidence`` and ``tqdm``.
    """
    network = network.to(device)
    optimizer = torch.optim.SGD(
        params=network.parameters(),
        lr=eta,
        momentum=momentum
    )
    for epoch in tqdm(range(epochs), desc='epoch'):
        train_conf = validation_conf = 0.
        for x, t in tqdm(train_loader, desc='training', colour='purple', leave=False):
            optimizer.zero_grad()
            x, t = x.to(device), t.to(device)
            logits, _ = network.forward(x)
            J = loss_function(logits, t)
            J.backward()
            optimizer.step()
            train_conf += confidence(logits, t)
        with torch.no_grad():
            for x_val, t_val in tqdm(validation_loader, desc='validating', colour='orange', leave=False):
                x_val, t_val = x_val.to(device), t_val.to(device)
                logits_val, _ = network.forward(x_val)
                validation_conf += confidence(logits_val, t_val)
        print(f"\rEpoch {epoch+1}; train: {train_conf/len(train_set):1.5f}, val: {validation_conf/len(validation_set):1.5f}")
    return network
def plot_features(network):
    """Collect deep-feature norms of known/negative/unknown samples.

    NOTE(review): despite the name, the visible body only gathers the norms
    into three lists and neither plots nor returns them -- the plotting
    presumably happened in a later notebook cell; confirm.
    """
    known, negative, unknown = [], [], []
    with torch.no_grad():
        # Validation set contains known + negative samples.
        for x, t in tqdm(validation_loader, desc='validation'):
            x, t = x.to(device), t.to(device)
            _, deep_features = network.forward(x)
            norms = torch.norm(deep_features, dim=1)
            batch_known, _, batch_unkown = split_known_unknown(norms, t)
            known.extend(batch_known.detach().cpu().numpy())
            negative.extend(batch_unkown.detach().cpu().numpy())
        # Test set contains known + unknown samples.
        for x,t in tqdm(test_loader, desc='test'):
            x, t = x.to(device), t.to(device)
            _, deep_features = network.forward(x)
            norms = torch.norm(deep_features, dim=1)
            batch_known, _, batch_unkown = split_known_unknown(norms, t)
            known.extend(batch_known.detach().cpu().numpy())
            unknown.extend(batch_unkown.detach().cpu().numpy())
def evaluation(network):
    """Open-set evaluation at confidence threshold zeta.

    CCR: fraction of known test samples whose true-class probability reaches
    zeta.  FPR: fraction of unknown test samples whose highest probability
    reaches zeta.
    """
    zeta = 0.98
    correct = known = 0
    false = unknown = 0
    with torch.no_grad():
        for x,t in tqdm(test_loader, desc='test'):
            x, t = x.to(device), t.to(device)
            logits, _ = network.forward(x)
            softmax_probs = torch.nn.functional.softmax(logits, dim=1)
            batch_known, targets_known, batch_unkown = split_known_unknown(softmax_probs, t)
            # targets_known is one-hot: only the true-class probability can reach zeta.
            correct += torch.sum(batch_known * targets_known >= zeta)
            known += len(batch_known)
            false += torch.sum(torch.max(batch_unkown, dim=1).values >= zeta)
            unknown += len(batch_unkown)
    print (f"CCR: {correct} of {known} = {correct/known*100:2.2f}%")
    print (f"FPR: {false} of {unknown} = {false/unknown*100:2.2f}%")
evaluation(network_adapted)
class DatasetWithIndicator(torch.utils.data.Dataset):
    """Wraps a dataset so every sample also reports a fixed type indicator."""

    def __init__(self, dataset, type_indicator):
        self.dataset = dataset
        self.type_indicator = type_indicator

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        sample, label = self.dataset[idx]
        return sample, label, self.type_indicator
class MixedDataset(torch.utils.data.Dataset):
    """MNIST, optionally mixed with FashionMNIST samples as anomalies.

    Every item is (image, target, type_indicator) with indicator +1 for
    MNIST and -1 for FashionMNIST.
    """
    def __init__(self, root='./data', purpose="train", transform=None, anomaly_size=2000):
        self.dataset = DatasetWithIndicator(
            dataset=torchvision.datasets.MNIST(root=root, train=purpose=="train", download=True, transform=transform),
            type_indicator=1
        )
        if purpose == "anomaly_detection":
            # Mix in a random subset of FashionMNIST test images as anomalies.
            self.fashion_dataset = DatasetWithIndicator(
                dataset=torchvision.datasets.FashionMNIST(root=root, train=False, download=True, transform=transform),
                type_indicator=-1
            )
            indices = torch.randperm(len(self.fashion_dataset))[:anomaly_size] # select random samples
            self.fashion_dataset = torch.utils.data.Subset(self.fashion_dataset, indices)
            self.dataset = torch.utils.data.ConcatDataset([self.dataset, self.fashion_dataset])
    def __len__(self):
        return len(self.dataset)
    def __getitem__(self, idx):
        image, target, data_type = self.dataset[idx]
        return image, target, data_type
transform = torchvision.transforms.ToTensor()
train_dataset = MixedDataset(purpose="train", transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
# NOTE(review): purpose="val" loads MNIST with train=False (as does any value
# other than "train") and adds no anomalies.
val_dataset = MixedDataset(purpose="val", transform=transform)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=100, shuffle=False)
anomaly_detection_dataset = MixedDataset(purpose="anomaly_detection", transform=transform, anomaly_size=2000)
anomaly_detection_loader = torch.utils.data.DataLoader(anomaly_detection_dataset, batch_size=1000, shuffle=True)
class Encoder(torch.nn.Module):
    """Convolutional encoder: 1x28x28 image -> K-dimensional deep feature."""

    def __init__(self, Q1, Q2, K):
        super(Encoder, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=1, out_channels=Q1, kernel_size=(5, 5), stride=2, padding=2)
        self.conv2 = torch.nn.Conv2d(in_channels=Q1, out_channels=Q2, kernel_size=(5, 5), stride=2, padding=2)
        self.act = torch.nn.ReLU()
        self.flatten = torch.nn.Flatten()
        self.fc1 = torch.nn.Linear((7**2) * Q2, K)

    def forward(self, x):
        # Strided convolutions halve the resolution twice: 28 -> 14 -> 7.
        hidden = self.conv2(self.act(self.conv1(x)))
        return self.fc1(self.act(self.flatten(hidden)))
class Decoder(torch.nn.Module):
    """Transposed-convolutional decoder: K-dim feature -> 1x28x28 image in [0, 1]."""

    def __init__(self, Q1, Q2, K):
        super(Decoder, self).__init__()
        self.fc = torch.nn.Linear(K, (7**2) * Q2)
        self.deconv1 = torch.nn.ConvTranspose2d(in_channels=Q2, out_channels=Q1, kernel_size=(5, 5), stride=2, padding=2, output_padding=1)
        self.deconv2 = torch.nn.ConvTranspose2d(in_channels=Q1, out_channels=1, kernel_size=(5, 5), stride=2, padding=2, output_padding=1)
        self.act = torch.nn.ReLU()
        self.unflatten = torch.nn.Unflatten(1, (Q2, 7, 7))
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        # Mirror of the encoder: 7 -> 14 -> 28; sigmoid squashes into [0, 1].
        feature_map = self.unflatten(self.act(self.fc(x)))
        upsampled = self.act(self.deconv1(feature_map))
        return self.sigmoid(self.deconv2(upsampled))
class AutoEncoder(torch.nn.Module):
    """Encoder followed by decoder; returns the reconstruction of the input."""

    def __init__(self, Q1, Q2, K):
        super(AutoEncoder, self).__init__()
        self.encoder = Encoder(Q1, Q2, K)
        self.decoder = Decoder(Q1, Q2, K)

    def forward(self, x):
        return self.decoder(self.encoder(x))
network = AutoEncoder(32, 32, 10).to(device)
optimizer = torch.optim.Adam(
    params=network.parameters(),
    lr=0.0005
)
loss = torch.nn.MSELoss()  # reconstruction error
for epoch in tqdm(range(10), desc='epoch'):
    train_loss = validation_loss = 0.
    # Labels are ignored: the autoencoder reconstructs its own input.
    for x, t, _ in tqdm(train_loader, desc='training', colour='purple', leave=False):
        optimizer.zero_grad()
        x = x.to(device)
        y = network(x)
        J = loss(y, x)
        J.backward()
        optimizer.step()
        train_loss += J.item() * train_loader.batch_size
    with torch.no_grad():
        for x_val, _, _ in tqdm(val_loader, desc='validating', colour='orange', leave=False):
            x_val = x_val.to(device)
            y_val = network(x_val)
            J_val = loss(y_val, x_val)
            validation_loss += J_val.item() * val_loader.batch_size
    print(f"\rEpoch {epoch+1}; train: {train_loss/len(train_dataset):1.5f}, val: {validation_loss/len(val_dataset):1.5f}")
def compute_tpr_tnr(predictions, truth):
    """True-positive and true-negative rate of binary predictions.

    The larger of the two label values is treated as the positive class and
    the smaller as the negative class, matching sklearn's sorted-label
    binary confusion matrix (e.g. labels {-1, +1} or {0, 1}).
    """
    predictions = numpy.array(predictions)
    truth = numpy.array(truth)
    # FIX: computed directly with numpy; the original depended on a
    # confusion_matrix symbol (sklearn) that is never imported in this file.
    labels = numpy.unique(numpy.concatenate((truth, predictions)))
    positive, negative = labels.max(), labels.min()
    tp = numpy.sum((predictions == positive) & (truth == positive))
    fn = numpy.sum((predictions == negative) & (truth == positive))
    tn = numpy.sum((predictions == negative) & (truth == negative))
    fp = numpy.sum((predictions == positive) & (truth == negative))
    tpr = tp / (tp + fn)
    tnr = tn / (tn + fp)
    return tpr, tnr
# Per-pixel reconstruction error (no reduction) for anomaly scoring.
loss = torch.nn.MSELoss(reduction="none")
correct = 0.
predictions = []
truth_values = []
with torch.no_grad():
    for x, t, l in tqdm(anomaly_detection_loader, desc="anomaly_detection"):
        x, t, l = x.to(device), t.to(device), l.to(device)
        y = network(x)
        # NOTE(review): the first squeeze(1) drops the channel dim; the
        # second squeeze(1) is a no-op on (B, 28, 28) -- presumably leftover.
        J = loss(y, x).squeeze(1).squeeze(1)
        J_per_sample = J.mean(dim=[1, 2])
        # Mean reconstruction error above 0.04 -> anomaly (-1), else normal (+1).
        prediction = torch.where(J_per_sample > 0.04, torch.tensor(-1), torch.tensor(1))
        predictions += prediction.detach().cpu().tolist()
        truth_values += l.detach().cpu().tolist()
        correct += torch.sum(prediction == l).item()
acc = correct / len(anomaly_detection_loader.dataset)
tpr, tnr = compute_tpr_tnr(predictions, truth_values)
print(f"True Positive Rate: {tpr:1.4f}")
print(f"True Negative Rate: {tnr:1.4f}")
print(f"Accuracy: {acc:1.4f}")
def get_data(datafile):
    """Read a stock CSV; return (dates as numpy datetime64[D], closing-price tensor)."""
    frame = pandas.read_csv(datafile)
    dates = frame["Date"].to_numpy().astype('datetime64[D]')
    prices = torch.tensor(frame['Close'].values)
    return dates, prices
def train_test_split(stock_data):
    """Split (dates, prices) chronologically at 2018-01-01.

    The split date must be present in the dates array; it becomes the first
    test sample.  Returns ((train_dates, train_prices), (test_dates, test_prices)).
    """
    dates, prices = stock_data
    # FIX: use the `numpy` name consistently -- this file never defines an
    # `np` alias, so np.datetime64 / np.where raised NameError.
    split_date = numpy.datetime64('2018-01-01')
    split_index = numpy.where(dates == split_date)[0][0]
    train_data = (dates[:split_index], prices[:split_index])
    test_data = (dates[split_index:], prices[split_index:])
    return train_data, test_data
def min_max_scaler(train_data, test_data):
    """Scale both series using the training minimum/maximum (train maps to [0, 1])."""
    lo = torch.min(train_data)
    hi = torch.max(train_data)
    scale = lambda data: (data - lo) / (hi - lo)
    return scale(train_data), scale(test_data), lo, hi
def inverse_min_max_scaler(scaled_data, min_val, max_val):
    """Undo min-max scaling: map scaled values back to the original range."""
    span = max_val - min_val
    return scaled_data * span + min_val
def create_sequences_targets(data: torch.Tensor, S):
    """Slide a window of length S over data; each window predicts the next value.

    Returns (X, T) with X[i] = data[i:i+S] and T[i] = data[i+S].
    """
    windows = [data[i:i + S] for i in range(len(data) - S)]
    next_values = [data[i + S] for i in range(len(data) - S)]
    return torch.stack(windows), torch.stack(next_values)
class Dataset(torch.utils.data.Dataset):
    """Sequence dataset: items are ((S, 1) input windows, (1,) targets)."""

    def __init__(self, data, S):
        self.X, self.T = create_sequences_targets(data, S)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        sequence = self.X[index].unsqueeze(1)  # (S,) -> (S, 1) feature dimension
        target = self.T[index].unsqueeze(0)    # scalar -> (1,)
        return sequence, target
S = 7  # sequence length: predict a price from the previous 7 days
gail_train_dataset = Dataset(train_gail_scaled, S)
gail_train_dataloader = torch.utils.data.DataLoader(gail_train_dataset, batch_size=256, shuffle=True)
gail_test_dataset = Dataset(test_gail_scaled, S)
gail_test_dataloader = torch.utils.data.DataLoader(gail_test_dataset, batch_size=256, shuffle=False)
ntpc_train_dataset = Dataset(train_ntpc_scaled, S)
ntpc_train_dataloader = torch.utils.data.DataLoader(ntpc_train_dataset, batch_size=256, shuffle=True)
ntpc_test_dataset = Dataset(test_ntpc_scaled, S)
ntpc_test_dataloader = torch.utils.data.DataLoader(ntpc_test_dataset, batch_size=256, shuffle=False)
class LSTMModel(torch.nn.Module):
    """LSTM regressor (float64): (B, S, D) sequences -> (B, 1, O) predictions."""

    def __init__(self, D, K, O):
        super(LSTMModel, self).__init__()
        self.lstm = torch.nn.LSTM(input_size=D, hidden_size=K, batch_first=True, dtype=torch.float64)
        self.dropout = torch.nn.Dropout(0.2)
        self.linear = torch.nn.Linear(K, O, dtype=torch.float64)

    def forward(self, x):
        hidden_states, _ = self.lstm(x)
        hidden_states = self.dropout(hidden_states)
        last_state = hidden_states[:, -1:]  # keep only the final time step (seq dim retained)
        return self.linear(last_state)
def train(network,train_dataloader,optimizer,loss,device,epochs=50):
    """Train the LSTM regressor; prints a running loss after each epoch.

    NOTE(review): total_sample is the number of *batches* while the summed
    loss is scaled by batch_size, so the printed value is not an exact
    per-sample mean -- confirm whether that is intended.
    """
    network.to(device)
    for epoch in range(epochs):
        network.train()
        train_loss = 0
        total_sample = len(train_dataloader)
        for x, t in train_dataloader:
            optimizer.zero_grad()
            x, t = x.to(device), t.to(device)
            y = network(x)
            J = loss(y, t)
            J.backward()
            optimizer.step()
            train_loss += J.item() * train_dataloader.batch_size
        print(f"\rEpoch {epoch+1}; train loss: {train_loss/total_sample:1.5f}")
def predict(network,test_dataloader):
    """Run the network over a test loader; returns the per-batch predictions.

    Relies on the module-level ``device``.
    """
    network.eval()
    predictions = []
    with torch.no_grad():
        for x, _ in test_dataloader:
            x = x.to(device)
            pred = network(x)
            predictions.append(pred.squeeze())  # drop singleton dims per batch
    return predictions
# Plain MNIST loaders for the adversarial-robustness experiments.
transform = torchvision.transforms.ToTensor()
train_set = torchvision.datasets.MNIST(root="/data/MNIST", train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, batch_size=32)
validation_set = torchvision.datasets.MNIST(root="/data/MNIST", train=False, download=True, transform=transform)
validation_loader = torch.utils.data.DataLoader(validation_set, shuffle=False, batch_size=100)
class Network(torch.nn.Module):
    """CNN for 28x28 inputs: two conv/pool/ReLU stages plus two linear layers."""

    def __init__(self, Q1, Q2, K, O):
        super(Network, self).__init__()
        feature_extractor = [
            torch.nn.Conv2d(in_channels=1, out_channels=Q1, kernel_size=(7, 7), stride=1, padding=0),
            torch.nn.MaxPool2d(kernel_size=(2, 2), stride=2),
            torch.nn.ReLU(),
            torch.nn.Conv2d(in_channels=Q1, out_channels=Q2, kernel_size=(5, 5), stride=1, padding=2),
            torch.nn.MaxPool2d(kernel_size=(2, 2), stride=2),
            torch.nn.ReLU(),
        ]
        classifier = [
            torch.nn.Flatten(),
            torch.nn.Linear((5**2) * Q2, K),
            torch.nn.Linear(K, O),
        ]
        self.build = torch.nn.Sequential(*feature_extractor, *classifier)

    def forward(self, x):
        return self.build(x)
def FGS(x, t, network, loss, alpha=0.3):
    """Fast Gradient Sign attack: step x by alpha in the sign direction of
    the input gradient, clamped to the valid pixel range [0, 1].

    NOTE(review): unlike FGV below, no network.zero_grad() is called before
    backward(), so this pass also accumulates into the network's parameter
    gradients -- confirm callers reset them before their next step.
    """
    x, t = x.to(device), t.to(device)
    x.requires_grad_()  # enable gradient w.r.t. the input pixels
    z = network(x)
    J = loss(z, t)
    J.backward()
    gradient = x.grad
    adversarial_sample = x + alpha * torch.sign(gradient)
    return torch.clamp(adversarial_sample, 0, 1)
def FGV(x, t, network, loss, alpha=0.6):
    """Fast Gradient Value attack: step x by alpha times the gradient
    normalized by its per-sample maximum absolute value, clamped to [0, 1]."""
    x, t = x.to(device), t.to(device)
    x.requires_grad_()  # enable gradient w.r.t. the input pixels
    z = network(x)
    J = loss(z, t)
    network.zero_grad()  # reset parameter gradients before this extra backward pass
    J.backward()
    gradient = x.grad
    # Normalize per sample by the maximum |gradient| over C/H/W.
    adversarial_sample = x + alpha * (gradient / torch.amax(torch.abs(gradient), dim=[1, 2, 3], keepdim=True))
    return torch.clamp(adversarial_sample, 0, 1)
def noise(x, alpha=0.3):
    """Add uniform random sign noise (+/- alpha per pixel), clamped to [0, 1].

    Relies on the module-level ``device``.
    """
    x = x.to(device)
    # BUG FIX: torch.randint requires (low, high, size); the original
    # torch.randint(x.size(), ...) raised a TypeError.  Draw from {0, 1}
    # and map to {-1, +1}.
    noise = torch.randint(0, 2, x.size(), device=device).float() * 2 - 1
    noisy_sample = x + alpha * noise
    return torch.clamp(noisy_sample, 0, 1)
def training_loop(network, loss, optimizer, add_additional_samples=None, alpha=0.3):
    """One training pass over train_loader, optionally augmented with
    adversarial (FGS) or random-noise copies of each batch.

    Relies on module-level ``device``, ``train_loader``, ``FGS``, ``noise``
    and ``tqdm``.
    """
    network = network.to(device)
    description = f'training {"default" if add_additional_samples is None else add_additional_samples}'
    for x, t in tqdm(train_loader, desc=description, colour='purple', leave=False):
        optimizer.zero_grad()
        x, t = x.to(device), t.to(device)
        z = network(x)
        J = loss(z, t)
        J.backward()
        optimizer.step()
        if add_additional_samples is not None:
            if add_additional_samples == "FGS":
                x_hat = FGS(x, t, network, loss, alpha)
            else:
                x_hat = noise(x, alpha)
            # Second update step on the perturbed samples (same targets).
            z_hat = network(x_hat)
            J = loss(z_hat, t)
            J.backward()
            optimizer.step()
def validation_loop(network, loss, add_additional_samples=None, alpha_fgs=0.3, alpha_fgv=0.6):
    """Clean accuracy plus adversarial robustness on the validation set.

    FGS/FGV attacks are run only on the samples the network classified
    correctly; the returned attack accuracies are therefore relative to
    those samples, not to the whole set.
    """
    network = network.to(device)
    correct_clean_count, correct_fgs_count, correct_fgv_count = 0, 0, 0
    description = f'validating {"default" if add_additional_samples is None else add_additional_samples}'
    for x, t in tqdm(validation_loader, desc=description, colour='orange', leave=False):
        with torch.no_grad():
            x, t = x.to(device), t.to(device)
            z = network(x)
            preds = torch.argmax(z, dim=1)
            correct_clean_count += (preds == t).sum().item()
            correct_indices = (preds == t)
            x_correct = x[correct_indices]
            t_correct = t[correct_indices]
        # The attacks need input gradients, hence outside the no_grad block.
        x_attack_fgs = FGS(x_correct, t_correct, network, loss, alpha_fgs)
        x_attack_fgv = FGV(x_correct, t_correct, network, loss, alpha_fgv)
        with torch.no_grad():
            z_attack_fgs = network(x_attack_fgs)
            z_attack_fgv = network(x_attack_fgv)
            correct_fgs_count += (torch.argmax(z_attack_fgs, dim=1) == t_correct).sum().item()
            correct_fgv_count += (torch.argmax(z_attack_fgv, dim=1) == t_correct).sum().item()
    clean_accuracy = correct_clean_count / len(validation_loader.dataset)
    fgs_accuracy = (correct_fgs_count / correct_clean_count) if correct_clean_count > 0 else 0
    fgv_accuracy = (correct_fgv_count / correct_clean_count) if correct_clean_count > 0 else 0
    return clean_accuracy, fgs_accuracy, fgv_accuracy
# Experiment: three identical CNNs, one per augmentation strategy
# (None = clean training, "noise" = random noise, "FGS" = adversarial samples).
number_networks = 3
networks = [Network(32, 64, 10, 10) for _ in range(number_networks)]
optimizers = [torch.optim.SGD(
    params=networks[i].parameters(),
    lr=0.005,
    momentum=0.8
) for i in range(number_networks)]
loss = torch.nn.CrossEntropyLoss()
alpha = 0.3
clean_accuracies = [[] for _ in range(number_networks)]
fgs_accuracies = [[] for _ in range(number_networks)]
fgv_accuracies = [[] for _ in range(number_networks)]
data_extensions = [None, "noise", "FGS"]
for epoch in tqdm(range(10), desc='epoch'):
    for idx, (network, optimizer, data_extension) in enumerate(zip(networks, optimizers, data_extensions)):
        training_loop(network, loss, optimizer, data_extension, alpha=alpha)
        # FGV is evaluated at twice the FGS step size.
        clean, fgs, fgv = validation_loop(network, loss, data_extension, alpha_fgs=alpha, alpha_fgv=alpha*2)
        clean_accuracies[idx].append(clean)
        fgs_accuracies[idx].append(fgs)
        fgv_accuracies[idx].append(fgv)
# ImageNet-style preprocessing for CLIP zero-shot classification.
imagenet_transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize((224, 224)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.485,0.456,0.406), (0.229,0.224,0.225))
])
# Pretrained CLIP model and its paired tokenizer/feature processor.
model_id = "openai/clip-vit-base-patch32"
processor = CLIPProcessor.from_pretrained(model_id)
model = CLIPModel.from_pretrained(model_id)
# Tokenize the class-name prompts for CLIP's text encoder.
label_tokens = processor(
    text=text_prompts,
    padding=True,
    images=None,
    return_tensors='pt'
).to(device)
# FIX: the original line was missing its closing parenthesis (syntax error).
model = model.to(device)
label_emb = model.get_text_features(**label_tokens)
label_emb = label_emb.detach().cpu()
# L2-normalize the text embeddings so the dot product below is a cosine score.
label_emb = label_emb / torch.norm(label_emb, dim=1, keepdim=True)
# FIX: initialize the collection lists, which were never defined before use.
y_true, y_pred = [], []
for imgs, labels in tqdm(testloader, desc='testing'):
    imgs = imgs.to(device)
    y_true.extend(labels.detach().cpu())
    img_embd = model.get_image_features(imgs)
    img_embd = img_embd.detach().cpu()
    scores = torch.matmul(img_embd, label_emb.T)  # image-text similarity
    pred = torch.argmax(scores, dim=1)  # best-matching prompt = predicted class
    y_pred.extend(pred)
Checkerboard Targets
def target(x):
    """Checkerboard labels in {-1, +1} for a batch of 2-d points.

    A point gets +1 when either the floors or the ceils of both coordinates
    are odd, and -1 otherwise.
    """
    labels = []
    for point in x:
        floor_odd = torch.floor(point[0]) % 2 and torch.floor(point[1]) % 2
        ceil_odd = torch.ceil(point[0]) % 2 and torch.ceil(point[1]) % 2
        labels.append(1 if (floor_odd or ceil_odd) else -1)
    return torch.tensor(labels)
def batch(B, device = "cpu"):
    """Draw B random 2-d points from Uniform(-2, 2) with checkerboard labels."""
    X = torch.distributions.Uniform(-2,2).sample([B,2]).to(device)
    T = target(X)
    return X,T
$\mathrm{tent}(a) = \begin{cases}1-a & \text{for } 0\leq a\leq 2 \\ 1+a & \text{for } -2 \leq a < 0 \\ -1 & \text{elsewhere}\end{cases}$
def tent(a):
    """Tent function: 1 - |a| on [-2, 2] and -1 everywhere else."""
    inside_right = torch.logical_and(0 <= a, a <= 2)
    inside_left = torch.logical_and(-2 <= a, a < 0)
    outside = torch.logical_or(a < -2, a > 2)
    return inside_right * (1 - a) + inside_left * (1 + a) + outside * -1
class Tent(torch.autograd.Function):
    """Tent activation with a hand-written backward pass.

    NOTE(review): the `targets` argument never affects the forward output and
    backward returns None for it -- presumably kept to match another
    Function's signature; confirm.
    """
    @staticmethod
    def forward(ctx, values, targets):
        a = tent(values)
        ctx.save_for_backward(values, targets)
        return a
    @staticmethod
    def backward(ctx, dJ_dH):
        values, targets = ctx.saved_tensors
        dH_dA = torch.zeros_like(values)
        # Piecewise derivative of the tent: -1 on [0, 2], +1 on [-2, 0), 0 outside.
        cond1 = torch.logical_and(0 <= values, values <= 2)
        cond2 = torch.logical_and(-2 <= values, values < 0)
        cond3 = torch.logical_or(values < -2, values > 2)
        dH_dA[cond1] = -1
        dH_dA[cond2] = 1
        dH_dA[cond3] = 0
        dJ_dA = dJ_dH * dH_dA  # chain rule
        return dJ_dA, None
class ResidualBlock(torch.nn.Module):
    """Two 3x3 convolutions with a 1x1-projected skip connection."""

    def __init__(self, in_channels, out_channels, hidden):
        super().__init__()
        self.conv1 = torch.nn.Conv2d(in_channels, hidden, kernel_size=3, padding=1)
        self.conv2 = torch.nn.Conv2d(hidden, out_channels, kernel_size=3, padding=1)
        self.conv_adjustx = torch.nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.relu = torch.nn.ReLU()

    def forward(self, x):
        residual = self.conv2(self.relu(self.conv1(x)))
        shortcut = self.conv_adjustx(x)  # match the residual path's channel count
        return self.relu(residual + shortcut)
# Small residual CNN for 28x28 single-channel input: two 2x2 poolings leave
# 7x7 maps with 8 channels before the final classifier.
network = torch.nn.Sequential(
    ResidualBlock(1, 4, 1),
    torch.nn.MaxPool2d(2),
    ResidualBlock(4, 8, 1),
    torch.nn.MaxPool2d(2),
    ResidualBlock(8, 8, 1),
    torch.nn.Flatten(),
    torch.nn.Linear(7*7*8, 10)
).to(device)
# Pretrained ImageNet classifier in inference mode.
# NOTE(review): pretrained= is deprecated in newer torchvision;
# weights=ResNet18_Weights.DEFAULT is the modern equivalent.
network = torchvision.models.resnet18(pretrained=True)
network.eval()
# Geometric preprocessing only -- kept separate from normalization so the
# un-normalized image X below stays plottable and perturbable.
transform_1 = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(224),
])
# ImageNet channel-wise normalization, applied just before the network.
transform_2 = torchvision.transforms.Compose([
    torchvision.transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])
X = transform_1(pil_image)
def predict(sample):
    """Classify one un-normalized image; returns (argmax class, its softmax probability).

    Relies on the module-level ``network`` and ``transform_2``.
    """
    output = network(transform_2(sample).unsqueeze(0))
    probs = torch.nn.functional.softmax(output, dim=1)
    return probs.argmax(), probs.max()
X.requires_grad_(True)  # gradient w.r.t. the input pixels is needed below
original_class, original_prob = predict(X)
print(f"Original sample: class={original_class}, probability={original_prob}")
true_class = 954 #banana
# Loss of the current prediction -> its gradient points away from that class.
J = torch.nn.CrossEntropyLoss()(network(transform_2(X).unsqueeze(0)),torch.tensor([original_class]))
J.backward()
# Fast-Gradient-Value-style step, scaled by the gradient's maximum.
# NOTE(review): torch.max(X.grad) is the signed maximum, not the maximum
# magnitude -- confirm this is intended for gradients with negative peaks.
X_check = X + 0.5 * torch.div(X.grad, torch.max(X.grad))
adversarial_class, adversarial_prob = predict(X_check)