自学内容网 自学内容网

pyhton django web集群基于linux定时任务

基于django management/commands目录下的脚本

from django.core.management import BaseCommand
import logging
import uuid
from pia.utils.cache import reset_redis_expire
from pia.utils.reids_key import TASK_KEY

logging = logging.getLogger('task')

"""
定时任务: 定时任务 看门狗续时
"""


class Command(BaseCommand):

    def add_arguments(self, parser):
        parser.add_argument('trace_id', type=str)

    def handle(self, *args, **options):
        trace_id = options["trace_id"]
        if not trace_id:
            trace_id = str(uuid.uuid4())
        self.trace_id = trace_id

        self.writeLog('watchdog continuation start')
        reset_redis_expire(TASK_KEY + "privacy_detect", 80)
        reset_redis_expire(TASK_KEY + "privacy_analysis", 80)
        reset_redis_expire(TASK_KEY + "privacy_notify", 80)
        reset_redis_expire(TASK_KEY + "tx_organization", 80)
        reset_redis_expire(TASK_KEY + "send_message", 80)
        reset_redis_expire(TASK_KEY + "todo_notify", 80)
        reset_redis_expire(TASK_KEY + "thirdparty_ex_notify", 80)
        reset_redis_expire(TASK_KEY + "cookies_scan", 80)
        reset_redis_expire(TASK_KEY + "xuanwu_app_scan", 80)
        self.writeLog('watchdog continuation end')

    def writeLog(self, msg: str):
        logging.info(f'[{self.trace_id}] {msg}')
import sys
import os
from pathlib import Path
from tasks import CrontabTask

# 添加 Django 项目的根目录到 Python 路径
django_project_path = Path(__file__).resolve().parent.parent.parent
sys.path.append(str(django_project_path))
# 设置 DJANGO_SETTINGS_MODULE 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tencheck.settings')
if __name__ == '__main__':
    """总入口 脚本启动信息"""
    # crontab任务 redis key
    redis_key = sys.argv[1]
    # crontab任务名称
    task_name = sys.argv[2]
    # 环境yaml文件
    env = sys.argv[3]
    env_name = sys.argv[4]
    config_name = sys.argv[5]
    rainbow_key = sys.argv[6]
    os.environ.setdefault('ENV_NAME', env_name)
    os.environ.setdefault('CONFIG_NAME', config_name)
    os.environ.setdefault('RAINBOW_KEY', rainbow_key)

    task = CrontabTask(env=env, redis_key=redis_key, task_name=task_name)
    task.start()
from pathlib import Path
import os
import sys
import logging
import time
import traceback
import uuid
import yaml
from django.core.cache import cache

TASK_KEY = "pia:crontab:"


class BaseTask:

    def __init__(self, *args, **kwargs):
        self.env = kwargs.get('env')
        self.task_name = kwargs.get('task_name')
        self.redis_key = kwargs.get('redis_key')
        self.workspace = Path(__file__).resolve().parent.parent.parent
        self.config = self.load_config()
        self.pid = os.getpid()
        self.trace_id = str(uuid.uuid4())

    def load_config(self):
        """ 读取配置文件 """
        config = {}
        with open(os.path.join(self.workspace, f"task/conf/config-{self.env}.yaml"), 'r') as f:
            config = yaml.safe_load(f)
        return config

    def writelog(self, text: str) -> None:
        """ 记录日志 """
        log_path = self.config['log_path']
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
        file_handler = logging.FileHandler(log_path + "/crontab_task.log")  # 日志目录
        file_handler.setFormatter(formatter)
        logger = logging.getLogger()
        logger.addHandler(file_handler)
        logger.setLevel(logging.DEBUG)
        text = time.strftime(f'[PID:{self.pid}] ', time.localtime(time.time())) + text
        logger.info(f'[{self.trace_id}] ' + text)  # 写入日志
        logger.removeHandler(file_handler)
        file_handler.close()

    def script_err_report(push_user: list, info: str) -> None:
        """ 脚本告警信息 """
        # writelog(info)
        pass

    def start_crontab(self) -> int:
        """redis控制脚本执行"""
        try:
            redis_key = self.redis_key
            """保证原子性 True if lock acquired, False otherwise"""
            task_status = cache.add(TASK_KEY + redis_key, True, 80)
            if task_status:
                self.writelog(f"task  set redis key {TASK_KEY + redis_key}")
                return 0
            else:
                self.writelog(f"task redis key {TASK_KEY + redis_key} exists")
                return 1

        except Exception:
            self.writelog(f"redis connect fail {traceback.format_exc()}")

    def end_crontab(self) -> None:
        """ 清除redis信息 """
        try:
            redis_key = self.redis_key
            cache.delete(TASK_KEY + redis_key)
            self.writelog(f"clean redis key  {TASK_KEY + redis_key}")
        except Exception:
            self.writelog(f"redis connect fail {traceback.format_exc()}")

    def start(self):
        start_time = time.time()
        execute = False
        try:
            self.writelog(f"task {self.task_name} start")
            if self.start_crontab() != 0:
                error_info = f"task {self.task_name} not finish!"
                self.writelog(error_info)
                sys.exit(0)
            execute = True
            self.do_work()

        except Exception as e:
            result_err_info = traceback.format_exc()
            self.writelog(f"{self.task_name} ERROR: {result_err_info}")
            # 告警
            # self.script_err_report(SCRIPT_ERR_REPORT_USER, result_err_info)
        finally:
            end_time = time.time()
            self.writelog(f"{self.task_name} cost time {end_time - start_time}")
            if execute:
                self.end_crontab()

    def do_work(self):
        """ 脚本核心功能主流程 """
        # self.writelog("privacy analysis task start")
        # cmds = [f'cd {WORKSPACE_DIR} && {self.config["privacy_detect"]["python_path"]} manage.py privacy_analysis {self.trace_id}']
        # retcode = subprocess.call(cmds, shell=True)
        # self.writelog(f"privacy analysis task end retcode:{retcode}")
import subprocess
from base_task import BaseTask


class CrontabTask(BaseTask):

    def do_work(self):
        config = self.config
        task_name = self.task_name
        workspace = self.workspace
        self.writelog(f"{task_name} task start")
        cmds = [f'cd {workspace} && {config["python_path"]} manage.py {task_name} {self.trace_id}']
        retcode = subprocess.call(cmds, shell=True)
        self.writelog(f"{task_name} task end retcode:{retcode}")


将任务添加到Linux服务器的 crontab里面

# argv[1]------redis_key ; argv[2]------task_name即 commands目录下 crontab_前缀的文件 ; argv[3]------配置文件 ; argv[4]------环境变量 ; argv[5]------组  ; argv[6]------通道

*/1 * * * * cd /usr/local/oit/tencheck && ../env/bin/python3.11 task/script/start_task.py watchdog crontab_watchdog tx xx xx xx >/dev/null 2>&1 &


原文地址:https://blog.csdn.net/weixin_39184713/article/details/143884489

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!