Skip to content

Ejemplos en Python

Esta sección proporciona ejemplos completos de integración con la API de TSALVA usando Python. Los ejemplos incluyen manejo de autenticación, operaciones principales, webhooks y mejores prácticas.

📍 Importante: En todos los ejemplos de este documento, reemplaza [URL_API] con la URL base proporcionada por el área de TI de RobPixels. Para obtenerla, contacta: gerencia@robpixels.com o soporte@robpixels.com

Instalación de Dependencias

bash
pip install requests flask

Cliente API Base

python
import requests
import base64
import json
from datetime import datetime
from typing import Dict, Any, Optional

class TSalvaAPI:
    """Cliente para la API de TSALVA"""
    
    def __init__(self, username: str, password: str, base_url: str = "[URL_API]"):
        self.username = username
        self.password = password
        self.base_url = base_url
        self.session = requests.Session()
        
        # Configurar autenticación
        credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
        self.session.headers.update({
            'Authorization': f'Basic {credentials}',
            'Content-Type': 'application/json'
        })
    
    def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        """Realizar petición HTTP con manejo de errores"""
        url = f"{self.base_url}{endpoint}"
        
        try:
            if method.upper() == 'GET':
                response = self.session.get(url)
            elif method.upper() == 'POST':
                response = self.session.post(url, json=data)
            else:
                raise ValueError(f"Método HTTP no soportado: {method}")
            
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.HTTPError as e:
            error_data = response.json() if response.content else {}
            raise APIError(f"Error HTTP {response.status_code}: {error_data.get('message', str(e))}")
        except requests.exceptions.RequestException as e:
            raise APIError(f"Error de conexión: {str(e)}")
    
    def crear_oferta(self, origen: Dict[str, Any], destino: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """Crear una nueva oferta de servicio"""
        data = {
            "origen": origen,
            "destino": destino,
            **kwargs
        }
        return self._make_request('POST', '/api/v2/oferta', data)
    
    def asignar_conductor(self, oferta_id: str, conductor_id: str) -> Dict[str, Any]:
        """Asignar un conductor específico a una oferta"""
        data = {
            "oferta_id": oferta_id,
            "conductor_id": conductor_id
        }
        return self._make_request('POST', '/api/v2/asignacion', data)
    
    def cancelar_servicio(self, oferta_id: str, motivo: str) -> Dict[str, Any]:
        """Cancelar un servicio"""
        data = {
            "oferta_id": oferta_id,
            "motivo": motivo
        }
        return self._make_request('POST', '/api/v2/cancelacion', data)
    
    def obtener_tipos(self) -> Dict[str, Any]:
        """Obtener tipos de vehículos y servicios disponibles"""
        return self._make_request('GET', '/api/v2/types')
    
    def obtener_historial(self, filtros: Optional[Dict] = None) -> Dict[str, Any]:
        """Obtener historial de operaciones"""
        data = filtros or {}
        return self._make_request('POST', '/api/v2/queries/history', data)


class APIError(Exception):
    """Excepción personalizada para errores de API"""
    pass

Ejemplo Completo de Integración

python
#!/usr/bin/env python3
"""
Ejemplo completo de integración con la API de TSALVA
"""

import logging
from datetime import datetime, timedelta

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TSalvaIntegration:
    """Clase principal para integración con TSALVA"""
    
    def __init__(self, username: str, password: str):
        self.api = TSalvaAPI(username, password)
        self.tipos_vehiculos = None
    
    def inicializar(self):
        """Inicializar la integración obteniendo configuraciones"""
        try:
            tipos_response = self.api.obtener_tipos()
            self.tipos_vehiculos = tipos_response.get('data', {}).get('vehicle_types', [])
            logger.info(f"Inicialización exitosa. {len(self.tipos_vehiculos)} tipos de vehículos disponibles")
        except APIError as e:
            logger.error(f"Error en inicialización: {e}")
            raise
    
    def solicitar_servicio(self, origen: Dict[str, Any], destino: Dict[str, Any], 
                          tipo_vehiculo: int = 1, prioridad: str = "normal") -> str:
        """
        Solicitar un nuevo servicio
        
        Args:
            origen: Dirección de origen con lat, lng, address
            destino: Dirección de destino con lat, lng, address
            tipo_vehiculo: ID del tipo de vehículo (default: 1 - Automóvil)
            prioridad: Nivel de prioridad (low, normal, high, urgent)
        
        Returns:
            ID de la oferta creada
        """
        try:
            # Validar tipo de vehículo
            if not self._validar_tipo_vehiculo(tipo_vehiculo):
                raise ValueError(f"Tipo de vehículo inválido: {tipo_vehiculo}")
            
            # Crear oferta
            oferta_data = {
                "vehicle_type": tipo_vehiculo,
                "priority": prioridad,
                "callback_url": "https://tu-dominio.com/webhook/tsalva",
                "notes": f"Servicio solicitado el {datetime.now().isoformat()}"
            }
            
            response = self.api.crear_oferta(origen, destino, **oferta_data)
            
            if response.get('status') == 'success':
                oferta_id = response['data']['oferta_id']
                logger.info(f"Servicio creado exitosamente: {oferta_id}")
                return oferta_id
            else:
                raise APIError(f"Error al crear servicio: {response.get('message')}")
                
        except Exception as e:
            logger.error(f"Error al solicitar servicio: {e}")
            raise
    
    def _validar_tipo_vehiculo(self, tipo_id: int) -> bool:
        """Validar que el tipo de vehículo existe"""
        if not self.tipos_vehiculos:
            return False
        return any(v['id'] == tipo_id for v in self.tipos_vehiculos)
    
    def consultar_historial_reciente(self, dias: int = 7) -> List[Dict]:
        """Consultar historial de los últimos días"""
        try:
            fecha_inicio = (datetime.now() - timedelta(days=dias)).isoformat()
            fecha_fin = datetime.now().isoformat()
            
            filtros = {
                "fecha_inicio": fecha_inicio,
                "fecha_fin": fecha_fin,
                "limit": 50
            }
            
            response = self.api.obtener_historial(filtros)
            
            if response.get('status') == 'success':
                operaciones = response['data']['operations']
                logger.info(f"Obtenidas {len(operaciones)} operaciones de los últimos {dias} días")
                return operaciones
            else:
                raise APIError(f"Error al obtener historial: {response.get('message')}")
                
        except Exception as e:
            logger.error(f"Error al consultar historial: {e}")
            raise
    
    def generar_reporte_diario(self) -> Dict[str, Any]:
        """Generar reporte de actividad diaria"""
        try:
            operaciones = self.consultar_historial_reciente(1)
            
            reporte = {
                "fecha": datetime.now().date().isoformat(),
                "total_operaciones": len(operaciones),
                "por_estado": {},
                "por_tipo_vehiculo": {},
                "tiempo_promedio_asignacion": 0
            }
            
            # Agrupar por estado
            for op in operaciones:
                estado = op.get('state', 'unknown')
                reporte["por_estado"][estado] = reporte["por_estado"].get(estado, 0) + 1
            
            # Agrupar por tipo de vehículo
            for op in operaciones:
                tipo = op.get('vehicle_type', 'unknown')
                reporte["por_tipo_vehiculo"][tipo] = reporte["por_tipo_vehiculo"].get(tipo, 0) + 1
            
            # Calcular tiempo promedio de asignación
            tiempos_asignacion = []
            for op in operaciones:
                if op.get('assigned_at') and op.get('created_at'):
                    created = datetime.fromisoformat(op['created_at'].replace('Z', '+00:00'))
                    assigned = datetime.fromisoformat(op['assigned_at'].replace('Z', '+00:00'))
                    tiempo = (assigned - created).total_seconds()
                    tiempos_asignacion.append(tiempo)
            
            if tiempos_asignacion:
                reporte["tiempo_promedio_asignacion"] = sum(tiempos_asignacion) / len(tiempos_asignacion)
            
            return reporte
            
        except Exception as e:
            logger.error(f"Error al generar reporte: {e}")
            raise


def ejemplo_uso_basico():
    """Ejemplo de uso básico de la integración"""
    
    # Inicializar cliente
    integration = TSalvaIntegration('tu_usuario', 'tu_password')
    integration.inicializar()
    
    # Definir direcciones
    origen = {
        "lat": -34.6037,
        "lng": -58.3816,
        "address": "Av. Corrientes 1000, Buenos Aires"
    }
    
    destino = {
        "lat": -34.6158,
        "lng": -58.3731,
        "address": "Av. Santa Fe 2000, Buenos Aires"
    }
    
    try:
        # Solicitar servicio
        oferta_id = integration.solicitar_servicio(
            origen=origen,
            destino=destino,
            tipo_vehiculo=1,  # Automóvil
            prioridad="normal"
        )
        
        print(f"Servicio solicitado exitosamente: {oferta_id}")
        
        # Generar reporte diario
        reporte = integration.generar_reporte_diario()
        print(f"Reporte diario: {json.dumps(reporte, indent=2)}")
        
    except APIError as e:
        print(f"Error de API: {e}")
    except Exception as e:
        print(f"Error general: {e}")


# Webhook Handler con Flask
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhook/tsalva', methods=['POST'])
def webhook_tsalva():
    """Manejar webhooks de TSALVA"""
    try:
        # Obtener datos del webhook
        webhook_data = request.get_json()
        
        if not webhook_data:
            return jsonify({"error": "No data received"}), 400
        
        # Procesar según el tipo de evento
        event_type = webhook_data.get('event')
        oferta_id = webhook_data.get('oferta_id')
        
        logger.info(f"Webhook recibido - Evento: {event_type}, Oferta: {oferta_id}")
        
        if event_type == 'state_change':
            procesar_cambio_estado(webhook_data)
        elif event_type == 'assignment':
            procesar_asignacion(webhook_data)
        elif event_type == 'completion':
            procesar_finalizacion(webhook_data)
        else:
            logger.warning(f"Tipo de evento desconocido: {event_type}")
        
        return jsonify({"status": "success", "message": "Webhook procesado"}), 200
        
    except Exception as e:
        logger.error(f"Error procesando webhook: {e}")
        return jsonify({"error": "Internal server error"}), 500


def procesar_cambio_estado(data: Dict[str, Any]):
    """Procesar cambio de estado de una oferta"""
    oferta_id = data.get('oferta_id')
    estado_anterior = data.get('previous_state')
    estado_nuevo = data.get('new_state')
    timestamp = data.get('timestamp')
    
    logger.info(f"Oferta {oferta_id}: {estado_anterior} -> {estado_nuevo} at {timestamp}")
    
    # Aquí puedes agregar lógica específica según el estado
    if estado_nuevo == 'assigned':
        conductor_info = data.get('conductor', {})
        logger.info(f"Conductor asignado: {conductor_info.get('nombre')} - {conductor_info.get('telefono')}")
        
        # Notificar al cliente
        notificar_cliente_asignacion(oferta_id, conductor_info)
        
    elif estado_nuevo == 'completed':
        logger.info(f"Servicio {oferta_id} completado exitosamente")
        
        # Procesar facturación
        procesar_facturacion(data)
        
    elif estado_nuevo == 'cancelled':
        motivo = data.get('cancellation_reason', 'No especificado')
        logger.warning(f"Servicio {oferta_id} cancelado: {motivo}")
        
        # Notificar cancelación
        notificar_cliente_cancelacion(oferta_id, motivo)


def procesar_asignacion(data: Dict[str, Any]):
    """Procesar asignación de conductor"""
    oferta_id = data.get('oferta_id')
    conductor = data.get('conductor', {})
    vehiculo = data.get('vehiculo', {})
    eta = data.get('eta_minutes')
    
    logger.info(f"Asignación confirmada para oferta {oferta_id}")
    logger.info(f"Conductor: {conductor.get('nombre')} - Vehículo: {vehiculo.get('placa')}")
    logger.info(f"ETA: {eta} minutos")
    
    # Actualizar base de datos local
    actualizar_estado_local(oferta_id, {
        'estado': 'assigned',
        'conductor_id': conductor.get('id'),
        'conductor_nombre': conductor.get('nombre'),
        'conductor_telefono': conductor.get('telefono'),
        'vehiculo_placa': vehiculo.get('placa'),
        'eta_minutes': eta,
        'assigned_at': datetime.now().isoformat()
    })


def procesar_finalizacion(data: Dict[str, Any]):
    """Procesar finalización de servicio"""
    oferta_id = data.get('oferta_id')
    costo_total = data.get('total_cost')
    distancia_km = data.get('distance_km')
    duracion_minutos = data.get('duration_minutes')
    
    logger.info(f"Servicio {oferta_id} finalizado")
    logger.info(f"Costo: ${costo_total}, Distancia: {distancia_km}km, Duración: {duracion_minutos}min")
    
    # Actualizar base de datos y generar factura
    actualizar_estado_local(oferta_id, {
        'estado': 'completed',
        'costo_total': costo_total,
        'distancia_km': distancia_km,
        'duracion_minutos': duracion_minutos,
        'completed_at': datetime.now().isoformat()
    })


# Funciones auxiliares (implementar según tu lógica de negocio)
def notificar_cliente_asignacion(oferta_id: str, conductor_info: Dict):
    """Notificar al cliente sobre la asignación"""
    # Implementar notificación (email, SMS, push notification, etc.)
    pass

def notificar_cliente_cancelacion(oferta_id: str, motivo: str):
    """Notificar al cliente sobre la cancelación"""
    # Implementar notificación de cancelación
    pass

def procesar_facturacion(data: Dict[str, Any]):
    """Procesar datos de facturación"""
    # Implementar lógica de facturación
    pass

def actualizar_estado_local(oferta_id: str, datos: Dict[str, Any]):
    """Actualizar estado en base de datos local"""
    # Implementar actualización de base de datos
    pass


# Script principal
if __name__ == "__main__":
    import sys
    
    if len(sys.argv) > 1 and sys.argv[1] == 'webhook':
        # Ejecutar servidor de webhooks
        app.run(host='0.0.0.0', port=5000, debug=False)
    else:
        # Ejecutar ejemplo de uso
        ejemplo_uso_basico()

Ejemplo de Monitoreo y Métricas

python
import time
from collections import defaultdict
from threading import Thread
import sqlite3

class TSalvaMonitor:
    """Monitor para métricas y estadísticas de la API"""
    
    def __init__(self, api_client: TSalvaAPI):
        self.api = api_client
        self.metricas = defaultdict(int)
        self.db_path = 'tsalva_metrics.db'
        self._init_db()
    
    def _init_db(self):
        """Inicializar base de datos de métricas"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS metricas (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                evento TEXT NOT NULL,
                oferta_id TEXT,
                datos JSON
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def registrar_evento(self, evento: str, oferta_id: str = None, datos: Dict = None):
        """Registrar evento en la base de datos"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO metricas (evento, oferta_id, datos)
            VALUES (?, ?, ?)
        ''', (evento, oferta_id, json.dumps(datos) if datos else None))
        
        conn.commit()
        conn.close()
        
        self.metricas[evento] += 1
    
    def obtener_metricas_periodo(self, dias: int = 1) -> Dict[str, Any]:
        """Obtener métricas de un período específico"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT evento, COUNT(*) 
            FROM metricas 
            WHERE timestamp >= datetime('now', '-{} days')
            GROUP BY evento
        '''.format(dias))
        
        metricas = dict(cursor.fetchall())
        conn.close()
        
        return metricas
    
    def monitor_continuo(self, intervalo_segundos: int = 300):
        """Monitoreo continuo de la API"""
        def _monitor():
            while True:
                try:
                    # Verificar salud de la API
                    start_time = time.time()
                    tipos = self.api.obtener_tipos()
                    response_time = time.time() - start_time
                    
                    if tipos.get('status') == 'success':
                        self.registrar_evento('api_health_check_success', 
                                            datos={'response_time': response_time})
                    else:
                        self.registrar_evento('api_health_check_failed',
                                            datos={'response': tipos})
                    
                    # Obtener métricas recientes
                    metricas = self.obtener_metricas_periodo(1)
                    print(f"Métricas últimas 24h: {metricas}")
                    
                except Exception as e:
                    self.registrar_evento('monitor_error', datos={'error': str(e)})
                    print(f"Error en monitoreo: {e}")
                
                time.sleep(intervalo_segundos)
        
        # Ejecutar en thread separado
        monitor_thread = Thread(target=_monitor, daemon=True)
        monitor_thread.start()
        return monitor_thread


# Ejemplo de uso del monitor
def ejemplo_monitoreo():
    """Ejemplo de uso del sistema de monitoreo"""
    
    # Inicializar API y monitor
    api = TSalvaAPI('tu_usuario', 'tu_password')
    monitor = TSalvaMonitor(api)
    
    # Iniciar monitoreo continuo
    monitor_thread = monitor.monitor_continuo(300)  # Cada 5 minutos
    
    # Simular algunos eventos
    try:
        # Crear servicio y registrar evento
        origen = {"lat": -34.6037, "lng": -58.3816, "address": "Origen"}
        destino = {"lat": -34.6158, "lng": -58.3731, "address": "Destino"}
        
        response = api.crear_oferta(origen, destino, vehicle_type=1)
        
        if response.get('status') == 'success':
            oferta_id = response['data']['oferta_id']
            monitor.registrar_evento('oferta_creada', oferta_id, 
                                   {'vehicle_type': 1, 'priority': 'normal'})
        
        # Obtener y mostrar métricas
        metricas = monitor.obtener_metricas_periodo(7)
        print(f"Métricas de la semana: {metricas}")
        
    except Exception as e:
        monitor.registrar_evento('error_general', datos={'error': str(e)})
        print(f"Error: {e}")
    
    # Mantener el script corriendo
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        print("Deteniendo monitoreo...")


if __name__ == "__main__":
    ejemplo_monitoreo()

Mejores Prácticas

1. Manejo de Errores Robusto

python
import exponential_backoff

def hacer_peticion_con_retry(func, max_retries=3, backoff_factor=2):
    """Ejecutar función con reintentos exponenciales"""
    for intento in range(max_retries):
        try:
            return func()
        except APIError as e:
            if intento == max_retries - 1:
                raise
            
            if e.status_code >= 500:  # Error del servidor
                delay = backoff_factor ** intento
                time.sleep(delay)
            else:
                raise  # Error del cliente, no reintentar

2. Pool de Conexiones

python
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def crear_session_optimizada():
    """Crear sesión HTTP optimizada"""
    session = requests.Session()
    
    # Configurar reintentos
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        method_whitelist=["HEAD", "GET", "POST"]
    )
    
    adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=20, pool_maxsize=20)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

3. Validación de Datos

python
from typing import Union
import re

def validar_coordenadas(lat: float, lng: float) -> bool:
    """Validar coordenadas geográficas"""
    return -90 <= lat <= 90 and -180 <= lng <= 180

def validar_telefono(telefono: str) -> bool:
    """Validar formato de teléfono"""
    pattern = r'^\+?[1-9]\d{1,14}$'
    return re.match(pattern, telefono) is not None

def validar_direccion(direccion: Dict[str, Any]) -> List[str]:
    """Validar estructura de dirección"""
    errores = []
    
    campos_requeridos = ['lat', 'lng', 'address']
    for campo in campos_requeridos:
        if campo not in direccion:
            errores.append(f"Campo requerido faltante: {campo}")
    
    if 'lat' in direccion and 'lng' in direccion:
        if not validar_coordenadas(direccion['lat'], direccion['lng']):
            errores.append("Coordenadas inválidas")
    
    if 'address' in direccion and len(direccion['address'].strip()) < 10:
        errores.append("Dirección muy corta")
    
    return errores

Testing

python
import unittest
from unittest.mock import Mock, patch

class TestTSalvaAPI(unittest.TestCase):
    """Tests para el cliente de la API TSALVA"""
    
    def setUp(self):
        self.api = TSalvaAPI('test_user', 'test_pass')
    
    @patch('requests.Session.post')
    def test_crear_oferta_exitosa(self, mock_post):
        """Test creación exitosa de oferta"""
        # Mock de respuesta exitosa
        mock_response = Mock()
        mock_response.json.return_value = {
            'status': 'success',
            'data': {'oferta_id': 'test-123'}
        }
        mock_response.raise_for_status.return_value = None
        mock_post.return_value = mock_response
        
        # Ejecutar
        origen = {"lat": -34.6037, "lng": -58.3816, "address": "Test"}
        destino = {"lat": -34.6158, "lng": -58.3731, "address": "Test"}
        
        result = self.api.crear_oferta(origen, destino)
        
        # Verificar
        self.assertEqual(result['status'], 'success')
        self.assertEqual(result['data']['oferta_id'], 'test-123')
        mock_post.assert_called_once()
    
    @patch('requests.Session.post')
    def test_crear_oferta_error(self, mock_post):
        """Test manejo de errores en creación de oferta"""
        # Mock de error HTTP
        mock_response = Mock()
        mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError()
        mock_response.status_code = 400
        mock_response.json.return_value = {'message': 'Error de validación'}
        mock_response.content = True
        mock_post.return_value = mock_response
        
        # Ejecutar y verificar excepción
        with self.assertRaises(APIError):
            self.api.crear_oferta({}, {})

if __name__ == '__main__':
    unittest.main()

Referencias

Recursos Adicionales

Tsalva API - Documentación desarrollada por RobPixels