本文共 13326 字,大约阅读时间需要 44 分钟。
学习笔记来源-廖雪峰老师
coreweb.py
#!/usr/bin/env python3# -*- coding: utf-8 -*-'''Python基础- Web 框架 coreweb 处理 URL 操作'''import asyncio, os, inspect, logging, functoolsfrom urllib import parsefrom aiohttp import webfrom apis import APIErrordef get(path): ''' URL 的 get 装饰方法 ''' def decorator(func): # 把一个函数映射为URL处理函数 @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = "GET" wrapper.__route__ = path return wrapper return decoratordef post(path): ''' URL 的 post 装饰方法 ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = "POST" wrapper.__route__ = path return wrapper return decoratordef get_required_kw_args(fn): args = [] params = inspect.sinature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty: args.append(name) return tuple(args)def get_name_kw_args(fn): args = [] params = inspect.sinature(fn).parameters for name, param in param.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: args.append(name) return tuple(args)def has_name_kw_args(fn): params = inspect.sinature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: return Truedef ha_var_kw_arg(fn): params = inspect(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.VAR_KEYWORD: return Truedef has_request_arg(fn): sig = inspect.sinature(fn) param = sig.parameters found = False for name, param in param.items(): if name == "request": found = True continue if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD): raise valueError("请求的参数必须the last named parameter in function:%s%s" % (fn.__name__), str(sig)) return foundclass RequestHandler(object): def __init__(self, app, fn): self._app = app self._func = fn self._has_request_arg = has_request_arg(fn) self._has_var_kw_arg = has_var_kw_arg(fn) self._has_named_kw_args = has_named_kw_args(fn) self._named_kw_args = named_kw_args(fn) self._required_kw_args = _required_kw_args(fn) # 异步 @asyncio.coroutine def __call__(self, request): kw = None if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args: if request.method == "POST": if not request.content_type: return web.HTTPBadRequest("Missing Content-Type") # 转换为小写 ct = request.content_type.lower() # JSON 格式 if ct.startswith('application/json'): params = yield from request.json() if not isinstance(params, dict): return web.HTTPBadRequest("JSON body must be object(dict)") kw = params elif ct.startswith("application/x-www-form-urlencoded") or ct.startswith('multipart/form-data'): params = yield from request.post() kw = dict(**params) else: return web.HTTPBadRequest("不支持的 Content-Type: %s" % request.content_type) if kw is None: kw = dict(**request.match_info) else: if not self._has_var_kw_arg and self._named_kw_args: # 移除所有未被命名的 kw copy = dict() for name in self._named_kw_args: if name in kw: copy[name] = kw[name] kw = copy # 检查 kw for k, v in request.match_info.items(): if k in kw: logging.warning("arg 和 kw arg重复命名了 %s" % k) kw[k] = v if self._required_kw_args: kw["request"] = request # 检查 required kw if self._required_kw_args: for name in self.required_kw_args: if not name in kw: return web.HTTPBadRequest("Missing arg %s" % name) logging.info("call with args : %s" % str(kw)) try: r = yield from self._func(**kw) return r except APIError as e: return dict(error = e.error, data = e.data, message = e.message)def add_static(app): path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") app.router.add_static("/static/", path) logging.info("add_static %s -> %s" % ('/static/', path))def add_route(app, fn): method = getattr(fn, "__method__", None) path = getattr(fn, "__route__", None) if path is None or method is None: raise ValueError('@get or @post 未定义 in %s.' % str(fn)) if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn): fn = asyncio.coroutine(n) logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys()))) app.router.add_route(method, path, RequestHandler(app, fn))def add_routes(app, module_name): n = module_name.rfind(".") if n == (-1): mod = __import__(module_name, globals(), locals()) else: name = module_name[n+1] mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name) for attr in dir(mod): if attr.startswith("_"): continue fn = getattr(mod, attr) if callable(fn): method = getattr(fn, "__method__", None) path = getattr(fn, "__route__", None) if method and path: add_route(app, fn)
#!/usr/bin/env python3# -*- coding: utf-8 -*-'''JSON API definition.'''import json, logging, inspect, functoolsclass APIError(Exception): ''' the base APIError which contains error(required), data(optional) and message(optional). ''' def __init__(self, error, data='', message=''): super(APIError, self).__init__(message) self.error = error self.data = data elf.message = messageclass APIValueError(APIError): ''' Indicate the input value has error or invalid. The data specifies the error field of input form. ''' def __init__(self, field, message=''): super(APIValueError, self).__init__('value:invalid', field, message)class APIResourceNotFoundError(APIError): ''' Indicate the resource was not found. The data specifies the resource name. ''' def __init__(self, field, message=''): super(APIResourceNotFoundError, self).__init__('value:notfound', field, message)class APIPermissionError(APIError): ''' Indicate the api has no permission. ''' def __init__(self, message=''): super(APIPermissionError, self).__init__('permission:forbidden', 'permission', message)
# handlers.py#!/usr/bin/env python3# -*- coding: utf-8 -*-'url handlers'
#!/usr/bin/env python3# -*- coding: utf-8 -*-# Python基础-Web App 骨架import logging; logging.basicConfig(level = logging.INFO)import asyncio, os, json, timefrom datetime import datetimeimport ormfrom aiohttp import webfrom coreweb import add_routes, add_staticfrom jinja2 import Environment, FileSystemLoaderdef init_jinja2(app, **kw): logging.info("初始化 jinja2") options = dict( autoescape = kw.get("autoescape ", True), block_start_string = kw.get("block_start_string","{%"), block_end_string = kw.get("block_end_string", "%}"), variable_start_string = kw.get("variable_start_string", '{ {'), variable_end_string = kw.get("variable_end_string", '}}'), auto_reload = kw.get("auto_reload", True) ) path = kw.get("path", None) if path is None: # templates文件夹 放置 html 文件 path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") logging.info("设置 jinja2 templates 地址为 %s" % path) env = Environment(loader = FileSystemLoader(path), **options) filters = kw.get("filters", None) if filters is not None: for name, f in filters.items(): env.filters[name] = f app['__templating__'] = envdef datetime_filter(t): # 时间差 delta = int(time.time() - t) if delta < 60: return u'1分钟前' if delta < 3600: return u'%s分钟前' % (delta // 60) if delta < 86400: return u'%s小时前' % (delta // 3600) if delta < 604800: return u'%s天前' % (delta // 86400) dt = datetime.fromtimestamp(t) return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)@asyncio.coroutinedef logger_factory(app, handler): @asyncio.coroutine def logger(request): logging.info('Request: %s %s' % (request.method, request.path)) return (yield from handler(request)) return logger@asyncio.coroutinedef data_factory(app, handler): @asyncio.coroutine def parse_data(request): if request.method == 'POST': if request.content_type.startswith('application/json'): request.__data__ = yield from request.json() logging.info('request json: %s' % str(request.__data__)) elif request.content_type.startswith('application/x-www-form-urlencoded'): request.__data__ = yield from request.post() logging.info('request form: %s' % str(request.__data__)) return (yield from handler(request)) return parse_data@asyncio.coroutinedef response_factory(app, handler): @asyncio.coroutine def response(request): logging.info('Response handler...') r = yield from handler(request) if isinstance(r, web.StreamResponse): return r if isinstance(r, bytes): resp = web.Response(body=r) resp.content_type = 'application/octet-stream' return resp if isinstance(r, str): if r.startswith('redirect:'): return web.HTTPFound(r[9:]) resp = web.Response(body=r.encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, dict): template = r.get('__template__') if template is None: resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8')) resp.content_type = 'application/json;charset=utf-8' return resp else: resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, int) and r >= 100 and r < 600: return web.Response(r) if isinstance(r, tuple) and len(r) == 2: t, m = r if isinstance(t, int) and t >= 100 and t < 600: return web.Response(t, str(m)) # default: resp = web.Response(body=str(r).encode('utf-8')) resp.content_type = 'text/plain;charset=utf-8' return resp return response@asyncio.coroutinedef init(loop): yield from orm.create_pool(loop = loop, host = "127.0.0.1", port = 3306, user = "root", password = "", database = "test") #创建一个web服务器对象 app = web.Application(loop=loop, middlewares=[ logger_factory, response_factory ]) #通过router的指定的方法可以把请求的链接和对应的处理函数关联在一起 init_jinja2(app, filters=dict(datetime = datetime_filter)) add_routes(app, "handlers") add_static(app) #运行web服务器,服务器启动后,有用户在浏览器访问,就可以做出对应的响应 # 127.0.0.1 本机地址 srv = yield from loop.create_server(app.make_handler(), "127.0.0.1", 9000) logging.info("服务端 http://127.0.0.1:9000....") return srv# 固定写法loop = asyncio.get_event_loop()loop.run_until_complete(init(loop))loop.run_forever()
D:\python3-webapp-Su\www>python app.pyINFO:root:建立数据库连接池INFO:root:初始化 jinja2INFO:root:设置 jinja2 templates 地址为 D:\python3-webapp-Su\www\templatesINFO:root:add_static /static/ -> D:\python3-webapp-Su\www\staticINFO:root:服务端 http://127.0.0.1:9000....INFO:root:Request: GET /indexINFO:root:Response handler...INFO:aiohttp.access:127.0.0.1 - - [30/Dec/2017:15:37:26 +0000] "GET /index HTTP/1.1" 404 172 "-" "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299"INFO:root:Request: GET /indexINFO:root:Response handler...INFO:aiohttp.access:127.0.0.1 - - [30/Dec/2017:15:39:20 +0000] "GET /index HTTP/1.1" 404 172 "-" "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299"INFO:root:Request: GET /INFO:root:Response handler...INFO:aiohttp.access:127.0.0.1 - - [30/Dec/2017:15:39:25 +0000] "GET / HTTP/1.1" 404 172 "-" "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299"
Web框架实现对前面的 web 和 mysql 的基本调用
git add .git commit -m "Python基础-项目-day5 Web框架"git push -u origin master
转载地址:http://cacti.baihongyu.com/