https://datatracker.ietf.org/doc/html/rfc6838
https://www.iana.org/assignments/media-types/media-types.xhtml
https://developer.mozilla.org/en-US/docs/Glossary/MIME_type
WARNING:
- Browsers use the MIME type, not the file extension, to determine how to process a URL, so it's important that web servers send the correct MIME type in the response's Content-Type header. If this is not correctly configured, browsers are likely to misinterpret the contents of files, sites will not work correctly, and downloaded files may be mishandled.
https://developer.mozilla.org/en-US/docs/Learn/Server-side/Configuring_server_MIME_types
https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types
Server default configurations vary wildly and set different default MIME-type values for files with no defined content type.
Versions of the Apache Web Server before 2.2.7 were configured to report a MIME type of text/plain or application/octet-stream for unknown content types. Modern versions of Apache report none for files with unknown content types. Nginx will report text/plain if you don't define a default content type.
Why MIME is Important:
If a web server or application reports an incorrect MIME type for content (including a "default type" for unknown content), a web browser has no way of knowing the author's intentions.
Some web browsers, such as Internet Explorer, try to guess the correct MIME type.
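A minimal FastAPI sketch (route and payload are made up) of sending an explicit Content-Type, plus the nosniff header so the browser never has to guess:
from fastapi import FastAPI
from fastapi.responses import Response

app = FastAPI()

@app.get("/data.csv")
async def data_csv():
    csv_bytes = b"a,b\n1,2\n"
    # send an explicit MIME type so the browser doesn't have to guess,
    # and opt out of content sniffing entirely
    return Response(
        csv_bytes,
        media_type="text/csv",
        headers={"X-Content-Type-Options": "nosniff"},
    )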
https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types#types
There are two classes of type:
discrete and multipart.
Discrete types are types which represent a single file or medium, such as a single text or music file, or a single video.
A multipart type is one which represents a document that's comprised of multiple component parts, each of which may have its own individual MIME type; or, a multipart type may encapsulate multiple files being sent together in one transaction.
For example, multipart MIME types are used when attaching multiple files to an email.
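Python's stdlib email package builds exactly this structure; attaching a file turns the message into multipart/mixed (the bytes below are a placeholder for a real file):
from email.message import EmailMessage

msg = EmailMessage()
msg.set_content("Report attached.")  # starts life as a text/plain message
msg.add_attachment(
    b"fake-pdf-bytes",               # placeholder for real file contents
    maintype="application",
    subtype="pdf",
    filename="report.pdf",
)
print(msg.get_content_type())        # multipart/mixed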
https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types#structure_of_a_mime_type
A MIME type most commonly consists of two parts: a type and a subtype, separated by a slash with no whitespace between them. Each type has its own set of possible subtypes, and a MIME type always has both a type and a subtype, never just one or the other. An optional parameter can be added:
type/subtype;parameter=value
For example:
text/plain;charset=UTF-8
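The stdlib email.message API can pull these pieces apart, e.g.:
from email.message import Message

msg = Message()
msg["Content-Type"] = "text/plain;charset=UTF-8"
print(msg.get_content_type())      # text/plain  -> type/subtype
print(msg.get_content_maintype())  # text        -> the type
print(msg.get_content_subtype())   # plain       -> the subtype
print(msg.get_param("charset"))    # UTF-8       -> the parameter value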
https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types#applicationoctet-stream
This is the default for binary files.
As it means unknown binary file, browsers usually don't execute it, or even ask if it should be executed. They treat it as if the Content-Disposition header was set to attachment, and propose a "Save As" dialog.
RAR-compressed files. In this case, the ideal would be the true type of the original files; this is often impossible as .RAR files can hold several resources of different types. In this case, configure the server to send application/x-rar-compressed.
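A hedged FastAPI sketch (endpoint name and archive bytes are made up) of serving a RAR archive with the type above and an attachment disposition, so the browser offers "Save As":
from fastapi import FastAPI
from fastapi.responses import Response

app = FastAPI()

@app.get("/archive")
async def download_archive():
    rar_bytes = b"Rar!..."  # placeholder for real archive bytes
    return Response(
        rar_bytes,
        media_type="application/x-rar-compressed",
        headers={"Content-Disposition": 'attachment; filename="files.rar"'},
    )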
https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types#textplain
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
https://developer.mozilla.org/en-US/docs/Glossary/User_agent
A user agent is a computer program representing a person, for example, a browser in a Web context.
Besides a browser, a user agent could be a bot scraping webpages, a download manager, or another app accessing the Web.
Spam bots, download managers, and some browsers often send a fake UA string to announce themselves as a different client. This is known as user agent spoofing.
The user agent string can be accessed with JavaScript on the client side using the NavigatorID.userAgent property.
A typical user agent string looks like this: "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:35.0) Gecko/20100101 Firefox/35.0".
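Server-side, FastAPI can read the request's User-Agent header with a Header parameter; a minimal sketch:
from typing import Optional
from fastapi import FastAPI, Header

app = FastAPI()

@app.get("/whoami")
async def whoami(user_agent: Optional[str] = Header(None)):
    # FastAPI maps the user_agent parameter name to the User-Agent request header
    return {"user_agent": user_agent}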
https://fastapi.tiangolo.com/tutorial/body-updates/?h=put#update-replacing-with-put
PUT is used to receive data that should replace the existing data.
from typing import List, Union

from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: Union[str, None] = None
    description: Union[str, None] = None
    price: Union[float, None] = None
    tax: float = 10.5
    tags: List[str] = []
items = {
"foo": {"name": "Foo", "price": 50.2},
"bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
"baz": {"name": "Baz", "description": None, "price": 50.2, "tax": 10.5, "tags": []},
}
@app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: str, item: Item):
update_item_encoded = jsonable_encoder(item)
items[item_id] = update_item_encoded
return update_item_encoded
Warning about replacing
That means that if you want to update the item bar using PUT with a body containing:
{
"name": "Barz",
"price": 3,
"description": None,
}
because it doesn't include the already stored attribute "tax": 20.2, the input model would take the default value of "tax": 10.5.
And the data would be saved with that "new" tax of 10.5.
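A minimal demonstration of that pitfall, reusing the Item model from above:
from typing import List, Union
from pydantic import BaseModel

class Item(BaseModel):  # same model as above
    name: Union[str, None] = None
    description: Union[str, None] = None
    price: Union[float, None] = None
    tax: float = 10.5
    tags: List[str] = []

item = Item(**{"name": "Barz", "price": 3, "description": None})
print(item.tax)  # 10.5, the stored "tax": 20.2 is gone and the model default wins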
https://fastapi.tiangolo.com/tutorial/body-updates/?h=put#partial-updates-with-patch
Partial updates: the client only sends the fields they want to change to your API and omits the ones that shouldn't change. This is the usual way of implementing a PATCH endpoint.
The endpoint takes two arguments:
the id property (from the path)
the PostPartialUpdate payload (from the request body)
Now for the interesting part: updating the existing object.
Transform PostPartialUpdate into a dictionary with the dict method.
Then, on our existing post_db database instance, we call the copy method.
This is a useful method to clone a Pydantic object into another instance.
The nice thing about this method is that it even accepts an update argument.
This argument expects a dictionary with all the fields that should be updated during the copy: that's exactly what we want to do with our updated_fields dictionary!
from typing import Optional

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

app = FastAPI()
# db is assumed to be a dummy database object exposing a posts dict

class PostBase(BaseModel):
    title: str
    content: str
class PostCreate(PostBase):
pass
class PostPublic(PostBase):
id: int
class PostDB(PostBase):
id: int
nb_views: int = 0
class PostPartialUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
@app.patch(
"/posts/{id}",
response_model=PostPublic)
async def partial_update(
id: int,
post_update: PostPartialUpdate
):
try:
post_db = db.posts[id]
updated_fields = post_update.dict(exclude_unset=True)
updated_post = post_db.copy(update=updated_fields)
db.posts[id] = updated_post
return updated_post
except KeyError:
raise HTTPException(status.HTTP_404_NOT_FOUND)
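What exclude_unset buys you, in isolation (same PostPartialUpdate model as above):
from typing import Optional
from pydantic import BaseModel

class PostPartialUpdate(BaseModel):  # same model as above
    title: Optional[str] = None
    content: Optional[str] = None

patch = PostPartialUpdate(title="New title")
print(patch.dict())                    # {'title': 'New title', 'content': None}
print(patch.dict(exclude_unset=True))  # {'title': 'New title'}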
from fastapi import FastAPI, Response, status

app = FastAPI()

# dummy database (Post is a Pydantic model with title and nb_views fields)
posts = {
    1: Post(title="Hello", nb_views=100),
}
@app.put("/posts/{id}")
async def update_or_create_post(
id: int,
post: Post,
response: Response
):
if id not in posts:
response.status_code = status.HTTP_201_CREATED
posts[id] = post
return posts[id]
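A sketch of exercising this upsert behavior with TestClient, assuming Post has title and nb_views fields as in the dummy database above:
from fastapi.testclient import TestClient

client = TestClient(app)

# unknown id: the post is created and we get 201
r = client.put("/posts/2", json={"title": "New post", "nb_views": 0})
assert r.status_code == 201

# existing id: the post is replaced and we get the default 200
r = client.put("/posts/1", json={"title": "Hello again", "nb_views": 100})
assert r.status_code == 200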
backend/route.py
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        # the client already disconnected; closing again is unnecessary and can raise
        pass
~/tests/conftest.py
import pytest
from fastapi.testclient import TestClient

from backend.route import app  # the app defined in backend/route.py above

@pytest.fixture
def websocket_client():
    with TestClient(app) as websocket_client:
        yield websocket_client
def test_websocket_echo(websocket_client: TestClient):
with websocket_client.websocket_connect("/ws") as websocket:
websocket.send_text("Hello")
message = websocket.receive_text()
assert message == "Message text was: Hello"
https://fastapi.tiangolo.com/advanced/testing-websockets/?h=websock
endpoint
from fastapi import FastAPI
from fastapi.testclient import TestClient
from fastapi.websockets import WebSocket
app = FastAPI()
@app.get("/")
async def read_main():
return {"msg": "Hello World"}
@app.websocket_route("/ws")
async def websocket(websocket: WebSocket):
await websocket.accept()
await websocket.send_json({"msg": "Hello WebSocket"})
await websocket.close()
tests
def test_read_main():
client = TestClient(app)
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"msg": "Hello World"}
def test_websocket():
client = TestClient(app)
with client.websocket_connect("/ws") as websocket:
data = websocket.receive_json()
assert data == {"msg": "Hello WebSocket"}
https://www.starlette.io/testclient/#testing-websocket-sessions
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
from BybitWebsocket import BybitWebsocket
import bybit
import os
from collections import deque
from sse_starlette.sse import EventSourceResponse
import asyncio
key = os.environ.get('API_PRIVATE_KEY')
key_secret = os.environ.get('PRIVATE_KEY')
ws = BybitWebsocket(wsURL="wss://stream.bybit.com/realtime", api_key=key, api_secret=key_secret)
ws.subscribe_orderBookL2("BTCUSD")
app = FastAPI()
html_bybit = """
<!DOCTYPE html>
<html>
<head>
<style>
* {
background-color: black;
color: white;
}
</style>
<title>BTC/USD</title>
</head>
<body>
<ul id='messages'>
</ul>
<script>
const evtSource = new EventSource("http://localhost:8000/status/stream");
evtSource.addEventListener("update", function(event) {
// Logic to handle status updates
console.log(event)
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
});
//not implemented
evtSource.addEventListener("end", function(event) {
console.log('Handling end....')
evtSource.close();
});
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html_bybit)
async def stream_gen_deck(request):
deck = deque('0')
while True:
if await request.is_disconnected():
print('disconnected')
# logger.debug('Request disconnected')
break
d = ws.get_data("orderBookL2_25.BTCUSD")
if not isinstance(d, list):
for p in d['update']:
if len(deck) < 10:
deck.appendleft(p['price'])
else:
deck.pop()
await asyncio.sleep(1)
yield {'event': 'update',
'data': deck[0]}
@app.get('/status/stream')
async def runStatus(request: Request):
event_generator = stream_gen_deck(request)
return EventSourceResponse(event_generator)
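A minimal client-side sketch for consuming this stream outside a browser, using httpx's streaming API (URL assumed to match the endpoint above):
import httpx

# consume the SSE stream and print each raw event line as it arrives
with httpx.stream("GET", "http://localhost:8000/status/stream", timeout=None) as resp:
    for line in resp.iter_lines():
        if line:
            print(line)  # e.g. "event: update" followed by "data: <price>"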
@router.post("/order_checkout")
def checkout_order(
background_tasks: BackgroundTasks,
cart: pm.GetCheckout,
current_user: User = Depends(fastapi_users.get_current_active_user)
) -> JSONResponse:
try:
cart_items = cart.cart_items
ship_addr = cart_items['shipping_address']
shipto_id = ship_addr['_id']['$oid']
shipto = MShipTo.objects.get(id=shipto_id)
for item in cart_items['cart']:
order = MOrder(shipto_ref=[shipto])
order.quantity = int(item['quantity'])
order.price = float(item['price'])
product_ref = Products.objects.get(sku=item['product'])
order.product_ref = [product_ref]
order.save()
except Exception as e:
print(e)
try:
items = ''.join([f"<li>{i['product']}</li>" for i in cart_items['cart']])
html_email = Utils.html_email(items)
recipients = notification_email_list(current_user.email)
print('recipients: ', recipients)
message = MessageSchema(subject=settings.MAIL_QUE_SUBJECT,
recipients=recipients,
body=html_email,
subtype="html")
fm = FastMail(Utils.conf)
background_tasks.add_task(fm.send_message, message)
except Exception as e:
print(e)
finally:
return JSONResponse(status_code=200, content={"message": "email has been sent"})
@router.post("/order_checkout")
async def checkout_order(
cart: pm.GetCheckout,
current_user: User = Depends(fastapi_users.get_current_active_user)
) -> JSONResponse:
try:
cart_items = cart.cart_items
ship_addr = cart_items['shipping_address']
shipto_id = ship_addr['_id']['$oid']
shipto = MShipTo.objects.get(id=shipto_id)
for item in cart_items['cart']:
order = MOrder(shipto_ref=[shipto])
order.quantity = int(item['quantity'])
print('len', len(item['product']))
product_ref = Products.objects.get(sku=item['product'])
order.product_ref = [product_ref]
order.save()
except Exception as e:
print(e)
try:
items = ''.join([f"<li>{i['product']} -- x{i['quantity']}</li>" for i in cart_items['cart']])
html_email = Utils.html_email(items)
recipients = notification_email_list(current_user.email)
print('recipients: ', recipients)
message = MessageSchema(subject=settings.MAIL_QUE_SUBJECT,
recipients=recipients,
body=html_email,
subtype="html")
fm = FastMail(Utils.conf)
await fm.send_message(message)
except Exception as e:
print(e)
finally:
return JSONResponse(status_code=200, content={"message": "email has been sent"})
pip install python-multipart
1. Upload Standard
https://stackoverflow.com/questions/63048825/how-to-upload-file-using-fastapi
from fastapi import File, UploadFile
@app.post("/upload")
def upload(file: UploadFile = File(...)):
try:
contents = file.file.read()
with open(file.filename, 'wb') as f:
f.write(contents)
except Exception:
return {"message": "There was an error uploading the file"}
finally:
file.file.close()
return {"message": f"Successfully uploaded {file.filename}"}
2. Upload File in Chunks
https://stackoverflow.com/questions/63048825/how-to-upload-file-using-fastapi
from fastapi import File, UploadFile
@app.post("/upload")
def upload(file: UploadFile = File(...)):
try:
with open(file.filename, 'wb') as f:
while contents := file.file.read(1024 * 1024):
f.write(contents)
except Exception:
return {"message": "There was an error uploading the file"}
finally:
file.file.close()
return {"message": f"Successfully uploaded {file.filename}"}
import aiofiles
from pathlib import Path

from fastapi import UploadFile

class AsyncFile:
    @staticmethod
    async def write_chunked_bytes_local(in_file: UploadFile, save_path: Path):
async with aiofiles.open(save_path, 'wb') as out_file:
while content := await in_file.read(1024): # async read chunk
await out_file.write(content) # async write chunk
return {"Result": "OK"}
@staticmethod
async def write_bytes_local(in_file: UploadFile, save_path: Path):
"""Save file bytes to file locally.
Args:
in_file (UploadFile): UploadFile.
save_path (Path): Location to save file to.
Returns:
_type_: HTTP_200
"""
async with aiofiles.open(save_path, 'wb') as out_file:
            content = await in_file.read()  # async read
            await out_file.write(content)  # async write
return {"Result": "OK"}
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable

from fastapi import UploadFile

class TempCopy:
    def save_upload_file_tmp(self, upload_file: UploadFile) -> Path:
try:
suffix = Path(upload_file.filename).suffix
with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
shutil.copyfileobj(upload_file.file, tmp)
tmp_path = Path(tmp.name)
finally:
upload_file.file.close()
return tmp_path
def handle_upload_file(
self,
upload_file: UploadFile,
handler: Callable[[Path], None]
) -> None:
tmp_path = self.save_upload_file_tmp(upload_file)
try:
handler(tmp_path) # Do something with the saved temp file
finally:
tmp_path.unlink() # Delete the temp file
def save_upload_file(self, upload_file: UploadFile, destination: Path) -> None:
# https://github.com/tiangolo/fastapi/issues/426#issuecomment-542828790
try:
with destination.open("wb") as buffer:
shutil.copyfileobj(upload_file.file, buffer)
finally:
upload_file.file.close()
from typing import List, Tuple

import cv2
import numpy as np
from fastapi import File, UploadFile
from pydantic import BaseModel

class RGBASchema(BaseModel):
    image_rgba: List[Tuple[int, int, int, int]]

async def read_opencv(
    image: UploadFile = File(...)
) -> np.ndarray:
    # read the raw upload bytes and decode them into an OpenCV image
    data = np.fromfile(image.file, dtype=np.uint8)
    img = cv2.imdecode(data, cv2.IMREAD_UNCHANGED)
    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return gray_img
import io

import cv2
from fastapi.responses import StreamingResponse
from PIL import Image as PILImage

class FileResponseParsers:
def save_local_V1(self, doc):
cv2.imwrite(self.output_file, doc)
def save_V4(self, content_type, arr):
im = PILImage.fromarray(arr)
buf = io.BytesIO()
im.save(buf, format='PNG')
buf.seek(0)
return StreamingResponse(buf, media_type="image/png")
def save_V3(self, content_type, img):
ext = content_type.split('/')[-1]
im = PILImage.fromarray(img)
        with io.BytesIO() as buf:  # save image to an in-memory bytes buffer
im.save(buf, format=ext)
im_bytes = buf.getvalue()
buf.close()
return im_bytes
def save_V2(self, content_type, img):
from imageio import v3 as iio
ext = content_type.split('/')[-1]
with io.BytesIO() as buf:
iio.imwrite(buf, img, plugin="pillow", format=ext)
im_bytes = buf.getvalue()
buf.close()
return im_bytes
def save_V1(self, content_type, arr):
ext = content_type.split('/')[-1]
is_success, buffer = cv2.imencode(f".{ext}", arr)
if is_success:
return buffer.tobytes()
raise Exception('File Error: np.array could not be encoded')
Context:
Let's say you need an endpoint to return a file, to serve as a report for some data in your database.
There are two ways you're probably going to call this endpoint.
Different response types should be used, depending on the use case.
Example 1 - Non-Client Side:
Actions:
Upload a file to an endpoint that expects UploadFile.
Process that file.
Convert the now-processed file to bytes.
Return the file in a response.
This is our endpoint to receive the file.
@app.post("/file", response_class=Response)
async def detect_image(file: UploadFile):
try:
# process image
detect = DetectDocument.from_upload_file(file)
edged = detect.preprocess_image()
np_arr = detect.detect_edges(edged)
        # get the extension from the file's MIME type
        ext = file.content_type.split('/')[-1]
is_success, buffer = cv2.imencode(f".{ext}", np_arr)
if is_success:
view = buffer.tobytes()
headers = {'Content-Disposition': f'inline; filename="{file.filename}"'}
resp = Response(view, headers=headers, media_type=file.content_type)
            logger.info(f'Headers: {resp.headers}')
            logger.info('Success!!')
return resp
except Exception as err:
err_msg = f'Error: {err}'
logger.error(err_msg)
raise HTTPException(HTTP_400_BAD_REQUEST, detail=err_msg)
img = 'tests/cam7.jpg'
@pytest.mark.asyncio
class TestFile:
async def test_upload_file(self, conf, test_client: AsyncClient):
payload = {'file': open(img, 'rb')}
r = await test_client.post("/file", files=payload)
assert r.status_code == HTTP_200_OK
assert len(r.content) == int(r.headers['content-length'])
assert 'Content-Disposition' in r.headers
img_name = img.split('/')[-1]
with open(f"save_{img_name}", 'wb') as f:
f.write(r.content)
f.close()
async def report_one(
db: AsyncIOMotorDatabase = Depends(get_database)
) -> StreamingResponse:
try:
df = await report_df()
resp = StreamingResponse(
iter([df]),
media_type="text/csv"
)
resp.headers["Content-Disposition"] = f"attachment; filename=excel_report.txt"
# or
# resp.headers["Content-Disposition"] = f"inline; filename=excel_report.txt"
resp.headers['Content-Type'] = "application/octet-stream"
return resp
except Exception as e:
logger.warn(str(e))
raise HTTPException(detail=e, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
async def report_two(db: AsyncIOMotorDatabase = Depends(get_database)):
query = db[CollectionName.Coll].aggregate(excel_report, allowDiskUse=True)
mongo_q = loads(dumps([i async for i in query], default=json_serialize_oid))
df = pd.json_normalize(mongo_q)
df = df[['name', 'num', 'date', '_po', 'amount', 'tax']]
with io.StringIO() as buf:
df.to_csv(
buf,
index=False,
header=False,
decimal='.',
sep=',',
float_format='%.2f'
)
view = buf.getvalue()
buf.close()
return StreamingResponse(
iter([view]),
media_type='text/csv',
headers={
'Content-Disposition': 'attachment;filename=csv_report.txt',
'Access-Control-Expose-Headers': 'Content-Disposition'
}
)
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse, HTMLResponse
import xlsxwriter
from fastapi.templating import Jinja2Templates
import pandas as pd
from fastapi import File, UploadFile
from io import StringIO, BytesIO
import orjson
from survey.repository.respondent import RespondentRepository
router = APIRouter()
templates = Jinja2Templates(directory="templates")
@router.get("/respondents/xlsx", response_description='xlsx')
async def create_respondent_report_xlsx():
repo = RespondentRepository()
result = await repo.get_all_respondent()
output = BytesIO()
workbook = xlsxwriter.Workbook(output)
worksheet = workbook.add_worksheet()
worksheet.write(0, 0, 'ID')
worksheet.write(0, 1, 'First Name')
worksheet.write(0, 2, 'Last Name')
worksheet.write(0, 3, 'Age')
worksheet.write(0, 4, 'Gender')
worksheet.write(0, 5, 'Married?')
row = 1
for respondent in result:
worksheet.write(row, 0, respondent["id"])
worksheet.write(row, 1, respondent["fname"])
worksheet.write(row, 2, respondent["lname"])
worksheet.write(row, 3, respondent["age"])
worksheet.write(row, 4, respondent["gender"])
worksheet.write(row, 5, respondent["marital"])
row += 1
workbook.close()
output.seek(0)
headers = {
'Content-Disposition': 'attachment; filename="list_respondents.xlsx"'
}
    return StreamingResponse(output, headers=headers,
                             media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
@router.get("/respondents/csv", response_description='csv')
async def create_respondent_report_csv():
repo = RespondentRepository()
result = await repo.get_all_respondent()
ids = [ item["id"] for item in result ]
fnames = [ f'{item["fname"]}' for item in result ]
lnames = [ f'{item["lname"]}' for item in result ]
ages = [ item["age"] for item in result ]
genders = [ f'{item["gender"]}' for item in result ]
maritals = [ f'{item["marital"]}' for item in result ]
    data = {'Id': ids, 'First Name': fnames, 'Last Name': lnames, 'Age': ages,
            'Gender': genders, 'Married?': maritals}
    df = pd.DataFrame(data)
    outFileAsStr = StringIO()
    df.to_csv(outFileAsStr, index=False)
return StreamingResponse(
iter([outFileAsStr.getvalue()]),
media_type='text/csv',
headers={
'Content-Disposition': 'attachment;filename=list_respondents.csv',
'Access-Control-Expose-Headers': 'Content-Disposition'
}
)
@router.post("/upload/csv")
async def upload_csv(file: UploadFile = File(...)):
    df = pd.read_csv(StringIO(str(file.file.read(), 'utf-8')))
return orjson.loads(df.to_json(orient='split'))
@router.get("/upload/survey/form", response_class = HTMLResponse)
def upload_survey_form(request:Request):
return templates.TemplateResponse("upload_survey.html", {"request": request})
@router.post("/upload/survey/form")
async def submit_survey_form(request: Request, file: UploadFile = File(...)):
df = pd.read_csv(StringIO(str(file.file.read(), 'utf-8')), encoding='utf-8')
return templates.TemplateResponse('render_survey.html', {'request': request, 'data': df.to_html()})
Headers:
To indicate to the browser that the file should be viewed in the browser, the HTTP response should include these headers:
Content-Type: application/pdf
Content-Disposition: inline; filename="filename.pdf"
To have the file downloaded rather than viewed:
Content-Type: application/pdf
Content-Disposition: attachment; filename="filename.pdf"
The quotes around the filename are required if the filename contains special characters such as filename[1].pdf which may otherwise break the browser's ability to handle the response.
How you set the HTTP response headers will depend on your HTTP server (or, if you are generating the PDF response from server-side code: your server-side programming language).
To force download:
Content-Type: application/octet-stream.
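In FastAPI both variants map naturally onto FileResponse; a sketch assuming a local report.pdf:
from fastapi import FastAPI
from fastapi.responses import FileResponse

app = FastAPI()

@app.get("/report/view")
async def view_pdf():
    # inline: the browser renders the PDF in the tab
    return FileResponse(
        "report.pdf",
        media_type="application/pdf",
        headers={"Content-Disposition": 'inline; filename="report.pdf"'},
    )

@app.get("/report/download")
async def download_pdf():
    # attachment: the browser offers a save dialog instead
    return FileResponse(
        "report.pdf",
        media_type="application/pdf",
        headers={"Content-Disposition": 'attachment; filename="report.pdf"'},
    )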
@router.post("/uploadform")
async def upload_form(
file: UploadFile = File(...),
sku: str = Form(...)
):
try:
blob = bucket.blob(os.path.join('product_images', file.filename))
file_string = file.file.read()
blob.upload_from_string(file_string, content_type=file.content_type)
url = blob.public_url
Products.objects(sku=sku).update(gcs_img_url=url)
    except Exception as e:
        print('EXCEPTION: /uploadform')
        raise HTTPException(status_code=400, detail=str(e))
return JSONResponse(status_code=200,
content={"message": f'{file.filename}'})
@router.post("/uploadform")
async def upload_form(
file: UploadFile = File(...),
sku: str = Form(...)
):
blob = bucket.blob(os.path.join('product_images', file.filename))
f = file.file.read()
blob.upload_from_string(f, content_type=file.content_type)
url = blob.public_url
Products.objects(sku=sku).update(gcs_img_url=url)
return ''
# https://fastapi.tiangolo.com/tutorial/request-files/#import-file
@router.post("/uploadfile")
async def create_upload_file(
file: UploadFile = File(...),
current_user: User = Depends(fastapi_users.get_current_active_user)
) -> Dict:
image_url = upload_image_file(
file,
file.content_type,
'product_images',
file.filename)
    product = Products.objects.get(id=product_sku)  # NOTE: product_sku is undefined in this snippet; it would need to come from the request (e.g., a Form field)
return {"filename": file.filename}
Stream file upload and read into memory
async def get_aws_textract_or_404(
file: UploadFile = File(...)
) -> StreamingResponse:
print('content type: ', file.content_type)
if file.content_type:
textract = from_upload_file(await file.read())
key_map, value_map, block_map = textract.get_map()
kvr = get_dict(key_map, value_map, block_map)
df = pd.DataFrame.from_dict(kvr, orient='index')
resp = StreamingResponse(StringIO(df.to_csv()), media_type="text/csv")
resp.headers["Content-Disposition"] = f"attachment; filename=export-{dt.now().strftime('%Y-%m-%d %H:%M')}.csv"
return resp
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
file_path = os.path.join(basedir, f'static/report_styles/{file_name}')
df.to_csv(
file_path,
index=False,
header=False,
decimal='.',
sep=',',
float_format='%.2f')
return FileResponse(path=file_path,
                    filename=file_name,
                    headers={"Content-Disposition": f"attachment; filename={file_name}",
                             'Content-Type': "application/octet-stream"}
                    )
import graphene
from graphene.relay import Node
from starlette.graphql import GraphQLApp
from ..models import User as UserModel
from ..models import Post as PostModel
from models import UserInfo, Blog
from graphene_mongo import MongoengineConnectionField, MongoengineObjectType
from graphene_sqlalchemy import SQLAlchemyObjectType
from werkzeug.security import (
generate_password_hash,
check_password_hash
)
from json import dumps, loads
from fastapi.responses import RedirectResponse
from fastapi import APIRouter, Request, Depends, BackgroundTasks, HTTPException, status
import bcrypt
import jwt
from datetime import datetime, timedelta
from pydantic import BaseModel
from graphql import GraphQLError
from bson import ObjectId
secret_key = "09ww11wwqqaa6cac818166b7a9563b93f70233,lf,l;sd,fsf7"
algorithm = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
mutation LoginUser {
  loginUser(username: "tests", password: "passwords") {
    token
  }
}
class User(MongoengineObjectType):
class Meta:
model = UserModel
interfaces = (Node,)
class Post(MongoengineObjectType):
class Meta:
model = PostModel
interfaces = (Node,)
class Comment(MongoengineObjectType):
class Meta:
model = CommentModel
interfaces = (Node,)
class UserInfoSchema(MongoengineObjectType):
class Meta:
model = UserModel
interfaces = (Node,)
class Query(graphene.ObjectType):
node = Node.Field()
users = graphene.List(User)
all_users = MongoengineConnectionField(User)
all_posts = MongoengineConnectionField(Post)
# all_comment = MongoengineConnectionField(Comment)
# get_post_field = graphene.Field(Post)
# get_user_name = graphene.String(name=graphene.String(default_value="stranger"))
# get_user_email = graphene.String(email=graphene.String(default_value="stranger"))
get_user_name = graphene.Field(User, name=graphene.String())
get_user_email = graphene.Field(User, email=graphene.String())
get_user_post = graphene.Field(Post, id=graphene.String())
get_user_login = graphene.Field(User,
email=graphene.String(),
password=graphene.String())
    def resolve_get_user_login(self, info, email, password):
        user = UserModel.objects(email=email).first()
        if user and user.check_password(password):
            return user
        # returning a string would break a Field(User); signal failure with None
        return None
def resolve_get_user_name(self, info, name):
return UserModel.objects(name=name).first()
def resolve_get_user_email(self, info, email):
return UserModel.objects(email=email).first()
def resolve_get_user_post(self, info, id):
return PostModel.objects(id=ObjectId(id)).first()
def create_user(db: Session, user: schemas.UserCreate):
hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
db_user = models.UserInfo(username=user.username, password=hashed_password, fullname=user.fullname)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def check_username_password(db: Session, user: schemas.UserAuthenticate):
db_user_info: models.UserInfo = get_user_by_username(db, username=user.username)
return bcrypt.checkpw(user.password.encode('utf-8'), db_user_info.password.encode('utf-8'))
def decode_access_token(*, data: str):
    to_decode = data
    return jwt.decode(to_decode, secret_key, algorithms=[algorithm])
class UserInfoBase(BaseModel):
username: str
class UserCreate(UserInfoBase):
password: str
# class UserCreate(UserInfoBase):
# fullname: str
# password: str
# class Config:
# orm_mode = True
# class UserAuthenticate(BaseModel):
#     email: str
#     password: str
class UserAuthenticate(UserInfoBase):
    password: str
class UserInformation(UserInfoBase):
id: int
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str = None
class BlogBase(BaseModel):
title: str
content: str
class BlogInformation(BlogBase):
id: int
class Config:
orm_mode = True
class UserInfo(Base):
__tablename__ = "user_info"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True)
password = Column(String)
fullname = Column(String, unique=True)
class Blog(Base):
__tablename__ = "blog"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
content = Column(String)
class User(MongoengineObjectType):
class Meta:
model = UserModel
interfaces = (Node,)
class UserInfoSchema(SQLAlchemyObjectType):
class Meta:
model = UserInfo
class BlogSchema(SQLAlchemyObjectType):
class Meta:
model = Blog
def get_user_by_username(db: Session, username: str):
return db.query(models.UserInfo).filter(models.UserInfo.username == username).first()
# class CreateUser(graphene.Mutation):
class SignupUser(graphene.Mutation):
class Arguments:
username = graphene.String(required=True)
password = graphene.String(required=True)
# fullname = graphene.String()
ok = graphene.Boolean()
# user = graphene.Field(lambda: UserInfoSchema)
user = graphene.Field(lambda: User)
@staticmethod
def mutate(root, info, username, password):
# def mutate(root, info, username, password, fullname, ):
hashed_password = bcrypt.hashpw(password.encode('utf-8'),
bcrypt.gensalt())
# user = UserInfoSchema(username=username, password=hashed_password, fullname=fullname)
# user = UserModel(email=username,
# password=hashed_password).save()
ok = True
# db_user = crud.get_user_by_username(db, username=username)
db_user = UserModel.objects(email=username).first()
if db_user:
raise GraphQLError("Username already registered")
# user_info = UserCreate(username=username, password=password, fullname=fullname)
# crud.create_user(db, user_info)
# return CreateUser(user=user, ok=ok)
user = UserModel(email=username,
password=str(hashed_password)).save()
# crud.cre(db, user_info)
# return CreateUser(user=user, ok=ok)
return SignupUser(user=user, ok=ok)
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
return encoded_jwt
class LoginUser(graphene.Mutation):
# class AuthenUser(graphene.Mutation):
class Arguments:
username = graphene.String(required=True)
password = graphene.String(required=True)
token = graphene.String()
@staticmethod
def mutate(root, info, username, password):
# only search for user with email.
db_user = UserModel.objects(email=username).first()
# db_user = crud.get_user_by_username(db, username=username)
# what is the point of this?
# user_authenticate = UserAuthenticate(email=username,
# password=password)
if db_user is None:
raise GraphQLError("Username not existed")
else:
is_password_correct = db_user.check_password(password)# return True
# is_password_correct = crud.check_username_password(db, user_authenticate)
if is_password_correct is False:
raise GraphQLError("Incorrect Password.")
else:
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": username},
expires_delta=access_token_expires)
# return AuthenUser(token=access_token)
return LoginUser(token=access_token)
class AuthMutations(graphene.ObjectType):
# class MyMutations(graphene.ObjectType):
# user = CreateUser.Field()
signup_user = SignupUser.Field()
login_user = LoginUser.Field()
# authen_user = AuthenUser.Field()
# create_new_blog = CreateNewBlog.Field()
router = APIRouter()
router.add_route("/",
GraphQLApp(schema=graphene.Schema(query=Query,
types=[User, Post],
mutation=AuthMutations
)
)
)
async def read_upload_image(
file: UploadFile = File(...)):
doc = Document(BytesIO(await file.read()))
pdfimages_out = RawImageOutput(doc)
print(pdfimages_out.get(0))
img = pdfimages_out.get(0)
fp = STATIC_DIR + '/' + IMG_FILE
img.save(fp)
time.sleep(2)
os.remove(fp)
    with tempfile.TemporaryFile() as fp:
        img.save(fp, format='PNG')  # PIL needs an explicit format when saving to a file object
        fp.seek(0)                  # rewind before reading the bytes back
        f = fp.read()
        print(f)
    return
class Faces(BaseModel):
faces: List[Tuple[int, int, int, int]]
async def read_upload_image(
    image: UploadFile = File(...)) -> Faces:
    data = np.fromfile(image.file, dtype=np.uint8)  # read the upload's bytes
    img = cv2.imdecode(data, cv2.IMREAD_UNCHANGED)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # face detection itself is not implemented here; the raw byte array is returned
    return data
def predict(file: UploadFile = File(...)):
content = file.file.read()
image = Image.open(io.BytesIO(content)).convert('L')
output = model.predict_from_image(image)
return output
from PIL import Image, ImageFilter
import io
def apply_filter (file: object, filter: str) -> object:
"""
TODO:
1. Accept the image as file object, and the filter type as string
2. Open the as an PIL Image object
3. Apply the filter
4. Convert the PIL Image object to file object
5. Return the file object
"""
image = Image.open(file)
    image = image.filter(getattr(ImageFilter, filter.upper()))
file = io.BytesIO()
image.save(file, "JPEG")
file.seek(0)
return file
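A quick usage sketch (file names are placeholders; "blur" maps to ImageFilter.BLUR):
out = apply_filter(open("photo.jpg", "rb"), "blur")
with open("photo_blur.jpg", "wb") as f:
    f.write(out.read())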
Convert AWS Textract result to csv file in memory and return as stream.
async def get_aws_textract_or_404(
file: UploadFile = File(...)) -> StreamingResponse:
if file.content_type:
textract = AWSTextract.from_upload_file(await file.read())
textract.aws_region_name = settings.AWS_REGION
key_map, value_map, block_map = textract.get_kv_map()
kvr = textract.get_kv_relationship(key_map, value_map, block_map)
df = pd.DataFrame.from_dict(kvr, orient='index')
resp = StreamingResponse(StringIO(df.to_csv()),
media_type=settings.CSV_MEDIA_TYPE)
resp.headers["Content-Disposition"] = settings.CSV_RESULT_FILENAME
return resp
raise HTTPException(status_code=HTTP_404_NOT_FOUND)
with pytest-cov
Multiple file upload
async def test_multi_file_base(
self,
page_1_jpg,
page_1_pdf,
test_client: AsyncClient):
        f1 = [
            ("file_list", ("file_1", open(page_1_jpg, 'rb'), "multipart/form-data")),
            ("file_list", ("file_2", open(page_1_pdf, 'rb'), "multipart/form-data"))
        ]
resp = await test_client.post(
"/api/v1/ocr/multiple",
files=f1)
assert resp.status_code == HTTP_200_OK
files = [
('images', ('foo.png', open('foo.png', 'rb'), 'image/png')),
('images', ('bar.png', open('bar.png', 'rb'), 'image/png'))
]
r = httpx.post("https://httpbin.org/post", files=files)
@pytest.mark.asyncio
class TestAWSTextract:
async def test_textract_jpg_upload(
self,
page_1_jpg,
conf,
test_client: AsyncClient):
payload = {'file': open(page_1_jpg, 'rb')}
r = await test_client.post(
"/api/v1/image",
files=payload
)
assert r.status_code == HTTP_200_OK
assert 'Content-Disposition' in r.headers
assert r.headers['Content-Disposition'] == conf.CSV_RESULT_FILENAME
@pytest.mark.asyncio
class TestTest:
async def test_pdf_with_image(
self,
pdf_text_1,
pdf_image_1,
page_1_jpg,
conf,
test_client: AsyncClient):
# payload = {'file': open(pdf_text_1, 'rb')}
payload = {'file': open(pdf_image_1, 'rb')}
# payload = {'file': open(page_1_jpg, 'rb')}
# r = await test_client.post(
# f"/api/v1/test",
# files=payload)
# print(r)
# assert r.status_code != HTTP_400_BAD_REQUEST
# payload = {'file': open(pdf_image_1, 'rb')}
        r2 = await test_client.post(
            "/api/v1/pdf",
            files=payload)
A Pydantic model with its matching Vuetify data-table headers:
class NewProduct(BaseNewModel):
id: PyObjectId = Field(default_factory=OID, alias="_id")
sku: str = Field(...)
description: str = Field(...)
hs_code: str = Field(...)
package_dimensions: str = Field(...)
product_weight_lbs: float = Field(default=0.00)
headers: [
{ text: "SKU", align: 'start', value: 'sku', width: '6%' },
{ text: "Description", value: 'description', align: 'start' },
{ text: "HS Code", value: 'hs_code', align: 'end', width: '6%' },
{ text: "Dimensions", value: 'package_dimensions', align: 'end' },
{ text: "Product Weight", value: 'product_weight_lbs', align: 'end' },
];
Serialize json datetime from a javascript frontend
https://github.com/markqiu/fastapi-mongodb-realworld-example-app/blob/master/app/models/dbmodel.py
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Schema  # note: Schema was renamed to Field in pydantic v1
class DateTimeModelMixin(BaseModel):
created_at: Optional[datetime] = Schema(..., alias="createdAt")
updated_at: Optional[datetime] = Schema(..., alias="updatedAt")
class DBModelMixin(DateTimeModelMixin):
id: Optional[int] = None
Config Model
https://github.com/markqiu/fastapi-mongodb-realworld-example-app/blob/master/app/models/rwmodel.py
from datetime import datetime, timezone
from pydantic import BaseConfig, BaseModel
class RWModel(BaseModel):
class Config(BaseConfig):
allow_population_by_alias = True
json_encoders = {
datetime: lambda dt: dt.replace(tzinfo=timezone.utc)
.isoformat()
.replace("+00:00", "Z")
}
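A small check of what that encoder produces (same Config as above):
from datetime import datetime, timezone
from pydantic import BaseConfig, BaseModel

class RWModel(BaseModel):  # same Config as above
    class Config(BaseConfig):
        json_encoders = {
            datetime: lambda dt: dt.replace(tzinfo=timezone.utc)
            .isoformat()
            .replace("+00:00", "Z")
        }

class Event(RWModel):
    created_at: datetime

print(Event(created_at=datetime(2021, 2, 8, 14, 58, 4)).json())
# {"created_at": "2021-02-08T14:58:04Z"}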
The setup below solves the following problems and corresponding errors in Google Cloud Run:
Uncaught signal: 6, pid=989, tid=989, fault_addr=0.
[2021-02-08 14:58:04 +0000] [2] [CRITICAL] WORKER TIMEOUT (pid:989)
refs:
https://docs.gunicorn.org/en/latest/settings.html#worker-tmp-dir
https://www.uvicorn.org/deployment/#running-from-the-command-line
https://cloud.google.com/run/docs/reference/container-contract?authuser=2#env-vars
https://fastapi.tiangolo.com/deployment/docker/#start-the-docker-container
https://github.com/anthcor/cloudrun-fastapi/blob/master/cloudbuild.yaml
Request Timeout:
Idle Instances and Minimizing Cold Start:
CPU:
Container Concurrency
Container autoscaling
Set minimum number of instances to > 0 to prevent shutdown when there are no requests.
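A hedged gcloud sketch of those knobs ("my-service" is a placeholder):
gcloud run services update my-service \
  --min-instances=1 \
  --concurrency=80 \
  --timeout=300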
# FROM python:3.7-alpine
# or
# FROM python:3.7-slim
# or
# FROM python:3.8-slim
# or
FROM python:3.9-slim
ENV APP_HOME /app
WORKDIR $APP_HOME
RUN apt-get update \
&& apt-get install -y gcc g++ openssl libxml2-dev libxslt-dev musl-dev libxslt1-dev libffi-dev zlib1g-dev libssl-dev \
&& apt-get install -y build-essential python3-dev python3-pip python3-setuptools python3-wheel python3-cffi libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 shared-mime-info
# WEASYPRINT dependencies
# sudo apt-get install build-essential python3-dev python3-pip python3-setuptools python3-wheel python3-cffi libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 libffi-dev shared-mime-info
COPY . ./
RUN /usr/local/bin/python -m pip install --upgrade pip
RUN pip install -r requirements.txt --no-cache-dir
RUN pip install -vvv uvloop
RUN python -m spacy download en
RUN find /usr/local/lib/python3.9 -name '*.c' -delete \
&& find /usr/local/lib/python3.9 -name '*.pxd' -delete \
&& find /usr/local/lib/python3.9 -name '*.pyd' -delete \
&& find /usr/local/lib/python3.9 -name '__pycache__' | xargs rm -r
# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# CMD uvicorn main:app --host 0.0.0.0 --port 80
CMD gunicorn backend.main:app -c gunicorn_config.py
Another example.
FROM python:3.6.9-slim
# Copy all the files to the src folder
COPY build/ /usr/src/
# Create the virtual environment
RUN python3 -m venv /usr/src/myapp_venv
# Install the requirements
RUN /usr/src/myapp_venv/bin/pip3 install -r /usr/src/requirements.txt
# Runs gunicorn
# --chdir sets the directory where gunicorn should look for the server files
# server:app means run the "server.py" file and look for the "app" constructor within that
ENTRYPOINT ["/usr/src/myapp_venv/bin/gunicorn", "--bind", "0.0.0.0:5000", "--worker-tmp-dir", "/dev/shm", "--workers", "2", "--chdir", "/usr/src/", "server:app"]
# Expose the gunicorn port
EXPOSE 5000
from os import environ
from multiprocessing import cpu_count
# https://pythonspeed.com/articles/gunicorn-in-docker/
workers = 4
# workers = cpu_count() * 2 + 1
threads = 2
# threads = (2 - 4) * workers
timeout = 60
bind = f":{environ.get('PORT', '8080')}"
# bind = "127.0.0.1:8000"
# https://docs.gunicorn.org/en/latest/faq.html#how-do-i-avoid-gunicorn-excessively-blocking-in-os-fchmod
worker_tmp_dir = '/dev/shm'
# worker_class = 'gthread'
worker_class = "uvicorn.workers.UvicornWorker"
~/gunicorn_config.py
"""gunicorn server configuration."""
import os
bind = f":{os.environ.get('PORT', '8080')}"
workers = 1
threads = 2
timeout = 30
worker_class = 'uvicorn.workers.UvicornWorker'
~/Dockerfile
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.7-slim-stretch
# FROM python:3.7-slim
# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./
RUN apt-get update \
&& apt-get install -y python3-dev gcc g++ build-essential libssl-dev libffi-dev openssl libxml2-dev libxslt-dev musl-dev python-dev python-pip libxml2-dev libxslt1-dev zlib1g-dev libffi-dev libssl-dev
RUN pip install -r requirements.txt
RUN find /usr/local/lib/python3.7 -name '*.c' -delete
RUN find /usr/local/lib/python3.7 -name '*.pxd' -delete
RUN find /usr/local/lib/python3.7 -name '*.pyd' -delete
RUN find /usr/local/lib/python3.7 -name '__pycache__' | xargs rm -r
# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
CMD gunicorn main:app -c gunicorn_config.py
or
With pipenv.
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
RUN pip3 install pipenv
# -- Adding Pipfiles
COPY Pipfile Pipfile
COPY Pipfile.lock Pipfile.lock
# -- Install dependencies:
RUN set -ex && pipenv install --deploy --system
COPY ./app /app
~/docker-compose.yaml
services:
  fastapiapp:
    image: example/app:localdev
    build:
      context: ./fastapiapp
      dockerfile: ./docker/Dockerfile
    ports:
      - "80:80"
    environment:
      - GUNICORN_CMD_ARGS="--reload" # Don't include - doesn't work
    # Use ONE of the two commands below to auto-reload when developing.
    # This one tracks everything in sys.path and can use a lot of CPU:
    command: ["/start-reload.sh"]
    # This one only tracks files in the /app directory
    # (add more paths with additional --reload-dir options):
    # command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--debug", "--port", "80", "--reload-dir", "/app"]
    volumes:
      - ./fastapiapp/app:/app