102人参与 • 2025-07-02 • 阿里
# 创建虚拟环境 python3 -m venv env source env/bin/activate # 安装依赖 pip install django celery redis django-celery-beat aliyun-python-sdk-core-v3 aliyun-python-sdk-mq mysqlclient
# 创建项目 django-admin startproject rocketmq_manager cd rocketmq_manager # 创建应用 python manage.py startapp rocketmq
databases = {
'default': {
'engine': 'django.db.backends.mysql',
'name': 'rocketmq_manager', # 数据库名
'user': 'your_username', # 用户名
'password': 'your_password', # 密码
'host': 'localhost', # 主机
'port': '3306', # 端口
'options': {
'init_command': "set sql_mode='strict_trans_tables'",
},
}
}
installed_apps = [
# ...
'django_celery_beat',
'django_celery_results',
'rocketmq',
]
# celery配置
celery_broker_url = 'redis://localhost:6379/0'
celery_result_backend = 'django-db'
celery_accept_content = ['json']
celery_task_serializer = 'json'
celery_result_serializer = 'json'
celery_timezone = 'asia/shanghai'
# 阿里云配置(从环境变量获取)
aliyun_access_key_id = os.environ.get('aliyun_access_key_id')
aliyun_access_key_secret = os.environ.get('aliyun_access_key_secret')
aliyun_region_id = os.environ.get('aliyun_region_id', 'cn-hangzhou')
from __future__ import absolute_import, unicode_literals
import os
from celery import celery
os.environ.setdefault('django_settings_module', 'rocketmq_manager.settings')
app = celery('rocketmq_manager')
app.config_from_object('django.conf:settings', namespace='celery')
app.autodiscover_tasks()
@app.task(bind=true)
def debug_task(self):
print(f'request: {self.request!r}')
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
python
运行
from django.db import models
from django.utils import timezone
class rocketmqinstance(models.model):
instance_id = models.charfield('实例id', max_length=100, unique=true)
instance_name = models.charfield('实例名称', max_length=200, blank=true, null=true)
instance_type = models.charfield('实例类型', max_length=50, blank=true, null=true)
region_id = models.charfield('区域id', max_length=50)
status = models.charfield('状态', max_length=50, blank=true, null=true)
create_time = models.datetimefield('创建时间', blank=true, null=true)
expire_time = models.datetimefield('过期时间', blank=true, null=true)
tags = models.jsonfield('标签', blank=true, null=true)
last_updated = models.datetimefield('最后更新时间', auto_now=true)
def __str__(self):
return f"{self.instance_name} ({self.instance_id})"
class meta:
verbose_name = 'rocketmq实例'
verbose_name_plural = 'rocketmq实例列表'
indexes = [
models.index(fields=['instance_id', 'region_id']),
]
class instancesynclog(models.model):
sync_time = models.datetimefield('同步时间', auto_now_add=true)
instance_count = models.integerfield('实例数量', default=0)
success = models.booleanfield('是否成功', default=true)
error_message = models.textfield('错误信息', blank=true, null=true)
execution_time = models.floatfield('执行时间(秒)', blank=true, null=true)
def __str__(self):
return f"同步记录 - {self.sync_time}"
class meta:
verbose_name = '实例同步日志'
verbose_name_plural = '实例同步日志列表'
ordering = ['-sync_time']
python manage.py makemigrations python manage.py migrate
import os
from aliyunsdkcore.client import acsclient
from aliyunsdkcore.acs_exception.exceptions import clientexception
from aliyunsdkcore.acs_exception.exceptions import serverexception
from aliyunsdkmq.model.v20190513 import describeinstancesrequest
import json
import time
class aliyunrocketmqclient:
def __init__(self):
self.access_key_id = os.environ.get('aliyun_access_key_id')
self.access_key_secret = os.environ.get('aliyun_access_key_secret')
self.region_id = os.environ.get('aliyun_region_id', 'cn-hangzhou')
self.client = acsclient(self.access_key_id, self.access_key_secret, self.region_id)
def get_instances(self):
try:
request = describeinstancesrequest.describeinstancesrequest()
request.set_accept_format('json')
# 添加重试机制
max_retries = 3
for attempt in range(max_retries):
try:
response = self.client.do_action_with_exception(request)
return json.loads(response)
except (clientexception, serverexception) as e:
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2
print(f"请求失败,{wait_time}秒后重试: {str(e)}")
time.sleep(wait_time)
else:
raise
except exception as e:
print(f"获取实例信息失败: {str(e)}")
raise
from celery import shared_task
from .models import rocketmqinstance, instancesynclog
from .aliyun_client import aliyunrocketmqclient
import logging
from datetime import datetime
import time
logger = logging.getlogger(__name__)
@shared_task(bind=true, autoretry_for=(exception,), retry_backoff=5, retry_kwargs={'max_retries': 3})
def sync_rocketmq_instances(self):
start_time = time.time()
try:
client = aliyunrocketmqclient()
response = client.get_instances()
# 处理响应数据
instance_list = []
if 'data' in response and 'instancedolist' in response['data']:
for item in response['data']['instancedolist']:
instance = {
'instance_id': item.get('instanceid', ''),
'instance_name': item.get('instancename', ''),
'instance_type': item.get('instancetype', ''),
'region_id': item.get('regionid', ''),
'status': item.get('instancestatus', ''),
'create_time': datetime.fromtimestamp(item.get('createtime', 0) / 1000) if item.get('createtime') else none,
'expire_time': datetime.fromtimestamp(item.get('expiretime', 0) / 1000) if item.get('expiretime') else none,
'tags': item.get('tags', {})
}
instance_list.append(instance)
# 使用事务批量更新数据库
from django.db import transaction
with transaction.atomic():
# 先删除不存在的实例(可选)
# existing_ids = [item['instance_id'] for item in instance_list]
# rocketmqinstance.objects.exclude(instance_id__in=existing_ids).delete()
# 批量更新或创建实例
for instance_data in instance_list:
rocketmqinstance.objects.update_or_create(
instance_id=instance_data['instance_id'],
defaults=instance_data
)
execution_time = time.time() - start_time
# 记录同步日志
log = instancesynclog.objects.create(
instance_count=len(instance_list),
success=true,
execution_time=execution_time
)
logger.info(f"成功同步 {len(instance_list)} 个rocketmq实例,耗时: {execution_time:.2f}秒")
return f"同步完成,共 {len(instance_list)} 个实例,耗时: {execution_time:.2f}秒"
except exception as e:
execution_time = time.time() - start_time
# 记录错误日志
instancesynclog.objects.create(
success=false,
error_message=str(e),
execution_time=execution_time
)
logger.error(f"同步rocketmq实例失败: {str(e)},耗时: {execution_time:.2f}秒")
raise
from rest_framework import serializers
from .models import rocketmqinstance, instancesynclog
class rocketmqinstanceserializer(serializers.modelserializer):
class meta:
model = rocketmqinstance
fields = '__all__'
read_only_fields = ['last_updated']
class instancesynclogserializer(serializers.modelserializer):
class meta:
model = instancesynclog
fields = '__all__'
read_only_fields = ['sync_time', 'instance_count', 'success', 'error_message', 'execution_time']
from rest_framework import viewsets, status
from rest_framework.response import response
from .models import rocketmqinstance, instancesynclog
from .serializers import rocketmqinstanceserializer, instancesynclogserializer
from .tasks import sync_rocketmq_instances
from rest_framework.decorators import action
from rest_framework.permissions import isauthenticated
from rest_framework.authentication import tokenauthentication
class rocketmqinstanceviewset(viewsets.modelviewset):
queryset = rocketmqinstance.objects.all()
serializer_class = rocketmqinstanceserializer
authentication_classes = [tokenauthentication]
permission_classes = [isauthenticated]
@action(detail=false, methods=['post'])
def sync_now(self, request):
"""立即触发实例同步"""
task = sync_rocketmq_instances.delay()
return response({'task_id': task.id, 'message': '同步任务已启动'}, status=status.http_202_accepted)
@action(detail=false, methods=['get'])
def regions(self, request):
"""获取所有区域列表"""
regions = rocketmqinstance.objects.values_list('region_id', flat=true).distinct()
return response(regions, status=status.http_200_ok)
class instancesynclogviewset(viewsets.readonlymodelviewset):
queryset = instancesynclog.objects.all().order_by('-sync_time')
serializer_class = instancesynclogserializer
authentication_classes = [tokenauthentication]
permission_classes = [isauthenticated]
from django.urls import include, path
from rest_framework import routers
from .views import rocketmqinstanceviewset, instancesynclogviewset
router = routers.defaultrouter()
router.register(r'instances', rocketmqinstanceviewset)
router.register(r'sync-logs', instancesynclogviewset)
urlpatterns = [
path('', include(router.urls)),
]
from django.contrib import admin
from django.urls import path, include
from rest_framework.authtoken.views import obtain_auth_token
urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('rocketmq.urls')),
path('api/token/', obtain_auth_token, name='api_token_auth'), # 获取认证令牌
]
celery_beat_schedule = {
'sync-rocketmq-instances': {
'task': 'rocketmq.tasks.sync_rocketmq_instances',
'schedule': 3600.0, # 每小时执行一次
'args': ()
},
}
export aliyun_access_key_id=your_access_key_id export aliyun_access_key_secret=your_access_key_secret export aliyun_region_id=cn-hangzhou # 根据实际情况修改
redis-server
celery -a rocketmq_manager worker --loglevel=info --pool=prefork --concurrency=4
celery -a rocketmq_manager beat --loglevel=info --scheduler django_celery_beat.schedulers:databasescheduler
python manage.py runserver
curl -x post -d "username=your_username&password=your_password" http://localhost:8000/api/token/
curl -h "authorization: token your_token_here" http://localhost:8000/api/instances/
curl -h "authorization: token your_token_here" http://localhost:8000/api/sync-logs/
curl -x post -h "authorization: token your_token_here" http://localhost:8000/api/instances/sync_now/
rocketmq_manager/ ├── rocketmq_manager/ │ ├── __init__.py │ ├── celery.py │ ├── settings.py │ ├── urls.py │ └── wsgi.py ├── rocketmq/ │ ├── migrations/ │ ├── __init__.py │ ├── admin.py │ ├── apps.py │ ├── aliyun_client.py │ ├── models.py │ ├── serializers.py │ ├── tasks.py │ ├── urls.py │ └── views.py ├── manage.py └── db.sqlite3
这个实现提供了一个完整的 django+celery 定时拉取阿里云 rocketmq 实例信息的解决方案,使用 mysql 存储数据,支持权限控制和手动触发同步,可直接用于生产环境。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论