class ExampleConfig(BaseModel):
    """Example model with helpers for moving documents in and out of MongoDB.

    MongoDB stores the primary key under ``_id`` while the model exposes it
    as ``id``; the helpers below translate between the two spellings.
    """

    def to_dict(self, **kwargs):
        """Return the model as a dict (by alias) with ``_id`` set to ``str(self.id)``.

        ``**kwargs`` is accepted so existing call sites keep working, but it
        is not forwarded to ``self.dict``.
        """
        data = self.dict(by_alias=True)
        data["_id"] = str(self.id)
        return data

    def to_mongo(self, **kwargs):
        """Return a dict suitable for inserting into MongoDB.

        ``exclude_unset`` and ``by_alias`` both default to ``True``; any other
        keyword arguments are forwarded to ``self.dict``.
        """
        exclude_unset = kwargs.pop('exclude_unset', True)
        by_alias = kwargs.pop('by_alias', True)
        parsed = self.dict(exclude_unset=exclude_unset, by_alias=by_alias, **kwargs)
        # Mongo uses `_id` as default key.
        if '_id' not in parsed and 'id' in parsed:
            parsed['_id'] = parsed.pop('id')
        return parsed

    @classmethod
    def from_mongo(cls, data: dict):
        """Build a model from a Mongo document, converting ``_id`` into ``id``.

        A falsy ``data`` (``None`` or an empty dict) is returned unchanged.
        The caller's dict is left untouched: the key rename happens on a
        shallow copy.
        """
        if not data:
            return data
        document = dict(data)
        # `_id` always wins over any pre-existing `id` key (None when absent).
        document['id'] = document.pop('_id', None)
        return cls(**document)

    class Config:
        anystr_strip_whitespace = True  # same as str.strip()
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
https://pydantic-docs.helpmanual.io/usage/types/#custom-data-types
https://github.com/i8enn/pydantic-odm/blob/master/pydantic_odm/encoders/mongodb.py
validation flow:
model_validator(pre=True):
Custom Type:
model_validator(pre=False):
__get_validators__ validate_1
pre=True
validate_1 February 12, 2021
https://stackoverflow.com/questions/68826089/pydantic-model-convert-uuid-to-string-when-calling-dict
import uuid
from pydantic import BaseModel
class NewUuid(str):
    """
    String subtype that only accepts text ``uuid.UUID`` can parse.

    The validated value stays a string (an instance of this class), so it
    serializes as plain text instead of as a ``uuid.UUID`` object.
    """

    @classmethod
    def __get_validators__(cls):
        # One or more validators may be yielded; they are called in order and
        # each receives the value returned by the previous one.
        yield cls.validate

    @classmethod
    def __modify_schema__(cls, field_schema):
        # __modify_schema__ must mutate the dict it receives in place;
        # the returned value is ignored.
        field_schema.update(
            # hyphenated 8-4-4-4-12 hexadecimal layout
            pattern='^[A-F0-9a-f]{8}(-[A-F0-9a-f]{4}){3}-[A-F0-9a-f]{12}$',
            examples=['4a33135d-8aa3-47ba-bcfd-faa297b7fb5b'],
        )

    @classmethod
    def validate(cls, v):
        """Return ``v`` wrapped in this class, or raise if it is not a UUID.

        Raises:
            TypeError: ``v`` is not a string.
            ValueError: ``uuid.UUID`` cannot parse ``v``.
        """
        if not isinstance(v, str):
            raise TypeError('string required')
        # Called purely for its validation side effect; the original text is
        # what gets returned, not the parsed UUID.
        uuid.UUID(v)
        return cls(v)

    def __repr__(self):
        return f'NewUuid({super().__repr__()})'
class Resource(BaseModel):
    """Demo model: a named resource identified by a ``NewUuid`` string."""

    # Validated through NewUuid's custom validator.
    id: NewUuid
    # Free-form display name.
    name: str
# Demo 1: a well-formed UUID string is accepted; show the model, the id
# field on its own, and the dict form.
print('-' * 20)
resource_correct_id: Resource = Resource(
    id='e8991fd8-b655-45ff-996f-8bc1f60f31e0',
    name='Server2',
)
print(
    resource_correct_id,
    resource_correct_id.id,
    resource_correct_id.dict(),
    sep='\n',
)

# Demo 2: the id starts with a non-hex character.
# NOTE(review): construction presumably raises a validation error here, in
# which case the print below is never reached - confirm this is intended.
print('-' * 20)
resource_malformed_id: Resource = Resource(
    id='X8991fd8-b655-45ff-996f-8bc1f60f31e0',
    name='Server3',
)
print(resource_malformed_id, resource_malformed_id.id, sep='\n')
class RegexTests:
    '''
    Regex helpers for spotting currency-like amounts in text.

    Sample inputs used while tuning the patterns:
        8,450.0
        $5,546.40
        sdad $44.
        $18,323.00
        $3.00
    '''

    # Optional "$", digits, a "," or "." separator, more digits, then an
    # optional second separator and one or two trailing digits.
    # MULTILINE lets "^" anchor at the start of every line, not only at the
    # start of the text.
    currency_regex = re.compile(
        r'(^\$?\d{1,}[,.]\d{1,}[,.]?\d{1,2})',
        re.MULTILINE,
    )

    # Looser pattern used by match_1: optional "$", digits, one separator,
    # digits.
    _amount_prefix_regex = re.compile(r'(\$?\d{1,}[,.]\d{1,})')

    def match_1(self, text):
        '''Return a match if ``text`` starts with an amount-like prefix, else None.'''
        return self._amount_prefix_regex.match(text)

    def fullmatch_1(self, text):
        '''Return a match only if the whole of ``text`` is a currency amount.'''
        # The flag is set at compile time: the second positional argument of
        # a compiled pattern's method is a start offset, not a flag.
        return self.currency_regex.fullmatch(text)

    def search_1(self, text):
        '''Return the first currency amount that begins a line of ``text``, else None.'''
        return self.currency_regex.search(text)
class TextType:
    """Pydantic custom type that coerces a value to Decimal128, Int64 or str.

    Conversions are attempted in that order; amount-like strings get one
    extra Decimal128 attempt after their separators are normalized.
    """

    @classmethod
    def __get_validators__(cls):  # type: ignore
        yield cls.validate

    @classmethod
    def validate(cls, val: Any):
        """Coerce ``val`` to the first representation that accepts it.

        Returns:
            ``bson.decimal128.Decimal128``, ``bson.int64.Int64`` or ``str``.

        Raises:
            ValueError: the string fallback itself failed (for example
                ``val`` is a type the regex cannot inspect).
        """
        # Each conversion is best-effort: any failure falls through to the
        # next, looser representation.
        try:
            return bson.decimal128.Decimal128(val)
        except Exception:
            pass

        try:
            return bson.int64.Int64(int_validator(val))
        except Exception:
            pass

        try:
            # RegexTests defines match_1/fullmatch_1/search_1 only; the
            # previously called `reg_1` does not exist, so this branch
            # always failed with AttributeError.
            if RegexTests().match_1(val):
                # NOTE(review): every "," becomes "." here, so a thousands
                # separator is treated like a decimal point - confirm intent.
                new_val = val.replace(',', '.').replace('$', '')
                try:
                    return bson.decimal128.Decimal128(new_val)
                except Exception:
                    return str(val)
            return str(val)
        except Exception as err:
            # ValueError is still an Exception for existing callers and
            # keeps the original cause attached.
            raise ValueError(f'TextType Error: {err}') from err

    def __repr__(self):
        return f'TextType: ({super().__repr__()})'
class BSONTextType:
    """Pydantic custom type that coerces a value to Decimal128, Int64 or str.

    Conversions are attempted in that order and the first success wins.
    """

    @classmethod
    def __get_validators__(cls):  # type: ignore
        yield cls.validate

    @classmethod
    def validate(cls, val: Any):
        """Coerce ``val`` to the first representation that accepts it.

        Returns:
            ``bson.decimal128.Decimal128``, ``bson.int64.Int64`` or ``str``.

        Raises:
            ValueError: even ``str(val)`` failed.
        """
        # Each conversion is best-effort: any failure falls through to the
        # next, looser representation.
        try:
            return bson.decimal128.Decimal128(val)
        except Exception:
            pass

        try:
            return bson.int64.Int64(int_validator(val))
        except Exception:
            pass

        try:
            return str(val)
        except Exception as err:
            # The message now names this class (it previously said
            # "TextType"); ValueError is still an Exception for existing
            # callers and keeps the original cause attached.
            raise ValueError(f'BSONTextType Error: {err}') from err

    def __repr__(self):
        return f'BSONTextType: ({super().__repr__()})'
https://github.com/samuelcolvin/pydantic/blob/master/tests/test_create_model.py
def test_config_and_base():
    """Expect ConfigError when create_model gets both __config__ and __base__."""
    conflicting_options = {
        '__config__': BaseModel.Config,
        '__base__': BaseModel,
    }
    with pytest.raises(errors.ConfigError):
        create_model('FooModel', **conflicting_options)
https://stackoverflow.com/questions/64944900/obtain-json-from-fastapi-using-pydantic-nested-models
https://pydantic-docs.helpmanual.io/usage/models/#custom-root-types
class Image(BaseModel):
    """A single image entry: where it lives and what it is called."""

    # Location of the image.
    url: str
    # Name of the image.
    name: str
class Images(BaseModel):
    """Custom-root model: the payload itself is a bare list of ``Image``."""

    # `__root__` makes the model validate a top-level JSON array.
    __root__: List[Image]
# Parse a raw JSON array into the custom-root model, then dump it back to
# disk as indented JSON.
images_raw = '[{"url":"url1", "name":"name1"}, {"url":"url2", "name":"name2"}]'
images = parse_raw_as(Images, images_raw)
with open('./images.json', 'w') as f:
    # end='' so nothing beyond the serialized JSON is written.
    print(images.json(indent=2), end='', file=f)
@app.post("/images/multiple/")
async def create_multiple_images(images: Images):
    """Persist the posted image list to ``./images.json`` as indented JSON.

    The file is opened in write mode, so any previous content is replaced.
    Nothing is returned to the client.
    """
    with open('./images.json', 'w') as destination:
        serialized = images.json(indent=2)
        destination.write(serialized)
from typing import Dict, Any, Optional, cast, List
from enum import Enum
from pydantic import BaseModel, Field, validator, EmailStr, root_validator
from datetime import datetime, timezone
from bson import ObjectId
from ..fields import PyObjectId
from ..config import *
class PyObjectId(ObjectId):
    """``ObjectId`` subclass that pydantic can validate and describe in a schema."""

    @classmethod
    def __get_validators__(cls):
        # pydantic hook: yield the validators to run for this type.
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``; raise ``ValueError`` if ``v`` is not a valid id."""
        if ObjectId.is_valid(v):
            # Note: the result is a plain ObjectId, not a PyObjectId.
            return ObjectId(v)
        raise ValueError("Invalid objectid")

    @classmethod
    def __modify_schema__(cls, field_schema):
        # Advertise the field as a string in the generated schema.
        field_schema["type"] = "string"

    def __repr__(self):
        inner = super().__repr__()
        return f'OID({inner})'
class StripeInterval(str, Enum):
    """Billing interval names; members are also plain ``str`` values."""

    month = 'month'
    year = 'year'
    week = 'week'
    day = 'day'
class StripeUsageType(str, Enum):
    """Usage-type names; members are also plain ``str`` values."""

    metered = 'metered'
    licensed = 'licensed'
class StripeRecurring(BaseModel):
    """The ``recurring`` sub-object of a Stripe price; every field is optional.

    Reference:
    https://stripe.com/docs/api/prices/object#price_object-recurring-interval
    """

    aggregate_usage: Optional[str]
    # Restricted to the StripeInterval members.
    interval: Optional[StripeInterval]
    interval_count: Optional[int]
    # Restricted to the StripeUsageType members.
    usage_type: Optional[StripeUsageType]
class ReadStripeProduct(BaseModel):
    """Read model for a product line: product details, price and totals.

    Every field is optional.
    """

    # Product details.
    product_id: Optional[str]
    product_img: Optional[str] = None
    product_description: Optional[str]

    # Pricing; note that `price` is carried as a string.
    price_id: Optional[str]
    price: Optional[str]

    # Line totals.
    quantity: Optional[int]
    subtotal: Optional[float]
class UpdateStripePriceModel(BaseModel):
    """Partial-update payload for a Stripe price object; every field is optional."""

    id: Optional[str]
    object: Optional[str]
    active: Optional[bool]
    billing_scheme: Optional[str]
    created: Optional[int]  # unix epoch timestamp
    currency: Optional[str]
    livemode: Optional[bool]
    lookup_key: Optional[str]
    # Was `Dict[]`, which is a syntax error; made optional like every other
    # field here and like `metadata` on UpdateStripeProductModel.
    metadata: Optional[Dict]
    nickname: Optional[str]
    product: Optional[str]
    recurring: Optional[StripeRecurring]
    tiers_mode: Optional[str]
    transform_quantity: Optional[str]
    type: Optional[str]
    unit_amount: Optional[int]
    unit_amount_decimal: Optional[str]

    class Config:
        arbitrary_types_allowed = True
        # schema_extra keys are merged into the top level of the generated
        # JSON schema, so the sample payload must sit under "example";
        # otherwise its "type" key overwrites the schema's own "type".
        schema_extra = {
            "example": {
                "id": "price_2IfYgsLnbk9JaPUldCxTRlql",
                "object": "price",
                "active": True,
                "billing_scheme": "per_unit",
                "created": 1618268230,
                "currency": "cad",
                "livemode": False,
                "lookup_key": None,
                "metadata": {},
                "nickname": None,
                "product": "prod_JI8yfgDh9CAncO",
                "recurring": {
                    "aggregate_usage": None,
                    "interval": "month",
                    "interval_count": 1,
                    "usage_type": "licensed",
                },
                "tiers_mode": None,
                "transform_quantity": None,
                "type": "recurring",
                "unit_amount": 2000,
                "unit_amount_decimal": "2000",
            }
        }
class UpdateStripeProductModel(BaseModel):
    """Partial-update payload for a Stripe product object; every field is optional."""

    id: Optional[str]
    # object: Optional[str]
    active: Optional[bool]
    created: Optional[str]
    description: Optional[str]
    images: Optional[List]
    image: Optional[str]
    livemode: Optional[bool]
    metadata: Optional[Dict]
    name: Optional[str]
    package_dimensions: Optional[str]
    shippable: Optional[str]
    statement_descriptor: Optional[str]
    unit_label: Optional[str]
    updated: Optional[str]
    url: Optional[str]

    class Config:
        arbitrary_types_allowed = True
        # https://pydantic-docs.helpmanual.io/usage/schema/#schema-customization
        # schema_extra keys are merged into the top level of the generated
        # JSON schema, so the sample payload sits under "example". Values use
        # real booleans/None/containers rather than their string spellings;
        # `created`/`updated` stay strings because the fields are typed str.
        schema_extra = {
            "example": {
                "id": "prod_JcBrwc3oQkwsTJ",
                "object": "product",
                "active": True,
                "created": '1622891745',
                "description": None,
                "images": [],
                "livemode": False,
                "metadata": {},
                "name": "Subscriber (1 Week Subscription) for order 73A8A6DB23",
                "package_dimensions": None,
                "shippable": None,
                "statement_descriptor": None,
                "unit_label": None,
                "updated": '1622891745',
                "url": None,
            }
        }
from pydantic import BaseModel, Field
from typing import List
class CatalogBase(BaseModel):
    """Fields of a single post as it appears in a board catalog.

    Every field defaults to ``None``. The descriptions appear to follow a
    pipe-separated "when present | meaning | possible values" layout.
    """

    # --- identity and thread state ---
    no: int = Field(default=None, description="always | The numeric post ID | any positive integer")
    resto: int = Field(default=None, description="always | For replies: this is the ID of the thread being replied to. For OP: this value is zero |`0` or `Any positive integer")
    sticky: int = Field(default=None, description="OP only, if thread is currently stickied | If the thread is being pinned to the top of the page| `1` or not set")
    closed: int = Field(default=None, description="OP only, if thread is currently closed | If the thread is closed to replies | `1` or not set")

    # --- timestamps ---
    now: str = Field(default=None, description="always | MM/DD/YY(Day)HH:MM (:SS on some boards), EST/EDT timezone | `string")
    time: int = Field(default=None, description="always | UNIX timestamp the post was created | UNIX timestamp")

    # --- poster ---
    name: str = Field(default=None, description="always | Name user posted with. Defaults to Anonymous | any string")
    trip: str = Field(default=None, description="if post has tripcode | The users tripcode, in format: !tripcode or !!securetripcode| any string")
    id: str = Field(default=None, description="if post has ID | posters ID | any 8 chars")
    capcode: str = Field(default=None, description="if post has capcode | The capcode identifier for a post | Not set, mod, admin, admin_highlight, manager, developer, founder")
    country: str = Field(default=None, description="if country flags are enabled | Posters [ISO 3166-1 alpha-2 country code](https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 | 2 character string or XX if unknown")
    country_name: str = Field(default=None, description="if country flags are enabled | Posters country name | Name of any country")

    # --- text content ---
    sub: str = Field(default=None, description="OP only, if subject was included| OP Subject text | any string")
    com: str = Field(default=None, description="if comment was included | Comment (HTML escaped) | any HTML escaped string")

    # --- attachment ---
    tim: int = Field(default=None, description="always if post has attachment | Unix timestamp + microtime that an image was uploaded | integer")
    filename: str = Field(default=None, description="always if post has attachment | Filename as it appeared on the poster's device | any string")
    ext: str = Field(default=None, description="always if post has attachment | Filetype | jpg, png, gif, pdf, swf, webm")
    fsize: int = Field(default=None, description="always if post has attachment | Size of uploaded file in bytes | any integer")
    md5: str = Field(default=None, description="always if post has attachment | 24 character, packed base64 MD5 hash of file")
    w: int = Field(default=None, description="always if post has attachment | Image width dimension | `any integer")
    h: int = Field(default=None, description="always if post has attachment | Image height dimension | `any integer")
    tn_w: int = Field(default=None, description="always if post has attachment | Thumbnail image width dimension | any integer")
    tn_h: int = Field(default=None, description="always if post has attachment | Thumbnail image height dimension | any integer")
    filedeleted: int = Field(default=None, description="if post had attachment and attachment is deleted | If the file was deleted from the post | `1` or not set")
    spoiler: int = Field(default=None, description="if post has attachment and attachment is spoilered | If the image was spoilered or not | `1` or not set")
    custom_spoiler: int = Field(default=None, description="if post has attachment and attachment is spoilered | The custom spoiler ID for a spoilered image | `1-10` or not set |")

    # --- thread statistics and limits ---
    omitted_posts: int = Field(default=None, description="OP only| Number of replies minus the number of previewed replies | `any integer` |")
    omitted_images: int = Field(default=None, description="OP only| Number of image replies minus the number of previewed image replies | `any integer` |")
    replies: int = Field(default=None, description="OP only | Total number of replies to a thread | any integer")
    images: int = Field(default=None, description="OP only | Total number of image replies to a thread | any integer")
    bumplimit: int = Field(default=None, description="OP only, only if bump limit has been reached | If a thread has reached bumplimit, it will no longer bump | `1` or not set |")
    imagelimit: int = Field(default=None, description="OP only, only if image limit has been reached | If an image has reached image limit, no more image replies can be made | `1` or not set |")
    last_modified: int = Field(default=None, description="OP only | UNIX timestamp marking last time thread was modified post | added/modified/deleted, thread closed/sticky settings modified | `UNIX Timestamp")

    # --- miscellaneous ---
    tag: str = Field(default=None, description="OP only, /f/ only | The category of `.swf` upload |`Game`, `Loop`, etc")
    semantic_url: str = Field(default=None, description="OP only | SEO URL slug for thread | `string` |")
    since4pass: int = Field(default=None, description="if poster put 'since4pass' in the options field` | Year 4chan pass bought | `any 4 digit year`")
    unique_ips: int = Field(default=None, description="OP only | Number of unique posters in a thread | any integer")
    m_img: int = Field(default=None, description="any post that has a mobile-optimized image` | Mobile optimized image exists for post | 1 or not set")
class CatalogThread(CatalogBase):
    """A catalog thread: every ``CatalogBase`` field plus its latest replies.

    Inheriting from ``CatalogBase`` avoids repeating the shared post fields
    (DRY - Don't Repeat Yourself).
    """

    # Catalog OP only: the most recent replies to the thread, each parsed as
    # a CatalogBase post.
    last_replies: List[CatalogBase] = []
def get_catalog_threads(board: str = 'wg', timeout: float = 10.0) -> List[CatalogThread]:
    """Download a board's catalog and return all of its threads, page by page.

    Args:
        board: Board short name inserted into the catalog URL. Defaults to
            ``'wg'``, the board this function was originally hard-coded to.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        One ``CatalogThread`` per entry under each page's ``'threads'`` key,
        in the order the pages list them.

    Raises:
        requests.RequestException: the request timed out, failed, or
            returned an HTTP error status.
    """
    url = f'https://a.4cdn.org/{board}/catalog.json'
    # Without a timeout a stalled connection would block forever.
    response = requests.get(url, timeout=timeout)
    # Fail with a clear HTTP error instead of a confusing parse/KeyError.
    response.raise_for_status()
    return [
        CatalogThread(**thread)
        for page in response.json()
        for thread in page['threads']
    ]