def get_base64_encoded_image(image_path):
with open(image_path, "rb") as img_file:
return base64.b64encode(img_file.read()).decode('utf-8')
def convert_img():
img = 'astronautcrawling.jpg'
# image_file = u.image_field.read()
f = open(img, 'rb')
f = base64.b64encode(f.read()).decode('utf-8')
# f = get_base64_encoded_image(img)
print(f)
localhost:<port>/graphql
~/application/__init__.py
def create_app():
app = Flask(__name__, instance_relative_config=False)
app.config.from_object('config.Config')
app.register_error_handler(404, page_not_found)
uri = "mongodb+srv://lmin:" + urllib.parse.quote('s') + "@-le5e.ssd.mongodb.net/sssw?retryWrites=true&w=majority"
# app.mongo = connect(db='mongoengine', username=str(os.environ.get('MONGO_USERNAME')), password=str(os.environ.get('MONGO_PASSWORD')), host=uri)
# app.add_url_rule("/graphql", view_func=GraphQLView.as_view("graphql", schema=schema, graphiql=True))
app.config['MONGODB_SETTINGS'] = {
'db': 'mongoengine',
'username': str(os.environ.get('MONGO_USERNAME')),
'password': str(os.environ.get('MONGO_PASSWORD')),
'host': uri,
}
# app.sheets = gspread.service_account(os.environ.get('GOOGLE_SHEET_KEYS'))
# app.config['SIMPLEMDE_JS_IIFE'] = True
# app.config['SIMPLEMDE_USE_CDN'] = True
# SimpleMDE(app)
md = Markdown(
app,
extensions=['footnotes'],
extension_configs={'footnotes': ('PLACE_MARKER','~~~~~~~~')},
safe_mode=True,
output_format='html4'
)
app.pusher = Pusher(
app_id=os.getenv('PUSHER_APP_ID'),
key=os.getenv('PUSHER_APP_KEY'),
secret=os.getenv('PUSHER_APP_SECRET'),
cluster=os.getenv('PUSHER_APP_CLUSTER'),
ssl=True)
assets.init_app(app)
login_manager.init_app(app)
mail.init_app(app)
babel = Babel(app)
mondb.init_app(app)
sess.init_app(app)
mde.init_app(app)
with app.app_context():
from .auth_mongobp import auth_mongo_routes
from .main_bp import main_routes
from .dashboard_mongo_bp import dashboard_routes
# from .scripts.schema import schema
# https://github.com/graphql-python/flask-graphql
# app.add_url_rule("/graphql", view_func=GraphQLView.as_view("graphql",schema=schema, graphiql=True,pretty=True))
app.register_blueprint(main_routes.main_bp)
app.register_blueprint(auth_mongo_routes.auth_mongo_bp)
app.register_blueprint(dashboard_routes.dashboard_bp)
# app.register_blueprint(finance_routes.finance_bp)
# app.register_blueprint(stripe_routes.stripe_bp)
# fb = firestore.client()
compile_assets(assets)
return app
@dashboard_sales_bp.route('/dashboard_user_analytics/data', methods=['GET', 'POST'])
def big_chart():
cust = CustomerMaster.objects.only('start_date')
df = DataFrame([(d.start_date) for d in cust], columns=['start_date'])
res = df.start_date.dt.month_name().value_counts()
chart_labels = res.index.tolist()
chart_data = res.values.tolist()
if request.args.get('id') == 'Chart2':
_customers_per_year = (df.start_date.dt.year).value_counts().sort_index()
_customers_per_year_index = _customers_per_year.index.tolist()
_customers_per_year_values = _customers_per_year.values.tolist()
return {'results': _customers_per_year_values,
'chart_labels': _customers_per_year_index}
if request.args.get('id') == 'Chart3':
return {'results': [1, 1, 1, 1, 211],
'chart_labels': chart_labels}
# This is chart 1 which loads on first paint
return {'results': chart_data,
'chart_labels': chart_labels}
<script type="text/javascript">
gradientBarChartConfiguration = {
maintainAspectRatio: false,
legend: {
display: false
},
tooltips: {
backgroundColor: '#f5f5f5',
titleFontColor: '#333',
bodyFontColor: '#666',
bodySpacing: 4,
xPadding: 12,
mode: "nearest",
intersect: 0,
position: "nearest"
},
responsive: true,
scales: {
yAxes: [{
gridLines: {
drawBorder: false,
color: 'rgba(29,140,248,0.1)',
zeroLineColor: "transparent",
},
ticks: {
suggestedMin: 60,
suggestedMax: 120,
padding: 20,
fontColor: "#9e9e9e"
}
}],
xAxes: [{
gridLines: {
drawBorder: false,
color: 'rgba(29,140,248,0.1)',
zeroLineColor: "transparent",
},
ticks: {
padding: 20,
fontColor: "#9e9e9e"
}
}]
}
};
var ctx = document.getElementById('chartBig1').getContext('2d');
var gradientStroke = ctx.createLinearGradient(0, 230, 0, 50);
gradientStroke.addColorStop(1, 'rgba(72,72,176,0.1)');
gradientStroke.addColorStop(0.4, 'rgba(72,72,176,0.0)');
gradientStroke.addColorStop(0, 'rgba(119,52,169,0)');
var config = {
type: 'bar',
data: {
// labels: ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'],
datasets: [{
backgroundColor: gradientStroke,
borderColor: '#d346b1',
borderWidth: 2,
borderDash: [],
// data: results.results,
}]
},
options: gradientBarChartConfiguration
};
var myChartData = new Chart(ctx, config);
var getData = $.get('/dashboard_user_analytics/data', { 'id': 'Chart1' });
getData.done(function (results) {
var data = myChartData.config.data;
data.datasets[0].data = results.results;
data.labels = results.chart_labels;
myChartData.update();
});
function updateChart(id) {
var updateData = $.get('/dashboard_user_analytics/data', { 'id': id });
updateData.done(function (results) {
var data = myChartData.config.data;
data.datasets[0].data = results.results;
data.labels = results.chart_labels;
myChartData.update();
});
}
$('.chartButton').on('click', function () {
var id = $(this).attr('id');
updateChart(id);
});
</script>
DONT UPDATE PLOT USING JINJA DEFINED
<div class='col-8'>
{% if bar2 is defined and layout2 is defined %}
<div class="chart" id="plot2">
<script>
var graphs = {{ bar2 | safe}};
var layout = {{ layout2 | safe}};
Plotly.newPlot('plot2', graphs, layout);
</script>
</div>
{% endif %}
</div>
<script>
var getData = $.get('/dashboard_user_analytics/data', {'id': 'Chart1'});
getData.done(function(results) {
var data = {
labels: ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'],
series: [
results.results
]
};
var options = {
width: 800,
height: 600
}
myChart = new Chartist.Bar('.ct-chart', data, options);
});
function updateChart(id) {
var updateData = $.get('/dashboard_user_analytics/data', {'id': id});
updateData.done(function(results) {
var data = {
labels: ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'],
series: [
results.results
]
};
myChart.update(data);
});
}
$('.chartButton').on('click', function() {
var id = $(this).attr('id');
updateChart(id);
});
</script>
<script type="text/javascript" src="https://www.gstatic.com/charts/loader.js"></script>
<script type="text/javascript">
google.charts.load("current", {packages:["corechart"]});
google.charts.setOnLoadCallback(drawChart);
function drawChart() {
var data = google.visualization.arrayToDataTable([
{% for key, value in data.items() %}
{% if value is string %}
['{{ key }}', '{{ value }}'],
{% else %}
['{{ key }}', {{ value }}],
{% endif %}
{% endfor %}
]);
var options = {
title: 'My Daily Activities',
is3D: true,
//pieHole: 0.5
pieStartAngle: 100
/*slices: {
2: {offset: 0.2},
3: {offset: 0.3}
}*/
/*slices: {
1: { color: 'transparent' }
}*/
};
var chart = new google.visualization.PieChart(document.getElementById('piechart_3d'));
chart.draw(data, options);
}
</script>
https://flask.palletsprojects.com/en/1.1.x/patterns/viewdecorators/
https://pythonise.com/series/learning-flask/custom-flask-decorators
http://zetcode.com/python/python-decorators/
https://realpython.com/primer-on-python-decorators/
https://blog.miguelgrinberg.com/post/the-ultimate-guide-to-python-decorators-part-i-function-registration
https://www.blog.pythonlibrary.org/2016/06/09/python-how-to-create-an-exception-logging-decorator/
This is what I use currently.
This works for both flask-sqlalchemy and mongoengine with a role
field in the UserMixin
model.
def role_required(*roles):
"""
Does a user have permission to view this page?
:param *roles: 1 or more allowed roles
:return: Function
"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if current_user.role not in roles:
flash('You do not have permission to do that.', 'error')
return redirect('/')
return f(*args, **kwargs)
return decorated_function
return decorator
https://realpython.com/primer-on-python-decorators/
from flask import Flask, g, request, redirect, url_for
import functools
app = Flask(__name__)
def login_required(func):
"""Make sure user is logged in before proceeding"""
@functools.wraps(func)
def wrapper_login_required(*args, **kwargs):
if g.user is None:
return redirect(url_for("login", next=request.url))
return func(*args, **kwargs)
return wrapper_login_required
@app.route("/secret")
@login_required
def secret():
or
def has_role(name):
'''p.121
functools.update_wrapper is needed so the decorated function returns the
function definition instead of the wrapper definition; else, we would lose the routing
'''
def real_decorator(f):
def wraps(*args, **kwargs):
if current_user.has_role(name):
return f(*args, **kwargs)
else:
abort(403)
return functools.update_wrapper(wraps, f)
or
def role_required(name):
'''p.121
functools.update_wrapper is needed so the decorated function returns the
function definition instead of the wrapper definition; else, we would lose the routing
'''
def real_decorator(f):
def wraps(*args, **kwargs):
if current_user.role != name:
return f(*args, **kwargs)
else:
abort(403)
return functools.update_wrapper(wraps, f)
return real_decorator
~/application/__init__.py
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
import logging
import logging.handlers
from config import Config
def create_app(config_class=Config):
app = Flask(__name__, instance_relative_config=False)
app.config.from_object(config_class)
client = google.cloud.logging.Client()
# Retrieves a Cloud Logging handler based on the environment
# you're running in and integrates the handler with the Python logging module.
# By default this captures all logs at INFO level and higher
client.get_default_handler()
client.setup_logging()
handler = CloudLoggingHandler(client)
# handler = logging.handlers.RotatingFileHandler('app.log', maxBytes=1024 * 1024)
handler.setLevel(logging.INFO)
app.logger.addHandler(handler)
https://googleapis.dev/python/logging/latest/stdlib-usage.html#integration-with-python-logging-module
import logging
import google.cloud.logging # Don't conflict with standard logging
from google.cloud.logging.handlers import CloudLoggingHandler
client = google.cloud.logging.Client()
handler = CloudLoggingHandler(client)
cloud_logger = logging.getLogger('cloudLogger')
cloud_logger.setLevel(logging.INFO) # defaults to WARN
cloud_logger.addHandler(handler)
cloud_logger.error('bad news')
https://googleapis.dev/python/logging/latest/usage.html#integration-with-python-logging-module
import logging
handler = client.get_default_handler()
cloud_logger = logging.getLogger("cloudLogger")
cloud_logger.setLevel(logging.INFO)
cloud_logger.addHandler(handler)
cloud_logger.error("bad news")
client.setup_logging(log_level=logging.INFO)
from google.cloud.logging.handlers import CloudLoggingHandler
handler = CloudLoggingHandler(client)
cloud_logger = logging.getLogger("cloudLogger")
cloud_logger.setLevel(logging.INFO)
cloud_logger.addHandler(handler)
cloud_logger.error("bad news")
https://googleapis.dev/python/logging/latest/handlers.html#module-google.cloud.logging.handlers.handlers
import logging
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
client = google.cloud.logging.Client()
handler = CloudLoggingHandler(client)
cloud_logger = logging.getLogger('cloudLogger')
cloud_logger.setLevel(logging.INFO)
cloud_logger.addHandler(handler)
cloud_logger.error('bad news') # API call
https://googleapis.dev/python/logging/latest/handlers.html#module-google.cloud.logging.handlers.handlers
import logging
import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
client = google.cloud.logging.Client()
handler = CloudLoggingHandler(client)
google.cloud.logging.handlers.setup_logging(handler)
logging.getLogger().setLevel(logging.DEBUG)
logging.error('bad news') # API call
@app.after_request
def after_request(response):
""" Logging after every request. """
logger = logging.getLogger("app.access")
logger.info(
"%s [%s] %s %s %s %s %s %s %s",
request.remote_addr,
dt.utcnow().strftime("%d/%b/%Y:%H:%M:%S.%f")[:-3],
request.method,
request.path,
request.scheme,
response.status,
response.content_length,
request.referrer,
request.user_agent,
)
return response
Example 6-5. hello.py: asynchronous email support
from threading import Thread
def send_async_email(app, msg):
with app.app_context():
mail.send(msg)
def send_email(to, subject, template, **kwargs):
msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + subject,
sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
msg.body = render_template(template + '.txt', **kwargs)
msg.html = render_template(template + '.html', **kwargs)
thr = Thread(target=send_async_email, args=[app, msg])
thr.start()
return thr
http://shon.github.io/httpagentparser/
https://pusher.com/tutorials/web-traffic-monitor-python
https://www.iplocate.io/pricing
https://ipapi.co/
https://ipstack.com/
https://stackoverflow.com/questions/23347387/x-forwarded-proto-and-flask
class RequestAnalytics(mondb.DynamicDocument):
request_remote_addr = mondb.StringField(nullable=True, default='NaN')
user_ip = mondb.StringField(nullable=True, default='NaN')
country_code =mondb.StringField(nullable=True, default='NaN')
region_code = mondb.StringField(nullable=True, default='NaN')
city_code = mondb.StringField(nullable=True, default='NaN')
zip_code = mondb.StringField(nullable=True, default='NaN')
geo_point = GeoPointField(nullable=True, default='NaN')
user_os = mondb.StringField(nullable=True, default='NaN')
user_os_version = mondb.StringField(nullable=True, default='NaN')
user_browser = mondb.StringField(nullable=True, default='NaN')
user_browser_version = mondb.StringField(nullable=True, default='NaN')
is_bot = mondb.StringField(nullable=True, default='NaN')
user_ref = mondb.ReferenceField(User)
meta = {'collection': 'RequestAnalytics'}
def get_geo(ip):
query = f'http://api.ipstack.com/{ip}?access_key=26944'
get_ip_geo = requests.get(query).json()
return get_ip_geo
def get_ip_locate(ip):
query = "https://www.iplocate.io/api/lookup/" + ip
get_ip_geo = requests.get(query).json()
return get_ip_geo
import httpagentparser
@dashboard_user_bp.before_request
def get_request_analytics():
if current_user.is_authenticated:
current_user.update_last_login()
userInfo = httpagentparser.detect(request.headers.get('User-Agent'))
userOS = userInfo['platform']['name']
userOS_version = userInfo['platform'].get('version', 'No Version')
userBrowser = userInfo['browser']['name']
userBrowser_version = userInfo['browser']['version']
is_bot = 'Bot' if userInfo.get('bot') else 'Not Bot'
# userIP = '10.35.58.39' if request.remote_addr == '127.0.0.1' else request.remote_addr
# userIP = '1.17.28.39' if request.headers['X-Forwarded-For'] == '127.0.0.1' else request.remote_addr
# userIP = '0.23.8.5' if request.headers.get('X-Forwarded-For') == '127.0.0.1' else request.remote_addr
userIP = request.headers.get('X-Forwarded-For', '72.82.4')
userAddr = '2.1.18.0' if request.remote_addr == '127.0.0.1' else request.remote_addr
try:
analytics = RequestAnalytics(
request_remote_addr=userAddr,
user_ip=userIP,
country_code=get_geo(userIP)['country_code'],
region_code=get_geo(userIP)['region_code'],
city_code=get_geo(userIP)['city'],
zip_code=get_geo(userIP)['zip'],
geo_point=(get_geo(userIP)['latitude'], get_geo(userIP)['longitude']),
user_os=userOS,
user_os_version=userOS_version,
user_browser = userBrowser,
user_browser_version=userBrowser_version,
is_bot=is_bot,
user_ref=current_user.id
).save()
except KeyError as e:
pass
or
@dashboard_user_bp.before_request
def get_request_analytics():
userInfo = httpagentparser.detect(request.headers.get('User-Agent'))
# {'platform': {'name': 'Linux', 'version': None}, 'os': {'name': 'Linux'}, 'bot': False, 'browser': {'name': 'Chrome', 'version': '81.0.4044.138'}}
print(userInfo)
userOS = userInfo['platform']['name']; print('user OS: ', userOS)
userBrowser = userInfo['browser']['name']; print(userBrowser)
userIP = "72.229.28.185" if request.remote_addr == '127.0.0.1' else request.remote_addr
print(userIP)
api = "https://www.iplocate.io/api/lookup/" + userIP
try:
resp = urllib.request.urlopen(api)
result = resp.read()
result = json.loads(result.decode("utf-8"))
userCountry = result["country"]
userContinent = result["continent"]
userCity = result["city"]
except:
print("Could not find: ", userIP)
https://faker.readthedocs.io/en/stable/index.html
https://python.libhunt.com/church-alternatives
class GenerateReports:
def customer_invoice(self, html_template='invoices/invoice_base.html', css_file=None, _dataframe=None):
pdf_template = render_template(
html_template,
title='.....',
name='...',
address='...',
vendor_address='...',
date=dt.today(),
table_df=_dataframe
)
pdf = pdfkit.from_string(pdf_template, False)
response = make_response(pdf)
response.headers['content-Type']='application/pdf'
response.headers['content-Disposition']= f"inline: filename={dt.today()}.pdf"
# response.headers['content-Disposition']= f"attachment; filename={dt.today()}.pdf"
return response
def create_excel_invoice(self, data, sheet_name=None):
ei = ExcelInvoice()
sheet_save_path = os.path.join(current_app.root_path, 'static/spread_sheets', f'{sheet_name}')
ei.excel_invoice(sheet_save_path, data)
# wb.save(sheet_path)
# return send_from_directory('static/spread_sheets', sheet_name)
fp = os.path.join(current_app.root_path, 'static/spread_sheets')
try:
return send_from_directory(fp, filename=sheet_name, as_attachment=True)
except FileNotFoundError:
abort(404)
def create_excel_invoice(self, sheet_name=None):
wb = Workbook()
ws = wb.active
wb.save(f'{sheet_name}')
sheet_path = os.path.join(current_app.root_path, 'static/spread_sheets', f'{sheet_name}')
output = ws.make_response()
output.headers["Content-Disposition"] = f"attachment; filename=sheet.xlsx"
output.headers["Content-type"] = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
return output
'''PDF INVOICE -- PDF_KIT -- GEN_REPORTS'''
@dashboard_bp.route('/user_order_invoice', methods=['GET', 'POST'])
@login_required
def user_order_invoice():
if request.method == 'POST':
order_invoice = request.form.get('user_order_invoice')
complete_order = MOrder.objects(invoice_num=order_invoice).all()
df = pd.DataFrame([(d.invoice_num, d.sku, d.description, d.quantity, d.price, d.status) for d in complete_order], columns=['Invoice #', 'Sku', 'Description', 'Qty', 'Price', 'Status'])
return gen_reports.customer_invoice()
return redirect(request.referrer)
@dashboard_bp.route('/<invoice_num>admin_order_invoice.pdf', methods=['GET', 'POST'])
def admin_order_invoice(invoice_num):
if request.method == 'POST':
# order_invoice = request.form.get('user_order_invoice')
complete_order = MOrder.objects(invoice_num=invoice_num).all()
weasy_table = WeazyInvoice(complete_order)
order_total_price = MOrder.objects(invoice_num=invoice_num).sum('price')
html = render_template(
'invoices/usmca-comm.html',
# 'invoices/shipping_weasyprint.html',
# invoice_number=str(invoice_num),
# todays_date=date.today(),
from_name='dasdasda',
from_street='dfsdfsdfs',
to_name='dasdasd',
to_',
# weasy_table=weasy_table,
# order_total_price=str(order_total_price)
)
return render_pdf(HTML(string=html))
Mimetypes:
Mimetypes for the integration of upload/download of excel files into a flask application.
The following was taken from the Flask-Excel
github repo.
https://github.com/pyexcel-webwares/Flask-Excel/blob/master/tests/test_upload_n_download_excel.py
_XLSM_MIME = ("application/" +
"vnd.openxmlformats-officedocument.spreadsheetml.sheet")
FILE_TYPE_MIME_TABLE = {
"csv": "text/csv",
"tsv": "text/tab-separated-values",
"csvz": "application/zip",
"tsvz": "application/zip",
"ods": "application/vnd.oasis.opendocument.spreadsheet",
"xls": "application/vnd.ms-excel",
"xlsx": _XLSM_MIME,
"xlsm": "application/vnd.ms-excel.sheet.macroenabled.12"
}
https://www.iana.org/assignments/media-types/media-types.xhtml
https://www.iana.org/assignments/media-types/application/vnd.ms-excel
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type
https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types
https://mourafiq.com/2016/01/01/generating-excel-report-with-python.html
Example 1:
cursor.execute("SELECT emp_id, emp_first_name, emp_last_name, emp_designation FROM employees")
result = cursor.fetchall()
#output in bytes
output = io.BytesIO()
#create WorkBook object
workbook = xlwt.Workbook()
#add a sheet
sh = workbook.add_sheet('Employee Report')
#add headers
sh.write(0, 0, 'Emp Id')
sh.write(0, 1, 'Emp First Name')
sh.write(0, 2, 'Emp Last Name')
sh.write(0, 3, 'Designation')
idx = 0
for row in result:
sh.write(idx+1, 0, str(row['emp_id']))
sh.write(idx+1, 1, row['emp_first_name'])
sh.write(idx+1, 2, row['emp_last_name'])
sh.write(idx+1, 3, row['emp_designation'])
idx += 1
workbook.save(output)
output.seek(0)
return Response(output, mimetype="application/ms-excel", headers={"Content-Disposition":"attachment;filename=employee_report.xls"})
def excel_invoice(self, mongo_q):
ship_ref = ''
ship_addr = ''
_inv_num = ''
_sku = ''
_desc = ''
_qty = ''
for i in mongo_q:
for x in i.address_ref:
if x.invoice_address not in ship_addr:
ship_addr += x.invoice_address
if i.invoice_num not in ship_ref:
ship_ref += i.invoice_num + '\n'
_inv_num += i.invoice_num + '\n'
_sku += i.sku + '\n'
_desc += i.description + '\n'
_qty += str(i.quantity) + '\n'
header_row_start = 21
header_lst = ['invoice_num', 'sku', 'description', 'quantity', 'price', 'origin_country']
q = mongo_q.to_json()
json_query = json.loads(q)
for col_idx, col_title_list in enumerate(header_lst, 1):
self.ws.cell(row=header_row_start, column=col_idx).value = col_title_list
for row_number, row_data in enumerate(json_query, header_row_start + 1):
self.ws.cell(row=row_number, column=1).value = row_data['sku']
self.ws.cell(row=row_number, column=2).value = row_data['description']
self.ws.cell(row=row_number, column=3).value = row_data['quantity']
self.ws.cell(row=row_number, column=4).value = row_data['price']
self.ws.cell(row=row_number, column=5).value = row_data['invoice_num']
print('row_data: --', row_data['invoice_num'])
df = pd.DataFrame(
[(d.invoice_num, d.sku, d.description, d.quantity, d.price, d.origin_country) for d in mongo_q], columns=['invoice_num', 'sku', 'description', 'quantity', 'price', 'origin_country'])
df['Sub-Total'] = df['price'] * df['quantity']
print(df)
dff = dataframe_to_rows(df, index=False)
for idx, val in enumerate(dff, 15):
print(idx)
print(val)
self.ws.cell(row=idx, column=1, value=val[0])
self.ws.cell(row=idx, column=2, value=val[1])
self.ws.cell(row=idx, column=3, value=val[2])
self.ws.cell(row=idx, column=6, value=val[3])
self.ws.cell(row=idx, column=7, value=val[4])
self.ws.cell(row=idx, column=8, value=val[5])
self.ws.cell(row=idx, column=9, value=val[6])
_row = self.ws.row_dimensions[idx]
_row.font = Font(size='8')
self.ws.title = str(date.today())
return self.wb
or
output_filename = f'Invoice-{str(datetime.now())}.xlsx'
basedir = os.path.abspath('application')
sheet_save_path = os.path.join(basedir, 'static/spread_sheets', output_filename)
with NamedTemporaryFile(delete=True) as tmp:
file = BytesIO(tmp.read())
# save_virtual_workbook(self.wb)
# save_workbook(self.wb, tmp.name)
self.wb.save(file)
file.seek(0)
# tmp.seek(0)
try:
return send_file(file, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", cache_timeout=3, add_etags=False, as_attachment=True, attachment_filename=output_filename)
# return send_from_directory(tmp.name, as_attachment=True, filename=output_filename)
except FileNotFoundError:
abort(404)
finally:
tmp.close()
file = BytesIO()
self.wb.save(file)
file.seek(0)
fp = os.path.join(basedir, 'static/spread_sheets')
try:
return send_file(file, attachment_filename=output_filename, as_attachment=True)
return send_from_directory(fp, filename=output_filename, as_attachment=True)
except FileNotFoundError:
abort(404)
finally:
os.remove(sheet_save_path)
https://flask-table.readthedocs.io/en/latest/#more-about-linkcol
https://github.com/deadlyraptor/boar/blob/develop/boar/tables.py
class Bookings(Table):
classes = ['table', 'table-bordered', 'table-striped']
film = Col('Film')
start_date = DateCol('Start Date')
end_date = DateCol('End Date')
program = Col('Program')
guarantee = Col('Guarantee')
percentage = Col('Percentage')
gross = Col('Gross')
view_results = LinkCol('View Results', 'booking_bp.booking_results',
url_kwargs=dict(id='id'))
distributor = Col('Distributor')
enter_payment = LinkCol('Enter Payment', 'payments_bp.new_payment',
url_kwargs=dict(id='id'))
view_payments = LinkCol('View Payments', 'payments_bp.view_payments',
url_kwargs=dict(id='id'))
update = LinkCol('Update', 'booking_bp.update_booking',
url_kwargs=dict(id='id'),
anchor_attrs={'type': 'button',
'class': 'btn btn-primary'})
delete = LinkCol('Delete', 'booking_bp.delete_booking',
url_kwargs=dict(id='id'),
anchor_attrs={'type': 'button',
'class': 'btn btn-danger'})
@dashboard_bp.route('/admin_update_order_status_table/<id>/<status>', methods=['POST', 'GET'])
def admin_update_order_status_table(id, status):
if request.method == 'POST':
print(id)
print(status)
'''Pending --> Shipped'''
if status == 'Pending': MOrder.objects(id=id).update(status='Shipped')
# Shipped or Cancelled ---> Pending
# Which can then be changed to shipped again.
elif status == 'Shipped' or status == 'Cancelled': MOrder.objects(id=id).update(status='Pending')
return redirect(request.referrer)
from flask_table import (Table,
Col,
LinkCol,
ButtonCol,
create_table,
DatetimeCol,
DateCol,
BoolCol)
class WeazyInvoice(Table):
# classes = app.config['TABLE_CSS']
sku = Col('Unit #')
description = Col('Shipping Address')
price = Col('Price')
quantity = Col('Quantity')
sub_total = Col('Sub Total')
q = MOrder.objects(invoice_num='43544').all()
df = pd.DataFrame([(
d.invoice_num, d.sku, d.description, d.quantity, d.price) for d in q],
columns=['invoice_num', 'sku', 'description', 'quantity', 'price'])
df['sub_total'] = df['price'] * df['quantity']
out_df = df.to_dict(orient='records')
w = WeazyInvoice(out_df)
print(w.__html__())
order_shipment = OrderShipment.objects.all()
def quick_table(mongo_q):
lst = []
for mongo_list in mongo_q:
mongo_json = mongo_list.to_json()
load_top = loads(mongo_json)
lst.append(load_top)
df = json_normalize(lst)
df = df.to_html(classes=['table', 'table-hover',
'table-dark', 'table-sm'],
index=False)
return df
pip install Babel dnspython email-validator Flask cssmin lesscpy jsmin Flask-Assets Flask-Babel Flask-Login Flask-Mail Flask-Table Flask-WTF WTForms Flask-Session flask-mongoengine mongoengine gunicorn pandas requests redis pdfkit gspread python-dotenv pusher
Disable toolbar
https://community.plotly.com/t/is-it-possible-to-hide-the-floating-toolbar/4911/2
https://plotly.com/python/reference/#layout-modebar
.modebar {
display: none !important;
}
application/dashboard_bp
@dashboard_bp.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
customer_orders = MOrder.objects.get_dashboard_orders(order_status_codes['stage_5'],
order_status_codes['cancelled'])
df = DataFrame([(d.invoice_num, d.status, d.sku, d.date_submitted) for d in customer_orders], columns=['invoice_num', 'status', 'sku', 'date_submitted'])
df['date_submitted'] = to_datetime(df['date_submitted']).dt.date
fig1 = value_count_chart(df['status'], 'Orders Status (Count)')
fig2 = value_count_chart(df['date_submitted'], 'Order Creation Date (Count)')
fig3 = value_count_chart(df['sku'], 'SKU (Count)')
fig4 = value_count_chart(df['invoice_num'], 'Invoice # (Count)')
return render_template('dashboard.html',
title=f'{current_user.email} | Dashboard',
template='dashboard-static account',
body="Current Orders",
chart_container="col-3",
chart_col_width="plotly-container",
o_stat=order_status_codes,
customer_orders=customer_orders,
plot=fig1,
plot2=fig2,
plot3=fig3,
plot4=fig4)
Fragment using bootstrap4.
<div class="{{chart_container}}">
<div class="{{chart_col_width}}">
<div class="chart" id="bargraph">
<script>
var graphs = {{plot | safe}};
Plotly.plot('bargraph',graphs);
</script>
</div>
</div>
</div>
<div class="{{chart_container}}">
<div class="{{chart_col_width}}">
<div class="chart" id="bargraph2">
<script>
var graphs = {{plot2 | safe}};
Plotly.plot('bargraph2',graphs);
</script>
</div>
</div>
</div>
<div class="{{chart_container}}">
<div class="{{chart_col_width}}">
<div class="chart" id="bargraph3">
<script>
var graphs = {{plot3 | safe}};
Plotly.plot('bargraph3',graphs);
</script>
</div>
</div>
</div>
<div class="{{chart_container}}">
<div class="{{chart_col_width}}">
<div class="chart" id="bargraph4">
<script>
var graphs = {{plot4 | safe}};
Plotly.plot('bargraph4', graphs);
</script>
</div>
</div>
</div>
<td>
<form class="f_form">
<input id="fid{{product.sku}}" type="text" i_sku="{{product.sku}}" class="form-control f_class">
</form>
</td>
<script>
const todays_date = Date.now();
$( ".f_class" ).datepicker({
setDate: todays_date,
dateFormat: 'yy-mm-dd'});
</script>
or
<button onclick="post_sku_date();" type="submit" class="btn btn-sm">Complete Checkout</button>
<script>
function post_sku_date() {
const a = document.getElementsByClassName('f_class');
const dates = [];
const sku = [];
const d = {dates, sku}
for (let i = 0; i < a.length; i++) {
dates.push(a[i].value);
sku.push(a[i].getAttribute('i_sku'))
}
console.log(dates);
console.log(sku);
console.log(d);
fetch(`${window.origin}/complete_order`, {
method: "POST",
mode: 'cors',
// credentials: 'same-origin',
credentials: "include",
body: JSON.stringify(d),
cache: "no-cache",
headers: new Headers({
"content-type": "application/json",
// 'Location': `${window.origin}/dashboard`,
}),
redirect: 'follow',
referrerPolicy: 'no-referrer'
}).then(function (response) {
if (response.status !== 303) {
console.log(`Looks like there was a problem. Status code: ${response.status}`);
return;
}
response.json().then(function (data) {
console.log(data);
});
})
.catch(function (error) {
console.log("Fetch error: " + error);
});
}
// post_sku_date()
</script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
<script>
$(document).ready(function () {
$('.updateButton').on('click', function () {
var member_id = $(this).attr('member_id');
var quant = $('#quantityInput' + member_id).val();
var sess = $('#sess' + member_id).val();
req = $.ajax({
url: '/cart',
type: 'POST',
data: {
sess: sess,
quant: quant,
id: member_id
}
});
req.done(function (data) {
$('#memberSection' + member_id).fadeOut(500).fadeIn(500);
$('#newQuantity' + member_id).text(data.new_quantity);
$('#cartSize').text(data.cart_size);
});
});
});
</script>
Snippet 2
<script>
function submit_message() {
var name = document.getElementById("qty");
var message = document.getElementById("hidden");
var entry = {
name: name.value,
message: message.value
};
fetch(`${window.origin}/process`, {
method: "POST",
credentials: "include",
body: JSON.stringify(entry),
cache: "no-cache",
headers: new Headers({
"content-type": "application/json"
})
})
.then(function (response) {
if (response.status !== 200) {
console.log(`Looks like there was a problem. Status code: ${response.status}`);
return;
}
response.json().then(function (data) {
console.log(data);
});
})
.catch(function (error) {
console.log("Fetch error: " + error);
});
}
</script>
<script>
$(function(){
$('button').click(function(){
// var qty = $('#qty').val();
// var hidden = $('#hidden').val();
$.ajax({
url: '/process',
data: {
qty: $('#qty').val(),
hidden: $('#hidden').val(),
},
type: 'POST',
success: function(response){
console.log(response);
},
error: function(error){
console.log(error);
}
});
});
});
</script>
<script>
$(document).ready(function() {
$('form').on('submit', function(event) {
$.ajax({
data : {
qty: $('#qty').val(),
hidden: $('#hidden').val(),
},
type : 'POST',
url : '/process'
})
.done(function(data) {
$('#output').text(data.output).show();
});
event.preventDefault();
});
});
</script>
https://jsfiddle.net/x98ug0ar/8/
https://redislabs.com/blog/cache-vs-session-store/
http://brunorocha.org/python/flask/using-flask-cache.html
https://flask.palletsprojects.com/en/1.1.x/patterns/favicon/
It should be 16 × 16 pixels and in the ICO file format.
This is not a requirement but a de-facto standard supported by all relevant browsers.
Put the icon in your static directory as favicon.ico.
Now, to get browsers to find your icon, the correct way is to add a link tag in your HTML.
So, for example:
<link rel="shortcut icon" href="{{ url_for('static', filename='favicon.ico') }}">
Resources:
https://www.favicon-generator.org/
http://tools.dynamicdrive.com/favicon/
There are a number of ways to render markdown in flask.
Surprisingly, there are a number of libraries for working with markdown in python, but only a few libraries for rendering markdown into a webpage with flask.
Almost all of the flask/markdown specific labraries are extremely old, and they all have terrrible documentation. I find this very hard to understand given the popularity of this framework.
https://markdown-it-py.readthedocs.io/en/latest/
Stackexchange and Stack Overflow are both switching to Commonmark as noted here: https://meta.stackexchange.com/questions/348746/were-switching-to-commonmark?cb=1
This is the most advanced markdown renderer for python as far as I can tell.
It's based on Commonmark and is fast. Although it's not a flask extension, it's so easy to implement you wont need anythin else.
Step 1:
The options you add here are up to you, this was just copied from the docs.
from flask import (render_template, url_for, flash, jsonify,
redirect, request, abort, Blueprint, Markup)
from markdown_it import MarkdownIt
from markdown_it.extensions.front_matter import front_matter_plugin
from markdown_it.extensions.footnote import footnote_plugin
from markdown_it.presets import zero
zero.make()
md = (
MarkdownIt()
.use(front_matter_plugin)
.use(footnote_plugin)
.disable('image')
.enable('table')
)
Step 2:
Here i'm querying a sql db with the already created markdown blog post.
@blog_bp.route("/blog/<int:blog_id>")
def blog_post(blog_id):
post = Post.query.get_or_404(blog_id)
_markup = md.render(post.content)
cont = Markup(_markup)
return render_template('post_page.html',
title=post.title,
post=post,
cont=cont)
Step 3:
Use from flask import Markup
to inject the rendered markdown into the template.
<p class="card-text">{{ cont }}</p>
You could also not use markup using the jinja2 safe
<p class="card-text">{{ cont| safe }}</p>
Step 4:
Add highlighjs in the header of course, for code blocks.
<head>
<link rel="stylesheet" href="{{ url_for('static', filename='highlightjs_styles/monokai.css') }}">
<script src="{{ url_for('static', filename='highlight.pack.js') }}"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head>
Note:
Here is the security note from markdown-it-py
https://github.com/executablebooks/markdown-it-py/blob/master/docs/security.md
Auto-escaping with Jinja2
https://jinja.palletsprojects.com/en/2.11.x/templates/#working-with-automatic-escaping
Preventing XSS:
https://flask.palletsprojects.com/en/1.1.x/security/#content-security-policy-csp
You must be careful as this could create an XSS attack vulnerability. Here is a good explanation of this.
https://tedboy.github.io/flask/flask_doc.templating.html#controlling-autoescaping
Autoescaping is the concept of automatically escaping special characters for you.
Special characters in the sense of HTML (or XML, and thus XHTML) are &, >, <, " as well as '.
Because these characters carry specific meanings in documents
on their own you have to replace them by so called “entities” if you want to use them for text.
Not doing so would not only cause user frustration by the inability to use these characters in text,
but can also lead to security problems.
(see Cross-Site Scripting (XSS))
Sometimes however you will need to disable autoescaping in templates.
This can be the case if you want to explicitly inject HTML into pages,
for example if they come from a system that generates secure HTML like a markdown to HTML converter.
There are three ways to accomplish that:
In the Python code, wrap the HTML string in a Markup object before passing it to the template.
This is in general the recommended way.
Inside the template, use the |safe filter to explicitly mark a string as safe HTML ({{ myvariable|safe }})
Temporarily disable the autoescape system altogether.
To disable the autoescape system in templates, you can use the {% autoescape %} block:
{% autoescape false %}
<p>autoescaping is disabled here
<p>{{ will_not_be_escaped }}
{% endautoescape %}
Step 1:
import markdown
from markdown.extensions import fenced_code, codehilite
import houdini as h
import misaka as m
from pygments import highlight
from pygments.formatters import HtmlFormatter, ClassNotFound
from pygments.lexers import get_lexer_by_name
class HighlighterRenderer(m.HtmlRenderer):
def blockcode(self, text, lang):
try:
lexer = get_lexer_by_name(lang, stripall=True)
except ClassNotFound:
lexer = None
if lexer:
formatter = HtmlFormatter()
return highlight(text, lexer, formatter)
return '\n<pre><code>{}</code></pre>\n'.format(h.escape_html(text.strip()))
Step 2:
@blog_bp.route("/blog/<int:blog_id>")
def blog_post(blog_id):
post = Post.query.get_or_404(blog_id)
renderer = HighlighterRenderer()
md = m.Markdown(renderer, extensions=('fenced-code',))
_markdown = md(post.content)
cont = Markup(_markdown)
return render_template('post_page.html',
title=post.title,
post=post,
cont=cont)
@blog_bp.route("/blog/<int:blog_id>")
def blog_post(blog_id):
post = Post.query.get_or_404(blog_id)
md_template_string = markdown.markdown(post.content, extensions=["fenced_code"])
formatter = HtmlFormatter(style="emacs", full=True, cssclass="codehilite")
css_string = formatter.get_style_defs()
md_css_string = "<style>" + css_string + "</style>"
md_template = md_css_string + md_template_string
return render_template('post_page.html',
title=post.title,
post=post,
post_cont=md_template)
{% if current_user.role == 'user' %}
<td>
<div class="ml-2 p-2">
<span class="badge badge-pill badge-danger" style="color: white;">
{{ order.quantity_fulfilled_count }}
</span>
</div>
</td>
<td>
<div>
<form hx-post="/quantity_fulfilled_count" hx-target='#order_count{{ order.id }}'>
<input type="hidden" name="up_quantity" value="{{ order.id }}" />
<button type="submit" name="up_quantity" class="btn btn-sm ml-4">
<i class="arrow-up-fas fa-5x pb-2 pt-2"></i>
</button>
</form>
</div>
<div id='order_count{{ order.id }}' class="ml-4 p-3">
<span style="color: white;">
{{ order.quantity_fulfilled_count }}
</span>
</div>
<div>
<form hx-post="/quantity_fulfilled_count" hx-target='#order_count{{ order.id }}'>
<input type="hidden" name="down_quantity" value="{{ order.id }}" />
<button type="submit" name="down_quantity" class="btn btn-sm ml-4">
<i class="arrow-down-fas fa-5x"></i>
</button>
</form>
</div>
</td>
from flask import render_template_string
quantity_count_template = """
<span>{{quantity_count}}</span>
"""
@dashboard_bp.route("/quantity_fulfilled_count", methods=['POST'])
def quantity_fulfilled_count():
up_count_order_id = request.form.get('up_quantity')
down_count_order_id = request.form.get('down_quantity')
if request.method == 'POST' and up_count_order_id:
order = MOrder.objects(id=up_count_order_id)
inc_modified_order = order.modify(inc__quantity_fulfilled_count=1,
new=True)
return render_template_string(quantity_count_template,
quantity_count=inc_modified_order.quantity_fulfilled_count)
if request.method == 'POST' and down_count_order_id:
order = MOrder.objects(id=down_count_order_id)
dec_modified_order = order.modify(dec__quantity_fulfilled_count=1,
new=True)
return render_template_string(quantity_count_template,
quantity_count=dec_modified_order.quantity_fulfilled_count)
https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern/page/0
Two ways to initialize an object in the flask application context
Example 1:
This attaches the _celery
object to the current_app
object.
~/application/__init__.py
def create_app(config_class=Config):
app = Flask(__name__, instance_relative_config=False)
app.config.from_object(config_class)
app.register_error_handler(404, page_not_found)
app.config.update(PERMANENT_SESSION_LIFETIME=3500)
app.config.update(SENDFILE_MAX_AGE_DEFAULT=3)
app.config['CELERY_BROKER_URL'] = app.config['MONGO_URI']
app.config['CELERY_RESULT_BACKEND'] = app.config['MONGO_URI']
app._celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
app._celery.conf.update(app.config)
return app
Example 2:
~/application/__init__.py
from celery import Celery
_celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
def create_app(config_class=Config):
app = Flask(__name__, instance_relative_config=False)
app.config.from_object(config_class)
app.register_error_handler(404, page_not_found)
app.config.update(PERMANENT_SESSION_LIFETIME=3500)
app.config.update(SENDFILE_MAX_AGE_DEFAULT=3)
_celery.conf.update(app.config)
return app
Triggering Requests
By default, AJAX requests are triggered by the "natural" event of an element:
input, textarea & select are triggered on the change event
form is triggered on the submit event
everything else is triggered by the click event
If you want different behavior you can use the hx-trigger attribute to specify which event will cause the request.
<td>
<div hx-put="/checkout_htmx">
Put To Messages
</div>
</td>
@main_bp.route('/checkout_htmx', methods=['PUT'])
def checkout_htmx():
return {'Hello': 'World'}
<div class=" card col s12">
<h3>
Search Contacts
<span class="htmx-indicator">
<div class="spinner-border text-danger" role="status">
<span class="sr-only">Loading...</span>
</div>
</span>
</h3>
<input class="form-control" type="text"
name="search" placeholder="Begin Typing To Search Users..."
hx-post="/search_users"
hx-trigger="keyup changed delay:500ms"
hx-target="#search-results"
hx-indicator=".htmx-indicator">
<table class="table">
<thead>
<tr>
<th>First Name</th>
<th>Last Name</th>
<th>Email</th>
</tr>
</thead>
<tbody id="search-results">
</tbody>
</table>
</div>
https://github.com/miguelgrinberg/Flask-SocketIO/tree/master/example
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<script src="//code.jquery.com/jquery-1.12.4.min.js" integrity="sha256-ZosEbRLbNQzLpnKIkEdrPv7lOy9C27hHQ+Xp8a4MxAQ=" crossorigin="anonymous"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/2.2.0/socket.io.js" integrity="sha256-yr4fRk/GU1ehYJPAs8P4JlTgu0Hdsp4ZKrx8bDEDC3I=" crossorigin="anonymous"></script>
</head>
class RequestResetForm(FlaskForm):
# class RequestResetForm(Form):
email = StringField('Email',
validators=[DataRequired('Please enter your account email.'),
Email('Please enter a valid email.')
]
)
submit = SubmitField('Request Password Reset')
def validate_email(self, email):
user = User.query.filter_by(email=email.data).first()
if user is None:
raise ValidationError('There is no account with that email. You must register first.')
@auth_bp.route("/reset_password", methods=['GET', 'POST'])
def reset_request():
if current_user.is_authenticated:
return redirect(url_for('auth_bp.dashboard'), 307)
form = RequestResetForm()
# form = RequestResetForm(request.form)
if form.validate_on_submit():
# email = request.form.get('email')
# user = User.query.filter_by(email=email).first()
user = User.query.filter_by(email=form.email.data).first()
send_reset_email(user)
flash('An email has been sent with instructions to reset your password.', 'info')
return redirect(url_for('auth_bp.login_page'))
https://stackoverflow.com/questions/18290142/multiple-forms-in-a-single-page-using-flask-and-wtforms
https://developer.mozilla.org/en-US/docs/Web/HTML/Element/select
https://stackoverflow.com/questions/32019733/getting-value-from-select-tag-using-flask
from flask import Flask, flash, redirect, render_template, \
request, url_for
app = Flask(__name__)
@app.route('/')
def index():
return render_template(
'index.html',
data=[{'name':'red'}, {'name':'green'}, {'name':'blue'}])
@app.route("/test" , methods=['GET', 'POST'])
def test():
select = request.form.get('comp_select')
return(str(select)) # just to see what select is
if __name__=='__main__':
app.run(debug=True)
<form class="form-inline" method="POST" action="{{ url_for('test') }}">
<div class="form-group">
<div class="input-group">
<span class="input-group-addon">Please select</span>
<select name="comp_select" class="selectpicker form-control">
{% for o in data %}
<option value="{{ o.name }}">{{ o.name }}</option>
{% endfor %}
</select>
</div>
<button type="submit" class="btn btn-default">Go</button>
</div>
</form>
https://realpython.com/python-web-applications-with-flask-part-ii/#deriving-data-from-visitors
request.referrer contains the URL the request came from, although it might not be sent by the client for various reasons.
The attribute takes its value from the Referer
(not a typo!) header:
referrer = request.headers.get("Referer")
or
referrer = request.referrer
<td>{{order.date_submitted.strftime('%Y-%m-%d')}}</td>
The above is achieves the same as date.today()
from datetime import date
html = render_template('invoices/invoice_weasyprint.html',
todays_date=date.today())
https://stackoverflow.com/questions/3727045/set-variable-in-jinja#4181605
{% set label_cls, field_cls = "col-md-7", "col-md-3" %}
{% set testing = 'it worked' %}
{% set another = testing %}
{{ another }}
parent_list = [{'A': 'val1', 'B': 'val2'}, {'C': 'val3', 'D': 'val4'}]
{% for dict_item in parent_list %}
{% for key, value in dict_item.items() %}
<h1>Key: {{key}}</h1>
<h2>Value: {{value}}</h2>
{% endfor %}
{% endfor %}
Notes:
Because the follow and unfollow actions introduce changes in the application, I'm going to implement them as POST requests, which are triggered from the web browser as a result of submitting a web form.
It would be easier to implement these routes as GET requests, but then they could be exploited in CSRF attacks.
Because GET requests are harder to protect against CSRF, they should only be used on actions that do not introduce state changes.
Implementing these as a result of a form submission is better because then a CSRF token can be added to the form.
Key Points:
followed = db.relationship(
'User',
secondary=followers,
primaryjoin=(followers.c.follower_id == id),
secondaryjoin=(followers.c.followed_id == id),
backref=db.backref('followers', lazy='dynamic'),
lazy='dynamic')
'User'
is the right side entity of the relationship (the left side entity is the parent class). Since this is a self-referential relationship, I have to use the same class on both sides.
secondary configures the association table that is used for this relationship, which I defined right above this class.
primaryjoin
indicates the condition that links the left side entity (the follower user) with the association table. The join condition for the left side of the relationship is the user ID matching the follower_id field of the association table. The followers.c.follower_id expression references the follower_id column of the association table.
secondaryjoin
indicates the condition that links the right side entity (the followed user) with the association table. This condition is similar to the one for primaryjoin, with the only difference that now I'm using followed_id, which is the other foreign key in the association table.
backref
defines how this relationship will be accessed from the right side entity. From the left side, the relationship is named followed, so from the right side I am going to use the name followers to represent all the left side users that are linked to the target user in the right side. The additional lazy argument indicates the execution mode for this query. A mode of dynamic sets up the query to not run until specifically requested, which is also how I set up the posts one-to-many relationship.
lazy
is similar to the parameter of the same name in the backref, but this one applies to the left side query instead of the right side.
user1.followed.append(user2)
user1.followed.remove(user2)
class TheClass(object):
def follow(self, user):
if not self.is_following(user):
self.followed.append(user)
def unfollow(self, user):
if self.is_following(user):
self.followed.remove(user)
def is_following(self, user):
return self.followed.filter(
followers.c.followed_id == user.id).count() > 0
Relational Databases:
come up with a single database query that defines the information that I want to get, and then let the database figure out how to extract that information in the most efficient way.
def followed_posts(self):
return Post.query.join(
followers, (followers.c.followed_id == Post.user_id)).filter(
followers.c.follower_id == self.id).order_by(
Post.timestamp.desc())
Expanded to include your own posts:
def followed_posts(self):
followed = Post.query.join(
followers, (followers.c.followed_id == Post.user_id)).filter(
followers.c.follower_id == self.id)
own = Post.query.filter_by(user_id=self.id)
return followed.union(own).order_by(Post.timestamp.desc())