import asyncio
import logging
import sys
from copy import copy
from datetime import datetime
from logging import DEBUG, ERROR, FATAL, INFO, WARN, WARNING # noqa: F401
from threading import Lock, Thread
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Union
from grpclib.exceptions import StreamTerminatedError
import viam
if TYPE_CHECKING:
from .robot.client import RobotClient
LOG_LEVEL = INFO
LOGGERS: Dict[str, logging.Logger] = {}
_MODULE_PARENT: Optional["RobotClient"] = None
class _SingletonEventLoopThread:
_instance = None
_lock = Lock()
_ready_event = asyncio.Event()
_loop: Union[asyncio.AbstractEventLoop, None]
_thread: Thread
def __new__(cls):
# Ensure singleton precondition
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(_SingletonEventLoopThread, cls).__new__(cls)
cls._instance._loop = None
cls._instance._thread = Thread(target=cls._instance._run)
cls._instance._thread.start()
return cls._instance
def _run(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._ready_event.set()
self._loop.run_forever()
def stop(self):
if self._loop is not None:
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()
def get_loop(self):
if self._loop is None:
raise RuntimeError("Event loop is None. Did you call .start() and .wait_until_ready()?")
return self._loop
async def wait_until_ready(self):
await self._ready_event.wait()
class _ModuleHandler(logging.Handler):
_parent: "RobotClient"
_logger: logging.Logger
_worker: _SingletonEventLoopThread
def __init__(self, parent: "RobotClient"):
super().__init__()
self._parent = parent
self._logger = logging.getLogger("ModuleLogger")
addHandlers(self._logger, True)
self._logger.setLevel(self.level)
self._worker = _SingletonEventLoopThread()
def setLevel(self, level: Union[int, str]) -> None:
self._logger.setLevel(level)
return super().setLevel(level)
async def handle_task_result(self, task: asyncio.Task):
try:
_ = task.result()
except (asyncio.CancelledError, asyncio.InvalidStateError, StreamTerminatedError):
pass
def emit(self, record: logging.LogRecord):
assert isinstance(record, logging.LogRecord)
# Fully qualified name of form "{subtype triplet}/{name}", e.g. "rdk:component:arm/myarm"
name = record.name.replace(".", "/")
message = f"{record.filename}:{record.lineno}\t{record.getMessage()}"
stack = f"exc_info: {record.exc_info}, exc_text: {record.exc_text}, stack_info: {record.stack_info}"
time = datetime.fromtimestamp(record.created)
try:
loop = self._worker.get_loop()
asyncio.run_coroutine_threadsafe(
self._asynchronously_emit(record, name, message, stack, time),
loop,
)
except Exception as err:
# If the module log fails, log using stdout/stderr handlers
self._logger.error(f"ModuleLogger failed for {record.name} - {err}")
self._logger.log(record.levelno, message)
async def _asynchronously_emit(self, record: logging.LogRecord, name: str, message: str, stack: str, time: datetime):
await self._worker.wait_until_ready()
task = self._worker.get_loop().create_task(
self._parent.log(name, record.levelname, time, message, stack),
name=f"{viam._TASK_PREFIX}-LOG-{record.created}",
)
task.add_done_callback(lambda t: asyncio.run_coroutine_threadsafe(self.handle_task_result(t), self._worker.get_loop()))
def close(self):
self._worker.stop()
super().close()
class _ColorFormatter(logging.Formatter):
MAPPING = {
"DEBUG": 37, # white
"INFO": 36, # cyan
"WARNING": 33, # yellow
"ERROR": 31, # red
"CRITICAL": 41, # white on red bg
}
def __init__(self, pattern):
logging.Formatter.__init__(self, pattern)
def format(self, record):
colored_record = copy(record)
levelname = colored_record.levelname
seq = self.MAPPING.get(levelname, 37) # default white
colored_levelname = f"\x1b[33;{seq}m{levelname}\x1b[0m"
colored_record.levelname = colored_levelname
return super().format(colored_record)
[docs]def getLogger(name: str) -> logging.Logger:
logger = LOGGERS.get(name)
if logger:
return logger
logger = logging.getLogger(name)
logger.setLevel(LOG_LEVEL)
addHandlers(logger)
LOGGERS[name] = logger
return logger
[docs]def addHandlers(logger: logging.Logger, use_default_handlers=False):
_addHandlers([logger], use_default_handlers)
[docs]def update_log_level(logger: logging.Logger, level: Union[int, str]):
if level == "":
level = LOG_LEVEL
logger.setLevel(level)
for handler in logger.handlers:
handler.setLevel(level)
def _addHandlers(loggers: Iterable[logging.Logger], use_default_handlers=False):
format = _ColorFormatter("%(asctime)s\t\t" + "%(levelname)s\t" + "%(name)s (%(filename)s:%(lineno)d)\t" + "%(message)s\t")
handlers: List[logging.Handler] = []
std_handler = logging.StreamHandler(stream=sys.stdout)
std_handler.setFormatter(format)
# filter out logs at error level or above
std_handler.setLevel(LOG_LEVEL)
std_handler.addFilter(filter=lambda record: (record.levelno < ERROR))
err_handler = logging.StreamHandler(stream=sys.stderr)
err_handler.setFormatter(format)
# filter out logs below error level
err_handler.setLevel(max(ERROR, LOG_LEVEL))
if _MODULE_PARENT is not None and not use_default_handlers:
mod_handler = _ModuleHandler(_MODULE_PARENT)
mod_handler.setFormatter(format)
mod_handler.setLevel(LOG_LEVEL)
handlers = [mod_handler]
else:
handlers = [std_handler, err_handler]
for logger in loggers:
logger.handlers.clear()
if "viam.sessions_client" in LOGGERS and LOGGERS["viam.sessions_client"] == logger:
logger.addHandler(std_handler)
logger.addHandler(err_handler)
else:
for h in handlers:
logger.addHandler(h)
[docs]def setParent(parent: "RobotClient"):
global _MODULE_PARENT
_MODULE_PARENT = parent
_addHandlers(LOGGERS.values())
[docs]def setLevel(level: int):
global LOG_LEVEL
LOG_LEVEL = level
for logger in LOGGERS.values():
logger.setLevel(LOG_LEVEL)
_addHandlers(LOGGERS.values())
[docs]def silence():
setLevel(FATAL + 1)
[docs]def shutdown():
logging.shutdown()