Batch Processing Guide

Process multiple videos efficiently with StreamPack's batch processing capabilities. This guide covers batch conversion strategies, automation, and monitoring.

Basic Batch Processing

Command Line Batch Processing

Process multiple files with a single command:

# Process all MP4 files in a directory
streampack "input_videos/*.mp4" --output batch_output --format hls,dash

# Process specific files
streampack input1.mp4 input2.mp4 input3.mp4 --output batch_output

# Process with different configurations per file
streampack videos.json --config batch-config.yml

Batch Configuration File

Create a JSON file to specify individual settings for each video:

{
  "batch_settings": {
    "default_config": {
      "format": ["hls", "dash"],
      "preset": "medium",
      "crf": 23,
      "max_workers": 4
    },
    "files": [
      {
        "input": "movie1.mp4",
        "output": "streams/movie1",
        "config": {
          "crf": 20,
          "bitrates": ["720p", "1080p", "4k"]
        }
      },
      {
        "input": "trailer.mp4", 
        "output": "streams/trailer",
        "config": {
          "crf": 18,
          "bitrates": ["480p", "720p", "1080p"]
        }
      },
      {
        "input": "behind_scenes.mp4",
        "output": "streams/bts",
        "config": {
          "crf": 25,
          "preset": "fast",
          "bitrates": ["480p", "720p"]
        }
      }
    ]
  }
}
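
StreamPack reads this file for you on the command line. As an illustration of the merge semantics, per-file "config" keys override the shared "default_config" keys. A minimal sketch of consuming the same schema from Python (the load_batch_jobs helper below is illustrative, not part of the StreamPack API):

import json
from pathlib import Path

def load_batch_jobs(batch_file):
    """Illustrative loader: merge default_config into each file entry."""
    data = json.loads(Path(batch_file).read_text())
    settings = data['batch_settings']
    defaults = settings.get('default_config', {})

    jobs = []
    for entry in settings['files']:
        # Per-file settings take precedence over the shared defaults
        merged = {**defaults, **entry.get('config', {})}
        jobs.append({'input': entry['input'],
                     'output': entry['output'],
                     'config': merged})
    return jobs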

Python API Batch Processing

Simple Batch Processing

from pathlib import Path
from streampack import MediaConverter

def process_video_batch(input_dir, output_dir):
    """Process all videos in a directory."""

    converter = MediaConverter()
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Find all video files
    video_files = list(input_path.glob('*.mp4')) + list(input_path.glob('*.mov'))

    results = []
    for video_file in video_files:
        video_output = output_path / video_file.stem

        try:
            result = converter.convert(
                str(video_file), 
                str(video_output),
                formats=['hls', 'dash']
            )
            results.append({
                'file': video_file.name,
                'success': True,
                'output': result['master_playlist'],
                'duration': result['total_duration']
            })
        except Exception as e:
            results.append({
                'file': video_file.name,
                'success': False,
                'error': str(e)
            })

    return results

# Process batch
results = process_video_batch('input_videos', 'output_streams')
for result in results:
    if result['success']:
        print(f"✅ {result['file']}: {result['duration']:.1f}s")
    else:
        print(f"❌ {result['file']}: {result['error']}")

Parallel Batch Processing

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from pathlib import Path
from streampack import MediaConverter, MediaConfig

def convert_single_video(video_config):
    """Convert a single video with its configuration."""
    converter = MediaConverter(config=video_config['config'])

    try:
        result = converter.convert(
            video_config['input'],
            video_config['output'],
            formats=video_config.get('formats', ['hls'])
        )
        return {
            'input': video_config['input'],
            'success': True,
            'result': result
        }
    except Exception as e:
        return {
            'input': video_config['input'],
            'success': False,
            'error': str(e)
        }

def parallel_batch_process(video_configs, max_workers=2):
    """Process videos in parallel."""

    results = []

    # Threads suffice when each conversion mostly waits on an external
    # encoder process; see the process-based variant after this example
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all jobs
        future_to_config = {
            executor.submit(convert_single_video, config): config 
            for config in video_configs
        }

        # Collect results as they complete
        for future in as_completed(future_to_config):
            config = future_to_config[future]
            try:
                result = future.result()
                results.append(result)

                if result['success']:
                    print(f"✅ Completed: {Path(result['input']).name}")
                else:
                    print(f"❌ Failed: {Path(result['input']).name} - {result['error']}")

            except Exception as exc:
                print(f"❌ Exception for {config['input']}: {exc}")
                results.append({
                    'input': config['input'],
                    'success': False,
                    'error': str(exc)
                })

    return results

# Example usage
video_configs = [
    {
        'input': 'video1.mp4',
        'output': 'streams/video1',
        'formats': ['hls', 'dash'],
        'config': MediaConfig(crf=20, preset='medium')
    },
    {
        'input': 'video2.mp4',
        'output': 'streams/video2',
        'formats': ['hls'],
        'config': MediaConfig(crf=23, preset='fast')
    }
]

results = parallel_batch_process(video_configs, max_workers=2)
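
ThreadPoolExecutor is a reasonable default here because each conversion typically waits on an external encoder process. If your workload turns out to be CPU-bound inside Python, the ProcessPoolExecutor imported above can be swapped in with the same interface; note that the submitted configs must then be picklable:

# Process-based variant: same interface, but configs must be picklable
with ProcessPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(convert_single_video, video_configs))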

Advanced Batch Processing

Batch Processing with Different Profiles

from pathlib import Path
from streampack import MediaConverter, MediaConfig
from streampack.config import BitrateProfile

def create_profile_configs():
    """Create different quality profiles for batch processing."""

    # Mobile-first profile
    mobile_profiles = [
        BitrateProfile("240p", (426, 240), 400, 64),
        BitrateProfile("480p", (854, 480), 1000, 128),
        BitrateProfile("720p", (1280, 720), 2000, 192),
    ]

    # Desktop/TV profile  
    premium_profiles = [
        BitrateProfile("720p", (1280, 720), 3000, 192),
        BitrateProfile("1080p", (1920, 1080), 6000, 256),
        BitrateProfile("1440p", (2560, 1440), 9000, 256),
        BitrateProfile("4K", (3840, 2160), 15000, 256),
    ]

    return {
        'mobile': MediaConfig(
            bitrate_profiles=mobile_profiles,
            crf=25,
            preset='fast',
            segment_duration=4
        ),
        'premium': MediaConfig(
            bitrate_profiles=premium_profiles,
            crf=20,
            preset='medium',
            segment_duration=6
        )
    }

def smart_batch_process(input_files, output_base):
    """Process files with appropriate profiles based on content analysis."""

    configs = create_profile_configs()
    converter = MediaConverter()

    results = []

    for input_file in input_files:
        # Analyze video to determine appropriate profile
        media_info = converter.media_analyzer.analyze(Path(input_file))

        # Choose profile based on video characteristics
        if media_info.video and media_info.video.height >= 1080:
            profile = 'premium'
        else:
            profile = 'mobile'

        output_dir = Path(output_base) / f"{Path(input_file).stem}_{profile}"

        # Convert with selected profile
        converter_with_config = MediaConverter(config=configs[profile])
        result = converter_with_config.convert(input_file, str(output_dir))

        results.append({
            'input': input_file,
            'profile': profile,
            'result': result
        })

    return results
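
A usage sketch (the file names are placeholders):

input_files = ['feature.mp4', 'short_clip.mov']
results = smart_batch_process(input_files, 'output_streams')
for item in results:
    print(f"{item['input']} -> {item['profile']} profile")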

Resumable Batch Processing

import json
from datetime import datetime
from pathlib import Path
from streampack import MediaConverter

class ResumableBatchProcessor:
    """Batch processor that can resume from interruptions."""

    def __init__(self, state_file='batch_state.json'):
        self.state_file = Path(state_file)
        self.state = self._load_state()

    def _load_state(self):
        """Load processing state from file."""
        if self.state_file.exists():
            with open(self.state_file, 'r') as f:
                return json.load(f)
        return {'completed': [], 'failed': [], 'remaining': []}

    def _save_state(self):
        """Save current state to file."""
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f, indent=2)

    def add_files(self, file_configs):
        """Add files to the queue, skipping anything already completed or queued."""
        completed = {item['input'] for item in self.state['completed']}
        queued = {item['input'] for item in self.state['remaining']}
        for config in file_configs:
            if config['input'] not in completed | queued:
                self.state['remaining'].append(config)
        self._save_state()

    def process_batch(self):
        """Process remaining files sequentially, checkpointing state after each one."""
        converter = MediaConverter()

        while self.state['remaining']:
            config = self.state['remaining'].pop(0)

            try:
                print(f"Processing: {config['input']}")
                result = converter.convert(
                    config['input'],
                    config['output'],
                    formats=config.get('formats', ['hls'])
                )

                self.state['completed'].append({
                    'input': config['input'],
                    'output': config['output'],
                    'result': result,
                    'timestamp': datetime.now().isoformat()
                })

                print(f"✅ Completed: {config['input']}")

            except Exception as e:
                error_info = {
                    'input': config['input'],
                    'output': config['output'],
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }
                self.state['failed'].append(error_info)
                print(f"❌ Failed: {config['input']} - {str(e)}")

            # Save state after each file
            self._save_state()

    def retry_failed(self):
        """Retry failed conversions."""
        failed_items = self.state['failed'].copy()
        self.state['failed'] = []

        for item in failed_items:
            self.state['remaining'].append({
                'input': item['input'],
                'output': item['output']
            })

        self._save_state()
        self.process_batch()

    def get_summary(self):
        """Get processing summary."""
        return {
            'completed': len(self.state['completed']),
            'failed': len(self.state['failed']),
            'remaining': len(self.state['remaining']),
            'total': len(self.state['completed']) + len(self.state['failed']) + len(self.state['remaining'])
        }

# Usage example
processor = ResumableBatchProcessor()
processor.add_files([
    {'input': 'video1.mp4', 'output': 'streams/video1'},
    {'input': 'video2.mp4', 'output': 'streams/video2'},
    {'input': 'video3.mp4', 'output': 'streams/video3'},
])

processor.process_batch()
print(f"Summary: {processor.get_summary()}")

Monitoring and Logging

Progress Monitoring

import time
from pathlib import Path
from rich.console import Console
from rich.progress import Progress
from streampack import MediaConverter

console = Console()

class BatchProgressMonitor:
    """Monitor batch processing progress."""

    def __init__(self):
        self.progress = Progress(console=console)
        self.overall_task = None
        self.current_task = None

    def start_batch(self, total_files):
        """Start monitoring batch process."""
        self.progress.start()
        self.overall_task = self.progress.add_task(
            "Overall Progress", 
            total=total_files
        )

    def start_file(self, filename):
        """Start processing a new file."""
        if self.current_task is not None:
            self.progress.remove_task(self.current_task)

        self.current_task = self.progress.add_task(
            f"Processing {filename}",
            total=100
        )

    def update_file_progress(self, progress_percent):
        """Update current file progress."""
        if self.current_task is not None:
            self.progress.update(self.current_task, completed=progress_percent)

    def complete_file(self):
        """Mark current file as complete."""
        if self.current_task is not None:
            self.progress.remove_task(self.current_task)
            self.current_task = None

        self.progress.advance(self.overall_task, 1)

    def finish(self):
        """Finish monitoring."""
        if self.current_task is not None:
            self.progress.remove_task(self.current_task)
        self.progress.stop()

# Usage with progress monitoring
def monitored_batch_process(video_files, output_dir):
    monitor = BatchProgressMonitor()
    converter = MediaConverter()

    monitor.start_batch(len(video_files))

    for video_file in video_files:
        monitor.start_file(Path(video_file).name)

        try:
            # Simulate progress updates during conversion
            # (In real implementation, this would come from converter callbacks)
            for i in range(0, 101, 10):
                monitor.update_file_progress(i)
                time.sleep(0.1)  # Simulate work

            converter.convert(video_file, f"{output_dir}/{Path(video_file).stem}")
            monitor.complete_file()

        except Exception as e:
            console.print(f"❌ Error processing {video_file}: {e}", style="red")
            monitor.complete_file()

    monitor.finish()
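
Usage is then a single call (file names are placeholders):

monitored_batch_process(['video1.mp4', 'video2.mp4'], 'output_streams')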

Batch Processing Strategies

Resource Management

import psutil
from pathlib import Path
from streampack import MediaConverter, MediaConfig

def adaptive_batch_processing(video_files, output_dir):
    """Adapt processing based on system resources."""

    # Check system resources
    cpu_count = psutil.cpu_count()
    available_memory = psutil.virtual_memory().available / (1024**3)  # GB

    # Determine optimal settings
    if available_memory > 16:  # High-end system
        max_workers = min(cpu_count // 2, 4)
        preset = 'medium'
        crf = 20
    elif available_memory > 8:  # Mid-range system
        max_workers = min(cpu_count // 3, 2)  
        preset = 'fast'
        crf = 23
    else:  # Limited resources
        max_workers = 1
        preset = 'veryfast'
        crf = 26

    config = MediaConfig(
        max_workers=max_workers,
        preset=preset,
        crf=crf
    )

    converter = MediaConverter(config=config)

    # Process files
    results = []
    for video_file in video_files:
        result = converter.convert(video_file, f"{output_dir}/{Path(video_file).stem}")
        results.append(result)

    return results

Error Handling and Recovery

import logging
import time
from pathlib import Path
from streampack import MediaConverter

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('batch_processing.log'),
        logging.StreamHandler()
    ]
)

def robust_batch_process(video_files, output_dir, max_retries=3):
    """Batch process with error handling and retries."""

    converter = MediaConverter()
    results = []

    for video_file in video_files:
        video_path = Path(video_file)
        output_path = Path(output_dir) / video_path.stem

        success = False
        last_error = None

        for attempt in range(max_retries + 1):
            try:
                logging.info(f"Processing {video_file} (attempt {attempt + 1})")

                result = converter.convert(str(video_path), str(output_path))

                results.append({
                    'input': str(video_path),
                    'output': str(output_path),
                    'success': True,
                    'result': result,
                    'attempts': attempt + 1
                })

                logging.info(f"✅ Successfully processed {video_file}")
                success = True
                break

            except Exception as e:
                last_error = e
                logging.warning(f"❌ Attempt {attempt + 1} failed for {video_file}: {e}")

                if attempt < max_retries:
                    logging.info(f"Retrying {video_file} in 5 seconds...")
                    time.sleep(5)

        if not success:
            results.append({
                'input': str(video_path),
                'output': str(output_path),
                'success': False,
                'error': str(last_error),
                'attempts': max_retries + 1
            })
            logging.error(f"❌ Failed to process {video_file} after {max_retries + 1} attempts")

    return results
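
A short usage sketch that logs an end-of-batch summary:

results = robust_batch_process(['video1.mp4', 'video2.mp4'], 'output_streams')
succeeded = sum(1 for r in results if r['success'])
logging.info(f"Batch finished: {succeeded}/{len(results)} succeeded")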

Best Practices

Performance Optimization

  1. Resource allocation: Don't exceed roughly 70% of available CPU cores (see the sketch after this list)
  2. Memory management: Monitor memory usage, especially with large files
  3. Storage: Use SSDs for temporary files and output
  4. Network: Consider bandwidth for cloud storage uploads
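
As a sketch of guideline 1, the worker cap can be derived from the core count when building a config (max_workers is the same StreamPack option used earlier in this guide):

import os
from streampack import MediaConfig

# Cap workers at roughly 70% of available cores (guideline 1 above)
cores = os.cpu_count() or 1
config = MediaConfig(max_workers=max(1, int(cores * 0.7)))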

Batch Configuration Tips

  1. File grouping: Group similar files together for consistent settings (see the sketch after this list)
  2. Quality presets: Use different presets based on content type
  3. Scheduling: Run heavy batches during off-peak hours
  4. Monitoring: Always log processing for debugging and auditing
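
As a sketch of tip 1, inputs can be grouped before processing so each group shares one configuration; grouping by file extension here is just one possible rule:

from collections import defaultdict
from pathlib import Path

def group_by_suffix(files):
    """Group inputs by extension so each group gets consistent settings."""
    groups = defaultdict(list)
    for f in files:
        groups[Path(f).suffix.lower()].append(f)
    return dict(groups)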

Troubleshooting

# Check system resources during batch processing
htop

# Monitor disk space
df -h

# Check StreamPack logs
tail -f streampack.log

# Resume failed batch
streampack --resume-batch batch_state.json