Files
ebook-extension/ebook_backend&admin_panel/admin-backend/routes/auth.py

514 lines
18 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Form, status, Request, UploadFile, File
from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session
from models.user import AdminUser
from utils.auth import get_db, hash_password, verify_password
from fastapi.templating import Jinja2Templates
from utils.template_loader import templates
from models.coupon import Coupon
from utils.coupon_utils import generate_coupon
from datetime import datetime
import pytz
from utils.timezone_utils import format_cest_datetime
from schemas import AdminLogin, CodeItem, CouponUpload
from fastapi.responses import StreamingResponse
import os
router = APIRouter()
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
"""
Render the admin login page.
Args:
request (Request): The incoming request object.
Returns:
HTMLResponse: Rendered login page.
"""
# return templates.TemplateResponse("admin_login.html", {"request": request})
return templates.TemplateResponse(request, "admin_login.html", {"data": "something"})
@router.get("/", response_class=HTMLResponse)
def admin_panel(request: Request):
"""
Render the admin dashboard if logged in.
Args:
request (Request): The incoming request object.
Returns:
HTMLResponse or RedirectResponse: Admin dashboard or redirect to login.
"""
if not request.cookies.get("admin_logged_in"):
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
# return templates.TemplateResponse("admin_dashboard.html", {"request": request})
return templates.TemplateResponse(request, "admin_dashboard.html", {"data": "something"})
@router.post("/admin/login")
def login(data: AdminLogin, db: Session = Depends(get_db)):
"""
Handle admin login and set authentication cookie.
Args:
data (AdminLogin): Login data with username and password.
db (Session): Database session.
Returns:
JSONResponse: Login status.
"""
user = db.query(AdminUser).filter_by(username=data.username).first()
if not user or not verify_password(data.password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid credentials")
response = JSONResponse(content={"status": "success"})
response.set_cookie("admin_logged_in", "true", httponly=True, samesite="strict")
return response
@router.post("/admin/logout")
def logout():
"""
Handle admin logout and clear the authentication cookie.
Returns:
JSONResponse: Logout status.
"""
response = JSONResponse(content={"status": "success"})
response.delete_cookie("admin_logged_in")
return response
@router.post("/generate")
async def generate_code(mode: str = Form(...), count: int = Form(1), db: Session = Depends(get_db),
request: Request = None):
"""
Generate coupon codes (single or bulk).
Args:
mode (str): 'single' or 'bulk'.
count (int): Number of codes to generate (used for bulk).
db (Session): Database session.
request (Request): Incoming request for auth check.
Returns:
dict: Generated codes.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
new_codes = []
if mode == "single":
new_code = generate_coupon().upper() # Convert to uppercase
db_code = Coupon(code=new_code, usage_count=0)
db.add(db_code)
db.commit()
return {"code": new_code}
elif mode == "bulk":
for _ in range(count):
code = generate_coupon().upper() # Convert to uppercase
db_code = Coupon(code=code, usage_count=0)
db.add(db_code)
new_codes.append(code)
db.commit()
return {"codes": new_codes}
else:
raise HTTPException(status_code=400, detail="Invalid mode")
@router.get("/list")
async def list_codes(page: int = 1, limit: int = 20, db: Session = Depends(get_db)):
"""
List paginated coupon codes sorted by usage count.
Args:
page (int): Page number.
limit (int): Items per page.
db (Session): Database session.
Returns:
dict: Paginated coupon data.
"""
offset = (page - 1) * limit
total_coupons = db.query(Coupon).count()
coupons = db.query(Coupon).order_by(Coupon.usage_count.desc()).offset(offset).limit(limit).all()
return {
"codes": [{"code": c.code, "used_at": format_cest_datetime(c.used_at) if c.used_at else None, "usage_count": c.usage_count} for c in coupons],
"total": total_coupons,
"page": page,
"limit": limit,
"total_pages": (total_coupons + limit - 1) // limit
}
@router.get("/search-codes")
def search_codes(query: str, db: Session = Depends(get_db)):
"""
Search coupon codes by partial match (case-insensitive).
Args:
query (str): Search query.
db (Session): Database session.
Returns:
list: Matching coupon codes.
"""
# Search with case-insensitive matching
codes = (
db.query(Coupon)
.filter(Coupon.code.ilike(f"%{query.upper()}%"))
.all()
)
return [{"code": c.code, "used": c.usage_count, "usage_count": c.usage_count, "used_at": format_cest_datetime(c.used_at) if c.used_at else None} for c in codes]
@router.post("/use-code")
async def use_code(item: dict, db: Session = Depends(get_db)):
"""
Mark a coupon code as used (only if not already used).
Args:
item (dict): Dictionary containing the code to mark as used.
db (Session): Database session.
Returns:
dict: Updated code and timestamp.
"""
code = item["code"].strip()
coupon = db.query(Coupon).filter(Coupon.code.ilike(code)).first()
if not coupon:
raise HTTPException(status_code=404, detail="Invalid code")
if coupon.usage_count >= 1:
raise HTTPException(status_code=400, detail="Coupon already used")
coupon.usage_count += 1
coupon.used_at = datetime.now(pytz.timezone('Asia/Kolkata'))
db.commit()
return {"code": coupon.code, "used_at": format_cest_datetime(coupon.used_at)}
@router.get("/check-code/{code}")
async def check_code(code: str, db: Session = Depends(get_db)):
"""
Check if a specific coupon code exists and its usage count.
Args:
code (str): Coupon code to check.
db (Session): Database session.
Returns:
dict: Code and usage count.
"""
# Use case-insensitive search to handle both cases
coupon = db.query(Coupon).filter(Coupon.code.ilike(code.strip())).first()
if not coupon:
raise HTTPException(status_code=404, detail="Code not found")
return {"code": coupon.code, "used": coupon.usage_count}
@router.post("/verify")
async def verify_coupon(coupon_req: dict, db: Session = Depends(get_db)):
"""
Verify and mark a coupon as used if it exists and is unused.
Args:
coupon_req (dict): Dictionary with 'code' key.
db (Session): Database session.
Returns:
dict: Success message and timestamp.
"""
raw_code = coupon_req["code"]
code = raw_code.strip()
coupon = db.query(Coupon).filter(Coupon.code.ilike(code)).first()
if not coupon:
raise HTTPException(status_code=404, detail="Invalid coupon code")
if coupon.usage_count >= 1:
raise HTTPException(status_code=400, detail="Coupon already used")
coupon.usage_count += 1
coupon.used_at = datetime.now(pytz.timezone('Asia/Kolkata'))
db.commit()
return {"message": "Coupon verified", "used_at": format_cest_datetime(coupon.used_at)}
@router.post("/upload-codes")
async def upload_codes(upload_data: CouponUpload, db: Session = Depends(get_db), request: Request = None):
"""
Upload multiple coupon codes from Excel data.
Args:
upload_data (CouponUpload): Pydantic model containing code list.
db (Session): Database session.
request (Request): Request object for auth check.
Returns:
dict: Upload summary (uploaded, skipped, total).
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
uploaded = 0
skipped = 0
for coupon_data in upload_data.codes:
try:
# Normalize code to uppercase
normalized_code = coupon_data.code.strip().upper()
# Check if code already exists
existing_coupon = db.query(Coupon).filter(Coupon.code == normalized_code).first()
if existing_coupon:
skipped += 1
continue
# Create new coupon with usage count from Excel
new_coupon = Coupon(
code=normalized_code, # Store as uppercase
usage_count=coupon_data.usage
)
db.add(new_coupon)
uploaded += 1
except Exception as e:
print(f"Error inserting code {coupon_data.code}: {e}")
skipped += 1
try:
db.commit()
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
return {
"uploaded": uploaded,
"skipped": skipped,
"total": len(upload_data.codes)
}
@router.post("/add-code")
def add_code(item: CodeItem, db: Session = Depends(get_db), request: Request = None):
"""
Add a single coupon code manually.
Args:
item (CodeItem): Coupon data from request body.
db (Session): Database session.
request (Request): Request object for auth check.
Returns:
dict: Success message.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
# Normalize code to uppercase for consistency
normalized_code = item.code.strip().upper()
existing = db.query(Coupon).filter(Coupon.code == normalized_code).first()
if existing:
raise HTTPException(status_code=400, detail="Code already exists")
new_coupon = Coupon(
code=normalized_code, # Store as uppercase
usage_count=max(0, item.usage)
)
db.add(new_coupon)
db.commit()
return {"message": "Code added successfully"}
@router.delete("/delete-code/{code}")
def delete_code(code: str, db: Session = Depends(get_db), request: Request = None):
"""
Delete a specific coupon code.
Args:
code (str): Coupon code to delete.
db (Session): Database session.
request (Request): Request object for auth check.
Returns:
dict: Success message.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
# Use case-insensitive search to handle both uppercase and lowercase codes
coupon = db.query(Coupon).filter(Coupon.code.ilike(code.strip())).first()
if not coupon:
raise HTTPException(status_code=404, detail="Code not found")
db.delete(coupon)
db.commit()
return {"message": "Code deleted successfully"}
# Translation file management
TRANSLATION_DIR = os.path.join(os.path.dirname(__file__), '..', 'translationfile')
TRANSLATION_DIR = os.path.abspath(TRANSLATION_DIR)
TRANSLATION_FILENAME = 'translation.xlsx'
TRANSLATION_PATH = os.path.join(TRANSLATION_DIR, TRANSLATION_FILENAME)
@router.post("/upload-translations")
async def upload_translation(file: UploadFile = File(...), request: Request = None):
"""
Upload a new translation Excel file. Stores the file on disk and saves the original filename in metadata.
Args:
file (UploadFile): The uploaded Excel file.
request (Request): Request object to check admin authentication.
Returns:
dict: Success message with original filename.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
# Create directory if it doesn't exist
if not os.path.exists(TRANSLATION_DIR):
os.makedirs(TRANSLATION_DIR, exist_ok=True)
# Check if a translation file already exists
if os.path.exists(TRANSLATION_PATH):
raise HTTPException(status_code=400, detail="A translation file already exists. Please delete it first.")
# Store the original filename in a metadata file
original_filename = file.filename or "translation.xlsx"
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
try:
# Read and save the uploaded file
content = await file.read()
with open(TRANSLATION_PATH, 'wb') as f:
f.write(content)
# Save the original filename to metadata file
with open(metadata_path, 'w') as f:
f.write(original_filename)
return {"message": "Translation file uploaded successfully", "filename": original_filename}
except Exception as e:
# Clean up if there was an error
if os.path.exists(TRANSLATION_PATH):
os.remove(TRANSLATION_PATH)
if os.path.exists(metadata_path):
os.remove(metadata_path)
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@router.delete("/delete-translation")
def delete_translation(request: Request = None):
"""
Delete the uploaded translation file and its metadata.
Args:
request (Request): Request object to check admin authentication.
Returns:
dict: Success message if deletion was successful.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
files_deleted = []
# Delete the translation file
if os.path.exists(TRANSLATION_PATH):
os.remove(TRANSLATION_PATH)
files_deleted.append("translation file")
# Delete the metadata file
if os.path.exists(metadata_path):
os.remove(metadata_path)
files_deleted.append("metadata")
# Delete the translation directory if it exists and is empty
if os.path.exists(TRANSLATION_DIR) and not os.listdir(TRANSLATION_DIR):
os.rmdir(TRANSLATION_DIR)
files_deleted.append("directory")
if files_deleted:
return {"message": f"Translation file deleted successfully"}
else:
raise HTTPException(status_code=404, detail="No translation file found")
@router.get("/download-translation")
def download_translation(request: Request = None):
"""
Download the uploaded translation file with original filename.
Args:
request (Request): Request object to check admin authentication.
Returns:
StreamingResponse: Downloadable Excel file.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
if not os.path.exists(TRANSLATION_PATH):
raise HTTPException(status_code=404, detail="No translation file found")
# Get the original filename from metadata
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
original_filename = TRANSLATION_FILENAME # Default filename
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
stored_filename = f.read().strip()
if stored_filename:
original_filename = stored_filename
except Exception:
# If we can't read metadata, use default filename
pass
# Return the file with proper headers
def file_generator():
with open(TRANSLATION_PATH, 'rb') as f:
while True:
chunk = f.read(8192) # 8KB chunks
if not chunk:
break
yield chunk
return StreamingResponse(
file_generator(),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename=\"{original_filename}\""}
)
@router.get("/translations/status")
def check_translation_file():
"""Check if translation file exists and return filename"""
file_exists = os.path.exists(TRANSLATION_PATH)
if not file_exists:
return {"file_exists": False, "file_name": None}
# Get the original filename from metadata
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
original_filename = TRANSLATION_FILENAME # Default filename
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
stored_filename = f.read().strip()
if stored_filename:
original_filename = stored_filename
except Exception:
# If we can't read metadata, use default filename
pass
return {
"file_exists": True,
"file_name": original_filename
}
@router.get("/translations/latest")
def get_latest_translation():
"""
Legacy endpoint that returns the latest uploaded translation file.
Returns:
StreamingResponse: Downloadable Excel file.
"""
if not os.path.exists(TRANSLATION_PATH):
raise HTTPException(status_code=404, detail="No translation file found")
# Get the original filename from metadata for consistency
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
original_filename = TRANSLATION_FILENAME # Default filename
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
stored_filename = f.read().strip()
if stored_filename:
original_filename = stored_filename
except Exception:
pass
return StreamingResponse(
open(TRANSLATION_PATH, 'rb'),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename=\"{original_filename}\""}
)