astra.database

Package Contents

Classes

AstraDatabaseConnection: SQLAlchemy database connection implementation

Functions

init_process(database): Dispose of the existing database engine.
astra.database.AstraBase

Reader beware:

  • If you want to use Astra with only a single worker, then there’s no problem.

  • If you want to use multiple workers, then you have a problem. The AstraDatabaseConnection cannot be passed to child processes: a forked child inherits the parent’s pooled connections, and sharing those connections between processes breaks everything. To ameliorate this you have two options (not really; in practice you only have option 2):

    1. Set use_pooling = False to disable pooling on the database connection.

      That means every time a transaction is to occur, it will create a connection to the database and close it when it’s done. This causes some overhead, but will be fine if you only have a couple of workers that are not interacting with the database much.

      If you have many workers, or a few workers that access the database a lot, then this can cause issues on the PostgreSQL server: too many incoming connections at once will make the server refuse new connections. Tasks then fail because they cannot write their results to the database.

    2. Set use_pooling = True, but require that every child process runs init_process(database) ONCE, and only once, the moment it is created. This disposes of the database engine in the child process (database.engine.dispose()) and forces the child to open its own connections.

      That’s good, but when should Astra do that? We cannot do it when a task is run, because database connections are needed to check whether a task is complete. We cannot even do it in the complete() method (which luigi runs first when a child process is created), because complete() is run many times. We have to run this step the moment the child process is created, and never think of it again.

      To do this we need to:

      -> Set “core.parallel_scheduling = True” in utah.cfg like:

         [core]
         parallel_scheduling = True

      -> Set use_pooling = True

      -> Change luigi/worker.py around line 747 so that the multiprocessing.Pool() created when a task is added (add()) to a worker receives the initializer and initargs keyword arguments. Here is what it looks like:

         # At the top of the file:
         from astra.database import init_process, database

         # Around line 747:
         pool = multiprocessing.Pool(
             initializer=init_process,
             initargs=(database, ),
             processes=processes if processes > 0 else None
         )

    This seems to work. Until luigi allows custom keyword arguments to be passed to multiprocessing.Pool, we will need to use a patched version of luigi that has this functionality.
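The initializer mechanism behind option 2 can be sketched with the standard library alone. This is a minimal sketch, not Astra's code: `_FakeEngine` and `_FakeDatabase` are hypothetical stand-ins for AstraDatabaseConnection, and `init_worker` is a hypothetical variant of init_process(database) that also reports its PID through a queue, purely so the parent can confirm each worker ran it exactly once. It uses the "fork" start method (POSIX only), which is the situation where children inherit the parent's pooled connections.

```python
import multiprocessing
import os


class _FakeEngine:
    """Hypothetical stand-in for a SQLAlchemy engine."""

    def dispose(self):
        # A real engine would discard its inherited pooled connections here.
        pass


class _FakeDatabase:
    """Hypothetical stand-in for AstraDatabaseConnection."""

    def __init__(self):
        self.engine = _FakeEngine()


def init_worker(database, report_queue):
    # Runs exactly once in each worker process, right after it starts.
    database.engine.dispose()
    report_queue.put(os.getpid())


def run_pool(n_workers=3, n_tasks=20):
    """Return (task results, PIDs of the workers that ran the initializer)."""
    ctx = multiprocessing.get_context("fork")
    database = _FakeDatabase()
    report_queue = ctx.Queue()
    with ctx.Pool(
        processes=n_workers,
        initializer=init_worker,
        initargs=(database, report_queue),
    ) as pool:
        # abs is a builtin, so it pickles cleanly when sent to the workers.
        results = pool.map(abs, range(-n_tasks, 0))
        # One report per worker, regardless of how many tasks each one ran.
        init_pids = [report_queue.get(timeout=10) for _ in range(n_workers)]
    return results, init_pids


if __name__ == "__main__":
    results, init_pids = run_pool()
    print(len(results), "tasks;", len(set(init_pids)), "initialised workers")
```

The point of the design is that the initializer count tracks the number of worker processes, not the number of tasks, which is exactly the "once, and only once" guarantee option 2 needs.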
class astra.database.AstraDatabaseConnection(*args, **kwargs)

SQLAlchemy database connection implementation

dbname = sdss5db
base
engine
bases = []
Session
metadata
connection_params

Returns a dictionary with the connection parameters.

dbversion
auto_reflect = True
create_engine(self, db_connection_string=None, echo=False, use_pooling=True, expire_on_commit=True, pool_pre_ping=True, **kwargs)

Create a new database engine

Resets and creates a new SQLAlchemy database engine. Also creates and binds engine metadata and a new scoped session.

_get_password(self, **params)

Get a db password from a pgpass file

Parameters:
  • params (dict) – A dictionary of database connection parameters
Returns:

The database password for a given set of connection parameters

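A pgpass lookup of this kind can be sketched as follows. This is a hypothetical helper (`get_pgpass_password`), not Astra's implementation; it follows PostgreSQL's documented password-file format (`hostname:port:database:username:password`, `*` as a wildcard in the first four fields, `\` escaping `:` and `\`, `#` comment lines, first match wins). It takes an iterable of lines so it can be exercised without touching the filesystem.

```python
def _split_pgpass_line(line):
    """Split one pgpass line on unescaped ':' and undo backslash escapes."""
    fields, current, escaped = [], [], False
    for char in line:
        if escaped:
            current.append(char)
            escaped = False
        elif char == "\\":
            escaped = True
        elif char == ":":
            fields.append("".join(current))
            current = []
        else:
            current.append(char)
    fields.append("".join(current))
    return fields


def get_pgpass_password(lines, host, port, dbname, user):
    """Return the password of the first matching pgpass entry, or None."""
    wanted = (host, str(port), dbname, user)
    for raw in lines:
        line = raw.rstrip("\n")
        if not line or line.startswith("#"):
            continue
        fields = _split_pgpass_line(line)
        if len(fields) != 5:
            continue  # malformed entry: skip it
        if all(f == "*" or f == w for f, w in zip(fields[:4], wanted)):
            return fields[4]
    return None
```

In use, the lines would typically come from opening `os.path.expanduser("~/.pgpass")`.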
_make_connection_string(self, dbname, **params)

Build a db connection string

Parameters:
  • dbname (str) – The name of the database to connect to
  • params (dict) – A dictionary of database connection parameters
Returns:

A database connection string
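For illustration, a connection string in SQLAlchemy's `dialect+driver://user:password@host:port/dbname` URL form could be assembled like this. `make_connection_string` and its defaults (including the `postgresql+psycopg2` driver) are assumptions for the sketch, not the exact string Astra builds.

```python
from urllib.parse import quote


def make_connection_string(dbname, user, host="localhost", port=5432,
                           password=None, driver="postgresql+psycopg2"):
    """Build a SQLAlchemy-style PostgreSQL URL (hypothetical helper)."""
    # Percent-encode credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters.
    auth = quote(user, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    return f"{driver}://{auth}@{host}:{port}/{dbname}"
```

Omitting the password leaves it out of the URL, in which case libpq itself can still fall back to a pgpass file at connect time.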

_conn(self, dbname, silent_on_fail=False, **params)

Connects to the DB and tests the connection.

reset_engine(self)

Reset the engine, metadata, and session

add_base(self, base, prepare=True)

Binds a base to this connection.

prepare_bases(self, base=None)

Prepare a Model Base

Prepares a SQLAlchemy Base for reflection. This binds a database engine to a specific Base which maps to a set of ModelClasses. If base is passed only that base will be prepared. Otherwise, all the bases bound to this database connection will be prepared.

__repr__(self)

Return repr(self).

set_profile(self, profile=None, connect=True)

Sets the profile from the configuration file.

Parameters:
  • profile (str) – The profile to set. If None, uses the domain name to determine the profile.
  • connect (bool) – If True, tries to connect to the database using the new profile.
Returns:

connected (bool) – Returns True if the database is connected.
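Choosing a profile "by domain name" might look like the sketch below. The profile dictionary layout, the `domain` glob patterns, and `pick_profile` are all hypothetical; the sketch only shows the idea of matching the machine's fully qualified hostname (from `socket.getfqdn()`) against per-profile patterns.

```python
import fnmatch
import socket

# Hypothetical profiles; real profile files may use different keys.
PROFILES = {
    "utah": {"domain": "*.utah.edu", "host": "db.example.utah.edu", "port": 5432},
    "local": {"domain": None, "host": "localhost", "port": 5432},
}


def pick_profile(profiles, hostname=None, default=None):
    """Return the first profile whose 'domain' pattern matches the hostname."""
    hostname = (hostname or socket.getfqdn()).lower()
    for name, params in profiles.items():
        pattern = params.get("domain")
        # fnmatchcase keeps matching deterministic across operating systems.
        if pattern and fnmatch.fnmatchcase(hostname, pattern.lower()):
            return name
    return default
```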

connect(self, dbname=None, silent_on_fail=False, **connection_params)

Initialises the database using the profile information.

Parameters:
  • dbname (str or None) – The database name. If None, defaults to dbname.
  • user (str) – Overrides the profile database user.
  • host (str) – Overrides the profile database host.
  • port (str) – Overrides the profile database port.
  • silent_on_fail (bool) – If True, does not show a warning if the connection fails.
Returns:

connected (bool) – Returns True if the database is connected.
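The override behaviour described for user, host, and port can be sketched as a simple dictionary merge. `resolve_connection_params` is a hypothetical helper, and treating a `None` override as "keep the profile value" is an assumption of the sketch.

```python
def resolve_connection_params(profile, default_dbname, dbname=None, **overrides):
    """Merge profile settings with explicit keyword overrides (hypothetical)."""
    params = {key: profile[key] for key in ("user", "host", "port") if key in profile}
    # Only overrides that were actually supplied (not None) replace profile values.
    params.update({key: value for key, value in overrides.items() if value is not None})
    params["dbname"] = dbname or default_dbname
    return params
```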

connect_from_parameters(self, dbname=None, **params)

Initialises the database from a dictionary of parameters.

Parameters:
  • dbname (str or None) – The database name. If None, defaults to dbname.
  • params (dict) – A dictionary of parameters, which should include user, host, and port.
Returns:

connected (bool) – Returns True if the database is connected.

static list_profiles(profile=None)

Returns a list of profiles.

Parameters:
  • profile (str or None) – If None, returns a list of profile keys. If profile is not None returns the parameters for the given profile.

become(self, user)

Change the connection to a certain user.

become_admin(self, admin=None)

Becomes the admin user.

If admin=None defaults to the admin value in the current profile.

become_user(self, user=None)

Becomes the read-only user.

If user=None defaults to the user value in the current profile.
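The fallback logic of become_admin and become_user can be modelled in a few lines. `ProfileUser` is a hypothetical class, and the profile keys `user` and `admin` are assumed from the descriptions above; a real connection would also reconnect after switching users.

```python
class ProfileUser:
    """Hypothetical model of the become/become_admin/become_user logic."""

    def __init__(self, profile):
        self.profile = profile
        self.user = profile["user"]

    def become(self, user):
        # A real connection would reconnect to the database as this user.
        self.user = user

    def become_admin(self, admin=None):
        # Fall back to the profile's admin user when none is given.
        self.become(admin or self.profile["admin"])

    def become_user(self, user=None):
        # Fall back to the profile's read-only user when none is given.
        self.become(user or self.profile["user"])
```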

change_version(self, dbversion=None)

Change database version and attempt to reconnect

Parameters:
  • dbversion (str) – A database version

post_connect(self)

Hook called after a successful connection.

astra.database.database
astra.database.session
astra.database.init_process(database)

Dispose of the existing database engine.

This is a necessary step as soon as a child process is created.