Files
three_60/backend/database/__init__.py
2026-04-01 12:40:40 -04:00

133 lines
5.3 KiB
Python

import sqlite3 as sql
import os
from datetime import datetime, timezone
from typing import Any
from config import settings
from logger import get_logger
# Module-level logger, obtained from the project's `get_logger` helper using this module's name.
logger = get_logger(__name__)
def connect_to_db() -> sql.Connection:
    """Open a SQLite connection to the application database.

    The database location is taken from ``settings.db_path``. When that value
    is falsy, the path falls back to ``data/three60.db`` located two
    directories above the directory containing this file.

    Returns:
        An open ``sqlite3`` connection with a 10 second lock timeout and the
        journal mode switched to WAL (write-ahead logging).
    """
    db_path: str = settings.db_path
    if not db_path:
        # No configured path: resolve the default relative to this module.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        db_path = os.path.join(module_dir, "..", "..", "data", "three60.db")

    connection: sql.Connection = sql.connect(db_path, timeout=10)
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
def convert_timestamps(row: dict) -> dict:
    """Replace epoch timestamps in ``row`` with formatted UTC strings.

    The ``created`` and ``updated`` entries, when present and not ``None``,
    are interpreted as POSIX timestamps (seconds) and rewritten as
    ``YYYY-MM-DD HH:MM:SS`` strings in UTC. All other entries are untouched.

    Args:
        row: Mapping to convert. It is modified in place.

    Returns:
        The same ``row`` object that was passed in.
    """
    for field in ('created', 'updated'):
        epoch = row.get(field)
        if epoch is None:
            # Missing or NULL timestamp: leave the entry as it is.
            continue
        moment = datetime.fromtimestamp(epoch, tz=timezone.utc)
        row[field] = moment.strftime('%Y-%m-%d %H:%M:%S')
    return row
# TODO: Break this file out into separate modules, one per entity (cell sites, alarms, incidents, robots).
def get_cellsites(conn: sql.Connection) -> list[dict[Any, Any]]:
    """Return up to 500 cell sites located inside a fixed set of US state boxes.

    Rows are selected from ``cellsites`` when their ``lat``/``lon`` fall inside
    any of the latitude/longitude rectangles listed in the query (labelled in
    the SQL as Tennessee, Georgia, Illinois, Kentucky, Indiana, North Carolina
    and South Carolina).

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.

    Returns:
        One dict per row, with ``created``/``updated`` passed through
        ``convert_timestamps``.
    """
    # NOTE: The full data set is massive and this app isn't set up to handle
    # it yet, so the query is restricted to a handful of US states, which
    # gives a better spread than the earlier approach. That earlier approach
    # used a single continental-US bounding box (lat 24.396308..49.384358,
    # lon -124.848974..-66.885444) with the same LIMIT 500.
    # TODO: Add chunking / pagination to handle the data.
    # The statement text is kept verbatim so the exact SQL sent to SQLite
    # does not change.
    state_boxes_query = """
SELECT *
FROM cellsites
WHERE (
(lat BETWEEN 34.98 AND 36.68 AND lon BETWEEN -90.31 AND -81.65) -- Tennessee
OR (lat BETWEEN 30.36 AND 35.00 AND lon BETWEEN -85.61 AND -80.84) -- Georgia
OR (lat BETWEEN 36.97 AND 42.51 AND lon BETWEEN -91.51 AND -87.02) -- Illinois
OR (lat BETWEEN 36.50 AND 39.15 AND lon BETWEEN -89.57 AND -81.96) -- Kentucky
OR (lat BETWEEN 37.77 AND 41.76 AND lon BETWEEN -88.10 AND -84.78) -- Indiana
OR (lat BETWEEN 33.84 AND 36.59 AND lon BETWEEN -84.32 AND -75.46) -- North Carolina
OR (lat BETWEEN 32.05 AND 35.22 AND lon BETWEEN -83.35 AND -78.54) -- South Carolina
)
LIMIT 500;
"""
    # The row factory must be set before the cursor is created so that the
    # cursor yields name-addressable rows.
    conn.row_factory = sql.Row
    cursor: sql.Cursor = conn.cursor()
    cursor.execute(state_boxes_query)
    sites: list[Any] = cursor.fetchall()
    logger.debug(f"get_cellsites → {len(sites)} rows")

    converted: list[dict[Any, Any]] = []
    for site in sites:
        converted.append(convert_timestamps(dict(site)))
    return converted
def get_cellsite(conn: sql.Connection, id: int) -> list[dict[Any, Any]]:
    """Fetch the cell site rows whose ``id`` column equals ``id``.

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.
        id: Value matched against ``cellsites.id``.

    Returns:
        A list of row dicts with timestamps converted by
        ``convert_timestamps``. The list is empty when nothing matches, in
        which case a warning is also logged.
    """
    conn.row_factory = sql.Row
    matches = (
        conn.cursor()
        .execute("SELECT * FROM cellsites WHERE id = ?", (id,))
        .fetchall()
    )
    if len(matches) == 0:
        logger.warning(f"get_cellsite → no result for id={id}")
    return list(map(convert_timestamps, map(dict, matches)))
def get_alarms(conn: sql.Connection, id: int | None = None, before: int | None = None):
    """Return alarms, optionally filtered by site and by creation time.

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.
        id: When not ``None``, only alarms with ``site_id`` equal to this
            value are returned.
        before: When not ``None``, only alarms with ``created <= before`` are
            returned.

    Returns:
        A list of row dicts with timestamps converted by
        ``convert_timestamps``.
    """
    conn.row_factory = sql.Row
    cursor: sql.Cursor = conn.cursor()

    # Pair each optional filter with its bound value and keep only the ones
    # the caller actually supplied; order (site, then time) is preserved.
    active_filters = [
        (clause, value)
        for clause, value in (("site_id = ?", id), ("created <= ?", before))
        if value is not None
    ]

    statement = "SELECT * FROM alarms"
    if active_filters:
        clauses = [clause for clause, _ in active_filters]
        statement = statement + " WHERE " + " AND ".join(clauses)
    bound_values = [value for _, value in active_filters]

    alarms: list[Any] = cursor.execute(statement, bound_values).fetchall()
    logger.debug(f"get_alarms (site_id={id}, before={before}) → {len(alarms)} rows")
    return [convert_timestamps(dict(alarm)) for alarm in alarms]
def get_incident_by_id(conn: sql.Connection, incident_id: int):
    """Fetch a single incident by its ``id`` column.

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.
        incident_id: Value matched against ``incidents.id``.

    Returns:
        The incident as a dict with timestamps converted by
        ``convert_timestamps``, or ``None`` (after logging a warning) when no
        row is found.
    """
    conn.row_factory = sql.Row
    cursor: sql.Cursor = conn.cursor()
    cursor.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
    incident = cursor.fetchone()

    if incident:
        return convert_timestamps(dict(incident))

    logger.warning(f"get_incident_by_id → no result for id={incident_id}")
    return None
def get_incidents(conn: sql.Connection, id: int | None = None, before: int | None = None):
    """Return incidents, optionally filtered by site and by creation time.

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.
        id: When not ``None``, only incidents with ``site_id`` equal to this
            value are returned.
        before: When not ``None``, only incidents with ``created <= before``
            are returned.

    Returns:
        A list of row dicts with timestamps converted by
        ``convert_timestamps``.
    """
    conn.row_factory = sql.Row
    cursor: sql.Cursor = conn.cursor()

    # Collect the WHERE fragments and their bound values in lockstep so the
    # placeholders line up with the parameters.
    where_parts = []
    bound_values = []
    for clause, value in (("site_id = ?", id), ("created <= ?", before)):
        if value is None:
            continue
        where_parts.append(clause)
        bound_values.append(value)

    statement = "SELECT * FROM incidents"
    if where_parts:
        statement = statement + " WHERE " + " AND ".join(where_parts)

    cursor.execute(statement, bound_values)
    incidents: list[Any] = cursor.fetchall()
    logger.debug(f"get_incidents (site_id={id}, before={before}) → {len(incidents)} rows")
    return [convert_timestamps(dict(incident)) for incident in incidents]
# TODO: Probably move this out of this module. Simulator statistics shouldn't live alongside the main queries.
def get_simulator_stats(conn):
    """Count the alarms and incidents whose ``created_by`` is ``'simulator'``.

    Args:
        conn: Open SQLite connection; its ``row_factory`` is not changed here.

    Returns:
        A dict with the integer counts under ``"alarm_count"`` and
        ``"incident_count"``.
    """
    cursor = conn.cursor()

    cursor.execute("SELECT COUNT(*) FROM alarms WHERE created_by = 'simulator'")
    alarm_row = cursor.fetchone()

    cursor.execute("SELECT COUNT(*) FROM incidents WHERE created_by = 'simulator'")
    incident_row = cursor.fetchone()

    # COUNT(*) always yields exactly one row; the count is its first column.
    stats = {}
    stats["alarm_count"] = alarm_row[0]
    stats["incident_count"] = incident_row[0]
    return stats
def get_robots(conn):
    """Return every row of the ``robots`` table.

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.

    Returns:
        A list of row dicts with timestamps converted by
        ``convert_timestamps``.
    """
    conn.row_factory = sql.Row
    robots = conn.cursor().execute("SELECT * FROM robots").fetchall()
    logger.debug(f"get_robots → {len(robots)} rows")

    converted = []
    for robot in robots:
        converted.append(convert_timestamps(dict(robot)))
    return converted
def get_robot_by_id(conn, robot_id: int):
    """Fetch a single robot by its ``id`` column.

    Args:
        conn: Open SQLite connection. Its ``row_factory`` is set to
            ``sqlite3.Row`` as a side effect.
        robot_id: Value matched against ``robots.id``.

    Returns:
        The robot as a dict with timestamps converted by
        ``convert_timestamps``, or ``None`` (after logging a warning) when no
        row is found.
    """
    conn.row_factory = sql.Row
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM robots WHERE id = ?", (robot_id,))
    robot = cursor.fetchone()

    if robot:
        return convert_timestamps(dict(robot))

    logger.warning(f"get_robot_by_id → no result for id={robot_id}")
    return None