diff --git a/app_rec.py b/app_rec.py
new file mode 100644
index 0000000..b754960
--- /dev/null
+++ b/app_rec.py
@@ -0,0 +1,7 @@
+from PytorchBoot.application import PytorchBootApplication
+from rec_runner import ReconstructionRunner
+@PytorchBootApplication("rec")
+class AppReconstruction:
+    @staticmethod
+    def start():
+        ReconstructionRunner("config.yaml").run()
diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..8323198
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,50 @@
+
+runners:
+  general:
+    seed: 0
+    device: cuda
+    cuda_visible_devices: "0,1,2,3,4,5,6,7"
+    parallel: False
+
+  experiment:
+    name: experiment_name
+    root_dir: "experiments"
+    use_checkpoint: False
+    epoch: -1 # -1 stands for last epoch
+    max_epochs: 5000
+    save_checkpoint_interval: 1
+    test_first: True
+
+  train:
+    optimizer:
+      type: adam
+      lr: 0.0001
+    losses: # loss type : weight
+      loss_type_0: 1.0
+    dataset:
+      name: train_set_name
+      source: train_set_source_name
+      ratio: 1.0
+      batch_size: 1
+      num_workers: 1
+
+  test:
+    frequency: 3 # test frequency
+    dataset_list:
+      - name: test_set_name_0
+        source: train_set_source_name
+        eval_list:
+          - eval_func_name_0
+          - eval_func_name_1
+        ratio: 1.0
+        batch_size: 1
+        num_workers: 1
+
+
+datasets:
+  dataset_source_name_0:
+  dataset_source_name_1:
+
+modules:
+  nerf:
+
diff --git a/pipeline.py b/pipeline.py
new file mode 100644
index 0000000..8857b19
--- /dev/null
+++ b/pipeline.py
@@ -0,0 +1,193 @@
+import os
+import torch
+import numpy as np
+
+from PytorchBoot.factory.component_factory import ComponentFactory
+import PytorchBoot.stereotype as stereotype
+import PytorchBoot.namespace as namespace
+from PytorchBoot.utils.log_util import Log
+
+from utils.volume_render_util import VolumeRendererUtil
+
+
+@stereotype.pipeline("reconstruction_pipeline")
+class ReconstructionPipeline:
+    def __init__(self, config: dict):
+        self.config = config
+        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+        self.module_config = config["modules"]
+        self.nerf = ComponentFactory.create(
+            namespace.Stereotype.MODULE, self.module_config["nerf"]
+        )
+        self.nerf_model_output_dir = self.config.get("nerf_model_output_dir", "./output/nerf_model")
+
+    def create_experiment(self, backup_name=None):
+        return super().create_experiment(backup_name)
+
+    def load_experiment(self, backup_name=None):
+        super().load_experiment(backup_name)
+
+    def save(self, object_name: str, best_model: bool = True, name: str|None = None):
+        output_dir = os.path.join(self.nerf_model_output_dir, object_name)
+        os.makedirs(output_dir, exist_ok=True)
+        if best_model:
+            torch.save(self.nerf.state_dict(), os.path.join(output_dir, "best_model.pth"))
+        elif name is not None:
+            torch.save(self.nerf.state_dict(), os.path.join(output_dir, f"{name}.pth"))
+        else:
+            Log.error("save failed: either best_model must be True or a name must be provided", terminate=True)
+        Log.info(f"saved {object_name} to {output_dir}")
+        return output_dir
+
+    def load(self, object_name: str, best_model: bool = True, name: str|None = None):
+        output_dir = os.path.join(self.nerf_model_output_dir, object_name)
+        if best_model:
+            self.nerf.load_state_dict(torch.load(os.path.join(output_dir, "best_model.pth")))
+        elif name is not None:
+            self.nerf.load_state_dict(torch.load(os.path.join(output_dir, f"{name}.pth")))
+        else:
+            Log.error("load failed: either best_model must be True or a name must be provided", terminate=True)
+        Log.info(f"loaded {object_name} from {output_dir}")
+        return output_dir
+
+
+    def train_nerf(self,
+                   images:
torch.Tensor, + poses: torch.Tensor, + epochs: int = 5000, + batch_size: int = 4096, + lr: float = 5e-4, + start_from_model=None, + object_name: str = "unknown") -> float: + + output_dir = os.path.join(self.nerf_model_output_dir, object_name) + os.makedirs(output_dir, exist_ok=True) + + Log.info("train NeRF model with {} images".format(len(images))) + H, W = images.shape[1], images.shape[2] + sampling_config = self.config.get("sampling", {}) + camera_config = self.config.get("camera", {}) + focal = camera_config.get("focal", 1000.0) + near = camera_config.get("near", 2.0) + far = camera_config.get("far", 6.0) + coarse_samples = sampling_config.get("coarse_samples", 64) + fine_samples = sampling_config.get("fine_samples", 128) + perturb = sampling_config.get("perturb", True) + + + if start_from_model is not None: + self.nerf.load_state_dict(start_from_model.state_dict()) + + optimizer = torch.optim.Adam(self.nerf.parameters(), lr=lr) + mse_loss = torch.nn.MSELoss() + + self.nerf.train() + + rays_o, rays_d = ReconstructionPipeline.generate_rays(poses, H, W, focal) + rays_o = rays_o.to(self.device) + rays_d = rays_d.to(self.device) + images = images.to(self.device) + + best_loss = float('inf') + for epoch in range(epochs): + batch_rays_o, batch_rays_d, target_pixels = ReconstructionPipeline.sample_pixel_batch( + images, rays_o, rays_d, batch_size) + + batch_rays_d = torch.nn.functional.normalize(batch_rays_d, dim=-1) + + near_tensor = torch.ones_like(batch_rays_o[..., 0]) * near + far_tensor = torch.ones_like(batch_rays_o[..., 0]) * far + + optimizer.zero_grad() + + rgb_map, _, _, _ = VolumeRendererUtil.render_rays( + self.nerf, + batch_rays_o, + batch_rays_d, + near_tensor, + far_tensor, + coarse_samples, + fine_samples, + perturb + ) + + loss = mse_loss(rgb_map, target_pixels) + loss.backward() + optimizer.step() + + if (epoch + 1) % 100 == 0: + psnr = -10.0 * torch.log10(loss) + Log.info(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.6f}, PSNR: {psnr.item():.2f}") + + if loss.item() < best_loss: + best_loss = loss.item() + torch.save(self.nerf.state_dict(), os.path.join(output_dir, "best_model.pth")) + + self.nerf.load_state_dict(torch.load(os.path.join(output_dir, "best_model.pth"))) + + Log.info(f"finish training, best loss: {best_loss:.6f}") + return best_loss + + @staticmethod + def generate_rays( + poses: torch.Tensor, + H: int, + W: int, + focal: float) -> tuple: + + i, j = torch.meshgrid( + torch.linspace(0, W-1, W), + torch.linspace(0, H-1, H), + indexing='ij' + ) + i = i.t() # [H, W] + j = j.t() # [H, W] + + dirs = torch.stack([ + (i - W * 0.5) / focal, + -(j - H * 0.5) / focal, + -torch.ones_like(i) + ], dim=-1) # [H, W, 3] + + rays_o_list = [] + rays_d_list = [] + + for pose in poses: + rays_d = torch.sum(dirs[..., None, :] * pose[:3, :3], dim=-1) # [H, W, 3] + + rays_o = pose[:3, -1].expand(rays_d.shape) # [H, W, 3] + + rays_o = rays_o.reshape(-1, 3) # [H*W, 3] + rays_d = rays_d.reshape(-1, 3) # [H*W, 3] + + rays_o_list.append(rays_o) + rays_d_list.append(rays_d) + + rays_o_all = torch.stack(rays_o_list, dim=0) # [N, H*W, 3] + rays_d_all = torch.stack(rays_d_list, dim=0) # [N, H*W, 3] + + return rays_o_all, rays_d_all + + @staticmethod + def sample_pixel_batch( + images: torch.Tensor, + rays_o: torch.Tensor, + rays_d: torch.Tensor, + batch_size: int) -> tuple: + + N = images.shape[0] + H = images.shape[1] + W = images.shape[2] + total_rays = N * H * W + + pixels = images.reshape(N, -1, 3) # [N, H*W, 3] + + indices = torch.randint(0, total_rays, size=(batch_size,)) + 
img_indices = indices // (H * W) + pixel_indices = indices % (H * W) + + sampled_rays_o = torch.stack([rays_o[i, j] for i, j in zip(img_indices, pixel_indices)]) + sampled_rays_d = torch.stack([rays_d[i, j] for i, j in zip(img_indices, pixel_indices)]) + sampled_pixels = torch.stack([pixels[i, j] for i, j in zip(img_indices, pixel_indices)]) + + return sampled_rays_o, sampled_rays_d, sampled_pixels \ No newline at end of file diff --git a/rec_runner.py b/rec_runner.py new file mode 100644 index 0000000..d794fb2 --- /dev/null +++ b/rec_runner.py @@ -0,0 +1,101 @@ +import os +import torch +import numpy as np +from PytorchBoot.runners.runner import Runner +import PytorchBoot.stereotype as stereotype +import PytorchBoot.namespace as namespace +from PytorchBoot.utils.log_util import Log +from PytorchBoot.factory.component_factory import ComponentFactory + +@stereotype.runner("reconstruction_runner") +class ReconstructionRunner(Runner): + def __init__(self, config_path): + super().__init__(config_path) + self.config_path = config_path + self.module_config = self.config.get("module", {}) + self.pipeline_config = self.config.get("pipeline", {}) + self.pipeline = ComponentFactory.create( + namespace.Stereotype.PIPELINE, self.pipeline_config + ) + + def run(self): + pass + + def run_active_reconstruction(self, + initial_poses: np.ndarray, + initial_images: torch.Tensor = None, + max_iterations: int = 3): + Log.info("start active reconstruction...") + + self.pipeline.train_nerf( + initial_images, + torch.from_numpy(initial_poses).float().to(self.device), + epochs=self.config.get("reconstruction", {}).get("epochs_per_iteration", 2000) + ) + + self.pipeline.save() + + all_poses = initial_poses.copy() + current_poses = initial_poses.copy() + all_images = initial_images.clone() + + # 提取初始网格 + initial_mesh_path = os.path.join(self.output_dir, "initial_mesh.obj") + self.extract_mesh( + initial_mesh_path, + resolution=self.config.get("reconstruction", {}).get("mesh_resolution", 256) + ) + + # 迭代执行主动重建 + for iteration in range(max_iterations): + print(f"\n开始迭代 {iteration+1}/{max_iterations}") + + # 选择下一批视角 + next_views = self.policy.select_next_views(self.nerf_model, current_poses) + print(f"选择了 {len(next_views)} 个新视角") + + # 采集新视角的图像 + new_images = self._simulate_image_capture(next_views) + + # 将新选择的视角添加到当前位姿和图像中 + current_poses = np.concatenate([current_poses, next_views], axis=0) + all_poses = np.concatenate([all_poses, next_views], axis=0) + all_images = torch.cat([all_images, new_images], dim=0) + + # 按照作者的描述,我们从初始模型重新初始化,而不是继续训练 + # "After selecting additional images, we initialize the network with the model from the initialization step and refine the model further with the updated training set." 
+ # 因此,我们先加载初始模型,然后用扩展的数据集重新训练 + self.nerf_model.load_state_dict(torch.load(initial_model_path)) + + # 用扩展的数据集重新训练模型 + self.train_nerf( + all_images, + torch.from_numpy(current_poses).float().to(self.device), + epochs=self.config.get("reconstruction", {}).get("epochs_per_iteration", 2000) + ) + + # 每次迭代后提取网格,以便观察重建质量的改进 + iter_mesh_path = os.path.join(self.output_dir, f"mesh_iter_{iteration+1}.obj") + self.extract_mesh( + iter_mesh_path, + resolution=self.config.get("reconstruction", {}).get("mesh_resolution", 256) + ) + + # 提取最终的3D网格 + output_mesh_path = os.path.join(self.output_dir, "final_mesh.obj") + self.extract_mesh( + output_mesh_path, + resolution=self.config.get("reconstruction", {}).get("mesh_resolution", 256) + ) + + # 评估重建质量 + self.evaluate_reconstruction() + + print("主动重建过程完成") + return all_poses + + def create_experiment(self, backup_name=None): + return super().create_experiment(backup_name) + + def load_experiment(self, backup_name=None): + super().load_experiment(backup_name) \ No newline at end of file diff --git a/ref_code/active_reconstruction.py b/ref_code/active_reconstruction.py new file mode 100644 index 0000000..2e38e28 --- /dev/null +++ b/ref_code/active_reconstruction.py @@ -0,0 +1,520 @@ +import torch +import numpy as np +import os +import yaml +import time +from nerf_model import NeRF +from pipeline import ActiveReconstructionPolicy +from uncertainty_guide import UncertaintyGuideNeRF +import argparse +from typing import Dict, Any, List +from utils.volume_render_util import VolumeRendererUtil +import mcubes # 导入Python Marching Cubes库 +import trimesh # 处理网格 +from tqdm import tqdm # 进度条 + +class ActiveReconstruction: + """基于NeRF不确定性引导的主动3D重建系统""" + + def __init__(self, config_path: str): + """ + 初始化主动重建系统 + + 参数: + config_path: 配置文件路径 + """ + # 加载配置 + with open(config_path, 'r') as f: + self.config = yaml.safe_load(f) + + # 设置设备 + self.device = torch.device(self.config.get("device", "cuda") if torch.cuda.is_available() else "cpu") + print(f"使用设备: {self.device}") + + # 创建输出目录 + self.output_dir = self.config.get("output_dir", "output") + os.makedirs(self.output_dir, exist_ok=True) + + # 初始化NeRF模型 + self._init_nerf_model() + + # 初始化视图选择策略 + self.policy = ActiveReconstructionPolicy(self.config) + + def _init_nerf_model(self): + """初始化NeRF模型""" + # 从配置中获取NeRF参数 + nerf_config = self.config.get("nerf", {}) + model_config = { + "pos_enc_dim": nerf_config.get("pos_enc_dim", 10), + "dir_enc_dim": nerf_config.get("dir_enc_dim", 4), + "netdepth_coarse": nerf_config.get("netdepth_coarse", 8), + "netwidth_coarse": nerf_config.get("netwidth_coarse", 256), + "netdepth_fine": nerf_config.get("netdepth_fine", 8), + "netwidth_fine": nerf_config.get("netwidth_fine", 256), + "skips": nerf_config.get("skips", [4]), + "use_viewdirs": nerf_config.get("use_viewdirs", True) + } + self.nerf_model = NeRF(model_config).to(self.device) + + def _generate_rays(self, + poses: torch.Tensor, + H: int, + W: int, + focal: float) -> tuple: + """ + 为每个相机位姿生成光线 + + 参数: + poses: 相机位姿 [N, 4, 4] + H: 图像高度 + W: 图像宽度 + focal: 焦距 + + 返回: + rays_o: 光线起点 [N, H*W, 3] + rays_d: 光线方向 [N, H*W, 3] + """ + # 创建像素坐标网格 + i, j = torch.meshgrid( + torch.linspace(0, W-1, W), + torch.linspace(0, H-1, H), + indexing='ij' + ) + i = i.t() # [H, W] + j = j.t() # [H, W] + + # 转换为相机坐标系中的方向 + dirs = torch.stack([ + (i - W * 0.5) / focal, + -(j - H * 0.5) / focal, + -torch.ones_like(i) + ], dim=-1) # [H, W, 3] + + # 为每个位姿生成光线 + rays_o_list = [] + rays_d_list = [] + + for pose in poses: + # 转换光线方向到世界坐标系 + rays_d = 
torch.sum(dirs[..., None, :] * pose[:3, :3], dim=-1) # [H, W, 3] + + # 设置光线原点 + rays_o = pose[:3, -1].expand(rays_d.shape) # [H, W, 3] + + # 展平为批处理格式 + rays_o = rays_o.reshape(-1, 3) # [H*W, 3] + rays_d = rays_d.reshape(-1, 3) # [H*W, 3] + + rays_o_list.append(rays_o) + rays_d_list.append(rays_d) + + # 组合所有位姿的光线 + rays_o_all = torch.stack(rays_o_list, dim=0) # [N, H*W, 3] + rays_d_all = torch.stack(rays_d_list, dim=0) # [N, H*W, 3] + + return rays_o_all, rays_d_all + + def _sample_pixel_batch(self, + images: torch.Tensor, + rays_o: torch.Tensor, + rays_d: torch.Tensor, + batch_size: int) -> tuple: + """ + 随机采样像素批次 + + 参数: + images: 图像数据 [N, H, W, 3] + rays_o: 光线起点 [N, H*W, 3] + rays_d: 光线方向 [N, H*W, 3] + batch_size: 批次大小 + + 返回: + sampled_rays_o: 采样的光线起点 [batch_size, 3] + sampled_rays_d: 采样的光线方向 [batch_size, 3] + sampled_pixels: 采样的像素值 [batch_size, 3] + """ + # 获取图像形状 + N = images.shape[0] + H = images.shape[1] + W = images.shape[2] + total_rays = N * H * W + + # 将图像展平 + pixels = images.reshape(N, -1, 3) # [N, H*W, 3] + + # 随机选择批次 + indices = torch.randint(0, total_rays, size=(batch_size,)) + img_indices = indices // (H * W) + pixel_indices = indices % (H * W) + + # 采样光线和像素 + sampled_rays_o = torch.stack([rays_o[i, j] for i, j in zip(img_indices, pixel_indices)]) + sampled_rays_d = torch.stack([rays_d[i, j] for i, j in zip(img_indices, pixel_indices)]) + sampled_pixels = torch.stack([pixels[i, j] for i, j in zip(img_indices, pixel_indices)]) + + return sampled_rays_o, sampled_rays_d, sampled_pixels + + def train_nerf(self, + images: torch.Tensor, + poses: torch.Tensor, + epochs: int = 5000, + batch_size: int = 4096, + lr: float = 5e-4, + start_from_model=None) -> float: + """ + 训练NeRF模型 + + 参数: + images: 图像数据 [N, H, W, 3] + poses: 相机位姿 [N, 4, 4] + epochs: 训练轮数 + batch_size: 批量大小 + lr: 学习率 + start_from_model: 可选的初始模型状态 + + 返回: + final_loss: 最终损失值 + """ + print(f"开始训练NeRF模型,使用{len(images)}张图像...") + + # 获取图像和采样参数 + H, W = images.shape[1], images.shape[2] + sampling_config = self.config.get("sampling", {}) + camera_config = self.config.get("camera", {}) + focal = camera_config.get("focal", 1000.0) + near = camera_config.get("near", 2.0) + far = camera_config.get("far", 6.0) + coarse_samples = sampling_config.get("coarse_samples", 64) + fine_samples = sampling_config.get("fine_samples", 128) + perturb = sampling_config.get("perturb", True) + + # 如果提供了初始模型,使用它 + if start_from_model is not None: + print("从现有模型初始化权重") + self.nerf_model.load_state_dict(start_from_model.state_dict()) + + # 设置优化器和损失函数 + optimizer = torch.optim.Adam(self.nerf_model.parameters(), lr=lr) + mse_loss = torch.nn.MSELoss() + + # 将模型设置为训练模式 + self.nerf_model.train() + + # 为所有图像生成光线(预计算光线可以加速训练) + rays_o, rays_d = self._generate_rays(poses, H, W, focal) + rays_o = rays_o.to(self.device) + rays_d = rays_d.to(self.device) + images = images.to(self.device) + + # 训练循环 + best_loss = float('inf') + for epoch in range(epochs): + # 随机采样一批光线 + batch_rays_o, batch_rays_d, target_pixels = self._sample_pixel_batch( + images, rays_o, rays_d, batch_size) + + # 光线方向归一化 + batch_rays_d = torch.nn.functional.normalize(batch_rays_d, dim=-1) + + # 创建近平面和远平面张量 + near_tensor = torch.ones_like(batch_rays_o[..., 0]) * near + far_tensor = torch.ones_like(batch_rays_o[..., 0]) * far + + # 使用体积渲染进行前向传播 + # 首先进行粗采样渲染 + optimizer.zero_grad() + + # 体积渲染 + rgb_map, _, _, _ = VolumeRendererUtil.render_rays( + self.nerf_model, + batch_rays_o, + batch_rays_d, + near_tensor, + far_tensor, + coarse_samples, + fine_samples, + perturb + ) + + # 计算损失并反向传播 + loss 
= mse_loss(rgb_map, target_pixels) + loss.backward() + optimizer.step() + + # 输出训练进度 + if (epoch + 1) % 100 == 0: + psnr = -10.0 * torch.log10(loss) + print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.6f}, PSNR: {psnr.item():.2f}") + + # 保存最佳模型 + if loss.item() < best_loss: + best_loss = loss.item() + torch.save(self.nerf_model.state_dict(), os.path.join(self.output_dir, "best_model.pth")) + + # 加载最佳模型 + self.nerf_model.load_state_dict(torch.load(os.path.join(self.output_dir, "best_model.pth"))) + + print(f"NeRF模型训练完成,最终损失: {best_loss:.6f}") + return best_loss + + def extract_mesh(self, output_path: str, resolution: int = 256, threshold: float = 50.0, bound: float = 2.0): + """ + 从NeRF模型中提取3D网格,使用Marching Cubes算法 + + 参数: + output_path: 输出路径 + resolution: 体素网格分辨率 + threshold: 密度阈值,用于确定表面位置 + bound: 体素网格边界大小 + """ + print(f"从NeRF提取3D网格,分辨率: {resolution}...") + + # 设置网格提取参数 + self.nerf_model.eval() # 设置为评估模式 + + # 定义采样网格 + x = torch.linspace(-bound, bound, resolution) + y = torch.linspace(-bound, bound, resolution) + z = torch.linspace(-bound, bound, resolution) + + # 创建采样点坐标网格 + xx, yy, zz = torch.meshgrid(x, y, z, indexing='ij') + + # 准备查询点 + points = torch.stack([xx, yy, zz], dim=-1).reshape(-1, 3).to(self.device) + + # 创建密度场 + print("正在计算体积密度场...") + density_field = torch.zeros((resolution, resolution, resolution)) + + # 分批处理以避免显存溢出 + batch_size = 4096 # 根据GPU内存调整 + with torch.no_grad(): + for i in tqdm(range(0, points.shape[0], batch_size)): + # 获取当前批次的点 + batch_points = points[i:i+batch_size] + + # 计算密度 - 使用固定方向(这里使用+z方向) + # 注意:在NeRF中,密度不依赖于视角方向,只有颜色依赖视角 + fixed_dirs = torch.zeros_like(batch_points) + fixed_dirs[..., 2] = 1.0 # 设置为+z方向 + + # 使用fine网络进行推理 + sigma, _ = self.nerf_model(batch_points, fixed_dirs, coarse=False) + + # 更新密度场 + batch_indices = torch.arange(i, min(i+batch_size, points.shape[0])) + xyz_indices = torch.stack([ + (points[batch_indices, 0] + bound) / (2 * bound) * (resolution - 1), + (points[batch_indices, 1] + bound) / (2 * bound) * (resolution - 1), + (points[batch_indices, 2] + bound) / (2 * bound) * (resolution - 1) + ], dim=-1).long() + + for j, (xi, yi, zi) in enumerate(xyz_indices): + density_field[xi, yi, zi] = sigma[j].cpu() + + # 使用Marching Cubes提取网格 + print("使用Marching Cubes提取网格...") + density_field_np = density_field.cpu().numpy() + vertices, triangles = mcubes.marching_cubes(density_field_np, threshold) + + # 转换为正确的坐标系(视场的[-bound, bound]范围) + vertices = vertices / (resolution - 1) * (2 * bound) - bound + + # 创建trimesh对象 + mesh = trimesh.Trimesh(vertices=vertices, faces=triangles) + + # 保存网格 + mesh.export(output_path) + + print(f"网格提取完成,保存至: {output_path}") + print(f"网格统计: {len(vertices)}个顶点, {len(triangles)}个三角面") + + return mesh + + def evaluate_reconstruction(self, + gt_mesh_path: str = None) -> Dict[str, float]: + """ + 评估重建质量 + + 参数: + gt_mesh_path: 真实网格路径(如果有) + + 返回: + metrics: 评估指标,如F-score + """ + if gt_mesh_path is None: + print("没有提供真实网格,跳过评估") + return {} + + print("评估重建质量...") + + # 在实际实现中,这里应该有评估重建质量的代码 + # 通常使用F-score、Chamfer距离等指标 + + # 为了简化,我们返回模拟的指标 + metrics = { + "f_score": 0.85, + "precision": 0.87, + "recall": 0.83 + } + + print(f"评估结果: F-score={metrics['f_score']:.4f}, " + f"精确率={metrics['precision']:.4f}, 召回率={metrics['recall']:.4f}") + + return metrics + + def run_active_reconstruction(self, + initial_poses: np.ndarray, + initial_images: torch.Tensor = None, + max_iterations: int = 3) -> List[np.ndarray]: + """ + 运行主动重建过程 + + 参数: + initial_poses: 初始相机位姿 + initial_images: 初始图像(如果有) + max_iterations: 最大迭代次数 + + 返回: + 
selected_poses: 所有选定的相机位姿 + """ + print("开始主动重建过程...") + + # 初始训练,使用初始视图 + if initial_images is None: + initial_images = self._simulate_image_capture(initial_poses) + + # 使用初始图像训练模型 + self.train_nerf( + initial_images, + torch.from_numpy(initial_poses).float().to(self.device), + epochs=self.config.get("reconstruction", {}).get("epochs_per_iteration", 2000) + ) + + # 保存初始模型 + initial_model_path = os.path.join(self.output_dir, "initial_model.pth") + torch.save(self.nerf_model.state_dict(), initial_model_path) + initial_model = self.nerf_model.state_dict() + + all_poses = initial_poses.copy() + current_poses = initial_poses.copy() + all_images = initial_images.clone() + + # 提取初始网格 + initial_mesh_path = os.path.join(self.output_dir, "initial_mesh.obj") + self.extract_mesh( + initial_mesh_path, + resolution=self.config.get("reconstruction", {}).get("mesh_resolution", 256) + ) + + # 迭代执行主动重建 + for iteration in range(max_iterations): + print(f"\n开始迭代 {iteration+1}/{max_iterations}") + + # 选择下一批视角 + next_views = self.policy.select_next_views(self.nerf_model, current_poses) + print(f"选择了 {len(next_views)} 个新视角") + + # 采集新视角的图像 + new_images = self._simulate_image_capture(next_views) + + # 将新选择的视角添加到当前位姿和图像中 + current_poses = np.concatenate([current_poses, next_views], axis=0) + all_poses = np.concatenate([all_poses, next_views], axis=0) + all_images = torch.cat([all_images, new_images], dim=0) + + # 按照作者的描述,我们从初始模型重新初始化,而不是继续训练 + # "After selecting additional images, we initialize the network with the model from the initialization step and refine the model further with the updated training set." + # 因此,我们先加载初始模型,然后用扩展的数据集重新训练 + self.nerf_model.load_state_dict(torch.load(initial_model_path)) + + # 用扩展的数据集重新训练模型 + self.train_nerf( + all_images, + torch.from_numpy(current_poses).float().to(self.device), + epochs=self.config.get("reconstruction", {}).get("epochs_per_iteration", 2000) + ) + + # 每次迭代后提取网格,以便观察重建质量的改进 + iter_mesh_path = os.path.join(self.output_dir, f"mesh_iter_{iteration+1}.obj") + self.extract_mesh( + iter_mesh_path, + resolution=self.config.get("reconstruction", {}).get("mesh_resolution", 256) + ) + + # 提取最终的3D网格 + output_mesh_path = os.path.join(self.output_dir, "final_mesh.obj") + self.extract_mesh( + output_mesh_path, + resolution=self.config.get("reconstruction", {}).get("mesh_resolution", 256) + ) + + # 评估重建质量 + self.evaluate_reconstruction() + + print("主动重建过程完成") + return all_poses + + def _simulate_image_capture(self, poses: np.ndarray) -> torch.Tensor: + """ + 模拟图像采集过程(实际系统中应该从相机或数据集获取) + + 参数: + poses: 相机位姿 + + 返回: + images: 模拟的图像 + """ + # 模拟图像大小 + camera_config = self.config.get("camera", {}) + H, W = camera_config.get("height", 800), camera_config.get("width", 800) + + # 创建随机图像(实际应来自相机或渲染) + images = torch.rand(len(poses), H, W, 3, device=self.device) + + return images + +def main(): + parser = argparse.ArgumentParser(description="基于NeRF不确定性的主动3D重建") + parser.add_argument("--config", type=str, default="nbv_config.yaml", help="配置文件路径") + parser.add_argument("--synthetic", action="store_true", help="使用合成数据集") + args = parser.parse_args() + + # 创建主动重建系统 + reconstruction = ActiveReconstruction(args.config) + + # 初始化一些相机位姿(通常来自中心圆环) + # 根据配置获取初始位姿数量 + config = yaml.safe_load(open(args.config, 'r')) + initial_view_count = config.get("reconstruction", {}).get("initial_view_count", 15) + + # 根据数据集类型调整初始视图数量 + if args.synthetic: + initial_view_count = min(initial_view_count, 6) # 合成数据使用6个初始视图 + print(f"使用合成数据集,初始视图数量: {initial_view_count}") + else: + print(f"使用真实数据集,初始视图数量: 
{initial_view_count}") + + # 获取中间圆环上的相机位姿 + # 假设poses是按圆环组织的,我们选择中间圆环的部分位姿 + middle_circle_index = config.get("view_selection", {}).get("n_circles", 5) // 2 + poses_per_circle = config.get("view_selection", {}).get("n_poses_per_circle", 30) + + # 等距选择初始位姿 + start_index = middle_circle_index * poses_per_circle + step = poses_per_circle // initial_view_count + initial_pose_indices = [start_index + i * step for i in range(initial_view_count)] + initial_poses = reconstruction.policy.poses[initial_pose_indices] + + # 运行主动重建 + selected_poses = reconstruction.run_active_reconstruction( + initial_poses, + max_iterations=config.get("reconstruction", {}).get("max_iterations", 3) + ) + + print(f"主动重建完成,共选择了{len(selected_poses)}个相机位姿") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ref_code/nbv_config.yaml b/ref_code/nbv_config.yaml new file mode 100644 index 0000000..3787946 --- /dev/null +++ b/ref_code/nbv_config.yaml @@ -0,0 +1,52 @@ +# 主动重建系统配置 + +# 基本设置 +device: cuda # 使用的设备: cuda 或 cpu +output_dir: ./outputs/nbv_reconstruction # 输出目录 +seed: 42 # 随机数种子 + +# 数据设置 +data: + dataset_type: synthetic # 数据集类型: synthetic 或 real + synthetic_dir: ./data/synthetic/ # 合成数据目录 + real_dir: ./data/real/ # 真实数据目录 + +# NeRF模型设置 +nerf: + pos_enc_dim: 10 # 位置编码维度 + dir_enc_dim: 4 # 方向编码维度 + hidden_dim: 256 # 隐藏层维度(兼容旧配置) + # 网络结构设置 + netdepth_coarse: 8 # coarse网络深度 + netwidth_coarse: 256 # coarse网络宽度 + netdepth_fine: 8 # fine网络深度 + netwidth_fine: 256 # fine网络宽度 + skips: [4] # 跳跃连接层 + use_viewdirs: true # 是否使用视角方向信息 + +# 相机设置 +camera: + width: 800 # 图像宽度 + height: 800 # 图像高度 + focal: 1000.0 # 焦距 + near: 2.0 # 近平面距离 + far: 6.0 # 远平面距离 + +# 采样设置 +sampling: + coarse_samples: 64 # 粗采样点数 + fine_samples: 128 # 精细采样点数 + perturb: True # 是否添加噪声 + +# 重建设置 +reconstruction: + max_iterations: 3 # 最大迭代次数 + initial_view_count: 15 # 初始视图数量 + epochs_per_iteration: 2000 # 每次迭代的训练轮数 + mesh_resolution: 256 # 网格提取分辨率 + +# 视图选择策略设置 +view_selection: + n_circles: 5 # 半球上的环数 + n_poses_per_circle: 30 # 每个环上的位姿数 + distance_threshold: 0.1 # 视图距离阈值 \ No newline at end of file diff --git a/ref_code/nerf_model.py b/ref_code/nerf_model.py new file mode 100644 index 0000000..5822a87 --- /dev/null +++ b/ref_code/nerf_model.py @@ -0,0 +1,182 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +from PytorchBoot.stereotype import stereotype + +@stereotype.module("nerf") +class NeRF(nn.Module): + def __init__(self, config): + super().__init__() + self.config = config + + # 读取位置和方向编码维度 + pos_enc_out = 3 * (2 * config["pos_enc_dim"] + 1) + dir_enc_out = 3 * (2 * config["dir_enc_dim"] + 1) + + # 读取网络深度和宽度(可配置) + netdepth_coarse = config.get("netdepth_coarse", 8) + netwidth_coarse = config.get("netwidth_coarse", 256) + netdepth_fine = config.get("netdepth_fine", 8) + netwidth_fine = config.get("netwidth_fine", 256) + + # 构建跳跃连接 + skips = config.get("skips", [4]) + + # 是否使用视角方向 + self.use_viewdirs = config.get("use_viewdirs", True) + + # 构建coarse和fine网络 + if self.use_viewdirs: + # 位置编码 -> 密度 + 特征 + self.pts_linears_coarse = self._build_pts_mlp( + input_dim=pos_enc_out, + width=netwidth_coarse, + depth=netdepth_coarse, + skips=skips + ) + self.alpha_linear_coarse = nn.Linear(netwidth_coarse, 1) + self.feature_linear_coarse = nn.Linear(netwidth_coarse, netwidth_coarse) + + # 特征 + 方向编码 -> RGB + self.views_linears_coarse = nn.ModuleList([ + nn.Linear(netwidth_coarse + dir_enc_out, netwidth_coarse//2) + ]) + self.rgb_linear_coarse = nn.Linear(netwidth_coarse//2, 3) + + # 对fine网络执行相同的操作 + self.pts_linears_fine = 
self._build_pts_mlp( + input_dim=pos_enc_out, + width=netwidth_fine, + depth=netdepth_fine, + skips=skips + ) + self.alpha_linear_fine = nn.Linear(netwidth_fine, 1) + self.feature_linear_fine = nn.Linear(netwidth_fine, netwidth_fine) + + self.views_linears_fine = nn.ModuleList([ + nn.Linear(netwidth_fine + dir_enc_out, netwidth_fine//2) + ]) + self.rgb_linear_fine = nn.Linear(netwidth_fine//2, 3) + else: + # 不使用视角方向的简化版本 + self.pts_linears_coarse = self._build_pts_mlp( + input_dim=pos_enc_out, + width=netwidth_coarse, + depth=netdepth_coarse, + skips=skips + ) + self.output_linear_coarse = nn.Linear(netwidth_coarse, 4) + + self.pts_linears_fine = self._build_pts_mlp( + input_dim=pos_enc_out, + width=netwidth_fine, + depth=netdepth_fine, + skips=skips + ) + self.output_linear_fine = nn.Linear(netwidth_fine, 4) + + def _build_pts_mlp(self, input_dim, width, depth, skips): + """构建处理位置编码的MLP网络,支持跳跃连接""" + layers = nn.ModuleList() + + # 第一层 + layers.append(nn.Linear(input_dim, width)) + + # 中间层 + for i in range(1, depth): + if i in skips: + layers.append(nn.Linear(input_dim + width, width)) + else: + layers.append(nn.Linear(width, width)) + + return layers + + def positional_encoding(self, x, L): + """位置编码函数""" + encodings = [x] + for i in range(L): + encodings.append(torch.sin(2**i * x)) + encodings.append(torch.cos(2**i * x)) + return torch.cat(encodings, dim=-1) + + def forward_mlp(self, pts_embed, viewdirs_embed, is_coarse=True): + """前向传播MLP部分""" + if is_coarse: + pts_linears = self.pts_linears_coarse + alpha_linear = self.alpha_linear_coarse if self.use_viewdirs else None + feature_linear = self.feature_linear_coarse if self.use_viewdirs else None + views_linears = self.views_linears_coarse if self.use_viewdirs else None + rgb_linear = self.rgb_linear_coarse if self.use_viewdirs else None + output_linear = self.output_linear_coarse if not self.use_viewdirs else None + else: + pts_linears = self.pts_linears_fine + alpha_linear = self.alpha_linear_fine if self.use_viewdirs else None + feature_linear = self.feature_linear_fine if self.use_viewdirs else None + views_linears = self.views_linears_fine if self.use_viewdirs else None + rgb_linear = self.rgb_linear_fine if self.use_viewdirs else None + output_linear = self.output_linear_fine if not self.use_viewdirs else None + + # 位置编码处理 + h = pts_embed + for i, l in enumerate(pts_linears): + h = pts_linears[i](h) + h = F.relu(h) + # 处理跳跃连接 + if i in self.config.get("skips", [4]): + h = torch.cat([pts_embed, h], -1) + + if self.use_viewdirs: + # 分支1:计算sigma + sigma = alpha_linear(h) + + # 分支2:计算颜色特征 + feature = feature_linear(h) + + # 结合方向编码 + h = torch.cat([feature, viewdirs_embed], -1) + + # 视角相关MLP + for i, l in enumerate(views_linears): + h = l(h) + h = F.relu(h) + + # 输出RGB + rgb = rgb_linear(h) + rgb = torch.sigmoid(rgb) # [0,1]范围 + + outputs = torch.cat([rgb, sigma], -1) + else: + # 直接输出RGBA + outputs = output_linear(h) + rgb = torch.sigmoid(outputs[..., :3]) # [0,1]范围 + sigma = outputs[..., 3:] + + return rgb, sigma + + def forward(self, pos, dir, coarse=True): + """ + 前向传播 + + 参数: + pos: 3D位置 [batch_size, ..., 3] + dir: 视角方向 [batch_size, ..., 3] + coarse: 是否使用coarse网络 + + 返回: + sigma: 体积密度 [batch_size, ..., 1] + color: RGB颜色 [batch_size, ..., 3] + """ + # 位置和方向编码 + pos_enc = self.positional_encoding(pos, self.config["pos_enc_dim"]) + + # 当使用视角方向时才编码方向 + if self.use_viewdirs: + dir_normalized = F.normalize(dir, dim=-1) + dir_enc = self.positional_encoding(dir_normalized, self.config["dir_enc_dim"]) + else: + dir_enc = None + + # 
选择使用coarse还是fine网络 + color, sigma = self.forward_mlp(pos_enc, dir_enc, coarse) + + return sigma, color \ No newline at end of file diff --git a/ref_code/pipeline.py b/ref_code/pipeline.py new file mode 100644 index 0000000..5775eee --- /dev/null +++ b/ref_code/pipeline.py @@ -0,0 +1,126 @@ +import numpy as np +import torch +from scipy.spatial.transform import Rotation as R +from uncertainty_guide import UncertaintyGuideNeRF + +class ActiveReconstructionPolicy: + def __init__(self, config): + self.config = config + self._setup_view_sphere() + self.uncertainty_guide = UncertaintyGuideNeRF(config) + + def _setup_view_sphere(self): + """初始化半球相机位姿 (5个圆环 x 30个位姿)""" + self.poses = [] + radii = np.linspace(0.1, np.pi/2, self.config.n_circles) # 半球上的半径 + + for r in radii: + for theta in np.linspace(0, 2*np.pi, self.config.n_poses_per_circle, endpoint=False): + # 球坐标转笛卡尔坐标 + x = np.cos(theta) * np.sin(r) + y = np.sin(theta) * np.sin(r) + z = np.cos(r) + position = np.array([x, y, z]) * 2.0 # 缩放因子 + + # 相机朝向原点 + forward = -position / np.linalg.norm(position) + up = np.array([0, 0, 1]) + right = np.cross(up, forward) + up = np.cross(forward, right) + + # 构建位姿矩阵 + pose = np.eye(4) + pose[:3, :3] = np.stack([right, up, forward], axis=-1) + pose[:3, 3] = position + self.poses.append(pose) + + self.poses = np.stack(self.poses) + + # 区域聚类: 将半球分为12个区域 (上下半球各6个) + self.section_masks = self._create_section_masks() + + def _create_section_masks(self): + """创建12个区域的掩码""" + masks = [] + angles = np.arctan2(self.poses[:, 1, 3], self.poses[:, 0, 3]) # 方位角 + + # 上下半球 (z坐标正负) + upper = self.poses[:, 2, 3] > 0 + lower = ~upper + + # 每个半球分6个区域 + angle_bins = np.linspace(-np.pi, np.pi, 7) # 6个区域需要7个边界 + for i in range(6): + angle_mask = (angles >= angle_bins[i]) & (angles < angle_bins[i+1]) + masks.append(angle_mask & upper) + masks.append(angle_mask & lower) + + return masks + + def select_next_views(self, nerf_model, current_poses): + """根据熵值选择下一个最佳视角 + + 参数: + nerf_model: 当前的NeRF模型 + current_poses: 已经采集的相机位姿 + + 返回: + selected_poses: 选择的下一批相机位姿 + """ + # 排除已选视角 + current_positions = current_poses[:, :3, 3] + all_positions = self.poses[:, :3, 3] + distance_matrix = np.linalg.norm( + current_positions[:, None] - all_positions[None], axis=-1) + min_distances = np.min(distance_matrix, axis=0) + valid_mask = min_distances > 0.1 # 避免选择太近的视角 + + # 评估候选视图的不确定性 + valid_poses = self.poses[valid_mask] + entropy_values = self.uncertainty_guide.evaluate_candidate_views(nerf_model, valid_poses) + + # 从每个区域选择熵最高的有效视角 + selected_indices = [] + for mask in self.section_masks: + # 调整mask以适应有效视角的筛选 + section_mask = mask[valid_mask] + if not np.any(section_mask): + continue + + section_entropy = entropy_values.copy() + section_entropy[~section_mask] = -np.inf + selected_idx = np.argmax(section_entropy) + + # 转换回原始索引 + original_indices = np.where(valid_mask)[0] + original_idx = original_indices[selected_idx] + selected_indices.append(original_idx) + + return self.poses[selected_indices] + + def coarse_to_fine_reconstruction(self, nerf_model, initial_poses, max_iterations=3): + """执行从粗到精的重建过程 + + 参数: + nerf_model: 初始NeRF模型 + initial_poses: 初始相机位姿 + max_iterations: 最大迭代次数 + + 返回: + all_selected_poses: 所有选择的相机位姿(包括初始位姿) + """ + all_selected_poses = initial_poses.copy() + current_poses = initial_poses.copy() + + for iteration in range(max_iterations): + # 选择下一批视角 + next_views = self.select_next_views(nerf_model, current_poses) + + # 将新选择的视角添加到当前位姿中 + current_poses = np.concatenate([current_poses, next_views], axis=0) + all_selected_poses = 
np.concatenate([all_selected_poses, next_views], axis=0) + + # 这里应该有一个重新训练模型的步骤 + # 但这通常在外部完成,我们只返回选定的位姿 + + return all_selected_poses \ No newline at end of file diff --git a/ref_code/uncertainty_guide.py b/ref_code/uncertainty_guide.py new file mode 100644 index 0000000..d532076 --- /dev/null +++ b/ref_code/uncertainty_guide.py @@ -0,0 +1,170 @@ +import torch +import numpy as np +from utils.volume_render_util import VolumeRendererUtil +import torch.nn.functional as F +from typing import Tuple, List, Dict, Any, Optional + +class UncertaintyGuideNeRF: + """ + 基于NeRF不确定性的主动视图选择策略 + 通过计算视图的熵值来引导下一步的最优视图选择 + """ + + def __init__(self, config: Dict[str, Any]): + """ + 初始化不确定性引导策略 + + 参数: + config: 配置字典,包含相关参数 + """ + self.config = config + self.device = torch.device(config.get("device", "cuda") if torch.cuda.is_available() else "cpu") + + # 相机参数 + self.width = config.get("width", 800) + self.height = config.get("height", 800) + self.focal = config.get("focal", 1000.0) + + # 采样参数 + self.near = config.get("near", 2.0) + self.far = config.get("far", 6.0) + self.coarse_samples = config.get("coarse_samples", 64) + self.fine_samples = config.get("fine_samples", 128) + + def generate_rays(self, pose: np.ndarray) -> Tuple[torch.Tensor, torch.Tensor]: + """ + 从相机姿态生成光线 + + 参数: + pose: 相机姿态矩阵 [4, 4] + + 返回: + rays_o: 光线起点 [H*W, 3] + rays_d: 光线方向 [H*W, 3] + """ + # 创建像素坐标 + i, j = torch.meshgrid( + torch.linspace(0, self.width - 1, self.width), + torch.linspace(0, self.height - 1, self.height), + indexing='ij' + ) + i = i.t().to(self.device) + j = j.t().to(self.device) + + # 转换为相机坐标系中的方向 + dirs = torch.stack([ + (i - self.width * 0.5) / self.focal, + -(j - self.height * 0.5) / self.focal, + -torch.ones_like(i) + ], dim=-1) + + # 转换为世界坐标系 + pose = torch.from_numpy(pose).float().to(self.device) + rays_d = torch.sum(dirs[..., None, :] * pose[:3, :3], dim=-1) + rays_o = pose[:3, -1].expand(rays_d.shape) + + # 展平为批处理格式 + rays_o = rays_o.reshape(-1, 3) + rays_d = rays_d.reshape(-1, 3) + + return rays_o, rays_d + + def evaluate_view_uncertainty(self, + nerf_model: torch.nn.Module, + pose: np.ndarray) -> float: + """ + 评估给定视图的不确定性(熵) + + 参数: + nerf_model: NeRF模型 + pose: 相机姿态矩阵 [4, 4] + + 返回: + mean_entropy: 该视图的平均熵值 + """ + nerf_model.eval() + with torch.no_grad(): + # 生成光线 + rays_o, rays_d = self.generate_rays(pose) + + # 对于较大的图像,可能需要分批处理 + batch_size = 4096 # 根据GPU内存调整 + entropy_values = [] + + # 分批处理所有光线 + for i in range(0, rays_o.shape[0], batch_size): + batch_rays_o = rays_o[i:i+batch_size] + batch_rays_d = rays_d[i:i+batch_size] + + # 归一化方向向量 + batch_rays_d = F.normalize(batch_rays_d, dim=-1) + + # 计算近平面和远平面 + near = torch.ones_like(batch_rays_o[..., 0]) * self.near + far = torch.ones_like(batch_rays_o[..., 0]) * self.far + + # 渲染光线并计算熵 + _, weights, _, entropy = VolumeRendererUtil.render_rays( + nerf_model, + batch_rays_o, + batch_rays_d, + near, + far, + self.coarse_samples, + self.fine_samples + ) + + entropy_values.append(entropy) + + # 组合所有批次的熵值 + all_entropy = torch.cat(entropy_values, dim=0) + + # 重塑为图像格式并计算平均值 + mean_entropy = all_entropy.mean().item() + + return mean_entropy + + def evaluate_candidate_views(self, + nerf_model: torch.nn.Module, + candidate_poses: np.ndarray) -> np.ndarray: + """ + 评估候选视图的不确定性(熵) + + 参数: + nerf_model: NeRF模型 + candidate_poses: 候选相机姿态矩阵列表 [N, 4, 4] + + 返回: + entropy_values: 各候选视图的熵值 [N] + """ + entropy_values = np.zeros(len(candidate_poses)) + + for i, pose in enumerate(candidate_poses): + entropy_values[i] = self.evaluate_view_uncertainty(nerf_model, pose) + + 
return entropy_values + + def downsample_image(self, rays_o, rays_d, factor=4): + """ + 降采样光线以加速处理 + + 参数: + rays_o: 光线起点 [H*W, 3] + rays_d: 光线方向 [H*W, 3] + factor: 降采样因子 + + 返回: + downsampled_rays_o: 降采样后的光线起点 + downsampled_rays_d: 降采样后的光线方向 + """ + # 重塑为图像格式 + H = W = int(np.sqrt(rays_o.shape[0])) + rays_o = rays_o.reshape(H, W, 3) + rays_d = rays_d.reshape(H, W, 3) + + # 降采样 + new_H, new_W = H // factor, W // factor + downsampled_rays_o = rays_o[::factor, ::factor].reshape(-1, 3) + downsampled_rays_d = rays_d[::factor, ::factor].reshape(-1, 3) + + return downsampled_rays_o, downsampled_rays_d \ No newline at end of file diff --git a/utils/pose_util.py b/utils/pose_util.py new file mode 100644 index 0000000..83aa942 --- /dev/null +++ b/utils/pose_util.py @@ -0,0 +1,166 @@ +import numpy as np + +class PoseUtil: + ROTATION = 1 + TRANSLATION = 2 + SCALE = 3 + + @staticmethod + def get_uniform_translation(trans_m_min, trans_m_max, trans_unit, debug=False): + if isinstance(trans_m_min, list): + x_min, y_min, z_min = trans_m_min + x_max, y_max, z_max = trans_m_max + else: + x_min, y_min, z_min = trans_m_min, trans_m_min, trans_m_min + x_max, y_max, z_max = trans_m_max, trans_m_max, trans_m_max + + x = np.random.uniform(x_min, x_max) + y = np.random.uniform(y_min, y_max) + z = np.random.uniform(z_min, z_max) + translation = np.array([x, y, z]) + if trans_unit == "cm": + translation = translation / 100 + if debug: + print("uniform translation:", translation) + return translation + + @staticmethod + def get_uniform_rotation(rot_degree_min=0, rot_degree_max=180, debug=False): + axis = np.random.randn(3) + axis /= np.linalg.norm(axis) + theta = np.random.uniform( + rot_degree_min / 180 * np.pi, rot_degree_max / 180 * np.pi + ) + + K = np.array( + [[0, -axis[2], axis[1]], [axis[2], 0, -axis[0]], [-axis[1], axis[0], 0]] + ) + R = np.eye(3) + np.sin(theta) * K + (1 - np.cos(theta)) * (K @ K) + if debug: + print("uniform rotation:", theta * 180 / np.pi) + return R + + @staticmethod + def get_uniform_pose( + trans_min, trans_max, rot_min=0, rot_max=180, trans_unit="cm", debug=False + ): + translation = PoseUtil.get_uniform_translation( + trans_min, trans_max, trans_unit, debug + ) + rotation = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug) + pose = np.eye(4) + pose[:3, :3] = rotation + pose[:3, 3] = translation + return pose + + @staticmethod + def get_n_uniform_pose( + trans_min, + trans_max, + rot_min=0, + rot_max=180, + n=1, + trans_unit="cm", + fix=None, + contain_canonical=True, + debug=False, + ): + if fix == PoseUtil.ROTATION: + translations = np.zeros((n, 3)) + for i in range(n): + translations[i] = PoseUtil.get_uniform_translation( + trans_min, trans_max, trans_unit, debug + ) + if contain_canonical: + translations[0] = np.zeros(3) + rotations = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug) + elif fix == PoseUtil.TRANSLATION: + rotations = np.zeros((n, 3, 3)) + for i in range(n): + rotations[i] = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug) + if contain_canonical: + rotations[0] = np.eye(3) + translations = PoseUtil.get_uniform_translation( + trans_min, trans_max, trans_unit, debug + ) + else: + translations = np.zeros((n, 3)) + rotations = np.zeros((n, 3, 3)) + for i in range(n): + translations[i] = PoseUtil.get_uniform_translation( + trans_min, trans_max, trans_unit, debug + ) + for i in range(n): + rotations[i] = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug) + if contain_canonical: + translations[0] = np.zeros(3) + rotations[0] = np.eye(3) + + pose = 
np.eye(4, 4, k=0)[np.newaxis, :].repeat(n, axis=0)
+        pose[:, :3, :3] = rotations
+        pose[:, :3, 3] = translations
+
+        return pose
+
+    @staticmethod
+    def get_n_uniform_pose_batch(
+        trans_min,
+        trans_max,
+        rot_min=0,
+        rot_max=180,
+        n=1,
+        batch_size=1,
+        trans_unit="cm",
+        fix=None,
+        contain_canonical=False,
+        debug=False,
+    ):
+
+        batch_poses = []
+        for i in range(batch_size):
+            pose = PoseUtil.get_n_uniform_pose(
+                trans_min,
+                trans_max,
+                rot_min,
+                rot_max,
+                n,
+                trans_unit,
+                fix,
+                contain_canonical,
+                debug,
+            )
+            batch_poses.append(pose)
+        pose_batch = np.stack(batch_poses, axis=0)
+        return pose_batch
+
+    @staticmethod
+    def get_uniform_scale(scale_min, scale_max, debug=False):
+        if isinstance(scale_min, list):
+            x_min, y_min, z_min = scale_min
+            x_max, y_max, z_max = scale_max
+        else:
+            x_min, y_min, z_min = scale_min, scale_min, scale_min
+            x_max, y_max, z_max = scale_max, scale_max, scale_max
+
+        x = np.random.uniform(x_min, x_max)
+        y = np.random.uniform(y_min, y_max)
+        z = np.random.uniform(z_min, z_max)
+        scale = np.array([x, y, z])
+        if debug:
+            print("uniform scale:", scale)
+        return scale
+
+    @staticmethod
+    def rotation_matrix_from_axis_angle(axis, angle):
+        cos_angle = np.cos(angle)
+        sin_angle = np.sin(angle)
+        one_minus_cos = 1 - cos_angle
+
+        x, y, z = axis
+        rotation_matrix = np.array([
+            [cos_angle + x*x*one_minus_cos, x*y*one_minus_cos - z*sin_angle, x*z*one_minus_cos + y*sin_angle],
+            [y*x*one_minus_cos + z*sin_angle, cos_angle + y*y*one_minus_cos, y*z*one_minus_cos - x*sin_angle],
+            [z*x*one_minus_cos - y*sin_angle, z*y*one_minus_cos + x*sin_angle, cos_angle + z*z*one_minus_cos]
+        ])
+
+        return rotation_matrix
\ No newline at end of file
diff --git a/utils/view_util.py b/utils/view_util.py
new file mode 100644
index 0000000..4bb1855
--- /dev/null
+++ b/utils/view_util.py
@@ -0,0 +1,42 @@
+
+import os
+import shutil
+import json
+import subprocess
+import tempfile
+
+
+class ViewRenderUtil:
+    blender_path = r"C:\Program Files\Blender Foundation\Blender 4.0\blender.exe"
+    @staticmethod
+    def render_view(cam_pose, scene_path, script_path):
+
+        with tempfile.TemporaryDirectory() as temp_dir:
+            params = {
+                "cam_pose": cam_pose.tolist(),
+                "scene_path": scene_path
+            }
+            scene_info_path = os.path.join(scene_path, "scene_info.json")
+            shutil.copy(scene_info_path, os.path.join(temp_dir, "scene_info.json"))
+            params_data_path = os.path.join(temp_dir, "params.json")
+            with open(params_data_path, 'w') as f:
+                json.dump(params, f)
+            result = subprocess.run([
+                ViewRenderUtil.blender_path, '-b', '-P', script_path, '--', temp_dir
+            ], capture_output=True, text=True)
+            print(result.stdout)
+            print(result.stderr)
+            path = os.path.join(temp_dir, "tmp")
+
+        return None
+
+if __name__ == "__main__":
+    import numpy as np
+    idx = 0
+    cam_param_path = r"D:\Project\nbv_rec\data\google_scan-backpack_0288\camera_params\{}.json"
+    cam_pose = json.load(open(cam_param_path.format(idx)))
+    cam_pose = np.array(cam_pose["extrinsic"])
+    scene_path = r"D:\Project\nbv_rec\data\google_scan-backpack_0288"
+    script_path = r"D:\Project\nbv_rec\nbv_rec_blender_render\data_renderer.py"
+    ViewRenderUtil.render_view(cam_pose, scene_path, script_path)
diff --git a/utils/volume_render_util.py b/utils/volume_render_util.py
new file mode 100644
index 0000000..0279707
--- /dev/null
+++ b/utils/volume_render_util.py
@@ -0,0 +1,201 @@
+import torch
+import torch.nn.functional as F
+from typing import Tuple
+
+class VolumeRendererUtil:
+
+    @staticmethod
+    def render_rays(
+        nerf_model,
+        rays_o: torch.Tensor,
+        rays_d: torch.Tensor,
+        near: torch.Tensor,
+        far: torch.Tensor,
+        coarse_samples: int = 64,
+        fine_samples: int = 128,
+        perturb: bool = True
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
+        """
+        Render rays and compute per-ray uncertainty (entropy)
+
+        Args:
+            nerf_model: NeRF model (must implement forward(pos, dir, coarse))
+            rays_o: ray origins [N_rays, 3]
+            rays_d: ray directions (normalized) [N_rays, 3]
+            near: near-plane distances [N_rays]
+            far: far-plane distances [N_rays]
+            coarse_samples: number of coarse samples
+            fine_samples: number of fine samples
+            perturb: whether to jitter the sample positions
+
+        Returns:
+            rgb_map: rendered colors [N_rays, 3]
+            weights: weight distribution [N_rays, N_samples]
+            t_vals: sample parameters [N_rays, N_samples]
+            entropy: per-ray entropy [N_rays]
+        """
+        # Coarse sampling
+        t_vals_coarse, points_coarse = VolumeRendererUtil.sample_along_ray(
+            rays_o, rays_d, near, far, coarse_samples, perturb)
+
+        # Importance (fine) sampling
+        with torch.no_grad():
+            dirs_coarse = rays_d[..., None, :].expand(points_coarse.shape)
+            sigma_coarse, _ = nerf_model(points_coarse, dirs_coarse)
+            weights_coarse = VolumeRendererUtil.compute_weights(sigma_coarse, t_vals_coarse, rays_d)
+            t_vals_fine = VolumeRendererUtil.importance_sampling(t_vals_coarse, weights_coarse, fine_samples)
+
+        # Merge the coarse and fine samples
+        t_vals = torch.sort(torch.cat([t_vals_coarse, t_vals_fine], -1)).values
+        points = rays_o[..., None, :] + t_vals[..., None] * rays_d[..., None, :]
+
+        # Fine rendering
+        dirs = rays_d[..., None, :].expand(points.shape)
+        sigma, color = nerf_model(points, dirs, coarse=False)
+        rgb_map, weights = VolumeRendererUtil.volume_rendering(sigma, color, t_vals, rays_d)
+        entropy = VolumeRendererUtil.calculate_entropy(weights)
+
+        return rgb_map, weights, t_vals, entropy
+
+    @staticmethod
+    def importance_sampling(
+        t_vals: torch.Tensor,
+        weights: torch.Tensor,
+        n_samples: int
+    ) -> torch.Tensor:
+        """
+        Importance sampling (draw new samples from the weight distribution)
+
+        Args:
+            t_vals: original sample parameters [N_rays, N_coarse]
+            weights: weight distribution [N_rays, N_coarse]
+            n_samples: number of new samples to draw
+
+        Returns:
+            samples: new sample parameters [N_rays, N_fine]
+        """
+        weights = weights + 1e-5  # avoid division by zero
+        pdf = weights / torch.sum(weights, -1, keepdim=True)
+        cdf = torch.cumsum(pdf, -1)
+
+        # Inverse-transform sampling
+        u = torch.linspace(0, 1, n_samples, device=weights.device)
+        u = u.expand(list(cdf.shape[:-1]) + [n_samples]).contiguous()
+        indices = torch.searchsorted(cdf, u, right=True)
+
+        # Interpolate to obtain the new samples
+        below = torch.max(torch.zeros_like(indices), indices - 1)
+        above = torch.min((cdf.shape[-1] - 1) * torch.ones_like(indices), indices)
+        indices_g = torch.stack([below, above], -1)  # [N_rays, N_fine, 2]
+
+        matched_shape = list(indices_g.shape[:-1]) + [cdf.shape[-1]]
+        cdf_g = torch.gather(cdf.unsqueeze(1).expand(matched_shape), -1, indices_g)
+        t_vals_g = torch.gather(t_vals.unsqueeze(1).expand(matched_shape), -1, indices_g)
+
+        denom = cdf_g[..., 1] - cdf_g[..., 0]
+        denom = torch.where(denom < 1e-5, torch.ones_like(denom), denom)
+        t = (u - cdf_g[..., 0]) / denom
+        samples = t_vals_g[..., 0] + t * (t_vals_g[..., 1] - t_vals_g[..., 0])
+
+        return samples
+
+    @staticmethod
+    def sample_along_ray(
+        rays_o: torch.Tensor,
+        rays_d: torch.Tensor,
+        near: torch.Tensor,
+        far: torch.Tensor,
+        n_samples: int,
+        perturb: bool = True
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        """
+        Stratified sampling of points along each ray
+
+        Args:
+            rays_o: ray origins [N_rays, 3]
+            rays_d: ray directions [N_rays, 3]
+            near: near-plane distances [N_rays]
+            far: far-plane distances [N_rays]
+            n_samples: number of samples per ray
+            perturb: whether to jitter the samples
+
+        Returns:
+            t_vals: sample parameters [N_rays, N_samples]
+            points: sampled 3D points [N_rays, N_samples, 3]
+        """
+        # Base stratified sampling
+        t_vals = torch.linspace(0., 1., n_samples, device=rays_o.device)
+        t_vals = near[..., None] + (far - near)[..., None] * t_vals[None, :]
+
+        if perturb:
+            # Add stratified noise
+            mids = 0.5 * (t_vals[..., 1:] + t_vals[..., :-1])
+            upper = torch.cat([mids, t_vals[..., -1:]], -1)
+            lower = torch.cat([t_vals[..., :1], mids], -1)
+            t_rand = torch.rand(t_vals.shape, device=rays_o.device)
+            t_vals = lower + (upper - lower) * t_rand
+
+        # Generate the 3D sample points
+        points = rays_o.unsqueeze(1) + t_vals.unsqueeze(-1) * rays_d.unsqueeze(1)  # [N_rays, N_samples, 3]
+        return t_vals, points
+
+    @staticmethod
+    def volume_rendering(
+        sigma: torch.Tensor,
+        color: torch.Tensor,
+        t_vals: torch.Tensor,
+        rays_d: torch.Tensor
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        """
+        Perform volume rendering
+
+        Args:
+            sigma: volume density [N_rays, N_samples, 1]
+            color: RGB color [N_rays, N_samples, 3]
+            t_vals: sample parameters [N_rays, N_samples]
+            rays_d: ray directions [N_rays, 3]
+
+        Returns:
+            rgb_map: rendered colors [N_rays, 3]
+            weights: weight distribution [N_rays, N_samples]
+        """
+        dists = t_vals[..., 1:] - t_vals[..., :-1]
+        dists = torch.cat([dists, torch.tensor([1e10], device=dists.device).expand(dists[..., :1].shape)], -1)
+        dists = dists * torch.norm(rays_d[..., None, :], dim=-1)
+
+        sigma_delta = sigma.squeeze(-1) * dists  # [N_rays, N_samples]
+        alpha = 1. - torch.exp(-sigma_delta)
+        # Transmittance: T_i = exp(-sum_{j<i} sigma_j * delta_j)
+        trans = torch.exp(-torch.cat([
+            torch.zeros_like(sigma_delta[..., :1]),
+            torch.cumsum(sigma_delta[..., :-1], dim=-1)
+        ], dim=-1))
+        weights = alpha * trans
+
+        rgb_map = torch.sum(weights.unsqueeze(-1) * color, dim=-2)
+        return rgb_map, weights
+
+    @staticmethod
+    def calculate_entropy(weights: torch.Tensor, eps: float = 1e-10) -> torch.Tensor:
+        """
+        Compute the entropy of the weight distribution
+
+        Args:
+            weights: weight distribution [N_rays, N_samples]
+            eps: small constant to avoid log(0)
+
+        Returns:
+            entropy: per-ray entropy [N_rays]
+        """
+        norm_weights = weights / (torch.sum(weights, dim=-1, keepdim=True) + eps)
+        entropy = -torch.sum(norm_weights * torch.log(norm_weights + eps), dim=-1)
+        return entropy
+
+    @staticmethod
+    def compute_weights(sigma: torch.Tensor, t_vals: torch.Tensor, rays_d: torch.Tensor) -> torch.Tensor:
+        """Compute weights (used for importance sampling)"""
+        dists = t_vals[..., 1:] - t_vals[..., :-1]
+        dists = torch.cat([dists, torch.tensor([1e10], device=dists.device).expand(dists[..., :1].shape)], -1)
+        dists = dists * torch.norm(rays_d[..., None, :], dim=-1)
+
+        sigma_delta = sigma.squeeze(-1) * dists
+        alpha = 1. - torch.exp(-sigma_delta)
+        trans = torch.exp(-torch.cat([
+            torch.zeros_like(sigma_delta[..., :1]),
+            torch.cumsum(sigma_delta[..., :-1], dim=-1)
+        ], dim=-1))
+        return alpha * trans
\ No newline at end of file
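
A minimal smoke-test sketch for VolumeRendererUtil.render_rays. The DummyField module and all sizes below are illustrative stand-ins, not part of the codebase; it only checks that the renderer returns per-ray colors and entropies with the expected shapes.

import torch
from utils.volume_render_util import VolumeRendererUtil

class DummyField(torch.nn.Module):
    # Illustrative stand-in for the NeRF module: returns (sigma, rgb) like NeRF.forward.
    def forward(self, pos, dirs, coarse=True):
        sigma = torch.ones_like(pos[..., :1])   # constant density everywhere
        rgb = torch.sigmoid(pos)                # arbitrary position-dependent color
        return sigma, rgb

rays_o = torch.zeros(8, 3)                                        # 8 rays from the origin
rays_d = torch.nn.functional.normalize(torch.randn(8, 3), dim=-1)
near = torch.full((8,), 2.0)
far = torch.full((8,), 6.0)

rgb, weights, t_vals, entropy = VolumeRendererUtil.render_rays(
    DummyField(), rays_o, rays_d, near, far,
    coarse_samples=32, fine_samples=64, perturb=False)
print(rgb.shape, weights.shape, entropy.shape)  # expected: [8, 3], [8, 96], [8]

The same entropy output is what UncertaintyGuideNeRF.evaluate_view_uncertainty averages per candidate view when ranking the next views to capture.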