Compare commits

..

17 Commits

Author SHA1 Message Date
c936756f7d update README.md 2025-02-11 17:33:12 -08:00
f0d0d1479c update app/photo_routes/scripts/get_exif_data.py to account
for new exif data type of IFDRational, instead of tuple for
some exif attributes
2025-02-10 16:27:51 -08:00
7997202d77 fix style app/photo_routes/scripts/process_uploaded_photo.py 2025-02-09 18:08:03 -08:00
533a82f314 fix style app/photo_routes/scripts/get_exif_data.py 2025-02-09 17:51:25 -08:00
a2de385638 fix style app/photo_routes/proutes.py 2025-02-09 17:33:26 -08:00
6b27846388 fix style app/photo_routes/photox.py 2025-02-09 17:31:44 -08:00
13e6f98c5f fix style app/photo_routes/photo_upload.py 2025-02-09 17:22:16 -08:00
b3f0de046b fix style app/photo_routes/delete_download.py 2025-02-09 17:19:38 -08:00
bab12d33e9 fix style app/auth/totp.py 2025-02-09 17:13:58 -08:00
5eae85ca4a fix style app/auth/reset_password.py 2025-02-09 17:12:51 -08:00
72a8b46375 fix style app/auth/register.py 2025-02-09 17:08:42 -08:00
4cb734be40 fix style app/auth/profile.py 2025-02-09 17:03:51 -08:00
4b67a96f56 fix style app/auth/email.py 2025-02-09 17:03:05 -08:00
c8d06abbe2 fix style app/auth/auth.py 2025-02-09 16:58:20 -08:00
cec0716bc3 fix style app/sendxmpp_handler.py 2025-02-09 16:52:30 -08:00
0a6c6a2afd fix style app/models.py 2025-02-09 16:50:39 -08:00
86d482691b fix style app/forms.py 2025-02-09 16:48:06 -08:00
16 changed files with 368 additions and 95 deletions

View File

@ -85,3 +85,6 @@ For 2fa, you can use an authenticator application such as
2. install certbot and get certs for each of the two subdomains
2. install service file in `/etc/systemd/system`
2. enable and start systemd service
## Debian 12 Upgrade
2. wtforms.validators now requires `python3-wtforms-components` for the email validator

View File

@ -20,15 +20,25 @@ def two_factor_input():
return redirect(url_for('proute.index'))
form = GetTotp()
if form.validate_on_submit():
if TOTP(contributor.totp_key).verify(int(form.totp_code.data), valid_window=5):
if TOTP(
contributor.totp_key,
).verify(int(form.totp_code.data), valid_window=5):
login_user(contributor, remember=session['remember_me'])
flash("Congratulations, you are now logged in!")
return redirect(url_for('proute.index'))
else:
flash("Oops, the pin was wrong")
form.totp_code.data = None
return render_template('two_factor_input.html', form=form, inst="Code was wrong, try again?")
return render_template('two_factor_input.html', form=form, inst="Enter Auth Code")
return render_template(
'two_factor_input.html',
form=form,
inst="Code was wrong, try again?",
)
return render_template(
'two_factor_input.html',
form=form,
inst="Enter Auth Code",
)
@auths.route("/login", methods=["GET", "POST"])
@ -37,9 +47,14 @@ def login():
return redirect(url_for('proute.index'))
form = LoginForm()
if form.validate_on_submit():
contributor_by_name = Contributor.query.filter_by(name=form.username.data).first()
contributor_by_email = Contributor.query.filter_by(email=form.email.data).first()
if contributor_by_name is not None and contributor_by_name.check_password(form.password.data):
contributor_by_name = Contributor.query.filter_by(
name=form.username.data,
).first()
contributor_by_email = Contributor.query.filter_by(
email=form.email.data,
).first()
cbn, cbe = contributor_by_name, contributor_by_email
if cbn is not None and cbn.check_password(form.password.data):
if contributor_by_name.use_totp:
session['id'] = contributor_by_name.id
session['remember_me'] = form.remember_me.data
@ -48,13 +63,16 @@ def login():
login_user(contributor_by_name, remember=form.remember_me.data)
flash("Congratulations, you are now logged in!")
return redirect(url_for('proute.index'))
elif contributor_by_email is not None and contributor_by_email.check_password(form.password.data):
elif cbe is not None and cbe.check_password(form.password.data):
if contributor_by_email.use_totp:
session['id'] = contributor_by_email.id
session['remember_me'] = form.remember_me.data
return redirect(url_for('auths.two_factor_input'))
else:
login_user(contributor_by_email, remember=form.remember_me.data)
login_user(
contributor_by_email,
remember=form.remember_me.data,
)
flash("Congratulations, you are now logged in!")
return redirect(url_for('proute.index'))
else:

View File

@ -8,13 +8,23 @@ from threading import Thread
def send_password_reset_email(contributor, external_url):
token = contributor.get_reset_password_token()
send_email('Photo App Reset Your Password',
send_email(
'Photo App Reset Your Password',
sender=current_app.config['MAIL_ADMINS'][0],
recipients=[contributor.email],
text_body=render_template('email/reset_password_email_text.txt',
contributor=contributor, token=token, external_url=external_url),
html_body=render_template('email/reset_password_email_html.html',
contributor=contributor, token=token, external_url=external_url))
text_body=render_template(
'email/reset_password_email_text.txt',
contributor=contributor,
token=token,
external_url=external_url,
),
html_body=render_template(
'email/reset_password_email_html.html',
contributor=contributor,
token=token,
external_url=external_url,
),
)
def send_async_email(app, msg):
@ -26,4 +36,7 @@ def send_email(subject, sender, recipients, text_body, html_body):
msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body
msg.html = html_body
Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()
Thread(
target=send_async_email,
args=(current_app._get_current_object(), msg),
).start()

View File

@ -26,7 +26,11 @@ def change_password():
else:
flash("Error Invalid Password")
return(redirect(url_for('prof.change_password')))
return render_template('change_password.html', title='Change Password', form=form)
return render_template(
'change_password.html',
title='Change Password',
form=form,
)
@prof.route("/edit-profile", methods=["GET", "POST"])

View File

@ -17,9 +17,15 @@ def register():
return redirect(url_for('proute.index'))
form = RegistrationForm()
if form.validate_on_submit():
db.engine.execute("SELECT setval('contributor_id_seq', (SELECT MAX(id) FROM contributor))")
my_sql = "SELECT setval('contributor_id_seq', "
my_sql += "(SELECT MAX(id) FROM contributor))"
db.engine.execute(my_sql)
db.session.commit()
contributor = Contributor(name=form.username.data, num_photos=0, email=form.email.data)
contributor = Contributor(
name=form.username.data,
num_photos=0,
email=form.email.data,
)
contributor.set_password(form.password.data)
db.session.add(contributor)
db.session.commit()

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python3
from flask import Blueprint, redirect, url_for, flash, render_template, current_app
from flask import (
Blueprint, redirect, url_for, flash, render_template, current_app
)
from flask_login import current_user
from app.models import Contributor
from app.forms import ResetPasswordForm, ResetPasswordRequestForm
@ -25,7 +27,11 @@ def reset_password(token):
db.session.commit()
flash('Your password has been reset.')
return redirect(url_for('auths.login'))
return render_template('reset_password.html', title="New Password?", form=form)
return render_template(
'reset_password.html',
title="New Password?",
form=form,
)
@pwd.route('/reset-password-request', methods=['GET', 'POST'])
@ -35,12 +41,23 @@ def reset_password_request():
else:
form = ResetPasswordRequestForm()
if form.validate_on_submit():
contributor = Contributor.query.filter_by(email=form.email.data).first()
contributor = Contributor.query.filter_by(
email=form.email.data,
).first()
if contributor:
send_password_reset_email(contributor, current_app.config['EXTERNAL_URL'])
flash('Check your email for the instructions to reset your password')
send_password_reset_email(
contributor,
current_app.config['EXTERNAL_URL'],
)
my_flash = 'Check your email for the instructions '
my_flash += 'to reset your password'
flash(my_flash)
return redirect(url_for('auths.login'))
else:
flash('Sorry, invalid email')
return redirect(url_for('auths.login'))
return render_template('reset_password_request.html', title='Reset Password', form=form)
return render_template(
'reset_password_request.html',
title='Reset Password',
form=form,
)

View File

@ -32,7 +32,12 @@ def enable_totp():
else:
flash("TOTP Code didn't validate, rescan and try again")
return(redirect(url_for('prof.edit_profile')))
return render_template('qr.html', qr=qr, form=form, title="Aunthentication Code")
return render_template(
'qr.html',
qr=qr,
form=form,
title="Aunthentication Code",
)
def get_totp_qr(contributor):
@ -40,7 +45,9 @@ def get_totp_qr(contributor):
contributor.totp_key = pyotp.random_base32()
db.session.commit()
totp_uri = pyotp.totp.TOTP(contributor.totp_key).provisioning_uri(name=contributor.email, issuer_name='Photo App')
totp_uri = pyotp.totp.TOTP(
contributor.totp_key,
).provisioning_uri(name=contributor.email, issuer_name='Photo App')
img = qrcode.make(totp_uri, image_factory=qrcode.image.svg.SvgPathImage)
f = BytesIO()
img.save(f)

View File

@ -2,7 +2,9 @@
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Email, Optional, Regexp, ValidationError, EqualTo, Length
from wtforms.validators import (
DataRequired, Email, Optional, Regexp, ValidationError, EqualTo, Length
)
from flask_wtf.file import FileField, FileAllowed, FileRequired
from app.models import Contributor, EmailWhiteList
from zxcvbn import zxcvbn
@ -13,17 +15,38 @@ class ConfirmPhotoDelete(FlaskForm):
class GetTotp(FlaskForm):
totp_code = StringField('6-Digit Code?', validators=[DataRequired(), Length(min=6, max=6, message="6 Digits")], render_kw={'autofocus': True})
totp_code = StringField(
'6-Digit Code?',
validators=[DataRequired(), Length(min=6, max=6, message="6 Digits")],
render_kw={'autofocus': True},
)
submit = SubmitField('OK')
class ConfirmTotp(FlaskForm):
totp_code = StringField('6-Digit Code?', validators=[DataRequired(), Length(min=6, max=6, message="Rescan And Try Again")], render_kw={'autofocus': True})
totp_code = StringField(
'6-Digit Code?',
validators=[
DataRequired(),
Length(min=6, max=6, message="Rescan And Try Again"),
],
render_kw={'autofocus': True},
)
submit = SubmitField('Enable 2FA')
class EditProfile(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Regexp('^[a-zA-Z0-9]+$', message='letters and digits only (no spaces)')], render_kw={'autofocus': True})
username = StringField(
'Username',
validators=[
DataRequired(),
Regexp(
'^[a-zA-Z0-9]+$',
message='letters and digits only (no spaces)',
),
],
render_kw={'autofocus': True},
)
email = StringField('Email', validators=[Optional(), Email()])
password = PasswordField('Confirm Password', validators=[DataRequired()])
submit = SubmitField('Update Name/Email')
@ -44,7 +67,10 @@ class EditProfile(FlaskForm):
class LoginForm(FlaskForm):
username = StringField('Username', validators=[Optional()], render_kw={'autofocus': True})
username = StringField(
'Username',
validators=[Optional()], render_kw={'autofocus': True},
)
email = StringField('Email', validators=[Optional(), Email()])
password = PasswordField('Password', validators=[DataRequired()])
remember_me = BooleanField('Remember Me')
@ -52,8 +78,15 @@ class LoginForm(FlaskForm):
class ResetPasswordForm(FlaskForm):
password = PasswordField('Password', validators=[DataRequired(), Length(min=15, )], render_kw={'autofocus': True})
password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')])
password = PasswordField(
'Password',
validators=[DataRequired(), Length(min=15, )],
render_kw={'autofocus': True},
)
password2 = PasswordField(
'Repeat Password',
validators=[DataRequired(), EqualTo('password')],
)
submit = SubmitField('Request Password Reset')
def validate_password(self, password):
@ -62,10 +95,26 @@ class ResetPasswordForm(FlaskForm):
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Regexp('^[a-zA-Z0-9]+$', message='letters and digits only (no spaces)')], render_kw={'autofocus': True})
username = StringField(
'Username',
validators=[
DataRequired(),
Regexp(
'^[a-zA-Z0-9]+$',
message='letters and digits only (no spaces)',
),
],
render_kw={'autofocus': True},
)
email = StringField('Email', validators=[DataRequired(), Email()])
password = PasswordField('Password', validators=[DataRequired(), Length(min=15, )])
password2 = PasswordField('Repeat Password', validators=[DataRequired(), EqualTo('password')])
password = PasswordField(
'Password',
validators=[DataRequired(), Length(min=15, )],
)
password2 = PasswordField(
'Repeat Password',
validators=[DataRequired(), EqualTo('password')],
)
submit = SubmitField('Register')
def validate_password(self, password):
@ -78,7 +127,9 @@ class RegistrationForm(FlaskForm):
raise ValidationError('Please use a different username.')
def validate_email(self, email):
white_listed_user = EmailWhiteList.query.filter_by(email=email.data).first()
white_listed_user = EmailWhiteList.query.filter_by(
email=email.data,
).first()
if white_listed_user is None:
raise ValidationError('This email address is not authorized.')
user = Contributor.query.filter_by(email=email.data).first()
@ -87,14 +138,28 @@ class RegistrationForm(FlaskForm):
class ResetPasswordRequestForm(FlaskForm):
email = StringField('Email', validators=[DataRequired(), Email()], render_kw={'autofocus': True})
email = StringField(
'Email',
validators=[DataRequired(), Email()],
render_kw={'autofocus': True},
)
submit = SubmitField('Request Password Reset')
class ChangePassword(FlaskForm):
password = PasswordField('Confirm Password', validators=[DataRequired()], render_kw={'autofocus': True})
new_password = PasswordField('New Password', validators=[DataRequired(), Length(min=15, )])
new_password2 = PasswordField('Repeat New Password', validators=[DataRequired(), EqualTo('new_password')])
password = PasswordField(
'Confirm Password',
validators=[DataRequired()],
render_kw={'autofocus': True},
)
new_password = PasswordField(
'New Password',
validators=[DataRequired(), Length(min=15, )],
)
new_password2 = PasswordField(
'Repeat New Password',
validators=[DataRequired(), EqualTo('new_password')],
)
submit = SubmitField('Save')
def validate_password(self, password):

View File

@ -64,12 +64,20 @@ class Contributor(UserMixin, db.Model):
return '<Contributor {}>'.format(self.name)
def get_reset_password_token(self, expires_in=1800):
return jwt.encode({'reset_password': self.id, 'exp': time() + expires_in}, current_app.config['SECRET_KEY'], algorithm='HS256').decode('utf-8')
return jwt.encode(
{'reset_password': self.id, 'exp': time() + expires_in},
current_app.config['SECRET_KEY'],
algorithm='HS256',
).decode('utf-8')
@staticmethod
def verify_reset_password_token(token):
try:
id = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])['reset_password']
id = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256'],
)['reset_password']
except BaseException as error:
print('An exception occurred: {}'.format(error))
return Contributor.query.get(id)

View File

@ -1,6 +1,9 @@
#!/usr/bin/env python3
from flask import Blueprint, request, redirect, url_for, render_template, current_app, send_file
from flask import (
Blueprint, request, redirect, url_for,
render_template, current_app, send_file,
)
from flask_login import current_user
from app.models import Photo
from app.forms import ConfirmPhotoDelete
@ -17,7 +20,10 @@ def download():
if current_user.is_authenticated:
f = request.args['file']
try:
return send_file('/var/lib/photo_app/photos/{}'.format(f), attachment_filename=f)
return send_file(
'/var/lib/photo_app/photos/{}'.format(f),
attachment_filename=f,
)
except Exception as e:
return str(e)
@ -27,11 +33,14 @@ def delete():
photo = Photo.query.get(request.args['photo_id'])
if photo is None:
return(redirect(url_for('proute.index')))
if not current_user.is_authenticated or photo.contributor_id != current_user.id:
cu = current_user
if not cu.is_authenticated or photo.contributor_id != cu.id:
return(redirect(url_for('proute.index')))
form = ConfirmPhotoDelete()
if request.method == 'POST' and form.validate_on_submit():
return(redirect(url_for('p_route.photo', photo_id=delete_photo(photo))))
return(
redirect(url_for('p_route.photo', photo_id=delete_photo(photo))),
)
return(render_template(
'delete_photo.html',
title="Delete Photo?",
@ -49,11 +58,19 @@ def delete_photo(photo):
password=current_app.config['DATABASE_PASSWORD']
)
cur = conn.cursor()
cur.execute("SELECT count(id) FROM photo WHERE contributor_id=%s AND id>%s", (photo.contributor_id, photo.id))
cur.execute(
"SELECT count(id) FROM photo WHERE contributor_id=%s AND id>%s",
(photo.contributor_id, photo.id),
)
if cur.fetchone()[0] == 0:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s ORDER BY id", (photo.contributor_id, ))
cur.execute(
"SELECT id FROM photo WHERE contributor_id=%s ORDER BY id",
(photo.contributor_id, ),
)
else:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s AND id>%s ORDER BY id", (photo.contributor_id, photo.id))
my_statement = "SELECT id FROM photo WHERE contributor_id=%s "
my_statement += "AND id>%s ORDER BY id"
cur.execute(my_statement, (photo.contributor_id, photo.id))
next_photo_id = cur.fetchone()[0]
os.chdir(current_app.config['PHOTO_SAVE_PATH'])
if os.path.exists('raw_' + photo.photo_name):

View File

@ -2,7 +2,9 @@
from flask import current_app
from flask_login import current_user
from flask import Blueprint, render_template, redirect, url_for, flash, request, abort
from flask import (
Blueprint, render_template, redirect, url_for, flash, request, abort
)
from app.forms import UploadPhotoForm
from .scripts.process_uploaded_photo import process_uploaded_photo
from werkzeug.utils import secure_filename
@ -23,10 +25,20 @@ def photo_upload():
if filename != '':
import os
file_ext = os.path.splitext(filename)[1]
if file_ext not in ['.jpg', '.png'] or file_ext != validate_image(f.stream):
fe = file_ext
if fe not in ['.jpg', '.png'] or fe != validate_image(f.stream):
abort(400)
f.save(os.path.join(current_app.config['PHOTO_SAVE_PATH'], 'raw_' + filename))
photo_id = process_uploaded_photo(filename, current_user, current_app.config)
f.save(
os.path.join(
current_app.config['PHOTO_SAVE_PATH'],
'raw_' + filename,
),
)
photo_id = process_uploaded_photo(
filename,
current_user,
current_app.config,
)
print(photo_id)
flash("Thanks for the new photo!")
return(redirect(url_for('p_route.photo', photo_id=photo_id)))

View File

@ -34,19 +34,35 @@ def find_next_previous(photo):
password=current_app.config['DATABASE_PASSWORD']
)
cur = conn.cursor()
cur.execute("SELECT count(id) FROM photo WHERE contributor_id=%s AND id > %s", (photo.contributor_id, photo.id))
cur.execute(
"SELECT count(id) FROM photo WHERE contributor_id=%s AND id > %s",
(photo.contributor_id, photo.id),
)
count = cur.fetchone()[0]
if count == 0:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s ORDER BY id", (photo.contributor_id, ))
cur.execute(
"SELECT id FROM photo WHERE contributor_id=%s ORDER BY id",
(photo.contributor_id, ),
)
else:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s AND id > %s ORDER BY id", (photo.contributor_id, photo.id))
my_sql = "SELECT id FROM photo WHERE contributor_id=%s "
my_sql += "AND id > %s ORDER BY id"
cur.execute(my_sql, (photo.contributor_id, photo.id))
photo.next_photo_id = cur.fetchone()[0]
cur.execute("SELECT count(id) FROM photo WHERE contributor_id=%s AND id < %s", (photo.contributor_id, photo.id))
cur.execute(
"SELECT count(id) FROM photo WHERE contributor_id=%s AND id < %s",
(photo.contributor_id, photo.id),
)
count = cur.fetchone()[0]
if count == 0:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s ORDER BY id DESC", (photo.contributor_id, ))
cur.execute(
"SELECT id FROM photo WHERE contributor_id=%s ORDER BY id DESC",
(photo.contributor_id, ),
)
else:
cur.execute("SELECT id FROM photo WHERE contributor_id=%s AND id < %s ORDER BY id DESC", (photo.contributor_id, photo.id))
my_sql = "SELECT id FROM photo WHERE contributor_id=%s "
my_sql += "AND id < %s ORDER BY id DESC"
cur.execute(my_sql, (photo.contributor_id, photo.id))
photo.previous_photo_id = cur.fetchone()[0]
conn.close()
@ -58,11 +74,17 @@ def calc_additional_data(photo):
else:
photo.SizeOnDisc = str(round(photo.photo_raw_size / 1024, 1)) + 'K'
if photo.photo_1280_size >= 1048576:
photo.SizeOnDisc1280 = str(round(photo.photo_1280_size / 1048576, 1)) + 'M'
photo.SizeOnDisc1280 = str(
round(photo.photo_1280_size / 1048576, 1),
) + 'M'
else:
photo.SizeOnDisc1280 = str(round(photo.photo_1280_size / 1024, 1)) + 'K'
photo.SizeOnDisc1280 = str(
round(photo.photo_1280_size / 1024, 1),
) + 'K'
if photo.photo_480_size >= 1048576:
photo.SizeOnDisc480 = str(round(photo.photo_480_size / 1048576, 1)) + 'M'
photo.SizeOnDisc480 = str(
round(photo.photo_480_size / 1048576, 1),
) + 'M'
else:
photo.SizeOnDisc480 = str(round(photo.photo_480_size / 1024, 1)) + 'K'
if photo.GPSAltitude is not None:
@ -71,6 +93,8 @@ def calc_additional_data(photo):
photo.GPSAltitudeFeet = None
if photo.GPSLatitude is not None and photo.GPSLongitude is not None:
photo.LatLong = "{},{}".format(photo.GPSLatitude, photo.GPSLongitude)
photo.MapUrl = "https://www.google.com/maps/search/?api=1&query={}".format(photo.LatLong)
my_map_url = "https://www.google.com/"
my_map_url += "maps/search/?api=1&query={}".format(photo.LatLong)
photo.MapUrl = my_map_url
else:
photo.LatLong, photo.MapUrl = None, None

View File

@ -33,7 +33,9 @@ def get_photo_list(contributor_id):
password=current_app.config['DATABASE_PASSWORD']
)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute("SELECT photo_name,id FROM photo WHERE contributor_id=%s ORDER BY timestamp,\"DateTimeOriginal\" DESC", (contributor_id, ))
my_sql = "SELECT photo_name,id FROM photo WHERE contributor_id=%s "
my_sql += "ORDER BY timestamp,\"DateTimeOriginal\" DESC"
cur.execute(my_sql, (contributor_id, ))
photos = cur.fetchall()
conn.close()
return photos

View File

@ -26,15 +26,34 @@ def get_exif(img_raw, exif_data):
if v == "Software":
exif_data['Software'] = exifdata[k]
if v == "DateTime":
exif_data['DateTime'] = datetime.strptime(exifdata[k], date_format)
exif_data['DateTime'] = datetime.strptime(
exifdata[k],
date_format,
)
if v == "DateTimeOriginal":
exif_data['DateTimeOriginal'] = datetime.strptime(exifdata[k], date_format)
exif_data['DateTimeOriginal'] = datetime.strptime(
exifdata[k],
date_format,
)
if v == "DateTimeDigitized":
exif_data['DateTimeDigitized'] = datetime.strptime(exifdata[k], date_format)
exif_data['DateTimeDigitized'] = datetime.strptime(
exifdata[k],
date_format,
)
if v == "FNumber":
exif_data['fnumber'] = round(exifdata[k][0] / exifdata[k][1], 1)
if type(exifdata[k]) == tuple:
x, y = exifdata[k][0], exifdata[k][1]
else:
x = exifdata[k].numerator
y = exifdata[k].denominator
exif_data['fnumber'] = round(x / y, 1)
if v == "DigitalZoomRatio":
exif_data['DigitalZoomRatio'] = round(exifdata[k][0] / exifdata[k][1], 2)
if type(exifdata[k]) == tuple:
x, y = exifdata[k][0], exifdata[k][1]
else:
x = exifdata[k].numerator
y = exifdata[k].denominator
exif_data['DigitalZoomRatio'] = round(x / y, 2)
if v == "TimeZoneOffset":
exif_data['TimeZoneOffset'] = exifdata[k]
if v == "GPSInfo":
@ -42,35 +61,66 @@ def get_exif(img_raw, exif_data):
for h, i in GPSTAGS.items():
if h in exifdata[k]:
if i == 'GPSAltitudeRef':
gpsinfo['GPSAltitudeRef'] = int.from_bytes(exifdata[k][h], "big")
gpsinfo['GPSAltitudeRef'] = int.from_bytes(
exifdata[k][h],
"big",
)
if i == 'GPSAltitude':
gpsinfo['GPSAltitude'] = round(exifdata[k][h][0] / exifdata[k][h][1], 3)
if type(exifdata[k][h]) == tuple:
x = exifdata[k][h][0]
y = exifdata[k][h][1]
else:
x = exifdata[k][h].numerator
y = exifdata[k][h].denominator
gpsinfo['GPSAltitude'] = round(x / y, 3)
if i == 'GPSLatitudeRef':
gpsinfo['GPSLatitudeRef'] = exifdata[k][h]
if i == 'GPSLatitude':
gpsinfo['GPSLatitude'] = calc_coordinate(exifdata[k][h])
gpsinfo['GPSLatitude'] = calc_coordinate(
exifdata[k][h],
)
if i == 'GPSLongitudeRef':
gpsinfo['GPSLongitudeRef'] = exifdata[k][h]
if i == 'GPSLongitude':
gpsinfo['GPSLongitude'] = calc_coordinate(exifdata[k][h])
gpsinfo['GPSLongitude'] = calc_coordinate(
exifdata[k][h],
)
update_gpsinfo(gpsinfo, exif_data)
def update_gpsinfo(gpsinfo, exif_data):
if 'GPSAltitudeRef' in gpsinfo and 'GPSLatitude' in gpsinfo:
exif_data['GPSAltitude'] = gpsinfo['GPSAltitude']
exif_data['GPSAboveSeaLevel'] = False if gpsinfo['GPSAltitudeRef'] != 0 else True
exif_data['GPSAboveSeaLevel'] = False
if gpsinfo['GPSAltitudeRef'] == 0:
exif_data['GPSAboveSeaLevel'] = True
if 'GPSLatitudeRef' in gpsinfo and 'GPSLatitude' in gpsinfo:
exif_data['GPSLatitude'] = gpsinfo['GPSLatitude'] if gpsinfo['GPSLatitudeRef'] != 'S' else 0 - gpsinfo['GPSLatitude']
if gpsinfo['GPSLatitudeRef'] != 'S':
exif_data['GPSLatitude'] = gpsinfo['GPSLatitude']
else:
exif_data['GPSLatitude'] = 0 - gpsinfo['GPSLatitude']
if 'GPSLongitudeRef' in gpsinfo and 'GPSLongitude' in gpsinfo:
exif_data['GPSLongitude'] = gpsinfo['GPSLongitude'] if gpsinfo['GPSLongitudeRef'] != 'W' else 0 - gpsinfo['GPSLongitude']
if gpsinfo['GPSLongitudeRef'] != 'W':
exif_data['GPSLongitude'] = gpsinfo['GPSLongitude']
else:
exif_data['GPSLongitude'] = 0 - gpsinfo['GPSLongitude']
def calc_coordinate(x):
if type(x[0]) == tuple:
degrees = x[0][0] / x[0][1]
else:
degrees = x[0].numerator / x[0].denominator
if type(x[1]) == tuple:
minutes = x[1][0] / x[1][1]
else:
minutes = x[1].numerator / x[1].denominator
if type(x[2]) == tuple:
seconds = x[2][0] / x[2][1]
return round(degrees + minutes / 60 + seconds / 3600, 5)
else:
seconds = x[2].numerator / x[2].denominator
result = round(degrees + minutes / 60 + seconds / 3600, 5)
return result
def get_dimensions_and_format(photo, exif_data):

View File

@ -10,6 +10,28 @@ from .get_exif_data import get_exif_data
def process_uploaded_photo(filename, current_user, app_config):
crop_photo(filename, app_config['PHOTO_SAVE_PATH'])
exif_data = get_exif_data(filename)
if 'DateTimeOriginal' in exif_data:
my_date_time_original = exif_data['DateTimeOriginal']
else:
my_date_time_original = None
if 'DateTimeDigitized' in exif_data:
my_date_time_digitized = exif_data['DateTimeDigitized']
else:
my_date_time_digitized = None
if 'DigitalZoomRatio' in exif_data:
my_digital_zoom_ratio = exif_data['DigitalZoomRatio']
else:
my_digital_zoom_ratio = None
if 'TimeZoneOffset' in exif_data:
my_timezone_offset = exif_data['TimeZoneOffset']
else:
my_timezone_offset = None
if 'GPSAboveSeaLevel' in exif_data:
my_gps_above_sealevel = exif_data['GPSAboveSeaLevel']
else:
my_gps_above_sealevel = None
conn = psycopg2.connect(
dbname=app_config['DATABASE_NAME'],
user=app_config['DATABASE_USER'],
@ -19,7 +41,10 @@ def process_uploaded_photo(filename, current_user, app_config):
cur = conn.cursor()
cur.execute("SELECT setval('photo_id_seq', (SELECT MAX(id) FROM photo))")
conn.commit()
cur.execute("SELECT count(id) FROM photo WHERE photo_name=%s", (filename, ))
cur.execute(
"SELECT count(id) FROM photo WHERE photo_name=%s",
(filename, ),
)
if cur.fetchone()[0] == 0:
sql_statement = "INSERT INTO photo("
@ -84,14 +109,14 @@ def process_uploaded_photo(filename, current_user, app_config):
exif_data['Model'] if 'Model' in exif_data else None,
exif_data['Software'] if 'Software' in exif_data else None,
exif_data['DateTime'] if 'DateTime' in exif_data else None,
exif_data['DateTimeOriginal'] if 'DateTimeOriginal' in exif_data else None,
exif_data['DateTimeDigitized'] if 'DateTimeDigitized' in exif_data else None,
my_date_time_original,
my_date_time_digitized,
exif_data['fnumber'] if 'fnumber' in exif_data else None,
exif_data['DigitalZoomRatio'] if 'DigitalZoomRatio' in exif_data else None,
my_digital_zoom_ratio,
exif_data['AspectRatio'],
exif_data['TimeZoneOffset'] if 'TimeZoneOffset' in exif_data else None,
my_timezone_offset,
exif_data['GPSAltitude'] if 'GPSAltitude' in exif_data else None,
exif_data['GPSAboveSeaLevel'] if 'GPSAboveSeaLevel' in exif_data else None,
my_gps_above_sealevel,
exif_data['GPSLatitude'] if 'GPSLatitude' in exif_data else None,
exif_data['GPSLongitude'] if 'GPSLongitude' in exif_data else None,
int(time() * 1000)
@ -145,14 +170,14 @@ def process_uploaded_photo(filename, current_user, app_config):
exif_data['Model'] if 'Model' in exif_data else None,
exif_data['Software'] if 'Software' in exif_data else None,
exif_data['DateTime'] if 'DateTime' in exif_data else None,
exif_data['DateTimeOriginal'] if 'DateTimeOriginal' in exif_data else None,
exif_data['DateTimeDigitized'] if 'DateTimeDigitized' in exif_data else None,
my_date_time_original,
my_date_time_digitized,
exif_data['fnumber'] if 'fnumber' in exif_data else None,
exif_data['DigitalZoomRatio'] if 'DigitalZoomRatio' in exif_data else None,
my_digital_zoom_ratio,
exif_data['AspectRatio'],
exif_data['TimeZoneOffset'] if 'TimeZoneOffset' in exif_data else None,
my_timezone_offset,
exif_data['GPSAltitude'] if 'GPSAltitude' in exif_data else None,
exif_data['GPSAboveSeaLevel'] if 'GPSAboveSeaLevel' in exif_data else None,
my_gps_above_sealevel,
exif_data['GPSLatitude'] if 'GPSLatitude' in exif_data else None,
exif_data['GPSLongitude'] if 'GPSLongitude' in exif_data else None,
int(time() * 1000),

View File

@ -24,8 +24,10 @@ class SENDXMPPHandler(Handler):
self.logging_xmpp_use_tls = logging_xmpp_use_tls
'''
This works on Debian 10 with flask running under gunicorn3 as a systemd service, hack as necessary
echo '<message>' | /usr/bin/sendxmpp -t -u <sender> -j <server> -p <password> <recipient@example.com>
This works on Debian 10 with flask running under
gunicorn3 as a systemd service, hack as necessary
echo '<message>' | /usr/bin/sendxmpp -t -u <sender> \
-j <server> -p <password> <recipient@example.com>
'''
def emit(self, record):
try: