#!/usr/bin/python3
#  OpenVPN 3 Linux client -- Next generation OpenVPN client
#
#  SPDX-License-Identifier: AGPL-3.0-only
#
#  Copyright (C) 2017 - 2023  OpenVPN Inc
#  Copyright (C) 2017 - 2023  David Sommerseth
#

##
# @file  openvpn3-autoload
#
# @brief  This will load all configuration files (.ovpn/.conf) found under a
#         predefined directory.  It is a requirement that the configuration
#         file is accompanied by an .autoload file with the same basename to
#         be processed

import sys
import os
import argparse
import json
import dbus
import time
from pathlib import Path
import openvpn3
from openvpn3 import StatusMajor, StatusMinor

# Add the traceback module if we're in debugging mode.
# This will allow dumping of tracebacks when exceptions happen
if 'OPENVPN3_DEBUG' in os.environ:
    import traceback


##
#  Parses all .ovpn/.conf files in the provided directory which also
#  have a corresponding .autoload file present.  The result is loaded and
#  parsed and returned in a dictionary, indexed by the base filename
#  processed.
#
#  Problems with a single profile (unreadable or invalid .autoload file,
#  configuration parse errors) are printed and that profile is skipped; they
#  do not abort processing of the remaining files.
#
#  @param srcdir  Directory to search for configuration profiles
#
#  @return  Dictionary indexed by the configuration file name (with
#           extension, without directory).  Each value is a dictionary with
#           the keys 'config' (result of ConfigParser.GenerateConfig()) and
#           'autoload' (the decoded JSON content of the .autoload file).
#
def find_autoload_configs(srcdir):
    # File extensions we support for autoloading
    extensions = ('.ovpn', '.conf')

    # Get a list of all files with the provided extensions
    # in the source directory
    path = Path(srcdir)
    files = []
    for ext in extensions:
        files.extend(path.glob('*' + ext))

    # Parse the list of files and check that a corresponding .autoload
    # file exists with a valid JSON format
    autoload_cfgs = {}
    for cfg in sorted(f for f in files if f.is_file()):
        cfgnam = str(cfg)

        # Only strip an extension found at the very end of the path.
        # Searching for it anywhere in the path would also match
        # directory names such as "autoload.conf.d/" and pick up the
        # wrong .autoload file.
        ext = next((e for e in extensions if cfgnam.endswith(e)), None)
        if ext is None:
            continue
        basenam = cfgnam[:-len(ext)]
        if not basenam:
            continue

        # Only continue processing if a corresponding .autoload file
        # exists
        alfilen = Path(basenam + '.autoload')
        if not alfilen.exists():
            continue

        try:
            # Parse the .autoload file - JSON formatted.  The context
            # manager ensures the file handle is closed again.
            with open(str(alfilen.resolve()), encoding='utf-8') as alfile:
                autoload = json.load(alfile)

            # Load the OpenVPN configuration
            opts = openvpn3.ConfigParser(['openvpn3-autoload',
                                          '--cd', os.path.dirname(cfgnam),
                                          '--config', os.path.basename(cfgnam)],
                                         'OpenVPN 3 autoloader')
            opts.SanityCheck()

            # Save all parsed/loaded data into a dictionary
            autoload_cfgs[os.path.basename(cfgnam)] = {
                'config': opts.GenerateConfig(),
                'autoload': autoload
            }
        except TypeError as e:
            print('Could not parse "%s": %s' % (cfgnam, str(e)))
        except ValueError as e:
            print('Invalid JSON data in "%s": %s' % (alfilen, str(e)))
        except Exception as e:
            print('Error: ' + str(e))

    return autoload_cfgs


##
#  Imports a parsed configuration file to the OpenVPN 3 Configuration Manager
#  and applies the flags and overrides extracted from the .autoload file.
#
#  @param cfgmgr       Configuration manager object providing Import()
#  @param cfgname      Human readable name of the configuration profile
#  @param autoloadcfg  Dictionary as returned by parse_autoload_config();
#                      the 'flags' and 'override' keys are used here
#  @param cfg          Configuration profile as a string
#
#  @return  The configuration object returned by cfgmgr.Import().  Callers
#           retrieve the D-Bus path through its GetPath() method.
#
def import_configuration(cfgmgr, cfgname, autoloadcfg, cfg):
    cfgobj = cfgmgr.Import(cfgname,  # human readable config name
                           cfg,      # configuration profile as a string
                           False,    # Single-use config?  No.
                           False)    # Persistent config?  No.

    # Enable the provided flags.  These flags are only boolean flags
    for flag in autoloadcfg['flags'] or ():
        cfgobj.SetProperty(flag, True)

    for (k, v) in autoloadcfg['override'].items():
        cfgobj.SetOverride(k, v)

    return cfgobj


##
#  Converts the various .autoload settings into the flags and override
#  values to be set in the OpenVPN 3 Configuration Manager for a specific
#  configuration profile.
#
#  Settings with a false/empty value are skipped (except inside the
#  'remote' -> 'proxy' section, where every recognised key is copied
#  as-is).  Unrecognised keys are silently ignored and invalid values
#  result in a WARNING being printed.
#
#  @param autoload  Dictionary with the decoded .autoload JSON content
#
#  @return  Dictionary which always contains 'flags' (list of boolean
#           property names to enable) and 'override' (override name ->
#           value), and optionally 'name' and 'owner'.
#
def parse_autoload_config(autoload):
    autoloadcfg = {'flags': [], 'override': {}}
    override = autoloadcfg['override']

    if 'name' in autoload:
        autoloadcfg['name'] = autoload['name']

    if 'acl' in autoload:
        for (k, v) in autoload['acl'].items():
            if not v:
                continue
            if 'public' == k:
                autoloadcfg['flags'].append('public_access')
            if 'locked-down' == k:
                autoloadcfg['flags'].append('locked_down')
            if 'set-owner' == k:
                autoloadcfg['owner'] = v

    if 'remote' in autoload:
        for (k, v) in autoload['remote'].items():
            if not v:
                continue
            if 'proto-override' == k:
                if v in ('udp', 'tcp'):
                    override['proto-override'] = v
                else:
                    print('WARNING: Invalid proto-override value: %s' % v)
            elif 'port-override' == k:
                # A non-numeric port is reported like the other invalid
                # values instead of aborting the whole autoload run
                try:
                    override['port-override'] = int(v)
                except (TypeError, ValueError):
                    print('WARNING: Invalid port-override value: %s' % v)
            elif 'compression' == k:
                if v in ('no', 'asym', 'yes'):
                    override['allow-compression'] = v
                else:
                    print('WARNING: Invalid compression value: %s' % v)
            elif 'proxy' == k:
                for (pk, pv) in v.items():
                    if 'host' == pk:
                        override['proxy-host'] = pv
                    elif 'port' == pk:
                        override['proxy-port'] = pv
                    elif 'username' == pk:
                        override['proxy-username'] = pv
                    elif 'password' == pk:
                        override['proxy-password'] = pv
                    elif 'allow-plain-text' == pk:
                        override['proxy-auth-cleartext'] = pv

    if 'crypto' in autoload:
        for (k, v) in autoload['crypto'].items():
            if not v:
                continue
            if 'tls-params' == k:
                for (tpk, tpv) in v.items():
                    if not tpv:
                        continue
                    if 'cert-profile' == tpk:
                        if tpv in ('legacy', 'preferred', 'suiteb'):
                            override['tls-cert-profile'] = tpv
                        else:
                            print('WARNING: Invalid tls-params:cert-profile '
                                  'value: %s' % tpv)
                    elif 'min-version' == tpk:
                        if tpv in ('tls_1_0', 'tls_1_1', 'tls_1_2', 'tls_1_3'):
                            override['tls-version-min'] = tpv
                        else:
                            print('WARNING: Invalid tls-params:min-version '
                                  'value: %s' % tpv)

    if 'tunnel' in autoload:
        for (k, v) in autoload['tunnel'].items():
            if not v:
                continue
            if 'persist' == k:
                override['persist-tun'] = True
            elif 'ipv6' == k:
                if v in ('yes', 'no', 'default'):
                    override['ipv6'] = v
                else:
                    print('WARNING: Invalid tunnel:ipv6 value: %s' % v)
            elif 'dns-fallback' == k:
                if 'google' == v:
                    override['dns-fallback-google'] = True
                else:
                    print("WARNING: Invalid tunnel:dns-fallback value: %s" % v)
            elif 'dns-scope' == k:
                override['dns-scope'] = v
            elif 'dns-setup-disabled' == k:
                override['dns-setup-disabled'] = v

    return autoloadcfg


##
#  Starts a new VPN tunnel based on the already imported configuration object
#
#  The function loops until the session has accepted the connection request.
#  Credentials requested by the session are taken from the 'user-auth'
#  section of the .autoload data.
#
#  @param sessmgr   Session manager object providing NewTunnel()
#  @param cfgobj    Configuration object to start a tunnel for
#  @param autoload  Dictionary with the decoded .autoload JSON content
#
#  @return  The value of GetPath() on the new session object
#
#  @throws Exception  if credentials are required but not (completely)
#                     provided in the .autoload data; the session is
#                     disconnected before raising.
#  @throws dbus.exceptions.DBusException  for all D-Bus errors not handled
#                     by the retry logic.
#
def start_tunnel(sessmgr, cfgobj, autoload):
    cfgn = cfgobj.GetProperty('name')
    sess = sessmgr.NewTunnel(cfgobj)

    ready = False
    conn_started = False
    # NOTE(review): there is no upper bound on the number of retries; if the
    # session never becomes ready this loop will not terminate.
    while not ready:
        try:
            # See if the backend process is ready to start a connection
            sess.Ready()

            # Only try to call the Connect() method
            # if it hasn't been done yet
            if not conn_started:
                sess.Connect()
                conn_started = True

            # Check the session status and only report unexpected statuses
            # NOTE(review): this condition only reports when the major status
            # is not CONNECTION; verify this matches the intended meaning of
            # "unexpected".
            status = sess.GetStatus()
            if (StatusMajor.CONNECTION != status['major']
                    and (StatusMinor.CONNECTING != status['minor']
                         or StatusMinor.CONNECTED == status['minor'])):
                print('[%s] Status: %s/%s' % (cfgn,
                                              status['major'].name,
                                              status['minor'].name))
            ready = True

        except dbus.exceptions.DBusException as excep:
            err = str(excep)

            # If the VPN backend needs user credentials, retrieve that
            # from the autoload configuration for this VPN configuration
            if ' Missing user credentials' in err:
                # Connect() must be called again once the credentials
                # have been provided
                conn_started = False

                if 'user-auth' not in autoload:
                    sess.Disconnect()
                    raise Exception('Configuration "%s" requires user/password authentication, but .autoload is not '
                                    ' configured with credentials. Auto-start ignored.' % cfgn)

                userauth = autoload['user-auth']
                for ui in sess.FetchUserInputSlots():
                    varn = ui.GetVariableName()
                    if varn not in userauth:
                        sess.Disconnect()
                        raise Exception('The .autoload config for "%s" is lacking details for "%s". Auto-start '
                                        'ignored.' % (cfgn, varn))
                    ui.ProvideInput(userauth[varn])

            # If the VPN backend doesn't reply soon enough, try again in a bit
            elif ('org.freedesktop.DBus.Error.NoReply' in err
                  or 'Backend VPN process is not ready' in err):
                time.sleep(1)

            # All other exceptions are re-thrown
            else:
                raise

    return sess.GetPath()


##
#  The real main function for the autoloader
#
#  @param cmdargs  Complete command line, including the program name as the
#                  first element (typically sys.argv)
#
#  @return  Exit code for the process; 0 on success, 1 if at least one
#           configuration failed to auto-start.  Invalid or missing command
#           line arguments make argparse exit the process with a usage
#           message.
#
def autoload_main(cmdargs):
    argp = argparse.ArgumentParser(prog=cmdargs[0],
                                   description='OpenVPN 3 Configuration Loader')
    # The directory is mandatory; without it there is nothing to process
    argp.add_argument('--directory', metavar='DIR', action='store',
                      required=True,
                      help='Directory to process')
    argp.add_argument('--ignore-autostart', action='store_true',
                      help='Do not automatically start configurations')
    opts = argp.parse_args(cmdargs[1:])

    exit_code = 0  # Presume all goes fine

    # Get a connection to the D-Bus' system bus and connect to the
    # OpenVPN 3 manager services
    bus = dbus.SystemBus()
    sessionmgr = openvpn3.SessionManager(bus)
    configmgr = openvpn3.ConfigurationManager(bus)

    # Process the autoload configuration directory
    for (cfgname, autocfg) in find_autoload_configs(opts.directory).items():
        autoload = autocfg['autoload']

        # Extract a list of properties with flags to set
        autoloadcfg = parse_autoload_config(autoload)

        # Import the configuration to the configuration manager.  The name
        # from the .autoload file is preferred, with the file name as the
        # fallback when it is missing or empty.
        cfgobj = import_configuration(configmgr,
                                      autoloadcfg.get('name') or cfgname,
                                      autoloadcfg,
                                      autocfg['config'])

        print('Configuration "%s" imported: %s [%s]' % (cfgname,
                                                        cfgobj.GetPath(),
                                                        ', '.join(autoloadcfg['flags'])))

        # Should this configuration be automatically started too?
        sesspath = None
        if 'autostart' in autoload and autoload['autostart']:
            if not opts.ignore_autostart:
                try:
                    sesspath = start_tunnel(sessionmgr, cfgobj, autoload)
                    print('Auto-started "%s": %s' % (cfgname, sesspath))
                except Exception as excp:
                    print('WARNING: ' + str(excp))
                    print('WARNING: This configuration may not have been auto-started.')
                    exit_code = 1
            else:
                print('Auto-start of "%s" was ignored.' % cfgname)

        # The transfer ownership feature is restricted to root only, so only
        # attempt this if this script is run as root.  This transfer is also
        # done after a possible auto-start of the tunnel, as that needs to
        # be done by the root user.
        if 'acl' in autoload and 'set-owner' in autoload['acl'] and 0 == os.geteuid():
            owner = autoload['acl']['set-owner']
            if sesspath is not None:
                sessionmgr.TransferOwnership(sesspath, owner)
            configmgr.TransferOwnership(cfgobj.GetPath(), owner)

    return exit_code


if __name__ == '__main__':
    ret = 9
    try:
        ret = autoload_main(sys.argv)
    except KeyboardInterrupt:
        print('')
        ret = 8
    except Exception as e:
        print('\n** ERROR ** %s' % str(e))
        ret = 2

        if 'OPENVPN3_DEBUG' in os.environ:
            print('\nmain traceback:')
            print(traceback.format_exc())

    sys.exit(ret)