Source code for ryo_iso.config

import appdirs as _appdirs
from pathlib import Path as _Path
# NOTE: importlib backport for Python 3.5
import importlib_resources as _resources
import shutil as _shutil
import time as _time
import yaml as _yaml
import sys as _sys
import os as _os
import re as _re
from loguru import logger as _logger
import requests as _requests
import delegator as _delegator

_ctx = None

class Config:
    """
    Configuration State

    This class stores configuration information and provides methods for
    validation and network updates.

    Methods
    -------
    __init__(config='./iso.yml')
        Loads the ``config`` file and performs initialization
    *distro*_init()
        Performs distribution-specific initialization
    """
    # Config File Setup
    _cfg_dir = (_os.environ.get('RYO_USER_CONFIG_DIR')
                or _appdirs.user_config_dir('ryo-iso', 'HxR'))
    _iso_cfg = _os.environ.get('RYO_ISO_CONF')
    data = {
        'app': _Path(_cfg_dir) / 'config.yml',
        'base': _Path(_cfg_dir) / 'iso_base.yml',
        'iso': _Path(_iso_cfg) if _iso_cfg else (_Path.cwd() / 'iso.yml'),
    }
    build_dir = _Path(_os.environ.get('RYO_BUILD_DIR') or _Path.cwd())
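
    # Example (not part of the original module; paths are hypothetical): the
    # class attributes above are evaluated at import time, so the config,
    # project, and build locations can be redirected through the environment
    # before ``ryo_iso.config`` is imported:
    #
    #   os.environ['RYO_USER_CONFIG_DIR'] = '/tmp/ryo-test/config'
    #   os.environ['RYO_ISO_CONF'] = '/tmp/ryo-test/project/iso.yml'
    #   os.environ['RYO_BUILD_DIR'] = '/tmp/ryo-test/build'
    #   from ryo_iso.config import Config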

    @classmethod
    def install(cls, force=False):
        """
        Install user config files.

        This function installs the application and project config files.

        Parameters
        ----------
        force : bool
            Overwrite existing config files
        """
        cls.data['app'].parent.mkdir(parents=True, exist_ok=True)
        if not cls.data['app'].exists() or force:
            # Install application config from package resources
            # into $XDG_CONFIG_HOME/ryo-iso/config.yml
            with _resources.path("ryo_iso.data", 'config.yml') as default_config:
                _logger.debug("Install %s to %s" % (default_config, cls.data['app']))
                # NOTE: Python 3.5 shutil.copy does not accept pathlib.Path
                _shutil.copy(str(default_config), str(cls.data['app']))

        cls.data['base'].parent.mkdir(parents=True, exist_ok=True)
        if not cls.data['base'].exists() or force:
            # Install base config from package resources
            # into $XDG_CONFIG_HOME/ryo-iso/iso_base.yml
            with _resources.path("ryo_iso.data", 'iso_base.yml') as default_config:
                _logger.debug("Install %s to %s" % (default_config, cls.data['base']))
                # NOTE: Python 3.5 shutil.copy does not accept pathlib.Path
                _shutil.copy(str(default_config), str(cls.data['base']))

        if not cls.data['iso'].exists() or force:
            # Install project config from package resources
            with _resources.path("ryo_iso.data", 'iso.yml') as default_config:
                _logger.debug("Install %s to %s" % (default_config, cls.data['iso']))
                # NOTE: Python 3.5 shutil.copy does not accept pathlib.Path
                _shutil.copy(str(default_config), str(cls.data['iso']))
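
    # Usage sketch (assumption, not part of the original module): install the
    # default config files, then force-overwrite them with the packaged defaults.
    #
    #   >>> Config.install()            # only writes files that do not exist yet
    #   >>> Config.install(force=True)  # overwrites config.yml, iso_base.yml, iso.yml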

    @staticmethod
    def deep_merge(overlay, base):
        """
        Deep merge dictionaries

        This method recursively merges dictionaries, where ``base`` provides
        defaults for the dictionary ``overlay``. Sub-lists and dictionaries
        inside lists are replaced, not merged.

        Parameters
        ----------
        overlay : dict
            This dictionary is modified in place to contain the merged results.
        base : dict
            Provides default values not found in the overlay.
        """
        for k in base.keys():
            if isinstance(base[k], dict):
                overlay[k] = overlay.get(k) or {}
                Config.deep_merge(overlay[k], base[k])
            else:
                overlay[k] = overlay.get(k) or base[k]
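
    # Worked example (assumption, not part of the original module): ``base``
    # fills in keys that are missing or falsy in ``overlay``; truthy values in
    # ``overlay`` win, and keys only present in ``overlay`` are left alone.
    #
    #   >>> overlay = {'name': 'custom', 'packages': None}
    #   >>> base = {'name': 'default', 'packages': {'install': ['vim']}, 'arch': 'amd64'}
    #   >>> Config.deep_merge(overlay, base)
    #   >>> overlay == {'name': 'custom', 'packages': {'install': ['vim']}, 'arch': 'amd64'}
    #   True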

    def __repr__(self):
        #report = {}
        #report['data'] = self.data
        #report['base_image'] = self.base_image
        #report['file'] = self.file
        #report['cache'] = self.cache
        #report['url'] = self.url
        return _yaml.dump(self.iso_cfg)

    def __init__(self, config=data['iso']):
        """
        Initialize Config Class

        This class loads a config file and provides functions for updating
        the configuration state from online resources.

        Parameters
        ----------
        config : pathlib.Path
            iso project config file
        """
        # hashes are cached before they are validated
        self.app_cache_dir = _Path(_appdirs.user_cache_dir('ryo-iso', 'HxR'))
        self.app_cache_dir.mkdir(parents=True, exist_ok=True)

        with self.data['app'].open('r') as f:
            self.app_cfg = _yaml.safe_load(f)
        with self.data['base'].open('r') as f:
            self.base_cfg = _yaml.safe_load(f)
        with config.open('r') as f:
            self.iso_cfg = _yaml.safe_load(f)

        # Setup `iso.yml` defaults via `$XDG_CONFIG_HOME/ryo-iso/iso_base.yml`
        self.deep_merge(self.iso_cfg, self.base_cfg)

        self.base_image = {}

        # supported distros
        distros = ['ubuntu']

        # Parse ``image: {{ distro }}/{{ version }}`` from config
        # See builds.sr.ht config file for aesthetics
        (self.base_image['distro'],
         self.base_image['version']) = self.iso_cfg['image'].split('/')

        if self.base_image['distro'] not in distros:
            _logger.error("Unsupported Distro")
            _sys.exit(_os.EX_CONFIG)

        # Distro plug-in system
        # Remap Config.{{distro}}_{{command}}() to Config.distro_{{command}}()
        # TODO: This probably needs to be refactored into a submodule
        cmds = ['init',
                'update',
                'hash_cache',
                'hash_check',
                'hash_version',
                'base_image_check',
                ]
        for cmd in cmds:
            setattr(self, "distro_" + cmd,
                    getattr(self, self.base_image['distro'] + "_" + cmd))

        # Initialize {{ distro }}
        self.distro_init(self.base_image['version'])
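
    # Usage sketch (assumption, not part of the original module): with the
    # default config files installed and a project ``iso.yml`` containing
    # e.g. ``image: ubuntu/20.04``:
    #
    #   >>> cfg = Config()            # reads ./iso.yml unless RYO_ISO_CONF is set
    #   >>> cfg.base_image['distro']
    #   'ubuntu'
    #   >>> cfg.base_image['version']
    #   '20.04'
    #   >>> cfg.distro_hash_cache()   # dispatches to ubuntu_hash_cache()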

    def ubuntu_init(self, version):
        """
        Ubuntu - distro specific initialization

        This function normalizes versions and codenames to compose the URLs
        for the distribution and associated hash file.

        Parameters
        ----------
        version : str
            Version of the distribution
        """
        codenames = ['xenial', 'bionic', 'cosmic', 'focal', 'impish', 'jammy']
        codevers = {'16.04': 'xenial',
                    '18.04': 'bionic',
                    '18.10': 'cosmic',
                    '20.04': 'focal',
                    '21.10': 'impish',
                    '22.04': 'jammy'}

        if version in codenames:
            self.base_image['codename'] = version

        match = _re.match(r'^(\d+\.\d+)$', version)
        if match:
            self.base_image['codename'] = codevers[match.group(0)]

        match = _re.match(r'^(\d+\.\d+\.\d+)$', version)
        if match:
            for ver in codevers:
                if version.startswith(ver):
                    self.base_image['codename'] = codevers[ver]

        self.base_image['release_url'] = ('http://releases.ubuntu.com/'
                                          + self.base_image['codename'] + '/')

        # NOTE: The ISO image download directory should have sufficient disk space
        self.base_image['dir'] = (
            _Path(self.app_cfg['distro_image_dir']).expanduser()
            / self.base_image['distro']
            / self.base_image['codename']
        )
        self.base_image['dir'].mkdir(parents=True, exist_ok=True)

        self.file = {}
        self.file['hash'] = 'SHA256SUMS'
        self.file['sign'] = 'SHA256SUMS.gpg'

        self.cache = {}
        for k in self.file.keys():
            self.cache[k] = self.app_cache_dir / self.file[k]
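
    # Normalization examples (derived from the code above, not part of the
    # original module): 'focal', '20.04', and '20.04.3' all resolve to the
    # codename 'focal', giving
    #
    #   base_image['codename']    == 'focal'
    #   base_image['release_url'] == 'http://releases.ubuntu.com/focal/'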

    def ubuntu_hash_cache(self):
        """
        Ubuntu - SHA256 checksum download

        This function downloads the hashes and the associated gpg signature
        file: ``['SHA256SUMS', 'SHA256SUMS.gpg']``
        """
        _logger.debug("ubuntu_hash_cache")
        self.url = {}
        for k in self.file.keys():
            self.url[k] = self.base_image['release_url'] + self.file[k]

        for k in self.cache.keys():
            # NOTE: SHA256SUMS.gpg is a binary detached signature, so write
            # the raw response bytes rather than decoded text
            with open(str(self.cache[k]), 'wb') as f:
                _logger.debug("Downloading %s to %s" % (self.url[k], self.cache[k]))
                r = _requests.get(self.url[k])
                f.write(r.content)
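
    # Example (assumes codename 'focal' and a typical Linux cache directory;
    # not part of the original module), the two downloads are:
    #
    #   http://releases.ubuntu.com/focal/SHA256SUMS     -> ~/.cache/ryo-iso/SHA256SUMS
    #   http://releases.ubuntu.com/focal/SHA256SUMS.gpg -> ~/.cache/ryo-iso/SHA256SUMS.gpg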

    def ubuntu_hash_check(self):
        """
        Ubuntu - Verify GPG signature of SHA256 checksum download

        This function verifies the GPG signature of ``SHA256SUMS`` against
        the local Ubuntu archive keyrings.
        """
        _logger.debug("ubuntu_hash_check")
        keyring = [
            "/usr/share/keyrings/ubuntu-archive-keyring.gpg",
            "/usr/share/keyrings/ubuntu-archive-removed-keys.gpg"
        ]
        keyargs = map(lambda i: "--keyring " + i, keyring)
        cmd = ("gpgv2 " + " ".join(keyargs)
               + " " + str(self.cache['sign'])
               + " " + str(self.cache['hash']))
        _logger.debug(cmd)
        r = _delegator.run(cmd)

        return_msg = "OK" if r.return_code == 0 else "ERROR"
        _logger.debug("Verify: " + return_msg)

        if r.return_code == 0:
            for k in self.cache.keys():
                # NOTE: Python 3.5 shutil.copy does not accept pathlib.Path
                _shutil.copy(str(self.cache[k]), str(self.base_image['dir']))
                self.base_image[k] = self.base_image['dir'] / self.file[k]
        else:
            _logger.error("Can not verify " + self.file['hash'])
            _sys.exit(_os.EX_DATAERR)
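
    # The composed command looks like the following (cache path assumes a
    # typical Linux setup; not part of the original module):
    #
    #   gpgv2 --keyring /usr/share/keyrings/ubuntu-archive-keyring.gpg \
    #         --keyring /usr/share/keyrings/ubuntu-archive-removed-keys.gpg \
    #         ~/.cache/ryo-iso/SHA256SUMS.gpg ~/.cache/ryo-iso/SHA256SUMS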

    def ubuntu_hash_version(self):
        """
        Ubuntu - Parse and extract version information from hash file

        Parses version information from the configuration and extracts the
        full version ``yy.mm.patch`` from ``SHA256SUMS``.

        The function can extend the configured version from ``yy.mm`` into
        ``yy.mm.patch`` for easier upstream tracking. It can also convert
        from codenames (ex. ``xenial``) to ``yy.mm.patch`` (ex. ``16.04.6``).

        Finally this function constructs the URL of the Ubuntu release from
        the version and variant configuration.
        """
        _logger.debug("ubuntu_hash_version")

        target = {}
        # If version in ``image: {{ distro }}/{{ version }}`` from config
        # is in the format `yy.mm.patch` (ex. 16.04.6)
        match = _re.match(r'^(\d+)\.(\d+)\.(\d+)$', self.base_image['version'])
        if match:
            target['yy'] = match.group(1)
            target['mm'] = match.group(2)
            target['patch'] = match.group(3)
            target['version'] = self.base_image['version']
            self.base_image['release_version'] = self.base_image['version']

        # Parse `~/.cache/ryo-iso/SHA256SUMS` to determine the latest patch release
        regex = _re.compile(r'^([0-9A-Fa-f]{64}) \*'
                            + self.base_image['distro'] + "-"
                            + r'(\d+)\.(\d+)\.?(\d?)' + "-"
                            + self.iso_cfg['variant'] + "-"
                            + self.iso_cfg['arch'] + ".iso$")
        _logger.debug("Regex: " + str(regex))

        latest = {}
        latest_hash = None
        with open(str(self.cache['hash']), 'r') as f:
            for line in f:
                # _logger.debug(line)
                match = regex.search(line)
                if match:
                    # NOTE: `SHA256SUMS` should only contain one yy.mm
                    latest['yy'] = match.group(2)
                    latest['mm'] = match.group(3)
                    if not latest.get('patch') or match.group(4) > latest['patch']:
                        latest['patch'] = match.group(4)
                        latest_hash = match.group(1)
                    latest['version'] = latest['yy'] + "." + latest['mm']
                    if latest['patch'] != "":
                        latest['version'] = latest['version'] + "." + latest['patch']
                    if target == latest:
                        break
        _logger.debug("Latest Version: " + latest['version'])

        current = self.base_image['release_url'].startswith('http://releases.ubuntu.com/')

        # If the target version is not the latest version switch to the release archive
        # TODO: Validate this codepath, lol.
        if target and target != latest and current:
            _logger.warning("Newer release version available")
            self.base_image['release_url'] = ('http://old-releases.ubuntu.com/releases/'
                                              + self.base_image['codename'] + '/')
            self.ubuntu_hash_cache()
            self.ubuntu_hash_check()
            self.ubuntu_hash_version()
            #self.base_image['release_version'] = self.base_image['version']
        else:
            self.base_image['release_version'] = latest['version']
            self.base_image['hash'] = latest_hash

        _logger.debug("Base Image Release Version: " + self.base_image['release_version'])
        _logger.debug("Base Image Hash: " + self.base_image['hash'])

        with (self.build_dir / '.release_version').open('w') as f:
            f.write(self.base_image['release_version'])
        with (self.build_dir / '.hash').open('w') as f:
            f.write(self.base_image['hash'])

        self.base_image['file'] = ("ubuntu-"
                                   + self.base_image['release_version'] + "-"
                                   + self.iso_cfg['variant'] + "-"
                                   + self.iso_cfg['arch'] + ".iso")
        self.base_image['path'] = self.base_image['dir'] / self.base_image['file']
        self.base_image['url'] = self.base_image['release_url'] + self.base_image['file']

        r = _requests.head(self.base_image['url'])
        self.base_image['size'] = int(r.headers['content-length'])
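
    # Example (variant 'desktop' and arch 'amd64' assumed from iso.yml; hash
    # shortened; not part of the original module). A matching SHA256SUMS line
    #
    #   <64 hex digits> *ubuntu-20.04.6-desktop-amd64.iso
    #
    # yields group(2)='20', group(3)='04', group(4)='6', so a configured
    # '20.04' is extended to release_version '20.04.6'. Note that ``(\d?)``
    # only captures a single-digit patch level.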

    def ubuntu_base_image_check(self):
        """
        Ubuntu - Verify the SHA256SUM of the upstream ISO image

        This function runs ``sha256sum -c`` against the cached ISO in
        ``self.base_image['dir']`` and, on success, symlinks it to
        ``self.build_dir/'base_image.iso'``.
        """
        # TODO: refactor out echo
        cmd = ('echo "' + self.base_image['hash'] + " *" + self.base_image['file'] + '"'
               + ' | sha256sum -c')
        _logger.debug(cmd)
        r = _delegator.run(cmd, cwd=str(self.base_image['dir']))

        valid = self.base_image['file'] + ": OK"
        if r.return_code == 0:
            for line in r.out.split('\n'):
                _logger.debug(line)
                if valid == line:
                    return_msg = "OK"
                    _logger.info("Verify: " + return_msg)
                    _os.symlink(str(self.base_image['path']),
                                str(self.build_dir / 'base_image.iso'))
                    return

        return_msg = "ERROR"
        _logger.error("Can not verify " + self.base_image['file'])
        _sys.exit(_os.EX_DATAERR)
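
    # The shell pipeline amounts to (hash and file name are illustrative; not
    # part of the original module):
    #
    #   echo "<sha256> *ubuntu-20.04.6-desktop-amd64.iso" | sha256sum -c
    #
    # and verification succeeds only if sha256sum prints the exact line
    # "ubuntu-20.04.6-desktop-amd64.iso: OK".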

    def ubuntu_update(self):
        # Update SHA256SUM
        # curl --list-only ftp://releases.ubuntu.com/releases/16.04/
        # curl -s --remote-name http://releases.ubuntu.com/xenial/SHA256SUMS.gpg
        # http://old-releases.ubuntu.com/releases/xenial/SHA256SUMS
        # http://releases.ubuntu.com/xenial/ubuntu-16.04.6-desktop-amd64.iso
        print("STUB ubuntu_update()")