Skip to content

API Reference

Complete API documentation for mpmsub.

Core Functions

cluster()

mpmsub.cluster

Core Cluster class for mpmsub library.

Cluster(cpus=None, memory=None, verbose=True, progress_bar=True)

Main cluster class for managing subprocess execution with memory awareness.

Initialize a compute cluster.

Parameters:

Name Type Description Default
cpus Union[int, str, None]

Number of CPUs to use. If None, auto-detects.

None
memory Union[str, int, None]

Memory limit (e.g., "16G", "2048M"). If None, auto-detects.

None
verbose bool

Whether to print progress information.

True
progress_bar bool

Whether to show a progress bar during execution.

True
Source code in mpmsub/cluster.py
def __init__(
    self,
    cpus: Union[int, str, None] = None,
    memory: Union[str, int, None] = None,
    verbose: bool = True,
    progress_bar: bool = True,
):
    """
    Initialize a compute cluster.

    Explicit limits are parsed first. Any limit that is still ``None``
    afterwards is filled from ``get_system_resources()``, which is queried
    at most once.

    Args:
        cpus: Number of CPUs to use. If None, auto-detects.
        memory: Memory limit (e.g., "16G", "2048M"). If None, auto-detects.
        verbose: Whether to print progress information.
        progress_bar: Whether to show a progress bar during execution.
    """
    self.max_cpus = parse_cpu_string(cpus)
    self.max_memory_mb = parse_memory_string(memory)

    missing_cpus = self.max_cpus is None
    missing_memory = self.max_memory_mb is None
    if missing_cpus or missing_memory:
        detected = get_system_resources()
        if missing_cpus:
            self.max_cpus = detected["cpus"]
        if missing_memory:
            # Leave headroom: cap at 90% of the detected "memory_mb" figure.
            # NOTE(review): whether that figure is total or available memory
            # depends on get_system_resources(), which is not shown here.
            self.max_memory_mb = int(detected["memory_mb"] * 0.9)

    self.verbose = verbose
    self.progress_bar = progress_bar

    # Scheduling components; the JobList front-end shares the same queue.
    self.job_queue = JobQueue()
    self.memory_monitor = MemoryMonitor()
    self.resource_usage = ResourceUsage()
    self.jobs = JobList(self.job_queue)

    # Run-state flag plus the timestamps that run()/profile() fill in.
    self._running = False
    self._executor = None
    self.start_time = None
    self.end_time = None

    # The "mpmsub" logger is process-wide: only the first verbose cluster
    # attaches a handler, so constructing several clusters does not
    # duplicate log lines.
    self.logger = logging.getLogger("mpmsub")
    if not verbose or self.logger.handlers:
        return
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    self.logger.addHandler(handler)
    self.logger.setLevel(logging.INFO)

completed_jobs property

Get list of completed jobs.

failed_jobs property

Get list of failed jobs.

stats property

Get cluster statistics.

describe_resources()

Print detailed information about cluster resources.

Source code in mpmsub/cluster.py
def describe_resources(self):
    """Print detailed information about cluster resources.

    Prints three sections:

    * host resources as reported by ``psutil``;
    * this cluster's configured limits, with their share of the host;
    * the values currently held in ``self.resource_usage``.

    Percentages are omitted when the host total cannot be determined.
    """
    import psutil

    from .utils import _get_available_memory_mb

    # System resources
    system_memory = psutil.virtual_memory()
    # psutil.cpu_count() returns None when the CPU count is undetermined.
    system_cpus = psutil.cpu_count()
    available_memory_mb = _get_available_memory_mb(system_memory)
    total_memory_mb = system_memory.total / (1024**2)

    print("🖥️  System Resources:")
    if system_cpus:
        print(f"   CPUs: {system_cpus} cores")
    else:
        print("   CPUs: unknown")
    print(
        f"   Memory: {format_memory(total_memory_mb)} total, {format_memory(available_memory_mb)} available"
    )
    print()

    # Only compute shares when the denominator is usable; otherwise a None or
    # zero host total would raise TypeError / ZeroDivisionError here.
    cpu_share = (
        f" ({self.max_cpus / system_cpus * 100:.0f}% of system)" if system_cpus else ""
    )
    memory_share = (
        f" ({self.max_memory_mb / total_memory_mb * 100:.0f}% of system)"
        if total_memory_mb > 0
        else ""
    )

    print("⚙️  Cluster Configuration:")
    print(f"   CPUs: {self.max_cpus} cores{cpu_share}")
    print(f"   Memory: {format_memory(self.max_memory_mb)}{memory_share}")
    print()

    print("📊 Resource Utilization:")
    print(
        f"   CPU slots used: {self.resource_usage.cpu_slots_used}/{self.max_cpus}"
    )
    print(
        f"   Memory used: {format_memory(self.resource_usage.memory_used)}/{format_memory(self.max_memory_mb)}"
    )
    print(f"   Active jobs: {self.resource_usage.active_jobs}")
    print()

print_summary()

Print a summary of execution results.

Source code in mpmsub/cluster.py
def print_summary(self):
    """Print a summary of execution results.

    Reads ``self.stats`` for the header and the job counts, then
    ``self.completed_jobs`` / ``self.failed_jobs`` for per-job details.
    Only the first five failures are listed individually.
    """
    stats = self.stats

    print("\n" + "=" * 60)
    print("MPMSUB EXECUTION SUMMARY")
    print("=" * 60)

    # Cluster info
    cluster_info = stats["cluster"]
    print(
        f"Cluster: {cluster_info['max_cpus']} CPUs, {cluster_info['max_memory']} memory"
    )
    print(f"Runtime: {cluster_info['runtime_formatted']}")

    # Job statistics
    job_stats = stats["jobs"]
    print(f"\nJobs: {job_stats['total']} total")
    print(f"  ✓ Completed: {job_stats['completed']}")
    print(f"  ✗ Failed: {job_stats['failed']}")

    if job_stats["completed"] > 0:
        # Read the property once and guard on the list itself, so a mismatch
        # between the stats count and the list cannot divide by zero below.
        completed = self.completed_jobs
        if completed:
            runtimes = [job.runtime for job in completed]
            memories = [job.memory_used for job in completed]

            print("\nPerformance:")
            print(
                f"  Average runtime: {format_duration(sum(runtimes) / len(runtimes))}"
            )
            # NOTE(review): this sums each job's `runtime`; it is not weighted
            # by the number of CPUs each job used.
            print(f"  Total CPU time: {format_duration(sum(runtimes))}")
            print(f"  Peak memory: {format_memory(max(memories))}")
            print(
                f"  Average memory: {format_memory(sum(memories) / len(memories))}"
            )

    if job_stats["failed"] > 0:
        # Evaluate the property once instead of once per use.
        failed = self.failed_jobs
        print("\nFailed jobs:")
        for job in failed[:5]:  # Show first 5 failures
            print(f"  ✗ {job.job_id}: {job.error or f'Exit code {job.returncode}'}")
        if len(failed) > 5:
            print(f"  ... and {len(failed) - 5} more")

    print("=" * 60)

profile(verbose=True)

Profile jobs by running them sequentially to measure actual resource usage.

This is useful for estimating memory requirements when you don't know them. Jobs are run one at a time (respecting CPU requirements) to get accurate memory measurements without interference.

Parameters:

Name Type Description Default
verbose bool

Whether to print progress information.

True

Returns:

Type Description
List[JobResult]

List[JobResult]: Results from profiling run with actual memory usage.

Source code in mpmsub/cluster.py
def profile(self, verbose: bool = True) -> List[JobResult]:
    """
    Profile jobs by running them sequentially to measure actual resource usage.

    Meant for discovering memory requirements you do not know yet. Jobs are
    run one at a time (respecting CPU requirements), so each measurement is
    free of interference from other jobs. The work itself is delegated to
    ``self._profile_jobs``. This wrapper maintains ``_running`` and the
    start/end timestamps.

    Args:
        verbose: Whether to print progress information.

    Returns:
        List[JobResult]: Results from profiling run with actual memory usage,
        or an empty list when nothing is pending.

    Raises:
        RuntimeError: If the cluster is already running.
    """
    if self._running:
        raise RuntimeError("Cannot profile while cluster is running")

    pending = self.job_queue.get_stats()["pending"]
    if pending == 0:
        if verbose:
            print("No jobs to profile")
        return []

    if verbose:
        for line in (
            "MPMSUB PROFILING MODE",
            "=" * 40,
            f"Profiling {pending} jobs sequentially",
            "This will measure actual memory usage for each job",
            "Use these measurements to set 'm' values for efficient scheduling\n",
        ):
            print(line)

    self._running = True
    self.start_time = time.time()
    try:
        return self._profile_jobs(verbose)
    finally:
        self._running = False
        self.end_time = time.time()

run(max_workers=None)

Run all queued jobs with optimal scheduling.

Parameters:

Name Type Description Default
max_workers Optional[int]

Maximum number of concurrent jobs. If None, uses cluster CPU limit.

None

Returns:

Name Type Description
dict Dict[str, Any]

Execution statistics and results.

Source code in mpmsub/cluster.py
def run(self, max_workers: Optional[int] = None) -> Dict[str, Any]:
    """
    Run all queued jobs with optimal scheduling.

    Delegates to ``self._execute_jobs``. ``_running`` is held True for the
    duration and is always reset afterwards, and ``start_time``/``end_time``
    bracket the call.

    Args:
        max_workers: Maximum number of concurrent jobs. If None, uses cluster CPU limit.

    Returns:
        dict: Execution statistics and results.

    Raises:
        RuntimeError: If the cluster is already running.
    """
    if self._running:
        raise RuntimeError("Cluster is already running")

    self._running = True
    self.start_time = time.time()
    try:
        outcome = self._execute_jobs(max_workers)
    finally:
        self._running = False
        self.end_time = time.time()
    return outcome

Job(cmd=None, p=None, m=None, id=None, cwd=None, env=None, timeout=None, pipeline=None, stdout=None, stderr=None)

Object-oriented interface for job specification.

Provides a more intuitive way to create jobs with IDE support, while maintaining compatibility with the dictionary interface. Supports both single commands and pipelines.

Create a new job.

Parameters:

Name Type Description Default
cmd Union[List[str], Pipeline, None]

Command to execute as list of strings, or Pipeline object

None
p Union[int, str, None]

Number of CPU cores needed (default: 1)

None
m Union[str, int, None]

Memory requirement (e.g., "1G", "512M", default: unlimited)

None
id Optional[str]

Custom job identifier (auto-generated if None)

None
cwd Optional[str]

Working directory for the job

None
env Optional[Dict[str, str]]

Environment variables for the job

None
timeout Optional[float]

Timeout in seconds

None
pipeline Optional[List[List[str]]]

Alternative way to specify pipeline as list of commands

None
stdout Optional[str]

File path to redirect stdout to (optional)

None
stderr Optional[str]

File path to redirect stderr to (optional)

None
Source code in mpmsub/cluster.py
def __init__(
    self,
    cmd: Union[List[str], Pipeline, None] = None,
    p: Union[int, str, None] = None,
    m: Union[str, int, None] = None,
    id: Optional[str] = None,
    cwd: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    timeout: Optional[float] = None,
    pipeline: Optional[List[List[str]]] = None,
    stdout: Optional[str] = None,
    stderr: Optional[str] = None,
):
    """
    Create a new job.

    ``pipeline`` takes precedence: when it is given, ``cmd`` is ignored and
    the commands are wrapped in a ``Pipeline``. Otherwise ``cmd`` (an argv
    list or a ``Pipeline``) is stored as-is. All other arguments are stored
    unchanged; no defaults are applied here.

    Args:
        cmd: Command to execute as list of strings, or Pipeline object
        p: Number of CPU cores needed (default: 1)
        m: Memory requirement (e.g., "1G", "512M", default: unlimited)
        id: Custom job identifier (auto-generated if None)
        cwd: Working directory for the job
        env: Environment variables for the job
        timeout: Timeout in seconds
        pipeline: Alternative way to specify pipeline as list of commands
        stdout: File path to redirect stdout to (optional)
        stderr: File path to redirect stderr to (optional)

    Raises:
        ValueError: If neither ``cmd`` nor ``pipeline`` is given.
    """
    if pipeline is not None:
        command = Pipeline(pipeline)
    elif cmd is None:
        raise ValueError("Must specify either 'cmd' or 'pipeline'")
    else:
        # Both a Pipeline instance and a plain argv list are kept unchanged.
        command = cmd
    self.cmd = command

    self.p = p
    self.m = m
    self.id = id
    self.cwd = cwd
    self.env = env
    self.timeout = timeout
    self.stdout = stdout
    self.stderr = stderr

__repr__()

String representation of the job.

Source code in mpmsub/cluster.py
def __repr__(self) -> str:
    """String representation of the job.

    A plain command is abbreviated to its first three arguments, with
    ``...`` appended when it is longer. A pipeline defers to the pipeline's
    own repr.
    """
    if isinstance(self.cmd, Pipeline):
        return f"Job(pipeline={self.cmd}, p={self.p}, m={self.m})"
    # str() each argument so non-string items (ints, path objects) cannot
    # make repr() itself raise TypeError.
    cmd_str = " ".join(str(arg) for arg in self.cmd[:3])
    if len(self.cmd) > 3:
        cmd_str += "..."
    return f"Job(cmd=[{cmd_str}], p={self.p}, m={self.m})"

cpu(cores)

Set CPU requirement (builder pattern).

Source code in mpmsub/cluster.py
def cpu(self, cores: Union[int, str]) -> "Job":
    """Builder-style setter for the CPU requirement ``p``.

    Args:
        cores: Core count, stored as given (int or string).

    Returns:
        The same job, so calls can be chained.
    """
    self.p = cores
    return self

environment(env_vars)

Set environment variables (builder pattern).

Source code in mpmsub/cluster.py
def environment(self, env_vars: Dict[str, str]) -> "Job":
    """Builder-style setter for the job's environment mapping ``env``.

    The mapping is stored by reference, not copied.

    Returns:
        The same job, so calls can be chained.
    """
    self.env = env_vars
    return self

memory(mem)

Set memory requirement (builder pattern).

Source code in mpmsub/cluster.py
def memory(self, mem: Union[str, int]) -> "Job":
    """Builder-style setter for the memory requirement ``m``.

    Args:
        mem: Requirement such as ``"1G"`` or a number, stored as given.

    Returns:
        The same job, so calls can be chained.
    """
    self.m = mem
    return self

pipe_to(next_cmd)

Add another command to the pipeline (builder pattern).

Source code in mpmsub/cluster.py
def pipe_to(self, next_cmd: List[str]) -> "Job":
    """Add another command to the pipeline (builder pattern).

    A plain command list becomes a two-stage ``Pipeline``. An existing
    ``Pipeline`` is replaced by a new one with ``next_cmd`` appended as the
    last stage.

    Args:
        next_cmd: Command (list of strings) for the new final stage.

    Returns:
        This job, for chaining.

    Raises:
        ValueError: If ``self.cmd`` is neither a Pipeline nor a list.
    """
    if isinstance(self.cmd, Pipeline):
        # Build a fresh Pipeline instead of appending in place. The current
        # Pipeline object, and the list it wraps, may be shared with other
        # jobs or still held by the caller, and must not grow behind their back.
        self.cmd = Pipeline([*self.cmd.commands, next_cmd])
    elif isinstance(self.cmd, list):
        # Convert single command to pipeline
        self.cmd = Pipeline([self.cmd, next_cmd])
    else:
        raise ValueError("Cannot pipe from non-command job")
    return self

stderr_to(file_path)

Redirect stderr to file (builder pattern).

Source code in mpmsub/cluster.py
def stderr_to(self, file_path: str) -> "Job":
    """Builder-style setter: send the job's stderr to ``file_path``.

    Returns:
        The same job, so calls can be chained.
    """
    self.stderr = file_path
    return self

stdout_to(file_path)

Redirect stdout to file (builder pattern).

Source code in mpmsub/cluster.py
def stdout_to(self, file_path: str) -> "Job":
    """Builder-style setter: send the job's stdout to ``file_path``.

    Returns:
        The same job, so calls can be chained.
    """
    self.stdout = file_path
    return self

to_dict()

Convert to dictionary format for internal use.

Source code in mpmsub/cluster.py
def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary format for internal use.

    The keys, in order, are: ``cmd``, ``p``, ``m``, ``id``, ``cwd``, ``env``,
    ``timeout``, ``stdout``, ``stderr``. Values are the attributes
    themselves (no copies). A pipeline is carried inside ``cmd``.
    """
    field_names = ("cmd", "p", "m", "id", "cwd", "env", "timeout", "stdout", "stderr")
    return {name: getattr(self, name) for name in field_names}

with_id(job_id)

Set custom job ID (builder pattern).

Source code in mpmsub/cluster.py
def with_id(self, job_id: str) -> "Job":
    """Builder-style setter for a caller-chosen job identifier.

    Returns:
        The same job, so calls can be chained.
    """
    self.id = job_id
    return self

with_timeout(seconds)

Set timeout (builder pattern).

Source code in mpmsub/cluster.py
def with_timeout(self, seconds: float) -> "Job":
    """Builder-style setter for the job timeout, in seconds.

    Returns:
        The same job, so calls can be chained.
    """
    self.timeout = seconds
    return self

working_dir(path)

Set working directory (builder pattern).

Source code in mpmsub/cluster.py
def working_dir(self, path: str) -> "Job":
    """Builder-style setter for the directory the job runs in (``cwd``).

    Returns:
        The same job, so calls can be chained.
    """
    self.cwd = path
    return self

JobList(job_queue)

List-like interface for managing jobs.

Source code in mpmsub/cluster.py
def __init__(self, job_queue: JobQueue):
    """Wrap ``job_queue``; the list-like methods delegate to it."""
    self._queue = job_queue

__iter__()

Iterate over pending jobs.

Source code in mpmsub/cluster.py
def __iter__(self):
    """Return an iterator over the queue's pending job dicts only."""
    pending = self._queue.pending_jobs
    return iter(pending)

__len__()

Get number of pending jobs.

Source code in mpmsub/cluster.py
def __len__(self) -> int:
    """Return how many jobs are still pending (running/finished excluded)."""
    pending = self._queue.pending_jobs
    return len(pending)

append(job)

Add a job to the queue. Accepts both Job objects and dictionaries.

Source code in mpmsub/cluster.py
def append(self, job: Union[Dict[str, Any], Job]) -> str:
    """Queue one job and return the id the queue assigned to it.

    ``Job`` instances are converted with ``to_dict()``. Anything else is
    assumed to already be a job dictionary and is passed through unchanged.
    """
    spec = job.to_dict() if isinstance(job, Job) else job
    return self._queue.add_job(spec)

extend(jobs)

Add multiple jobs to the queue. Accepts both Job objects and dictionaries.

Source code in mpmsub/cluster.py
def extend(self, jobs: List[Union[Dict[str, Any], Job]]) -> List[str]:
    """Queue each job in order via ``append`` and return their ids in order."""
    assigned_ids: List[str] = []
    for item in jobs:
        assigned_ids.append(self.append(item))
    return assigned_ids

JobQueue()

Manage job queue with priority scheduling.

Source code in mpmsub/cluster.py
def __init__(self):
    """Create an empty queue.

    The queue holds: a pending list, a running map (keyed by job id in
    ``mark_running``), completed/failed result lists, an id counter, and a
    lock that guards all of them.
    """
    self.pending_jobs = []
    self.running_jobs = {}
    self.completed_jobs, self.failed_jobs = [], []
    self._job_counter = 0
    self._lock = threading.Lock()

add_job(job)

Add a job to the queue.

Source code in mpmsub/cluster.py
def add_job(self, job: Dict[str, Any]) -> str:
    """Validate ``job``, append it to the pending list and return its id.

    A job whose normalized ``"id"`` is None gets a sequential id of the form
    ``job_0001``. Validation, id assignment and the append all happen under
    the queue lock.

    NOTE(review): caller-supplied ids are not checked for collisions with
    each other or with generated ids.
    """
    with self._lock:
        entry = validate_job(job)
        job_id = entry["id"]
        if job_id is None:
            self._job_counter += 1
            job_id = entry["id"] = f"job_{self._job_counter:04d}"
        self.pending_jobs.append(entry)
    return job_id

get_next_job(available_cpus, available_memory)

Get the next job that can run with available resources.

Source code in mpmsub/cluster.py
def get_next_job(
    self, available_cpus: int, available_memory: float
) -> Optional[Dict]:
    """Pop and return the first pending job that fits the given resources.

    ``pending_jobs`` is scanned in order (first fit). A job fits when its
    ``"p"`` does not exceed ``available_cpus`` and its ``"m"`` is either None
    (no memory limit) or does not exceed ``available_memory``. Returns None
    when nothing fits, leaving the queue untouched.
    """

    def too_big(job: Dict) -> bool:
        if job["p"] > available_cpus:
            return True
        return job["m"] is not None and job["m"] > available_memory

    with self._lock:
        index = next(
            (i for i, job in enumerate(self.pending_jobs) if not too_big(job)),
            None,
        )
        return None if index is None else self.pending_jobs.pop(index)

get_stats()

Get queue statistics.

Source code in mpmsub/cluster.py
def get_stats(self) -> Dict[str, int]:
    """Return counts of pending, running, completed and failed jobs plus their total.

    The four counts are taken together under the lock, so they form a
    consistent snapshot.
    """
    with self._lock:
        counts = {
            "pending": len(self.pending_jobs),
            "running": len(self.running_jobs),
            "completed": len(self.completed_jobs),
            "failed": len(self.failed_jobs),
        }
    counts["total"] = sum(counts.values())
    return counts

mark_completed(result)

Mark a job as completed.

Source code in mpmsub/cluster.py
def mark_completed(self, result: JobResult):
    """Move a finished job out of ``running_jobs`` and file its result.

    The result is appended to ``completed_jobs`` when ``result.success`` is
    truthy, otherwise to ``failed_jobs``. An id that is not currently
    running is tolerated.
    """
    with self._lock:
        self.running_jobs.pop(result.job_id, None)
        destination = self.completed_jobs if result.success else self.failed_jobs
        destination.append(result)

mark_running(job)

Mark a job as running.

Source code in mpmsub/cluster.py
def mark_running(self, job: Dict[str, Any]):
    """Record ``job`` in ``running_jobs`` under its ``"id"``, holding the lock."""
    job_id = job["id"]
    with self._lock:
        self.running_jobs[job_id] = job

JobResult(job_id, cmd, returncode=0, stdout='', stderr='', runtime=0.0, memory_used=0.0, cpu_used=1, start_time=0.0, end_time=0.0, success=False, error=None) dataclass

Result of a completed job.

MemoryMonitor(sampling_interval=0.5)

Monitor memory usage of running processes.

Source code in mpmsub/cluster.py
def __init__(self, sampling_interval: float = 0.5):
    """Prepare an empty per-job sample table guarded by a lock.

    Args:
        sampling_interval: Seconds the monitor thread sleeps between samples.
    """
    self._lock = threading.Lock()
    self._monitoring = {}
    self.sampling_interval = sampling_interval

cleanup(job_id)

Clean up monitoring data for a job.

Source code in mpmsub/cluster.py
def cleanup(self, job_id: str):
    """Forget any samples stored for ``job_id``; unknown ids are ignored."""
    with self._lock:
        if job_id in self._monitoring:
            del self._monitoring[job_id]

get_peak_memory(job_id)

Get peak memory usage for a job.

Source code in mpmsub/cluster.py
def get_peak_memory(self, job_id: str) -> float:
    """Return the highest sampled memory (MB) for ``job_id``, or 0.0 if never sampled."""
    with self._lock:
        try:
            samples = self._monitoring[job_id]
        except KeyError:
            return 0.0
        return samples["peak_memory"]

start_monitoring(job_id, process)

Start monitoring a process.

Source code in mpmsub/cluster.py
def start_monitoring(
    self, job_id: str, process: subprocess.Popen
) -> threading.Thread:
    """Start a daemon thread that samples a process tree's RSS until it exits.

    While ``process.poll()`` reports the process as still running, the
    thread does the following on each pass:

    * sums the RSS (in MB) of the process and all its descendants;
    * stores ``{"current_memory", "peak_memory"}`` for ``job_id`` under the
      lock;
    * sleeps ``self.sampling_interval`` seconds.

    Monitoring is best-effort. It stops quietly if the root process vanishes
    or access is denied, and any other error also ends the thread silently.

    Returns:
        The already-started daemon thread.
    """
    bytes_per_mb = 1024 * 1024

    def tree_rss_mb(root) -> float:
        # A NoSuchProcess/AccessDenied on the root (or while listing children)
        # propagates to the sampler, which then stops.
        total = root.memory_info().rss / bytes_per_mb
        for descendant in root.children(recursive=True):
            try:
                total += descendant.memory_info().rss / bytes_per_mb
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # A child may exit or be off-limits mid-scan; skip it.
                continue
        return total

    def sampler():
        try:
            target = psutil.Process(process.pid)
            highest = 0.0
            while process.poll() is None:
                try:
                    now = tree_rss_mb(target)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    break
                highest = max(highest, now)
                with self._lock:
                    self._monitoring[job_id] = {
                        "current_memory": now,
                        "peak_memory": highest,
                    }
                time.sleep(self.sampling_interval)
        except Exception:
            # Process might have ended before we could monitor it
            pass

    worker = threading.Thread(target=sampler, daemon=True)
    worker.start()
    return worker

Pipeline(commands)

Represents a pipeline of commands connected via pipes.

This allows chaining multiple subprocess commands together, similar to shell pipes (cmd1 | cmd2 | cmd3).

Create a new pipeline.

Parameters:

Name Type Description Default
commands List[List[str]]

List of commands, where each command is a list of strings. Commands will be piped together in order.

required

Examples:

>>> pipeline = Pipeline([
...     ["cat", "file.txt"],
...     ["grep", "pattern"],
...     ["sort"]
... ])
Source code in mpmsub/cluster.py
def __init__(self, commands: List[List[str]]):
    """
    Create a new pipeline.

    Args:
        commands: List (or any iterable) of commands, where each command is a
                 list of strings. Commands will be piped together in order.
                 The outer sequence is copied, so appending to
                 ``self.commands`` later cannot mutate the caller's list.

    Raises:
        ValueError: If ``commands`` is None, empty, or has fewer than 2
            commands.

    Examples:
        >>> pipeline = Pipeline([
        ...     ["cat", "file.txt"],
        ...     ["grep", "pattern"],
        ...     ["sort"]
        ... ])
    """
    if commands is None:
        raise ValueError("Pipeline must have at least 2 commands")
    # Shallow copy: prevents aliasing with the caller's list and lets
    # generators/tuples be passed as well.
    commands = list(commands)
    if len(commands) < 2:
        raise ValueError("Pipeline must have at least 2 commands")

    self.commands = commands

__repr__()

String representation of the pipeline.

Source code in mpmsub/cluster.py
def __repr__(self) -> str:
    """String representation of the pipeline.

    Each stage is abbreviated to its first three arguments (``...`` appended
    when longer), and stages are joined with `` | `` like a shell pipeline.
    """
    stages = []
    for cmd in self.commands:
        # str() each argument so non-string items (ints, path objects) cannot
        # make repr() itself raise TypeError.
        stage = " ".join(str(arg) for arg in cmd[:3])
        if len(cmd) > 3:
            stage += "..."
        stages.append(stage)
    return f"Pipeline({' | '.join(stages)})"

ProgressBar(total, width=40, show_percent=True)

Simple progress bar using only standard library.

Source code in mpmsub/cluster.py
def __init__(self, total: int, width: int = 40, show_percent: bool = True):
    """Set up a bar of ``width`` characters for ``total`` steps, starting at zero.

    ``start_time`` records the construction time from ``time.time()``.
    """
    self.total, self.width, self.show_percent = total, width, show_percent
    self.current = 0
    self.start_time = time.time()

finish()

Ensure progress bar shows 100% completion.

Source code in mpmsub/cluster.py
def finish(self):
    """Jump the bar to ``total`` and redraw it, so it ends at 100%."""
    self.current = self.total
    self._draw()

update(increment=1)

Update progress by increment.

Source code in mpmsub/cluster.py
def update(self, increment: int = 1):
    """Advance the bar by ``increment`` steps (capped at ``total``) and redraw."""
    advanced = self.current + increment
    self.current = self.total if self.total < advanced else advanced
    self._draw()

ResourceUsage(cpu_slots_used=0, memory_used=0.0, active_jobs=0) dataclass

Track resource usage over time.

job()

mpmsub.job(cmd, p=None, m=None, cpu=None, cpus=None, memory=None, **kwargs)

Create a new Job object with a concise interface.

Parameters:

Name Type Description Default
cmd List[str]

Command to execute as list of strings

required
p Optional[Union[int, str]]

Number of CPU cores needed (default: 1)

None
m Optional[Union[str, int]]

Memory requirement (e.g., "1G", "512M", default: unlimited)

None
cpu Optional[Union[int, str]]

Alternative to 'p' - Number of CPU cores needed

None
cpus Optional[Union[int, str]]

Alternative to 'p' - Number of CPU cores needed

None
memory Optional[Union[str, int]]

Alternative to 'm' - Memory requirement

None
**kwargs Any

Additional job parameters (id, cwd, env, timeout, stdout, stderr)

{}

Returns:

Name Type Description
Job Job

A new job instance.

Examples:

>>> import mpmsub
>>> j = mpmsub.job(["echo", "hello"], p=1, m="100M")
>>> j = mpmsub.job(["echo", "hello"], cpu=1, memory="100M")  # Alt syntax
>>> j = mpmsub.job(["python", "script.py"], p=2, m="1G", timeout=300)
Source code in mpmsub/__init__.py
def job(
    cmd: List[str],
    p: Optional[Union[int, str]] = None,
    m: Optional[Union[str, int]] = None,
    cpu: Optional[Union[int, str]] = None,
    cpus: Optional[Union[int, str]] = None,
    memory: Optional[Union[str, int]] = None,
    **kwargs: Any,
) -> Job:
    """
    Create a new Job object with a concise interface.

    CPU aliases are resolved as ``p or cpu or cpus``, and memory aliases as
    ``m or memory``. The first truthy value wins; otherwise the last alias
    is used as-is.

    Args:
        cmd: Command to execute as list of strings
        p: Number of CPU cores needed (default: 1)
        m: Memory requirement (e.g., "1G", "512M", default: unlimited)
        cpu: Alternative to 'p' - Number of CPU cores needed
        cpus: Alternative to 'p' - Number of CPU cores needed
        memory: Alternative to 'm' - Memory requirement
        **kwargs: Additional job parameters (id, cwd, env, timeout, stdout, stderr)

    Returns:
        Job: A new job instance.

    Examples:
        >>> import mpmsub
        >>> j = mpmsub.job(["echo", "hello"], p=1, m="100M")
        >>> j = mpmsub.job(["echo", "hello"], cpu=1, memory="100M")  # Alt syntax
        >>> j = mpmsub.job(["python", "script.py"], p=2, m="1G", timeout=300)
    """
    resolved_cpus = next((value for value in (p, cpu) if value), cpus)
    resolved_memory = m if m else memory
    return Job(cmd=cmd, p=resolved_cpus, m=resolved_memory, **kwargs)

pipeline()

mpmsub.pipeline(commands, p=None, m=None, cpu=None, cpus=None, memory=None, **kwargs)

Create a new Job with a pipeline of commands.

Parameters:

Name Type Description Default
commands List[List[str]]

List of commands to pipe together

required
p Optional[Union[int, str]]

Number of CPU cores needed (default: 1)

None
m Optional[Union[str, int]]

Memory requirement (e.g., "1G", "512M", default: unlimited)

None
cpu Optional[Union[int, str]]

Alternative to 'p' - Number of CPU cores needed

None
cpus Optional[Union[int, str]]

Alternative to 'p' - Number of CPU cores needed

None
memory Optional[Union[str, int]]

Alternative to 'm' - Memory requirement

None
**kwargs Any

Additional job parameters (id, cwd, env, timeout, stdout, stderr)

{}

Returns:

Name Type Description
Job Job

A new job instance with a pipeline.

Examples:

>>> import mpmsub
>>> j = mpmsub.pipeline([
...     ["cat", "file.txt"],
...     ["grep", "pattern"],
...     ["sort"]
... ], p=1, m="100M")
>>> j = mpmsub.pipeline([
...     ["cat", "file.txt"],
...     ["grep", "pattern"]
... ], cpu=1, memory="100M")  # Alternative syntax
Source code in mpmsub/__init__.py
def pipeline(
    commands: List[List[str]],
    p: Optional[Union[int, str]] = None,
    m: Optional[Union[str, int]] = None,
    cpu: Optional[Union[int, str]] = None,
    cpus: Optional[Union[int, str]] = None,
    memory: Optional[Union[str, int]] = None,
    **kwargs: Any,
) -> Job:
    """
    Create a new Job with a pipeline of commands.

    Alias resolution matches ``job()``: CPUs come from ``p or cpu or cpus``,
    and memory from ``m or memory``.

    Args:
        commands: List of commands to pipe together
        p: Number of CPU cores needed (default: 1)
        m: Memory requirement (e.g., "1G", "512M", default: unlimited)
        cpu: Alternative to 'p' - Number of CPU cores needed
        cpus: Alternative to 'p' - Number of CPU cores needed
        memory: Alternative to 'm' - Memory requirement
        **kwargs: Additional job parameters (id, cwd, env, timeout, stdout, stderr)

    Returns:
        Job: A new job instance with a pipeline.

    Examples:
        >>> import mpmsub
        >>> j = mpmsub.pipeline([
        ...     ["cat", "file.txt"],
        ...     ["grep", "pattern"],
        ...     ["sort"]
        ... ], p=1, m="100M")
        >>> j = mpmsub.pipeline([
        ...     ["cat", "file.txt"],
        ...     ["grep", "pattern"]
        ... ], cpu=1, memory="100M")  # Alternative syntax
    """
    resolved_cpus = next((value for value in (p, cpu) if value), cpus)
    resolved_memory = m if m else memory
    chained = Pipeline(commands)
    return Job(cmd=chained, p=resolved_cpus, m=resolved_memory, **kwargs)

Core Classes

Cluster

mpmsub.Cluster(cpus=None, memory=None, verbose=True, progress_bar=True)

Main cluster class for managing subprocess execution with memory awareness.

Initialize a compute cluster.

Parameters:

Name Type Description Default
cpus Union[int, str, None]

Number of CPUs to use. If None, auto-detects.

None
memory Union[str, int, None]

Memory limit (e.g., "16G", "2048M"). If None, auto-detects.

None
verbose bool

Whether to print progress information.

True
progress_bar bool

Whether to show a progress bar during execution.

True
Source code in mpmsub/cluster.py
def __init__(
    self,
    cpus: Union[int, str, None] = None,
    memory: Union[str, int, None] = None,
    verbose: bool = True,
    progress_bar: bool = True,
):
    """
    Initialize a compute cluster.

    Explicit limits are parsed first. Any limit that is still ``None``
    afterwards is filled from ``get_system_resources()``, which is queried
    at most once.

    Args:
        cpus: Number of CPUs to use. If None, auto-detects.
        memory: Memory limit (e.g., "16G", "2048M"). If None, auto-detects.
        verbose: Whether to print progress information.
        progress_bar: Whether to show a progress bar during execution.
    """
    self.max_cpus = parse_cpu_string(cpus)
    self.max_memory_mb = parse_memory_string(memory)

    missing_cpus = self.max_cpus is None
    missing_memory = self.max_memory_mb is None
    if missing_cpus or missing_memory:
        detected = get_system_resources()
        if missing_cpus:
            self.max_cpus = detected["cpus"]
        if missing_memory:
            # Leave headroom: cap at 90% of the detected "memory_mb" figure.
            # NOTE(review): whether that figure is total or available memory
            # depends on get_system_resources(), which is not shown here.
            self.max_memory_mb = int(detected["memory_mb"] * 0.9)

    self.verbose = verbose
    self.progress_bar = progress_bar

    # Scheduling components; the JobList front-end shares the same queue.
    self.job_queue = JobQueue()
    self.memory_monitor = MemoryMonitor()
    self.resource_usage = ResourceUsage()
    self.jobs = JobList(self.job_queue)

    # Run-state flag plus the timestamps that run()/profile() fill in.
    self._running = False
    self._executor = None
    self.start_time = None
    self.end_time = None

    # The "mpmsub" logger is process-wide: only the first verbose cluster
    # attaches a handler, so constructing several clusters does not
    # duplicate log lines.
    self.logger = logging.getLogger("mpmsub")
    if not verbose or self.logger.handlers:
        return
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    self.logger.addHandler(handler)
    self.logger.setLevel(logging.INFO)

completed_jobs property

Get list of completed jobs.

failed_jobs property

Get list of failed jobs.

stats property

Get cluster statistics.

describe_resources()

Print detailed information about cluster resources.

Source code in mpmsub/cluster.py
def describe_resources(self):
    """Print detailed information about cluster resources.

    Prints three sections:

    * host resources as reported by ``psutil``;
    * this cluster's configured limits, with their share of the host;
    * the values currently held in ``self.resource_usage``.

    Percentages are omitted when the host total cannot be determined.
    """
    import psutil

    from .utils import _get_available_memory_mb

    # System resources
    system_memory = psutil.virtual_memory()
    # psutil.cpu_count() returns None when the CPU count is undetermined.
    system_cpus = psutil.cpu_count()
    available_memory_mb = _get_available_memory_mb(system_memory)
    total_memory_mb = system_memory.total / (1024**2)

    print("🖥️  System Resources:")
    if system_cpus:
        print(f"   CPUs: {system_cpus} cores")
    else:
        print("   CPUs: unknown")
    print(
        f"   Memory: {format_memory(total_memory_mb)} total, {format_memory(available_memory_mb)} available"
    )
    print()

    # Only compute shares when the denominator is usable; otherwise a None or
    # zero host total would raise TypeError / ZeroDivisionError here.
    cpu_share = (
        f" ({self.max_cpus / system_cpus * 100:.0f}% of system)" if system_cpus else ""
    )
    memory_share = (
        f" ({self.max_memory_mb / total_memory_mb * 100:.0f}% of system)"
        if total_memory_mb > 0
        else ""
    )

    print("⚙️  Cluster Configuration:")
    print(f"   CPUs: {self.max_cpus} cores{cpu_share}")
    print(f"   Memory: {format_memory(self.max_memory_mb)}{memory_share}")
    print()

    print("📊 Resource Utilization:")
    print(
        f"   CPU slots used: {self.resource_usage.cpu_slots_used}/{self.max_cpus}"
    )
    print(
        f"   Memory used: {format_memory(self.resource_usage.memory_used)}/{format_memory(self.max_memory_mb)}"
    )
    print(f"   Active jobs: {self.resource_usage.active_jobs}")
    print()

print_summary()

Print a summary of execution results.

Source code in mpmsub/cluster.py
def print_summary(self):
    """
    Print a human-readable report of the last execution.

    The report shows the cluster limits and runtime, job counts, runtime and
    memory figures for completed jobs, and up to five failed jobs followed by
    a count of any remaining failures.
    """
    summary = self.stats
    rule = "=" * 60

    print("\n" + rule)
    print("MPMSUB EXECUTION SUMMARY")
    print(rule)

    # Cluster limits and wall-clock runtime
    cluster = summary["cluster"]
    print(f"Cluster: {cluster['max_cpus']} CPUs, {cluster['max_memory']} memory")
    print(f"Runtime: {cluster['runtime_formatted']}")

    # Job counts
    counts = summary["jobs"]
    print(f"\nJobs: {counts['total']} total")
    print(f"  ✓ Completed: {counts['completed']}")
    print(f"  ✗ Failed: {counts['failed']}")

    if counts["completed"] > 0:
        finished = self.completed_jobs
        durations = [finished_job.runtime for finished_job in finished]
        peaks = [finished_job.memory_used for finished_job in finished]
        total_time = sum(durations)

        print("\nPerformance:")
        print(f"  Average runtime: {format_duration(total_time / len(durations))}")
        print(f"  Total CPU time: {format_duration(total_time)}")
        if peaks:
            print(f"  Peak memory: {format_memory(max(peaks))}")
            print(f"  Average memory: {format_memory(sum(peaks) / len(peaks))}")

    if counts["failed"] > 0:
        print("\nFailed jobs:")
        # Only the first five failures are listed individually.
        for failed_job in self.failed_jobs[:5]:
            reason = failed_job.error or f"Exit code {failed_job.returncode}"
            print(f"  ✗ {failed_job.job_id}: {reason}")
        if len(self.failed_jobs) > 5:
            print(f"  ... and {len(self.failed_jobs) - 5} more")

    print(rule)

profile(verbose=True)

Profile jobs by running them sequentially to measure actual resource usage.

This is useful for estimating memory requirements when you don't know them. Jobs are run one at a time (respecting CPU requirements) to get accurate memory measurements without interference.

Parameters:

Name Type Description Default
verbose bool

Whether to print progress information.

True

Returns:

Type Description
List[JobResult]

List[JobResult]: Results from profiling run with actual memory usage.

Source code in mpmsub/cluster.py
def profile(self, verbose: bool = True) -> List[JobResult]:
    """
    Run the queued jobs one after another to measure their real resource use.

    Use this when memory requirements are unknown. Jobs run sequentially (still
    honouring their CPU requirements), so each memory measurement is free of
    interference from other jobs.

    Args:
        verbose: Print a banner and progress information when True.

    Returns:
        List[JobResult]: Results from the profiling run with measured memory
        usage; an empty list when no jobs are pending.

    Raises:
        RuntimeError: If the cluster is already running.
    """
    if self._running:
        raise RuntimeError("Cannot profile while cluster is running")

    pending = self.job_queue.get_stats()["pending"]
    if pending == 0:
        if verbose:
            print("No jobs to profile")
        return []

    if verbose:
        banner = (
            "MPMSUB PROFILING MODE",
            "=" * 40,
            f"Profiling {pending} jobs sequentially",
            "This will measure actual memory usage for each job",
            "Use these measurements to set 'm' values for efficient scheduling\n",
        )
        for line in banner:
            print(line)

    self._running = True
    self.start_time = time.time()
    try:
        return self._profile_jobs(verbose)
    finally:
        # Always clear the running flag and stamp the end time, even on error.
        self._running = False
        self.end_time = time.time()

run(max_workers=None)

Run all queued jobs with optimal scheduling.

Parameters:

Name Type Description Default
max_workers Optional[int]

Maximum number of concurrent jobs. If None, uses cluster CPU limit.

None

Returns:

Name Type Description
dict Dict[str, Any]

Execution statistics and results.

Source code in mpmsub/cluster.py
def run(self, max_workers: Optional[int] = None) -> Dict[str, Any]:
    """
    Execute every queued job with optimal scheduling.

    Args:
        max_workers: Upper bound on simultaneously running jobs; ``None``
            uses the cluster CPU limit.

    Returns:
        dict: Execution statistics and results.

    Raises:
        RuntimeError: If the cluster is already running.
    """
    if not self._running:
        self._running, self.start_time = True, time.time()
        try:
            outcome = self._execute_jobs(max_workers)
        finally:
            # Reset state and record completion time whether or not execution raised.
            self._running = False
            self.end_time = time.time()
        return outcome
    raise RuntimeError("Cluster is already running")

Job

mpmsub.Job(cmd=None, p=None, m=None, id=None, cwd=None, env=None, timeout=None, pipeline=None, stdout=None, stderr=None)

Object-oriented interface for job specification.

Provides a more intuitive way to create jobs with IDE support, while maintaining compatibility with the dictionary interface. Supports both single commands and pipelines.

Create a new job.

Parameters:

Name Type Description Default
cmd Union[List[str], Pipeline, None]

Command to execute as list of strings, or Pipeline object

None
p Union[int, str, None]

Number of CPU cores needed (default: 1)

None
m Union[str, int, None]

Memory requirement (e.g., "1G", "512M", default: unlimited)

None
id Optional[str]

Custom job identifier (auto-generated if None)

None
cwd Optional[str]

Working directory for the job

None
env Optional[Dict[str, str]]

Environment variables for the job

None
timeout Optional[float]

Timeout in seconds

None
pipeline Optional[List[List[str]]]

Alternative way to specify pipeline as list of commands

None
stdout Optional[str]

File path to redirect stdout to (optional)

None
stderr Optional[str]

File path to redirect stderr to (optional)

None
Source code in mpmsub/cluster.py
def __init__(
    self,
    cmd: Union[List[str], Pipeline, None] = None,
    p: Union[int, str, None] = None,
    m: Union[str, int, None] = None,
    id: Optional[str] = None,
    cwd: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    timeout: Optional[float] = None,
    pipeline: Optional[List[List[str]]] = None,
    stdout: Optional[str] = None,
    stderr: Optional[str] = None,
):
    """
    Build a job from a command or pipeline plus optional execution settings.

    Args:
        cmd: Argument list to execute, or a ready-made Pipeline object.
        p: Number of CPU cores needed (default: 1)
        m: Memory requirement such as "1G" or "512M" (default: unlimited)
        id: Custom job identifier (auto-generated if None)
        cwd: Directory to run the job in.
        env: Environment variables for the job.
        timeout: Time limit in seconds.
        pipeline: List of commands to chain into a Pipeline. When given, it
            takes precedence over ``cmd``.
        stdout: Path of a file to redirect stdout to (optional).
        stderr: Path of a file to redirect stderr to (optional).

    Raises:
        ValueError: If neither ``cmd`` nor ``pipeline`` is supplied.
    """
    # ``pipeline`` wins when both are given; anything passed as ``cmd``
    # (plain list or Pipeline) is stored unchanged.
    if pipeline is not None:
        command = Pipeline(pipeline)
    elif cmd is None:
        raise ValueError("Must specify either 'cmd' or 'pipeline'")
    else:
        command = cmd
    self.cmd = command

    self.p = p
    self.m = m
    self.id = id
    self.cwd = cwd
    self.env = env
    self.timeout = timeout
    self.stdout = stdout
    self.stderr = stderr

__repr__()

String representation of the job.

Source code in mpmsub/cluster.py
def __repr__(self) -> str:
    """
    Return a compact, single-line description of the job.

    Plain commands show at most their first three arguments, followed by
    ``...`` when truncated. Each argument goes through ``str()`` so that
    non-string entries (e.g. ``pathlib.Path`` objects) cannot make ``repr``
    raise ``TypeError``.
    """
    if isinstance(self.cmd, Pipeline):
        return f"Job(pipeline={self.cmd}, p={self.p}, m={self.m})"
    cmd_str = " ".join(str(arg) for arg in self.cmd[:3])
    if len(self.cmd) > 3:
        cmd_str += "..."
    return f"Job(cmd=[{cmd_str}], p={self.p}, m={self.m})"

cpu(cores)

Set CPU requirement (builder pattern).

Source code in mpmsub/cluster.py
def cpu(self, cores: Union[int, str]) -> "Job":
    """
    Builder step: record the CPU requirement.

    Args:
        cores: Core count (int or string), stored unchanged as ``p``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.p = cores
    return self

environment(env_vars)

Set environment variables (builder pattern).

Source code in mpmsub/cluster.py
def environment(self, env_vars: Dict[str, str]) -> "Job":
    """
    Builder step: record the environment variables for the job.

    Args:
        env_vars: Mapping stored as-is (not copied) in ``env``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.env = env_vars
    return self

memory(mem)

Set memory requirement (builder pattern).

Source code in mpmsub/cluster.py
def memory(self, mem: Union[str, int]) -> "Job":
    """
    Builder step: record the memory requirement.

    Args:
        mem: Memory spec such as "1G" or an int, stored unchanged as ``m``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.m = mem
    return self

pipe_to(next_cmd)

Add another command to the pipeline (builder pattern).

Source code in mpmsub/cluster.py
def pipe_to(self, next_cmd: List[str]) -> "Job":
    """
    Append a command to this job's pipeline (builder pattern).

    A plain command list becomes a two-stage Pipeline, and an existing Pipeline
    gets one more stage. Either way a new Pipeline object is created. The
    existing one is never mutated, so a Pipeline shared with other jobs, or the
    list the caller passed as ``pipeline=``, is left untouched.

    Args:
        next_cmd: Command (list of strings) to add as the last stage.

    Returns:
        Job: This same instance, so calls can be chained.

    Raises:
        ValueError: If the job's command is neither a list nor a Pipeline.
    """
    if isinstance(self.cmd, Pipeline):
        # Rebuild instead of ``self.cmd.commands.append(...)``: in-place
        # mutation would leak into any other holder of this Pipeline or of
        # its underlying ``commands`` list.
        self.cmd = Pipeline([*self.cmd.commands, next_cmd])
    elif isinstance(self.cmd, list):
        # Convert single command to pipeline
        self.cmd = Pipeline([self.cmd, next_cmd])
    else:
        raise ValueError("Cannot pipe from non-command job")
    return self

stderr_to(file_path)

Redirect stderr to file (builder pattern).

Source code in mpmsub/cluster.py
def stderr_to(self, file_path: str) -> "Job":
    """
    Builder step: send the job's stderr to a file.

    Args:
        file_path: Destination path, stored as ``stderr``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.stderr = file_path
    return self

stdout_to(file_path)

Redirect stdout to file (builder pattern).

Source code in mpmsub/cluster.py
def stdout_to(self, file_path: str) -> "Job":
    """
    Builder step: send the job's stdout to a file.

    Args:
        file_path: Destination path, stored as ``stdout``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.stdout = file_path
    return self

to_dict()

Convert to dictionary format for internal use.

Source code in mpmsub/cluster.py
def to_dict(self) -> Dict[str, Any]:
    """
    Return the job as the dictionary form used internally.

    The dict has keys ``cmd``, ``p``, ``m``, ``id``, ``cwd``, ``env``,
    ``timeout``, ``stdout`` and ``stderr`` (in that order). Each value is the
    corresponding attribute itself, not a copy.
    """
    keys = ("cmd", "p", "m", "id", "cwd", "env", "timeout", "stdout", "stderr")
    return {key: getattr(self, key) for key in keys}

with_id(job_id)

Set custom job ID (builder pattern).

Source code in mpmsub/cluster.py
def with_id(self, job_id: str) -> "Job":
    """
    Builder step: assign a custom job identifier.

    Args:
        job_id: Identifier stored as ``id``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.id = job_id
    return self

with_timeout(seconds)

Set timeout (builder pattern).

Source code in mpmsub/cluster.py
def with_timeout(self, seconds: float) -> "Job":
    """
    Builder step: set the job's timeout.

    Args:
        seconds: Time limit in seconds, stored as ``timeout``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.timeout = seconds
    return self

working_dir(path)

Set working directory (builder pattern).

Source code in mpmsub/cluster.py
def working_dir(self, path: str) -> "Job":
    """
    Builder step: choose the directory the job runs in.

    Args:
        path: Working directory, stored as ``cwd``.

    Returns:
        Job: This same instance, so calls can be chained.
    """
    self.cwd = path
    return self

Pipeline

mpmsub.Pipeline(commands)

Represents a pipeline of commands connected via pipes.

This allows chaining multiple subprocess commands together, similar to shell pipes (cmd1 | cmd2 | cmd3).

Create a new pipeline.

Parameters:

Name Type Description Default
commands List[List[str]]

List of commands, where each command is a list of strings. Commands will be piped together in order.

required

Examples:

>>> pipeline = Pipeline([
...     ["cat", "file.txt"],
...     ["grep", "pattern"],
...     ["sort"]
... ])
Source code in mpmsub/cluster.py
def __init__(self, commands: List[List[str]]):
    """
    Create a new pipeline.

    Args:
        commands: Iterable of commands, where each command is a list of
                 strings. Commands will be piped together in order. The
                 outer sequence is copied, so later changes to the caller's
                 list do not affect the pipeline.

    Raises:
        ValueError: If fewer than two commands are given.

    Examples:
        >>> pipeline = Pipeline([
        ...     ["cat", "file.txt"],
        ...     ["grep", "pattern"],
        ...     ["sort"]
        ... ])
    """
    # Take a private copy: storing the caller's list by reference would let
    # outside mutations (or in-place extension of this pipeline) leak across.
    # list() also lets any iterable, e.g. a generator, be used.
    stages = [] if commands is None else list(commands)
    if len(stages) < 2:
        raise ValueError("Pipeline must have at least 2 commands")

    self.commands = stages

__repr__()

String representation of the pipeline.

Source code in mpmsub/cluster.py
def __repr__(self) -> str:
    """
    Return the pipeline in shell-like form, e.g. ``Pipeline(cat file.txt | sort)``.

    Each stage shows at most its first three arguments, followed by ``...``
    when truncated. Arguments go through ``str()`` so that non-string entries
    (e.g. ``pathlib.Path`` objects) cannot make ``repr`` raise ``TypeError``.
    """
    stages = []
    for cmd in self.commands:
        cmd_str = " ".join(str(arg) for arg in cmd[:3])
        if len(cmd) > 3:
            cmd_str += "..."
        stages.append(cmd_str)
    return f"Pipeline({' | '.join(stages)})"

Utility Functions

mpmsub.parse_memory_string(memory)

Parse memory specification into MB.

Parameters:

Name Type Description Default
memory Union[str, int, None]

Memory specification as string (e.g., "16G", "2048M", "1024") or int (MB), or None for auto-detection.

required

Returns:

Name Type Description
int Optional[int]

Memory in MB, or None if auto-detection requested.

Examples:

>>> parse_memory_string("16G")
16384
>>> parse_memory_string("2048M")
2048
>>> parse_memory_string("1024")
1024
>>> parse_memory_string(2048)
2048
Source code in mpmsub/utils.py
def parse_memory_string(memory: Union[str, int, float, None]) -> Optional[int]:
    """
    Parse memory specification into MB.

    Units are binary: K = 1/1024 MB, G = 1024 MB, T = 1024 * 1024 MB. A bare
    number, or a bare ``B`` suffix, is read as MB. After a unit letter an
    optional ``B`` or ``iB`` suffix is accepted ("16G", "16GB", "16GiB").
    Matching ignores case and surrounding whitespace. Fractional results are
    truncated to whole MB.

    Args:
        memory: Memory specification as string (e.g., "16G", "2048M", "1024"),
                int or float (MB), or None for auto-detection.

    Returns:
        int: Memory in MB, or None if auto-detection requested.

    Raises:
        ValueError: If the string cannot be parsed, a float is not finite,
            or the value has an unsupported type.

    Examples:
        >>> parse_memory_string("16G")
        16384
        >>> parse_memory_string("2048M")
        2048
        >>> parse_memory_string("1024")
        1024
        >>> parse_memory_string(2048)
        2048
        >>> parse_memory_string("16GiB")
        16384
    """
    if memory is None:
        return None

    if isinstance(memory, int):
        return memory

    if isinstance(memory, float):
        # Same truncation as the string path; reject NaN / infinity.
        try:
            return int(memory)
        except (ValueError, OverflowError):
            raise ValueError(f"Invalid memory specification: {memory}") from None

    if isinstance(memory, str):
        text = memory.strip().upper()

        # Number, optional whitespace, optional unit letter (optionally
        # followed by the binary "i"), optional trailing "B".
        match = re.match(r"^(\d+(?:\.\d+)?)\s*(?:([KMGT])I?)?B?$", text)
        if not match:
            raise ValueError(f"Invalid memory specification: {memory}")

        number, unit = match.groups()

        # Convert to MB; no unit letter means the value is already MB.
        multipliers = {
            None: 1,
            "K": 1 / 1024,  # KB to MB
            "M": 1,  # MB
            "G": 1024,  # GB to MB
            "T": 1024 * 1024,  # TB to MB
        }

        return int(float(number) * multipliers[unit])

    raise ValueError(f"Invalid memory specification type: {type(memory)}")

mpmsub.parse_cpu_string(cpus)

Parse CPU specification.

Parameters:

Name Type Description Default
cpus Union[str, int, None]

CPU specification as string or int, or None for auto-detection.

required

Returns:

Name Type Description
int Optional[int]

Number of CPUs, or None if auto-detection requested.

Examples:

>>> parse_cpu_string("4")
4
>>> parse_cpu_string(6)
6
>>> parse_cpu_string(None) is None
True
Source code in mpmsub/utils.py
def parse_cpu_string(cpus: Union[str, int, float, None]) -> Optional[int]:
    """
    Parse CPU specification.

    Args:
        cpus: CPU specification as string, int, or whole-number float
              (e.g. ``4.0``), or None for auto-detection.

    Returns:
        int: Number of CPUs, or None if auto-detection requested.

    Raises:
        ValueError: If the string is not an integer, the float has a
            fractional part (or is NaN/infinite), or the type is unsupported.

    Examples:
        >>> parse_cpu_string("4")
        4
        >>> parse_cpu_string(6)
        6
        >>> parse_cpu_string(4.0)
        4
        >>> parse_cpu_string(None) is None
        True
    """
    if cpus is None:
        return None

    if isinstance(cpus, int):
        return cpus

    if isinstance(cpus, float):
        # A fractional core count is ambiguous, so only whole numbers pass.
        if cpus.is_integer():
            return int(cpus)
        raise ValueError(f"Invalid CPU specification: {cpus}")

    if isinstance(cpus, str):
        try:
            return int(cpus.strip())
        except ValueError:
            # Suppress the int() traceback; this message already names the input.
            raise ValueError(f"Invalid CPU specification: {cpus}") from None

    raise ValueError(f"Invalid CPU specification type: {type(cpus)}")

mpmsub.format_memory(mb)

Format memory in MB to human-readable string.

Parameters:

Name Type Description Default
mb Union[int, float]

Memory in MB.

required

Returns:

Name Type Description
str str

Formatted memory string.

Examples:

>>> format_memory(1024)
'1.0G'
>>> format_memory(512)
'512M'
Source code in mpmsub/utils.py
def format_memory(mb: Union[int, float]) -> str:
    """
    Render a size given in MB as a short human-readable string.

    Sizes of at least 1024 MB are shown in G with one decimal place. Smaller
    sizes are shown as whole MB (truncated).

    Args:
        mb: Memory in MB.

    Returns:
        str: Formatted memory string.

    Examples:
        >>> format_memory(1024)
        '1.0G'
        >>> format_memory(512)
        '512M'
    """
    return f"{mb / 1024:.1f}G" if mb >= 1024 else f"{int(mb)}M"

Quick Reference

Creating Clusters

import mpmsub

# Auto-detect resources
p = mpmsub.cluster()

# Specify resources
p = mpmsub.cluster(cpu=4, memory="8G")    # Alternative syntax
p = mpmsub.cluster(p=4, m="8G")           # Traditional syntax

Creating Jobs

# Dictionary interface
job = {"cmd": ["echo", "hello"], "p": 1, "m": "100M"}

# Object interface
job = mpmsub.Job(["echo", "hello"]).cpu(1).memory("100M")

# Convenience function
job = mpmsub.job(["echo", "hello"], cpu=1, memory="100M")

Creating Pipelines

# Pipeline convenience function
pipeline_job = mpmsub.pipeline([
    ["cat", "data.txt"],
    ["grep", "pattern"]
], cpu=1, memory="500M")

# Builder pattern
job = mpmsub.Job(["cat", "input.txt"]) \
    .pipe_to(["grep", "important"]) \
    .cpu(1).memory("200M")

Job Parameters

Parameter Type Description
cmd List[str] or Pipeline Command to execute (required unless a pipeline is given via the pipeline argument)
p/cpu/cpus int or str CPU cores needed
m/memory str or int Memory requirement
id str Custom job identifier
cwd str Working directory
env Dict[str, str] Environment variables
timeout float Timeout in seconds
stdout str File path for stdout redirection
stderr str File path for stderr redirection

Memory Formats

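  • "1G" - 1 gigabyte (1024 MB; units are binary, so K, M, G and T step by 1024)
  • "512M" - 512 megabytes
  • "2048K" - 2048 kilobytes (2 MB; results are truncated to whole megabytes)
  • 1024 - 1024 megabytes (integer)
  • Letters are case-insensitive and a trailing "B" is optional (e.g. "16gb" equals "16G"); a number with no unit is read as megabytes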

Results

results = p.run()
# Returns: {"runtime": 45.2, "jobs": {"total": 10, "completed": 8, "failed": 2}, ...}

# Access completed/failed jobs
for job in p.completed_jobs:
    print(f"{job.job_id}: {job.runtime:.2f}s, {job.memory_used:.1f}MB")

for job in p.failed_jobs:
    print(f"Failed {job.job_id}: {job.error}")

Error Handling

import mpmsub

try:
    p = mpmsub.cluster(p=4, m="8G")
    p.jobs.append({"cmd": ["invalid_command"]})
    results = p.run()
except ValueError as e:
    print(f"Invalid parameter: {e}")
except RuntimeError as e:
    print(f"Execution error: {e}")

Best Practices

  • Specify realistic resource limits
  • Use profiling (p.profile()) to optimize memory allocation
  • Always check results for failed jobs
  • Set appropriate timeouts for long-running tasks
  • Use output redirection to capture results