
Adding Jobs

mpmsub provides four interfaces for creating jobs, and jobs built with any of them can be added to the same cluster. Choose the approach that best fits your code.

Job Interfaces

| Interface | Best For | Example |
|---|---|---|
| Dictionary | Simple jobs, existing code | `{"cmd": [...], "p": 1}` |
| Job Object | Complex jobs, IDE support | `Job([...]).cpu(1).memory("1G")` |
| Convenience Function | Quick creation | `job([...], p=1, m="1G")` |
| Pipeline | Chained commands | `pipeline([["cat"], ["grep"]], ...)` |

Dictionary Interface

import mpmsub

p = mpmsub.cluster(p=4, m="8G")

# Simple job
p.jobs.append({"cmd": ["echo", "hello"]})

# Job with resources and output redirection
p.jobs.append({
    "cmd": ["python", "analysis.py"],
    "p": 2, "m": "1G", "timeout": 300,
    "cwd": "/data/analysis",
    "env": {"PYTHONPATH": "/custom/libs"},
    "stdout": "output.txt", "stderr": "errors.txt"
})
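Because dictionary jobs are ordinary Python dicts, they can be generated from data with plain Python. The sketch below builds one job dict per input file using only the keys shown above; `make_jobs` and the `process.py` script are hypothetical names used for illustration. The resulting list could then be passed to `p.jobs.extend(...)`.

```python
from pathlib import Path


def make_jobs(input_files, out_dir="results"):
    """Build one job dict per input file, using the dictionary keys shown above.

    "process.py" is a hypothetical script name used only for illustration.
    """
    jobs = []
    for path in input_files:
        stem = Path(path).stem  # "a.txt" -> "a"
        jobs.append({
            "cmd": ["python", "process.py", str(path)],
            "p": 1,
            "m": "500M",
            "timeout": 600,
            "stdout": str(Path(out_dir) / f"{stem}.out"),
            "stderr": str(Path(out_dir) / f"{stem}.err"),
        })
    return jobs


jobs = make_jobs(["a.txt", "b.txt"])
# p.jobs.extend(jobs)  # with p = mpmsub.cluster(...) as in the example above
```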

Job Object Interface

An object-oriented approach using the builder pattern. Each method configures the job and returns it, so calls can be chained:

import mpmsub

# Builder pattern (fluent interface)
job1 = mpmsub.Job(["python", "script.py"]) \
    .cpu(2).memory("1G").with_timeout(300) \
    .stdout_to("output.txt").stderr_to("errors.txt")

# Step-by-step building (each call configures job2 in place)
job2 = mpmsub.Job(["python", "analysis.py"])
job2.cpu(1).memory("2G").working_dir("/data/analysis")
job2.environment({"PYTHONPATH": "/custom/libs"})

p = mpmsub.cluster(p=4, m="8G")
p.jobs.extend([job1, job2])
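Chaining works because each builder method configures the job and returns the same object, which is also why the step-by-step form needs no reassignment. The following is a simplified, hypothetical `SketchJob` class that illustrates the pattern; it is not mpmsub's `Job` implementation and only mirrors a few of its method names.

```python
class SketchJob:
    """Hypothetical minimal builder illustrating the fluent-interface pattern."""

    def __init__(self, cmd):
        self.spec = {"cmd": list(cmd), "p": 1, "m": None}

    def cpu(self, n):
        self.spec["p"] = n
        return self  # returning self is what makes chaining possible

    def memory(self, amount):
        self.spec["m"] = amount
        return self

    def with_timeout(self, seconds):
        self.spec["timeout"] = seconds
        return self

    def stdout_to(self, path):
        self.spec["stdout"] = path
        return self


# Chained form: one expression builds the whole job
a = SketchJob(["python", "script.py"]).cpu(2).memory("1G").with_timeout(300)

# Step-by-step form: the methods mutate the same object, so no reassignment is needed
b = SketchJob(["python", "analysis.py"])
b.cpu(1).memory("2G")
b.stdout_to("output.txt")
```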

Convenience Function

Quick job creation with a functional approach:

import mpmsub

# Quick job creation
job1 = mpmsub.job(["echo", "hello"], cpu=1, memory="100M")
job2 = mpmsub.job(["python", "script.py"], cpus=2, memory="1G",
                  timeout=300, stdout="output.txt")

p = mpmsub.cluster(p=4, m="8G")
p.jobs.extend([job1, job2])
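The convenience function accepts several spellings of the same parameter (`p`, `cpu`, or `cpus` for CPU cores; `m` or `memory` for memory), as listed in the Job Parameters table. The sketch below shows one way such aliases can be folded into canonical keys; the `normalize_resources` helper, and its choice to reject conflicting values, are assumptions for illustration rather than mpmsub's internals.

```python
# Alias -> canonical key, following the Job Parameters table
ALIASES = {"p": "p", "cpu": "p", "cpus": "p", "m": "m", "memory": "m"}


def normalize_resources(**kwargs):
    """Fold alias keyword arguments onto canonical keys (hypothetical helper)."""
    result = {}
    for key, value in kwargs.items():
        canonical = ALIASES.get(key, key)  # non-alias keys (timeout, stdout, ...) pass through
        if canonical in result and result[canonical] != value:
            raise ValueError(
                f"conflicting values for {canonical!r}: {result[canonical]!r} vs {value!r}"
            )
        result[canonical] = value
    return result


normalized = normalize_resources(cpus=2, memory="1G", timeout=300)
```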

Pipeline Interface

Chain commands so that each command's stdout is piped into the next command's stdin, as with `|` in a shell:

import mpmsub

p = mpmsub.cluster(cpu=4, memory="8G")

# Pipeline convenience function
pipeline_job = mpmsub.pipeline([
    ["cat", "data.txt"],
    ["grep", "pattern"],
    ["sort"]
], cpu=1, memory="500M", stdout="results.txt")

# Builder pattern with pipe_to()
job = mpmsub.Job(["cat", "input.txt"]) \
    .pipe_to(["grep", "important"]) \
    .pipe_to(["sort"]) \
    .cpu(1).memory("200M") \
    .stdout_to("results.txt")

p.jobs.extend([pipeline_job, job])
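The pipelines above correspond to shell commands such as `cat data.txt | grep pattern | sort > results.txt`. A minimal sketch of how such a chain can be wired with Python's standard `subprocess` module follows; it illustrates the general technique and is not necessarily how mpmsub implements pipelines. The `run_pipeline` name is hypothetical.

```python
import subprocess
import sys


def run_pipeline(commands, stdout_path=None):
    """Connect commands like a shell pipeline: cmd1 | cmd2 | ... | cmdN.

    Returns (output, returncodes). output is the last command's stdout as bytes,
    or None when it was written to stdout_path instead.
    """
    out_file = open(stdout_path, "wb") if stdout_path else None
    procs = []
    upstream = None  # read end of the previous command's stdout pipe
    try:
        for i, cmd in enumerate(commands):
            is_last = i == len(commands) - 1
            target = out_file if (is_last and out_file is not None) else subprocess.PIPE
            proc = subprocess.Popen(cmd, stdin=upstream, stdout=target)
            if upstream is not None:
                # The child now holds this pipe end; closing our copy lets the
                # upstream command get SIGPIPE if the downstream one exits early.
                upstream.close()
            upstream = proc.stdout
            procs.append(proc)
        output, _ = procs[-1].communicate()
        for proc in procs[:-1]:
            proc.wait()
        return output, [proc.returncode for proc in procs]
    finally:
        if out_file is not None:
            out_file.close()


# Shell-style usage (requires data.txt):
#   output, codes = run_pipeline([["cat", "data.txt"], ["grep", "pattern"], ["sort"]],
#                                stdout_path="results.txt")

# Portable demo with two small Python processes: "print b, a, c" | "sort lines"
demo_output, demo_codes = run_pipeline([
    [sys.executable, "-c", "print('b'); print('a'); print('c')"],
    [sys.executable, "-c", "import sys; sys.stdout.write(''.join(sorted(sys.stdin)))"],
])
```

Closing the parent's copy of each intermediate pipe is the detail most often missed when wiring pipelines by hand; without it, an early-exiting downstream command can leave the upstream one blocked.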

Job Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| p / cpu / cpus | int or str | 1 | Number of CPU cores |
| cmd | List[str] or Pipeline | Required | Command and arguments to execute |
| m / memory | str or int | None | Memory limit (e.g., "1G", "512M", 1024) |
| id | str | Auto-generated | Custom job identifier |
| cwd | str | None | Working directory |
| env | Dict[str, str] | None | Environment variables |
| timeout | float | None | Timeout in seconds |
| stdout | str | None | File path for stdout redirection |
| stderr | str | None | File path for stderr redirection |

Memory Formats

# String formats
"1G"     # 1 gigabyte
"512M"   # 512 megabytes
"2048K"  # 2048 kilobytes

# Integer (always MB)
1024     # 1024 megabytes
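To compare or sum memory requests, the accepted formats have to be normalized to a single unit. The sketch below converts each form shown above to megabytes; `parse_memory_mb` is a hypothetical helper, and it assumes binary multiples (1G = 1024M, 1M = 1024K), which this page does not state explicitly.

```python
import re

# Multipliers to megabytes, ASSUMING binary units (1G = 1024M, 1M = 1024K)
_TO_MB = {"K": 1 / 1024, "M": 1, "G": 1024}
_PATTERN = re.compile(r"(\d+(?:\.\d+)?)([KMG])", re.IGNORECASE)


def parse_memory_mb(value):
    """Convert "1G", "512M", "2048K", or an int (megabytes) to megabytes."""
    if isinstance(value, int) and not isinstance(value, bool):
        if value <= 0:
            raise ValueError(f"memory must be positive, got {value}")
        return float(value)  # integers are always megabytes
    if isinstance(value, str):
        match = _PATTERN.fullmatch(value.strip())
        if match:
            number, unit = match.groups()
            return float(number) * _TO_MB[unit.upper()]
    raise ValueError(f"unrecognized memory format: {value!r}")
```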

Output Redirection

# Dictionary interface
{"cmd": ["python", "script.py"], "stdout": "output.txt", "stderr": "errors.txt"}

# Job object interface
mpmsub.Job(["python", "script.py"]).stdout_to("output.txt").stderr_to("errors.txt")
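In standard-library terms, the `stdout` and `stderr` parameters amount to opening the named files and handing them to the child process, while `timeout` caps how long to wait for it. The sketch below runs a single job dict this way; `run_job_dict` is hypothetical, it does not model mpmsub's CPU and memory scheduling, and its handling of `env` (merged over the parent environment), existing output files (overwritten), and relative output paths (resolved against the caller's directory, not `cwd`) are all assumptions.

```python
import os
import subprocess
import sys
import tempfile


def run_job_dict(job):
    """Run one job dict locally with the standard library (hypothetical helper).

    ASSUMPTIONS: "env" entries are merged over the parent's environment, existing
    output files are overwritten, and relative output paths are resolved against
    the caller's working directory (not the job's "cwd").
    Returns the exit code, or None if the timeout expired.
    """
    env = {**os.environ, **job["env"]} if job.get("env") else None  # None = inherit
    stdout_file = open(job["stdout"], "w") if job.get("stdout") else None
    stderr_file = open(job["stderr"], "w") if job.get("stderr") else None
    try:
        completed = subprocess.run(
            job["cmd"],
            cwd=job.get("cwd"),
            env=env,
            stdout=stdout_file,  # None = inherit the parent's stdout
            stderr=stderr_file,
            timeout=job.get("timeout"),  # None = no time limit
        )
        return completed.returncode
    except subprocess.TimeoutExpired:
        return None  # subprocess.run kills the child before re-raising
    finally:
        for handle in (stdout_file, stderr_file):
            if handle is not None:
                handle.close()


# Demo: capture a tiny command's output in a temporary file
with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "output.txt")
    demo_code = run_job_dict({
        "cmd": [sys.executable, "-c", "print('hello')"],
        "stdout": out_path,
        "timeout": 30,
    })
    with open(out_path) as f:
        demo_text = f.read()
```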

Mixed Interface Usage

You can freely mix all interfaces:

import mpmsub

p = mpmsub.cluster(cpu=4, memory="8G")

jobs = [
    {"cmd": ["echo", "dict job"], "cpu": 1, "memory": "100M"},
    mpmsub.Job(["echo", "object job"]).cpu(1).memory("150M"),
    mpmsub.job(["echo", "convenience job"], cpus=1, memory="200M"),
    mpmsub.pipeline([["cat", "data.txt"], ["grep", "pattern"]], cpu=1)
]

p.jobs.extend(jobs)
results = p.run()

Defaults and Validation

Default values:

  • p: 1 (one CPU core)
  • m: None (no memory limit)
  • id: auto-generated
  • Other parameters: inherited from the parent process

Jobs are validated when they are added: an invalid command, CPU count, or memory format raises a ValueError.
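The checks described above can be sketched as a function that applies the documented defaults and raises ValueError on bad input. `validate_job` is hypothetical, it handles only the canonical `p` and `m` keys, and its exact rules (a positive integer CPU count, memory as a positive int or a string such as "512M") are assumptions; mpmsub's own validation may differ.

```python
import re
import uuid

_MEMORY_STR = re.compile(r"\d+(?:\.\d+)?[KMG]", re.IGNORECASE)


def validate_job(job):
    """Return a copy of a job dict with defaults applied, or raise ValueError.

    Hypothetical rules: cmd is a non-empty list of strings, the CPU count is a
    positive integer (int or numeric str), and memory is None, a positive int
    (megabytes), or a string such as "512M".
    """
    cmd = job.get("cmd")
    if not isinstance(cmd, list) or not cmd or not all(isinstance(arg, str) for arg in cmd):
        raise ValueError("cmd must be a non-empty list of strings")

    cpus = job.get("p", 1)  # default: one CPU core
    if isinstance(cpus, bool) or not isinstance(cpus, (int, str)):
        raise ValueError(f"invalid CPU count: {cpus!r}")
    try:
        cpus = int(cpus)
    except ValueError:
        raise ValueError(f"invalid CPU count: {cpus!r}") from None
    if cpus < 1:
        raise ValueError(f"CPU count must be at least 1, got {cpus}")

    memory = job.get("m")  # default: None (no memory limit)
    if memory is not None:
        ok_int = isinstance(memory, int) and not isinstance(memory, bool) and memory > 0
        ok_str = isinstance(memory, str) and _MEMORY_STR.fullmatch(memory) is not None
        if not (ok_int or ok_str):
            raise ValueError(f"invalid memory format: {memory!r}")

    job_id = job.get("id") or f"job_{uuid.uuid4().hex[:8]}"  # auto-generated if missing
    return {**job, "p": cpus, "m": memory, "id": job_id}
```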

Best Practices

  • Simple jobs: Use dictionary or convenience function
  • Complex jobs: Use Job objects with builder pattern
  • Batch creation: Use list comprehensions
  • Resource specification: Be specific about CPU and memory needs
  • Profiling: Use p.profile() to optimize resource allocation
  • Error prevention: Set reasonable timeouts and validate inputs
  • Job tracking: Use custom IDs for important jobs

Examples

import mpmsub

# Simple batch processing
jobs = [mpmsub.job(["process", f"file_{i}.txt"], p=1, m="500M")
        for i in range(100)]

# Complex analysis with tracking
job = mpmsub.Job(["python", "analysis.py"]) \
    .cpu(4).memory("8G") \
    .working_dir("/data") \
    .with_timeout(3600) \
    .with_id("main_analysis")

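# Manual job dependencies: run analysis only if every preprocessing job succeeded.
# preprocessing_jobs and analysis_jobs are job lists built beforehand.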
p1 = mpmsub.cluster(p=4, m="8G")
p1.jobs.extend(preprocessing_jobs)
results1 = p1.run()

if results1['jobs']['failed'] == 0:
    p2 = mpmsub.cluster(p=4, m="8G")
    p2.jobs.extend(analysis_jobs)
    results2 = p2.run()
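The dependency example generalizes to any number of stages: run them in order and stop at the first stage that reports failures. A sketch of that control flow follows; `run_stages` is hypothetical, and `run_stage` stands in for whatever executes a list of jobs (for instance, a function that creates a cluster, extends its job list, and returns the result of `run()`). Only the `results["jobs"]["failed"]` lookup is taken from the example above.

```python
def run_stages(stages, run_stage):
    """Run each stage's jobs in order, stopping after the first stage with failures.

    stages: list of (name, jobs) pairs.
    run_stage: callable taking a list of jobs and returning a results dict with
               results["jobs"]["failed"] (shape borrowed from the example above).
    Returns a dict mapping each stage name that actually ran to its results.
    """
    completed = {}
    for name, jobs in stages:
        results = run_stage(jobs)
        completed[name] = results
        if results["jobs"]["failed"] > 0:
            break  # later stages depend on this one, so don't start them
    return completed


# Stand-in runner for demonstration only: a job "fails" if its command is "false"
def fake_run_stage(jobs):
    failed = sum(1 for job in jobs if job["cmd"][0] == "false")
    return {"jobs": {"failed": failed}}


stages = [
    ("preprocess", [{"cmd": ["true"]}, {"cmd": ["false"]}]),
    ("analysis", [{"cmd": ["true"]}]),
]
ran = run_stages(stages, fake_run_stage)
```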