DataSkills Hub

Python

Python es un lenguaje versátil para scripting, automatización y análisis de datos. Se usa en ETL pipelines (Airflow DAGs), análisis interactivo (JupyterHub) y servicios API (FastAPI).

#Getting Started

#Variables y asignación

nombre = "pipeline_ventas"   # str
registros = 150000            # int
ratio = 0.85                  # float
activo = True                 # bool

# Asignación múltiple
x, y, z = 1, 2, 3

# f-string (Python 3.6+)
print(f"Procesando {nombre}: {registros} registros")

Python no requiere declarar tipo, pero soporta type hints.

#Operadores aritméticos

Operador Descripción Ejemplo
+ Suma 10 + 313
- Resta 10 - 37
* Multiplicación 10 * 330
/ División (float) 10 / 33.33
// División entera 10 // 33
% Módulo 10 % 31
** Potencia 2 ** 101024

#Strings esenciales

tabla = "fact_ventas"

# Slicing
tabla[0:4]          # => 'fact'
tabla[-6:]          # => 'ventas'

# Métodos comunes
tabla.upper()       # => 'FACT_VENTAS'
tabla.split("_")    # => ['fact', 'ventas']
tabla.startswith("fact")  # => True
tabla.replace("fact", "dim")  # => 'dim_ventas'

# Join — construir queries
cols = ["id", "monto", "fecha"]
", ".join(cols)     # => 'id, monto, fecha'

#f-Strings y formateo

monto = 1234567.891
nombre = "DataCorp"

# Separador de miles
f"{monto:,.2f}"           # => '1,234,567.89'

# Padding y alineación
f"{'col':>15}"            # => '            col'
f"{'col':<15}"            # => 'col            '
f"{'col':^15}"            # => '      col      '

# Porcentaje
f"{0.8523:.1%}"           # => '85.2%'

# Expresiones dentro de f-string
f"Registros: {registros:,} en {nombre}"
# => 'Registros: 150,000 en DataCorp'

#Operadores de comparación y lógicos

# Comparación
x == y    # Igual
x != y    # Diferente
x > y     # Mayor
x >= y    # Mayor o igual

# Lógicos
x > 0 and y > 0
x > 0 or y > 0
not x

# Identidad y membresía
x is None
"ventas" in ["ventas", "clientes", "pagos"]

#Entrada por consola y casting

# Casting explícito
int("42")         # => 42
float("3.14")     # => 3.14
str(100)          # => '100'
bool(0)           # => False
bool("texto")     # => True

# Valores falsy en Python
# False, None, 0, 0.0, "", [], {}, set()

#Tipos de Datos

#Listas

# Crear
tablas = ["ventas", "clientes", "pagos"]
nums = list(range(1, 6))  # [1, 2, 3, 4, 5]

# CRUD
tablas.append("productos")
tablas.insert(0, "dim_fecha")
tablas.remove("pagos")
ultimo = tablas.pop()

# Slicing
tablas[1:3]       # sublista
tablas[::-1]       # invertir

# Ordenar
tablas.sort()
tablas.sort(key=len, reverse=True)

#Diccionarios

config = {
    "dag_id": "etl_ventas",
    "schedule": "0 6 * * *",
    "retries": 3
}

# Acceso
config["dag_id"]              # => 'etl_ventas'
config.get("timeout", 300)    # => 300 (default)

# Modificar
config["owner"] = "data_team"
config.update({"retries": 5, "pool": "default"})

# Iterar
for key, val in config.items():
    print(f"{key}: {val}")

# Destructuring
keys = config.keys()
vals = config.values()

#Tuplas y Sets

# Tupla — inmutable, útil para registros
registro = ("2024-01-15", "ventas", 1500.00)
fecha, tabla, monto = registro  # unpacking

# Tupla con nombre (mejor para datos)
from collections import namedtuple
Venta = namedtuple("Venta", ["fecha", "monto", "canal"])
v = Venta("2024-01-15", 1500.00, "web")
v.monto  # => 1500.00

# Set — valores únicos
fuentes = {"oracle", "postgres", "s3", "oracle"}
# => {'oracle', 'postgres', 's3'}

fuentes.add("bigquery")
fuentes.discard("oracle")

#Comprehensions

# List comprehension
cuadrados = [x**2 for x in range(10)]
pares = [x for x in range(20) if x % 2 == 0]

# Dict comprehension
col_types = {col: "VARCHAR" for col in ["id", "name"]}

# Set comprehension
unicos = {row["status"] for row in dataset}

# Filtrar tablas fact_
tablas = ["fact_ventas", "dim_fecha", "fact_pagos", "stg_raw"]
facts = [t for t in tablas if t.startswith("fact_")]
# => ['fact_ventas', 'fact_pagos']

# Aplanar lista de listas
nested = [[1, 2], [3, 4], [5]]
flat = [x for sub in nested for x in sub]
# => [1, 2, 3, 4, 5]

#Stacks y Queues

from collections import deque

# Cola (FIFO) — procesar tareas en orden
cola = deque(["task_1", "task_2", "task_3"])
cola.append("task_4")       # agregar al final
siguiente = cola.popleft()  # => 'task_1'

# Stack (LIFO) — deshacer operaciones
stack = deque()
stack.append("paso_1")
stack.append("paso_2")
ultimo = stack.pop()        # => 'paso_2'

#Type Hints

from typing import Optional

def procesar_registros(
    tabla: str,
    limite: int = 1000,
    filtro: Optional[str] = None
) -> list[dict]:
    """Procesa registros de una tabla."""
    ...

# Variables con tipo
dag_id: str = "etl_ventas"
registros: list[dict] = []
config: dict[str, any] = {"retries": 3}

# Union (Python 3.10+)
def get_valor(key: str) -> int | str | None:
    ...

#Funciones y Control de Flujo

#Funciones

# Básica con default
def extract_table(table: str, limit: int = 1000) -> list:
    print(f"Extrayendo {table}, límite: {limit}")
    return []

# *args y **kwargs
def log_event(*args, **kwargs):
    print(f"Args: {args}")
    print(f"Kwargs: {kwargs}")

log_event("error", code=500, msg="timeout")

# Lambda
ordenar = sorted(tablas, key=lambda t: len(t))

# Retorno múltiple
def validate(data):
    is_valid = len(data) > 0
    errors = [] if is_valid else ["vacío"]
    return is_valid, errors

ok, errs = validate([1, 2, 3])

#Decoradores

import functools
import time

# Decorator para medir tiempo de ejecución
def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start
        print(f"{func.__name__}: {elapsed:.2f}s")
        return result
    return wrapper

@timer
def extract_data(query: str):
    """Ejecuta query y retorna resultados."""
    time.sleep(1)  # simula latencia
    return [{"id": 1}]

extract_data("SELECT * FROM ventas")
# => extract_data: 1.00s

#if / elif / else

status_code = 200

if status_code == 200:
    print("OK")
elif status_code == 404:
    print("No encontrado")
elif status_code >= 500:
    print("Error del servidor")
else:
    print(f"Código: {status_code}")

# Ternario
env = "prod" if status_code == 200 else "dev"

# Match/case (Python 3.10+)
match status_code:
    case 200:
        print("OK")
    case 404:
        print("Not found")
    case _:
        print("Otro")

#Loops

# for con enumerate
tablas = ["ventas", "clientes", "pagos"]
for i, tabla in enumerate(tablas):
    print(f"{i}: procesando {tabla}")

# for con zip
nombres = ["col_a", "col_b", "col_c"]
tipos = ["INT", "VARCHAR", "DATE"]
for nombre, tipo in zip(nombres, tipos):
    print(f"{nombre} {tipo}")

# while con break
intentos = 0
while intentos < 5:
    intentos += 1
    if conexion_exitosa():
        break

# range
for i in range(0, 100, 10):
    print(i)  # 0, 10, 20, ..., 90

#Manejo de errores

import logging

logger = logging.getLogger(__name__)

try:
    resultado = ejecutar_query(sql)
except ConnectionError as e:
    logger.error(f"Conexión fallida: {e}")
    raise  # re-lanzar para que Airflow capture
except TimeoutError:
    logger.warning("Timeout, reintentando...")
except Exception as e:
    logger.exception(f"Error inesperado: {e}")
finally:
    conexion.close()

# Excepciones custom
class DataQualityError(Exception):
    """Error de calidad de datos en pipeline."""
    pass

if null_ratio > 0.5:
    raise DataQualityError(
        f"Ratio de nulos: {null_ratio:.0%}"
    )

#Generadores

# Generador — procesar sin cargar todo en memoria
def leer_en_bloques(archivo, chunk_size=1024):
    with open(archivo, "r") as f:
        while True:
            bloque = f.read(chunk_size)
            if not bloque:
                break
            yield bloque

# Uso en pipeline
for bloque in leer_en_bloques("datos.csv"):
    procesar(bloque)

# Generator expression
total = sum(row["monto"] for row in dataset)

#Manejo de Archivos y Datos

#Leer y escribir archivos

# Leer archivo completo
with open("config.txt", "r", encoding="utf-8") as f:
    contenido = f.read()

# Leer línea por línea (memoria eficiente)
with open("datos.csv", "r") as f:
    for linea in f:
        procesar(linea.strip())

# Escribir archivo
with open("output.txt", "w", encoding="utf-8") as f:
    f.write("línea 1\n")
    f.writelines(["línea 2\n", "línea 3\n"])

# Append
with open("log.txt", "a") as f:
    f.write(f"{datetime.now()}: evento\n")

#JSON

import json

# Leer JSON
with open("config.json", "r") as f:
    config = json.load(f)

# Escribir JSON
with open("output.json", "w") as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

# String ↔ JSON
json_str = json.dumps({"dag": "ventas"})
obj = json.loads(json_str)

#CSV con csv y pandas

import csv
import pandas as pd

# csv estándar
with open("datos.csv", "r") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["columna"])

# pandas — preferido en JupyterHub
df = pd.read_csv("datos.csv")
df = pd.read_csv("datos.csv", sep=";", encoding="latin-1")

# Escribir CSV
df.to_csv("output.csv", index=False)

#datetime — fechas en pipelines

from datetime import datetime, timedelta, date

# Fecha actual
hoy = date.today()              # 2026-03-14
ahora = datetime.now()          # con hora

# Parsing y formateo
dt = datetime.strptime("2026-03-14", "%Y-%m-%d")
dt.strftime("%d/%m/%Y")         # => '14/03/2026'

# Aritmética de fechas
ayer = hoy - timedelta(days=1)
hace_7d = hoy - timedelta(weeks=1)

# Timestamp Unix
ts = datetime.now().timestamp()
dt_from_ts = datetime.fromtimestamp(ts)

# Rango de fechas (útil para particiones)
inicio = date(2026, 1, 1)
fin = date(2026, 1, 31)
dias = (fin - inicio).days       # => 30

#pathlib — manejo de rutas

from pathlib import Path

# Rutas
base = Path("/data/raw")
archivo = base / "ventas" / "2026-03-14.csv"

# Verificar existencia
archivo.exists()
archivo.is_file()
base.is_dir()

# Crear directorios
Path("/data/output/ventas").mkdir(parents=True, exist_ok=True)

# Listar archivos
csvs = list(base.glob("**/*.csv"))
parquets = list(base.glob("*.parquet"))

# Leer/escribir
contenido = archivo.read_text(encoding="utf-8")
Path("out.txt").write_text("datos")

#os y subprocess

import os
import subprocess

# Variables de entorno
db_host = os.environ.get("DB_HOST", "<tu-host>")
os.environ["PYTHONPATH"] = "/app/src"

# Verificar archivos
os.path.exists("/data/raw/ventas.csv")
os.listdir("/data/raw/")

# Ejecutar comandos del sistema
result = subprocess.run(
    ["ls", "-la", "/data/raw"],
    capture_output=True,
    text=True,
    check=True
)
print(result.stdout)

#Módulos Esenciales

#Airflow — DAG básico

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data_team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_ventas_diario",
    default_args=default_args,
    schedule="0 6 * * *",       # 6 AM diario
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "ventas"],
) as dag:

    def extract(**context):
        ds = context["ds"]  # fecha de ejecución
        print(f"Extrayendo datos para {ds}")

    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )

#minio — S3/Ceph

from minio import Minio

# Conectar a almacenamiento S3-compatible
client = Minio(
    os.environ["S3_ENDPOINT"],
    access_key=os.environ["S3_ACCESS_KEY"],
    secret_key=os.environ["S3_SECRET_KEY"],
    secure=True,
)

# Upload
client.fput_object("my-data-bucket", "raw/ventas.csv", "local.csv")

# Download
client.fget_object("my-data-bucket", "raw/ventas.csv", "local.csv")

# Listar objetos
objects = client.list_objects("my-data-bucket", prefix="raw/", recursive=True)
for obj in objects:
    print(obj.object_name, obj.size)

#pandas — análisis en JupyterHub

import pandas as pd

# Leer desde distintas fuentes
df = pd.read_csv("ventas.csv")
df = pd.read_parquet("ventas.parquet")
df = pd.read_sql(query, connection)

# Exploración rápida
df.shape              # (filas, columnas)
df.dtypes             # tipos de cada columna
df.describe()         # estadísticas
df.isnull().sum()     # nulos por columna

# Transformaciones comunes
df["fecha"] = pd.to_datetime(df["fecha"])
df["monto_usd"] = df["monto"] / 7.5
df_filtrado = df[df["monto"] > 1000]
df_agrupado = df.groupby("canal")["monto"].sum()

# Exportar
df.to_parquet("output.parquet", index=False)

#requests y FastAPI

# --- requests: consumir APIs ---
import requests

resp = requests.get(
    "https://api.example.com/v1/clientes",
    headers={"Authorization": f"Bearer {token}"},
    params={"limit": 100},
    timeout=30
)
resp.raise_for_status()
data = resp.json()

# --- FastAPI: crear APIs ---
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/health")
def health():
    return {"status": "ok"}

@app.get("/ventas/{fecha}")
def get_ventas(fecha: str, limit: int = 100):
    if not validar_fecha(fecha):
        raise HTTPException(400, "Fecha inválida")
    return {"fecha": fecha, "registros": []}

#logging — logging estructurado

import logging

# Configuración básica
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("etl_ventas")

# Uso en pipelines
logger.info("Inicio extracción")
logger.warning(f"Registros nulos: {null_count}")
logger.error(f"Fallo conexión: {e}")
logger.exception("Error con traceback completo")

# En Airflow, los logs van al task log automáticamente
# En JupyterHub, usar print() o logger para output visible

#Instalación de paquetes

# pip — gestor estándar
pip install pandas minio fastapi
pip install -r requirements.txt
pip freeze > requirements.txt

# Entorno virtual (recomendado)
python -m venv .venv
source .venv/bin/activate    # Linux/Mac
pip install -r requirements.txt

# Verificar versión
python --version
pip list | grep pandas

#Also see