Skip to content

Commit e01c1dc

Browse files
committed
feat: Add new product intergrations api interop endpoints
1 parent 61174c5 commit e01c1dc

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

src/takrmapi/api/interop.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
"""Routes for interoperation between products"""
2+
3+
from typing import Sequence, Optional
4+
import logging
5+
from pathlib import Path
6+
7+
from fastapi import APIRouter, Depends, Request, HTTPException
8+
from libpvarki.middleware import MTLSHeader
9+
from libpvarki.schemas.generic import OperationResultResponse
10+
from libpvarki.schemas.product import UserCRUDRequest
11+
from pydantic import Field, BaseModel, ConfigDict, Extra # pylint: disable=E0611 # False positive
12+
13+
14+
from .helpers import comes_from_rm
15+
from ..tak_helpers import UserCRUD as TAKBaseCRUD
16+
from ..tak_helpers import Helpers as TAKBaseHelper
17+
from .. import config
18+
19+
20+
LOGGER = logging.getLogger(__name__)
21+
22+
interoprouter = APIRouter(dependencies=[Depends(MTLSHeader(auto_error=True))])
23+
24+
25+
# FIXME: Move to libpvarki
26+
class ProductAddRequest(BaseModel): # pylint: disable=too-few-public-methods
27+
"""Request to add product interoperability."""
28+
29+
certcn: str = Field(description="CN of the certificate")
30+
x509cert: str = Field(description="Certificate encoded with CFSSL conventions (newlines escaped)")
31+
32+
model_config = ConfigDict(
33+
extra=Extra.forbid,
34+
schema_extra={
35+
"examples": [
36+
{
37+
"certcn": "product.deployment.tld",
38+
"x509cert": "-----BEGIN CERTIFICATE-----\\nMIIEwjCC...\\n-----END CERTIFICATE-----\\n",
39+
},
40+
],
41+
},
42+
)
43+
44+
45+
# FIXME: Move to libpvarki
46+
class ProductAuthzResponse(BaseModel): # pylint: disable=too-few-public-methods
47+
"""Authz info"""
48+
49+
type: str = Field(description="type of authz: bearer-token, basic, mtls")
50+
token: Optional[str] = Field(description="Bearer token", default=None)
51+
username: Optional[str] = Field(description="Username for basic auth", default=None)
52+
password: Optional[str] = Field(description="Password for basic auth", default=None)
53+
54+
model_config = ConfigDict(
55+
extra=Extra.forbid,
56+
schema_extra={
57+
"examples": [
58+
{
59+
"type": "mtls",
60+
},
61+
{
62+
"type": "bearer-token",
63+
"token": "<JWT>",
64+
},
65+
{
66+
"type": "basic",
67+
"username": "product.deployment.tld",
68+
"password": "<PASSWORD>",
69+
},
70+
],
71+
},
72+
)
73+
74+
75+
def cn_to_callsign(certcn: str) -> str:
76+
"""Convert CN to something that is valid as callsign filename"""
77+
return certcn.replace(".", "_")
78+
79+
80+
class ProductCRUD(TAKBaseCRUD):
81+
"""Pretend to be user"""
82+
83+
def __init__(self, user: UserCRUDRequest):
84+
super().__init__(user)
85+
self.helpers = TakProductHelper(self)
86+
self.write_cert_conditional()
87+
88+
@property
89+
def certpath(self) -> Path:
90+
"""Path to local cert"""
91+
return config.TAK_CERTS_FOLDER / f"{self.certcn}_rm.pem"
92+
93+
def write_cert_conditional(self) -> None:
94+
"""write the X509 cert if it did not exist"""
95+
if self.certpath.exists():
96+
return
97+
if self.user.x509cert == "NOTHERE":
98+
raise HTTPException(status_code=404, detail="Certificate not set")
99+
self.certpath.parent.mkdir(parents=True, exist_ok=True)
100+
self.certpath.write_text(self.rm_certpem)
101+
102+
103+
class TakProductHelper(TAKBaseHelper): # pylint: disable=too-few-public-methods
104+
"""Do TAK things"""
105+
106+
@property
107+
def enable_user_cert_names(self) -> Sequence[str]:
108+
"""Return the stems for cert PEM files"""
109+
return (f"{self.user.callsign}_rm",)
110+
111+
112+
@interoprouter.post("/add")
113+
async def add_product(
114+
product: ProductAddRequest,
115+
request: Request,
116+
) -> OperationResultResponse:
117+
"""Product needs interop privileges. This can only be called by RASENMAEHER"""
118+
comes_from_rm(request)
119+
callsign = cn_to_callsign(product.certcn)
120+
product_user = UserCRUDRequest(
121+
uuid=callsign,
122+
callsign=callsign,
123+
x509cert=product.x509cert,
124+
)
125+
crud = ProductCRUD(product_user)
126+
res = await crud.helpers.add_admin_to_tak_with_cert()
127+
if not res:
128+
return OperationResultResponse(success=False, error="TAK admin script failed")
129+
return OperationResultResponse(success=True)
130+
131+
132+
@interoprouter.get("/authz")
133+
async def get_authz(
134+
request: Request,
135+
) -> ProductAuthzResponse:
136+
"""Get authz info for the product, for tak it's always mtls"""
137+
payload = request.state.mtlsdn
138+
callsign = cn_to_callsign(payload.get("CN"))
139+
product_user = UserCRUDRequest(
140+
uuid=callsign,
141+
callsign=callsign,
142+
x509cert="NOTHERE",
143+
)
144+
crud = ProductCRUD(product_user)
145+
if not crud.certpath.exists():
146+
raise HTTPException(status_code=404, detail="Certificate does not exist")
147+
result = ProductAuthzResponse(type="mtls")
148+
return result

0 commit comments

Comments
 (0)