移动 > 阿里

django+celery如何实现定时拉取阿里云rocketmq实例信息

8人参与 2025-07-02 阿里

一、项目初始化

1. 创建虚拟环境并安装依赖

# 创建虚拟环境
python3 -m venv env
source env/bin/activate

# 安装依赖
pip install django celery redis django-celery-beat aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient

2. 创建 django 项目和应用

# 创建项目
django-admin startproject rocketmq_manager
cd rocketmq_manager

# 创建应用
python manage.py startapp rocketmq

3. 配置 mysql 数据库(rocketmq_manager/settings.py)

databases = {
    'default': {
        'engine': 'django.db.backends.mysql',
        'name': 'rocketmq_manager',      # 数据库名
        'user': 'your_username',         # 用户名
        'password': 'your_password',     # 密码
        'host': 'localhost',             # 主机
        'port': '3306',                  # 端口
        'options': {
            'init_command': "set sql_mode='strict_trans_tables'",
        },
    }
}

4. 配置项目其他设置(rocketmq_manager/settings.py)

installed_apps = [
    # ...
    'django_celery_beat',
    'django_celery_results',
    'rocketmq',
]

# celery配置
celery_broker_url = 'redis://localhost:6379/0'
celery_result_backend = 'django-db'
celery_accept_content = ['json']
celery_task_serializer = 'json'
celery_result_serializer = 'json'
celery_timezone = 'asia/shanghai'

# 阿里云配置(从环境变量获取)
aliyun_access_key_id = os.environ.get('aliyun_access_key_id')
aliyun_access_key_secret = os.environ.get('aliyun_access_key_secret')
aliyun_region_id = os.environ.get('aliyun_region_id', 'cn-hangzhou')

二、celery 集成配置

1. 创建 celery 应用(rocketmq_manager/celery.py)

from __future__ import absolute_import, unicode_literals
import os
from celery import celery

os.environ.setdefault('django_settings_module', 'rocketmq_manager.settings')

app = celery('rocketmq_manager')
app.config_from_object('django.conf:settings', namespace='celery')
app.autodiscover_tasks()

@app.task(bind=true)
def debug_task(self):
    print(f'request: {self.request!r}')

2. 初始化 celery(rocketmq_manager/__init__.py)

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)

三、model 开发

创建 rocketmq 实例模型(rocketmq/models.py)

python

运行

from django.db import models
from django.utils import timezone

class rocketmqinstance(models.model):
    instance_id = models.charfield('实例id', max_length=100, unique=true)
    instance_name = models.charfield('实例名称', max_length=200, blank=true, null=true)
    instance_type = models.charfield('实例类型', max_length=50, blank=true, null=true)
    region_id = models.charfield('区域id', max_length=50)
    status = models.charfield('状态', max_length=50, blank=true, null=true)
    create_time = models.datetimefield('创建时间', blank=true, null=true)
    expire_time = models.datetimefield('过期时间', blank=true, null=true)
    tags = models.jsonfield('标签', blank=true, null=true)
    last_updated = models.datetimefield('最后更新时间', auto_now=true)
    
    def __str__(self):
        return f"{self.instance_name} ({self.instance_id})"
    
    class meta:
        verbose_name = 'rocketmq实例'
        verbose_name_plural = 'rocketmq实例列表'
        indexes = [
            models.index(fields=['instance_id', 'region_id']),
        ]

class instancesynclog(models.model):
    sync_time = models.datetimefield('同步时间', auto_now_add=true)
    instance_count = models.integerfield('实例数量', default=0)
    success = models.booleanfield('是否成功', default=true)
    error_message = models.textfield('错误信息', blank=true, null=true)
    execution_time = models.floatfield('执行时间(秒)', blank=true, null=true)
    
    def __str__(self):
        return f"同步记录 - {self.sync_time}"
    
    class meta:
        verbose_name = '实例同步日志'
        verbose_name_plural = '实例同步日志列表'
        ordering = ['-sync_time']

迁移数据库

python manage.py makemigrations
python manage.py migrate

四、定时任务代码

创建阿里云 api 客户端(rocketmq/aliyun_client.py)

import os
from aliyunsdkcore.client import acsclient
from aliyunsdkcore.acs_exception.exceptions import clientexception
from aliyunsdkcore.acs_exception.exceptions import serverexception
from aliyunsdkmq.model.v20190513 import describeinstancesrequest
import json
import time

class aliyunrocketmqclient:
    def __init__(self):
        self.access_key_id = os.environ.get('aliyun_access_key_id')
        self.access_key_secret = os.environ.get('aliyun_access_key_secret')
        self.region_id = os.environ.get('aliyun_region_id', 'cn-hangzhou')
        self.client = acsclient(self.access_key_id, self.access_key_secret, self.region_id)
    
    def get_instances(self):
        try:
            request = describeinstancesrequest.describeinstancesrequest()
            request.set_accept_format('json')
            
            # 添加重试机制
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    response = self.client.do_action_with_exception(request)
                    return json.loads(response)
                except (clientexception, serverexception) as e:
                    if attempt < max_retries - 1:
                        wait_time = (attempt + 1) * 2
                        print(f"请求失败,{wait_time}秒后重试: {str(e)}")
                        time.sleep(wait_time)
                    else:
                        raise
            
        except exception as e:
            print(f"获取实例信息失败: {str(e)}")
            raise

定义定时任务(rocketmq/tasks.py)

from celery import shared_task
from .models import rocketmqinstance, instancesynclog
from .aliyun_client import aliyunrocketmqclient
import logging
from datetime import datetime
import time

logger = logging.getlogger(__name__)

@shared_task(bind=true, autoretry_for=(exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def sync_rocketmq_instances(self):
    start_time = time.time()
    try:
        client = aliyunrocketmqclient()
        response = client.get_instances()
        
        # 处理响应数据
        instance_list = []
        if 'data' in response and 'instancedolist' in response['data']:
            for item in response['data']['instancedolist']:
                instance = {
                    'instance_id': item.get('instanceid', ''),
                    'instance_name': item.get('instancename', ''),
                    'instance_type': item.get('instancetype', ''),
                    'region_id': item.get('regionid', ''),
                    'status': item.get('instancestatus', ''),
                    'create_time': datetime.fromtimestamp(item.get('createtime', 0) / 1000) if item.get('createtime') else none,
                    'expire_time': datetime.fromtimestamp(item.get('expiretime', 0) / 1000) if item.get('expiretime') else none,
                    'tags': item.get('tags', {})
                }
                instance_list.append(instance)
        
        # 使用事务批量更新数据库
        from django.db import transaction
        with transaction.atomic():
            # 先删除不存在的实例(可选)
            # existing_ids = [item['instance_id'] for item in instance_list]
            # rocketmqinstance.objects.exclude(instance_id__in=existing_ids).delete()
            
            # 批量更新或创建实例
            for instance_data in instance_list:
                rocketmqinstance.objects.update_or_create(
                    instance_id=instance_data['instance_id'],
                    defaults=instance_data
                )
        
        execution_time = time.time() - start_time
        
        # 记录同步日志
        log = instancesynclog.objects.create(
            instance_count=len(instance_list),
            success=true,
            execution_time=execution_time
        )
        
        logger.info(f"成功同步 {len(instance_list)} 个rocketmq实例,耗时: {execution_time:.2f}秒")
        return f"同步完成,共 {len(instance_list)} 个实例,耗时: {execution_time:.2f}秒"
        
    except exception as e:
        execution_time = time.time() - start_time
        
        # 记录错误日志
        instancesynclog.objects.create(
            success=false,
            error_message=str(e),
            execution_time=execution_time
        )
        
        logger.error(f"同步rocketmq实例失败: {str(e)},耗时: {execution_time:.2f}秒")
        raise

五、接口开发

1. 创建序列化器(rocketmq/serializers.py)

from rest_framework import serializers
from .models import rocketmqinstance, instancesynclog

class rocketmqinstanceserializer(serializers.modelserializer):
    class meta:
        model = rocketmqinstance
        fields = '__all__'
        read_only_fields = ['last_updated']

class instancesynclogserializer(serializers.modelserializer):
    class meta:
        model = instancesynclog
        fields = '__all__'
        read_only_fields = ['sync_time', 'instance_count', 'success', 'error_message', 'execution_time']

2. 创建视图集(rocketmq/views.py)

from rest_framework import viewsets, status
from rest_framework.response import response
from .models import rocketmqinstance, instancesynclog
from .serializers import rocketmqinstanceserializer, instancesynclogserializer
from .tasks import sync_rocketmq_instances
from rest_framework.decorators import action
from rest_framework.permissions import isauthenticated
from rest_framework.authentication import tokenauthentication

class rocketmqinstanceviewset(viewsets.modelviewset):
    queryset = rocketmqinstance.objects.all()
    serializer_class = rocketmqinstanceserializer
    authentication_classes = [tokenauthentication]
    permission_classes = [isauthenticated]
    
    @action(detail=false, methods=['post'])
    def sync_now(self, request):
        """立即触发实例同步"""
        task = sync_rocketmq_instances.delay()
        return response({'task_id': task.id, 'message': '同步任务已启动'}, status=status.http_202_accepted)
    
    @action(detail=false, methods=['get'])
    def regions(self, request):
        """获取所有区域列表"""
        regions = rocketmqinstance.objects.values_list('region_id', flat=true).distinct()
        return response(regions, status=status.http_200_ok)

class instancesynclogviewset(viewsets.readonlymodelviewset):
    queryset = instancesynclog.objects.all().order_by('-sync_time')
    serializer_class = instancesynclogserializer
    authentication_classes = [tokenauthentication]
    permission_classes = [isauthenticated]

3. 配置 url(rocketmq/urls.py)

from django.urls import include, path
from rest_framework import routers
from .views import rocketmqinstanceviewset, instancesynclogviewset

router = routers.defaultrouter()
router.register(r'instances', rocketmqinstanceviewset)
router.register(r'sync-logs', instancesynclogviewset)

urlpatterns = [
    path('', include(router.urls)),
]

4. 项目 url 配置(rocketmq_manager/urls.py)

from django.contrib import admin
from django.urls import path, include
from rest_framework.authtoken.views import obtain_auth_token

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('rocketmq.urls')),
    path('api/token/', obtain_auth_token, name='api_token_auth'),  # 获取认证令牌
]

六、配置定时任务

在settings.py中添加定时任务配置

celery_beat_schedule = {
    'sync-rocketmq-instances': {
        'task': 'rocketmq.tasks.sync_rocketmq_instances',
        'schedule': 3600.0,  # 每小时执行一次
        'args': ()
    },
}

七、启动服务

1. 设置环境变量

export aliyun_access_key_id=your_access_key_id
export aliyun_access_key_secret=your_access_key_secret
export aliyun_region_id=cn-hangzhou  # 根据实际情况修改

2. 启动 redis

redis-server

3. 启动 celery worker

celery -a rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4

4. 启动 celery beat

celery -a rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:databasescheduler

5. 启动 django 开发服务器

python manage.py runserver

八、api 测试

1. 获取认证令牌

curl -x post -d "username=your_username&password=your_password" http://localhost:8000/api/token/

2. 获取 rocketmq 实例列表

curl -h "authorization: token your_token_here" http://localhost:8000/api/instances/

3. 获取同步日志

curl -h "authorization: token your_token_here" http://localhost:8000/api/sync-logs/

4. 手动触发同步

curl -x post -h "authorization: token your_token_here" http://localhost:8000/api/instances/sync_now/

项目结构

rocketmq_manager/
├── rocketmq_manager/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── rocketmq/
│   ├── migrations/
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── aliyun_client.py
│   ├── models.py
│   ├── serializers.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
├── manage.py
└── db.sqlite3

关键特性说明

  1. mysql 存储:使用 mysql 数据库存储 rocketmq 实例信息和同步日志
  2. 定时同步:每小时自动拉取阿里云 rocketmq 实例信息
  3. 数据持久化:将实例信息存储到数据库,支持索引加速查询
  4. 手动触发:提供 api 接口支持手动触发同步
  5. 错误处理:任务失败自动重试,记录详细的同步日志和执行时间
  6. 权限控制:使用 token 认证保护 api 接口

扩展建议

  1. 添加更多阿里云 api 调用,获取更详细的实例指标(如 tps、消息堆积量等)
  2. 实现多区域支持,同时监控多个地域的 rocketmq 实例
  3. 添加告警机制,当实例状态异常或同步失败时发送通知
  4. 集成缓存系统(如 redis),提高接口响应速度
  5. 添加 api 限流功能,防止恶意请求
  6. 实现实例信息的导出功能,支持数据报表生成

这个实现提供了一个完整的 django+celery 定时拉取阿里云 rocketmq 实例信息的解决方案,使用 mysql 存储数据,支持权限控制和手动触发同步,可直接用于生产环境。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

您想发表意见!!点此发布评论

推荐阅读

阿里巴巴网页版登录入口在哪?

07-03

在Ubuntu 24.04(Noble)上配置阿里源的操作步骤

07-03

Nginx的预定义变量方式

07-03

Nginx日志配置的使用及说明

07-03

Nginx安装和配置过程(yum安装和编译安装)

07-03

Nginx(自定义/预定义)变量,alias虚拟目录解读

07-03

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论