remove python files
This commit is contained in:
66
database.py
66
database.py
@@ -1,66 +0,0 @@
|
||||
import os
|
||||
import psycopg2
|
||||
from psycopg2 import IntegrityError
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
def get_connection():
|
||||
"""Return a database connection."""
|
||||
return psycopg2.connect(
|
||||
host=os.getenv("PG_HOST"),
|
||||
port=os.getenv("PG_PORT"),
|
||||
dbname=os.getenv("PG_DATABASE"),
|
||||
user=os.getenv("PG_USER"),
|
||||
password=os.getenv("PG_PASSWORD"),
|
||||
connect_timeout=10
|
||||
)
|
||||
|
||||
def init_db():
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS subscribers (
|
||||
id SERIAL PRIMARY KEY,
|
||||
email TEXT UNIQUE NOT NULL
|
||||
)
|
||||
""")
|
||||
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS newsletters(
|
||||
id SERIAL PRIMARY KEY,
|
||||
subject TEXT NOT NULL,
|
||||
body TEXT NOT NULL,
|
||||
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
def add_email(email):
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("INSERT INTO subscribers (email) VALUES (%s)", (email,))
|
||||
conn.commit()
|
||||
return True
|
||||
except IntegrityError:
|
||||
return False
|
||||
except psycopg2.OperationalError as e:
|
||||
print(f"Error: {e}")
|
||||
return False
|
||||
|
||||
def remove_email(email):
|
||||
try:
|
||||
with get_connection() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("DELETE FROM subscribers WHERE email = %s", (email,))
|
||||
conn.commit()
|
||||
if cursor.rowcount > 0:
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error removing email: {e}")
|
||||
return False
|
||||
104
server.py
104
server.py
@@ -1,104 +0,0 @@
|
||||
import os
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from flask import Flask, render_template, request, jsonify
|
||||
from database import get_connection, init_db, add_email, remove_email
|
||||
from dotenv import load_dotenv
|
||||
from collections import namedtuple
|
||||
|
||||
load_dotenv()
|
||||
app = Flask(__name__)
|
||||
init_db()
|
||||
|
||||
def send_confirmation_email(email):
|
||||
SMTP_SERVER = os.getenv('SMTP_SERVER')
|
||||
SMTP_PORT = int(os.getenv('SMTP_PORT', 465))
|
||||
SMTP_USER = os.getenv('SMTP_USER')
|
||||
SMTP_PASSWORD = os.getenv('SMTP_PASSWORD')
|
||||
|
||||
unsubscribe_link = f"{request.url_root}unsubscribe?email={email}"
|
||||
|
||||
subject = 'Thanks for subscribing!'
|
||||
|
||||
html_body = render_template(
|
||||
'confirmation_email.html',
|
||||
unsubscribe_link=unsubscribe_link
|
||||
)
|
||||
|
||||
msg = MIMEText(html_body, 'html', 'utf-8') # Specify HTML
|
||||
msg['Subject'] = subject
|
||||
msg['From'] = SMTP_USER
|
||||
msg['To'] = email
|
||||
|
||||
try:
|
||||
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=10)
|
||||
server.login(SMTP_USER, SMTP_PASSWORD)
|
||||
server.sendmail(SMTP_USER, email, msg.as_string())
|
||||
server.quit()
|
||||
except Exception as e:
|
||||
print(f"Failed to send email to {email}: {e}")
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
return render_template("index.html")
|
||||
|
||||
@app.route("/subscribe", methods=["POST"])
|
||||
def subscribe():
|
||||
data = request.get_json()
|
||||
email = data.get('email')
|
||||
if not email:
|
||||
return jsonify({"error": "No email provided"}), 400
|
||||
|
||||
if add_email(email):
|
||||
send_confirmation_email(email)
|
||||
return jsonify({"message": "Email has been added"}), 201
|
||||
else:
|
||||
return jsonify({"error": "Email already exists"}), 400
|
||||
|
||||
@app.route("/unsubscribe", methods=["GET"])
|
||||
def unsubscribe():
|
||||
email = request.args.get("email")
|
||||
if not email:
|
||||
return "No email specified.", 400
|
||||
|
||||
if remove_email(email):
|
||||
return f"The email {email} has been unsubscribed.", 200
|
||||
else:
|
||||
return f"Email {email} was not found or has already been unsubscribed.", 400
|
||||
|
||||
@app.route("/newsletters")
|
||||
def newsletters():
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT id, subject, body, sent_at FROM newsletters ORDER BY sent_at DESC")
|
||||
results = cursor.fetchall()
|
||||
newsletters = [
|
||||
{"id": rec[0], "subject": rec[1], "body": rec[2], "sent_at": rec[3]}
|
||||
for rec in results
|
||||
]
|
||||
cursor.close()
|
||||
conn.close()
|
||||
return render_template("newsletters.html", newsletters=newsletters)
|
||||
|
||||
@app.route("/newsletter/<int:newsletter_id>")
|
||||
def newsletter_detail(newsletter_id):
|
||||
conn = get_connection()
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT id, subject, body, sent_at FROM newsletters WHERE id = %s", (newsletter_id,))
|
||||
record = cursor.fetchone()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
if record is None:
|
||||
return "Newsletter not found.", 404
|
||||
|
||||
newsletter = {
|
||||
"id": record[0],
|
||||
"subject": record[1],
|
||||
"body": record[2],
|
||||
"sent_at": record[3]
|
||||
}
|
||||
return render_template("newsletter_detail.html", newsletter=newsletter)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=True)
|
||||
Reference in New Issue
Block a user