First, the imports and the connection URI:
import re
import urllib.parse
from datetime import datetime as dt
from datetime import date
from os import environ

import requests
import spacy
from bs4 import BeautifulSoup
from mongoengine import (connect, Document, queryset_manager,
                         QuerySet, ListField, DynamicField,
                         DynamicDocument, DateTimeField,
                         StringField, ReferenceField, DateField,
                         CASCADE, BooleanField, GenericReferenceField)

# spaCy pipeline used later for stop-word removal; the model name is an
# assumption here, any installed English pipeline will do.
nlp = spacy.load('en_core_web_sm')
uri = "mongodb+srv://<username>:" + urllib.parse.quote('<your_password>') + "@gcp-eoo-k8l5e.gcp.mongodb.net/<collection>?retryWrites=true&w=majority"
connect(
db='mongoengine',
username=str(environ.get('MONGO_USERNAME')),
password=str(environ.get('MONGO_PASSWORD')),
host=uri)
Since the 4chan API returns large volumes of semi-structured text data, MongoDB is a great choice: its flexible schema makes it easy to work with during development, and it has built-in text search (a query example follows the Threads model below). I like mongoengine because it lets you define models explicitly, and anyone comfortable with SQLAlchemy will find it easy to pick up.
There are two models.
class Threads(DynamicDocument):
    # Post number, e.g. 124205675
    no = StringField(max_length=200, required=False)
    # Thread creator's comment
    com = StringField(max_length=200000, required=False)
    # User ID hash
    ids = StringField(max_length=200, required=False)
    # Thread title
    semantic_url = StringField(max_length=200, required=False, primary_key=True)
    # comment_ref = ReferenceField(Comments, reverse_delete_rule=CASCADE)
    my_time = DateTimeField(default=dt.utcnow)

    meta = {
        'indexes': [{
            'fields': ['$no', '$ids'],  # '$' prefix declares a text index
            'default_language': 'english',
            # 'weights': {'no': 10, 'ids': 2}
        }],
        'ordering': ['no'],
        'collection': 'threads',
    }
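The '$'-prefixed index fields declare a MongoDB text index, which is what enables the built-in text search mentioned earlier. A minimal sketch of querying it through mongoengine (the post number in the example is invented):

# Find threads whose indexed fields match the term, best matches first.
results = Threads.objects.search_text('124205675').order_by('$text_score')
for thread in results:
    print(thread.semantic_url)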
This is the comments model.
You can see I am using the @queryset_manager decorator. This is a more advanced mongoengine feature that lets you define custom query logic on the model itself. See the section on MongoDB in Databases for more info.
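The meta below also sets queryset_class to a Pipelines class that is not shown in this section; mongoengine expects it to be a QuerySet subclass. A minimal placeholder so the snippet runs on its own:

class Pipelines(QuerySet):
    # Stand-in for the project's custom QuerySet; the real query helpers
    # live elsewhere and are not part of this section.
    pass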
class Comments(DynamicDocument):
    no = StringField(max_length=200, required=False, primary_key=True)
    com = StringField(max_length=2000000, required=False)
    ids = StringField(max_length=200, required=False)
    # Post number of the thread this comment replies to
    resto = StringField(max_length=200, required=False)
    thread_ref = ReferenceField(Threads, reverse_delete_rule=CASCADE)
    my_time = DateTimeField(default=dt.utcnow)

    meta = {
        'indexes': [{
            'fields': ['$no'],
            # 'fields': ['$no', '$com'],
            'default_language': 'english',
            # 'unique': 'false'
        }],
        'ordering': ['resto'],
        'queryset_class': Pipelines,  # custom QuerySet subclass (see above)
        'collection': 'comments',
    }
    @queryset_manager
    def update_clean_comments(doc_cls, queryset):
        # Strip HTML and stop words from every stored comment and persist the
        # cleaned text. BaseMetaAbstract is defined further down; the name is
        # only resolved when the manager is called.
        helper = BaseMetaAbstract()
        for _c in queryset:
            com_processed = helper.strip_stop_words_spacy(_c.com)
            queryset.filter(id=_c.id).update(com=com_processed)
        return queryset

    def __unicode__(self):
        return f'{self.com}'

    def __repr__(self):
        return f'{self.com}'
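With the manager in place, cleaning the whole collection is a single call. A sketch of the intended usage, assuming documents have already been saved and the helper class below is defined:

# Runs the stop-word cleanup across the entire comments collection.
Comments.update_clean_comments()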
This is a class of helper/utility functions for cleaning the scraped thread and comment text.
class BaseMetaAbstract(object):

    def _substitute_text_pattern(self, pat, replacement, text):
        text = re.sub(pat, replacement, text)
        return str(text)

    def _parse_html_and_strip_special_chars(self, df):
        # 4chan comments arrive as HTML; strip the markup, then remove
        # quote markers ('>') and nine-digit post-number references.
        soup = BeautifulSoup(str(df), 'lxml')
        df = self._substitute_text_pattern(r'>|>>|>>>|>>>>|\d{9}', '', soup.get_text())
        return df

    def strip_stop_words_spacy(self, df):
        # Remove stop words using the spaCy pipeline loaded at module level.
        df = self._parse_html_and_strip_special_chars(df)
        doc = nlp(str(df))
        return ' '.join([str(w) for w in doc if not w.is_stop])

    def strip_stop_words(self, df):
        # Stops.stop_words is a stop-word list defined elsewhere in the project.
        df = self._parse_html_and_strip_special_chars(df)
        return ' '.join([x for x in df.split(' ') if x not in Stops.stop_words])

    def strip_stop_words_jobs(self, df):
        df = df.lower()
        return ' '.join([x for x in df.split(' ') if x not in Stops.stop_words])

    def clean_and_update_comments(self):
        # Clean every stored comment and write the processed text back.
        for _c in Comments.objects:
            com_processed = self.strip_stop_words_spacy(_c['com'])
            Comments.objects(id=_c.id).update(com=com_processed)
            print(com_processed)
        # return True
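A quick illustration of what the cleaning produces; the raw HTML below is invented for the example:

helper = BaseMetaAbstract()
raw = '<a href="#p124205675" class="quotelink">&gt;&gt;124205675</a> this is an example of a reply'
print(helper.strip_stop_words_spacy(raw))
# Prints roughly 'example reply' once the markup, quote markers,
# nine-digit post reference and stop words have been stripped.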
This is the main class that pulls data from the API.
The two underscore-prefixed methods are generators, used for memory efficiency.
class GetChan(BaseMetaAbstract):
    _page = {}
    _page_num = 0

    def _catalog_page_generator(self, x):
        # Yield each thread on the catalog page selected by self._page_num.
        for thread in x[self._page_num]['threads']:
            yield thread

    def _catalog_generator(self, x):
        # Yield every thread across all pages of the catalog.
        for idx, page in enumerate(x):
            for thread in x[idx]['threads']:
                yield thread

    def get_catalog_page(self, board='pol', page_num=9):
        r = requests.get(f'https://a.4cdn.org/{board}/catalog.json').json()
        self._page_num = page_num
        for threads in self._catalog_page_generator(r):
            self._page['no'] = str(threads.get('no'))
            self._page['com'] = threads.get('com')
            self._page['id'] = threads.get('id')
            self._page['semantic_url'] = threads.get('semantic_url')
            # Save the thread itself.
            thread_pages = Threads(
                no=self._page['no'],
                com=self._page['com'],
                ids=self._page['id'],
                semantic_url=self._page['semantic_url']
            ).save()
            # Save the most recent replies, referencing the parent thread.
            if 'last_replies' in threads:
                for comments in threads['last_replies']:
                    self._page['no'] = str(comments.get('no'))
                    self._page['com'] = comments.get('com')
                    self._page['id'] = comments.get('id')
                    self._page['resto'] = str(comments.get('resto'))
                    thread_comments = Comments(
                        no=self._page['no'],
                        com=self._page['com'],
                        ids=self._page['id'],
                        resto=self._page['resto'],
                        thread_ref=thread_pages).save()
                    print(self._page['com'])
            print('Complete..Updating...')
        print('Update Complete...')
        return self._page
To run the code:
c = GetChan()
c.get_catalog_page('pol', 9)
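Once documents are stored, they can be queried back through the models. For example, a sketch that lists the captured replies for one thread via the thread_ref reference field (output depends on what has been scraped):

thread = Threads.objects.first()
for comment in Comments.objects(thread_ref=thread):
    print(comment.no, (comment.com or '')[:80])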