관리-도구
편집 파일: cleanup.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import contextlib
import time
from datetime import timedelta
from typing import Generator
from threading import Event

import sqlalchemy as sa
import sqlalchemy.exc

from lvestats.orm import bursting_events_table
from ..common import Timestamp
from .._logs import logger
from .base import thread_running


def cleanup_old_events(
    engine: sa.engine.Engine,
    server_id: str,
    cutoff: Timestamp,
) -> None:
    """Delete this server's bursting events recorded at or before *cutoff*.

    Args:
        engine: SQLAlchemy engine the DELETE statement is executed on.
        server_id: Only rows whose ``server_id`` column equals this value
            are removed.
        cutoff: Rows whose ``timestamp`` column is less than or equal to
            this value are removed.
    """
    stmt = sa.delete(bursting_events_table).where(sa.and_(
        bursting_events_table.c.timestamp <= cutoff,
        bursting_events_table.c.server_id == server_id,
    ))
    engine.execute(stmt)


@contextlib.contextmanager
def cleanup_running(
    engine: sa.engine.Engine,
    server_id: str,
    cleanup_interval: timedelta,
    history_window: timedelta,
    run_period: timedelta = timedelta(seconds=5),
    fail_fast: bool = True,
) -> Generator[None, None, None]:
    """Keep a periodic cleanup of old bursting events running for the ``with`` body.

    The worker function is handed to ``thread_running('bursting-cleanup', ...)``.
    NOTE(review): presumably that helper runs it in a background thread and
    sets the ``terminate`` event when the context exits -- confirm against
    ``.base.thread_running``.

    Args:
        engine: SQLAlchemy engine used for the DELETE statements.
        server_id: Server whose events are cleaned up.
        cleanup_interval: Minimum time between two successful cleanups.
        history_window: Events older than ``now - history_window`` are deleted.
        run_period: How often the worker wakes up to check whether a cleanup
            is due.
        fail_fast: When true, a ``sqlalchemy.exc.DBAPIError`` raised by the
            cleanup propagates out of the worker function. When false, it is
            logged and the cleanup is retried on the next wake-up.
    """
    def main(terminate: Event) -> None:
        # These durations never change, so convert them to seconds once.
        period_seconds = run_period.total_seconds()
        interval_seconds = cleanup_interval.total_seconds()
        window_seconds = history_window.total_seconds()

        # 0.0 makes the very first iteration perform a cleanup right away.
        prev_db_write_time = 0.0
        while not terminate.is_set():
            now = time.time()
            if (now - prev_db_write_time) > interval_seconds:
                cutoff = Timestamp(int(now - window_seconds))
                try:
                    cleanup_old_events(engine, server_id, cutoff)
                except sqlalchemy.exc.DBAPIError as e:
                    if fail_fast:
                        raise
                    logger.error('Failed to cleanup bursting events from DB!', exc_info=e)
                else:
                    # Only a successful cleanup resets the interval, so a
                    # failed attempt is retried on the next wake-up.
                    prev_db_write_time = now
            # Wait on the event instead of sleeping: the pause ends as soon as
            # termination is requested rather than after a full run_period.
            terminate.wait(period_seconds)
        logger.debug('Stopping events cleanup thread.')

    with thread_running('bursting-cleanup', main):
        yield