mirror of
https://github.com/TrentSPalmer/flask_photo_scaling_app.git
synced 2024-11-16 14:41:29 -08:00
296 lines
12 KiB
Python
296 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from pyotp import TOTP
|
|
from app import app, db
|
|
from flask_login import current_user, login_user, logout_user
|
|
from flask import render_template, redirect, url_for, session, flash, request, abort, send_file
|
|
from app.forms import LoginForm, RegistrationForm, ResetPasswordRequestForm, ConfirmTotp
|
|
from app.forms import ResetPasswordForm, EditProfile, ChangePassword, DisableTotp
|
|
from app.forms import GetTotp, UploadPhotoForm, ConfirmPhotoDelete
|
|
from app.models import Contributor, Photo
|
|
from app.email import send_password_reset_email
|
|
from app.scripts.set_contributor_id_seq import set_contributor_id_seq
|
|
from app.scripts.totp_utils import get_totp_qr, validate_totp, disable_2fa
|
|
from app.scripts.process_uploaded_photo import process_uploaded_photo
|
|
from app.scripts.get_photo_list import get_photo_list, get_disk_stats, calc_additional_data, find_next_previous
|
|
from app.scripts.delete_photo import delete_photo
|
|
from werkzeug.utils import secure_filename
|
|
|
|
|
|
@app.route('/download')
|
|
def download():
|
|
if current_user.is_authenticated:
|
|
f = request.args['file']
|
|
try:
|
|
return send_file('/var/lib/photo_app/photos/{}'.format(f), attachment_filename=f)
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
|
|
@app.route('/delete', methods=['GET', 'POST'])
|
|
def delete():
|
|
photo = Photo.query.get(request.args['photo_id'])
|
|
if photo is None:
|
|
return(redirect(url_for('index')))
|
|
if not current_user.is_authenticated or photo.contributor_id != current_user.id:
|
|
return(redirect(url_for('index')))
|
|
form = ConfirmPhotoDelete()
|
|
if request.method == 'POST' and form.validate_on_submit():
|
|
return(redirect(url_for('photo', photo_id=delete_photo(photo, app.config))))
|
|
return(render_template(
|
|
'delete_photo.html',
|
|
title="Delete Photo?",
|
|
photo=photo,
|
|
photo_url=app.config['PHOTO_URL'],
|
|
form=form
|
|
))
|
|
|
|
|
|
@app.route('/photo-upload', methods=['GET', 'POST'])
|
|
def photo_upload():
|
|
if not current_user.is_authenticated:
|
|
return(redirect(url_for('index')))
|
|
form = UploadPhotoForm()
|
|
if request.method == 'POST' and form.validate_on_submit():
|
|
f = request.files['image']
|
|
filename = secure_filename(f.filename)
|
|
if filename != '':
|
|
import os
|
|
file_ext = os.path.splitext(filename)[1]
|
|
if file_ext not in ['.jpg', '.png'] or file_ext != validate_image(f.stream):
|
|
abort(400)
|
|
f.save(os.path.join(app.config['PHOTO_SAVE_PATH'], 'raw_' + filename))
|
|
photo_id = process_uploaded_photo(filename, current_user, app.config)
|
|
print(photo_id)
|
|
flash("Thanks for the new photo!")
|
|
return(redirect(url_for('photo', photo_id=photo_id)))
|
|
return(redirect(url_for('index')))
|
|
return render_template('upload.html', title="Photo Upload", form=form)
|
|
|
|
|
|
@app.route('/photo/<int:photo_id>')
|
|
def photo(photo_id):
|
|
photo = Photo.query.get(photo_id)
|
|
if not current_user.is_authenticated or photo is None:
|
|
return(redirect(url_for('index')))
|
|
find_next_previous(photo, app.config)
|
|
calc_additional_data(photo)
|
|
return render_template(
|
|
'photo.html',
|
|
title="Photo",
|
|
photo=photo,
|
|
photo_url=app.config['PHOTO_URL']
|
|
)
|
|
|
|
|
|
@app.route("/")
|
|
@app.route("/index")
|
|
def index():
|
|
if current_user.is_authenticated:
|
|
photos = get_photo_list(current_user.id, app.config)
|
|
flash(get_disk_stats())
|
|
return(render_template(
|
|
'index.html',
|
|
title="Photos",
|
|
photos=photos,
|
|
photo_url=app.config['PHOTO_URL']
|
|
))
|
|
return render_template('index.html', title="Photos")
|
|
|
|
|
|
def validate_image(stream):
|
|
import imghdr
|
|
header = stream.read(512)
|
|
stream.seek(0)
|
|
format = imghdr.what(None, header)
|
|
if not format:
|
|
return None
|
|
return '.' + (format if format != 'jpeg' else 'jpg')
|
|
|
|
|
|
@app.route("/two-factor-input", methods=["GET", "POST"])
|
|
def two_factor_input():
|
|
if current_user.is_authenticated or 'id' not in session:
|
|
return redirect(url_for('index'))
|
|
contributor = Contributor.query.get(session['id'])
|
|
if contributor is None:
|
|
return redirect(url_for('index'))
|
|
form = GetTotp()
|
|
if form.validate_on_submit():
|
|
if TOTP(contributor.totp_key).verify(int(form.totp_code.data), valid_window=5):
|
|
login_user(contributor, remember=session['remember_me'])
|
|
flash("Congratulations, you are now logged in!")
|
|
return redirect(url_for('index'))
|
|
else:
|
|
flash("Oops, the pin was wrong")
|
|
form.totp_code.data = None
|
|
return render_template('two_factor_input.html', form=form, inst="Code was wrong, try again?")
|
|
return render_template('two_factor_input.html', form=form, inst="Enter Auth Code")
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
form = LoginForm()
|
|
if form.validate_on_submit():
|
|
contributor_by_name = Contributor.query.filter_by(name=form.username.data).first()
|
|
contributor_by_email = Contributor.query.filter_by(email=form.email.data).first()
|
|
if contributor_by_name is not None and contributor_by_name.check_password(form.password.data):
|
|
if contributor_by_name.use_totp:
|
|
session['id'] = contributor_by_name.id
|
|
session['remember_me'] = form.remember_me.data
|
|
return redirect(url_for('two_factor_input'))
|
|
else:
|
|
login_user(contributor_by_name, remember=form.remember_me.data)
|
|
flash("Congratulations, you are now logged in!")
|
|
return redirect(url_for('index'))
|
|
elif contributor_by_email is not None and contributor_by_email.check_password(form.password.data):
|
|
if contributor_by_email.use_totp:
|
|
session['id'] = contributor_by_email.id
|
|
session['remember_me'] = form.remember_me.data
|
|
return redirect(url_for('two_factor_input'))
|
|
else:
|
|
login_user(contributor_by_email, remember=form.remember_me.data)
|
|
flash("Congratulations, you are now logged in!")
|
|
return redirect(url_for('index'))
|
|
else:
|
|
flash("Error Invalid Contributor (Username or Email) or Password")
|
|
return(redirect(url_for('login')))
|
|
return render_template('login.html', title='Sign In', form=form)
|
|
|
|
|
|
@app.route('/disable-totp', methods=['GET', 'POST'])
|
|
def disable_totp():
|
|
if current_user.is_anonymous or not current_user.use_totp:
|
|
return(redirect(url_for('index')))
|
|
contributor = Contributor.query.get(current_user.id)
|
|
form = DisableTotp()
|
|
if form.validate_on_submit():
|
|
if disable_2fa(contributor, app.config):
|
|
flash('2FA Now Disabled')
|
|
return(redirect(url_for('edit_profile')))
|
|
else:
|
|
flash('2FA Not Disabled')
|
|
return(redirect(url_for('edit_profile')))
|
|
return render_template('disable_2fa.html', form=form, title="Disable 2FA")
|
|
|
|
|
|
@app.route('/enable-totp', methods=['GET', 'POST'])
|
|
def enable_totp():
|
|
if current_user.is_anonymous or current_user.use_totp:
|
|
return(redirect(url_for('index')))
|
|
contributor = Contributor.query.get(current_user.id)
|
|
form = ConfirmTotp()
|
|
qr = get_totp_qr(contributor, app.config)
|
|
if form.validate_on_submit():
|
|
if contributor.use_totp:
|
|
flash('2FA Already Enabled')
|
|
return(redirect(url_for('edit_profile')))
|
|
if validate_totp(contributor, form.totp_code.data, app.config):
|
|
flash('2FA Now Enabled')
|
|
return(redirect(url_for('edit_profile')))
|
|
else:
|
|
flash("TOTP Code didn't validate, rescan and try again")
|
|
return(redirect(url_for('edit_profile')))
|
|
return render_template('qr.html', qr=qr, form=form, title="Aunthentication Code")
|
|
|
|
|
|
@app.route("/change-password", methods=["GET", "POST"])
|
|
def change_password():
|
|
if not current_user.is_authenticated:
|
|
return(redirect(url_for('index')))
|
|
contributor = Contributor.query.get(current_user.id)
|
|
form = ChangePassword()
|
|
if form.validate_on_submit():
|
|
if contributor.check_password(form.password.data):
|
|
contributor.set_password(form.new_password.data)
|
|
db.session.commit()
|
|
flash("Thanks for the update!")
|
|
return(redirect(url_for('index')))
|
|
else:
|
|
flash("Error Invalid Password")
|
|
return(redirect(url_for('change_password')))
|
|
return render_template('change_password.html', title='Change Password', form=form)
|
|
|
|
|
|
@app.route("/edit-profile", methods=["GET", "POST"])
|
|
def edit_profile():
|
|
if current_user.is_anonymous:
|
|
return(redirect(url_for('index')))
|
|
contributor = Contributor.query.get(current_user.id)
|
|
form = EditProfile()
|
|
if request.method == 'GET':
|
|
form.username.data = contributor.name
|
|
form.email.data = contributor.email
|
|
if form.validate_on_submit():
|
|
if contributor.check_password(form.password.data):
|
|
contributor.name = form.username.data
|
|
contributor.email = form.email.data
|
|
db.session.commit()
|
|
flash("Thanks for the update!")
|
|
return(redirect(url_for('index')))
|
|
else:
|
|
flash("Error Invalid Password")
|
|
return(redirect(url_for('edit_profile')))
|
|
return render_template('edit_profile.html', title='Edit Profile', form=form, contributor_use_totp=contributor.use_totp)
|
|
|
|
|
|
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
|
|
def reset_password(token):
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
contributor = Contributor.verify_reset_password_token(token)
|
|
if not contributor:
|
|
return redirect(url_for('index'))
|
|
form = ResetPasswordForm()
|
|
if form.validate_on_submit():
|
|
contributor.set_password(form.password.data)
|
|
db.session.commit()
|
|
flash('Your password has been reset.')
|
|
return redirect(url_for('login'))
|
|
return render_template('reset_password.html', title="New Password?", form=form)
|
|
|
|
|
|
@app.route('/reset-password-request', methods=['GET', 'POST'])
|
|
def reset_password_request():
|
|
if current_user.is_authenticated:
|
|
return(redirect(url_for('index')))
|
|
else:
|
|
form = ResetPasswordRequestForm()
|
|
if form.validate_on_submit():
|
|
contributor = Contributor.query.filter_by(email=form.email.data).first()
|
|
if contributor:
|
|
send_password_reset_email(contributor, app.config['EXTERNAL_URL'])
|
|
flash('Check your email for the instructions to reset your password')
|
|
return redirect(url_for('login'))
|
|
else:
|
|
flash('Sorry, invalid email')
|
|
return redirect(url_for('login'))
|
|
return render_template('reset_password_request.html', title='Reset Password', form=form)
|
|
|
|
|
|
@app.route("/register", methods=["GET", "POST"])
|
|
def register():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
form = RegistrationForm()
|
|
if form.validate_on_submit():
|
|
set_contributor_id_seq(app.config)
|
|
contributor = Contributor(name=form.username.data, num_photos=0, email=form.email.data)
|
|
contributor.set_password(form.password.data)
|
|
db.session.add(contributor)
|
|
db.session.commit()
|
|
flash("Congratulations, you are now a registered user!")
|
|
return redirect(url_for('login'))
|
|
return render_template('register.html', title='Register', form=form)
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
is_authenticated = current_user.is_authenticated
|
|
logout_user()
|
|
if is_authenticated:
|
|
flash("Congratulations, you are now logged out!")
|
|
return redirect(url_for('index'))
|