API Reference

BUSCOlite provides a Python API for programmatic access to BUSCO analysis functionality.
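
A minimal sketch to orient usage (the genome path and lineage directory are placeholders; a BUSCO lineage dataset must be downloaded separately):

from buscolite.busco import runbusco

results, missing, stats, config = runbusco(
    "genome.fasta",       # placeholder genome FASTA
    "dikarya_odb10",      # placeholder BUSCO lineage directory
    mode="genome",
)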

Main Modules

busco

The main module for running BUSCO analysis.

load_config(lineage)

Load the BUSCO dataset configuration file.

Parameters

lineage : str
    Path to the BUSCO lineage directory containing the dataset.cfg file

Returns

dict
    Dictionary containing the configuration parameters from dataset.cfg

Source code in buscolite/busco.py
def load_config(lineage):
    """
    Load the BUSCO dataset configuration file.

    Parameters
    ----------
    lineage : str
        Path to the BUSCO lineage directory containing the dataset.cfg file

    Returns
    -------
    dict
        Dictionary containing the configuration parameters from dataset.cfg
    """
    config = {}
    with open(os.path.join(lineage, "dataset.cfg"), "r") as infile:
        for line in infile:
            key, value = line.rstrip().split("=")
            config[key] = value
    return config
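
Example usage (the lineage path is a placeholder for a downloaded BUSCO dataset directory):

from buscolite.busco import load_config

config = load_config("dikarya_odb10")
print(config["name"])   # lineage name, as used in runbusco logging
print(sorted(config))   # all keys parsed from dataset.cfg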

load_cutoffs(lineage)

Load the BUSCO score and length cutoffs from the lineage directory.

Parameters

lineage : str
    Path to the BUSCO lineage directory containing the cutoff files

Returns

dict
    Dictionary containing the score and length cutoffs for each BUSCO model
    Format: {busco_id: {"score": float, "sigma": float, "length": int}}

Source code in buscolite/busco.py
def load_cutoffs(lineage):
    """
    Load the BUSCO score and length cutoffs from the lineage directory.

    Parameters
    ----------
    lineage : str
        Path to the BUSCO lineage directory containing the cutoff files

    Returns
    -------
    dict
        Dictionary containing the score and length cutoffs for each BUSCO model
        Format: {busco_id: {"score": float, "sigma": float, "length": int}}
    """
    cutoffs = {}
    with open(os.path.join(lineage, "scores_cutoff"), "r") as infile:
        for line in infile:
            busco, score = line.rstrip().split("\t")
            if busco not in cutoffs:
                cutoffs[busco] = {"score": float(score)}
            else:
                cutoffs[busco]["score"] = float(score)
    # this file is not there anymore in odb12
    length_cutoffs = os.path.join(lineage, "lengths_cutoff")
    if os.path.isfile(length_cutoffs):
        with open(length_cutoffs, "r") as infile:
            for line in infile:
                busco, _, sigma, length = line.rstrip().split("\t")
                if float(sigma) == 0.0:
                    sigma = 1
                if busco not in cutoffs:
                    cutoffs[busco] = {
                        "sigma": float(sigma),
                        "length": int(float(length)),
                    }
                else:
                    cutoffs[busco]["sigma"] = float(sigma)
                    cutoffs[busco]["length"] = int(float(length))
    return cutoffs
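
Example (same hypothetical lineage path). Note that "sigma" and "length" are only populated when the lengths_cutoff file exists, which it does not in odb12 datasets:

from buscolite.busco import load_cutoffs

cutoffs = load_cutoffs("dikarya_odb10")
busco_id, vals = next(iter(cutoffs.items()))
print(busco_id, vals["score"])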

check_lineage(lineage)

Verify that the BUSCO lineage directory contains all required files and directories.

Parameters

lineage : str
    Path to the BUSCO lineage directory

Returns

tuple
    (bool, str) - Boolean indicating if the lineage is valid, and an error message if not

Source code in buscolite/busco.py
def check_lineage(lineage):
    """
    Verify that the BUSCO lineage directory contains all required files and directories.

    Parameters
    ----------
    lineage : str
        Path to the BUSCO lineage directory

    Returns
    -------
    tuple
        (bool, str) - Boolean indicating if the lineage is valid, and an error message if not
    """
    lineage = os.path.abspath(lineage)
    if not os.path.isdir(lineage):
        return False, "{} is not a directory".format(lineage)
    dirs = ["prfl", "hmms"]
    files = [
        "ancestral",
        "ancestral_variants",
        "dataset.cfg",
        "scores_cutoff",
    ]
    for d in dirs:
        if not os.path.isdir(os.path.join(lineage, d)):
            return False, "{} directory was not found in {}".format(d, lineage)
    for f in files:
        if not os.path.isfile(os.path.join(lineage, f)):
            return False, "{} file is missing from {}".format(f, lineage)
    return True, ""
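
Typical guard before starting an analysis (the path is a placeholder):

from buscolite.busco import check_lineage

valid, msg = check_lineage("dikarya_odb10")
if not valid:
    raise SystemExit(msg)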

predict_and_validate(fadict, contig, prfl, cutoffs, species, start, end, strand, configpath, blast_score)

Predict and validate protein coding regions using Augustus and HMMER.

Parameters

fadict : dict
    Dictionary containing contig sequences.
contig : str
    Name of the contig.
prfl : str
    Path to the protein profile file.
cutoffs : dict
    Dictionary of cutoff scores for each protein.
species : str
    Species name for Augustus prediction.
start : int
    Start position of the region.
end : int
    End position of the region.
strand : str
    Strand for Augustus prediction ('forward', 'reverse', or 'both').
configpath : str or bool
    Path to the Augustus config directory, or False to use the default.
blast_score : float
    Prefilter alignment score stored with the final prediction as 'miniprot_score'.

Returns

tuple or bool
    A tuple containing the BUSCO name and the final prediction dictionary if a valid prediction is found, otherwise False.

Source code in buscolite/busco.py
def predict_and_validate(
    fadict,
    contig,
    prfl,
    cutoffs,
    species,
    start,
    end,
    strand,
    configpath,
    blast_score,
):
    """
    Predict and validate protein coding regions using Augustus and HMMER.

    Args:
        fadict (dict): Dictionary containing contig sequences.
        contig (str): Name of the contig.
        prfl (str): Path to the protein profile file.
        cutoffs (dict): Dictionary of cutoff scores for each protein.
        species (str): Species name for Augustus prediction.
        start (int): Start position of the region.
        end (int): End position of the region.
        strand (str): Strand for Augustus prediction ('forward', 'reverse', or 'both').
        configpath (str or bool): Path to the Augustus config directory, or False to use the default.
        blast_score (float): Prefilter alignment score stored with the final prediction as 'miniprot_score'.

    Returns:
        tuple: A tuple containing the BUSCO name and the final prediction dictionary if a valid prediction is found, otherwise False.
    """
    # augustus no longer accepts fasta from stdin, so write tempfile
    t_fasta = tempfile.NamedTemporaryFile(suffix=".fasta")
    with open(t_fasta.name, "w") as f:
        f.write(">{}\n{}\n".format(contig, fadict[contig][start:end]))

    # run augustus on the regions
    aug_preds = proteinprofile(
        t_fasta.name,
        prfl,
        species=species,
        start=start,
        end=end,
        strand=strand,
        configpath=configpath,
    )
    t_fasta.close()
    busco_name = os.path.basename(prfl).split(".")[0]
    if len(aug_preds) > 0:
        hmmfile = os.path.join(
            os.path.dirname(os.path.dirname(prfl)), "hmms", "{}.hmm".format(busco_name)
        )
        for k, v in aug_preds.items():
            transcript = getSeqRegions(fadict, v["contig"], v["coords"])
            if v["strand"] == "+":
                protein = translate(transcript, v["strand"], v["phase"][0])
            else:
                protein = translate(transcript, v["strand"], v["phase"][-1])
            if protein.startswith("M") and protein.endswith("*"):
                status = "complete"
            else:
                status = "fragmented"
            # now we can check via hmmer
            hmm_result = hmmer_search_single(hmmfile, protein.rstrip("*"))
            if len(hmm_result) > 0:
                if hmm_result[0]["bitscore"] > cutoffs[busco_name]["score"]:
                    final = {
                        "contig": v["contig"],
                        "strand": v["strand"],
                        "location": v["location"],
                        "coords": v["coords"],
                        "phase": v["phase"],
                        "transcript": transcript,
                        "translation": protein,
                        "status": status,
                        "hmmer": hmm_result[0],
                        "miniprot_score": blast_score,
                    }
                    return (busco_name, final)
    return False
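
This function is driven internally by runbusco, but it can be called directly. A hedged sketch (all paths and coordinates are placeholders, and the fasta2dict import location is an assumption based on this module's usage):

from buscolite.busco import load_cutoffs, predict_and_validate
from buscolite.fasta import fasta2dict  # import path assumed

cutoffs = load_cutoffs("dikarya_odb10")
fadict = fasta2dict("genome.fasta")
result = predict_and_validate(
    fadict,
    "contig_1",
    "dikarya_odb10/prfl/100957at4751.prfl",  # placeholder BUSCO profile
    cutoffs,
    "anidulans",
    10000,
    15000,
    "forward",
    False,   # configpath: use the default Augustus config
    250.0,   # prefilter score carried through as miniprot_score
)
if isinstance(result, tuple):
    busco_name, prediction = result
    print(busco_name, prediction["status"])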

runbusco(input, lineage, mode='genome', species='anidulans', cpus=1, offset=2000, verbosity=3, logger=False, check_augustus=True)

Run BUSCO analysis on genome or protein sequences.

Parameters:
- input (str): Path to the input genome or protein sequences.
- lineage (str): Path to the BUSCO lineage directory.
- mode (str, optional): Analysis mode, either 'genome' or 'proteins'. Defaults to 'genome'.
- species (str, optional): Species name for Augustus prediction. Defaults to 'anidulans'.
- cpus (int, optional): Number of CPUs to use. Defaults to 1.
- offset (int, optional): Number of bases added to each side of a candidate region before Augustus prediction. Defaults to 2000.
- verbosity (int, optional): Level of verbosity for logging. Defaults to 3.
- logger (bool or logger object, optional): Custom logger object. Defaults to False.
- check_augustus (bool, optional): Flag to check Augustus functionality. Defaults to True.

Returns:
- b_final (dict): Final BUSCO results.
- missing (list): List of BUSCOs not found.
- stats (dict): Statistics of BUSCO analysis.
- Config (dict): Parsed configuration details.

Source code in buscolite/busco.py
def runbusco(
    input,
    lineage,
    mode="genome",
    species="anidulans",
    cpus=1,
    offset=2000,
    verbosity=3,
    logger=False,
    check_augustus=True,
):
    """
    Run BUSCO analysis on genome or protein sequences.

    Parameters:
    - input (str): Path to the input genome or protein sequences.
    - lineage (str): Path to the BUSCO lineage directory.
    - mode (str, optional): Analysis mode, either 'genome' or 'proteins'. Defaults to 'genome'.
    - species (str, optional): Species name for Augustus prediction. Defaults to 'anidulans'.
    - cpus (int, optional): Number of CPUs to use. Defaults to 1.
    - offset (int, optional): Number of bases added to each side of a candidate region before Augustus prediction. Defaults to 2000.
    - verbosity (int, optional): Level of verbosity for logging. Defaults to 3.
    - logger (bool or logger object, optional): Custom logger object. Defaults to False.
    - check_augustus (bool, optional): Flag to check Augustus functionality. Defaults to True.

    Returns:
    - b_final (dict): Final BUSCO results.
    - missing (list): List of BUSCOs not found.
    - stats (dict): Statistics of BUSCO analysis.
    - Config (dict): Parsed configuration details.
    """
    if not logger:
        logger = startLogging()
    # check that the lineage path/files all present
    check, msg = check_lineage(lineage)
    if not check:
        logger.error("Error: {}".format(msg))
        raise SystemExit(1)

    # parse the configs and cutoffs
    Config = load_config(lineage)
    CutOffs = load_cutoffs(lineage)

    if mode == "genome":
        # check augustus functionality
        aug_version = augustus_version()
        if verbosity >= 2:
            logger.info(
                "BUSCOlite v{}; Augustus v{}; miniprot v{}; pyhmmer v{}; pyfastx v{}".format(
                    __version__,
                    aug_version,
                    miniprot_version(),
                    pyhmmer_version(),
                    pyfastx.__version__,
                )
            )
        if check_augustus:
            if not augustus_functional():
                logger.error(
                    "Augustus PPX (--proteinprofile) is non-functional. Usually caused by compilation errors."
                )
                raise SystemExit(1)
        if verbosity >= 1:
            logger.info("{} lineage contains {} BUSCO models".format(Config["name"], len(CutOffs)))

        # load genome into dictionary
        seq_records = fasta2dict(input)
        fa_stats = dict2stats(seq_records)
        if verbosity >= 2:
            logger.info(
                "Input genome is {} MB and {} contigs".format(
                    round(fa_stats["size"] / 1e6, 2), fa_stats["n"]
                )
            )

        # run miniprot filter from ancestral proteins + variants (combined for efficiency)
        # We need to track which hits come from ancestral vs variants for prioritization
        # Priority order: ancestral complete > ancestral incomplete > variant incomplete

        # First, run miniprot on ancestral sequences
        ancestral = os.path.join(lineage, "ancestral")
        if verbosity >= 1:
            logger.info("Prefiltering predictions using miniprot of ancestral sequences")
        complete_ancestral, coords_ancestral, njobs_ancestral = miniprot_prefilter(
            input, ancestral, CutOffs, cpus=cpus, buscodb=lineage
        )

        # Tag ancestral coords with priority=1 (highest)
        for busco_id in coords_ancestral:
            for region in coords_ancestral[busco_id]:
                region["priority"] = 1
                region["source"] = "ancestral"

        # Second, run miniprot on ancestral_variants for BUSCOs not yet complete
        ancestral_variants = os.path.join(lineage, "ancestral_variants")
        complete_variants = {}
        coords_variants = {}
        njobs_variants = 0

        if os.path.isfile(ancestral_variants):
            # Only search for BUSCOs not already complete from ancestral
            missing = [b for b in CutOffs.keys() if b not in complete_ancestral]

            if len(missing) > 0:
                # Create filtered variants file with only missing BUSCOs
                filt_variants = tempfile.NamedTemporaryFile(mode="w", suffix=".fasta", delete=False)
                avariants = fasta2dict(ancestral_variants, full_header=True)
                seen = set()
                for title, seq in avariants.items():
                    # Parse the BUSCO ID from the variant header
                    try:
                        busco_id, variant_num = title.split(" ")
                    except ValueError:
                        if "_" in title:
                            busco_id, variant_num = title.rsplit("_", 1)
                        else:
                            busco_id = title
                            variant_num = "0"
                    if busco_id in missing:
                        seen.add(busco_id)
                        # Write with normalized header
                        filt_variants.write(
                            ">{} {}\n{}\n".format(busco_id, variant_num, softwrap(seq))
                        )
                filt_variants.close()

                if verbosity >= 2:
                    logger.info(
                        "Trying ancestral variants for {} BUSCOs not complete from ancestral".format(
                            len(seen)
                        )
                    )

                complete_variants, coords_variants, njobs_variants = miniprot_prefilter(
                    input, filt_variants.name, CutOffs, cpus=cpus, buscodb=lineage
                )

                # Clean up temp file
                os.unlink(filt_variants.name)

                # Tag variant coords with priority=2 (lower than ancestral)
                for busco_id in coords_variants:
                    for region in coords_variants[busco_id]:
                        region["priority"] = 2
                        region["source"] = "ancestral_variants"

        # Combine complete models from both sources
        complete = complete_ancestral.copy()
        for busco_id, models in complete_variants.items():
            if busco_id not in complete:
                complete[busco_id] = models
            else:
                complete[busco_id].extend(models)

        # Combine coords, merging and sorting by priority
        coords = {}
        for busco_id in set(list(coords_ancestral.keys()) + list(coords_variants.keys())):
            coords[busco_id] = []
            if busco_id in coords_ancestral:
                coords[busco_id].extend(coords_ancestral[busco_id])
            if busco_id in coords_variants:
                coords[busco_id].extend(coords_variants[busco_id])
            # Sort by priority (1=ancestral first, 2=variants second)
            coords[busco_id].sort(key=lambda x: x["priority"])

        njobs = njobs_ancestral + njobs_variants

        if verbosity >= 1:
            logger.info(
                "Found {} complete models from miniprot, now launching {} augustus/pyhmmer [species={}] jobs for {} BUSCO models".format(
                    len(complete), njobs, species, len(coords)
                )
            )

        # Helper function to process all jobs for a single BUSCO with early exit
        def process_busco_jobs(busco_id, job_list, seq_records, cutoffs, species):
            """
            Process all jobs for a single BUSCO in priority order.
            Returns early if a complete model is found.

            Returns:
                tuple: (busco_id, results_list, jobs_submitted, jobs_skipped)
            """
            results = []
            jobs_submitted = 0
            jobs_skipped = 0

            for job in job_list:
                # Run Augustus + HMMER validation
                result = predict_and_validate(
                    seq_records,
                    job["contig"],
                    job["prfl"],
                    cutoffs,
                    species,
                    job["start"],
                    job["end"],
                    job["strand"],
                    False,
                    job["score"],
                )
                jobs_submitted += 1

                if isinstance(result, tuple):
                    b, res = result
                    results.append(res)

                    # Early exit: if we found a COMPLETE model, skip remaining jobs
                    if res.get("status") == "complete":
                        remaining_jobs = len(job_list) - (job_list.index(job) + 1)
                        jobs_skipped = remaining_jobs
                        break  # Exit early!

            return (busco_id, results, jobs_submitted, jobs_skipped)

        # run busco analysis using threadpool with early exit optimization
        # Process multiple BUSCOs in parallel, each with its own early exit logic
        if len(complete) > 0:
            b_results = complete
        else:
            b_results = {}

        # Track which BUSCOs are already complete from miniprot
        found_complete = set(complete.keys())

        # Group jobs by BUSCO ID and priority for early exit
        jobs_by_busco = {}
        for k, v in coords.items():
            busco_prlf = os.path.join(lineage, "prfl", "{}.prfl".format(k))
            if not os.path.isfile(busco_prlf):
                continue
            jobs_by_busco[k] = []
            for i in v:
                contig = i["contig"]
                start = i["coords"][0] - offset
                if start < 0:
                    start = 0
                end = i["coords"][1] + offset
                if end > len(seq_records[contig]):
                    end = len(seq_records[contig])
                if i["strand"] == "+":
                    aug_strand = "forward"
                elif i["strand"] == "-":
                    aug_strand = "reverse"
                else:
                    aug_strand = "both"
                jobs_by_busco[k].append(
                    {
                        "contig": contig,
                        "prfl": busco_prlf,
                        "start": start,
                        "end": end,
                        "strand": aug_strand,
                        "score": i["score"],
                        "priority": i["priority"],
                    }
                )

        # Submit one task per BUSCO (each task processes multiple jobs with early exit)
        # This allows parallel processing across BUSCOs while maintaining early exit within each
        total_jobs_submitted = 0
        total_jobs_skipped = 0
        futures = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
            for busco_id, job_list in jobs_by_busco.items():
                # Skip if already complete from miniprot
                if busco_id in found_complete:
                    total_jobs_skipped += len(job_list)
                    continue

                # Submit this BUSCO's job processor to the executor
                future = executor.submit(
                    process_busco_jobs,
                    busco_id,
                    job_list,
                    seq_records,
                    CutOffs,
                    species,
                )
                futures.append(future)

            # Collect results as they complete
            for future in concurrent.futures.as_completed(futures):
                busco_id, results, jobs_submitted, jobs_skipped = future.result()
                total_jobs_submitted += jobs_submitted
                total_jobs_skipped += jobs_skipped

                # Add results to b_results
                if busco_id not in b_results:
                    b_results[busco_id] = results
                else:
                    b_results[busco_id].extend(results)

        if verbosity >= 2:
            logger.info(
                "Augustus completed: {} jobs submitted, {} jobs skipped via early exit".format(
                    total_jobs_submitted, total_jobs_skipped
                )
            )

        # Apply BUSCO v6-style filtering
        # NOTE: In genome mode, we skip the duplicate gene filter because:
        # 1. We already handle overlapping genes later (line 584)
        # 2. Using gene span for duplicate detection is too strict - different BUSCOs
        #    can legitimately predict overlapping genes
        # 3. BUSCO v6 uses exon-level overlap checking, which is more sophisticated
        if verbosity >= 2:
            logger.info(
                "Found {} matches from Augustus/HMMER".format(
                    sum(len(v) for v in b_results.values())
                )
            )

        # Step 2: Remove matches scoring less than 85% of top bitscore for each BUSCO
        # NOTE: Temporarily disabled - this filter may be too aggressive for genome mode
        # TODO: Investigate if BUSCO v6 actually applies this in genome mode with Augustus
        # For genome mode, the bitscore is in the nested 'hmmer' dict
        # Keep reference to original matches before flattening
        # b_results_original = {}
        # b_results_for_filter = {}
        # for busco_id, matches in b_results.items():
        #     b_results_original[busco_id] = matches  # Keep original
        #     b_results_for_filter[busco_id] = []
        #     for match in matches:
        #         # Create a flattened version for filtering
        #         flat_match = match.copy()
        #         if "hmmer" in match:
        #             flat_match["bitscore"] = match["hmmer"]["bitscore"]
        #         b_results_for_filter[busco_id].append(flat_match)

        # filtered = filter_low_scoring_matches(b_results_for_filter, threshold=0.85)

        # # Map back to original structure
        # b_results = {}
        # for busco_id, matches in filtered.items():
        #     b_results[busco_id] = []
        #     for match in matches:
        #         # Find the corresponding original match using object identity
        #         # The filtered matches are the same objects from b_results_for_filter
        #         for i, flat_match in enumerate(b_results_for_filter[busco_id]):
        #             if flat_match is match:
        #                 # Use the original match from b_results_original
        #                 b_results[busco_id].append(b_results_original[busco_id][i])
        #                 break

        # matches_after_85_filter = sum(len(v) for v in b_results.values())
        # if verbosity >= 2:
        #     logger.info(
        #         "After 85% bitscore filter: {} matches remain ({} removed)".format(
        #             matches_after_85_filter, matches_after_dup_filter - matches_after_85_filter
        #         )
        #     )

        # finally loop through results, classify and reorganize
        b_final = {}
        missing = []
        for b in CutOffs.keys():
            # Count as missing if not in results OR if results list is empty (filtered out)
            if b not in b_results or len(b_results[b]) == 0:
                missing.append(b)
        stats = {
            "total": len(CutOffs),
            "single-copy": 0,
            "fragmented": 0,
            "duplicated": 0,
            "missing": len(missing),
        }
        for k, v in natsorted(b_results.items()):
            # Skip empty lists (can happen after filtering)
            if len(v) == 0:
                continue
            elif len(v) > 1:  # duplicate processing here, overlapping regions are not duplicates
                dp = []
                for i, x in enumerate(
                    sorted(v, key=lambda y: y["hmmer"]["bitscore"], reverse=True)
                ):
                    # if first item in list then highest score
                    if i == 0:
                        dp.append(x)
                    else:
                        if x["contig"] in [a["contig"] for a in dp]:  # means contig already there
                            if not any_overlap(x["location"], [a["location"] for a in dp]):
                                dp.append(x)
                # these are actually duplicated
                if len(dp) > 1:
                    for y, z in enumerate(dp):
                        z["status"] = "duplicated"
                        if y > 0:
                            name = "{}_{}".format(k, y)
                        else:
                            name = k
                        stats["duplicated"] += 1
                        b_final[name] = z
                elif len(dp) == 1:
                    if dp[0]["status"] == "fragmented":
                        stats["fragmented"] += 1
                    else:
                        stats["single-copy"] += 1
                    b_final[k] = dp[0]
                # else: dp is empty, skip this BUSCO
            else:  # len(v) == 1
                if v[0]["status"] == "fragmented":
                    stats["fragmented"] += 1
                else:
                    stats["single-copy"] += 1
                b_final[k] = v[0]
        return b_final, missing, stats, Config

    elif mode == "proteins":
        if verbosity >= 2:
            logger.info("BUSCOlite v{}; pyhmmer v{}".format(__version__, pyhmmer.__version__))
        if verbosity >= 1:
            logger.info("{} lineage contains {} BUSCO models".format(Config["name"], len(CutOffs)))
        # load proteome into easel digitized sequence to pass to pyhmmer
        falengths = fasta2lengths(input)
        alphabet = pyhmmer.easel.Alphabet.amino()
        sequences = []
        with pyhmmer.easel.SequenceFile(input, digital=True, alphabet=alphabet) as seq_file:
            sequences = list(seq_file)
        if verbosity >= 1:
            logger.info("Loaded {} protein sequences from {}".format(len(sequences), input))
        # now we can loop over the hmms in the lineage and run hmmer on each
        results = []
        hmmfilelookup = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=cpus + 2) as executor:
            for f in os.listdir(os.path.join(lineage, "hmms")):
                if f.endswith(".hmm"):
                    if "at" in f:
                        hmmfilelookup[f.split("at")[0]] = f.split(".hmm")[0]
                    hmmfile = os.path.join(lineage, "hmms", f)
                    results.append(executor.submit(hmmer_search, hmmfile, sequences))
        # process the multithreaded results
        # danger with odb12, the hmm name does not match the file name in scores_cutoffs.... #dumb
        b_results = {}
        for r in results:
            if isinstance(r.result(), list):
                for x in r.result():
                    if x["name"] not in CutOffs:
                        if x["name"] in hmmfilelookup:
                            modelName = hmmfilelookup[x["name"]]
                        else:
                            modelName = x["name"]
                    else:
                        modelName = x["name"]
                    if x["bitscore"] > CutOffs[modelName]["score"]:
                        if modelName not in b_results:
                            b_results[modelName] = [x]
                        else:
                            b_results[modelName].append(x)

        # Apply BUSCO v6-style filtering
        # Step 1: Remove matches scoring less than 85% of top bitscore for each BUSCO
        if verbosity >= 2:
            logger.info(
                "Filtering {} initial matches using 85% bitscore threshold".format(
                    sum(len(v) for v in b_results.values())
                )
            )
        b_results = filter_low_scoring_matches(b_results, threshold=0.85)

        # Step 2: Remove duplicate gene matches (same gene matching multiple BUSCOs)
        b_results = remove_duplicate_gene_matches(b_results, score_key="bitscore")
        if verbosity >= 2:
            logger.info(
                "After filtering: {} matches remain".format(sum(len(v) for v in b_results.values()))
            )

        b_final = {}
        missing = []
        for b in CutOffs.keys():
            if b not in b_results:
                missing.append(b)
        stats = {
            "total": len(CutOffs),
            "single-copy": 0,
            "fragmented": 0,
            "duplicated": 0,
            "missing": len(missing),
        }
        for k, v in natsorted(b_results.items()):
            if len(v) > 1:  # duplicates
                for i, x in enumerate(sorted(v, key=lambda y: y["bitscore"], reverse=True)):
                    x["status"] = "duplicated"
                    x["length"] = falengths[x["hit"]]
                    if i > 0:
                        name = "{}_{}".format(k, i)
                    else:
                        name = k
                        stats["duplicated"] += 1
                    b_final[name] = x
            else:
                x = v[0]
                x["length"] = falengths[x["hit"]]
                if "status" in x and x["status"] == "fragmented":
                    stats["fragmented"] += 1
                    x["status"] = "fragmented"
                else:
                    x["status"] = "complete"
                    stats["single-copy"] += 1
                b_final[k] = x

        return b_final, missing, stats, Config
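
A hedged proteins-mode sketch (paths are placeholders); the stats dictionary uses the keys built above:

from buscolite.busco import runbusco

b_final, missing, stats, config = runbusco(
    "proteome.fasta",     # placeholder proteome FASTA
    "dikarya_odb10",      # placeholder lineage directory
    mode="proteins",
    cpus=4,
)
# summarize completeness from the stats dictionary
pct = 100.0 * (stats["single-copy"] + stats["duplicated"]) / stats["total"]
print("C:{:.1f}% S:{} D:{} F:{} M:{}".format(
    pct, stats["single-copy"], stats["duplicated"],
    stats["fragmented"], stats["missing"]))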

search

HMM search functionality using pyhmmer.

tblastn_version()

Get the version of tblastn installed on the system.

Returns

str
    The version string of tblastn

Source code in buscolite/search.py
def tblastn_version():
    """
    Get the version of tblastn installed on the system.

    Returns
    -------
    str
        The version string of tblastn
    """
    p1 = subprocess.Popen(
        ["tblastn", "-version"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    vers = p1.communicate()[0].split("+")[0]
    vers = vers.split(" ")[-1]
    return vers

miniprot_version()

Get the version of miniprot installed on the system.

Returns

str
    The version string of miniprot

Source code in buscolite/search.py
def miniprot_version():
    """
    Get the version of miniprot installed on the system.

    Returns
    -------
    str
        The version string of miniprot
    """
    p1 = subprocess.Popen(
        ["miniprot", "--version"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    vers = p1.communicate()[0].rstrip()
    return vers

pyhmmer_version()

Get the version of pyhmmer installed on the system.

Returns

str
    The version string of pyhmmer

Source code in buscolite/search.py
def pyhmmer_version():
    """
    Get the version of pyhmmer installed on the system.

    Returns
    -------
    str
        The version string of pyhmmer
    """
    return pyhmmer.__version__
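
A quick dependency check using the three version helpers together:

from buscolite.search import miniprot_version, pyhmmer_version, tblastn_version

print("tblastn:", tblastn_version())    # shells out to tblastn -version
print("miniprot:", miniprot_version())  # shells out to miniprot --version
print("pyhmmer:", pyhmmer_version())    # reads pyhmmer.__version__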

miniprot_prefilter(input, query, cutoffs, tmpdir=False, cpus=1, maxhits=3, maxintron=10000, buscodb='.')

Prefilter alignments using miniprot for BUSCO analysis.

Parameters

input : str
    Path to the input genome FASTA file
query : str
    Path to the query protein FASTA file
cutoffs : dict
    Dictionary containing score cutoffs for each BUSCO model
tmpdir : str or bool, optional
    Path to temporary directory, or False to use system default
cpus : int, optional
    Number of CPU threads to use (default: 1)
maxhits : int, optional
    Maximum number of hits to return per query (default: 3)
maxintron : int, optional
    Maximum intron size (default: 10000)
buscodb : str, optional
    Path to the BUSCO database directory (default: ".")

Returns

tuple
    (dict, dict, int) - Complete models validated by pyhmmer, candidate regions to pass to Augustus, and the number of follow-up jobs

Source code in buscolite/search.py
def miniprot_prefilter(
    input, query, cutoffs, tmpdir=False, cpus=1, maxhits=3, maxintron=10000, buscodb="."
):
    """
    Prefilter alignments using miniprot for BUSCO analysis.

    Parameters
    ----------
    input : str
        Path to the input genome FASTA file
    query : str
        Path to the query protein FASTA file
    cutoffs : dict
        Dictionary containing score cutoffs for each BUSCO model
    tmpdir : str or bool, optional
        Path to temporary directory, or False to use system default
    cpus : int, optional
        Number of CPU threads to use (default: 1)
    maxhits : int, optional
        Maximum number of hits to return per query (default: 3)
    maxintron : int, optional
        Maximum intron size (default: 10000)
    buscodb : str, optional
        Path to the BUSCO database directory (default: ".")

    Returns
    -------
    tuple
        (dict, dict, int) - Complete models validated by pyhmmer, candidate
        regions to pass to Augustus, and the number of follow-up jobs
    """
    # Continue with function implementation
    # setup tmpdir
    if not tmpdir:
        tmpdir = os.path.join("/tmp", str(uuid.uuid4()))
        os.makedirs(tmpdir)
    # check fasta headers
    headers = fasta2headers(input)
    # setup miniprot command
    cmd = [
        "miniprot",
        "-G",
        str(maxintron),
        "-N",
        str(maxhits),
        "-t",
        str(cpus),
        "--gff",
        os.path.abspath(input),
        os.path.abspath(query),
    ]
    results = {}
    Genes = {}
    for line in execute(cmd):
        if line.startswith("##PAF"):
            (
                pname,
                plen,
                pstart,
                pend,
                strand,
                cname,
                _,  # clen (not used)
                cstart,
                cend,
                _,  # match (not used)
                _,  # naln (not used)
                _,  # mapq (not used)
            ) = line.split("\t")[1:13]
            score = 0
            escore = 0.0
            for x in line.split("\t")[13:]:
                if x.startswith("AS:"):
                    score = int(x.split(":")[-1])
            qcov = (int(pend) - int(pstart)) / float(int(plen))
            if cname not in headers:
                if cname.startswith("gb|"):
                    cname = cname.replace("gb|", "").rstrip("|")
            if pname not in results:
                results[pname] = {cname: []}
            if cname not in results[pname]:
                results[pname][cname] = [
                    {
                        "evalue": escore,
                        "score": score,
                        "qcov": qcov,
                        "qlen": int(plen),
                        "cov": (int(pstart), int(pend)),
                        "coords": (int(cstart), int(cend)),
                        "strand": strand,
                    }
                ]
            else:
                results[pname][cname].append(
                    {
                        "evalue": escore,
                        "score": score,
                        "qcov": qcov,
                        "qlen": int(plen),
                        "cov": (int(pstart), int(pend)),
                        "coords": (int(cstart), int(cend)),
                        "strand": strand,
                    }
                )
        elif line.startswith(("##gff-version", "\n")):
            continue
        else:
            line = line.rstrip()
            # skip lines that aren't 9 columns
            if not line.count("\t") == 8:
                continue
            (
                contig,
                source,
                feature,
                start,
                end,
                score,
                strand,
                phase,
                attributes,
            ) = line.split("\t")
            if feature not in [
                "mRNA",
                "CDS",
                "stop_codon",
            ]:
                continue
            attributes = unquote(attributes)
            source = unquote(source)
            feature = unquote(feature)
            start = int(start)
            end = int(end)
            ID = None
            Parent = None
            Name = None
            # These variables are not used but kept for future compatibility
            # Product = None
            # GeneFeature = None
            # gbkey = None
            info = {}
            for field in attributes.split(";"):
                try:
                    k, v = field.split("=", 1)
                    info[k] = v.strip()
                except (IndexError, ValueError):
                    pass
            # now can lookup in info dict for values
            ID = info.get("ID", None)
            Parent = info.get("Parent", None)
            Target = info.get("Target", None)
            Identity = info.get("Identity", None)
            Positive = info.get("Positive", None)
            Rank = info.get("Rank", None)
            if Target:
                Name = Target.split()[0]
            # now we can do add to dictionary these parsed values
            # genbank gff files are incorrect for tRNA so check if gbkey exists and make up gene on the fly
            if feature in ["mRNA"]:
                if ID not in Genes:
                    Genes[ID] = {
                        "name": Name,
                        "type": ["mRNA"],
                        "transcript": [],
                        "cds_transcript": [],
                        "protein": [],
                        "5UTR": [[]],
                        "3UTR": [[]],
                        "gene_synonym": [],
                        "codon_start": [],
                        "ids": [ID],
                        "CDS": [[]],
                        "mRNA": [[]],
                        "strand": strand,
                        "EC_number": [[]],
                        "location": (start, end),
                        "contig": contig,
                        "product": ["miniprot alignment"],
                        "source": source,
                        "phase": [[]],
                        "db_xref": [[]],
                        "go_terms": [[]],
                        "note": [
                            [
                                f"IDENTITY:{Identity}",
                                f"RANK:{Rank}",
                                f"Positive:{Positive}",
                            ]
                        ],
                        "partialStart": [],
                        "partialStop": [],
                        "pseudo": False,
                        "score": [],
                        "target": [],
                    }
                else:
                    if start < Genes[ID]["location"][0]:
                        Genes[ID]["location"] = (start, Genes[ID]["location"][1])
                    if end > Genes[ID]["location"][1]:
                        Genes[ID]["location"] = (Genes[ID]["location"][0], end)
                    Genes[ID]["Note"].append(
                        [
                            f"IDENTITY:{Identity}",
                            f"RANK:{Rank}",
                            f"Positive:{Positive}",
                        ]
                    )
            elif feature in ["CDS"]:
                if Parent:
                    # determine which transcript this is get index from id
                    i = Genes[Parent]["ids"].index(Parent)
                    Genes[Parent]["CDS"][i].append((start, end))
                    Genes[Parent]["mRNA"][i].append((start, end))
                    Genes[Parent]["score"].append(round(float(Identity) * 100, 2))
                    Genes[Parent]["target"].append(Target)
                    # add phase
                    try:
                        Genes[Parent]["phase"][i].append(int(phase))
                    except ValueError:
                        Genes[Parent]["phase"][i].append("?")
            elif feature in ["stop_codon"]:
                if Parent:
                    i = Genes[Parent]["ids"].index(Parent)
                    # here we need to extend the last CDS if + and first if - strand
                    if strand == "+":
                        lt = Genes[Parent]["CDS"][i][-1]
                        nLT = (lt[0], end)
                        Genes[Parent]["CDS"][i][-1] = nLT
                        Genes[Parent]["mRNA"][i][-1] = nLT
                    elif strand == "-":
                        ft = Genes[Parent]["CDS"][i][0]
                        nFT = (start, ft[1])
                        Genes[Parent]["CDS"][i][0] = nFT
                        Genes[Parent]["mRNA"][i][0] = nFT

    # we can then process the Genes dictionary for full length genes
    SeqRecords = fasta2dict(input)
    annotation = validate_models(Genes, SeqRecords, table=1)
    g = {}
    for k, v in annotation.items():
        if "mRNA" in v["type"]:
            if False in v["partialStart"] and False in v["partialStop"] and not v["pseudo"]:
                # check complete models with hmmer to validate and reformat
                hmmfile = os.path.join(buscodb, "hmms", "{}.hmm".format(v["name"]))
                if not os.path.isfile(hmmfile):
                    continue
                # now we can check via hmmer
                hmm_result = hmmer_search_single(hmmfile, v["protein"][0].rstrip("*"))
                if len(hmm_result) > 0:
                    if hmm_result[0]["bitscore"] > cutoffs[v["name"]]["score"]:
                        final = {
                            "contig": v["contig"],
                            "strand": v["strand"],
                            "location": v["location"],
                            "coords": v["CDS"][0],
                            "phase": v["phase"][0],
                            "transcript": v["cds_transcript"][0],
                            "translation": v["protein"][0],
                            "status": "complete",
                            "hmmer": hmm_result[0],
                            "miniprot_score": results[v["name"]][v["contig"]][0]["score"],
                        }
                        if v["name"] not in g:
                            g[v["name"]] = [final]
                        else:
                            g[v["name"]].append(final)
    # want to return a dictionary of busco hits
    final = {}
    njobs = 0
    for k, v in sorted(results.items()):
        if k not in g:
            for x, z in sorted(v.items()):
                merged = merge_overlapping_hits(z, fluff=maxintron)
                for w in merged:
                    njobs += 1
                    if k not in final:
                        final[k] = [
                            {
                                "contig": x,
                                "coords": w["coords"],
                                "evalue": w["evalue"],
                                "score": w["score"],
                                "strand": w["strand"],
                            }
                        ]
                    else:
                        final[k].append(
                            {
                                "contig": x,
                                "coords": w["coords"],
                                "evalue": w["evalue"],
                                "score": w["score"],
                                "strand": w["strand"],
                            }
                        )
    return g, final, njobs
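
A sketch mirroring how runbusco calls this function (requires the miniprot binary on PATH; paths are placeholders):

from buscolite.busco import load_cutoffs
from buscolite.search import miniprot_prefilter

cutoffs = load_cutoffs("dikarya_odb10")
complete, coords, njobs = miniprot_prefilter(
    "genome.fasta",
    "dikarya_odb10/ancestral",
    cutoffs,
    cpus=4,
    buscodb="dikarya_odb10",
)
print(len(complete), "complete models;", njobs, "regions queued for Augustus")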

blast_prefilter(input, query, logger=None, tmpdir=False, evalue=0.001, cpus=1, maxhits=3, maxintron=10000)

Prefilters alignments for augustus using tblastn.

Parameters

input : str
    Path to the input genome FASTA file
query : str
    Path to the query protein FASTA file
logger : object, optional
    Logger object; kept for API compatibility but not used
tmpdir : str or bool, optional
    Path to temporary directory, or False to use system default
evalue : float, optional
    E-value threshold for BLAST hits (default: 1e-3)
cpus : int, optional
    Number of CPU threads to use (default: 1)
maxhits : int, optional
    Maximum number of hits to return per query (default: 3)
maxintron : int, optional
    Maximum intron size (default: 10000)

Returns

tuple
    (dict, int) - Dictionary containing filtered alignment results and the number of jobs

Source code in buscolite/search.py
def blast_prefilter(
    input,
    query,
    logger=None,
    tmpdir=False,
    evalue=1e-3,
    cpus=1,
    maxhits=3,
    maxintron=10000,
    # logger parameter is kept for API compatibility but not used
):
    """
    Prefilters alignments for augustus using tblastn.

    Parameters
    ----------
    input : str
        Path to the input genome FASTA file
    query : str
        Path to the query protein FASTA file
    logger : object
        Logger object for logging messages
    tmpdir : str or bool, optional
        Path to temporary directory, or False to use system default
    evalue : float, optional
        E-value threshold for BLAST hits (default: 1e-3)
    cpus : int, optional
        Number of CPU threads to use (default: 1)
    maxhits : int, optional
        Maximum number of hits to return per query (default: 3)
    maxintron : int, optional
        Maximum intron size (default: 10000)

    Returns
    -------
    tuple
        (dict, int) - Dictionary containing filtered alignment results and number of jobs
    """
    # figure out cpus
    if cpus > 1:  # need to see if tblastn is safe multithreading
        if parse_version("2.2.32") < parse_version(tblastn_version()) < parse_version("2.10.2"):
            cpus = 1
    # setup tmpdir
    if not tmpdir:
        tmpdir = os.path.join("/tmp", str(uuid.uuid4()))
        os.makedirs(tmpdir)
    # check fasta headers
    headers = fasta2headers(input)
    # start by formatting blast db/dustmasker filtered format
    cmd = [
        "dustmasker",
        "-in",
        os.path.abspath(input),
        "-infmt",
        "fasta",
        "-parse_seqids",
        "-outfmt",
        "maskinfo_asn1_bin",
        "-out",
        "genome_dust.asnb",
    ]
    runprocess(cmd, cwd=tmpdir)
    cmd = [
        "makeblastdb",
        "-in",
        os.path.abspath(input),
        "-dbtype",
        "nucl",
        "-parse_seqids",
        "-mask_data",
        "genome_dust.asnb",
        "-out",
        "genome",
    ]
    runprocess(cmd, cwd=tmpdir)
    cmd = [
        "tblastn",
        "-num_threads",
        str(cpus),
        "-db",
        os.path.join(tmpdir, "genome"),
        "-query",
        query,
        "-max_target_seqs",
        str(maxhits),
        "-db_soft_mask",
        "11",
        "-threshold",
        "999",
        "-max_intron_length",
        str(maxintron),
        "-evalue",
        str(evalue),
        "-outfmt",
        "6 sseqid slen sstart send qseqid qlen qstart qend pident length evalue score",
    ]
    results = {}
    for line in execute(cmd):
        cols = line.rstrip().split("\t")
        qcov = (int(cols[7]) - int(cols[6])) / float(int(cols[5]))
        score = float(cols[11])
        escore = float(cols[10])
        contig = cols[0]
        if contig not in headers:
            if contig.startswith("gb|"):
                contig = contig.replace("gb|", "").rstrip("|")
        if int(cols[3]) > int(cols[2]):
            start = int(cols[2])
            end = int(cols[3])
        else:
            start = int(cols[3])
            end = int(cols[2])
        if cols[4] not in results:
            results[cols[4]] = {contig: []}
        if contig not in results[cols[4]]:
            results[cols[4]][contig] = [
                {
                    "evalue": escore,
                    "score": score,
                    "qcov": qcov,
                    "qlen": int(cols[5]),
                    "cov": (int(cols[6]), int(cols[7])),
                    "coords": (start, end),
                }
            ]
        else:
            results[cols[4]][contig].append(
                {
                    "evalue": escore,
                    "score": score,
                    "qcov": qcov,
                    "qlen": int(cols[5]),
                    "cov": (int(cols[6]), int(cols[7])),
                    "coords": (start, end),
                }
            )
    # clean up intermediate files
    shutil.rmtree(tmpdir)

    # now we can parse and filter results
    # want to return a dictionary of busco hits
    final = {}
    njobs = 0
    for k, v in sorted(results.items()):
        for x, z in sorted(v.items()):
            merged = merge_overlapping_hits(z, fluff=maxintron)
            for w in merged:
                njobs += 1
                if k not in final:
                    final[k] = [
                        {
                            "contig": x,
                            "coords": w["coords"],
                            "evalue": w["evalue"],
                            "score": w["score"],
                        }
                    ]
                else:
                    final[k].append(
                        {
                            "contig": x,
                            "coords": w["coords"],
                            "evalue": w["evalue"],
                            "score": w["score"],
                        }
                    )
    return final, njobs
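
A sketch (requires dustmasker, makeblastdb, and tblastn on PATH; paths are placeholders):

from buscolite.search import blast_prefilter

hits, njobs = blast_prefilter("genome.fasta", "dikarya_odb10/ancestral", cpus=4)
print(njobs, "candidate regions across", len(hits), "BUSCOs")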

merge_overlapping_hits(queryList, fluff=10000)

Merge overlapping or nearby hits from BLAST or miniprot searches.

Parameters

queryList : list
    List of dictionaries containing hit information with 'coords' key
fluff : int, optional
    Maximum distance between hits to consider them for merging (default: 10000)

Returns

list
    List of merged hits

Source code in buscolite/search.py
def merge_overlapping_hits(queryList, fluff=10000):
    """
    Merge overlapping or nearby hits from BLAST or miniprot searches.

    Parameters
    ----------
    queryList : list
        List of dictionaries containing hit information with 'coords' key
    fluff : int, optional
        Maximum distance between hits to consider them for merging (default: 10000)

    Returns
    -------
    list
        List of merged hits
    """
    if len(queryList) < 2:
        return queryList
    # sort by coords and try to merge if within fluff
    queryList.sort(key=lambda x: x["coords"][0])
    result = [queryList[0]]
    for x in queryList[1:]:
        if (
            x["coords"][0] <= result[-1]["coords"][1]
            or (x["coords"][0] - result[-1]["coords"][1]) < fluff
        ):
            result[-1]["coords"] = (
                result[-1]["coords"][0],
                max(result[-1]["coords"][1], x["coords"][1]),
            )
        else:
            result.append(x)
    return result
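
A worked example with toy coordinates:

from buscolite.search import merge_overlapping_hits

hits = [
    {"coords": (100, 500), "evalue": 1e-20, "score": 80, "strand": "+"},
    {"coords": (450, 900), "evalue": 1e-15, "score": 60, "strand": "+"},
    {"coords": (50000, 50400), "evalue": 1e-10, "score": 40, "strand": "+"},
]
merged = merge_overlapping_hits(hits, fluff=10000)
# the first two hits overlap and merge into coords (100, 900);
# the third starts ~49 kb downstream, beyond fluff, so it stays separate
print([m["coords"] for m in merged])  # [(100, 900), (50000, 50400)]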

hmmer_search_single(hmmfile, seq)

Search a single protein sequence against an HMM profile using pyhmmer.

Parameters

hmmfile : str
    Path to the HMM profile file
seq : str
    Protein sequence to search

Returns

list
    List of dictionaries containing search results with the following keys:
    - name: Name of the HMM profile
    - bitscore: Bit score of the hit
    - evalue: E-value of the hit
    - domains: List of domain hits with coordinates and scores

Source code in buscolite/search.py
def hmmer_search_single(hmmfile, seq):
    """
    Search a single protein sequence against an HMM profile using pyhmmer.

    Parameters
    ----------
    hmmfile : str
        Path to the HMM profile file
    seq : str
        Protein sequence to search

    Returns
    -------
    list
        List of dictionaries containing search results with the following keys:
        - name: Name of the HMM profile
        - bitscore: Bit score of the hit
        - evalue: E-value of the hit
        - domains: List of domain hits with coordinates and scores
    """
    hmm = next(pyhmmer.plan7.HMMFile(hmmfile))
    prot = pyhmmer.easel.TextSequence(sequence=seq)
    alphabet = pyhmmer.easel.Alphabet.amino()
    results = []
    for top_hits in pyhmmer.hmmsearch([hmm], [prot.digitize(alphabet)]):
        for hit in top_hits:
            cog = hit.best_domain.alignment.hmm_name
            domains = []
            for h in hit.domains:
                domains.append(
                    {
                        "hmm_from": h.alignment.hmm_from,
                        "hmm_to": h.alignment.hmm_to,
                        "env_from": h.env_from,
                        "env_to": h.env_to,
                        "score": h.score,
                    }
                )
            results.append(
                {
                    "name": cog,
                    "bitscore": hit.score,
                    "evalue": hit.evalue,
                    "domains": domains,
                }
            )
    return results
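
A sketch (the HMM path is a placeholder and the sequence is a toy fragment, so real hits are not expected):

from buscolite.search import hmmer_search_single

protein = "MSTNPKPQRKTKRNTNRRPQDVKFPGG"  # toy protein sequence
hits = hmmer_search_single("dikarya_odb10/hmms/100957at4751.hmm", protein)
for h in hits:
    print(h["name"], h["bitscore"], h["evalue"])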

hmmer_search(hmmfile, sequences)

Search multiple protein sequences against an HMM profile using pyhmmer.

Parameters

hmmfile : str
    Path to the HMM profile file
sequences : list
    List of digitized protein sequences to search

Returns

list
    List of dictionaries containing search results with the following keys:
    - name: Name of the HMM profile
    - hit: Name of the sequence that matched
    - bitscore: Bit score of the hit
    - evalue: E-value of the hit
    - domains: List of domain hits with coordinates and scores

Source code in buscolite/search.py
def hmmer_search(hmmfile, sequences):
    """
    Search multiple protein sequences against an HMM profile using pyhmmer.

    Parameters
    ----------
    hmmfile : str
        Path to the HMM profile file
    sequences : list
        List of digitized protein sequences to search

    Returns
    -------
    list
        List of dictionaries containing search results with the following keys:
        - name: Name of the HMM profile
        - hit: Name of the sequence that matched
        - bitscore: Bit score of the hit
        - evalue: E-value of the hit
        - domains: List of domain hits with coordinates and scores
    """
    hmm = next(pyhmmer.plan7.HMMFile(hmmfile))
    results = []
    for top_hits in pyhmmer.hmmsearch([hmm], sequences):
        for hit in top_hits:
            cog = hit.best_domain.alignment.hmm_name
            domains = []
            for h in hit.domains:
                domains.append(
                    {
                        "hmm_from": h.alignment.hmm_from,
                        "hmm_to": h.alignment.hmm_to,
                        "env_from": h.env_from,
                        "env_to": h.env_to,
                        "score": h.score,
                    }
                )
            results.append(
                {
                    "name": cog,
                    "hit": hit.name,
                    "bitscore": hit.score,
                    "evalue": hit.evalue,
                    "domains": domains,
                }
            )
    return results
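
Because this function expects digitized sequences, callers typically build them with pyhmmer.easel first. A minimal sketch, with hypothetical file paths and sequences:

import pyhmmer
from buscolite.search import hmmer_search

alphabet = pyhmmer.easel.Alphabet.amino()
# pyhmmer sequence names must be bytes
proteins = [
    pyhmmer.easel.TextSequence(name=b"gene1", sequence="MKTAYIAKQRQISFVKSHFSRQ").digitize(alphabet),
    pyhmmer.easel.TextSequence(name=b"gene2", sequence="MSLSRVGDGTQDNLSGAEKAVQ").digitize(alphabet),
]
hits = hmmer_search("fungi_odb10/hmms/1003258at4751.hmm", proteins)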

gff

GFF3 file parsing and manipulation.

gff

utilities

Utility functions for filtering and processing BUSCO results.

utilities

overlap(start1, end1, start2, end2)

Calculate how much the range (start1, end1) overlaps with (start2, end2). Returns the length of the overlap, or 0 if the ranges are disjoint.

Source code in buscolite/utilities.py
def overlap(start1, end1, start2, end2):
    """how much does the range (start1, end1) overlap with (start2, end2)"""
    return max(max((end2 - start1), 0) - max((end2 - end1), 0) - max((start2 - start1), 0), 0)
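
For example (values computed from the formula above):

from buscolite.utilities import overlap

overlap(1, 10, 5, 15)  # 5  -> partial overlap
overlap(1, 20, 5, 10)  # 5  -> (5, 10) fully contained in (1, 20)
overlap(1, 5, 10, 20)  # 0  -> disjoint ranges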

filter_low_scoring_matches(busco_results, threshold=0.85)

Filter out matches that score less than a threshold percentage of the top bitscore for each BUSCO. This implements the BUSCO v6 filtering logic.

Parameters

busco_results : dict Dictionary of BUSCO results where keys are BUSCO IDs and values are lists of match dictionaries containing 'bitscore' keys threshold : float, optional Minimum score threshold as fraction of top score (default: 0.85)

Returns

dict Filtered dictionary with low-scoring matches removed

Source code in buscolite/utilities.py
def filter_low_scoring_matches(busco_results, threshold=0.85):
    """
    Filter out matches that score less than a threshold percentage of the top bitscore
    for each BUSCO. This implements the BUSCO v6 filtering logic.

    Parameters
    ----------
    busco_results : dict
        Dictionary of BUSCO results where keys are BUSCO IDs and values are lists
        of match dictionaries containing 'bitscore' keys
    threshold : float, optional
        Minimum score threshold as fraction of top score (default: 0.85)

    Returns
    -------
    dict
        Filtered dictionary with low-scoring matches removed
    """
    filtered = {}
    for busco_id, matches in busco_results.items():
        if len(matches) == 0:
            continue
        elif len(matches) == 1:
            filtered[busco_id] = matches
        else:
            # Find the maximum bitscore for this BUSCO
            max_bitscore = max(m["bitscore"] for m in matches)
            cutoff = threshold * max_bitscore
            # Keep only matches above the cutoff
            kept_matches = [m for m in matches if m["bitscore"] >= cutoff]
            if len(kept_matches) > 0:
                filtered[busco_id] = kept_matches
    return filtered
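
A toy example; the match dictionaries are simplified (real results carry more keys) and the BUSCO ID is made up:

from buscolite.utilities import filter_low_scoring_matches

results = {
    "100957at4751": [
        {"hit": "geneA", "bitscore": 100.0},
        {"hit": "geneB", "bitscore": 80.0},
    ],
}
# cutoff is 0.85 * 100.0 = 85.0, so only the geneA match survives
filtered = filter_low_scoring_matches(results, threshold=0.85)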

remove_duplicate_gene_matches(busco_results, score_key='bitscore')

When the same gene/sequence matches multiple BUSCOs, keep only the highest scoring match. This prevents a single gene from being counted multiple times.

Parameters

busco_results : dict Dictionary of BUSCO results where keys are BUSCO IDs and values are lists of match dictionaries score_key : str, optional Key to use for scoring. Can be a simple key like 'bitscore' or a nested key like 'hmmer.bitscore' (default: 'bitscore')

Returns

dict Filtered dictionary with duplicate gene matches removed

Source code in buscolite/utilities.py
def remove_duplicate_gene_matches(busco_results, score_key="bitscore"):
    """
    When the same gene/sequence matches multiple BUSCOs, keep only the highest
    scoring match. This prevents a single gene from being counted multiple times.

    Parameters
    ----------
    busco_results : dict
        Dictionary of BUSCO results where keys are BUSCO IDs and values are lists
        of match dictionaries
    score_key : str, optional
        Key to use for scoring. Can be a simple key like 'bitscore' or a nested
        key like 'hmmer.bitscore' (default: 'bitscore')

    Returns
    -------
    dict
        Filtered dictionary with duplicate gene matches removed
    """
    # Build a reverse mapping: gene_id -> list of (busco_id, match_dict, score)
    gene_to_buscos = {}
    for busco_id, matches in busco_results.items():
        for match in matches:
            # For protein mode, use 'hit' as gene identifier
            # For genome mode, use combination of contig and location
            if "hit" in match:
                gene_id = match["hit"]
            elif "contig" in match and "location" in match:
                gene_id = "{}_{}:{}".format(
                    match["contig"], match["location"][0], match["location"][1]
                )
            else:
                continue

            # Handle nested score keys (e.g., 'hmmer.bitscore')
            if "." in score_key:
                keys = score_key.split(".")
                score = match
                for key in keys:
                    score = score.get(key, {})
                if not isinstance(score, (int, float)):
                    score = 0
            else:
                score = match.get(score_key, 0)

            if gene_id not in gene_to_buscos:
                gene_to_buscos[gene_id] = []
            gene_to_buscos[gene_id].append((busco_id, match, score))

    # Find genes that match multiple BUSCOs
    genes_to_remove = {}
    for gene_id, busco_matches in gene_to_buscos.items():
        if len(busco_matches) > 1:
            # Sort by score descending
            busco_matches.sort(key=lambda x: x[2], reverse=True)
            # Keep the best match, mark others for removal
            for busco_id, match, score in busco_matches[1:]:
                if busco_id not in genes_to_remove:
                    genes_to_remove[busco_id] = []
                genes_to_remove[busco_id].append(gene_id)

    # Remove the duplicate matches
    filtered = {}
    for busco_id, matches in busco_results.items():
        if busco_id in genes_to_remove:
            genes_to_skip = set(genes_to_remove[busco_id])
            kept_matches = []
            for match in matches:
                if "hit" in match:
                    gene_id = match["hit"]
                elif "contig" in match and "location" in match:
                    gene_id = "{}_{}:{}".format(
                        match["contig"], match["location"][0], match["location"][1]
                    )
                else:
                    gene_id = None

                if gene_id not in genes_to_skip:
                    kept_matches.append(match)

            if len(kept_matches) > 0:
                filtered[busco_id] = kept_matches
        else:
            filtered[busco_id] = matches

    return filtered
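
A toy example showing the deduplication, again with simplified match dictionaries:

from buscolite.utilities import remove_duplicate_gene_matches

results = {
    "busco1": [{"hit": "geneA", "bitscore": 200.0}],
    "busco2": [{"hit": "geneA", "bitscore": 150.0}, {"hit": "geneB", "bitscore": 90.0}],
}
# geneA scores best against busco1, so its busco2 match is removed;
# busco2 keeps only its geneB match
deduped = remove_duplicate_gene_matches(results)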

zopen(filename, mode='r', buff=1024 * 1024, external=PARALLEL)

Open pipe, zipped, or unzipped file automagically

external == 0: normal zip libraries

external == 1: (zcat, gzip) or (bzcat, bzip2)

external == 2: (pigz -dc, pigz) or (pbzip2 -dc, pbzip2)

Source code in buscolite/utilities.py
def zopen(filename, mode="r", buff=1024 * 1024, external=PARALLEL):
    """
    Open pipe, zipped, or unzipped file automagically

    # external == 0: normal zip libraries
    # external == 1: (zcat, gzip) or (bzcat, bzip2)
    # external == 2: (pigz -dc, pigz) or (pbzip2 -dc, pbzip2)
    """
    if "r" in mode and "w" in mode:
        return None
    if filename.startswith("!"):
        return open_pipe(filename[1:], mode, buff)
    elif filename.endswith(".bz2"):
        return open_bz2(filename, mode, buff, external)
    elif filename.endswith(".gz"):
        return open_gz(filename, mode, buff, external)
    elif filename.endswith(".xz"):
        return open_xz(filename, mode, buff, external)
    else:
        return open(filename, mode, buff)
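
Usage is the same regardless of compression. The filename below is hypothetical, and whether the handle yields text or bytes depends on the external setting and the underlying opener:

from buscolite.utilities import zopen

with zopen("genome.fasta.gz") as infile:
    for line in infile:
        pass  # process each line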

fastx

FASTA/FASTQ file handling.

fastx

translate(dna, strand, phase, table=1)

Translates DNA sequence into proteins.

Takes a DNA (cDNA) sequence and translates it to amino acids. It requires the DNA sequence, the strand, the translation phase, and the translation table.

Parameters

dna : str DNA (cDNA) sequence as nucleotides strand : str, (+/-) strand to translate (+ or -) phase : int phase to start translation [0,1,2] table : int, default=1 translation table [1]

Returns

protSeq : str string of translated amino acid sequence

Source code in buscolite/fastx.py
def translate(dna, strand, phase, table=1):
    """Translates DNA sequence into proteins.

    Takes DNA (or rather cDNA sequence) and translates to proteins/amino acids.
    It requires the DNA sequence, the strand, translation phase, and translation
    table.

    Parameters
    ----------
    dna : str
        DNA (cDNA) sequence as nucleotides
    strand : str, (+/-)
        strand to translate (+ or -)
    phase : int
        phase to start translation [0,1,2]
    table : int, default=1
        translation table [1]

    Returns
    -------
    protSeq : str
        string of translated amino acid sequence

    """

    def _split(str, num):
        return [str[start : start + num] for start in range(0, len(str), num)]

    if strand == "-" or strand == -1:
        seq = RevComp(dna)
    else:
        seq = dna
    seq = seq[phase:]
    # map seq to proteins
    protSeq = []
    for c, i in enumerate(_split(seq, 3)):
        if len(i) == 3:
            iSeq = i.upper()
            if c == 0:  # first codon
                if iSeq in codon_table[table]["start"]:
                    aa = "M"
                    protSeq.append(aa)
                else:
                    if iSeq in codon_table[table]["table"]:
                        aa = codon_table[table]["table"][iSeq]
                        protSeq.append(aa)
                    else:
                        protSeq.append("X")
            else:
                if iSeq in codon_table[table]["table"]:
                    aa = codon_table[table]["table"][iSeq]
                    protSeq.append(aa)
                else:
                    protSeq.append("X")
    return "".join(protSeq)

fasta2dict(fasta, full_header=False)

Read FASTA file to dictionary.

This is the same as Biopython's SeqIO.to_dict(); it returns a dictionary keyed by contig name where each value is the sequence string.

Parameters

fasta : filename FASTA input file (can be gzipped) full_header : bool, default=False return full header for contig names, default is split at first space

Returns

seqs : dict returns OrderedDict() of header: seq

Source code in buscolite/fastx.py
def fasta2dict(fasta, full_header=False):
    """Read FASTA file to dictionary.

    This is same as biopython SeqIO.to_dict(), return dictionary
    keyed by contig name and value is the sequence string.

    Parameters
    ----------
    fasta : filename
        FASTA input file (can be gzipped)
    full_header : bool, default=False
        return full header for contig names, default is split at first space

    Returns
    -------
    seqs : dict
        returns OrderedDict() of header: seq

    """
    seqs = OrderedDict()
    for title, seq in pyfastx.Fasta(fasta, full_name=True, build_index=False):
        if full_header:
            title = title
        else:
            if " " in title:
                title = title.split(" ", 1)[0]
        seqs[title] = seq
    return seqs
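
A quick usage sketch (hypothetical filename):

from buscolite.fastx import fasta2dict

seqs = fasta2dict("genome.fasta.gz")
for name, seq in seqs.items():
    print(name, len(seq))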

fasta2headers(fasta, full_header=False)

Read FASTA file to a set of headers.

Simple function to read a FASTA file and return a set of contig names.

Parameters

fasta : filename FASTA input file (can be gzipped) full_header : bool, default=False return full header for contig names, default is split at first space

Returns

headers : set returns set() of header names

Source code in buscolite/fastx.py
def fasta2headers(fasta, full_header=False):
    """Read FASTA file set of headers.

    Simple function to read FASTA file and return set of contig names

    Parameters
    ----------
    fasta : filename
        FASTA input file (can be gzipped)
    full_header : bool, default=False
        return full header for contig names, default is split at first space

    Returns
    -------
    headers : set
        returns set() of header names

    """
    # generate a set of the contig/scaffold names
    headers = set()
    for title, seq in pyfastx.Fasta(fasta, full_name=True, build_index=False):
        if full_header:
            title = title
        else:
            if " " in title:
                title = title.split(" ", 1)[0]
        headers.add(title)
    return headers

fasta2lengths(fasta, full_header=False)

Read FASTA file to dictionary of sequence lengths.

Reads a FASTA file (optionally gzipped) and returns a dictionary with contig header names as keys and sequence lengths as values.

Parameters

fasta : filename FASTA input file (can be gzipped) full_header : bool, default=False return full header for contig names, default is split at first space

Returns

seqs : dict returns dictionary of header: len(seq)

Source code in buscolite/fastx.py
def fasta2lengths(fasta, full_header=False):
    """Read FASTA file to dictionary of sequence lengths.

    Reads FASTA file (optionally gzipped) and returns dictionary of
    contig header names as keys with length of sequences as values

    Parameters
    ----------
    fasta : filename
        FASTA input file (can be gzipped)
    full_header : bool, default=False
        return full header for contig names, default is split at first space

    Returns
    -------
    seqs : dict
        returns dictionary of header: len(seq)

    """
    seqs = {}
    for title, seq in pyfastx.Fasta(fasta, full_name=True, build_index=False):
        if full_header:
            title = title
        else:
            if " " in title:
                title = title.split(" ", 1)[0]
        seqs[title] = len(seq)
    return seqs
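
fasta2headers and fasta2lengths share the same calling convention (hypothetical filename):

from buscolite.fastx import fasta2headers, fasta2lengths

names = fasta2headers("genome.fasta")    # set of contig names
lengths = fasta2lengths("genome.fasta")  # dict of contig name -> length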

explode_fasta(fasta, folder, suffix='.fa')

Read a FASTA file and write one contig per file to a folder.

Parameters

fasta : filename FASTA input file (can be gzipped) folder : directory directory to write contigs to

Returns

seqs : dict returns dictionary of header: len(seq)

Source code in buscolite/fastx.py
def explode_fasta(fasta, folder, suffix=".fa"):
    """Read FASTA file and write 1 contig per file to folder

    Parameters
    ----------
    fasta : filename
        FASTA input file (can be gzipped)
    folder : directory
        directory to write contigs to

    Returns
    -------
    seqs : dict
        returns dictionary of header: len(seq)

    """
    if not os.path.isdir(folder):
        os.makedirs(folder)
    seqs = {}
    for title, seq in pyfastx.Fasta(fasta, full_name=True, build_index=False):
        if " " in title:
            title = title.split(" ", 1)[0]
        seqs[title] = len(seq)
        with open(os.path.join(folder, "{}{}".format(title, suffix)), "w") as outfile:
            outfile.write(">{}\n{}\n".format(title, softwrap(seq)))

    return seqs
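
For example, to split a multi-FASTA into one file per contig (hypothetical paths):

from buscolite.fastx import explode_fasta

lengths = explode_fasta("genome.fasta", "contigs_dir", suffix=".fa")
# writes contigs_dir/<contig>.fa for each contig and returns their lengths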

getSeqRegions(seqs, header, coordinates)

From a sequence dictionary, return the spliced sequence for a set of coordinates.

Takes a sequence dictionary (i.e. from fasta2dict), the contig name (header), and the coordinates to fetch (a list of tuples).

Parameters

seqs : dict dictionary of sequences keyed by contig name/ header header : str contig name (header) for sequence in seqs dictionary coordinates : list of tuples list of tuples of sequence coordinates to return [(1,10), (20,30)]

Returns

result : str returns spliced DNA sequence

Source code in buscolite/fastx.py
def getSeqRegions(seqs, header, coordinates):
    """From sequence dictionary return spliced coordinates.

    Takes a sequence dictionary (ie from fasta2dict), the contig name (header)
    and the coordinates to fetch (list of tuples)

    Parameters
    ----------
    seqs : dict
        dictionary of sequences keyed by contig name/ header
    header : str
        contig name (header) for sequence in seqs dictionary
    coordinates : list of tuples
        list of tuples of sequence coordinates to return [(1,10), (20,30)]

    Returns
    -------
    result : str
        returns spliced DNA sequence

    """
    # takes SeqRecord dictionary or Index, returns sequence string
    # coordinates is a list of tuples [(1,10), (20,30)]
    result = ""
    sorted_coordinates = sorted(coordinates, key=lambda tup: tup[0])
    for x in sorted_coordinates:
        result += seqs[header][x[0] - 1 : x[1]]
    return result
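
For example, splicing two exons out of a contig (coordinates are 1-based and inclusive; the filename and contig name are hypothetical):

from buscolite.fastx import fasta2dict, getSeqRegions

seqs = fasta2dict("genome.fasta")
cds = getSeqRegions(seqs, "contig1", [(1, 10), (20, 30)])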

augustus

Augustus gene prediction integration.

augustus

log

Logging utilities.

log

Quick Example

from buscolite.busco import runbusco

# Run BUSCO analysis
results, missing, stats, config = runbusco(
    input="genome.fasta",
    lineage="/path/to/fungi_odb10",
    mode="genome",
    species="anidulans",
    cpus=8,
    offset=2000,
    verbosity=3
)

# Print summary
print(f"Complete (single-copy): {stats['single-copy']}")
print(f"Complete (duplicated): {stats['duplicated']}")
print(f"Fragmented: {stats['fragmented']}")
print(f"Missing: {len(missing)}")
print(f"Total: {stats['total']}")

# Access individual BUSCO results
for busco_id, data in results.items():
    if data.get("status") == "complete":
        print(f"{busco_id}: {data['location']}")

Plotting API

from buscolite.plot import generate_plot, generate_multi_plot

# Generate single sample plot
datasets = [{
    'name': 'My Genome',
    'stats': stats,
    'config': config
}]
generate_plot(datasets[0], 'output.svg')

# Generate multi-sample comparison plot
datasets = [
    {'name': 'Sample 1', 'stats': stats1, 'config': config1},
    {'name': 'Sample 2', 'stats': stats2, 'config': config2},
    {'name': 'Sample 3', 'stats': stats3, 'config': config3},
]
generate_multi_plot(datasets, 'comparison.svg')

Detailed Module Documentation

For detailed documentation of each module, see the individual module pages:

  • busco - Main BUSCO analysis functions
  • search - HMM search functionality
  • gff - GFF3 file handling
  • utilities - Utility functions
  • fastx - FASTA/FASTQ handling
  • augustus - Augustus integration
  • log - Logging utilities