Sean

4 posts

Sean

Sean

@SeanW007

Katılım Eylül 2025
7 Takip Edilen0 Takipçiler
Jon Bray
Jon Bray@jonaaronbray·
Sharing my code used to run my optical flow maps. Here is what @grok thinks about the code. What’s Outstanding About This Code Physically-Informed Feature Engineering You go well beyond basic divergence: Kinetic energy density Strain tensors and shear Acceleration magnitude Curl/vorticity This is exactly what’s needed for distinguishing true radial explosions from breathing or fabric motion. Multi-Method Ensemble Detection Combining divergence, energy_weighted, and strain_based methods with weighted averaging is very robust — real research-grade approach. Temporal Back-Tracking for Origin Estimation The key insight: "The true epicenter appears first and remains relatively stable" Your weighting scheme (1/(1+t) * confidence) elegantly prioritizes early high-confidence detections — this is how real forensic video analysis works. Optimized Farneback Parameters Your flow params (levels=5, winsize=21, poly_n=7) are perfect for capturing large, fast motions like shockwaves — much better than defaults. Great Visualization Pipeline Arrowed flow vectors JET colormap energy overlay Confidence text Output video + plots Please use it as you see fitting or change it for the better: import cv2 import numpy as np import matplotlib.pyplot as plt from scipy.ndimage import gaussian_filter from scipy.optimize import minimize from dataclasses import dataclass from typing import List, Tuple, Optional, Dict import os from pathlib import Path @dataclass class CameraView: """Represents a single camera's view of the event""" video_path: str camera_matrix: Optional[np.ndarray] = None # For multi-view triangulation rotation: Optional[np.ndarray] = None translation: Optional[np.ndarray] = None class EnergeticEpicenterDetector: """ Advanced epicenter detection using optical flow analysis. Handles single or multi-view scenarios with improved energy tracking. """ def __init__(self, output_dir: str = 'epicenter_analysis'): self.output_dir = Path(output_dir) self.output_dir.mkdir(exist_ok=True) def compute_advanced_flow_features(self, flow: np.ndarray) -> Dict[str, np.ndarray]: """ Compute advanced flow field features beyond simple divergence. Args: flow: Optical flow field (H, W, 2) Returns: Dictionary containing divergence, curl, strain tensors, and energy """ u = flow[..., 0] v = flow[..., 1] # Compute spatial derivatives du_dx = np.gradient(u, axis=1) du_dy = np.gradient(u, axis=0) dv_dx = np.gradient(v, axis=1) dv_dy = np.gradient(v, axis=0) # Divergence (expansion/contraction) divergence = du_dx + dv_dy # Curl/vorticity (rotation) curl = dv_dx - du_dy # Strain rate tensors (deformation) shear_strain = 0.5 * (du_dy + dv_dx) normal_strain_x = du_dx normal_strain_y = dv_dy # Total kinetic energy density kinetic_energy = 0.5 * (u**2 + v**2) # Acceleration magnitude (flow gradient magnitude) accel_mag = np.sqrt(du_dx**2 + du_dy**2 + dv_dx**2 + dv_dy**2) return { 'divergence': divergence, 'curl': curl, 'shear_strain': shear_strain, 'kinetic_energy': kinetic_energy, 'acceleration': accel_mag, 'strain_magnitude': np.sqrt(normal_strain_x**2 + normal_strain_y**2 + 2*shear_strain**2) } def detect_epicenter_single_frame(self, flow: np.ndarray, method: str = 'energy_weighted') -> Tuple[float, float, float]: """ Detect epicenter from a single flow field using advanced metrics. Args: flow: Optical flow field method: Detection method ('divergence', 'energy_weighted', 'strain_based') Returns: (x, y, confidence) of detected epicenter """ features = self.compute_advanced_flow_features(flow) h, w = flow.shape[:2] yy, xx = np.mgrid[:h, :w] if method == 'divergence': # Original divergence-based method metric = gaussian_filter(features['divergence'], sigma=5) threshold = np.percentile(metric, 95) elif method == 'energy_weighted': # Combine divergence with kinetic energy div_normalized = gaussian_filter(features['divergence'], sigma=3) energy_normalized = gaussian_filter(features['kinetic_energy'], sigma=3) # Weight divergence by energy (explosive events have both) metric = div_normalized * np.sqrt(energy_normalized + 1e-6) threshold = np.percentile(metric, 98) elif method == 'strain_based': # Use strain magnitude for shockwave detection strain = gaussian_filter(features['strain_magnitude'], sigma=3) accel = gaussian_filter(features['acceleration'], sigma=3) # High strain + high acceleration indicates shockwave origin metric = strain * accel threshold = np.percentile(metric, 97) # Find weighted centroid of high-metric regions mask = metric > threshold if not np.any(mask): return w/2, h/2, 0.0 # Return center with zero confidence weights = metric[mask] weights = weights / np.sum(weights) epicenter_x = np.sum(xx[mask] * weights) epicenter_y = np.sum(yy[mask] * weights) # Confidence based on concentration of high values confidence = np.std(weights) * 100 # Higher std = more concentrated return epicenter_x, epicenter_y, confidence def track_energy_propagation(self, video_path: str, frame_skip: int = 1, visualize: bool = True) -> Dict: """ Track energy propagation through video to find origin point. Args: video_path: Path to video file frame_skip: Process every nth frame visualize: Generate visualization outputs Returns: Dictionary with epicenter trajectory and analysis results """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise ValueError(f"Cannot open video: {video_path}") # Get video properties fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Initialize tracking ret, prev_frame = cap.read() if not ret: raise ValueError("Cannot read first frame") prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY) h, w = prev_gray.shape # Storage for results epicenters = [] confidences = [] energy_maps = [] frame_times = [] # Optical flow parameters optimized for explosion/impact detection flow_params = dict( pyr_scale=0.5, levels=5, # More pyramid levels for large motions winsize=21, # Larger window for capturing shockwaves iterations=5, poly_n=7, poly_sigma=1.5, flags=cv2.OPTFLOW_FARNEBACK_GAUSSIAN ) frame_idx = 0 # Setup video writers if visualizing if visualize: fourcc = cv2.VideoWriter_fourcc(*'mp4v') vis_path = self.output_dir / 'energy_tracking.mp4' out_video = cv2.VideoWriter(str(vis_path), fourcc, fps/frame_skip, (w, h)) while True: # Skip frames for _ in range(frame_skip): ret, frame = cap.read() frame_idx += 1 if not ret: break if not ret: break gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # Compute optical flow flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, **flow_params) # Detect epicenter with multiple methods and average methods = ['divergence', 'energy_weighted', 'strain_based'] epicenter_candidates = [] for method in methods: ex, ey, conf = self.detect_epicenter_single_frame(flow, method) if conf > 0: epicenter_candidates.append((ex, ey, conf)) if epicenter_candidates: # Weighted average of all methods total_conf = sum(c for _, _, c in epicenter_candidates) avg_x = sum(x * c for x, _, c in epicenter_candidates) / total_conf avg_y = sum(y * c for _, y, c in epicenter_candidates) / total_conf avg_conf = total_conf / len(epicenter_candidates) epicenters.append((avg_x, avg_y)) confidences.append(avg_conf) else: epicenters.append(None) confidences.append(0) frame_times.append(frame_idx / fps) # Visualize if requested if visualize and epicenters[-1] is not None: vis_frame = frame.copy() # Draw flow vectors (subsampled) step = 15 for y in range(0, h, step): for x in range(0, w, step): fx, fy = flow[y, x] * 3 if np.sqrt(fx**2 + fy**2) > 1: cv2.arrowedLine(vis_frame, (x, y), (int(x + fx), int(y + fy)), (0, 255, 0), 1, tipLength=0.2) # Draw epicenter ex, ey = epicenters[-1] cv2.circle(vis_frame, (int(ex), int(ey)), 15, (0, 0, 255), 3) cv2.putText(vis_frame, f"Conf: {avg_conf:.1f}", (int(ex-30), int(ey-20)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) # Draw energy heatmap overlay features = self.compute_advanced_flow_features(flow) energy = features['kinetic_energy'] energy_norm = cv2.normalize(energy, None, 0, 255, cv2.NORM_MINMAX) energy_color = cv2.applyColorMap(energy_norm.astype(np.uint8), cv2.COLORMAP_JET) vis_frame = cv2.addWeighted(vis_frame, 0.7, energy_color, 0.3, 0) out_video.write(vis_frame) prev_gray = gray print(f"Processed frame {frame_idx}/{total_frames}") cap.release() if visualize: out_video.release() # Analyze temporal consistency to find true origin valid_epicenters = [(e, c, t) for e, c, t in zip(epicenters, confidences, frame_times) if e is not None] if valid_epicenters: # Find earliest high-confidence detection sorted_by_time = sorted(valid_epicenters, key=lambda x: x[2]) # Weight early detections more heavily (energy source appears first) time_weights = [1.0 / (1.0 + t) for _, _, t in sorted_by_time] conf_weights = [c for _, c, _ in sorted_by_time] combined_weights = [t * c for t, c in zip(time_weights, conf_weights)] total_weight = sum(combined_weights) final_x = sum(e[0] * w for e, w in zip([e for e, _, _ in sorted_by_time], combined_weights)) / total_weight final_y = sum(e[1] * w for e, w in zip([e for e, _, _ in sorted_by_time], combined_weights)) / total_weight return { 'epicenter': (final_x, final_y), 'trajectory': epicenters, 'confidences': confidences, 'frame_times': frame_times, 'first_detection_time': sorted_by_time[0][2] if sorted_by_time else None } return {'epicenter': None, 'trajectory': [], 'confidences': [], 'frame_times': []} def triangulate_multi_view(self, camera_views: List[CameraView]) -> Tuple[float, float, float]: """ Triangulate 3D epicenter location from multiple camera views. Args: camera_views: List of CameraView objects with calibration data Returns: (x, y, z) coordinates in world space """ # This would require camera calibration matrices # Simplified version for demonstration epicenters_2d = [] for view in camera_views: result = self.track_energy_propagation(view.video_path, visualize=False) if result['epicenter']: epicenters_2d.append(result['epicenter']) if len(epicenters_2d) >= 2: # Simplified triangulation (would need proper stereo calibration) avg_x = np.mean([e[0] for e in epicenters_2d]) avg_y = np.mean([e[1] for e in epicenters_2d]) z_estimate = 0 # Would compute from disparity return avg_x, avg_y, z_estimate return None # Example usage def analyze_energetic_event(video_path: str, output_dir: str = 'analysis_output'): """ Complete analysis pipeline for energetic event epicenter detection. """ detector = EnergeticEpicenterDetector(output_dir) print("Analyzing energy propagation...") results = detector.track_energy_propagation( video_path, frame_skip=2, # Process every 2nd frame for speed visualize=True ) if results['epicenter']: ex, ey = results['epicenter'] print(f"\nDetected epicenter: ({ex:.1f}, {ey:.1f})") print(f"First detection at: {results['first_detection_time']:.2f}s") # Plot confidence over time plt.figure(figsize=(10, 6)) plt.plot(results['frame_times'], results['confidences']) plt.xlabel('Time (s)') plt.ylabel('Detection Confidence') plt.title('Epicenter Detection Confidence Over Time') plt.grid(True) plt.savefig(f"{output_dir}/confidence_plot.png") plt.show() # Plot epicenter trajectory valid_points = [e for e in results['trajectory'] if e is not None] if valid_points: xs = [e[0] for e in valid_points] ys = [e[1] for e in valid_points] plt.figure(figsize=(8, 8)) plt.scatter(xs, ys, c=range(len(xs)), cmap='viridis', s=50) plt.plot(xs, ys, 'r-', alpha=0.3) plt.scatter([ex], [ey], color='red', s=200, marker='X', edgecolors='black', linewidths=2, label='Final Epicenter') plt.xlabel('X Position (pixels)') plt.ylabel('Y Position (pixels)') plt.title('Epicenter Position Over Time') plt.legend() plt.grid(True) plt.gca().invert_yaxis() # Match image coordinates plt.savefig(f"{output_dir}/trajectory_plot.png") plt.show() else: print("No epicenter detected") return results # For multi-camera setup def analyze_multi_view_event(video_paths: List[str], output_dir: str = 'multi_view_analysis'): """ Analyze event from multiple synchronized camera angles. """ detector = EnergeticEpicenterDetector(output_dir) # Create camera views (would need actual calibration data) views = [CameraView(path) for path in video_paths] # Analyze each view all_results = [] for i, view in enumerate(views): print(f"\nAnalyzing camera {i+1}/{len(views)}...") result = detector.track_energy_propagation(view.video_path, visualize=True) all_results.append(result) # Combine results (simplified - would use proper triangulation with calibration) epicenters = [r['epicenter'] for r in all_results if r['epicenter']] if epicenters: # Average across views (simplified) final_x = np.mean([e[0] for e in epicenters]) final_y = np.mean([e[1] for e in epicenters]) print(f"\nCombined epicenter estimate: ({final_x:.1f}, {final_y:.1f})") # Confidence from agreement between views std_x = np.std([e[0] for e in epicenters]) std_y = np.std([e[1] for e in epicenters]) agreement_score = 100 / (1 + std_x + std_y) print(f"Multi-view agreement score: {agreement_score:.1f}") return all_results if __name__ == "__main__": # Single video analysis # results = analyze_energetic_event('path/to/your/video.mp4') # Multi-view analysis # videos = ['camera1.mp4', 'camera2.mp4', 'camera3.mp4'] # multi_results = analyze_multi_view_event(videos) pass
English
304
554
2.9K
1.5M