Vertex AI en Producción: Construyendo Pipelines ML que Escalan en LATAM
Cómo arquitectar un pipeline de machine learning completo en Google Cloud usando Vertex AI Pipelines, Cloud Storage y BigQuery — listo para producción.
¿Por qué Vertex AI y no alternativas?
Cuando las empresas latinoamericanas piensan en IA, el primer instinto es usar OpenAI o montar algo en un servidor propio. Ninguna es la opción correcta para producción empresarial.
Vertex AI ofrece lo mejor de ambos mundos: la infraestructura de Google con control total sobre tus modelos y datos. Y para empresas que ya están en GCP, la integración es nativa.
Arquitectura del Pipeline
El pipeline que presentamos hoy es production-ready y maneja el flujo completo:
Ingesta de datos → BigQuery
↓
Preprocesamiento → Vertex AI Pipelines
↓
Entrenamiento → Custom Training Job
↓
Evaluación → Vertex AI Experiments
↓
Deploy → Vertex AI Endpoints
↓
Monitoreo → Model Monitoring
Configuración inicial
1. Habilitar APIs necesarias
gcloud services enable \
aiplatform.googleapis.com \
bigquery.googleapis.com \
storage.googleapis.com \
artifactregistry.googleapis.com
2. Container de entrenamiento personalizado
# trainer/task.py
import argparse
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from google.cloud import storage, bigquery
import joblib
import os
def train_model(args):
# Cargar datos desde BigQuery
client = bigquery.Client()
query = f"""
SELECT * FROM `{args.project}.{args.dataset}.{args.table}`
WHERE fecha >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
"""
df = client.query(query).to_dataframe()
X = df.drop('target', axis=1)
y = df['target']
model = GradientBoostingClassifier(
n_estimators=200,
learning_rate=0.1,
max_depth=5,
random_state=42
)
model.fit(X, y)
# Guardar modelo en GCS
model_path = '/tmp/model.joblib'
joblib.dump(model, model_path)
storage_client = storage.Client()
bucket = storage_client.bucket(args.bucket)
blob = bucket.blob(f'models/{args.version}/model.joblib')
blob.upload_from_filename(model_path)
print(f"Modelo guardado en gs://{args.bucket}/models/{args.version}/")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--project', required=True)
parser.add_argument('--dataset', required=True)
parser.add_argument('--table', required=True)
parser.add_argument('--bucket', required=True)
parser.add_argument('--version', required=True)
args = parser.parse_args()
train_model(args)
Definiendo el Pipeline con KFP
# pipeline.py
from kfp import dsl
from kfp.v2 import compiler
import google.cloud.aiplatform as aip
@dsl.component(
base_image='python:3.11',
packages_to_install=['pandas', 'scikit-learn', 'google-cloud-bigquery', 'google-cloud-storage']
)
def validate_data(project: str, dataset: str, table: str) -> bool:
from google.cloud import bigquery
client = bigquery.Client(project=project)
# Validar calidad de datos
query = f"""
SELECT
COUNT(*) as total,
COUNTIF(target IS NULL) as nulls,
COUNTIF(feature_1 < 0) as anomalias
FROM `{project}.{dataset}.{table}`
"""
result = client.query(query).result()
row = next(iter(result))
null_rate = row.nulls / row.total
anomaly_rate = row.anomalias / row.total
return null_rate < 0.05 and anomaly_rate < 0.01
@dsl.pipeline(
name='dakylabs-ml-pipeline',
description='Pipeline ML producción para DakyLabs'
)
def ml_pipeline(project: str, dataset: str, table: str, bucket: str):
validate_task = validate_data(project=project, dataset=dataset, table=table)
with dsl.Condition(validate_task.output == True, name='datos-validos'):
training_job = dsl.ContainerSpec(
image=f'gcr.io/{project}/trainer:latest',
command=['python', '-m', 'trainer.task'],
args=[
'--project', project,
'--dataset', dataset,
'--table', table,
'--bucket', bucket,
'--version', '{{$.pipeline_run_id}}'
]
)
Monitoreo en producción
Una vez deployado, activa Vertex AI Model Monitoring para detectar data drift automáticamente:
aip.ModelDeploymentMonitoringJob.create(
display_name='monitor-produccion',
endpoint=endpoint.resource_name,
logging_sampling_strategy=aip.gapic.SamplingStrategy(
random_sample_config=aip.gapic.SamplingStrategy.RandomSampleConfig(
sample_rate=0.1 # 10% de requests
)
),
model_deployment_monitoring_objective_configs=[
aip.gapic.ModelDeploymentMonitoringObjectiveConfig(
deployed_model_id=deployed_model_id,
objective_config=aip.gapic.ModelMonitoringObjectiveConfig(
prediction_drift_detection_config=aip.gapic.ModelMonitoringObjectiveConfig.PredictionDriftDetectionConfig(
drift_thresholds={'feature_1': aip.gapic.ThresholdConfig(value=0.3)}
)
)
)
]
)
Siguientes pasos
Este pipeline es el punto de partida. En artículos siguientes cubriremos:
- Fine-tuning de modelos Gemini en Vertex AI
- A/B testing de endpoints
- Optimización de costos en entrenamiento con TPUs
¿Tu empresa necesita ayuda implementando ML en producción? Trabajamos con equipos de todo LATAM.