在旋轉機械故障診斷領域,如何從強噪聲干擾的振動信號中提取敏感特征始終是核心難題。經典的深度學習模型如卷積神經網絡(Convolutional Neural Networks, CNN)在實驗室干凈數據集上表現優異,但在面對復雜的工業實測數據時,冗余的噪聲特征可能會導致模型準確率有所下降。為了解決這一問題,論文“Deep Residual Shrinkage Networks for Fault Diagnosis”提出了一種創新的結構——深度殘差收縮網絡(Deep Residual Shrinkage Network, DRSN)。
DRSN的核心思想在于將“軟閾值化 (Soft Thresholding)”這一經典的信號處理降噪技術集成到殘差網絡中。通過引入注意力機制,模型能夠自適應地學習每一組特征圖的收縮閾值。在特征傳遞的過程中,那些接近于零的、被視為噪聲的特征會被自動置為零,而強特征則得以保留。這種結構有助于提高模型在強噪聲環境下的魯棒性(即抗干擾能力),還實現了端到端的自適應特征提取,無需依賴復雜的專家先驗知識。
1.從殘差塊到自適應收縮
DRSN的核心組件是“帶有通道級閾值的殘差收縮構建塊 (Residual Shrinkage Building Unit with Channel-wise thresholds, RSBU-CW)”。該模塊在傳統殘差學習的基礎上,并行了一個用于計算閾值的子網絡。
在RSBU-CW模塊中,輸入特征經過兩次卷積和批歸一化(Batch Normalization, BN)處理后,會進入一個注意力分支。首先,通過取絕對值和全局平均池化(Global Average Pooling, GAP)將空間維度的特征壓縮,計算出每個通道的絕對值平均值。接著,利用兩個全連接層和Sigmoid激活函數學習出一個縮放因子α,取值范圍在0到1之間。收縮閾值τ的計算公式為:τ = α * average(abs(x))。
得到閾值后,模型應用軟閾值化算子處理特征圖,公式為:y = sign(x) * max(abs(x) - τ, 0)。這種設計允許模型為每個特征通道獨立設置閾值。
圖1. 深度殘差收縮網絡
2. 實驗設置
為了驗證DRSN-CW的性能,選擇了軸承診斷領域的標準基準——西儲大學(CWRU)軸承數據集。實驗涵蓋了正常狀態以及內圈故障、外圈故障和滾珠故障10類標簽。每個樣本采用1024個采樣點的滑動窗口進行切分。
圖2. 類別劃分
在數據工程模塊,除了常規的標準化處理,還設計了一套“在線實時增強”流水線,以模擬極端的工業場景。這包括:
(1)環移位(Rolling Shift):模擬傳感器采樣起始時刻的不確定性。
(2)瞬態沖擊注入:模擬機器偶爾出現的磕碰干擾。
(3)加性高斯白噪聲(Additive White Gaussian Noise, AWGN):在訓練過程中動態混合不同信噪比(Signal-to-Noise Ratio, SNR)的噪聲,盡量讓模型在各種環境下保持特征一致性。特別地,構建了一個SNR為-8dB的測試環境,這在工業診斷中屬于較強的背景噪聲干擾。
具體的TensorFlow代碼如下:
""" 項目名稱:深度殘差收縮網絡 (DRSN-CW) - 旋轉機械故障診斷復現 論文參考:Zhao, M., et al. "Deep Residual Shrinkage Networks for Fault Diagnosis," IEEE TII, 2020. 算法核心邏輯: 1. 軟閾值化 (Soft Thresholding):通過非線性映射,將接近于零的噪聲特征置為零,保留強特征。 2. 注意力機制 (Attention):利用小型子網絡自動學習每個通道的收縮閾值,實現自適應去噪。 3. 殘差學習 (Residual Learning):解決深層網絡梯度消失問題,確保特征傳遞的穩定性。 """ import os import sys import logging import numpy as np import scipy.io as sio import tensorflow as tf from tensorflow.keras import layers, Model, regularizers from sklearn.model_selection import train_test_split as split_data # ============================================================================= # 1. 環境與資源配置模塊 # ============================================================================= logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s') class GPUConfig: """ 計算資源管理器:負責 TensorFlow 運行時環境的初始化與硬件加速配置。 """ @staticmethod def init_tf(): """ 配置計算后端: - 抑制冗余日志:減少非關鍵性的系統警告。 - 顯存按需分配:防止 TensorFlow 啟動時預占全部顯存,允許與其他進程共用 GPU。 """ os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' physical_gpu_list = tf.config.list_physical_devices('GPU') if physical_gpu_list: try: for gpu_device in physical_gpu_list: # 開啟顯存動態增長模式 tf.config.experimental.set_memory_growth(gpu_device, True) logging.info("GPU 硬件加速就緒:檢測到 {0} 個計算單元,已啟用動態顯存模式。".format(len(physical_gpu_list))) except RuntimeError as hardware_error: logging.warning("GPU 后端配置失敗(可能已被占用): %s", hardware_error) else: logging.info("未檢測到 GPU,系統將使用 CPU 進行計算(訓練速度可能受限)。") # 執行全局初始化 GPUConfig.init_tf() # ============================================================================= # 2. 數據工程模塊 (ETL - Extract, Transform, Load) # ============================================================================= class CWRULoader: """ CWRU 數據集解析器:負責原始 .mat 振動信號的讀取、分段與特征重構。 """ def __init__(self, dataset_root, window_size=1024): """ :param dataset_root: 數據集存儲根目錄 :param window_size: 樣本長度(窗口步長,通常設為 1024 或 2048) """ self.base_directory = os.path.abspath(dataset_root) self.sample_length = window_size self.sampling_interval = window_size def _parse_mat_content(self, target_file): """ 從 MATLAB 容器中提取驅動端(DE)時間序列數據。 """ try: storage = sio.loadmat(target_file) for identifier in storage.keys(): # 匹配驅動端加速度計信號鍵名 if 'DE_time' in identifier: return storage[identifier].flatten() except Exception as parse_error: logging.debug("讀取文件 %s 異常: %s", target_file, parse_error) return None return None def load_data(self, category_dictionary): """ 構建訓練數據集。 :param category_dictionary: 標簽與文件名的映射關系字典。 :return: (X_data, y_label) 的 Numpy 數組。 """ feature_collection, label_collection = [], [] is_data_found = False for class_idx, name_list in category_dictionary.items(): for filename in name_list: full_path = os.path.join(self.base_directory, "{0}.mat".format(filename)) if not os.path.exists(full_path): continue vibration_series = self._parse_mat_content(full_path) if vibration_series is None: continue is_data_found = True # 非重疊滑動窗口采樣:將長序列切割為定長的樣本塊 for pointer in range(0, len(vibration_series) - self.sample_length + 1, self.sampling_interval): sub_sequence = vibration_series[pointer : pointer + self.sample_length] feature_collection.append(sub_sequence) label_collection.append(class_idx) if not is_data_found: raise FileNotFoundError("路徑下未找到 CWRU 相關 .mat 文件,請檢查路徑。") return np.array(feature_collection, dtype='float32'), np.array(label_collection, dtype='int32') def add_awgn(signal_input, snr_value): """ 加性高斯白噪聲 (AWGN) 注入模塊。 用于模擬真實工業場景下的背景噪聲,測試模型的魯棒性。 計算公式:P_noise = P_signal / 10^(SNR/10) """ signal_input = np.array(signal_input) random_engine = np.random.default_rng() # 支持固定 SNR 或 SNR 范圍隨機采樣 target_snr = snr_value if not isinstance(snr_value, (list, tuple)) else random_engine.uniform(snr_value[0], snr_value[1]) # 計算信號功率并推導噪聲標準差 signal_power = np.mean(np.square(signal_input), axis=1, keepdims=True) noise_variance = signal_power / (10 ** (target_snr / 10.0)) noise_component = random_engine.normal(0, np.sqrt(noise_variance), signal_input.shape) return (signal_input + noise_component).astype('float32') # ============================================================================= # 3. 神經網絡組件定義 (DRSN Core) # ============================================================================= class SoftThresholdOperator(layers.Layer): """ 軟閾值化算子 (Custom Layer): DRSN 的非線性核心,通過閾值 tau 對特征映射進行收縮處理。 公式:y = sign(x) * max(|x| - tau, 0) """ def __init__(self, **kwargs): super(SoftThresholdOperator, self).__init__(**kwargs) def call(self, inputs): """ x_conv: 輸入特征圖 (Batch, Steps, Channels) tau: 學習到的閾值 (Batch, Channels) """ x_conv, tau = inputs # 將閾值擴展至與特征圖空間維度匹配 expanded_tau = tf.expand_dims(tau, axis=1) return tf.sign(x_conv) * tf.maximum(tf.abs(x_conv) - expanded_tau, 0.0) class RSBU_CW(layers.Layer): """ 殘差收縮構建塊 (Residual Shrinkage Building Unit with Channel-wise thresholds): 集成了多通道注意力機制的殘差塊,能夠為每個通道獨立生成閾值。 """ def __init__(self, filters, kernel_size, strides=1, **kwargs): super(RSBU_CW, self).__init__(**kwargs) self.num_kernels = filters self.step_size = strides self.width = kernel_size self.weight_decay = regularizers.l2(1e-4) # 恒等映射路徑 (Residual Shortcut) self.shortcut = None # 主變換分支:采用經典的 BN-ReLU-Conv 結構 self.bn_alpha = layers.BatchNormalization() self.relu_alpha = layers.Activation('relu') self.conv_alpha = layers.Conv1D(filters, kernel_size, strides=strides, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay) self.bn_beta = layers.BatchNormalization() self.relu_beta = layers.Activation('relu') self.conv_beta = layers.Conv1D(filters, kernel_size, strides=1, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay) # 注意力子網絡:計算通道級收縮閾值 self.gap = layers.GlobalAveragePooling1D() self.fc1 = layers.Dense(filters, kernel_initializer='he_normal') self.bn_gamma = layers.BatchNormalization() self.relu_gamma = layers.Activation('relu') self.fc2 = layers.Dense(filters, activation='sigmoid') # 歸一化縮放因子 self.threshold_op = SoftThresholdOperator() def build(self, input_dim): """ 動態調整 Shortcut:當步長不為 1 或通道數變化時,使用 1x1 卷積對齊殘差。 """ if self.step_size != 1 or input_dim[-1] != self.num_kernels: self.shortcut = tf.keras.Sequential([ layers.Conv1D(self.num_kernels, 1, strides=self.step_size, padding='same', use_bias=False), layers.BatchNormalization() ]) super(RSBU_CW, self).build(input_dim) def call(self, layer_inputs): """ 邏輯流:特征提取 -> 通道全局特征感知 -> 動態閾值計算 -> 軟閾值降噪 -> 殘差相加 """ identity = layer_inputs if self.shortcut: identity = self.shortcut(layer_inputs) # 兩次卷積處理得到中間特征圖 x_conv x_conv = self.bn_alpha(layer_inputs) x_conv = self.relu_alpha(x_conv) x_conv = self.conv_alpha(x_conv) x_conv = self.bn_beta(x_conv) x_conv = self.relu_beta(x_conv) x_conv = self.conv_beta(x_conv) # 計算特征圖各通道的絕對值均值作為全局統計量 x_abs = tf.abs(x_conv) abs_mean = self.gap(x_abs) # 通過子網絡輸出 alpha (0,1),閾值 tau = alpha * abs_mean z = self.fc1(abs_mean) z = self.bn_gamma(z) z = self.relu_gamma(z) alpha = self.fc2(z) tau = tf.multiply(alpha, abs_mean) # 應用軟閾值收縮并進行殘差融合 denoised_output = self.threshold_op([x_conv, tau]) return layers.Add()([denoised_output, identity]) class DRSN_CW(Model): """ DRSN-CW 完整架構: 將多個 RSBU 模塊順序堆疊,最后通過全連接層進行故障分類。 """ def __init__(self, num_classes): super(DRSN_CW, self).__init__(name="Bearing_Fault_DRSN") self.weight_decay = regularizers.l2(1e-4) # 輸入層:初步感知一維時序信號 self.conv1 = layers.Conv1D(32, 15, strides=2, padding='same', kernel_initializer='he_normal', kernel_regularizer=self.weight_decay) self.bn1 = layers.BatchNormalization() self.relu1 = layers.Activation('relu') # 構建收縮殘差塊序列 (特征維度由 32 逐漸擴展至 128) self.rsbu_blocks = [ RSBU_CW(32, 5, strides=2), RSBU_CW(32, 5, strides=1), RSBU_CW(64, 5, strides=2), RSBU_CW(64, 5, strides=1), RSBU_CW(128, 5, strides=2), RSBU_CW(128, 5, strides=1) ] # 輸出頭:降維后映射至分類空間 self.post_norm = layers.BatchNormalization() self.post_relu = layers.Activation('relu') self.gap_layer = layers.GlobalAveragePooling1D() self.classifier = layers.Dense(num_classes, activation='softmax', kernel_regularizer=self.weight_decay) def call(self, network_input): """ 端到端正向推理流程。 """ x = self.conv1(network_input) x = self.bn1(x) x = self.relu1(x) for block in self.rsbu_blocks: x = block(x) x = self.post_norm(x) x = self.post_relu(x) x = self.gap_layer(x) return self.classifier(x) # ============================================================================= # 4. 訓練、增強與性能評估流 # ============================================================================= def train_and_test(dataset_path, seq_len=1024): """ 全流程控制器:涵蓋數據預處理、在線增強、模型訓練及極端環境(-8dB)評估。 """ # 定義故障類別(基于 CWRU 文件命名規則) label_map = { 0: ['Normal_0', 'Normal_1', 'Normal_2', 'Normal_3'], 1: ['IR007_0', 'IR007_1', 'IR007_2', 'IR007_3'], 2: ['IR014_0', 'IR014_1', 'IR014_2', 'IR014_3'], 3: ['IR021_0', 'IR021_1', 'IR021_2', 'IR021_3'], 4: ['B007_0', 'B007_1', 'B007_2', 'B007_3'], 5: ['B014_0', 'B014_1', 'B014_2', 'B014_3'], 6: ['B021_0', 'B021_1', 'B021_2', 'B021_3'], 7: ['OR007@6_0', 'OR007@6_1', 'OR007@6_2', 'OR007@6_3'], 8: ['OR014@6_0', 'OR014@6_1', 'OR014@6_2', 'OR014@6_3'], 9: ['OR021@6_0', 'OR021@6_1', 'OR021@6_2', 'OR021@6_3'] } data_engine = CWRULoader(dataset_root=dataset_path, window_size=seq_len) try: signals, labels = data_engine.load_data(label_map) except Exception as data_err: logging.error("數據加載失敗: %s", data_err) return # 隨機劃分:70% 訓練,15% 驗證,15% 測試 train_x_pre, temp_x, train_y_pre, temp_y = split_data( signals, labels, test_size=0.3, random_state=42 ) val_x_pre, test_x_pre, val_y_pre, test_y_pre = split_data( temp_x, temp_y, test_size=0.5, random_state=42 ) # 標準化處理:使用訓練集均值和標準差,防止測試信息泄露 mu, sigma = np.mean(train_x_pre), np.std(train_x_pre) def normalize(obs): return ((obs - mu) / sigma).reshape(-1, seq_len, 1) train_set_x = normalize(train_x_pre) val_set_x = normalize(val_x_pre) test_set_x = normalize(test_x_pre) # 標簽進行 One-hot 編碼 num_classes = len(label_map) train_set_y = tf.keras.utils.to_categorical(train_y_pre, num_classes).astype('float32') val_set_y = tf.keras.utils.to_categorical(val_y_pre, num_classes).astype('float32') test_set_y = tf.keras.utils.to_categorical(test_y_pre, num_classes).astype('float32') # 測試環境:注入極強噪聲(-8dB)以驗證模型在極端工業背景下的表現 val_x_awgn = add_awgn(val_set_x, snr_value=-8) test_x_awgn = add_awgn(test_set_x, snr_value=-8) def augment_batch(feat_batch, label_batch): """ 在線實時增強 (Online Data Augmentation): 1. 循環移位:模擬采樣時刻的不確定性。 2. 瞬態沖擊:模擬偶然出現的機器磕碰聲。 3. 混合噪聲:提升模型的抗噪閾值。 """ rand_gen = np.random.default_rng() augmented_x = feat_batch.copy() batch_n, steps_n, _ = augmented_x.shape # 隨機相位平移 for sample_idx in range(batch_n): offset = rand_gen.integers(0, steps_n) augmented_x[sample_idx, :, 0] = np.roll(augmented_x[sample_idx, :, 0], offset) # 脈沖沖擊噪聲注入 (10% 概率) if rand_gen.random() > 0.9: for sample_idx in range(batch_n): if rand_gen.random() > 0.5: num_spikes = rand_gen.integers(1, 3) positions = rand_gen.integers(0, steps_n, num_spikes) spike_mag = np.std(augmented_x[sample_idx]) * rand_gen.uniform(1.5, 2.5) augmented_x[sample_idx, positions, 0] += spike_mag * rand_gen.choice([-1, 1], size=num_spikes) # 動態 SNR 混合 (50% 概率) if rand_gen.random() > 0.5: augmented_x = add_awgn(augmented_x, snr_value=(-8, 8)) return augmented_x.astype(np.float32), label_batch.astype(np.float32) def _tensor_spec_binding(f_tensor, l_tensor): """ 為 tf.data 顯式綁定形狀信息 """ f_tensor.set_shape([None, seq_len, 1]) l_tensor.set_shape([None, num_classes]) return f_tensor, l_tensor # 利用 tf.data 構建高吞吐數據流水線 training_pipeline = tf.data.Dataset.from_tensor_slices((train_set_x.astype('float32'), train_set_y)) training_pipeline = training_pipeline.shuffle(len(train_set_x)).batch(64) training_pipeline = training_pipeline.map( lambda x, y: tf.numpy_function(augment_batch, [x, y], [tf.float32, tf.float32]), num_parallel_calls=tf.data.AUTOTUNE ).map(_tensor_spec_binding).prefetch(tf.data.AUTOTUNE) # 模型實例化與編譯 model_instance = DRSN_CW(num_classes=num_classes) model_instance.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.0), # 交叉熵損失 metrics=['accuracy'] ) logging.info("診斷系統啟動:分類數=%d, 序列長度=%d", num_classes, seq_len) # 動態學習率調整與早停保護 optimization_callbacks = [ tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=1), tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True) ] # 模型擬合 model_instance.fit( training_pipeline, epochs=100, validation_data=(val_x_awgn, val_set_y), callbacks=optimization_callbacks, verbose=2 ) # 在極低信噪比環境下最終驗證性能 final_loss, final_acc = model_instance.evaluate(test_x_awgn, test_set_y, verbose=0) print("n" + "="*50) print("模型評估報告 (DRSN-CW)") print("評估背景:-8dB SNR (強噪聲干擾環境)") print("最終識別準確率: {0:.2f}%".format(final_acc * 100)) print("="*50) # ============================================================================= # 程序入口 # ============================================================================= if __name__ == "__main__": # 配置默認的數據搜索目錄 DATA_PATH = os.path.join(os.getcwd(), 'data_path') if not os.path.exists(DATA_PATH): logging.warning("未找到默認數據目錄: %s", DATA_PATH) user_input_path = input("請輸入 CWRU 原始數據集 (.mat) 所在的完整路徑: ").strip() if user_input_path: DATA_PATH = user_input_path else: logging.critical("未提供有效路徑,程序退出。") sys.exit(0) # 啟動訓練與測試 train_and_test(DATA_PATH, seq_len=1024)
3.強噪聲環境下的診斷性能分析與復現總結
在復現實驗中,使用了Adam優化器進行訓練,并結合了學習率動態調整 (ReduceLROnPlateau) 策略。在注入了-8dB的高斯噪聲后,DRSN-CW依然保持了90%以上的測試準確率。
圖3. 實驗結果
論文原文:
論文標題: Deep residual shrinkage networks for fault diagnosis
出版期刊: IEEE Transactions on Industrial Informatics. 2020, 16(7): 4681-4690.
DOI: 10.1109/TII.2019.2943898
https://ieeexplore.ieee.org/document/8850096
審核編輯 黃宇
-
神經網絡
+關注
關注
42文章
4838瀏覽量
107753 -
python
+關注
關注
57文章
4876瀏覽量
90025
發布評論請先 登錄
面向嵌入式部署的神經網絡優化:模型壓縮深度解析
自動駕駛中常提的卷積神經網絡是個啥?
NMSIS神經網絡庫使用介紹
在Ubuntu20.04系統中訓練神經網絡模型的一些經驗
CICC2033神經網絡部署相關操作
液態神經網絡(LNN):時間連續性與動態適應性的神經網絡
如何在機器視覺中部署深度學習神經網絡
無刷電機小波神經網絡轉子位置檢測方法的研究
神經網絡專家系統在電機故障診斷中的應用
神經網絡RAS在異步電機轉速估計中的仿真研究
基于FPGA搭建神經網絡的步驟解析
面向強噪聲數據的深度神經網絡:深度殘差收縮網絡的Python編程復現
評論