diff options
Diffstat (limited to 'netplan/libnetplan.py')
-rw-r--r-- | netplan/libnetplan.py | 281 |
1 files changed, 237 insertions, 44 deletions
diff --git a/netplan/libnetplan.py b/netplan/libnetplan.py index aac41b3..797ac86 100644 --- a/netplan/libnetplan.py +++ b/netplan/libnetplan.py @@ -16,9 +16,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import tempfile +import logging import ctypes import ctypes.util -from ctypes import c_char_p, c_void_p, c_int +from ctypes import c_char_p, c_void_p, c_int, c_uint, c_size_t, c_ssize_t +from typing import List, Union, IO class LibNetplanException(Exception): @@ -42,10 +45,6 @@ class _netplan_net_definition(ctypes.Structure): lib = ctypes.CDLL(ctypes.util.find_library('netplan')) -lib.netplan_parse_yaml.argtypes = [ctypes.c_char_p, ctypes.POINTER(ctypes.POINTER(_GError))] -lib.netplan_get_filename_by_id.restype = ctypes.c_char_p -lib.process_yaml_hierarchy.argtypes = [ctypes.c_char_p] -lib.process_yaml_hierarchy.restype = ctypes.c_int _GErrorPP = ctypes.POINTER(ctypes.POINTER(_GError)) _NetplanParserP = ctypes.POINTER(_netplan_parser) @@ -55,17 +54,22 @@ _NetplanNetDefinitionP = ctypes.POINTER(_netplan_net_definition) lib.netplan_get_id_from_nm_filename.restype = ctypes.c_char_p -def netplan_parse(path): - # Clear old NetplanNetDefinitions from libnetplan memory - lib.netplan_clear_netdefs() - err = ctypes.POINTER(_GError)() - ret = bool(lib.netplan_parse_yaml(path.encode(), ctypes.byref(err))) - if not ret: - raise Exception(err.contents.message.decode('utf-8')) - lib.netplan_finish_parse(ctypes.byref(err)) - if err: - raise Exception(err.contents.message.decode('utf-8')) - return True +def _string_realloc_call_no_error(function): + size = 16 + while size < 1073741824: # 1MB + buffer = ctypes.create_string_buffer(size) + code = function(buffer) + if code == -2: + size = size * 2 + continue + + if code < 0: # pragma: nocover + raise LibNetplanException("Unknown error: %d" % code) + elif code == 0: + return None # pragma: nocover as it's hard to trigger for now + else: + return buffer.value.decode('utf-8') + raise LibNetplanException('Aborting due to string buffer size > 1M') # pragma: nocover def _checked_lib_call(fn, *args): @@ -75,11 +79,6 @@ def _checked_lib_call(fn, *args): raise LibNetplanException(err.contents.message.decode('utf-8')) -def netplan_get_filename_by_id(netdef_id, rootdir): - res = lib.netplan_get_filename_by_id(netdef_id.encode(), rootdir.encode()) - return res.decode('utf-8') if res else None - - class Parser: _abi_loaded = False @@ -94,6 +93,12 @@ class Parser: lib.netplan_parser_load_yaml.argtypes = [_NetplanParserP, c_char_p, _GErrorPP] lib.netplan_parser_load_yaml.restype = c_int + lib.netplan_parser_load_yaml_from_fd.argtypes = [_NetplanParserP, c_int, _GErrorPP] + lib.netplan_parser_load_yaml_from_fd.restype = c_int + + lib.netplan_parser_load_nullable_fields.argtypes = [_NetplanParserP, c_int, _GErrorPP] + lib.netplan_parser_load_nullable_fields.restype = c_int + cls._abi_loaded = True def __init__(self): @@ -103,12 +108,18 @@ class Parser: def __del__(self): lib.netplan_parser_clear(ctypes.byref(self._ptr)) - def load_yaml(self, filename): - _checked_lib_call(lib.netplan_parser_load_yaml, self._ptr, filename.encode('utf-8')) + def load_yaml(self, input_file: Union[str, IO]): + if isinstance(input_file, str): + _checked_lib_call(lib.netplan_parser_load_yaml, self._ptr, input_file.encode('utf-8')) + else: + _checked_lib_call(lib.netplan_parser_load_yaml_from_fd, self._ptr, input_file.fileno()) def load_yaml_hierarchy(self, rootdir): _checked_lib_call(lib.netplan_parser_load_yaml_hierarchy, self._ptr, rootdir.encode('utf-8')) + def load_nullable_fields(self, input_file: IO): + _checked_lib_call(lib.netplan_parser_load_nullable_fields, self._ptr, input_file.fileno()) + class State: _abi_loaded = False @@ -130,6 +141,12 @@ class State: lib.netplan_state_get_netdef.argtypes = [_NetplanStateP, c_char_p] lib.netplan_state_get_netdef.restype = _NetplanNetDefinitionP + lib.netplan_state_write_yaml_file.argtypes = [_NetplanStateP, c_char_p, c_char_p, _GErrorPP] + lib.netplan_state_write_yaml_file.restype = c_int + + lib.netplan_state_update_yaml_hierarchy.argtypes = [_NetplanStateP, c_char_p, c_char_p, _GErrorPP] + lib.netplan_state_update_yaml_hierarchy.restype = c_int + lib.netplan_state_dump_yaml.argtypes = [_NetplanStateP, c_int, _GErrorPP] lib.netplan_state_dump_yaml.restype = c_int @@ -139,6 +156,9 @@ class State: lib.netplan_netdef_get_delay_virtual_functions_rebind.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_delay_virtual_functions_rebind.restype = c_int + lib.netplan_state_get_backend.argtypes = [_NetplanStateP] + lib.netplan_state_get_backend.restype = c_int + cls._abi_loaded = True def __init__(self): @@ -151,6 +171,16 @@ class State: def import_parser_results(self, parser): _checked_lib_call(lib.netplan_state_import_parser_results, self._ptr, parser._ptr) + def write_yaml_file(self, filename, rootdir): + name = filename.encode('utf-8') if filename else None + root = rootdir.encode('utf-8') if rootdir else None + _checked_lib_call(lib.netplan_state_write_yaml_file, self._ptr, name, root) + + def update_yaml_hierarchy(self, default_filename, rootdir): + name = default_filename.encode('utf-8') + root = rootdir.encode('utf-8') if rootdir else None + _checked_lib_call(lib.netplan_state_update_yaml_hierarchy, self._ptr, name, root) + def dump_yaml(self, output_file): fd = output_file.fileno() _checked_lib_call(lib.netplan_state_dump_yaml, self._ptr, fd) @@ -164,6 +194,60 @@ class State: raise IndexError() return NetDefinition(self, ptr) + @property + def all_defs(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, None)) + + @property + def ethernets(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "ethernets")) + + @property + def modems(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "modems")) + + @property + def wifis(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "wifis")) + + @property + def vlans(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "vlans")) + + @property + def bridges(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "bridges")) + + @property + def bonds(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "bonds")) + + @property + def tunnels(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "tunnels")) + + @property + def vrfs(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "vrfs")) + + @property + def ovs_ports(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "_ovs-ports")) + + @property + def nm_devices(self): + return dict((nd.id, nd) for nd in _NetdefIterator(self, "nm-devices")) + + @property + def backend(self): + return lib.netplan_backend_name(lib.netplan_state_get_backend(self._ptr)).decode('utf-8') + + def dump_to_logs(self): + # Convoluted way to dump the parsed config to the logs... + with tempfile.TemporaryFile() as tmp: + self.dump_yaml(output_file=tmp) + logging.debug("Merged config:\n{}".format(tmp.read())) + class NetDefinition: _abi_loaded = False @@ -173,8 +257,53 @@ class NetDefinition: if cls._abi_loaded: return - lib.netplan_netdef_get_id.argtypes = [_NetplanNetDefinitionP] - lib.netplan_netdef_get_id.restype = c_char_p + lib.netplan_netdef_has_match.argtypes = [_NetplanNetDefinitionP] + lib.netplan_netdef_has_match.restype = c_int + + lib.netplan_netdef_get_id.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t] + lib.netplan_netdef_get_id.restype = c_ssize_t + + lib.netplan_netdef_get_filepath.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t] + lib.netplan_netdef_get_filepath.restype = c_ssize_t + + lib.netplan_netdef_get_backend.argtypes = [_NetplanNetDefinitionP] + lib.netplan_netdef_get_backend.restype = c_int + + lib.netplan_netdef_get_type.argtypes = [_NetplanNetDefinitionP] + lib.netplan_netdef_get_type.restype = c_int + + lib.netplan_netdef_get_set_name.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t] + lib.netplan_netdef_get_set_name.restype = c_ssize_t + + lib._netplan_netdef_get_critical.argtypes = [_NetplanNetDefinitionP] + lib._netplan_netdef_get_critical.restype = c_int + + lib._netplan_netdef_get_sriov_link.argtypes = [_NetplanNetDefinitionP] + lib._netplan_netdef_get_sriov_link.restype = _NetplanNetDefinitionP + + lib._netplan_netdef_get_vlan_link.argtypes = [_NetplanNetDefinitionP] + lib._netplan_netdef_get_vlan_link.restype = _NetplanNetDefinitionP + + lib._netplan_netdef_get_vlan_id.argtypes = [_NetplanNetDefinitionP] + lib._netplan_netdef_get_vlan_id.restype = c_uint + + lib._netplan_netdef_get_sriov_vlan_filter.argtypes = [_NetplanNetDefinitionP] + lib._netplan_netdef_get_sriov_vlan_filter.restype = c_int + + lib.netplan_netdef_match_interface.argtypes = [_NetplanNetDefinitionP] + lib.netplan_netdef_match_interface.restype = c_int + + lib.netplan_backend_name.argtypes = [c_int] + lib.netplan_backend_name.restype = c_char_p + + lib.netplan_def_type_name.argtypes = [c_int] + lib.netplan_def_type_name.restype = c_char_p + + lib._netplan_state_get_vf_count_for_def.argtypes = [_NetplanStateP, _NetplanNetDefinitionP, _GErrorPP] + lib._netplan_state_get_vf_count_for_def.restype = c_int + + lib._netplan_netdef_is_trivial_compound_itf.argtypes = [_NetplanNetDefinitionP] + lib._netplan_netdef_is_trivial_compound_itf.restype = c_int cls._abi_loaded = True @@ -191,8 +320,58 @@ class NetDefinition: self._parent = np_state @property + def has_match(self): + return bool(lib.netplan_netdef_has_match(self._ptr)) + + @property + def set_name(self): + return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_set_name(self._ptr, b, len(b))) + + @property + def critical(self): + return bool(lib._netplan_netdef_get_critical(self._ptr)) + + @property + def sriov_link(self): + link_ptr = lib._netplan_netdef_get_sriov_link(self._ptr) + if link_ptr: + return NetDefinition(self._parent, link_ptr) + return None + + @property + def vlan_link(self): + link_ptr = lib._netplan_netdef_get_vlan_link(self._ptr) + if link_ptr: + return NetDefinition(self._parent, link_ptr) + return None + + @property + def vlan_id(self): + vlan_id = lib._netplan_netdef_get_vlan_id(self._ptr) + # No easy way to get UINT_MAX besides this... + if vlan_id == c_uint(-1).value: + return None + return vlan_id + + @property + def has_sriov_vlan_filter(self): + return bool(lib._netplan_netdef_get_sriov_vlan_filter(self._ptr)) + + @property + def backend(self): + return lib.netplan_backend_name(lib.netplan_netdef_get_backend(self._ptr)).decode('utf-8') + + @property + def type(self): + return lib.netplan_def_type_name(lib.netplan_netdef_get_type(self._ptr)).decode('utf-8') + + @property def id(self): - return lib.netplan_netdef_get_id(self._ptr).decode('utf-8') + return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_id(self._ptr, b, len(b))) + + @property + def filepath(self): + return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_filepath(self._ptr, b, len(b))) @property def embedded_switch_mode(self): @@ -203,6 +382,29 @@ class NetDefinition: def delay_virtual_functions_rebind(self): return bool(lib.netplan_netdef_get_delay_virtual_functions_rebind(self._ptr)) + def match_interface(self, itf_name=None, itf_driver=None, itf_mac=None): + return bool(lib.netplan_netdef_match_interface( + self._ptr, + itf_name and itf_name.encode('utf-8'), + itf_mac and itf_mac.encode('utf-8'), + itf_driver and itf_driver.encode('utf-8'))) + + @property + def vf_count(self): + err = ctypes.POINTER(_GError)() + count = lib._netplan_state_get_vf_count_for_def(self._parent._ptr, self._ptr, ctypes.byref(err)) + if count < 0: + raise LibNetplanException(err.contents.message.decode('utf-8')) + return count + + @property + def is_trivial_compound_itf(self): + ''' + Returns True if the interface is a compound interface (bond or bridge), + and its configuration is trivial, without any variation from the defaults. + ''' + return bool(lib._netplan_netdef_is_trivial_compound_itf(self._ptr)) + class _NetdefIterator: _abi_loaded = False @@ -250,29 +452,20 @@ class _NetdefIterator: return NetDefinition(self.np_state, next_value) -class __GlobalState(State): - def __init__(self): - self._ptr = ctypes.cast(lib.global_state, _NetplanStateP) - - def __del__(self): - pass - - -def netplan_get_ids_for_devtype(devtype, rootdir): - err = ctypes.POINTER(_GError)() - lib.netplan_clear_netdefs() - lib.process_yaml_hierarchy(rootdir.encode('utf-8')) - lib.netplan_finish_parse(ctypes.byref(err)) - if err: # pragma: nocover (this is a "break in case of emergency" thing) - raise Exception(err.contents.message.decode('utf-8')) - nds = list(_NetdefIterator(__GlobalState(), devtype)) - return [nd.id for nd in nds] - +lib.netplan_util_create_yaml_patch.argtypes = [c_char_p, c_char_p, c_int, _GErrorPP] +lib.netplan_util_create_yaml_patch.restype = c_int lib.netplan_util_dump_yaml_subtree.argtypes = [c_char_p, c_int, c_int, _GErrorPP] lib.netplan_util_dump_yaml_subtree.restype = c_int +def create_yaml_patch(patch_object_path: List[str], patch_payload: str, patch_output): + _checked_lib_call(lib.netplan_util_create_yaml_patch, + '\t'.join(patch_object_path).encode('utf-8'), + patch_payload.encode('utf-8'), + patch_output.fileno()) + + def dump_yaml_subtree(prefix, input_file, output_file): _checked_lib_call(lib.netplan_util_dump_yaml_subtree, prefix.encode('utf-8'), |