博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
网站搭建笔记精简版---廖雪峰WebApp实战-Day5:编写Web框架笔记
阅读量:4165 次
发布时间:2019-05-26

本文共 13616 字,大约阅读时间需要 45 分钟。

网站搭建笔记精简版-廖雪峰教程学习@[三川水祭]

仅作学习交流使用,将来的你会感谢现在拼命努力的自己!!!

本文首先对web框架进行了代码上的解释,之后对编辑middleware部分进行了代码的分析,最后讲述了如何测试从开始到现在所有代码的流程。其中代码的原版参照,在此基础上对handlers.py文件增加了部分内容以便进行测试。

web框架定义

在编写过程中,由于aiohttp太过于底层,因此自己定义一个web框架,以实现自动化URL信息提取与函数的注册,增加的文件为coroweb.py,如下代码

#!/usr/bin/env python3# -*- coding: utf-8 -*-# 导入异步工具包import asyncio, os, inspect, logging, functools# 导入网页处理工具包from urllib import parse# 导入底层web框架from aiohttp import webfrom apis import APIError# 将函数映射为URL处理函数,使得get函数附带URL信息def get(path):    '''    Define decorator @get('/path')    '''    def decorator(func):        @functools.wraps(func)        def wrapper(*args, **kw):            return func(*args, **kw)        wrapper.__method__ = 'GET'        wrapper.__route__ = path        return wrapper    return decorator# 将函数映射为URL处理函数,使得post函数附带URL信息def post(path):    '''    Define decorator @post('/path')    '''    def decorator(func):        @functools.wraps(func)        def wrapper(*args, **kw):            return func(*args, **kw)        wrapper.__method__ = 'POST' # 存储方法信息        wrapper.__route__ = path # 存储路径信息        return wrapper    return decorator# 运用inspect模块,创建几个函数用以获取URL处理函数与request参数之间的关系def get_required_kw_args(fn): # 收集没有默认值的命名关键字参数    args = []    params = inspect.signature(fn).parameters # inspect模块是用来分析模块,函数    for name, param in params.items():        if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:            args.append(name)    return tuple(args)def get_named_kw_args(fn): # 获取命名关键字参数    args = []    params = inspect.signature(fn).parameters    for name, param in params.items():        if param.kind == inspect.Parameter.KEYWORD_ONLY:            args.append(name)    return tuple(args)def has_named_kw_args(fn): # 判断有没有命名关键字参数    params = inspect.signature(fn).parameters    for name, param in params.items():        if param.kind == inspect.Parameter.KEYWORD_ONLY:            return Truedef has_var_kw_arg(fn): # 判断有没有关键字参数    params = inspect.signature(fn).parameters    for name, param in params.items():        if param.kind == inspect.Parameter.VAR_KEYWORD:            return Truedef has_request_arg(fn): # 判断是否含有名字叫做'request'参数,且该参数是否为最后一个参数    sig = inspect.signature(fn)    params = sig.parameters    found = False    for name, param in params.items():        if name == 'request':            found = True            continue        if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):            raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))    return found# 使用RequestHandler函数封装一个URL处理函数,向request参数获取URL处理函数所需要的参数class RequestHandler(object):    def __init__(self, app, fn): # 接收app参数        self._app = app        self._func = fn        self._has_request_arg = has_request_arg(fn)        self._has_var_kw_arg = has_var_kw_arg(fn)        self._has_named_kw_args = has_named_kw_args(fn)        self._named_kw_args = get_named_kw_args(fn)        self._required_kw_args = get_required_kw_args(fn)	# RequestHandler本身是一个类,由于定义了__call__方法,因此将其实例视为函数    # 该函数从request中获取必要参数,之后调用URL函数	# 最后将结果转换为web.Response对象。上述比较符合aiohttp框架	async def __call__(self, request): # 构造协程        kw = None        if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:            if request.method == 'POST': # 判断客户端发来的方法是否是POST                if not request.content_type: # 查询有没提交数据的格式(EncType)                    return web.HTTPBadRequest(text='Missing Content-Type.')                ct = request.content_type.lower()                if ct.startswith('application/json'):                    params = await request.json() # 读取请求的body代码作为json文件                    if not isinstance(params, dict):                        return web.HTTPBadRequest(text='JSON body must be object.')                    kw = params                elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):                    params = await request.post()                    kw = dict(**params)                else:                    return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type)            if request.method == 'GET': # 判断客户端发来的方法是否是GET                qs = request.query_string                if qs:                    kw = dict()                    for k, v in parse.parse_qs(qs, True).items():                        kw[k] = v[0]        if kw is None:            kw = dict(**request.match_info)        else:			# 当函数参数没有关键字参数时,移去request除命名关键字参数外所有的参数信息            if not self._has_var_kw_arg and self._named_kw_args:                # remove all unamed kw:                copy = dict()                for name in self._named_kw_args:                    if name in kw:                        copy[name] = kw[name]                kw = copy            # check named arg:            for k, v in request.match_info.items():                if k in kw:                    logging.warning('Duplicate arg name in named arg and kw args: %s' % k)                kw[k] = v        if self._has_request_arg:            kw['request'] = request        # check required kw:即加入命名关键字参数(没有附加默认值),request没有提供相应的数值,报错        if self._required_kw_args:            for name in self._required_kw_args:                if not name in kw:                    return web.HTTPBadRequest('Missing argument: %s' % name)        logging.info('call with args: %s' % str(kw))        try:            r = await self._func(**kw)            return r        except APIError as e: # APIError另外创建            return dict(error=e.error, data=e.data, message=e.message)# 添加静态文件夹的路径def add_static(app):    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')    app.router.add_static('/static/', path)    logging.info('add static %s => %s' % ('/static/', path))# 用来注册一个URL处理函数,主要起验证函数是否包含URL的相应方法与路径信息,并将其函数变为协程def add_route(app, fn):    method = getattr(fn, '__method__', None)    path = getattr(fn, '__route__', None)    if path is None or method is None:        raise ValueError('@get or @post not defined in %s.' % str(fn))    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):        fn = asyncio.coroutine(fn)    logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))    app.router.add_route(method, path, RequestHandler(app, fn))# 自动将module_name模块中所有符合条件的函数进行注册# 只需要向这个函数提供要批量注册函数的文件路径,新编写的函数就会筛选,注册文件内所有符合注册条件的函数def add_routes(app, module_name):    n = module_name.rfind('.')    if n == (-1):        mod = __import__(module_name, globals(), locals())    else:        name = module_name[n+1:]        mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)    for attr in dir(mod):        if attr.startswith('_'):            continue        fn = getattr(mod, attr)        if callable(fn):            method = getattr(fn, '__method__', None)            path = getattr(fn, '__route__', None)            if method and path: # 此处查询path以及method是否存在而不是等待add_route函数查询		add_route(app, fn)

编辑middleware

将函数返回值变为web.response(),修改的文件为app.py,代码如下:

#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'Michael Liao''''async web application.'''import logging; logging.basicConfig(level=logging.INFO)import asyncio, os, json, timefrom datetime import datetimefrom aiohttp import webfrom jinja2 import Environment, FileSystemLoaderimport ormfrom coroweb import add_routes, add_static# 初始化jinja2模板,以便其他函数使用jinja2模板def init_jinja2(app, **kw):    logging.info('init jinja2...')    options = dict(        autoescape = kw.get('autoescape', True),        block_start_string = kw.get('block_start_string', '{%'),        block_end_string = kw.get('block_end_string', '%}'),        variable_start_string = kw.get('variable_start_string', '{
{'), variable_end_string = kw.get('variable_end_string', '}}'), auto_reload = kw.get('auto_reload', True) ) path = kw.get('path', None) if path is None: path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates') logging.info('set jinja2 template path: %s' % path) env = Environment(loader=FileSystemLoader(path), **options) filters = kw.get('filters', None) if filters is not None: for name, f in filters.items(): env.filters[name] = f app['__templating__'] = env# 使用middleware参数将函数返回值转化为web.response对象# middlewares是一个拦截器、中间键,即在URL真正被处理之前,需要经过一系列middleware的处理。async def logger_factory(app, handler): # 协程,两个参数 async def logger(request): logging.info('Request: %s %s' % (request.method, request.path)) # await asyncio.sleep(0.3) return (await handler(request)) return loggerasync def data_factory(app, handler): async def parse_data(request): if request.method == 'POST': if request.content_type.startswith('application/json'): request.__data__ = await request.json() logging.info('request json: %s' % str(request.__data__)) elif request.content_type.startswith('application/x-www-form-urlencoded'): request.__data__ = await request.post() logging.info('request form: %s' % str(request.__data__)) return (await handler(request)) return parse_data# 函数返回值转化为'web.response'对象async def response_factory(app, handler): async def response(request): logging.info('Response handler...') r = await handler(request) if isinstance(r, web.StreamResponse): return r if isinstance(r, bytes): resp = web.Response(body=r) resp.content_type = 'application/octet-stream' return resp if isinstance(r, str): if r.startswith('redirect:'): # 重定向 return web.HTTPFound(r[9:]) # 转入别的网站 resp = web.Response(body=r.encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, dict): template = r.get('__template__') if template is None: resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8')) resp.content_type = 'application/json;charset=utf-8' return resp else: # jinja2模板 resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, int) and r >= 100 and r < 600: return web.Response(r) if isinstance(r, tuple) and len(r) == 2: t, m = r if isinstance(t, int) and t >= 100 and t < 600: return web.Response(t, str(m)) # default: resp = web.Response(body=str(r).encode('utf-8')) resp.content_type = 'text/plain;charset=utf-8' return resp return responsedef datetime_filter(t): delta = int(time.time() - t) if delta < 60: return u'1分钟前' if delta < 3600: return u'%s分钟前' % (delta // 60) if delta < 86400: return u'%s小时前' % (delta // 3600) if delta < 604800: return u'%s天前' % (delta // 86400) dt = datetime.fromtimestamp(t) return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)

编写测试代码

数据库服务启动

打开cmd命令行,输入net start mysql,如果未安装mysql,参考。

数据表建立

在cmd命令行输入:mysql -u root -p,进而输入密码进入数据库,之后输入以下命令建立网站的数据表。

drop database if exists awesome;# 创建数据库create database awesome;# 使用数据库use awesome;# 此处改为自己的主机名和密码grant select, insert, update, delete on awesome.* to 'root'@'localhost' identified by 'password';create table users (    `id` varchar(50) not null,    `email` varchar(50) not null,    `passwd` varchar(50) not null,    `admin` bool not null,    `name` varchar(50) not null,    `image` varchar(500) not null,    `created_at` real not null,    unique key `idx_email` (`email`),    key `idx_created_at` (`created_at`),    primary key (`id`)) engine=innodb default charset=utf8;create table blogs (    `id` varchar(50) not null,    `user_id` varchar(50) not null,    `user_name` varchar(50) not null,    `user_image` varchar(500) not null,    `name` varchar(50) not null,    `summary` varchar(200) not null,    `content` mediumtext not null,    `created_at` real not null,    key `idx_created_at` (`created_at`),    primary key (`id`)) engine=innodb default charset=utf8;create table comments (    `id` varchar(50) not null,    `blog_id` varchar(50) not null,    `user_id` varchar(50) not null,    `user_name` varchar(50) not null,    `user_image` varchar(500) not null,    `content` mediumtext not null,    `created_at` real not null,    key `idx_created_at` (`created_at`),    primary key (`id`)) engine=innodb default charset=utf8;

app.py部分的代码:

async def init(loop):	# 这里的user和password改成自己的用户名和密码    await orm.create_pool(loop=loop, host='127.0.0.1', port=3306, user='root', password='www', db='awesome')    app = web.Application(loop=loop, middlewares=[        logger_factory, response_factory    ])    init_jinja2(app, filters=dict(datetime=datetime_filter))    # 对接收到的不同类型的浏览器请求语言具体处理的代码放在'handlers.py'文件中    add_routes(app, 'handlers')    add_static(app)    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000)    logging.info('server started at http://127.0.0.1:9000...')    return srvloop = asyncio.get_event_loop()loop.run_until_complete(init(loop))loop.run_forever()

Handlers.py部分的代码

import asyncio from coroweb import get,post#编写用于测试的URL处理函数 @get('/') async def handler_url_blog(request): 	body='

Awesome

' return body

参考博客

转载地址:http://nmlxi.baihongyu.com/

你可能感兴趣的文章
Java集合(1) - List集合源码解析
查看>>
Java集合(2) - Map与AbstractMap源码解析
查看>>
Java集合(3) - HashMap源码解析与常见问题(一)
查看>>
Java集合(4) - HashMap-put()源码解析与常见问题(二)
查看>>
Java集合(5) - HashMap查删源码解析与常见问题(三)
查看>>
Java集合(6) - LinkedHashMap源码解析
查看>>
Java集合(7) - TreeMap源码解析
查看>>
Java集合(8) - Set与AbstractSet源码解析
查看>>
Java多线程(2) - 多线程之线程安全详解(synchronized、Lock)
查看>>
OKR与CFR管理模式(二)-CFR与OKR的绩效管理
查看>>
Java多线程(3) - 多线程之死锁
查看>>
Java多线程(4) - 多线程之Volatile关键字、ThreadLocal、Atomic系列类、CAS
查看>>
Java多线程(5) - 多线程之线程通讯(一)(wait、notify、join、yield、sleep区别与应用)
查看>>
Java多线程(6) - 多线程之线程通讯(二)(wait与notify案例、守护线程)
查看>>
什么是项目管理?怎么管?(二)
查看>>
Java多线程(7) - 多线程之线程停止方式
查看>>
Java设计模式(1) - 单例设计模式多种写法
查看>>
Java设计模式(2) - 工厂设计模式
查看>>
Java多线程(8) - 同步(并发)类容器详解(CopyOnWrite容器、ConcurrentMap容器、Queue队列容器)
查看>>
Java设计模式(3) - 多线程并发设计模式 - Future设计模式
查看>>