Skip to content

Advanced Features

Pipeline Support

mpmsub supports chaining subprocess commands together using pipes, similar to shell pipes (cmd1 | cmd2 | cmd3).

Pipeline Creation

import mpmsub

p = mpmsub.cluster(cpu=4, memory="8G")

# Method 1: Pipeline convenience function
pipeline_job = mpmsub.pipeline([
    ["cat", "dataset.txt"],
    ["grep", "pattern"],
    ["sort", "-n"],
    ["head", "-20"]
], cpu=1, memory="500M", stdout="results.txt")

# Method 2: Builder pattern with pipe_to()
log_analysis = mpmsub.Job(["cat", "/var/log/access.log"]) \
    .pipe_to(["grep", "ERROR"]) \
    .pipe_to(["sort"]) \
    .pipe_to(["uniq", "-c"]) \
    .cpu(1).memory("200M") \
    .stdout_to("error_summary.txt")

p.jobs.extend([pipeline_job, log_analysis])
p.run()

Pipeline Features

  • Automatic piping between commands
  • Error handling for individual commands
  • Memory monitoring across the entire pipeline
  • Output redirection for final command
  • Resource scheduling like any other job

Output Redirection

Automatically redirect subprocess stdout and stderr to files.

import mpmsub

p = mpmsub.cluster(cpu=4, memory="8G")

# Dictionary interface
p.jobs.append({
    "cmd": ["python", "analysis.py"],
    "cpu": 2, "memory": "2G",
    "stdout": "results.txt", "stderr": "errors.txt"
})

# Job object interface
job = mpmsub.Job(["python", "ml_script.py"]) \
    .cpu(4).memory("4G") \
    .stdout_to("output.txt").stderr_to("errors.txt")

# Pipeline output redirection (applies to final command)
pipeline = mpmsub.pipeline([
    ["cat", "data.csv"],
    ["python", "process.py"]
], cpu=2, memory="3G", stdout="report.txt")

p.jobs.extend([job, pipeline])
p.run()

Flexible API

mpmsub supports multiple parameter names for better usability:

import mpmsub

# CPU parameter names (all equivalent)
job1 = mpmsub.job(["echo", "test"], p=4)           # Traditional
job2 = mpmsub.job(["echo", "test"], cpu=4)         # Alternative
job3 = mpmsub.job(["echo", "test"], cpus=4)        # Alternative

# Memory parameter names (all equivalent)
job4 = mpmsub.job(["echo", "test"], m="1G")        # Traditional
job5 = mpmsub.job(["echo", "test"], memory="1G")   # Alternative

# Mixed usage is supported
job6 = mpmsub.job(["echo", "test"], cpu=2, m="1G")

# Cluster creation
p = mpmsub.cluster(cpu=4, memory="8G")             # Alternative syntax
p = mpmsub.cluster(p=4, m="8G")                    # Traditional syntax

Resource Information

# Show system resources when creating cluster
p = mpmsub.cluster(describe=True)

# Or call describe_resources() method
p.describe_resources()

Example Workflows

Bioinformatics Pipeline

import mpmsub

p = mpmsub.cluster(cpu=8, memory="32G")

# Quality control pipeline
qc_pipeline = mpmsub.pipeline([
    ["fastqc", "raw_reads.fastq"],
    ["trimmomatic", "SE", "raw_reads.fastq", "trimmed.fastq", "TRAILING:20"]
], cpu=4, memory="4G", stderr="qc_errors.txt")

# Assembly job
assembly = mpmsub.Job(["spades.py", "--single", "trimmed.fastq", "-o", "assembly"]) \
    .cpu(8).memory("16G").stdout_to("assembly.log")

p.jobs.extend([qc_pipeline, assembly])
p.run()

Data Science Workflow

import mpmsub

p = mpmsub.cluster(cpu=6, memory="16G")

# Preprocessing pipeline
preprocessing = mpmsub.pipeline([
    ["python", "download_data.py"],
    ["python", "clean_data.py"]
], cpu=2, memory="4G", stdout="preprocessing.log")

# Model training
models = [
    mpmsub.job(["python", "train.py", "--model", "rf"], cpu=2, memory="3G"),
    mpmsub.job(["python", "train.py", "--model", "xgb"], cpu=2, memory="3G")
]

p.jobs.append(preprocessing)
p.jobs.extend(models)
p.run()