大数据

JSON Schema 验证(基于 Cerberus)

check_with_function.py

from datetime import datetime

# Registry of custom check_with functions, keyed by the rule name used in a schema.
check_with_functions = dict()


def register(name):
    """Return a decorator that records the decorated function in
    ``check_with_functions`` under ``name``.

    The decorated function is handed back unchanged, so it stays directly callable.
    """
    def _store(func):
        check_with_functions.update({name: func})
        return func
    return _store


@register("is_datetime_str")
def is_datetime_str(self, field, value):
    """check_with rule: ``value`` must be a ``YYYY-MM-DD HH:MM:SS`` string
    that is not later than the current local time.

    Failures are recorded on ``field`` through ``self._error``; nothing is raised.
    """
    try:
        dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        # TypeError covers non-string input (e.g. an int); report it as a
        # format error instead of aborting the whole validation run.
        self._error(field, "日期格式不符合")
        return
    if dt > datetime.now():
        self._error(field, "不可使用未来日期")


@register("is_current_year")
def is_current_year(self, field, value):
    """check_with rule: ``value`` must be a ``YYYY-MM-DD HH:MM:SS`` string
    whose year equals the current local year.

    Failures are recorded on ``field`` through ``self._error``; nothing is raised.
    """
    try:
        dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        # TypeError covers non-string input (e.g. an int); report it as a
        # format error instead of aborting the whole validation run.
        self._error(field, "日期格式不符合")
        return
    if dt.year != datetime.now().year:
        self._error(field, "不是今年的数据!")


@register("is_datetime_ts")
def is_datetime_ts(self, field, value):
    """check_with rule: ``value`` must be a Unix timestamp not in the future.

    Values with a 13-digit magnitude (10**12 <= |value| < 10**13, e.g.
    1615128000000) are treated as milliseconds and scaled to seconds; this
    matches the former 13-character check for ints and also accepts floats.
    Non-numeric or unrepresentable timestamps are recorded as a format error
    on ``field`` via ``self._error`` instead of raising out of the validator.
    """
    try:
        seconds = value / 1000 if 10 ** 12 <= abs(value) < 10 ** 13 else value
        dt = datetime.fromtimestamp(seconds)
    except (TypeError, ValueError, OverflowError, OSError):
        # TypeError: non-numeric input; the others: timestamp out of the
        # platform's supported range.
        self._error(field, "日期格式不符合")
        return
    if dt > datetime.now():
        self._error(field, "不可使用未来日期")

coerce_function.py

from datetime import datetime

# Named coercers usable in a schema's coerce configuration; the built-in
# str/int are preregistered, further ones are added with @register(name).
coerce_functions = dict(str=str, int=int)


def register(name):
    """Return a decorator that stores the decorated coercer in
    ``coerce_functions`` under ``name`` and hands it back unchanged."""
    def _store(func):
        coerce_functions.update({name: func})
        return func
    return _store


@register('str_to_datetime')
def str_to_datetime(dt_str):
    """Parse ``dt_str`` written as ``%Y-%m-%d %H:%M:%S`` into a naive datetime.

    A ValueError from ``strptime`` propagates when the text does not match.
    """
    layout = "%Y-%m-%d %H:%M:%S"
    parsed = datetime.strptime(dt_str, layout)
    return parsed


@register('ts_to_datetime')
def ts_to_datetime(ts):
    """Convert a Unix timestamp to a naive local ``datetime``.

    Values with a 13-digit magnitude (10**12 <= |ts| < 10**13, e.g.
    1615128000000) are millisecond timestamps and are scaled to seconds
    first; passed through unscaled they would be far outside the supported
    year range and always fail.
    """
    if 10 ** 12 <= abs(ts) < 10 ** 13:
        ts = ts / 1000
    return datetime.fromtimestamp(ts)

custom_error.py

from cerberus import errors


class CustomErrorHandler(errors.BasicErrorHandler):
    """cerberus error handler with replaceable message texts.

    ``messages`` overrides cerberus's default wording for the listed error
    codes. ``custom_messages`` is a nested dict that mirrors an error's
    ``schema_path`` (e.g. ``{"id": {"maxlength": "too long"}}``); when the
    path resolves to a string, that string is used as the message. If only
    the final path component is missing and the dict reached so far has an
    ``"any"`` key, that value is used instead. Otherwise the text from
    ``messages`` is used.
    """
    messages = errors.BasicErrorHandler.messages.copy()
    messages.update({
        errors.REQUIRED_FIELD.code: "Required field",
        errors.UNKNOWN_FIELD.code: "Unknown field",
        errors.DEPENDENCIES_FIELD.code: "Field '{0}' is required",
        errors.DEPENDENCIES_FIELD_VALUE.code: "Depends on these values: {constraint}",
        errors.EXCLUDES_FIELD.code: "{0} must not be present with '{field}'",

        errors.EMPTY_NOT_ALLOWED.code: "Field should not be empty",
        errors.NOT_NULLABLE.code: "Field should not be empty",
        errors.BAD_TYPE.code: "Must be of {constraint} type",
        errors.ITEMS_LENGTH.code: "Length of list should be {constraint}, it is {0}",
        errors.MIN_LENGTH.code: "Min length is {constraint}",
        errors.MAX_LENGTH.code: "Max length is {constraint}",

        errors.REGEX_MISMATCH.code: "Value does not match regex '{constraint}'",
        errors.MIN_VALUE.code: "不能小于最小值: {constraint}",
        errors.MAX_VALUE.code: "Max value is {constraint}",
        errors.UNALLOWED_VALUE.code: "不在值域范围内: {value}",
        errors.UNALLOWED_VALUES.code: "Unallowed values {0}",
        errors.FORBIDDEN_VALUE.code: "Unallowed value {value}",
        errors.FORBIDDEN_VALUES.code: "Unallowed values {0}",

        errors.COERCION_FAILED.code: "Field '{field}' cannot be coerced",
        errors.RENAMING_FAILED.code: "Field '{field}' cannot be renamed",
        errors.READONLY_FIELD.code: "Field is read-only",
        errors.SETTING_DEFAULT_FAILED.code: "Default value for '{field}' cannot be set: {0}",

        errors.MAPPING_SCHEMA.code: "Mapping doesn't validate subschema: {0}",
        errors.SEQUENCE_SCHEMA.code: "One or more sequence-items don't validate: {0}",
        errors.KEYSCHEMA.code: "One or more properties of a mapping  don't validate: {0}",
        errors.VALUESCHEMA.code: "One or more values in a mapping don't validate: {0}",

        errors.NONEOF.code: "One or more definitions validate",
        errors.ONEOF.code: "None or more than one rule validate",
        errors.ANYOF.code: "No definitions validate",
        errors.ALLOF.code: "One or more definitions don't validate",
    })

    def __init__(self, tree=None, custom_messages=None):
        super().__init__(tree)
        # Nested dict keyed by the components of error.schema_path.
        self.custom_messages = custom_messages or {}

    def _format_message(self, field, error):
        """Return the message for ``error``, preferring ``custom_messages``.

        This must be named ``_format_message``: that is the hook cerberus's
        BasicErrorHandler calls when building its error tree. Under any other
        name the custom messages are never consulted.
        """
        node = self.custom_messages
        path = error.schema_path
        last = len(path) - 1
        for i, key in enumerate(path):
            try:
                node = node[key]
            except (LookupError, TypeError):
                # TypeError: a plain message string (or other non-container)
                # was reached before the whole path was consumed.
                if i == last and isinstance(node, dict) and 'any' in node:
                    return node['any']
                return super()._format_message(field, error)
        if isinstance(node, dict):
            # Path stopped at an intermediate level: no leaf message given.
            return super()._format_message(field, error)
        return node

    # Backward-compatible alias for callers that used the previous public name.
    format_message = _format_message

schema.py

import json
import re

from task_center.common.json_validator.coerce_function import coerce_functions
from task_center.common.data_tools.mysql import MysqlTools


class Schema(object):
    """Build a cerberus schema from the ``data_standards`` table.

    Every row with ``delete_flag = '0'`` for (``datasource_id``, ``table_name``)
    contributes one field. ``rule`` holds that field's rules as JSON. The
    optional ``coerce`` and ``check_with`` columns hold comma-separated names
    of registered coercers and custom check functions.
    """

    def __init__(self, datasource_id, table_name):
        self.datasource_id = datasource_id
        self.table_name = table_name

    def get_data_standards(self):
        """Query the stored rules and return ``{field_name: rules_dict}``."""
        dbc = MysqlTools.get_task_center_dbc()
        # NOTE(review): datasource_id / table_name are interpolated directly
        # into the SQL text. If they can originate from user input this is
        # injectable; switch to a parameterized query if MysqlTools.select_all
        # supports bound parameters.
        sql = f"""
        SELECT *
        FROM data_standards
        WHERE delete_flag = '0'
        AND datasource_id = '{self.datasource_id}'
        AND table_name = '{self.table_name}'
        """
        records = dbc.select_all(sql)
        res = {}
        for record in records:
            raw_rule = record['rule']
            # A NULL/empty rule column means "no extra rules" instead of a
            # json.loads crash.
            rule = json.loads(raw_rule) if raw_rule else {}
            coerce = record['coerce']
            if coerce:
                # forced type conversion chain
                rule["coerce"] = self.parse_coerce_str(coerce)
            check_with = record['check_with']
            if check_with:
                # Strip whitespace like the coerce column does, so "a, b"
                # resolves to the registered names "a" and "b".
                names = self._split_names(check_with)
                if names:
                    rule["check_with"] = names
            res[record['field_name']] = rule
        return res

    def gen_schema(self):
        """Return the cerberus schema dict (queries the database on every call)."""
        return self.get_data_standards()

    @staticmethod
    def _split_names(names_str):
        """Split a comma-separated name list, dropping all whitespace and empty items."""
        return [name for name in re.sub(r"\s+", "", names_str).split(",") if name]

    def parse_coerce_str(self, trans_type_str: str):
        """Resolve comma-separated coercer names into a tuple of callables.

        cerberus applies a sequence of coercers in order. For example,
        ``"int, ts_to_datetime"`` first converts with ``int`` and then converts
        that result to a datetime. Two names yield the same 2-tuple as before;
        any non-zero number of names is accepted.

        Raises:
            RuntimeError: no coercer name is present.
            KeyError: a name is not registered in ``coerce_functions``.
        """
        names = self._split_names(trans_type_str)
        if not names:
            raise RuntimeError("强制类型转换参数配置格式错误!")
        return tuple(coerce_functions[name] for name in names)

    def __str__(self):
        return str(self.gen_schema())

validator.py

from cerberus import Validator
from task_center.common.tools.klass import create_class
from task_center.common.json_validator.schema import Schema
from task_center.common.json_validator.check_with_function import check_with_functions


class JsonValidator:
    """Wrap a cerberus validator built from a ``Schema``.

    Custom ``check_with`` rule names found in the schema are looked up in
    ``check_with_functions`` and attached as ``_check_with_<name>`` methods.
    Attributes not found on this wrapper (``validate``, ``errors``,
    ``document``, ...) are forwarded to the underlying validator instance.
    """

    def __init__(self, schema: "Schema"):
        self.schema = schema.gen_schema()
        self.class_methods = {}
        self.instance = self.build_instance()

    def build_instance(self):
        """Create the validator instance for ``self.schema``, allowing unknown fields.

        The class comes from ``create_class`` with cerberus ``Validator`` as base
        and the collected check_with methods (presumably a dynamically built
        subclass named "MyValidator" -- confirm against create_class).
        """
        self.get_check_with_methods()
        klass = create_class("MyValidator", Validator, {}, self.class_methods)
        instance = klass(self.schema)
        instance.allow_unknown = True
        return instance

    def __getattr__(self, attr):
        # Only reached when normal lookup fails. Read ``instance`` from
        # __dict__: on objects created without __init__ (copy, pickle,
        # __new__) ``self.instance`` would re-enter __getattr__ forever.
        try:
            instance = self.__dict__["instance"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}"
            ) from None
        return getattr(instance, attr)

    @staticmethod
    def _check_with_names(schema):
        """Return the string check_with rule names used by top-level field rules."""
        names = []
        for rules in schema.values():
            if not isinstance(rules, dict):
                continue
            check_with = rules.get("check_with")
            if not check_with:
                continue
            # cerberus accepts a single handler or a list of handlers. A bare
            # string must not be split into characters, and callables need no
            # registration.
            if isinstance(check_with, str) or callable(check_with):
                check_with = [check_with]
            names.extend(h for h in check_with if isinstance(h, str))
        return names

    def get_check_with_methods(self):
        """Register every custom check_with function referenced by the schema."""
        for method_name in self._check_with_names(self.schema):
            self.add_method(method_name)

    def add_method(self, method_name):
        """Expose ``check_with_functions[method_name]`` as ``_check_with_<method_name>``.

        Raises KeyError if the name was never registered.
        """
        prefix = "_check_with_"
        class_method_name = prefix + method_name
        self.class_methods[class_method_name] = check_with_functions[method_name]

验证(使用示例):

from json_validator.schema import Schema
from json_validator.validator import JsonValidator

# Demo: validate one sample record against the rules stored for
# datasource "1001" / table "table01" and print the outcome.
schema = Schema("1001", "table01")
document = dict(
    id="1" * 65,
    is_default=1,
    create_time="2028-05-27 11:11:11",
)
v = JsonValidator(schema)
for output in (v.validate(document), v.errors, v.document):
    print(output)
from json_validator.schema import Schema
from json_validator.validator import JsonValidator


def run():
    """Validate a sample document and print the boolean result, each raw
    error's value/info/code, and the validator's ``document``."""
    validator = JsonValidator(Schema("1001", "table01"))
    sample = dict(
        id="1",
        is_default="1",
        create_time="2028-05-27 11:11:11",
        update_time=1615128000000,
    )
    print(validator.validate(sample))
    for err in validator._errors:
        print("err: ", err.value, err.info, err.code)
    print(validator.document)


if __name__ == '__main__':
    run()