from datetime import timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

from app.core.config import settings
from app.core.security import (
    create_access_token,
    create_refresh_token,
    blacklist_token,
    get_current_user,
    verify_password,
)
from app.db.session import get_db
from app.models.user import User
from app.schemas.token import Token
from app.schemas.user import UserCreate, UserResponse

router = APIRouter()


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error carrying the ``WWW-Authenticate: Bearer`` challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        # A 401 response is expected to tell the client which scheme to use.
        headers={"WWW-Authenticate": "Bearer"},
    )


def _user_from_row(row) -> User:
    """Copy the columns selected by the login query onto a fresh ``User``.

    The unpacking order must match the column order of the SELECT in
    ``login``: id, username, email, password_hash, role, is_active.
    """
    user = User()
    (
        user.id,
        user.username,
        user.email,
        user.password_hash,
        user.role,
        user.is_active,
    ) = row
    return user


def _token_response(user: User, access_token: str) -> dict:
    """Assemble the payload shared by ``/login`` and ``/refresh``.

    ``expires_in`` is the configured access-token lifetime in seconds, or
    ``None`` when ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` is not positive.
    """
    expires_in = None
    if settings.ACCESS_TOKEN_EXPIRE_MINUTES > 0:
        expires_in = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": expires_in,
        "user": {
            "id": user.id,
            "username": user.username,
            "role": user.role,
        },
    }


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Authenticate a user by username and password and issue an access token.

    Raises:
        HTTPException: 401 "Invalid credentials" when the username does not
            exist or the password does not verify; 401 "User is inactive"
            when the credentials are valid but ``is_active`` is falsy.
    """
    result = await db.execute(
        text(
            "SELECT id, username, email, password_hash, role, is_active FROM users WHERE username = :username"
        ),
        {"username": form_data.username},
    )
    row = result.fetchone()
    if row is None:
        raise _unauthorized("Invalid credentials")

    user = _user_from_row(row)

    if not verify_password(form_data.password, user.password_hash):
        raise _unauthorized("Invalid credentials")

    # Checked after the password so that the account state is only revealed
    # to callers who presented valid credentials.
    if not user.is_active:
        raise _unauthorized("User is inactive")

    access_token = create_access_token(data={"sub": user.id})
    # NOTE(review): a refresh token is generated here but never returned to the
    # client. The call is kept in case create_refresh_token has side effects;
    # confirm whether the Token schema should expose it.
    create_refresh_token(data={"sub": user.id})

    return _token_response(user, access_token)


@router.post("/refresh", response_model=Token)
async def refresh_token(
    current_user: User = Depends(get_current_user),
):
    """Issue a new access token for the already-authenticated caller."""
    access_token = create_access_token(data={"sub": current_user.id})
    return _token_response(current_user, access_token)


@router.post("/logout")
async def logout():
    """Return a static confirmation message.

    NOTE(review): this handler takes no token and does not call
    ``blacklist_token`` (imported above), so nothing is revoked here —
    confirm whether that is intentional.
    """
    return {"message": "Successfully logged out"}


@router.get("/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the profile fields of the authenticated user."""
    return {
        "id": current_user.id,
        "username": current_user.username,
        "email": current_user.email,
        "role": current_user.role,
        "is_active": current_user.is_active,
        "created_at": current_user.created_at,
    }