亲宝软件园·资讯

展开

Python Flask

XWenXiang 人气:0

1. 数据库连接池

使用 pymsql 链接数据库

导入:pip3 install dbutils

pool.py 创建数据库连接池

from dbutils.pooled_db import PooledDB
import pymysql
POOL = PooledDB(
    creator=pymysql,  	# 使用链接数据库的模块
    maxconnections=6,  	# 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  		# 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  		# 链接池中最多闲置的链接,0和None不限制
    maxshared=3,		# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  	# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  	# 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  	# 开始会话前执行的命令列表。
    ping=0,				# ping MySQL服务端,检查是否服务可用。
    host='127.0.0.1',
    port=3306,
    user='???',
    password='???',
    database='???',
    charset='utf8'
)

使用连接池

from flask import Flask
# 导入类对象
from pool import POOL
import pymysql
app = Flask(__name__)
app.debug = True
# 定义在外面,全局conn,cursor会出现数据错乱,正常情况应该放在视图函数内部建立链接,但是会影响
# 性能,所以需要建立数据库连接池,限制连接的数量
# conn = pymysql.connect(
# 	host='127.0.0.1', 
# 	port=3306, 
# 	database='数据库', 
# 	user='用户', 
# 	password='密码'
# )
# cursor = conn.cursor()
@app.route('/user')
def user():
    # 从池中拿链接,创建出cursor对象
    conn = POOL.connection()
    cursor = conn.cursor()
    cursor.execute('select * from index_user')
    print(cursor.fetchall())
    # 并未关闭连接,只是将连接重新放回池子
    conn.close()
    return 'hello'
@app.route('/username')
def username():
	# 该方式对数据库的连接会一直增加,有可能造成数据压力过大
    conn = pymysql.connect(
    	host='127.0.0.1', 
    	port=3306, 
    	database='???', 
    	user='???', 
    	password='???')
    cursor = conn.cursor()
    cursor.execute('select username from index_user')
    print(cursor.fetchall())
    return 'hello'
if __name__ == '__main__':
    app.run()

2. wtfroms

其类似于 django 的 forms 组件,不过在 flask 中使用第三方 wtfroms 完成 forms 的功能。

安装:pip3 install wtforms

from flask import Flask,request,render_template
app = Flask(__name__)
from wtforms.fields import core
from wtforms import Form
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
class LoginForm(Form):
    # 字段(内部包含正则表达式)
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(),  # 页面上显示的插件
        render_kw={'class': 'form-control'}
    )
    # 字段(内部包含正则表达式)
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )
class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )
    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )
    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )
    gender = core.RadioField(
        label='性别',
        choices=(
            (1, '男'),
            (2, '女'),
        ),
        coerce=int # “1” “2”
     )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )
    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )
    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )
    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值
        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证
@app.route('/login',methods=['GET','POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)
@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)
if __name__ == '__main__':
    app.run()

login.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
    <input type="submit" value="提交">
</form>
</body>
</html>

register.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0  50px">
    {% for field in form %}
    <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
    {% endfor %}
    <input type="submit" value="提交">
</form>
</body>
</html>

3. 信号

Flask 信号,是由 signal 翻译过来的。

Flask框架中的信号基于 blinker,其主要就是让开发者可是在 Flask 请求过程中定制一些用户行为。

在某种情况下会触发某个函数执行( aop理念,面向切面编程的理念 ),例如在用户注册后统计用户的数量

安装:pip3 install blinker

3.1 内置信号

# 请求到来前执行
request_started = _signals.signal('request-started') 

# 请求结束后执行           
request_finished = _signals.signal('request-finished')              

# 模板渲染前执行 
before_render_template = _signals.signal('before-render-template')  

# 模板渲染后执行
template_rendered = _signals.signal('template-rendered')            

# 请求执行出现异常时执行
got_request_exception = _signals.signal('got-request-exception')    

# 请求执行完毕后自动执行(无论成功与否) 
request_tearing_down = _signals.signal('request-tearing-down')

# 应用上下文执行完毕后自动执行(无论成功与否)      
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')

# 应用上下文push时执行 
appcontext_pushed = _signals.signal('appcontext-pushed')

# 应用上下文pop时执行            
appcontext_popped = _signals.signal('appcontext-popped')   

# 调用flask在其中添加数据时,自动触发        
message_flashed = _signals.signal('message-flashed')               

3.2 使用信号

from flask import Flask, render_template
from flask import signals
app = Flask(__name__)
app.debug = True
# 第一步:编写需要执行的函数
def render_before(*args, **kwargs):
    print(args)
    print(kwargs)
    print('模板渲染前执行')
# 第二步:绑定信号
signals.before_render_template.connect(render_before)	# 模板渲染前执行
# 第三步:自动触发信号
@app.route('/home')
def home():
    return render_template('home.html')
if __name__ == '__main__':
    app.run()

3.3 自定义信号

from flask import Flask
from flask.signals import _signals
app = Flask(import_name=__name__)
# 第一步:自定义信号
x = _signals.signal('x')
# 第二步:编写函数
def func(sender, *args, **kwargs):
    print(sender)
    print(args)
    print(kwargs)
# 第三步:自定义信号中注册函数
x.connect(func)
@app.route("/index")
def index():
    # 第四步:触发信号
    x.send('123123', k1='v1')
    return 'Index'
if __name__ == '__main__':
    app.run()

4. 多app应用

在一个 flask 项目,可以支持多个 app 对象。

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple
# 创建多个 app
app1 = Flask('app01')
app2 = Flask('app02')
# 第一个 app 的根路径
@app1.route('/')
def home():
    return "app1"
# 第二个 app 的根路径
@app2.route('/')
def sec():
    return "app2"
# 访问 /sec 才是访问第二个 app 的根路径
dm = DispatcherMiddleware(app1, {
    '/sec': app2,
})
if __name__ == "__main__":
    # 请求来了,会执行 dm(),触发DispatcherMiddleware的__call__
    run_simple('localhost', 5000, dm)

5. flask-script

原本启动项目右键运行即可,若要以命令的方式,类似于 django 的 python3 manage.py runserver 命令可以使用 flask-script 实现。

安装:pip3 install flask-script

5.1 快速使用

from flask_script import Manager
app = Flask(__name__)
manager=Manager(app)
if __name__ == '__main__':
    manager.run()
# 以后在执行,直接:python3 manage.py runserver
# python3 manage.py runserver --help

有可能会出现如下错误,由于 flask 的版本太高只需要将 flask 的版本降低即可

Traceback (most recent call last):
  File "manage.py", line 3, in <module>
    from flask_script import Manager
  File "D:\python36\lib\site-packages\flask_script\__init__.py", line 15, in <module>
    from flask._compat import text_type
ModuleNotFoundError: No module named 'flask._compat'

5.2 自定制命令

可以编一个功能,通过execl,把execl的数据存同步到某个数据表中得命令

from flask import Flask
from flask_script import Manager
app = Flask('__name__')
# 基本使用,多了runserver命令
manager = Manager(app)
@manager.command
def dbinit():
    """
    python manage.py dbinit
    """
    print("数据库初始化完成")
@manager.option('-n', '--name', dest='name')
@manager.option('-a', '--age', dest='age')
def user(name, age):
    """
    自定义命令(-n也可以写成--name)
    执行: python manage.py  user -n xwx -a 12
    执行: python manage.py  user --name xwx --age 12
    """
    print(name, age)
@app.route('/index')
def index():
    return "index"
if __name__ == "__main__":
    manager.run()

加载全部内容

相关教程
猜你喜欢
用户评论