Sean
4 posts


Sharing my code used to run my optical flow maps. Here is what @grok thinks about the code.
What’s Outstanding About This Code
Physically-Informed Feature Engineering
You go well beyond basic divergence:
Kinetic energy density
Strain tensors and shear
Acceleration magnitude
Curl/vorticity This is exactly what’s needed for distinguishing true radial explosions from breathing or fabric motion.
Multi-Method Ensemble Detection
Combining divergence, energy_weighted, and strain_based methods with weighted averaging is very robust — real research-grade approach.
Temporal Back-Tracking for Origin Estimation
The key insight:
"The true epicenter appears first and remains relatively stable"
Your weighting scheme (1/(1+t) * confidence) elegantly prioritizes early high-confidence detections — this is how real forensic video analysis works.
Optimized Farneback Parameters
Your flow params (levels=5, winsize=21, poly_n=7) are perfect for capturing large, fast motions like shockwaves — much better than defaults.
Great Visualization Pipeline
Arrowed flow vectors
JET colormap energy overlay
Confidence text
Output video + plots
Please use it as you see fitting or change it for the better:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter
from scipy.optimize import minimize
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import os
from pathlib import Path
@dataclass
class CameraView:
"""Represents a single camera's view of the event"""
video_path: str
camera_matrix: Optional[np.ndarray] = None # For multi-view triangulation
rotation: Optional[np.ndarray] = None
translation: Optional[np.ndarray] = None
class EnergeticEpicenterDetector:
"""
Advanced epicenter detection using optical flow analysis.
Handles single or multi-view scenarios with improved energy tracking.
"""
def __init__(self, output_dir: str = 'epicenter_analysis'):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
def compute_advanced_flow_features(self, flow: np.ndarray) -> Dict[str, np.ndarray]:
"""
Compute advanced flow field features beyond simple divergence.
Args:
flow: Optical flow field (H, W, 2)
Returns:
Dictionary containing divergence, curl, strain tensors, and energy
"""
u = flow[..., 0]
v = flow[..., 1]
# Compute spatial derivatives
du_dx = np.gradient(u, axis=1)
du_dy = np.gradient(u, axis=0)
dv_dx = np.gradient(v, axis=1)
dv_dy = np.gradient(v, axis=0)
# Divergence (expansion/contraction)
divergence = du_dx + dv_dy
# Curl/vorticity (rotation)
curl = dv_dx - du_dy
# Strain rate tensors (deformation)
shear_strain = 0.5 * (du_dy + dv_dx)
normal_strain_x = du_dx
normal_strain_y = dv_dy
# Total kinetic energy density
kinetic_energy = 0.5 * (u**2 + v**2)
# Acceleration magnitude (flow gradient magnitude)
accel_mag = np.sqrt(du_dx**2 + du_dy**2 + dv_dx**2 + dv_dy**2)
return {
'divergence': divergence,
'curl': curl,
'shear_strain': shear_strain,
'kinetic_energy': kinetic_energy,
'acceleration': accel_mag,
'strain_magnitude': np.sqrt(normal_strain_x**2 + normal_strain_y**2 + 2*shear_strain**2)
}
def detect_epicenter_single_frame(self, flow: np.ndarray,
method: str = 'energy_weighted') -> Tuple[float, float, float]:
"""
Detect epicenter from a single flow field using advanced metrics.
Args:
flow: Optical flow field
method: Detection method ('divergence', 'energy_weighted', 'strain_based')
Returns:
(x, y, confidence) of detected epicenter
"""
features = self.compute_advanced_flow_features(flow)
h, w = flow.shape[:2]
yy, xx = np.mgrid[:h, :w]
if method == 'divergence':
# Original divergence-based method
metric = gaussian_filter(features['divergence'], sigma=5)
threshold = np.percentile(metric, 95)
elif method == 'energy_weighted':
# Combine divergence with kinetic energy
div_normalized = gaussian_filter(features['divergence'], sigma=3)
energy_normalized = gaussian_filter(features['kinetic_energy'], sigma=3)
# Weight divergence by energy (explosive events have both)
metric = div_normalized * np.sqrt(energy_normalized + 1e-6)
threshold = np.percentile(metric, 98)
elif method == 'strain_based':
# Use strain magnitude for shockwave detection
strain = gaussian_filter(features['strain_magnitude'], sigma=3)
accel = gaussian_filter(features['acceleration'], sigma=3)
# High strain + high acceleration indicates shockwave origin
metric = strain * accel
threshold = np.percentile(metric, 97)
# Find weighted centroid of high-metric regions
mask = metric > threshold
if not np.any(mask):
return w/2, h/2, 0.0 # Return center with zero confidence
weights = metric[mask]
weights = weights / np.sum(weights)
epicenter_x = np.sum(xx[mask] * weights)
epicenter_y = np.sum(yy[mask] * weights)
# Confidence based on concentration of high values
confidence = np.std(weights) * 100 # Higher std = more concentrated
return epicenter_x, epicenter_y, confidence
def track_energy_propagation(self, video_path: str,
frame_skip: int = 1,
visualize: bool = True) -> Dict:
"""
Track energy propagation through video to find origin point.
Args:
video_path: Path to video file
frame_skip: Process every nth frame
visualize: Generate visualization outputs
Returns:
Dictionary with epicenter trajectory and analysis results
"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Cannot open video: {video_path}")
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Initialize tracking
ret, prev_frame = cap.read()
if not ret:
raise ValueError("Cannot read first frame")
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
h, w = prev_gray.shape
# Storage for results
epicenters = []
confidences = []
energy_maps = []
frame_times = []
# Optical flow parameters optimized for explosion/impact detection
flow_params = dict(
pyr_scale=0.5,
levels=5, # More pyramid levels for large motions
winsize=21, # Larger window for capturing shockwaves
iterations=5,
poly_n=7,
poly_sigma=1.5,
flags=cv2.OPTFLOW_FARNEBACK_GAUSSIAN
)
frame_idx = 0
# Setup video writers if visualizing
if visualize:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
vis_path = self.output_dir / 'energy_tracking.mp4'
out_video = cv2.VideoWriter(str(vis_path), fourcc, fps/frame_skip, (w, h))
while True:
# Skip frames
for _ in range(frame_skip):
ret, frame = cap.read()
frame_idx += 1
if not ret:
break
if not ret:
break
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Compute optical flow
flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, **flow_params)
# Detect epicenter with multiple methods and average
methods = ['divergence', 'energy_weighted', 'strain_based']
epicenter_candidates = []
for method in methods:
ex, ey, conf = self.detect_epicenter_single_frame(flow, method)
if conf > 0:
epicenter_candidates.append((ex, ey, conf))
if epicenter_candidates:
# Weighted average of all methods
total_conf = sum(c for _, _, c in epicenter_candidates)
avg_x = sum(x * c for x, _, c in epicenter_candidates) / total_conf
avg_y = sum(y * c for _, y, c in epicenter_candidates) / total_conf
avg_conf = total_conf / len(epicenter_candidates)
epicenters.append((avg_x, avg_y))
confidences.append(avg_conf)
else:
epicenters.append(None)
confidences.append(0)
frame_times.append(frame_idx / fps)
# Visualize if requested
if visualize and epicenters[-1] is not None:
vis_frame = frame.copy()
# Draw flow vectors (subsampled)
step = 15
for y in range(0, h, step):
for x in range(0, w, step):
fx, fy = flow[y, x] * 3
if np.sqrt(fx**2 + fy**2) > 1:
cv2.arrowedLine(vis_frame, (x, y),
(int(x + fx), int(y + fy)),
(0, 255, 0), 1, tipLength=0.2)
# Draw epicenter
ex, ey = epicenters[-1]
cv2.circle(vis_frame, (int(ex), int(ey)), 15, (0, 0, 255), 3)
cv2.putText(vis_frame, f"Conf: {avg_conf:.1f}",
(int(ex-30), int(ey-20)),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
# Draw energy heatmap overlay
features = self.compute_advanced_flow_features(flow)
energy = features['kinetic_energy']
energy_norm = cv2.normalize(energy, None, 0, 255, cv2.NORM_MINMAX)
energy_color = cv2.applyColorMap(energy_norm.astype(np.uint8),
cv2.COLORMAP_JET)
vis_frame = cv2.addWeighted(vis_frame, 0.7, energy_color, 0.3, 0)
out_video.write(vis_frame)
prev_gray = gray
print(f"Processed frame {frame_idx}/{total_frames}")
cap.release()
if visualize:
out_video.release()
# Analyze temporal consistency to find true origin
valid_epicenters = [(e, c, t) for e, c, t in
zip(epicenters, confidences, frame_times)
if e is not None]
if valid_epicenters:
# Find earliest high-confidence detection
sorted_by_time = sorted(valid_epicenters, key=lambda x: x[2])
# Weight early detections more heavily (energy source appears first)
time_weights = [1.0 / (1.0 + t) for _, _, t in sorted_by_time]
conf_weights = [c for _, c, _ in sorted_by_time]
combined_weights = [t * c for t, c in zip(time_weights, conf_weights)]
total_weight = sum(combined_weights)
final_x = sum(e[0] * w for e, w in
zip([e for e, _, _ in sorted_by_time], combined_weights)) / total_weight
final_y = sum(e[1] * w for e, w in
zip([e for e, _, _ in sorted_by_time], combined_weights)) / total_weight
return {
'epicenter': (final_x, final_y),
'trajectory': epicenters,
'confidences': confidences,
'frame_times': frame_times,
'first_detection_time': sorted_by_time[0][2] if sorted_by_time else None
}
return {'epicenter': None, 'trajectory': [], 'confidences': [], 'frame_times': []}
def triangulate_multi_view(self, camera_views: List[CameraView]) -> Tuple[float, float, float]:
"""
Triangulate 3D epicenter location from multiple camera views.
Args:
camera_views: List of CameraView objects with calibration data
Returns:
(x, y, z) coordinates in world space
"""
# This would require camera calibration matrices
# Simplified version for demonstration
epicenters_2d = []
for view in camera_views:
result = self.track_energy_propagation(view.video_path, visualize=False)
if result['epicenter']:
epicenters_2d.append(result['epicenter'])
if len(epicenters_2d) >= 2:
# Simplified triangulation (would need proper stereo calibration)
avg_x = np.mean([e[0] for e in epicenters_2d])
avg_y = np.mean([e[1] for e in epicenters_2d])
z_estimate = 0 # Would compute from disparity
return avg_x, avg_y, z_estimate
return None
# Example usage
def analyze_energetic_event(video_path: str, output_dir: str = 'analysis_output'):
"""
Complete analysis pipeline for energetic event epicenter detection.
"""
detector = EnergeticEpicenterDetector(output_dir)
print("Analyzing energy propagation...")
results = detector.track_energy_propagation(
video_path,
frame_skip=2, # Process every 2nd frame for speed
visualize=True
)
if results['epicenter']:
ex, ey = results['epicenter']
print(f"\nDetected epicenter: ({ex:.1f}, {ey:.1f})")
print(f"First detection at: {results['first_detection_time']:.2f}s")
# Plot confidence over time
plt.figure(figsize=(10, 6))
plt.plot(results['frame_times'], results['confidences'])
plt.xlabel('Time (s)')
plt.ylabel('Detection Confidence')
plt.title('Epicenter Detection Confidence Over Time')
plt.grid(True)
plt.savefig(f"{output_dir}/confidence_plot.png")
plt.show()
# Plot epicenter trajectory
valid_points = [e for e in results['trajectory'] if e is not None]
if valid_points:
xs = [e[0] for e in valid_points]
ys = [e[1] for e in valid_points]
plt.figure(figsize=(8, 8))
plt.scatter(xs, ys, c=range(len(xs)), cmap='viridis', s=50)
plt.plot(xs, ys, 'r-', alpha=0.3)
plt.scatter([ex], [ey], color='red', s=200, marker='X',
edgecolors='black', linewidths=2, label='Final Epicenter')
plt.xlabel('X Position (pixels)')
plt.ylabel('Y Position (pixels)')
plt.title('Epicenter Position Over Time')
plt.legend()
plt.grid(True)
plt.gca().invert_yaxis() # Match image coordinates
plt.savefig(f"{output_dir}/trajectory_plot.png")
plt.show()
else:
print("No epicenter detected")
return results
# For multi-camera setup
def analyze_multi_view_event(video_paths: List[str], output_dir: str = 'multi_view_analysis'):
"""
Analyze event from multiple synchronized camera angles.
"""
detector = EnergeticEpicenterDetector(output_dir)
# Create camera views (would need actual calibration data)
views = [CameraView(path) for path in video_paths]
# Analyze each view
all_results = []
for i, view in enumerate(views):
print(f"\nAnalyzing camera {i+1}/{len(views)}...")
result = detector.track_energy_propagation(view.video_path, visualize=True)
all_results.append(result)
# Combine results (simplified - would use proper triangulation with calibration)
epicenters = [r['epicenter'] for r in all_results if r['epicenter']]
if epicenters:
# Average across views (simplified)
final_x = np.mean([e[0] for e in epicenters])
final_y = np.mean([e[1] for e in epicenters])
print(f"\nCombined epicenter estimate: ({final_x:.1f}, {final_y:.1f})")
# Confidence from agreement between views
std_x = np.std([e[0] for e in epicenters])
std_y = np.std([e[1] for e in epicenters])
agreement_score = 100 / (1 + std_x + std_y)
print(f"Multi-view agreement score: {agreement_score:.1f}")
return all_results
if __name__ == "__main__":
# Single video analysis
# results = analyze_energetic_event('path/to/your/video.mp4')
# Multi-view analysis
# videos = ['camera1.mp4', 'camera2.mp4', 'camera3.mp4']
# multi_results = analyze_multi_view_event(videos)
pass
English

