Files
three_60/backend/api/v1/__init__.py

66 lines
2.3 KiB
Python

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from simulator import alarm_simulator, ticket_simulator, cleanup_alarms, cleanup_incidents, robot_simulator
import asyncio
from contextlib import asynccontextmanager
import time
from .incidents import router as incidents_router
from .alarms import router as alarms_router
from .cellsites import router as cellsites_router
from .simulator import router as simulator_router
from .robots import router as robots_router
from .gqlapi import graphql_app as test_graphql
origins: list[str] = settings.cors_origins.split(",")
ALARM_CLEANUP_INTERVAL = 180
INCIDENT_CLEANUP_INTERVAL = 240
# NOTE: All 5 simulators run on a single thread using Python's event loop. Using this instead of multithreading to keep things simple. Good enough for demoing the idea.
@asynccontextmanager
async def lifespan(app: FastAPI):
task1 = asyncio.create_task(alarm_simulator(5))
task2 = asyncio.create_task(ticket_simulator(20))
task3 = asyncio.create_task(cleanup_alarms(interval=ALARM_CLEANUP_INTERVAL, max_age_seconds=5))
task4 = asyncio.create_task(cleanup_incidents(interval=INCIDENT_CLEANUP_INTERVAL, max_age_seconds=10))
task5 = asyncio.create_task(robot_simulator(8))
# NOTE: This is for the stats endpoint, to send to frontend
simulator.next_alarm_cleanup_at = time.time() + ALARM_CLEANUP_INTERVAL
simulator.next_incident_cleanup_at = time.time() + INCIDENT_CLEANUP_INTERVAL
yield
task1.cancel()
task2.cancel()
task3.cancel()
task4.cancel()
task5.cancel()
def create_app() -> FastAPI:
# TODO: Add a toggle / Flag or endpoint to turn off the simulator
# app = FastAPI()
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(incidents_router, prefix="/api/v1")
app.include_router(cellsites_router, prefix="/api/v1")
app.include_router(alarms_router, prefix="/api/v1")
app.include_router(simulator_router, prefix="/api/v1")
app.include_router(robots_router, prefix="/api/v1")
# Graphql test
app.include_router(test_graphql, prefix="/graphql")
return app