根据您提供的电网配网设备故障分类及遥测特征信息,我可以帮您创建一个Python脚本来生成适合机器学习的样本数据。这个脚本将基于您列出的不同故障类型及其特征生成模拟数据。
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
def generate_normal_data(num_samples=100):
"""生成正常运行状态的数据"""
data = []
# 正常状态下的标称值
nominal_voltage = 10000 # 10kV系统
nominal_current = 100 # 正常运行电流
nominal_power_factor = 0.9
for _ in range(num_samples):
# 添加一些随机波动但保持在正常范围内
uab = nominal_voltage * random.uniform(0.95, 1.05)
ubc = nominal_voltage * random.uniform(0.95, 1.05)
uca = nominal_voltage * random.uniform(0.95, 1.05)
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
ia = nominal_current * random.uniform(0.8, 1.0)
ib = nominal_current * random.uniform(0.8, 1.0)
ic = nominal_current * random.uniform(0.8, 1.0)
i0 = (ia + ib + ic) * random.uniform(0, 0.05) # 很小的零序电流
# 计算功率
power_factor = nominal_power_factor * random.uniform(0.95, 1.05)
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": "正常",
"一级分类": "正常",
"二级分类": "正常"
}
data.append(sample)
return data
def generate_short_circuit_data(num_samples=100):
"""生成短路故障数据"""
data = []
# 短路故障类型
fault_types = [
{"一级分类": "短路故障", "二级分类": "三相短路"},
{"一级分类": "短路故障", "二级分类": "两相短路"},
{"一级分类": "短路故障", "二级分类": "相间短路"},
{"一级分类": "短路故障", "二级分类": "金属性短路"},
{"一级分类": "短路故障", "二级分类": "电弧性短路"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "三相短路":
# 三相短路:三相电压接近零,电流大幅增加
uab = nominal_voltage * random.uniform(0.0, 0.1)
ubc = nominal_voltage * random.uniform(0.0, 0.1)
uca = nominal_voltage * random.uniform(0.0, 0.1)
ua = uab / np.sqrt(3) * random.uniform(0.0, 0.1)
ub = ubc / np.sqrt(3) * random.uniform(0.0, 0.1)
uc = uca / np.sqrt(3) * random.uniform(0.0, 0.1)
ia = nominal_current * random.uniform(10, 20)
ib = nominal_current * random.uniform(10, 20)
ic = nominal_current * random.uniform(10, 20)
elif fault_type["二级分类"] == "两相短路":
# 两相短路:两相电压降低,两相电流增大
# 随机选择两相短路
phases = random.sample(["A", "B", "C"], 2)
uab = nominal_voltage * (0.1 if ("A" in phases and "B" in phases) else random.uniform(0.9, 1.0))
ubc = nominal_voltage * (0.1 if ("B" in phases and "C" in phases) else random.uniform(0.9, 1.0))
uca = nominal_voltage * (0.1 if ("C" in phases and "A" in phases) else random.uniform(0.9, 1.0))
ua = nominal_voltage / np.sqrt(3) * (0.1 if "A" in phases else random.uniform(0.9, 1.0))
ub = nominal_voltage / np.sqrt(3) * (0.1 if "B" in phases else random.uniform(0.9, 1.0))
uc = nominal_voltage / np.sqrt(3) * (0.1 if "C" in phases else random.uniform(0.9, 1.0))
ia = nominal_current * (random.uniform(10, 15) if "A" in phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(10, 15) if "B" in phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(10, 15) if "C" in phases else random.uniform(0.8, 1.0))
elif fault_type["二级分类"] == "相间短路":
# 相间短路:类似两相短路但电压降低不那么严重
phases = random.sample(["A", "B", "C"], 2)
uab = nominal_voltage * (0.3 if ("A" in phases and "B" in phases) else random.uniform(0.9, 1.0))
ubc = nominal_voltage * (0.3 if ("B" in phases and "C" in phases) else random.uniform(0.9, 1.0))
uca = nominal_voltage * (0.3 if ("C" in phases and "A" in phases) else random.uniform(0.9, 1.0))
ua = nominal_voltage / np.sqrt(3) * (0.3 if "A" in phases else random.uniform(0.9, 1.0))
ub = nominal_voltage / np.sqrt(3) * (0.3 if "B" in phases else random.uniform(0.9, 1.0))
uc = nominal_voltage / np.sqrt(3) * (0.3 if "C" in phases else random.uniform(0.9, 1.0))
ia = nominal_current * (random.uniform(5, 10) if "A" in phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(5, 10) if "B" in phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(5, 10) if "C" in phases else random.uniform(0.8, 1.0))
elif fault_type["二级分类"] == "金属性短路":
# 金属性短路:电阻极小,电流极大
phases = random.sample(["A", "B", "C"], random.choice([2, 3]))
uab = nominal_voltage * (0.05 if ("A" in phases and "B" in phases) else random.uniform(0.9, 1.0))
ubc = nominal_voltage * (0.05 if ("B" in phases and "C" in phases) else random.uniform(0.9, 1.0))
uca = nominal_voltage * (0.05 if ("C" in phases and "A" in phases) else random.uniform(0.9, 1.0))
ua = nominal_voltage / np.sqrt(3) * (0.05 if "A" in phases else random.uniform(0.9, 1.0))
ub = nominal_voltage / np.sqrt(3) * (0.05 if "B" in phases else random.uniform(0.9, 1.0))
uc = nominal_voltage / np.sqrt(3) * (0.05 if "C" in phases else random.uniform(0.9, 1.0))
ia = nominal_current * (random.uniform(15, 25) if "A" in phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(15, 25) if "B" in phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(15, 25) if "C" in phases else random.uniform(0.8, 1.0))
else: # 电弧性短路
# 电弧性短路:电流波动,波形畸变
phases = random.sample(["A", "B", "C"], random.choice([2, 3]))
uab = nominal_voltage * (random.uniform(0.3, 0.7) if ("A" in phases and "B" in phases) else random.uniform(0.9, 1.0))
ubc = nominal_voltage * (random.uniform(0.3, 0.7) if ("B" in phases and "C" in phases) else random.uniform(0.9, 1.0))
uca = nominal_voltage * (random.uniform(0.3, 0.7) if ("C" in phases and "A" in phases) else random.uniform(0.9, 1.0))
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if "A" in phases else random.uniform(0.9, 1.0))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if "B" in phases else random.uniform(0.9, 1.0))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if "C" in phases else random.uniform(0.9, 1.0))
# 电弧性短路电流波动较大
ia = nominal_current * (random.uniform(3, 12) if "A" in phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(3, 12) if "B" in phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(3, 12) if "C" in phases else random.uniform(0.8, 1.0))
# 计算零序电流,短路情况下可能较大
i0 = (ia + ib + ic) * random.uniform(0.1, 0.9)
# 功率计算 - 在短路故障下可能不准确,但为了数据完整性仍然计算
active_power_a = ua * ia * 0.2 # 假设功率因数较低
active_power_b = ub * ib * 0.2
active_power_c = uc * ic * 0.2
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * 0.98 # 短路时无功较大
reactive_power_b = apparent_power_b * 0.98
reactive_power_c = apparent_power_c * 0.98
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def generate_ground_fault_data(num_samples=100):
"""生成接地故障数据"""
data = []
fault_types = [
{"一级分类": "接地故障", "二级分类": "单相接地"},
{"一级分类": "接地故障", "二级分类": "低阻接地"},
{"一级分类": "接地故障", "二级分类": "高阻接地"},
{"一级分类": "接地故障", "二级分类": "间歇性接地"},
{"一级分类": "接地故障", "二级分类": "中性点接地系统故障"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "单相接地":
# 随机选择一相接地
fault_phase = random.choice(["A", "B", "C"])
# 接地相电压降低,其他相电压升高
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.3) if fault_phase == "A" else random.uniform(1.1, 1.3))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.3) if fault_phase == "B" else random.uniform(1.1, 1.3))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.3) if fault_phase == "C" else random.uniform(1.1, 1.3))
# 相电压计算出线电压
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 接地相电流可能增加
ia = nominal_current * (random.uniform(1.2, 2.0) if fault_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(1.2, 2.0) if fault_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(1.2, 2.0) if fault_phase == "C" else random.uniform(0.8, 1.0))
# 零序电流明显
i0 = (ia + ib + ic) * random.uniform(0.3, 0.8)
elif fault_type["二级分类"] == "低阻接地":
# 低阻接地:接地电流较大
fault_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.05, 0.15) if fault_phase == "A" else random.uniform(1.1, 1.3))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.05, 0.15) if fault_phase == "B" else random.uniform(1.1, 1.3))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.05, 0.15) if fault_phase == "C" else random.uniform(1.1, 1.3))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * (random.uniform(2.0, 5.0) if fault_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(2.0, 5.0) if fault_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(2.0, 5.0) if fault_phase == "C" else random.uniform(0.8, 1.0))
# 低阻接地零序电流大
i0 = (ia + ib + ic) * random.uniform(0.6, 0.9)
elif fault_type["二级分类"] == "高阻接地":
# 高阻接地:接地电流较小
fault_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.4, 0.8) if fault_phase == "A" else random.uniform(1.05, 1.2))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.4, 0.8) if fault_phase == "B" else random.uniform(1.05, 1.2))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.4, 0.8) if fault_phase == "C" else random.uniform(1.05, 1.2))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * (random.uniform(1.0, 1.3) if fault_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(1.0, 1.3) if fault_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(1.0, 1.3) if fault_phase == "C" else random.uniform(0.8, 1.0))
# 高阻接地零序电流小
i0 = (ia + ib + ic) * random.uniform(0.05, 0.2)
elif fault_type["二级分类"] == "间歇性接地":
# 间歇性接地:电流电压波动
fault_phase = random.choice(["A", "B", "C"])
# 模拟波动性
wave_factor = random.uniform(0.3, 0.9)
ua = nominal_voltage / np.sqrt(3) * (wave_factor if fault_phase == "A" else random.uniform(1.0, 1.2))
ub = nominal_voltage / np.sqrt(3) * (wave_factor if fault_phase == "B" else random.uniform(1.0, 1.2))
uc = nominal_voltage / np.sqrt(3) * (wave_factor if fault_phase == "C" else random.uniform(1.0, 1.2))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
wave_factor = random.uniform(1.0, 2.0)
ia = nominal_current * (wave_factor if fault_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (wave_factor if fault_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (wave_factor if fault_phase == "C" else random.uniform(0.8, 1.0))
# 间歇性接地零序电流波动
i0 = (ia + ib + ic) * random.uniform(0.1, 0.7)
else: # 中性点接地系统故障
# 三相电压整体不平衡
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * random.uniform(0.8, 1.2)
ib = nominal_current * random.uniform(0.8, 1.2)
ic = nominal_current * random.uniform(0.8, 1.2)
# 中性点接地故障零序电流异常
i0 = (ia + ib + ic) * random.uniform(0.2, 0.4)
# 计算功率
power_factor = random.uniform(0.7, 0.9)
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def generate_voltage_loss_data(num_samples=100):
"""生成失压故障数据"""
data = []
fault_types = [
{"一级分类": "失压故障", "二级分类": "全电压失压"},
{"一级分类": "失压故障", "二级分类": "部分失压"},
{"一级分类": "失压故障", "二级分类": "电压骤降"},
{"一级分类": "失压故障", "二级分类": "欠电压"},
{"一级分类": "失压故障", "二级分类": "电压不平衡"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "全电压失压":
# 全电压失压:三相电压接近零
uab = nominal_voltage * random.uniform(0, 0.05)
ubc = nominal_voltage * random.uniform(0, 0.05)
uca = nominal_voltage * random.uniform(0, 0.05)
ua = uab / np.sqrt(3) * random.uniform(0, 0.05)
ub = ubc / np.sqrt(3) * random.uniform(0, 0.05)
uc = uca / np.sqrt(3) * random.uniform(0, 0.05)
# 电流也接近零
ia = nominal_current * random.uniform(0, 0.1)
ib = nominal_current * random.uniform(0, 0.1)
ic = nominal_current * random.uniform(0, 0.1)
i0 = (ia + ib + ic) * random.uniform(0, 0.1)
elif fault_type["二级分类"] == "部分失压":
# 部分失压:部分相位电压消失
lost_phases = random.sample(["A", "B", "C"], random.choice([1, 2]))
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0, 0.05) if "A" in lost_phases else random.uniform(0.95, 1.05))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0, 0.05) if "B" in lost_phases else random.uniform(0.95, 1.05))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0, 0.05) if "C" in lost_phases else random.uniform(0.95, 1.05))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * (random.uniform(0, 0.1) if "A" in lost_phases else random.uniform(0.95, 1.05))
ib = nominal_current * (random.uniform(0, 0.1) if "B" in lost_phases else random.uniform(0.95, 1.05))
ic = nominal_current * (random.uniform(0, 0.1) if "C" in lost_phases else random.uniform(0.95, 1.05))
i0 = (ia + ib + ic) * random.uniform(0.1, 0.3)
elif fault_type["二级分类"] == "电压骤降":
# 电压骤降:电压显著下降但不为零
drop_factor = random.uniform(0.5, 0.85)
uab = nominal_voltage * drop_factor
ubc = nominal_voltage * drop_factor
uca = nominal_voltage * drop_factor
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
# 电流可能略有下降
ia = nominal_current * random.uniform(0.8, 1.0)
ib = nominal_current * random.uniform(0.8, 1.0)
ic = nominal_current * random.uniform(0.8, 1.0)
i0 = (ia + ib + ic) * random.uniform(0.05, 0.15)
elif fault_type["二级分类"] == "欠电压":
# 欠电压:持续的较低电压
low_factor = random.uniform(0.7, 0.9)
uab = nominal_voltage * low_factor
ubc = nominal_voltage * low_factor
uca = nominal_voltage * low_factor
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
# 电流可能增加(因为负载试图维持恒定功率)
ia = nominal_current * random.uniform(1.0, 1.2)
ib = nominal_current * random.uniform(1.0, 1.2)
ic = nominal_current * random.uniform(1.0, 1.2)
i0 = (ia + ib + ic) * random.uniform(0.05, 0.15)
else: # 电压不平衡
# 电压不平衡:三相电压不平衡
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.8, 1.2)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.8, 1.2)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.8, 1.2)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 电流也可能不平衡
ia = nominal_current * random.uniform(0.8, 1.2)
ib = nominal_current * random.uniform(0.8, 1.2)
ic = nominal_current * random.uniform(0.8, 1.2)
i0 = (ia + ib + ic) * random.uniform(0.1, 0.3)
# 计算功率
power_factor = random.uniform(0.7, 0.9)
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def generate_overload_data(num_samples=100):
"""生成过载故障数据"""
data = []
fault_types = [
{"一级分类": "过载故障", "二级分类": "持续过载"},
{"一级分类": "过载故障", "二级分类": "短时过载"},
{"一级分类": "过载故障", "二级分类": "变压器过载"},
{"一级分类": "过载故障", "二级分类": "线路过载"},
{"一级分类": "过载故障", "二级分类": "开关设备过载"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "持续过载":
# 持续过载:电流持续超过额定值
overload_factor = random.uniform(1.2, 1.5)
# 电压可能有轻微下降
uab = nominal_voltage * random.uniform(0.9, 1.0)
ubc = nominal_voltage * random.uniform(0.9, 1.0)
uca = nominal_voltage * random.uniform(0.9, 1.0)
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
# 所有相电流都增加
ia = nominal_current * overload_factor * random.uniform(0.95, 1.05)
ib = nominal_current * overload_factor * random.uniform(0.95, 1.05)
ic = nominal_current * overload_factor * random.uniform(0.95, 1.05)
i0 = (ia + ib + ic) * random.uniform(0.05, 0.15)
elif fault_type["二级分类"] == "短时过载":
# 短时过载:电流短时间大幅增加
overload_factor = random.uniform(1.5, 2.5)
uab = nominal_voltage * random.uniform(0.85, 0.95)
ubc = nominal_voltage * random.uniform(0.85, 0.95)
uca = nominal_voltage * random.uniform(0.85, 0.95)
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
ia = nominal_current * overload_factor * random.uniform(0.9, 1.1)
ib = nominal_current * overload_factor * random.uniform(0.9, 1.1)
ic = nominal_current * overload_factor * random.uniform(0.9, 1.1)
i0 = (ia + ib + ic) * random.uniform(0.1, 0.2)
elif fault_type["二级分类"] == "变压器过载":
# 变压器过载:电流增加,电压可能有下降
overload_factor = random.uniform(1.3, 1.8)
uab = nominal_voltage * random.uniform(0.9, 0.98)
ubc = nominal_voltage * random.uniform(0.9, 0.98)
uca = nominal_voltage * random.uniform(0.9, 0.98)
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
ia = nominal_current * overload_factor * random.uniform(0.95, 1.05)
ib = nominal_current * overload_factor * random.uniform(0.95, 1.05)
ic = nominal_current * overload_factor * random.uniform(0.95, 1.05)
i0 = (ia + ib + ic) * random.uniform(0.05, 0.2)
elif fault_type["二级分类"] == "线路过载":
# 线路过载:电流增加,可能有电压降
overload_factor = random.uniform(1.2, 2.0)
# 线路阻抗导致的电压降
voltage_drop = random.uniform(0.1, 0.2)
uab = nominal_voltage * (1 - voltage_drop)
ubc = nominal_voltage * (1 - voltage_drop)
uca = nominal_voltage * (1 - voltage_drop)
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
ia = nominal_current * overload_factor * random.uniform(0.95, 1.05)
ib = nominal_current * overload_factor * random.uniform(0.95, 1.05)
ic = nominal_current * overload_factor * random.uniform(0.95, 1.05)
i0 = (ia + ib + ic) * random.uniform(0.05, 0.15)
else: # 开关设备过载
# 开关设备过载:电流增加,可能有局部发热
overload_factor = random.uniform(1.3, 1.7)
uab = nominal_voltage * random.uniform(0.92, 1.0)
ubc = nominal_voltage * random.uniform(0.92, 1.0)
uca = nominal_voltage * random.uniform(0.92, 1.0)
ua = uab / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = ubc / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = uca / np.sqrt(3) * random.uniform(0.95, 1.05)
# 某一相可能负载更重
heavy_phase = random.choice(["A", "B", "C"])
ia = nominal_current * (overload_factor * 1.2 if heavy_phase == "A" else overload_factor) * random.uniform(0.95, 1.05)
ib = nominal_current * (overload_factor * 1.2 if heavy_phase == "B" else overload_factor) * random.uniform(0.95, 1.05)
ic = nominal_current * (overload_factor * 1.2 if heavy_phase == "C" else overload_factor) * random.uniform(0.95, 1.05)
i0 = (ia + ib + ic) * random.uniform(0.1, 0.2)
# 计算功率
power_factor = random.uniform(0.75, 0.95) # 过载情况下功率因数可能较好
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def generate_open_circuit_data(num_samples=100):
"""生成断路故障数据"""
data = []
fault_types = [
{"一级分类": "断路故障", "二级分类": "导线断线"},
{"一级分类": "断路故障", "二级分类": "接头断开"},
{"一级分类": "断路故障", "二级分类": "开关拒动"},
{"一级分类": "断路故障", "二级分类": "熔断器熔断"},
{"一级分类": "断路故障", "二级分类": "机械性断路"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "导线断线":
# 导线断线:断线相电流为零,电压可能异常
broken_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "A" else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "B" else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "C" else random.uniform(0.9, 1.1))
# 根据相电压计算线电压
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * (0 if broken_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (0 if broken_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (0 if broken_phase == "C" else random.uniform(0.8, 1.0))
i0 = (ia + ib + ic) * random.uniform(0.1, 0.3)
elif fault_type["二级分类"] == "接头断开":
# 接头断开:局部接触电阻增大,电流波动
bad_joint_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 接头不良可能导致电流波动减小
ia = nominal_current * (random.uniform(0.3, 0.7) if bad_joint_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(0.3, 0.7) if bad_joint_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(0.3, 0.7) if bad_joint_phase == "C" else random.uniform(0.8, 1.0))
i0 = (ia + ib + ic) * random.uniform(0.1, 0.2)
elif fault_type["二级分类"] == "开关拒动":
# 开关拒动:可能表现为电流正常但无法切断
# 这里模拟某些相电流正常,某些相无法切断
stuck_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 拒动相电流可能仍在流动
ia = nominal_current * (random.uniform(0.8, 1.0) if stuck_phase == "A" else 0)
ib = nominal_current * (random.uniform(0.8, 1.0) if stuck_phase == "B" else 0)
ic = nominal_current * (random.uniform(0.8, 1.0) if stuck_phase == "C" else 0)
i0 = (ia + ib + ic) * random.uniform(0.1, 0.3)
elif fault_type["二级分类"] == "熔断器熔断":
# 熔断器熔断:相电流为零,电压可能仍有
fused_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 熔断相电流为零
ia = nominal_current * (0 if fused_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (0 if fused_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (0 if fused_phase == "C" else random.uniform(0.8, 1.0))
i0 = (ia + ib + ic) * random.uniform(0.1, 0.2)
else: # 机械性断路
# 机械性断路:可能表现为较复杂的电气特性
broken_phases = random.sample(["A", "B", "C"], random.choice([1, 2]))
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0, 0.3) if "A" in broken_phases else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0, 0.3) if "B" in broken_phases else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0, 0.3) if "C" in broken_phases else random.uniform(0.9, 1.1))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * (random.uniform(0, 0.2) if "A" in broken_phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(0, 0.2) if "B" in broken_phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(0, 0.2) if "C" in broken_phases else random.uniform(0.8, 1.0))
i0 = (ia + ib + ic) * random.uniform(0.1, 0.3)
# 计算功率 - 断路情况下功率会减小
power_factor = random.uniform(0.7, 0.9)
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def generate_equipment_fault_data(num_samples=100):
"""生成设备故障数据"""
data = []
fault_types = [
{"一级分类": "设备故障", "二级分类": "变压器故障"},
{"一级分类": "设备故障", "二级分类": "断路器故障"},
{"一级分类": "设备故障", "二级分类": "电缆故障"},
{"一级分类": "设备故障", "二级分类": "避雷器故障"},
{"一级分类": "设备故障", "二级分类": "电容器故障"},
{"一级分类": "设备故障", "二级分类": "互感器故障"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "变压器故障":
# 变压器故障:可能表现为电压电流不平衡等
fault_severity = random.choice(["轻微", "严重"])
if fault_severity == "轻微":
# 轻微故障:电压轻微不平衡,可能有增加的零序电流
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ia = nominal_current * random.uniform(0.9, 1.1)
ib = nominal_current * random.uniform(0.9, 1.1)
ic = nominal_current * random.uniform(0.9, 1.1)
i0 = (ia + ib + ic) * random.uniform(0.1, 0.25)
else:
# 严重故障:可能有明显的电压下降或电流上升
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.7, 0.9)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.7, 0.9)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.7, 0.9)
ia = nominal_current * random.uniform(1.2, 1.5)
ib = nominal_current * random.uniform(1.2, 1.5)
ic = nominal_current * random.uniform(1.2, 1.5)
i0 = (ia + ib + ic) * random.uniform(0.2, 0.4)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
elif fault_type["二级分类"] == "断路器故障":
# 断路器故障:可能导致无法正确切断电流
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 可能有一相电流异常(未能正确切断)
faulty_phase = random.choice(["A", "B", "C"])
ia = nominal_current * (random.uniform(1.0, 1.5) if faulty_phase == "A" else random.uniform(0.1, 0.3))
ib = nominal_current * (random.uniform(1.0, 1.5) if faulty_phase == "B" else random.uniform(0.1, 0.3))
ic = nominal_current * (random.uniform(1.0, 1.5) if faulty_phase == "C" else random.uniform(0.1, 0.3))
i0 = (ia + ib + ic) * random.uniform(0.2, 0.4)
elif fault_type["二级分类"] == "电缆故障":
# 电缆故障:可能表现为绝缘问题,泄漏电流
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * random.uniform(0.8, 1.2)
ib = nominal_current * random.uniform(0.8, 1.2)
ic = nominal_current * random.uniform(0.8, 1.2)
# 电缆故障可能导致较大的零序电流
i0 = (ia + ib + ic) * random.uniform(0.3, 0.5)
elif fault_type["二级分类"] == "避雷器故障":
# 避雷器故障:正常运行时可能不明显,但可能有泄漏电流
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.95, 1.05)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
ia = nominal_current * random.uniform(0.9, 1.1)
ib = nominal_current * random.uniform(0.9, 1.1)
ic = nominal_current * random.uniform(0.9, 1.1)
# 轻微的泄漏电流
i0 = (ia + ib + ic) * random.uniform(0.05, 0.15)
elif fault_type["二级分类"] == "电容器故障":
# 电容器故障:可能导致无功功率不平衡
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.95, 1.05)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.95, 1.05)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.95, 1.05)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
faulty_phase = random.choice(["A", "B", "C"])
ia = nominal_current * random.uniform(0.8, 1.2)
ib = nominal_current * random.uniform(0.8, 1.2)
ic = nominal_current * random.uniform(0.8, 1.2)
i0 = (ia + ib + ic) * random.uniform(0.1, 0.2)
# 电容器故障会影响功率因数计算
power_factor = random.uniform(0.6, 0.8)
else: # 互感器故障
# 互感器故障:可能导致测量值异常
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
faulty_phase = random.choice(["A", "B", "C"])
# 互感器故障可能导致测量值异常偏高或偏低
if random.choice([True, False]):
# 异常偏高
ia = nominal_current * (random.uniform(1.5, 3.0) if faulty_phase == "A" else random.uniform(0.8, 1.2))
ib = nominal_current * (random.uniform(1.5, 3.0) if faulty_phase == "B" else random.uniform(0.8, 1.2))
ic = nominal_current * (random.uniform(1.5, 3.0) if faulty_phase == "C" else random.uniform(0.8, 1.2))
else:
# 异常偏低
ia = nominal_current * (random.uniform(0.1, 0.5) if faulty_phase == "A" else random.uniform(0.8, 1.2))
ib = nominal_current * (random.uniform(0.1, 0.5) if faulty_phase == "B" else random.uniform(0.8, 1.2))
ic = nominal_current * (random.uniform(0.1, 0.5) if faulty_phase == "C" else random.uniform(0.8, 1.2))
i0 = (ia + ib + ic) * random.uniform(0.1, 0.3)
# 计算功率
power_factor = random.uniform(0.7, 0.9)
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def generate_environmental_fault_data(num_samples=100):
"""生成环境因素导致的故障数据"""
data = []
fault_types = [
{"一级分类": "环境因素", "二级分类": "雷击故障"},
{"一级分类": "环境因素", "二级分类": "污秽闪络"},
{"一级分类": "环境因素", "二级分类": "风灾故障"},
{"一级分类": "环境因素", "二级分类": "冰雪灾害"},
{"一级分类": "环境因素", "二级分类": "外力破坏"},
{"一级分类": "环境因素", "二级分类": "动物侵害"}
]
nominal_voltage = 10000
nominal_current = 100
for _ in range(num_samples):
fault_type = random.choice(fault_types)
if fault_type["二级分类"] == "雷击故障":
# 雷击故障:可能导致瞬时过电压和绝缘击穿
# 随机选择受影响的相
affected_phases = random.sample(["A", "B", "C"], random.choice([1, 2, 3]))
ua = nominal_voltage / np.sqrt(3) * (random.uniform(1.3, 2.0) if "A" in affected_phases else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(1.3, 2.0) if "B" in affected_phases else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(1.3, 2.0) if "C" in affected_phases else random.uniform(0.9, 1.1))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 雷击可能导致瞬时大电流
ia = nominal_current * (random.uniform(3.0, 8.0) if "A" in affected_phases else random.uniform(0.8, 1.2))
ib = nominal_current * (random.uniform(3.0, 8.0) if "B" in affected_phases else random.uniform(0.8, 1.2))
ic = nominal_current * (random.uniform(3.0, 8.0) if "C" in affected_phases else random.uniform(0.8, 1.2))
i0 = (ia + ib + ic) * random.uniform(0.3, 0.7)
elif fault_type["二级分类"] == "污秽闪络":
# 污秽闪络:可能导致绝缘子表面闪络
# 随机选择受影响的相
affected_phases = random.sample(["A", "B", "C"], random.choice([1, 2]))
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.7, 0.9) if "A" in affected_phases else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.7, 0.9) if "B" in affected_phases else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.7, 0.9) if "C" in affected_phases else random.uniform(0.9, 1.1))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
# 污秽闪络可能导致泄漏电流增加
ia = nominal_current * (random.uniform(1.2, 2.0) if "A" in affected_phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(1.2, 2.0) if "B" in affected_phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(1.2, 2.0) if "C" in affected_phases else random.uniform(0.8, 1.0))
i0 = (ia + ib + ic) * random.uniform(0.2, 0.5)
elif fault_type["二级分类"] == "风灾故障":
# 风灾故障:可能导致导线摇摆、短路或断线
fault_mode = random.choice(["摇摆接触", "短路", "断线"])
if fault_mode == "摇摆接触":
# 导线摇摆可能导致相间短路
affected_phases = random.sample(["A", "B", "C"], 2)
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.5, 0.8) if "A" in affected_phases else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.5, 0.8) if "B" in affected_phases else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.5, 0.8) if "C" in affected_phases else random.uniform(0.9, 1.1))
ia = nominal_current * (random.uniform(1.5, 3.0) if "A" in affected_phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(1.5, 3.0) if "B" in affected_phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(1.5, 3.0) if "C" in affected_phases else random.uniform(0.8, 1.0))
elif fault_mode == "短路":
# 短路导致电压降低、电流增大
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.1, 0.4)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.1, 0.4)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.1, 0.4)
ia = nominal_current * random.uniform(5.0, 10.0)
ib = nominal_current * random.uniform(5.0, 10.0)
ic = nominal_current * random.uniform(5.0, 10.0)
else: # 断线
# 断线导致部分相电流为零
broken_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "A" else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "B" else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "C" else random.uniform(0.9, 1.1))
ia = nominal_current * (0 if broken_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (0 if broken_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (0 if broken_phase == "C" else random.uniform(0.8, 1.0))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
i0 = (ia + ib + ic) * random.uniform(0.2, 0.6)
elif fault_type["二级分类"] == "冰雪灾害":
# 冰雪灾害:可能导致断线或负载不平衡
fault_mode = random.choice(["断线", "负载不平衡"])
if fault_mode == "断线":
# 断线导致部分相电流为零
broken_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "A" else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "B" else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "C" else random.uniform(0.9, 1.1))
ia = nominal_current * (0 if broken_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (0 if broken_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (0 if broken_phase == "C" else random.uniform(0.8, 1.0))
else: # 负载不平衡
# 负载不平衡导致电流不平衡
ua = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ub = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
uc = nominal_voltage / np.sqrt(3) * random.uniform(0.9, 1.1)
ia = nominal_current * random.uniform(0.5, 1.5)
ib = nominal_current * random.uniform(0.5, 1.5)
ic = nominal_current * random.uniform(0.5, 1.5)
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
i0 = (ia + ib + ic) * random.uniform(0.2, 0.5)
elif fault_type["二级分类"] == "外力破坏":
# 外力破坏:可能导致断线、短路等
fault_mode = random.choice(["断线", "短路", "绝缘损坏"])
if fault_mode == "断线":
# 断线导致部分相电流为零
broken_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "A" else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "B" else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (0 if broken_phase == "C" else random.uniform(0.9, 1.1))
ia = nominal_current * (0 if broken_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (0 if broken_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (0 if broken_phase == "C" else random.uniform(0.8, 1.0))
elif fault_mode == "短路":
# 短路导致电压降低、电流增大
affected_phases = random.sample(["A", "B", "C"], random.choice([2, 3]))
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.3) if "A" in affected_phases else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.3) if "B" in affected_phases else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.3) if "C" in affected_phases else random.uniform(0.9, 1.1))
ia = nominal_current * (random.uniform(5.0, 10.0) if "A" in affected_phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(5.0, 10.0) if "B" in affected_phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(5.0, 10.0) if "C" in affected_phases else random.uniform(0.8, 1.0))
else: # 绝缘损坏
# 绝缘损坏可能导致接地
affected_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if affected_phase == "A" else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if affected_phase == "B" else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if affected_phase == "C" else random.uniform(0.9, 1.1))
ia = nominal_current * (random.uniform(1.2, 2.0) if affected_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(1.2, 2.0) if affected_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(1.2, 2.0) if affected_phase == "C" else random.uniform(0.8, 1.0))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
i0 = (ia + ib + ic) * random.uniform(0.2, 0.6)
else: # 动物侵害
# 动物侵害:可能导致相间短路或接地
fault_mode = random.choice(["相间短路", "接地"])
if fault_mode == "相间短路":
# 相间短路
affected_phases = random.sample(["A", "B", "C"], 2)
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if "A" in affected_phases else random.uniform(0.9, 1.1))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if "B" in affected_phases else random.uniform(0.9, 1.1))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.3, 0.7) if "C" in affected_phases else random.uniform(0.9, 1.1))
ia = nominal_current * (random.uniform(2.0, 5.0) if "A" in affected_phases else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(2.0, 5.0) if "B" in affected_phases else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(2.0, 5.0) if "C" in affected_phases else random.uniform(0.8, 1.0))
else: # 接地
# 接地故障
affected_phase = random.choice(["A", "B", "C"])
ua = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.5) if affected_phase == "A" else random.uniform(1.1, 1.3))
ub = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.5) if affected_phase == "B" else random.uniform(1.1, 1.3))
uc = nominal_voltage / np.sqrt(3) * (random.uniform(0.1, 0.5) if affected_phase == "C" else random.uniform(1.1, 1.3))
ia = nominal_current * (random.uniform(1.2, 2.5) if affected_phase == "A" else random.uniform(0.8, 1.0))
ib = nominal_current * (random.uniform(1.2, 2.5) if affected_phase == "B" else random.uniform(0.8, 1.0))
ic = nominal_current * (random.uniform(1.2, 2.5) if affected_phase == "C" else random.uniform(0.8, 1.0))
uab = abs(ua - ub)
ubc = abs(ub - uc)
uca = abs(uc - ua)
i0 = (ia + ib + ic) * random.uniform(0.3, 0.7)
# 计算功率
power_factor = random.uniform(0.65, 0.85) # 环境因素故障下功率因数可能较差
active_power_a = ua * ia * power_factor
active_power_b = ub * ib * power_factor
active_power_c = uc * ic * power_factor
active_power_total = active_power_a + active_power_b + active_power_c
apparent_power_a = ua * ia
apparent_power_b = ub * ib
apparent_power_c = uc * ic
apparent_power_total = apparent_power_a + apparent_power_b + apparent_power_c
reactive_power_a = apparent_power_a * np.sin(np.arccos(power_factor))
reactive_power_b = apparent_power_b * np.sin(np.arccos(power_factor))
reactive_power_c = apparent_power_c * np.sin(np.arccos(power_factor))
reactive_power_total = reactive_power_a + reactive_power_b + reactive_power_c
sample = {
"UAB": uab, "UBC": ubc, "UCA": uca,
"UA": ua, "UB": ub, "UC": uc,
"IA": ia, "IB": ib, "IC": ic, "I0": i0,
"有功功率": active_power_total,
"A相有功功率": active_power_a, "B相有功功率": active_power_b, "C相有功功率": active_power_c,
"无功功率": reactive_power_total,
"A相无功功率": reactive_power_a, "B相无功功率": reactive_power_b, "C相无功功率": reactive_power_c,
"视在功率": apparent_power_total,
"A相视在功率": apparent_power_a, "B相视在功率": apparent_power_b, "C相视在功率": apparent_power_c,
"故障类型": fault_type["二级分类"],
"一级分类": fault_type["一级分类"],
"二级分类": fault_type["二级分类"]
}
data.append(sample)
return data
def add_noise_to_data(data_list, noise_level=0.05):
"""向数据中添加随机噪声"""
noisy_data = []
for sample in data_list:
noisy_sample = {}
for key, value in sample.items():
if key in ["故障类型", "一级分类", "二级分类"]:
noisy_sample[key] = value
else:
# 添加随机噪声,噪声水平为原值的noise_level比例
noise = value * random.uniform(-noise_level, noise_level)
noisy_sample[key] = value + noise
noisy_data.append(noisy_sample)
return noisy_data
def generate_all_fault_data(samples_per_category=200, noise_level=0.05):
"""生成所有类型的故障数据并添加噪声"""
normal_data = generate_normal_data(samples_per_category)
short_circuit_data = generate_short_circuit_data(samples_per_category)
ground_fault_data = generate_ground_fault_data(samples_per_category)
voltage_loss_data = generate_voltage_loss_data(samples_per_category)
overload_data = generate_overload_data(samples_per_category)
open_circuit_data = generate_open_circuit_data(samples_per_category)
equipment_fault_data = generate_equipment_fault_data(samples_per_category)
environmental_fault_data = generate_environmental_fault_data(samples_per_category)
all_data = (normal_data + short_circuit_data + ground_fault_data +
voltage_loss_data + overload_data + open_circuit_data +
equipment_fault_data + environmental_fault_data)
# 添加随机噪声
noisy_data = add_noise_to_data(all_data, noise_level)
# 随机打乱数据
random.shuffle(noisy_data)
return noisy_data
def save_to_csv(data_list, filename):
"""将数据保存为CSV文件"""
df = pd.DataFrame(data_list)
df.to_csv(filename, index=False, encoding='utf-8')
print(f"数据已保存至 {filename},共 {len(data_list)} 条记录。")
def split_train_test_data(data_list, test_ratio=0.2):
"""将数据分为训练集和测试集"""
random.shuffle(data_list)
split_idx = int(len(data_list) * (1 - test_ratio))
train_data = data_list[:split_idx]
test_data = data_list[split_idx:]
return train_data, test_data
def main():
print("开始生成电网故障数据...")
# 设置随机种子以便结果可复现
random.seed(42)
np.random.seed(42)
# 每种故障类型生成的样本数
samples_per_category = 200
# 生成所有故障数据并添加噪声
all_data = generate_all_fault_data(samples_per_category, noise_level=0.03)
# 分割训练集和测试集
train_data, test_data = split_train_test_data(all_data, test_ratio=0.2)
# 保存数据
save_to_csv(train_data, "电网故障训练数据.csv")
save_to_csv(test_data, "电网故障测试数据.csv")
# 计算并打印各类故障的分布情况
fault_types = {}
for sample in all_data:
fault_type = sample["二级分类"]
if fault_type in fault_types:
fault_types[fault_type] += 1
else:
fault_types[fault_type] = 1
print("\n故障类型分布:")
for fault_type, count in fault_types.items():
print(f"{fault_type}: {count}条记录 ({count/len(all_data)*100:.2f}%)")
if __name__ == "__main__":
main()
这个脚本为电网配网设备故障分类问题生成了模拟训练数据,具体包括以下功能:
基于您提供的故障分类表,针对每种故障类型生成了特征明显的样本数据
生成的数据包括三相电压、电流和功率等特征,以及对应的故障分类标签
模拟了各类故障的电气特性,例如:
为数据添加随机噪声,使其更接近真实情况
将数据分为训练集和测试集,并保存为CSV文件
使用方法:
您可以根据需要调整samples_per_category
参数来生成更多或更少的样本数据,也可以调整noise_level
参数来控制噪声水平。