diff options
author | Johan Fleury <jfleury@arcaik.net> | 2018-10-08 12:58:12 -0400 |
---|---|---|
committer | Johan Fleury <jfleury@arcaik.net> | 2018-10-08 12:58:12 -0400 |
commit | 735c78d3ecb695dd16cb37879880f522c4b29c72 (patch) | |
tree | 8bfb2ecbd0f03730efa1540cc65d8490b67e8ddd /pykwalify |
Import upstream version 1.7.0
Diffstat (limited to 'pykwalify')
-rw-r--r-- | pykwalify/__init__.py | 57 | ||||
-rw-r--r-- | pykwalify/cli.py | 95 | ||||
-rw-r--r-- | pykwalify/compat.py | 56 | ||||
-rw-r--r-- | pykwalify/core.py | 978 | ||||
-rw-r--r-- | pykwalify/errors.py | 237 | ||||
-rw-r--r-- | pykwalify/rule.py | 1358 | ||||
-rw-r--r-- | pykwalify/types.py | 160 |
7 files changed, 2941 insertions, 0 deletions
# -*- coding: utf-8 -*-

""" pykwalify """

# python stdlib
import logging
import logging.config
import os

__author__ = 'Grokzen <Grokzen@gmail.com>'
__version_info__ = (1, 7, 0)
__version__ = '.'.join(map(str, __version_info__))


# Maps the cli verbosity count (number of -v flags, clamped to 5) to a stdlib
# logging level name. 0 (no flag) and 4 both map to INFO; 1 is the quietest.
log_level_to_string_map = {
    5: "DEBUG",
    4: "INFO",
    3: "WARNING",
    2: "ERROR",
    1: "CRITICAL",
    0: "INFO"
}


def init_logging(log_level):
    """
    Init logging settings with default set to INFO.

    :param log_level: integer verbosity (0-5); values above 5 are clamped to 5.
    """
    log_level = log_level_to_string_map[min(log_level, 5)]

    # BUGFIX: the original condition was `log_level in os.environ`, i.e. it
    # tested whether an environment variable literally named "DEBUG"/"INFO"/...
    # exists. The intent is to use the detailed format when running at DEBUG.
    msg = "%(levelname)s - %(name)s:%(lineno)s - %(message)s" if log_level == "DEBUG" else "%(levelname)s - %(message)s"

    logging_conf = {
        "version": 1,
        "root": {
            "level": log_level,
            "handlers": ["console"]
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": log_level,
                "formatter": "simple",
                "stream": "ext://sys.stdout"
            }
        },
        "formatters": {
            "simple": {
                "format": " {0}".format(msg)
            }
        }
    }

    logging.config.dictConfig(logging_conf)


# Registry of named partial schemas ("schema;<name>"), populated by Core.
partial_schemas = {}
def parse_cli():
    """
    Parse the command line arguments with docopt and configure logging.

    The outline of this function needs to be like this:

    1. parse arguments
    2. validate arguments only, dont go into other logic/code
    3. run application logic

    :return: dict of parsed cli arguments as produced by docopt.
    """
    # NOTE(review): the usage string below is runtime data consumed by docopt;
    # the exact column alignment is reconstructed from the scraped source —
    # verify against the released pykwalify 1.7.0 cli.py.
    usage = """
usage: pykwalify -d FILE -s FILE ... [-e FILE ...]
       [--strict-rule-validation] [--fix-ruby-style-regex] [--allow-assertions] [-v ...] [-q]

optional arguments:
  -d FILE, --data-file FILE        the file to be tested
  -e FILE, --extension FILE        file containing python extension
  -s FILE, --schema-file FILE      schema definition file
  --fix-ruby-style-regex           This flag fixes some of the quirks of ruby style regex
                                   that is not compatible with python style regex
  --strict-rule-validation         enables strict validation of all keywords for all
                                   Rule objects to find unsupported keyword usage
  --allow-assertions               By default assertions is disabled due to security risk.
                                   Error will be raised if assertion is used in schema
                                   but this flag is not used. This option enables assert keyword.
  -h, --help                       show this help message and exit
  -q, --quiet                      suppress terminal output
  -v, --verbose                    verbose terminal output (multiple -v increases verbosity)
  --version                        display the version number and exit
"""

    # Imported here so the package version is read lazily at call time.
    import pykwalify

    args = docopt(usage, version=pykwalify.__version__)

    # --quiet forces the quietest level (1 -> CRITICAL); otherwise the -v count rules.
    pykwalify.init_logging(1 if args["--quiet"] else args["--verbose"])
    log = logging.getLogger(__name__)

    log.debug("Setting verbose level: %s", args["--verbose"])
    log.debug("Arguments from CLI: %s", args)

    return args


def run(cli_args):
    """
    Build a Core object from parsed cli arguments and run validation.

    Split the functionality into 2 methods: one for parsing the cli and
    one that runs the application.

    :param cli_args: dict as returned by :func:`parse_cli`.
    :return: the Core instance after validate() has run.
    """
    from .core import Core

    core = Core(
        source_file=cli_args["--data-file"],
        schema_files=cli_args["--schema-file"],
        extensions=cli_args['--extension'],
        strict_rule_validation=cli_args['--strict-rule-validation'],
        fix_ruby_style_regex=cli_args['--fix-ruby-style-regex'],
        allow_assertions=cli_args['--allow-assertions'],
    )
    core.validate()
    return core
def cli_entrypoint():
    """
    Main entrypoint for script. Used by setup.py to automatically
    create a cli script.
    """
    # Old interpreters only get a warning; execution continues regardless.
    if sys.version_info < (2, 7, 0):
        sys.stderr.write(u"WARNING: pykwalify: It is recommended to run pykwalify on python version 2.7.x or later...\n\n")

    run(parse_cli())
# -*- coding: utf-8 -*-

# pykwalify/compat.py -- python 2/3 and yaml-backend compatibility shims.

# python stdlib
import sys
import logging


log = logging.getLogger(__name__)


# Prefer ruamel.yaml, fall back to pyyaml, and abort the process if neither
# backend can be imported (pykwalify cannot work without a yaml parser).
try:
    from ruamel import yaml
except ImportError:
    try:
        import yaml
    except ImportError:
        log.critical("Unable to import either ruamel.yaml or pyyaml")
        sys.exit(1)

log.debug("Using yaml library: {0}".format(yaml.__file__))


if sys.version_info[0] < 3:
    # Python 2.x.x series: native names already exist, re-export them.
    basestring = basestring  # NOQA: F821
    unicode = unicode  # NOQA: F821
    bytes = str  # NOQA: F821

    def u(x):
        """Decode a py2 byte string to text."""
        return x.decode()

    def b(x):
        """Bytes are the native string type on py2; pass through."""
        return x

    def nativestr(x):
        """Return x as the native 'str' type (encode text on py2)."""
        return x if isinstance(x, str) else x.encode('utf-8', 'replace')
else:
    # Python 3.x.x series: text is the native string type.
    basestring = str  # NOQA: F821
    unicode = str  # NOQA: F821
    bytes = bytes  # NOQA: F821

    def u(x):
        """Text passes through unchanged on py3."""
        return x

    def b(x):
        """Return x as bytes, encoding text as latin-1."""
        return x if isinstance(x, bytes) else x.encode('latin-1')

    def nativestr(x):
        """Return x as 'str', decoding bytes as utf-8 with replacement."""
        return x if isinstance(x, str) else x.decode('utf-8', 'replace')
from pykwalify.rule import Rule
from pykwalify.types import is_scalar, is_string, tt

# 3rd party imports
from pykwalify.compat import yaml
from dateutil.parser import parse

log = logging.getLogger(__name__)


class Core(object):
    """ Core class of pyKwalify """

    def __init__(self, source_file=None, schema_files=None, source_data=None, schema_data=None, extensions=None, strict_rule_validation=False,
                 fix_ruby_style_regex=False, allow_assertions=False,):
        """
        :param source_file: Path to the json/yaml data file to validate.
        :param schema_files: List of schema file paths; all are merged into one schema.
        :param source_data: Pre-parsed data, used when no source_file is given.
        :param schema_data: Pre-parsed schema, used when no schema_files are given.
        :param extensions:
            List of paths to python files that should be imported and available via 'func' keywork.
            This list of extensions can be set manually or they should be provided by the `--extension`
            flag from the cli. This list should not contain files specified by the `extensions` list keyword
            that can be defined at the top level of the schema.
        :param strict_rule_validation: Enable strict keyword validation for all rules.
        :param fix_ruby_style_regex: Trim surrounding slashes from /regex/ patterns.
        :param allow_assertions: Opt in to the (unsafe, exec-based) 'assert' keyword.
        """
        if schema_files is None:
            schema_files = []
        if extensions is None:
            extensions = []

        log.debug(u"source_file: %s", source_file)
        log.debug(u"schema_file: %s", schema_files)
        log.debug(u"source_data: %s", source_data)
        log.debug(u"schema_data: %s", schema_data)
        log.debug(u"extension files: %s", extensions)

        self.source = None
        self.schema = None
        self.validation_errors = None
        self.validation_errors_exceptions = None
        self.root_rule = None
        self.extensions = extensions
        self.errors = []
        self.strict_rule_validation = strict_rule_validation
        self.fix_ruby_style_regex = fix_ruby_style_regex
        self.allow_assertions = allow_assertions

        if source_file is not None:
            if not os.path.exists(source_file):
                raise CoreError(u"Provided source_file do not exists on disk: {0}".format(source_file))

            with open(source_file, "r") as stream:
                if source_file.endswith(".json"):
                    self.source = json.load(stream)
                elif source_file.endswith(".yaml") or source_file.endswith('.yml'):
                    self.source = yaml.safe_load(stream)
                else:
                    raise CoreError(u"Unable to load source_file. Unknown file format of specified file path: {0}".format(source_file))

        if not isinstance(schema_files, list):
            raise CoreError(u"schema_files must be of list type")

        # Merge all schema files into one single file for easy parsing
        if len(schema_files) > 0:
            schema_data = {}
            for f in schema_files:
                if not os.path.exists(f):
                    raise CoreError(u"Provided source_file do not exists on disk : {0}".format(f))

                with open(f, "r") as stream:
                    if f.endswith(".json"):
                        data = json.load(stream)
                    elif f.endswith(".yaml") or f.endswith(".yml"):
                        data = yaml.safe_load(stream)
                        if not data:
                            raise CoreError(u"No data loaded from file : {0}".format(f))
                    else:
                        # BUGFIX: the original never formatted the message with
                        # the offending path ('.format(f)' was missing).
                        raise CoreError(u"Unable to load file : {0} : Unknown file format. Supported file endings is [.json, .yaml, .yml]".format(f))

                    # Top-level keys must be unique across all schema files.
                    for key in data.keys():
                        if key in schema_data.keys():
                            raise CoreError(u"Parsed key : {0} : two times in schema files...".format(key))

                    schema_data = dict(schema_data, **data)

            self.schema = schema_data

        # Nothing was loaded so try the source_data variable
        if self.source is None:
            log.debug(u"No source file loaded, trying source data variable")
            self.source = source_data
        if self.schema is None:
            log.debug(u"No schema file loaded, trying schema data variable")
            self.schema = schema_data

        # Test if anything was loaded
        if self.source is None:
            raise CoreError(u"No source file/data was loaded")
        if self.schema is None:
            raise CoreError(u"No schema file/data was loaded")

        # Merge any extensions defined in the schema with the provided list of extensions from the cli
        for f in self.schema.get('extensions', []):
            self.extensions.append(f)

        # BUGFIX: the original used 'and', so a real list could never fail the
        # check while e.g. a plain string would. The intent, per the error
        # message, is to require a list whose elements are all strings.
        if not isinstance(self.extensions, list) or not all(isinstance(e, str) for e in self.extensions):
            raise CoreError(u"Specified extensions must be a list of file paths")

        self._load_extensions()

        if self.strict_rule_validation:
            log.info("Using strict rule keywords validation...")
Load all extension files into the namespace pykwalify.ext + """ + log.debug(u"loading all extensions : %s", self.extensions) + + self.loaded_extensions = [] + + for f in self.extensions: + if not os.path.isabs(f): + f = os.path.abspath(f) + + if not os.path.exists(f): + raise CoreError(u"Extension file: {0} not found on disk".format(f)) + + self.loaded_extensions.append(imp.load_source("", f)) + + log.debug(self.loaded_extensions) + log.debug([dir(m) for m in self.loaded_extensions]) + + def validate(self, raise_exception=True): + """ + """ + log.debug(u"starting core") + + self._start_validate(self.source) + self.validation_errors = [unicode(error) for error in self.errors] + self.validation_errors_exceptions = self.errors + + if self.errors is None or len(self.errors) == 0: + log.info(u"validation.valid") + else: + log.error(u"validation.invalid") + log.error(u" --- All found errors ---") + log.error(self.validation_errors) + if raise_exception: + raise SchemaError(u"Schema validation failed:\n - {error_msg}.".format( + error_msg=u'.\n - '.join(self.validation_errors))) + else: + log.error(u"Errors found but will not raise exception...") + + # Return validated data + return self.source + + def _start_validate(self, value=None): + """ + """ + path = "" + self.errors = [] + done = [] + + s = {} + + # Look for schema; tags so they can be parsed before the root rule is parsed + for k, v in self.schema.items(): + if k.startswith("schema;"): + log.debug(u"Found partial schema; : %s", v) + r = Rule(schema=v) + log.debug(u" Partial schema : %s", r) + pykwalify.partial_schemas[k.split(";", 1)[1]] = r + else: + # readd all items that is not schema; so they can be parsed + s[k] = v + + self.schema = s + + log.debug(u"Building root rule object") + root_rule = Rule(schema=self.schema) + self.root_rule = root_rule + log.debug(u"Done building root rule") + log.debug(u"Root rule: %s", self.root_rule) + + self._validate(value, root_rule, path, done) + + def _validate(self, value, 
rule, path, done): + """ + """ + log.debug(u"Core validate") + log.debug(u" Root validate : Rule: %s", rule) + log.debug(u" Root validate : Rule_type: %s", rule.type) + log.debug(u" Root validate : Seq: %s", rule.sequence) + log.debug(u" Root validate : Map: %s", rule.mapping) + log.debug(u" Root validate : Done: %s", done) + + if rule.required and value is None and not rule.type == 'none': + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"required.novalue : '{path}'", + path=path, + value=value.encode('unicode_escape') if value else value, + )) + return + + if not rule.nullable and value is None and not rule.type == 'none': + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"nullable.novalue : '{path}'", + path=path, + value=value.encode('unicode_escape') if value else value, + )) + return + + log.debug(u" ? ValidateRule: %s", rule) + if rule.include_name is not None: + self._validate_include(value, rule, path, done=None) + elif rule.sequence is not None: + self._validate_sequence(value, rule, path, done=None) + elif rule.mapping is not None or rule.allowempty_map: + self._validate_mapping(value, rule, path, done=None) + else: + self._validate_scalar(value, rule, path, done=None) + + def _handle_func(self, value, rule, path, done=None): + """ + Helper function that should check if func is specified for this rule and + then handle it for all cases in a generic way. + """ + func = rule.func + + # func keyword is not defined so nothing to do + if not func: + return + + found_method = False + + for extension in self.loaded_extensions: + method = getattr(extension, func, None) + if method: + found_method = True + + # No exception will should be caught. If one is raised it should bubble up all the way. + ret = method(value, rule, path) + + # If False or None or some other object that is interpreted as False + if not ret: + raise CoreError(u"Error when running extension function : {0}".format(func)) + + # Only run the first matched function. 
Sinc loading order is determined + # it should be easy to determine which file is used before others + break + + if not found_method: + raise CoreError(u"Did not find method '{0}' in any loaded extension file".format(func)) + + def _validate_include(self, value, rule, path, done=None): + """ + """ + # TODO: It is difficult to get a good test case to trigger this if case + if rule.include_name is None: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u'Include name not valid', + path=path, + value=value.encode('unicode_escape'))) + return + include_name = rule.include_name + partial_schema_rule = pykwalify.partial_schemas.get(include_name) + if not partial_schema_rule: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Cannot find partial schema with name '{include_name}'. Existing partial schemas: '{existing_schemas}'. Path: '{path}'", + path=path, + value=value, + include_name=include_name, + existing_schemas=", ".join(sorted(pykwalify.partial_schemas.keys())))) + return + + self._validate(value, partial_schema_rule, path, done) + + def _validate_sequence(self, value, rule, path, done=None): + """ + """ + log.debug(u"Core Validate sequence") + log.debug(u" Sequence : Data: %s", value) + log.debug(u" Sequence : Rule: %s", rule) + log.debug(u" Sequence : RuleType: %s", rule.type) + log.debug(u" Sequence : Path: %s", path) + log.debug(u" Sequence : Seq: %s", rule.sequence) + log.debug(u" Sequence : Map: %s", rule.mapping) + + if len(rule.sequence) <= 0: + raise CoreError(u"Sequence must contains atleast one item : {0}".format(path)) + + if value is None: + log.debug(u" * Core seq: sequence data is None") + return + + if not isinstance(value, list): + if isinstance(value, str): + value = value.encode('unicode_escape') + self.errors.append(SchemaError.SchemaErrorEntry( + u"Value '{value}' is not a list. 
Value path: '{path}'", + path, + value, + )) + return + + # Handle 'func' argument on this sequence + self._handle_func(value, rule, path, done) + + ok_values = [] + error_tracker = [] + + unique_errors = {} + map_unique_errors = {} + + for i, item in enumerate(value): + processed = [] + + for r in rule.sequence: + tmp_errors = [] + + try: + # Create a sub core object to enable error tracking that do not + # collide with this Core objects errors + tmp_core = Core(source_data={}, schema_data={}) + tmp_core.fix_ruby_style_regex = self.fix_ruby_style_regex + tmp_core.allow_assertions = self.allow_assertions + tmp_core.strict_rule_validation = self.strict_rule_validation + tmp_core.loaded_extensions = self.loaded_extensions + tmp_core._validate(item, r, "{0}/{1}".format(path, i), done) + tmp_errors = tmp_core.errors + except NotMappingError: + # For example: If one type was specified as 'map' but data + # was 'str' a exception will be thrown but we should ignore it + pass + except NotSequenceError: + # For example: If one type was specified as 'seq' but data + # was 'str' a exception will be thrown but we shold ignore it + pass + + processed.append(tmp_errors) + + if r.type == "map": + log.debug(u" * Found map inside sequence") + unique_keys = [] + + if r.mapping is None: + log.debug(u" + No rule to apply, prolly because of allowempty: True") + return + + for k, _rule in r.mapping.items(): + log.debug(u" * Key: %s", k) + log.debug(u" * Rule: %s", _rule) + + if _rule.unique or _rule.ident: + unique_keys.append(k) + + if len(unique_keys) > 0: + for v in unique_keys: + table = {} + for j, V in enumerate(value): + # If key do not exists it should be ignored by unique because that is not a broken constraint + val = V.get(v, None) + + if val is None: + continue + + if val in table: + curr_path = "{0}/{1}/{2}".format(path, j, v) + prev_path = "{0}/{1}/{2}".format(path, table[val], v) + s = SchemaError.SchemaErrorEntry( + msg=u"Value '{duplicate}' is not unique. 
Previous path: '{prev_path}'. Path: '{path}'", + path=curr_path, + value=value, + duplicate=val, + prev_path=prev_path, + ) + map_unique_errors[s.__repr__()] = s + else: + table[val] = j + elif r.unique: + log.debug(u" * Found unique value in sequence") + table = {} + + for j, val in enumerate(value): + if val is None: + continue + + if val in table: + curr_path = "{0}/{1}".format(path, j) + prev_path = "{0}/{1}".format(path, table[val]) + s = SchemaError.SchemaErrorEntry( + msg=u"Value '{duplicate}' is not unique. Previous path: '{prev_path}'. Path: '{path}'", + path=curr_path, + value=value, + duplicate=val, + prev_path=prev_path, + ) + unique_errors[s.__repr__()] = s + else: + table[val] = j + + error_tracker.append(processed) + no_errors = [] + for _errors in processed: + no_errors.append(len(_errors) == 0) + + if rule.matching == "any": + log.debug(u" * any rule %s", True in no_errors) + ok_values.append(True in no_errors) + elif rule.matching == "all": + log.debug(u" * all rule".format(all(no_errors))) + ok_values.append(all(no_errors)) + elif rule.matching == "*": + log.debug(u" * star rule", "...") + ok_values.append(True) + + for _error in unique_errors: + self.errors.append(_error) + + for _error in map_unique_errors: + self.errors.append(_error) + + log.debug(u" * ok : %s", ok_values) + + # All values must pass the validation, otherwise add the parsed errors + # to the global error list and throw up some error. 
+ if not all(ok_values): + # Ignore checking for '*' type because it should allways go through + if rule.matching == "any": + log.debug(u" * Value: %s did not validate against one or more sequence schemas", value) + elif rule.matching == "all": + log.debug(u" * Value: %s did not validate against all possible sequence schemas", value) + + for i, is_ok in enumerate(ok_values): + if not is_ok: + for error in error_tracker[i]: + for e in error: + self.errors.append(e) + + log.debug(u" * Core seq: validation recursivley done...") + + if rule.range is not None: + rr = rule.range + + self._validate_range( + rr.get("max"), + rr.get("min"), + rr.get("max-ex"), + rr.get("min-ex"), + len(value), + path, + "seq", + ) + + def _validate_mapping(self, value, rule, path, done=None): + """ + """ + log.debug(u"Validate mapping") + log.debug(u" Mapping : Data: %s", value) + log.debug(u" Mapping : Rule: %s", rule) + log.debug(u" Mapping : RuleType: %s", rule.type) + log.debug(u" Mapping : Path: %s", path) + log.debug(u" Mapping : Seq: %s", rule.sequence) + log.debug(u" Mapping : Map: %s", rule.mapping) + + if not isinstance(value, dict): + self.errors.append(SchemaError.SchemaErrorEntry( + u"Value '{value}' is not a dict. 
Value path: '{path}'", + path, + value, + )) + return + + if rule.mapping is None: + log.debug(u" + No rule to apply, prolly because of allowempty: True") + return + + # Handle 'func' argument on this mapping + self._handle_func(value, rule, path, done) + + m = rule.mapping + log.debug(u" Mapping: Rule-Mapping: %s", m) + + if rule.range is not None: + r = rule.range + + self._validate_range( + r.get("max"), + r.get("min"), + r.get("max-ex"), + r.get("min-ex"), + len(value), + path, + "map", + ) + + for k, rr in m.items(): + # Handle if the value of the key contains a include keyword + if rr.include_name is not None: + include_name = rr.include_name + partial_schema_rule = pykwalify.partial_schemas.get(include_name) + + if not partial_schema_rule: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Cannot find partial schema with name '{include_name}'. Existing partial schemas: '{existing_schemas}'. Path: '{path}'", + path=path, + value=value, + include_name=include_name, + existing_schemas=", ".join(sorted(pykwalify.partial_schemas.keys())))) + return + + rr = partial_schema_rule + + # Find out if this is a regex rule + is_regex_rule = False + required_regex = "" + for regex_rule in rule.regex_mappings: + if k == "regex;({})".format(regex_rule.map_regex_rule) or k == "re;({})".format(regex_rule.map_regex_rule): + is_regex_rule = True + required_regex = regex_rule.map_regex_rule + + # Check for the presense of the required key + is_present = False + if not is_regex_rule: + is_present = k in value + else: + is_present = any([re.search(required_regex, v) for v in value]) + + # Specifying =: as key is considered the "default" if no other keys match + if rr.required and not is_present and k != "=": + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Cannot find required key '{key}'. 
Path: '{path}'", + path=path, + value=value, + key=k)) + if k not in value and rr.default is not None: + value[k] = rr.default + + for k, v in value.items(): + # If no other case was a match, check if a default mapping is valid/present and use + # that one instead + r = m.get(k, m.get('=')) + log.debug(u" Mapping-value : %s", m) + log.debug(u" Mapping-value : %s %s", k, v) + log.debug(u" Mapping-value : %s", r) + + regex_mappings = [(regex_rule, re.search(regex_rule.map_regex_rule, str(k))) for regex_rule in rule.regex_mappings] + log.debug(u" Mapping-value: Mapping Regex matches: %s", regex_mappings) + + if r is not None: + # validate recursively + log.debug(u" Mapping-value: Core Map: validate recursively: %s", r) + self._validate(v, r, u"{0}/{1}".format(path, k), done) + elif any(regex_mappings): + sub_regex_result = [] + + # Found at least one that matches a mapping regex + for mm in regex_mappings: + if mm[1]: + log.debug(u" Mapping-value: Matching regex patter: %s", mm[0]) + self._validate(v, mm[0], "{0}/{1}".format(path, k), done) + sub_regex_result.append(True) + else: + sub_regex_result.append(False) + + if rule.matching_rule == "any": + if any(sub_regex_result): + log.debug(u" Mapping-value: Matched at least one regex") + else: + log.debug(u" Mapping-value: No regex matched") + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Key '{key}' does not match any regex '{regex}'. Path: '{path}'", + path=path, + value=value, + key=k, + regex="' or '".join(sorted([mm[0].map_regex_rule for mm in regex_mappings])))) + elif rule.matching_rule == "all": + if all(sub_regex_result): + log.debug(u" Mapping-value: Matched all regex rules") + else: + log.debug(u" Mapping-value: Did not match all regex rules") + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Key '{key}' does not match all regex '{regex}'. 
Path: '{path}'", + path=path, + value=value, + key=k, + regex="' and '".join(sorted([mm[0].map_regex_rule for mm in regex_mappings])))) + else: + log.debug(u" Mapping-value: No mapping rule defined") + else: + if not rule.allowempty_map: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Key '{key}' was not defined. Path: '{path}'", + path=path, + value=value, + key=k)) + + def _validate_scalar(self, value, rule, path, done=None): + """ + """ + log.debug(u"Validate scalar") + log.debug(u" Scalar : Value : %s", value) + log.debug(u" Scalar : Rule : %s", rule) + log.debug(u" Scalar : RuleType : %s", rule.type) + log.debug(u" Scalar : Path %s", path) + + # Handle 'func' argument on this scalar + self._handle_func(value, rule, path, done) + + if rule.assertion is not None: + self._validate_assert(rule, value, path) + + if value is None: + return True + + if rule.enum is not None and value not in rule.enum: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Enum '{value}' does not exist. Path: '{path}'", + path=path, + value=nativestr(value) if tt['str'](value) else value, + )) + + # Set default value + if rule.default and value is None: + value = rule.default + + if not self._validate_scalar_type(value, rule.type, path): + return + + if value is None: + return + + if rule.pattern is not None: + # + # Try to trim away the surrounding slashes around ruby style /<regex>/ if they are defined. + # This is a quirk from ruby that they define regex patterns with surrounding slashes. 
+ # Docs on how ruby regex works can be found here: https://ruby-doc.org/core-2.4.0/Regexp.html + # The original ruby implementation uses this code to validate patterns + # unless value.to_s =~ rule.regexp + # Becuase python do not work with surrounding slashes we have to trim them away in order to make the regex work + # + if rule.pattern.startswith('/') and rule.pattern.endswith('/') and self.fix_ruby_style_regex: + rule.pattern = rule.pattern[1:-1] + log.debug("Trimming slashes around ruby style regex. New pattern value: '{0}'".format(rule.pattern)) + + try: + log.debug("Matching pattern '{0}' to regex '{1}".format(rule.pattern, value)) + res = re.match(rule.pattern, value, re.UNICODE) + except TypeError: + res = None + + if res is None: # Not matching + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Value '{value}' does not match pattern '{pattern}'. Path: '{path}'", + path=path, + value=nativestr(str(value)), + pattern=rule._pattern)) + else: + log.debug("Pattern matched...") + + if rule.range is not None: + if not is_scalar(value): + raise CoreError(u"value is not a valid scalar") + + r = rule.range + + try: + v = len(value) + value = v + except Exception: + pass + + self._validate_range( + r.get("max"), + r.get("min"), + r.get("max-ex"), + r.get("min-ex"), + value, + path, + "scalar", + ) + + if rule.length is not None: + self._validate_length( + rule.length, + value, + path, + 'scalar', + ) + + # Validate timestamp + if rule.type == "timestamp": + self._validate_scalar_timestamp(value, path) + + if rule.type == "date": + if not is_scalar(value): + raise CoreError(u'value is not a valid scalar') + date_format = rule.format + self._validate_scalar_date(value, date_format, path) + + def _validate_scalar_timestamp(self, timestamp_value, path): + """ + """ + def _check_int_timestamp_boundaries(timestamp): + """ + """ + if timestamp < 1: + # Timestamp integers can't be negative + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Integer value of 
timestamp can't be below 0", + path=path, + value=timestamp, + timestamp=str(timestamp), + )) + if timestamp > 2147483647: + # Timestamp integers can't be above the upper limit of + # 32 bit integers + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Integer value of timestamp can't be above 2147483647", + path=path, + value=timestamp, + timestamp=str(timestamp), + )) + + if isinstance(timestamp_value, (int, float)): + _check_int_timestamp_boundaries(timestamp_value) + elif isinstance(timestamp_value, datetime.datetime): + # Datetime objects currently have nothing to validate. + # In the future, more options will be added to datetime validation + pass + elif isinstance(timestamp_value, basestring): + v = timestamp_value.strip() + + # parse("") will give a valid date but it should not be + # considered a valid timestamp + if v == "": + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Timestamp value is empty. Path: '{path}'", + path=path, + value=nativestr(timestamp_value), + timestamp=nativestr(timestamp_value))) + else: + # A string can contain a valid unit timestamp integer. Check if it is valid and validate it + try: + int_v = int(v) + _check_int_timestamp_boundaries(int_v) + except ValueError: + # Just continue to parse it as a timestamp + try: + parse(timestamp_value) + # If it can be parsed then it is valid + except Exception: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Timestamp: '{timestamp}'' is invalid. 
Path: '{path}'", + path=path, + value=nativestr(timestamp_value), + timestamp=nativestr(timestamp_value))) + else: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Not a valid timestamp", + path=path, + value=timestamp_value, + timestamp=timestamp_value, + )) + + def _validate_scalar_date(self, date_value, date_formats, path): + log.debug(u"Validate date : %(value)s : %(format)s : %(path)s" % { + 'value': date_value, + 'format': date_formats, + 'path': path, + }) + + if isinstance(date_value, str): + # If a date_format is specefied then use strptime on all formats + # If no date_format is specefied then use dateutils.parse() to test the value + log.debug(date_formats) + + if date_formats: + # Run through all date_formats and it is valid if atleast one of them passed time.strptime() parsing + valid = False + for date_format in date_formats: + try: + time.strptime(date_value, date_format) + valid = True + except ValueError: + pass + + if not valid: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Not a valid date: {value} format: {format}. 
Path: '{path}'", + path=path, + value=date_value, + format=date_format, + )) + return + else: + try: + parse(date_value) + except ValueError: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Not a valid date: {value} Path: '{path}'", + path=path, + value=date_value, + )) + elif isinstance(date_value, (datetime.date, datetime.datetime)): + # If the object already is a datetime or date object it passes validation + pass + else: + # If value is any other type then raise error + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Not a valid date: {value} date must be a string or a datetime.date not a '{type}'", + path=path, + value=date_value, + type=type(date_value).__name__, + )) + + def _validate_length(self, rule, value, path, prefix): + if not is_string(value): + raise CoreError("Value: '{0}' must be a 'str' type for length check to work".format(value)) + + value_length = len(str(value)) + max_, min_, max_ex, min_ex = rule.get('max'), rule.get('min'), rule.get('max-ex'), rule.get('min-ex') + + log.debug( + u"Validate length : %s : %s : %s : %s : %s : %s", + max, min, max_ex, min_ex, value, path, + ) + + if max_ is not None and max_ < value_length: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Value: '{value_str}' has length of '{value}', greater than max limit '{max_}'. Path: '{path}'", + value_str=value, + path=path, + value=len(value), + prefix=prefix, + max_=max_)) + + if min_ is not None and min_ > value_length: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Value: '{value_str}' has length of '{value}', greater than min limit '{min_}'. Path: '{path}'", + value_str=value, + path=path, + value=len(value), + prefix=prefix, + min_=min_)) + + if max_ex is not None and max_ex <= value_length: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Value: '{value_str}' has length of '{value}', greater than max_ex limit '{max_ex}'. 
Path: '{path}'", + value_str=value, + path=path, + value=len(value), + prefix=prefix, + max_ex=max_ex)) + + if min_ex is not None and min_ex >= value_length: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Value: '{value_str}' has length of '{value}', greater than min_ex limit '{min_ex}'. Path: '{path}'", + value_str=value, + path=path, + value=len(value), + prefix=prefix, + min_ex=min_ex)) + + def _validate_assert(self, rule, value, path): + if not self.allow_assertions: + raise CoreError('To allow usage of keyword "assert" you must use cli flag "--allow-assertions" or set the keyword "allow_assert" in Core class') + + # Small hack to make strings work as a value. + if isinstance(value, str): + assert_value_str = '"{0}"'.format(value) + else: + assert_value_str = '{0}'.format(value) + + assertion_string = "val = {0}; assert {1}".format(assert_value_str, rule.assertion) + try: + exec(assertion_string, {}, {}) + except AssertionError: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Value: '{0}' assertion expression failed ({1})".format(value, rule.assertion), + path=path, + value=value, + )) + return + except Exception as err: + error_class = err.__class__.__name__ + detail = err.args[0] + cl, exc, tb = sys.exc_info() + line_number = traceback.extract_tb(tb)[-1][1] + raise Exception("Unknown error during assertion\n{0}\n{1}\n{2}\n{3}\n{4}\n{5}".format( + error_class, detail, cl, exc, tb, line_number, + )) + + def _validate_range(self, max_, min_, max_ex, min_ex, value, path, prefix): + """ + Validate that value is within range values. + """ + if not isinstance(value, int) and not isinstance(value, float): + raise CoreError("Value must be a integer type") + + log.debug( + u"Validate range : %s : %s : %s : %s : %s : %s", + max_, + min_, + max_ex, + min_ex, + value, + path, + ) + + if max_ is not None and max_ < value: + self.errors.append(SchemaError.SchemaErrorEntry( + msg=u"Type '{prefix}' has size of '{value}', greater than max limit '{max_}'. 
    def _validate_scalar_type(self, value, t, path):
        """
        Check that ``value`` matches the scalar type named ``t``.

        The type check is delegated to the callable registered under ``t``
        in the ``tt`` mapping (pykwalify.types). On mismatch a
        ``SchemaError.SchemaErrorEntry`` is appended to ``self.errors``.

        Arguments:
        - `value`: the data value to type-check
        - `t`: the scalar type name, e.g. 'str', 'int', 'bool'
        - `path`: document path used in error reporting

        Returns:
        - ``True`` if the value matches the type, ``False`` otherwise.

        Raises:
        - `CoreError`: if ``t`` is not a known type in ``tt``.
        """
        log.debug(u" # Core scalar: validating scalar type : %s", t)
        log.debug(u" # Core scalar: scalar type: %s", type(value))

        try:
            if not tt[t](value):
                self.errors.append(SchemaError.SchemaErrorEntry(
                    msg=u"Value '{value}' is not of type '{scalar_type}'. Path: '{path}'",
                    path=path,
                    # NOTE(review): ``unicode`` is presumably the py2/py3
                    # shim from pykwalify.compat -- confirm it is imported
                    # at the top of this file.
                    value=unicode(value) if tt['str'](value) else value,
                    scalar_type=t))
                return False
            return True
        except KeyError as e:
            # Type not found in valid types mapping
            log.debug(e)
            raise CoreError(u"Unknown type check: {0!s} : {1!s} : {2!s}".format(path, value, t))
class PyKwalifyException(RuntimeError):
    """
    Base class for all pykwalify exceptions.

    Carries a human readable message, a numeric return code (see the
    module level ``retcodes`` mapping), an optional machine readable
    ``error_key`` and the document path the error relates to.
    """

    def __init__(self, msg=None, error_key=None, retcode=None, path=None):
        """
        Arguments:
        - `msg`: a string
        - `error_key`: a unique string that makes it easier to identify what error it is
        - `retcode`: an integer, defined in PyKwalify.errors.retcodes
        - `path`: a string pointing at the relevant place in the document
        """
        self.msg = msg or ""
        self.retcode = retcode or retnames['unknownerror']
        # BUG FIX: resolve the name from the *defaulted* retcode. The
        # original read ``retcodes[retcode]`` with the raw argument, which
        # raised KeyError when the exception was constructed without an
        # explicit retcode (retcode=None).
        self.retname = retcodes[self.retcode]
        self.error_key = error_key
        self.path = path or "/"

    def __str__(self):
        # Rendered as: <ClassName: error code N: msg: Path: '/...'>
        kwargs = []
        if self.retcode != retnames['noerror']:
            kwargs.append("error code {0}".format(self.retcode))
        if self.msg:
            kwargs.append(self.msg)
        if kwargs:
            # Leading empty element produces the ': ' right after the class
            # name when there is any detail to show.
            kwargs.insert(0, '')
        if self.path:
            kwargs.append("Path: '{0}'".format(self.path))
        return "<{0}{1}>".format(self.__class__.__name__, ': '.join(kwargs))

    def __repr__(self):
        kwargs = []
        if self.msg:
            kwargs.append("msg='{0}'".format(self.msg))
        return "{0}({1})".format(self.__class__.__name__, ', '.join(kwargs))

    # The setters below keep the original type assertions but use the
    # standard @property decorator instead of the archaic
    # ``x = property(**x())`` pattern.

    @property
    def msg(self):
        """Error message; always a string (py2/py3 via compat basestring)."""
        return self._msg

    @msg.setter
    def msg(self, value):
        assert isinstance(value, basestring), "argument is not string"
        self._msg = value

    @property
    def retcode(self):
        """Numeric return code, one of the keys in ``retcodes``."""
        return self._retcode

    @retcode.setter
    def retcode(self, value):
        assert isinstance(value, int), "argument is not integer"
        self._retcode = value

    @property
    def retname(self):
        """Symbolic name matching ``retcode``, one of the ``retcodes`` values."""
        return self._retname

    @retname.setter
    def retname(self, value):
        assert isinstance(value, str), "argument is not string"
        self._retname = value
class SchemaConflict(PyKwalifyException):
    """
    Raised when a schema defines a conflicting combination of keywords
    on the same rule (e.g. sequence and mapping constraints together).
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with the
        'schemaconflict' retcode pre-filled.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # BUG FIX: name the class explicitly in super(). The original
        # ``super(self.__class__, self)`` recurses infinitely if this
        # exception is ever subclassed.
        super(SchemaConflict, self).__init__(
            retcode=retnames["schemaconflict"],
            *args, **kwargs
        )
self.strict_rule_validation = strict_rule_validation + self._type = None + self._type_class = None + self._unique = None + self._version = None + + if isinstance(schema, dict): + self.init(schema, "") + + @property + def allowempty_map(self): + return self._allowempty_map + + @allowempty_map.setter + def allowempty_map(self, value): + self._allowempty_map = value + + @property + def assertion(self): + return self._assertion + + @assertion.setter + def assertion(self, value): + self._assertion = value + + @property + def default(self): + return self._default + + @default.setter + def default(self, value): + self._default = value + + @property + def desc(self): + return self._desc + + @desc.setter + def desc(self, value): + self._desc = value + + @property + def enum(self): + return self._enum + + @enum.setter + def enum(self, value): + self._enum = value + + @property + def example(self): + return self._example + + @example.setter + def example(self, value): + self._example = value + + @property + def extensions(self): + return self._extensions + + @extensions.setter + def extensions(self, value): + self._extensions = value + + @property + def format(self): + return self._format + + @format.setter + def format(self, value): + self._format = value + + @property + def func(self): + return self._func + + @func.setter + def func(self, value): + self._func = value + + @property + def ident(self): + return self._ident + + @ident.setter + def ident(self, value): + self._ident = value + + @property + def include_name(self): + return self._include_name + + @include_name.setter + def include_name(self, value): + self._include_name = value + + @property + def length(self): + return self._length + + @length.setter + def length(self, value): + self._length = value + + @property + def map_regex_rule(self): + return self._map_regex_rule + + @map_regex_rule.setter + def map_regex_rule(self, value): + self._map_regex_rule = value + + @property + def mapping(self): + return 
self._mapping + + @mapping.setter + def mapping(self, value): + self._mapping = value + + @property + def matching(self): + return self._matching + + @matching.setter + def matching(self, value): + self._matching = value + + @property + def matching_rule(self): + return self._matching_rule + + @matching_rule.setter + def matching_rule(self, value): + self._matching_rule = value + + @property + def name(self): + return self._name + + @name.setter + def name(self, value): + self._name = value + + @property + def nullable(self): + return self._nullable + + @nullable.setter + def nullable(self, value): + self._nullable = value + + @property + def parent(self): + return self._parent + + @parent.setter + def parent(self, value): + self._parent = value + + @property + def pattern(self): + return self._pattern + + @pattern.setter + def pattern(self, value): + self._pattern = value + + @property + def pattern_regexp(self): + return self._pattern_regexp + + @pattern_regexp.setter + def pattern_regexp(self, value): + self._pattern_regexp = value + + @property + def range(self): + return self._range + + @range.setter + def range(self, value): + self._range = value + + @property + def regex_mappings(self): + return self._regex_mappings + + @regex_mappings.setter + def regex_mappings(self, value): + self._regex_mappings = value + + @property + def required(self): + return self._required + + @required.setter + def required(self, value): + self._required = value + + @property + def schema(self): + return self._schema + + @schema.setter + def schema(self, value): + self._schema = value + + @property + def schema_str(self): + return self._schema_str + + @schema_str.setter + def schema_str(self, value): + self._schema_str = value + + @property + def sequence(self): + return self._sequence + + @sequence.setter + def sequence(self, value): + self._sequence = value + + @property + def type(self): + return self._type + + @type.setter + def type(self, value): + self._type = value + + 
@property + def type_class(self): + return self._type_class + + @type_class.setter + def type_class(self, value): + self._type_class = value + + @property + def unique(self): + return self._unique + + @unique.setter + def unique(self, value): + self._unique = value + + @property + def version(self): + return self._version + + @version.setter + def version(self, value): + self._version = value + + def __str__(self): + return "Rule: {0}".format(str(self.schema_str)) + + def keywords(self): + """ + Returns a list of all keywords that this rule object has defined. + A keyword is considered defined if the value it returns != None. + """ + defined_keywords = [ + ('allowempty_map', 'allowempty_map'), + ('assertion', 'assertion'), + ('default', 'default'), + ('class', 'class'), + ('desc', 'desc'), + ('enum', 'enum'), + ('example', 'example'), + ('extensions', 'extensions'), + ('format', 'format'), + ('func', 'func'), + ('ident', 'ident'), + ('include_name', 'include'), + ('length', 'length'), + ('map_regex_rule', 'map_regex_rule'), + ('mapping', 'mapping'), + ('matching', 'matching'), + ('matching_rule', 'matching_rule'), + ('name', 'name'), + ('nullable', 'nullable') + ('parent', 'parent'), + ('pattern', 'pattern'), + ('pattern_regexp', 'pattern_regexp'), + ('range', 'range'), + ('regex_mappings', 'regex_mappings'), + ('required', 'required'), + ('schema', 'schema'), + ('schema_str', 'schema_str'), + ('sequence', 'sequence'), + ('type', 'type'), + ('type_class', 'type_class'), + ('unique', 'unique'), + ('version', 'version'), + ] + found_keywords = [] + + for var_name, keyword_name in defined_keywords: + if getattr(self, var_name, None): + found_keywords.append(keyword_name) + + return found_keywords + + def init(self, schema, path): + """ + """ + log.debug(u"Init schema: %s", schema) + + include = schema.get("include") + + # Check if this item is a include, overwrite schema with include schema and continue to parse + if include: + log.debug(u"Found include tag...") + 
self.include_name = include + return + + t = None + rule = self + + if schema is not None: + if "type" not in schema: + # Mapping and sequence do not need explicit type defenitions + if any(sa in schema for sa in sequence_aliases): + t = "seq" + self.init_type_value(t, rule, path) + elif any(ma in schema for ma in mapping_aliases): + t = "map" + self.init_type_value(t, rule, path) + else: + t = DEFAULT_TYPE + self.type = t + else: + if not is_string(schema["type"]): + raise RuleError( + msg=u"Key 'type' in schema rule is not a string type (found %s)" % type(schema["type"]).__name__, + error_key=u"type.not_string", + path=path, + ) + + self.type = schema["type"] + + self.schema_str = schema + + if not t: + t = schema["type"] + self.init_type_value(t, rule, path) + + func_mapping = { + "allowempty": self.init_allow_empty_map, + "assert": self.init_assert_value, + "class": lambda x, y, z: (), + "default": self.init_default_value, + "desc": self.init_desc_value, + "enum": self.init_enum_value, + "example": self.init_example, + "extensions": self.init_extensions, + "format": self.init_format_value, + "func": self.init_func, + "ident": self.init_ident_value, + "length": self.init_length_value, + "map": self.init_mapping_value, + "mapping": self.init_mapping_value, + "matching": self.init_matching, + "matching-rule": self.init_matching_rule, + "name": self.init_name_value, + "nul": self.init_nullable_value, + "nullable": self.init_nullable_value, + "pattern": self.init_pattern_value, + "range": self.init_range_value, + "req": self.init_required_value, + "required": self.init_required_value, + "seq": self.init_sequence_value, + "sequence": self.init_sequence_value, + "type": lambda x, y, z: (), + "unique": self.init_unique_value, + "version": self.init_version, + } + + for k, v in schema.items(): + if k in func_mapping: + func_mapping[k](v, rule, path) + elif k.startswith("schema;"): + # Schema tag is only allowed on top level of data + log.debug(u"Found schema tag...") + 
raise RuleError( + msg=u"Schema is only allowed on top level of schema file", + error_key=u"schema.not.toplevel", + path=path, + ) + else: + raise RuleError( + msg=u"Unknown key: {0} found".format(k), + error_key=u"key.unknown", + path=path, + ) + + self.check_conflicts(schema, rule, path) + + self.check_type_keywords(schema, rule, path) + + def init_format_value(self, v, rule, path): + log.debug(u"Init format value : %s", path) + + if is_string(v): + self._format = [v] + elif isinstance(v, list): + valid = True + for date_format in v: + if not isinstance(date_format, basestring): + valid = False + + if valid: + self._format = v + else: + raise RuleError( + msg=u"All values in format list must be strings", + error_key=u"format.not_string", + path=path, + ) + else: + raise RuleError( + msg=u"Value of format keyword: '{}' must be a string or list or string values".format(v), + error_key=u"format.not_string", + path=path, + ) + + valid_types = ("date", ) + + # Format is only supported when used with "type=date" + if self._type not in valid_types: + raise RuleError( + msg="Keyword format is only allowed when used with the following types: {0}".format(valid_types), + error_key=u"format.not_used_with_correct_type", + path=path, + ) + + def init_version(self, v, rule, path): + """ + """ + log.debug(u"Init version value : {0}".format(path)) + + self._version = str(v) + + def init_example(self, v, rule, path): + log.debug(u'Init example value : {0}'.format(path)) + + if not is_string(v): + raise RuleError( + msg=u"Value: {0} for keyword example must be a string".format(v), + error_key=u"example.not_string", + path=path, + ) + + self.desc = v + + def init_length_value(self, v, rule, path): + log.debug(u'Init length value : {0}'.format(path)) + + supported_types = ["str", "int", "float", "number", "map", "seq"] + + if not isinstance(v, dict): + raise RuleError( + msg=u"Length value is not a dict type: '{0}'".format(v), + error_key=u"length.not_map", + path=path, + ) + + if 
self.type not in supported_types: + raise RuleError( + msg=u"Length value type: '{0}' is not a supported type".format(self.type), + error_key=u"length.not_supported_type", + path=path, + ) + + # dict that should contain min, max, min-ex, max-ex keys + self.length = v + + # This should validate that only min, max, min-ex, max-ex exists in the dict + for k, v in self.length.items(): + if k not in ["max", "min", "max-ex", "min-ex"]: + raise RuleError( + msg=u"Unknown key: '{0}' found in length keyword".format(k), + error_key=u"length.unknown_key", + path=path, + ) + + if "max" in self.length and "max-ex" in self.length: + raise RuleError( + msg=u"'max' and 'max-ex' can't be used in the same length rule", + error_key=u"length.max_duplicate_keywords", + path=path, + ) + + if "min" in self.length and "min-ex" in self.length: + raise RuleError( + msg=u"'min' and 'min-ex' can't be used in the same length rule", + error_key=u"length.min_duplicate_keywords", + path=path, + ) + + max = self.length.get("max") + min = self.length.get("min") + max_ex = self.length.get("max-ex") + min_ex = self.length.get("min-ex") + + if max is not None and not is_number(max) or is_bool(max): + raise RuleError( + msg=u"Value: '{0}' for 'max' keyword is not a number".format(v), + error_key=u"length.max.not_number", + path=path, + ) + + if min is not None and not is_number(min) or is_bool(min): + raise RuleError( + msg=u"Value: '{0}' for 'min' keyword is not a number".format(v), + error_key=u"length.min.not_number", + path=path, + ) + + if max_ex is not None and not is_number(max_ex) or is_bool(max_ex): + raise RuleError( + msg=u"Value: '{0}' for 'max-ex' keyword is not a number".format(v), + error_key=u"length.max_ex.not_number", + path=path, + ) + + if min_ex is not None and not is_number(min_ex) or is_bool(min_ex): + raise RuleError( + msg=u"Value: '{0}' for 'min-ex' keyword is not a number".format(v), + error_key=u"length.min_ex.not_number", + path=path, + ) + + # only numbers allow negative 
lengths + # string, map and seq require non negative lengtsh + if self.type not in ["int", "float", "number"]: + if min is not None and min < 0: + raise RuleError( + msg=u"Value for 'min' can't be negative in case of type {0}.".format(self.type), + error_key=u"length.min_negative", + path=path, + ) + elif min_ex is not None and min_ex < 0: + raise RuleError( + msg=u"Value for 'min-ex' can't be negative in case of type {0}.".format(self.type), + error_key=u"length.min-ex_negative", + path=path, + ) + if max is not None and max < 0: + raise RuleError( + msg=u"Value for 'max' can't be negative in case of type {0}.".format(self.type), + error_key=u"length.max_negative", + path=path, + ) + elif max_ex is not None and max_ex < 0: + raise RuleError( + msg=u"Value for 'max-ex' can't be negative in case of type {0}.".format(self.type), + error_key=u"length.max-ex_negative", + path=path, + ) + + if max is not None: + if min is not None and max < min: + raise RuleError( + msg=u"Value for 'max' can't be less then value for 'min'. {0} < {1}".format(max, min), + error_key=u"length.max_lt_min", + path=path, + ) + elif min_ex is not None and max <= min_ex: + raise RuleError( + msg=u"Value for 'max' can't be less then value for 'min-ex'. {0} <= {1}".format(max, min_ex), + error_key=u"length.max_le_min-ex", + path=path, + ) + elif max_ex is not None: + if min is not None and max_ex < min: + raise RuleError( + msg=u"Value for 'max-ex' can't be less then value for 'min'. {0} < {1}".format(max_ex, min), + error_key=u"length.max-ex_le_min", + path=path, + ) + elif min_ex is not None and max_ex <= min_ex: + raise RuleError( + msg=u"Value for 'max-ex' can't be less then value for 'min-ex'. 
{0} <= {1}".format(max_ex, min_ex), + error_key=u"length.max-ex_le_min-ex", + path=path, + ) + + def init_func(self, v, rule, path): + """ + """ + if not is_string(v): + raise RuleError( + msg=u"Value: {0} for func keyword must be a string".format(v), + error_key=u"func.notstring", + path=path, + ) + + self.func = v + + def init_extensions(self, v, rule, path): + """ + """ + if not isinstance(v, list): + raise RuleError( + msg=u"Extension definition should be a list", + error_key=u"extension.not_list", + path=path, + ) + + # TODO: Add limitation that this keyword can only be used at the top level of the file + + self.extensions = v + + def init_matching_rule(self, v, rule, path): + """ + """ + log.debug(u"Init matching-rule: %s", path) + log.debug(u"%s %s", v, rule) + + # Verify that the provided rule is part of one of the allowed one + allowed = ["any", "all"] + # ["none", "one"] Is currently awaiting proper implementation + if v not in allowed: + raise RuleError( + msg=u"Specified rule in key: {0} is not part of allowed rule set : {1}".format(v, allowed), + error_key=u"matching_rule.not_allowed", + path=path, + ) + else: + self.matching_rule = v + + def init_allow_empty_map(self, v, rule, path): + """ + """ + log.debug(u"Init allow empty value: %s", path) + log.debug(u"Type: %s : %s", v, rule) + + self.allowempty_map = v + + def init_type_value(self, v, rule, path): + """ + """ + log.debug(u"Init type value : %s", path) + log.debug(u"Type: %s %s", v, rule) + + if v is None: + v = DEFAULT_TYPE + + self.type = v + self.type_class = type_class(v) + + if not is_builtin_type(self.type): + raise RuleError( + msg=u"Type: {0} is not any of the known types".format(self.type), + error_key=u"type.unknown", + path=path, + ) + + def init_matching(self, v, rule, path): + """ + """ + log.debug(u"Init matching rule : %s", path) + + valid_values = ["any", "all", "*"] + + if str(v) not in valid_values: + raise RuleError( + msg=u"matching value: {0} is not one of 
{1}".format(str(v), valid_values), + error_key=u"matching_rule.invalid", + path=path, + ) + + self.matching = str(v) + + def init_name_value(self, v, rule, path): + """ + """ + log.debug(u"Init name value : %s", path) + + if not is_string(v): + raise RuleError( + msg=u"Value: {0} for keyword name must be a string".format(v), + error_key=u"name.not_string", + path=path, + ) + + self.name = v + + def init_nullable_value(self, v, rule, path): + """ + """ + log.debug(u"Init nullable value : %s", path) + + if not isinstance(v, bool): + raise RuleError( + msg=u"Value: '{0}' for nullable keyword must be a boolean".format(v), + error_key=u"nullable.not_bool", + path=path, + ) + + self.nullable = v + + def init_desc_value(self, v, rule, path): + """ + """ + log.debug(u"Init descr value : %s", path) + + if not is_string(v): + raise RuleError( + msg=u"Value: {0} for keyword desc must be a string".format(v), + error_key=u"desc.not_string", + path=path, + ) + + self.desc = v + + def init_required_value(self, v, rule, path): + """ + """ + log.debug(u"Init required value : %s", path) + + if not is_bool(v): + raise RuleError( + msg=u"Value: '{0}' for required keyword must be a boolean".format(v), + error_key=u"required.not_bool", + path=path, + ) + self.required = v + + def init_pattern_value(self, v, rule, path): + """ + """ + log.debug(u"Init pattern value : %s", path) + + if not is_string(v): + raise RuleError( + msg=u"Value of pattern keyword: '{0}' is not a string".format(v), + error_key=u"pattern.not_string", + path=path, + ) + + self.pattern = v + + if self.schema_str["type"] == "map": + raise RuleError( + msg=u"Keyword pattern is not allowed inside map", + error_key=u"pattern.not_allowed_in_map", + path=path, + ) + + # TODO: Some form of validation of the regexp? 
it exists in the source + + try: + self.pattern_regexp = re.compile(self.pattern) + except Exception: + raise RuleError( + msg=u"Syntax error when compiling regex pattern: {0}".format(self.pattern_regexp), + error_key=u"pattern.syntax_error", + path=path, + ) + + def init_enum_value(self, v, rule, path): + """ + """ + log.debug(u"Init enum value : %s", path) + + if not isinstance(v, list): + raise RuleError( + msg=u"Enum is not a sequence", + error_key=u"enum.not_seq", + path=path, + ) + self.enum = v + + if is_collection_type(self.type): + raise RuleError( + msg=u"Enum is not a scalar", + error_key=u"enum.not_scalar", + path=path, + ) + + lookup = set() + for item in v: + if not isinstance(item, self.type_class): + raise RuleError( + msg=u"Item: '{0}' in enum is not of correct class type: '{1}'".format(item, self.type_class), + error_key=u"enum.type.unmatch", + path=path, + ) + + if item in lookup: + raise RuleError( + msg=u"Duplicate items: '{0}' found in enum".format(item), + error_key=u"enum.duplicate_items", + path=path, + ) + + lookup.add(item) + + def init_assert_value(self, v, rule, path): + """ + """ + log.debug(u"Init assert value : %s", path) + + if not is_string(v): + raise RuleError( + msg=u"Value: '{0}' for keyword 'assert' is not a string".format(v), + error_key=u"assert.not_str", + path=path, + ) + + self.assertion = v + + if any(k in self.assertion for k in (';', 'import', '__import__')): + raise RuleError( + msg=u"Value: '{assertion}' contain invalid content that is not allowed to be present in assertion keyword".format(assertion=self.assertion), + error_key=u"assert.unsupported_content", + path=path, + ) + + def init_range_value(self, v, rule, path): + """ + """ + log.debug(u"Init range value : %s", path) + + supported_types = ["str", "int", "float", "number", "map", "seq"] + + if not isinstance(v, dict): + raise RuleError( + msg=u"Range value is not a dict type: '{0}'".format(v), + error_key=u"range.not_map", + path=path, + ) + + if self.type 
not in supported_types: + raise RuleError( + msg=u"Range value type: '{0}' is not a supported type".format(self.type), + error_key=u"range.not_supported_type", + path=path, + ) + + # dict that should contain min, max, min-ex, max-ex keys + self.range = v + + # This should validate that only min, max, min-ex, max-ex exists in the dict + for k, v in self.range.items(): + if k not in ["max", "min", "max-ex", "min-ex"]: + raise RuleError( + msg=u"Unknown key: '{0}' found in range keyword".format(k), + error_key=u"range.unknown_key", + path=path, + ) + + if "max" in self.range and "max-ex" in self.range: + raise RuleError( + msg=u"'max' and 'max-ex' can't be used in the same range rule", + error_key=u"range.max_duplicate_keywords", + path=path, + ) + + if "min" in self.range and "min-ex" in self.range: + raise RuleError( + msg=u"'min' and 'min-ex' can't be used in the same range rule", + error_key=u"range.min_duplicate_keywords", + path=path, + ) + + max = self.range.get("max") + min = self.range.get("min") + max_ex = self.range.get("max-ex") + min_ex = self.range.get("min-ex") + + if max is not None and not is_number(max) or is_bool(max): + raise RuleError( + msg=u"Value: '{0}' for 'max' keyword is not a number".format(v), + error_key=u"range.max.not_number", + path=path, + ) + + if min is not None and not is_number(min) or is_bool(min): + raise RuleError( + msg=u"Value: '{0}' for 'min' keyword is not a number".format(v), + error_key=u"range.min.not_number", + path=path, + ) + + if max_ex is not None and not is_number(max_ex) or is_bool(max_ex): + raise RuleError( + msg=u"Value: '{0}' for 'max-ex' keyword is not a number".format(v), + error_key=u"range.max_ex.not_number", + path=path, + ) + + if min_ex is not None and not is_number(min_ex) or is_bool(min_ex): + raise RuleError( + msg=u"Value: '{0}' for 'min-ex' keyword is not a number".format(v), + error_key=u"range.min_ex.not_number", + path=path, + ) + + # only numbers allow negative ranges + # string, map and seq 
require non negative ranges
        if self.type not in ["int", "float", "number"]:
            # for non-numeric types the range constrains a length/size,
            # which can never be negative
            if min is not None and min < 0:
                raise RuleError(
                    msg=u"Value for 'min' can't be negative in case of type {0}.".format(self.type),
                    error_key=u"range.min_negative",
                    path=path,
                )
            elif min_ex is not None and min_ex < 0:
                raise RuleError(
                    msg=u"Value for 'min-ex' can't be negative in case of type {0}.".format(self.type),
                    error_key=u"range.min-ex_negative",
                    path=path,
                )
            if max is not None and max < 0:
                raise RuleError(
                    msg=u"Value for 'max' can't be negative in case of type {0}.".format(self.type),
                    error_key=u"range.max_negative",
                    path=path,
                )
            elif max_ex is not None and max_ex < 0:
                raise RuleError(
                    msg=u"Value for 'max-ex' can't be negative in case of type {0}.".format(self.type),
                    error_key=u"range.max-ex_negative",
                    path=path,
                )

        # cross-check that the upper bound is not below the lower bound; at
        # this point 'max'/'max-ex' (and 'min'/'min-ex') are already known to
        # be mutually exclusive, so elif chains are safe
        if max is not None:
            if min is not None and max < min:
                raise RuleError(
                    msg=u"Value for 'max' can't be less then value for 'min'. {0} < {1}".format(max, min),
                    error_key=u"range.max_lt_min",
                    path=path,
                )
            elif min_ex is not None and max <= min_ex:
                raise RuleError(
                    msg=u"Value for 'max' can't be less then value for 'min-ex'. {0} <= {1}".format(max, min_ex),
                    error_key=u"range.max_le_min-ex",
                    path=path,
                )
        elif max_ex is not None:
            if min is not None and max_ex < min:
                raise RuleError(
                    msg=u"Value for 'max-ex' can't be less then value for 'min'. {0} < {1}".format(max_ex, min),
                    error_key=u"range.max-ex_le_min",
                    path=path,
                )
            elif min_ex is not None and max_ex <= min_ex:
                raise RuleError(
                    msg=u"Value for 'max-ex' can't be less then value for 'min-ex'. 
{0} <= {1}".format(max_ex, min_ex), + error_key=u"range.max-ex_le_min-ex", + path=path, + ) + + def init_ident_value(self, v, rule, path): + """ + """ + log.debug(u"Init ident value : %s", path) + + if v is None or not is_bool(v): + raise RuleError( + msg=u"Value: '{0}' of 'ident' is not a boolean value".format(v), + error_key=u"ident.not_bool", + path=path, + ) + + self.ident = bool(v) + self.required = True + + if is_collection_type(self.type): + raise RuleError( + msg=u"Value: '{0}' of 'ident' is not a scalar value".format(v), + error_key=u"ident.not_scalar", + path=path, + ) + + if path == "": + raise RuleError( + msg=u"Keyword 'ident' can't be on root level of schema", + error_key=u"ident.not_on_root_level", + path=path, + ) + + if self.parent is None or not self.parent.type == "map": + raise RuleError( + msg=u"Keword 'ident' can't be inside 'map'", + error_key=u"ident.not_in_map", + path=path, + ) + + def init_unique_value(self, v, rule, path): + """ + """ + log.debug(u"Init unique value : %s", path) + + if not is_bool(v): + raise RuleError( + msg=u"Value: '{0}' for 'unique' keyword is not boolean".format(v), + error_key=u"unique.not_bool", + path=path, + ) + + self.unique = v + + if is_collection_type(self.type): + raise RuleError( + msg=u"Type of the value: '{0}' for 'unique' keyword is not a scalar type".format(self.type), + error_key=u"unique.not_scalar", + path=path, + ) + if path == "": + raise RuleError( + msg=u"Keyword 'unique' can't be on root level of schema", + error_key=u"unique.not_on_root_level", + path=path, + ) + + def init_sequence_value(self, v, rule, path): + """ + """ + log.debug(u"Init sequence value : %s", path) + + if v is not None and not isinstance(v, list): + raise RuleError( + msg=u"Sequence keyword is not a list", + error_key=u"sequence.not_seq", + path=path, + ) + + self.sequence = v + + if self.sequence is None or len(self.sequence) == 0: + raise RuleError( + msg=u"Sequence contains 0 elements", + 
error_key=u"sequence.no_elements",
                path=path,
            )

        tmp_seq = []

        for i, e in enumerate(self.sequence):
            # a bare None element schema is treated as an empty rule
            elem = e or {}

            rule = Rule(None, self)
            rule.init(elem, u"{0}/sequence/{1}".format(path, i))

            tmp_seq.append(rule)

        self.sequence = tmp_seq

        # NOTE(review): only the rule built for the *last* element is
        # returned; presumably callers rely on single-element sequences —
        # verify against call sites
        return rule

    def init_mapping_value(self, v, rule, path):
        """
        Validate the 'map'/'mapping' keyword and build a child Rule object
        for every key in the mapping, handling 'regex;...' / 're;...' keys
        separately.

        :param v: value of the mapping keyword from the schema
        :param rule: raw rule dict this keyword belongs to
        :param path: schema path string, used in error reporting
        :raises RuleError: if both aliases are used, v is not a dict, or the
                           mapping is empty
        """
        # Check for duplicate use of 'map' and 'mapping'
        if self.mapping:
            raise RuleError(
                msg=u"Keywords 'map' and 'mapping' can't be used on the same level",
                error_key=u"mapping.duplicate_keywords",
                path=path,
            )

        log.debug(u"Init mapping value : %s", path)

        if v is not None and not isinstance(v, dict):
            raise RuleError(
                msg=u"Value for keyword 'map/mapping' is not a dict",
                error_key=u"mapping.not_dict",
                path=path,
            )

        if v is None or len(v) == 0:
            raise RuleError(
                msg=u"Mapping do not contain any elements",
                error_key=u"mapping.no_elements",
                path=path,
            )

        self.mapping = {}
        self.regex_mappings = []

        for k, v in v.items():
            if v is None:
                v = {}

            # Check if this is a regex rule. 
Handle specially
            if k.startswith("regex;") or k.startswith("re;"):
                log.debug(u"Found regex map rule")
                regex = k.split(";", 1)
                if len(regex) != 2:
                    raise RuleError(
                        msg=u"Value: '{0}' for keyword regex is malformed".format(k),
                        error_key=u"mapping.regex.malformed",
                        path=path,
                    )
                else:
                    # keep only the pattern part after the 'regex;'/'re;' prefix
                    regex = regex[1]
                    try:
                        re.compile(regex)
                    except Exception as e:
                        log.debug(e)
                        raise RuleError(
                            msg=u"Unable to compile regex '{0}'".format(regex),
                            error_key=u"mapping.regex.compile_error",
                            path=path,
                        )

                    # NOTE(review): regex[1:-1] strips the first and last
                    # character — presumably the surrounding delimiters of a
                    # kwalify-style '(...)' pattern; confirm against callers
                    regex_rule = Rule(None, self)
                    regex_rule.init(v, u"{0}/mapping;regex/{1}".format(path, regex[1:-1]))
                    regex_rule.map_regex_rule = regex[1:-1]
                    self.regex_mappings.append(regex_rule)
                    self.mapping[k] = regex_rule
            else:
                rule = Rule(None, self)
                rule.init(v, u"{0}/mapping/{1}".format(path, k))
                self.mapping[k] = rule

        # NOTE(review): returns the last non-regex child rule built, or the
        # incoming 'rule' argument when every key was a regex rule
        return rule

    def init_default_value(self, v, rule, path):
        """
        Validate and store the 'default' keyword: a fallback value used when
        data is missing.  Only scalar defaults matching the rule type are
        allowed.

        :param v: value of the 'default' keyword from the schema
        :param rule: raw rule dict this keyword belongs to
        :param path: schema path string, used in error reporting
        :raises RuleError: if the rule type is a collection or v has the
                           wrong python type
        """
        log.debug(u"Init default value : %s", path)
        self.default = v

        if is_collection_type(self.type):
            raise RuleError(
                msg=u"Value: {0} for keyword 'default' is not a scalar type".format(v),
                error_key=u"default.not_scalar",
                path=path,
            )

        # NOTE(review): this duplicates the is_collection_type() check above
        # ("map"/"seq" are exactly the collection types); kept as-is
        if self.type == "map" or self.type == "seq":
            raise RuleError(
                msg=u"Value: {0} for keyword 'default' is not a scalar type".format(v),
                error_key=u"default.not_scalar",
                path=path,
            )

        if not isinstance(v, self.type_class):
            raise RuleError(
                msg=u"Types do not match: '{0}' --> '{1}'".format(v, self.type_class),
                error_key=u"default.type.unmatch",
                path=path,
            )

    def check_type_keywords(self, schema, rule, path):
        """
        All supported keywords:
        - allowempty_map
        - assertion
        - class
        - date
        - default
        - desc
        - enum
        - example
        - extensions
        - func
        - ident
        - include_name
        - map_regex_rule
        - mapping
        - matching
        - matching_rule
        - name
        - nullable
        - pattern
        - pattern_regexp
        - range
        - regex_mappings
        - required
        - schema
        - sequence
        - type
        - type_class
- unique
        - version
        """
        # strict validation of rule keywords is opt-in
        if not self.strict_rule_validation:
            return

        global_keywords = ['type', 'desc', 'example', 'extensions', 'name', 'nullable', 'version', 'func', 'include']
        all_allowed_keywords = {
            'str': global_keywords + ['default', 'pattern', 'range', 'enum', 'required', 'unique', 'req'],
            'int': global_keywords + ['default', 'range', 'enum', 'required', 'unique'],
            'float': global_keywords + ['default', 'enum', 'range', 'required'],
            'number': global_keywords + ['default', 'enum'],
            'bool': global_keywords + ['default', 'enum'],
            'map': global_keywords + ['allowempty_map', 'mapping', 'map', 'allowempty', 'required', 'matching-rule', 'range', 'class'],
            'seq': global_keywords + ['sequence', 'seq', 'required', 'range', 'matching'],
            'sequence': global_keywords + ['sequence', 'seq', 'required'],
            'mapping': global_keywords + ['mapping', 'seq', 'required'],
            'timestamp': global_keywords + ['default', 'enum'],
            'date': global_keywords + ['default', 'enum'],
            'symbol': global_keywords + ['default', 'enum'],
            'scalar': global_keywords + ['default', 'enum'],
            'text': global_keywords + ['default', 'enum', 'pattern'],
            'any': global_keywords + ['default', 'enum'],
            'enum': global_keywords + ['default', 'enum'],
            'none': global_keywords + ['default', 'enum', 'required'],
        }
        rule_type = schema.get('type')
        if not rule_type:
            # Special cases for the "shortcut methods"
            if 'sequence' in schema or 'seq' in schema:
                rule_type = 'sequence'
            elif 'mapping' in schema or 'map' in schema:
                rule_type = 'mapping'

        allowed_keywords = all_allowed_keywords.get(rule_type)
        # NOTE(review): if rule_type is unknown but the schema also contains a
        # sequence/mapping alias, allowed_keywords stays None and the loop
        # below raises TypeError ('in' on None) instead of RuleError — latent;
        # confirm whether type validation elsewhere makes this unreachable
        if not allowed_keywords and 'sequence' not in schema and 'mapping' not in schema and 'seq' not in schema and 'map' not in schema:
            raise RuleError('No allowed keywords found for type: {0}'.format(rule_type))

        for k, v in schema.items():
            if k not in allowed_keywords:
                raise RuleError('Keyword "{0}" is not supported for type: "{1}" '.format(k, rule_type))

    def check_conflicts(self, schema, rule, path):
        """
        Verify that mutually exclusive keywords are not combined on the same
        schema level (e.g. seq + mapping, map + enum, scalar + sequence).

        :param schema: raw schema dict for this rule
        :param rule: raw rule dict this keyword belongs to
        :param path: schema path string, used in error reporting
        :raises SchemaConflict: on any illegal keyword combination
        """
        log.debug(u"Checking for conflicts : %s", path)

        if self.type == "seq":
            if all(sa not in schema for sa in sequence_aliases):
                raise SchemaConflict(
                    msg="Type is sequence but no sequence alias found on same level",
                    error_key=u"seq.no_sequence",
                    path=path,
                )

            if self.enum is not None:
                raise SchemaConflict(
                    msg="Sequence and enum can't be on the same level in the schema",
                    error_key=u"seq.conflict.enum",
                    path=path,
                )

            if self.pattern is not None:
                raise SchemaConflict(
                    msg="Sequence and pattern can't be on the same level in the schema",
                    error_key=u"seq.conflict.pattern",
                    path=path,
                )

            if self.mapping is not None:
                raise SchemaConflict(
                    msg="Sequence and mapping can't be on the same level in the schema",
                    error_key=u"seq.conflict.mapping",
                    path=path,
                )
        elif self.type == "map":
            # an empty mapping is legal only when allowempty_map is set
            if all(ma not in schema for ma in mapping_aliases) and not self.allowempty_map:
                raise SchemaConflict(
                    msg="Type is mapping but no mapping alias found on same level",
                    error_key=u"map.no_mapping",
                    path=path,
                )

            if self.enum is not None:
                raise SchemaConflict(
                    msg="Mapping and enum can't be on the same level in the schema",
                    error_key=u"map.conflict.enum",
                    path=path,
                )

            if self.sequence is not None:
                raise SchemaConflict(
                    msg="Mapping and sequence can't be on the same level in the schema",
                    error_key=u"map.conflict.sequence",
                    path=path,
                )
        else:
            # scalar types
            if self.sequence is not None:
                raise SchemaConflict(
                    msg="Scalar and sequence can't be on the same level in the schema",
                    error_key=u"scalar.conflict.sequence",
                    path=path,
                )

            if self.mapping is not None:
                raise SchemaConflict(
                    msg="Scalar and mapping can't be on the same level in the schema",
                    error_key=u"scalar.conflict.mapping",
                    path=path,
                )

            if self.enum is not None and self.range is not None:
                raise SchemaConflict(
                    msg="Enum and range can't be on the same level in the schema",
error_key=u"enum.conflict.range", + path=path, + ) diff --git a/pykwalify/types.py b/pykwalify/types.py new file mode 100644 index 0000000..beb33ad --- /dev/null +++ b/pykwalify/types.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- + +""" pyKwalify - types.py """ + +# python stdlib +import datetime +from pykwalify.compat import basestring, bytes + +DEFAULT_TYPE = "str" + + +class TextMeta(type): + def __instancecheck__(self, instance): + return is_text(instance) + + +class text(object): + __metaclass__ = TextMeta + + +_types = { + "str": str, + "int": int, + "float": float, + "number": None, + "bool": bool, + "map": dict, + "seq": list, + "timestamp": datetime.datetime, + "date": datetime.date, + "symbol": str, + "scalar": None, + "text": text, + "any": object, + "enum": str, + "none": None +} + + +sequence_aliases = ["sequence", "seq"] +mapping_aliases = ["map", "mapping"] + + +def type_class(type): + return _types[type] + + +def is_builtin_type(type): + return type in _types + + +def is_collection_type(type): + return type.lower().strip() == "map" or type.lower().strip() == "seq" + + +def is_scalar_type(type): + return not is_collection_type(type) + + +def is_collection(obj): + return isinstance(obj, dict) or isinstance(obj, list) + + +def is_scalar(obj): + return not is_collection(obj) and obj is not None + + +def is_correct_type(obj, type): + return isinstance(obj, type) + + +def is_string(obj): + return isinstance(obj, basestring) or isinstance(obj, bytes) + + +def is_int(obj): + """ + True & False is not considered valid integers even if python considers them 1 & 0 in some versions + """ + return isinstance(obj, int) and not isinstance(obj, bool) + + +def is_bool(obj): + return isinstance(obj, bool) + + +def is_float(obj): + """ + Valid types are: + - objects of float type + - Strings that can be converted to float. 
For example '1e-06'
    """
    is_f = isinstance(obj, float)
    if not is_f:
        try:
            float(obj)
            is_f = True
        except (ValueError, TypeError):
            is_f = False
    # booleans coerce to float, so exclude them explicitly
    return is_f and not is_bool(obj)


def is_number(obj):
    """Return True for ints and floats (incl. float-convertible strings), never bools."""
    return is_int(obj) or is_float(obj)


def is_text(obj):
    """Return True for strings and numbers, never bools."""
    return (is_string(obj) or is_number(obj)) and is_bool(obj) is False


def is_any(obj):
    """Always True: the 'any' type accepts every value."""
    return True


def is_enum(obj):
    """Return True when obj can be an enum member (a string)."""
    return isinstance(obj, basestring)


def is_none(obj):
    """Return True only for None."""
    return obj is None


def is_sequence_alias(alias):
    """Return True when the keyword is one of the sequence aliases."""
    return alias in sequence_aliases


def is_mapping_alias(alias):
    """Return True when the keyword is one of the mapping aliases."""
    return alias in mapping_aliases


def is_timestamp(obj):
    """
    Yaml either have automatically converted it to a datetime object
    or it is a string that will be validated later.
    """
    return isinstance(obj, datetime.datetime) or is_string(obj) or is_int(obj) or is_float(obj)


def is_date(obj):
    """
    :param obj: Object that is to be validated
    :return: True/False if obj is valid date object
    """
    return isinstance(obj, basestring) or isinstance(obj, datetime.date)


# Dispatch table: schema type name -> validation predicate
tt = {
    "str": is_string,
    "int": is_int,
    "bool": is_bool,
    "float": is_float,
    "number": is_number,
    "text": is_text,
    "any": is_any,
    "enum": is_enum,
    "none": is_none,
    "timestamp": is_timestamp,
    "scalar": is_scalar,
    "date": is_date,
}