Data Team MICPython es un lenguaje versátil para scripting, automatización y análisis de datos. Se usa en ETL pipelines (Airflow DAGs), análisis interactivo (JupyterHub) y servicios API (FastAPI).
nombre = "pipeline_ventas" # str
registros = 150000 # int
ratio = 0.85 # float
activo = True # bool
# Asignación múltiple
x, y, z = 1, 2, 3
# f-string (Python 3.6+)
print(f"Procesando {nombre}: {registros} registros")
Python no requiere declarar tipo, pero soporta type hints.
| Operador | Descripción | Ejemplo |
|---|---|---|
+ |
Suma | 10 + 3 → 13 |
- |
Resta | 10 - 3 → 7 |
* |
Multiplicación | 10 * 3 → 30 |
/ |
División (float) | 10 / 3 → 3.33 |
// |
División entera | 10 // 3 → 3 |
% |
Módulo | 10 % 3 → 1 |
** |
Potencia | 2 ** 10 → 1024 |
tabla = "fact_ventas"
# Slicing
tabla[0:4] # => 'fact'
tabla[-6:] # => 'ventas'
# Métodos comunes
tabla.upper() # => 'FACT_VENTAS'
tabla.split("_") # => ['fact', 'ventas']
tabla.startswith("fact") # => True
tabla.replace("fact", "dim") # => 'dim_ventas'
# Join — construir queries
cols = ["id", "monto", "fecha"]
", ".join(cols) # => 'id, monto, fecha'
monto = 1234567.891
nombre = "DataCorp"
# Separador de miles
f"{monto:,.2f}" # => '1,234,567.89'
# Padding y alineación
f"{'col':>15}" # => ' col'
f"{'col':<15}" # => 'col '
f"{'col':^15}" # => ' col '
# Porcentaje
f"{0.8523:.1%}" # => '85.2%'
# Expresiones dentro de f-string
f"Registros: {registros:,} en {nombre}"
# => 'Registros: 150,000 en DataCorp'
# Comparación
x == y # Igual
x != y # Diferente
x > y # Mayor
x >= y # Mayor o igual
# Lógicos
x > 0 and y > 0
x > 0 or y > 0
not x
# Identidad y membresía
x is None
"ventas" in ["ventas", "clientes", "pagos"]
# Casting explícito
int("42") # => 42
float("3.14") # => 3.14
str(100) # => '100'
bool(0) # => False
bool("texto") # => True
# Valores falsy en Python
# False, None, 0, 0.0, "", [], {}, set()
# Crear
tablas = ["ventas", "clientes", "pagos"]
nums = list(range(1, 6)) # [1, 2, 3, 4, 5]
# CRUD
tablas.append("productos")
tablas.insert(0, "dim_fecha")
tablas.remove("pagos")
ultimo = tablas.pop()
# Slicing
tablas[1:3] # sublista
tablas[::-1] # invertir
# Ordenar
tablas.sort()
tablas.sort(key=len, reverse=True)
config = {
"dag_id": "etl_ventas",
"schedule": "0 6 * * *",
"retries": 3
}
# Acceso
config["dag_id"] # => 'etl_ventas'
config.get("timeout", 300) # => 300 (default)
# Modificar
config["owner"] = "data_team"
config.update({"retries": 5, "pool": "default"})
# Iterar
for key, val in config.items():
print(f"{key}: {val}")
# Destructuring
keys = config.keys()
vals = config.values()
# Tupla — inmutable, útil para registros
registro = ("2024-01-15", "ventas", 1500.00)
fecha, tabla, monto = registro # unpacking
# Tupla con nombre (mejor para datos)
from collections import namedtuple
Venta = namedtuple("Venta", ["fecha", "monto", "canal"])
v = Venta("2024-01-15", 1500.00, "web")
v.monto # => 1500.00
# Set — valores únicos
fuentes = {"oracle", "postgres", "s3", "oracle"}
# => {'oracle', 'postgres', 's3'}
fuentes.add("bigquery")
fuentes.discard("oracle")
# List comprehension
cuadrados = [x**2 for x in range(10)]
pares = [x for x in range(20) if x % 2 == 0]
# Dict comprehension
col_types = {col: "VARCHAR" for col in ["id", "name"]}
# Set comprehension
unicos = {row["status"] for row in dataset}
# Filtrar tablas fact_
tablas = ["fact_ventas", "dim_fecha", "fact_pagos", "stg_raw"]
facts = [t for t in tablas if t.startswith("fact_")]
# => ['fact_ventas', 'fact_pagos']
# Aplanar lista de listas
nested = [[1, 2], [3, 4], [5]]
flat = [x for sub in nested for x in sub]
# => [1, 2, 3, 4, 5]
from collections import deque
# Cola (FIFO) — procesar tareas en orden
cola = deque(["task_1", "task_2", "task_3"])
cola.append("task_4") # agregar al final
siguiente = cola.popleft() # => 'task_1'
# Stack (LIFO) — deshacer operaciones
stack = deque()
stack.append("paso_1")
stack.append("paso_2")
ultimo = stack.pop() # => 'paso_2'
from typing import Optional
def procesar_registros(
tabla: str,
limite: int = 1000,
filtro: Optional[str] = None
) -> list[dict]:
"""Procesa registros de una tabla."""
...
# Variables con tipo
dag_id: str = "etl_ventas"
registros: list[dict] = []
config: dict[str, any] = {"retries": 3}
# Union (Python 3.10+)
def get_valor(key: str) -> int | str | None:
...
# Básica con default
def extract_table(table: str, limit: int = 1000) -> list:
print(f"Extrayendo {table}, límite: {limit}")
return []
# *args y **kwargs
def log_event(*args, **kwargs):
print(f"Args: {args}")
print(f"Kwargs: {kwargs}")
log_event("error", code=500, msg="timeout")
# Lambda
ordenar = sorted(tablas, key=lambda t: len(t))
# Retorno múltiple
def validate(data):
is_valid = len(data) > 0
errors = [] if is_valid else ["vacío"]
return is_valid, errors
ok, errs = validate([1, 2, 3])
import functools
import time
# Decorator para medir tiempo de ejecución
def timer(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start
print(f"{func.__name__}: {elapsed:.2f}s")
return result
return wrapper
@timer
def extract_data(query: str):
"""Ejecuta query y retorna resultados."""
time.sleep(1) # simula latencia
return [{"id": 1}]
extract_data("SELECT * FROM ventas")
# => extract_data: 1.00s
status_code = 200
if status_code == 200:
print("OK")
elif status_code == 404:
print("No encontrado")
elif status_code >= 500:
print("Error del servidor")
else:
print(f"Código: {status_code}")
# Ternario
env = "prod" if status_code == 200 else "dev"
# Match/case (Python 3.10+)
match status_code:
case 200:
print("OK")
case 404:
print("Not found")
case _:
print("Otro")
# for con enumerate
tablas = ["ventas", "clientes", "pagos"]
for i, tabla in enumerate(tablas):
print(f"{i}: procesando {tabla}")
# for con zip
nombres = ["col_a", "col_b", "col_c"]
tipos = ["INT", "VARCHAR", "DATE"]
for nombre, tipo in zip(nombres, tipos):
print(f"{nombre} {tipo}")
# while con break
intentos = 0
while intentos < 5:
intentos += 1
if conexion_exitosa():
break
# range
for i in range(0, 100, 10):
print(i) # 0, 10, 20, ..., 90
import logging
logger = logging.getLogger(__name__)
try:
resultado = ejecutar_query(sql)
except ConnectionError as e:
logger.error(f"Conexión fallida: {e}")
raise # re-lanzar para que Airflow capture
except TimeoutError:
logger.warning("Timeout, reintentando...")
except Exception as e:
logger.exception(f"Error inesperado: {e}")
finally:
conexion.close()
# Excepciones custom
class DataQualityError(Exception):
"""Error de calidad de datos en pipeline."""
pass
if null_ratio > 0.5:
raise DataQualityError(
f"Ratio de nulos: {null_ratio:.0%}"
)
# Generador — procesar sin cargar todo en memoria
def leer_en_bloques(archivo, chunk_size=1024):
with open(archivo, "r") as f:
while True:
bloque = f.read(chunk_size)
if not bloque:
break
yield bloque
# Uso en pipeline
for bloque in leer_en_bloques("datos.csv"):
procesar(bloque)
# Generator expression
total = sum(row["monto"] for row in dataset)
# Leer archivo completo
with open("config.txt", "r", encoding="utf-8") as f:
contenido = f.read()
# Leer línea por línea (memoria eficiente)
with open("datos.csv", "r") as f:
for linea in f:
procesar(linea.strip())
# Escribir archivo
with open("output.txt", "w", encoding="utf-8") as f:
f.write("línea 1\n")
f.writelines(["línea 2\n", "línea 3\n"])
# Append
with open("log.txt", "a") as f:
f.write(f"{datetime.now()}: evento\n")
import json
# Leer JSON
with open("config.json", "r") as f:
config = json.load(f)
# Escribir JSON
with open("output.json", "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# String ↔ JSON
json_str = json.dumps({"dag": "ventas"})
obj = json.loads(json_str)
import csv
import pandas as pd
# csv estándar
with open("datos.csv", "r") as f:
reader = csv.DictReader(f)
for row in reader:
print(row["columna"])
# pandas — preferido en JupyterHub
df = pd.read_csv("datos.csv")
df = pd.read_csv("datos.csv", sep=";", encoding="latin-1")
# Escribir CSV
df.to_csv("output.csv", index=False)
from datetime import datetime, timedelta, date
# Fecha actual
hoy = date.today() # 2026-03-14
ahora = datetime.now() # con hora
# Parsing y formateo
dt = datetime.strptime("2026-03-14", "%Y-%m-%d")
dt.strftime("%d/%m/%Y") # => '14/03/2026'
# Aritmética de fechas
ayer = hoy - timedelta(days=1)
hace_7d = hoy - timedelta(weeks=1)
# Timestamp Unix
ts = datetime.now().timestamp()
dt_from_ts = datetime.fromtimestamp(ts)
# Rango de fechas (útil para particiones)
inicio = date(2026, 1, 1)
fin = date(2026, 1, 31)
dias = (fin - inicio).days # => 30
from pathlib import Path
# Rutas
base = Path("/data/raw")
archivo = base / "ventas" / "2026-03-14.csv"
# Verificar existencia
archivo.exists()
archivo.is_file()
base.is_dir()
# Crear directorios
Path("/data/output/ventas").mkdir(parents=True, exist_ok=True)
# Listar archivos
csvs = list(base.glob("**/*.csv"))
parquets = list(base.glob("*.parquet"))
# Leer/escribir
contenido = archivo.read_text(encoding="utf-8")
Path("out.txt").write_text("datos")
import os
import subprocess
# Variables de entorno
db_host = os.environ.get("DB_HOST", "<tu-host>")
os.environ["PYTHONPATH"] = "/app/src"
# Verificar archivos
os.path.exists("/data/raw/ventas.csv")
os.listdir("/data/raw/")
# Ejecutar comandos del sistema
result = subprocess.run(
["ls", "-la", "/data/raw"],
capture_output=True,
text=True,
check=True
)
print(result.stdout)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data_team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="etl_ventas_diario",
default_args=default_args,
schedule="0 6 * * *", # 6 AM diario
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "ventas"],
) as dag:
def extract(**context):
ds = context["ds"] # fecha de ejecución
print(f"Extrayendo datos para {ds}")
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
from minio import Minio
# Conectar a almacenamiento S3-compatible
client = Minio(
os.environ["S3_ENDPOINT"],
access_key=os.environ["S3_ACCESS_KEY"],
secret_key=os.environ["S3_SECRET_KEY"],
secure=True,
)
# Upload
client.fput_object("my-data-bucket", "raw/ventas.csv", "local.csv")
# Download
client.fget_object("my-data-bucket", "raw/ventas.csv", "local.csv")
# Listar objetos
objects = client.list_objects("my-data-bucket", prefix="raw/", recursive=True)
for obj in objects:
print(obj.object_name, obj.size)
import pandas as pd
# Leer desde distintas fuentes
df = pd.read_csv("ventas.csv")
df = pd.read_parquet("ventas.parquet")
df = pd.read_sql(query, connection)
# Exploración rápida
df.shape # (filas, columnas)
df.dtypes # tipos de cada columna
df.describe() # estadísticas
df.isnull().sum() # nulos por columna
# Transformaciones comunes
df["fecha"] = pd.to_datetime(df["fecha"])
df["monto_usd"] = df["monto"] / 7.5
df_filtrado = df[df["monto"] > 1000]
df_agrupado = df.groupby("canal")["monto"].sum()
# Exportar
df.to_parquet("output.parquet", index=False)
# --- requests: consumir APIs ---
import requests
resp = requests.get(
"https://api.example.com/v1/clientes",
headers={"Authorization": f"Bearer {token}"},
params={"limit": 100},
timeout=30
)
resp.raise_for_status()
data = resp.json()
# --- FastAPI: crear APIs ---
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/health")
def health():
return {"status": "ok"}
@app.get("/ventas/{fecha}")
def get_ventas(fecha: str, limit: int = 100):
if not validar_fecha(fecha):
raise HTTPException(400, "Fecha inválida")
return {"fecha": fecha, "registros": []}
import logging
# Configuración básica
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("etl_ventas")
# Uso en pipelines
logger.info("Inicio extracción")
logger.warning(f"Registros nulos: {null_count}")
logger.error(f"Fallo conexión: {e}")
logger.exception("Error con traceback completo")
# En Airflow, los logs van al task log automáticamente
# En JupyterHub, usar print() o logger para output visible
# pip — gestor estándar
pip install pandas minio fastapi
pip install -r requirements.txt
pip freeze > requirements.txt
# Entorno virtual (recomendado)
python -m venv .venv
source .venv/bin/activate # Linux/Mac
pip install -r requirements.txt
# Verificar versión
python --version
pip list | grep pandas