Writing a Pipeline¶
This guide walks through how to add a new analysis pipeline to Astra. A pipeline consists of two parts:
A model in
src/astra/models/that defines the database table for the pipeline’s output.A task function in
src/astra/pipelines/that does the analysis and yields results.
We will use a fictional pipeline called “Rocket” as an example throughout.
Step 1: Define the output model¶
Create src/astra/models/rocket.py. The model must extend PipelineOutputMixin, which provides common columns (task_pk, source_pk, spectrum_pk, v_astra, created, modified, timing fields, and tag).
from astra.fields import FloatField, TextField, BitField
from astra.models.pipeline import PipelineOutputMixin
class Rocket(PipelineOutputMixin):
"""Results from the Rocket pipeline."""
#> Stellar Parameters
teff = FloatField(null=True, help_text="Effective temperature [K]")
e_teff = FloatField(null=True, help_text="Uncertainty in Teff [K]")
logg = FloatField(null=True, help_text="Surface gravity [dex]")
e_logg = FloatField(null=True, help_text="Uncertainty in logg [dex]")
#> Summary Statistics
rchi2 = FloatField(null=True, help_text="Reduced chi-squared")
result_flags = BitField(default=0)
flag_low_snr = result_flags.flag(2**0, help_text="S/N is too low")
flag_no_converge = result_flags.flag(2**1, help_text="Fit did not converge")
Key conventions¶
Category headers: Lines like
#> Stellar Parametersabove a field group are parsed byBaseModel.category_headersand used to add section headers in FITS output files.help_text: Used as the FITS column comment. Keep it under ~47 characters.BitFieldflags: Useresult_flags.flag(2**N)to define individual boolean flags packed into a single integer column. Each flag is a power of two.null=True: Almost all pipeline output fields should be nullable, because a pipeline may fail on a given spectrum and still want to record a row (e.g., with a flag set).Unique constraint:
PipelineOutputMixinenforces a unique constraint on(spectrum_pk, v_astra_major_minor). This means each pipeline produces at most one result per spectrum per major.minor version of Astra. Re-running a pipeline for the same version will update existing rows rather than creating duplicates.
from_spectrum¶
PipelineOutputMixin provides a from_spectrum(spectrum, **kwargs) class method that creates a new model instance, automatically copying source_pk and spectrum_pk from the input spectrum:
result = Rocket.from_spectrum(spectrum, teff=5000, logg=4.5)
This is the standard way to construct result objects inside a pipeline.
Step 2: Write the pipeline task¶
Create src/astra/pipelines/rocket/__init__.py. The key ingredients are:
Import
taskfromastra.Decorate the main function with
@task.The function must be a generator that
yields model instances.Use Python type annotations to declare the input spectrum type and the output model type.
from typing import Iterable, Optional
from astra import task
from astra.models.boss import BossVisitSpectrum
from astra.models.rocket import Rocket
from astra.utils import log
@task
def rocket(
spectra: Iterable[BossVisitSpectrum],
some_parameter: Optional[float] = 1.0,
**kwargs,
) -> Iterable[Rocket]:
"""
Estimate stellar parameters with the Rocket method.
:param spectra:
An iterable of input spectra.
:param some_parameter:
A tuning knob.
"""
# One-time setup (e.g., load a model). The @task decorator's Timer
# tracks this as "overhead" time separate from per-spectrum time.
model = _load_rocket_model()
for spectrum in spectra:
try:
teff, e_teff, logg, e_logg, rchi2 = _fit(
spectrum.wavelength,
spectrum.flux,
spectrum.ivar,
model,
some_parameter,
)
except Exception:
log.exception(f"Failed on {spectrum}")
yield Rocket.from_spectrum(spectrum, flag_no_converge=True)
continue
yield Rocket.from_spectrum(
spectrum,
teff=teff,
e_teff=e_teff,
logg=logg,
e_logg=e_logg,
rchi2=rchi2,
)
How the @task decorator works¶
The @task decorator (defined in src/astra/__init__.py) wraps a generator function and handles:
Timing: It wraps the generator in a
Timercontext manager that measures elapsed time and overhead per result. Thet_elapsedandt_overheadfields are automatically populated on each yielded result.Batched database writes: Yielded results are collected into batches (default
batch_size=1000) and periodically flushed to the database (default everywrite_frequency=300seconds). The flush uses an upsert (INSERT ... ON CONFLICT ... UPDATE) so re-running a pipeline for the same Astra version updates existing rows.Error handling: Exceptions inside the generator are logged. By default (
re_raise_exceptions=True) they are re-raised after logging; set toFalseto continue past errors.Return semantics: After database writes, the decorator yields back the results (now with
task_pkandcreatedpopulated from the database). Callers iterating over the task get fully-persisted result objects.
You can control behaviour with keyword arguments when calling the task:
# Run without writing to the database (useful for debugging)
for result in rocket(spectra, write_to_database=False):
print(result.teff)
# Change batch size and write frequency
for result in rocket(spectra, batch_size=500, write_frequency=60):
...
Type annotations matter¶
The type annotations on the task function are not just documentation – Astra inspects them at runtime:
spectra: Iterable[BossVisitSpectrum]tellsgenerate_queries_for_taskwhich input spectrum table(s) to query. The system uses this to build a query that finds spectra not yet processed by this pipeline for the current Astra version.-> Iterable[Rocket]tells the system which output model to use for the upsert conflict resolution.
If your pipeline can accept multiple spectrum types, use a Union:
from typing import Union
from astra.models.apogee import ApogeeCoaddedSpectrumInApStar
def rocket(
spectra: Iterable[Union[BossVisitSpectrum, ApogeeCoaddedSpectrumInApStar]],
**kwargs,
) -> Iterable[Rocket]:
...
Yielding partial or flagged results¶
When a spectrum cannot be processed (e.g., wrong target type, too low S/N), yield a result with the appropriate flag set and no parameter values. This records that the pipeline has seen this spectrum, preventing it from being re-queued:
yield Rocket.from_spectrum(spectrum, flag_low_snr=True)
Because parameter fields are null=True, the unfilled fields will be NULL in the database.
Step 3: Run the pipeline¶
From Python¶
from astra.models.boss import BossVisitSpectrum
from astra.pipelines.rocket import rocket
spectra = BossVisitSpectrum.select().limit(10)
for result in rocket(spectra):
print(result.teff, result.logg)
From the CLI¶
# Run locally
astra run rocket BossVisitSpectrum --limit 100
# Submit to Slurm
astra srun rocket BossVisitSpectrum --nodes 4 --time="24:00:00"
The CLI resolves the task name (rocket) to the pipeline function and the input model name (BossVisitSpectrum) to the ORM class. It then calls generate_queries_for_task to find unprocessed spectra and passes them to the task.
Real-world example: Corv¶
The corv pipeline in src/astra/pipelines/corv/__init__.py is a good compact example. It:
Accepts
BossVisitSpectrumand returnsCorvresults.Checks pre-conditions (is the source a white dwarf? does it have a DA classification from SnowWhite?) and yields flagged results for spectra that do not pass.
Submits fitting work to a
ProcessPoolExecutorfor parallelism.Yields
Corv(...)instances (not usingfrom_spectrumin the worker, since database connections cannot cross process boundaries – instead it constructs the object directly withsource_pkandspectrum_pk).
The corresponding model is in src/astra/models/corv.py and defines output fields for radial velocity, stellar parameters, initial values, and quality flags.