Module ambianic.server
Main Ambianic server module.
Expand source code
"""Main Ambianic server module."""
import logging
import logging.handlers
import os
import time
from ambianic import logger
from ambianic.configuration import get_all_config_files, get_root_config, reload_config
from ambianic.pipeline import pipeline_event
from ambianic.pipeline.interpreter import PipelineServer
from ambianic.util import ServiceExit
from watchdog.observers import Observer
log = logging.getLogger(__name__)
AI_MODELS_DIR = "ai_models"
MANAGED_SERVICE_HEARTBEAT_THRESHOLD = 180 # seconds
MAIN_HEARTBEAT_LOG_INTERVAL = 5
ROOT_SERVERS = {
"pipelines": PipelineServer,
# web server is now started as a separted uvicorn process from the OS shell
# "web": FastapiServer,
}
class AmbianicServer:
"""Ambianic main server."""
def __init__(self, work_dir=None):
"""Inititalize server from working directory files.
:Parameters:
----------
work_dir : string
The working directory where config and data reside.
"""
assert work_dir
self._env_work_dir = work_dir
# array of managed specialized servers
self._servers = {}
self._service_exit_requested = False
self._service_restart_requested = False
self._latest_heartbeat = time.monotonic()
self._config_observer = None
def stop_watch_config(self):
if self._config_observer:
self._config_observer.unschedule_all()
self._config_observer.stop()
self._config_observer.join()
self._config_observer = None
def start_watch_config(self):
if self._config_observer:
self.stop_watch_config()
self._config_observer = Observer()
config_paths = get_all_config_files()
for filepath in config_paths:
if not os.path.exists(filepath):
log.warning("File %s not found, skip changes watch" % filepath)
continue
log.info("Watching %s for changes" % filepath)
self._config_observer.schedule(self, filepath, recursive=False)
self._config_observer.start()
def _stop_servers(self, servers):
log.debug("Stopping servers...")
for name, srv in servers.items():
srv.stop()
srv = None
servers[name] = None
def _healthcheck(self, servers):
"""Check the health of managed servers."""
for s in servers.values():
latest_heartbeat, _ = s.healthcheck()
now = time.monotonic()
lapse = now - latest_heartbeat
if lapse > 1:
# log only if lapse is over 1 second long.
# otherwise things are OK and we don't want
# unnecessary log noise
log.debug("lapse for %s is %f", s.__class__.__name__, lapse)
if lapse > MANAGED_SERVICE_HEARTBEAT_THRESHOLD:
log.warning(
'Server "%s" is not responsive. '
"Latest heart beat was %f seconds ago. "
"Will send heal signal.",
s.__class__.__name__,
lapse,
)
s.heal()
def _log_heartbeat(self):
log.info("Main thread alive.")
def _heartbeat(self):
new_time = time.monotonic()
# print a heartbeat message every so many seconds
if new_time - self._latest_heartbeat > MAIN_HEARTBEAT_LOG_INTERVAL:
self._log_heartbeat()
# this is where hooks to external
# monitoring services will come in
self._latest_heartbeat = new_time
if self._service_exit_requested:
raise ServiceExit
def dispatch(self, event):
"""Callback called by watchdog.Observer when a config file changes"""
log.info("Configuration file changed, stopping Ambianic server")
self.restart()
def restart(self):
self._service_restart_requested = True
self.stop()
def start(self):
"""Programmatic start of the main service."""
assert os.path.exists(self._env_work_dir)
config = get_root_config()
logger.configure(config.get("logging"))
# dynamically (re)load fresh configuration settings
log.debug("server start: before config reload")
reload_config()
log.debug("server start: after config reload")
# Re-configure logging in case config file just changed
# on disk and caused config reload.
logger.configure(config.get("logging"))
pipeline_event.configure_timeline(config.get("timeline"))
# watch configuration changes
self.start_watch_config()
log.info("Starting Ambianic server...")
# Register the signal handlers
servers = {}
# Start the job threads
try:
for s_name, s_class in ROOT_SERVERS.items():
srv = s_class(config=config)
srv.start()
servers[s_name] = srv
self._latest_heartbeat = time.monotonic()
self._servers = servers
# Keep the main thread running, otherwise signals are ignored.
while True:
time.sleep(0.5)
self._healthcheck(servers)
self._heartbeat()
except ServiceExit:
log.info("Service exit requested.")
# stop servers and cleanup references
self._stop_servers(servers)
self._servers = {}
self._service_exit_requested = False
# stop watching config files
self.stop_watch_config()
if self._service_restart_requested:
self._service_restart_requested = False
log.info("Restarting Ambianic server.")
return self.start()
log.info("Exiting Ambianic server.")
return True
def stop(self):
"""Programmatic stop of the main service."""
log.info("Stopping server...")
self._service_exit_requested = True
Classes
class AmbianicServer (work_dir=None)
-
Ambianic main server.
Inititalize server from working directory files.
:Parameters:
work_dir : string The working directory where config and data reside.
Expand source code
class AmbianicServer: """Ambianic main server.""" def __init__(self, work_dir=None): """Inititalize server from working directory files. :Parameters: ---------- work_dir : string The working directory where config and data reside. """ assert work_dir self._env_work_dir = work_dir # array of managed specialized servers self._servers = {} self._service_exit_requested = False self._service_restart_requested = False self._latest_heartbeat = time.monotonic() self._config_observer = None def stop_watch_config(self): if self._config_observer: self._config_observer.unschedule_all() self._config_observer.stop() self._config_observer.join() self._config_observer = None def start_watch_config(self): if self._config_observer: self.stop_watch_config() self._config_observer = Observer() config_paths = get_all_config_files() for filepath in config_paths: if not os.path.exists(filepath): log.warning("File %s not found, skip changes watch" % filepath) continue log.info("Watching %s for changes" % filepath) self._config_observer.schedule(self, filepath, recursive=False) self._config_observer.start() def _stop_servers(self, servers): log.debug("Stopping servers...") for name, srv in servers.items(): srv.stop() srv = None servers[name] = None def _healthcheck(self, servers): """Check the health of managed servers.""" for s in servers.values(): latest_heartbeat, _ = s.healthcheck() now = time.monotonic() lapse = now - latest_heartbeat if lapse > 1: # log only if lapse is over 1 second long. # otherwise things are OK and we don't want # unnecessary log noise log.debug("lapse for %s is %f", s.__class__.__name__, lapse) if lapse > MANAGED_SERVICE_HEARTBEAT_THRESHOLD: log.warning( 'Server "%s" is not responsive. ' "Latest heart beat was %f seconds ago. " "Will send heal signal.", s.__class__.__name__, lapse, ) s.heal() def _log_heartbeat(self): log.info("Main thread alive.") def _heartbeat(self): new_time = time.monotonic() # print a heartbeat message every so many seconds if new_time - self._latest_heartbeat > MAIN_HEARTBEAT_LOG_INTERVAL: self._log_heartbeat() # this is where hooks to external # monitoring services will come in self._latest_heartbeat = new_time if self._service_exit_requested: raise ServiceExit def dispatch(self, event): """Callback called by watchdog.Observer when a config file changes""" log.info("Configuration file changed, stopping Ambianic server") self.restart() def restart(self): self._service_restart_requested = True self.stop() def start(self): """Programmatic start of the main service.""" assert os.path.exists(self._env_work_dir) config = get_root_config() logger.configure(config.get("logging")) # dynamically (re)load fresh configuration settings log.debug("server start: before config reload") reload_config() log.debug("server start: after config reload") # Re-configure logging in case config file just changed # on disk and caused config reload. logger.configure(config.get("logging")) pipeline_event.configure_timeline(config.get("timeline")) # watch configuration changes self.start_watch_config() log.info("Starting Ambianic server...") # Register the signal handlers servers = {} # Start the job threads try: for s_name, s_class in ROOT_SERVERS.items(): srv = s_class(config=config) srv.start() servers[s_name] = srv self._latest_heartbeat = time.monotonic() self._servers = servers # Keep the main thread running, otherwise signals are ignored. while True: time.sleep(0.5) self._healthcheck(servers) self._heartbeat() except ServiceExit: log.info("Service exit requested.") # stop servers and cleanup references self._stop_servers(servers) self._servers = {} self._service_exit_requested = False # stop watching config files self.stop_watch_config() if self._service_restart_requested: self._service_restart_requested = False log.info("Restarting Ambianic server.") return self.start() log.info("Exiting Ambianic server.") return True def stop(self): """Programmatic stop of the main service.""" log.info("Stopping server...") self._service_exit_requested = True
Methods
def dispatch(self, event)
-
Callback called by watchdog.Observer when a config file changes
Expand source code
def dispatch(self, event): """Callback called by watchdog.Observer when a config file changes""" log.info("Configuration file changed, stopping Ambianic server") self.restart()
def restart(self)
-
Expand source code
def restart(self): self._service_restart_requested = True self.stop()
def start(self)
-
Programmatic start of the main service.
Expand source code
def start(self): """Programmatic start of the main service.""" assert os.path.exists(self._env_work_dir) config = get_root_config() logger.configure(config.get("logging")) # dynamically (re)load fresh configuration settings log.debug("server start: before config reload") reload_config() log.debug("server start: after config reload") # Re-configure logging in case config file just changed # on disk and caused config reload. logger.configure(config.get("logging")) pipeline_event.configure_timeline(config.get("timeline")) # watch configuration changes self.start_watch_config() log.info("Starting Ambianic server...") # Register the signal handlers servers = {} # Start the job threads try: for s_name, s_class in ROOT_SERVERS.items(): srv = s_class(config=config) srv.start() servers[s_name] = srv self._latest_heartbeat = time.monotonic() self._servers = servers # Keep the main thread running, otherwise signals are ignored. while True: time.sleep(0.5) self._healthcheck(servers) self._heartbeat() except ServiceExit: log.info("Service exit requested.") # stop servers and cleanup references self._stop_servers(servers) self._servers = {} self._service_exit_requested = False # stop watching config files self.stop_watch_config() if self._service_restart_requested: self._service_restart_requested = False log.info("Restarting Ambianic server.") return self.start() log.info("Exiting Ambianic server.") return True
def start_watch_config(self)
-
Expand source code
def start_watch_config(self): if self._config_observer: self.stop_watch_config() self._config_observer = Observer() config_paths = get_all_config_files() for filepath in config_paths: if not os.path.exists(filepath): log.warning("File %s not found, skip changes watch" % filepath) continue log.info("Watching %s for changes" % filepath) self._config_observer.schedule(self, filepath, recursive=False) self._config_observer.start()
def stop(self)
-
Programmatic stop of the main service.
Expand source code
def stop(self): """Programmatic stop of the main service.""" log.info("Stopping server...") self._service_exit_requested = True
def stop_watch_config(self)
-
Expand source code
def stop_watch_config(self): if self._config_observer: self._config_observer.unschedule_all() self._config_observer.stop() self._config_observer.join() self._config_observer = None