partial updates.
only send the fields they want to change
to your API and omit the ones that shouldn't change.This is the usual way of implementing a PATCH endpoint.
class PostBase(BaseModel):
title: str
content: str
class PostCreate(PostBase):
pass
class PostPublic(PostBase):
id: int
class PostDB(PostBase):
id: int
nb_views: int = 0
two arguments
:
id property (from the path)
Now for the interesting part: updating the existing object.
Transform PostPartialUpdate into a dictionary with the dict method.
Then, on our existing post_db database instance, we call the copy method.
This is a useful method to clone a Pydantic object into another instance.
The nice thing about this method is that it even accepts an update argument.
This argument expects a dictionary with all the fields that should be updated during the copy: that's exactly what we want to do with our updated_fields dictionary!
class PostPartialUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None
@app.patch(
"/posts/{id}",
response_model=PostPublic)
async def partial_update(
id: int,
post_update: PostPartialUpdate
):
try:
post_db = db.posts[id]
updated_fields = post_update.dict(exclude_unset=True)
updated_post = post_db.copy(update=updated_fields)
db.posts[id] = updated_post
return updated_post
except KeyError:
raise HTTPException(status.HTTP_404_NOT_FOUND)
# dummy database
posts = {
1: Post(title="Hello", nb_views=100),
}
@app.put("/posts/{id}")
async def update_or_create_post(
id: int,
post: Post,
response: Response
):
if id not in posts:
response.status_code = status.HTTP_201_CREATED
posts[id] = post
return posts[id]
Setup below solves the following problems and corresponding errors.
Problems:
Errors in Google Cloud Run:
Uncaught signal: 6, pid=989, tid=989, fault_addr=0.
[2021-02-08 14:58:04 +0000] [2] [CRITICAL] WORKER TIMEOUT (pid:989)
refs:
https://docs.gunicorn.org/en/latest/settings.html#worker-tmp-dir
https://www.uvicorn.org/deployment/#running-from-the-command-line
https://cloud.google.com/run/docs/reference/container-contract?authuser=2#env-vars
https://fastapi.tiangolo.com/deployment/docker/#start-the-docker-container
https://github.com/anthcor/cloudrun-fastapi/blob/master/cloudbuild.yaml
Request Timeout:
Idle Instances and Minimizing Cold Start:
CPU:
Container Concurrency
Container autoscaling
Set minimum number of instances to > 0 to prevent shutdown when there are no requests.
# FROM python:3.7-alpine
# or
# FROM python:3.7-slim
# or
# FROM python:3.8-slim
# or
FROM python:3.9-slim
ENV APP_HOME /app
WORKDIR $APP_HOME
RUN apt-get update \
&& apt-get install -y gcc g++ openssl libxml2-dev libxslt-dev musl-dev libxslt1-dev libffi-dev zlib1g-dev libssl-dev \
&& apt-get install -y build-essential python3-dev python3-pip python3-setuptools python3-wheel python3-cffi libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 shared-mime-info
# WEASYPRINT dependancies
# sudo apt-get install build-essential python3-dev python3-pip python3-setuptools python3-wheel python3-cffi libcairo2 libpango-1.0-0 libpangocairo-1.0-0 libgdk-pixbuf2.0-0 libffi-dev shared-mime-info
COPY . ./
RUN /usr/local/bin/python -m pip install --upgrade pip
RUN pip install -r requirements.txt --no-cache-dir
RUN pip install -vvv uvloop
RUN spacy download en
RUN find /usr/local/lib/python3.9 -name '*.c' -delete \
&& find /usr/local/lib/python3.9 -name '*.pxd' -delete \
&& find /usr/local/lib/python3.9 -name '*.pyd' -delete \
&& find /usr/local/lib/python3.9 -name '__pycache__' | xargs rm -r
# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# CMD uvicorn main:app --host 0.0.0.0 --port 80
CMD gunicorn backend.main:app -c gunicorn_config.py
Another example.
FROM python:3.6.9-slim
Copy all the files to the src folder
COPY build/ /usr/src/
# Create the virtual environment
RUN python3 -m venv /usr/src/myapp_venv
# Install the requirements
RUN /usr/src/myapp_venv/bin/pip3 install -r /usr/src/requirements.txt
# Runs gunicorn
# --chdir sets the directory where gunicorn should look for the server files
# server:app means run the "server.py" file and look for the "app" constructor within that
ENTRYPOINT ["/usr/src/myapp/bin/gunicorn", "--bind", "0.0.0.0:5000", "--worker-tmp-dir", "/dev/shm", "--workers", "2", "--chdir", "/usr/src/", "server:app"]
Expose the gunicorn port
EXPOSE 5000
from os import environ
from multiprocessing import cpu_count
# https://pythonspeed.com/articles/gunicorn-in-docker/
workers = 4
# workers = cpu_count() * 2 + 1
threads = 2
# threads = (2 - 4) * workers
timeout = 60
bind = f":{environ.get('PORT', '8080')}"
# bind = "127.0.0.1:8000"
# https://docs.gunicorn.org/en/latest/faq.html#how-do-i-avoid-gunicorn-excessively-blocking-in-os-fchmod
worker_temp_dir = '/dev/shm'
worker_class = 'gthread'
worker_class = "uvicorn.workers.UvicornWorker"
~/gunicorn_config.py
"""gunicorn server configuration."""
import os
bind = f":{os.environ.get('PORT', '8080')}"
workers = 1
threads = 2
timeout = 30
worker_class = 'uvicorn.workers.UvicornWorker'
~/Dockerfile
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.7-slim-stretch
# FROM python:3.7-slim
# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./
RUN apt-get update \
&& apt-get install -y python3-dev gcc g++ build-essential libssl-dev libffi-dev openssl libxml2-dev libxslt-dev musl-dev python-dev python-pip libxml2-dev libxslt1-dev zlib1g-dev libffi-dev libssl-dev
RUN pip install -r requirements.txt
RUN find /usr/local/lib/python3.7 -name '*.c' -delete
RUN find /usr/local/lib/python3.7 -name '*.pxd' -delete
RUN find /usr/local/lib/python3.7 -name '*.pyd' -delete
RUN find /usr/local/lib/python3.7 -name '__pycache__' | xargs rm -r
# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
CMD gunicorn main:app -c gunicorn_config.py
or
With pipenv.
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
RUN pip3 install pipenv
# -- Adding Pipfiles
COPY Pipfile Pipfile
COPY Pipfile.lock Pipfile.lock
# -- Install dependencies:
RUN set -ex && pipenv install --deploy --system
COPY ./app /app
~/docker-compose.yaml
fastapiapp:
image: example/app:localdev
build:
context: ./fastapiapp
dockerfile: ./docker/Dockerfile
ports:
- "80:80"
environment:
- GUNICORN_CMD_ARGS="--reload" # Don't include - doesn't work
# Use ONE of the two commands below to
# auto-reload when developing
# will track everything in sys.path and can use a lot of CPU
command: ["/start-reload.sh"]
# will only track files in /app directory
# (can add additional paths with additional reload-dir options)
command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--debug", "--port", "80", "--reload-dir", "/app"]
volumes:
- ./fastapiapp/app:/app
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
from BybitWebsocket import BybitWebsocket
import bybit
import os
from collections import deque
from sse_starlette.sse import EventSourceResponse
import asyncio
key = os.environ.get('API_PRIVATE_KEY')
key_secret = os.environ.get('PRIVATE_KEY')
ws = BybitWebsocket(wsURL="wss://stream.bybit.com/realtime", api_key=key, api_secret=key_secret)
ws.subscribe_orderBookL2("BTCUSD")
app = FastAPI()
html_bybit = """
<!DOCTYPE html>
<html>
<head>
<style>
* {
background-color: black;
color: white;
}
</style>
<title>BTC/USD</title>
</head>
<body>
<ul id='messages'>
</ul>
<script>
const evtSource = new EventSource("http://localhost:8000/status/stream");
evtSource.addEventListener("update", function(event) {
// Logic to handle status updates
console.log(event)
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
});
//not implemented
evtSource.addEventListener("end", function(event) {
console.log('Handling end....')
evtSource.close();
});
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html_bybit)
async def stream_gen_deck(request):
deck = deque('0')
while True:
if await request.is_disconnected():
print('disconnected')
# logger.debug('Request disconnected')
break
d = ws.get_data("orderBookL2_25.BTCUSD")
if not isinstance(d, list):
for p in d['update']:
if len(deck) < 10:
deck.appendleft(p['price'])
else:
deck.pop()
await asyncio.sleep(1)
yield {'event': 'update',
'data': deck[0]}
@app.get('/status/stream')
async def runStatus(request: Request):
event_generator = stream_gen_deck(request)
return EventSourceResponse(event_generator)
@router.post("/order_checkout")
def checkout_order(
background_tasks: BackgroundTasks,
cart: pm.GetCheckout,
current_user: User = Depends(fastapi_users.get_current_active_user)
) -> JSONResponse:
try:
cart_items = cart.cart_items
ship_addr = cart_items['shipping_address']
shipto_id = ship_addr['_id']['$oid']
shipto = MShipTo.objects.get(id=shipto_id)
for item in cart_items['cart']:
order = MOrder(shipto_ref=[shipto])
order.quantity = int(item['quantity'])
order.price = float(item['price'])
product_ref = Products.objects.get(sku=item['product'])
order.product_ref = [product_ref]
order.save()
except Exception as e:
print(e)
try:
items = ''.join([f"<li>{i['product']}</li>" for i in cart_items['cart']])
html_email = Utils.html_email(items)
recipients = notification_email_list(current_user.email)
print('recipients: ', recipients)
message = MessageSchema(subject=settings.MAIL_QUE_SUBJECT,
recipients=recipients,
body=html_email,
subtype="html")
fm = FastMail(Utils.conf)
background_tasks.add_task(fm.send_message, message)
except Exception as e:
print(e)
finally:
return JSONResponse(status_code=200, content={"message": "email has been sent"})
@router.post("/order_checkout")
async def checkout_order(
cart: pm.GetCheckout,
current_user: User = Depends(fastapi_users.get_current_active_user)
) -> JSONResponse:
try:
cart_items = cart.cart_items
ship_addr = cart_items['shipping_address']
shipto_id = ship_addr['_id']['$oid']
shipto = MShipTo.objects.get(id=shipto_id)
for item in cart_items['cart']:
order = MOrder(shipto_ref=[shipto])
order.quantity = int(item['quantity'])
print('len', len(item['product']))
product_ref = Products.objects.get(sku=item['product'])
order.product_ref = [product_ref]
order.save()
except Exception as e:
print(e)
try:
items = ''.join([f"<li>{i['product']} -- x{i['quantity']}</li>" for i in cart_items['cart']])
html_email = Utils.html_email(items)
recipients = notification_email_list(current_user.email)
print('recipients: ', recipients)
message = MessageSchema(subject=settings.MAIL_QUE_SUBJECT,
recipients=recipients,
body=html_email,
subtype="html")
fm = FastMail(Utils.conf)
await fm.send_message(message)
except Exception as e:
print(e)
finally:
return JSONResponse(status_code=200, content={"message": "email has been sent"})
Context:
Lets say you need an endpoint to return a file, to serve as a report for some data in your database.
There are 2x ways your probably going to call this endpoint.
Different response types should be used, depending on the use case.
Example 1 - Non - Client Side:
Actions:
Upload a file to an endpoint that expects UploadFile
.
Process that file.
Convert the now processed file to bytes.
Return file in a response.
This is our endpoint to receive the file.
@app.post("/file", response_class=Response)
async def detect_image(file: UploadFile):
try:
# process image
detect = DetectDocument.from_upload_file(file)
edged = detect.preprocess_image()
np_arr = detect.detect_edges(edged)
# get extension of file.
ext = content_type.split('/')[-1]
is_success, buffer = cv2.imencode(f".{ext}", np_arr)
if is_success:
view = buffer.tobytes()
headers = {'Content-Disposition': f'inline; filename="{file.filename}"'}
resp = Response(view, headers=headers, media_type=file.content_type)
logger.info(f'Headers: {resp.headers}')
logger.info(f'Success!!')
return resp
except Exception as err:
err_msg = f'Error: {err}'
logger.error(err_msg)
raise HTTPException(HTTP_400_BAD_REQUEST, detail=err_msg)
img = 'tests/cam7.jpg'
@pytest.mark.asyncio
class TestFile:
async def test_upload_file(self, conf, test_client: AsyncClient):
payload = {'file': open(img, 'rb')}
r = await test_client.post("/file", files=payload)
assert r.status_code == HTTP_200_OK
assert len(r.content) == int(r.headers['content-length'])
assert 'Content-Disposition' in r.headers
img_name = img.split('/')[-1]
with open(f"save_{img_name}", 'wb') as f:
f.write(r.content)
f.close()
Headers:
To indicate to the browser that the file should be viewed in the browser, the HTTP response should include these headers:
Content-Type: application/pdf
Content-Disposition: inline; filename="filename.pdf"
To have the file downloaded rather than viewed:
Content-Type: application/pdf
Content-Disposition: attachment; filename="filename.pdf"
The quotes around the filename are required if the filename contains special characters such as filename[1].pdf which may otherwise break the browser's ability to handle the response.
How you set the HTTP response headers will depend on your HTTP server (or, if you are generating the PDF response from server-side code: your server-side programming language).
To force download:
Content-Type: application/octet-stream.
@router.post("/uploadform")
async def upload_form(
file: UploadFile = File(...),
sku: str = Form(...)
):
try:
blob = bucket.blob(os.path.join('product_images', file.filename))
file_string = file.file.read()
blob.upload_from_string(file_string, content_type=file.content_type)
url = blob.public_url
Products.objects(sku=sku).update(gcs_img_url=url)
except Exception as e:
print('EXCEPTION: /uploadform')
raise HTTPException(status_code=400, detail=e)
return JSONResponse(status_code=200,
content={"message": f'{file.filename}'})
@router.post("/uploadform")
async def upload_form(
file: UploadFile = File(...),
sku: str = Form(...)
):
blob = bucket.blob(os.path.join('product_images', file.filename))
f = file.file.read()
blob.upload_from_string(f, content_type=file.content_type)
url = blob.public_url
Products.objects(sku=sku).update(gcs_img_url=url)
return ''
# https://fastapi.tiangolo.com/tutorial/request-files/#import-file
@router.post("/uploadfile")
async def create_upload_file(
file: UploadFile = File(...),
current_user: User = Depends(fastapi_users.get_current_active_user)
) -> Dict:
image_url = upload_image_file(
file,
file.content_type,
'product_images',
file.filename)
product = Products.objects.get(id=product_sku)
return {"filename": file.filename}
Stream file upload and read into memory
async def get_aws_textract_or_404(
file: UploadFile = File(...)
) -> StreamingResponse:
print('content type: ', file.content_type)
if file.content_type:
textract = from_upload_file(await file.read())
key_map, value_map, block_map = textract.get_map()
kvr = get_dict(key_map, value_map, block_map)
df = pd.DataFrame.from_dict(kvr, orient='index')
resp = StreamingResponse(StringIO(df.to_csv()), media_type="text/csv")
resp.headers["Content-Disposition"] = f"attachment; filename=export-{dt.now().strftime('%Y-%m-%d %H:%M')}.csv"
return resp
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
file_path = os.path.join(basedir, f'static/report_styles/{file_name}')
df.to_csv(
file_path,
index=False,
header=False,
decimal='.',
sep=',',
float_format='%.2f')
return FileResponse(path=file_path,
filename=file_name,
headers={f"Content-Disposition": "attachment; filename={file_name}",
'Content-Type': "application/octet-stream"}
)
import graphene
from graphene.relay import Node
from starlette.graphql import GraphQLApp
from ..models import User as UserModel
from ..models import Post as PostModel
from models import UserInfo, Blog
from graphene_mongo import MongoengineConnectionField, MongoengineObjectType
from graphene_sqlalchemy import SQLAlchemyObjectType
from werkzeug.security import (
generate_password_hash,
check_password_hash
)
from json import dumps, loads
from fastapi.responses import RedirectResponse
from fastapi import APIRouter, Request, Depends, BackgroundTasks, HTTPException, status
import bcrypt
import jwt
from datetime import timedelta, datetime
from pydantic import BaseModel
from datetime import timedelta
from graphql import GraphQLError
from bson import ObjectId
secret_key = "09ww11wwqqaa6cac818166b7a9563b93f70233,lf,l;sd,fsf7"
algorithm = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
mutation AuthenUser {
authenUser(username: "tests", password: "passwords") {
token
}
}
class User(MongoengineObjectType):
class Meta:
model = UserModel
interfaces = (Node,)
class Post(MongoengineObjectType):
class Meta:
model = PostModel
interfaces = (Node,)
class Comment(MongoengineObjectType):
class Meta:
model = CommentModel
interfaces = (Node,)
class UserInfoSchema(MongoengineObjectType):
class Meta:
model = UserModel
interfaces = (Node,)
class Query(graphene.ObjectType):
node = Node.Field()
users = graphene.List(User)
all_users = MongoengineConnectionField(User)
all_posts = MongoengineConnectionField(Post)
# all_comment = MongoengineConnectionField(Comment)
# get_post_field = graphene.Field(Post)
# get_user_name = graphene.String(name=graphene.String(default_value="stranger"))
# get_user_email = graphene.String(email=graphene.String(default_value="stranger"))
get_user_name = graphene.Field(User, name=graphene.String())
get_user_email = graphene.Field(User, email=graphene.String())
get_user_post = graphene.Field(Post, id=graphene.String())
get_user_login = graphene.Field(User,
email=graphene.String(),
password=graphene.String())
def resolve_get_user_login(self, info, email, password):
user = UserModel.objects(email=email).first()
if user and user.check_password(password):
return user
else:
return 'failed'
def resolve_get_user_name(self, info, name):
return UserModel.objects(name=name).first()
def resolve_get_user_email(self, info, email):
return UserModel.objects(email=email).first()
def resolve_get_user_post(self, info, id):
return PostModel.objects(id=ObjectId(id)).first()
def create_user(db: Session, user: schemas.UserCreate):
hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
db_user = models.UserInfo(username=user.username, password=hashed_password, fullname=user.fullname)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def check_username_password(db: Session, user: schemas.UserAuthenticate):
db_user_info: models.UserInfo = get_user_by_username(db, username=user.username)
return bcrypt.checkpw(user.password.encode('utf-8'), db_user_info.password.encode('utf-8'))
def decode_access_token(*, data: str):
to_decode = data
return jwt.decode(to_decode, secret_key, algorithm=algorithm)
class UserInfoBase(BaseModel):
username: str
class UserCreate(UserInfoBase):
password: str
# class UserCreate(UserInfoBase):
# fullname: str
# password: str
# class Config:
# orm_mode = True
class UserAuthenticate(BaseModel):
email: str
password: str
class UserAuthenticate(UserInfoBase):
password: str
class UserInformation(UserInfoBase):
id: int
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str = None
class BlogBase(BaseModel):
title: str
content: str
class BlogInformation(BlogBase):
id: int
class Config:
orm_mode = True
class UserInfo(Base):
__tablename__ = "user_info"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True)
password = Column(String)
fullname = Column(String, unique=True)
class Blog(Base):
__tablename__ = "blog"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
content = Column(String)
class User(MongoengineObjectType):
class Meta:
model = UserModel
interfaces = (Node,)
class UserInfoSchema(SQLAlchemyObjectType):
class Meta:
model = UserInfo
class BlogSchema(SQLAlchemyObjectType):
class Meta:
model = Blog
def get_user_by_username(db: Session, username: str):
return db.query(models.UserInfo).filter(models.UserInfo.username == username).first()
# class CreateUser(graphene.Mutation):
class SignupUser(graphene.Mutation):
class Arguments:
username = graphene.String(required=True)
password = graphene.String(required=True)
# fullname = graphene.String()
ok = graphene.Boolean()
# user = graphene.Field(lambda: UserInfoSchema)
user = graphene.Field(lambda: User)
@staticmethod
def mutate(root, info, username, password):
# def mutate(root, info, username, password, fullname, ):
hashed_password = bcrypt.hashpw(password.encode('utf-8'),
bcrypt.gensalt())
# user = UserInfoSchema(username=username, password=hashed_password, fullname=fullname)
# user = UserModel(email=username,
# password=hashed_password).save()
ok = True
# db_user = crud.get_user_by_username(db, username=username)
db_user = UserModel.objects(email=username).first()
if db_user:
raise GraphQLError("Username already registered")
# user_info = UserCreate(username=username, password=password, fullname=fullname)
# crud.create_user(db, user_info)
# return CreateUser(user=user, ok=ok)
user = UserModel(email=username,
password=str(hashed_password)).save()
# crud.cre(db, user_info)
# return CreateUser(user=user, ok=ok)
return SignupUser(user=user, ok=ok)
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
return encoded_jwt
class LoginUser(graphene.Mutation):
# class AuthenUser(graphene.Mutation):
class Arguments:
username = graphene.String(required=True)
password = graphene.String(required=True)
token = graphene.String()
@staticmethod
def mutate(root, info, username, password):
# only search for user with email.
db_user = UserModel.objects(email=username).first()
# db_user = crud.get_user_by_username(db, username=username)
# what is the point of this?
# user_authenticate = UserAuthenticate(email=username,
# password=password)
if db_user is None:
raise GraphQLError("Username not existed")
else:
is_password_correct = db_user.check_password(password)# return True
# is_password_correct = crud.check_username_password(db, user_authenticate)
if is_password_correct is False:
raise GraphQLError("Incorrect Password.")
else:
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": username},
expires_delta=access_token_expires)
# return AuthenUser(token=access_token)
return LoginUser(token=access_token)
class AuthMutations(graphene.ObjectType):
# class MyMutations(graphene.ObjectType):
# user = CreateUser.Field()
signup_user = SignupUser.Field()
login_user = LoginUser.Field()
# authen_user = AuthenUser.Field()
# create_new_blog = CreateNewBlog.Field()
router = APIRouter()
router.add_route("/",
GraphQLApp(schema=graphene.Schema(query=Query,
types=[User, Post],
mutation=AuthMutations
)
)
)
async def read_upload_image(
file: UploadFile = File(...)):
doc = Document(BytesIO(await file.read()))
pdfimages_out = RawImageOutput(doc)
print(pdfimages_out.get(0))
img = pdfimages_out.get(0)
fp = STATIC_DIR + '/' + IMG_FILE
img.save(fp)
time.sleep(2)
os.remove(fp)
with tempfile.TemporaryFile() as fp:
fp.write(img)
img.save(fp.seek(0))
fp.seek(0)
f = fp.read()
print(f)
return
class Faces(BaseModel):
faces: List[Tuple[int, int, int, int]]
async def read_upload_image(
image: UploadFile = File(...)) -> Faces:
data = np.fromfile(image, dtype=np.uint8)
image = cv2.imdecode(data, cv2.IMREAD_UNCHANGED)
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
return data
def predict(file: UploadFile = File(...)):
content = file.file.read()
image = Image.open(io.BytesIO(content)).convert('L')
output = model.predict_from_image(image)
return output
from PIL import Image, ImageFilter
import io
def apply_filter (file: object, filter: str) -> object:
"""
TODO:
1. Accept the image as file object, and the filter type as string
2. Open the as an PIL Image object
3. Apply the filter
4. Convert the PIL Image object to file object
5. Return the file object
"""
image = Image.open(file)
image = image.filter(eval(f"ImageFilter. {filter.upper()} " ))
file = io.BytesIO()
image.save(file, "JPEG")
file.seek(0)
return file
Convert AWS Textract result to csv file in memory and return as stream.
async def get_aws_textract_or_404(
file: UploadFile = File(...)) -> StreamingResponse:
if file.content_type:
textract = AWSTextract.from_upload_file(await file.read())
textract.aws_region_name = settings.AWS_REGION
key_map, value_map, block_map = textract.get_kv_map()
kvr = textract.get_kv_relationship(key_map, value_map, block_map)
df = pd.DataFrame.from_dict(kvr, orient='index')
resp = StreamingResponse(StringIO(df.to_csv()),
media_type=settings.CSV_MEDIA_TYPE)
resp.headers["Content-Disposition"] = settings.CSV_RESULT_FILENAME
return resp
raise HTTPException(status_code=HTTP_404_NOT_FOUND)
with pytest-cov
Multiple file upload
async def test_multi_file_base(
self,
page_1_jpg,
page_1_pdf,
test_client: AsyncClient):
f1 = {
"file_list": ("file_1", open(page_1_jpg, 'rb'), "multipart/form-data"),
"file_list": ("file_2", open(page_1_pdf, 'rb'), "multipart/form-data")
}
resp = await test_client.post(
"/api/v1/ocr/multiple",
files=f1)
assert resp.status_code == HTTP_200_OK
files = [
('images', ('foo.png', open('foo.png', 'rb'), 'image/png')),
('images', ('bar.png', open('bar.png', 'rb'), 'image/png'))
]
r = httpx.post("https://httpbin.org/post", files=files)
@pytest.mark.asyncio
class TestAWSTextract:
async def test_textract_jpg_upload(
self,
page_1_jpg,
conf,
test_client: AsyncClient):
payload = {'file': open(page_1_jpg, 'rb')}
r = await test_client.post(
"/api/v1/image",
files=payload
)
assert r.status_code == HTTP_200_OK
assert 'Content-Disposition' in r.headers
assert r.headers['Content-Disposition'] == conf.CSV_RESULT_FILENAME
@pytest.mark.asyncio
class TestTest:
async def test_pdf_with_image(
self,
pdf_text_1,
pdf_image_1,
page_1_jpg,
conf,
test_client: AsyncClient):
# payload = {'file': open(pdf_text_1, 'rb')}
payload = {'file': open(pdf_image_1, 'rb')}
# payload = {'file': open(page_1_jpg, 'rb')}
# r = await test_client.post(
# f"/api/v1/test",
# files=payload)
# print(r)
# assert r.status_code != HTTP_400_BAD_REQUEST
# payload = {'file': open(pdf_image_1, 'rb')}
r2 = await test_client.post(
f"/api/v1/pdf",
files=payload)