Fix: Rename directory to remove & character causing shell issues

Renamed ebook_backend&admin_panel to ebook_backend_admin_panel
  The & character was being interpreted by shell as background
  process operator, causing 'Dockerfile not found' errors in Coolify.
This commit is contained in:
richardtekula
2025-11-11 17:06:39 +01:00
parent a3b609eab7
commit f78c2199e1
35 changed files with 2 additions and 1 deletions

View File

@@ -0,0 +1,514 @@
from fastapi import APIRouter, Depends, HTTPException, Form, status, Request, UploadFile, File
from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session
from models.user import AdminUser
from utils.auth import get_db, hash_password, verify_password
from fastapi.templating import Jinja2Templates
from utils.template_loader import templates
from models.coupon import Coupon
from utils.coupon_utils import generate_coupon
from datetime import datetime
import pytz
from utils.timezone_utils import format_cest_datetime
from schemas import AdminLogin, CodeItem, CouponUpload
from fastapi.responses import StreamingResponse
import os
router = APIRouter()
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
"""
Render the admin login page.
Args:
request (Request): The incoming request object.
Returns:
HTMLResponse: Rendered login page.
"""
# return templates.TemplateResponse("admin_login.html", {"request": request})
return templates.TemplateResponse(request, "admin_login.html", {"data": "something"})
@router.get("/", response_class=HTMLResponse)
def admin_panel(request: Request):
"""
Render the admin dashboard if logged in.
Args:
request (Request): The incoming request object.
Returns:
HTMLResponse or RedirectResponse: Admin dashboard or redirect to login.
"""
if not request.cookies.get("admin_logged_in"):
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
# return templates.TemplateResponse("admin_dashboard.html", {"request": request})
return templates.TemplateResponse(request, "admin_dashboard.html", {"data": "something"})
@router.post("/admin/login")
def login(data: AdminLogin, db: Session = Depends(get_db)):
"""
Handle admin login and set authentication cookie.
Args:
data (AdminLogin): Login data with username and password.
db (Session): Database session.
Returns:
JSONResponse: Login status.
"""
user = db.query(AdminUser).filter_by(username=data.username).first()
if not user or not verify_password(data.password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid credentials")
response = JSONResponse(content={"status": "success"})
response.set_cookie("admin_logged_in", "true", httponly=True, samesite="strict")
return response
@router.post("/admin/logout")
def logout():
"""
Handle admin logout and clear the authentication cookie.
Returns:
JSONResponse: Logout status.
"""
response = JSONResponse(content={"status": "success"})
response.delete_cookie("admin_logged_in")
return response
@router.post("/generate")
async def generate_code(mode: str = Form(...), count: int = Form(1), db: Session = Depends(get_db),
request: Request = None):
"""
Generate coupon codes (single or bulk).
Args:
mode (str): 'single' or 'bulk'.
count (int): Number of codes to generate (used for bulk).
db (Session): Database session.
request (Request): Incoming request for auth check.
Returns:
dict: Generated codes.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
new_codes = []
if mode == "single":
new_code = generate_coupon().upper() # Convert to uppercase
db_code = Coupon(code=new_code, usage_count=0)
db.add(db_code)
db.commit()
return {"code": new_code}
elif mode == "bulk":
for _ in range(count):
code = generate_coupon().upper() # Convert to uppercase
db_code = Coupon(code=code, usage_count=0)
db.add(db_code)
new_codes.append(code)
db.commit()
return {"codes": new_codes}
else:
raise HTTPException(status_code=400, detail="Invalid mode")
@router.get("/list")
async def list_codes(page: int = 1, limit: int = 20, db: Session = Depends(get_db)):
"""
List paginated coupon codes sorted by usage count.
Args:
page (int): Page number.
limit (int): Items per page.
db (Session): Database session.
Returns:
dict: Paginated coupon data.
"""
offset = (page - 1) * limit
total_coupons = db.query(Coupon).count()
coupons = db.query(Coupon).order_by(Coupon.usage_count.desc()).offset(offset).limit(limit).all()
return {
"codes": [{"code": c.code, "used_at": format_cest_datetime(c.used_at) if c.used_at else None, "usage_count": c.usage_count} for c in coupons],
"total": total_coupons,
"page": page,
"limit": limit,
"total_pages": (total_coupons + limit - 1) // limit
}
@router.get("/search-codes")
def search_codes(query: str, db: Session = Depends(get_db)):
"""
Search coupon codes by partial match (case-insensitive).
Args:
query (str): Search query.
db (Session): Database session.
Returns:
list: Matching coupon codes.
"""
# Search with case-insensitive matching
codes = (
db.query(Coupon)
.filter(Coupon.code.ilike(f"%{query.upper()}%"))
.all()
)
return [{"code": c.code, "used": c.usage_count, "usage_count": c.usage_count, "used_at": format_cest_datetime(c.used_at) if c.used_at else None} for c in codes]
@router.post("/use-code")
async def use_code(item: dict, db: Session = Depends(get_db)):
"""
Mark a coupon code as used (only if not already used).
Args:
item (dict): Dictionary containing the code to mark as used.
db (Session): Database session.
Returns:
dict: Updated code and timestamp.
"""
code = item["code"].strip()
coupon = db.query(Coupon).filter(Coupon.code.ilike(code)).first()
if not coupon:
raise HTTPException(status_code=404, detail="Invalid code")
if coupon.usage_count >= 1:
raise HTTPException(status_code=400, detail="Coupon already used")
coupon.usage_count += 1
coupon.used_at = datetime.now(pytz.timezone('Asia/Kolkata'))
db.commit()
return {"code": coupon.code, "used_at": format_cest_datetime(coupon.used_at)}
@router.get("/check-code/{code}")
async def check_code(code: str, db: Session = Depends(get_db)):
"""
Check if a specific coupon code exists and its usage count.
Args:
code (str): Coupon code to check.
db (Session): Database session.
Returns:
dict: Code and usage count.
"""
# Use case-insensitive search to handle both cases
coupon = db.query(Coupon).filter(Coupon.code.ilike(code.strip())).first()
if not coupon:
raise HTTPException(status_code=404, detail="Code not found")
return {"code": coupon.code, "used": coupon.usage_count}
@router.post("/verify")
async def verify_coupon(coupon_req: dict, db: Session = Depends(get_db)):
"""
Verify and mark a coupon as used if it exists and is unused.
Args:
coupon_req (dict): Dictionary with 'code' key.
db (Session): Database session.
Returns:
dict: Success message and timestamp.
"""
raw_code = coupon_req["code"]
code = raw_code.strip()
coupon = db.query(Coupon).filter(Coupon.code.ilike(code)).first()
if not coupon:
raise HTTPException(status_code=404, detail="Invalid coupon code")
if coupon.usage_count >= 1:
raise HTTPException(status_code=400, detail="Coupon already used")
coupon.usage_count += 1
coupon.used_at = datetime.now(pytz.timezone('Asia/Kolkata'))
db.commit()
return {"message": "Coupon verified", "used_at": format_cest_datetime(coupon.used_at)}
@router.post("/upload-codes")
async def upload_codes(upload_data: CouponUpload, db: Session = Depends(get_db), request: Request = None):
"""
Upload multiple coupon codes from Excel data.
Args:
upload_data (CouponUpload): Pydantic model containing code list.
db (Session): Database session.
request (Request): Request object for auth check.
Returns:
dict: Upload summary (uploaded, skipped, total).
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
uploaded = 0
skipped = 0
for coupon_data in upload_data.codes:
try:
# Normalize code to uppercase
normalized_code = coupon_data.code.strip().upper()
# Check if code already exists
existing_coupon = db.query(Coupon).filter(Coupon.code == normalized_code).first()
if existing_coupon:
skipped += 1
continue
# Create new coupon with usage count from Excel
new_coupon = Coupon(
code=normalized_code, # Store as uppercase
usage_count=coupon_data.usage
)
db.add(new_coupon)
uploaded += 1
except Exception as e:
print(f"Error inserting code {coupon_data.code}: {e}")
skipped += 1
try:
db.commit()
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
return {
"uploaded": uploaded,
"skipped": skipped,
"total": len(upload_data.codes)
}
@router.post("/add-code")
def add_code(item: CodeItem, db: Session = Depends(get_db), request: Request = None):
"""
Add a single coupon code manually.
Args:
item (CodeItem): Coupon data from request body.
db (Session): Database session.
request (Request): Request object for auth check.
Returns:
dict: Success message.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
# Normalize code to uppercase for consistency
normalized_code = item.code.strip().upper()
existing = db.query(Coupon).filter(Coupon.code == normalized_code).first()
if existing:
raise HTTPException(status_code=400, detail="Code already exists")
new_coupon = Coupon(
code=normalized_code, # Store as uppercase
usage_count=max(0, item.usage)
)
db.add(new_coupon)
db.commit()
return {"message": "Code added successfully"}
@router.delete("/delete-code/{code}")
def delete_code(code: str, db: Session = Depends(get_db), request: Request = None):
"""
Delete a specific coupon code.
Args:
code (str): Coupon code to delete.
db (Session): Database session.
request (Request): Request object for auth check.
Returns:
dict: Success message.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
# Use case-insensitive search to handle both uppercase and lowercase codes
coupon = db.query(Coupon).filter(Coupon.code.ilike(code.strip())).first()
if not coupon:
raise HTTPException(status_code=404, detail="Code not found")
db.delete(coupon)
db.commit()
return {"message": "Code deleted successfully"}
# Translation file management
TRANSLATION_DIR = os.path.join(os.path.dirname(__file__), '..', 'translationfile')
TRANSLATION_DIR = os.path.abspath(TRANSLATION_DIR)
TRANSLATION_FILENAME = 'translation.xlsx'
TRANSLATION_PATH = os.path.join(TRANSLATION_DIR, TRANSLATION_FILENAME)
@router.post("/upload-translations")
async def upload_translation(file: UploadFile = File(...), request: Request = None):
"""
Upload a new translation Excel file. Stores the file on disk and saves the original filename in metadata.
Args:
file (UploadFile): The uploaded Excel file.
request (Request): Request object to check admin authentication.
Returns:
dict: Success message with original filename.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
# Create directory if it doesn't exist
if not os.path.exists(TRANSLATION_DIR):
os.makedirs(TRANSLATION_DIR, exist_ok=True)
# Check if a translation file already exists
if os.path.exists(TRANSLATION_PATH):
raise HTTPException(status_code=400, detail="A translation file already exists. Please delete it first.")
# Store the original filename in a metadata file
original_filename = file.filename or "translation.xlsx"
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
try:
# Read and save the uploaded file
content = await file.read()
with open(TRANSLATION_PATH, 'wb') as f:
f.write(content)
# Save the original filename to metadata file
with open(metadata_path, 'w') as f:
f.write(original_filename)
return {"message": "Translation file uploaded successfully", "filename": original_filename}
except Exception as e:
# Clean up if there was an error
if os.path.exists(TRANSLATION_PATH):
os.remove(TRANSLATION_PATH)
if os.path.exists(metadata_path):
os.remove(metadata_path)
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@router.delete("/delete-translation")
def delete_translation(request: Request = None):
"""
Delete the uploaded translation file and its metadata.
Args:
request (Request): Request object to check admin authentication.
Returns:
dict: Success message if deletion was successful.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
files_deleted = []
# Delete the translation file
if os.path.exists(TRANSLATION_PATH):
os.remove(TRANSLATION_PATH)
files_deleted.append("translation file")
# Delete the metadata file
if os.path.exists(metadata_path):
os.remove(metadata_path)
files_deleted.append("metadata")
# Delete the translation directory if it exists and is empty
if os.path.exists(TRANSLATION_DIR) and not os.listdir(TRANSLATION_DIR):
os.rmdir(TRANSLATION_DIR)
files_deleted.append("directory")
if files_deleted:
return {"message": f"Translation file deleted successfully"}
else:
raise HTTPException(status_code=404, detail="No translation file found")
@router.get("/download-translation")
def download_translation(request: Request = None):
"""
Download the uploaded translation file with original filename.
Args:
request (Request): Request object to check admin authentication.
Returns:
StreamingResponse: Downloadable Excel file.
"""
if not request.cookies.get("admin_logged_in"):
raise HTTPException(status_code=401, detail="Unauthorized")
if not os.path.exists(TRANSLATION_PATH):
raise HTTPException(status_code=404, detail="No translation file found")
# Get the original filename from metadata
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
original_filename = TRANSLATION_FILENAME # Default filename
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
stored_filename = f.read().strip()
if stored_filename:
original_filename = stored_filename
except Exception:
# If we can't read metadata, use default filename
pass
# Return the file with proper headers
def file_generator():
with open(TRANSLATION_PATH, 'rb') as f:
while True:
chunk = f.read(8192) # 8KB chunks
if not chunk:
break
yield chunk
return StreamingResponse(
file_generator(),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename=\"{original_filename}\""}
)
@router.get("/translations/status")
def check_translation_file():
"""Check if translation file exists and return filename"""
file_exists = os.path.exists(TRANSLATION_PATH)
if not file_exists:
return {"file_exists": False, "file_name": None}
# Get the original filename from metadata
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
original_filename = TRANSLATION_FILENAME # Default filename
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
stored_filename = f.read().strip()
if stored_filename:
original_filename = stored_filename
except Exception:
# If we can't read metadata, use default filename
pass
return {
"file_exists": True,
"file_name": original_filename
}
@router.get("/translations/latest")
def get_latest_translation():
"""
Legacy endpoint that returns the latest uploaded translation file.
Returns:
StreamingResponse: Downloadable Excel file.
"""
if not os.path.exists(TRANSLATION_PATH):
raise HTTPException(status_code=404, detail="No translation file found")
# Get the original filename from metadata for consistency
metadata_path = os.path.join(TRANSLATION_DIR, 'metadata.txt')
original_filename = TRANSLATION_FILENAME # Default filename
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
stored_filename = f.read().strip()
if stored_filename:
original_filename = stored_filename
except Exception:
pass
return StreamingResponse(
open(TRANSLATION_PATH, 'rb'),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename=\"{original_filename}\""}
)