Working with files in a FastAPI application.
from pydantic import BaseModel
from typing import List, Tuple
import numpy as np
import cv2
from fastapi import UploadFile
class Faces(BaseModel):
    """Response model: detected faces as (x, y, w, h) bounding boxes."""

    faces: List[Tuple[int, int, int, int]]
async def read_opencv(
    image: UploadFile = File(...)
) -> np.ndarray:
    """Decode an uploaded image with OpenCV and return it as a grayscale array.

    Fixes: the original rebound the parameter name ``image`` to the decoded
    array (shadowing the upload), and annotated the return as ``Faces``
    although it returns the grayscale ndarray.

    NOTE(review): ``File`` is not imported in this chunk — confirm
    ``from fastapi import File`` exists at the top of the file.
    """
    # Pull the raw upload bytes into a uint8 buffer for cv2.imdecode.
    data = np.fromfile(image.file, dtype=np.uint8)
    # Decode without dropping channels, then reduce to grayscale.
    decoded = cv2.imdecode(data, cv2.IMREAD_UNCHANGED)
    gray = cv2.cvtColor(decoded, cv2.COLOR_BGR2GRAY)
    return gray
class FileResponseParsers:
    """Serialize image arrays for HTTP responses in several ways.

    NOTE(review): relies on names not defined in this chunk
    (``PILImage``, ``io``, ``StreamingResponse``, ``cv2``) — presumably
    imported elsewhere in the file.
    """

    def save_local_V1(self, doc):
        """Write the image array ``doc`` to ``self.output_file`` on disk."""
        cv2.imwrite(self.output_file, doc)

    def save_V4(self, content_type, arr):
        """Encode ``arr`` as PNG and return it wrapped in a StreamingResponse."""
        im = PILImage.fromarray(arr)
        buf = io.BytesIO()
        im.save(buf, format='PNG')
        buf.seek(0)  # rewind so the response streams from the start
        return StreamingResponse(buf, media_type="image/png")

    def save_V3(self, content_type, img):
        """Encode ``img`` (format taken from the MIME subtype) and return bytes."""
        ext = content_type.split('/')[-1]
        im = PILImage.fromarray(img)
        # Save image to an in-memory bytes buffer. The with-block closes
        # the buffer; the original's extra buf.close() was redundant.
        with io.BytesIO() as buf:
            im.save(buf, format=ext)
            im_bytes = buf.getvalue()
        return im_bytes

    def save_V2(self, content_type, img):
        """Encode ``img`` via imageio's Pillow plugin and return bytes."""
        from imageio import v3 as iio
        ext = content_type.split('/')[-1]
        # The with-block closes the buffer; no explicit close() needed.
        with io.BytesIO() as buf:
            iio.imwrite(buf, img, plugin="pillow", format=ext)
            im_bytes = buf.getvalue()
        return im_bytes

    def save_V1(self, content_type, arr):
        """Encode ``arr`` with OpenCV and return bytes.

        Raises:
            Exception: if cv2 cannot encode the array.
        """
        ext = content_type.split('/')[-1]
        is_success, buffer = cv2.imencode(f".{ext}", arr)
        if is_success:
            return buffer.tobytes()
        raise Exception('File Error: np.array could not be encoded')
https://stackoverflow.com/questions/2104080/how-do-i-check-file-size-in-python
import os
os.path.getsize("/path/to/file.mp3")
You need the st_size property of the object returned by os.stat. You can get it by either using pathlib (Python 3.4+):
>>> from pathlib import Path
>>> Path('somefile.txt').stat()
os.stat_result(st_mode=33188, st_ino=6419862, st_dev=16777220, st_nlink=1, st_uid=501, st_gid=20, st_size=1564, st_atime=1584299303, st_mtime=1584299400, st_ctime=1584299400)
>>> Path('somefile.txt').stat().st_size
1564
or
>>> import os
>>> os.stat('somefile.txt')
os.stat_result(st_mode=33188, st_ino=6419862, st_dev=16777220, st_nlink=1, st_uid=501, st_gid=20, st_size=1564, st_atime=1584299303, st_mtime=1584299400, st_ctime=1584299400)
>>> os.stat('somefile.txt').st_size
1564
File Like Object
# f is a file-like object.
f.seek(0, os.SEEK_END)
size = f.tell()
tar single file.
# Archive a single file into a new tar.
import tarfile

with tarfile.open("largefile.tar", "w") as tar:
    tar.add("largeFile.txt")
tar directory
# Archive a whole directory tree into temp.tar.
import tarfile
import os

tar = tarfile.open("temp.tar", "w")
# Fix: the original contained a second, broken walk loop that referenced
# an undefined ``filenames`` variable; one walk over /tmp is sufficient.
for root, dir, files in os.walk("/tmp"):
    for file in files:
        fullpath = os.path.join(root, file)
        tar.add(fullpath)
tar.close()
Read the contents of a tar file.
# Inspect an existing tar archive.
import tarfile

tar = tarfile.open("temp.tar", "r")
tar.list()      # print a verbose table of contents
tar.name        # absolute path of the archive file
tar.getnames()  # member names as a list of strings
tar.members     # member metadata as TarInfo objects
w|bz2 is small but slower
# "w|bz2" writes a bz2-compressed stream (smaller archive, slower).
tar = tarfile.open("largefilecompressed.tar.bzip2", "w|bz2")
tar.add("largeFile.txt")
tar.close()  # fix: the original never closed the compressed stream
w|gz is bigger but faster
# "w|gz" writes a gzip-compressed stream (bigger archive, faster).
with tarfile.open("largefile.tar.gzip", "w|gz") as tar:
    tar.add("largeFile.txt")
Example with zip and tempfile.
from PIL import Image as PILImage, ExifTags
from zipfile import ZipFile
import tempfile
async def translate_compressed_file(
    self,
    from_lang: TranslatorLangCodes,
    to_lang: TranslatorLangCodes,
    file = Depends(upload_file)
) -> List[str]:
    """OCR every member of an uploaded zip archive.

    Returns one extracted-text string per archive member, in archive order.

    NOTE(review): ``to_lang`` is accepted but never used here — confirm
    whether translation was meant to happen after OCR.
    """
    lst = []
    # Spool the upload to a named temp file so ZipFile can open it by path.
    with tempfile.NamedTemporaryFile(suffix='.zip') as temp:
        temp.write(await file.read())
        with ZipFile(temp.name) as zp:
            for file_name in zp.namelist():
                with zp.open(file_name) as z_file:
                    text = await ocr_from_file(z_file.read(), lang=TessLangCodes[from_lang])
                    lst.append(text)
        # The with-blocks close z_file, zp and temp; the original's
        # explicit close() calls were redundant.
    return lst
class FileAutomation:
    """Filesystem walkers: generic file search, PDF-to-PNG rendering,
    and PNG post-processing via an AWS Textract helper."""

    def walk_files(self, root_dir: Path, ends_with: str):
        """Yield ``(full_file_path, util_id, doc_id)`` for each file under
        ``root_dir`` whose name ends with ``ends_with``.

        ``util_id`` is the last component of the containing directory;
        ``doc_id`` is a 1-based running counter across the whole walk.
        """
        assert '.' in ends_with, 'File extension must include .'
        try:
            doc_id = 0
            for root, dirs, files in os.walk(root_dir, topdown=False):
                for name in files:
                    if name.endswith(ends_with):
                        full_file_path = os.path.join(root, name)
                        # f_name, f_ext = os.path.splitext(os.path.basename(full_file_path))
                        # NOTE(review): '/'-split assumes POSIX-style paths.
                        util_id = root.split('/')[-1]
                        doc_id += 1
                        yield (full_file_path, util_id, doc_id)
        except Exception as err:
            # Best-effort: print and stop on any error during the walk.
            print(err)

    def walk_pdf(self, root_dir, dpi: int = 300):
        '''Render every page of every .pdf under ``root_dir`` to a PNG saved
        next to the source file.

        https://pymupdf.readthedocs.io/en/1.19.3/faq.html
        '''
        try:
            for root, dirs, files in os.walk(root_dir, topdown=False):
                for name in files:
                    if name.endswith('.pdf'):
                        pdf_file_path = os.path.join(root, name)
                        f_name, f_ext = os.path.splitext(os.path.basename(pdf_file_path))
                        # print(f_name, '--', f_ext)
                        # NOTE(review): OpenContextManager is defined elsewhere
                        # in the project — presumably a managed open().
                        with OpenContextManager(pdf_file_path, mode='rb') as pdf_b:
                            pdf_bytes = pdf_b.read()
                            pdf_doc = fitz.open('pdf', pdf_bytes)
                            for page in pdf_doc:
                                pix = page.get_pixmap(dpi=dpi)
                                print(pix.width, pix.height)
                                save_path = f"{root}/{f_name}-page-{page.number}.png"
                                pix.pil_save(
                                    save_path,
                                    format='PNG',
                                    optimize=True,
                                    dpi=(dpi,dpi))
        except Exception as err:
            # Best-effort: print and stop on any error.
            print(err)

    def walk_analze_png(self, root_dir):
        """Run ``aws.draw_doc`` on PNGs under ``root_dir`` that are not
        already Textract outputs, saving a '-textract_table_form' sibling.

        NOTE(review): ``aws`` is defined elsewhere; the method-name typo
        ('analze') is kept for caller compatibility.
        """
        try:
            for root, dirs, files in os.walk(root_dir, topdown=False):
                for name in files:
                    if name.endswith('.png'):
                        png_file_path = os.path.join(root, name)
                        f_name, f_ext = os.path.splitext(os.path.basename(png_file_path))
                        # Skip files that are themselves Textract outputs.
                        if f_name.split('-')[-1] not in ['textract_table_form', 'textract_key_value']:
                            try:
                                img = aws.draw_doc(png_file_path)
                                img.save(f'{root}/{f_name}-textract_table_form.png')
                            except Exception as e:
                                # Per-file best-effort: log and move on.
                                print(e)
                                continue
        except Exception as err:
            print(err)
import json
from fastapi.encoder import jsonable_encoder
class BaseData:
    """JSON read/write helpers rooted at ``self.data_dir``."""

    def write_json(self, json_file, data):
        """Serialize ``data`` (via FastAPI's jsonable_encoder) to
        ``self.data_dir/json_file``.

        Errors are printed rather than raised (best-effort, preserved
        from the original).
        """
        try:
            path = self.data_dir + "/" + json_file
            json_object = json.dumps(jsonable_encoder(data), indent=4)
            # The with-block closes the file; the original's extra
            # outfile.close() was redundant.
            with open(path, "w") as outfile:
                outfile.write(json_object)
        except Exception as e:
            print(e)

    def read_json(self, json_file):
        """Load and return the JSON document at ``self.data_dir/json_file``."""
        path = self.data_dir + "/" + json_file
        # The with-block closes the file; no explicit f.close() needed.
        with open(path, "r") as f:
            data = json.load(f)
        return data
def image_decoder(image):
    """Return a data-URI (image/png) for the image file at path ``image``.

    Fix: the original used ``open(image, 'rb').read()`` and never closed
    the handle — now wrapped in a with-block.
    """
    with open(image, 'rb') as fh:
        encoded = base64.b64encode(fh.read())
    return 'data:image/png;base64,{}'.format(encoded.decode())
def get_base64_encoded_image(image_path):
    """Return the file at ``image_path`` base64-encoded as an ASCII string.

    The with-block closes the file; the original's explicit
    ``img_file.close()`` inside the with-block was redundant.
    """
    with open(image_path, "rb") as img_file:
        return base64.b64encode(img_file.read()).decode('utf-8')
def test_img():
    """Ad-hoc check: load a stored user image and push it through ImageLoad.

    Fix: the error branch printed the undefined name ``es`` (NameError);
    it now prints the caught exception ``e``.

    NOTE(review): depends on names defined elsewhere (create_app, User,
    ImageLoad, ImageLoadError, base64). ``b64decode`` of raw image bytes
    looks suspicious — confirm the field actually stores base64 text.
    """
    app = create_app()
    app_cont = app.app_context().push()
    u = User.objects(email='asdasfasfa@gmail.com').first()
    img = u.image_field.read()
    encoded_img = base64.b64decode(img)
    try:
        image_file = ImageLoad.loadImage(str(encoded_img))
        print(image_file)
    except ImageLoadError as e:
        print(e.reason)
        print(e)
SQLite 3:
# Schema fragments for the SQLite demo class below.
table_name = 'twitter'
table_columns_and_types = '(unix real, tweet text, retweet text, hashtags blob)'
column_names = '(unix, tweet, retweet, hashtags)'
values = '(?, ?, ?, ?)'  # parameter placeholders for inserts
db_name = 'twitter_db_class1.db'
class TwitterSqlite:
    """Minimal sqlite3 wrapper: ensures a table exists, inserts one row,
    then prints the whole table.

    DDL fragments (table name, columns, placeholders) are passed in as
    preformatted SQL strings; only the inserted values are parameterized.
    """

    def __init__(self, table_name: str, table_columns_and_types: str, column_names: str, values: str, db_name: str):
        self.table_name = table_name
        self.table_columns_and_types = table_columns_and_types
        self.column_names = column_names
        self.values = values
        self.db_name = db_name

    @contextmanager
    def _temptable(self, cur):
        """Create the table if needed for the duration of the with-block."""
        cur.execute(f'''create table if not exists {self.table_name} {self.table_columns_and_types}''')
        try:
            print('Setup action and yield created table')
            yield
        finally:
            print('Tare down action')

    def sqlite_db(self, *args):
        """Insert ``args`` as one row, then print every row in the table."""
        # Fix: sqlite3's connection context manager commits/rolls back on
        # exit but does NOT close the connection — close it explicitly.
        with connect(self.db_name) as conn:
            cur = conn.cursor()
            with self._temptable(cur):
                cur.execute(f'''insert into {self.table_name} {self.column_names} values{self.values}''', args)
                for row in cur.execute(f'''select * from {self.table_name}'''):
                    print(row)
        conn.close()
File Like Object Managers:
With contextlib.
from contextlib import contextmanager
import io
class ContextManagers(object):
    """Generator-based context managers for files and in-memory buffers."""

    @contextmanager
    def file_manager(self, name, mode):
        """Open ``name`` with ``mode``, yield the handle, always close it.

        Fix: the original called open() inside the try-block, so a failed
        open made the finally-block raise NameError on the unbound handle,
        masking the real error. open() now happens before the try.
        """
        _file = open(name, mode)
        try:
            print('The file is opened.')
            yield _file
        finally:
            print('The file will be closed now.')
            _file.close()

    @contextmanager
    def buffer_manager(self):
        """Yield a fresh io.BytesIO and guarantee it is closed on exit."""
        buf = io.BytesIO()
        try:
            print('The file is opened.')
            yield buf
        finally:
            print('The file will be closed now.')
            buf.close()
With underscore methods.
class BytesIOManager:
    """Context manager that owns a single io.BytesIO buffer.

    Entering hands back the buffer; exiting closes it. Exceptions always
    propagate because ``__exit__`` returns False.
    """

    def __init__(self):
        self._buf = io.BytesIO()

    def __enter__(self):
        print('The context manager setup...')
        return self._buf

    def __exit__(self, type, value, traceback):
        print('Context manager tear down...')
        self._buf.close()
        # Returning True here would swallow the in-flight exception instead.
        return False
class BaseManager:
    """Context manager that opens ``file_path`` on entry and closes on exit.

    The handle is opened lazily in ``__enter__`` (not at construction) and
    ``__enter__`` returns the manager itself, not the file object.
    """

    def __init__(self, file_path):
        self.__path = file_path
        self.__file_object = None

    def __enter__(self):
        self.__file_object = open(self.__path)
        return self

    def __exit__(self, type, val, tb):
        self.__file_object.close()
PNG Methods
class PngReader:
    """Context-manager + iterator that reads a PNG file chunk by chunk.

    Use as ``with PngReader(path) as r: for chunk in r: ...`` — each
    iteration yields one ``(length, type, data, crc)`` tuple.
    """

    # Every .png file contains this in the header. Use it to verify
    # the file is indeed a .png.
    _expected_magic = b'\x89PNG\r\n\x1a\n'

    def __init__(self, file_path):
        # Ensure the file has the right extension
        if not file_path.endswith('.png'):
            raise NameError("File must be a '.png' extension")
        self.__path = file_path
        self.__file_object = None

    def __enter__(self):
        self.__file_object = open(self.__path, 'rb')
        # Validate the 8-byte PNG signature before handing back the reader.
        magic = self.__file_object.read(8)
        if magic != self._expected_magic:
            raise TypeError("The File is not a properly formatted .png file!")
        return self

    def __exit__(self, type, val, tb):
        self.__file_object.close()

    def __iter__(self):
        # This and __next__() are used to create a custom iterator
        # See https://dbader.org/blog/python-iterators
        return self

    def __next__(self):
        # Read the file in "Chunks"
        # See https://en.wikipedia.org/wiki/Portable_Network_Graphics#%22Chunks%22_within_the_file
        initial_data = self.__file_object.read(4)
        # The file hasn't been opened or reached EOF. This means we
        # can't go any further so stop the iteration by raising the
        # StopIteration.
        if self.__file_object is None or initial_data == b'':
            raise StopIteration
        else:
            # Each chunk has a len, type, data (based on len) and crc
            # Grab these values and return them as a tuple
            chunk_len = int.from_bytes(initial_data, byteorder='big')
            chunk_type = self.__file_object.read(4)
            chunk_data = self.__file_object.read(chunk_len)
            chunk_crc = self.__file_object.read(4)
            return chunk_len, chunk_type, chunk_data, chunk_crc
import yaml
from yaml import Loader
class YamlIO:
    """YAML config loaders.

    Fix: both public methods duplicated the same load-and-exit-on-error
    logic; it is factored into ``_load``.

    NOTE(review): yaml.load with the full ``Loader`` can construct
    arbitrary Python objects — prefer yaml.safe_load unless the config
    file is fully trusted.
    """

    def _load(self, path):
        """Parse the YAML file at ``path``.

        On a parse error the process exits with status 1 (behavior
        preserved from the original).
        """
        with open(path, 'r') as stream:
            try:
                return yaml.load(stream, Loader=Loader)
            except yaml.YAMLError as exc:
                print(exc)
                exit(1)

    def yaml_config(self, path):
        """Return the ('og', 'bwlwgp') sections of the config at ``path``."""
        res = self._load(path)
        og = res['og']
        bwlwgp = res['bwlwgp']
        return og, bwlwgp

    def yaml_config_test(self, path):
        """Return the whole parsed config mapping."""
        return self._load(path)
import os
import PyPDF2
from PyPDF2 import PdfFileWriter
from PyPDF2 import PdfFileReader
import pandas as pd
# Extract all page text of fileinput.pdf into fileoutput.txt (PyPDF2 1.x API).
# NOTE(review): os.chdir(r'') raises FileNotFoundError — the working
# directory was blanked out before sharing; fill it in before running.
os.chdir(r'')
with open('fileinput.pdf', 'rb') as pdf_file, open('fileoutput.txt', 'w') as text_file:
    read_pdf = PyPDF2.PdfFileReader(pdf_file)
    number_of_pages = read_pdf.getNumPages()
    for page_number in range(number_of_pages):  # use xrange in Py2
        page = read_pdf.getPage(page_number)
        page_content = page.extractText()
        text_file.write(page_content)
def merge_pdfs():
    """Concatenate every .pdf in the current directory into yaaassss.pdf.

    Fix: the original never closed any file handle — neither the source
    PDFs nor the output. Sources are kept open until the writer has
    flushed (PyPDF2 reads page data lazily), then all are closed.
    """
    pdfFiles = []
    for filename in os.listdir('.'):
        if filename.endswith('.pdf'):
            pdfFiles.append(filename)
    pdfFiles.sort()
    print(pdfFiles)
    pdfWriter = PyPDF2.PdfFileWriter()
    open_handles = []
    # Loop through all the PDF files.
    try:
        for filename in pdfFiles:
            pdfFileObj = open(filename, 'rb')
            open_handles.append(pdfFileObj)  # keep open: PyPDF2 reads lazily
            pdfReader = PyPDF2.PdfFileReader(pdfFileObj)
            # Add every page of this document to the writer.
            for pageNum in range(0, pdfReader.numPages):
                pageObj = pdfReader.getPage(pageNum)
                pdfWriter.addPage(pageObj)
    except BaseException as e:
        print('did not work, ', str(e))
    # Save the resulting PDF to a file, then release the source handles.
    with open('yaaassss.pdf', 'wb') as pdfOutput:
        pdfWriter.write(pdfOutput)
    for handle in open_handles:
        handle.close()
def merge_pdf_pages():
    """Collect and sort PDF filenames in the CWD and create a writer.

    NOTE(review): this function is an unfinished stub — it never reads
    pages or writes any output.
    """
    # Get all the PDF filenames.
    pdfFiles = []
    for filename in os.listdir('.'):
        if filename.endswith('.pdf'):
            pdfFiles.append(filename)
    pdfFiles.sort(key=str.lower)  # case-insensitive ordering
    pdfWriter = PyPDF2.PdfFileWriter()
def extract_text_from_pdf():
    """Print page count, metadata and per-page text of total_of_these.pdf.

    Fix: the original opened the file without ever closing it; the handle
    is now managed by a with-block.
    """
    with open('total_of_these.pdf', 'rb') as pdf:
        reader = PyPDF2.PdfFileReader(pdf)
        print(reader.numPages)
        print(reader.documentInfo)
        for pagenum in range(reader.numPages):
            print(reader.getPage(pagenum).extractText())
def name_list():
    """Write the sorted, extension-less filenames of a directory to mod_56.csv.

    NOTE(review): os.chdir(r'') raises FileNotFoundError — the directory
    path was blanked out before sharing; fill it in before running.
    """
    os.chdir(r'')
    print(os.getcwd())
    names= []
    for i in os.listdir():
        # Keep only the stem; the extension is discarded.
        head, tail = os.path.splitext(i)
        names.append(head)
    names.sort()
    print(names)
    # One-column CSV: the names become the DataFrame index.
    df = pd.DataFrame(index=names)
    df.to_csv('mod_56.csv')
def build_test_1():
    """Dump the stem of every file in F:\\cancelled to cancelled.csv and
    rename each file to its extension-less stem.

    NOTE(review): the source was indentation-flattened, so the intended
    nesting is ambiguous. As reconstructed here, the CSV is rewritten and
    the file renamed on every loop iteration, and ``new_name`` drops the
    extension entirely — confirm against the original intent. ``root1``
    is computed but never used.
    """
    os.chdir(r'F:\cancelled')
    print(os.getcwd())
    names = []
    for f in os.listdir():
        root, ext = os.path.splitext(f)
        root1 = root.strip()[0:11]
        new_name = '{}'.format(root)
        names.append(root)
        df = pd.Series(names)
        df.to_csv('cancelled.csv')
        os.rename(f, new_name)
def build():
    """Parse mod number / cancelled flag / description out of filenames in
    two directories and write desc.csv plus a combined full_test.csv.

    NOTE(review): both os.chdir(r'') calls raise FileNotFoundError — the
    directory paths were blanked out before sharing. The fixed-width
    slicing of filenames ([0:11], [12:21], [22:]) assumes a rigid naming
    convention — verify before reuse. Nesting reconstructed from a
    flattened source; confirm the Series/concat steps belong after each
    loop, as written here.
    """
    os.chdir(r'')
    print(os.getcwd())
    mod = []
    canc = []
    desc = []
    for f in os.listdir():
        root, ext = os.path.splitext(f)
        modnum = root.strip()[0:11]
        cancelled = root.strip()[12:21]
        cancelled = cancelled.upper()
        description = root.strip()[22:len(root)]
        description = description.upper()
        mod.append(modnum)
        canc.append(cancelled)
        desc.append(description)
    df1 = pd.Series(mod)
    df2 = pd.Series(desc)
    df3 = pd.Series(canc)
    # Column order: description, mod number, cancelled flag.
    df = pd.concat([df2, df1, df3], axis=1)
    df.to_csv('desc.csv')
    os.chdir(r'')
    regmodnum = []
    regdesc = []
    for m in os.listdir():
        root1, ext1 = os.path.splitext(m)
        modnum = root1.strip()[0:11]
        cancel = root1.strip()[12:len(root1)]
        cancel = cancel.upper()
        regmodnum.append(modnum)
        regdesc.append(cancel)
    df4 = pd.Series(regmodnum)
    df5 = pd.Series(regdesc)
    df6 = pd.concat([df5, df4], axis=1)
    # Stack the two directories' rows into one frame.
    df7 = pd.concat([df, df6], ignore_index=True)
    df7.to_csv('full_test.csv')
# build()
import tempfile
import base64
from PIL import Image
import numpy as np
import potrace
import os
import svgwrite
import base64
from wand.image import Image
# Embed raster/SVG images into an SVG document as base64 data URIs (wand + svgwrite).
# NOTE(review): the next two lines reference the names ``numpy`` and ``PIL``
# directly, but the imports above bind ``np`` and ``Image`` — as written
# they raise NameError. ``img2`` is also not defined in this chunk.
I = numpy.asarray(PIL.Image.open('test.jpg'))
im = PIL.Image.fromarray(numpy.uint8(I))
# Load PNG Image
dwg = svgwrite.Drawing()
img = Image(filename=img2)
# Then get raw PNG data and encode DIRECTLY into the SVG file.
image_data = img.make_blob(format='png')
encoded = base64.b64encode(image_data).decode()
pngdata = 'data:image/png;base64,{}'.format(encoded)
image = dwg.add(dwg.image(href=(pngdata)))
# Bonus, the wand library lets you use ANY format image and encode it as a PNG
# You can also do cropping, transforms, etc on the image before encoding it.
# You can also embed an SVG inside an SVG the same way:
# Load SVG Image
img = Image(filename="my.svg")
# Then get raw SVG data and encode DIRECTLY into the SVG file.
image_data = img.make_blob()  # Don't change its format, just use it as an SVG
encoded = base64.b64encode(image_data).decode()
svgdata = 'data:image/svg+xml;base64,{}'.format(encoded)
image = dwg.add(dwg.image(href=(svgdata)))
def testing_png_to_svg():
    """Experiment: trace a PIL image with potrace.

    NOTE(review): ``bmp.trace()`` returns a potrace Path object, not a
    filesystem path, so ``im.save(path)`` fails — this sketch was never
    finished. ``img1``/``img3`` are defined elsewhere.
    """
    # im = np.asarray(Image.open(img1))
    im = Image.open(img3)
    print(type(im))
    bmp = potrace.Bitmap(im)
    path = bmp.trace()
    im.save(path)
def t2():
    """Wrap every .png in the current directory in a minimal 240x240 SVG shell.

    Fixes: the original formatted the undefined name ``base64data`` into
    the template (NameError — only commented-out lines ever defined it),
    embedded raw bytes instead of decoded text, and leaked the output
    file handle.
    """
    startSvgTag = """<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg version="1.1"
xmlns="http://www.w3.org/2000/svg"
xmlns:xlink="http://www.w3.org/1999/xlink"
width="240px" height="240px" viewBox="0 0 240 240">"""
    endSvgTag = """</svg>"""
    for files in os.listdir("."):
        if files.endswith(".png"):
            with open(files, "rb") as image_file:
                img_bytes = base64.b64encode(image_file.read())
            base64String = '''<image xlink:href="data:image/png;base64,{0}" width="240" height="240" x="0" y="0" />'''.format(img_bytes.decode())
            svg_name = os.path.splitext(files)[0] + ".svg"
            with open(svg_name, 'w') as out:
                out.write(startSvgTag + base64String + endSvgTag)
            print('Converted ' + files + ' to ' + svg_name)
def bitmap_to_svg():
    """Demo: trace a 32x32 bitmap containing a centered square with potrace
    and print each curve's segments."""
    data = np.zeros((32, 32), np.uint32)
    data[8:32-8, 8:32-8] = 1  # filled square in the middle
    # Create a bitmap from the array
    bmp = potrace.Bitmap(data)
    # Trace the bitmap to a path
    path = bmp.trace()
    # Iterate over path curves
    for curve in path:
        print("start_point =", curve.start_point)
        for segment in curve:
            print(segment)
            end_point_x, end_point_y = segment.end_point
            if segment.is_corner:
                # Corner segments expose a single control point.
                c_x, c_y = segment.c
            else:
                # Bezier segments expose two control points.
                c1_x, c1_y = segment.c1
                c2_x, c2_y = segment.c2
def png_to_bmp(input_file, output_file='png_to_bmp.bmp'):
    '''Convert an image file to BMP, dropping any alpha channel first.

    https://www.daniweb.com/posts/jump/1107225

    Fix: both branches of the original ended in the same ``img.save``
    call; the duplication is collapsed into a single save.
    '''
    img = Image.open(input_file)
    print(len(img.split()))
    if len(img.split()) == 4:
        # prevent IOError: cannot write mode RGBA as BMP
        r, g, b, a = img.split()
        img = Image.merge("RGB", (r, g, b))
    img.save(output_file)

# NOTE(review): ``img2`` is not defined in this chunk; this call fails as-is.
png_to_bmp(img2)
def is_pdf_scanned(file, content_type):
    """Return ``{"text": <all page text>}`` when the PDF has a text layer,
    otherwise the string ``"scan"``.

    NOTE(review): the return type is inconsistent (dict vs str), so every
    caller must handle both; the fitz document is never explicitly closed.
    """
    is_text = ''.join(
        [page.get_text() for page in fitz.open(stream=file, filetype=content_type)])
    return {"text": is_text} if is_text else "scan"
@staticmethod
def check_pdf_contents(file, content_type):
    """Return the PDF's embedded text layer, or OCR of its first raster
    image when no text layer exists.

    NOTE(review): depends on names defined elsewhere (fitz, Document,
    RawImageOutput, tess_ocr, logger); only image index 0 is OCRed, so
    multi-image scans lose content — confirm that is intended.
    """
    # Concatenate text from every page; empty string means no text layer.
    is_text = ''.join([
        page.get_text() for page in fitz.open(stream=file, filetype=content_type)])
    if is_text:
        logger.info('PDF Type: TEXT')
        return is_text
    else:
        logger.info('PDF Type: IMAGE')
        doc = Document(file)
        pdfimages_out = RawImageOutput(doc)
        img = pdfimages_out.get(0)
        text = tess_ocr.tesseract_img_to_str(img)
        return text
https://github.com/pymupdf/PyMuPDF/discussions/1213?sort=old
These PDFs are severely damaged. Impossible to tell why from this end.
The message may sound cryptic, but that PDF viewer compresses PDF objects into so-called object streams (object type "ObjStm") to save space. These are objects that contain other object definitions. Here, MuPDF cannot find objects in some of these compressed data.
Try to clean the file via mutool clean -gggg damaged.pdf .... May not work if the problems are too severe. If it fails you may be able to at least rescue a few pages via insert_pdf using a new file.
https://github.com/pymupdf/PyMuPDF/issues/1124