diff options
author | Andrej Shadura <andrew.shadura@collabora.co.uk> | 2020-04-22 22:42:05 +0200 |
---|---|---|
committer | Andrej Shadura <andrew.shadura@collabora.co.uk> | 2020-04-22 22:42:05 +0200 |
commit | f6fd16282811d1a686b5ae611351b0f8eed3c008 (patch) | |
tree | dc37399cc3aefb4e733c3307d8a4fdd5c0438e74 /ldap3/protocol/formatters/validators.py | |
parent | 4083ce411fb1d311316baa250fb271bfb0b63058 (diff) |
New upstream version 2.7
Diffstat (limited to 'ldap3/protocol/formatters/validators.py')
-rw-r--r-- | ldap3/protocol/formatters/validators.py | 785 |
1 files changed, 503 insertions, 282 deletions
diff --git a/ldap3/protocol/formatters/validators.py b/ldap3/protocol/formatters/validators.py index a332c1a..3ab300d 100644 --- a/ldap3/protocol/formatters/validators.py +++ b/ldap3/protocol/formatters/validators.py @@ -1,282 +1,503 @@ -"""
-"""
-
-# Created on 2016.08.09
-#
-# Author: Giovanni Cannata
-#
-# Copyright 2016 - 2018 Giovanni Cannata
-#
-# This file is part of ldap3.
-#
-# ldap3 is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published
-# by the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# ldap3 is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with ldap3 in the COPYING and COPYING.LESSER files.
-# If not, see <http://www.gnu.org/licenses/>.
-
-from datetime import datetime
-from calendar import timegm
-from uuid import UUID
-
-from ... import SEQUENCE_TYPES, STRING_TYPES
-from .formatters import format_time, format_ad_timestamp
-from ...utils.conv import to_raw, to_unicode
-
-# Validators return True if value is valid, False if value is not valid,
-# or a value different from True and False that is a valid value to substitute to the input value
-
-
-def check_type(input_value, value_type):
- if isinstance(input_value, value_type):
- return True
-
- if isinstance(input_value, SEQUENCE_TYPES):
- for value in input_value:
- if not isinstance(value, value_type):
- return False
- return True
-
- return False
-
-
-def always_valid(input_value):
- return True
-
-
-def validate_generic_single_value(input_value):
- if not isinstance(input_value, SEQUENCE_TYPES):
- return True
-
- try: # object couldn't have a __len__ method
- if len(input_value) == 1:
- return True
- except Exception:
- pass
-
- return False
-
-
-def validate_minus_one(input_value):
- """Accept -1 only (used by pwdLastSet in AD)
- """
- if not isinstance(input_value, SEQUENCE_TYPES):
- if input_value == -1 or input_value == '-1':
- return True
-
- try: # object couldn't have a __len__ method
- if len(input_value) == 1 and input_value == -1 or input_value == '-1':
- return True
- except Exception:
- pass
-
- return False
-
-
-def validate_integer(input_value):
- if check_type(input_value, (float, bool)):
- return False
-
- if str is bytes: # Python 2, check for long too
- if check_type(input_value, (int, long)):
- return True
- else: # Python 3, int only
- if check_type(input_value, int):
- return True
-
- sequence = True # indicates if a sequence must be returned
- if not isinstance(input_value, SEQUENCE_TYPES):
- sequence = False
- input_value = [input_value]
- else:
- sequence = True # indicates if a sequence must be returned
-
- valid_values = [] # builds a list of valid int values
- from decimal import Decimal, InvalidOperation
- for element in input_value:
- try: # try to convert any type to int, an invalid conversion raise TypeError or ValueError, doublecheck with Decimal type, if both are valid and equal then then int() value is used
- value = to_unicode(element) if isinstance(element, bytes) else element
- decimal_value = Decimal(value)
- int_value = int(value)
- if decimal_value == int_value:
- valid_values.append(int_value)
- else:
- return False
- except (ValueError, TypeError, InvalidOperation):
- return False
-
- if sequence:
- return valid_values
- else:
- return valid_values[0]
-
-
-def validate_bytes(input_value):
- return check_type(input_value, bytes)
-
-
-def validate_boolean(input_value):
- # it could be a real bool or the string TRUE or FALSE, # only a single valued is allowed
- if validate_generic_single_value(input_value): # valid only if a single value or a sequence with a single element
- if isinstance(input_value, SEQUENCE_TYPES):
- input_value = input_value[0]
- if isinstance(input_value, bool):
- if input_value:
- return 'TRUE'
- else:
- return 'FALSE'
- if isinstance(input_value, STRING_TYPES):
- if input_value.lower() == 'true':
- return 'TRUE'
- elif input_value.lower() == 'false':
- return 'FALSE'
-
- return False
-
-
-def validate_time(input_value):
- # if datetime object doesn't have a timezone it's considered local time and is adjusted to UTC
- if not isinstance(input_value, SEQUENCE_TYPES):
- sequence = False
- input_value = [input_value]
- else:
- sequence = True # indicates if a sequence must be returned
-
- valid_values = []
- changed = False
- for element in input_value:
- if isinstance(element, STRING_TYPES): # tries to check if it is already be a Generalized Time
- if isinstance(format_time(to_raw(element)), datetime): # valid Generalized Time string
- valid_values.append(element)
- else:
- return False
- elif isinstance(element, datetime):
- changed = True
- if element.tzinfo: # a datetime with a timezone
- valid_values.append(element.strftime('%Y%m%d%H%M%S%z'))
- else: # datetime without timezone, assumed local and adjusted to UTC
- offset = datetime.now() - datetime.utcnow()
- valid_values.append((element - offset).strftime('%Y%m%d%H%M%SZ'))
- else:
- return False
-
- if changed:
- if sequence:
- return valid_values
- else:
- return valid_values[0]
- else:
- return True
-
-
-def validate_ad_timestamp(input_value):
- """
- Active Directory stores date/time values as the number of 100-nanosecond intervals
- that have elapsed since the 0 hour on January 1, 1601 till the date/time that is being stored.
- The time is always stored in Greenwich Mean Time (GMT) in the Active Directory.
- """
- if not isinstance(input_value, SEQUENCE_TYPES):
- sequence = False
- input_value = [input_value]
- else:
- sequence = True # indicates if a sequence must be returned
-
- valid_values = []
- changed = False
- for element in input_value:
- if isinstance(element, STRING_TYPES): # tries to check if it is already be a AD timestamp
- if isinstance(format_ad_timestamp(to_raw(element)), datetime): # valid Generalized Time string
- valid_values.append(element)
- else:
- return False
- elif isinstance(element, datetime):
- changed = True
- if element.tzinfo: # a datetime with a timezone
- valid_values.append(to_raw((timegm((element).utctimetuple()) + 11644473600) * 10000000, encoding='ascii'))
- else: # datetime without timezone, assumed local and adjusted to UTC
- offset = datetime.now() - datetime.utcnow()
- valid_values.append(to_raw((timegm((element - offset).timetuple()) + 11644473600) * 10000000, encoding='ascii'))
- else:
- return False
-
- if changed:
- if sequence:
- return valid_values
- else:
- return valid_values[0]
- else:
- return True
-
-
-def validate_uuid(input_value):
- """
- object guid in uuid format
- """
- if not isinstance(input_value, SEQUENCE_TYPES):
- sequence = False
- input_value = [input_value]
- else:
- sequence = True # indicates if a sequence must be returned
-
- valid_values = []
- changed = False
- for element in input_value:
- if isinstance(element, (bytes, bytearray)): # assumes bytes are valid
- valid_values.append(element)
- elif isinstance(element, STRING_TYPES):
- try:
- valid_values.append(UUID(element).bytes)
- changed = True
- except ValueError:
- return False
- else:
- return False
-
- if changed:
- if sequence:
- return valid_values
- else:
- return valid_values[0]
- else:
- return True
-
-
-def validate_uuid_le(input_value):
- """
- Active Directory stores objectGUID in uuid_le format
- """
- if not isinstance(input_value, SEQUENCE_TYPES):
- sequence = False
- input_value = [input_value]
- else:
- sequence = True # indicates if a sequence must be returned
-
- valid_values = []
- changed = False
- for element in input_value:
- if isinstance(element, (bytes, bytearray)): # assumes bytes are valid
- valid_values.append(element)
- elif isinstance(element, STRING_TYPES):
- try:
- valid_values.append(UUID(element).bytes_le)
- changed = True
- except ValueError:
- return False
- else:
- return False
-
- if changed:
- if sequence:
- return valid_values
- else:
- return valid_values[0]
- else:
- return True
+""" +""" + +# Created on 2016.08.09 +# +# Author: Giovanni Cannata +# +# Copyright 2016 - 2020 Giovanni Cannata +# +# This file is part of ldap3. +# +# ldap3 is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ldap3 is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with ldap3 in the COPYING and COPYING.LESSER files. +# If not, see <http://www.gnu.org/licenses/>. +from binascii import a2b_hex, hexlify +from datetime import datetime +from calendar import timegm +from uuid import UUID +from struct import pack + + +from ... import SEQUENCE_TYPES, STRING_TYPES, NUMERIC_TYPES, INTEGER_TYPES +from .formatters import format_time, format_ad_timestamp +from ...utils.conv import to_raw, to_unicode, ldap_escape_to_bytes, escape_bytes + +# Validators return True if value is valid, False if value is not valid, +# or a value different from True and False that is a valid value to substitute to the input value + + +def check_backslash(value): + if isinstance(value, (bytearray, bytes)): + if b'\\' in value: + value = value.replace(b'\\', b'\\5C') + elif isinstance(value, STRING_TYPES): + if '\\' in value: + value = value.replace('\\', '\\5C') + return value + + +def check_type(input_value, value_type): + if isinstance(input_value, value_type): + return True + + if isinstance(input_value, SEQUENCE_TYPES): + for value in input_value: + if not isinstance(value, value_type): + return False + return True + + return False + + +# noinspection PyUnusedLocal +def always_valid(input_value): + return True + + +def validate_generic_single_value(input_value): + if not isinstance(input_value, SEQUENCE_TYPES): + return True + + try: # object couldn't have a __len__ method + if len(input_value) == 1: + return True + except Exception: + pass + + return False + + +def validate_zero_and_minus_one_and_positive_int(input_value): + """Accept -1 and 0 only (used by pwdLastSet in AD) + """ + if not isinstance(input_value, SEQUENCE_TYPES): + if isinstance(input_value, NUMERIC_TYPES) or isinstance(input_value, STRING_TYPES): + return True if int(input_value) >= -1 else False + return False + else: + if len(input_value) == 1 and (isinstance(input_value[0], NUMERIC_TYPES) or isinstance(input_value[0], STRING_TYPES)): + return True if int(input_value[0]) >= -1 else False + + return False + + +def validate_integer(input_value): + if check_type(input_value, (float, bool)): + return False + if check_type(input_value, INTEGER_TYPES): + return True + + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] # builds a list of valid int values + from decimal import Decimal, InvalidOperation + for element in input_value: + try: #try to convert any type to int, an invalid conversion raise TypeError or ValueError, doublecheck with Decimal type, if both are valid and equal then then int() value is used + value = to_unicode(element) if isinstance(element, bytes) else element + decimal_value = Decimal(value) + int_value = int(value) + if decimal_value == int_value: + valid_values.append(int_value) + else: + return False + except (ValueError, TypeError, InvalidOperation): + return False + + if sequence: + return valid_values + else: + return valid_values[0] + + +def validate_bytes(input_value): + return check_type(input_value, bytes) + + +def validate_boolean(input_value): + # it could be a real bool or the string TRUE or FALSE, # only a single valued is allowed + if validate_generic_single_value(input_value): # valid only if a single value or a sequence with a single element + if isinstance(input_value, SEQUENCE_TYPES): + input_value = input_value[0] + if isinstance(input_value, bool): + if input_value: + return 'TRUE' + else: + return 'FALSE' + if str is not bytes and isinstance(input_value, bytes): # python3 try to converts bytes to string + input_value = to_unicode(input_value) + if isinstance(input_value, STRING_TYPES): + if input_value.lower() == 'true': + return 'TRUE' + elif input_value.lower() == 'false': + return 'FALSE' + return False + + +def validate_time_with_0_year(input_value): + # validates generalized time but accept a 0000 year too + # if datetime object doesn't have a timezone it's considered local time and is adjusted to UTC + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + if str is not bytes and isinstance(element, bytes): # python3 try to converts bytes to string + element = to_unicode(element) + if isinstance(element, STRING_TYPES): # tries to check if it is already be a Generalized Time + if element.startswith('0000') or isinstance(format_time(to_raw(element)), datetime): # valid Generalized Time string + valid_values.append(element) + else: + return False + elif isinstance(element, datetime): + changed = True + if element.tzinfo: # a datetime with a timezone + valid_values.append(element.strftime('%Y%m%d%H%M%S%z')) + else: # datetime without timezone, assumed local and adjusted to UTC + offset = datetime.now() - datetime.utcnow() + valid_values.append((element - offset).strftime('%Y%m%d%H%M%SZ')) + else: + return False + + if changed: + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True + + +def validate_time(input_value): + # if datetime object doesn't have a timezone it's considered local time and is adjusted to UTC + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + if str is not bytes and isinstance(element, bytes): # python3 try to converts bytes to string + element = to_unicode(element) + if isinstance(element, STRING_TYPES): # tries to check if it is already be a Generalized Time + if isinstance(format_time(to_raw(element)), datetime): # valid Generalized Time string + valid_values.append(element) + else: + return False + elif isinstance(element, datetime): + changed = True + if element.tzinfo: # a datetime with a timezone + valid_values.append(element.strftime('%Y%m%d%H%M%S%z')) + else: # datetime without timezone, assumed local and adjusted to UTC + offset = datetime.now() - datetime.utcnow() + valid_values.append((element - offset).strftime('%Y%m%d%H%M%SZ')) + else: + return False + + if changed: + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True + + +def validate_ad_timestamp(input_value): + """ + Active Directory stores date/time values as the number of 100-nanosecond intervals + that have elapsed since the 0 hour on January 1, 1601 till the date/time that is being stored. + The time is always stored in Greenwich Mean Time (GMT) in the Active Directory. + """ + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + if str is not bytes and isinstance(element, bytes): # python3 try to converts bytes to string + element = to_unicode(element) + if isinstance(element, NUMERIC_TYPES): + if 0 <= element <= 9223372036854775807: # min and max for the AD timestamp starting from 12:00 AM January 1, 1601 + valid_values.append(element) + else: + return False + elif isinstance(element, STRING_TYPES): # tries to check if it is already be a AD timestamp + if isinstance(format_ad_timestamp(to_raw(element)), datetime): # valid Generalized Time string + valid_values.append(element) + else: + return False + elif isinstance(element, datetime): + changed = True + if element.tzinfo: # a datetime with a timezone + valid_values.append(to_raw((timegm(element.utctimetuple()) + 11644473600) * 10000000, encoding='ascii')) + else: # datetime without timezone, assumed local and adjusted to UTC + offset = datetime.now() - datetime.utcnow() + valid_values.append(to_raw((timegm((element - offset).timetuple()) + 11644473600) * 10000000, encoding='ascii')) + else: + return False + + if changed: + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True + + +def validate_ad_timedelta(input_value): + """ + Should be validated like an AD timestamp except that since it is a time + delta, it is stored as a negative number. + """ + if not isinstance(input_value, INTEGER_TYPES) or input_value > 0: + return False + return validate_ad_timestamp(input_value * -1) + + +def validate_guid(input_value): + """ + object guid in uuid format (Novell eDirectory) + """ + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + if isinstance(element, STRING_TYPES): + try: + valid_values.append(UUID(element).bytes) + changed = True + except ValueError: # try if the value is an escaped byte sequence + try: + valid_values.append(UUID(element.replace('\\', '')).bytes) + changed = True + continue + except ValueError: + if str is not bytes: # python 3 + pass + else: + valid_values.append(element) + continue + return False + elif isinstance(element, (bytes, bytearray)): # assumes bytes are valid + valid_values.append(element) + else: + return False + + if changed: + valid_values = [check_backslash(value) for value in valid_values] + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True + + +def validate_uuid(input_value): + """ + object entryUUID in uuid format + """ + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + if isinstance(element, STRING_TYPES): + try: + valid_values.append(str(UUID(element))) + changed = True + except ValueError: # try if the value is an escaped byte sequence + try: + valid_values.append(str(UUID(element.replace('\\', '')))) + changed = True + continue + except ValueError: + if str is not bytes: # python 3 + pass + else: + valid_values.append(element) + continue + return False + elif isinstance(element, (bytes, bytearray)): # assumes bytes are valid + valid_values.append(element) + else: + return False + + if changed: + valid_values = [check_backslash(value) for value in valid_values] + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True + + +def validate_uuid_le(input_value): + """ + Active Directory stores objectGUID in uuid_le format, follows RFC4122 and MS-DTYP: + "{07039e68-4373-264d-a0a7-07039e684373}": string representation big endian, converted to little endian (with or without brace curles) + "689e030773434d26a7a007039e684373": packet representation, already in little endian + "\68\9e\03\07\73\43\4d\26\a7\a0\07\03\9e\68\43\73": bytes representation, already in little endian + byte sequence: already in little endian + + """ + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + error = False + if isinstance(element, STRING_TYPES): + if element[0] == '{' and element[-1] == '}': + try: + valid_values.append(UUID(hex=element).bytes_le) # string representation, value in big endian, converts to little endian + changed = True + except ValueError: + error = True + elif '-' in element: + try: + valid_values.append(UUID(hex=element).bytes_le) # string representation, value in big endian, converts to little endian + changed = True + except ValueError: + error = True + elif '\\' in element: + try: + uuid = UUID(bytes_le=ldap_escape_to_bytes(element)).bytes_le + uuid = escape_bytes(uuid) + valid_values.append(uuid) # byte representation, value in little endian + changed = True + except ValueError: + error = True + elif '-' not in element: # value in little endian + try: + valid_values.append(UUID(bytes_le=a2b_hex(element)).bytes_le) # packet representation, value in little endian, converts to little endian + changed = True + except ValueError: + error = True + if error and str == bytes: # python2 only assume value is bytes and valid + valid_values.append(element) # value is untouched, must be in little endian + elif isinstance(element, (bytes, bytearray)): # assumes bytes are valid uuid + valid_values.append(element) # value is untouched, must be in little endian + else: + return False + + if changed: + valid_values = [check_backslash(value) for value in valid_values] + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True + + +def validate_sid(input_value): + """ + SID= "S-1-" IdentifierAuthority 1*SubAuthority + IdentifierAuthority= IdentifierAuthorityDec / IdentifierAuthorityHex + ; If the identifier authority is < 2^32, the + ; identifier authority is represented as a decimal + ; number + ; If the identifier authority is >= 2^32, + ; the identifier authority is represented in + ; hexadecimal + IdentifierAuthorityDec = 1*10DIGIT + ; IdentifierAuthorityDec, top level authority of a + ; security identifier is represented as a decimal number + IdentifierAuthorityHex = "0x" 12HEXDIG + ; IdentifierAuthorityHex, the top-level authority of a + ; security identifier is represented as a hexadecimal number + SubAuthority= "-" 1*10DIGIT + ; Sub-Authority is always represented as a decimal number + ; No leading "0" characters are allowed when IdentifierAuthority + ; or SubAuthority is represented as a decimal number + ; All hexadecimal digits must be output in string format, + ; pre-pended by "0x" + + Revision (1 byte): An 8-bit unsigned integer that specifies the revision level of the SID. This value MUST be set to 0x01. + SubAuthorityCount (1 byte): An 8-bit unsigned integer that specifies the number of elements in the SubAuthority array. The maximum number of elements allowed is 15. + IdentifierAuthority (6 bytes): A SID_IDENTIFIER_AUTHORITY structure that indicates the authority under which the SID was created. It describes the entity that created the SID. The Identifier Authority value {0,0,0,0,0,5} denotes SIDs created by the NT SID authority. + SubAuthority (variable): A variable length array of unsigned 32-bit integers that uniquely identifies a principal relative to the IdentifierAuthority. Its length is determined by SubAuthorityCount. + + If you have a SID like S-a-b-c-d-e-f-g-... + + Then the bytes are + a (revision) + N (number of dashes minus two) + bbbbbb (six bytes of "b" treated as a 48-bit number in big-endian format) + cccc (four bytes of "c" treated as a 32-bit number in little-endian format) + dddd (four bytes of "d" treated as a 32-bit number in little-endian format) + eeee (four bytes of "e" treated as a 32-bit number in little-endian format) + ffff (four bytes of "f" treated as a 32-bit number in little-endian format) + + """ + if not isinstance(input_value, SEQUENCE_TYPES): + sequence = False + input_value = [input_value] + else: + sequence = True # indicates if a sequence must be returned + + valid_values = [] + changed = False + for element in input_value: + if isinstance(element, STRING_TYPES): + if element.startswith('S-'): + parts = element.split('-') + sid_bytes = pack('<q', int(parts[1]))[0:1] # revision number + sid_bytes += pack('<q', len(parts[3:]))[0:1] # number of sub authorities + if len(parts[2]) <= 10: + sid_bytes += pack('>q', int(parts[2]))[2:] # authority (in dec) + else: + sid_bytes += pack('>q', int(parts[2], 16))[2:] # authority (in hex) + for sub_auth in parts[3:]: + sid_bytes += pack('<q', int(sub_auth))[0:4] # sub-authorities + valid_values.append(sid_bytes) + changed = True + + if changed: + valid_values = [check_backslash(value) for value in valid_values] + if sequence: + return valid_values + else: + return valid_values[0] + else: + return True |