133 lines
5.3 KiB
Python
133 lines
5.3 KiB
Python
import sqlite3 as sql
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from config import settings
|
|
|
|
from logger import get_logger
|
|
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's name; shared by every query function below.
logger = get_logger(__name__)
|
|
|
|
def connect_to_db() -> sql.Connection:
    """Open a connection to the application's SQLite database.

    The database location is taken from ``settings.db_path``. When that
    value is empty/falsy, the path falls back to ``data/three60.db`` located
    two directories above the directory containing this file.

    Returns:
        A new ``sqlite3`` connection opened with a 10 second lock timeout and
        with the journal mode switched to WAL.
    """
    db_path: str = settings.db_path
    if not db_path:
        # No configured path: resolve the default relative to this module.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        db_path = os.path.join(module_dir, "..", "..", "data", "three60.db")

    connection: sql.Connection = sql.connect(db_path, timeout=10)
    # Ask SQLite to use write-ahead logging for this database.
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
|
|
|
|
def convert_timestamps(row: dict) -> dict:
    """Replace epoch timestamps in *row* with formatted UTC strings.

    Only the ``created`` and ``updated`` keys are considered. A key that is
    missing or holds ``None`` is left alone; any other value is passed to
    ``datetime.fromtimestamp`` (UTC) and stored back as
    ``YYYY-MM-DD HH:MM:SS``.

    Args:
        row: Mapping to convert. It is modified in place.

    Returns:
        The same ``row`` object that was passed in.
    """
    for field in ("created", "updated"):
        epoch = row.get(field)
        if epoch is None:
            # Absent or NULL timestamp: nothing to format.
            continue
        moment = datetime.fromtimestamp(epoch, tz=timezone.utc)
        row[field] = moment.strftime("%Y-%m-%d %H:%M:%S")

    return row
|
|
|
|
# TODO: Break these queries out into separate modules, one per entity (cellsites, alarms, incidents, robots).
|
|
def get_cellsites(conn: sql.Connection) -> list[dict[Any, Any]]:
    """Fetch up to 500 cell sites located inside a fixed set of US state boxes.

    Rows are selected when their ``lat``/``lon`` fall inside any of the
    hard-coded bounding boxes listed in the query. Each row is turned into a
    plain ``dict`` and passed through ``convert_timestamps``.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.

    Returns:
        A list of row dictionaries (at most 500 entries).
    """
    # NOTE: The full data set is massive, and the app isn't set up to handle
    # it yet, so results are restricted to a region and capped with LIMIT.
    # TODO: Add chunking / pagination to handle the data.
    #
    # Earlier variant, covering the whole contiguous US:
    #   SELECT *
    #   FROM cellsites
    #   WHERE cellsites.lat BETWEEN 24.396308 AND 49.384358
    #   AND cellsites.lon BETWEEN -124.848974 AND -66.885444
    #   LIMIT 500;
    #
    # NOTE: Currently testing specific US states to get a better spread.
    state_boxes_query = """
        SELECT *
        FROM cellsites
        WHERE (
            (lat BETWEEN 34.98 AND 36.68 AND lon BETWEEN -90.31 AND -81.65) -- Tennessee
            OR (lat BETWEEN 30.36 AND 35.00 AND lon BETWEEN -85.61 AND -80.84) -- Georgia
            OR (lat BETWEEN 36.97 AND 42.51 AND lon BETWEEN -91.51 AND -87.02) -- Illinois
            OR (lat BETWEEN 36.50 AND 39.15 AND lon BETWEEN -89.57 AND -81.96) -- Kentucky
            OR (lat BETWEEN 37.77 AND 41.76 AND lon BETWEEN -88.10 AND -84.78) -- Indiana
            OR (lat BETWEEN 33.84 AND 36.59 AND lon BETWEEN -84.32 AND -75.46) -- North Carolina
            OR (lat BETWEEN 32.05 AND 35.22 AND lon BETWEEN -83.35 AND -78.54) -- South Carolina
        )
        LIMIT 500;
    """

    # The row factory must be in place before the cursor is created so that
    # the fetched rows can be converted with dict().
    conn.row_factory = sql.Row
    cursor: sql.Cursor = conn.cursor()
    cursor.execute(state_boxes_query)
    sites: list[Any] = cursor.fetchall()

    logger.debug(f"get_cellsites → {len(sites)} rows")

    converted: list[dict[Any, Any]] = []
    for site in sites:
        converted.append(convert_timestamps(dict(site)))
    return converted
|
|
|
|
def get_cellsite(conn: sql.Connection, id: int) -> list[dict[Any, Any]]:
    """Look up the cell site(s) whose ``id`` column equals *id*.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.
        id: Value matched against ``cellsites.id``.

    Returns:
        A list of row dictionaries with timestamps converted by
        ``convert_timestamps``. The list is empty (and a warning is logged)
        when nothing matches.
    """
    conn.row_factory = sql.Row
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM cellsites WHERE id = ?", (id,))
    matches = cursor.fetchall()

    if not matches:
        logger.warning(f"get_cellsite → no result for id={id}")
        return []

    return [convert_timestamps(dict(match)) for match in matches]
|
|
|
|
def get_alarms(conn: sql.Connection, id: int | None = None, before: int | None = None):
    """Fetch alarms, optionally filtered by site and/or creation time.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.
        id: When given, only alarms whose ``site_id`` equals this value.
        before: When given, only alarms whose ``created`` value is less than
            or equal to this value.

    Returns:
        A list of row dictionaries with timestamps converted by
        ``convert_timestamps``.
    """
    conn.row_factory = sql.Row

    # Keep each SQL fragment next to the value bound to its placeholder and
    # drop the filters the caller did not supply.
    active_filters = [
        (clause, value)
        for clause, value in (("site_id = ?", id), ("created <= ?", before))
        if value is not None
    ]

    statement = "SELECT * FROM alarms"
    if active_filters:
        statement += " WHERE " + " AND ".join(clause for clause, _ in active_filters)
    bound_values = [value for _, value in active_filters]

    alarms: list[Any] = conn.cursor().execute(statement, bound_values).fetchall()
    logger.debug(f"get_alarms (site_id={id}, before={before}) → {len(alarms)} rows")
    return [convert_timestamps(dict(alarm)) for alarm in alarms]
|
|
|
|
|
|
def get_incident_by_id(conn: sql.Connection, incident_id: int):
    """Fetch a single incident by its ``id`` column.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.
        incident_id: Value matched against ``incidents.id``.

    Returns:
        The first matching row as a dictionary with timestamps converted by
        ``convert_timestamps``, or ``None`` (after logging a warning) when no
        row matches.
    """
    conn.row_factory = sql.Row
    cursor: sql.Cursor = conn.cursor()
    cursor.execute("SELECT * FROM incidents WHERE id = ?", (incident_id,))
    incident = cursor.fetchone()

    if not incident:
        logger.warning(f"get_incident_by_id → no result for id={incident_id}")
        return None

    return convert_timestamps(dict(incident))
|
|
|
|
|
|
def get_incidents(conn: sql.Connection, id: int | None = None, before: int | None = None):
    """Fetch incidents, optionally filtered by site and/or creation time.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.
        id: When given, only incidents whose ``site_id`` equals this value.
        before: When given, only incidents whose ``created`` value is less
            than or equal to this value.

    Returns:
        A list of row dictionaries with timestamps converted by
        ``convert_timestamps``.
    """
    conn.row_factory = sql.Row

    # Candidate filters in the order they appear in the WHERE clause; only
    # those with a supplied value are kept.
    candidates = (("site_id = ?", id), ("created <= ?", before))
    clauses = [clause for clause, value in candidates if value is not None]
    bound_values = [value for _, value in candidates if value is not None]

    statement = "SELECT * FROM incidents"
    if clauses:
        statement = statement + " WHERE " + " AND ".join(clauses)

    cursor: sql.Cursor = conn.cursor()
    cursor.execute(statement, bound_values)
    incidents: list[Any] = cursor.fetchall()

    logger.debug(f"get_incidents (site_id={id}, before={before}) → {len(incidents)} rows")
    return [convert_timestamps(dict(incident)) for incident in incidents]
|
|
|
|
# TODO: Probably move this out of this module; simulator queries shouldn't live alongside the main queries.
|
|
def get_simulator_stats(conn):
    """Count the alarms and incidents whose ``created_by`` is ``'simulator'``.

    Args:
        conn: Open SQLite connection.

    Returns:
        A dictionary with the keys ``alarm_count`` and ``incident_count``.
    """
    cursor = conn.cursor()

    cursor.execute("SELECT COUNT(*) FROM alarms WHERE created_by = 'simulator'")
    simulated_alarms = cursor.fetchone()[0]

    cursor.execute("SELECT COUNT(*) FROM incidents WHERE created_by = 'simulator'")
    simulated_incidents = cursor.fetchone()[0]

    return {
        "alarm_count": simulated_alarms,
        "incident_count": simulated_incidents,
    }
|
|
|
|
def get_robots(conn):
    """Fetch every row of the ``robots`` table.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.

    Returns:
        A list of row dictionaries with timestamps converted by
        ``convert_timestamps``.
    """
    conn.row_factory = sql.Row
    robots = conn.cursor().execute("SELECT * FROM robots").fetchall()
    logger.debug(f"get_robots → {len(robots)} rows")

    converted = []
    for robot in robots:
        converted.append(convert_timestamps(dict(robot)))
    return converted
|
|
|
|
def get_robot_by_id(conn, robot_id: int):
    """Fetch a single robot by its ``id`` column.

    Side effect: ``conn.row_factory`` is set to ``sqlite3.Row``.

    Args:
        conn: Open SQLite connection.
        robot_id: Value matched against ``robots.id``.

    Returns:
        The first matching row as a dictionary with timestamps converted by
        ``convert_timestamps``, or ``None`` (after logging a warning) when no
        row matches.
    """
    conn.row_factory = sql.Row
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM robots WHERE id = ?", (robot_id,))
    robot = cursor.fetchone()

    if not robot:
        logger.warning(f"get_robot_by_id → no result for id={robot_id}")
        return None

    return convert_timestamps(dict(robot))
|