PK ! `& dophon/__init__.py# coding: utf-8
from flask import Blueprint
from dophon.tools.dynamic_import import d_import
name = 'dophon'
def blue_print(name, import_name, inject_config: dict = {}, static_folder=None,
static_url_path=None, template_folder=None,
url_prefix=None, subdomain=None, url_defaults=None,
root_path=None):
"""
获取Flask路由,同时实现自动注入(可细粒度管理)
:param inject_config:注入配置,类型为dict
:param name:同Blueprint.name
:param import_name:同Blueprint.import_name
:param static_folder:同Blueprint.static_floder
:param static_url_path:同Blueprint.static_url_path
:param template_folder:同Blueprint.template_folder
:param url_prefix:同Blueprint.url_prefix
:param subdomain:同Blueprint.subdomain
:param url_defaults:同Blueprint.url_defaults
:param root_path:同Blueprint.root_path
:return:
"""
blue_print_obj = Blueprint(name=name, import_name=import_name, static_folder=static_folder,
static_url_path=static_url_path, template_folder=template_folder,
url_prefix=url_prefix, subdomain=subdomain, url_defaults=url_defaults,
root_path=root_path)
if inject_config:
autowire = __import__('dophon.annotation.AutoWired', fromlist=True)
outerwired = getattr(autowire, 'OuterWired')
outerwired(
obj_obj=inject_config['inj_obj_list'],
g=inject_config['global_obj']
)(inject)()
return blue_print_obj
def inject():
"""
用于构造注入入口,无意义
:return:
"""
pass
def dophon_boot(f):
"""
装饰器形式启动
:return:
"""
def arg(*args, **kwargs):
from . import tools
boot = __import__('dophon.boot', fromlist=True)
kwargs['boot'] = boot
return f(*args, **kwargs)
return arg
__all__ = ['BluePrint', 'blue_print']
BluePrint = blue_print
PK ! | dophon/annotation/__init__.py# coding: utf-8
from dophon.annotation import AutoWired
from dophon.annotation import response, request
from dophon.annotation.AutoWired import *
from dophon.annotation.description import *
from dophon.annotation.request import BaseHandler
"""
注解集合(部分)
author:CallMeE
date:2018-06-01
"""
__all__ = [
'ResponseBody',
'ResponseTemplate',
'AutoParam',
'FullParam',
'RequestMapping',
'GetRoute',
'PostRoute',
'Get',
'Post',
'AutoWired',
'AsResBody',
'AsResTemp',
'AsArgs',
'AsJson',
'AsFile',
'BeanConfig',
'bean',
'Bean',
'Desc',
'DefBean',
'res',
'req',
'BaseHandler'
]
AutoWired = AutoWired
ResponseBody = AsResBody = response.response_body
ResponseTemplate = AsResTemp = response.response_template
AutoParam = AsArgs = request.auto_param
FullParam = AsJson = request.full_param
FileParam = AsFile = request.file_param
BeanConfig = AutoWired.BeanConfig
bean = AutoWired.bean
Bean = AutoWired.Bean
RequestMapping = request.request_mapping
GetRoute = request.get_route
PostRoute = request.post_route
Get = request.get
Post = request.post
Desc = desc
DefBean = DefBean
res = response
req = requestPK ! /R, , dophon/annotation/AutoWired.py# coding: utf-8
# 自动注入修饰器
import re
import inspect
from dophon import properties
from dophon_logger import *
import types
from dophon.tools import sys_utils
logger = get_logger(eval(properties.log_types))
"""
自动注入注解
ps:默认为单例模式,利用模块局内变量域管理实例,减少内存冗余
author:CallMeE
date:2018-06-01
实现自动注入功能需在路由模块(也可以在其他位置)中定义方法(inject_obj)并添加下列两个注解之一
DEMO;
# 注入对象
@AutoWired.InnerWired([ShopGoodsController.ShopGoodsController],a_w_list=['_SGCobj'],g=globals())
或者
@AutoWired.OuterWired(obj_list,g=globals())
def inject_obj():
pass
调用inject_obj()即可实现自动注入
注意:
1.a_w_list中的元素为注入引用名,必须要与注入目标引用名一致,否则注入失效
2.注入位置必须显式定义一个值为None的引用,否则编译不通过
3.注入类型必须为可初始化类型(定义__new__ or __init__)
"""
logger.inject_logger(globals())
obj_manager = {}
class UniqueError(Exception):
"""
唯一错误,存在覆盖已存在的别名实例
"""
# 显式参数注入
def InnerWired(clz, g, a_w_list=[]):
# print(locals())
# 注入实例
def wn(f):
def inner_function(*args, **dic_args):
# 获取数组
if a_w_list and 0 < len(a_w_list):
# 装饰器参数查找赋值
a_name = a_w_list
else:
if dic_args['a_w_list'] and 0 < dic_args['a_w_list']:
# 被装饰函数关键字参数查找赋值
a_name = dic_args['a_w_list']
else:
if not args:
logger.error('动态形参为空')
raise Exception('动态形参为空!!')
# 被装饰函数位置参数查找赋值
a_name = args[0]
for index in range(len(a_name)):
logger.info(str(a_name[index]) + " 注入 " + str(clz[index]))
obj_name = a_name[index]
if obj_name in obj_manager:
g[obj_name] = obj_manager[obj_name]
else:
instance = clz[index]()
obj_manager[obj_name] = instance
g[obj_name] = instance
# return arg
return f()
return inner_function
return wn
# 自定义参数列表注入
def OuterWired(obj_obj, g):
# 前期准备
clz = []
a_w_list = []
for key in obj_obj.keys():
a_w_list.append(key)
clz.append(obj_obj[key])
# 注入实例
def wn(f):
logger.info('开始注入实例')
def inner_function(*args, **dic_args):
# 获取数组
if a_w_list and 0 < len(a_w_list):
# 装饰器参数查找赋值
a_name = a_w_list
else:
if dic_args['a_w_list']:
# 被装饰函数关键字参数查找赋值
a_name = dic_args['a_w_list']
else:
if not args:
logger.error('动态形参为空!!')
raise Exception('动态形参为空!!')
# 被装饰函数位置参数查找赋值
a_name = args[0]
for index in range(len(a_name)):
logger.info(str(a_name[index]) + " 注入 " + str(clz[index]))
try:
obj_name = a_name[index]
if obj_name in obj_manager:
g[obj_name] = obj_manager[obj_name]
else:
instance = clz[index]()
obj_manager[obj_name] = instance
g[obj_name] = instance
except Exception as e:
logger.error('注入' + str(a_name[index]) + '失败,原因:' + str(e))
continue
return f()
return inner_function
return wn
"""
2018-07-22
参照spring实例管理实现实例管理
1.bean装饰器
参照spring中bean注解
需执行被装饰方法才能交由实例管理器管理
默认使用方法名作为实例管理名
可传入自定义别名替换别名
同名别名会抛出二义性错误
暂不支持lambda表达式(强制调用非装饰器装饰lambda表达式会产生注册实例无效)
2.BeanConfig实例管理器
定义管理器子类后定义bean装饰方法后实例化一次后全部交由全局实例管理管理实例
支持with语法
3.Bean实例获取类
实例获取需传入实例别名或实例类型
可直接赋值变量
"""
def bean(name: str = None, value=None):
"""
向实例管理器插入实例
:param by_name: 别名(不传值默认为类型)
:return:
"""
def insert_method(f):
def args(*args, **kwargs):
result = f(*args, **kwargs)
if result is None:
raise TypeError('无法注册实例:' + str(result))
if name:
if name in obj_manager:
raise UniqueError('存在已注册的实例:' + name)
obj_manager[name] = result
else:
alias_name = getattr(f, '__name__') if getattr(f, '__name__') else getattr(type(result), '__name__')
if alias_name in obj_manager:
raise UniqueError('存在已注册的实例')
else:
obj_manager[alias_name] = result
return
return args
def insert_value():
if name:
if name in obj_manager:
raise UniqueError('存在已注册的实例:' + name)
obj_manager[name] = value
else:
alias_name = name
if alias_name in obj_manager:
raise UniqueError('存在已注册的实例')
else:
obj_manager[alias_name] = value
return insert_value if value else insert_method
bean_const = ['file_init', 'sort_bean_config']
class BeanConfig:
"""
实力配置类,类内自定义方法带上bean注解即可(参照springboot中been定义)
"""
def __call__(self, *args, **kwargs):
for name in dir(self):
if re.match('__.+__', name) or name in bean_const:
continue
attr = getattr(self, name)
if callable(attr):
fields = inspect.getfullargspec(attr).args
staticmethod(attr(*fields))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type and exc_val and exc_tb:
logger.error(f'{str(exc_val)} 实例不存在')
pass
def __init__(self, files: list = [], config_file: str = ''):
if obj_manager:
logger.warning(f'存在已初始化实例管理器,跳过初始化: {self}')
return
logger.info('执行批量实例管理初始化')
if files or config_file:
self.file_init(files if files else [config_file])
else:
# 无参构造,加载内部bean装饰方法,实例统一配置
self()
def file_init(self, files: list = []):
for config_file in files:
if isinstance(config_file, str):
# 字符串形式声明配置文件(文件名)
logger.info('读取配置文件: %s ' % (config_file,))
# 有参构造,分布式实例配置
from dophon import properties
import os
file_path = properties.project_root + (config_file if config_file.startswith(os.sep) else (
os.sep + config_file))
if os.path.exists(file_path):
try:
bean_config = __import__(config_file.replace(os.sep, '.').rstrip('.py'), fromlist=True)
self.sort_bean_config(bean_config)
except Exception as e:
logger.error('实例配置读取失败,信息: %s' % (e,))
else:
logger.error('不存在实例配置文件')
self()
elif isinstance(config_file, types.ModuleType):
# 模块形式配置文件
try:
logger.info('读取配置文件: %s ' % (config_file,))
self.sort_bean_config(config_file)
except Exception as e:
logger.error('实例配置读取失败,信息: %s' % (e,))
else:
# 其他形式声明配置
logger.error('配置文件声明无效(%s)' % (config_file,))
continue
def sort_bean_config(self, bean_config: object):
for k in dir(bean_config):
if not k.startswith('__') and not k.endswith('__'):
# 获取自定义参数
# print(k, '---', getattr(bean_config, k))
bean(k, getattr(bean_config, k))()
# print(obj_manager[k])
class Bean:
def __new__(cls, *args, **kwargs):
if args or len(args) > 1:
bean_key = args[0]
elif kwargs or len(kwargs) > 1:
bean_key = kwargs.keys()[0]
else:
raise KeyError('不存在实例别名或实例类型')
if isinstance(bean_key, str):
if bean_key in obj_manager:
return obj_manager[bean_key]
raise KeyError(f'不存在该别名实例:({bean_key})')
elif isinstance(bean_key, type):
type_list = []
for key in obj_manager.keys():
if re.match('__.+__', key):
continue
bean_obj = obj_manager[key]
if isinstance(bean_obj, bean_key):
if len(type_list) > 0:
raise UniqueError(f'存在定义模糊的实例获取类型:({bean_key})')
type_list.append(bean_obj)
if not type_list:
raise KeyError(f'不存在该类型实例:({bean_key})')
return type_list[0]
else:
logger.error('无法获取的实例: %s' % (bean_key,))
def DefBean(clz):
# print(f'define bean class{clz}')
if type(clz) is type:
try:
__instance = clz()
# for name in dir(clz):
# print(f'{name} => {getattr(clz, name)}')
# print(f'''{clz} => {}''')
alias_fstr = f"_{getattr(clz, '__name__')}__bean_alias"
clz_alias = sys_utils.to_lower_camel(getattr(clz, alias_fstr)) \
if hasattr(clz, alias_fstr) \
else getattr(clz, '__name__')
if clz_alias not in obj_manager:
obj_manager[sys_utils.to_lower_camel(clz_alias)] = __instance
except Exception as e:
logger.error(f'初始化{clz}失败,{e}')
# print(obj_manager)
PK ! s5 5 ) dophon/annotation/description/__init__.pyimport re
from functools import wraps
from ..request import auto_param
from inspect import getfullargspec
DESC_INFO = {}
PATH_TRANSLATE_DICT = {}
def desc(
operation: str = 'None'
):
"""
函数描述装饰器
自动添加请求属性配对
:return:
"""
def inner_method(f):
# 获取方法的参数名集合
# FullArgSpec(args=['test_arg1'], varargs=None, varkw=None, defaults=None, kwonlyargs=[], kwonlydefaults=None, annotations={})
full_arg_spec = getfullargspec(f)
__own_doc = getattr(f, '__doc__') # 自身定义的文档
__args = full_arg_spec.args # 通过函数定义的属性名集合
__defaults = full_arg_spec.defaults # 通过函数定义参数默认值
# print(__defaults)
__annotations = full_arg_spec.annotations # 通过函数定义的参数修饰
# print(full_arg_spec)
# 处理参数信息,放入参数信息列
__function_arg_info = {}
__func_info = {
'own_doc': re.sub('(\n)', '
', __own_doc) if __own_doc else operation
}
for __inner_arg_index in range(len(__args)):
__function_arg_info[__args[__inner_arg_index]] = {
'name': __args[__inner_arg_index],
# 获取参数上的默认值
'default_value': __defaults[__inner_arg_index]
if __defaults and __inner_arg_index < len(__defaults)
else 'nothing',
# 获取参数上的类型修饰
'annotation_type': __annotations[__args[__inner_arg_index]].__name__
if __annotations and __args[__inner_arg_index] in __annotations
else 'any',
}
__func_info['args_info'] = __function_arg_info
DESC_INFO[f'{getattr(f, "__module__")}.{getattr(f, "__name__")}'] = __func_info
@wraps(f)
def inner_args(*args, **kwargs):
return auto_param()(f)(*args, **kwargs)
return inner_args
return inner_method
PK ! %RCl! l! % dophon/annotation/request/__init__.py# coding: utf-8
import functools
import re
from flask import request, abort
from dophon import properties
from dophon_logger import *
from .param_adapter.handler import BaseHandler, CustomizeHandleProcessNotFoundError
from .param_adapter import *
import inspect
import json
logger = get_logger(eval(properties.log_types))
# Content-TYPE 2 adapter
PARAM_DICT = {
None: is_none,
'application/x-www-form-urlencoded': is_form,
'multipart/form-data': is_form,
'application/json': is_json,
'text/xml': is_xml,
'application/xml': is_xml,
'text/plain': is_args
}
'''
参数体可以为多个,形参名称必须与请求参数名一一对应(只少不多)
装饰器中关键字参数列表可以指定参数名
'''
logger.inject_logger(globals())
# 处理请求参数装饰器(分离参数)
def auto_param(content_handler: BaseHandler = None):
def method(f):
@functools.wraps(f)
def args(*args, **kwargs):
try:
content_type = request.headers.get('Content-Type')
if not content_handler:
handle_result = PARAM_DICT[content_type](request)
req_kwargs = eval(json.dumps(handle_result, ensure_ascii=False)) \
if isinstance(handle_result, dict) \
else handle_result
else:
try:
# 尝试自定义处理器处理请求
req_kwargs = content_handler.handle(content_type)(request)
except CustomizeHandleProcessNotFoundError as e:
# 尝试使用框架自定义处理器处理
handle_result = PARAM_DICT[content_type](request)
req_kwargs = eval(json.dumps(handle_result, ensure_ascii=False)) \
if isinstance(handle_result, dict) \
else handle_result
assert isinstance(req_kwargs, dict), f'参数类型异常{type(req_kwargs)}'
for k in inspect.signature(f).parameters.keys():
if k in req_kwargs:
kwargs[k] = req_kwargs[k]
return f(**kwargs)
except TypeError as t_e:
logger.error('参数不匹配!!,msg:' + repr(t_e))
raise t_e
return abort(500)
return args
return method
'''
注意!!!
该注解只需方法体内存在一个形参!!!
同样,需要指定参数名的形式参数列表条目也只能存在一个,多个会默认取第一个
不匹配会打印异常
参数以json形式赋值
'''
# 处理请求参数装饰器(统一参数,参数体内参数指向json串)
def full_param(kwarg_list=[]):
def method(f):
@functools.wraps(f)
def args(*args, **kwargs):
try:
if 'POST' == str(request.method):
r_arg = ()
r_kwarg = {}
if not kwarg_list:
r_arg = (request.json if request.is_json else request.form.to_dict(),)
else:
r_kwarg[kwarg_list[0]] = request.json if request.is_json else request.form.to_dict()
return f(*r_arg, **r_kwarg)
elif 'GET' == str(request.method):
r_arg = ()
r_kwarg = {}
if not kwarg_list:
r_arg = (request.args.to_dict(),)
else:
r_kwarg[kwarg_list[0]] = request.args.to_dict()
return f(*r_arg, **r_kwarg)
else:
logger.error('json统一参数不支持该请求方法!!')
return abort(400)
except TypeError as t_e:
logger.error('参数不匹配!!,msg:' + repr(t_e))
return abort(500)
return args
return method
def file_param(alias_name: str = 'files', extra_param: str = 'args'):
"""
文件参数装在装饰器
ps 文件上传暂时只支持路由方法内单个参数接收(会有校验策略)
参数demo(小程序):
ImmutableMultiDict([('img_upload_test', )])
:return:
"""
def method(f):
@functools.wraps(f)
def args(*args, **kwargs):
# 检测参数
a_nums = len(args) + len(kwargs)
if a_nums > 0:
logger.error('路由绑定参数数量异常')
raise Exception('路由绑定参数数量异常')
try:
extra_param_value = (request.form if request.form else request.json).to_dict()
except:
extra_param_value = {}
k_args = {
alias_name: request.files.to_dict(),
extra_param: extra_param_value
}
return f(**k_args)
return args
return method
# 路径绑定装饰器
# 默认服务器从boot获取
def request_mapping(path='', methods=[], app=None):
def method(f):
try:
# 自动获取蓝图实例并进行http协议绑定
current_package = __import__(str(getattr(f, "__module__")), fromlist=True)
try:
package_app = getattr(current_package, '__app') \
if hasattr(current_package, '__app') \
else current_package.app \
if hasattr(current_package, 'app') \
else app \
if hasattr(app, 'routes') \
else __import__('dophon').blue_print(f"_annotation_auto_reg.{getattr(current_package, '__name__')}",
getattr(current_package, '__name__'))
except Exception as e:
logger.warn(f'{e}')
package_app = __import__('dophon.boot', fromlist=True).app
# 回设内部蓝图参数
# print(package_app)
setattr(current_package, '__app', package_app)
result = package_app.route(path, methods=methods)(f)
except Exception as e:
logger.error(f'{getattr(f, "__module__")}参数配置缺失,请检查({path},{methods},{package_app})')
def m_args(*args, **kwargs):
if properties.debug_trace:
logger.info(f'router endpoint:{getattr(f, "__module__")}.{f},router path:{path}')
return result(*args, **kwargs)
return m_args
return method
# 请求方法缩写
def method_route(req_method: str, path: str = '', ):
def method(f):
result = request_mapping(path, [req_method])(f)
def _args(*args, **kwargs):
return result(*args, **kwargs)
return _args
return method
# get方法缩写
def get_route(path=''):
def method(f):
result = method_route('get', path)(f)
def _args(*args, **kwargs):
return result(*args, **kwargs)
return _args
return method
# post方法缩写
def post_route(path=''):
def method(f):
result = method_route('post', path)(f)
def _args(*args, **kwargs):
return result(*args, **kwargs)
return _args
return method
func_to_path = lambda f: f"""{"/" if re.match("^[a-zA-Z0-9]+", getattr(f, "__name__")) else ""}{re.sub("[^a-zA-Z0-9]",
"/",
getattr(f,
"__name__"))}"""
# get方法缩写
def get(f, *args, **kwargs):
path = func_to_path(f)
result = get_route(re.sub('\s+', '', path))(f)
def method():
def _args():
return result(*args, **kwargs)
return _args
return method
# post方法缩写
def post(f, *args, **kwargs):
path = func_to_path(f)
result = post_route(re.sub('\s+', '', path))(f)
def method():
def _args():
return result(*args, **kwargs)
return _args
return method
PK ! Ry 3 dophon/annotation/request/param_adapter/__init__.pyimport xmltodict
def is_none(request):
return request.args
def is_json(request):
return request.json
def is_form(request):
return request.form.to_dict()
def is_args(request):
return request.args.to_dict()
def is_xml(request):
xml_str = str(request.data, encoding='utf8')
target = xmltodict.parse(xml_str, encoding='utf-8')
return target
PK ! :hƬ 2 dophon/annotation/request/param_adapter/handler.pyclass CustomizeHandleProcessNotFoundError(KeyError):
pass
class BaseHandler:
def handle(self, content_type: str) -> callable:
if content_type in self.__init__handle_dict:
return self.__init__handle_dict[content_type]
raise CustomizeHandleProcessNotFoundError(f'未找到自定义处理器{content_type}')
def __init__(self, handle_dict: dict):
self.__init__handle_dict = handle_dict
PK ! &