API Reference
Complete API documentation for mpmsub.
Core Functions
cluster()
mpmsub.cluster
Core Cluster class for mpmsub library.
Cluster(cpus=None, memory=None, verbose=True, progress_bar=True)
Main cluster class for managing subprocess execution with memory awareness.
Initialize a compute cluster.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cpus` | `Union[int, str, None]` | Number of CPUs to use. If None, auto-detects. | `None` |
| `memory` | `Union[str, int, None]` | Memory limit (e.g., "16G", "2048M"). If None, auto-detects. | `None` |
| `verbose` | `bool` | Whether to print progress information. | `True` |
| `progress_bar` | `bool` | Whether to show a progress bar during execution. | `True` |
Source code in mpmsub/cluster.py
completed_jobs
property
Get list of completed jobs.
failed_jobs
property
Get list of failed jobs.
stats
property
Get cluster statistics.
describe_resources()
Print detailed information about cluster resources.
Source code in mpmsub/cluster.py
print_summary()
Print a summary of execution results.
Source code in mpmsub/cluster.py
profile(verbose=True)
Profile jobs by running them sequentially to measure actual resource usage.
This is useful for estimating memory requirements when you don't know them. Jobs are run one at a time (respecting CPU requirements) to get accurate memory measurements without interference.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `verbose` | `bool` | Whether to print progress information. | `True` |

Returns:
| Type | Description |
|---|---|
| `List[JobResult]` | Results from the profiling run, with measured memory usage. |
Source code in mpmsub/cluster.py
run(max_workers=None)
Run all queued jobs with optimal scheduling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_workers` | `Optional[int]` | Maximum number of concurrent jobs. If None, uses cluster CPU limit. | `None` |

Returns:
| Name | Type | Description |
|---|---|---|
| `dict` | `Dict[str, Any]` | Execution statistics and results. |
Source code in mpmsub/cluster.py
Job(cmd=None, p=None, m=None, id=None, cwd=None, env=None, timeout=None, pipeline=None, stdout=None, stderr=None)
Object-oriented interface for job specification.
Provides a more intuitive way to create jobs with IDE support, while maintaining compatibility with the dictionary interface. Supports both single commands and pipelines.
Create a new job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cmd` | `Union[List[str], Pipeline, None]` | Command to execute as list of strings, or Pipeline object | `None` |
| `p` | `Union[int, str, None]` | Number of CPU cores needed (default: 1) | `None` |
| `m` | `Union[str, int, None]` | Memory requirement (e.g., "1G", "512M", default: unlimited) | `None` |
| `id` | `Optional[str]` | Custom job identifier (auto-generated if None) | `None` |
| `cwd` | `Optional[str]` | Working directory for the job | `None` |
| `env` | `Optional[Dict[str, str]]` | Environment variables for the job | `None` |
| `timeout` | `Optional[float]` | Timeout in seconds | `None` |
| `pipeline` | `Optional[List[List[str]]]` | Alternative way to specify pipeline as list of commands | `None` |
| `stdout` | `Optional[str]` | File path to redirect stdout to (optional) | `None` |
| `stderr` | `Optional[str]` | File path to redirect stderr to (optional) | `None` |
Source code in mpmsub/cluster.py
__repr__()
String representation of the job.
Source code in mpmsub/cluster.py
cpu(cores)
environment(env_vars)
memory(mem)
pipe_to(next_cmd)
Add another command to the pipeline (builder pattern).
Source code in mpmsub/cluster.py
stderr_to(file_path)
stdout_to(file_path)
to_dict()
Convert to dictionary format for internal use.
Source code in mpmsub/cluster.py
with_id(job_id)
with_timeout(seconds)
JobList(job_queue)
List-like interface for managing jobs.
Source code in mpmsub/cluster.py
__iter__()
__len__()
append(job)
Add a job to the queue. Accepts both Job objects and dictionaries.
Source code in mpmsub/cluster.py
extend(jobs)
Add multiple jobs to the queue. Accepts both Job objects and dictionaries.
JobQueue()
Manage job queue with priority scheduling.
Source code in mpmsub/cluster.py
add_job(job)
Add a job to the queue.
Source code in mpmsub/cluster.py
get_next_job(available_cpus, available_memory)
Get the next job that can run with available resources.
Source code in mpmsub/cluster.py
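To make the idea of picking "the next job that can run with available resources" concrete, here is a minimal first-fit sketch. It is not mpmsub's implementation: `PendingJob` and `pick_next_job` are hypothetical names, and the real queue may order or prioritize jobs differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PendingJob:
    """Hypothetical stand-in for a queued job; not mpmsub's internal type."""
    job_id: str
    cpus: int
    memory_mb: float


def pick_next_job(pending: List[PendingJob],
                  available_cpus: int,
                  available_memory: float) -> Optional[PendingJob]:
    """Remove and return the first pending job that fits, or None."""
    for index, job in enumerate(pending):
        if job.cpus <= available_cpus and job.memory_mb <= available_memory:
            return pending.pop(index)
    return None


queue = [
    PendingJob("big", cpus=4, memory_mb=8192),
    PendingJob("small", cpus=1, memory_mb=512),
]
# Only "small" fits into 2 free CPUs and 1024 MB of free memory.
first = pick_next_job(queue, available_cpus=2, available_memory=1024)
print(first.job_id)
```

A first-fit scan lets small jobs run while a large job waits for resources, at the cost of possibly delaying the large job.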
get_stats()
Get queue statistics.
Source code in mpmsub/cluster.py
mark_completed(result)
Mark a job as completed.
Source code in mpmsub/cluster.py
JobResult(job_id, cmd, returncode=0, stdout='', stderr='', runtime=0.0, memory_used=0.0, cpu_used=1, start_time=0.0, end_time=0.0, success=False, error=None)
dataclass
Result of a completed job.
MemoryMonitor(sampling_interval=0.5)
Monitor memory usage of running processes.
Source code in mpmsub/cluster.py
cleanup(job_id)
get_peak_memory(job_id)
start_monitoring(job_id, process)
Start monitoring a process.
Source code in mpmsub/cluster.py
Pipeline(commands)
Represents a pipeline of commands connected via pipes.
This allows chaining multiple subprocess commands together, similar to shell pipes (cmd1 | cmd2 | cmd3).
Create a new pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `commands` | `List[List[str]]` | List of commands, where each command is a list of strings. Commands will be piped together in order. | required |
Examples:
Source code in mpmsub/cluster.py
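For readers unfamiliar with what piping commands together involves, the sketch below chains processes with the standard library's `subprocess` module. It illustrates the general technique only and makes no claim about how `Pipeline` is implemented; `run_pipeline` is a hypothetical helper name.

```python
import subprocess
import sys
from typing import List


def run_pipeline(commands: List[List[str]]) -> str:
    """Run commands left to right, feeding each stdout into the next stdin."""
    if not commands:
        raise ValueError("at least one command is required")
    processes = []
    previous_stdout = None
    for command in commands:
        proc = subprocess.Popen(
            command,
            stdin=previous_stdout,
            stdout=subprocess.PIPE,
            text=True,
        )
        if previous_stdout is not None:
            # Drop the parent's copy of the pipe so the upstream process
            # is notified if the downstream process exits early.
            previous_stdout.close()
        previous_stdout = proc.stdout
        processes.append(proc)
    output, _ = processes[-1].communicate()
    for proc in processes[:-1]:
        proc.wait()
    return output


# Two small Python child processes stand in for shell tools here.
producer = [sys.executable, "-c", "print('b'); print('a'); print('c')"]
sorter = [sys.executable, "-c",
          "import sys; sys.stdout.writelines(sorted(sys.stdin))"]
result = run_pipeline([producer, sorter])
print(result)
```

Each command stays a list of strings, so no shell is involved and no quoting or escaping is needed.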
__repr__()
String representation of the pipeline.
Source code in mpmsub/cluster.py
ProgressBar(total, width=40, show_percent=True)
Simple progress bar using only standard library.
Source code in mpmsub/cluster.py
finish()
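A text progress bar needs nothing beyond string formatting. The sketch below is an assumption about how such a bar could be rendered, using the same `total`, `width`, and `show_percent` parameters as the signature above; `render_bar` is a hypothetical function and the exact characters mpmsub prints may differ.

```python
def render_bar(done: int, total: int, width: int = 40,
               show_percent: bool = True) -> str:
    """Return a one-line text bar such as '[#####-----]  50.0%'."""
    fraction = done / total if total > 0 else 1.0
    fraction = min(max(fraction, 0.0), 1.0)  # clamp to the range 0..1
    filled = int(width * fraction)
    bar = "[" + "#" * filled + "-" * (width - filled) + "]"
    if show_percent:
        bar += f" {fraction * 100:5.1f}%"
    return bar


for done in (0, 5, 10):
    print(render_bar(done, 10, width=10))
```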
ResourceUsage(cpu_slots_used=0, memory_used=0.0, active_jobs=0)
dataclass
Track resource usage over time.
job()
mpmsub.job(cmd, p=None, m=None, cpu=None, cpus=None, memory=None, **kwargs)
Create a new Job object with a concise interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cmd` | `List[str]` | Command to execute as list of strings | required |
| `p` | `Optional[Union[int, str]]` | Number of CPU cores needed (default: 1) | `None` |
| `m` | `Optional[Union[str, int]]` | Memory requirement (e.g., "1G", "512M", default: unlimited) | `None` |
| `cpu` | `Optional[Union[int, str]]` | Alternative to 'p' - Number of CPU cores needed | `None` |
| `cpus` | `Optional[Union[int, str]]` | Alternative to 'p' - Number of CPU cores needed | `None` |
| `memory` | `Optional[Union[str, int]]` | Alternative to 'm' - Memory requirement | `None` |
| `**kwargs` | `Any` | Additional job parameters (id, cwd, env, timeout, stdout, stderr) | `{}` |

Returns:
| Name | Type | Description |
|---|---|---|
| `Job` | `Job` | A new job instance. |
Examples:
>>> import mpmsub
>>> j = mpmsub.job(["echo", "hello"], p=1, m="100M")
>>> j = mpmsub.job(["echo", "hello"], cpu=1, memory="100M") # Alt syntax
>>> j = mpmsub.job(["python", "script.py"], p=2, m="1G", timeout=300)
Source code in mpmsub/__init__.py
pipeline()
mpmsub.pipeline(commands, p=None, m=None, cpu=None, cpus=None, memory=None, **kwargs)
Create a new Job with a pipeline of commands.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `commands` | `List[List[str]]` | List of commands to pipe together | required |
| `p` | `Optional[Union[int, str]]` | Number of CPU cores needed (default: 1) | `None` |
| `m` | `Optional[Union[str, int]]` | Memory requirement (e.g., "1G", "512M", default: unlimited) | `None` |
| `cpu` | `Optional[Union[int, str]]` | Alternative to 'p' - Number of CPU cores needed | `None` |
| `cpus` | `Optional[Union[int, str]]` | Alternative to 'p' - Number of CPU cores needed | `None` |
| `memory` | `Optional[Union[str, int]]` | Alternative to 'm' - Memory requirement | `None` |
| `**kwargs` | `Any` | Additional job parameters (id, cwd, env, timeout, stdout, stderr) | `{}` |

Returns:
| Name | Type | Description |
|---|---|---|
| `Job` | `Job` | A new job instance with a pipeline. |
Examples:
>>> import mpmsub
>>> j = mpmsub.pipeline([
...     ["cat", "file.txt"],
...     ["grep", "pattern"],
...     ["sort"]
... ], p=1, m="100M")
>>> j = mpmsub.pipeline([
...     ["cat", "file.txt"],
...     ["grep", "pattern"]
... ], cpu=1, memory="100M")  # Alternative syntax
Source code in mpmsub/__init__.py
Core Classes
Cluster
mpmsub.Cluster(cpus=None, memory=None, verbose=True, progress_bar=True)
Main cluster class for managing subprocess execution with memory awareness.
Job
mpmsub.Job(cmd=None, p=None, m=None, id=None, cwd=None, env=None, timeout=None, pipeline=None, stdout=None, stderr=None)
Object-oriented interface for job specification.
Pipeline
mpmsub.Pipeline(commands)
Represents a pipeline of commands connected via pipes.
Utility Functions
mpmsub.parse_memory_string(memory)
Parse memory specification into MB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `memory` | `Union[str, int, None]` | Memory specification as string (e.g., "16G", "2048M", "1024") or int (MB), or None for auto-detection. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `int` | `Optional[int]` | Memory in MB, or None if auto-detection requested. |
Examples:
>>> parse_memory_string("16G")
16384
>>> parse_memory_string("2048M")
2048
>>> parse_memory_string("1024")
1024
>>> parse_memory_string(2048)
2048
Source code in mpmsub/utils.py
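The conversion behind those examples can be sketched in a few lines. This is a stand-in written for illustration, not the library's source: `memory_to_mb` is a hypothetical name, binary units (1G = 1024M) are inferred from the `"16G"` example above, and the handling of a `K` suffix is an assumption based on the memory formats this page lists.

```python
from typing import Optional, Union

# Factors that convert each unit suffix to megabytes (binary units assumed).
_UNIT_TO_MB = {"K": 1 / 1024, "M": 1, "G": 1024}


def memory_to_mb(memory: Union[str, int, None]) -> Optional[int]:
    """Convert '16G', '2048M', '1024' or an int (MB) to whole megabytes."""
    if memory is None:
        return None  # caller asked for auto-detection
    if isinstance(memory, int):
        return memory  # plain integers are already megabytes
    text = memory.strip().upper()
    if not text:
        raise ValueError("empty memory specification")
    suffix = text[-1]
    if suffix in _UNIT_TO_MB:
        number, factor = text[:-1], _UNIT_TO_MB[suffix]
    else:
        number, factor = text, 1  # bare numbers are megabytes
    return int(float(number) * factor)  # float() raises ValueError on junk


for spec in ("16G", "2048M", "1024", 2048, None):
    print(repr(spec), "->", memory_to_mb(spec))
```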
mpmsub.parse_cpu_string(cpus)
Parse CPU specification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cpus` | `Union[str, int, None]` | CPU specification as string or int, or None for auto-detection. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `int` | `Optional[int]` | Number of CPUs, or None if auto-detection requested. |
Examples:
Source code in mpmsub/utils.py
mpmsub.format_memory(mb)
Format memory in MB to human-readable string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `mb` | `Union[int, float]` | Memory in MB. | required |

Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str` | Formatted memory string. |
Examples:
Source code in mpmsub/utils.py
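The exact output format of `format_memory` is not shown on this page, so the following is only a sketch of the general idea under an assumed format: values of 1024 MB or more are shown in gigabytes with one decimal, smaller values in whole megabytes. `mb_to_text` is a hypothetical name.

```python
from typing import Union


def mb_to_text(mb: Union[int, float]) -> str:
    """Render a megabyte count as e.g. '512M' or '1.5G' (assumed format)."""
    if mb >= 1024:
        return f"{mb / 1024:.1f}G"
    return f"{mb:.0f}M"


for value in (512, 1536, 16384):
    print(value, "->", mb_to_text(value))
```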
Quick Reference
Creating Clusters
import mpmsub
# Auto-detect resources
p = mpmsub.cluster()
# Specify resources
p = mpmsub.cluster(cpu=4, memory="8G") # Alternative syntax
p = mpmsub.cluster(p=4, m="8G") # Traditional syntax
Creating Jobs
# Dictionary interface
job = {"cmd": ["echo", "hello"], "p": 1, "m": "100M"}
# Object interface
job = mpmsub.Job(["echo", "hello"]).cpu(1).memory("100M")
# Convenience function
job = mpmsub.job(["echo", "hello"], cpu=1, memory="100M")
Creating Pipelines
# Pipeline convenience function
pipeline_job = mpmsub.pipeline([
    ["cat", "data.txt"],
    ["grep", "pattern"]
], cpu=1, memory="500M")
# Builder pattern
job = mpmsub.Job(["cat", "input.txt"]) \
    .pipe_to(["grep", "important"]) \
    .cpu(1).memory("200M")
Job Parameters
| Parameter | Type | Description |
|---|---|---|
| `cmd` | `List[str]` or `Pipeline` | Command to execute (required) |
| `p`/`cpu`/`cpus` | `int` or `str` | CPU cores needed |
| `m`/`memory` | `str` or `int` | Memory requirement |
| `id` | `str` | Custom job identifier |
| `cwd` | `str` | Working directory |
| `env` | `Dict[str, str]` | Environment variables |
| `timeout` | `float` | Timeout in seconds |
| `stdout` | `str` | File path for stdout redirection |
| `stderr` | `str` | File path for stderr redirection |
Memory Formats
- "1G": 1 gigabyte
- "512M": 512 megabytes
- "2048K": 2048 kilobytes
- 1024: 1024 megabytes (integer)
Results
results = p.run()
# Returns: {"runtime": 45.2, "jobs": {"total": 10, "completed": 8, "failed": 2}, ...}
# Access completed/failed jobs
for job in p.completed_jobs:
    print(f"{job.id}: {job.runtime:.2f}s, {job.memory_used:.1f}MB")
for job in p.failed_jobs:
    print(f"Failed {job.id}: {job.error}")
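The statistics dictionary can also be checked programmatically. The sketch below relies only on the `"jobs"` keys visible in the example return value above; `failure_rate` is a hypothetical helper, and any other keys in the real dictionary are left untouched.

```python
from typing import Any, Dict


def failure_rate(results: Dict[str, Any]) -> float:
    """Fraction of jobs that failed, or 0.0 when no jobs were run."""
    jobs = results["jobs"]
    total = jobs["total"]
    return jobs["failed"] / total if total else 0.0


# Same shape as the example return value shown above.
example = {"runtime": 45.2, "jobs": {"total": 10, "completed": 8, "failed": 2}}
rate = failure_rate(example)
print(f"{rate:.0%} of jobs failed")
```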
Error Handling
import mpmsub
try:
    p = mpmsub.cluster(p=4, m="8G")
    p.jobs.append({"cmd": ["invalid_command"]})
    results = p.run()
except ValueError as e:
    print(f"Invalid parameter: {e}")
except RuntimeError as e:
    print(f"Execution error: {e}")
Best Practices
- Specify realistic resource limits
- Use profiling (p.profile()) to optimize memory allocation
- Always check results for failed jobs
- Set appropriate timeouts for long-running tasks
- Use output redirection to capture results