My first pipeline¶
Kubeflow Pipelines: MNIST benchmark¶
This example illustrates a Kubeflow pipeline using the MNIST dataset.
We will build the pipeline with Lightweight Python Components.
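A Lightweight Python Component is just a type-annotated Python function decorated with @dsl.component; KFP packages it as a containerized step. A minimal sketch of the pattern (say_hello is illustrative only, not part of this pipeline):

from kfp import dsl

@dsl.component(base_image="python:3.10")
def say_hello(name: str) -> str:
    # The body must be self-contained (any imports go inside the function),
    # since it runs in its own container at pipeline runtime.
    return f"Hello, {name}!"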
Constants¶
We define some constants that will be used later.
PIPELINE_NAME = "MNIST"
PIPELINE_DESCRIPTION = "Example pipeline"
PIPELINE_VERSION = "V1"
EXPERIMENT_NAME = PIPELINE_NAME + "_" + "Best-accuracy"
Components¶
Each step in a pipeline is defined via a component. In this case we will build a simple pipeline with three steps: download_data -> train -> test. KFP infers this execution order from the data dependencies between the components (see the toy sketch after the imports cell below).
import kfp
from kfp import dsl, compiler
from kfp.dsl import *
from typing import *
from kfp_server_api.exceptions import ApiException
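Here is the toy sketch of dependency wiring promised above: passing one step's output artifact as another step's input creates the edge that orders the steps. The produce/consume components are illustrative only:

@dsl.component(base_image="python:3.10")
def produce(out_ds: Output[Dataset]):
    # Write an artifact to the path KFP provisions for this output.
    with open(out_ds.path, "w") as f:
        f.write("some data")

@dsl.component(base_image="python:3.10")
def consume(in_ds: Input[Dataset]):
    # Read the artifact produced upstream.
    with open(in_ds.path) as f:
        print(f.read())

@dsl.pipeline(name="toy-dag")
def toy_pipeline():
    p = produce()
    consume(in_ds=p.outputs["out_ds"])  # this data edge makes consume run after produce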
Download data¶
We download the train and test sets.
@dsl.component(base_image="python:3.10")
def download_data(test_dataset_path: Output[Dataset],
                  train_dataset_path: Output[Dataset],
                  metrics: Output[Metrics]):
    """Downloads the train and test data; the paths of the resulting dataset artifacts are passed to the next step."""
    ## Install packages
    import subprocess
    import sys
    def install(package):
        subprocess.check_call([sys.executable, "-m", "pip", "install", package,
                               "--extra-index-url=https://download.pytorch.org/whl/cpu",
                               "--trusted-host=download.pytorch.org"])
    install("torch==2.0.0+cpu")
    install("torchvision==0.15.1+cpu")
    ## Libraries
    import torch
    from torchvision.transforms import transforms
    from torchvision.datasets import MNIST
    ## Download the MNIST dataset
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    mnist_train = MNIST(".", download=True, train=True, transform=transform)
    mnist_test = MNIST(".", download=True, train=False, transform=transform)
    with open(train_dataset_path.path, "wb") as f:
        torch.save(mnist_train, f)
    with open(test_dataset_path.path, "wb") as f:
        torch.save(mnist_test, f)
    # Metrics
    metrics.log_metric("# Samples train dataset", len(mnist_train))
    metrics.log_metric("# Samples test dataset", len(mnist_test))
Train the model¶
@dsl.component(base_image="python:3.10")
def train(train_dataset_path: Input[Dataset],
          model_path: Output[Model],
          model_params: Output[Markdown],
          batch_size: int,
          epochs: int,
          lr: float = 0.1,
          gamma: float = 0.7,
          ):
    """Trains a model using PyTorch. The path where the model artifact is saved is passed to the next step."""
    ## Install packages
    import subprocess
    import sys
    def install(package):
        subprocess.check_call([sys.executable, "-m", "pip", "install", package,
                               "--extra-index-url=https://download.pytorch.org/whl/cpu",
                               "--trusted-host=download.pytorch.org"])
    install("torch==2.0.0+cpu")
    install("torchvision==0.15.1+cpu")
    install("numba==0.56.4")
    install("numpy==1.23.5")
    ## Libraries
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    # torchvision must be importable to unpickle the saved MNIST dataset object
    from torchvision import datasets, transforms
    from torch.optim.lr_scheduler import StepLR
    ## Read dataset
    mnist_train = torch.load(train_dataset_path.path)
    ## Net definition
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)
        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output
    # Training
    train_loader = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size)
    model = Net().to("cpu")
    optimizer = optim.Adadelta(model.parameters(), lr=lr)
    scheduler = StepLR(optimizer, step_size=1, gamma=gamma)
    def _train(model, train_loader, optimizer, epoch):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to("cpu"), target.to("cpu")
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))
    for epoch in range(1, epochs + 1):
        _train(model=model, train_loader=train_loader, optimizer=optimizer, epoch=epoch)
        scheduler.step()
    with open(model_path.path, "wb") as f:
        torch.save(model.state_dict(), f)
    # Write a Markdown artifact listing each parameter tensor and its shape
    markdown_content = '# MNIST Model \n\n '
    markdown_content += '## Model state dict: \n'
    for param_tensor in model.state_dict():
        markdown_content += "\t" + param_tensor + ": " + str(model.state_dict()[param_tensor].size()) + "\n"
    with open(model_params.path, 'w') as f:
        f.write(markdown_content)
Evaluate the model¶
@dsl.component(packages_to_install=['scikit-learn'], base_image="python:3.10")
def test(test_dataset_path: Input[Dataset],
         model_path: Input[Model],
         cm: Output[ClassificationMetrics],
         acc: Output[Metrics]):
    """Evaluates the trained model on the test set and logs a confusion matrix plus accuracy metrics."""
    from typing import Tuple, List
    ## Install packages
    import subprocess
    import sys
    def install(package):
        subprocess.check_call([sys.executable, "-m", "pip", "install", package,
                               "--extra-index-url=https://download.pytorch.org/whl/cpu",
                               "--trusted-host=download.pytorch.org"])
    install("torch==2.0.0+cpu")
    install("torchvision==0.15.1+cpu")
    install("numba==0.56.4")
    install("numpy==1.23.5")
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    ## Net definition (must match the architecture used in the train component)
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)
        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output
    model = Net()
    model.load_state_dict(torch.load(model_path.path))
    model.eval()
    mnist_test = torch.load(test_dataset_path.path)
    test_loader = torch.utils.data.DataLoader(mnist_test, batch_size=32)
    def _test(model, test_loader) -> Tuple[List, List, float, float]:
        y_pred = []
        y_true = []
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to("cpu"), target.to("cpu")
                output = model(data)
                test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
                pred = output.argmax(dim=1, keepdim=True)  # index of the max log-probability
                correct += pred.eq(target.view_as(pred)).sum().item()
                y_true.extend(target.numpy())
                y_pred.extend(pred.flatten().numpy())  # flatten the (N, 1) column to 1-D labels
        test_loss /= len(test_loader.dataset)
        accuracy = 100. * correct / len(test_loader.dataset)
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset), accuracy))
        return y_true, y_pred, test_loss, accuracy
    y_true, y_pred, test_loss, accuracy = _test(model=model, test_loader=test_loader)
    from sklearn.metrics import confusion_matrix
    # Confusion matrix
    cm.log_confusion_matrix(
        [str(i) for i in range(10)],
        confusion_matrix(y_true, y_pred).tolist()  # .tolist() converts the numpy array to a list
    )
    acc.log_metric("avg_loss", test_loss)
    acc.log_metric("accuracy", accuracy)
Pipeline¶
# Define the pipeline
@dsl.pipeline(
    name='MNIST',
    description='Example training pipeline',
)
def run(batch_size: int = 32, epochs: int = 2):
    step_1 = download_data()
    step_2 = train(train_dataset_path=step_1.outputs["train_dataset_path"], batch_size=batch_size, epochs=epochs)
    step_3 = test(test_dataset_path=step_1.outputs["test_dataset_path"], model_path=step_2.outputs["model_path"])
# Compile the pipeline
compiler.Compiler().compile(run, package_path='pipeline.yaml')
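The compiler serializes the pipeline into a YAML file (the v2 intermediate representation). A quick, purely illustrative way to sanity-check the output before uploading it:

# Peek at the first lines of the compiled spec; the file name matches the call above.
with open("pipeline.yaml") as f:
    print("".join(f.readlines()[:20]))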
Next, we register the pipeline on the Kubeflow Pipelines server.
client = kfp.Client()
namespace = client.get_user_namespace()
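Inside a Kubeflow notebook server, kfp.Client() picks up the in-cluster endpoint and credentials automatically. When running outside the cluster you would pass the API host explicitly; a minimal sketch where the URL is a placeholder:

# Only needed when running outside the cluster; replace the placeholder URL
# with your Kubeflow Pipelines API endpoint.
client = kfp.Client(host="http://localhost:8080")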
try:
    pipeline = client.upload_pipeline(pipeline_package_path="pipeline.yaml",
                                      pipeline_name=PIPELINE_NAME,
                                      description=PIPELINE_DESCRIPTION,
                                      namespace=namespace)
except ApiException as e:
    if "Already exist" in e.body:
        print("Pipeline {} already exists.".format(PIPELINE_NAME))
    else:
        raise
Pipelines can be grouped and versioned.
try:
    pipeline_version = client.upload_pipeline_version(
        pipeline_package_path="pipeline.yaml",
        pipeline_version_name=PIPELINE_VERSION,
        pipeline_id=pipeline.pipeline_id,
        description="First version"
    )
except ApiException as e:
    if "Already exist" in e.body:
        print("Pipeline {} with version {} already exists.".format(pipeline.display_name, PIPELINE_VERSION))
    else:
        raise
Execution¶
Since we will launch several runs, they can be grouped into experiments.
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except Exception:
    experiment = client.create_experiment(EXPERIMENT_NAME)
import time
params = {"batch_size": 32, "epochs": 1}
run_pipeline = client.run_pipeline(experiment_id=experiment.experiment_id,
                                   job_name=pipeline.display_name + "_run_" + time.strftime("%Y-%m-%d--%H:%M:%S", time.gmtime()),
                                   params=params,
                                   pipeline_id=pipeline_version.pipeline_id,
                                   version_id=pipeline_version.pipeline_version_id)
# Launch another run with different parameters
params = {"batch_size": 64, "epochs": 5}
run_pipeline = client.run_pipeline(experiment_id=experiment.experiment_id,
                                   job_name=pipeline.display_name + "_run_" + time.strftime("%Y-%m-%d--%H:%M:%S", time.gmtime()),
                                   params=params,
                                   pipeline_id=pipeline_version.pipeline_id,
                                   version_id=pipeline_version.pipeline_version_id)
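Both runs execute asynchronously; their metrics, confusion matrix, and Markdown artifacts can be inspected in the Kubeflow Pipelines UI under the EXPERIMENT_NAME experiment. To block until a run finishes programmatically, a sketch assuming the v2 SDK's wait_for_run_completion method:

# Block until the latest run finishes (timeout in seconds), then print its state.
result = client.wait_for_run_completion(run_pipeline.run_id, timeout=3600)
print(result.state)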