summaryrefslogtreecommitdiff
path: root/netplan/libnetplan.py
diff options
context:
space:
mode:
Diffstat (limited to 'netplan/libnetplan.py')
-rw-r--r--netplan/libnetplan.py281
1 files changed, 237 insertions, 44 deletions
diff --git a/netplan/libnetplan.py b/netplan/libnetplan.py
index aac41b3..797ac86 100644
--- a/netplan/libnetplan.py
+++ b/netplan/libnetplan.py
@@ -16,9 +16,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import tempfile
+import logging
import ctypes
import ctypes.util
-from ctypes import c_char_p, c_void_p, c_int
+from ctypes import c_char_p, c_void_p, c_int, c_uint, c_size_t, c_ssize_t
+from typing import List, Union, IO
class LibNetplanException(Exception):
@@ -42,10 +45,6 @@ class _netplan_net_definition(ctypes.Structure):
lib = ctypes.CDLL(ctypes.util.find_library('netplan'))
-lib.netplan_parse_yaml.argtypes = [ctypes.c_char_p, ctypes.POINTER(ctypes.POINTER(_GError))]
-lib.netplan_get_filename_by_id.restype = ctypes.c_char_p
-lib.process_yaml_hierarchy.argtypes = [ctypes.c_char_p]
-lib.process_yaml_hierarchy.restype = ctypes.c_int
_GErrorPP = ctypes.POINTER(ctypes.POINTER(_GError))
_NetplanParserP = ctypes.POINTER(_netplan_parser)
@@ -55,17 +54,22 @@ _NetplanNetDefinitionP = ctypes.POINTER(_netplan_net_definition)
lib.netplan_get_id_from_nm_filename.restype = ctypes.c_char_p
-def netplan_parse(path):
- # Clear old NetplanNetDefinitions from libnetplan memory
- lib.netplan_clear_netdefs()
- err = ctypes.POINTER(_GError)()
- ret = bool(lib.netplan_parse_yaml(path.encode(), ctypes.byref(err)))
- if not ret:
- raise Exception(err.contents.message.decode('utf-8'))
- lib.netplan_finish_parse(ctypes.byref(err))
- if err:
- raise Exception(err.contents.message.decode('utf-8'))
- return True
+def _string_realloc_call_no_error(function):
+ size = 16
+ while size < 1073741824: # 1MB
+ buffer = ctypes.create_string_buffer(size)
+ code = function(buffer)
+ if code == -2:
+ size = size * 2
+ continue
+
+ if code < 0: # pragma: nocover
+ raise LibNetplanException("Unknown error: %d" % code)
+ elif code == 0:
+ return None # pragma: nocover as it's hard to trigger for now
+ else:
+ return buffer.value.decode('utf-8')
+ raise LibNetplanException('Aborting due to string buffer size > 1M') # pragma: nocover
def _checked_lib_call(fn, *args):
@@ -75,11 +79,6 @@ def _checked_lib_call(fn, *args):
raise LibNetplanException(err.contents.message.decode('utf-8'))
-def netplan_get_filename_by_id(netdef_id, rootdir):
- res = lib.netplan_get_filename_by_id(netdef_id.encode(), rootdir.encode())
- return res.decode('utf-8') if res else None
-
-
class Parser:
_abi_loaded = False
@@ -94,6 +93,12 @@ class Parser:
lib.netplan_parser_load_yaml.argtypes = [_NetplanParserP, c_char_p, _GErrorPP]
lib.netplan_parser_load_yaml.restype = c_int
+ lib.netplan_parser_load_yaml_from_fd.argtypes = [_NetplanParserP, c_int, _GErrorPP]
+ lib.netplan_parser_load_yaml_from_fd.restype = c_int
+
+ lib.netplan_parser_load_nullable_fields.argtypes = [_NetplanParserP, c_int, _GErrorPP]
+ lib.netplan_parser_load_nullable_fields.restype = c_int
+
cls._abi_loaded = True
def __init__(self):
@@ -103,12 +108,18 @@ class Parser:
def __del__(self):
lib.netplan_parser_clear(ctypes.byref(self._ptr))
- def load_yaml(self, filename):
- _checked_lib_call(lib.netplan_parser_load_yaml, self._ptr, filename.encode('utf-8'))
+ def load_yaml(self, input_file: Union[str, IO]):
+ if isinstance(input_file, str):
+ _checked_lib_call(lib.netplan_parser_load_yaml, self._ptr, input_file.encode('utf-8'))
+ else:
+ _checked_lib_call(lib.netplan_parser_load_yaml_from_fd, self._ptr, input_file.fileno())
def load_yaml_hierarchy(self, rootdir):
_checked_lib_call(lib.netplan_parser_load_yaml_hierarchy, self._ptr, rootdir.encode('utf-8'))
+ def load_nullable_fields(self, input_file: IO):
+ _checked_lib_call(lib.netplan_parser_load_nullable_fields, self._ptr, input_file.fileno())
+
class State:
_abi_loaded = False
@@ -130,6 +141,12 @@ class State:
lib.netplan_state_get_netdef.argtypes = [_NetplanStateP, c_char_p]
lib.netplan_state_get_netdef.restype = _NetplanNetDefinitionP
+ lib.netplan_state_write_yaml_file.argtypes = [_NetplanStateP, c_char_p, c_char_p, _GErrorPP]
+ lib.netplan_state_write_yaml_file.restype = c_int
+
+ lib.netplan_state_update_yaml_hierarchy.argtypes = [_NetplanStateP, c_char_p, c_char_p, _GErrorPP]
+ lib.netplan_state_update_yaml_hierarchy.restype = c_int
+
lib.netplan_state_dump_yaml.argtypes = [_NetplanStateP, c_int, _GErrorPP]
lib.netplan_state_dump_yaml.restype = c_int
@@ -139,6 +156,9 @@ class State:
lib.netplan_netdef_get_delay_virtual_functions_rebind.argtypes = [_NetplanNetDefinitionP]
lib.netplan_netdef_get_delay_virtual_functions_rebind.restype = c_int
+ lib.netplan_state_get_backend.argtypes = [_NetplanStateP]
+ lib.netplan_state_get_backend.restype = c_int
+
cls._abi_loaded = True
def __init__(self):
@@ -151,6 +171,16 @@ class State:
def import_parser_results(self, parser):
_checked_lib_call(lib.netplan_state_import_parser_results, self._ptr, parser._ptr)
+ def write_yaml_file(self, filename, rootdir):
+ name = filename.encode('utf-8') if filename else None
+ root = rootdir.encode('utf-8') if rootdir else None
+ _checked_lib_call(lib.netplan_state_write_yaml_file, self._ptr, name, root)
+
+ def update_yaml_hierarchy(self, default_filename, rootdir):
+ name = default_filename.encode('utf-8')
+ root = rootdir.encode('utf-8') if rootdir else None
+ _checked_lib_call(lib.netplan_state_update_yaml_hierarchy, self._ptr, name, root)
+
def dump_yaml(self, output_file):
fd = output_file.fileno()
_checked_lib_call(lib.netplan_state_dump_yaml, self._ptr, fd)
@@ -164,6 +194,60 @@ class State:
raise IndexError()
return NetDefinition(self, ptr)
+ @property
+ def all_defs(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, None))
+
+ @property
+ def ethernets(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "ethernets"))
+
+ @property
+ def modems(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "modems"))
+
+ @property
+ def wifis(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "wifis"))
+
+ @property
+ def vlans(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "vlans"))
+
+ @property
+ def bridges(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "bridges"))
+
+ @property
+ def bonds(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "bonds"))
+
+ @property
+ def tunnels(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "tunnels"))
+
+ @property
+ def vrfs(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "vrfs"))
+
+ @property
+ def ovs_ports(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "_ovs-ports"))
+
+ @property
+ def nm_devices(self):
+ return dict((nd.id, nd) for nd in _NetdefIterator(self, "nm-devices"))
+
+ @property
+ def backend(self):
+ return lib.netplan_backend_name(lib.netplan_state_get_backend(self._ptr)).decode('utf-8')
+
+ def dump_to_logs(self):
+ # Convoluted way to dump the parsed config to the logs...
+ with tempfile.TemporaryFile() as tmp:
+ self.dump_yaml(output_file=tmp)
+ logging.debug("Merged config:\n{}".format(tmp.read()))
+
class NetDefinition:
_abi_loaded = False
@@ -173,8 +257,53 @@ class NetDefinition:
if cls._abi_loaded:
return
- lib.netplan_netdef_get_id.argtypes = [_NetplanNetDefinitionP]
- lib.netplan_netdef_get_id.restype = c_char_p
+ lib.netplan_netdef_has_match.argtypes = [_NetplanNetDefinitionP]
+ lib.netplan_netdef_has_match.restype = c_int
+
+ lib.netplan_netdef_get_id.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t]
+ lib.netplan_netdef_get_id.restype = c_ssize_t
+
+ lib.netplan_netdef_get_filepath.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t]
+ lib.netplan_netdef_get_filepath.restype = c_ssize_t
+
+ lib.netplan_netdef_get_backend.argtypes = [_NetplanNetDefinitionP]
+ lib.netplan_netdef_get_backend.restype = c_int
+
+ lib.netplan_netdef_get_type.argtypes = [_NetplanNetDefinitionP]
+ lib.netplan_netdef_get_type.restype = c_int
+
+ lib.netplan_netdef_get_set_name.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t]
+ lib.netplan_netdef_get_set_name.restype = c_ssize_t
+
+ lib._netplan_netdef_get_critical.argtypes = [_NetplanNetDefinitionP]
+ lib._netplan_netdef_get_critical.restype = c_int
+
+ lib._netplan_netdef_get_sriov_link.argtypes = [_NetplanNetDefinitionP]
+ lib._netplan_netdef_get_sriov_link.restype = _NetplanNetDefinitionP
+
+ lib._netplan_netdef_get_vlan_link.argtypes = [_NetplanNetDefinitionP]
+ lib._netplan_netdef_get_vlan_link.restype = _NetplanNetDefinitionP
+
+ lib._netplan_netdef_get_vlan_id.argtypes = [_NetplanNetDefinitionP]
+ lib._netplan_netdef_get_vlan_id.restype = c_uint
+
+ lib._netplan_netdef_get_sriov_vlan_filter.argtypes = [_NetplanNetDefinitionP]
+ lib._netplan_netdef_get_sriov_vlan_filter.restype = c_int
+
+ lib.netplan_netdef_match_interface.argtypes = [_NetplanNetDefinitionP]
+ lib.netplan_netdef_match_interface.restype = c_int
+
+ lib.netplan_backend_name.argtypes = [c_int]
+ lib.netplan_backend_name.restype = c_char_p
+
+ lib.netplan_def_type_name.argtypes = [c_int]
+ lib.netplan_def_type_name.restype = c_char_p
+
+ lib._netplan_state_get_vf_count_for_def.argtypes = [_NetplanStateP, _NetplanNetDefinitionP, _GErrorPP]
+ lib._netplan_state_get_vf_count_for_def.restype = c_int
+
+ lib._netplan_netdef_is_trivial_compound_itf.argtypes = [_NetplanNetDefinitionP]
+ lib._netplan_netdef_is_trivial_compound_itf.restype = c_int
cls._abi_loaded = True
@@ -191,8 +320,58 @@ class NetDefinition:
self._parent = np_state
@property
+ def has_match(self):
+ return bool(lib.netplan_netdef_has_match(self._ptr))
+
+ @property
+ def set_name(self):
+ return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_set_name(self._ptr, b, len(b)))
+
+ @property
+ def critical(self):
+ return bool(lib._netplan_netdef_get_critical(self._ptr))
+
+ @property
+ def sriov_link(self):
+ link_ptr = lib._netplan_netdef_get_sriov_link(self._ptr)
+ if link_ptr:
+ return NetDefinition(self._parent, link_ptr)
+ return None
+
+ @property
+ def vlan_link(self):
+ link_ptr = lib._netplan_netdef_get_vlan_link(self._ptr)
+ if link_ptr:
+ return NetDefinition(self._parent, link_ptr)
+ return None
+
+ @property
+ def vlan_id(self):
+ vlan_id = lib._netplan_netdef_get_vlan_id(self._ptr)
+ # No easy way to get UINT_MAX besides this...
+ if vlan_id == c_uint(-1).value:
+ return None
+ return vlan_id
+
+ @property
+ def has_sriov_vlan_filter(self):
+ return bool(lib._netplan_netdef_get_sriov_vlan_filter(self._ptr))
+
+ @property
+ def backend(self):
+ return lib.netplan_backend_name(lib.netplan_netdef_get_backend(self._ptr)).decode('utf-8')
+
+ @property
+ def type(self):
+ return lib.netplan_def_type_name(lib.netplan_netdef_get_type(self._ptr)).decode('utf-8')
+
+ @property
def id(self):
- return lib.netplan_netdef_get_id(self._ptr).decode('utf-8')
+ return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_id(self._ptr, b, len(b)))
+
+ @property
+ def filepath(self):
+ return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_filepath(self._ptr, b, len(b)))
@property
def embedded_switch_mode(self):
@@ -203,6 +382,29 @@ class NetDefinition:
def delay_virtual_functions_rebind(self):
return bool(lib.netplan_netdef_get_delay_virtual_functions_rebind(self._ptr))
+ def match_interface(self, itf_name=None, itf_driver=None, itf_mac=None):
+ return bool(lib.netplan_netdef_match_interface(
+ self._ptr,
+ itf_name and itf_name.encode('utf-8'),
+ itf_mac and itf_mac.encode('utf-8'),
+ itf_driver and itf_driver.encode('utf-8')))
+
+ @property
+ def vf_count(self):
+ err = ctypes.POINTER(_GError)()
+ count = lib._netplan_state_get_vf_count_for_def(self._parent._ptr, self._ptr, ctypes.byref(err))
+ if count < 0:
+ raise LibNetplanException(err.contents.message.decode('utf-8'))
+ return count
+
+ @property
+ def is_trivial_compound_itf(self):
+ '''
+ Returns True if the interface is a compound interface (bond or bridge),
+ and its configuration is trivial, without any variation from the defaults.
+ '''
+ return bool(lib._netplan_netdef_is_trivial_compound_itf(self._ptr))
+
class _NetdefIterator:
_abi_loaded = False
@@ -250,29 +452,20 @@ class _NetdefIterator:
return NetDefinition(self.np_state, next_value)
-class __GlobalState(State):
- def __init__(self):
- self._ptr = ctypes.cast(lib.global_state, _NetplanStateP)
-
- def __del__(self):
- pass
-
-
-def netplan_get_ids_for_devtype(devtype, rootdir):
- err = ctypes.POINTER(_GError)()
- lib.netplan_clear_netdefs()
- lib.process_yaml_hierarchy(rootdir.encode('utf-8'))
- lib.netplan_finish_parse(ctypes.byref(err))
- if err: # pragma: nocover (this is a "break in case of emergency" thing)
- raise Exception(err.contents.message.decode('utf-8'))
- nds = list(_NetdefIterator(__GlobalState(), devtype))
- return [nd.id for nd in nds]
-
+lib.netplan_util_create_yaml_patch.argtypes = [c_char_p, c_char_p, c_int, _GErrorPP]
+lib.netplan_util_create_yaml_patch.restype = c_int
lib.netplan_util_dump_yaml_subtree.argtypes = [c_char_p, c_int, c_int, _GErrorPP]
lib.netplan_util_dump_yaml_subtree.restype = c_int
+def create_yaml_patch(patch_object_path: List[str], patch_payload: str, patch_output):
+ _checked_lib_call(lib.netplan_util_create_yaml_patch,
+ '\t'.join(patch_object_path).encode('utf-8'),
+ patch_payload.encode('utf-8'),
+ patch_output.fileno())
+
+
def dump_yaml_subtree(prefix, input_file, output_file):
_checked_lib_call(lib.netplan_util_dump_yaml_subtree,
prefix.encode('utf-8'),