"""SQLite query helpers for cell sites, alarms, incidents and robots."""

import os
import sqlite3 as sql
from datetime import datetime, timezone
from typing import Any

from config import settings
from logger import get_logger

logger = get_logger(__name__)

# NOTE: Data is massive. Let's limit to the US. Still massive but less massive.
#       This app isn't set up to handle this at the moment.
# TODO: Add chunking / pagination to handle the data
# Previous US-wide query, kept for reference:
#   SELECT *
#   FROM cellsites
#   WHERE cellsites.lat BETWEEN 24.396308 AND 49.384358
#     AND cellsites.lon BETWEEN -124.848974 AND -66.885444
#   LIMIT 500;
#
# NOTE: Testing out specific US states. Getting a better spread.
# Each clause is a rectangular lat/lon bounding box labelled with its state.
# Every `--` SQL comment must stay at the end of its own line, otherwise it
# would comment out the rest of the statement (including the LIMIT).
# NOTE(review): there is no ORDER BY, so which 500 rows are returned is left
# to SQLite -- confirm that is acceptable for callers.
_CELLSITES_QUERY = """
    SELECT *
    FROM cellsites
    WHERE (
        (lat BETWEEN 34.98 AND 36.68 AND lon BETWEEN -90.31 AND -81.65)     -- Tennessee
        OR (lat BETWEEN 30.36 AND 35.00 AND lon BETWEEN -85.61 AND -80.84)  -- Georgia
        OR (lat BETWEEN 36.97 AND 42.51 AND lon BETWEEN -91.51 AND -87.02)  -- Illinois
        OR (lat BETWEEN 36.50 AND 39.15 AND lon BETWEEN -89.57 AND -81.96)  -- Kentucky
        OR (lat BETWEEN 37.77 AND 41.76 AND lon BETWEEN -88.10 AND -84.78)  -- Indiana
        OR (lat BETWEEN 33.84 AND 36.59 AND lon BETWEEN -84.32 AND -75.46)  -- North Carolina
        OR (lat BETWEEN 32.05 AND 35.22 AND lon BETWEEN -83.35 AND -78.54)  -- South Carolina
    )
    LIMIT 500;
"""


def connect_to_db() -> sql.Connection:
    """Open the application database and switch it to WAL journal mode.

    The path comes from ``settings.db_path``; when that is falsy it falls
    back to ``../../data/three60.db`` relative to this file's directory.

    Returns:
        An open ``sqlite3.Connection`` created with ``timeout=10``.

    Raises:
        sqlite3.Error: If connecting or executing the journal-mode PRAGMA
            fails. The connection is closed before the error propagates.
    """
    db_path: str = settings.db_path or os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", "..", "data", "three60.db"
    )
    conn: sql.Connection = sql.connect(db_path, timeout=10)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
    except sql.Error:
        # Do not leak the handle when the first statement fails
        # (for example a locked file or a file that is not a database).
        conn.close()
        raise
    return conn


def convert_timestamps(row: dict) -> dict:
    """Format the ``created`` / ``updated`` values of *row* as UTC strings.

    Each of the two keys that is present and not ``None`` is passed to
    ``datetime.fromtimestamp(..., tz=timezone.utc)`` and replaced by its
    ``'%Y-%m-%d %H:%M:%S'`` rendering. Other keys are left untouched.

    Args:
        row: Mapping to update. It is modified in place.

    Returns:
        The same ``row`` object, for convenient use in expressions.
    """
    for key in ("created", "updated"):
        value = row.get(key)
        if value is not None:
            row[key] = datetime.fromtimestamp(value, tz=timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
    return row


def _rows_to_dicts(rows: list[Any]) -> list[dict[Any, Any]]:
    """Turn fetched rows into plain dicts with formatted timestamps."""
    return [convert_timestamps(dict(row)) for row in rows]


def _fetch_for_site(
    conn: sql.Connection,
    base_query: str,
    label: str,
    site_id: int | None,
    before: int | None,
) -> list[dict[Any, Any]]:
    """Run *base_query* with optional ``site_id`` / ``created`` filters.

    ``base_query`` is always a literal supplied by this module; only the
    filter values are bound as parameters. ``label`` prefixes the debug log
    line so it names the public function that was called.
    """
    conn.row_factory = sql.Row
    conditions: list[str] = []
    params: list[Any] = []
    if site_id is not None:
        conditions.append("site_id = ?")
        params.append(site_id)
    if before is not None:
        conditions.append("created <= ?")
        params.append(before)

    query = base_query
    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    rows: list[Any] = conn.cursor().execute(query, params).fetchall()
    logger.debug(f"{label} (site_id={site_id}, before={before}) → {len(rows)} rows")
    return _rows_to_dicts(rows)


def _fetch_one_by_id(
    conn: sql.Connection, query: str, row_id: int, label: str
) -> dict[Any, Any] | None:
    """Run a single-parameter *query* and return the first row, or ``None``.

    A warning prefixed with *label* is logged when nothing matches.
    """
    conn.row_factory = sql.Row
    row = conn.cursor().execute(query, (row_id,)).fetchone()
    if row is None:
        logger.warning(f"{label} → no result for id={row_id}")
        return None
    return convert_timestamps(dict(row))


# TODO: Break out into separate modules per thing
def get_cellsites(conn: sql.Connection) -> list[dict[Any, Any]]:
    """Return up to 500 cell sites inside the configured state bounding boxes.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect.

    Args:
        conn: Open database connection.

    Returns:
        One dict per row, with ``created`` / ``updated`` formatted by
        :func:`convert_timestamps`.
    """
    conn.row_factory = sql.Row
    rows: list[Any] = conn.cursor().execute(_CELLSITES_QUERY).fetchall()
    logger.debug(f"get_cellsites → {len(rows)} rows")
    return _rows_to_dicts(rows)


def get_cellsite(conn: sql.Connection, id: int) -> list[dict[Any, Any]]:
    """Return the cell site rows whose ``id`` column equals *id*.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect.

    Args:
        conn: Open database connection.
        id: Value matched against ``cellsites.id``.

    Returns:
        A list of row dicts. It is empty when nothing matches, in which case
        a warning is logged.
    """
    conn.row_factory = sql.Row
    rows = conn.cursor().execute(
        "SELECT * FROM cellsites WHERE id = ?", (id,)
    ).fetchall()
    if not rows:
        logger.warning(f"get_cellsite → no result for id={id}")
    return _rows_to_dicts(rows)


def get_alarms(
    conn: sql.Connection, id: int | None = None, before: int | None = None
) -> list[dict[Any, Any]]:
    """Return alarms, optionally filtered by site and creation time.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect.

    Args:
        conn: Open database connection.
        id: When given, only rows with ``site_id = id`` are returned.
        before: When given, only rows with ``created <= before`` are returned.

    Returns:
        One dict per matching row, timestamps formatted.
    """
    return _fetch_for_site(conn, "SELECT * FROM alarms", "get_alarms", id, before)


def get_incident_by_id(
    conn: sql.Connection, incident_id: int
) -> dict[Any, Any] | None:
    """Return the incident with ``id = incident_id``, or ``None`` if absent.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect and logs a
    warning when no row matches.
    """
    return _fetch_one_by_id(
        conn, "SELECT * FROM incidents WHERE id = ?", incident_id, "get_incident_by_id"
    )


def get_incidents(
    conn: sql.Connection, id: int | None = None, before: int | None = None
) -> list[dict[Any, Any]]:
    """Return incidents, optionally filtered by site and creation time.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect.

    Args:
        conn: Open database connection.
        id: When given, only rows with ``site_id = id`` are returned.
        before: When given, only rows with ``created <= before`` are returned.

    Returns:
        One dict per matching row, timestamps formatted.
    """
    return _fetch_for_site(
        conn, "SELECT * FROM incidents", "get_incidents", id, before
    )


# TODO: Probably leave out of here. The sim shouldn't be part of the main queries.
def get_simulator_stats(conn: sql.Connection) -> dict[str, int]:
    """Count the alarms and incidents whose ``created_by`` is ``'simulator'``.

    Unlike the other getters this does not touch ``conn.row_factory``; the
    counts are read by position, which works for tuples and ``sqlite3.Row``.

    Returns:
        A dict with the keys ``alarm_count`` and ``incident_count``.
    """
    cur = conn.cursor()
    alarm_count = cur.execute(
        "SELECT COUNT(*) FROM alarms WHERE created_by = 'simulator'"
    ).fetchone()[0]
    incident_count = cur.execute(
        "SELECT COUNT(*) FROM incidents WHERE created_by = 'simulator'"
    ).fetchone()[0]
    return {
        "alarm_count": alarm_count,
        "incident_count": incident_count,
    }


def get_robots(conn: sql.Connection) -> list[dict[Any, Any]]:
    """Return every row of the ``robots`` table as a dict.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect.
    """
    conn.row_factory = sql.Row
    rows = conn.cursor().execute("SELECT * FROM robots").fetchall()
    logger.debug(f"get_robots → {len(rows)} rows")
    return _rows_to_dicts(rows)


def get_robot_by_id(conn: sql.Connection, robot_id: int) -> dict[Any, Any] | None:
    """Return the robot with ``id = robot_id``, or ``None`` if absent.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect and logs a
    warning when no row matches.
    """
    return _fetch_one_by_id(
        conn, "SELECT * FROM robots WHERE id = ?", robot_id, "get_robot_by_id"
    )