"""Deep clustering experiment: autoencoder + fuzzy c-means + spectral loss.

For every ``.mat`` dataset found in a folder the script

1. pre-trains an autoencoder on the reconstruction loss,
2. initialises the cluster centres with k-means on the latent codes, and
3. fine-tunes with a weighted sum of reconstruction, fuzzy-clustering and
   spectral losses over a grid of loss weights, writing the accuracies to
   Excel workbooks.
"""
import os
import sys

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from scipy.optimize import linear_sum_assignment

# NOTE: hdf5storage, scikit-learn and openpyxl are imported lazily inside the
# functions that need them, so the model and the metric helpers stay
# importable (e.g. from unit tests) on machines without the I/O stack.

# -------------------- Experiment configuration --------------------
SEED = 46
FUZZIFIER = 2
EPOCHS_PRETRAIN = 1200
EPOCHS_FINETUNE = 50
LATENT_DIM = 128
LEARNING_RATE = 0.001
N_NEIGHBORS = 10
PRETRAIN_EVAL_EVERY = 200
LOSS_COEFFICIENTS = [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000]
DEFAULT_DATA_DIR = "D:\\python_code\\rundatasets"
SUMMARY_PATH = "results2.xlsx"

# Column order of the summary workbook; rows are generated from this list so
# that headers and values can never drift apart.
RESULT_HEADERS = [
    "Dataset Name",
    "Pretraining Best Accuracy",
    "Finetuning Best Accuracy",
    "Best Clustering Loss Coefficient",
    "Best Spectral Loss Coefficient",
    "Pretraining Best Accuracy Epoch",
    "Finetuning Best Accuracy Epoch",
    "Pretraining Best Reconstruction Loss",
    "Finetuning Best Total Loss",
]


# -------------------- Model definition --------------------
def _build_mlp(widths):
    """Return ``Linear -> ReLU -> ... -> Linear`` for consecutive *widths*.

    Linear layers sit at even indices of the ``nn.Sequential`` and ReLUs at
    odd indices, with no activation after the last layer.
    """
    layers = []
    for fan_in, fan_out in zip(widths[:-1], widths[1:]):
        layers.append(nn.Linear(fan_in, fan_out))
        layers.append(nn.ReLU())
    return nn.Sequential(*layers[:-1])


class AutoencoderWithSpectralClustering(nn.Module):
    """Autoencoder with learnable fuzzy c-means centres in the latent space.

    Args:
        input_dim (int): Number of input features.
        latent_dim (int): Size of the latent code.
        n_clusters (int): Number of cluster centres.
        m (float): Fuzzifier of the fuzzy c-means membership; must be > 1.

    Raises:
        ValueError: If ``m <= 1`` (the membership exponent ``-2 / (m - 1)``
            would be undefined or have the wrong sign).
    """

    def __init__(self, input_dim, latent_dim, n_clusters, m=2.0):
        super().__init__()
        if m <= 1:
            raise ValueError(f"fuzzifier m must be > 1, got {m}")

        # Narrow inputs get a small network, wide inputs a large one.
        hidden = (64, 32) if input_dim < 64 else (2048, 1024, 512)
        widths = [input_dim, *hidden, latent_dim]
        self.encoder = _build_mlp(widths)
        self.decoder = _build_mlp(widths[::-1])

        # Cluster centres, randomly initialised (overwritten after k-means).
        self.centers = nn.Parameter(torch.randn(n_clusters, latent_dim))
        self.m = m  # fuzzifier

        self._initialize_weights()

    def forward(self, x):
        """Return ``(reconstruction, latent)`` for the batch *x*."""
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed, latent

    def _membership_from_dist(self, dist):
        """Turn an ``(N, K)`` sample-to-centre distance matrix into memberships."""
        hits = dist == 0
        on_centre = hits.any(dim=1, keepdim=True)

        # A sample lying exactly on a centre belongs to it crisply; if several
        # centres coincide there, the membership is shared evenly so the row
        # still sums to one.
        crisp = hits.to(dist.dtype)
        crisp = crisp / crisp.sum(dim=1, keepdim=True).clamp(min=1)

        # Zero distances are replaced before the negative power so that no
        # inf/NaN is produced in rows that are discarded by the `where` below.
        power = dist.masked_fill(hits, 1.0) ** (-2.0 / (self.m - 1))
        fuzzy = power / power.sum(dim=1, keepdim=True)
        return torch.where(on_centre, crisp, fuzzy)

    def compute_membership(self, latent):
        """Compute the fuzzy membership ``p_ij`` of every sample to every centre.

        Args:
            latent (torch.Tensor): Latent codes, one row per sample.

        Returns:
            torch.Tensor: ``(N, K)`` membership matrix whose rows sum to one.
        """
        dist = torch.cdist(latent, self.centers, p=2)  # shape: (N, K)
        return self._membership_from_dist(dist)

    def compute_fcm_loss(self, latent):
        """Return the fuzzy c-means loss ``sum_ij p_ij^m * ||z_i - c_j||^2``."""
        dist = torch.cdist(latent, self.centers, p=2)
        membership = self._membership_from_dist(dist)
        return (membership ** self.m * dist ** 2).sum()

    def compute_spectral_loss(self, latent, L):
        """Return ``trace(Y^T L Y)`` for the orthonormalised latent codes ``Y``.

        ``Y`` is the Q factor of the reduced QR decomposition of *latent*,
        i.e. ``latent @ R^{-1}`` without forming the inverse explicitly.

        Args:
            latent (torch.Tensor): ``(N, d)`` latent codes with ``N >= d``.
            L (torch.Tensor): ``(N, N)`` graph Laplacian.

        Raises:
            ValueError: If there are fewer samples than latent dimensions, in
                which case the latent codes cannot be orthonormalised
                column-wise.
        """
        n_samples, n_dims = latent.shape
        if n_samples < n_dims:
            raise ValueError(
                "spectral loss needs at least as many samples as latent "
                f"dimensions, got {n_samples} samples and {n_dims} dimensions"
            )
        y, _ = torch.linalg.qr(latent)
        return torch.trace(y.T @ L @ y)

    def _initialize_weights(self):
        """Xavier-initialise every linear layer and zero its bias."""
        for network in (self.encoder, self.decoder):
            for layer in network:
                if isinstance(layer, nn.Linear):
                    nn.init.xavier_uniform_(layer.weight)
                    if layer.bias is not None:
                        nn.init.zeros_(layer.bias)


# -------------------- Metrics and graph construction --------------------
def calculate_accuracy(true_labels, pred_labels):
    """Clustering accuracy under the best one-to-one label matching.

    Args:
        true_labels (array-like): Ground-truth labels.
        pred_labels (array-like): Predicted cluster ids (any labelling).

    Returns:
        float: Fraction of samples matched by the optimal assignment
        (Hungarian algorithm); ``0.0`` for empty input.

    Raises:
        ValueError: If the two label arrays differ in length.
    """
    true_labels = np.asarray(true_labels).ravel()
    pred_labels = np.asarray(pred_labels).ravel()
    if true_labels.shape != pred_labels.shape:
        raise ValueError("true_labels and pred_labels must have the same length")
    if true_labels.size == 0:
        return 0.0

    _, true_idx = np.unique(true_labels, return_inverse=True)
    _, pred_idx = np.unique(pred_labels, return_inverse=True)
    confusion = np.zeros((true_idx.max() + 1, pred_idx.max() + 1), dtype=np.int64)
    np.add.at(confusion, (true_idx, pred_idx), 1)

    row_ind, col_ind = linear_sum_assignment(confusion, maximize=True)
    return float(confusion[row_ind, col_ind].sum() / true_labels.size)


def construct_similarity_matrix(X, k):
    """Build a self-tuning spectral clustering similarity matrix.

    ``W[i, j] = exp(-d(i, j)^2 / (sigma_i * sigma_j))`` where ``sigma_i`` is
    the (k+1)-th smallest entry in row ``i`` of the distance matrix; the
    diagonal is zero.

    Args:
        X (torch.Tensor): Data matrix of shape ``(n_samples, n_features)``.
        k (int): Rank of the neighbour used as local scale. Clamped to
            ``n_samples - 1`` when the data set is smaller than ``k + 1``.

    Returns:
        torch.Tensor: Symmetric similarity matrix ``(n_samples, n_samples)``.
    """
    n_samples = X.size(0)
    dist_matrix = torch.cdist(X, X, p=2)
    if n_samples < 2:
        return torch.zeros_like(dist_matrix)

    k = min(k, n_samples - 1)
    sorted_dist, _ = torch.sort(dist_matrix, dim=1)
    sigma = sorted_dist[:, k]
    scale = sigma.unsqueeze(1) * sigma.unsqueeze(0) + 1e-10  # avoid division by zero

    # Fill the strict upper triangle and mirror it, which guarantees exact
    # symmetry and a zero diagonal.
    upper = torch.triu(torch.exp(-dist_matrix ** 2 / scale), diagonal=1)
    return upper + upper.T


def build_adjacency(CMat, K=0):
    """Build a symmetric weighted adjacency matrix from *CMat*.

    Each column of ``|CMat|`` is scaled by its largest entry. With ``K == 0``
    the whole column is scaled; with ``K > 0`` only the ``K`` largest entries
    of the column are scaled and the others are left as they are.

    Args:
        CMat (torch.Tensor): Square input matrix ``(N, N)``; not modified.
        K (int): Number of largest entries per column to normalise, or ``0``
            for all of them.

    Returns:
        torch.Tensor: ``CAbs + CAbs.T``, shape ``(N, N)``.
    """
    c_abs = torch.abs(CMat)
    if c_abs.size(0) == 0:
        return c_abs + c_abs.T

    sorted_vals, order = torch.sort(c_abs, dim=0, descending=True)
    # Take the divisor from the sorted copy: normalising in place would
    # overwrite the column maximum after the first entry.
    col_max = sorted_vals[0] + 1e-10

    if K == 0:
        c_abs = c_abs / col_max
    elif K > 0:
        # NOTE(review): entries outside the top K are kept unscaled rather
        # than dropped - confirm this is the intended graph.
        c_abs = c_abs.scatter(0, order[:K], sorted_vals[:K] / col_max)

    return c_abs + c_abs.T


# -------------------- Dataset definition --------------------
class DataSet(object):
    """Features ``X`` and labels ``gnd`` loaded from a MATLAB ``.mat`` file."""

    def __init__(self, mat_data_file):
        import hdf5storage

        Xy = hdf5storage.loadmat(file_name=mat_data_file)
        self.y = np.squeeze(Xy['gnd']).astype(np.int64)
        self.X = Xy['X'].astype(np.float64)

        # Not used anywhere in this file.
        self.__rand_sam_ind = None
        self.__batch_beg_ind = None
        self.__batch_size = None

    def Feature_Num(self):
        """Return the number of columns of ``X``."""
        return self.X.shape[1]

    def Class_Num(self):
        """Return the number of distinct labels in ``y``."""
        return np.unique(self.y).shape[0]


# -------------------- Training stages --------------------
def pretrain_autoencoder(model, X_tensor, y, n_clusters, filename):
    """Stage 1: train on reconstruction only and initialise the centres.

    Every ``PRETRAIN_EVAL_EVERY`` epochs k-means is run on the latent codes;
    the centres of the most accurate run are copied into ``model.centers``
    and the model is saved to ``{filename}_AE.pt``.

    Returns:
        tuple: ``(best_accuracy, best_epoch, reconstruction_loss_at_best)``.
    """
    from sklearn.cluster import KMeans

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    kmeans_init = KMeans(n_clusters=n_clusters, random_state=42)

    best_accuracy = 0
    best_epoch = 0
    best_loss = float("nan")
    best_centers = None

    print(f"Training Dataset: {filename} - Stage 1: Pretraining Autoencoder")
    for epoch in range(EPOCHS_PRETRAIN):
        model.train()
        reconstructed, _ = model(X_tensor)
        reconstruction_loss = criterion(reconstructed, X_tensor)
        optimizer.zero_grad()
        reconstruction_loss.backward()
        optimizer.step()

        if epoch % PRETRAIN_EVAL_EVERY != 0:
            continue

        print(f'Epoch [{epoch}/{EPOCHS_PRETRAIN}], Reconstruction Loss: {reconstruction_loss.item():.4f}')
        model.eval()
        with torch.no_grad():
            _, latent = model(X_tensor)
        y_pred_latent = kmeans_init.fit_predict(latent.numpy())
        accuracy_latent = calculate_accuracy(y, y_pred_latent)
        # NOTE(review): model selection uses the ground-truth labels.
        if accuracy_latent > best_accuracy:
            best_accuracy = accuracy_latent
            best_epoch = epoch
            best_centers = kmeans_init.cluster_centers_.copy()
            best_loss = reconstruction_loss.item()

    # NOTE(review): the centres come from the best evaluated epoch while the
    # saved weights are those of the last epoch - confirm this is intended.
    if best_centers is not None:
        model.centers.data = torch.tensor(best_centers, dtype=torch.float32)
    torch.save(model.state_dict(), f"{filename}_AE.pt")
    print(f'Pretraining best accuracy: {best_accuracy}, epoch: {best_epoch}, Reconstruction Loss: {best_loss:.4f}')
    return best_accuracy, best_epoch, best_loss


def finetune_grid_search(model, X_tensor, y, L, filename):
    """Stage 2: joint optimisation over a grid of loss coefficients.

    Every coefficient pair restarts from ``{filename}_AE.pt`` with a fresh
    optimizer, so the runs are independent of each other. The state with the
    highest accuracy seen so far is saved to ``{filename}_AEfinal.pt``.

    Returns:
        tuple: ``(best_accuracy, best_params, per_pair_records)``.
    """
    criterion = nn.MSELoss()
    best_accuracy = 0.0
    best_params = {
        'cluster_coef': None,
        'spectral_coef': None,
        'accuracy': best_accuracy,
        'epoch': None,
        'total_loss': None,
    }
    biresults = []

    print(f"Training Dataset: {filename} - Stage 2: Joint Optimization with Spectral Loss")
    for cluster_coef in LOSS_COEFFICIENTS:
        for spectral_coef in LOSS_COEFFICIENTS:
            model.load_state_dict(torch.load(f"{filename}_AE.pt"))
            # Reloading the weights does not reset Adam's running moments,
            # so a new optimizer is needed for a clean restart.
            optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
            print(f"\nTraining with Coefficients - Cluster: {cluster_coef}, Spectral: {spectral_coef}")

            accuracy = 0.0
            for epoch in range(EPOCHS_FINETUNE):
                model.train()
                reconstructed, latent = model(X_tensor)
                reconstruction_loss = criterion(reconstructed, X_tensor)
                clustering_loss = model.compute_fcm_loss(latent)
                spectral_loss = model.compute_spectral_loss(latent, L)
                total_loss = (
                    reconstruction_loss
                    + cluster_coef * clustering_loss
                    + spectral_coef * spectral_loss
                )
                optimizer.zero_grad()
                total_loss.backward()
                optimizer.step()

                model.eval()
                with torch.no_grad():
                    _, latent = model(X_tensor)
                    membership = model.compute_membership(latent)
                    predicted_labels = membership.argmax(dim=1).numpy()
                accuracy = calculate_accuracy(y, predicted_labels)
                print(
                    f'Epoch [{epoch}/{EPOCHS_FINETUNE}], Total Loss: {total_loss.item():.4f}, '
                    f'Reconstruction Loss: {reconstruction_loss.item():.4f}, '
                    f'Clustering Loss: {clustering_loss.item():.4f}, '
                    f'Spectral Loss: {spectral_loss.item():.4f}, '
                    f'Clustering Accuracy: {accuracy:.4f}'
                )
                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    best_params = {
                        'cluster_coef': cluster_coef,
                        'spectral_coef': spectral_coef,
                        'accuracy': best_accuracy,
                        'epoch': epoch,
                        'total_loss': total_loss.item(),
                    }
                    torch.save(model.state_dict(), f"{filename}_AEfinal.pt")

            # 'Best Accuracy' / 'Best Epoch' are the running best over all
            # coefficient pairs tried so far, not the best of this pair.
            biresults.append({
                'Cluster Coef': cluster_coef,
                'Spectral Coef': spectral_coef,
                'End Accuracy': accuracy,
                'Best Accuracy': best_accuracy,
                'Best Epoch': best_params['epoch'],
            })
    return best_accuracy, best_params, biresults


def run_dataset(mat_data_file, filename):
    """Run both training stages on one dataset and return its summary record."""
    data = DataSet(mat_data_file)
    X_tensor = torch.tensor(data.X, dtype=torch.float32)
    y = data.y
    input_dim = X_tensor.shape[1]
    n_clusters = data.Class_Num()

    model = AutoencoderWithSpectralClustering(input_dim, LATENT_DIM, n_clusters, m=FUZZIFIER)

    # Unnormalised graph Laplacian L = D - A of the similarity graph.
    W = construct_similarity_matrix(X_tensor, N_NEIGHBORS)
    CKSym = build_adjacency(W, N_NEIGHBORS)
    L = torch.diag(CKSym.sum(dim=1)) - CKSym

    pre_accuracy, pre_epoch, pre_loss = pretrain_autoencoder(
        model, X_tensor, y, n_clusters, filename
    )
    best_accuracy, best_params, biresults = finetune_grid_search(
        model, X_tensor, y, L, filename
    )

    pd.DataFrame(biresults).to_excel(f'{filename}_biresults.xlsx', index=False)

    print(f'Pretraining best accuracy: {pre_accuracy}, epoch: {pre_epoch}, Reconstruction Loss: {pre_loss:.4f}')
    print(f"\nBest Accuracy: {best_accuracy:.4f}")
    print(f"Best Parameters: Cluster Coef: {best_params['cluster_coef']}, Spectral Coef: {best_params['spectral_coef']}, Epoch: {best_params['epoch']}")

    return {
        'Dataset Name': filename,
        'Pretraining Best Accuracy': pre_accuracy,
        'Finetuning Best Accuracy': best_accuracy,
        'Best Clustering Loss Coefficient': best_params['cluster_coef'],
        'Best Spectral Loss Coefficient': best_params['spectral_coef'],
        'Pretraining Best Accuracy Epoch': pre_epoch,
        'Finetuning Best Accuracy Epoch': best_params['epoch'],
        'Pretraining Best Reconstruction Loss': pre_loss,
        # Loss at the epoch that produced the best accuracy.
        'Finetuning Best Total Loss': best_params['total_loss'],
    }


# -------------------- Reporting --------------------
def summary_row(result):
    """Return the values of *result* in ``RESULT_HEADERS`` column order."""
    return [result[header] for header in RESULT_HEADERS]


def save_summary(results, output_path=SUMMARY_PATH):
    """Write one row per dataset to an Excel workbook at *output_path*."""
    import openpyxl

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Results"
    ws.append(RESULT_HEADERS)
    for result in results:
        ws.append(summary_row(result))
    wb.save(output_path)
    print(f"Results have been saved to {output_path}")


def main(folder_path=DEFAULT_DATA_DIR):
    """Run the experiment on every ``.mat`` file directly inside *folder_path*."""
    # Fix the random seeds.
    torch.manual_seed(SEED)
    np.random.seed(SEED)

    results = []
    # Sorted so the order of the datasets (and thus the RNG stream each one
    # sees) does not depend on the file system.
    for filename in sorted(os.listdir(folder_path)):
        mat_data_file = os.path.join(folder_path, filename)
        if not (os.path.isfile(mat_data_file) and filename.lower().endswith(".mat")):
            continue
        results.append(run_dataset(mat_data_file, filename))

    save_summary(results, SUMMARY_PATH)


if __name__ == "__main__":
    # Optional first command-line argument overrides the dataset folder.
    main(sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DATA_DIR)