Source code for ska.tmlite.server.main

"""
The FastAPI server for the prototype telescope model

"""
from functools import lru_cache
from typing import Union
import importlib

from fastapi import FastAPI, HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import ValidationError

from ..models import TModelLite, LayOut, as_TModelLite, Instrument, Mccs, MCCSGeoJSON
from .config import Settings


# The ASGI application object; every route below is registered on it.
app = FastAPI(
    title="Prototype TM API for SKA-SDP",
    description="Simple fastAPI based interface to the prototype telescope model",
)

@lru_cache()
def get_settings():
    """
    Build the server settings.

    The result is memoised by ``lru_cache``, so every caller receives the
    same ``Settings`` instance after the first call.

    :return: the ``Settings`` instance
    """
    settings = Settings()
    return settings

# In-memory registry of the loaded models, keyed by label. The startup hook
# registers "current" and "default"; the PUT endpoints add or replace entries.
model: dict = {}


@app.on_event("startup")
async def startup_event():
    """
    Startup hook for the server.

    Loads the telescope model through the configured storage backend module
    and registers it under both the "current" and the "default" labels.
    """
    # Honour a dependency override for the settings if one is registered,
    # see https://github.com/tiangolo/fastapi/issues/425#issuecomment-954963966
    settings_factory = app.dependency_overrides.get(get_settings, get_settings)
    settings = settings_factory()

    # The backend is a sibling module selected by name from the settings.
    backend_name = f'.storage.{settings.storage_backend}'
    storage = importlib.import_module(backend_name, package=__package__)

    loaded = await storage.load_telescope_model(settings.storage)
    parsed_model = TModelLite.parse_obj(loaded['model'])

    for label in ("current", "default"):
        await add_model(label, parsed_model)


@app.get("/")
def read_root():
    """
    GET method for the root path

    :return: the model stored under the "default" label
    """
    default_model = model["default"]
    return default_model
@app.get("/model/{item_id}/{instrument_id}/static_rfi_mask")
async def get_static_rfi_mask(item_id: str, instrument_id: str):
    """
    GET method to return the static RFI mask of an instrument

    :param item_id: str the label of the model to read from
    :param instrument_id: str the instrument within that model
    :return: the JSON representation of the RFI mask
    :raises HTTPException: 404 if the model, the instrument or the mask
        is not present
    """
    try:
        return model[item_id]["instrument"][instrument_id]["static_rfi_mask"]
    except KeyError:
        # Report a missing entry the same way get_model does instead of
        # letting the KeyError surface as an internal server error.
        raise HTTPException(status_code=404, detail="Item not found") from None
@app.get("/model/{item_id}", response_model=Union[TModelLite, MCCSGeoJSON])
async def get_model(item_id: str):
    """
    GET method for a stored model

    :param item_id: str the label of the model to return
    :return: the stored model
    :raises HTTPException: 404 if no model is stored under ``item_id``
    """
    try:
        return model[item_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Item not found") from None
@app.get("/model/{item_id}/{instrument}/layout", response_model=Union[LayOut, MCCSGeoJSON])
async def get_layout(item_id: str, instrument: str):
    """
    GET method for the layout of an instrument in the given model

    If the stored model has a top-level ``type`` key it is returned whole;
    otherwise the layout of the requested instrument is returned.

    :param item_id: str the label of the model to read from
    :param instrument: str the instrument whose layout is requested
    :return: the layout, or the whole stored model when it has a ``type`` key
    :raises HTTPException: 404 if the model, the instrument or its layout
        is not present
    """
    if item_id not in model:
        raise HTTPException(status_code=404, detail="Item not found")

    stored = model[item_id]
    if 'type' in stored:
        return stored

    try:
        return stored["instrument"][instrument]["layout"]
    except KeyError:
        # Unknown instrument (or no layout): answer 404 rather than letting
        # the KeyError surface as an internal server error.
        raise HTTPException(status_code=404, detail="Item not found") from None
@app.get("/model/{item_id}/{instrument_id}", response_model=Instrument)
async def get_instrument(item_id: str, instrument_id: str):
    """
    GET method for the given instrument

    :param item_id: str the label of the model to read from
    :param instrument_id: str the instrument to return
    :return: the instrument parsed into an ``Instrument`` object
    :raises HTTPException: 404 if the model or the instrument is not present
    """
    try:
        instrument = model[item_id]["instrument"][instrument_id]
    except KeyError:
        # Report a missing entry the same way get_model does.
        raise HTTPException(status_code=404, detail="Item not found") from None
    return Instrument.parse_obj(instrument)
@app.put("/model/{item_id}")
async def add_model(item_id: str, item: Union[TModelLite, MCCSGeoJSON]):
    """
    PUT method that stores a full model under a label, replacing any
    model already stored there.

    The input is JSON-encoded first. If the encoded form has a top-level
    ``type`` key it is parsed as MCCSGeoJSON and converted with
    ``as_TModelLite`` before being stored; otherwise the encoded form is
    stored as is.

    :param item_id: str the label to store the model under
    :param item: TModelLite or MCCSGeoJSON model
    :return: None
    """
    encoded = jsonable_encoder(item)

    if 'type' not in encoded:
        model[item_id] = encoded
        return

    geojson = MCCSGeoJSON.parse_obj(encoded)
    model[item_id] = as_TModelLite(geojson.dict())
@app.put("/model/{item_id}/{instrument_id}/update_layout")
async def update_model_layout_from_file(
        item_id: str,
        instrument_id: str,
        item: Union[TModelLite, MCCSGeoJSON]):
    """
    Updates the layout using an input model (either TModelLite or
    MCCSGeoJSON)

    :param item_id: str the label of the model to update
    :param instrument_id: str the instrument whose layout is updated
    :param item: the model carrying the new layout
    :raises HTTPException: 404 if no model is stored under ``item_id``
    """
    if item_id not in model:
        raise HTTPException(status_code=404, detail="Item not found")

    # The update helper works on plain dictionaries, so encode first.
    update_model_layout(item_id, instrument_id, jsonable_encoder(item))
@app.put("/model/{item_id}/{instrument_id}/update_layout/{file_id}")
async def update_model_layout_from_storage(item_id: str, instrument_id: str, file_id: str):
    """
    Updates the layout using the contents of a file in the backend storage

    The file is read from ``data/instrument/{instrument_id}/layout/{file_id}``
    through the configured storage backend.

    :param item_id: str the label of the model to update
    :param instrument_id: str the instrument whose layout is updated
    :param file_id: str the name of the layout file in the backend storage
    :raises HTTPException: 404 if no model is stored under ``item_id`` or
        the backend raises ``FileNotFoundError``
    """
    import copy  # function-scope import keeps this block self-contained

    if item_id not in model:
        raise HTTPException(status_code=404, detail="Item not found")

    settings = get_settings()
    storage = importlib.import_module(
        f'.storage.{settings.storage_backend}', package=__package__)

    # get_settings() is cached, so settings.storage is shared by every caller.
    # Point a private copy at the layout file instead of overwriting
    # model_path on the shared object (which would persist after this
    # request). deepcopy is used so the copy shares no state with the original.
    storage_settings = copy.deepcopy(settings.storage)
    storage_settings.model_path = f"data/instrument/{instrument_id}/layout/{file_id}"

    try:
        layout = await storage.load_telescope_model(storage_settings)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Item not found") from None

    update_model_layout(item_id, instrument_id, layout)
def update_model_layout(item_id: str, instrument_id: str, item: dict):
    """
    Updates the receptor layout of a stored model with the contents of a
    dictionary

    Two input shapes are handled:

    * a dictionary with a top-level ``type`` key is treated as MCCSGeoJSON.
      Each feature is matched by name against the stored stations: every
      station with that name gets the feature's lon/lat, and a feature
      with no matching station is appended as a new station.
    * any other dictionary is treated as a TModelLite dictionary whose
      receptor list replaces the stored one.

    :param item_id: str - the label of the model to be updated
    :param instrument_id: str the instrument [ska1_low | ska1_mid]
    :param item: dict JSON representation of the station positions
    :raises KeyError: if the model, the instrument or an expected key is
        missing
    """
    layout = model[item_id]["instrument"][instrument_id]["layout"]

    if 'type' not in item:
        # TModelLite input: take over its receptor list wholesale.
        layout["receptors"] = item["instrument"][instrument_id]["layout"]["receptors"]
        return

    # MCCSGeoJSON input: update or add stations feature by feature.
    current_stations = layout["receptors"]
    for feature in item['features']:
        name = feature["properties"]["name"]
        coordinates = feature["geometry"]["coordinates"]
        position = {"lon": coordinates[0], "lat": coordinates[1]}

        matches = [s for s in current_stations if s["station_name"] == name]
        if not matches:
            current_stations.append({
                "station_name": name,
                "diameter": 38.0,  # fixed diameter given to newly added stations
                "location": {"geodetic": position},
            })
            continue

        # The previous guard required a top-level "long" key, which the
        # stations appended above never have, so they could never be
        # updated by a later call. Update every name match instead, and
        # keep any other geodetic fields already present.
        for station in matches:
            geodetic = station.setdefault("location", {}).setdefault("geodetic", {})
            geodetic.update(position)
@app.post("/model/{item_id}/{instrument_id}/update_antennas", response_model=Instrument)
async def update_model_antennas(item_id: str, instrument_id: str, item: Mccs):
    """
    Update the layout of a stored model to match the input

    The ``station_ids`` of the input are used as positions into the
    receptor list of the "default" model; the receptors found there become
    the receptor list of the model stored under ``item_id``.

    :param item_id: str the label of the model to update (not "default")
    :param instrument_id: str the instrument whose layout is updated
    :param item: Mccs object carrying the ``station_ids`` to select
    :return: the updated instrument parsed into an ``Instrument`` object
    :raises HTTPException: 404 if the model or instrument is not present
        or a station id is out of range; 403 if ``item_id`` is "default"
    """
    import copy  # function-scope import keeps this block self-contained

    if item_id not in model:
        raise HTTPException(status_code=404, detail="Item not found")
    if item_id == "default":
        raise HTTPException(status_code=403, detail="Update not allowed on item")

    stations = jsonable_encoder(item)

    try:
        default_antennas = model["default"]["instrument"][instrument_id]["layout"]["receptors"]
        target_instrument = model[item_id]["instrument"][instrument_id]
        target_layout = target_instrument["layout"]
    except KeyError:
        # Unknown instrument in either model: answer 404 instead of
        # letting the KeyError surface as an internal server error.
        raise HTTPException(status_code=404, detail="Item not found") from None

    try:
        # Copy the selected receptors so that later in-place edits to this
        # model cannot leak into the "default" model.
        new_antennas = [
            copy.deepcopy(default_antennas[index])
            for index in stations["station_ids"]
        ]
    except IndexError:
        raise HTTPException(status_code=404, detail="Item not found") from None

    target_layout["receptors"] = new_antennas
    return Instrument.parse_obj(target_instrument)