Todos los artículos

Vertex AI en Producción: Construyendo Pipelines ML que Escalan en LATAM

Cómo arquitectar un pipeline de machine learning completo en Google Cloud usando Vertex AI Pipelines, Cloud Storage y BigQuery — listo para producción.

¿Por qué Vertex AI y no alternativas?

Cuando las empresas latinoamericanas piensan en IA, el primer instinto es usar OpenAI o montar algo en un servidor propio. Ninguna es la opción correcta para producción empresarial.

Vertex AI ofrece lo mejor de ambos mundos: la infraestructura de Google con control total sobre tus modelos y datos. Y para empresas que ya están en GCP, la integración es nativa.

Arquitectura del Pipeline

El pipeline que presentamos hoy es production-ready y maneja el flujo completo:

Ingesta de datos → BigQuery

Preprocesamiento → Vertex AI Pipelines

Entrenamiento → Custom Training Job

Evaluación → Vertex AI Experiments

Deploy → Vertex AI Endpoints

Monitoreo → Model Monitoring

Configuración inicial

1. Habilitar APIs necesarias

gcloud services enable \
  aiplatform.googleapis.com \
  bigquery.googleapis.com \
  storage.googleapis.com \
  artifactregistry.googleapis.com

2. Container de entrenamiento personalizado

# trainer/task.py
import argparse
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from google.cloud import storage, bigquery
import joblib
import os

def train_model(args):
    # Cargar datos desde BigQuery
    client = bigquery.Client()
    query = f"""
        SELECT * FROM `{args.project}.{args.dataset}.{args.table}`
        WHERE fecha >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
    """
    df = client.query(query).to_dataframe()

    X = df.drop('target', axis=1)
    y = df['target']

    model = GradientBoostingClassifier(
        n_estimators=200,
        learning_rate=0.1,
        max_depth=5,
        random_state=42
    )
    model.fit(X, y)

    # Guardar modelo en GCS
    model_path = '/tmp/model.joblib'
    joblib.dump(model, model_path)

    storage_client = storage.Client()
    bucket = storage_client.bucket(args.bucket)
    blob = bucket.blob(f'models/{args.version}/model.joblib')
    blob.upload_from_filename(model_path)

    print(f"Modelo guardado en gs://{args.bucket}/models/{args.version}/")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--project', required=True)
    parser.add_argument('--dataset', required=True)
    parser.add_argument('--table', required=True)
    parser.add_argument('--bucket', required=True)
    parser.add_argument('--version', required=True)
    args = parser.parse_args()
    train_model(args)

Definiendo el Pipeline con KFP

# pipeline.py
from kfp import dsl
from kfp.v2 import compiler
import google.cloud.aiplatform as aip

@dsl.component(
    base_image='python:3.11',
    packages_to_install=['pandas', 'scikit-learn', 'google-cloud-bigquery', 'google-cloud-storage']
)
def validate_data(project: str, dataset: str, table: str) -> bool:
    from google.cloud import bigquery
    client = bigquery.Client(project=project)
    
    # Validar calidad de datos
    query = f"""
        SELECT 
            COUNT(*) as total,
            COUNTIF(target IS NULL) as nulls,
            COUNTIF(feature_1 < 0) as anomalias
        FROM `{project}.{dataset}.{table}`
    """
    result = client.query(query).result()
    row = next(iter(result))
    
    null_rate = row.nulls / row.total
    anomaly_rate = row.anomalias / row.total
    
    return null_rate < 0.05 and anomaly_rate < 0.01

@dsl.pipeline(
    name='dakylabs-ml-pipeline',
    description='Pipeline ML producción para DakyLabs'
)
def ml_pipeline(project: str, dataset: str, table: str, bucket: str):
    validate_task = validate_data(project=project, dataset=dataset, table=table)
    
    with dsl.Condition(validate_task.output == True, name='datos-validos'):
        training_job = dsl.ContainerSpec(
            image=f'gcr.io/{project}/trainer:latest',
            command=['python', '-m', 'trainer.task'],
            args=[
                '--project', project,
                '--dataset', dataset,
                '--table', table,
                '--bucket', bucket,
                '--version', '{{$.pipeline_run_id}}'
            ]
        )

Monitoreo en producción

Una vez deployado, activa Vertex AI Model Monitoring para detectar data drift automáticamente:

aip.ModelDeploymentMonitoringJob.create(
    display_name='monitor-produccion',
    endpoint=endpoint.resource_name,
    logging_sampling_strategy=aip.gapic.SamplingStrategy(
        random_sample_config=aip.gapic.SamplingStrategy.RandomSampleConfig(
            sample_rate=0.1  # 10% de requests
        )
    ),
    model_deployment_monitoring_objective_configs=[
        aip.gapic.ModelDeploymentMonitoringObjectiveConfig(
            deployed_model_id=deployed_model_id,
            objective_config=aip.gapic.ModelMonitoringObjectiveConfig(
                prediction_drift_detection_config=aip.gapic.ModelMonitoringObjectiveConfig.PredictionDriftDetectionConfig(
                    drift_thresholds={'feature_1': aip.gapic.ThresholdConfig(value=0.3)}
                )
            )
        )
    ]
)

Siguientes pasos

Este pipeline es el punto de partida. En artículos siguientes cubriremos:

  • Fine-tuning de modelos Gemini en Vertex AI
  • A/B testing de endpoints
  • Optimización de costos en entrenamiento con TPUs

¿Tu empresa necesita ayuda implementando ML en producción? Trabajamos con equipos de todo LATAM.