1
0
forked from crony/UpFast
UpFast/main.py

148 lines
4.9 KiB
Python
Raw Normal View History

2023-11-02 10:53:19 +01:00
import shutil
import magic
import re
2023-03-26 08:53:01 +02:00
from typing import Annotated, Union
2023-03-23 21:00:39 +01:00
from os import listdir, remove
from os.path import abspath, dirname, exists
from fastapi import FastAPI, Request, UploadFile, File, Header
from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
2023-11-01 16:32:47 +01:00
from pydantic_settings import BaseSettings, SettingsConfigDict
# Settings
class Settings(BaseSettings):
    """Application configuration loaded through pydantic-settings."""

    # Values may also be supplied through a ".env" file.
    model_config = SettingsConfigDict(env_file=".env")

    # When true, the delete endpoint refuses to remove files; the flag is
    # also handed to the templates.
    upload_only: bool = False
2023-03-23 21:00:39 +01:00
# file class
class _File:
    """One uploaded file, as handed to the listing template.

    Attributes:
        name: File name inside the upload directory.
        fileType: Category label chosen by the caller (the listing code
            uses "image", "video", "text" and "else").
        content: Text used for previews; an empty string when no content
            was supplied.
    """

    def __init__(self, name: str, fileType: str, content: Union[str, None] = None):
        self.name = name
        # camelCase is kept on purpose: it is part of the attribute
        # interface other code may read.
        self.fileType = fileType
        # Normalise "no content" to "" so consumers never see None.
        self.content = content if content is not None else ""

    def __repr__(self) -> str:
        # content is left out deliberately: it can hold a whole file.
        return (
            f"{type(self).__name__}"
            f"(name={self.name!r}, fileType={self.fileType!r})"
        )
2023-03-23 21:00:39 +01:00
# File list generator with filetype assignment and content reading for previews
def file_list_generator(path: str, file_list: list[str]) -> "list[_File]":
    """Build a ``_File`` entry for every name in *file_list*.

    Each file's MIME type is detected with ``magic`` and mapped to one of
    the labels "image", "video", "text" or "else". Files detected as
    ``text/*`` or ``application/json`` are read in full so their content
    can be shown as a preview.

    Args:
        path: Directory containing the files. A trailing separator is
            accepted but no longer required.
        file_list: File names relative to *path*.

    Returns:
        The ``_File`` entries, in the same order as *file_list*.
    """
    # Local import keeps this block self-contained.
    from os.path import join

    entries = []
    for name in file_list:
        full_path = join(path, name)
        mime = magic.from_file(full_path, mime=True)
        if mime.startswith("image/"):
            entry = _File(name, "image")
        elif mime.startswith("video/"):
            entry = _File(name, "video")
        elif mime.startswith("text/") or mime == "application/json":
            # MIME detection is a guess: the bytes may not be valid UTF-8.
            # Replace undecodable bytes instead of letting one odd file
            # break the whole listing with UnicodeDecodeError.
            with open(full_path, encoding="utf-8", errors="replace") as f:
                entry = _File(name, "text", f.read())
        else:
            entry = _File(name, "else")
        entries.append(entry)
    return entries
# Load jinja2 for templating
templates = Jinja2Templates(directory="templates")
2023-11-01 16:32:47 +01:00
# Create settings
settings = Settings()
2023-03-23 21:00:39 +01:00
# Create fastapi template
app = FastAPI()
# Mount all uploaded files at the /files path
2023-07-31 18:02:35 +02:00
app.mount("/files", StaticFiles(directory="upload"), name="upload")
2023-03-23 21:00:39 +01:00
# Mount static files like css
2023-07-31 18:02:35 +02:00
app.mount("/static", StaticFiles(directory="static"), name="static")
2023-03-23 21:00:39 +01:00
# show the homepage when just getting the website
@app.get("/", response_class=HTMLResponse)
async def index(
    request: Request, user_agent: Annotated[Union[str, None], Header()] = None
):
    """Serve the homepage, or a short plain-text reply for curl clients."""
    from_curl = re.search("^curl/.*", str(user_agent)) is not None
    if from_curl:
        return PlainTextResponse("It fucking works!\n")

    # Every other client gets the rendered template.
    return templates.TemplateResponse(
        "index.html",
        dict(
            request=request,
            user_agent=user_agent,
            upload_only=settings.upload_only,
        ),
    )
# get the file the user wants to upload and save it
@app.post("/")
async def upload(
    request: Request,
    file: UploadFile = File(...),
    user_agent: Annotated[Union[str, None], Header()] = None,
):
    """Store an uploaded file in the upload directory and report its URL.

    Args:
        request: Incoming request, used to build the public file URL.
        file: The uploaded file (multipart form field ``file``).
        user_agent: ``User-Agent`` header; curl clients get plain text.

    Returns:
        * 400 plain text when the upload has no usable file name.
        * Plain text explaining that the file already exists (the status
          code is left at the default, as before).
        * For curl: the file URL as plain text.
        * Otherwise: a dict with ``filename`` and ``path``.
    """
    # Local import keeps this block self-contained.
    from os.path import basename, join

    # The file name is client-controlled. Keep only its last component so
    # names like "../main.py" or "C:\\dir\\x.txt" cannot leave the upload
    # directory.
    filename = basename((file.filename or "").replace("\\", "/"))
    if filename in ("", ".", ".."):
        return PlainTextResponse("Invalid or missing file name\n", status_code=400)

    # Anchor to this module's directory, like the listing and delete
    # endpoints, instead of depending on the working directory.
    file_location = join(dirname(abspath(__file__)), "upload", filename)

    # Public URL via the "/files" mount (registered under the name
    # "upload"). str() keeps the value JSON-serialisable.
    file_url = str(request.url_for("upload", path=filename))

    try:
        # "xb" creates the file exclusively: the existence check and the
        # creation are one atomic step, so concurrent uploads of the same
        # name cannot overwrite each other.
        # NOTE: copyfileobj blocks the event loop for large uploads.
        with open(file_location, "xb") as file_object:
            shutil.copyfileobj(file.file, file_object)
    except FileExistsError:
        return PlainTextResponse(
            f'File: "{file_url}" already exists on the server!\n'
            "Please rename the file or delete the one on the server\n"
        )

    if str(user_agent).startswith("curl/"):
        return PlainTextResponse(f"{file_url}\n")
    return {
        "filename": filename,
        "path": file_url,
    }
2023-03-23 21:00:39 +01:00
# show the files page with the list of all files or if run with curl list all files in url format
@app.get("/files")
async def files(
    request: Request, user_agent: Annotated[Union[str, None], Header()] = None
):
    """List the uploaded files.

    curl clients receive one file URL per line as plain text; every other
    client receives the rendered ``files.html`` page with previews.

    Args:
        request: Incoming request, used to build URLs and for templating.
        user_agent: ``User-Agent`` header used to detect curl.
    """
    # The trailing slash is kept because the preview helper has
    # historically been called with it.
    path = f"{dirname(abspath(__file__))}/upload/"
    file_names = listdir(path)

    if str(user_agent).startswith("curl/"):
        # curl only needs the names, so skip MIME detection and content
        # reading. URLs come from the "/files" mount (named "upload")
        # rather than the raw request URL, so a query string on the
        # request cannot corrupt the links.
        lines = [
            f"{request.url_for('upload', path=name)}\n" for name in file_names
        ]
        return PlainTextResponse("".join(lines))

    context = {
        "request": request,
        "files": file_list_generator(path, file_names),
        "upload_only": settings.upload_only,
    }
    return templates.TemplateResponse("files.html", context)
# delete specific file when getting this request
@app.get("/delete/{file}")
async def delete(
    request: Request,
    file: str,
    user_agent: Annotated[Union[str, None], Header()] = None,
):
    """Delete one uploaded file, unless the instance is upload-only.

    Args:
        request: Incoming request, used to build the redirect target.
        file: Name of the file inside the upload directory.
        user_agent: ``User-Agent`` header; curl clients get plain text.

    Returns:
        * Plain text refusing the request on upload-only instances.
        * For curl: plain text saying whether the file was deleted or did
          not exist.
        * Otherwise: a redirect to the file listing.
    """
    # NOTE(review): this deletes on GET, so crawlers and link prefetchers
    # can trigger it. Kept because changing the method would break
    # existing clients.
    # Local import keeps this block self-contained.
    from os.path import basename, isfile, join

    if settings.upload_only:
        # Adjacent literals instead of a backslash continuation inside the
        # string, which embeds the next line's indentation in the message.
        return PlainTextResponse(
            "This api endpoint is not available on upload only instance. "
            "If you wan't to delete a file ask the hoster of the instance!!"
        )

    # Accept plain file names only: anything with a directory component is
    # treated as missing, so the path cannot leave the upload directory.
    safe_name = basename(file)
    file_path = join(dirname(abspath(__file__)), "upload", safe_name)

    deleted = False
    # isfile() rather than exists(): names such as ".." resolve to a
    # directory, and remove() on a directory raises.
    if safe_name == file and isfile(file_path):
        try:
            remove(file_path)
        except FileNotFoundError:
            # Another request removed it first; report it as missing.
            pass
        else:
            deleted = True

    if str(user_agent).startswith("curl/"):
        if deleted:
            return PlainTextResponse(f"file {file} deleted from the server\n")
        return PlainTextResponse(f"file {file} doesn't exist on the server\n")
    return RedirectResponse(request.url_for("files"))