summaryrefslogtreecommitdiff
path: root/pykwalify
diff options
context:
space:
mode:
authorJohan Fleury <jfleury@arcaik.net>2018-10-08 12:58:12 -0400
committerJohan Fleury <jfleury@arcaik.net>2018-10-08 12:58:12 -0400
commit735c78d3ecb695dd16cb37879880f522c4b29c72 (patch)
tree8bfb2ecbd0f03730efa1540cc65d8490b67e8ddd /pykwalify
Import upstream version 1.7.0
Diffstat (limited to 'pykwalify')
-rw-r--r--pykwalify/__init__.py57
-rw-r--r--pykwalify/cli.py95
-rw-r--r--pykwalify/compat.py56
-rw-r--r--pykwalify/core.py978
-rw-r--r--pykwalify/errors.py237
-rw-r--r--pykwalify/rule.py1358
-rw-r--r--pykwalify/types.py160
7 files changed, 2941 insertions, 0 deletions
diff --git a/pykwalify/__init__.py b/pykwalify/__init__.py
new file mode 100644
index 0000000..998125f
--- /dev/null
+++ b/pykwalify/__init__.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+
+""" pykwalify """
+
+# python stdlib
+import logging
+import logging.config
+import os
+
+__author__ = 'Grokzen <Grokzen@gmail.com>'
+__version_info__ = (1, 7, 0)
+__version__ = '.'.join(map(str, __version_info__))
+
+
+# Maps the cli verbosity integer (number of -v flags, clamped to 5) to a
+# stdlib logging level name. Note the inverted scale: more -v flags select a
+# more verbose (lower severity) level. 0 (no flags) defaults to INFO and 1
+# (used for --quiet) maps to CRITICAL so almost nothing is printed.
+log_level_to_string_map = {
+    5: "DEBUG",
+    4: "INFO",
+    3: "WARNING",
+    2: "ERROR",
+    1: "CRITICAL",
+    0: "INFO"
+}
+
+
+def init_logging(log_level):
+    """
+    Configure the root logger via logging.config.dictConfig.
+
+    :param log_level: Verbosity as an integer (count of -v flags from the
+                      cli, or 1 for quiet mode); translated to a level name
+                      through log_level_to_string_map, clamped at 5 (DEBUG).
+    """
+    log_level = log_level_to_string_map[min(log_level, 5)]
+
+    # NOTE(review): at this point log_level is a level NAME ("DEBUG", ...),
+    # so this tests whether an environment variable literally named after the
+    # level exists. It looks like a specific env var was intended instead --
+    # confirm intent before changing.
+    msg = "%(levelname)s - %(name)s:%(lineno)s - %(message)s" if log_level in os.environ else "%(levelname)s - %(message)s"
+
+    logging_conf = {
+        "version": 1,
+        "root": {
+            "level": log_level,
+            "handlers": ["console"]
+        },
+        "handlers": {
+            "console": {
+                "class": "logging.StreamHandler",
+                "level": log_level,
+                "formatter": "simple",
+                "stream": "ext://sys.stdout"
+            }
+        },
+        "formatters": {
+            "simple": {
+                "format": " {0}".format(msg)
+            }
+        }
+    }
+
+    logging.config.dictConfig(logging_conf)
+
+
+# Registry of named partial schemas (declared with "schema;<name>" keys in a
+# schema file) used to resolve "include" rules. Module-global, so it is
+# shared across all Core instances within one process.
+partial_schemas = {}
diff --git a/pykwalify/cli.py b/pykwalify/cli.py
new file mode 100644
index 0000000..e908033
--- /dev/null
+++ b/pykwalify/cli.py
@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+
+""" pyKwalify - cli.py """
+
+# python std lib
+import logging
+import logging.config
+import sys
+
+# 3rd party imports
+from docopt import docopt
+
+
+def parse_cli():
+    """
+    The outline of this function needs to be like this:
+
+    1. parse arguments
+    2. validate arguments only, dont go into other logic/code
+    3. run application logic
+
+    Returns the docopt argument dict for consumption by run().
+    """
+
+    #
+    # 1. parse cli arguments
+    #
+
+    __docopt__ = """
+usage: pykwalify -d FILE -s FILE ... [-e FILE ...]
+       [--strict-rule-validation] [--fix-ruby-style-regex] [--allow-assertions] [-v ...] [-q]
+
+optional arguments:
+  -d FILE, --data-file FILE            the file to be tested
+  -e FILE, --extension FILE            file containing python extension
+  -s FILE, --schema-file FILE          schema definition file
+  --fix-ruby-style-regex               This flag fixes some of the quirks of ruby style regex
+                                       that is not compatible with python style regex
+  --strict-rule-validation             enables strict validation of all keywords for all
+                                       Rule objects to find unsupported keyword usage
+  --allow-assertions                   By default assertions is disabled due to security risk.
+                                       Error will be raised if assertion is used in schema
+                                       but this flag is not used. This option enables assert keyword.
+  -h, --help                           show this help message and exit
+  -q, --quiet                          suppress terminal output
+  -v, --verbose                        verbose terminal output (multiple -v increases verbosity)
+  --version                            display the version number and exit
+"""
+
+    # Import pykwalify package
+    # Deferred (function-level) import so --version can be answered without
+    # importing the package at module load time.
+    import pykwalify
+
+    args = docopt(__docopt__, version=pykwalify.__version__)
+
+    # --quiet forces verbosity 1 (CRITICAL); otherwise use the -v count
+    pykwalify.init_logging(1 if args["--quiet"] else args["--verbose"])
+    log = logging.getLogger(__name__)
+
+    #
+    # 2. validate arguments only, dont go into other code/logic
+    #
+
+    log.debug("Setting verbose level: %s", args["--verbose"])
+    log.debug("Arguments from CLI: %s", args)
+
+    return args
+
+
+def run(cli_args):
+ """
+ Split the functionality into 2 methods.
+
+ One for parsing the cli and one that runs the application.
+ """
+ from .core import Core
+
+ c = Core(
+ source_file=cli_args["--data-file"],
+ schema_files=cli_args["--schema-file"],
+ extensions=cli_args['--extension'],
+ strict_rule_validation=cli_args['--strict-rule-validation'],
+ fix_ruby_style_regex=cli_args['--fix-ruby-style-regex'],
+ allow_assertions=cli_args['--allow-assertions'],
+ )
+ c.validate()
+ return c
+
+
+def cli_entrypoint():
+ """
+ Main entrypoint for script. Used by setup.py to automatically
+ create a cli script
+ """
+ # Check minimum version of Python
+ if sys.version_info < (2, 7, 0):
+ sys.stderr.write(u"WARNING: pykwalify: It is recommended to run pykwalify on python version 2.7.x or later...\n\n")
+
+ run(parse_cli())
diff --git a/pykwalify/compat.py b/pykwalify/compat.py
new file mode 100644
index 0000000..fc2df0a
--- /dev/null
+++ b/pykwalify/compat.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+
+# python stdlib
+import sys
+import logging
+
+
+log = logging.getLogger(__name__)
+
+
+# Prefer ruamel.yaml (the maintained fork); fall back to pyyaml. If neither
+# is importable the program cannot do anything useful, so exit hard.
+try:
+    from ruamel import yaml
+except ImportError:
+    try:
+        import yaml
+    except ImportError:
+        log.critical("Unable to import either ruamel.yaml or pyyaml")
+        sys.exit(1)
+
+log.debug("Using yaml library: {0}".format(yaml.__file__))
+
+
+# Py2/Py3 string-type shims. Other modules import basestring/unicode/bytes
+# and the u/b/nativestr helpers from here instead of branching on
+# sys.version_info themselves.
+if sys.version_info[0] < 3:
+    # Python 2.x.x series
+    basestring = basestring # NOQA: F821
+    unicode = unicode # NOQA: F821
+    bytes = str # NOQA: F821
+
+    def u(x):
+        """ Decode a py2 byte string to unicode with the default codec. """
+        return x.decode()
+
+    def b(x):
+        """ Identity on py2: str already is a byte string. """
+        return x
+
+    def nativestr(x):
+        """ Coerce to the native str type (bytes on py2). """
+        return x if isinstance(x, str) else x.encode('utf-8', 'replace')
+else:
+    # Python 3.x.x series
+    basestring = str # NOQA: F821
+    unicode = str # NOQA: F821
+    bytes = bytes # NOQA: F821
+
+    def u(x):
+        """ Identity on py3: str is already unicode. """
+        return x
+
+    def b(x):
+        """ Encode to bytes (latin-1) unless already bytes. """
+        return x.encode('latin-1') if not isinstance(x, bytes) else x
+
+    def nativestr(x):
+        """ Coerce to the native str type (unicode on py3). """
+        return x if isinstance(x, str) else x.decode('utf-8', 'replace')
diff --git a/pykwalify/core.py b/pykwalify/core.py
new file mode 100644
index 0000000..e94ac80
--- /dev/null
+++ b/pykwalify/core.py
@@ -0,0 +1,978 @@
+# -*- coding: utf-8 -*-
+
+""" pyKwalify - core.py """
+
+# python std lib
+import datetime
+import imp
+import json
+import logging
+import os
+import re
+import sys
+import traceback
+import time
+
+# pyKwalify imports
+import pykwalify
+from pykwalify.compat import unicode, nativestr, basestring
+from pykwalify.errors import CoreError, SchemaError, NotMappingError, NotSequenceError
+from pykwalify.rule import Rule
+from pykwalify.types import is_scalar, is_string, tt
+
+# 3rd party imports
+from pykwalify.compat import yaml
+from dateutil.parser import parse
+
+log = logging.getLogger(__name__)
+
+
+class Core(object):
+ """ Core class of pyKwalify """
+
+ def __init__(self, source_file=None, schema_files=None, source_data=None, schema_data=None, extensions=None, strict_rule_validation=False,
+ fix_ruby_style_regex=False, allow_assertions=False,):
+ """
+ :param extensions:
+ List of paths to python files that should be imported and available via 'func' keywork.
+ This list of extensions can be set manually or they should be provided by the `--extension`
+ flag from the cli. This list should not contain files specified by the `extensions` list keyword
+ that can be defined at the top level of the schema.
+ """
+ if schema_files is None:
+ schema_files = []
+ if extensions is None:
+ extensions = []
+
+ log.debug(u"source_file: %s", source_file)
+ log.debug(u"schema_file: %s", schema_files)
+ log.debug(u"source_data: %s", source_data)
+ log.debug(u"schema_data: %s", schema_data)
+ log.debug(u"extension files: %s", extensions)
+
+ self.source = None
+ self.schema = None
+ self.validation_errors = None
+ self.validation_errors_exceptions = None
+ self.root_rule = None
+ self.extensions = extensions
+ self.errors = []
+ self.strict_rule_validation = strict_rule_validation
+ self.fix_ruby_style_regex = fix_ruby_style_regex
+ self.allow_assertions = allow_assertions
+
+ if source_file is not None:
+ if not os.path.exists(source_file):
+ raise CoreError(u"Provided source_file do not exists on disk: {0}".format(source_file))
+
+ with open(source_file, "r") as stream:
+ if source_file.endswith(".json"):
+ self.source = json.load(stream)
+ elif source_file.endswith(".yaml") or source_file.endswith('.yml'):
+ self.source = yaml.safe_load(stream)
+ else:
+ raise CoreError(u"Unable to load source_file. Unknown file format of specified file path: {0}".format(source_file))
+
+ if not isinstance(schema_files, list):
+ raise CoreError(u"schema_files must be of list type")
+
+ # Merge all schema files into one single file for easy parsing
+ if len(schema_files) > 0:
+ schema_data = {}
+ for f in schema_files:
+ if not os.path.exists(f):
+ raise CoreError(u"Provided source_file do not exists on disk : {0}".format(f))
+
+ with open(f, "r") as stream:
+ if f.endswith(".json"):
+ data = json.load(stream)
+ elif f.endswith(".yaml") or f.endswith(".yml"):
+ data = yaml.safe_load(stream)
+ if not data:
+ raise CoreError(u"No data loaded from file : {0}".format(f))
+ else:
+ raise CoreError(u"Unable to load file : {0} : Unknown file format. Supported file endings is [.json, .yaml, .yml]")
+
+ for key in data.keys():
+ if key in schema_data.keys():
+ raise CoreError(u"Parsed key : {0} : two times in schema files...".format(key))
+
+ schema_data = dict(schema_data, **data)
+
+ self.schema = schema_data
+
+ # Nothing was loaded so try the source_data variable
+ if self.source is None:
+ log.debug(u"No source file loaded, trying source data variable")
+ self.source = source_data
+ if self.schema is None:
+ log.debug(u"No schema file loaded, trying schema data variable")
+ self.schema = schema_data
+
+ # Test if anything was loaded
+ if self.source is None:
+ raise CoreError(u"No source file/data was loaded")
+ if self.schema is None:
+ raise CoreError(u"No schema file/data was loaded")
+
+ # Merge any extensions defined in the schema with the provided list of extensions from the cli
+ for f in self.schema.get('extensions', []):
+ self.extensions.append(f)
+
+ if not isinstance(self.extensions, list) and all(isinstance(e, str) for e in self.extensions):
+ raise CoreError(u"Specified extensions must be a list of file paths")
+
+ self._load_extensions()
+
+ if self.strict_rule_validation:
+ log.info("Using strict rule keywords validation...")
+
+    def _load_extensions(self):
+        """
+        Load all extension files into the namespace pykwalify.ext
+        """
+        log.debug(u"loading all extensions : %s", self.extensions)
+
+        self.loaded_extensions = []
+
+        for f in self.extensions:
+            # Extension paths from the cli may be relative to the cwd
+            if not os.path.isabs(f):
+                f = os.path.abspath(f)
+
+            if not os.path.exists(f):
+                raise CoreError(u"Extension file: {0} not found on disk".format(f))
+
+            # NOTE(review): imp is deprecated in favour of importlib, and the
+            # empty module name means every extension registers under the same
+            # name in sys.modules -- confirm later files cannot shadow earlier
+            # ones before changing this.
+            self.loaded_extensions.append(imp.load_source("", f))
+
+        log.debug(self.loaded_extensions)
+        log.debug([dir(m) for m in self.loaded_extensions])
+
+ def validate(self, raise_exception=True):
+ """
+ """
+ log.debug(u"starting core")
+
+ self._start_validate(self.source)
+ self.validation_errors = [unicode(error) for error in self.errors]
+ self.validation_errors_exceptions = self.errors
+
+ if self.errors is None or len(self.errors) == 0:
+ log.info(u"validation.valid")
+ else:
+ log.error(u"validation.invalid")
+ log.error(u" --- All found errors ---")
+ log.error(self.validation_errors)
+ if raise_exception:
+ raise SchemaError(u"Schema validation failed:\n - {error_msg}.".format(
+ error_msg=u'.\n - '.join(self.validation_errors)))
+ else:
+ log.error(u"Errors found but will not raise exception...")
+
+ # Return validated data
+ return self.source
+
+    def _start_validate(self, value=None):
+        """
+        Build the root Rule from the schema and kick off validation of value.
+
+        Top level "schema;<name>" keys are registered as partial schemas (for
+        the include keyword) and removed from the schema before the root rule
+        is built.
+        """
+        path = ""
+        self.errors = []
+        done = []
+
+        s = {}
+
+        # Look for schema; tags so they can be parsed before the root rule is parsed
+        for k, v in self.schema.items():
+            if k.startswith("schema;"):
+                log.debug(u"Found partial schema; : %s", v)
+                r = Rule(schema=v)
+                log.debug(u" Partial schema : %s", r)
+                # Registered in the module-global registry so includes can be
+                # resolved from any Core instance in this process.
+                pykwalify.partial_schemas[k.split(";", 1)[1]] = r
+            else:
+                # readd all items that is not schema; so they can be parsed
+                s[k] = v
+
+        self.schema = s
+
+        log.debug(u"Building root rule object")
+        root_rule = Rule(schema=self.schema)
+        self.root_rule = root_rule
+        log.debug(u"Done building root rule")
+        log.debug(u"Root rule: %s", self.root_rule)
+
+        self._validate(value, root_rule, path, done)
+
+ def _validate(self, value, rule, path, done):
+ """
+ """
+ log.debug(u"Core validate")
+ log.debug(u" Root validate : Rule: %s", rule)
+ log.debug(u" Root validate : Rule_type: %s", rule.type)
+ log.debug(u" Root validate : Seq: %s", rule.sequence)
+ log.debug(u" Root validate : Map: %s", rule.mapping)
+ log.debug(u" Root validate : Done: %s", done)
+
+ if rule.required and value is None and not rule.type == 'none':
+ self.errors.append(SchemaError.SchemaErrorEntry(
+ msg=u"required.novalue : '{path}'",
+ path=path,
+ value=value.encode('unicode_escape') if value else value,
+ ))
+ return
+
+ if not rule.nullable and value is None and not rule.type == 'none':
+ self.errors.append(SchemaError.SchemaErrorEntry(
+ msg=u"nullable.novalue : '{path}'",
+ path=path,
+ value=value.encode('unicode_escape') if value else value,
+ ))
+ return
+
+ log.debug(u" ? ValidateRule: %s", rule)
+ if rule.include_name is not None:
+ self._validate_include(value, rule, path, done=None)
+ elif rule.sequence is not None:
+ self._validate_sequence(value, rule, path, done=None)
+ elif rule.mapping is not None or rule.allowempty_map:
+ self._validate_mapping(value, rule, path, done=None)
+ else:
+ self._validate_scalar(value, rule, path, done=None)
+
+ def _handle_func(self, value, rule, path, done=None):
+ """
+ Helper function that should check if func is specified for this rule and
+ then handle it for all cases in a generic way.
+ """
+ func = rule.func
+
+ # func keyword is not defined so nothing to do
+ if not func:
+ return
+
+ found_method = False
+
+ for extension in self.loaded_extensions:
+ method = getattr(extension, func, None)
+ if method:
+ found_method = True
+
+ # No exception will should be caught. If one is raised it should bubble up all the way.
+ ret = method(value, rule, path)
+
+ # If False or None or some other object that is interpreted as False
+ if not ret:
+ raise CoreError(u"Error when running extension function : {0}".format(func))
+
+ # Only run the first matched function. Sinc loading order is determined
+ # it should be easy to determine which file is used before others
+ break
+
+ if not found_method:
+ raise CoreError(u"Did not find method '{0}' in any loaded extension file".format(func))
+
+ def _validate_include(self, value, rule, path, done=None):
+ """
+ """
+ # TODO: It is difficult to get a good test case to trigger this if case
+ if rule.include_name is None:
+ self.errors.append(SchemaError.SchemaErrorEntry(
+ msg=u'Include name not valid',
+ path=path,
+ value=value.encode('unicode_escape')))
+ return
+ include_name = rule.include_name
+ partial_schema_rule = pykwalify.partial_schemas.get(include_name)
+ if not partial_schema_rule:
+ self.errors.append(SchemaError.SchemaErrorEntry(
+ msg=u"Cannot find partial schema with name '{include_name}'. Existing partial schemas: '{existing_schemas}'. Path: '{path}'",
+ path=path,
+ value=value,
+ include_name=include_name,
+ existing_schemas=", ".join(sorted(pykwalify.partial_schemas.keys()))))
+ return
+
+ self._validate(value, partial_schema_rule, path, done)
+
+    def _validate_sequence(self, value, rule, path, done=None):
+        """
+        Validate a list value against a sequence rule: items are checked
+        against the sub rules, combined according to rule.matching
+        ('any' / 'all' / '*'), and unique/ident and range constraints are
+        enforced across the whole list.
+        """
+        log.debug(u"Core Validate sequence")
+        log.debug(u" Sequence : Data: %s", value)
+        log.debug(u" Sequence : Rule: %s", rule)
+        log.debug(u" Sequence : RuleType: %s", rule.type)
+        log.debug(u" Sequence : Path: %s", path)
+        log.debug(u" Sequence : Seq: %s", rule.sequence)
+        log.debug(u" Sequence : Map: %s", rule.mapping)
+
+        if len(rule.sequence) <= 0:
+            raise CoreError(u"Sequence must contains atleast one item : {0}".format(path))
+
+        if value is None:
+            log.debug(u" * Core seq: sequence data is None")
+            return
+
+        if not isinstance(value, list):
+            if isinstance(value, str):
+                value = value.encode('unicode_escape')
+            self.errors.append(SchemaError.SchemaErrorEntry(
+                u"Value '{value}' is not a list. Value path: '{path}'",
+                path,
+                value,
+            ))
+            return
+
+        # Handle 'func' argument on this sequence
+        self._handle_func(value, rule, path, done)
+
+        ok_values = []
+        error_tracker = []
+
+        unique_errors = {}
+        map_unique_errors = {}
+
+        for i, item in enumerate(value):
+            processed = []
+
+            for r in rule.sequence:
+                tmp_errors = []
+
+                try:
+                    # Create a sub core object to enable error tracking that do not
+                    # collide with this Core objects errors
+                    tmp_core = Core(source_data={}, schema_data={})
+                    tmp_core.fix_ruby_style_regex = self.fix_ruby_style_regex
+                    tmp_core.allow_assertions = self.allow_assertions
+                    tmp_core.strict_rule_validation = self.strict_rule_validation
+                    tmp_core.loaded_extensions = self.loaded_extensions
+                    tmp_core._validate(item, r, "{0}/{1}".format(path, i), done)
+                    tmp_errors = tmp_core.errors
+                except NotMappingError:
+                    # For example: If one type was specified as 'map' but data
+                    # was 'str' a exception will be thrown but we should ignore it
+                    pass
+                except NotSequenceError:
+                    # For example: If one type was specified as 'seq' but data
+                    # was 'str' a exception will be thrown but we shold ignore it
+                    pass
+
+            # NOTE(review): this append and the map/unique checks below sit at
+            # the item-loop level, not inside the rule loop, so only the LAST
+            # sub rule's errors and type are considered -- this looks like an
+            # indentation slip; confirm against upstream before relying on it.
+            processed.append(tmp_errors)
+
+            if r.type == "map":
+                log.debug(u" * Found map inside sequence")
+                unique_keys = []
+
+                if r.mapping is None:
+                    # NOTE(review): this returns from the WHOLE sequence
+                    # validation, skipping remaining items and the range check
+                    # at the bottom -- confirm that is intended.
+                    log.debug(u" + No rule to apply, prolly because of allowempty: True")
+                    return
+
+                # Collect keys that carry a unique/ident constraint
+                for k, _rule in r.mapping.items():
+                    log.debug(u" * Key: %s", k)
+                    log.debug(u" * Rule: %s", _rule)
+
+                    if _rule.unique or _rule.ident:
+                        unique_keys.append(k)
+
+                if len(unique_keys) > 0:
+                    for v in unique_keys:
+                        table = {}
+                        for j, V in enumerate(value):
+                            # If key do not exists it should be ignored by unique because that is not a broken constraint
+                            val = V.get(v, None)
+
+                            if val is None:
+                                continue
+
+                            if val in table:
+                                curr_path = "{0}/{1}/{2}".format(path, j, v)
+                                prev_path = "{0}/{1}/{2}".format(path, table[val], v)
+                                s = SchemaError.SchemaErrorEntry(
+                                    msg=u"Value '{duplicate}' is not unique. Previous path: '{prev_path}'. Path: '{path}'",
+                                    path=curr_path,
+                                    value=value,
+                                    duplicate=val,
+                                    prev_path=prev_path,
+                                )
+                                # Keyed on repr so one duplicate is only reported once
+                                map_unique_errors[s.__repr__()] = s
+                            else:
+                                table[val] = j
+            elif r.unique:
+                log.debug(u" * Found unique value in sequence")
+                table = {}
+
+                for j, val in enumerate(value):
+                    if val is None:
+                        continue
+
+                    if val in table:
+                        curr_path = "{0}/{1}".format(path, j)
+                        prev_path = "{0}/{1}".format(path, table[val])
+                        s = SchemaError.SchemaErrorEntry(
+                            msg=u"Value '{duplicate}' is not unique. Previous path: '{prev_path}'. Path: '{path}'",
+                            path=curr_path,
+                            value=value,
+                            duplicate=val,
+                            prev_path=prev_path,
+                        )
+                        unique_errors[s.__repr__()] = s
+                    else:
+                        table[val] = j
+
+            error_tracker.append(processed)
+            no_errors = []
+            for _errors in processed:
+                no_errors.append(len(_errors) == 0)
+
+            if rule.matching == "any":
+                log.debug(u" * any rule %s", True in no_errors)
+                ok_values.append(True in no_errors)
+            elif rule.matching == "all":
+                # NOTE(review): .format() on a literal without a placeholder;
+                # the computed all() value never reaches the log output.
+                log.debug(u" * all rule".format(all(no_errors)))
+                ok_values.append(all(no_errors))
+            elif rule.matching == "*":
+                # NOTE(review): extra positional arg without a %s placeholder;
+                # the logging module reports this as a formatting error.
+                log.debug(u" * star rule", "...")
+                ok_values.append(True)
+
+        # NOTE(review): iterating a dict yields its KEYS, so the repr()
+        # strings -- not the SchemaErrorEntry objects stored as values -- are
+        # appended to self.errors here. Confirm whether .values() was intended.
+        for _error in unique_errors:
+            self.errors.append(_error)
+
+        for _error in map_unique_errors:
+            self.errors.append(_error)
+
+        log.debug(u" * ok : %s", ok_values)
+
+        # All values must pass the validation, otherwise add the parsed errors
+        # to the global error list and throw up some error.
+        if not all(ok_values):
+            # Ignore checking for '*' type because it should allways go through
+            if rule.matching == "any":
+                log.debug(u" * Value: %s did not validate against one or more sequence schemas", value)
+            elif rule.matching == "all":
+                log.debug(u" * Value: %s did not validate against all possible sequence schemas", value)
+
+            for i, is_ok in enumerate(ok_values):
+                if not is_ok:
+                    for error in error_tracker[i]:
+                        for e in error:
+                            self.errors.append(e)
+
+        log.debug(u" * Core seq: validation recursivley done...")
+
+        if rule.range is not None:
+            rr = rule.range
+
+            self._validate_range(
+                rr.get("max"),
+                rr.get("min"),
+                rr.get("max-ex"),
+                rr.get("min-ex"),
+                len(value),
+                path,
+                "seq",
+            )
+
+    def _validate_mapping(self, value, rule, path, done=None):
+        """
+        Validate a dict value against a mapping rule: resolve includes, check
+        required keys and inject defaults, then validate every key/value pair
+        against its matching sub rule (exact key, regex key, or the '='
+        default key).
+        """
+        log.debug(u"Validate mapping")
+        log.debug(u" Mapping : Data: %s", value)
+        log.debug(u" Mapping : Rule: %s", rule)
+        log.debug(u" Mapping : RuleType: %s", rule.type)
+        log.debug(u" Mapping : Path: %s", path)
+        log.debug(u" Mapping : Seq: %s", rule.sequence)
+        log.debug(u" Mapping : Map: %s", rule.mapping)
+
+        if not isinstance(value, dict):
+            self.errors.append(SchemaError.SchemaErrorEntry(
+                u"Value '{value}' is not a dict. Value path: '{path}'",
+                path,
+                value,
+            ))
+            return
+
+        if rule.mapping is None:
+            log.debug(u" + No rule to apply, prolly because of allowempty: True")
+            return
+
+        # Handle 'func' argument on this mapping
+        self._handle_func(value, rule, path, done)
+
+        m = rule.mapping
+        log.debug(u" Mapping: Rule-Mapping: %s", m)
+
+        if rule.range is not None:
+            r = rule.range
+
+            self._validate_range(
+                r.get("max"),
+                r.get("min"),
+                r.get("max-ex"),
+                r.get("min-ex"),
+                len(value),
+                path,
+                "map",
+            )
+
+        # First pass over the RULES: required-key presence and default injection
+        for k, rr in m.items():
+            # Handle if the value of the key contains a include keyword
+            if rr.include_name is not None:
+                include_name = rr.include_name
+                partial_schema_rule = pykwalify.partial_schemas.get(include_name)
+
+                if not partial_schema_rule:
+                    # NOTE(review): this return aborts validation of ALL
+                    # remaining keys, not just this one -- confirm intent.
+                    self.errors.append(SchemaError.SchemaErrorEntry(
+                        msg=u"Cannot find partial schema with name '{include_name}'. Existing partial schemas: '{existing_schemas}'. Path: '{path}'",
+                        path=path,
+                        value=value,
+                        include_name=include_name,
+                        existing_schemas=", ".join(sorted(pykwalify.partial_schemas.keys()))))
+                    return
+
+                rr = partial_schema_rule
+
+            # Find out if this is a regex rule
+            is_regex_rule = False
+            required_regex = ""
+            for regex_rule in rule.regex_mappings:
+                if k == "regex;({})".format(regex_rule.map_regex_rule) or k == "re;({})".format(regex_rule.map_regex_rule):
+                    is_regex_rule = True
+                    required_regex = regex_rule.map_regex_rule
+
+            # Check for the presense of the required key
+            is_present = False
+            if not is_regex_rule:
+                is_present = k in value
+            else:
+                # NOTE(review): iterating a dict yields its KEYS, so the regex
+                # is matched against the keys of value, not its values.
+                is_present = any([re.search(required_regex, v) for v in value])
+
+            # Specifying =: as key is considered the "default" if no other keys match
+            if rr.required and not is_present and k != "=":
+                self.errors.append(SchemaError.SchemaErrorEntry(
+                    msg=u"Cannot find required key '{key}'. Path: '{path}'",
+                    path=path,
+                    value=value,
+                    key=k))
+            # Injects the schema default into the user supplied data (mutates value)
+            if k not in value and rr.default is not None:
+                value[k] = rr.default
+
+        # Second pass over the DATA: validate every key/value pair recursively
+        for k, v in value.items():
+            # If no other case was a match, check if a default mapping is valid/present and use
+            # that one instead
+            r = m.get(k, m.get('='))
+            log.debug(u" Mapping-value : %s", m)
+            log.debug(u" Mapping-value : %s %s", k, v)
+            log.debug(u" Mapping-value : %s", r)
+
+            regex_mappings = [(regex_rule, re.search(regex_rule.map_regex_rule, str(k))) for regex_rule in rule.regex_mappings]
+            log.debug(u" Mapping-value: Mapping Regex matches: %s", regex_mappings)
+
+            if r is not None:
+                # validate recursively
+                log.debug(u" Mapping-value: Core Map: validate recursively: %s", r)
+                self._validate(v, r, u"{0}/{1}".format(path, k), done)
+            # NOTE(review): the elements are (rule, match) 2-tuples, which are
+            # always truthy, so any() is True whenever regex rules exist at
+            # all, regardless of whether any of them matched -- confirm intent.
+            elif any(regex_mappings):
+                sub_regex_result = []
+
+                # Found at least one that matches a mapping regex
+                for mm in regex_mappings:
+                    if mm[1]:
+                        log.debug(u" Mapping-value: Matching regex patter: %s", mm[0])
+                        self._validate(v, mm[0], "{0}/{1}".format(path, k), done)
+                        sub_regex_result.append(True)
+                    else:
+                        sub_regex_result.append(False)
+
+                if rule.matching_rule == "any":
+                    if any(sub_regex_result):
+                        log.debug(u" Mapping-value: Matched at least one regex")
+                    else:
+                        log.debug(u" Mapping-value: No regex matched")
+                        self.errors.append(SchemaError.SchemaErrorEntry(
+                            msg=u"Key '{key}' does not match any regex '{regex}'. Path: '{path}'",
+                            path=path,
+                            value=value,
+                            key=k,
+                            regex="' or '".join(sorted([mm[0].map_regex_rule for mm in regex_mappings]))))
+                elif rule.matching_rule == "all":
+                    if all(sub_regex_result):
+                        log.debug(u" Mapping-value: Matched all regex rules")
+                    else:
+                        log.debug(u" Mapping-value: Did not match all regex rules")
+                        self.errors.append(SchemaError.SchemaErrorEntry(
+                            msg=u"Key '{key}' does not match all regex '{regex}'. Path: '{path}'",
+                            path=path,
+                            value=value,
+                            key=k,
+                            regex="' and '".join(sorted([mm[0].map_regex_rule for mm in regex_mappings]))))
+                else:
+                    log.debug(u" Mapping-value: No mapping rule defined")
+            else:
+                if not rule.allowempty_map:
+                    self.errors.append(SchemaError.SchemaErrorEntry(
+                        msg=u"Key '{key}' was not defined. Path: '{path}'",
+                        path=path,
+                        value=value,
+                        key=k))
+
+    def _validate_scalar(self, value, rule, path, done=None):
+        """
+        Validate a scalar value: run the func/assert hooks, then enum, type,
+        pattern, range, length, timestamp and date checks as applicable.
+        """
+        log.debug(u"Validate scalar")
+        log.debug(u" Scalar : Value : %s", value)
+        log.debug(u" Scalar : Rule : %s", rule)
+        log.debug(u" Scalar : RuleType : %s", rule.type)
+        log.debug(u" Scalar : Path %s", path)
+
+        # Handle 'func' argument on this scalar
+        self._handle_func(value, rule, path, done)
+
+        if rule.assertion is not None:
+            self._validate_assert(rule, value, path)
+
+        # NOTE(review): the only exit in this method that returns True; every
+        # other path returns None implicitly.
+        if value is None:
+            return True
+
+        if rule.enum is not None and value not in rule.enum:
+            self.errors.append(SchemaError.SchemaErrorEntry(
+                msg=u"Enum '{value}' does not exist. Path: '{path}'",
+                path=path,
+                value=nativestr(value) if tt['str'](value) else value,
+            ))
+
+        # Set default value
+        # NOTE(review): dead code -- value cannot be None here because of the
+        # early return above.
+        if rule.default and value is None:
+            value = rule.default
+
+        if not self._validate_scalar_type(value, rule.type, path):
+            return
+
+        if value is None:
+            return
+
+        if rule.pattern is not None:
+            #
+            # Try to trim away the surrounding slashes around ruby style /<regex>/ if they are defined.
+            # This is a quirk from ruby that they define regex patterns with surrounding slashes.
+            # Docs on how ruby regex works can be found here: https://ruby-doc.org/core-2.4.0/Regexp.html
+            # The original ruby implementation uses this code to validate patterns
+            # unless value.to_s =~ rule.regexp
+            # Becuase python do not work with surrounding slashes we have to trim them away in order to make the regex work
+            #
+            # NOTE(review): this rewrites rule.pattern in place, so the trim
+            # persists on the Rule object for later values validated by it.
+            if rule.pattern.startswith('/') and rule.pattern.endswith('/') and self.fix_ruby_style_regex:
+                rule.pattern = rule.pattern[1:-1]
+                log.debug("Trimming slashes around ruby style regex. New pattern value: '{0}'".format(rule.pattern))
+
+            try:
+                # NOTE(review): the placeholders are swapped in the text (the
+                # first arg is the pattern) and the closing quote is missing
+                # -- log text only.
+                log.debug("Matching pattern '{0}' to regex '{1}".format(rule.pattern, value))
+                res = re.match(rule.pattern, value, re.UNICODE)
+            except TypeError:
+                # re.match raises TypeError for non-string values
+                res = None
+
+            if res is None:  # Not matching
+                self.errors.append(SchemaError.SchemaErrorEntry(
+                    msg=u"Value '{value}' does not match pattern '{pattern}'. Path: '{path}'",
+                    path=path,
+                    value=nativestr(str(value)),
+                    pattern=rule._pattern))
+            else:
+                log.debug("Pattern matched...")
+
+        if rule.range is not None:
+            if not is_scalar(value):
+                raise CoreError(u"value is not a valid scalar")
+
+            r = rule.range
+
+            # For sized values (e.g. strings) compare the length instead of
+            # the value itself; non-sized values fall through unchanged.
+            try:
+                v = len(value)
+                value = v
+            except Exception:
+                pass
+
+            self._validate_range(
+                r.get("max"),
+                r.get("min"),
+                r.get("max-ex"),
+                r.get("min-ex"),
+                value,
+                path,
+                "scalar",
+            )
+
+        if rule.length is not None:
+            self._validate_length(
+                rule.length,
+                value,
+                path,
+                'scalar',
+            )
+
+        # Validate timestamp
+        if rule.type == "timestamp":
+            self._validate_scalar_timestamp(value, path)
+
+        if rule.type == "date":
+            if not is_scalar(value):
+                raise CoreError(u'value is not a valid scalar')
+            date_format = rule.format
+            self._validate_scalar_date(value, date_format, path)
+
+    def _validate_scalar_timestamp(self, timestamp_value, path):
+        """
+        Validate that timestamp_value is a usable timestamp: an int/float
+        inside 32 bit unix-time bounds, a datetime object, or a string that
+        dateutil's parser accepts.
+        """
+        def _check_int_timestamp_boundaries(timestamp):
+            """
+            Append errors when an integer timestamp falls outside 1..2147483647.
+            """
+            if timestamp < 1:
+                # Timestamp integers can't be negative
+                # NOTE(review): the condition also rejects 0 (the unix epoch)
+                # even though the message says "below 0" -- confirm intent.
+                self.errors.append(SchemaError.SchemaErrorEntry(
+                    msg=u"Integer value of timestamp can't be below 0",
+                    path=path,
+                    value=timestamp,
+                    timestamp=str(timestamp),
+                ))
+            if timestamp > 2147483647:
+                # Timestamp integers can't be above the upper limit of
+                # 32 bit integers
+                self.errors.append(SchemaError.SchemaErrorEntry(
+                    msg=u"Integer value of timestamp can't be above 2147483647",
+                    path=path,
+                    value=timestamp,
+                    timestamp=str(timestamp),
+                ))
+
+        if isinstance(timestamp_value, (int, float)):
+            _check_int_timestamp_boundaries(timestamp_value)
+        elif isinstance(timestamp_value, datetime.datetime):
+            # Datetime objects currently have nothing to validate.
+            # In the future, more options will be added to datetime validation
+            pass
+        elif isinstance(timestamp_value, basestring):
+            v = timestamp_value.strip()
+
+            # parse("") will give a valid date but it should not be
+            # considered a valid timestamp
+            if v == "":
+                self.errors.append(SchemaError.SchemaErrorEntry(
+                    msg=u"Timestamp value is empty. Path: '{path}'",
+                    path=path,
+                    value=nativestr(timestamp_value),
+                    timestamp=nativestr(timestamp_value)))
+            else:
+                # A string can contain a valid unit timestamp integer. Check if it is valid and validate it
+                try:
+                    int_v = int(v)
+                    _check_int_timestamp_boundaries(int_v)
+                except ValueError:
+                    # Just continue to parse it as a timestamp
+                    try:
+                        parse(timestamp_value)
+                        # If it can be parsed then it is valid
+                    except Exception:
+                        # NOTE(review): the message has a stray extra quote
+                        # after the placeholder -- runtime text, left as-is.
+                        self.errors.append(SchemaError.SchemaErrorEntry(
+                            msg=u"Timestamp: '{timestamp}'' is invalid. Path: '{path}'",
+                            path=path,
+                            value=nativestr(timestamp_value),
+                            timestamp=nativestr(timestamp_value)))
+        else:
+            # Any other type can never be a valid timestamp
+            self.errors.append(SchemaError.SchemaErrorEntry(
+                msg=u"Not a valid timestamp",
+                path=path,
+                value=timestamp_value,
+                timestamp=timestamp_value,
+            ))
+
+    def _validate_scalar_date(self, date_value, date_formats, path):
+        """
+        Validate a date value, either against the explicit strptime formats
+        from the schema or, when none are given, via dateutil's parser.
+        datetime.date / datetime.datetime instances always pass.
+        """
+        log.debug(u"Validate date : %(value)s : %(format)s : %(path)s" % {
+            'value': date_value,
+            'format': date_formats,
+            'path': path,
+        })
+
+        if isinstance(date_value, str):
+            # If a date_format is specefied then use strptime on all formats
+            # If no date_format is specefied then use dateutils.parse() to test the value
+            log.debug(date_formats)
+
+            if date_formats:
+                # Run through all date_formats and it is valid if atleast one of them passed time.strptime() parsing
+                valid = False
+                for date_format in date_formats:
+                    try:
+                        time.strptime(date_value, date_format)
+                        valid = True
+                    except ValueError:
+                        pass
+
+                if not valid:
+                    # NOTE(review): only the LAST tried format ends up in the
+                    # error entry, not the full list -- confirm intent.
+                    self.errors.append(SchemaError.SchemaErrorEntry(
+                        msg=u"Not a valid date: {value} format: {format}. Path: '{path}'",
+                        path=path,
+                        value=date_value,
+                        format=date_format,
+                    ))
+                    return
+            else:
+                try:
+                    parse(date_value)
+                except ValueError:
+                    # NOTE(review): dateutil can also raise TypeError or
+                    # OverflowError, which would escape this handler.
+                    self.errors.append(SchemaError.SchemaErrorEntry(
+                        msg=u"Not a valid date: {value} Path: '{path}'",
+                        path=path,
+                        value=date_value,
+                    ))
+        elif isinstance(date_value, (datetime.date, datetime.datetime)):
+            # If the object already is a datetime or date object it passes validation
+            pass
+        else:
+            # If value is any other type then raise error
+            self.errors.append(SchemaError.SchemaErrorEntry(
+                msg=u"Not a valid date: {value} date must be a string or a datetime.date not a '{type}'",
+                path=path,
+                value=date_value,
+                type=type(date_value).__name__,
+            ))
+
def _validate_length(self, rule, value, path, prefix):
    """
    Validate the length of ``value`` against the bounds defined in ``rule``.

    Arguments:
    - `rule`: dict with optional 'max', 'min', 'max-ex', 'min-ex' keys
    - `value`: the string whose length is checked
    - `path`: document path, used in error messages
    - `prefix`: extra context string forwarded into the error entries

    Raises:
    - `CoreError`: if value is not a string
    """
    if not is_string(value):
        raise CoreError("Value: '{0}' must be a 'str' type for length check to work".format(value))

    value_length = len(str(value))
    max_, min_, max_ex, min_ex = rule.get('max'), rule.get('min'), rule.get('max-ex'), rule.get('min-ex')

    # Log the extracted bounds (the original logged the max()/min() builtins)
    log.debug(
        u"Validate length : %s : %s : %s : %s : %s : %s",
        max_, min_, max_ex, min_ex, value, path,
    )

    if max_ is not None and max_ < value_length:
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Value: '{value_str}' has length of '{value}', greater than max limit '{max_}'. Path: '{path}'",
            value_str=value,
            path=path,
            value=value_length,
            prefix=prefix,
            max_=max_))

    if min_ is not None and min_ > value_length:
        # Message corrected: a value shorter than 'min' is *less* than the limit
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Value: '{value_str}' has length of '{value}', less than min limit '{min_}'. Path: '{path}'",
            value_str=value,
            path=path,
            value=value_length,
            prefix=prefix,
            min_=min_))

    if max_ex is not None and max_ex <= value_length:
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Value: '{value_str}' has length of '{value}', greater than or equals max-ex limit '{max_ex}'. Path: '{path}'",
            value_str=value,
            path=path,
            value=value_length,
            prefix=prefix,
            max_ex=max_ex))

    if min_ex is not None and min_ex >= value_length:
        # Message corrected: a value at or below 'min-ex' is too short, not too long
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Value: '{value_str}' has length of '{value}', less than or equals min-ex limit '{min_ex}'. Path: '{path}'",
            value_str=value,
            path=path,
            value=value_length,
            prefix=prefix,
            min_ex=min_ex))
+
+ def _validate_assert(self, rule, value, path):
+ if not self.allow_assertions:
+ raise CoreError('To allow usage of keyword "assert" you must use cli flag "--allow-assertions" or set the keyword "allow_assert" in Core class')
+
+ # Small hack to make strings work as a value.
+ if isinstance(value, str):
+ assert_value_str = '"{0}"'.format(value)
+ else:
+ assert_value_str = '{0}'.format(value)
+
+ assertion_string = "val = {0}; assert {1}".format(assert_value_str, rule.assertion)
+ try:
+ exec(assertion_string, {}, {})
+ except AssertionError:
+ self.errors.append(SchemaError.SchemaErrorEntry(
+ msg=u"Value: '{0}' assertion expression failed ({1})".format(value, rule.assertion),
+ path=path,
+ value=value,
+ ))
+ return
+ except Exception as err:
+ error_class = err.__class__.__name__
+ detail = err.args[0]
+ cl, exc, tb = sys.exc_info()
+ line_number = traceback.extract_tb(tb)[-1][1]
+ raise Exception("Unknown error during assertion\n{0}\n{1}\n{2}\n{3}\n{4}\n{5}".format(
+ error_class, detail, cl, exc, tb, line_number,
+ ))
+
def _validate_range(self, max_, min_, max_ex, min_ex, value, path, prefix):
    """
    Validate that value is within range values.

    Appends a SchemaErrorEntry for every bound that is violated.
    """
    if not isinstance(value, (int, float)):
        raise CoreError("Value must be a integer type")

    log.debug(
        u"Validate range : %s : %s : %s : %s : %s : %s",
        max_, min_, max_ex, min_ex, value, path,
    )

    # Value reported in the error entry (stringified once, up front)
    reported = nativestr(value) if tt['str'](value) else value

    if max_ is not None and max_ < value:
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Type '{prefix}' has size of '{value}', greater than max limit '{max_}'. Path: '{path}'",
            path=path,
            value=reported,
            prefix=prefix,
            max_=max_))

    if min_ is not None and min_ > value:
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Type '{prefix}' has size of '{value}', less than min limit '{min_}'. Path: '{path}'",
            path=path,
            value=reported,
            prefix=prefix,
            min_=min_))

    if max_ex is not None and max_ex <= value:
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Type '{prefix}' has size of '{value}', greater than or equals to max limit(exclusive) '{max_ex}'. Path: '{path}'",
            path=path,
            value=reported,
            prefix=prefix,
            max_ex=max_ex))

    if min_ex is not None and min_ex >= value:
        self.errors.append(SchemaError.SchemaErrorEntry(
            msg=u"Type '{prefix}' has size of '{value}', less than or equals to min limit(exclusive) '{min_ex}'. Path: '{path}'",
            path=path,
            value=reported,
            prefix=prefix,
            min_ex=min_ex))
+
def _validate_scalar_type(self, value, t, path):
    """
    Check ``value`` against the type-checker registered under type name ``t``.

    Returns True on a match, False when a schema error was recorded.
    Raises CoreError for an unknown type name.
    """
    log.debug(u" # Core scalar: validating scalar type : %s", t)
    log.debug(u" # Core scalar: scalar type: %s", type(value))

    try:
        matches = tt[t](value)
    except KeyError as e:
        # Type not found in valid types mapping
        log.debug(e)
        raise CoreError(u"Unknown type check: {0!s} : {1!s} : {2!s}".format(path, value, t))

    if matches:
        return True

    self.errors.append(SchemaError.SchemaErrorEntry(
        msg=u"Value '{value}' is not of type '{scalar_type}'. Path: '{path}'",
        path=path,
        value=unicode(value) if tt['str'](value) else value,
        scalar_type=t))
    return False
diff --git a/pykwalify/errors.py b/pykwalify/errors.py
new file mode 100644
index 0000000..c9f9b54
--- /dev/null
+++ b/pykwalify/errors.py
@@ -0,0 +1,237 @@
+# -*- coding: utf-8 -*-
+
+""" pyKwalify - errors.py """
+
+# python stdlib
+from pykwalify.compat import basestring
+
# Mapping from numeric return code to its symbolic name. Each code maps to
# one of the exception classes defined below.
retcodes = {
    0: 'noerror',           # PyKwalifyExit
    1: 'unknownerror',      # UnknownError
    2: 'schemaerror',       # SchemaError: a rule or the core found an error
    3: 'coreerror',         # CoreError: core error that is not a SchemaError
    4: 'ruleerror',         # RuleError: rule error that is not a SchemaError
    5: 'schemaconflict',    # SchemaConflict: a schema conflict occurred
    6: 'notmaperror',       # NotMappingError: value expected to be a mapping
    7: 'notsequenceerror',  # NotSequenceError: value expected to be a sequence
}


# Reverse lookup: symbolic name -> numeric return code
retnames = {name: code for code, name in retcodes.items()}
+
+
class PyKwalifyException(RuntimeError):
    """
    Base class for all pykwalify exceptions.

    Carries a human readable message, a numeric return code (see ``retcodes``),
    the matching symbolic name and the document path of the error.
    """

    def __init__(self, msg=None, error_key=None, retcode=None, path=None):
        """
        Arguments:
        - `msg`: a string
        - `error_key`: a unique string that makes it easier to identify what error it is
        - `retcode`: an integer, defined in PyKwalify.errors.retcodes
        - `path`: document path of the error, defaults to "/"
        """
        self.msg = msg or ""
        self.retcode = retcode or retnames['unknownerror']
        # Resolve the name from the *resolved* retcode so a missing
        # ``retcode`` argument falls back to 'unknownerror' instead of
        # raising KeyError(None).
        self.retname = retcodes[self.retcode]
        self.error_key = error_key
        self.path = path or "/"

    def __str__(self):
        """
        Renders as: <PyKwalifyException: error code 1: foo bar. Path: '/'>
        """
        parts = []
        if self.retcode != retnames['noerror']:
            parts.append("error code {0}".format(self.retcode))
        if self.msg:
            parts.append(self.msg)
        if parts:
            parts.insert(0, '')
        if self.path:
            parts.append("Path: '{0}'".format(self.path))
        return "<{0}{1}>".format(self.__class__.__name__, ': '.join(parts))

    def __repr__(self):
        """
        Renders as: PyKwalifyException(msg='foo bar')
        """
        parts = []
        if self.msg:
            parts.append("msg='{0}'".format(self.msg))
        return "{0}({1})".format(self.__class__.__name__, ', '.join(parts))

    # Validated attribute accessors; the setters assert the stored type so a
    # bad internal assignment fails loudly during development.

    @property
    def msg(self):
        return self._msg

    @msg.setter
    def msg(self, value):
        assert isinstance(value, basestring), "argument is not string"
        self._msg = value

    @property
    def retcode(self):
        return self._retcode

    @retcode.setter
    def retcode(self, value):
        assert isinstance(value, int), "argument is not integer"
        self._retcode = value

    @property
    def retname(self):
        return self._retname

    @retname.setter
    def retname(self, value):
        assert isinstance(value, str), "argument is not string"
        self._retname = value
+
+
class UnknownError(PyKwalifyException):
    """
    Error that does not fit any more specific category.
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert 'retcode' not in kwargs, "keyword retcode implicitly defined"
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever if this class is ever subclassed.
        super(UnknownError, self).__init__(
            retcode=retnames['unknownerror'],
            *args, **kwargs
        )
+
+
class SchemaError(PyKwalifyException):
    """
    Raised/collected when validation of data against a schema fails.
    """
    class SchemaErrorEntry(object):
        """
        A single validation failure. Extra keyword arguments become
        attributes that the ``msg`` format template can reference.
        """
        def __init__(self, msg, path, value, **kwargs):
            """
            Arguments:
            - `msg`: format template, rendered against this entry's attributes
            - `path`: document path of the failure
            - `value`: the offending value
            """
            self.msg = msg
            self.path = path
            self.value = value
            # Renamed loop variable so the 'value' argument is not shadowed
            for key, attr_value in kwargs.items():
                self.__setattr__(key, attr_value)

        def __repr__(self):
            # Render the message template with every stored attribute
            return self.msg.format(**self.__dict__)

    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # Explicit class in super() avoids infinite recursion in subclasses
        super(SchemaError, self).__init__(
            retcode=retnames["schemaerror"],
            *args, **kwargs
        )
+
+
class CoreError(PyKwalifyException):
    """
    Core error that is not a SchemaError.
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # Explicit class in super() avoids infinite recursion in subclasses
        super(CoreError, self).__init__(
            retcode=retnames["coreerror"],
            *args, **kwargs
        )
+
+
class NotMappingError(PyKwalifyException):
    """
    Raised when a value is not a mapping but was expected to be one.
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # Explicit class in super() avoids infinite recursion in subclasses
        super(NotMappingError, self).__init__(
            retcode=retnames['notmaperror'],
            *args, **kwargs
        )
+
+
class NotSequenceError(PyKwalifyException):
    """
    Raised when a value is not a sequence but was expected to be one.
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # Explicit class in super() avoids infinite recursion in subclasses
        super(NotSequenceError, self).__init__(
            retcode=retnames['notsequenceerror'],
            *args, **kwargs
        )
+
+
class RuleError(PyKwalifyException):
    """
    Rule-parsing error that is not a SchemaError (similar to CoreError).
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # Explicit class in super() avoids infinite recursion in subclasses
        super(RuleError, self).__init__(
            retcode=retnames["ruleerror"],
            *args, **kwargs
        )
+
+
class SchemaConflict(PyKwalifyException):
    """
    Raised when conflicting keywords are combined in one schema rule.
    """
    def __init__(self, *args, **kwargs):
        """
        Forward all arguments to PyKwalifyException with retcode preset.
        """
        assert "retcode" not in kwargs, "keyword retcode implicitly defined"
        # Explicit class in super() avoids infinite recursion in subclasses
        super(SchemaConflict, self).__init__(
            retcode=retnames["schemaconflict"],
            *args, **kwargs
        )
diff --git a/pykwalify/rule.py b/pykwalify/rule.py
new file mode 100644
index 0000000..7ac2c9e
--- /dev/null
+++ b/pykwalify/rule.py
@@ -0,0 +1,1358 @@
+# -*- coding: utf-8 -*-
+
+""" pyKwalify - rule.py """
+
+# python stdlib
+import logging
+import re
+
+# pykwalify imports
+from pykwalify.compat import basestring
+from pykwalify.errors import SchemaConflict, RuleError
+from pykwalify.types import (
+ DEFAULT_TYPE,
+ is_bool,
+ is_builtin_type,
+ is_collection_type,
+ is_number,
+ is_string,
+ mapping_aliases,
+ sequence_aliases,
+ type_class,
+)
+
+log = logging.getLogger(__name__)
+
+
+class Rule(object):
+ """ Rule class that handles a rule constraint """
+
def __init__(self, schema=None, parent=None, strict_rule_validation=False):
    """
    Create a rule object; when ``schema`` is a dict it is parsed immediately.
    """
    # Keyword slots that start unset and are filled in by the init_* parsers
    for attr in (
        'allowempty_map', 'assertion', 'default', 'desc', 'enum',
        'example', 'extensions', 'format', 'func', 'ident',
        'include_name', 'length', 'map_regex_rule', 'mapping', 'name',
        'pattern', 'pattern_regexp', 'range', 'regex_mappings',
        'sequence', 'type', 'type_class', 'unique', 'version',
    ):
        setattr(self, '_' + attr, None)

    # Possible values: [any, all, *]
    self._matching = "any"
    self._matching_rule = "any"
    self._nullable = True
    self._parent = parent
    self._required = False
    self._schema = schema
    self._schema_str = schema
    self.strict_rule_validation = strict_rule_validation

    if isinstance(schema, dict):
        self.init(schema, "")
+
# --- Keyword accessors -------------------------------------------------
# Plain delegating properties over the corresponding _-prefixed slots set
# in __init__ and populated by the init_* parser methods below.
@property
def allowempty_map(self):
    return self._allowempty_map

@allowempty_map.setter
def allowempty_map(self, value):
    self._allowempty_map = value

@property
def assertion(self):
    return self._assertion

@assertion.setter
def assertion(self, value):
    self._assertion = value

@property
def default(self):
    return self._default

@default.setter
def default(self, value):
    self._default = value

@property
def desc(self):
    return self._desc

@desc.setter
def desc(self, value):
    self._desc = value

@property
def enum(self):
    return self._enum

@enum.setter
def enum(self, value):
    self._enum = value

@property
def example(self):
    return self._example

@example.setter
def example(self, value):
    self._example = value

@property
def extensions(self):
    return self._extensions

@extensions.setter
def extensions(self, value):
    self._extensions = value

@property
def format(self):
    return self._format

@format.setter
def format(self, value):
    self._format = value

@property
def func(self):
    return self._func

@func.setter
def func(self, value):
    self._func = value

@property
def ident(self):
    return self._ident

@ident.setter
def ident(self, value):
    self._ident = value

@property
def include_name(self):
    return self._include_name

@include_name.setter
def include_name(self, value):
    self._include_name = value

@property
def length(self):
    return self._length

@length.setter
def length(self, value):
    self._length = value

@property
def map_regex_rule(self):
    return self._map_regex_rule

@map_regex_rule.setter
def map_regex_rule(self, value):
    self._map_regex_rule = value

@property
def mapping(self):
    return self._mapping

@mapping.setter
def mapping(self, value):
    self._mapping = value

@property
def matching(self):
    return self._matching

@matching.setter
def matching(self, value):
    self._matching = value

@property
def matching_rule(self):
    return self._matching_rule

@matching_rule.setter
def matching_rule(self, value):
    self._matching_rule = value

@property
def name(self):
    return self._name

@name.setter
def name(self, value):
    self._name = value

@property
def nullable(self):
    return self._nullable

@nullable.setter
def nullable(self, value):
    self._nullable = value

@property
def parent(self):
    return self._parent

@parent.setter
def parent(self, value):
    self._parent = value

@property
def pattern(self):
    return self._pattern

@pattern.setter
def pattern(self, value):
    self._pattern = value

@property
def pattern_regexp(self):
    return self._pattern_regexp

@pattern_regexp.setter
def pattern_regexp(self, value):
    self._pattern_regexp = value

@property
def range(self):
    return self._range

@range.setter
def range(self, value):
    self._range = value

@property
def regex_mappings(self):
    return self._regex_mappings

@regex_mappings.setter
def regex_mappings(self, value):
    self._regex_mappings = value

@property
def required(self):
    return self._required

@required.setter
def required(self, value):
    self._required = value

@property
def schema(self):
    return self._schema

@schema.setter
def schema(self, value):
    self._schema = value

@property
def schema_str(self):
    return self._schema_str

@schema_str.setter
def schema_str(self, value):
    self._schema_str = value

@property
def sequence(self):
    return self._sequence

@sequence.setter
def sequence(self, value):
    self._sequence = value

@property
def type(self):
    return self._type

@type.setter
def type(self, value):
    self._type = value

@property
def type_class(self):
    return self._type_class

@type_class.setter
def type_class(self, value):
    self._type_class = value

@property
def unique(self):
    return self._unique

@unique.setter
def unique(self, value):
    self._unique = value

@property
def version(self):
    return self._version

@version.setter
def version(self, value):
    self._version = value
+
def __str__(self):
    """Human readable representation of this rule's raw schema."""
    return "Rule: " + str(self.schema_str)
+
def keywords(self):
    """
    Returns a list of all keywords that this rule object has defined.
    A keyword is considered defined if the value it returns != None.
    """
    # (attribute name on this object, public keyword name in the schema)
    defined_keywords = [
        ('allowempty_map', 'allowempty_map'),
        ('assertion', 'assertion'),
        ('default', 'default'),
        ('class', 'class'),
        ('desc', 'desc'),
        ('enum', 'enum'),
        ('example', 'example'),
        ('extensions', 'extensions'),
        ('format', 'format'),
        ('func', 'func'),
        ('ident', 'ident'),
        ('include_name', 'include'),
        ('length', 'length'),
        ('map_regex_rule', 'map_regex_rule'),
        ('mapping', 'mapping'),
        ('matching', 'matching'),
        ('matching_rule', 'matching_rule'),
        ('name', 'name'),
        # The comma below was missing, which made this tuple a *call* of the
        # previous tuple and crashed every invocation with TypeError.
        ('nullable', 'nullable'),
        ('parent', 'parent'),
        ('pattern', 'pattern'),
        ('pattern_regexp', 'pattern_regexp'),
        ('range', 'range'),
        ('regex_mappings', 'regex_mappings'),
        ('required', 'required'),
        ('schema', 'schema'),
        ('schema_str', 'schema_str'),
        ('sequence', 'sequence'),
        ('type', 'type'),
        ('type_class', 'type_class'),
        ('unique', 'unique'),
        ('version', 'version'),
    ]

    return [
        keyword_name
        for var_name, keyword_name in defined_keywords
        if getattr(self, var_name, None)
    ]
+
def init(self, schema, path):
    """
    Parse one schema rule dict into this Rule object.

    Resolves the rule's type (explicit 'type' key, or inferred from a
    sequence/mapping alias, or the default type), then dispatches every
    remaining keyword to its init_* handler. Finally runs the conflict and
    per-type keyword checks.
    """
    log.debug(u"Init schema: %s", schema)

    include = schema.get("include")

    # Check if this item is a include, overwrite schema with include schema and continue to parse
    if include:
        log.debug(u"Found include tag...")
        self.include_name = include
        return

    t = None
    rule = self

    if schema is not None:
        if "type" not in schema:
            # Mapping and sequence do not need explicit type defenitions
            if any(sa in schema for sa in sequence_aliases):
                t = "seq"
                self.init_type_value(t, rule, path)
            elif any(ma in schema for ma in mapping_aliases):
                t = "map"
                self.init_type_value(t, rule, path)
            else:
                t = DEFAULT_TYPE
                # NOTE(review): on this defaulted path init_type_value() is
                # never called (t is truthy below), so type_class stays
                # unset — confirm whether that is intentional.
                self.type = t
        else:
            if not is_string(schema["type"]):
                raise RuleError(
                    msg=u"Key 'type' in schema rule is not a string type (found %s)" % type(schema["type"]).__name__,
                    error_key=u"type.not_string",
                    path=path,
                )

            self.type = schema["type"]

    self.schema_str = schema

    # Only reached with an explicit 'type' key; validates it as a known type
    if not t:
        t = schema["type"]
        self.init_type_value(t, rule, path)

    # Dispatch table: schema keyword -> parser. 'type' and 'class' are
    # handled above / ignored, hence the no-op lambdas.
    func_mapping = {
        "allowempty": self.init_allow_empty_map,
        "assert": self.init_assert_value,
        "class": lambda x, y, z: (),
        "default": self.init_default_value,
        "desc": self.init_desc_value,
        "enum": self.init_enum_value,
        "example": self.init_example,
        "extensions": self.init_extensions,
        "format": self.init_format_value,
        "func": self.init_func,
        "ident": self.init_ident_value,
        "length": self.init_length_value,
        "map": self.init_mapping_value,
        "mapping": self.init_mapping_value,
        "matching": self.init_matching,
        "matching-rule": self.init_matching_rule,
        "name": self.init_name_value,
        "nul": self.init_nullable_value,
        "nullable": self.init_nullable_value,
        "pattern": self.init_pattern_value,
        "range": self.init_range_value,
        "req": self.init_required_value,
        "required": self.init_required_value,
        "seq": self.init_sequence_value,
        "sequence": self.init_sequence_value,
        "type": lambda x, y, z: (),
        "unique": self.init_unique_value,
        "version": self.init_version,
    }

    for k, v in schema.items():
        if k in func_mapping:
            func_mapping[k](v, rule, path)
        elif k.startswith("schema;"):
            # Schema tag is only allowed on top level of data
            log.debug(u"Found schema tag...")
            raise RuleError(
                msg=u"Schema is only allowed on top level of schema file",
                error_key=u"schema.not.toplevel",
                path=path,
            )
        else:
            raise RuleError(
                msg=u"Unknown key: {0} found".format(k),
                error_key=u"key.unknown",
                path=path,
            )

    self.check_conflicts(schema, rule, path)

    self.check_type_keywords(schema, rule, path)
+
def init_format_value(self, v, rule, path):
    """
    Parse the 'format' keyword: a strptime format string, or a list of them.
    Only valid together with type=date.
    """
    log.debug(u"Init format value : %s", path)

    if is_string(v):
        self._format = [v]
    elif isinstance(v, list):
        if all(isinstance(date_format, basestring) for date_format in v):
            self._format = v
        else:
            raise RuleError(
                msg=u"All values in format list must be strings",
                error_key=u"format.not_string",
                path=path,
            )
    else:
        raise RuleError(
            msg=u"Value of format keyword: '{}' must be a string or list or string values".format(v),
            error_key=u"format.not_string",
            path=path,
        )

    # Format is only supported when used with "type=date"
    valid_types = ("date", )
    if self._type not in valid_types:
        raise RuleError(
            msg="Keyword format is only allowed when used with the following types: {0}".format(valid_types),
            error_key=u"format.not_used_with_correct_type",
            path=path,
        )
+
def init_version(self, v, rule, path):
    """Parse the 'version' keyword and store it as a string."""
    log.debug(u"Init version value : {0}".format(path))
    self._version = str(v)
+
def init_example(self, v, rule, path):
    """
    Parse the 'example' keyword. The value must be a string; it is stored
    for documentation purposes only.
    """
    log.debug(u'Init example value : {0}'.format(path))

    if not is_string(v):
        raise RuleError(
            msg=u"Value: {0} for keyword example must be a string".format(v),
            error_key=u"example.not_string",
            path=path,
        )

    # Store under 'example'; previously this clobbered 'desc' instead
    self.example = v
+
def init_length_value(self, v, rule, path):
    """
    Parse the 'length' keyword: a mapping with optional 'max', 'min',
    'max-ex', 'min-ex' bounds on the length of the data.

    Raises RuleError for malformed values or contradictory combinations.
    """
    log.debug(u'Init length value : {0}'.format(path))

    supported_types = ["str", "int", "float", "number", "map", "seq"]

    if not isinstance(v, dict):
        raise RuleError(
            msg=u"Length value is not a dict type: '{0}'".format(v),
            error_key=u"length.not_map",
            path=path,
        )

    if self.type not in supported_types:
        raise RuleError(
            msg=u"Length value type: '{0}' is not a supported type".format(self.type),
            error_key=u"length.not_supported_type",
            path=path,
        )

    # dict that should contain min, max, min-ex, max-ex keys
    self.length = v

    # This should validate that only min, max, min-ex, max-ex exists in the dict.
    # Use a distinct loop variable so the keyword value 'v' is not clobbered.
    for key in self.length:
        if key not in ["max", "min", "max-ex", "min-ex"]:
            raise RuleError(
                msg=u"Unknown key: '{0}' found in length keyword".format(key),
                error_key=u"length.unknown_key",
                path=path,
            )

    if "max" in self.length and "max-ex" in self.length:
        raise RuleError(
            msg=u"'max' and 'max-ex' can't be used in the same length rule",
            error_key=u"length.max_duplicate_keywords",
            path=path,
        )

    if "min" in self.length and "min-ex" in self.length:
        raise RuleError(
            msg=u"'min' and 'min-ex' can't be used in the same length rule",
            error_key=u"length.min_duplicate_keywords",
            path=path,
        )

    # Trailing underscore avoids shadowing the max()/min() builtins
    max_ = self.length.get("max")
    min_ = self.length.get("min")
    max_ex = self.length.get("max-ex")
    min_ex = self.length.get("min-ex")

    # Each message now reports the offending bound itself (previously it
    # formatted the leftover loop variable). Parens make the intended
    # "set AND (not a number OR a bool)" logic explicit.
    if max_ is not None and (not is_number(max_) or is_bool(max_)):
        raise RuleError(
            msg=u"Value: '{0}' for 'max' keyword is not a number".format(max_),
            error_key=u"length.max.not_number",
            path=path,
        )

    if min_ is not None and (not is_number(min_) or is_bool(min_)):
        raise RuleError(
            msg=u"Value: '{0}' for 'min' keyword is not a number".format(min_),
            error_key=u"length.min.not_number",
            path=path,
        )

    if max_ex is not None and (not is_number(max_ex) or is_bool(max_ex)):
        raise RuleError(
            msg=u"Value: '{0}' for 'max-ex' keyword is not a number".format(max_ex),
            error_key=u"length.max_ex.not_number",
            path=path,
        )

    if min_ex is not None and (not is_number(min_ex) or is_bool(min_ex)):
        raise RuleError(
            msg=u"Value: '{0}' for 'min-ex' keyword is not a number".format(min_ex),
            error_key=u"length.min_ex.not_number",
            path=path,
        )

    # only numbers allow negative lengths
    # string, map and seq require non negative lengths
    if self.type not in ["int", "float", "number"]:
        if min_ is not None and min_ < 0:
            raise RuleError(
                msg=u"Value for 'min' can't be negative in case of type {0}.".format(self.type),
                error_key=u"length.min_negative",
                path=path,
            )
        elif min_ex is not None and min_ex < 0:
            raise RuleError(
                msg=u"Value for 'min-ex' can't be negative in case of type {0}.".format(self.type),
                error_key=u"length.min-ex_negative",
                path=path,
            )
        if max_ is not None and max_ < 0:
            raise RuleError(
                msg=u"Value for 'max' can't be negative in case of type {0}.".format(self.type),
                error_key=u"length.max_negative",
                path=path,
            )
        elif max_ex is not None and max_ex < 0:
            raise RuleError(
                msg=u"Value for 'max-ex' can't be negative in case of type {0}.".format(self.type),
                error_key=u"length.max-ex_negative",
                path=path,
            )

    # Cross-check the upper bound against the lower bound
    if max_ is not None:
        if min_ is not None and max_ < min_:
            raise RuleError(
                msg=u"Value for 'max' can't be less then value for 'min'. {0} < {1}".format(max_, min_),
                error_key=u"length.max_lt_min",
                path=path,
            )
        elif min_ex is not None and max_ <= min_ex:
            raise RuleError(
                msg=u"Value for 'max' can't be less then value for 'min-ex'. {0} <= {1}".format(max_, min_ex),
                error_key=u"length.max_le_min-ex",
                path=path,
            )
    elif max_ex is not None:
        if min_ is not None and max_ex < min_:
            raise RuleError(
                msg=u"Value for 'max-ex' can't be less then value for 'min'. {0} < {1}".format(max_ex, min_),
                error_key=u"length.max-ex_le_min",
                path=path,
            )
        elif min_ex is not None and max_ex <= min_ex:
            raise RuleError(
                msg=u"Value for 'max-ex' can't be less then value for 'min-ex'. {0} <= {1}".format(max_ex, min_ex),
                error_key=u"length.max-ex_le_min-ex",
                path=path,
            )
+
def init_func(self, v, rule, path):
    """Parse the 'func' keyword naming an extension function to run."""
    if is_string(v):
        self.func = v
        return
    raise RuleError(
        msg=u"Value: {0} for func keyword must be a string".format(v),
        error_key=u"func.notstring",
        path=path,
    )
+
def init_extensions(self, v, rule, path):
    """Parse the 'extensions' keyword: a list of extension files to load."""
    if isinstance(v, list):
        # TODO: Add limitation that this keyword can only be used at the top level of the file
        self.extensions = v
        return
    raise RuleError(
        msg=u"Extension definition should be a list",
        error_key=u"extension.not_list",
        path=path,
    )
+
def init_matching_rule(self, v, rule, path):
    """Parse 'matching-rule'; only 'any' and 'all' are implemented."""
    log.debug(u"Init matching-rule: %s", path)
    log.debug(u"%s %s", v, rule)

    # ["none", "one"] Is currently awaiting proper implementation
    allowed = ["any", "all"]
    if v in allowed:
        self.matching_rule = v
        return
    raise RuleError(
        msg=u"Specified rule in key: {0} is not part of allowed rule set : {1}".format(v, allowed),
        error_key=u"matching_rule.not_allowed",
        path=path,
    )
+
def init_allow_empty_map(self, v, rule, path):
    """Parse the 'allowempty' keyword used on mappings."""
    log.debug(u"Init allow empty value: %s", path)
    log.debug(u"Type: %s : %s", v, rule)
    self.allowempty_map = v
+
def init_type_value(self, v, rule, path):
    """Resolve the rule's type (default when None) and its python class."""
    log.debug(u"Init type value : %s", path)
    log.debug(u"Type: %s %s", v, rule)

    self.type = DEFAULT_TYPE if v is None else v
    self.type_class = type_class(self.type)

    if not is_builtin_type(self.type):
        raise RuleError(
            msg=u"Type: {0} is not any of the known types".format(self.type),
            error_key=u"type.unknown",
            path=path,
        )
+
def init_matching(self, v, rule, path):
    """Parse the 'matching' keyword used together with regex mappings."""
    log.debug(u"Init matching rule : %s", path)

    value = str(v)
    valid_values = ["any", "all", "*"]
    if value not in valid_values:
        raise RuleError(
            msg=u"matching value: {0} is not one of {1}".format(value, valid_values),
            error_key=u"matching_rule.invalid",
            path=path,
        )
    self.matching = value
+
def init_name_value(self, v, rule, path):
    """Parse the 'name' keyword; value must be a string."""
    log.debug(u"Init name value : %s", path)

    if is_string(v):
        self.name = v
        return
    raise RuleError(
        msg=u"Value: {0} for keyword name must be a string".format(v),
        error_key=u"name.not_string",
        path=path,
    )
+
def init_nullable_value(self, v, rule, path):
    """Parse the 'nullable'/'nul' keyword; value must be a boolean."""
    log.debug(u"Init nullable value : %s", path)

    if isinstance(v, bool):
        self.nullable = v
        return
    raise RuleError(
        msg=u"Value: '{0}' for nullable keyword must be a boolean".format(v),
        error_key=u"nullable.not_bool",
        path=path,
    )
+
def init_desc_value(self, v, rule, path):
    """Parse the 'desc' keyword; value must be a string."""
    log.debug(u"Init descr value : %s", path)

    if is_string(v):
        self.desc = v
        return
    raise RuleError(
        msg=u"Value: {0} for keyword desc must be a string".format(v),
        error_key=u"desc.not_string",
        path=path,
    )
+
def init_required_value(self, v, rule, path):
    """Parse the 'required'/'req' keyword; value must be a boolean."""
    log.debug(u"Init required value : %s", path)

    if is_bool(v):
        self.required = v
        return
    raise RuleError(
        msg=u"Value: '{0}' for required keyword must be a boolean".format(v),
        error_key=u"required.not_bool",
        path=path,
    )
+
def init_pattern_value(self, v, rule, path):
    """
    Parse the 'pattern' keyword and precompile the regular expression.
    Not allowed when the rule's type is 'map'.
    """
    log.debug(u"Init pattern value : %s", path)

    if not is_string(v):
        raise RuleError(
            msg=u"Value of pattern keyword: '{0}' is not a string".format(v),
            error_key=u"pattern.not_string",
            path=path,
        )

    self.pattern = v

    # 'type' may be absent when it was inferred from a seq/map alias; use
    # .get() so that case does not blow up with KeyError.
    if self.schema_str.get("type") == "map":
        raise RuleError(
            msg=u"Keyword pattern is not allowed inside map",
            error_key=u"pattern.not_allowed_in_map",
            path=path,
        )

    try:
        self.pattern_regexp = re.compile(self.pattern)
    except Exception:
        # Report the pattern source; pattern_regexp is still unset here
        raise RuleError(
            msg=u"Syntax error when compiling regex pattern: {0}".format(self.pattern),
            error_key=u"pattern.syntax_error",
            path=path,
        )
+
def init_enum_value(self, v, rule, path):
    """
    Parse the 'enum' keyword: a sequence of unique scalar values matching
    the rule's type class.
    """
    log.debug(u"Init enum value : %s", path)

    if not isinstance(v, list):
        raise RuleError(
            msg=u"Enum is not a sequence",
            error_key=u"enum.not_seq",
            path=path,
        )
    self.enum = v

    if is_collection_type(self.type):
        raise RuleError(
            msg=u"Enum is not a scalar",
            error_key=u"enum.not_scalar",
            path=path,
        )

    seen = set()
    for item in v:
        if not isinstance(item, self.type_class):
            raise RuleError(
                msg=u"Item: '{0}' in enum is not of correct class type: '{1}'".format(item, self.type_class),
                error_key=u"enum.type.unmatch",
                path=path,
            )
        if item in seen:
            raise RuleError(
                msg=u"Duplicate items: '{0}' found in enum".format(item),
                error_key=u"enum.duplicate_items",
                path=path,
            )
        seen.add(item)
+
def init_assert_value(self, v, rule, path):
    """
    Parse the 'assert' keyword: a python expression later evaluated with
    the data bound to 'val'. A few obviously dangerous constructs are
    rejected up front.
    """
    log.debug(u"Init assert value : %s", path)

    if not is_string(v):
        raise RuleError(
            msg=u"Value: '{0}' for keyword 'assert' is not a string".format(v),
            error_key=u"assert.not_str",
            path=path,
        )

    self.assertion = v

    forbidden = (';', 'import', '__import__')
    if any(token in self.assertion for token in forbidden):
        raise RuleError(
            msg=u"Value: '{assertion}' contain invalid content that is not allowed to be present in assertion keyword".format(assertion=self.assertion),
            error_key=u"assert.unsupported_content",
            path=path,
        )
+
+ def init_range_value(self, v, rule, path):
+ """
+ """
+ log.debug(u"Init range value : %s", path)
+
+ supported_types = ["str", "int", "float", "number", "map", "seq"]
+
+ if not isinstance(v, dict):
+ raise RuleError(
+ msg=u"Range value is not a dict type: '{0}'".format(v),
+ error_key=u"range.not_map",
+ path=path,
+ )
+
+ if self.type not in supported_types:
+ raise RuleError(
+ msg=u"Range value type: '{0}' is not a supported type".format(self.type),
+ error_key=u"range.not_supported_type",
+ path=path,
+ )
+
+ # dict that should contain min, max, min-ex, max-ex keys
+ self.range = v
+
+ # This should validate that only min, max, min-ex, max-ex exists in the dict
+ for k, v in self.range.items():
+ if k not in ["max", "min", "max-ex", "min-ex"]:
+ raise RuleError(
+ msg=u"Unknown key: '{0}' found in range keyword".format(k),
+ error_key=u"range.unknown_key",
+ path=path,
+ )
+
+ if "max" in self.range and "max-ex" in self.range:
+ raise RuleError(
+ msg=u"'max' and 'max-ex' can't be used in the same range rule",
+ error_key=u"range.max_duplicate_keywords",
+ path=path,
+ )
+
+ if "min" in self.range and "min-ex" in self.range:
+ raise RuleError(
+ msg=u"'min' and 'min-ex' can't be used in the same range rule",
+ error_key=u"range.min_duplicate_keywords",
+ path=path,
+ )
+
+ max = self.range.get("max")
+ min = self.range.get("min")
+ max_ex = self.range.get("max-ex")
+ min_ex = self.range.get("min-ex")
+
+ if max is not None and not is_number(max) or is_bool(max):
+ raise RuleError(
+ msg=u"Value: '{0}' for 'max' keyword is not a number".format(v),
+ error_key=u"range.max.not_number",
+ path=path,
+ )
+
+ if min is not None and not is_number(min) or is_bool(min):
+ raise RuleError(
+ msg=u"Value: '{0}' for 'min' keyword is not a number".format(v),
+ error_key=u"range.min.not_number",
+ path=path,
+ )
+
+ if max_ex is not None and not is_number(max_ex) or is_bool(max_ex):
+ raise RuleError(
+ msg=u"Value: '{0}' for 'max-ex' keyword is not a number".format(v),
+ error_key=u"range.max_ex.not_number",
+ path=path,
+ )
+
+ if min_ex is not None and not is_number(min_ex) or is_bool(min_ex):
+ raise RuleError(
+ msg=u"Value: '{0}' for 'min-ex' keyword is not a number".format(v),
+ error_key=u"range.min_ex.not_number",
+ path=path,
+ )
+
+ # only numbers allow negative ranges
+ # string, map and seq require non negative ranges
+ if self.type not in ["int", "float", "number"]:
+ if min is not None and min < 0:
+ raise RuleError(
+ msg=u"Value for 'min' can't be negative in case of type {0}.".format(self.type),
+ error_key=u"range.min_negative",
+ path=path,
+ )
+ elif min_ex is not None and min_ex < 0:
+ raise RuleError(
+ msg=u"Value for 'min-ex' can't be negative in case of type {0}.".format(self.type),
+ error_key=u"range.min-ex_negative",
+ path=path,
+ )
+ if max is not None and max < 0:
+ raise RuleError(
+ msg=u"Value for 'max' can't be negative in case of type {0}.".format(self.type),
+ error_key=u"range.max_negative",
+ path=path,
+ )
+ elif max_ex is not None and max_ex < 0:
+ raise RuleError(
+ msg=u"Value for 'max-ex' can't be negative in case of type {0}.".format(self.type),
+ error_key=u"range.max-ex_negative",
+ path=path,
+ )
+
+ if max is not None:
+ if min is not None and max < min:
+ raise RuleError(
+ msg=u"Value for 'max' can't be less then value for 'min'. {0} < {1}".format(max, min),
+ error_key=u"range.max_lt_min",
+ path=path,
+ )
+ elif min_ex is not None and max <= min_ex:
+ raise RuleError(
+ msg=u"Value for 'max' can't be less then value for 'min-ex'. {0} <= {1}".format(max, min_ex),
+ error_key=u"range.max_le_min-ex",
+ path=path,
+ )
+ elif max_ex is not None:
+ if min is not None and max_ex < min:
+ raise RuleError(
+ msg=u"Value for 'max-ex' can't be less then value for 'min'. {0} < {1}".format(max_ex, min),
+ error_key=u"range.max-ex_le_min",
+ path=path,
+ )
+ elif min_ex is not None and max_ex <= min_ex:
+ raise RuleError(
+ msg=u"Value for 'max-ex' can't be less then value for 'min-ex'. {0} <= {1}".format(max_ex, min_ex),
+ error_key=u"range.max-ex_le_min-ex",
+ path=path,
+ )
+
+ def init_ident_value(self, v, rule, path):
+ """
+ """
+ log.debug(u"Init ident value : %s", path)
+
+ if v is None or not is_bool(v):
+ raise RuleError(
+ msg=u"Value: '{0}' of 'ident' is not a boolean value".format(v),
+ error_key=u"ident.not_bool",
+ path=path,
+ )
+
+ self.ident = bool(v)
+ self.required = True
+
+ if is_collection_type(self.type):
+ raise RuleError(
+ msg=u"Value: '{0}' of 'ident' is not a scalar value".format(v),
+ error_key=u"ident.not_scalar",
+ path=path,
+ )
+
+ if path == "":
+ raise RuleError(
+ msg=u"Keyword 'ident' can't be on root level of schema",
+ error_key=u"ident.not_on_root_level",
+ path=path,
+ )
+
+ if self.parent is None or not self.parent.type == "map":
+ raise RuleError(
+ msg=u"Keword 'ident' can't be inside 'map'",
+ error_key=u"ident.not_in_map",
+ path=path,
+ )
+
+    def init_unique_value(self, v, rule, path):
+        """
+        Validate and store the 'unique' keyword.
+
+        'unique' must be a boolean, is only valid on scalar types, and is
+        not allowed on the schema root. Raises RuleError otherwise.
+        """
+        log.debug(u"Init unique value : %s", path)
+
+        if not is_bool(v):
+            raise RuleError(
+                msg=u"Value: '{0}' for 'unique' keyword is not boolean".format(v),
+                error_key=u"unique.not_bool",
+                path=path,
+            )
+
+        # NOTE(review): the value is stored before the remaining checks, so
+        # self.unique is already set even when a RuleError is raised below.
+        self.unique = v
+
+        if is_collection_type(self.type):
+            raise RuleError(
+                msg=u"Type of the value: '{0}' for 'unique' keyword is not a scalar type".format(self.type),
+                error_key=u"unique.not_scalar",
+                path=path,
+            )
+        if path == "":
+            raise RuleError(
+                msg=u"Keyword 'unique' can't be on root level of schema",
+                error_key=u"unique.not_on_root_level",
+                path=path,
+            )
+
+    def init_sequence_value(self, v, rule, path):
+        """
+        Validate the 'sequence'/'seq' keyword and compile its child rules.
+
+        'v' must be a non-empty list; each element is compiled into a child
+        Rule object and stored in self.sequence. Raises RuleError when 'v'
+        is not a list or is empty.
+        """
+        log.debug(u"Init sequence value : %s", path)
+
+        if v is not None and not isinstance(v, list):
+            raise RuleError(
+                msg=u"Sequence keyword is not a list",
+                error_key=u"sequence.not_seq",
+                path=path,
+            )
+
+        self.sequence = v
+
+        if self.sequence is None or len(self.sequence) == 0:
+            raise RuleError(
+                msg=u"Sequence contains 0 elements",
+                error_key=u"sequence.no_elements",
+                path=path,
+            )
+
+        tmp_seq = []
+
+        for i, e in enumerate(self.sequence):
+            # A bare null entry in the schema is treated as an empty rule
+            # (i.e. "any value allowed").
+            elem = e or {}
+
+            # NOTE(review): 'rule' here rebinds the parameter of the same
+            # name -- intentional upstream style.
+            rule = Rule(None, self)
+            rule.init(elem, u"{0}/sequence/{1}".format(path, i))
+
+            tmp_seq.append(rule)
+
+        self.sequence = tmp_seq
+
+        # Returns only the rule built for the LAST element -- presumably
+        # callers use it the same way as the other init_* return values;
+        # confirm before relying on it for multi-element sequences.
+        return rule
+
+    def init_mapping_value(self, v, rule, path):
+        """
+        Validate the 'map'/'mapping' keyword and compile its child rules.
+
+        'v' must be a non-empty dict. Keys prefixed with 'regex;' or 're;'
+        are compiled as regex-matched rules (collected in
+        self.regex_mappings as well as self.mapping); all other keys become
+        plain child rules in self.mapping. Raises RuleError on malformed
+        input.
+        """
+        # Check for duplicate use of 'map' and 'mapping'
+        if self.mapping:
+            raise RuleError(
+                msg=u"Keywords 'map' and 'mapping' can't be used on the same level",
+                error_key=u"mapping.duplicate_keywords",
+                path=path,
+            )
+
+        log.debug(u"Init mapping value : %s", path)
+
+        if v is not None and not isinstance(v, dict):
+            raise RuleError(
+                msg=u"Value for keyword 'map/mapping' is not a dict",
+                error_key=u"mapping.not_dict",
+                path=path,
+            )
+
+        if v is None or len(v) == 0:
+            raise RuleError(
+                msg=u"Mapping do not contain any elements",
+                error_key=u"mapping.no_elements",
+                path=path,
+            )
+
+        self.mapping = {}
+        self.regex_mappings = []
+
+        # NOTE(review): this loop rebinds the parameter 'v', and the else
+        # branch rebinds the parameter 'rule' -- intentional upstream style,
+        # but easy to trip over when editing.
+        for k, v in v.items():
+            if v is None:
+                v = {}
+
+            # Check if this is a regex rule. Handle specially
+            if k.startswith("regex;") or k.startswith("re;"):
+                log.debug(u"Found regex map rule")
+                regex = k.split(";", 1)
+                if len(regex) != 2:
+                    raise RuleError(
+                        msg=u"Value: '{0}' for keyword regex is malformed".format(k),
+                        error_key=u"mapping.regex.malformed",
+                        path=path,
+                    )
+                else:
+                    regex = regex[1]
+                    try:
+                        re.compile(regex)
+                    except Exception as e:
+                        log.debug(e)
+                        raise RuleError(
+                            msg=u"Unable to compile regex '{0}'".format(regex),
+                            error_key=u"mapping.regex.compile_error",
+                            path=path,
+                        )
+
+                # regex[1:-1] strips the first and last character of the
+                # pattern -- presumably assumes patterns are written wrapped
+                # as '(...)'; TODO confirm against the schema documentation.
+                regex_rule = Rule(None, self)
+                regex_rule.init(v, u"{0}/mapping;regex/{1}".format(path, regex[1:-1]))
+                regex_rule.map_regex_rule = regex[1:-1]
+                self.regex_mappings.append(regex_rule)
+                self.mapping[k] = regex_rule
+            else:
+                rule = Rule(None, self)
+                rule.init(v, u"{0}/mapping/{1}".format(path, k))
+                self.mapping[k] = rule
+
+        # Returns the last plain child rule built, or the 'rule' parameter
+        # unchanged if every key was a regex key.
+        return rule
+
+ def init_default_value(self, v, rule, path):
+ """
+ """
+ log.debug(u"Init default value : %s", path)
+ self.default = v
+
+ if is_collection_type(self.type):
+ raise RuleError(
+ msg=u"Value: {0} for keyword 'default' is not a scalar type".format(v),
+ error_key=u"default.not_scalar",
+ path=path,
+ )
+
+ if self.type == "map" or self.type == "seq":
+ raise RuleError(
+ msg=u"Value: {0} for keyword 'default' is not a scalar type".format(v),
+ error_key=u"default.not_scalar",
+ path=path,
+ )
+
+ if not isinstance(v, self.type_class):
+ raise RuleError(
+ msg=u"Types do not match: '{0}' --> '{1}'".format(v, self.type_class),
+ error_key=u"default.type.unmatch",
+ path=path,
+ )
+
+ def check_type_keywords(self, schema, rule, path):
+ """
+ All supported keywords:
+ - allowempty_map
+ - assertion
+ - class
+ - date
+ - default
+ - desc
+ - enum
+ - example
+ - extensions
+ - func
+ - ident
+ - include_name
+ - map_regex_rule
+ - mapping
+ - matching
+ - matching_rule
+ - name
+ - nullable
+ - pattern
+ - pattern_regexp
+ - range
+ - regex_mappings
+ - required
+ - schema
+ - sequence
+ - type
+ - type_class
+ - unique
+ - version
+ """
+ if not self.strict_rule_validation:
+ return
+
+ global_keywords = ['type', 'desc', 'example', 'extensions', 'name', 'nullable', 'version', 'func', 'include']
+ all_allowed_keywords = {
+ 'str': global_keywords + ['default', 'pattern', 'range', 'enum', 'required', 'unique', 'req'],
+ 'int': global_keywords + ['default', 'range', 'enum', 'required', 'unique'],
+ 'float': global_keywords + ['default', 'enum', 'range', 'required'],
+ 'number': global_keywords + ['default', 'enum'],
+ 'bool': global_keywords + ['default', 'enum'],
+ 'map': global_keywords + ['allowempty_map', 'mapping', 'map', 'allowempty', 'required', 'matching-rule', 'range', 'class'],
+ 'seq': global_keywords + ['sequence', 'seq', 'required', 'range', 'matching'],
+ 'sequence': global_keywords + ['sequence', 'seq', 'required'],
+ 'mapping': global_keywords + ['mapping', 'seq', 'required'],
+ 'timestamp': global_keywords + ['default', 'enum'],
+ 'date': global_keywords + ['default', 'enum'],
+ 'symbol': global_keywords + ['default', 'enum'],
+ 'scalar': global_keywords + ['default', 'enum'],
+ 'text': global_keywords + ['default', 'enum', 'pattern'],
+ 'any': global_keywords + ['default', 'enum'],
+ 'enum': global_keywords + ['default', 'enum'],
+ 'none': global_keywords + ['default', 'enum', 'required'],
+ }
+ rule_type = schema.get('type')
+ if not rule_type:
+ # Special cases for the "shortcut methods"
+ if 'sequence' in schema or 'seq' in schema:
+ rule_type = 'sequence'
+ elif 'mapping' in schema or 'map' in schema:
+ rule_type = 'mapping'
+
+ allowed_keywords = all_allowed_keywords.get(rule_type)
+ if not allowed_keywords and 'sequence' not in schema and 'mapping' not in schema and 'seq' not in schema and 'map' not in schema:
+ raise RuleError('No allowed keywords found for type: {0}'.format(rule_type))
+
+ for k, v in schema.items():
+ if k not in allowed_keywords:
+ raise RuleError('Keyword "{0}" is not supported for type: "{1}" '.format(k, rule_type))
+
+    def check_conflicts(self, schema, rule, path):
+        """
+        Verify that mutually exclusive keywords are not combined on the
+        same schema level (e.g. a 'seq' type alongside a 'mapping' keyword).
+        Raises SchemaConflict on any violation.
+        """
+        log.debug(u"Checking for conflicts : %s", path)
+
+        # A sequence rule must carry a sequence alias and may not combine
+        # with enum, pattern or mapping keywords.
+        if self.type == "seq":
+            if all(sa not in schema for sa in sequence_aliases):
+                raise SchemaConflict(
+                    msg="Type is sequence but no sequence alias found on same level",
+                    error_key=u"seq.no_sequence",
+                    path=path,
+                )
+
+            if self.enum is not None:
+                raise SchemaConflict(
+                    msg="Sequence and enum can't be on the same level in the schema",
+                    error_key=u"seq.conflict.enum",
+                    path=path,
+                )
+
+            if self.pattern is not None:
+                raise SchemaConflict(
+                    msg="Sequence and pattern can't be on the same level in the schema",
+                    error_key=u"seq.conflict.pattern",
+                    path=path,
+                )
+
+            if self.mapping is not None:
+                raise SchemaConflict(
+                    msg="Sequence and mapping can't be on the same level in the schema",
+                    error_key=u"seq.conflict.mapping",
+                    path=path,
+                )
+        # A mapping rule must carry a mapping alias (unless allowempty_map)
+        # and may not combine with enum or sequence keywords.
+        elif self.type == "map":
+            if all(ma not in schema for ma in mapping_aliases) and not self.allowempty_map:
+                raise SchemaConflict(
+                    msg="Type is mapping but no mapping alias found on same level",
+                    error_key=u"map.no_mapping",
+                    path=path,
+                )
+
+            if self.enum is not None:
+                raise SchemaConflict(
+                    msg="Mapping and enum can't be on the same level in the schema",
+                    error_key=u"map.conflict.enum",
+                    path=path,
+                )
+
+            if self.sequence is not None:
+                raise SchemaConflict(
+                    msg="Mapping and sequence can't be on the same level in the schema",
+                    error_key=u"map.conflict.sequence",
+                    path=path,
+                )
+        # Scalar rules may not carry any collection keywords.
+        else:
+            if self.sequence is not None:
+                raise SchemaConflict(
+                    msg="Scalar and sequence can't be on the same level in the schema",
+                    error_key=u"scalar.conflict.sequence",
+                    path=path,
+                )
+
+            if self.mapping is not None:
+                raise SchemaConflict(
+                    msg="Scalar and mapping can't be on the same level in the schema",
+                    error_key=u"scalar.conflict.mapping",
+                    path=path,
+                )
+
+            if self.enum is not None and self.range is not None:
+                raise SchemaConflict(
+                    msg="Enum and range can't be on the same level in the schema",
+                    error_key=u"enum.conflict.range",
+                    path=path,
+                )
diff --git a/pykwalify/types.py b/pykwalify/types.py
new file mode 100644
index 0000000..beb33ad
--- /dev/null
+++ b/pykwalify/types.py
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+
+""" pyKwalify - types.py """
+
+# python stdlib
+import datetime
+from pykwalify.compat import basestring, bytes
+
+# Fallback schema type used when a rule does not declare 'type' explicitly.
+DEFAULT_TYPE = "str"
+
+
+class TextMeta(type):
+ def __instancecheck__(self, instance):
+ return is_text(instance)
+
+
+class text(object):
+ __metaclass__ = TextMeta
+
+
+# Mapping from schema type name to the python class used for isinstance
+# style checks. A value of None means no single python class represents the
+# type (e.g. "number" accepts both int and float).
+_types = {
+    "str": str,
+    "int": int,
+    "float": float,
+    "number": None,
+    "bool": bool,
+    "map": dict,
+    "seq": list,
+    "timestamp": datetime.datetime,
+    "date": datetime.date,
+    "symbol": str,
+    "scalar": None,
+    "text": text,
+    "any": object,
+    "enum": str,
+    "none": None
+}
+
+
+# Alternative keyword spellings accepted for sequence and mapping rules.
+sequence_aliases = ["sequence", "seq"]
+mapping_aliases = ["map", "mapping"]
+
+
+def type_class(type):
+    """Return the python class registered for the given schema type name."""
+    return _types[type]
+
+
+def is_builtin_type(type):
+    """Return True if the schema type name is one of the known types."""
+    return type in _types
+
+
+def is_collection_type(type):
+    """Return True for the collection types 'map' and 'seq' (case/whitespace tolerant)."""
+    return type.lower().strip() == "map" or type.lower().strip() == "seq"
+
+
+def is_scalar_type(type):
+    """Return True for every type name that is not a collection type."""
+    return not is_collection_type(type)
+
+
+def is_collection(obj):
+    """Return True if the object itself is a dict or a list."""
+    return isinstance(obj, dict) or isinstance(obj, list)
+
+
+def is_scalar(obj):
+    """Return True for any non-collection, non-None value."""
+    return not is_collection(obj) and obj is not None
+
+
+def is_correct_type(obj, type):
+    """isinstance() wrapper; 'type' here is a python class, not a type name."""
+    return isinstance(obj, type)
+
+
+def is_string(obj):
+    """Return True for text or byte strings (py2/py3 compat aliases)."""
+    return isinstance(obj, basestring) or isinstance(obj, bytes)
+
+
+def is_int(obj):
+    """
+    True & False is not considered valid integers even if python considers them 1 & 0 in some versions
+    """
+    # bool is a subclass of int, hence the explicit exclusion.
+    return isinstance(obj, int) and not isinstance(obj, bool)
+
+
+def is_bool(obj):
+    """Return True only for actual bool instances."""
+    return isinstance(obj, bool)
+
+
+def is_float(obj):
+    """
+    Valid types are:
+    - objects of float type
+    - Strings that can be converted to float. For example '1e-06'
+    """
+    is_f = isinstance(obj, float)
+    if not is_f:
+        try:
+            # float() also accepts ints and numeric strings such as '1e-06'.
+            float(obj)
+            is_f = True
+        except (ValueError, TypeError):
+            is_f = False
+    # bool is float()-convertible, so it must be rejected explicitly.
+    return is_f and not is_bool(obj)
+
+
+def is_number(obj):
+    """Return True for ints, floats and float-convertible strings; never for bools."""
+    return is_int(obj) or is_float(obj)
+
+
+def is_text(obj):
+    """Return True for strings and numbers, explicitly excluding bools."""
+    return (is_string(obj) or is_number(obj)) and is_bool(obj) is False
+
+
+def is_any(obj):
+    """The 'any' type accepts every value."""
+    return True
+
+
+def is_enum(obj):
+    """Enum values must be strings."""
+    return isinstance(obj, basestring)
+
+
+def is_none(obj):
+    """Return True only for None."""
+    return obj is None
+
+
+def is_sequence_alias(alias):
+    """Return True if the keyword is an accepted sequence spelling."""
+    return alias in sequence_aliases
+
+
+def is_mapping_alias(alias):
+    """Return True if the keyword is an accepted mapping spelling."""
+    return alias in mapping_aliases
+
+
+def is_timestamp(obj):
+    """
+    Yaml either have automatically converted it to a datetime object
+    or it is a string that will be validated later.
+    """
+    return isinstance(obj, datetime.datetime) or is_string(obj) or is_int(obj) or is_float(obj)
+
+
+def is_date(obj):
+    """
+    :param obj: Object that is to be validated
+    :return: True/False if obj is valid date object
+    """
+    # Strings are accepted here and presumably parsed/validated later --
+    # confirm against the core validator.
+    return isinstance(obj, basestring) or isinstance(obj, datetime.date)
+
+
+# Dispatch table: schema type name -> validator predicate.
+# NOTE(review): "map", "seq" and "symbol" have no entry here -- presumably
+# collections are validated structurally elsewhere; confirm before looking
+# those types up in tt.
+tt = {
+    "str": is_string,
+    "int": is_int,
+    "bool": is_bool,
+    "float": is_float,
+    "number": is_number,
+    "text": is_text,
+    "any": is_any,
+    "enum": is_enum,
+    "none": is_none,
+    "timestamp": is_timestamp,
+    "scalar": is_scalar,
+    "date": is_date,
+}