文章
json schema验证
check_with_function.py
from datetime import datetime
check_with_functions = {}
def register(name):
def decorator(func):
check_with_functions[name] = func
return func
return decorator
@register("is_datetime_str")
def is_datetime_str(self, field, value):
# 验证当前字段是否符合日期格式
cur_dt = datetime.now()
try:
dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
if dt > cur_dt:
self._error(field, "不可使用未来日期")
except ValueError:
self._error(field, "日期格式不符合")
@register("is_current_year")
def is_current_year(self, field, value):
# 验证当前字段是否符合日期格式
cur_dt = datetime.now()
try:
dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
if dt.year != cur_dt.year:
self._error(field, "不是今年的数据!")
except ValueError:
self._error(field, "日期格式不符合")
@register("is_datetime_ts")
def is_datetime_ts(self, field, value):
if len(str(value)) == 13:
value = value / 1000
cur_dt = datetime.now()
dt = datetime.fromtimestamp(value)
if dt > cur_dt:
self._error(field, "不可使用未来日期")
coerce_function.py
from datetime import datetime
coerce_functions = {
"str": str,
"int": int
}
def register(name):
def decorator(func):
coerce_functions[name] = func
return func
return decorator
@register('str_to_datetime')
def str_to_datetime(dt_str):
return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
@register('ts_to_datetime')
def ts_to_datetime(ts):
return datetime.fromtimestamp(ts)
custom_error.py
from cerberus import errors
class CustomErrorHandler(errors.BasicErrorHandler):
messages = errors.BasicErrorHandler.messages.copy()
messages.update({
errors.REQUIRED_FIELD.code: "Required field",
errors.UNKNOWN_FIELD.code: "Unknown field",
errors.DEPENDENCIES_FIELD.code: "Field '{0}' is required",
errors.DEPENDENCIES_FIELD_VALUE.code: "Depends on these values: {constraint}",
errors.EXCLUDES_FIELD.code: "{0} must not be present with '{field}'",
errors.EMPTY_NOT_ALLOWED.code: "Field should not be empty",
errors.NOT_NULLABLE.code: "Field should not be empty",
errors.BAD_TYPE.code: "Must be of {constraint} type",
errors.ITEMS_LENGTH.code: "Length of list should be {constraint}, it is {0}",
errors.MIN_LENGTH.code: "Min length is {constraint}",
errors.MAX_LENGTH.code: "Max length is {constraint}",
errors.REGEX_MISMATCH.code: "Value does not match regex '{constraint}'",
errors.MIN_VALUE.code: "不能小于最小值: {constraint}",
errors.MAX_VALUE.code: "Max value is {constraint}",
errors.UNALLOWED_VALUE.code: "不在值域范围内: {value}",
errors.UNALLOWED_VALUES.code: "Unallowed values {0}",
errors.FORBIDDEN_VALUE.code: "Unallowed value {value}",
errors.FORBIDDEN_VALUES.code: "Unallowed values {0}",
errors.COERCION_FAILED.code: "Field '{field}' cannot be coerced",
errors.RENAMING_FAILED.code: "Field '{field}' cannot be renamed",
errors.READONLY_FIELD.code: "Field is read-only",
errors.SETTING_DEFAULT_FAILED.code: "Default value for '{field}' cannot be set: {0}",
errors.MAPPING_SCHEMA.code: "Mapping doesn't validate subschema: {0}",
errors.SEQUENCE_SCHEMA.code: "One or more sequence-items don't validate: {0}",
errors.KEYSCHEMA.code: "One or more properties of a mapping don't validate: {0}",
errors.VALUESCHEMA.code: "One or more values in a mapping don't validate: {0}",
errors.NONEOF.code: "One or more definitions validate",
errors.ONEOF.code: "None or more than one rule validate",
errors.ANYOF.code: "No definitions validate",
errors.ALLOF.code: "One or more definitions don't validate",
})
def __init__(self, tree=None, custom_messages=None):
super().__init__(tree)
self.custom_messages = custom_messages or {}
def format_message(self, field, error):
tmp = self.custom_messages
for i, x in enumerate(error.schema_path):
try:
tmp = tmp[x]
except KeyError:
if i == len(error.schema_path) - 1 and 'any' in tmp:
return tmp['any']
return super()._format_message(field, error)
if isinstance(tmp, dict):
return super()._format_message(field, error)
else:
return tmp
schema.py
import json
import re
from task_center.common.json_validator.coerce_function import coerce_functions
from task_center.common.data_tools.mysql import MysqlTools
class Schema(object):
def __init__(self, datasource_id, table_name):
self.datasource_id = datasource_id
self.table_name = table_name
def get_data_standards(self):
dbc = MysqlTools.get_task_center_dbc()
sql = f"""
SELECT *
FROM data_standards
WHERE delete_flag = '0'
AND datasource_id = '{self.datasource_id}'
AND table_name = '{self.table_name}'
"""
records = dbc.select_all(sql)
res = dict()
for record in records:
field_name = record['field_name']
rule = json.loads(record['rule'])
coerce = record['coerce']
if coerce:
# 强制类型转换
rule["coerce"] = self.parse_coerce_str(coerce)
check_with = record['check_with']
if check_with:
rule["check_with"] = check_with.split(",")
res[field_name] = rule
return res
def gen_schema(self):
return self.get_data_standards()
def parse_coerce_str(self, trans_type_str: str):
trans_type_str = re.sub(r"\s+", "", trans_type_str)
trans_type_list = trans_type_str.split(",")
if len(trans_type_list) == 2:
return coerce_functions[trans_type_list[0]], coerce_functions[trans_type_list[1]]
else:
raise RuntimeError("强制类型转换参数配置格式错误!")
def __str__(self):
return str(self.gen_schema())
validator.py
from cerberus import Validator
from task_center.common.tools.klass import create_class
from task_center.common.json_validator.schema import Schema
from task_center.common.json_validator.check_with_function import check_with_functions
class JsonValidator:
def __init__(self, schema: Schema):
self.schema = schema.gen_schema()
self.class_methods = {}
self.instance = self.build_instance()
def build_instance(self):
self.get_check_with_methods()
klass = create_class("MyValidator", Validator, {}, self.class_methods)
instance = klass(self.schema)
instance.allow_unknown = True
return instance
def __getattr__(self, attr):
return getattr(self.instance, attr)
def get_check_with_methods(self):
check_with_methods = []
for k, v in self.schema.items():
if isinstance(v, dict):
check_with = v.get("check_with", [])
if check_with:
check_with_methods.extend(check_with)
if check_with_methods:
for method_name in check_with_methods:
self.add_method(method_name)
def add_method(self, method_name):
prefix = "_check_with_"
class_method_name = prefix + method_name
self.class_methods[class_method_name] = check_with_functions[method_name]
验证:
from json_validator.schema import Schema
from json_validator.validator import JsonValidator
schema = Schema("1001", "table01")
document = {
"id": "1" * 65,
"is_default": 1,
"create_time": "2028-05-27 11:11:11"
}
v = JsonValidator(schema)
print(v.validate(document))
print(v.errors)
print(v.document)
from json_validator.schema import Schema
from json_validator.validator import JsonValidator
def run():
schema = Schema("1001", "table01")
# print(schema)
v = JsonValidator(schema)
document = {"id": "1", "is_default": "1", "create_time": "2028-05-27 11:11:11", "update_time": 1615128000000}
print(v.validate(document))
for error in v._errors:
print("err: ", error.value, error.info, error.code)
print(v.document)
if __name__ == '__main__':
run()