astra.database
Package Contents
Classes
AstraDatabaseConnection | SQLAlchemy database connection implementation
Functions
init_process(database) | Dispose of the existing database engine.
-
astra.database.AstraBase
If you want to use Astra with only a single worker, there is no problem.
If you want to use multiple workers, you have a problem. The AstraDatabaseConnection cannot be passed to child processes: a forked child inherits the parent's pooled database connections, and once parent and child share the same connections, everything breaks. To ameliorate this there are two options (not really; in practice you only have Option #2):
Option #1: Set use_pooling = False to disable pooling on the database connection.
That means every time a transaction occurs, a new connection to the database is opened and then closed when the transaction is done. This adds some overhead, but is fine if you only have a couple of workers that do not interact with the database much.
If you have many workers, or a few workers that access the database heavily, this can cause problems on the PostgreSQL server: once the number of simultaneous connections reaches the server's max_connections limit, it refuses new connections. Tasks then fail because they cannot write their results to the database.
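The connection-per-transaction behaviour of Option #1 can be sketched with the standard library. This is a minimal illustration only: sqlite3 stands in for PostgreSQL, and run_transaction and fetch_all are hypothetical helpers, not part of astra.database. The point is that no connection outlives the call, so nothing can be shared with or inherited by a child process, at the cost of one connect/close per transaction.

```python
import sqlite3
from contextlib import closing


def run_transaction(db_path, statements):
    """Open a fresh connection, run one transaction, then close it.

    Hypothetical helper: sqlite3 is a stand-in for PostgreSQL here.
    No connection survives the call, so none can leak into a child process.
    """
    with closing(sqlite3.connect(db_path)) as conn:  # always closed on exit
        with conn:  # commits on success, rolls back if an exception is raised
            for sql, params in statements:
                conn.execute(sql, params)


def fetch_all(db_path, sql, params=()):
    """Read-only query, again on a short-lived connection."""
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(sql, params).fetchall()
```

With PostgreSQL, each of these calls counts against max_connections for as long as it is open, which is why many busy workers can exhaust the server.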
Option #2: Set use_pooling = True, but require that every child process runs init_process(database) ONCE, and only once, the moment it is created. This disposes of the database engine inherited by the child process (database.engine.dispose()) and forces it to open new connections. That’s good, but when should Astra do that?! We cannot do it when a task is run, because database connections are needed to check whether a task is complete. We cannot even do it in the complete() method, which luigi runs first when a child process is created, because that complete() method will be run many times. We have to run this step the moment the child process is created, and never think of it again. To do this we need to:
-> Set “core.parallel_scheduling = True” in utah.cfg, like:
    [core]
    parallel_scheduling = True
-> Set use_pooling = True
-> Change luigi/worker.py around line 747 to pass the initializer and initargs keyword arguments to multiprocessing.Pool() when a task is added (add()) to a worker. Here is what it looks like:
    # At the top of the file:
    from astra.database import init_process, database

    # Around line 747:
    pool = multiprocessing.Pool(
        initializer=init_process,
        initargs=(database, ),
        processes=processes if processes > 0 else None
    )
This seems to work. Until luigi allows for custom kwargs to be passed to the multiprocessing.Pool, we will need to use a butchered version of luigi that has this functionality.
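The initializer mechanism that the patched add() relies on can be demonstrated with the standard library alone. In this sketch FakeDatabase and FakeEngine are hypothetical stand-ins for astra.database.database and its SQLAlchemy engine, and run_task stands in for a luigi task; only multiprocessing.Pool(initializer=..., initargs=...) is the real API. It assumes a POSIX system, because it explicitly requests the fork start method (the case in which children inherit the parent's connections).

```python
import multiprocessing
import os

# Set in each worker by init_process(); stays None in the parent process.
_worker_database = None


class FakeEngine:
    """Hypothetical stand-in for a SQLAlchemy engine: only records dispose() calls."""

    def __init__(self):
        self.disposed_in = []

    def dispose(self):
        # A real engine would drop its inherited pooled connections here,
        # so the next query opens a fresh connection owned by this process.
        self.disposed_in.append(os.getpid())


class FakeDatabase:
    """Hypothetical stand-in for astra.database.database."""

    def __init__(self):
        self.engine = FakeEngine()


def init_process(database):
    """Pool initializer: runs once in each worker, right after it starts."""
    global _worker_database
    database.engine.dispose()
    _worker_database = database


def run_task(task_id):
    """Stand-in for a luigi task: report the worker PID and its dispose() history."""
    return task_id, os.getpid(), list(_worker_database.engine.disposed_in)


def run_tasks(n_tasks=8, processes=2):
    database = FakeDatabase()
    # Request "fork" explicitly (POSIX only): children inherit the parent's
    # objects, which is exactly the situation the initializer guards against.
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(
        processes=processes,
        initializer=init_process,
        initargs=(database,),
    ) as pool:
        results = pool.map(run_task, range(n_tasks))
    return database, results
```

Every result shows exactly one dispose() call, made in the worker's own process, while the parent's copy of the engine is never touched. Pool calls the initializer once when each worker starts, not once per task, which matches the "once, and only once" requirement above (as long as maxtasksperchild stays at its default of None).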
Type: Reader beware
-
class
astra.database.AstraDatabaseConnection(*args, **kwargs)
SQLAlchemy database connection implementation.
-
dbname = sdss5db
-
base
-
engine
-
bases = []
-
Session
-
metadata
-
connection_params
Returns a dictionary with the connection parameters.
-
dbversion
-
auto_reflect = True
-
create_engine(self, db_connection_string=None, echo=False, use_pooling=True, expire_on_commit=True, pool_pre_ping=True, **kwargs)
Create a new database engine.
Resets and creates a new SQLAlchemy database engine. Also creates and binds engine metadata and a new scoped session.
-
_get_password(self, **params)
Get a db password from a pgpass file.
Parameters: params (dict) – A dictionary of database connection parameters.
Returns: The database password for the given set of connection parameters.
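A pgpass lookup of the kind _get_password performs can be sketched as below. This is a simplified, hypothetical reimplementation of the file format described in the PostgreSQL documentation (one hostname:port:database:username:password entry per line, * as a wildcard, backslash to escape : and \, # for comments, first match wins); it is not astra's actual code, and it skips the file-permission check and other edge cases that libpq applies. Reading the file (by default ~/.pgpass on Unix-like systems, or the path in PGPASSFILE) is left to the caller.

```python
def _split_pgpass_line(line):
    """Split a pgpass line on unescaped ':' into at most five fields.

    Backslash escapes are undone. Everything after the fourth separator is
    the password, so in this sketch a stray ':' there is kept as part of it.
    """
    fields, current, escaped = [], [], False
    for ch in line:
        if escaped:
            current.append(ch)
            escaped = False
        elif ch == "\\":
            escaped = True
        elif ch == ":" and len(fields) < 4:
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    fields.append("".join(current))
    return fields


def lookup_pgpass_password(pgpass_text, host, port, dbname, user):
    """Return the password from the first matching pgpass line, or None."""
    wanted = [host, str(port), dbname, user]
    for line in pgpass_text.splitlines():
        if not line or line.startswith("#"):
            continue  # blank line or comment
        fields = _split_pgpass_line(line)
        if len(fields) != 5:
            continue  # malformed line: fewer than five fields
        patterns, password = fields[:4], fields[4]
        # '*' matches anything; otherwise the value must match exactly.
        if all(p == "*" or p == w for p, w in zip(patterns, wanted)):
            return password
    return None
```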
-
_make_connection_string(self, dbname, **params)
Build a db connection string.
Parameters:
Returns: A database connection string.
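A connection string builder in the spirit of _make_connection_string might look like this. The function name and the postgresql+psycopg2 driver prefix are assumptions; the overall shape follows SQLAlchemy's dialect+driver://username:password@host:port/database URL format. Credentials are percent-encoded so that characters such as @ or : in a password do not break the URL.

```python
from urllib.parse import quote


def make_connection_string(dbname, user=None, password=None, host="localhost",
                           port=5432, driver="postgresql+psycopg2"):
    """Build a SQLAlchemy-style URL: driver://user:password@host:port/dbname.

    Hypothetical helper; the default driver prefix is an assumption.
    """
    auth = ""
    if user:
        # Percent-encode credentials so '@', ':' or '/' cannot break the URL.
        auth = quote(user, safe="")
        if password:
            auth += ":" + quote(password, safe="")
        auth += "@"
    return f"{driver}://{auth}{host}:{port}/{dbname}"
```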
-
_conn(self, dbname, silent_on_fail=False, **params)
Connects to the DB and tests the connection.
-
reset_engine(self)
Reset the engine, metadata, and session.
-
add_base(self, base, prepare=True)
Binds a base to this connection.
-
prepare_bases(self, base=None)
Prepare a Model Base.
Prepares a SQLAlchemy Base for reflection. This binds a database engine to a specific Base, which maps to a set of ModelClasses. If base is passed, only that base will be prepared; otherwise, all the bases bound to this database connection will be prepared.
-
__repr__(self)
Return repr(self).
-
set_profile(self, profile=None, connect=True)
Sets the profile from the configuration file.
Parameters:
Returns: connected (bool) – Returns True if the database is connected.
-
connect(self, dbname=None, silent_on_fail=False, **connection_params)
Initialises the database using the profile information.
Parameters:
- dbname (str or None) – The database name. If None, defaults to dbname.
- user (str) – Overrides the profile database user.
- host (str) – Overrides the profile database host.
- port (str) – Overrides the profile database port.
- silent_on_fail (bool) – If True, does not show a warning if the connection fails.
Returns: connected (bool) – Returns True if the database is connected.
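How the keyword overrides interact with the profile can be sketched as a plain dictionary merge. This is a hypothetical helper, not astra's implementation: it assumes a profile is a dict with user, host and port keys, treats None as "no override", and falls back to the class default dbname (sdss5db) when no database name is given.

```python
DEFAULT_DBNAME = "sdss5db"  # mirrors AstraDatabaseConnection.dbname


def resolve_connection_params(profile, dbname=None, **overrides):
    """Merge profile values with explicit overrides (hypothetical helper)."""
    # Start from the profile's connection settings.
    params = {key: profile.get(key) for key in ("user", "host", "port")}
    # Explicit keyword arguments win, but None means "keep the profile value".
    params.update({key: value for key, value in overrides.items() if value is not None})
    # Fall back to the default database name when none is given.
    params["dbname"] = dbname if dbname is not None else DEFAULT_DBNAME
    return params
```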
-
connect_from_parameters(self, dbname=None, **params)
Initialises the database from a dictionary of parameters.
Parameters:
Returns: connected (bool) – Returns True if the database is connected.
-
static
list_profiles(profile=None)
Returns a list of profiles.
Parameters: profile (str or None) – If None, returns a list of profile keys. If profile is not None, returns the parameters for the given profile.
-
become(self, user)
Change the connection to a certain user.
-
become_admin(self, admin=None)
Becomes the admin user.
If admin=None, defaults to the admin value in the current profile.
-
become_user(self, user=None)
Becomes the read-only user.
If user=None, defaults to the user value in the current profile.
-
change_version(self, dbversion=None)
Change the database version and attempt to reconnect.
Parameters: dbversion (str) – A database version.
-
post_connect(self)
Hook called after a successful connection.
-
-
astra.database.database
-
astra.database.session
-
astra.database.init_process(database)
Dispose of the existing database engine.
This is a necessary step as soon as a child process is created.