亲宝软件园·资讯

展开

Flask 偏函数、g对象、flask-session、数据库连接池、信号、自制命令、flask-admin

Jeff的技术栈 人气:0
[TOC] # 一、偏函数 当函数的参数个数太多,需要简化时,使用`functools.partial`可以创建一个新的函数,这个新函数可以固定住原函数的部分参数,从而在调用时更简单。 ```python from functools import partial def func(a1,a2,a3): print(a1,a2,a3) new_func1 = partial(func,a1=1,a2=2) new_func1(a3=3) # 额外传值 new_func2 = partial(func,1,2) new_func2(3) new_func3 = partial(func,a1=1) new_func3(a2=2,a3=3) ``` # 二、g对象 专门用来存储用户信息的g对象,g的全称的为global g对象在一次请求中的所有的代码的地方,都是可以使用的 作用:防止和request内部的属性混乱,比如不知道request里面有没有name属性,可以直接g.name设值。只对这个请求设值 ```python from flask import Flask, g app = Flask(__name__) @app.before_request def be(): g.name = 'jeff' print('befor1') @app.route('/') def index(): print(g.name) return 'ok' @app.route('/login') def login(): print(g.name) return 'ok' if __name__ == '__main__': app.run() ``` ## g对象和session的区别 ``` session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次 ``` # 三、flask-session 作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy 安装:pip3 install flask-session 使用1: ``` from flask import Flask,session from flask_session import RedisSessionInterface import redis app = Flask(__name__) conn=redis.Redis(host='127.0.0.1',port=6379) #use_signer是否对key签名 app.session_interface=RedisSessionInterface(conn,key_prefix='lqz') @app.route('/') def hello_world(): session['name']='lqz' return 'Hello World!' if __name__ == '__main__': app.run() ``` 使用2: ``` from redis import Redis from flask.ext.session import Session app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379') Session(app) ``` 问题:设置cookie时,如何设定关闭浏览器则cookie失效。 ``` response.set_cookie('k','v',exipre=None)#这样设置即可 #在session中设置 app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False) #一般不用,我们一般都设置超时时间,多长时间后失效 ``` 问题:cookie默认超时时间是多少?如何设置超时时间 ``` #源码expires = self.get_expiration_time(app, session) 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制 ``` # 四、数据库连接池 ## pymsql链接数据库 ``` import pymysql conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db') cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',]) cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'}) obj = cursor.fetchone() conn.commit() cursor.close() conn.close() print(obj) ``` ## 数据库连接池版 setting.py ``` from datetime import timedelta from redis import Redis import pymysql from DBUtils.PooledDB import PooledDB, SharedDBConnection class Config(object): DEBUG = True SECRET_KEY = "umsuldfsdflskjdf" PERMANENT_SESSION_LIFETIME = timedelta(minutes=20) SESSION_REFRESH_EACH_REQUEST= True SESSION_TYPE = "redis" PYMYSQL_POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123456', database='s8day127db', charset='utf8' ) class ProductionConfig(Config): SESSION_REDIS = Redis(host='192.168.0.94', port='6379') class DevelopmentConfig(Config): SESSION_REDIS = Redis(host='127.0.0.1', port='6379') class TestingConfig(Config): pass ``` ## utils/sql.py ``` import pymysql from settings import Config class SQLHelper(object): @staticmethod def open(cursor): POOL = Config.PYMYSQL_POOL conn = POOL.connection() cursor = conn.cursor(cursor=cursor) return conn,cursor @staticmethod def close(conn,cursor): conn.commit() cursor.close() conn.close() @classmethod def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor): conn,cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchone() cls.close(conn,cursor) return obj @classmethod def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor): conn, cursor = cls.open(cursor) cursor.execute(sql, args) obj = cursor.fetchall() cls.close(conn, cursor) return obj ``` # 五、信号 Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为 安装:`pip3 install blinker` 内置信号: ```python request_started = _signals.signal('request-started') # 请求到来前执行 request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行 appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发 ``` 使用信号: ```python from flask import Flask,signals,render_template app = Flask(__name__) # 往信号中注册函数 def func(*args,**kwargs): print('触发型号',args,kwargs) signals.request_started.connect(func) # 触发信号: signals.request_started.send() @app.before_first_request def before_first1(*args,**kwargs): pass @app.before_first_request def before_first2(*args,**kwargs): pass @app.before_request def before_first3(*args,**kwargs): pass @app.route('/',methods=['GET',"POST"]) def index(): print('视图') return render_template('index.html') if __name__ == '__main__': app.wsgi_app app.run() ``` 一个流程中的信号触发点(了解) ```python a. before_first_request b. 触发 request_started 信号 c. before_request d. 模板渲染 渲染前的信号 before_render_template.send(app, template=template, context=context) rv = template.render(context) # 模板渲染 渲染后的信号 template_rendered.send(app, template=template, context=context) e. after_request f. session.save_session() g. 触发 request_finished信号 如果上述过程出错: 触发错误处理信号 got_request_exception.send(self, exception=e) h. 触发信号 request_tearing_down ``` 自定义信号(了解): ```python from flask import Flask, current_app, flash, render_template from flask.signals import _signals app = Flask(import_name=__name__) # 自定义信号 xxxxx = _signals.signal('xxxxx') def func(sender, *args, **kwargs): print(sender) # 自定义信号中注册函数 xxxxx.connect(func) @app.route("/x") def index(): # 触发信号 xxxxx.send('123123', k1='v1') return 'Index' if __name__ == '__main__': app.run() ``` # 六、命令flask-script 用于实现类似于django中 python3 manage.py runserver ...类似的命令 安装:pip3 install flask-script ## 使用 ``` from flask_script import Manager app = Flask(__name__) manager=Manager(app) ... if __name__ == '__main__': manager.run() #以后在执行,直接:python3 manage.py runserver #python3 manage.py runserver --help ``` ## 自定制命令 ``` @manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) @manager.option('-n', '--name', dest='name') #@manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令(-n也可以写成--name) 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) #有什么用? #把excel的数据导入数据库,定制个命令,去执行 ``` # 七、flask-admin ## 安装 ``` pip3 install flask_admin ``` ## 简单使用 ``` from flask import Flask from flask_admin import Admin app = Flask(__name__) #将app注册到adminzhong admin = Admin(app) if __name__=="mian": app.run() #访问 #127.0.0.1:5000/admin端口,会得到一个空白的页面 ``` ## 将表模型注册到admin中 ``` #在将表注册之前应该对app进行配置 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@127.0.0.1:3307/py9api?charset=utf8mb4" SQLALCHEMY_POOL_SIZE = 5 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 #导入models文件的中的表模型 from flask_admin.contrib.sqla import ModelView from api.models import Stock,Product,Images,Category,Wxuser,Banner admin.add_view(ModelView(Stock, db.session)) admin.add_view(ModelView(Product, db.session)) admin.add_view(ModelView(Category, db.session)) ``` ## 如果有个字段是图片指端 ``` #配置上传文件的路径 #导入from flask_admin.contrib.fileadmin import FileAdmin from flask_admin.contrib.fileadmin import FileAdmin,form file_path = op.join(op.dirname(__file__), 'static') admin = Admin(app) admin.add_view(FileAdmin(file_path, '/static/', name='文件')) #如果有个字段要是上传文件重写该方法的modleView类,假设imgae_url是文件图片的字段 class ImagesView(ModelView): form_extra_fields = { 'image_url': form.ImageUploadField('Image', base_path=file_path, relative_path='uploadFile/' ) } admin.add_view(ImagesView(Images, db.session)) ```

加载全部内容

相关教程
猜你喜欢
用户评论