本文共 13616 字,大约阅读时间需要 45 分钟。
网站搭建笔记精简版-廖雪峰教程学习@[三川水祭]
仅作学习交流使用,将来的你会感谢现在拼命努力的自己!!!本文首先对web框架进行了代码上的解释,之后对编辑middleware部分进行了代码的分析,最后讲述了如何测试从开始到现在所有代码的流程。其中代码的原版参照,在此基础上对handlers.py
文件增加了部分内容以便进行测试。
在编写过程中,由于aiohttp太过于底层,因此自己定义一个web框架,以实现自动化URL信息提取与函数的注册,增加的文件为coroweb.py
,如下代码
#!/usr/bin/env python3# -*- coding: utf-8 -*-# 导入异步工具包import asyncio, os, inspect, logging, functools# 导入网页处理工具包from urllib import parse# 导入底层web框架from aiohttp import webfrom apis import APIError# 将函数映射为URL处理函数,使得get函数附带URL信息def get(path): ''' Define decorator @get('/path') ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = 'GET' wrapper.__route__ = path return wrapper return decorator# 将函数映射为URL处理函数,使得post函数附带URL信息def post(path): ''' Define decorator @post('/path') ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = 'POST' # 存储方法信息 wrapper.__route__ = path # 存储路径信息 return wrapper return decorator# 运用inspect模块,创建几个函数用以获取URL处理函数与request参数之间的关系def get_required_kw_args(fn): # 收集没有默认值的命名关键字参数 args = [] params = inspect.signature(fn).parameters # inspect模块是用来分析模块,函数 for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty: args.append(name) return tuple(args)def get_named_kw_args(fn): # 获取命名关键字参数 args = [] params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: args.append(name) return tuple(args)def has_named_kw_args(fn): # 判断有没有命名关键字参数 params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: return Truedef has_var_kw_arg(fn): # 判断有没有关键字参数 params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.VAR_KEYWORD: return Truedef has_request_arg(fn): # 判断是否含有名字叫做'request'参数,且该参数是否为最后一个参数 sig = inspect.signature(fn) params = sig.parameters found = False for name, param in params.items(): if name == 'request': found = True continue if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD): raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig))) return found# 使用RequestHandler函数封装一个URL处理函数,向request参数获取URL处理函数所需要的参数class RequestHandler(object): def __init__(self, app, fn): # 接收app参数 self._app = app self._func = fn self._has_request_arg = has_request_arg(fn) self._has_var_kw_arg = has_var_kw_arg(fn) self._has_named_kw_args = has_named_kw_args(fn) self._named_kw_args = get_named_kw_args(fn) self._required_kw_args = get_required_kw_args(fn) # RequestHandler本身是一个类,由于定义了__call__方法,因此将其实例视为函数 # 该函数从request中获取必要参数,之后调用URL函数 # 最后将结果转换为web.Response对象。上述比较符合aiohttp框架 async def __call__(self, request): # 构造协程 kw = None if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args: if request.method == 'POST': # 判断客户端发来的方法是否是POST if not request.content_type: # 查询有没提交数据的格式(EncType) return web.HTTPBadRequest(text='Missing Content-Type.') ct = request.content_type.lower() if ct.startswith('application/json'): params = await request.json() # 读取请求的body代码作为json文件 if not isinstance(params, dict): return web.HTTPBadRequest(text='JSON body must be object.') kw = params elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'): params = await request.post() kw = dict(**params) else: return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type) if request.method == 'GET': # 判断客户端发来的方法是否是GET qs = request.query_string if qs: kw = dict() for k, v in parse.parse_qs(qs, True).items(): kw[k] = v[0] if kw is None: kw = dict(**request.match_info) else: # 当函数参数没有关键字参数时,移去request除命名关键字参数外所有的参数信息 if not self._has_var_kw_arg and self._named_kw_args: # remove all unamed kw: copy = dict() for name in self._named_kw_args: if name in kw: copy[name] = kw[name] kw = copy # check named arg: for k, v in request.match_info.items(): if k in kw: logging.warning('Duplicate arg name in named arg and kw args: %s' % k) kw[k] = v if self._has_request_arg: kw['request'] = request # check required kw:即加入命名关键字参数(没有附加默认值),request没有提供相应的数值,报错 if self._required_kw_args: for name in self._required_kw_args: if not name in kw: return web.HTTPBadRequest('Missing argument: %s' % name) logging.info('call with args: %s' % str(kw)) try: r = await self._func(**kw) return r except APIError as e: # APIError另外创建 return dict(error=e.error, data=e.data, message=e.message)# 添加静态文件夹的路径def add_static(app): path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') app.router.add_static('/static/', path) logging.info('add static %s => %s' % ('/static/', path))# 用来注册一个URL处理函数,主要起验证函数是否包含URL的相应方法与路径信息,并将其函数变为协程def add_route(app, fn): method = getattr(fn, '__method__', None) path = getattr(fn, '__route__', None) if path is None or method is None: raise ValueError('@get or @post not defined in %s.' % str(fn)) if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn): fn = asyncio.coroutine(fn) logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys()))) app.router.add_route(method, path, RequestHandler(app, fn))# 自动将module_name模块中所有符合条件的函数进行注册# 只需要向这个函数提供要批量注册函数的文件路径,新编写的函数就会筛选,注册文件内所有符合注册条件的函数def add_routes(app, module_name): n = module_name.rfind('.') if n == (-1): mod = __import__(module_name, globals(), locals()) else: name = module_name[n+1:] mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name) for attr in dir(mod): if attr.startswith('_'): continue fn = getattr(mod, attr) if callable(fn): method = getattr(fn, '__method__', None) path = getattr(fn, '__route__', None) if method and path: # 此处查询path以及method是否存在而不是等待add_route函数查询 add_route(app, fn)
将函数返回值变为web.response(),修改的文件为app.py
,代码如下:
#!/usr/bin/env python3# -*- coding: utf-8 -*-__author__ = 'Michael Liao''''async web application.'''import logging; logging.basicConfig(level=logging.INFO)import asyncio, os, json, timefrom datetime import datetimefrom aiohttp import webfrom jinja2 import Environment, FileSystemLoaderimport ormfrom coroweb import add_routes, add_static# 初始化jinja2模板,以便其他函数使用jinja2模板def init_jinja2(app, **kw): logging.info('init jinja2...') options = dict( autoescape = kw.get('autoescape', True), block_start_string = kw.get('block_start_string', '{%'), block_end_string = kw.get('block_end_string', '%}'), variable_start_string = kw.get('variable_start_string', '{ {'), variable_end_string = kw.get('variable_end_string', '}}'), auto_reload = kw.get('auto_reload', True) ) path = kw.get('path', None) if path is None: path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates') logging.info('set jinja2 template path: %s' % path) env = Environment(loader=FileSystemLoader(path), **options) filters = kw.get('filters', None) if filters is not None: for name, f in filters.items(): env.filters[name] = f app['__templating__'] = env# 使用middleware参数将函数返回值转化为web.response对象# middlewares是一个拦截器、中间键,即在URL真正被处理之前,需要经过一系列middleware的处理。async def logger_factory(app, handler): # 协程,两个参数 async def logger(request): logging.info('Request: %s %s' % (request.method, request.path)) # await asyncio.sleep(0.3) return (await handler(request)) return loggerasync def data_factory(app, handler): async def parse_data(request): if request.method == 'POST': if request.content_type.startswith('application/json'): request.__data__ = await request.json() logging.info('request json: %s' % str(request.__data__)) elif request.content_type.startswith('application/x-www-form-urlencoded'): request.__data__ = await request.post() logging.info('request form: %s' % str(request.__data__)) return (await handler(request)) return parse_data# 函数返回值转化为'web.response'对象async def response_factory(app, handler): async def response(request): logging.info('Response handler...') r = await handler(request) if isinstance(r, web.StreamResponse): return r if isinstance(r, bytes): resp = web.Response(body=r) resp.content_type = 'application/octet-stream' return resp if isinstance(r, str): if r.startswith('redirect:'): # 重定向 return web.HTTPFound(r[9:]) # 转入别的网站 resp = web.Response(body=r.encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, dict): template = r.get('__template__') if template is None: resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8')) resp.content_type = 'application/json;charset=utf-8' return resp else: # jinja2模板 resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, int) and r >= 100 and r < 600: return web.Response(r) if isinstance(r, tuple) and len(r) == 2: t, m = r if isinstance(t, int) and t >= 100 and t < 600: return web.Response(t, str(m)) # default: resp = web.Response(body=str(r).encode('utf-8')) resp.content_type = 'text/plain;charset=utf-8' return resp return responsedef datetime_filter(t): delta = int(time.time() - t) if delta < 60: return u'1分钟前' if delta < 3600: return u'%s分钟前' % (delta // 60) if delta < 86400: return u'%s小时前' % (delta // 3600) if delta < 604800: return u'%s天前' % (delta // 86400) dt = datetime.fromtimestamp(t) return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)
打开cmd命令行,输入net start mysql
,如果未安装mysql,参考。
在cmd命令行输入:mysql -u root -p
,进而输入密码进入数据库,之后输入以下命令建立网站的数据表。
drop database if exists awesome;# 创建数据库create database awesome;# 使用数据库use awesome;# 此处改为自己的主机名和密码grant select, insert, update, delete on awesome.* to 'root'@'localhost' identified by 'password';create table users ( `id` varchar(50) not null, `email` varchar(50) not null, `passwd` varchar(50) not null, `admin` bool not null, `name` varchar(50) not null, `image` varchar(500) not null, `created_at` real not null, unique key `idx_email` (`email`), key `idx_created_at` (`created_at`), primary key (`id`)) engine=innodb default charset=utf8;create table blogs ( `id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `name` varchar(50) not null, `summary` varchar(200) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`)) engine=innodb default charset=utf8;create table comments ( `id` varchar(50) not null, `blog_id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`)) engine=innodb default charset=utf8;
async def init(loop): # 这里的user和password改成自己的用户名和密码 await orm.create_pool(loop=loop, host='127.0.0.1', port=3306, user='root', password='www', db='awesome') app = web.Application(loop=loop, middlewares=[ logger_factory, response_factory ]) init_jinja2(app, filters=dict(datetime=datetime_filter)) # 对接收到的不同类型的浏览器请求语言具体处理的代码放在'handlers.py'文件中 add_routes(app, 'handlers') add_static(app) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000) logging.info('server started at http://127.0.0.1:9000...') return srvloop = asyncio.get_event_loop()loop.run_until_complete(init(loop))loop.run_forever()
import asyncio from coroweb import get,post#编写用于测试的URL处理函数 @get('/') async def handler_url_blog(request): body='Awesome
' return body
参考博客
转载地址:http://nmlxi.baihongyu.com/