Core API¶
astra.__version__¶
astra.__version__ # e.g. "0.8.1"
The current version of astra as a string in "major.minor.patch" format.
@task¶
@task
def my_pipeline(spectra, ..., **kwargs) -> Iterable[MyModel]:
...
yield result
A decorator for functions that serve as astra tasks. The decorated function should be a
generator that yields result objects (typically subclasses of astra.models.BaseModel).
The decorator handles batching results and writing them to the database.
Keyword arguments passed at call time:
Parameter |
Type |
Default |
Description |
|---|---|---|---|
|
|
|
If |
|
|
|
Number of rows to insert per batch. |
|
|
|
Seconds to wait between database write checkpoints. |
|
|
|
If |
|
|
|
If |
Example:
from astra import task
@task
def my_analysis(spectra, **kwargs):
for spectrum in spectra:
result = MyModel(spectrum_pk=spectrum.spectrum_pk, ...)
yield result
When calling a decorated task, you can override the decorator parameters:
for result in my_analysis(spectra, batch_size=500, write_frequency=60):
print(result)
generate_queries_for_task¶
def generate_queries_for_task(
task,
input_model=None,
sdss_ids=None,
limit=None,
page=None
)
Generate Peewee queries for input data that needs to be processed by a given task. The function inspects the task’s type annotations to determine which input spectrum or source models are expected and which output model to check against. Queries are ordered by modified time (most recent first), and exclude rows that have already been processed at the current astra version (unless the input has been modified since).
Parameters:
Parameter |
Type |
Default |
Description |
|---|---|---|---|
|
|
(required) |
The task name (e.g. |
|
model or |
|
The input spectrum model. If |
|
list or |
|
A list of SDSS IDs to filter results to specific sources. |
|
|
|
Maximum number of rows per query. |
|
|
|
Page number for pagination (used with |
Yields: (input_model, query) tuples, where input_model is a Peewee model class and query is a SelectQuery.
Example:
from astra import generate_queries_for_task
for input_model, query in generate_queries_for_task("slam.slam", limit=100):
print(f"{input_model}: {query.count()} spectra to process")