import os
import torch
import torch.nn as nn
import torchvision.transforms as T
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import kaggle
import timm
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import show_cam_on_image
import pandas as pd
import cv2
from captum.attr import IntegratedGradients, Occlusion, FeaturePermutation
from scipy.ndimage import gaussian_filter
device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')
Attribution: the attribution toolkit
Notebook for attribution testing
# Download dataset from Kaggle only if it does not exist
dataset_dir = './data/barhill'
if not os.path.exists(dataset_dir):
    kaggle.api.dataset_download_files('mshahoyi/barhills-processed', path='./data', unzip=True)
    print("Dataset downloaded and unzipped.")
else:
    print("Dataset already exists. Skipping download.")
# Load the pretrained MegaDescriptor model
model_name = 'hf-hub:BVRA/MegaDescriptor-T-224'
model = timm.create_model(model_name, num_classes=0, pretrained=True).to(device)
model.eval()
# Define image transformations
transform = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Load metadata
metadata_path = './data/barhill/gallery_and_probes.csv'
df = pd.read_csv(metadata_path)
# Select two random newts
unique_newts = df['newt_id'].unique()
np.random.seed(42)
random_newts = np.random.choice(unique_newts, 2, replace=False)
# random_newts[0] and random_newts[1] are guaranteed to be different newt IDs
# (assuming there are at least 2 unique newts in the dataset).
print(f"Selected newts: {random_newts}")
# Get sample images for each newt
newt1_images = df[df['newt_id'] == random_newts[0]]['image_path'].values[:2]
newt2_images = df[df['newt_id'] == random_newts[1]]['image_path'].values[:2]
print(f"Newt 1 images: {newt1_images}")
print(f"Newt 2 images: {newt2_images}")
# Plot the selected images for visual confirmation
fig, axes = plt.subplots(2, 2, figsize=(10, 10))
fig.suptitle("Selected Images for Analysis", fontsize=16)

# Helper to resolve relative image paths
def get_full_image_path(rel_path):
    return os.path.join('./data', rel_path)

# Newt 1, Image 1
img_n1_i1 = Image.open(get_full_image_path(newt1_images[0])).convert('RGB')
axes[0, 0].imshow(img_n1_i1)
axes[0, 0].set_title(f"Newt {random_newts[0]} - Image 1\n{newt1_images[0]}")
axes[0, 0].axis('off')
# Newt 1, Image 2
if len(newt1_images) > 1:
    img_n1_i2 = Image.open(get_full_image_path(newt1_images[1])).convert('RGB')
    axes[0, 1].imshow(img_n1_i2)
    axes[0, 1].set_title(f"Newt {random_newts[0]} - Image 2\n{newt1_images[1]}")
    axes[0, 1].axis('off')
else:
    axes[0, 1].axis('off')  # Hide subplot if no second image
    axes[0, 1].text(0.5, 0.5, 'No second image', ha='center', va='center')
# Newt 2, Image 1
img_n2_i1 = Image.open(get_full_image_path(newt2_images[0])).convert('RGB')
axes[1, 0].imshow(img_n2_i1)
axes[1, 0].set_title(f"Newt {random_newts[1]} - Image 1\n{newt2_images[0]}")
axes[1, 0].axis('off')
# Newt 2, Image 2
if len(newt2_images) > 1:
    img_n2_i2 = Image.open(get_full_image_path(newt2_images[1])).convert('RGB')
    axes[1, 1].imshow(img_n2_i2)
    axes[1, 1].set_title(f"Newt {random_newts[1]} - Image 2\n{newt2_images[1]}")
    axes[1, 1].axis('off')
else:
    axes[1, 1].axis('off')  # Hide subplot if no second image
    axes[1, 1].text(0.5, 0.5, 'No second image', ha='center', va='center')
plt.tight_layout(rect=[0, 0, 1, 0.96])  # Adjust layout to make space for the suptitle
plt.show()
def load_and_preprocess_image(image_path):
    img = Image.open(image_path).convert('RGB')
    input_tensor = transform(img)
    return img, input_tensor.unsqueeze(0)

newt1_img1, newt1_tensor1 = load_and_preprocess_image(get_full_image_path(newt1_images[0]))
newt1_img2, newt1_tensor2 = load_and_preprocess_image(get_full_image_path(newt1_images[1]))
newt2_img1, newt2_tensor1 = load_and_preprocess_image(get_full_image_path(newt2_images[0]))
newt2_img2, newt2_tensor2 = load_and_preprocess_image(get_full_image_path(newt2_images[1]))
class SimilarityModel(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone

    def forward(self, x1, x2):
        # Get features from both images
        features1 = self.backbone(x1)
        features2 = self.backbone(x2)

        # Calculate cosine similarity between the two embeddings
        similarity = torch.nn.functional.cosine_similarity(features1, features2)
        return similarity

# Create the similarity model
similarity_model = SimilarityModel(model).to(device)
similarity_model.eval()
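As a quick sanity check (this snippet is an addition, not part of the original notebook), the similarity model can be run directly on the preprocessed tensors loaded above; one would typically expect a higher cosine similarity for two images of the same newt than for images of different newts, though that is not guaranteed for any particular pair.

# Hypothetical sanity check using tensors already defined above
with torch.no_grad():
    same_score = similarity_model(newt1_tensor1.to(device), newt1_tensor2.to(device)).item()
    diff_score = similarity_model(newt1_tensor1.to(device), newt2_tensor1.to(device)).item()
print(f"Same-newt similarity:      {same_score:.3f}")
print(f"Different-newt similarity: {diff_score:.3f}")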
Occlusion Sensitivity

my_occlusion_sensitivity(model, image1, image2, patch_size=16, stride=8, occlusion_value=0, device=None)

Perform an occlusion sensitivity test on the first image to see which regions affect its similarity with the second image.

Args:
    model: The similarity model
    image1: First image tensor (to be occluded) - shape [1, C, H, W]
    image2: Second image tensor - shape [1, C, H, W]
    patch_size: Size of the occlusion patch
    stride: Stride for moving the occlusion patch
    occlusion_value: Value used for occlusion (default: 0)

Returns:
    Sensitivity map showing which regions, when occluded, affect similarity the most
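my_occlusion_sensitivity is called later in this notebook but its definition does not appear in this section; the cell below is a minimal sketch that matches the signature and docstring above (a plain sliding-window occlusion loop over image1), offered as an assumption rather than the original implementation.

def my_occlusion_sensitivity(model, image1, image2, patch_size=16, stride=8, occlusion_value=0, device=None):
    # Sketch implementation (assumed): occlude square patches of image1 and
    # measure how much the similarity with image2 drops.
    device = device or next(model.parameters()).device
    image1 = image1.to(device)
    image2 = image2.to(device)

    with torch.no_grad():
        original_similarity = model(image1, image2).item()

    _, c, h, w = image1.shape
    sensitivity_map = torch.zeros((h, w))

    for i in range(0, h - patch_size + 1, stride):
        for j in range(0, w - patch_size + 1, stride):
            occluded = image1.clone()
            occluded[:, :, i:i + patch_size, j:j + patch_size] = occlusion_value
            with torch.no_grad():
                occluded_similarity = model(occluded, image2).item()
            # Positive values: occluding this region lowers the similarity score
            sensitivity_map[i:i + patch_size, j:j + patch_size] += original_similarity - occluded_similarity

    # Normalize to [0, 1] for visualization
    if sensitivity_map.max() > sensitivity_map.min():
        sensitivity_map = (sensitivity_map - sensitivity_map.min()) / (sensitivity_map.max() - sensitivity_map.min())
    return sensitivity_map.numpy()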
def occlusion_sensitivity(model, image1, image2, patch_size=16, stride=8, occlusion_value=0):
"""
Perform occlusion sensitivity test on the first image to see which regions
affect similarity with the second image using Captum.
Args:
model: The similarity model
image1: First image tensor (to be occluded) - shape [1, C, H, W]
image2: Second image tensor - shape [1, C, H, W]
patch_size: Size of the occlusion patch
stride: Stride for moving the occlusion patch
occlusion_value: Value used for occlusion (default: 0)
Returns:
Sensitivity map showing which regions, when occluded, affect similarity the most
"""
# Move tensors to the right device
    image1 = image1.to(device)
    image2 = image2.to(device)
# Create a wrapper function for the model that takes a single input
# This needs to be an nn.Module for Captum's hooks
class ModelWrapper(nn.Module):
def __init__(self, similarity_model_instance, fixed_image_tensor):
super().__init__()
self.similarity_model_instance = similarity_model_instance
self.fixed_image_tensor = fixed_image_tensor
self.similarity_model_instance.eval() # Ensure eval mode
def forward(self, x):
return self.similarity_model_instance(x, self.fixed_image_tensor)
    wrapped_model_for_captum = ModelWrapper(model, image2).to(device)
    wrapped_model_for_captum.eval()
# Initialize the Occlusion attribution method
    occlusion_attr = Occlusion(  # Named occlusion_attr to avoid shadowing captum.attr.Occlusion
        wrapped_model_for_captum
    )

    # Compute attributions
    attributions = occlusion_attr.attribute(
        image1,
        strides=(3, stride, stride),  # (channels, height, width)
        sliding_window_shapes=(3, patch_size, patch_size),
        baselines=occlusion_value,
        target=None,  # None: the wrapped model returns a single scalar per batch item
    )
    # Convert attributions to a per-pixel sensitivity map.
    # Captum's Occlusion returns attributions of shape [N, C, H, W]. A large
    # magnitude (positive or negative) means that occluding the region changed
    # the similarity score substantially; the sign gives the direction of change.
    # Averaging the absolute attributions over the channel dimension yields a
    # single magnitude-of-change map.
    sensitivity_map = attributions.squeeze(0).abs().mean(dim=0).cpu().detach()
    # Normalize the sensitivity map for visualization
    if sensitivity_map.max() > 0:
        sensitivity_map = (sensitivity_map - sensitivity_map.min()) / (sensitivity_map.max() - sensitivity_map.min())

    return sensitivity_map.numpy()
def visualize_occlusion_sensitivity(image, sensitivity_map, title):
"""
Visualize the occlusion sensitivity map overlaid on the original image.
Args:
image: Original PIL image
sensitivity_map: The computed sensitivity map
title: Title for the plot
"""
# Resize sensitivity map to match image dimensions
    resized_map = cv2.resize(sensitivity_map, (image.size[0], image.size[1]))
# Convert PIL image to numpy array and normalize
    img_array = np.array(image) / 255.0
# Create a heatmap visualization
    heatmap = cv2.applyColorMap(np.uint8(255 * resized_map), cv2.COLORMAP_JET)
    heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB) / 255.0

    # Overlay the heatmap on the image
    overlay = 0.7 * img_array + 0.3 * heatmap
    overlay = overlay / np.max(overlay)
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.imshow(image)
    plt.title("Original Image")
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.imshow(resized_map, cmap='jet')
    plt.title("Sensitivity Map")
    plt.colorbar(fraction=0.046, pad=0.04)
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.imshow(overlay)
    plt.title(f"Overlay - {title}")
    plt.axis('off')

    plt.tight_layout()
    plt.show()
# Perform occlusion sensitivity testing on same newt pair
print("Performing occlusion sensitivity test for same newt...")
sensitivity_map_same = occlusion_sensitivity(
    similarity_model,
    newt1_tensor1,
    newt1_tensor2,
    patch_size=16,
    stride=8
)

# Visualize occlusion sensitivity results
visualize_occlusion_sensitivity(
    newt1_img1,
    sensitivity_map_same,
    f"Same Newt {random_newts[0]} - Regions Important for Similarity"
)
# Perform MY occlusion sensitivity testing on same newt pair
print("Performing MY occlusion sensitivity test for same newts...")
sensitivity_map_same = my_occlusion_sensitivity(
    similarity_model,
    newt1_tensor1,
    newt1_tensor2,
    patch_size=16,
    stride=8
)

# Visualize occlusion sensitivity results
visualize_occlusion_sensitivity(
    newt1_img1,
    sensitivity_map_same,
    f"Same Newt {random_newts[0]} - Regions Important for Similarity"
)
# Perform occlusion sensitivity testing on different newt pair
print("Performing occlusion sensitivity test for different newts...")
sensitivity_map_diff = occlusion_sensitivity(
    similarity_model,
    newt1_tensor1,  # Image of newt_ID_A (this one will be occluded)
    newt2_tensor1,  # Image of newt_ID_B (this is the reference for similarity)
    patch_size=16,
    stride=8
)

# Visualize occlusion sensitivity results
visualize_occlusion_sensitivity(
    newt1_img1,
    sensitivity_map_diff,
    f"Different Newts {random_newts[0]} vs {random_newts[1]} - Regions Important for Similarity"
)
# Perform MY occlusion sensitivity testing on different newt pair
print("Performing MY occlusion sensitivity test for different newts...")
sensitivity_map_diff = my_occlusion_sensitivity(
    similarity_model,
    newt1_tensor1,  # Image of newt_ID_A (this one will be occluded)
    newt2_tensor1,  # Image of newt_ID_B (this is the reference for similarity)
    patch_size=16,
    stride=8
)

# Visualize occlusion sensitivity results
visualize_occlusion_sensitivity(
    newt1_img1,
    sensitivity_map_diff,
    f"Different Newts {random_newts[0]} vs {random_newts[1]} - Regions Important for Similarity"
)
Integrated Gradients
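Integrated Gradients (Sundararajan et al., 2017) attributes the wrapped similarity score F to the pixels of image1 by integrating the gradient of F along a straight path from a baseline x' (here an all-zero image, as used in the code below) to the input x:

$$\mathrm{IG}_i(x) = (x_i - x'_i)\int_0^1 \frac{\partial F\big(x' + \alpha\,(x - x')\big)}{\partial x_i}\,d\alpha$$

Captum approximates this integral numerically using n_steps interpolation points along the path.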
def integrated_gradients_similarity(model_instance, image1_tensor, image2_tensor, n_steps=50, target_output_idx=None):
"""
Compute Integrated Gradients for the first image with respect to the similarity
score with the second image.
Args:
model_instance: The SimilarityModel instance (which is an nn.Module).
image1_tensor: Tensor of the first image (to attribute). Shape [1, C, H, W].
image2_tensor: Tensor of the second image (fixed reference). Shape [1, C, H, W].
n_steps: Number of steps for the integration.
target_output_idx: If model outputs multiple values, specify index. For scalar output, can be None.
Returns:
Attributions for image1_tensor.
"""
    model_instance.eval()  # Ensure the main model is in eval mode
    image1_tensor = image1_tensor.to(device)
    image2_tensor = image2_tensor.to(device)
# Ensure tensors require gradients
image1_tensor.requires_grad_()
# Define a wrapper nn.Module for Captum
class ModelWrapper(nn.Module):
def __init__(self, similarity_model_instance, fixed_image_tensor):
super().__init__()
self.similarity_model_instance = similarity_model_instance
self.fixed_image_tensor = fixed_image_tensor
# Ensure the passed model instance is also in eval mode if it wasn't already
self.similarity_model_instance.eval()
def forward(self, img1_input):
return self.similarity_model_instance(img1_input, self.fixed_image_tensor)
    wrapped_model = ModelWrapper(model_instance, image2_tensor).to(device)
    wrapped_model.eval()

    ig = IntegratedGradients(wrapped_model)
    baseline = torch.zeros_like(image1_tensor).to(device)

    attributions = ig.attribute(image1_tensor,
                                baselines=baseline,
                                target=target_output_idx,
                                n_steps=n_steps,
                                return_convergence_delta=False)
    return attributions
def visualize_integrated_gradients(image_pil, attributions_tensor, title):
"""
Visualize Integrated Gradients attributions.
"""
# Convert attributions to numpy array and take the sum across color channels
    attributions_np = attributions_tensor.squeeze().cpu().detach().numpy()
    attributions_np = np.transpose(attributions_np, (1, 2, 0))
    attribution_map = np.sum(np.abs(attributions_np), axis=2)  # Sum absolute attributions across channels

    # Normalize the attribution map for visualization
    if np.max(attribution_map) > 0:
        attribution_map = (attribution_map - np.min(attribution_map)) / (np.max(attribution_map) - np.min(attribution_map))
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))
    fig.suptitle(title, fontsize=16)

    axes[0].imshow(image_pil)
    axes[0].set_title("Original Image")
    axes[0].axis('off')

    im = axes[1].imshow(attribution_map, cmap='inferno')  # 'inferno' or 'viridis' are good choices
    axes[1].set_title("Integrated Gradients Attribution")
    axes[1].axis('off')
    fig.colorbar(im, ax=axes[1], fraction=0.046, pad=0.04)

    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()
# Perform Integrated Gradients for the same newt pair
# We are interested in how image1 contributes to similarity with image2
print("Performing Integrated Gradients for same newt (image1 vs image2)...")
attributions_same_newt_img1 = integrated_gradients_similarity(
    similarity_model,
    newt1_tensor1,  # Image to attribute
    newt1_tensor2   # Fixed reference image
)
visualize_integrated_gradients(
    newt1_img1,     # PIL image corresponding to newt1_tensor1
    attributions_same_newt_img1,
    f"IG: Newt {random_newts[0]} (Img 1) vs Newt {random_newts[0]} (Img 2)"
)
# Perform Integrated Gradients for the different newt pair
# We are interested in how image1 (from newt A) contributes to similarity with image1 (from newt B)
print("\nPerforming Integrated Gradients for different newts (newt A img1 vs newt B img1)...")
attributions_diff_newt_img1 = integrated_gradients_similarity(
    similarity_model,
    newt1_tensor1,  # Image to attribute (from first newt)
    newt2_tensor1   # Fixed reference image (from second newt)
)
visualize_integrated_gradients(
    newt1_img1,     # PIL image corresponding to newt1_tensor1
    attributions_diff_newt_img1,
    f"IG: Newt {random_newts[0]} (Img 1) vs Newt {random_newts[1]} (Img 1)"
)
Blur Perturbation
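Blur perturbation uses the same sliding-window idea as occlusion, but instead of zeroing a patch it replaces the patch with a Gaussian-blurred copy of itself, so the perturbation removes fine detail (such as the spot pattern) while preserving low-frequency content. For a patch at position (i, j), the sensitivity recorded below is the resulting drop in similarity, sim(x, ref) - sim(blur_{i,j}(x), ref).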
def blur_perturbation_similarity(model_instance, image1_tensor, image2_tensor, patch_size=16, stride=8, blur_sigma=5):
"""
Perform perturbation-based saliency by blurring patches of image1 and observing
the change in similarity with image2.
Args:
model_instance: The SimilarityModel instance.
image1_tensor: Tensor of the first image (to be perturbed). Shape [1, C, H, W].
image2_tensor: Tensor of the second image (fixed reference). Shape [1, C, H, W].
patch_size: Size of the patch to blur.
stride: Stride for moving the patch.
blur_sigma: Sigma for Gaussian blur.
Returns:
Sensitivity map (higher values mean blurring that region decreased similarity more).
"""
    model_instance.eval()
    image1_tensor_cpu = image1_tensor.cpu()       # Work with a CPU tensor for easier numpy conversion and blurring
    image2_tensor_dev = image2_tensor.to(device)  # Keep image2 on device for model input

    # Get the original similarity score
    with torch.no_grad():
        original_similarity = model_instance(image1_tensor.to(device), image2_tensor_dev).item()
# Get image dimensions
    _, c, h, w = image1_tensor_cpu.shape

    # Initialize sensitivity map
    sensitivity_map = torch.zeros((h, w), device='cpu')
# Create a blurred version of the entire image1 (used for replacing patches)
# Convert tensor to numpy for blurring: (C, H, W)
    image1_numpy = image1_tensor_cpu.squeeze(0).numpy()
    blurred_image1_numpy = np.zeros_like(image1_numpy)
    for channel_idx in range(c):
        blurred_image1_numpy[channel_idx, :, :] = gaussian_filter(image1_numpy[channel_idx, :, :], sigma=blur_sigma)
# Compute number of patches
    n_h_patches = (h - patch_size) // stride + 1
    n_w_patches = (w - patch_size) // stride + 1

    total_patches = n_h_patches * n_w_patches
    patch_count = 0
print(f"Starting blur perturbation: {total_patches} patches to process...")
    for i in range(0, h - patch_size + 1, stride):
        for j in range(0, w - patch_size + 1, stride):
            perturbed_image_numpy = image1_numpy.copy()

            # Replace the patch with the corresponding patch from the blurred image
            perturbed_image_numpy[:, i:i+patch_size, j:j+patch_size] = \
                blurred_image1_numpy[:, i:i+patch_size, j:j+patch_size]

            # Convert back to tensor and move to device
            perturbed_image_tensor = torch.from_numpy(perturbed_image_numpy).unsqueeze(0).to(device)

            with torch.no_grad():
                perturbed_similarity = model_instance(perturbed_image_tensor, image2_tensor_dev).item()

            sensitivity = original_similarity - perturbed_similarity
            sensitivity_map[i:i+patch_size, j:j+patch_size] += sensitivity  # Accumulate if patches overlap

            patch_count += 1
            if patch_count % 20 == 0 or patch_count == total_patches:
                print(f"Processed {patch_count}/{total_patches} patches...", end='\r')

    print("\nCompleted blur perturbation.")
# Normalize the sensitivity map
    if sensitivity_map.abs().max() > 0:  # Check against absolute max to handle negative sensitivities too
        if sensitivity_map.max() > sensitivity_map.min():
            sensitivity_map = (sensitivity_map - sensitivity_map.min()) / (sensitivity_map.max() - sensitivity_map.min())
        elif sensitivity_map.max() > 0:  # all values are equal and positive
            sensitivity_map = sensitivity_map / sensitivity_map.max()

    return sensitivity_map.numpy()
# Reuse visualize_occlusion_sensitivity to display the blur-perturbation maps;
# its title parameter is enough to distinguish the plots.
# Perform Blur Perturbation for the same newt pair
print("Performing Blur Perturbation for same newt (image1 vs image2)...")
blur_map_same_newt = blur_perturbation_similarity(
    similarity_model,
    newt1_tensor1,
    newt1_tensor2,
    patch_size=24,  # A larger patch can be more informative for blurring
    stride=12,
    blur_sigma=5
)

# Reusing the visualization function
visualize_occlusion_sensitivity(
    newt1_img1,
    blur_map_same_newt,
    f"Blur Perturbation: Newt {random_newts[0]} (Img 1) vs Newt {random_newts[0]} (Img 2)"
)
# Perform Blur Perturbation for the different newt pair
print("\nPerforming Blur Perturbation for different newts (newt A img1 vs newt B img1)...")
blur_map_diff_newt = blur_perturbation_similarity(
    similarity_model,
    newt1_tensor1,
    newt2_tensor1,
    patch_size=24,
    stride=12,
    blur_sigma=5
)

# Reusing the visualization function
visualize_occlusion_sensitivity(
    newt1_img1,
    blur_map_diff_newt,
    f"Blur Perturbation: Newt {random_newts[0]} (Img 1) vs Newt {random_newts[1]} (Img 1)"
)