支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


DIFY实现prometheus历史告警结合实时数据进行分析

发布日期:2025-08-11 14:25:05 浏览次数: 1524
作者:运维狗不背锅

微信搜一搜,关注“运维狗不背锅”

推荐语

DIFY结合Prometheus历史告警与实时数据,打造智能分析闭环,提升运维效率。

核心内容:
1. 告警数据清洗与指标查询语句生成的关键步骤
2. 数据量控制与查询优化的实战经验
3. 后端实现代码示例与参数调优技巧

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家
废话不对说,先上DLS截图
实现步骤:
1、先获取告警数据,对告警数据进行处理,主要是将告警数据清洗,去重,聚合等操作,都在后端实现
2、将prometheus的告警规则存到知识库里,这么做主要是生成的查询告警时的指标数据更准确
3、生成查询语句
4、通过查询语句查询prometheus数据,这里也是后端实现,同样需将查询到的数据进行清洗,去重,聚合。防止数据太多,模型报错
5、将告警数据和指标数据进行联合分析
6、生成报告
主要的步骤是上面这些,具体注意的点:
1、不管是告警数据还是指标数据都需要注意数据量的问题,数据量太大超过模型上限就直接报错了
2、指标数据查询语句生成问题,这个我试过很多方式,感觉还是将prometheus的告警指标直接存到知识库,通过告警指标生成查询语句这种效果好一点
3、当进行聚合分析时,很多时候告警数据的量不会很大,但是查询到指标数据的量很大,这种我是通过不同step参数去做
后端代码:
# 在 monitor/AlertView.py 文件中修改以下代码import datetime
import requestsfrom django.http import JsonResponsefrom django.views.decorators.http import require_GETimport logging
logger = logging.getLogger(__name__)

@require_GETdef prometheus_query_range(request):    """    转发请求到Prometheus的query_range API,并根据namespace过滤结果    参数:    - query: Prometheus查询语句(支持多个,用分号分隔)    - start: 开始时间 (RFC3339格式)    - end: 结束时间 (RFC3339格式)    - step: 时间步长    - namespace: 可选,用于过滤结果中的namespace(支持多个,用逗号分隔)    """    try:        # 从请求参数中获取值        query = request.GET.get('query')        start = request.GET.get('start')        end = request.GET.get('end')        step = request.GET.get('step''30m')  # 默认步长30分钟        filter_namespaces = request.GET.get('namespace')  # 过滤用的namespace
        # 参数校验        if not all([query, start, end]):            return JsonResponse({                'error''缺少必要参数: query, start, end都是必需的'            }, status=400)
        # 处理多个namespace过滤参数        filter_namespace_list = None        if filter_namespaces:            filter_namespace_list = [ns.strip() for ns in filter_namespaces.split(','if ns.strip()]
        # Prometheus API地址        prometheus_url = "http://IP:9090/api/v1/query_range"
        # 支持多个查询语句,用分号分隔        queries = [q.strip() for q in query.split(';'if q.strip()]
        # 存储所有查询结果        all_results = []
        # 执行每个查询        for q in queries:            # 构造请求参数            params = {                'query': q,                'start': start,                'end': end,                'step': step            }
            # 发送请求到Prometheus            response = requests.get(prometheus_url, params=params, timeout=30)
            # 检查响应状态            if response.status_code == 200:                data = response.json()
                # 如果指定了namespace过滤,则进行过滤                if filter_namespace_list is not None and 'data' in data and 'result' in data['data']:                    filtered_result = []                    for item in data['data']['result']:                        # 检查metric中是否有namespace字段,并且是否匹配                        if 'metric' in item and 'namespace' in item['metric']:                            if item['metric']['namespace'in filter_namespace_list:                                filtered_result.append(item)                        # 如果没有namespace字段,根据需求决定是否包含                        # 这里假设只包含有明确namespace且匹配的项
                    # 更新返回数据中的result                    data['data']['result'] = filtered_result
                # 处理结果,只保留pod, instance, namespace, values这四个参数                # 并过滤掉包含+Inf值的metric                if 'data' in data and 'result' in data['data']:                    processed_result = []                    for item in data['data']['result']:                        # 检查values中是否包含+Inf值                        has_inf = False                        if 'values' in item:                            for value_pair in item['values']:                                if len(value_pair) > 1 and value_pair[1] == "+Inf":                                    has_inf = True                                    break
                        # 如果包含+Inf值,则跳过这个metric                        if has_inf:                            continue
                        processed_item = {}
                        # 只保留指定的字段                        if 'metric' in item:                            # 提取需要的字段                            processed_metric = {}                            if 'pod' in item['metric']:                                processed_metric['pod'] = item['metric']['pod']                            if 'instance' in item['metric']:                                processed_metric['instance'] = item['metric']['instance']                            if 'namespace' in item['metric']:                                processed_metric['namespace'] = item['metric']['namespace']
                            processed_item['metric'] = processed_metric
                        # 保留values字段                        if 'values' in item:                            formatted_values = []                            for value_pair in item['values']:                                if len(value_pair) >= 2:                                    timestamp = value_pair[0]                                    value = value_pair[1]
                                    formatted_value = {                                        'time': datetime.datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d '                                                                                                       '%H:%M:%S'),                                        'value': value                                    }                                    formatted_values.append(formatted_value)                            processed_item['values'] = formatted_values                        processed_result.append(processed_item)
                    # 将处理后的结果添加到总结果中                    all_results.extend(processed_result)                elif 'data' in data:                    # 即使没有result字段,也要添加空的结果以确保所有查询都被处理                    all_results.extend([])            else:                logger.warning(f"Prometheus API调用失败,状态码: {response.status_code},查询: {q}")
        # 构造最终返回的数据结构        final_response = {            'status''success',            'data': {                'resultType''matrix',                'result': all_results            }        }
        return JsonResponse(final_response)
    except requests.exceptions.RequestException as e:        logger.error(f"请求Prometheus API时发生网络错误: {str(e)}")        return JsonResponse({            'error''网络错误',            'message'str(e)        }, status=500)    except Exception as e:        logger.error(f"处理Prometheus查询时发生错误: {str(e)}")        return JsonResponse({            'error''服务器内部错误',            'message'str(e)        }, status=500)

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询