Compare commits

..

17 Commits

Author SHA1 Message Date
c936756f7d update README.md 2025-02-11 17:33:12 -08:00
f0d0d1479c update app/photo_routes/scripts/get_exif_data.py to account
for new exif data type of IFDRational, instead of tuple for
some exif attributes
2025-02-10 16:27:51 -08:00
7997202d77 fix style app/photo_routes/scripts/process_uploaded_photo.py 2025-02-09 18:08:03 -08:00
533a82f314 fix style app/photo_routes/scripts/get_exif_data.py 2025-02-09 17:51:25 -08:00
a2de385638 fix style app/photo_routes/proutes.py 2025-02-09 17:33:26 -08:00
6b27846388 fix style app/photo_routes/photox.py 2025-02-09 17:31:44 -08:00
13e6f98c5f fix style app/photo_routes/photo_upload.py 2025-02-09 17:22:16 -08:00
b3f0de046b fix style app/photo_routes/delete_download.py 2025-02-09 17:19:38 -08:00
bab12d33e9 fix style app/auth/totp.py 2025-02-09 17:13:58 -08:00
5eae85ca4a fix style app/auth/reset_password.py 2025-02-09 17:12:51 -08:00
72a8b46375 fix style app/auth/register.py 2025-02-09 17:08:42 -08:00
4cb734be40 fix style app/auth/profile.py 2025-02-09 17:03:51 -08:00
4b67a96f56 fix style app/auth/email.py 2025-02-09 17:03:05 -08:00
c8d06abbe2 fix style app/auth/auth.py 2025-02-09 16:58:20 -08:00
cec0716bc3 fix style app/sendxmpp_handler.py 2025-02-09 16:52:30 -08:00
0a6c6a2afd fix style app/models.py 2025-02-09 16:50:39 -08:00
86d482691b fix style app/forms.py 2025-02-09 16:48:06 -08:00
16 changed files with 368 additions and 95 deletions

View File

@ -85,3 +85,6 @@ For 2fa, you can use an authenticator application such as
2. install certbot and get certs for each of the two subdomains 2. install certbot and get certs for each of the two subdomains
2. install service file in `/etc/systemd/system` 2. install service file in `/etc/systemd/system`
2. enable and start systemd service 2. enable and start systemd service
## Debian 12 Upgrade
2. wtforms.validators now requires `python3-wtforms-components` for the email validator

View File

@ -20,15 +20,25 @@ def two_factor_input():
return redirect(url_for('proute.index')) return redirect(url_for('proute.index'))
form = GetTotp() form = GetTotp()
if form.validate_on_submit(): if form.validate_on_submit():
if TOTP(contributor.totp_key).verify(int(form.totp_code.data), valid_window=5): if TOTP(
contributor.totp_key,
).verify(int(form.totp_code.data), valid_window=5):
login_user(contributor, remember=session['remember_me']) login_user(contributor, remember=session['remember_me'])
flash("Congratulations, you are now logged in!") flash("Congratulations, you are now logged in!")
return redirect(url_for('proute.index')) return redirect(url_for('proute.index'))
else: else:
flash("Oops, the pin was wrong") flash("Oops, the pin was wrong")
form.totp_code.data = None form.totp_code.data = None
return render_template('two_factor_input.html', form=form, inst="Code was wrong, try again?") return render_template(
return render_template('two_factor_input.html', form=form, inst="Enter Auth Code") 'two_factor_input.html',
form=form,
inst="Code was wrong, try again?",
)
return render_template(
'two_factor_input.html',
form=form,
inst="Enter Auth Code",
)
@auths.route("/login", methods=["GET", "POST"]) @auths.route("/login", methods=["GET", "POST"])
@ -37,9 +47,14 @@ def login():
return redirect(url_for('proute.index')) return redirect(url_for('proute.index'))
form = LoginForm() form = LoginForm()
if form.validate_on_submit(): if form.validate_on_submit():
contributor_by_name = Contributor.query.filter_by(name=form.username.data).first() contributor_by_name = Contributor.query.filter_by(
contributor_by_email = Contributor.query.filter_by(email=form.email.data).first() name=form.username.data,
if contributor_by_name is not None and contributor_by_name.check_password(form.password.data): ).first()
contributor_by_email = Contributor.query.filter_by(
email=form.email.data,
).first()
cbn, cbe = contributor_by_name, contributor_by_email
if cbn is not None and cbn.check_password(form.password.data):
if contributor_by_name.use_totp: if contributor_by_name.use_totp:
session['id'] = contributor_by_name.id session['id'] = contributor_by_name.id
session['remember_me'] = form.remember_me.data session['remember_me'] = form.remember_me.data
@ -48,13 +63,16 @@ def login():
login_user(contributor_by_name, remember=form.remember_me.data) login_user(contributor_by_name, remember=form.remember_me.data)
flash("Congratulations, you are now logged in!") flash("Congratulations, you are now logged in!")
return redirect(url_for('proute.index')) return redirect(url_for('proute.index'))
elif contributor_by_email is not None and contributor_by_email.check_password(form.password.data): elif cbe is not None and cbe.check_password(form.password.data):
if contributor_by_email.use_totp: if contributor_by_email.use_totp:
session['id'] = contributor_by_email.id session['id'] = contributor_by_email.id
session['remember_me'] = form.remember_me.data session['remember_me'] = form.remember_me.data
return redirect(url_for('auths.two_factor_input')) return redirect(url_for('auths.two_factor_input'))
else: else:
login_user(contributor_by_email, remember=form.remember_me.data) login_user(
contributor_by_email,
remember=form.remember_me.data,
)
flash("Congratulations, you are now logged in!") flash("Congratulations, you are now logged in!")
return redirect(url_for('proute.index')) return redirect(url_for('proute.index'))
else: else:

View File

@ -8,13 +8,23 @@ from threading import Thread
def send_password_reset_email(contributor, external_url): def send_password_reset_email(contributor, external_url):
token = contributor.get_reset_password_token() token = contributor.get_reset_password_token()
send_email('Photo App Reset Your Password', send_email(
'Photo App Reset Your Password',
sender=current_app.config['MAIL_ADMINS'][0], sender=current_app.config['MAIL_ADMINS'][0],
recipients=[contributor.email], recipients=[contributor.email],
text_body=render_template('email/reset_password_email_text.txt', text_body=render_template(
contributor=contributor, token=token, external_url=external_url), 'email/reset_password_email_text.txt',
html_body=render_template('email/reset_password_email_html.html', contributor=contributor,
contributor=contributor, token=token, external_url=external_url)) token=token,
external_url=external_url,
),
html_body=render_template(
'email/reset_password_email_html.html',
contributor=contributor,
token=token,
external_url=external_url,
),
)
def send_async_email(app, msg): def send_async_email(app, msg):
@ -26,4 +36,7 @@ def send_email(subject, sender, recipients, text_body, html_body):
msg = Message(subject, sender=sender, recipients=recipients) msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body msg.body = text_body
msg.html = html_body msg.html = html_body
Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start() Thread(
target=send_async_email,
args=(current_app._get_current_object(), msg),
).start()

View File

@ -26,7 +26,11 @@ def change_password():
else: else:
flash("Error Invalid Password") flash("Error Invalid Password")
return(redirect(url_for('prof.change_password'))) return(redirect(url_for('prof.change_password')))
return render_template('change_password.html', title='Change Password', form=form) return render_template(
'change_password.html',
title='Change Password',
form=form,
)
@prof.route("/edit-profile", methods=["GET", "POST"]) @prof.route("/edit-profile", methods=["GET", "POST"])

View File

@ -17,9 +17,15 @@ def register():
return redirect(url_for('proute.index')) return redirect(url_for('proute.index'))
form = RegistrationForm() form = RegistrationForm()
if form.validate_on_submit(): if form.validate_on_submit():
db.engine.execute("SELECT setval('contributor_id_seq', (SELECT MAX(id) FROM contributor))") my_sql = "SELECT setval('contributor_id_seq', "
my_sql += "(SELECT MAX(id) FROM contributor))"
db.engine.execute(my_sql)
db.session.commit() db.session.commit()
contributor = Contributor(name=form.username.data, num_photos=0, email=form.email.data) contributor = Contributor(
name=form.username.data,
num_photos=0,
email=form.email.data,
)
contributor.set_password(form.password.data) contributor.set_password(form.password.data)
db.session.add(contributor) db.session.add(contributor)
db.session.commit() db.session.commit()

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from flask import Blueprint, redirect, url_for, flash, render_template, current_app from flask import (
Blueprint, redirect, url_for, flash, render_template, current_app
)
from flask_login import current_user from flask_login import current_user
from app.models import Contributor from app.models import Contributor
from app.forms import ResetPasswordForm, ResetPasswordRequestForm from app.forms import ResetPasswordForm, ResetPasswordRequestForm
@ -25,7 +27,11 @@ def reset_password(token):
db.session.commit() db.session.commit()
flash('Your password has been reset.') flash('Your password has been reset.')
return redirect(url_for('auths.login')) return redirect(url_for('auths.login'))
return render_template('reset_password.html', title="New Password?", form=form) return render_template(
'reset_password.html',
title="New Password?",
form=form,
)
@pwd.route('/reset-password-request', methods=['GET', 'POST']) @pwd.route('/reset-password-request', methods=['GET', 'POST'])
@ -35,12 +41,23 @@ def reset_password_request():
else: else:
form = ResetPasswordRequestForm() form = ResetPasswordRequestForm()
if form.validate_on_submit(): if form.validate_on_submit():
contributor = Contributor.query.filter_by(email=form.email.data).first() contributor = Contributor.query.filter_by(
email=form.email.data,
).first()
if contributor: if contributor:
send_password_reset_email(contributor, current_app.config['EXTERNAL_URL']) send_password_reset_email(
flash('Check your email for the instructions to reset your password') contributor,
current_app.config['EXTERNAL_URL'],
)
my_flash = 'Check your email for the instructions '
my_flash += 'to reset your password'
flash(my_flash)
return redirect(url_for('auths.login')) return redirect(url_for('auths.login'))
else: else:
flash('Sorry, invalid email') flash('Sorry, invalid email')
return redirect(url_for('auths.login')) return redirect(url_for('auths.login'))
return render_template('reset_password_request.html', title='Reset Password', form=form) return render_template(
'reset_password_request.html',
title='Reset Password',
form=form,
)

View File

@ -32,7 +32,12 @@ def enable_totp():
else: else:
flash("TOTP Code didn't validate, rescan and try again") flash("TOTP Code didn't validate, rescan and try again")
return(redirect(url_for('prof.edit_profile'))) return(redirect(url_for('prof.edit_profile')))
return render_template('qr.html', qr=qr, form=form, title="Aunthentication Code") return render_template(
'qr.html',
qr=qr,
form=form,
title="Aunthentication Code",
)
def get_totp_qr(contributor): def get_totp_qr(contributor):
@ -40,7 +45,9 @@ def get_totp_qr(contributor):
contributor.totp_key = pyotp.random_base32() contributor.totp_key = pyotp.random_base32()
db.session.commit() db.session.commit()
totp_uri = pyotp.totp.TOTP(contributor.totp_key).provisioning_uri(name=contributor.email, issuer_name='Photo App') totp_uri = pyotp.totp.TOTP(
contributor.totp_key,
).provisioning_uri(name=contributor.email, issuer_name='Photo App')
img = qrcode.make(totp_uri, image_factory=qrcode.image.svg.SvgPathImage) img = qrcode.make(totp_uri, image_factory=qrcode.image.svg.SvgPathImage)
f = BytesIO() f = BytesIO()
img.save(f) img.save(f)

View File

@ -2,7 +2,9 @@
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Email, Optional, Regexp, ValidationError, EqualTo, Length from wtforms.validators import (
DataRequired, Email, Optional, Regexp, ValidationError, EqualTo, Length
)
from flask_wtf.file import FileField, FileAllowed, FileRequired from flask_wtf.file import FileField, FileAllowed, FileRequired
from app.models import Contributor, EmailWhiteList from app.models import Contributor, EmailWhiteList
from zxcvbn import zxcvbn from zxcvbn import zxcvbn
@ -13,17 +15,38 @@ class ConfirmPhotoDelete(FlaskForm):
class GetTotp(FlaskForm): class GetTotp(FlaskForm):
totp_code = StringField('6-Digit Code?', validators=[DataRequired(), Length(min=6, max=6, message="6 Digits")], render_kw={'autofocus': True}) totp_code = StringField(
'6-Digit Code?',
validators=[DataRequired(), Length(min=6, max=6, message="6 Digits")],
render_kw={'autofocus': True},
)
submit = SubmitField('OK') submit = SubmitField('OK')
class ConfirmTotp(FlaskForm): class ConfirmTotp(FlaskForm):
totp_code = StringField('6-Digit Code?', validators=[DataRequired(), Length(min=6, max=6, message="Rescan And Try Again")], render_kw={'autofocus': True}) totp_code = StringField(
'6-Digit Code?',
validators=[
DataRequired(),
Length(min=6, max=6, message="Rescan And Try Again"),
],
render_kw={'autofocus': True},
)
submit = SubmitField('Enable 2FA') submit = SubmitField('Enable 2FA')
class EditProfile(FlaskForm): class EditProfile(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Regexp('^[a-zA-Z0-9]+$', message='letters and digits only (no spaces)')], render_kw={'autofocus': True}) username = StringField(
'Username',
validators=[
DataRequired(),
Regexp(
'^[a-zA-Z0-9]+$',
message='letters and digits only (no spaces)',
),
],
render_kw={'autofocus': True},
)
email = StringField('Email', validators=[Optional(), Email()]) email = StringField('Email', validators=[Optional(), Email()])
password = PasswordField('Confirm Password', validators=[DataRequired()]) password = PasswordField('Confirm Password', validators=[DataRequired()])
submit = SubmitField('Update Name/Email') submit = SubmitField('Update Name/Email')
@ -44,7 +67,10 @@ class EditProfile(FlaskForm):
class LoginForm(FlaskForm): class LoginForm(FlaskForm):
username = StringField('Username', validators=[Optional()], render_kw={'autofocus': True}) username = StringField(
'Username',
validators=[Optional()], render_kw={'autofocus': True},
)
email = StringField('Email', validators=[Optional(), Email()]) email = StringField('Email', validators=[Optional(), Email()])
password = PasswordField('Password', validators=[DataRequired()]) password = PasswordField('Password', validators=[DataRequired()])
remember_me = BooleanField('Remember Me') remember_me = BooleanField('Remember Me')
@ -52,8 +78,15 @@ class LoginForm(FlaskForm):
class ResetPasswordForm(FlaskForm): class ResetPasswordForm(FlaskForm):
password = PasswordField('Password', validators=[DataRequired(), Length(min=15, )], render_kw={'autofocus': True}) password = PasswordField(
password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) 'Password',
validators=[DataRequired(), Length(min=15, )],
render_kw={'autofocus': True},
)
password2 = PasswordField(
'Repeat Password',
validators=[DataRequired(), EqualTo('password')],
)
submit = SubmitField('Request Password Reset') submit = SubmitField('Request Password Reset')
def validate_password(self, password): def validate_password(self, password):
@ -62,10 +95,26 @@ class ResetPasswordForm(FlaskForm):
class RegistrationForm(FlaskForm): class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Regexp('^[a-zA-Z0-9]+$', message='letters and digits only (no spaces)')], render_kw={'autofocus': True}) username = StringField(
'Username',
validators=[
DataRequired(),
Regexp(
'^[a-zA-Z0-9]+$',
message='letters and digits only (no spaces)',
),
],
render_kw={'autofocus': True},
)
email = StringField('Email', validators=[DataRequired(), Email()]) email = StringField('Email', validators=[DataRequired(), Email()])
password = PasswordField('Password', validators=[DataRequired(), Length(min=15, )]) password = PasswordField(
password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')]) 'Password',
validators=[DataRequired(), Length(min=15, )],
)
password2 = PasswordField(
'Repeat Password',
validators=[DataRequired(), EqualTo('password')],
)
submit = SubmitField('Register') submit = SubmitField('Register')
def validate_password(self, password): def validate_password(self, password):
@ -78,7 +127,9 @@ class RegistrationForm(FlaskForm):
raise ValidationError('Please use a different username.') raise ValidationError('Please use a different username.')
def validate_email(self, email): def validate_email(self, email):
white_listed_user = EmailWhiteList.query.filter_by(email=email.data).first() white_listed_user = EmailWhiteList.query.filter_by(
email=email.data,
).first()
if white_listed_user is None: if white_listed_user is None:
raise ValidationError('This email address is not authorized.') raise ValidationError('This email address is not authorized.')
user = Contributor.query.filter_by(email=email.data).first() user = Contributor.query.filter_by(email=email.data).first()
@ -87,14 +138,28 @@ class RegistrationForm(FlaskForm):
class ResetPasswordRequestForm(FlaskForm): class ResetPasswordRequestForm(FlaskForm):
email = StringField('Email', validators=[DataRequired(), Email()], render_kw={'autofocus': True}) email = StringField(
'Email',
validators=[DataRequired(), Email()],
render_kw={'autofocus': True},
)
submit = SubmitField('Request Password Reset') submit = SubmitField('Request Password Reset')
class ChangePassword(FlaskForm): class ChangePassword(FlaskForm):
password = PasswordField('Confirm Password', validators=[DataRequired()], render_kw={'autofocus': True}) password = PasswordField(
new_password = PasswordField('New Password', validators=[DataRequired(), Length(min=15, )]) 'Confirm Password',
new_password2 = PasswordField('Repeat New Password', validators=[DataRequired(), EqualTo('new_password')]) validators=[DataRequired()],
render_kw={'autofocus': True},
)
new_password = PasswordField(
'New Password',
validators=[DataRequired(), Length(min=15, )],
)
new_password2 = PasswordField(
'Repeat New Password',
validators=[DataRequired(), EqualTo('new_password')],
)
submit = SubmitField('Save') submit = SubmitField('Save')
def validate_password(self, password): def validate_password(self, password):

View File

@ -64,12 +64,20 @@ class Contributor(UserMixin, db.Model):
return '<Contributor {}>'.format(self.name) return '<Contributor {}>'.format(self.name)
def get_reset_password_token(self, expires_in=1800): def get_reset_password_token(self, expires_in=1800):
return jwt.encode({'reset_password': self.id, 'exp': time() + expires_in}, current_app.config['SECRET_KEY'], algorithm='HS256').decode('utf-8') return jwt.encode(
{'reset_password': self.id, 'exp': time() + expires_in},
current_app.config['SECRET_KEY'],
algorithm='HS256',
).decode('utf-8')
@staticmethod @staticmethod
def verify_reset_password_token(token): def verify_reset_password_token(token):
try: try:
id = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])['reset_password'] id = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256'],
)['reset_password']
except BaseException as error: except BaseException as error:
print('An exception occurred: {}'.format(error)) print('An exception occurred: {}'.format(error))
return Contributor.query.get(id) return Contributor.query.get(id)

View File

@ -1,6 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from flask import Blueprint, request, redirect, url_for, render_template, current_app, send_file from flask import (
Blueprint, request, redirect, url_for,
render_template, current_app, send_file,
)
from flask_login import current_user from flask_login import current_user
from app.models import Photo from app.models import Photo
from app.forms import ConfirmPhotoDelete from app.forms import ConfirmPhotoDelete
@ -17,7 +20,10 @@ def download():
if current_user.is_authenticated: if current_user.is_authenticated:
f = request.args['file'] f = request.args['file']
try: try:
return send_file('/var/lib/photo_app/photos/{}'.format(f), attachment_filename=f) return send_file(
'/var/lib/photo_app/photos/{}'.format(f),
attachment_filename=f,
)
except Exception as e: except Exception as e:
return str(e) return str(e)
@ -27,11 +33,14 @@ def delete():
photo = Photo.query.get(request.args['photo_id']) photo = Photo.query.get(request.args['photo_id'])
if photo is None: if photo is None:
return(redirect(url_for('proute.index'))) return(redirect(url_for('proute.index')))
if not current_user.is_authenticated or photo.contributor_id != current_user.id: cu = current_user
if not cu.is_authenticated or photo.contributor_id != cu.id:
return(redirect(url_for('proute.index'))) return(redirect(url_for('proute.index')))
form = ConfirmPhotoDelete() form = ConfirmPhotoDelete()
if request.method == 'POST' and form.validate_on_submit(): if request.method == 'POST' and form.validate_on_submit():
return(redirect(url_for('p_route.photo', photo_id=delete_photo(photo)))) return(
redirect(url_for('p_route.photo', photo_id=delete_photo(photo))),
)
return(render_template( return(render_template(
'delete_photo.html', 'delete_photo.html',
title="Delete Photo?", title="Delete Photo?",
@ -49,11 +58,19 @@ def delete_photo(photo):
password=current_app.config['DATABASE_PASSWORD'] password=current_app.config['DATABASE_PASSWORD']
) )
cur = conn.cursor() cur = conn.cursor()
cur.execute("SELECT count(id) FROM photo WHERE contributor_id=%s AND id>%s", (photo.contributor_id, photo.id)) cur.execute(
"SELECT count(id) FROM photo WHERE contributor_id=%s AND id>%s",
(photo.contributor_id, photo.id),
)
if cur.fetchone()[0] == 0: if cur.fetchone()[0] == 0:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s ORDER BY id", (photo.contributor_id, )) cur.execute(
"SELECT id FROM photo WHERE contributor_id=%s ORDER BY id",
(photo.contributor_id, ),
)
else: else:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s AND id>%s ORDER BY id", (photo.contributor_id, photo.id)) my_statement = "SELECT id FROM photo WHERE contributor_id=%s "
my_statement += "AND id>%s ORDER BY id"
cur.execute(my_statement, (photo.contributor_id, photo.id))
next_photo_id = cur.fetchone()[0] next_photo_id = cur.fetchone()[0]
os.chdir(current_app.config['PHOTO_SAVE_PATH']) os.chdir(current_app.config['PHOTO_SAVE_PATH'])
if os.path.exists('raw_' + photo.photo_name): if os.path.exists('raw_' + photo.photo_name):

View File

@ -2,7 +2,9 @@
from flask import current_app from flask import current_app
from flask_login import current_user from flask_login import current_user
from flask import Blueprint, render_template, redirect, url_for, flash, request, abort from flask import (
Blueprint, render_template, redirect, url_for, flash, request, abort
)
from app.forms import UploadPhotoForm from app.forms import UploadPhotoForm
from .scripts.process_uploaded_photo import process_uploaded_photo from .scripts.process_uploaded_photo import process_uploaded_photo
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
@ -23,10 +25,20 @@ def photo_upload():
if filename != '': if filename != '':
import os import os
file_ext = os.path.splitext(filename)[1] file_ext = os.path.splitext(filename)[1]
if file_ext not in ['.jpg', '.png'] or file_ext != validate_image(f.stream): fe = file_ext
if fe not in ['.jpg', '.png'] or fe != validate_image(f.stream):
abort(400) abort(400)
f.save(os.path.join(current_app.config['PHOTO_SAVE_PATH'], 'raw_' + filename)) f.save(
photo_id = process_uploaded_photo(filename, current_user, current_app.config) os.path.join(
current_app.config['PHOTO_SAVE_PATH'],
'raw_' + filename,
),
)
photo_id = process_uploaded_photo(
filename,
current_user,
current_app.config,
)
print(photo_id) print(photo_id)
flash("Thanks for the new photo!") flash("Thanks for the new photo!")
return(redirect(url_for('p_route.photo', photo_id=photo_id))) return(redirect(url_for('p_route.photo', photo_id=photo_id)))

View File

@ -34,19 +34,35 @@ def find_next_previous(photo):
password=current_app.config['DATABASE_PASSWORD'] password=current_app.config['DATABASE_PASSWORD']
) )
cur = conn.cursor() cur = conn.cursor()
cur.execute("SELECT count(id) FROM photo WHERE contributor_id=%s AND id > %s", (photo.contributor_id, photo.id)) cur.execute(
"SELECT count(id) FROM photo WHERE contributor_id=%s AND id > %s",
(photo.contributor_id, photo.id),
)
count = cur.fetchone()[0] count = cur.fetchone()[0]
if count == 0: if count == 0:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s ORDER BY id", (photo.contributor_id, )) cur.execute(
"SELECT id FROM photo WHERE contributor_id=%s ORDER BY id",
(photo.contributor_id, ),
)
else: else:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s AND id > %s ORDER BY id", (photo.contributor_id, photo.id)) my_sql = "SELECT id FROM photo WHERE contributor_id=%s "
my_sql += "AND id > %s ORDER BY id"
cur.execute(my_sql, (photo.contributor_id, photo.id))
photo.next_photo_id = cur.fetchone()[0] photo.next_photo_id = cur.fetchone()[0]
cur.execute("SELECT count(id) FROM photo WHERE contributor_id=%s AND id < %s", (photo.contributor_id, photo.id)) cur.execute(
"SELECT count(id) FROM photo WHERE contributor_id=%s AND id < %s",
(photo.contributor_id, photo.id),
)
count = cur.fetchone()[0] count = cur.fetchone()[0]
if count == 0: if count == 0:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s ORDER BY id DESC", (photo.contributor_id, )) cur.execute(
"SELECT id FROM photo WHERE contributor_id=%s ORDER BY id DESC",
(photo.contributor_id, ),
)
else: else:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s AND id < %s ORDER BY id DESC", (photo.contributor_id, photo.id)) my_sql = "SELECT id FROM photo WHERE contributor_id=%s "
my_sql += "AND id < %s ORDER BY id DESC"
cur.execute(my_sql, (photo.contributor_id, photo.id))
photo.previous_photo_id = cur.fetchone()[0] photo.previous_photo_id = cur.fetchone()[0]
conn.close() conn.close()
@ -58,11 +74,17 @@ def calc_additional_data(photo):
else: else:
photo.SizeOnDisc = str(round(photo.photo_raw_size / 1024, 1)) + 'K' photo.SizeOnDisc = str(round(photo.photo_raw_size / 1024, 1)) + 'K'
if photo.photo_1280_size >= 1048576: if photo.photo_1280_size >= 1048576:
photo.SizeOnDisc1280 = str(round(photo.photo_1280_size / 1048576, 1)) + 'M' photo.SizeOnDisc1280 = str(
round(photo.photo_1280_size / 1048576, 1),
) + 'M'
else: else:
photo.SizeOnDisc1280 = str(round(photo.photo_1280_size / 1024, 1)) + 'K' photo.SizeOnDisc1280 = str(
round(photo.photo_1280_size / 1024, 1),
) + 'K'
if photo.photo_480_size >= 1048576: if photo.photo_480_size >= 1048576:
photo.SizeOnDisc480 = str(round(photo.photo_480_size / 1048576, 1)) + 'M' photo.SizeOnDisc480 = str(
round(photo.photo_480_size / 1048576, 1),
) + 'M'
else: else:
photo.SizeOnDisc480 = str(round(photo.photo_480_size / 1024, 1)) + 'K' photo.SizeOnDisc480 = str(round(photo.photo_480_size / 1024, 1)) + 'K'
if photo.GPSAltitude is not None: if photo.GPSAltitude is not None:
@ -71,6 +93,8 @@ def calc_additional_data(photo):
photo.GPSAltitudeFeet = None photo.GPSAltitudeFeet = None
if photo.GPSLatitude is not None and photo.GPSLongitude is not None: if photo.GPSLatitude is not None and photo.GPSLongitude is not None:
photo.LatLong = "{},{}".format(photo.GPSLatitude, photo.GPSLongitude) photo.LatLong = "{},{}".format(photo.GPSLatitude, photo.GPSLongitude)
photo.MapUrl = "https://www.google.com/maps/search/?api=1&query={}".format(photo.LatLong) my_map_url = "https://www.google.com/"
my_map_url += "maps/search/?api=1&query={}".format(photo.LatLong)
photo.MapUrl = my_map_url
else: else:
photo.LatLong, photo.MapUrl = None, None photo.LatLong, photo.MapUrl = None, None

View File

@ -33,7 +33,9 @@ def get_photo_list(contributor_id):
password=current_app.config['DATABASE_PASSWORD'] password=current_app.config['DATABASE_PASSWORD']
) )
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute("SELECT photo_name,id FROM photo WHERE contributor_id=%s ORDER BY timestamp,\"DateTimeOriginal\" DESC", (contributor_id, )) my_sql = "SELECT photo_name,id FROM photo WHERE contributor_id=%s "
my_sql += "ORDER BY timestamp,\"DateTimeOriginal\" DESC"
cur.execute(my_sql, (contributor_id, ))
photos = cur.fetchall() photos = cur.fetchall()
conn.close() conn.close()
return photos return photos

View File

@ -26,15 +26,34 @@ def get_exif(img_raw, exif_data):
if v == "Software": if v == "Software":
exif_data['Software'] = exifdata[k] exif_data['Software'] = exifdata[k]
if v == "DateTime": if v == "DateTime":
exif_data['DateTime'] = datetime.strptime(exifdata[k], date_format) exif_data['DateTime'] = datetime.strptime(
exifdata[k],
date_format,
)
if v == "DateTimeOriginal": if v == "DateTimeOriginal":
exif_data['DateTimeOriginal'] = datetime.strptime(exifdata[k], date_format) exif_data['DateTimeOriginal'] = datetime.strptime(
exifdata[k],
date_format,
)
if v == "DateTimeDigitized": if v == "DateTimeDigitized":
exif_data['DateTimeDigitized'] = datetime.strptime(exifdata[k], date_format) exif_data['DateTimeDigitized'] = datetime.strptime(
exifdata[k],
date_format,
)
if v == "FNumber": if v == "FNumber":
exif_data['fnumber'] = round(exifdata[k][0] / exifdata[k][1], 1) if type(exifdata[k]) == tuple:
x, y = exifdata[k][0], exifdata[k][1]
else:
x = exifdata[k].numerator
y = exifdata[k].denominator
exif_data['fnumber'] = round(x / y, 1)
if v == "DigitalZoomRatio": if v == "DigitalZoomRatio":
exif_data['DigitalZoomRatio'] = round(exifdata[k][0] / exifdata[k][1], 2) if type(exifdata[k]) == tuple:
x, y = exifdata[k][0], exifdata[k][1]
else:
x = exifdata[k].numerator
y = exifdata[k].denominator
exif_data['DigitalZoomRatio'] = round(x / y, 2)
if v == "TimeZoneOffset": if v == "TimeZoneOffset":
exif_data['TimeZoneOffset'] = exifdata[k] exif_data['TimeZoneOffset'] = exifdata[k]
if v == "GPSInfo": if v == "GPSInfo":
@ -42,35 +61,66 @@ def get_exif(img_raw, exif_data):
for h, i in GPSTAGS.items(): for h, i in GPSTAGS.items():
if h in exifdata[k]: if h in exifdata[k]:
if i == 'GPSAltitudeRef': if i == 'GPSAltitudeRef':
gpsinfo['GPSAltitudeRef'] = int.from_bytes(exifdata[k][h], "big") gpsinfo['GPSAltitudeRef'] = int.from_bytes(
exifdata[k][h],
"big",
)
if i == 'GPSAltitude': if i == 'GPSAltitude':
gpsinfo['GPSAltitude'] = round(exifdata[k][h][0] / exifdata[k][h][1], 3) if type(exifdata[k][h]) == tuple:
x = exifdata[k][h][0]
y = exifdata[k][h][1]
else:
x = exifdata[k][h].numerator
y = exifdata[k][h].denominator
gpsinfo['GPSAltitude'] = round(x / y, 3)
if i == 'GPSLatitudeRef': if i == 'GPSLatitudeRef':
gpsinfo['GPSLatitudeRef'] = exifdata[k][h] gpsinfo['GPSLatitudeRef'] = exifdata[k][h]
if i == 'GPSLatitude': if i == 'GPSLatitude':
gpsinfo['GPSLatitude'] = calc_coordinate(exifdata[k][h]) gpsinfo['GPSLatitude'] = calc_coordinate(
exifdata[k][h],
)
if i == 'GPSLongitudeRef': if i == 'GPSLongitudeRef':
gpsinfo['GPSLongitudeRef'] = exifdata[k][h] gpsinfo['GPSLongitudeRef'] = exifdata[k][h]
if i == 'GPSLongitude': if i == 'GPSLongitude':
gpsinfo['GPSLongitude'] = calc_coordinate(exifdata[k][h]) gpsinfo['GPSLongitude'] = calc_coordinate(
exifdata[k][h],
)
update_gpsinfo(gpsinfo, exif_data) update_gpsinfo(gpsinfo, exif_data)
def update_gpsinfo(gpsinfo, exif_data): def update_gpsinfo(gpsinfo, exif_data):
if 'GPSAltitudeRef' in gpsinfo and 'GPSLatitude' in gpsinfo: if 'GPSAltitudeRef' in gpsinfo and 'GPSLatitude' in gpsinfo:
exif_data['GPSAltitude'] = gpsinfo['GPSAltitude'] exif_data['GPSAltitude'] = gpsinfo['GPSAltitude']
exif_data['GPSAboveSeaLevel'] = False if gpsinfo['GPSAltitudeRef'] != 0 else True exif_data['GPSAboveSeaLevel'] = False
if gpsinfo['GPSAltitudeRef'] == 0:
exif_data['GPSAboveSeaLevel'] = True
if 'GPSLatitudeRef' in gpsinfo and 'GPSLatitude' in gpsinfo: if 'GPSLatitudeRef' in gpsinfo and 'GPSLatitude' in gpsinfo:
exif_data['GPSLatitude'] = gpsinfo['GPSLatitude'] if gpsinfo['GPSLatitudeRef'] != 'S' else 0 - gpsinfo['GPSLatitude'] if gpsinfo['GPSLatitudeRef'] != 'S':
exif_data['GPSLatitude'] = gpsinfo['GPSLatitude']
else:
exif_data['GPSLatitude'] = 0 - gpsinfo['GPSLatitude']
if 'GPSLongitudeRef' in gpsinfo and 'GPSLongitude' in gpsinfo: if 'GPSLongitudeRef' in gpsinfo and 'GPSLongitude' in gpsinfo:
exif_data['GPSLongitude'] = gpsinfo['GPSLongitude'] if gpsinfo['GPSLongitudeRef'] != 'W' else 0 - gpsinfo['GPSLongitude'] if gpsinfo['GPSLongitudeRef'] != 'W':
exif_data['GPSLongitude'] = gpsinfo['GPSLongitude']
else:
exif_data['GPSLongitude'] = 0 - gpsinfo['GPSLongitude']
def calc_coordinate(x): def calc_coordinate(x):
if type(x[0]) == tuple:
degrees = x[0][0] / x[0][1] degrees = x[0][0] / x[0][1]
else:
degrees = x[0].numerator / x[0].denominator
if type(x[1]) == tuple:
minutes = x[1][0] / x[1][1] minutes = x[1][0] / x[1][1]
else:
minutes = x[1].numerator / x[1].denominator
if type(x[2]) == tuple:
seconds = x[2][0] / x[2][1] seconds = x[2][0] / x[2][1]
return round(degrees + minutes / 60 + seconds / 3600, 5) else:
seconds = x[2].numerator / x[2].denominator
result = round(degrees + minutes / 60 + seconds / 3600, 5)
return result
def get_dimensions_and_format(photo, exif_data): def get_dimensions_and_format(photo, exif_data):

View File

@ -10,6 +10,28 @@ from .get_exif_data import get_exif_data
def process_uploaded_photo(filename, current_user, app_config): def process_uploaded_photo(filename, current_user, app_config):
crop_photo(filename, app_config['PHOTO_SAVE_PATH']) crop_photo(filename, app_config['PHOTO_SAVE_PATH'])
exif_data = get_exif_data(filename) exif_data = get_exif_data(filename)
if 'DateTimeOriginal' in exif_data:
my_date_time_original = exif_data['DateTimeOriginal']
else:
my_date_time_original = None
if 'DateTimeDigitized' in exif_data:
my_date_time_digitized = exif_data['DateTimeDigitized']
else:
my_date_time_digitized = None
if 'DigitalZoomRatio' in exif_data:
my_digital_zoom_ratio = exif_data['DigitalZoomRatio']
else:
my_digital_zoom_ratio = None
if 'TimeZoneOffset' in exif_data:
my_timezone_offset = exif_data['TimeZoneOffset']
else:
my_timezone_offset = None
if 'GPSAboveSeaLevel' in exif_data:
my_gps_above_sealevel = exif_data['GPSAboveSeaLevel']
else:
my_gps_above_sealevel = None
conn = psycopg2.connect( conn = psycopg2.connect(
dbname=app_config['DATABASE_NAME'], dbname=app_config['DATABASE_NAME'],
user=app_config['DATABASE_USER'], user=app_config['DATABASE_USER'],
@ -19,7 +41,10 @@ def process_uploaded_photo(filename, current_user, app_config):
cur = conn.cursor() cur = conn.cursor()
cur.execute("SELECT setval('photo_id_seq', (SELECT MAX(id) FROM photo))") cur.execute("SELECT setval('photo_id_seq', (SELECT MAX(id) FROM photo))")
conn.commit() conn.commit()
cur.execute("SELECT count(id) FROM photo WHERE photo_name=%s", (filename, )) cur.execute(
"SELECT count(id) FROM photo WHERE photo_name=%s",
(filename, ),
)
if cur.fetchone()[0] == 0: if cur.fetchone()[0] == 0:
sql_statement = "INSERT INTO photo(" sql_statement = "INSERT INTO photo("
@ -84,14 +109,14 @@ def process_uploaded_photo(filename, current_user, app_config):
exif_data['Model'] if 'Model' in exif_data else None, exif_data['Model'] if 'Model' in exif_data else None,
exif_data['Software'] if 'Software' in exif_data else None, exif_data['Software'] if 'Software' in exif_data else None,
exif_data['DateTime'] if 'DateTime' in exif_data else None, exif_data['DateTime'] if 'DateTime' in exif_data else None,
exif_data['DateTimeOriginal'] if 'DateTimeOriginal' in exif_data else None, my_date_time_original,
exif_data['DateTimeDigitized'] if 'DateTimeDigitized' in exif_data else None, my_date_time_digitized,
exif_data['fnumber'] if 'fnumber' in exif_data else None, exif_data['fnumber'] if 'fnumber' in exif_data else None,
exif_data['DigitalZoomRatio'] if 'DigitalZoomRatio' in exif_data else None, my_digital_zoom_ratio,
exif_data['AspectRatio'], exif_data['AspectRatio'],
exif_data['TimeZoneOffset'] if 'TimeZoneOffset' in exif_data else None, my_timezone_offset,
exif_data['GPSAltitude'] if 'GPSAltitude' in exif_data else None, exif_data['GPSAltitude'] if 'GPSAltitude' in exif_data else None,
exif_data['GPSAboveSeaLevel'] if 'GPSAboveSeaLevel' in exif_data else None, my_gps_above_sealevel,
exif_data['GPSLatitude'] if 'GPSLatitude' in exif_data else None, exif_data['GPSLatitude'] if 'GPSLatitude' in exif_data else None,
exif_data['GPSLongitude'] if 'GPSLongitude' in exif_data else None, exif_data['GPSLongitude'] if 'GPSLongitude' in exif_data else None,
int(time() * 1000) int(time() * 1000)
@ -145,14 +170,14 @@ def process_uploaded_photo(filename, current_user, app_config):
exif_data['Model'] if 'Model' in exif_data else None, exif_data['Model'] if 'Model' in exif_data else None,
exif_data['Software'] if 'Software' in exif_data else None, exif_data['Software'] if 'Software' in exif_data else None,
exif_data['DateTime'] if 'DateTime' in exif_data else None, exif_data['DateTime'] if 'DateTime' in exif_data else None,
exif_data['DateTimeOriginal'] if 'DateTimeOriginal' in exif_data else None, my_date_time_original,
exif_data['DateTimeDigitized'] if 'DateTimeDigitized' in exif_data else None, my_date_time_digitized,
exif_data['fnumber'] if 'fnumber' in exif_data else None, exif_data['fnumber'] if 'fnumber' in exif_data else None,
exif_data['DigitalZoomRatio'] if 'DigitalZoomRatio' in exif_data else None, my_digital_zoom_ratio,
exif_data['AspectRatio'], exif_data['AspectRatio'],
exif_data['TimeZoneOffset'] if 'TimeZoneOffset' in exif_data else None, my_timezone_offset,
exif_data['GPSAltitude'] if 'GPSAltitude' in exif_data else None, exif_data['GPSAltitude'] if 'GPSAltitude' in exif_data else None,
exif_data['GPSAboveSeaLevel'] if 'GPSAboveSeaLevel' in exif_data else None, my_gps_above_sealevel,
exif_data['GPSLatitude'] if 'GPSLatitude' in exif_data else None, exif_data['GPSLatitude'] if 'GPSLatitude' in exif_data else None,
exif_data['GPSLongitude'] if 'GPSLongitude' in exif_data else None, exif_data['GPSLongitude'] if 'GPSLongitude' in exif_data else None,
int(time() * 1000), int(time() * 1000),

View File

@ -24,8 +24,10 @@ class SENDXMPPHandler(Handler):
self.logging_xmpp_use_tls = logging_xmpp_use_tls self.logging_xmpp_use_tls = logging_xmpp_use_tls
''' '''
This works on Debian 10 with flask running under gunicorn3 as a systemd service, hack as necessary This works on Debian 10 with flask running under
echo '<message>' | /usr/bin/sendxmpp -t -u <sender> -j <server> -p <password> <recipient@example.com> gunicorn3 as a systemd service, hack as necessary
echo '<message>' | /usr/bin/sendxmpp -t -u <sender> \
-j <server> -p <password> <recipient@example.com>
''' '''
def emit(self, record): def emit(self, record):
try: try: