Search code examples
pythonmachine-learningdeep-learningamazon-sagemakermlops

Sagemaker batch transformer with my own pre-trained model


I'm trying to run on-demand inference for YOLO-NAS using a SageMaker batch transform job.

I'm using a pre-trained model with pre-trained weights.

But I am getting this error:

python3: can't open file '//serve': [Errno 2] No such file or directory

I have no idea what this '//serve' is. I don't reference or use it anywhere, and I can't find any documentation about it.

Some more data about my case:

  1. The data are just images with a single object in each of them (known as a 'crop')

  2. seq-gil-transformer.py sits in s3://benny-test/indexer/yolo-nas/1/code

  3. /opt/ml/processing/input is not the right path. I know about that; I'm still working out which path I should use, but the code crashes long before it gets there...

I have my own custom image as follows:

# Base image: Python 3.10 on Debian bullseye.
FROM python:3.10-bullseye

# Install the system packages ffmpeg, libsm6, libxext6 and libgl1, then remove
# the apt package lists.
RUN apt update && \
    apt install ffmpeg libsm6 libxext6 libgl1 -y && \
    rm -rf /var/lib/apt/lists/*

# Copy the Python requirements file into the image.
ADD req.txt req.txt

# Install the Python dependencies listed in req.txt.
RUN pip3 install -r req.txt

# NOTE(review): with this entrypoint every container argument is handed to
# python3. SageMaker starts inference / batch-transform containers with the
# argument `serve`, so the container ends up running `python3 serve`, which is
# where "can't open file '//serve'" comes from when no `serve` file exists.
ENTRYPOINT ["python3"]

I have my own inference code known as 'seq-gil-transformer.py':

(I know that I can improve it; for now I'm trying to make the logic work and understand SageMaker, and then I will improve the code.)

import super_gradients
import torch
from torch import nn
import json
from collections import OrderedDict
from typing import List
from pathlib import Path
import PIL 
import numpy as np
import time
import cProfile
import trace
import io
import psutil
import numpy as np
import matplotlib.pyplot as plt
import sys
import pandas as pd
import argparse


# getting images from local path
def get_images(path, device):
    """Load every ``*.png`` file directly under ``path`` as a float32 tensor.

    Each image is converted to RGB, rearranged from height/width/channel to
    channel/height/width order and given a leading axis of size 1, so every
    returned tensor has shape ``(1, 3, H, W)``. Pixel values are cast to
    float32 as-is; no rescaling or normalisation is applied.

    Args:
        path: Directory searched (non-recursively) for ``*.png`` files.
        device: Torch device (or device string) the tensors are moved to.

    Returns:
        A list with one tensor per image, ordered by file path. The list is
        empty when no ``*.png`` file is found.
    """
    images = []
    # Path.glob yields entries in a filesystem-dependent order; sorting makes
    # the order of the returned tensors reproducible between runs.
    for crop_path in sorted(Path(path).glob("*.png")):
        # NOTE(review): `import PIL` alone does not load the `PIL.Image`
        # submodule; this works only if something else has imported it
        # already (matplotlib presumably does) -- consider `import PIL.Image`.
        # The context manager closes the file handle once the pixel data has
        # been converted instead of leaving it to garbage collection.
        with PIL.Image.open(str(crop_path)) as raw_image:
            rgb_image = raw_image.convert("RGB")
        # HWC -> CHW. np.array gives a writable copy of the pixel data and
        # ascontiguousarray lays the transposed view out contiguously, which
        # is what torch.from_numpy expects.
        chw = np.ascontiguousarray(np.array(rgb_image).transpose(2, 0, 1))
        batch = torch.from_numpy(chw).unsqueeze(0)
        images.append(batch.to(dtype=torch.float32, device=device))
    return images

# saving model
def create_yolo_nas_indexer(device="cpu"):
    """Build YOLO-NAS-L with COCO-pretrained weights and save its backbone.

    The backbone module is written with ``torch.save`` to
    ``yolo_nas_l_{device}.pth.tar`` in the current working directory, which is
    the file name ``load_yolo_nas`` reads.

    Args:
        device: Device the model is moved to before saving; it is also
            embedded in the output file name.
    """
    yolo_nas = super_gradients.training.models.get("yolo_nas_l", pretrained_weights="coco").to(device)
    # This used to write "yolo_nas_l_{device}.tar.gz", a name the loader never
    # looks for (and the file is a torch pickle, not a gzip archive).
    torch.save(yolo_nas.backbone, f"yolo_nas_l_{device}.pth.tar")

# loading model. 
# using our own module named 'modelwrapper', 
# you can see it in the end of this post
def load_yolo_nas(device="cpu", output_layers=None):
    """Load the saved backbone and wrap it so selected layer outputs are captured.

    Args:
        device: Device the loaded module and its wrapper are moved to; it is
            also part of the file name ``yolo_nas_l_{device}.pth.tar``.
        output_layers: Names of the sub-modules whose outputs should be
            recorded. Defaults to ``["stage2", "stage3", "context_module"]``.

    Returns:
        A ``ModelWrapper`` around the loaded module with forward hooks
        registered on ``output_layers``.
    """
    # A fresh list per call: the wrapper keeps a reference to it, so a shared
    # mutable default would leak changes from one wrapper to the next.
    if output_layers is None:
        output_layers = ["stage2", "stage3", "context_module"]
    else:
        output_layers = list(output_layers)
    # SECURITY: torch.load unpickles a whole module object here; only load
    # checkpoint files that you created yourself or otherwise trust.
    yolo_nas = torch.load(f"yolo_nas_l_{device}.pth.tar").to(device)
    wraped_yolo_nas = ModelWrapper(yolo_nas, device=device).to(device)
    wraped_yolo_nas.add_output_layers(output_layers)
    return wraped_yolo_nas

# running inference per image (crop) using 'ModelWrapper'
# and the loaded YOLO-NAS model from Deci AI's
# super-gradients library
def primitive_c2v(crop:torch.Tensor, model)->list:
    """Call ``model`` on a single crop and return the first element of its result.

    Args:
        crop: Input handed to ``model`` unchanged.
        model: Callable whose return value supports indexing.

    Returns:
        Element ``0`` of whatever ``model(crop)`` returns.
    """
    model_result = model(crop)
    return model_result[0]

if __name__ == "__main__":
    # Script entry point: load the crops, build and reload the model, then
    # run every crop through it and print the size of each result.
    engine = "cpu"
    crops = get_images("/opt/ml/processing/input", engine)

    create_yolo_nas_indexer(engine)
    model = load_yolo_nas(engine)
    model.eval()

    list_o = []
    # Inference only: no_grad avoids building an autograd graph per crop.
    with torch.no_grad():
        for crop in crops:
            output = primitive_c2v(crop, model)
            # .cpu() keeps the NumPy conversion valid if `engine` is ever
            # switched to a non-CPU device.
            list_o.append([layer_out.detach().cpu().numpy() for layer_out in output])

    # printing a bit of the output just to be sure
    # everything went well.
    # in my data I got a list of lists of numpy tensors.
    # each list in the grand list is of size 4
    for layer_outputs in list_o:
        print(len(layer_outputs))
    

And the sagemaker code itself is:

import boto3
import sagemaker
from sagemaker import get_execution_role

# Notebook-level handles. `sagemaker_session` and `bucket` are not used by the
# calls below.
sagemaker_session = sagemaker.Session()
role = get_execution_role()
bucket = 'benny-test'

# NOTE(review): for batch transform SageMaker launches the image with the
# argument `serve` and expects a web server answering GET /ping and
# POST /invocations. An image whose ENTRYPOINT is plain `python3` therefore
# fails with "can't open file '//serve'" before any of the code referenced by
# `entry_point` runs.
model_settings = {
    "source_dir": "s3://benny-test/indexer/yolo-nas/1/code",
    "entry_point": "seq-gil-transformer.py",
    "image_uri": '*.dkr.ecr.eu-west-1.amazonaws.com/sagemaker:super-gredients-0.1',
    "role": role,
    "name": "yolo-nas-cpu",
}
model = sagemaker.model.Model(**model_settings)

# One ml.m5.4xlarge instance runs the transform job over the S3 prefix.
transformer = model.transformer(instance_count=1, instance_type="ml.m5.4xlarge")
transformer.transform(data="s3://benny-test/indexer/Raw/dummy_set/images")
transformer.wait()

Model wrapper ref: this code is not necessarily needed but I'm showing it here for reference and reproducibility :)

def remove_all_hooks_recursive(model: nn.Module) -> None:
    """Clear forward, forward-pre and backward hooks on every descendant of ``model``.

    For each child (recursively) the attributes ``_forward_hooks``,
    ``_forward_pre_hooks`` and ``_backward_hooks`` are replaced by empty
    ``OrderedDict`` objects when present. Hooks registered directly on
    ``model`` itself are left untouched; only its descendants are processed.

    Args:
        model: Root module whose sub-modules are stripped of hooks.
    """
    hook_attributes = ("_forward_hooks", "_forward_pre_hooks", "_backward_hooks")
    for _name, child in model.named_children():
        if child is None:
            continue
        # Each registry is checked independently. The previous if/elif chain
        # stopped at the first attribute that existed, so only forward hooks
        # were ever cleared.
        for attribute in hook_attributes:
            if hasattr(child, attribute):
                setattr(child, attribute, OrderedDict())
        remove_all_hooks_recursive(child)
def add_all_modules_to_model_dict_recursive(model, module_dict, prefix=''):
    """Fill ``module_dict`` with every descendant of ``model`` keyed by dotted path.

    A child's key is ``prefix`` and the child's name joined by a dot, or just
    the name when ``prefix`` is empty. A key that comes out as exactly
    ``"_model"`` is replaced by the empty string, so the children of that
    module are registered without a prefix.

    Args:
        model: Module whose ``named_children`` are walked recursively.
        module_dict: Mapping updated in place with ``key -> module`` entries.
        prefix: Dotted path of ``model`` itself; empty for the root.
    """
    for child_name, child in model.named_children():
        key = '.'.join((prefix, child_name)) if prefix else child_name
        if key == "_model":
            key = ""
        module_dict[key] = child
        if not isinstance(child, nn.Module):
            continue
        add_all_modules_to_model_dict_recursive(child, module_dict, key)
class StopModel(Exception):
    """Control-flow exception used to abort a forward pass early.

    It is raised by the forward hook once the last requested layer has been
    recorded and takes no arguments.
    """

    def __init__(self):
        Exception.__init__(self)
def forward_hook(model_wrapper, layer_name, model=None):
    """Build a forward hook that records a layer's output on ``model_wrapper``.

    Args:
        model_wrapper: Object exposing ``selected_out`` (a mapping),
            ``stop_at_last_hook`` and ``last_layer``.
        layer_name: Key under which the layer output is stored.
        model: Optional callable applied to the output; it must return a pair
            whose second item is stored under ``f"code_{layer_name}"``.

    Returns:
        A function with the ``(module, input, output)`` hook signature. It
        raises ``StopModel`` after recording when ``stop_at_last_hook`` is set
        and ``layer_name`` equals ``model_wrapper.last_layer``.
    """
    code_key = f"code_{layer_name}"

    def _record_output(module, inputs, output):
        captured = model_wrapper.selected_out
        captured[layer_name] = output
        if model is not None:
            # Only the second element of the secondary model's result is kept.
            code = model(output)[1]
            captured[code_key] = code
        reached_last = layer_name == model_wrapper.last_layer
        if model_wrapper.stop_at_last_hook and reached_last:
            raise StopModel()

    return _record_output
class ModelWrapper(nn.Module):
    """Wrap a model and capture the outputs of selected sub-modules.

    Forward hooks are registered on the layers named in
    ``add_output_layers``; every forward pass stores their outputs in
    ``selected_out`` under the layer name.
    """

    def __init__(self, model, stop_at_last_hook=False, device=None):
        """Put ``model`` in eval mode and index its sub-modules by dotted name.

        Args:
            model: Module to wrap.
            stop_at_last_hook: When true, a forward pass is aborted as soon as
                the last requested layer has produced its output.
            device: Device used for the probe input in
                ``compute_output_layer_parameters``; defaults to CUDA when
                available, otherwise CPU.
        """
        super().__init__()
        self.stop_at_last_hook = stop_at_last_hook
        self.model = model
        self.model.eval()
        self.output_layers = []
        self.selected_out = OrderedDict()
        self.fhooks = []
        self.layer_size_dict = {}
        self.layer_stride_dict = {}
        self.model_dict = self.add_all_modules_to_model_dict()
        self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.last_layer = None

    @classmethod
    def from_cfg(cls, cfg_path: str):
        """Create an instance from a JSON file of constructor keyword arguments.

        Args:
            cfg_path: Path to a JSON file holding a single object whose keys
                are passed to the constructor as keyword arguments.

        Returns:
            The newly constructed instance.
        """
        with open(cfg_path, "r") as f:
            cfg_dict = json.load(f)
        # The instance used to be built and then dropped, so callers always
        # received None.
        return cls(**cfg_dict)

    def add_output_layers(self, output_layers: List[str]):
        """Register forward hooks on the named layers and probe their outputs.

        Args:
            output_layers: Names (keys of ``model_dict``) of the layers to
                record. The last entry becomes ``last_layer``.

        Raises:
            ValueError: If a name is not a key of ``model_dict``.
            IndexError: If ``output_layers`` is empty.
        """
        # Validate before touching any state so a bad name leaves the wrapper
        # unchanged.
        for output_layer in output_layers:
            if output_layer not in self.model_dict:
                raise ValueError(f"Model does not have layer: {output_layer}")
        self.last_layer = output_layers[-1]
        self.output_layers = output_layers
        for layer_name, module in self.model_dict.items():
            if layer_name in self.output_layers:
                self.fhooks.append(module.register_forward_hook(forward_hook(self, layer_name)))
        self.compute_output_layer_parameters()

    def compute_output_layer_parameters(self):
        """Run a random probe input and record per-layer size and stride.

        For tensor outputs ``layer_size_dict`` gets ``shape[1]`` and
        ``layer_stride_dict`` gets the probe side length divided by
        ``shape[2]``. For list/tuple outputs both entries are ``None``.
        """
        probe_size = 64
        random_input = torch.rand(1, 3, probe_size, probe_size) #TODO: CHANGE TO ZEROS and avoid seed usage
        random_input = random_input.to(self.device)
        self.forward(random_input)
        for layer_name, output_value in self.selected_out.items():
            if isinstance(output_value, (list, tuple)):
                self.layer_size_dict[layer_name] = None
                self.layer_stride_dict[layer_name] = None
            else:
                self.layer_size_dict[layer_name] = output_value.shape[1]
                self.layer_stride_dict[layer_name] = int(probe_size / output_value.shape[2])

    def print_all_modules(self, print_module_str=False):
        """Print every indexed module name, flagging the selected output layers.

        Args:
            print_module_str: When true, append each module's ``str()``.
        """
        for layer_name, module in self.model_dict.items():
            layer_txt = layer_name
            if print_module_str:
                layer_txt += f": {str(module)}"
            if layer_name in self.output_layers:
                layer_txt += " (SET AS AN OUTPUT LAYER)"
            print(layer_txt)

    def forward(self, x):
        """Run the wrapped model and return its output with the captured layers.

        Args:
            x: Input passed to the wrapped model.

        Returns:
            ``(out, selected_out)``. ``out`` is the wrapped model's output, or
            ``None`` when ``stop_at_last_hook`` is set. ``selected_out`` is the
            same dict object on every call, updated in place by the hooks.
        """
        # TODO: find a way to run the model only for the selected out
        if self.stop_at_last_hook:
            try:
                self.model(x)
            except StopModel:
                # Raised by the hook on the last requested layer; everything
                # needed is already stored in selected_out.
                pass
            out = None
        else:
            out = self.model(x)
        return out, self.selected_out

    def inference(self, image, name):
        """Delegate to the wrapped model's ``inference`` method.

        Returns:
            ``(bbox_list, selected_out)`` where ``bbox_list`` is whatever
            ``self.model.inference(image, name)`` returns.
        """
        bbox_list = self.model.inference(image, name)
        return bbox_list, self.selected_out

    def add_all_modules_to_model_dict(self):
        """Return a dict mapping dotted names to all sub-modules of the model."""
        model_dict = {}
        add_all_modules_to_model_dict_recursive(self.model, model_dict)
        return model_dict

    def remove_all_hooks(self):
        """Strip hooks from the wrapped model's sub-modules and reset ``selected_out``."""
        remove_all_hooks_recursive(self.model)
        self.selected_out = OrderedDict()

Solution

  • SageMaker batch transform jobs and endpoints work the same way: both expect the container to run a web server that answers GET /ping and POST /invocations. When SageMaker starts the container, it runs a file named "serve", which was missing in my case. To be clear, as far as I understand, "my case" is when no estimator is used before the inference. You can see that I'm defining my own pre-trained model like this:

    # Model backed only by the custom image; no source_dir / entry_point is
    # passed. `role` and `date_time_in_numbers` are defined outside this
    # excerpt.
    model = sagemaker.model.Model(
        image_uri='*.dkr.ecr.eu-west-1.amazonaws.com/sagemaker:yolo-nas-cpu-infra-0.1.28',
        role=role,
        name="yolo-nas-cpu-infra-v0-1-28-dt"+str(date_time_in_numbers),
    )
    
    # Batch transformer running on a single ml.m5.4xlarge instance.
    transformer = model.transformer(
        instance_count=1,
        instance_type="ml.m5.4xlarge",
    )
    
    # Start the transform job over the S3 prefix; the input is declared as
    # JSON via content_type.
    transformer.transform(
        data="s3://benny-test/indexer/yolo-nas/sagemaker-bs/",
        content_type="application/json"
    )
    

    TL;DR: I used Flask, nginx and gunicorn. The serve file configures and runs nginx and gunicorn.