2 changes: 2 additions & 0 deletions art/attacks/poisoning/__init__.py
@@ -19,3 +19,5 @@
from art.attacks.poisoning.hidden_trigger_backdoor.hidden_trigger_backdoor_pytorch import HiddenTriggerBackdoorPyTorch
from art.attacks.poisoning.hidden_trigger_backdoor.hidden_trigger_backdoor_keras import HiddenTriggerBackdoorKeras
from art.attacks.poisoning.sleeper_agent_attack import SleeperAgentAttack
from art.attacks.poisoning.dynamic_backdoor_gan import DynamicBackdoorGAN

73 changes: 73 additions & 0 deletions art/attacks/poisoning/dynamic_backdoor_gan.py
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""dynamic_backdoor_gan

Automatically generated by Colab.

Original file is located at
https://colab.research.google.com/drive/19W9gZ2gUxkgu6rr5qAT1Arf7iauCj2QT
"""

#Trigger Generator:A small CNN that learns to generate input-specific triggers
class TriggerGenerator(nn.Module):
def __init__(self, input_channels=3):
super().__init__()
self.net = nn.Sequential(
nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
nn.Tanh()
)

def forward(self, x):
return self.net(x)


# Custom poisoning attack: DynamicBackdoorGAN. This class defines how to poison data using the GAN trigger generator.
class DynamicBackdoorGAN(PoisoningAttackBackdoor):
def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
super().__init__(perturbation=lambda x: x)
self.classifier = classifier
self.generator = generator.to(classifier.device)
self.target_label = target_label
self.backdoor_rate = backdoor_rate
self.epsilon = epsilon

    # Add the trigger to a given image batch
    def apply_trigger(self, images):
        self.generator.eval()
        with torch.no_grad():
            images = nn.functional.interpolate(images, size=(32, 32), mode='bilinear')  # Resize images to ensure uniform dimensions
            triggers = self.generator(images.to(self.classifier.device))  # Generate dynamic, input-specific triggers with the TriggerGenerator
            poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1)  # Clamp pixel values to the valid [0, 1] range
        return poisoned

    # Poison the training data by injecting dynamic triggers and changing labels
    def poison(self, x, y):
        # Convert raw image data (x) to float tensors, and convert one-hot labels (y) to class indices, as required by ART
        x_tensor = torch.tensor(x).float()
        y_tensor = torch.tensor(np.argmax(y, axis=1))
        # Compute the total number of samples and how many of them should be poisoned (poison ratio = backdoor_rate)
batch_size = x_tensor.shape[0]
n_poison = int(self.backdoor_rate * batch_size)
# Apply the learned trigger to the first 'n_poison' samples
poisoned = self.apply_trigger(x_tensor[:n_poison])
# The remaining samples remain clean
clean = x_tensor[n_poison:].to(self.classifier.device)
# Combine poisoned and clean samples into a single batch
poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()
# Modify the labels of poisoned samples to the attacker's target class
new_labels = y_tensor.clone()
new_labels[:n_poison] = self.target_label # Set the poisoned labels to the desired misclassification
# Convert all labels back to one-hot encoding (required by ART classifiers)
new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
return poisoned_images.astype(np.float32), new_labels.astype(np.float32)

    # Evaluate the attack's success on test data
    # (note: y_clean is not used; every test image is poisoned, so samples already belonging to the target class count toward the ASR)
    def evaluate(self, x_clean, y_clean):
        x_tensor = torch.tensor(x_clean).float()
        # Apply the trigger to every test image to create a poisoned test set
        poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)

preds = self.classifier.predict(poisoned_test)
true_target = np.full((len(preds),), self.target_label)
pred_labels = np.argmax(preds, axis=1)

success = np.sum(pred_labels == true_target)
asr = 100.0 * success / len(pred_labels)
return asr
212 changes: 212 additions & 0 deletions examples/dynamicbackdoorgan_demo.py
@@ -0,0 +1,212 @@
# -*- coding: utf-8 -*-
"""DynamicBackdoorGAN_Demo.ipynb

Automatically generated by Colab.

Original file is located at
https://colab.research.google.com/drive/1Uxw5hHxnvtDh2-dC5cHgSfBMNMl05lpD
"""

# ✅ Imports
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Subset
from torchvision import datasets, transforms, models
from art.estimators.classification import PyTorchClassifier
from art.utils import to_categorical
from art.attacks.poisoning import PoisoningAttackBackdoor


# ✅ User Config
config = {
    "dataset": "MNIST",           # CIFAR10, CIFAR100, MNIST
    "model_name": "densenet121",  # resnet18, resnet50, mobilenetv2, densenet121
    "poison_ratio": 0.1,          # Fraction of training samples that receive the trigger
    "target_label": 0,            # Target label to which poisoned samples are mapped
    "epochs": 30,
    "batch_size": 128,
    "epsilon": 0.5,               # Trigger strength added to the clean image
    "train_subset": None,         # Optionally limit the number of training samples (None = full set)
    "test_subset": None           # Optionally limit the number of test samples (None = full set)
}


# ✅ Trigger Generator
class TriggerGenerator(nn.Module):
def __init__(self, input_channels=3):
super().__init__()
self.net = nn.Sequential(
nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
nn.Tanh()
)

def forward(self, x):
return self.net(x)


# ✅ ART-Compatible Poisoning Attack
class DynamicBackdoorGAN(PoisoningAttackBackdoor):
def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
super().__init__(perturbation=lambda x: x)
self.classifier = classifier
self.generator = generator.to(classifier.device)
self.target_label = target_label
self.backdoor_rate = backdoor_rate
self.epsilon = epsilon

def apply_trigger(self, images):
self.generator.eval()
with torch.no_grad():
images = nn.functional.interpolate(images, size=(32, 32), mode='bilinear')
triggers = self.generator(images.to(self.classifier.device))
poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1)
return poisoned

def poison(self, x, y):
x_tensor = torch.tensor(x).float()
y_tensor = torch.tensor(np.argmax(y, axis=1))

batch_size = x_tensor.shape[0]
n_poison = int(self.backdoor_rate * batch_size)

poisoned = self.apply_trigger(x_tensor[:n_poison])
clean = x_tensor[n_poison:].to(self.classifier.device)

poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()

new_labels = y_tensor.clone()
new_labels[:n_poison] = self.target_label

new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
return poisoned_images.astype(np.float32), new_labels.astype(np.float32)

def evaluate(self, x_clean, y_clean):
x_tensor = torch.tensor(x_clean).float()
poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)

preds = self.classifier.predict(poisoned_test)
true_target = np.full((len(preds),), self.target_label)
pred_labels = np.argmax(preds, axis=1)

success = np.sum(pred_labels == true_target)
asr = 100.0 * success / len(pred_labels)
return asr


# ✅ Utility: Load Data

def get_data(dataset="CIFAR10", train_subset=None, test_subset=None):
if dataset in ["CIFAR10", "CIFAR100"]:
transform = transforms.Compose([transforms.Resize((32, 32)), transforms.ToTensor()])
elif dataset == "MNIST":
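        # MNIST images are converted to 3 channels and resized to 32x32 so they match the CIFAR-style input expected by the torchvision backbones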
transform = transforms.Compose([
transforms.Grayscale(num_output_channels=3),
transforms.Resize((32, 32)),
transforms.ToTensor()
])
else:
raise ValueError("Unsupported dataset")

if dataset == "CIFAR10":
dataset_cls = datasets.CIFAR10
num_classes = 10
elif dataset == "CIFAR100":
dataset_cls = datasets.CIFAR100
num_classes = 100
elif dataset == "MNIST":
dataset_cls = datasets.MNIST
num_classes = 10

train_set = dataset_cls(root="./data", train=True, download=True, transform=transform)
test_set = dataset_cls(root="./data", train=False, download=True, transform=transform)

if train_subset is not None:
train_set = Subset(train_set, range(train_subset))
if test_subset is not None:
test_set = Subset(test_set, range(test_subset))

x_train = torch.stack([x for x, _ in train_set]).numpy()
y_train = to_categorical([y for _, y in train_set], nb_classes=num_classes)

x_test = torch.stack([x for x, _ in test_set]).numpy()
y_test = to_categorical([y for _, y in test_set], nb_classes=num_classes)

return x_train, y_train, x_test, y_test, num_classes


# ✅ Utility: Get ART Classifier
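# Note: the torchvision backbones below are instantiated with randomly initialized weights (no pretrained weights are loaded)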
def get_classifier(config):
model_name = config["model_name"]
nb_classes = config["nb_classes"]
input_shape = config["input_shape"]
lr = config.get("learning_rate", 0.001)

if model_name == "resnet18":
model = models.resnet18(num_classes=nb_classes)
elif model_name == "resnet50":
model = models.resnet50(num_classes=nb_classes)
elif model_name == "mobilenetv2":
model = models.mobilenet_v2(num_classes=nb_classes)
elif model_name == "densenet121":
model = models.densenet121(num_classes=nb_classes)
else:
raise ValueError(f"Unsupported model: {model_name}")

loss = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

classifier = PyTorchClassifier(
model=model,
loss=loss,
optimizer=optimizer,
input_shape=input_shape,
nb_classes=nb_classes,
clip_values=(0.0, 1.0),
device_type="gpu" if torch.cuda.is_available() else "cpu"
)
return classifier


# ✅ Full Experiment
def run_dynamic_backdoor_experiment(config):
x_train, y_train, x_test, y_test, num_classes = get_data(
dataset=config["dataset"],
train_subset=config.get("train_subset"),
test_subset=config.get("test_subset")
)
config["nb_classes"] = num_classes
config["input_shape"] = x_train.shape[1:]

classifier = get_classifier(config)

# Clean training
classifier.fit(x_train, y_train, nb_epochs=config["epochs"], batch_size=config["batch_size"])
clean_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
print(f"✅ Clean Accuracy: {clean_acc * 100:.2f}%")

# Poison training
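    # Note: the TriggerGenerator is used here with randomly initialized weights; this demo does not include a separate generator training step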
generator = TriggerGenerator()
attack = DynamicBackdoorGAN(
generator,
config["target_label"],
config["poison_ratio"],
classifier,
epsilon=config["epsilon"]
)
x_poison, y_poison = attack.poison(x_train, y_train)

classifier.fit(x_poison, y_poison, nb_epochs=config["epochs"], batch_size=config["batch_size"])
poisoned_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
print(f"🎯 Poisoned Accuracy: {poisoned_acc * 100:.2f}%")

asr = attack.evaluate(x_test, y_test)
print(f"💥 Attack Success Rate (ASR): {asr:.2f}%")


# ✅ Run
run_dynamic_backdoor_experiment(config)