from typing import List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import text from app.core.security import get_current_user, get_password_hash from app.db.session import get_db from app.models.user import User from app.schemas.user import UserCreate, UserResponse, UserUpdate router = APIRouter() def check_permission(current_user: User, required_roles: List[str]) -> bool: user_role_value = ( current_user.role.value if hasattr(current_user.role, "value") else current_user.role ) return user_role_value in required_roles @router.get("", response_model=dict) async def list_users( page: int = 1, page_size: int = 20, role: str = None, is_active: bool = None, search: str = None, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not check_permission(current_user, ["super_admin", "admin"]): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions", ) # Build WHERE clause where_clauses = [] params = {} if role: where_clauses.append("role = :role") params["role"] = role if is_active is not None: where_clauses.append("is_active = :is_active") params["is_active"] = is_active if search: where_clauses.append("(username ILIKE :search OR email ILIKE :search)") params["search"] = f"%{search}%" where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" offset = (page - 1) * page_size query = text( f"SELECT id, username, email, role, is_active, last_login_at, created_at FROM users WHERE {where_sql} ORDER BY created_at DESC LIMIT {page_size} OFFSET {offset}" ) count_query = text(f"SELECT COUNT(*) FROM users WHERE {where_sql}") result = await db.execute(query, params) users = result.fetchall() count_result = await db.execute(count_query, params) total = count_result.scalar() return { "total": total, "page": page, "page_size": page_size, "data": [ { "id": u[0], "username": u[1], "email": u[2], "role": u[3], "is_active": u[4], "last_login_at": u[5], "created_at": u[6], } for u in users ], } @router.get("/{user_id}", response_model=dict) async def get_user( user_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not check_permission(current_user, ["super_admin", "admin"]) and current_user.id != user_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions", ) result = await db.execute( text( "SELECT id, username, email, role, is_active, last_login_at, created_at FROM users WHERE id = :id" ), {"id": user_id}, ) user = result.fetchone() if user is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return { "id": user[0], "username": user[1], "email": user[2], "role": user[3], "is_active": user[4], "last_login_at": user[5], "created_at": user[6], } @router.post("", response_model=dict, status_code=status.HTTP_201_CREATED) async def create_user( user_data: UserCreate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not check_permission(current_user, ["super_admin"]): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only super_admin can create users", ) result = await db.execute( text("SELECT id FROM users WHERE username = :username OR email = :email"), {"username": user_data.username, "email": user_data.email}, ) if result.fetchone(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username or email already exists", ) hashed_password = get_password_hash(user_data.password) await db.execute( text("""INSERT INTO users (username, email, password_hash, role, is_active, created_at, updated_at) VALUES (:username, :email, :password_hash, :role, :is_active, NOW(), NOW())"""), { "username": user_data.username, "email": user_data.email, "password_hash": hashed_password, "role": user_data.role, "is_active": True, }, ) await db.commit() # Get the inserted user ID result = await db.execute( text("SELECT id FROM users WHERE username = :username"), {"username": user_data.username}, ) new_user = result.fetchone() if new_user is None: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create user", ) return { "id": new_user[0], "username": user_data.username, "email": user_data.email, "role": user_data.role, "is_active": True, } @router.put("/{user_id}") async def update_user( user_id: int, user_data: UserUpdate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not check_permission(current_user, ["super_admin", "admin"]) and current_user.id != user_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions", ) if not check_permission(current_user, ["super_admin"]) and user_data.role is not None: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only super_admin can change user role", ) result = await db.execute( text("SELECT id FROM users WHERE id = :id"), {"id": user_id}, ) if not result.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) update_fields = [] params = {"id": user_id} if user_data.email is not None: update_fields.append("email = :email") params["email"] = user_data.email if user_data.role is not None: update_fields.append("role = :role") params["role"] = user_data.role if user_data.is_active is not None: update_fields.append("is_active = :is_active") params["is_active"] = user_data.is_active if update_fields: update_fields.append("updated_at = NOW()") query = text(f"UPDATE users SET {', '.join(update_fields)} WHERE id = :id") await db.execute(query, params) await db.commit() return {"message": "User updated successfully"} @router.delete("/{user_id}") async def delete_user( user_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not check_permission(current_user, ["super_admin"]): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only super_admin can delete users", ) if current_user.id == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete yourself", ) result = await db.execute( text("SELECT id FROM users WHERE id = :id"), {"id": user_id}, ) if not result.fetchone(): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) await db.execute( text("DELETE FROM users WHERE id = :id"), {"id": user_id}, ) await db.commit() return {"message": "User deleted successfully"}