FastAPI操作database

简单记录一下使用FastAPI完成对数据库的CRUD操作。在参考文档的基础上,增加了U(Update)、D(Delete)部分,全部代码可以参考Github。

参考文档:https://fastapi.tiangolo.com/tutorial/sql-databases/

1. Installation

pip install fastapi
pip install uvicorn
pip install sqlalchemy

2. File Structure

1581931091044.jpg

其中:

crud.py 完成对数据库的CRUD操作
database.py 关于数据库相关配置
main.py fastapi路由部分
models.py 定义表结构
schemas.py 定义pydantic models信息,也就是schemas
__init__.py

3. Details

3.1 database.py

首先需要定义数据库部分,作为demo,使用sqlite作为我们的数据库。这部分都是常规配置操作。

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

3.2 models.py

第二步需要定义我们的表结构,这里定义了User表存储用户数据,item表用于存储物品数据,两者关系是一对多。

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

from .database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")

    def to_dict(self):
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

    def to_dict(self):
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

3.3 schemas.py

第三步需要定义fastapi中schemas信息,后续路由、CRUD时都需要使用。由于增删改查需要不同的schema,所以官网的最佳实践一般都是通过继承解决。

from typing import List

from pydantic import BaseModel


class ItemBase(BaseModel):
    title: str
    description: str = None


class ItemCreate(ItemBase):
    pass


class ItemUpdate(ItemBase):
    pass


class Item(ItemBase):
    id: int
    owner_id: int

    class Config:
        orm_mode = True


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class UserUpdate(UserBase):
    is_active: bool


class User(UserBase):
    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        orm_mode = True

3.4 crud.py

接下来就是实际完成CRUD,官网上没有对update、delete做实例展示,这里补充一下。

from sqlalchemy.orm import Session

from . import models, schemas


def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def update_user(db: Session, user_id: int, update_user: schemas.UserUpdate):
    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if db_user:
        update_dict = update_user.dict(exclude_unset=True)
        for k, v in update_dict.items():
            setattr(db_user, k, v)
        db.commit()
        db.flush()
        db.refresh(db_user)
        return db_user


def delete_user(db: Session, user_id: int):
    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if db_user:
        db.delete(db_user)
        db.commit()
        db.flush()
        return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


def relate_user_item(db: Session, user_id: int, item_id: int):
    db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
    if db_item:
        db_item.owner_id = user_id
        db.commit()
        db.flush()
        return db.query(models.User).filter(models.User.id == user_id).first()


def update_item(db: Session, item_id: int, update_item: schemas.ItemUpdate):
    db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
    if db_item:
        update_dict = update_item.dict(exclude_unset=True)
        for k, v in update_dict.items():
            setattr(db_item, k, v)
        db.commit()
        db.flush()
        db.refresh(db_item)
        return db_item


def delete_item(db: Session, item_id: int):
    db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
    if db_item:
        db.delete(db_item)
        db.commit()
        db.flush()
        return db_item

3.5 main.py

最后完成路由的配置,基本都是按照官网上来的,只是稍微补充了一些删和改的内容,为了让这个demo更加完善。

from typing import List

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .database import SessionLocal, engine

models.Base.metadata.create_all(bind=engine)

app = FastAPI()


# Dependency
def get_db():
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=List[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.delete('/users/{user_id}', response_model=schemas.User)
def delete_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.delete_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@app.put("/users/{user_id}", response_model=schemas.User)
def update_user(user_id: int, update_user: schemas.UserUpdate, db: Session = Depends(get_db)):
    updated_user = crud.update_user(db, user_id, update_user)
    if updated_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return updated_user


@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items


@app.put("/items/{user_id}/{item_id}/", response_model=schemas.User)
def relate_user_item(user_id: int, item_id: int, db: Session = Depends(get_db)):
    user = crud.relate_user_item(db=db, item_id=item_id, user_id=user_id)
    return user


@app.put("/items/{item_id}", response_model=schemas.Item)
def update_item(item_id: int, update_item: schemas.ItemUpdate, db: Session = Depends(get_db)):
    updated_item = crud.update_item(db, item_id, update_item)
    if updated_item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return updated_item


@app.delete('/items/{item_id}', response_model=schemas.Item)
def delete_item(item_id: int, db: Session = Depends(get_db)):
    db_item = crud.delete_item(db, item_id=item_id)
    if db_item is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_item

4. Tests

在文件路径下执行下面的命令;加上--reload后,当代码有修改时可以自动重新加载。

uvicorn sql_app.main:app --reload

可以通过postman发送请求做测试,这里贴图一张。

1581943691192.jpg

最后,fastapi还可以自动生成文档,可以访问 http://127.0.0.1:8000/docs 查阅。

1581943651716.jpg

(0)

相关推荐