Batch Processing Guide¶
Process multiple videos efficiently with StreamPack's batch processing capabilities. This guide covers batch conversion strategies, automation, and monitoring.
Basic Batch Processing¶
Command Line Batch Processing¶
Process multiple files with a single command:
# Process all MP4 files in a directory
streampack "input_videos/*.mp4" --output batch_output --format hls,dash
# Process specific files
streampack input1.mp4 input2.mp4 input3.mp4 --output batch_output
# Process a batch manifest with per-file settings
streampack videos.json --config batch-config.yml
Batch Configuration File¶
Create a JSON file to specify individual settings for each video:
{
"batch_settings": {
"default_config": {
"format": ["hls", "dash"],
"preset": "medium",
"crf": 23,
"max_workers": 4
},
"files": [
{
"input": "movie1.mp4",
"output": "streams/movie1",
"config": {
"crf": 20,
"bitrates": ["720p", "1080p", "4k"]
}
},
{
"input": "trailer.mp4",
"output": "streams/trailer",
"config": {
"crf": 18,
"bitrates": ["480p", "720p", "1080p"]
}
},
{
"input": "behind_scenes.mp4",
"output": "streams/bts",
"config": {
"crf": 25,
"preset": "fast",
"bitrates": ["480p", "720p"]
}
}
]
}
}
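The same manifest can also be driven from Python. Below is a minimal sketch, not the documented CLI behavior: it assumes per-file config entries override default_config key by key, and it only passes crf and preset through to MediaConfig (run_manifest is a hypothetical helper):
import json
from streampack import MediaConverter, MediaConfig

def run_manifest(manifest_path):
    """Convert every entry in a batch manifest, letting per-file config win."""
    with open(manifest_path) as f:
        batch = json.load(f)['batch_settings']
    defaults = batch.get('default_config', {})
    for entry in batch['files']:
        merged = {**defaults, **entry.get('config', {})}
        formats = merged.pop('format', ['hls'])
        # Assumption: MediaConfig accepts crf/preset as shown elsewhere in this guide.
        config = MediaConfig(crf=merged.get('crf', 23), preset=merged.get('preset', 'medium'))
        MediaConverter(config=config).convert(entry['input'], entry['output'], formats=formats)

run_manifest('videos.json')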
Python API Batch Processing¶
Simple Batch Processing¶
from pathlib import Path
from streampack import MediaConverter, MediaConfig
def process_video_batch(input_dir, output_dir):
"""Process all videos in a directory."""
converter = MediaConverter()
input_path = Path(input_dir)
output_path = Path(output_dir)
# Find all video files
video_files = list(input_path.glob('*.mp4')) + list(input_path.glob('*.mov'))
results = []
for video_file in video_files:
video_output = output_path / video_file.stem
try:
result = converter.convert(
str(video_file),
str(video_output),
formats=['hls', 'dash']
)
results.append({
'file': video_file.name,
'success': True,
'output': result['master_playlist'],
'duration': result['total_duration']
})
except Exception as e:
results.append({
'file': video_file.name,
'success': False,
'error': str(e)
})
return results
# Process batch
results = process_video_batch('input_videos', 'output_streams')
for result in results:
if result['success']:
print(f"✅ {result['file']}: {result['duration']:.1f}s")
else:
print(f"❌ {result['file']}: {result['error']}")
Parallel Batch Processing¶
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from streampack import MediaConverter
def convert_single_video(video_config):
"""Convert a single video with its configuration."""
converter = MediaConverter(config=video_config['config'])
try:
result = converter.convert(
video_config['input'],
video_config['output'],
formats=video_config.get('formats', ['hls'])
)
return {
'input': video_config['input'],
'success': True,
'result': result
}
except Exception as e:
return {
'input': video_config['input'],
'success': False,
'error': str(e)
}
def parallel_batch_process(video_configs, max_workers=2):
"""Process videos in parallel."""
results = []
# Use ThreadPoolExecutor for I/O bound tasks
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all jobs
future_to_config = {
executor.submit(convert_single_video, config): config
for config in video_configs
}
# Collect results as they complete
for future in as_completed(future_to_config):
config = future_to_config[future]
try:
result = future.result()
results.append(result)
if result['success']:
print(f"✅ Completed: {Path(result['input']).name}")
else:
print(f"❌ Failed: {Path(result['input']).name} - {result['error']}")
except Exception as exc:
print(f"❌ Exception for {config['input']}: {exc}")
results.append({
'input': config['input'],
'success': False,
'error': str(exc)
})
return results
# Example usage
video_configs = [
{
'input': 'video1.mp4',
'output': 'streams/video1',
'formats': ['hls', 'dash'],
'config': MediaConfig(crf=20, preset='medium')
},
{
'input': 'video2.mp4',
'output': 'streams/video2',
'formats': ['hls'],
'config': MediaConfig(crf=23, preset='fast')
}
]
results = parallel_batch_process(video_configs, max_workers=2)
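A note on the executor choice: ThreadPoolExecutor is a reasonable default when each conversion is dominated by an external encoder process, because worker threads then spend most of their time waiting rather than holding the GIL. If conversion is CPU-bound inside the Python process, ProcessPoolExecutor offers the same submit/as_completed interface, but every video_config must then be picklable.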
Advanced Batch Processing¶
Batch Processing with Different Profiles¶
from pathlib import Path
from streampack import MediaConverter, MediaConfig
from streampack.config import BitrateProfile
def create_profile_configs():
"""Create different quality profiles for batch processing."""
# Mobile-first profile
mobile_profiles = [
BitrateProfile("240p", (426, 240), 400, 64),
BitrateProfile("480p", (854, 480), 1000, 128),
BitrateProfile("720p", (1280, 720), 2000, 192),
]
# Desktop/TV profile
premium_profiles = [
BitrateProfile("720p", (1280, 720), 3000, 192),
BitrateProfile("1080p", (1920, 1080), 6000, 256),
BitrateProfile("1440p", (2560, 1440), 9000, 256),
BitrateProfile("4K", (3840, 2160), 15000, 256),
]
return {
'mobile': MediaConfig(
bitrate_profiles=mobile_profiles,
crf=25,
preset='fast',
segment_duration=4
),
'premium': MediaConfig(
bitrate_profiles=premium_profiles,
crf=20,
preset='medium',
segment_duration=6
)
}
def smart_batch_process(input_files, output_base):
"""Process files with appropriate profiles based on content analysis."""
configs = create_profile_configs()
converter = MediaConverter()
results = []
for input_file in input_files:
# Analyze video to determine appropriate profile
media_info = converter.media_analyzer.analyze(Path(input_file))
# Choose profile based on video characteristics
if media_info.video and media_info.video.height >= 1080:
profile = 'premium'
else:
profile = 'mobile'
output_dir = Path(output_base) / f"{Path(input_file).stem}_{profile}"
# Convert with selected profile
converter_with_config = MediaConverter(config=configs[profile])
result = converter_with_config.convert(input_file, str(output_dir))
results.append({
'input': input_file,
'profile': profile,
'result': result
})
return results
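A brief usage example for the smart path (hypothetical file names):
results = smart_batch_process(['movie.mp4', 'phone_clip.mp4'], 'output_streams')
for item in results:
    print(f"{item['input']} -> {item['profile']} profile")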
Resumable Batch Processing¶
import json
from datetime import datetime
from pathlib import Path
from streampack import MediaConverter
class ResumableBatchProcessor:
"""Batch processor that can resume from interruptions."""
def __init__(self, state_file='batch_state.json'):
self.state_file = Path(state_file)
self.state = self._load_state()
def _load_state(self):
"""Load processing state from file."""
if self.state_file.exists():
with open(self.state_file, 'r') as f:
return json.load(f)
return {'completed': [], 'failed': [], 'remaining': []}
def _save_state(self):
"""Save current state to file."""
with open(self.state_file, 'w') as f:
json.dump(self.state, f, indent=2)
def add_files(self, file_configs):
"""Add files to processing queue."""
        completed_inputs = {item['input'] for item in self.state['completed']}
        for config in file_configs:
            if config['input'] not in completed_inputs:
                self.state['remaining'].append(config)
self._save_state()
    def process_batch(self):
        """Process remaining files sequentially, saving state after each file."""
converter = MediaConverter()
while self.state['remaining']:
config = self.state['remaining'].pop(0)
try:
print(f"Processing: {config['input']}")
result = converter.convert(
config['input'],
config['output'],
formats=config.get('formats', ['hls'])
)
self.state['completed'].append({
'input': config['input'],
'output': config['output'],
'result': result,
                    'timestamp': datetime.now().isoformat()
})
print(f"✅ Completed: {config['input']}")
except Exception as e:
                error_info = {
                    'input': config['input'],
                    'output': config['output'],
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }
self.state['failed'].append(error_info)
print(f"❌ Failed: {config['input']} - {str(e)}")
# Save state after each file
self._save_state()
def retry_failed(self):
"""Retry failed conversions."""
failed_items = self.state['failed'].copy()
self.state['failed'] = []
        for item in failed_items:
            self.state['remaining'].append({
                'input': item['input'],
                'output': item['output']
            })
self._save_state()
self.process_batch()
def get_summary(self):
"""Get processing summary."""
return {
'completed': len(self.state['completed']),
'failed': len(self.state['failed']),
'remaining': len(self.state['remaining']),
'total': len(self.state['completed']) + len(self.state['failed']) + len(self.state['remaining'])
}
# Usage example
processor = ResumableBatchProcessor()
processor.add_files([
{'input': 'video1.mp4', 'output': 'streams/video1'},
{'input': 'video2.mp4', 'output': 'streams/video2'},
{'input': 'video3.mp4', 'output': 'streams/video3'},
])
processor.process_batch()
print(f"Summary: {processor.get_summary()}")
Monitoring and Logging¶
Progress Monitoring¶
import time
from pathlib import Path
from rich.console import Console
from rich.progress import Progress
from streampack import MediaConverter
console = Console()
class BatchProgressMonitor:
"""Monitor batch processing progress."""
def __init__(self):
self.progress = Progress(console=console)
self.overall_task = None
self.current_task = None
def start_batch(self, total_files):
"""Start monitoring batch process."""
self.progress.start()
self.overall_task = self.progress.add_task(
"Overall Progress",
total=total_files
)
def start_file(self, filename):
"""Start processing a new file."""
if self.current_task is not None:
self.progress.remove_task(self.current_task)
self.current_task = self.progress.add_task(
f"Processing {filename}",
total=100
)
def update_file_progress(self, progress_percent):
"""Update current file progress."""
if self.current_task is not None:
self.progress.update(self.current_task, completed=progress_percent)
def complete_file(self):
"""Mark current file as complete."""
if self.current_task is not None:
self.progress.remove_task(self.current_task)
self.current_task = None
self.progress.advance(self.overall_task, 1)
def finish(self):
"""Finish monitoring."""
if self.current_task is not None:
self.progress.remove_task(self.current_task)
self.progress.stop()
# Usage with progress monitoring
def monitored_batch_process(video_files, output_dir):
monitor = BatchProgressMonitor()
converter = MediaConverter()
monitor.start_batch(len(video_files))
for video_file in video_files:
monitor.start_file(Path(video_file).name)
try:
# Simulate progress updates during conversion
# (In real implementation, this would come from converter callbacks)
for i in range(0, 101, 10):
monitor.update_file_progress(i)
time.sleep(0.1) # Simulate work
result = converter.convert(video_file, f"{output_dir}/{Path(video_file).stem}")
monitor.complete_file()
except Exception as e:
console.print(f"❌ Error processing {video_file}: {e}", style="red")
monitor.complete_file()
monitor.finish()
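Kicking off a monitored run is then a single call (hypothetical file names):
monitored_batch_process(['video1.mp4', 'video2.mp4'], 'output_streams')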
Batch Processing Strategies¶
Resource Management¶
import psutil
from pathlib import Path
from streampack import MediaConverter, MediaConfig
def adaptive_batch_processing(video_files, output_dir):
"""Adapt processing based on system resources."""
# Check system resources
cpu_count = psutil.cpu_count()
available_memory = psutil.virtual_memory().available / (1024**3) # GB
# Determine optimal settings
if available_memory > 16: # High-end system
max_workers = min(cpu_count // 2, 4)
preset = 'medium'
crf = 20
elif available_memory > 8: # Mid-range system
max_workers = min(cpu_count // 3, 2)
preset = 'fast'
crf = 23
else: # Limited resources
max_workers = 1
preset = 'veryfast'
crf = 26
config = MediaConfig(
max_workers=max_workers,
preset=preset,
crf=crf
)
converter = MediaConverter(config=config)
# Process files
results = []
for video_file in video_files:
result = converter.convert(video_file, f"{output_dir}/{Path(video_file).stem}")
results.append(result)
return results
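A usage example for the adaptive path (hypothetical file names):
results = adaptive_batch_processing(['video1.mp4', 'video2.mp4'], 'output_streams')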
Error Handling and Recovery¶
import logging
import time
from pathlib import Path
from streampack import MediaConverter
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('batch_processing.log'),
logging.StreamHandler()
]
)
def robust_batch_process(video_files, output_dir, max_retries=3):
"""Batch process with error handling and retries."""
converter = MediaConverter()
results = []
for video_file in video_files:
video_path = Path(video_file)
output_path = Path(output_dir) / video_path.stem
success = False
last_error = None
for attempt in range(max_retries + 1):
try:
logging.info(f"Processing {video_file} (attempt {attempt + 1})")
result = converter.convert(str(video_path), str(output_path))
results.append({
'input': str(video_path),
'output': str(output_path),
'success': True,
'result': result,
'attempts': attempt + 1
})
logging.info(f"✅ Successfully processed {video_file}")
success = True
break
except Exception as e:
last_error = e
logging.warning(f"❌ Attempt {attempt + 1} failed for {video_file}: {e}")
if attempt < max_retries:
logging.info(f"Retrying {video_file} in 5 seconds...")
time.sleep(5)
if not success:
results.append({
'input': str(video_path),
'output': str(output_path),
'success': False,
'error': str(last_error),
'attempts': max_retries + 1
})
logging.error(f"❌ Failed to process {video_file} after {max_retries + 1} attempts")
return results
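And a usage example with two retries per file (hypothetical file names):
results = robust_batch_process(['video1.mp4', 'video2.mp4'], 'output_streams', max_retries=2)
for item in results:
    status = '✅' if item['success'] else '❌'
    print(f"{status} {item['input']} ({item['attempts']} attempt(s))")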
Best Practices¶
Performance Optimization¶
- Resource allocation: Don't exceed 70% of available CPU cores (see the sketch after this list)
- Memory management: Monitor memory usage, especially with large files
- Storage: Use SSDs for temporary files and output
- Network: Consider bandwidth for cloud storage uploads
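A minimal sketch of the 70% guideline, reusing psutil from the resource-management example above (safe_worker_count is a hypothetical helper):
import psutil
from streampack import MediaConfig

def safe_worker_count(utilization=0.7):
    """Cap parallel conversions at roughly 70% of logical cores (minimum 1)."""
    cores = psutil.cpu_count() or 1
    return max(1, int(cores * utilization))

config = MediaConfig(max_workers=safe_worker_count())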
Batch Configuration Tips¶
- File grouping: Group similar files together for consistent settings (a grouping sketch follows this list)
- Quality presets: Use different presets based on content type
- Scheduling: Run heavy batches during off-peak hours
- Monitoring: Always log processing for debugging and auditing
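To apply the file-grouping tip, inputs can be bucketed with the same analyzer used in smart_batch_process above, then each bucket converted with one shared config (group_by_resolution is a hypothetical helper):
from collections import defaultdict
from pathlib import Path
from streampack import MediaConverter

def group_by_resolution(input_files):
    """Bucket files into 'premium' (>=1080p) and 'mobile' groups."""
    converter = MediaConverter()
    groups = defaultdict(list)
    for f in input_files:
        info = converter.media_analyzer.analyze(Path(f))
        key = 'premium' if info.video and info.video.height >= 1080 else 'mobile'
        groups[key].append(f)
    return groups

# Each bucket can then be fed to parallel_batch_process with a single profile.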