#!/usr/bin/python3 import argparse import configparser import contextlib import ctypes, ctypes.util import crypt import hashlib import os import platform import shutil import subprocess import sys import tempfile import time import uuid from enum import Enum # TODO # - squashfs root # - volatile images # - make debian/ubuntu images bootable # - work on device nodes # - allow passing env vars # - rework cache management to use mkosi.cache by default in the project dir class OutputFormat(Enum): raw_gpt = 1 raw_btrfs = 2 directory = 3 subvolume = 4 tar = 5 class Distribution(Enum): fedora = 1 debian = 2 ubuntu = 3 arch = 4 GPT_ROOT_X86 = uuid.UUID("44479540f29741b29af7d131d5f0458a") GPT_ROOT_X86_64 = uuid.UUID("4f68bce3e8cd4db196e7fbcaf984b709") GPT_ROOT_ARM = uuid.UUID("69dad7102ce44e3cb16c21a1d49abed3") GPT_ROOT_ARM_64 = uuid.UUID("b921b0451df041c3af444c6f280d3fae") GPT_ROOT_IA64 = uuid.UUID("993d8d3df80e4225855a9daf8ed7ea97") GPT_ESP = uuid.UUID("c12a7328f81f11d2ba4b00a0c93ec93b") GPT_SWAP = uuid.UUID("0657fd6da4ab43c484e50933c84b4f4f") GPT_HOME = uuid.UUID("933ac7e12eb44f13b8440e14e2aef915") GPT_SRV = uuid.UUID("3b8f842520e04f3b907f1a25a76f98e8") if platform.machine() == "x86_64": GPT_ROOT_NATIVE = GPT_ROOT_X86_64 elif platform.machine() == "aarch64": GPT_ROOT_NATIVE = GPT_ROOT_ARM_64 else: sys.stderr.write("Don't known the %s architecture.\n" % platform.machine()) sys.exit(1) CLONE_NEWNS = 0x00020000 def unshare(flags): libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True) if libc.unshare(ctypes.c_int(flags)) != 0: e = ctypes.get_errno() raise OSError(e, os.strerror(e)) def init_namespace(args): print_step("Detaching namespace...") args.original_umask = os.umask(0o000) unshare(CLONE_NEWNS) subprocess.run(["mount", "--make-rslave", "/"], check=True) print_step("Detaching namespace complete.") def print_step(text): sys.stderr.write("‣ \033[0;1;39m" + text + "\033[0m\n") def setup_workspace(args): print_step("Setting up temporary workspace.") if args.output_format in (OutputFormat.directory, OutputFormat.subvolume): d = tempfile.TemporaryDirectory(dir=os.path.dirname(args.output), prefix='.mkosi-') else: d = tempfile.TemporaryDirectory(dir='/var/tmp', prefix='mkosi-') print_step("Temporary workspace in " + d.name + " is now set up.") return d def btrfs_subvol_create(path, mode=0o755): m = os.umask(~mode & 0o7777) subprocess.run(["btrfs", "subvol", "create", path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) os.umask(m) def btrfs_subvol_delete(path, mode=0o755): subprocess.run(["btrfs", "subvol", "delete", path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) def btrfs_subvol_make_ro(path, b=True): subprocess.run(["btrfs", "property", "set", path, "ro", "true" if b else "false"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) def image_size(args): size = args.root_size if args.home_size is not None: size += args.home_size if args.srv_size is not None: size += args.srv_size if args.bootable: size += args.esp_size if args.swap_size is not None: size += args.swap_size return size def create_image(args, workspace): if not args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs): return None print_step("Creating partition table...") f = tempfile.NamedTemporaryFile(dir = os.path.dirname(args.output), prefix='.mkosi-') subprocess.run(["chattr", "+C", f.name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) f.truncate(image_size(args)) pn = 1 table = "label: gpt\n" if args.bootable: table += 'size={}, type={}, name="ESP System Partition"\n'.format(str(int(args.esp_size / 512)), GPT_ESP) args.esp_partno = pn pn += 1 else: args.esp_partno = None if args.swap_size is not None: table += 'size={}, type={}, name="Swap Partition"\n'.format(str(int(args.swap_size / 512)), GPT_SWAP) args.swap_partno = pn pn += 1 else: args.swap_partno = None args.home_partno = None args.srv_partno = None if args.output_format != OutputFormat.raw_btrfs: if args.home_size is not None: table += 'size={}, type={}, name="Home Partition"\n'.format(str(int(args.home_size / 512)), GPT_HOME) args.home_partno = pn pn += 1 if args.srv_size is not None: table += 'size={}, type={}, name="Server Data Partition"\n'.format(str(int(args.srv_size / 512)), GPT_SRV) args.srv_partno = pn pn += 1 table += 'type={}, name="Root Partition"\n'.format(GPT_ROOT_NATIVE) args.root_partno = pn pn += 1 subprocess.run(["sfdisk", "--color=never", f.name], input=table.encode("utf-8"), check=True) subprocess.run(["sync"]) print_step("Created partition table as " + f.name + ".") return f @contextlib.contextmanager def attach_image_loopback(args, raw): if raw is None: yield None return print_step("Attaching image file...") c = subprocess.run(["losetup", "--find", "--show", "--partscan", raw.name], stdout=subprocess.PIPE, check=True) loopdev = c.stdout.decode("utf-8").strip() print_step("Attached image file as " + loopdev + ".") try: yield loopdev finally: print_step("Detaching image file..."); subprocess.run(["losetup", "--detach", loopdev], check=True) print_step("Detaching image file completed."); def partition(loopdev, partno): return loopdev + "p" + str(partno) def prepare_swap(args, loopdev): if loopdev is None: return if args.swap_partno is None: return print_step("Formatting swap partition..."); subprocess.run(["mkswap", "-Lswap", partition(loopdev, args.swap_partno)], check=True) print_step("Formatting swap partition completed."); def prepare_esp(args, loopdev): if loopdev is None: return if args.esp_partno is None: return print_step("Formatting ESP partition..."); subprocess.run(["mkfs.fat", "-nEFI", "-F32", partition(loopdev, args.esp_partno)], check=True) print_step("Formatting ESP partition completed."); def mkfs_ext4(label, mount, loopdev, partno): subprocess.run(["mkfs.ext4", "-L", label, "-M", mount, partition(loopdev, partno)], check=True) def prepare_root(args, loopdev): if loopdev is None: return if args.root_partno is None: return print_step("Formatting root partition..."); if args.output_format == OutputFormat.raw_btrfs: subprocess.run(["mkfs.btrfs", "-Lroot", partition(loopdev, args.root_partno)], check=True) else: mkfs_ext4("root", "/", loopdev, args.root_partno) print_step("Formatting root partition completed."); def prepare_home(args, loopdev): if loopdev is None: return if args.home_partno is None: return print_step("Formatting home partition..."); mkfs_ext4("home", "/home", loopdev, args.home_partno) print_step("Formatting home partition completed."); def prepare_srv(args, loopdev): if loopdev is None: return if args.srv_partno is None: return print_step("Formatting server data partition..."); mkfs_ext4("srv", "/srv", loopdev, args.srv_partno) print_step("Formatted server data partition."); def mount_loop(args, loopdev, partno, where): os.makedirs(where, 0o755, True) options = "-odiscard" if args.compress and args.output_format == OutputFormat.raw_btrfs: options += ",compress" subprocess.run(["mount", "-n", partition(loopdev, partno), where, options], check=True) def mount_bind(what, where): os.makedirs(where, 0o755, True) subprocess.run(["mount", "--bind", what, where], check=True) @contextlib.contextmanager def mount_image(args, workspace, loopdev): if loopdev is None: yield None return print_step("Mounting image..."); root = os.path.join(workspace, "root") mount_loop(args, loopdev, args.root_partno, root) if args.home_partno is not None: mount_loop(args, loopdev, args.home_partno, os.path.join(root, "home")) if args.srv_partno is not None: mount_loop(args, loopdev, args.srv_partno, os.path.join(root, "srv")) if args.esp_partno is not None: mount_loop(args, loopdev, args.esp_partno, os.path.join(root, "boot/efi")) if args.distribution == Distribution.fedora: mount_bind("/proc", os.path.join(root, "proc")) mount_bind("/dev", os.path.join(root, "dev")) mount_bind("/sys", os.path.join(root, "sys")) print_step("Mounting image completed."); try: yield finally: print_step("Unmounting image..."); umount(os.path.join(root, "home")) umount(os.path.join(root, "srv")) umount(os.path.join(root, "boot/efi")) umount(os.path.join(root, "proc")) umount(os.path.join(root, "sys")) umount(os.path.join(root, "dev")) umount(os.path.join(root, "var/cache/dnf")) umount(os.path.join(root, "var/cache/apt/archives")) umount(os.path.join(root)) print_step("Unmounting image completed."); def mount_cache(args, workspace): if not args.distribution in (Distribution.fedora, Distribution.debian, Distribution.ubuntu): return if args.cache_path is None: return # We can't do this in mount_image() yet, as /var itself might have to be created as a subvolume first if args.distribution == Distribution.fedora: mount_bind(args.cache_path, os.path.join(workspace, "root", "var/cache/dnf")) elif args.distribution in (Distribution.debian, Distribution.ubuntu): mount_bind(args.cache_path, os.path.join(workspace, "root", "var/cache/apt/archives")) def umount(where): # Ignore failures and error messages subprocess.run(["umount", "-n", where], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def prepare_tree(args, workspace): print_step("Setting up basic OS tree..."); if args.output_format == OutputFormat.subvolume: btrfs_subvol_create(os.path.join(workspace, "root")) else: try: os.mkdir(os.path.join(workspace, "root")) except FileExistsError: pass if args.output_format in (OutputFormat.subvolume, OutputFormat.raw_btrfs): btrfs_subvol_create(os.path.join(workspace, "root", "home")) btrfs_subvol_create(os.path.join(workspace, "root", "srv")) btrfs_subvol_create(os.path.join(workspace, "root", "var")) btrfs_subvol_create(os.path.join(workspace, "root", "var/tmp"), 0o1777) os.mkdir(os.path.join(workspace, "root", "var/lib")) btrfs_subvol_create(os.path.join(workspace, "root", "var/lib/machines"), 0o700) if args.bootable: # We need an initialized machine ID for the boot logic to work mid = uuid.uuid4().hex os.mkdir(os.path.join(workspace, "root", "etc"), 0o755) open(os.path.join(workspace, "root", "etc/machine-id"), "w").write(mid + "\n") # For now, let's stay compatible with traditional Linux ESP mounts os.mkdir(os.path.join(workspace, "root", "boot/efi/EFI"), 0o700) os.mkdir(os.path.join(workspace, "root", "boot/efi/EFI/BOOT"), 0o700) os.mkdir(os.path.join(workspace, "root", "boot/efi/EFI/systemd"), 0o700) os.mkdir(os.path.join(workspace, "root", "boot/efi/loader"), 0o700) os.mkdir(os.path.join(workspace, "root", "boot/efi/loader/entries"), 0o700) os.mkdir(os.path.join(workspace, "root", "boot/efi", mid), 0o700) os.symlink("efi/loader", os.path.join(workspace, "root", "boot/loader")) os.symlink("efi/" + mid, os.path.join(workspace, "root", "boot", mid)) os.mkdir(os.path.join(workspace, "root", "etc/kernel"), 0o755) with open(os.path.join(workspace, "root", "etc/kernel/cmdline"), "w") as cmdline: cmdline.write(args.kernel_commandline) cmdline.write("\n") print_step("Setting up basic OS tree completed."); def patch_file(filepath, line_rewriter): temp_new_filepath = filepath + ".tmp.new" with open(filepath, "r") as old: with open(temp_new_filepath, "w") as new: for line in old: new.write(line_rewriter(line)) shutil.copystat(filepath, temp_new_filepath) os.remove(filepath) shutil.move(temp_new_filepath, filepath) def enable_networkd(workspace): subprocess.run(["systemctl", "--root", os.path.join(workspace, "root"), "enable", "systemd-networkd", "systemd-resolved"], check=True) os.remove(os.path.join(workspace, "root", "etc/resolv.conf")) os.symlink("../usr/lib/systemd/resolv.conf", os.path.join(workspace, "root", "etc/resolv.conf")) patch_file(os.path.join(workspace, "root", "etc/nsswitch.conf"), lambda line: " ".join(["resolve" if w == "dns" else w for w in line.split(" ")]) if line.startswith("hosts:") else line) with open(os.path.join(workspace, "root", "etc/systemd/network/all-ethernet.network"), "w") as f: f.write("""\ [Match] Type=ether [Network] DHCP=yes """) def run_workspace_command(workspace, *cmd, network=False): cmdline = ["systemd-nspawn", '--quiet', "--directory", os.path.join(workspace, "root"), "--as-pid2", "--register=no"] if not network: cmdline += ["--private-network"] cmdline += ['--', *cmd] subprocess.run(cmdline, check=True) def install_fedora(args, workspace, run_build_script): print_step("Installing Fedora...") gpg_key = "/etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-%s-x86_64" % args.release if os.path.exists(gpg_key): gpg_key = "file://%s" % gpg_key else: gpg_key = "https://getfedora.org/static/81B46521.txt" if args.mirror: release_url = "baseurl={.mirror}/releases/{.release}/Everything/x86_64/os/".format(args) updates_url = "baseurl={.mirror}/updates/{.release}/x86_64/".format(args) else: release_url = ("metalink=https://mirrors.fedoraproject.org/metalink?" + "repo=fedora-{.release}&arch=x86_64".format(args)) updates_url = ("metalink=https://mirrors.fedoraproject.org/metalink?" + "repo=updates-released-f{.release}&arch=x86_64".format(args)) with open(os.path.join(workspace, "dnf.conf"), "w") as f: f.write("""\ [main] gpgcheck=1 [fedora] name=Fedora {args.release} - base {release_url} gpgkey={gpg_key} [updates] name=Fedora {args.release} - updates {updates_url} gpgkey={gpg_key} """.format(args=args, gpg_key=gpg_key, release_url=release_url, updates_url=updates_url)) root = os.path.join(workspace, "root") cmdline = ["dnf", "-y", "--config=" + os.path.join(workspace, "dnf.conf"), "--best", "--allowerasing", "--releasever=" + args.release, "--installroot=" + root, "--disablerepo=*", "--enablerepo=fedora", "--enablerepo=updates", "--setopt=keepcache=1", "--setopt=install_weak_deps=0"] # Turn off docs, but not during the development build, as dnf currently has problems with that if not args.with_docs and not run_build_script: cmdline.append("--setopt=tsflags=nodocs") cmdline.extend([ "install", "systemd", "fedora-release", "passwd"]) if args.packages is not None: cmdline.extend(args.packages) if run_build_script and args.build_packages is not None: cmdline.extend(args.build_packages) if args.bootable: cmdline.extend(["kernel", "systemd-udev"]) os.makedirs(os.path.join(root, 'efi'), exist_ok=True) subprocess.run(cmdline, check=True) print_step("Installing Fedora completed.") def install_debian_or_ubuntu(args, workspace, run_build_script, mirror): cmdline = ["debootstrap", "--verbose", "--variant=minbase", "--include=systemd-sysv", "--exclude=sysv-rc,initscripts,startpar,lsb-base,insserv", args.release, workspace + "/root", mirror] if args.bootable and args.output_format == OutputFormat.raw_btrfs: cmdline[3] += ",btrfs-tools" subprocess.run(cmdline, check=True) # Debootstrap is not smart enough to deal correctly with alternative dependencies # Installing libpam-systemd via debootstrap results in systemd-shim being installed # Therefore, prefer to install via apt from inside the container extra_packages = [ 'dbus', 'libpam-systemd'] # Also install extra packages via the secondary APT run, because it is smarter and # can deal better with any conflicts if args.packages is not None: extra_packages += args.packages if run_build_script and args.build_packages is not None: extra_packages += args.build_packages # Work around debian bug #835628 os.makedirs(os.path.join(workspace, "root/etc/dracut.conf.d"), exist_ok=True) with open(os.path.join(workspace, "root/etc/dracut.conf.d/99-generic.conf"), "w") as f: f.write("hostonly=no") if args.bootable: extra_packages += ["linux-image-amd64", "dracut"] if extra_packages: # Debian policy is to start daemons by default. # The policy-rc.d script can be used choose which ones to start # Let's install one that denies all daemon startups # See https://people.debian.org/~hmh/invokerc.d-policyrc.d-specification.txt # Note: despite writing in /usr/sbin, this file is not shipped by the OS # and instead should be managed by the admin. policyrcd = os.path.join(workspace, "root/usr/sbin/policy-rc.d") with open(policyrcd, "w") as f: f.write("#!/bin/sh\n") f.write("exit 101") os.chmod(policyrcd, 0o755) cmdline = ["/usr/bin/apt-get", "--assume-yes", "--no-install-recommends", "install"] + extra_packages run_workspace_command(workspace, network=True, *cmdline) os.unlink(policyrcd) def install_debian(args, workspace, run_build_script): print_step("Installing Debian...") install_debian_or_ubuntu(args, workspace, run_build_script, args.mirror) print_step("Installing Debian completed.") def install_ubuntu(args, workspace, run_build_script): print_step("Installing Ubuntu...") install_debian_or_ubuntu(args, workspace, run_build_script, args.mirror) print_step("Installing Ubuntu completed.") def install_arch(args, workspace, run_build_script): if args.release is not None: sys.stderr.write("Distribution release specification is not supported for ArchLinux, ignoring.") print_step("Installing ArchLinux...") keyring = "archlinux" if platform.machine() == "aarch64": keyring += "arm" subprocess.run(["pacman-key", "--nocolor", "--init"], check=True) subprocess.run(["pacman-key", "--nocolor", "--populate", keyring], check=True) if platform.machine() == "aarch64": server = "Server = {}/$arch/$repo".format(args.mirror) else: server = "Server = {}/$repo/os/$arch".format(args.mirror) with open(os.path.join(workspace, "pacman.conf"), "w") as f: f.write("""\ [options] HookDir = /no_hook/ HoldPkg = pacman glibc Architecture = auto CheckSpace SigLevel = Required DatabaseOptional [core] {server} [extra] {server} [community] {server} """.format(args=args, server=server)) subprocess.run(["pacman", "--color", "never", "--config", os.path.join(workspace, "pacman.conf"), "-Sy"], check=True) c = subprocess.run(["pacman", "--color", "never", "--config", os.path.join(workspace, "pacman.conf"), "-Sg", "base"], stdout=subprocess.PIPE, universal_newlines=True, check=True) packages = set(c.stdout.split()) packages.remove("base") packages -= {"cryptsetup", "device-mapper", "dhcpcd", "e2fsprogs", "jfsutils", "lvm2", "mdadm", "netctl", "pcmciautils", "reiserfsprogs", "xfsprogs"} if args.bootable: if args.output_format == OutputFormat.raw_gpt: packages.add("e2fsprogs") elif args.output_format == OutputFormat.raw_btrfs: packages.add("btrfs-progs") else: if "linux" in packages: packages.remove("linux") if args.packages is not None: packages |= set(args.packages) if run_build_script and args.build_packages is not None: packages |= set(args.build_packages) cmdline = ["pacstrap", "-C", os.path.join(workspace, "pacman.conf"), "-c", "-d", workspace + "/root"] + \ list(packages) subprocess.run(cmdline, check=True) enable_networkd(workspace) print_step("Installing ArchLinux complete.") def install_distribution(args, workspace, run_build_script): install = { Distribution.fedora : install_fedora, Distribution.debian : install_debian, Distribution.ubuntu : install_ubuntu, Distribution.arch : install_arch, } install[args.distribution](args, workspace, run_build_script) def set_root_password(args, workspace): "Set the root account password, or just delete it so it's easy to log in" if args.password == '': print_step("Deleting root password...") jj = lambda line: (':'.join(['root', ''] + line.split(':')[2:]) if line.startswith('root:') else line) patch_file(os.path.join(workspace, 'root', 'etc/passwd'), jj) elif args.password: print_step("Setting root password...") password = crypt.crypt(args.password, crypt.mksalt(crypt.METHOD_SHA512)) jj = lambda line: (':'.join(['root', password] + line.split(':')[2:]) if line.startswith('root:') else line) patch_file(os.path.join(workspace, 'root', 'etc/shadow'), jj) def install_boot_loader_arch(args, workspace): patch_file(os.path.join(workspace, "root", "etc/mkinitcpio.conf"), lambda line: "HOOKS=\"systemd modconf block filesystems fsck\"\n" if line.startswith("HOOKS=") else line) kernel_version = next(filter(lambda x: x[0].isdigit(), os.listdir(os.path.join(workspace, "root", "lib/modules")))) run_workspace_command(workspace, "/usr/bin/kernel-install", "add", kernel_version, "/boot/vmlinuz-linux") def install_boot_loader_debian(args, workspace): kernel_version = next(filter(lambda x: x[0].isdigit(), os.listdir(os.path.join(workspace, "root", "lib/modules")))) run_workspace_command(workspace, "/usr/bin/kernel-install", "add", kernel_version, "/boot/vmlinuz-" + kernel_version) def install_boot_loader(args, workspace): if not args.bootable: return print_step("Installing boot loader...") shutil.copyfile(os.path.join(workspace, "root", "usr/lib/systemd/boot/efi/systemd-bootx64.efi"), os.path.join(workspace, "root", "boot/efi/EFI/systemd/systemd-bootx64.efi")) shutil.copyfile(os.path.join(workspace, "root", "usr/lib/systemd/boot/efi/systemd-bootx64.efi"), os.path.join(workspace, "root", "boot/efi/EFI/BOOT/bootx64.efi")) if args.distribution == Distribution.arch: install_boot_loader_arch(args, workspace) if args.distribution == Distribution.debian: install_boot_loader_debian(args, workspace) print_step("Installing boot loader completed.") def enumerate_and_copy(source, dest, suffix = ""): for entry in os.scandir(source + suffix): dest_path = dest + suffix + "/" + entry.name if entry.is_dir(): os.makedirs(dest_path, mode=entry.stat(follow_symlinks=False).st_mode & 0o7777, exist_ok=True) enumerate_and_copy(source, dest, suffix + "/" + entry.name) else: try: os.unlink(dest_path) except: pass shutil.copy(entry.path, dest_path, follow_symlinks=False) shutil.copystat(entry.path, dest_path, follow_symlinks=False) def install_extra_trees(args, workspace): if args.extra_trees is None: return print_step("Copying in extra file trees...") for d in args.extra_trees: enumerate_and_copy(d, os.path.join(workspace, "root")) print_step("Copying in extra file trees completed.") def git_files_ignore(): "Creates a function to be used as a ignore callable argument for copytree" c = subprocess.run(['git', 'ls-files', '-z', '--others', '--cached', '--exclude-standard', '--exclude', '/.mkosi-*'], stdout=subprocess.PIPE, universal_newlines=False, check=True) files = {x.decode("utf-8") for x in c.stdout.split(b'\0')} del c def ignore(src, names): return [name for name in names if (os.path.relpath(os.path.join(src, name)) not in files and not os.path.isdir(os.path.join(src, name)))] return ignore def install_build_src(args, workspace, run_build_script): if not run_build_script: return if args.build_script is None: return print_step("Copying in build script and sources...") shutil.copy(args.build_script, os.path.join(workspace, "root", "root", os.path.basename(args.build_script))) if args.build_sources is not None: target = os.path.join(workspace, "root", "root/src") use_git = args.use_git_files if use_git is None: use_git = os.path.exists('.git') if use_git: ignore = git_files_ignore() else: ignore = shutil.ignore_patterns('.mkosi-*', '.git') shutil.copytree(args.build_sources, target, symlinks=True, ignore=ignore) print_step("Copying in build script and sources completed.") def install_build_dest(args, workspace, run_build_script): if run_build_script: return if args.build_script is None: return print_step("Copying in build tree...") enumerate_and_copy(os.path.join(workspace, "dest"), os.path.join(workspace, "root")) print_step("Copying in build tree completed.") def make_read_only(args, workspace): if not args.read_only: return if not args.output_format in (OutputFormat.raw_btrfs, OutputFormat.subvolume): return print_step("Marking root subvolume read-only...") btrfs_subvol_make_ro(os.path.join(workspace, "root")) print_step("Marking root subvolume read-only completed.") def make_tar(args, workspace): if args.output_format != OutputFormat.tar: return None print_step("Creating archive...") f = tempfile.NamedTemporaryFile(dir = os.path.dirname(args.output), prefix=".mkosi-") subprocess.run(["tar", "-C", os.path.join(workspace, "root"), "-c", "-J", "--xattrs", "--xattrs-include=*", "."], stdout=f, check=True) print_step("Creating archive completed.") return f def xz_output(args, raw): if not args.output_format in (OutputFormat.raw_btrfs, OutputFormat.raw_gpt): return raw if not args.xz: return raw print_step("Compressing image file...") f = tempfile.NamedTemporaryFile(dir = os.path.dirname(args.output), prefix=".mkosi-") subprocess.run(["xz", "-c", raw.name], stdout=f, check=True) print_step("Compressing image file complete.") return f def copy_nspawn_settings(args): if args.nspawn_settings is None: return None print_step("Copying nspawn settings file...") f = tempfile.NamedTemporaryFile(mode = "w+b", dir = os.path.dirname(args.output_nspawn_settings), prefix=".mkosi-") with open(args.nspawn_settings, "rb") as c: bs = 65536 buf = c.read(bs) while len(buf) > 0: f.write(buf) buf = c.read(bs) print_step("Copying nspawn settings file completed.") return f def hash_file(of, sf, fname): bs = 65536 h = hashlib.sha256() sf.seek(0) buf = sf.read(bs) while len(buf) > 0: h.update(buf) buf = sf.read(bs) of.write(h.hexdigest() + " *" + fname + "\n") def calculate_sha256sum(args, raw, tar, nspawn_settings): if args.output_format in (OutputFormat.directory, OutputFormat.subvolume): return None if not args.checksum: return None print_step("Calculating SHA256SUM...") f = tempfile.NamedTemporaryFile(mode="w+", dir=os.path.dirname(args.output_checksum), prefix=".mkosi-", encoding="utf-8") if raw is not None: hash_file(f, raw, os.path.basename(args.output)) if tar is not None: hash_file(f, tar, os.path.basename(args.output)) if nspawn_settings is not None: hash_file(f, nspawn_settings, os.path.basename(args.output_nspawn_settings)) print_step("Calculating SHA256SUM complete.") return f def calculate_signature(args, checksum): if not args.sign: return None if checksum is None: return None print_step("Signing SHA256SUM...") f = tempfile.NamedTemporaryFile(mode="wb", prefix=".mkosi-", dir=os.path.dirname(args.output_signature)) cmdline = ["gpg", "--detach-sign"] if args.key is not None: cmdline.extend(["--default-key", args.key]) checksum.seek(0) subprocess.run(cmdline, stdin=checksum, stdout=f, check=True) print_step("Signing SHA256SUM complete.") return f def link_output(args, workspace, raw, tar): print_step("Linking image file...") if args.output_format in (OutputFormat.directory, OutputFormat.subvolume): os.rename(os.path.join(workspace, "root"), args.output) elif args.output_format in (OutputFormat.raw_btrfs, OutputFormat.raw_gpt): os.chmod(raw, 0o666 & ~args.original_umask) os.link(raw, args.output) else: os.chmod(raw, 0o666 & ~args.original_umask) os.link(tar, args.output) print_step("Successfully linked " + args.output + ".") def link_output_nspawn_settings(args, path): if path is None: return print_step("Linking nspawn settings file...") os.chmod(path, 0o666 & ~args.original_umask) os.link(path, args.output_nspawn_settings) print_step("Successfully linked " + args.output_nspawn_settings + ".") def link_output_checksum(args, checksum): if checksum is None: return print_step("Linking SHA256SUM file...") os.chmod(checksum, 0o666 & ~args.original_umask) os.link(checksum, args.output_checksum) print_step("Successfully linked " + args.output_checksum + ".") def link_output_signature(args, signature): if signature is None: return print_step("Linking SHA256SUM.gpg file...") os.chmod(signature, 0o666 & ~args.original_umask) os.link(signature, args.output_signature) print_step("Successfully linked " + args.output_signature + ".") def format_bytes(bytes): if bytes >= 1024*1024*1024: return "{:0.1f}G".format(bytes / 1024**3) if bytes >= 1024*1024: return "{:0.1f}M".format(bytes / 1024**2) if bytes >= 1024: return "{:0.1f}K".format(bytes / 1024) return "{}B".format(bytes) def dir_size(path): sum = 0 for entry in os.scandir(path): if entry.is_symlink(): # We can ignore symlinks because they either point into our tree, # in which case we'll include the size of target directory anyway, # or outside, in which case we don't need to. continue elif entry.is_file(): sum += entry.stat().st_blocks * 512 elif entry.is_dir(): sum += dir_size(entry.path) return sum def print_output_size(args): if args.output_format in (OutputFormat.directory, OutputFormat.subvolume): print_step("Resulting image size is " + format_bytes(dir_size(args.output)) + ".") else: st = os.stat(args.output) print_step("Resulting image size is " + format_bytes(st.st_size) + ", consumes " + format_bytes(st.st_blocks * 512) + ".") def setup_cache(args): if not args.distribution in (Distribution.fedora, Distribution.debian, Distribution.ubuntu): return None print_step("Setting up package cache...") if args.cache_path is None: d = tempfile.TemporaryDirectory(dir=os.path.dirname(args.output), prefix=".mkosi-") args.cache_path = d.name else: os.makedirs(args.cache_path, 0o700, True) d = None print_step("Setting up package cache " + args.cache_path + " completed.") return d class PackageAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): l = getattr(namespace, self.dest) if l is None: l = [] l.extend(values.split(",")) setattr(namespace, self.dest, l) def parse_args(): parser = argparse.ArgumentParser(description='Build Legacy-Free OS Images', add_help=False) group = parser.add_argument_group("Commands") group.add_argument("verb", choices=("build", "clean", "help", "summary"), nargs='?', default="build", help='Operation to execute') group.add_argument('-h', '--help', action='help', help="Show this help") group = parser.add_argument_group("Distribution") group.add_argument('-d', "--distribution", choices=Distribution.__members__, help='Distribution to install') group.add_argument('-r', "--release", help='Distribution release to install') group.add_argument('-m', "--mirror", help='Distribution mirror to use') group = parser.add_argument_group("Output") group.add_argument('-t', "--format", dest='output_format', choices=OutputFormat.__members__, help='Output Format') group.add_argument('-o', "--output", help='Output image path', metavar='PATH') group.add_argument('-f', "--force", action='store_true', help='Remove existing image file before operation') group.add_argument('-b', "--bootable", type=parse_boolean, nargs='?', const=True, help='Make image bootable on EFI (only raw_gpt, raw_btrfs)') group.add_argument("--read-only", action='store_true', help='Make root volume read-only (only raw_btrfs, subvolume)') group.add_argument("--compress", action='store_true', help='Enable compression in file system (only raw_btrfs, subvolume)') group.add_argument("--xz", action='store_true', help='Compress resulting image with xz (only raw_gpt, raw_btrfs, implied on tar)') group = parser.add_argument_group("Packages") group.add_argument('-p', "--package", action=PackageAction, dest='packages', help='Add an additional package to the OS image', metavar='PACKAGE') group.add_argument("--with-docs", action='store_true', help='Install documentation (only fedora)') group.add_argument("--cache", dest='cache_path', help='Package cache path (only fedora, debian, ubuntu)', metavar='PATH') group.add_argument("--extra-tree", action='append', dest='extra_trees', help='Copy an extra tree on top of image', metavar='PATH') group.add_argument("--build-script", help='Build script to run inside image', metavar='PATH') group.add_argument("--build-sources", help='Path for sources to build', metavar='PATH') group.add_argument("--build-package", action=PackageAction, dest='build_packages', help='Additional packages needed for build script', metavar='PACKAGE') group.add_argument('--use-git-files', type=parse_boolean, help='Ignore any files that git itself ignores (default: guess)') group.add_argument("--settings", dest='nspawn_settings', help='Add in .spawn settings file', metavar='PATH') group = parser.add_argument_group("Partitions") group.add_argument("--root-size", help='Set size of root partition (only raw_gpt, raw_btrfs)', metavar='BYTES') group.add_argument("--esp-size", help='Set size of EFI system partition (only raw_gpt, raw_btrfs)', metavar='BYTES') group.add_argument("--swap-size", help='Set size of swap partition (only raw_gpt, raw_btrfs)', metavar='BYTES') group.add_argument("--home-size", help='Set size of /home partition (only raw_gpt)', metavar='BYTES') group.add_argument("--srv-size", help='Set size of /srv partition (only raw_gpt)', metavar='BYTES') group = parser.add_argument_group("Validation (only raw_gpt, raw_btrfs, tar)") group.add_argument("--checksum", action='store_true', help='Write SHA256SUM file') group.add_argument("--sign", action='store_true', help='Write and sign SHA256SUM file') group.add_argument("--key", help='GPG key to use for signing') group.add_argument("--password", help='Set the root password') group = parser.add_argument_group("Additional Configuration") group.add_argument('-C', "--directory", help='Change to specified directory before doing anything', metavar='PATH') group.add_argument("--default", dest='default_path', help='Read configuration data from file', metavar='PATH') group.add_argument("--kernel-commandline", help='Set the kernel command line (only bootable images)') args = parser.parse_args() if args.verb == "help": parser.print_help() sys.exit(0) return args def parse_bytes(bytes): if bytes is None: return bytes if bytes.endswith('G'): factor = 1024**3 elif bytes.endswith('M'): factor = 1024**2 elif bytes.endswith('K'): factor = 1024 else: factor = 1 if factor > 1: bytes = bytes[:-1] result = int(bytes) * factor if result <= 0: raise ValueError("Size out of range") if result % 512 != 0: raise ValueError("Size not a multiple of 512") return result def detect_distribution(): try: f = open("/etc/os-release") except IOError: try: f = open("/usr/lib/os-release") except IOError: return None, None id = None version_id = None for ln in f: if ln.startswith("ID="): id = ln[3:].strip() if ln.startswith("VERSION_ID="): version_id = ln[11:].strip() d = Distribution.__members__.get(id, None) return d, version_id def unlink_try_hard(path): try: os.unlink(path) except: pass try: btrfs_subvol_delete(path) except: pass try: shutil.rmtree(path) except: pass def unlink_output(args): if not args.force and args.verb != "clean": return unlink_try_hard(args.output) if args.checksum: unlink_try_hard(args.output_checksum) if args.sign: unlink_try_hard(args.output_signature) if args.nspawn_settings is not None: unlink_try_hard(args.output_nspawn_settings) def parse_boolean(s): if s in {"1", "true", "yes"}: return True if s in {"0", "false", "no"}: return False raise ValueError("invalid literal for bool(): {!r}".format(s)) def process_setting(args, section, key, value): if section == "Distribution": if key == "Distribution": if args.distribution is None: args.distribution = value elif key == "Release": if args.release is None: args.release = value elif key is None: return True else: return False elif section == "Output": if key == "Format": if args.output_format is None: args.output_format = value elif key == "Output": if args.output is None: args.output = value elif key == "Force": if not args.force: args.force = parse_boolean(value) elif key == "Bootable": if not args.bootable: args.bootable = parse_boolean(value) elif key == "ReadOnly": if not args.read_only: args.read_only = parse_boolean(value) elif key == "Compress": if not args.compress: args.read_only = parse_boolean(value) elif key == "XZ": if not args.xz: args.xz = parse_boolean(value) elif key is None: return True else: return False elif section == "Packages": if key == "Packages": if args.packages is None: args.packages = value.split() else: args.packages.extend(value.split()) elif key == "WithDocs": if not args.with_docs: args.with_docs = parse_boolean(value) elif key == "Cache": if args.cache_path is None: args.cache_path = value elif key == "ExtraTrees": if args.extra_trees is None: args.extra_trees = value.split() else: args.extra_trees.extend(value.split()) elif key == "BuildScript": if args.build_script is not None: args.build_script = value elif key == "BuildSources": if args.build_sources is not None: args.build_sources = value elif key == "BuildPackages": if args.build_packages is None: args.build_packages = value.split() else: args.build_packages.extend(value.split()) elif key == "NSpawnSettings": if args.nspawn_settings is not None: args.nspawn_settings = value elif key is None: return True else: return False elif section == "Partitions": if key == "RootSize": if args.root_size is None: args.root_size = value elif key == "ESPSize": if args.esp_size is None: args.esp_size = value elif key == "SwapSize": if args.swap_size is None: args.swap_size = value elif key == "HomeSize": if args.home_size is None: args.home_size = value elif key == "SrvSize": if args.srv_size is None: args.srv_size = value elif key is None: return True else: return False elif section == "Validation": if key == "CheckSum": if not args.check_sum: args.check_sum = parse_boolean(value) elif key == "Sign": if not args.sign: args.sign = parse_boolean(value) elif key == "Key": if args.key is None: args.key = value elif key is None: return True else: return False else: return False return True def load_defaults(args): fname = "mkosi.default" if args.default_path is None else args.default_path try: f = open(fname, "r") except FileNotFoundError: return config = configparser.ConfigParser(delimiters='=') config.optionxform = str config.read_file(f) for section in config.sections(): if not process_setting(args, section, None, None): sys.stderr.write("Unknown section in {}, ignoring: [{}]\n".format(fname, section)) for key in config[section]: if not process_setting(args, section, key, config[section][key]): sys.stderr.write("Unknown key in section [{}] in {}, ignoring: {}=\n".format(section, fname, key)) def find_nspawn_settings(args): if args.nspawn_settings is not None: return if os.path.exists("mkosi.nspawn"): args.nspawn_settings = "mkosi.nspawn" def find_extra(args): if os.path.exists("mkosi.extra"): if args.extra_trees is None: args.extra_trees = ["mkosi.extra"] else: args.extra_trees.append("mkosi.extra") def find_build_script(args): if args.build_script is not None: return if os.path.exists("mkosi.build"): args.build_script = "mkosi.build" def find_build_sources(args): if args.build_sources is not None: return args.build_sources = os.getcwd() def build_nspawn_settings_path(path): t = path while True: if t.endswith(".xz"): t = t[:-3] elif t.endswith(".raw"): t = t[:-4] elif t.endswith(".tar"): t = t[:-4] else: break return t + ".nspawn" def load_args(): args = parse_args() if args.directory is not None: os.chdir(args.directory) load_defaults(args) find_nspawn_settings(args) find_extra(args) find_build_script(args) find_build_sources(args) if args.output_format is None: args.output_format = OutputFormat.raw_gpt else: args.output_format = OutputFormat[args.output_format] if args.distribution is not None: args.distribution = Distribution[args.distribution] if args.distribution is None or args.release is None: d, r = detect_distribution() if args.distribution is None: args.distribution = d if args.distribution == d and args.release is None: args.release = r if args.distribution is None: sys.stderr.write("Couldn't detect distribution.\n") sys.exit(1) if args.release is None: if args.distribution == Distribution.fedora: args.release = "24" elif args.distribution == Distribution.debian: args.release = "unstable" elif args.distribution == Distribution.ubuntu: args.release = "yakkety" if args.mirror is None: if args.distribution == Distribution.fedora: args.mirror = None elif args.distribution == Distribution.debian: args.mirror = "http://httpredir.debian.org/debian" elif args.distribution == Distribution.ubuntu: args.mirror = "http://archive.ubuntu.com/ubuntu" elif args.distribution == Distribution.arch: args.mirror = "https://mirrors.kernel.org/archlinux" if platform.machine() == "aarch64": args.mirror = "http://mirror.archlinuxarm.org" if args.bootable: if args.distribution not in (Distribution.fedora, Distribution.arch, Distribution.debian): sys.stderr.write("Bootable images are currently supported only on Debian, Fedora and ArchLinux.\n") sys.exit(1) if not args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs): sys.stderr.write("Directory, subvolume and tar images cannot be booted.\n") sys.exit(1) if args.sign: args.checksum = True if args.output is None: if args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs): if args.xz: args.output = "image.raw.xz" else: args.output = "image.raw" elif args.output_format == OutputFormat.tar: args.output = "image.tar.xz" else: args.output = "image" args.output = os.path.abspath(args.output) if args.output_format == OutputFormat.tar: args.xz = True if args.checksum: args.output_checksum = os.path.join(os.path.dirname(args.output), "SHA256SUM") if args.sign: args.output_signature = os.path.join(os.path.dirname(args.output), "SHA256SUM.gpg") if args.nspawn_settings is not None: args.nspawn_settings = os.path.abspath(args.nspawn_settings) args.output_nspawn_settings = build_nspawn_settings_path(args.output) if args.build_script is not None: args.build_script = os.path.abspath(args.build_script) if args.build_sources is not None: args.build_sources = os.path.abspath(args.build_sources) if args.extra_trees is not None: for i in range(len(args.extra_trees)): args.extra_trees[i] = os.path.abspath(args.extra_trees[i]) args.root_size = parse_bytes(args.root_size) args.home_size = parse_bytes(args.home_size) args.srv_size = parse_bytes(args.srv_size) args.esp_size = parse_bytes(args.esp_size) args.swap_size = parse_bytes(args.swap_size) if args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs) and args.root_size is None: args.root_size = 1024*1024*1024 if args.bootable and args.esp_size is None: args.esp_size = 256*1024*1024 if args.bootable and args.kernel_commandline is None: args.kernel_commandline = "rhgb quiet selinux=0 audit=0 rw" return args def check_output(args): for f in (args.output, args.output_checksum if args.checksum else None, args.output_signature if args.sign else None, args.output_nspawn_settings if args.nspawn_settings is not None else None): if f is None: continue if os.path.exists(f): sys.stderr.write("Output file " + f + " exists already. (Consider invocation with --force.)\n") sys.exit(1) def yes_no(b): return "yes" if b else "no" def format_bytes_or_disabled(sz): if sz is None: return "(disabled)" return format_bytes(sz) def none_to_na(s): return "n/a" if s is None else s def none_to_none(s): return "none" if s is None else s def line_join_list(l): if l is None: return "none" return "\n ".join(l) def print_summary(args): sys.stderr.write("DISTRIBUTION:\n") sys.stderr.write(" Distribution: " + args.distribution.name + "\n") sys.stderr.write(" Release: " + none_to_na(args.release) + "\n") if args.mirror is not None: sys.stderr.write(" Mirror: " + args.mirror + "\n") sys.stderr.write("\nOUTPUT:\n") sys.stderr.write(" Output Format: " + args.output_format.name + "\n") sys.stderr.write(" Output: " + args.output + "\n") sys.stderr.write(" Output Checksum: " + none_to_na(args.output_checksum if args.checksum else None) + "\n") sys.stderr.write(" Output Signature: " + none_to_na(args.output_signature if args.sign else None) + "\n") sys.stderr.write("Output nspawn Settings: " + none_to_na(args.output_nspawn_settings if args.nspawn_settings is not None else None) + "\n") if args.output_format in (OutputFormat.raw_btrfs, OutputFormat.subvolume): sys.stderr.write(" Read-only: " + yes_no(args.read_only) + "\n") sys.stderr.write(" FS Compression: " + yes_no(args.compress) + "\n") if args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs, OutputFormat.tar): sys.stderr.write(" XZ Compression: " + yes_no(args.xz) + "\n") sys.stderr.write("\nPACKAGES:\n") sys.stderr.write(" Packages: " + line_join_list(args.packages) + "\n") if args.distribution == Distribution.fedora: sys.stderr.write(" With Documentation: " + yes_no(args.with_docs) + "\n") sys.stderr.write(" Package Cache: " + none_to_none(args.cache_path) + "\n") sys.stderr.write(" Extra Trees: " + line_join_list(args.extra_trees) + "\n") sys.stderr.write(" Build Script: " + none_to_none(args.build_script) + "\n") sys.stderr.write(" Build Sources: " + none_to_none(args.build_sources) + "\n") sys.stderr.write(" Build Packages: " + line_join_list(args.build_packages) + "\n") sys.stderr.write(" nspawn Settings: " + none_to_none(args.nspawn_settings) + "\n") if args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs): sys.stderr.write("\nPARTITIONS:\n") sys.stderr.write(" Bootable: " + yes_no(args.bootable) + "\n") sys.stderr.write(" Root Partition: " + format_bytes(args.root_size) + "\n") sys.stderr.write(" Swap Partition: " + format_bytes_or_disabled(args.swap_size) + "\n") sys.stderr.write(" ESP: " + format_bytes_or_disabled(args.esp_size) + "\n") sys.stderr.write(" /home Partition: " + format_bytes_or_disabled(args.home_size) + "\n") sys.stderr.write(" /srv Partition: " + format_bytes_or_disabled(args.srv_size) + "\n") if args.output_format in (OutputFormat.raw_gpt, OutputFormat.raw_btrfs, OutputFormat.tar): sys.stderr.write("\nVALIDATION:\n") sys.stderr.write(" Checksum: " + yes_no(args.checksum) + "\n") sys.stderr.write(" Sign: " + yes_no(args.sign) + "\n") sys.stderr.write(" GPG Key: " + ("default" if args.key is None else args.key) + "\n") def build_image(args, workspace, run_build_script): # If there's no build script set, there's no point in executing # the build script iteration. Let's quite early. if args.build_script is None and run_build_script: return (None, None) tar = None raw = create_image(args, workspace.name) with attach_image_loopback(args, raw) as loopdev: prepare_swap(args, loopdev) prepare_esp(args, loopdev) prepare_root(args, loopdev) prepare_home(args, loopdev) prepare_srv(args, loopdev) with mount_image(args, workspace.name, loopdev): prepare_tree(args, workspace.name) mount_cache(args, workspace.name) install_distribution(args, workspace.name, run_build_script) install_boot_loader(args, workspace.name) install_extra_trees(args, workspace.name) install_build_src(args, workspace.name, run_build_script) install_build_dest(args, workspace.name, run_build_script) if not run_build_script: set_root_password(args, workspace.name) make_read_only(args, workspace.name) tar = make_tar(args, workspace.name) return raw, tar def run_build_script(args, workspace, raw): if args.build_script is None: return print_step("Running build script...") dest = os.path.join(workspace, "dest") os.mkdir(dest, 0o755) cmdline = ["systemd-nspawn", '--quiet', "--directory=" + os.path.join(workspace, "root") if raw is None else "--image=" + raw.name, "--as-pid2", "--private-network", "--register=no", "--bind", dest + ":/root/dest", "--setenv=WITH_DOCS=" + ("1" if args.with_docs else "0"), "--setenv=DESTDIR=/root/dest"] if args.build_sources is not None: cmdline.append("--setenv=SRCDIR=/root/src") cmdline.append("--chdir=/root/src") else: cmdline.append("--chdir=/root") cmdline.append("/root/" + os.path.basename(args.build_script)) print(cmdline) subprocess.run(cmdline, check=True) print_step("Running build script completed.") def build_stuff(args): cache = setup_cache(args) workspace = setup_workspace(args) # Run the image builder twice, once for running the build script and once for the final build raw, tar = build_image(args, workspace, run_build_script=True) run_build_script(args, workspace.name, raw) if raw is not None: del raw if tar is not None: del tar raw, tar = build_image(args, workspace, run_build_script=False) raw = xz_output(args, raw) settings = copy_nspawn_settings(args) checksum = calculate_sha256sum(args, raw, tar, settings) signature = calculate_signature(args, checksum) link_output(args, workspace.name, raw.name if raw is not None else None, tar.name if tar is not None else None) link_output_checksum(args, checksum.name if checksum is not None else None) link_output_signature(args, signature.name if signature is not None else None) link_output_nspawn_settings(args, settings.name if settings is not None else None) def main(): args = load_args() if os.getuid() != 0: sys.stderr.write("Must be invoked as root.\n") sys.exit(1) if args.verb in ("build", "clean"): unlink_output(args) if args.verb == "build": check_output(args) if args.verb in ("build", "summary"): print_summary(args) if args.verb == "build": init_namespace(args) build_stuff(args) print_output_size(args) if __name__ == "__main__": main()