Writing a Pipeline

This guide walks through how to add a new analysis pipeline to Astra. A pipeline consists of two parts:

  1. A model in src/astra/models/ that defines the database table for the pipeline’s output.

  2. A task function in src/astra/pipelines/ that does the analysis and yields results.

We will use a fictional pipeline called “Rocket” as an example throughout.

Step 1: Define the output model

Create src/astra/models/rocket.py. The model must extend PipelineOutputMixin, which provides common columns (task_pk, source_pk, spectrum_pk, v_astra, created, modified, timing fields, and tag).

from astra.fields import FloatField, TextField, BitField
from astra.models.pipeline import PipelineOutputMixin


class Rocket(PipelineOutputMixin):

    """Results from the Rocket pipeline."""

    #> Stellar Parameters
    teff = FloatField(null=True, help_text="Effective temperature [K]")
    e_teff = FloatField(null=True, help_text="Uncertainty in Teff [K]")
    logg = FloatField(null=True, help_text="Surface gravity [dex]")
    e_logg = FloatField(null=True, help_text="Uncertainty in logg [dex]")

    #> Summary Statistics
    rchi2 = FloatField(null=True, help_text="Reduced chi-squared")
    result_flags = BitField(default=0)

    flag_low_snr = result_flags.flag(2**0, help_text="S/N is too low")
    flag_no_converge = result_flags.flag(2**1, help_text="Fit did not converge")

Key conventions

  • Category headers: Lines like #> Stellar Parameters above a field group are parsed by BaseModel.category_headers and used to add section headers in FITS output files.

  • help_text: Used as the FITS column comment. Keep it under ~47 characters.

  • BitField flags: Use result_flags.flag(2**N) to define individual boolean flags packed into a single integer column. Each flag is a power of two.

  • null=True: Almost all pipeline output fields should be nullable, because a pipeline may fail on a given spectrum and still want to record a row (e.g., with a flag set).

  • Unique constraint: PipelineOutputMixin enforces a unique constraint on (spectrum_pk, v_astra_major_minor). This means each pipeline produces at most one result per spectrum per major.minor version of Astra. Re-running a pipeline for the same version will update existing rows rather than creating duplicates.

from_spectrum

PipelineOutputMixin provides a from_spectrum(spectrum, **kwargs) class method that creates a new model instance, automatically copying source_pk and spectrum_pk from the input spectrum:

result = Rocket.from_spectrum(spectrum, teff=5000, logg=4.5)

This is the standard way to construct result objects inside a pipeline.

Step 2: Write the pipeline task

Create src/astra/pipelines/rocket/__init__.py. The key ingredients are:

  1. Import task from astra.

  2. Decorate the main function with @task.

  3. The function must be a generator that yields model instances.

  4. Use Python type annotations to declare the input spectrum type and the output model type.

from typing import Iterable, Optional
from astra import task
from astra.models.boss import BossVisitSpectrum
from astra.models.rocket import Rocket
from astra.utils import log


@task
def rocket(
    spectra: Iterable[BossVisitSpectrum],
    some_parameter: Optional[float] = 1.0,
    **kwargs,
) -> Iterable[Rocket]:
    """
    Estimate stellar parameters with the Rocket method.

    :param spectra:
        An iterable of input spectra.
    :param some_parameter:
        A tuning knob.
    """

    # One-time setup (e.g., load a model). The @task decorator's Timer
    # tracks this as "overhead" time separate from per-spectrum time.
    model = _load_rocket_model()

    for spectrum in spectra:
        try:
            teff, e_teff, logg, e_logg, rchi2 = _fit(
                spectrum.wavelength,
                spectrum.flux,
                spectrum.ivar,
                model,
                some_parameter,
            )
        except Exception:
            log.exception(f"Failed on {spectrum}")
            yield Rocket.from_spectrum(spectrum, flag_no_converge=True)
            continue

        yield Rocket.from_spectrum(
            spectrum,
            teff=teff,
            e_teff=e_teff,
            logg=logg,
            e_logg=e_logg,
            rchi2=rchi2,
        )

How the @task decorator works

The @task decorator (defined in src/astra/__init__.py) wraps a generator function and handles:

  1. Timing: It wraps the generator in a Timer context manager that measures elapsed time and overhead per result. The t_elapsed and t_overhead fields are automatically populated on each yielded result.

  2. Batched database writes: Yielded results are collected into batches (default batch_size=1000) and periodically flushed to the database (default every write_frequency=300 seconds). The flush uses an upsert (INSERT ... ON CONFLICT ... UPDATE) so re-running a pipeline for the same Astra version updates existing rows.

  3. Error handling: Exceptions inside the generator are logged. By default (re_raise_exceptions=True) they are re-raised after logging; set to False to continue past errors.

  4. Return semantics: After database writes, the decorator yields back the results (now with task_pk and created populated from the database). Callers iterating over the task get fully-persisted result objects.

You can control behaviour with keyword arguments when calling the task:

# Run without writing to the database (useful for debugging)
for result in rocket(spectra, write_to_database=False):
    print(result.teff)

# Change batch size and write frequency
for result in rocket(spectra, batch_size=500, write_frequency=60):
    ...

Type annotations matter

The type annotations on the task function are not just documentation – Astra inspects them at runtime:

  • spectra: Iterable[BossVisitSpectrum] tells generate_queries_for_task which input spectrum table(s) to query. The system uses this to build a query that finds spectra not yet processed by this pipeline for the current Astra version.

  • -> Iterable[Rocket] tells the system which output model to use for the upsert conflict resolution.

If your pipeline can accept multiple spectrum types, use a Union:

from typing import Union
from astra.models.apogee import ApogeeCoaddedSpectrumInApStar

def rocket(
    spectra: Iterable[Union[BossVisitSpectrum, ApogeeCoaddedSpectrumInApStar]],
    **kwargs,
) -> Iterable[Rocket]:
    ...

Yielding partial or flagged results

When a spectrum cannot be processed (e.g., wrong target type, too low S/N), yield a result with the appropriate flag set and no parameter values. This records that the pipeline has seen this spectrum, preventing it from being re-queued:

yield Rocket.from_spectrum(spectrum, flag_low_snr=True)

Because parameter fields are null=True, the unfilled fields will be NULL in the database.

Step 3: Run the pipeline

From Python

from astra.models.boss import BossVisitSpectrum
from astra.pipelines.rocket import rocket

spectra = BossVisitSpectrum.select().limit(10)
for result in rocket(spectra):
    print(result.teff, result.logg)

From the CLI

# Run locally
astra run rocket BossVisitSpectrum --limit 100

# Submit to Slurm
astra srun rocket BossVisitSpectrum --nodes 4 --time="24:00:00"

The CLI resolves the task name (rocket) to the pipeline function and the input model name (BossVisitSpectrum) to the ORM class. It then calls generate_queries_for_task to find unprocessed spectra and passes them to the task.

Real-world example: Corv

The corv pipeline in src/astra/pipelines/corv/__init__.py is a good compact example. It:

  1. Accepts BossVisitSpectrum and returns Corv results.

  2. Checks pre-conditions (is the source a white dwarf? does it have a DA classification from SnowWhite?) and yields flagged results for spectra that do not pass.

  3. Submits fitting work to a ProcessPoolExecutor for parallelism.

  4. Yields Corv(...) instances (not using from_spectrum in the worker, since database connections cannot cross process boundaries – instead it constructs the object directly with source_pk and spectrum_pk).

The corresponding model is in src/astra/models/corv.py and defines output fields for radial velocity, stellar parameters, initial values, and quality flags.