Flask + PyJWT 实现基于Json Web Token的用户认证授权

前言

这注定又将是一篇长文,接触到的第一个关于python的框架flask,本文将从最开始的安装flask,到与mysql结合,最后到用pyjwt和flask-jwt和restful写一个带有token的接口。

技术栈简介

Flask

是一个使用Python编写的轻量级Web应用框架。基于Werkzeug WSGI(PythonWeb服务器网关接口(Python Web Server Gateway Interface,缩写为WSGI)是Python应用程序或框架和Web服务器之间的一种接口,已经被广泛接受, 它已基本达成它的可移植性方面的目标)工具箱和Jinja2 模板引擎。 Flask使用BSD授权。 Flask也被称为“microframework”,因为它使用简单的核心,用extension增加其他功能。Flask没有默认使用的数据库、窗体验证工具。然而,Flask保留了扩增的弹性,可以用Flask-extension加入这些功能:ORM、窗体验证工具、文件上传、各种开放式身份验证技术

SQLAlchemy

Flask是一个微型框架,自身没有提供数据库管理,表单验证,cookie处理等功能,很多功能需要通过扩展才能实现,数据库管理就需要SQLAlchemy。SqlAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简而言之:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

The first demo

1
2
3
4
5
6
7
8
9
from flask import Flask
app = Flask(__name__)

@app.route("/")
def index():
return "hello world"

if __name__ == '__main__':
app.run()
  • 第一行
    from…import…就是导入flask中的Flask模块,不同于直接import,from…import简单来说就是帮我从车里拿瓶水,而import则是把车拿过来,所以二者在使用的时候还是有些区别的,在导入某个.py文件时,使用from…import…可以直接在该文件夹下使用import后的函数名,而如果是import,如果要调用函数必须用类.模块。

  • 第二行
    这行代码里有一个参数name,这个参数用到告诉flask你的application的名字,官方有一句话:

    1
    2
    If you are using a single module,__name__ is always the correct value.If you however are using a package, it’s usually recommended to hardcode the name of
    your package there.

    意思就是说,如果是单一的应用,用name就可以了,如果是一个应用程序包,就hardcode一个名字给这个参数。比如:

    1
    app = Flask(“myApp”)

    由于目前我们的应用都相对简单,所以统一使用name作为参数。

  • 第三行
    使用route()修饰器注明通过什么样的url可以访问我们的函数,同时在函数中返回要显示在浏览器中的信息。

  • 最后
    调用run()方法,运行flask web应用程序

    1
    2
    if __name__ == '__main__':
    app.run()

    其中if __name__=='__main__'的意思是,如果此文件是直接运行的才会执行app.run()这个方法,如果是通过import在其它py文件中调用的话是不会执行的。
    比如我们修改code.py中的hello_world方法,如下:

    1
    2
    3
    4
    5
    6
    7
    @app.route('/index')

    def hello_world():
    if __name__=='main':
    return 'Hello World!'
    else:
    return "hello my name is "+__name__

    即当namemain时还是执行原来的逻辑,返回hello world,如果不是则输出此时的名字。
    然后我们新建一个sub.py文件然后导入code.py,并且执行hello_world方法:

    1
    2
    3
    4
    5
    6
    7
    import code
    def CallCodeFun():
    result = code.hello_world()
    print(result)

    CallCodeFun()
    print(__name__)

    此时的name是Code而不是main,而此时,在sub.py中加一句print(name)可以发现sub.py中的name变成了main

    image.png

    由此我们可以得出 name 如果是 main 那么代表他是一个入口文件,直接执行的。
    tip:建文件时文件名最好不要叫code,因为python有模块名就叫code。

flask-sqlalchemy

  • 数据库配置,上demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask_sqlalchemy import SQLAlchemy
from flask import Flask
import configparser
# 告诉flask app的名字
app = Flask(__name__)
//通过configparser获取读取配置文件的解释器
my_config = configparser.ConfigParser()
my_config.read('db.conf')
# dialect+driver://username:password@host:port/database?charset=utf8
# 配置 sqlalchemy 数据库驱动://数据库用户名:密码@主机地址:端口/数据库?编码
# py3无法继续使用sqldb,所以采用pymysql
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://' + my_config.get('DB', 'DB_USER') + ':' + \
my_config.get('DB', 'DB_PASSWORD') + '@' + my_config.get('DB', 'DB_HOST') + '/' + \
my_config.get('DB', 'DB_DB')
# 设为True 表示每次请求结束后都会自动提交数据库的变动,但像add delete insert等仍需要commit,建议不要设为true
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

mydb = SQLAlchemy()
mydb.init_app(app)

if __name__ == "__main__":
app.run(debug=True)
  • 为了使代码结构更清晰,这里使用另一种配置方式,即使用单独配置文件的文件来做全局Flask配置

    • 管控所有的配置文件conf.py:
    1
    2
    3
    4
    5
    6
    7
    DB_USER = 'root'
    DB_PASSWORD = 'jwyjwy9951206-=-'
    DB_HOST = 'localhost'
    DB_DB = 'flask_migrate_demo'
    DEBUG = True
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
  • SQLAlchemy允许我们根据数据库的表结构来创建数据模型,反之亦可。 所以我们一般无须手动的登录到数据库中使用 SQL 语句来创建表, 我们只需把数据模型定义好了之后, 表结构也就有了

    • 新建model.py,并定义一个用户表数据模型
    1
    2
    3
    4
    5
    6
    7
    8
    from flask_sqlalchemy import SQLAlchemy
    db = SQLAlchemy()
    class User(db.Model):
    user_id = db.Column(db.Integer, primary_key=True)
    user_name = db.Column(db.String(60), nullable=False)
    user_password = db.Column(db.String(30), nullable=False)
    user_nickname = db.Column(db.String(50))
    user_email = db.Column(db.String(30), nullable=False)
    • 新建db.py,利用Flask-Script和Flask-Migrate,搭建db和app之间的桥梁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    from flask import Flask
    from flask_script import Manager
    from flask_migrate import Migrate, MigrateCommand
    from model import db
    app = Flask(__name__)
    app.config.from_object('conf')
    migrate = Migrate(app, db)
    manager = Manager(app)
    manager.add_command('db', MigrateCommand)
    if __name__ == '__main__':
    manager.run()
    1
    2
    3
    python db.py db init
    python db.py db migrate
    python db.py db upgrade
    • 示例接口实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    # 用到flask中的Flask框架,jsonify返回json数据,request将前端字段传入后端
    from flask import Flask, jsonify, request
    # 将model中的User,db对象传入
    from model import db, User
    # 告诉flask这个app要用它
    app = Flask(__name__)
    # 读取配置文件,主要是连接mysql
    app.config.from_object('conf')
    # 初始化
    db.init_app(app)
    # 开始路由
    @app.route('/')
    def index():
    return '<h1>Hello Flask!</h1>'

    # 增
    @app.route('/user', methods=['POST'])
    def addUser():
    user_name = request.form.get('user_name')
    user_password = request.form.get('user_password')
    user_nickname = request.form.get('user_nickname')
    user_email = request.form.get('user_email')
    user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname,
    user_email=user_email)
    try:
    db.session.add(user)
    db.session.commit()
    except:
    db.session.rollback()
    db.session.flush()
    userId = user.user_id
    if (user.user_id is None):
    result = {'msg': '添加失败'}
    return jsonify(data=result)
    data = User.query.filter_by(user_id=userId).first()
    result = {'user_id': data.user_id, 'user_name': data.user_name, 'user_nickname': data.user_nickname,
    'user_email': data.user_email}
    return jsonify(data=result)
    # 查
    @app.route('/user/<int:userId>', methods=['GET'])
    def getUser(userId):
    user = User.query.filter_by(user_id=userId).first()
    if (user is None):
    result = {'msg': '找不到数据'}
    else:
    result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname,
    'user_email': user.user_email}
    return jsonify(data=result)
    # 改
    @app.route('/user/<int:userId>', methods=['PATCH'])
    def updateUser(userId):
    user_name = request.form.get('user_name')
    user_password = request.form.get('user_password')
    user_nickname = request.form.get('user_nickname')
    user_email = request.form.get('user_email')
    try:
    user = User.query.filter_by(user_id=userId).first()
    if (user is None):
    result = {'msg': '找不到要修改的记录'}
    return jsonify(data=result)
    else:
    user.user_name = user_name
    user.user_password = user_password
    user.user_nickname = user_nickname
    user.user_email = user_email
    db.session.commit()
    except:
    db.session.rollback()
    db.session.flush()
    userId = user.user_id
    data = User.query.filter_by(user_id=userId).first()
    result = {'user_id': data.user_id, 'user_name': data.user_name, 'user_password': data.user_password, 'user_nickname': data.user_nickname, 'user_email': data.user_email}
    return jsonify(data=result)
    # 查全部
    @app.route('/user', methods=['GET'])
    def getUsers():
    data = User.query.all()
    data_all = []
    for user in data:
    data_all.append({'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email})
    return jsonify(users=data_all)
    # 删
    @app.route('/user/<int:userId>', methods=['DELETE'])
    def deleteUser(userId):
    # 删除数据
    User.query.filter_by(user_id=userId).delete()
    db.session.commit()
    return getUsers()

    if __name__ == '__main__':
    app.run(debug=app.config['DEBUG'])

使用Flask-RESTful快速创建RESTful API接口

先容我把项目做完…待续…….
无聊看看文档:flask-restful中文官方文档

使用Flask + PyJWT 实现基于Json Web Token的用户认证授权

待更…无聊看看文档:Flask + PyJWT 实现基于Json Web Token的用户认证授权

问题

1.主要问题就是flask和mysql的连接问题,出现的最棘手的问题首先是mysqldb是py2的,py3没有,用pymysql代替,在SQLALCHEMY_DATABASE_URI注意修改driver驱动。
2.最终修改方案是自己用一个demo建好了数据库和表(记得保存好demo),demo如下,然后再运行就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from sqlalchemy import create_engine,Table,Column,Integer,String,MetaData,ForeignKey
engine=create_engine("mysql+pymysql://root:a5230411@localhost:3306/test",echo=True)
metadata=MetaData(engine)

user=Table('user',metadata,
Column('id',Integer,primary_key=True),
Column('name',String(20)),
Column('fullname',String(40)),
)
address_table = Table('address', metadata,
Column('id', Integer, primary_key=True),
Column('user_id', None, ForeignKey('user.id')),
Column('email', String(128), nullable=False)
)

metadata.create_all()

3.解决 sqlalchemy 报错:(1193, “Unknown system variable ‘tx_isolation’”): 传送门

参考网址

Thank you for your accept. mua!
-------------本文结束感谢您的阅读-------------