# Copy from Cheng An Hsieh, et. al.: https://github.com/RewardMultiverse/reward-multiverse import torch import torch.nn as nn import torchvision from transformers import CLIPModel, CLIPProcessor class SimpleCNN(nn.Module): # parameter = 6333513 def __init__(self, num_class = None): super(SimpleCNN, self).__init__() self.layer1 = nn.Sequential( nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer2 = nn.Sequential( nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer3 = nn.Sequential( nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.layer4 = nn.Sequential( nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2)) self.fc1 = nn.Linear(128 * 32 * 32, 1000) self.fc2 = nn.Linear(1000, num_class) def forward(self, x): x = self.layer1(x) # print("x1", x.shape) x = self.layer2(x) # print("x2", x.shape) x = self.layer3(x) # print("x3", x.shape) x = self.layer4(x) # print("x4", x.shape) x = x.reshape(x.size(0), -1) # print("x reshape", x.shape) x = torch.relu(self.fc1(x)) x = self.fc2(x) return x class MLP(nn.Module): def __init__(self): super().__init__() self.layers = nn.Sequential( # regression nn.Linear(768, 1024), nn.Dropout(0.2), nn.Linear(1024, 128), nn.Dropout(0.2), nn.Linear(128, 64), nn.Dropout(0.1), nn.Linear(64, 16), nn.Linear(16, 1), nn.Sigmoid() ) # self.layers = nn.Sequential( # classification # nn.Linear(768, 1024), # nn.Dropout(0.2), # nn.Linear(1024, 128), # nn.Dropout(0.2), # nn.Linear(128, 64), # nn.Dropout(0.1), # nn.Linear(64, 16), # nn.Linear(16, 2) # ) def forward(self, embed): return self.layers(embed) class MLP_Resnet(nn.Module): def __init__(self, num_class): super().__init__() self.layers = nn.Sequential( nn.Linear(1000, 128), # nn.Dropout(0.2), nn.Linear(128, 64), # nn.Dropout(0.2), nn.Linear(64, 16), nn.Linear(16, num_class), ) def forward(self, embed): return self.layers(embed) def weather_loss_fn(target=None, # TODO: use config.task to decide returned loss_fn grad_scale=0, device=None, accelerator=None, torch_dtype=None, reward_model_resume_from=None, num_of_labels=None): scorer = WeatherScorer(dtype=torch_dtype, model_path=reward_model_resume_from, num_class=num_of_labels).to(device, dtype=torch_dtype) scorer.requires_grad_(False) scorer.eval() def loss_fn(im_pix_un): if accelerator.mixed_precision == "fp16": with accelerator.autocast(): rewards = scorer(im_pix_un) else: rewards = scorer(im_pix_un) target_tensors = torch.full((rewards.shape[0],), target).to(rewards.device, dtype=rewards.dtype) # regression criterion = torch.nn.MSELoss(reduction = "sum") # regression # target_tensors = torch.full((rewards.shape[0],), target).to(rewards.device, dtype=torch.long) # classification # criterion = nn.CrossEntropyLoss(reduction="sum") # classification loss = criterion(rewards, target_tensors) return loss * grad_scale, rewards #nn.Softmax(dim=-1)(rewards) # rewards (reg) return loss_fn class WeatherModel(nn.Module): def __init__(self, num_class = None): super().__init__() self.embed_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True) self.score_model = MLP_Resnet(num_class) def __call__(self, im): return self.score_model(self.embed_model(im)) class WeatherScorer(nn.Module): # Reward model def __init__(self, dtype=None, model_path = None, num_class = None): super().__init__() self.clip = CLIPModel.from_pretrained("openai/clip-vit-large-patch14") self.clip.requires_grad_(False) self.clip.eval() self.score_generator = MLP() # self.score_generator = WeatherModel(num_class) # resnet + mlp if model_path: state_dict = torch.load(model_path) self.score_generator.load_state_dict(state_dict) self.score_generator.requires_grad_(False) self.score_generator.eval() # self.clip.requires_grad_(False) # self.clip.eval() else: self.score_generator.requires_grad_(True) if dtype: self.dtype = dtype self.target_size = (224,224) # resnet 224, cnn 512 (use 224 for both...?) self.normalize = torchvision.transforms.Normalize(mean=[0.48145466, 0.4578275, 0.40821073], std=[0.26862954, 0.26130258, 0.27577711]) def set_device(self, device, inference_type): self.clip.to(device, dtype = inference_type) # uncomment for mlp self.score_generator.to(device) # dtype = inference_dtype def __call__(self, images): device = next(self.parameters()).device im_pix = torchvision.transforms.Resize(self.target_size)(images) im_pix = self.normalize(im_pix).to(images.dtype) embed = self.clip.get_image_features(pixel_values=im_pix) embed = embed / torch.linalg.vector_norm(embed, dim=-1, keepdim=True) return self.score_generator(embed).squeeze(1) # CLIP + MLP # return self.score_generator(im_pix).squeeze(1) # for simpleCNN