Add AE+FCL+SC.py

Signed-off-by: a14218709679337472437681 <a14218709679337472437681@hero.ai>
This commit is contained in:
a14218709679337472437681 2025-01-08 21:30:38 +08:00
parent 88ae47568c
commit 43798f3910
1 changed files with 375 additions and 0 deletions

375
AE+FCL+SC.py Normal file
View File

@ -0,0 +1,375 @@
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy.optimize import linear_sum_assignment
import hdf5storage
import torch
import os
from sklearn.cluster import KMeans
import pandas as pd
import openpyxl
# 设置随机种子
seed = 46
torch.manual_seed(seed)
np.random.seed(seed)
# -------------------- 模型定义 --------------------
class AutoencoderWithSpectralClustering(nn.Module):
def __init__(self, input_dim, latent_dim, n_clusters, m=2.0):
super(AutoencoderWithSpectralClustering, self).__init__()
if input_dim < 64:
# 编码器部分
self.encoder = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, latent_dim)
)
# 解码器部分
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 32),
nn.ReLU(),
nn.Linear(32, 64),
nn.ReLU(),
nn.Linear(64, input_dim)
)
else:
self.encoder = nn.Sequential(
nn.Linear(input_dim, 2048),
nn.ReLU(),
nn.Linear(2048, 1024),
nn.ReLU(),
nn.Linear(1024, 512),
nn.ReLU(),
nn.Linear(512, latent_dim)
)
# 解码器部分
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 512),
nn.ReLU(),
nn.Linear(512, 1024),
nn.ReLU(),
nn.Linear(1024, 2048),
nn.ReLU(),
nn.Linear(2048, input_dim)
)
# 聚类中心
self.centers = nn.Parameter(torch.randn(n_clusters, latent_dim)) # 初始化聚类中心
self.m = m # 模糊因子
# 权重初始化
self._initialize_weights()
def forward(self, x):
latent = self.encoder(x)
reconstructed = self.decoder(latent)
return reconstructed, latent
def compute_membership(self, latent):
"""
根据公式计算隶属度 p_{ij}并分类讨论避免分母为零
"""
dist = torch.cdist(latent, self.centers, p=2) # shape: (N, K)
zero_mask = (dist == 0)
membership = torch.zeros_like(dist)
membership[zero_mask] = 1.0
non_zero_mask = ~zero_mask.any(dim=1)
if non_zero_mask.any():
dist_non_zero = dist[non_zero_mask]
exponent = -2 / (self.m - 1)
power = dist_non_zero ** exponent
membership_non_zero = power / power.sum(dim=1, keepdim=True)
membership[non_zero_mask] = membership_non_zero
return membership
def compute_fcm_loss(self, latent):
"""
计算模糊聚类损失
"""
membership = self.compute_membership(latent)
dist = torch.cdist(latent, self.centers, p=2) ** 2
fcm_loss = (membership ** self.m * dist).sum()
return fcm_loss
def compute_spectral_loss(self, latent, L):
"""
计算谱聚类损失
"""
_,R=torch.linalg.qr(latent)
orth=torch.inverse(R)
y=latent@orth
# 最小化 trace(Z^T L Z)
trace_loss = torch.trace(torch.matmul(torch.matmul(y.T, L), y))
return trace_loss
def _initialize_weights(self):
for layer in self.encoder:
if isinstance(layer, nn.Linear):
nn.init.xavier_uniform_(layer.weight)
if layer.bias is not None:
nn.init.zeros_(layer.bias)
for layer in self.decoder:
if isinstance(layer, nn.Linear):
nn.init.xavier_uniform_(layer.weight)
if layer.bias is not None:
nn.init.zeros_(layer.bias)
# 计算潜在空间的聚类准确率
def calculate_accuracy(true_labels, pred_labels):
unique_true_labels = np.unique(true_labels)
unique_pred_labels = np.unique(pred_labels)
confusion_matrix = np.zeros((len(unique_true_labels), len(unique_pred_labels)), dtype=np.int32)
for i, true_label in enumerate(unique_true_labels):
for j, pred_label in enumerate(unique_pred_labels):
confusion_matrix[i, j] = np.sum((true_labels == true_label) & (pred_labels == pred_label))
row_ind, col_ind = linear_sum_assignment(confusion_matrix, maximize=True)
best_match_count = confusion_matrix[row_ind, col_ind].sum()
accuracy = best_match_count / len(true_labels)
return accuracy
def construct_similarity_matrix(X, k):
"""
构造基于自调节谱聚类的相似度矩阵 (Self-tuning Spectral Clustering)
Args:
X (torch.Tensor): 数据矩阵形状为 (n_samples, n_features)
k (int): 每个数据点的最近邻个数
Returns:
torch.Tensor: 相似度矩阵 W 形状为 (n_samples, n_samples)
"""
# 计算欧式距离矩阵
n_samples = X.size(0)
dist_matrix = torch.cdist(X, X, p=2) # 使用 torch.cdist 计算欧式距离
# 对每一行的距离排序
sorted_dist, _ = torch.sort(dist_matrix, dim=1)
# 初始化相似度矩阵 W
W = torch.zeros_like(dist_matrix)
# 构造相似度矩阵
for i in range(n_samples):
for j in range(i + 1, n_samples):
sigma_i = sorted_dist[i, k] # 第 i 个点的第 k+1 小距离
sigma_j = sorted_dist[j, k] # 第 j 个点的第 k+1 小距离
W[i, j] = torch.exp(-dist_matrix[i, j]**2 / (sigma_i * sigma_j + 1e-10)) # 防止除零
# 对称化
W = W + W.T
return W
def build_adjacency(CMat, K):
"""
构建对称加权邻接矩阵
Args:
CMat (torch.Tensor): 输入矩阵 (N, N)
K (int): 最近邻个数默认保留所有邻接关系
Returns:
CKSym (torch.Tensor): 对称邻接矩阵 (N, N)
CAbs (torch.Tensor): 绝对值邻接矩阵 (N, N)
"""
# 初始化
N = CMat.size(0)
CAbs = torch.abs(CMat) # 取绝对值
# 对每列降序排序
Srt, Ind = torch.sort(CAbs, dim=0, descending=True)
# 归一化处理
if K == 0:
# 归一化每一列
for i in range(N):
CAbs[:, i] = CAbs[:, i] / (CAbs[Ind[0, i], i] + 1e-10)
else:
# 只归一化每列的前 K 个值
for i in range(N):
for j in range(K):
CAbs[Ind[j, i], i] = CAbs[Ind[j, i], i] / (CAbs[Ind[0, i], i] + 1e-10)
# 构造对称邻接矩阵
CKSym = CAbs + CAbs.T
return CKSym
# -------------------- 数据集定义 --------------------
class DataSet(object):
def __init__(self, mat_data_file):
Xy = hdf5storage.loadmat(file_name=mat_data_file)
self.y = np.squeeze(Xy['gnd']).astype(np.int64)
self.X = Xy['X'].astype(np.float64)
self.__rand_sam_ind = None
self.__batch_beg_ind = None
self.__batch_size = None
def Feature_Num(self):
return self.X.shape[1]
def Class_Num(self):
return np.unique(self.y).shape[0]
'''
model.load_state_dict(torch.load('save.pt'))
model.eval()
with torch.no_grad():
_, latent = model(X_tensor)
latent = latent.numpy()
# 使用 K-means 初始化聚类中心
y_pred_latent = kmeans_init.fit_predict(latent)
accuracy_latent = calculate_accuracy(y, y_pred_latent)
print(f'Clustering Accuracy in Latent Space init: {accuracy_latent:.4f}')
# 获取初始聚类中心
initial_centers = kmeans_init.cluster_centers_
# 将聚类中心初始化到模型中
model.centers.data = torch.tensor(initial_centers, dtype=torch.float32)
'''
m = 2
epochs_pretrain = 1200
epochs_finetune = 50
latent_dim = 128
criterion = nn.MSELoss()
folder_path="D:\\python_code\\rundatasets"
results = []
for filename in os.listdir(folder_path):
mat_data_file = os.path.join(folder_path, filename)
data=DataSet(mat_data_file)
X_tensor=torch.tensor(data.X, dtype=torch.float32)
y=data.y
input_dim = X_tensor.shape[1]
n_clusters = data.Class_Num()
model = AutoencoderWithSpectralClustering(input_dim, latent_dim, n_clusters, m=m)
optimizer = optim.Adam(model.parameters(), lr=0.001)
W = construct_similarity_matrix(X_tensor, 10)
CKSym=build_adjacency(W, 10)
L=torch.diag(CKSym.sum(dim=1))-CKSym
kmeans_init = KMeans(n_clusters=n_clusters, random_state=42)
# 阶段一:预训练自编码器
ac=0
bs=0
print(f"Training Dataset: {filename} - Stage 1: Pretraining Autoencoder")
for epoch in range(epochs_pretrain):
model.train()
reconstructed, latent = model(X_tensor)
reconstruction_loss = criterion(reconstructed, X_tensor)
optimizer.zero_grad()
reconstruction_loss.backward()
optimizer.step()
if epoch % 200 == 0:
print(f'Epoch [{epoch}/{epochs_pretrain}], Reconstruction Loss: {reconstruction_loss.item():.4f}')
model.eval()
with torch.no_grad():
_, latent = model(X_tensor)
latent = latent.numpy()
# 使用 K-means 初始化聚类中心
y_pred_latent = kmeans_init.fit_predict(latent)
accuracy_latent = calculate_accuracy(y, y_pred_latent)
if accuracy_latent>ac:
ac=accuracy_latent
bs=epoch
initial_centers = kmeans_init.cluster_centers_
nowloss=reconstruction_loss
# 将聚类中心初始化到模型中
model.centers.data = torch.tensor(initial_centers, dtype=torch.float32)
torch.save(model.state_dict(),f"{filename}_AE.pt")
print(f'Pretraining best accuracy: {ac}, epoch: {bs}, Reconstruction Loss: {nowloss.item():.4f}')
# 阶段二:联合优化
print(f"Training Dataset: {filename} - Stage 2: Joint Optimization with Spectral Loss")
clustering_coefficients = [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000]
spectral_coefficients = [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000]
best_accuracy = 0.0 # 初始化最高准确率
best_params = {}
biresults = []
for cluster_coef in clustering_coefficients:
for spectral_coef in spectral_coefficients:
model.load_state_dict(torch.load(f"{filename}_AE.pt"))
print(f"\nTraining with Coefficients - Cluster: {cluster_coef}, Spectral: {spectral_coef}")
for epoch in range(epochs_finetune):
model.train()
reconstructed, latent = model(X_tensor)
reconstruction_loss = criterion(reconstructed, X_tensor)
clustering_loss = model.compute_fcm_loss(latent)
spectral_loss = model.compute_spectral_loss(latent, L)
total_loss = (
reconstruction_loss
+ cluster_coef * clustering_loss
+ spectral_coef * spectral_loss
)
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
#if epoch % 200 == 0:
# print(f'Epoch [{epoch}/{epochs_finetune}], Total Loss: {total_loss.item():.4f}, Reconstruction Loss: {reconstruction_loss.item():.4f}, Clustering Loss: {clustering_loss.item():.4f}, Spectral Loss: {spectral_loss.item():.4f}')
model.eval()
with torch.no_grad():
_, latent = model(X_tensor)
membership = model.compute_membership(latent)
predicted_labels = membership.argmax(dim=1).numpy()
accuracy = calculate_accuracy(y, predicted_labels)
print(f'Epoch [{epoch}/{epochs_finetune}], Total Loss: {total_loss.item():.4f}, Reconstruction Loss: {reconstruction_loss.item():.4f}, Clustering Loss: {clustering_loss.item():.4f}, Spectral Loss: {spectral_loss.item():.4f}, Clustering Accuracy: {accuracy:.4f}')
if accuracy > best_accuracy:
best_accuracy = accuracy
best_params = {
'cluster_coef': cluster_coef,
'spectral_coef': spectral_coef,
'accuracy': best_accuracy,
'epoch': epoch
}
torch.save(model.state_dict(), f"{filename}_AEfinal.pt")
biresults.append({
'Cluster Coef': cluster_coef,
'Spectral Coef': spectral_coef,
'End Accuracy': accuracy,
'Best Accuracy': best_accuracy,
'Best Epoch': best_params['epoch'],
})
results.append({
'Dataset Name': filename,
'Pretraining Best Accuracy': ac,
'Finetuning Best Accuracy': best_accuracy,
'Best Clustering Loss Coefficient': best_params['cluster_coef'],
'Best Spectral Loss Coefficient': best_params['spectral_coef'],
'Pretraining Best Accuracy Epoch': bs,
'Finetuning Best Accuracy Epoch': best_params['epoch'],
'Pretraining Best Reconstruction Loss': nowloss.item(),
'Finetuning Best Total Loss': total_loss.item()
})
df = pd.DataFrame(biresults)
df.to_excel(f'{filename}_biresults.xlsx', index=False)
print(f'Pretraining best accuracy: {ac}, epoch: {bs}, Reconstruction Loss: {nowloss.item():.4f}')
print(f"\nBest Accuracy: {best_accuracy:.4f}")
print(f"Best Parameters: Cluster Coef: {best_params['cluster_coef']}, Spectral Coef: {best_params['spectral_coef']}, Epoch: {best_params['epoch']}")
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Results"
headers = ["Dataset Name", "Pretraining Best Accuracy", 'Finetuning Best Accuracy',
'Best Clustering Loss Coefficient', 'Best Spectral Loss Coefficient', 'Pretraining Best Accuracy Epoch',
'Finetuning Best Accuracy Epoch', 'Pretraining Best Reconstruction Loss', 'Finetuning Best Total Loss']
ws.append(headers)
for result in results:
row = [
result['Dataset Name'],
result['Pretraining Best Accuracy'],
result['Pretraining Best Accuracy Epoch'],
result['Pretraining Best Reconstruction Loss'],
result['Finetuning Best Accuracy'],
result['Best Clustering Loss Coefficient'],
result['Best Spectral Loss Coefficient'],
result['Finetuning Best Total Loss'],
result['Finetuning Best Accuracy Epoch']
]
ws.append(row)
# 保存 Excel 文件
output_path = "results2.xlsx"
wb.save(output_path)
print(f"Results have been saved to {output_path}")