from . import routes
\ No newline at end of file
from typing import List, Optional
import stripe
from sqlalchemy.orm import Session
from ..customers.models import Customer
from ..database import engine
from ..exceptions import ConditionException, NotFoundException
from .schemas import AddCustomerSchema
def get_customers(offset: int, limit: int) -> List[Customer]:
with Session(engine) as db:
return db.query(Customer).offset(offset).limit(limit).all()
def get_customer(customer_id: int) -> Optional[Customer]:
with Session(engine) as db:
customer: Optional[Customer] = db.query(
Customer).filter(Customer.id == customer_id).first()
if not customer:
raise NotFoundException("Customer not found")
return customer
def add_customer(customer: AddCustomerSchema) -> Optional[Customer]:
with Session(engine) as db:
if db.query(Customer).filter(Customer.email == customer.email).first():
raise ConditionException("Customer already exists")
customer_stripe = stripe.Customer.create()
customer_db: Customer = Customer(id=customer_stripe.id, email=customer.email)
db.add(customer_db)
db.commit()
db.refresh(customer_db)
return customer_db
from sqlalchemy import Column, String
from sqlalchemy.orm import relationship
from ..database import Base
class Customer(Base):
__tablename__ = "customers"
# Stripe customer id
id = Column(String, primary_key=True, autoincrement=False)
email = Column(String, unique=True, index=True)
purchased_items = relationship("PurchasedItem")
payments = relationship("Payment")
from typing import List, Optional
from fastapi import APIRouter, Query
from .controllers import add_customer, get_customer, get_customers
from .schemas import AddCustomerSchema, CustomerSchema
customers_router = APIRouter(
prefix="/customers",
tags=["Customers"],
responses={
404: {"description": "Not found"},
}
)
@customers_router.get('/', response_model=List[CustomerSchema])
def read_customers(
offset: int = 0,
limit: int = Query(default=100, lte=100),
):
return get_customers(offset, limit)
@customers_router.get('/{customer_id}', response_model=Optional[CustomerSchema])
def read_customer(
customer_id: str,
):
return get_customer(customer_id)
@customers_router.post('/', response_model=Optional[CustomerSchema])
def create_customer(
customer: AddCustomerSchema,
):
return add_customer(customer)
from pydantic import BaseModel
class CustomerBase(BaseModel):
id: str
email: str
class Config:
orm_mode = True
class CustomerSchema(CustomerBase):
pass
class AddCustomerSchema(BaseModel):
email: str
class Config:
orm_mode = True
\ No newline at end of file
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from fastapi import Request
SQLALCHEMY_DATABASE_URL = "sqlite:///./dev.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db(request: Request):
return request.state.db
from fastapi import HTTPException, Request, status
from fastapi.responses import JSONResponse
class NotFoundException(Exception):
def __init__(self, detail: str):
self.detail = detail
class ConditionException(Exception):
def __init__(self, detail: str):
self.detail = detail
def not_found_exception_handler(request: Request, exc: NotFoundException):
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={"message": exc.detail},
)
def condition_exception_handler(request: Request, exc: ConditionException):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={"message": exc.detail},
)
from . import routes
\ No newline at end of file
from typing import List, Optional
from sqlalchemy.orm import Session
from ..exceptions import ConditionException, NotFoundException
from ..database import engine
from .models import Item
from .schemas import AddItemSchema
def add_item(item: AddItemSchema) -> Optional[Item]:
with Session(engine) as db:
if item.price < 100:
raise ConditionException(detail="Price must be greater or equal than 100")
db_item: Item = Item(**item.dict())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
def delete_item(item_id: int) -> Optional[Item]:
with Session(engine) as db:
item: Optional[Item] = db.query(
Item).filter(Item.id == item_id).first()
if not item:
raise NotFoundException("Item not found")
db.delete(item)
db.commit()
return item
def get_item(item_id: int) -> Optional[Item]:
with Session(engine) as db:
item: Optional[Item] = db.query(
Item).filter(Item.id == item_id).first()
if not item:
raise NotFoundException("Item not found")
return item
def get_items(offset: int, limit: int) -> List[Item]:
with Session(engine) as db:
return db.query(Item).offset(offset).limit(limit).all()
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from ..database import Base
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
price = Column(Integer, default=0)
purchased_items = relationship("PurchasedItem")
class PurchasedItem(Base):
__tablename__ = "purchased_items"
id = Column(Integer, primary_key=True, index=True)
amount = Column(Integer, nullable=False)
payment_id = Column(Integer, ForeignKey("payments.id"))
payment = relationship("Payment", back_populates="purchased_items")
customer_id = Column(String, ForeignKey("customers.id"))
customer = relationship("Customer", back_populates="purchased_items")
item_id = Column(Integer, ForeignKey("items.id"))
item = relationship("Item", back_populates="purchased_items")
from typing import List
from fastapi import APIRouter, Query
from .controllers import add_item, delete_item, get_item, get_items
from .schemas import AddItemSchema, ItemSchema
item_router = APIRouter(
prefix="/items",
tags=["Items"],
responses={
400: {"description": "Bad Request"},
404: {"description": "Not found"},
}
)
@item_router.post("/", response_model=ItemSchema)
def create_item(
item: AddItemSchema,
):
return add_item(item)
@item_router.get("/{item_id}", response_model=ItemSchema)
def read_item(
item_id: int,
):
return get_item(item_id)
@item_router.get("/", response_model=List[ItemSchema])
def read_items(
offset: int = 0,
limit: int = Query(default=100, lte=100),
):
return get_items(offset, limit)
@item_router.delete("/", response_model=ItemSchema)
def remove_item(
item_id: int,
):
return delete_item(item_id)
from typing import Optional
from pydantic import BaseModel
class ItemBase(BaseModel):
name: str
price: int
class Config:
orm_mode = True
class ItemSchema(ItemBase):
id: int
class Config:
orm_mode = True
class AddItemSchema(ItemBase):
pass
class RemoveItemSchema(BaseModel):
id: int
class Config:
orm_mode = True
class PendingItem(BaseModel):
id: int
amount: int
class Config:
orm_mode = True
class PurchasedItem(BaseModel):
item: Optional[ItemSchema]
amount: int
class Config:
orm_mode = True
from . import routes
\ No newline at end of file
from typing import List
from sqlalchemy.orm import Session
from ..database import engine, get_db
from .models import Payment
def get_payments(offset: int, limit: int) -> List[Payment]:
with Session(engine) as db:
return db.query(Payment).offset(offset).limit(limit).all()
def get_payments_by_customer(customer_id: str, offset: int, limit: int) -> List[Payment]:
with Session(engine) as db:
return db.query(Payment).filter(Payment.customer_id == customer_id).offset(offset).limit(limit).all()
from sqlalchemy import Boolean, Column, ForeignKey, String, DateTime
from sqlalchemy.orm import relationship
from ..database import Base
class Payment(Base):
__tablename__ = "payments"
id = Column(String, primary_key=True, autoincrement=False, index=True)
is_checked = Column(Boolean, default=False)
checkout_date = Column(DateTime, nullable=True)
customer_id = Column(String, ForeignKey("customers.id"))
customer = relationship("Customer", back_populates="payments")
purchased_items = relationship("PurchasedItem")
\ No newline at end of file
import datetime
import os
from typing import List, Optional
import stripe
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from ..customers.controllers import get_customer
from ..customers.models import Customer
from ..database import get_db
from ..exceptions import ConditionException, NotFoundException
from ..items.models import Item, PurchasedItem
from .controllers import get_payments_by_customer
from .models import Payment
from .schemas import (PaymentCheckSchema, PaymentCreateSchema, PaymentSchema,
PaymentSheetSchema)
payments_router = APIRouter(
prefix="/payments",
tags=["Payments"],
responses={
400: {"description": "Bad Request"},
404: {"description": "Not found"},
}
)
@payments_router.get('/', response_model=List[PaymentSchema])
def get_payments(
offset: int = 0,
limit: int = Query(default=100, lte=100),
db: Session = Depends(get_db)
):
# ERROR: RecursionError: maximum recursion depth exceeded
# return get_payments(offset, limit)
return db.query(Payment).offset(offset).limit(limit).all()
@payments_router.get('/{customer_id}', response_model=List[PaymentSchema])
def get_payments_by_customer_id(
customer_id: str,
offset: int = 0,
limit: int = Query(default=100, lte=100),
db: Session = Depends(get_db)
):
# ERROR: Parent instance <Payment at 0x7f88a9185f90> is not bound to
# a Session; lazy load operation of attribute 'customer' cannot
# proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
# return get_payments_by_customer(customer_id, offset, limit)
return db.query(Payment).filter(Payment.customer_id == customer_id).offset(offset).limit(limit).all()
@payments_router.post('/', response_model=PaymentSheetSchema)
def create_sheet(
payment_sheet: PaymentCreateSchema,
db: Session = Depends(get_db)
):
customer: Optional[Customer] = get_customer(payment_sheet.customer_id)
ephemeral_key = stripe.EphemeralKey.create(
customer=customer.id,
stripe_version='2022-08-01',
)
customer_stripe = stripe.Customer.retrieve(
customer.id
)
pending_items: dict = {pi.id: pi.amount for pi in payment_sheet.pending_items}
items_id: List[int] = [pi.id for pi in payment_sheet.pending_items]
items: List[Item] = db.query(Item).filter(Item.id.in_(items_id)).all()
if len(items) != len(payment_sheet.pending_items):
raise NotFoundException(detail="Item not found.")
price: int = sum([pending_items[i.id] * i.price for i in items])
payment_intent = stripe.PaymentIntent.create(
amount=price,
currency='eur',
customer=customer_stripe,
automatic_payment_methods={
'enabled': True,
},
)
payment: Payment = Payment(
id=payment_intent.id,
customer_id=payment_sheet.customer_id
)
db.add(payment)
db.commit()
db.refresh(payment)
purchased_items: List[PurchasedItem] = []
for key, value in pending_items.items():
purchased_item: PurchasedItem = PurchasedItem(
customer_id=payment_sheet.customer_id,
amount=value,
item_id=key,
payment_id=payment.id
)
purchased_items.append(purchased_item)
db.add(purchased_item)
db.commit()
return {
"paymentIntent": payment_intent.client_secret,
"ephemeralKey": ephemeral_key.secret,
"customer": customer_stripe["id"],
"publishableKey": os.environ.get("STRIPE_PK")
}
@payments_router.post('/check/{payment_intent_id}', response_model=PaymentSchema)
def check_sheet_status_and_get_purchased_items(
payment_intent_id: str,
payment_check: PaymentCheckSchema,
db: Session = Depends(get_db)
):
payment: Optional[Payment] = db.query(Payment).filter(
Payment.customer_id == payment_check.customer_id,
Payment.id == payment_intent_id,
Payment.is_checked == False
).first()
if not payment:
raise NotFoundException(detail="Payment not found or already checked.")
payment_intent = stripe.PaymentIntent.retrieve(
payment_intent_id
)
if not payment_intent:
raise NotFoundException(detail="Payment intent not found.")
if not payment_intent.status == "succeeded":
raise ConditionException(detail="Unsuccessful payment intent.")
amount: int = payment_intent.amount_received
purchased_items: List[PurchasedItem] = db.query(PurchasedItem).filter(
PurchasedItem.payment_id == payment.id
).all()
price: int = sum([pi.amount * pi.item.price for pi in purchased_items])
# validation of the price against the amount paid
if not price == amount:
print(price, amount)
raise ConditionException(detail="Price does not match with amount paid.")
# payment validation to avoid fraud
payment.is_checked = True
payment.checkout_date = datetime.datetime.now()
db.add(payment)
db.commit()
db.refresh(payment)
return payment
from datetime import datetime
from typing import List, Optional, Union
from pydantic import BaseModel
from ..items.schemas import PendingItem, PurchasedItem
from ..customers.schemas import CustomerSchema
class PaymentSheetSchema(BaseModel):
paymentIntent: str
ephemeralKey: str
customer: str
publishableKey: str
class Config:
orm_mode = True
class PaymentCheckSchema(BaseModel):
customer_id: str
class Config:
orm_mode = True
class PaymentCreateSchema(BaseModel):
pending_items: List[PendingItem]
customer_id: str
class Config:
orm_mode = True
class PaymentSchema(BaseModel):
id: str
checkout_date: Union[datetime, None]
is_checked: bool
customer: Optional[CustomerSchema]
purchased_items: List[PurchasedItem]
class Config:
orm_mode = True
\ No newline at end of file
from fastapi import FastAPI
from src.core import create_app
app: FastAPI = create_app()