# report_app/app/tools/deal_excels.py

import pandas as pd
import numpy as np
import logging
import re
from .effective_cities import effective_cities

# Module-level logger
logger = logging.getLogger(__name__)


# Compute province-level and city-level complaint statistics (counts only).
def deal_excel(start_time, end_time, file_path):
    try:
        logger.info("Start analysing the power-outage complaint Excel file")
        logger.info(f"Start time: {start_time}")
        logger.info(f"End time: {end_time}")
        # Collect all sheet names in the workbook
        excel_file = pd.ExcelFile(file_path)
        sheet_names = excel_file.sheet_names
        pattern_sheet = r"(2025年?投诉明细|投诉明细)[\(\s]*供电类[\)\s]*"
        # Fuzzy-match the sheet name, case-insensitively
        matched_sheets = [
            sheet
            for sheet in sheet_names
            if re.fullmatch(pattern_sheet, sheet, re.IGNORECASE)
        ]
        if len(matched_sheets) == 1:
            final_sheet = matched_sheets[0]
        else:
            logger.error("No matching sheet found")
            return None
        df = pd.read_excel(
            file_path,
            sheet_name=final_sheet,
            skiprows=1,
        )
        # Convert the acceptance-time column ("受理时间") to string
        df["time"] = df["受理时间"].astype(str)
        # -------- strip leading, trailing and embedded whitespace --------
        df["省"] = df["省"].str.strip().str.replace(r"\s+", "", regex=True)
        df["地市"] = df["地市"].str.strip().str.replace(r"\s+", "", regex=True)
        df["县区"] = df["县区"].str.strip().str.replace(r"\s+", "", regex=True)
        df["一级分类"] = df["一级分类"].str.strip().str.replace(r"\s+", "", regex=True)
        # Normalise Chinese date/time markers and other separators to "/"
        df["time"] = (
            df["time"]
            .str.strip()
            .str.replace(r"-|年|月|日|\.|时|分|秒|点", "/", regex=True)
        )
        # Parse into datetime for range comparison,
        # e.g. start_time = datetime(2025, 3, 5, 17, 0, 0),
        #      end_time = datetime(2025, 3, 6, 17, 0, 0)
        df["datetime"] = pd.to_datetime(df["time"])
        # Keep only "供电质量" (power-supply quality) rows inside the window
        df = df[
            (df["datetime"] > start_time)
            & (df["datetime"] <= end_time)
            & (df["一级分类"] == "供电质量")
        ]
        logger.info(f"Rows left after the time filter: {len(df)}")
        # Clean the province column: keep only a recognised province name
        province_list = ["广东", "广西", "云南", "贵州", "海南", "深圳"]
        province_pattern = "|".join(province_list)
        # str(x) guards against non-string cells such as NaN
        df["省"] = df["省"].apply(
            lambda x: re.search(province_pattern, str(x)).group()
            if re.search(province_pattern, str(x))
            else ""
        )
        # Drop rows whose province could not be recognised
        df = df[df["省"] != ""]
        df["地市"] = df["地市"].astype(str)
        # Condition 1: the city column names a Shenzhen district
        mask_b = df["地市"].str.contains(
            "深圳|罗湖|福田|南山|宝安|龙岗|盐田|龙华|坪山|光明|大鹏",
            regex=True,
            na=False,
        )
        # Condition 2: the province is 广东 or 深圳
        mask_a = df["省"].isin(["广东", "深圳"])
        # Rows matching both conditions are reassigned to 深圳
        final_mask = mask_b & mask_a
        df.loc[final_mask, "省"] = "深圳"
        # Group and count by province ("省")
        group_province = df.groupby("省")
        province_statistics = {
            "广东": 0,
            "广西": 0,
            "云南": 0,
            "贵州": 0,
            "海南": 0,
            "深圳": 0,
        }
        # update() overwrites the zero defaults with today's counts
        province_temp = group_province.size().to_dict()
        province_statistics.update(province_temp)
        # City-level processing: normalise city names to the 81 effective cities
        df = effective_cities(df)
        # Drop the "供电局" (power supply bureau) suffix, then prefix the
        # province to every city except Shenzhen before grouping
        df["地市"] = df["地市"].str.replace("供电局", "")
        df.loc[df["省"] != "深圳", "地市"] = df["省"] + df["地市"]
        # Group and count by city, then sort descending by count
        group_district = df.groupby("地市")
        district_statistics = group_district.size().to_dict()
        district_statistics = sorted(
            district_statistics.items(), key=lambda x: x[1], reverse=True
        )
        return province_statistics, district_statistics
    except Exception as e:
        logger.exception(f"Failed to aggregate complaints by province/city: {e}")
def deal_excel_over_load(file_path):
    try:
        logger.info("Start analysing the transformer-overload Excel file")
        # Collect all sheet names in the workbook
        excel_file = pd.ExcelFile(file_path)
        sheet_names = excel_file.sheet_names
        pattern_sheet = r"重过载明细"
        # Fuzzy-match the sheet name, case-insensitively
        matched_sheets = [
            sheet
            for sheet in sheet_names
            if re.fullmatch(pattern_sheet, sheet, re.IGNORECASE)
        ]
        if len(matched_sheets) == 1:
            final_sheet = matched_sheets[0]
        else:
            logger.error("No matching sheet found")
            return None
        df = pd.read_excel(
            file_path,
            sheet_name=final_sheet,
        )
        # Keep severe ("严重过载") and ordinary ("一般过载") overload rows only
        values_to_include = ["严重过载", "一般过载"]
        filtered_df = df[df["重过载情况"].isin(values_to_include)]
        # Count records per (subsidiary, city bureau) pair
        grouped_df = (
            filtered_df.groupby(["分子公司", "地市局"]).size().reset_index(name="记录数")
        )
        sorted_df = grouped_df.sort_values(by="记录数", ascending=False)
        # .copy() avoids pandas' SettingWithCopyWarning on the writes below
        top_5_results = sorted_df.head(5).copy()
        # Strip leading, trailing and embedded whitespace
        top_5_results["分子公司"] = (
            top_5_results["分子公司"].str.strip().str.replace(r"\s+", "", regex=True)
        )
        top_5_results["地市局"] = (
            top_5_results["地市局"].str.strip().str.replace(r"\s+", "", regex=True)
        )
        # Concatenate subsidiary and city bureau into a single label
        top_5_results["公司地市"] = top_5_results["分子公司"] + top_5_results["地市局"]
        # Return only the label and count columns
        final_output = top_5_results[["公司地市", "记录数"]]
        return final_output
    except Exception as e:
        logger.exception(f"Failed to analyse transformer-overload data: {e}")
# Day-over-day (DoD) comparison for one top-5 city entry.
# `top` is a (city, count) tuple; `district_stat_before` maps city -> yesterday's count.
def top5_dod_analysis(top, district_stat_before):
    try:
        logger.info("Start the top-5 city day-over-day analysis")
        if top[0] in district_stat_before:
            top_dod = top[1] - district_stat_before[top[0]]
            if top_dod > 0:
                return "+" + str(top_dod)
            elif top_dod == 0:
                # Unchanged from yesterday
                return "持平"
            else:
                return str(top_dod)
        else:
            # City absent from yesterday's stats: the whole count is new
            return "+" + str(top[1])
    except (KeyError, AttributeError, ArithmeticError) as e:
        logger.exception(f"Top-5 day-over-day analysis failed: {e}")
# Province-level day-over-day comparison (kept for reference, currently unused)
# def province_dod_analysis(before: dict, after: dict):
#     dod = sum(after.values()) - sum(before.values())
#     if dod > 0:
#         return '+' + str(dod)
#     elif dod == 0:
#         return '持平'
# Transpose the statistics (province counts, top-5 rankings, day-over-day
# values) so rows become columns, ready to be written into the report table.
def transform_data(data):
    try:
        logger.info("Start transposing the statistics for table output")
        # Use NumPy to swap rows and columns
        transposed_data = np.array(data).transpose().tolist()
        return transposed_data
    except Exception as e:
        logger.exception(f"Failed to transpose the statistics: {e}")