以下是补充更多数据清洗操作的 Scala Spark 示例代码,基于你的场景(处理配电网终端数据),包括常见的清洗步骤,如去重、填补缺失值、异常值检测、数据格式转换等。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object XTUDataCleaning {
def main(args: Array[String]): Unit = {
// 初始化 SparkSession
val spark = SparkSession.builder()
.appName("XTU Data Cleaning and Processing")
.getOrCreate()
// Kafka 配置
val kafkaBrokers = "localhost:9092"
val kafkaTopic = "xtu_realtime_data"
// 定义 Kafka 数据的 Schema
val schema = StructType(Array(
StructField("device_id", StringType, true),
StructField("timestamp", LongType, true),
StructField("voltage", ArrayType(DoubleType), true),
StructField("current", ArrayType(DoubleType), true),
StructField("status", StringType, true) // 假设设备状态字段
))
// 从 Kafka 消费数据
val rawStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.load()
// 解析 Kafka 数据为结构化格式
val parsedStream = rawStream.selectExpr("CAST(value AS STRING) as json")
.select(from_json(col("json"), schema).as("data"))
.select("data.*")
// 数据清洗示例
// 1. **去重**:如果数据中存在重复的 `device_id` 和 `timestamp`,保留最新的数据
val deduplicatedStream = parsedStream.dropDuplicates("device_id", "timestamp")
// 2. **缺失值处理**:填补 voltage 和 current 的空值
val filledStream = deduplicatedStream
.withColumn("voltage", when(col("voltage").isNull, array(lit(0.0))).otherwise(col("voltage")))
.withColumn("current", when(col("current").isNull, array(lit(0.0))).otherwise(col("current")))
.withColumn("status", when(col("status").isNull, lit("UNKNOWN")).otherwise(col("status")))
// 3. **异常值检测与过滤**:过滤掉电压或电流超出合理范围的数据
val filteredStream = filledStream.filter(
array_min(col("voltage")) >= 200 && array_max(col("voltage")) <= 240 && // 假设电压范围 200~240
array_min(col("current")) >= 0 && array_max(col("current")) <= 50 // 假设电流范围 0~50
)
// 4. **时间戳转换**:将 Unix 时间戳转换为人类可读的时间格式
val timestampedStream = filteredStream.withColumn(
"readable_time",
from_unixtime(col("timestamp"))
)
// 5. **数据格式转换**:将 voltage 和 current 数组展开为多行(长表格式)
val explodedStream = timestampedStream
.withColumn("voltage_value", explode(col("voltage")))
.withColumn("current_value", explode(col("current")))
// 6. **冗余字段删除**:移除不必要的列(如原始 voltage 和 current 数组)
val cleanedStream = explodedStream.drop("voltage", "current")
// 7. **添加派生字段**:根据现有数据生成新字段
val enrichedStream = cleanedStream
.withColumn("power", col("voltage_value") * col("current_value")) // 假设功率计算公式
.withColumn("is_fault", when(col("power") > 1000, lit(true)).otherwise(lit(false))) // 功率超过 1000W 视为故障
// 输出清洗后的数据(控制台输出,仅用于调试)
val query = enrichedStream.writeStream
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
query.awaitTermination()
}
}
通过 dropDuplicates
去掉 device_id
和 timestamp
相同的重复数据,防止重复计算。
val deduplicatedStream = parsedStream.dropDuplicates("device_id", "timestamp")
为 voltage
和 current
列填充默认值,使用 when
和 otherwise
函数处理 NULL 值。
val filledStream = deduplicatedStream
.withColumn("voltage", when(col("voltage").isNull, array(lit(0.0))).otherwise(col("voltage")))
.withColumn("current", when(col("current").isNull, array(lit(0.0))).otherwise(col("current")))
.withColumn("status", when(col("status").isNull, lit("UNKNOWN")).otherwise(col("status")))
通过 array_min
和 array_max
检测电压和电流是否在合理范围内,过滤掉不符合条件的数据。
val filteredStream = filledStream.filter(
array_min(col("voltage")) >= 200 && array_max(col("voltage")) <= 240 &&
array_min(col("current")) >= 0 && array_max(col("current")) <= 50
)
将 Unix 时间戳转换为人类可读的时间格式,方便调试和分析。
val timestampedStream = filteredStream.withColumn(
"readable_time",
from_unixtime(col("timestamp"))
)
使用 explode
将数组类型的列(voltage
和 current
)展开为多行,变成长表格式,便于后续计算。
val explodedStream = timestampedStream
.withColumn("voltage_value", explode(col("voltage")))
.withColumn("current_value", explode(col("current")))
移除不必要的列(如原始的 voltage
和 current
数组),减小输出数据体积。
val cleanedStream = explodedStream.drop("voltage", "current")
根据现有数据生成新字段:
power
:计算功率(假设公式为 voltage_value * current_value
)。is_fault
:当功率超过阈值(例如 1000W)时标记为故障。val enrichedStream = cleanedStream
.withColumn("power", col("voltage_value") * col("current_value"))
.withColumn("is_fault", when(col("power") > 1000, lit(true)).otherwise(lit(false)))
假设输入数据为以下 JSON:
{
"device_id": "device_001",
"timestamp": 1677763200,
"voltage": [220.1, 220.3, 219.8],
"current": [30.1, 30.2, 29.9],
"status": "OK"
}
清洗后的输出数据如下:
+----------+-------------+-------------------+--------------+--------------+------+-------+-------+-------+
|device_id |timestamp |readable_time |voltage_value |current_value |power |status |is_fault|
+----------+-------------+-------------------+--------------+--------------+------+-------+-------+-------+
|device_001|1677763200 |2023-03-01 12:00:00|220.1 |30.1 |6630.1|OK |true |
|device_001|1677763200 |2023-03-01 12:00:00|220.3 |30.2 |6653.06|OK |true |
|device_001|1677763200 |2023-03-01 12:00:00|219.8 |29.9 |6565.02|OK |true |
+----------+-------------+-------------------+--------------+--------------+------+-------+-------+-------+
这段代码展示了典型的数据清洗步骤,解决了:
通过这些清洗步骤,可以确保输入数据的质量满足后续计算和存储的需求。同时,代码还具备可扩展性,可根据实际需求添加更多清洗逻辑。