#!/usr/bin/python3
# OpenVPN 3 Linux client -- Next generation OpenVPN client
#
# SPDX-License-Identifier: AGPL-3.0-only
#
# Copyright (C) 2017 - 2023 OpenVPN Inc
# Copyright (C) 2017 - 2023 David Sommerseth
#

##
# @file openvpn2
#
# @brief This is an attempt to make a simplistic front-end user interface
#        for the OpenVPN 3 D-Bus based VPN client which feels more like the
#        well known and classic OpenVPN 2.x client.
#
#        Only client side options are supported, and only the features which
#        have been made available in the OpenVPN 3 Core library.

import sys

# This check must run before the remaining imports
if sys.version_info[0] < 3:
    print("Python 3.x is required")
    sys.exit(1)

import datetime
import dbus
import os
import time
import webbrowser
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
from getpass import getpass

# Make all defined constants and classes part of this main module
from openvpn3 import StatusMajor, StatusMinor
from openvpn3 import ClientAttentionType, ClientAttentionGroup
from openvpn3 import ConfigParser
from openvpn3 import ConfigurationManager, Configuration
from openvpn3 import SessionManager, Session

# Add the traceback module if we're in debugging mode.
# This will allow dumping of tracebacks when exceptions happen
if 'OPENVPN3_DEBUG' in os.environ:
    import traceback

# Global variables
mainloop = GLib.MainLoop()   # main loop run by fetch_logs_loop()
gl_sessionobj = None         # session object shared with StatusHandler()


##
# Imports the configuration generated from the parsed options into the
# configuration manager and applies the requested overrides.
#
# @param bus   D-Bus connection handed to ConfigurationManager
# @param opts  Parsed options object; GetConfigName(), GenerateConfig(),
#              GetPersistTun() and GetOverrides() are called on it
#
# @return The configuration object returned by ConfigurationManager.Import()
#
# @throws Exception if setting an override raises a D-Bus exception; the
#         imported configuration is removed before the exception is raised
#
def import_config(bus, opts):
    cfgmgr = ConfigurationManager(bus)

    # Only preserve the imported configuration if we're debugging
    single_use = 'OPENVPN3_DEBUG' not in os.environ

    cfg = cfgmgr.Import(opts.GetConfigName() or '(command line)',  # name
                        opts.GenerateConfig(),                     # config_str
                        single_use,                                # single_use
                        False)                                     # persistent

    def _set_override(key, value):
        # Every override goes through the same guarded path, so a rejected
        # override never leaves the freshly imported configuration behind.
        try:
            cfg.SetOverride(key, value)
        except dbus.exceptions.DBusException as excp:
            cfg.Remove()
            raise Exception('Incorrect override: %s=%s'
                            % (key, value)) from excp

    if opts.GetPersistTun() is True:
        # If --persist-tun was provided, set the persist-tun override
        # in the imported configuration
        _set_override('persist-tun', True)

    for key, value in opts.GetOverrides().items():
        _set_override(key, value)

    if not single_use:
        print('Configuration path: %s' % cfg.GetPath())

    return cfg


##
# Prints a web authentication banner and tries to open the URL in a
# browser via webbrowser.open_new().  If that call reports failure, the
# URL is printed so it can be opened manually.
#
# @param url  URL to open for the web based authentication
#
def open_web_auth(url):
    print(' ########################')
    print(' ## WEB AUTHENTICATION ##')
    print(' ########################\n')
    print()
    print(' Web based authentication is required to establish the connection.')
    if webbrowser.open_new(url):
        print(' A browser window should have been opened on your system.')
    else:
        print(' Open this URL in a browser: ')
        print(' ' + url)
    print()


##
# Simple Log signal callback function.  Called each time a Log event
# happens on this session.
#
# @param group  Log group of the event (not used here)
# @param catg   Log category of the event (not used here)
# @param msg    Log message; may contain several lines
#
def LogHandler(group, catg, msg):
    # Empty lines are dropped; a message without any content prints nothing
    loglines = [line for line in msg.split('\n') if line]
    if not loglines:
        return

    print('%s [LOG] %s' % (datetime.datetime.now(), loglines[0]))

    # Continuation lines are indented by the width of the
    # '<timestamp> [LOG] ' prefix so they line up under the first line
    for line in loglines[1:]:
        print('%s%s' % (' ' * 33, line))


##
# Simple StatusChange callback function.  Called each time a StatusChange
# event happens on this session.
#
# This handler will also shutdown this currently running session if
# the session is disconnected outside of this program
#
# @param major  Raw status major code; converted with StatusMajor()
# @param minor  Raw status minor code; converted with StatusMinor()
# @param msg    Status message; for SESS_AUTH_URL it is used as the URL
#
def StatusHandler(major, minor, msg):
    maj = StatusMajor(major)
    minr = StatusMinor(minor)   # deliberately not 'min' (a builtin)

    if StatusMajor.SESSION == maj and StatusMinor.SESS_AUTH_URL == minr:
        open_web_auth(msg)
    else:
        print('%s [STATUS] (%s, %s) %s'
              % (datetime.datetime.now(), maj, minr, msg))

    # Status combinations which end the main loop.  PROC_STOPPED most
    # likely means the session got disconnected outside of this program.
    quit_on = (
        (StatusMajor.SESSION, StatusMinor.PROC_STOPPED),
        (StatusMajor.CONNECTION, StatusMinor.CONN_AUTH_FAILED),
        (StatusMajor.CONNECTION, StatusMinor.CONN_FAILED),
        (StatusMajor.CONNECTION, StatusMinor.CONN_DISCONNECTED),
    )
    if (maj, minr) in quit_on:
        mainloop.quit()
    elif StatusMajor.CONNECTION == maj \
            and StatusMinor.CFG_REQUIRE_USER == minr:
        try:
            request_credentials(gl_sessionobj)
            gl_sessionobj.Connect()
        except RuntimeError:
            # request_credentials() raises RuntimeError when the user aborts
            mainloop.quit()


##
# Main function for the foreground run mode of openvpn2.  Log and StatusChange
# events will be printed to the terminal as they occur until CTRL-C or
# the session is closed outside of this program
#
# @param sessobj  Session object to connect and follow
# @param verb     Log verbosity, stored in the 'log_verbosity' property
#
# @return Always 0
#
def fetch_logs_loop(sessobj, verb):
    global gl_sessionobj
    gl_sessionobj = sessobj   # StatusHandler() needs access to the session

    sessobj.LogCallback(LogHandler)
    sessobj.StatusChangeCallback(StatusHandler)
    sessobj.SetProperty('log_verbosity', dbus.UInt32(verb))

    print("Press CTRL-C to stop the connection\n")
    try:
        sessobj.Connect()   # Start the connection
        mainloop.run()      # Start listening for signals

        print("\nSession manager initiated disconnect...")
        try:
            # Ensure the session is really closed
            sessobj.Disconnect()
        except dbus.exceptions.DBusException:
            # Best effort only; errors from this call are ignored
            pass
    except KeyboardInterrupt:
        print("\nDisconnecting...")
        try:
            sessobj.LogCallback(None)
        except Exception:
            # If anything happens in this call, ignore it.
            # Hopefully it already gets cleaned up elsewhere
            pass

        try:
            print(sessobj.GetFormattedStatistics())
            sessobj.Disconnect()
        except dbus.exceptions.DBusException as e:
            # Only an UnknownMethod error is ignored here
            errname = str(e).split(':')[0]
            if errname != 'org.freedesktop.DBus.Error.UnknownMethod':
                raise

    print('Disconnected')
    return 0


##
# Asks the user on the terminal for each credential the session requests
# and passes the answers back to the session.
#
# @param sessionobj  Session object providing FetchUserInputSlots()
#
# @throws RuntimeError if the user interrupts a prompt with CTRL-C; the
#         session is disconnected before the exception is raised
#
def request_credentials(sessionobj):
    # User credentials comes in tuples of (type, group).
    for input_slot in sessionobj.FetchUserInputSlots():
        # Skip non-user credentials requests
        if input_slot.GetTypeGroup()[0] != ClientAttentionType.CREDENTIALS:
            continue

        prompt = input_slot.GetLabel() + ': '
        try:
            # Only an explicit False input mask gives an echoing prompt;
            # any other value falls back to the non-echoing getpass()
            if input_slot.GetInputMask() == False:
                response = input(prompt)
            else:
                response = getpass(prompt)
        except KeyboardInterrupt:
            # Shutdown the backend client
            print("\nAborting")
            sessionobj.Disconnect()
            raise RuntimeError("Aborting")

        input_slot.ProvideInput(response)


##
# Prints the details needed to manage a session which keeps running in
# the background.  Starts the web authentication if the last status
# carries an authentication URL.
#
# @param sessionobj  Session object which was started
# @param status      Last status dictionary retrieved from the session
#
def _print_background_session_info(sessionobj, status):
    session_path = sessionobj.GetPath()

    print('VPN session is running in the background.')
    print('Session path: ' + session_path)
    print()
    if StatusMajor.SESSION == status['major'] \
            and StatusMinor.SESS_AUTH_URL == status['minor']:
        open_web_auth(status['message'])

    print('Use the \'openvpn3 session-manage\' command line '
          + 'tool to manage this session.')
    print('See \'openvpn3 session-manage --help\' and '
          + '\'openvpn3 sessions-list\' for more details')
    print()
    print('To disconnect and stop this VPN session run:')
    print()
    print(' $ openvpn3 session-manage --session-path '
          + session_path + ' --disconnect')
    print()


##
# Starts a new VPN session for a specific configuration
#
# This is the main entry point for establishing new tunnels and it
# handles both background and foreground run modes
#
# @param bus     D-Bus connection handed to SessionManager
# @param cfgobj  Configuration object to start the tunnel with
# @param opts    Parsed options object; GetDataChannelOffload(), GetDaemon()
#                and GetLogVerbosity() are called on it
#
# @return Exit code: the result of fetch_logs_loop() in foreground mode;
#         otherwise 0 by default, 3 after an authentication or connection
#         failure, 7 after an unhandled status was seen and 8 when the
#         user aborted a credentials prompt
#
# @throws Exception if the backend process died unexpectedly or the
#         profile is server locked; other D-Bus errors are re-raised
#
def start_tunnel(bus, cfgobj, opts):
    # Main session manager object
    sessmgr = SessionManager(bus)

    # Prepare the tunnel
    sessionobj = sessmgr.NewTunnel(cfgobj)

    #
    # Starting the tunnel ...
    #
    done = False
    connected = False
    status = None
    exit_code = 0
    dco_configured = False
    while not done:
        try:
            # Check if the backend needs something more from us ...
            sessionobj.Ready()   # This throws an exception if not ready

            # Is DCO enabled?
            if not dco_configured and opts.GetDataChannelOffload():
                sessionobj.SetDCO(opts.GetDataChannelOffload())
                dco_configured = True

            # If not running in the background, print the log events
            # to the terminal.  When the fetch_logs_loop() function returns,
            # the connection is already disconnected and we can just return
            #
            # This approach will start a GLib main loop which takes care of
            # the main programs execution
            #
            if not opts.GetDaemon():
                return fetch_logs_loop(sessionobj, opts.GetLogVerbosity())

            # ... and if this session is to be run in the
            # background when connected ...

            # Start the VPN connection
            sessionobj.Connect()

            # Simple poll routine to catch the situation of the tunnel
            # The 'status' property in a session object contains the last
            # status update sent by the VPN backend
            connected = False
            connecting_prompt = False
            while not done and not connected:
                time.sleep(1)   # Don't race too quickly forward
                status = sessionobj.GetStatus()
                st_minor = status['minor']

                if StatusMajor.CONNECTION == status['major']:
                    if StatusMinor.CONN_CONNECTING == st_minor:
                        # First round prints the prompt, later rounds a dot
                        if connecting_prompt:
                            sys.stdout.write('.')
                        else:
                            sys.stdout.write('Connecting ...')
                            connecting_prompt = True
                        sys.stdout.flush()
                    elif StatusMinor.CONN_CONNECTED == st_minor:
                        print('\nConnected')
                        connected = True
                    elif st_minor in (StatusMinor.CONN_DISCONNECTED,
                                      StatusMinor.CONN_DONE):
                        reason = status['message'] or '(No information)'
                        print('\nConnection got disconnected: ' + reason)
                        done = True
                    elif StatusMinor.CFG_REQUIRE_USER == st_minor:
                        connected = False
                    elif StatusMinor.CONN_AUTH_FAILED == st_minor:
                        print('Authentication failed')
                        done = True
                        exit_code = 3
                    elif StatusMinor.CONN_FAILED == st_minor:
                        print('Connection failed')
                        done = True
                        exit_code = 3
                    else:
                        # Not handled status codes
                        print('{Status: major=%s, minor=%s, message="%s"}'
                              % (status['major'].name, st_minor.name,
                                 status['message']))
                        # NOTE(review): this value is never reset, so a
                        # connection which succeeds later still returns 7
                        # -- confirm whether that is intended
                        exit_code = 7
                elif StatusMajor.SESSION == status['major']:
                    # Compare against the received status; testing the
                    # enum member alone does not look at the status at all
                    if StatusMinor.SESS_AUTH_URL == st_minor:
                        connected = True

                if not connected and not done:
                    # Check if everything is okay.  If authentication
                    # failed, or a dynamic challenge is sent, then an
                    # exception is thrown again
                    sessionobj.Ready()

            if connected:
                # Print important details to the terminal ...
                _print_background_session_info(sessionobj, status)
            done = True

        except dbus.exceptions.DBusException as excep:
            errmsg = str(excep)

            # Do we need provide user credentials?
            if ' Missing user credentials' in errmsg:
                print('Credentials needed')
                try:
                    request_credentials(sessionobj)
                except RuntimeError:
                    return 8
            elif ' Backend VPN process has died' in errmsg:
                if 3 == exit_code:
                    # Authentication failed, this is expected
                    break
                if 'OPENVPN3_DEBUG' in os.environ:
                    import traceback
                    print('\nstart_tunnel() [1] traceback:\n')
                    print(traceback.format_exc())
                raise Exception('VPN backend process stopped unexpectedly. '
                                + 'Session has closed. :: '
                                + errmsg) from excep
            elif 'Backend VPN process is not ready' in errmsg:
                # Wait for the backend to settle
                time.sleep(0.5)
            elif 'ERR_PROFILE_SERVER_LOCKED_UNSUPPORTED:' in errmsg:
                sessionobj.Disconnect()
                raise Exception('Server locked profiles are not '
                                'supported') from excep
            else:
                if 'OPENVPN3_DEBUG' in os.environ:
                    import traceback
                    print('\nstart_tunnel() [2] traceback:\n')
                    print(traceback.format_exc())
                # Other exceptions are re-thrown
                raise

        except KeyboardInterrupt:
            if connected:
                # If connected, retrieve and dump the last connection
                # statistics
                print(sessionobj.GetFormattedStatistics())

            print("\nClosing session")
            connected = False

            # The session is being closed on the user's request, so there
            # is nothing left to poll.  Without this, the loop would call
            # Ready() again on the session which was just disconnected.
            done = True
            try:
                sessionobj.Disconnect()
            except (KeyboardInterrupt, RuntimeError):
                pass

    return exit_code


##
# Entry point of the program: parses the command line, imports the
# configuration and starts the tunnel.
#
# @param args  Command line arguments, with the program name first
#              (normally sys.argv).  The list is not modified.
#
# @return Exit code: 0 when only the parsed configuration was dumped,
#         the result of start_tunnel() otherwise, 8 on CTRL-C and 2 on
#         any other error
#
def openvpn2_main(args):
    ret = 9
    try:
        # Work on a copy so the caller's list is left untouched
        args = list(args)

        if len(args) < 2:
            progname = args[0] if args else 'openvpn2'
            raise Exception('No options provided. See "%s --help" '
                            'for details.' % progname)

        # Simulate the classic OpenVPN 2.x behaviour where it can take a
        # single file name as the argument, which is the config file name
        if len(args) == 2 and not args[1].startswith('--'):
            args.insert(1, '--config')

        # Parse the options provided on the command line
        opts = ConfigParser(args, 'OpenVPN 2.x wrapper for OpenVPN 3')
        opts.SanityCheck()

        if os.environ.get('OPENVPN3_DEBUG') == 'DUMP_PARSE':
            print(opts.GenerateConfig())
            return 0

        # Get a connection to the D-Bus' system bus
        dbusloop = DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus(mainloop=dbusloop)

        # Import the configuration, which is stored in the
        # configuration manager
        cfgobj = import_config(bus, opts)

        # Start the tunnel, which is managed by the session manager
        ret = start_tunnel(bus, cfgobj, opts)

    except KeyboardInterrupt:
        print("")
        ret = 8
    except Exception as e:
        print('\n** ERROR ** %s' % str(e))
        ret = 2

        if 'OPENVPN3_DEBUG' in os.environ:
            import traceback
            print('\nmain traceback:')
            print(traceback.format_exc())

    return ret


if __name__ == "__main__":
    sys.exit(openvpn2_main(sys.argv))