Source code for asyncorm.apps.app_migration

import hashlib
import importlib
import logging
import os
import re
import types
from datetime import datetime

from asyncorm.exceptions import AsyncOrmMigrationError
from asyncorm.orm_migrations.migration_actions import CreateModel

logger = logging.getLogger("asyncorm")


[docs]class AppMigration: def _check_migration_dir(self, initial=True): self.migrations_dir = os.path.join(self.abs_path, "migrations") if initial: os.makedirs(self.migrations_dir, exist_ok=True) async def _construct_migrations_status(self): fs_declared = self._fs_migration_list() db_migrated = await self._app_db_applied_migrations() migrations_status = {} for migration in fs_declared: migrations_status.update( { migration: { "migrated": True if migration in db_migrated else False, "initial": True if "__initial_" in migration else False, } } ) try: _latest_db_migrated = migrations_status[db_migrated[-1]] if db_migrated else {} except KeyError: raise AsyncOrmMigrationError( 'Something went wrong, migration "{}" does not exist in the filesystem.'.format(db_migrated[-1]) ) migrations_status.update( { "_latest_db_migrated": _latest_db_migrated, "_latest_fs_declared": migrations_status[fs_declared[-1]] if fs_declared else {}, } ) return migrations_status async def _check_makemigrations_status(self): """ Checks that the migration is correcly synced and everything is fine returns the latest migration applied file_name """ _migration_status = await self._construct_migrations_status() _latest_db_migrated = _migration_status["_latest_db_migrated"] _latest_db_migrated_number = self._migration_integer_number(_latest_db_migrated) _latest_fs_declared = _migration_status["_latest_fs_declared"] _latest_fs_declared_number = self._migration_integer_number(_latest_fs_declared) self._migration_integer_number(_latest_fs_declared) # the database doesn't have any migration if not _latest_db_migrated: if _latest_fs_declared: raise AsyncOrmMigrationError( "The model is not in the latest filesystem status, so the migration created will " 'not be consistent.\nPlease "migrate" the database before "makemigrations" again.' ) else: if not _latest_fs_declared: raise AsyncOrmMigrationError( "Severe inconsistence detected, the database has at least one migration applied and no " "migration described in the filesystem." ) if _latest_db_migrated_number > _latest_fs_declared_number: raise AsyncOrmMigrationError( 'There is an inconsistency, the database has a migration named "{}" ' 'more advanced than the filesystem "{}"'.format(_latest_db_migrated, _latest_fs_declared) ) elif _latest_db_migrated_number < _latest_fs_declared_number: raise AsyncOrmMigrationError( "The model is not in the latest filesystem status, so the migration created will " 'not be consistent.\nPlease "migrate" the database before "makemigrations" again.' ) elif _latest_fs_declared != _latest_db_migrated: raise AsyncOrmMigrationError( 'The migration in the filesystem "{}" is not the same migration ' 'applied in the database "{}" .'.format(_latest_fs_declared, _latest_db_migrated) ) async def _check_current_migrations_status(self, target): self._check_migration_dir() _latest_db_migration = self._migration_integer_number(await self._latest_db_migration()) forward = False if target is None: target_fs_migration = self._migration_integer_number(self._latest_fs_migration()) else: target_fs_migration = [fn for fn in next(os.walk(self.migrations_dir))[2] if fn.startswith(target)] if not target_fs_migration: raise AsyncOrmMigrationError("the migration {} does not exist for app {}".format(target, self.name)) if _latest_db_migration is not None and target_fs_migration is not None: if _latest_db_migration > target_fs_migration: raise AsyncOrmMigrationError( 'There is an inconsistency, the database has a migration named "{}" ' 'more advanced than the filesystem "{}"'.format(_latest_db_migration, target_fs_migration) ) if _latest_db_migration < target_fs_migration: forward = True return forward def _get_migration(self, migration_name): migration_name += ".py" migration_path = os.path.join(self.relative_name, "migrations", migration_name) _loader = importlib.machinery.SourceFileLoader("Migration", migration_path) migration = types.ModuleType(_loader.name) _loader.exec_module(migration) return migration @staticmethod def _migration_integer_number(migration_name): match = re.search(r"^(?P<m_number>[\d]{4})", migration_name) if match: return migration_name and int(match.groups("m_number")[0]) or 0 return 0 async def _app_db_applied_migrations(self): AsyncormMigrations = self.orm.get_model("AsyncormMigrations") results = [] async for res in AsyncormMigrations.objects.filter(app_name=self.name): results.append(res) return [r.name for r in results] if results else [] async def _latest_db_migration(self): kwargs = { "select": "name", "table_name": "asyncorm_migrations", "join": "", "ordering": "ORDER BY -id", "condition": "app_name = '{}'".format(self.name), } result = await self.db_backend.request(self.db_backend._db__select.format(**kwargs)) return result and result["name"] or "" async def _check_migration_applied(self, migration_name): kwargs = { "select": "*", "table_name": "asyncorm_migrations", "join": "", "ordering": "", "condition": "app_name = '{}' AND name = '{}'".format(self.name, migration_name), } result = await self.db_backend.request(self.db_backend._db__select.format(**kwargs)) return result def _fs_migration_list(self): py_ext = ".py" self._check_migration_dir() return sorted([fn.rstrip(py_ext) for fn in next(os.walk(self.migrations_dir))[2] if fn[-3:] == py_ext]) def _latest_fs_migration(self): filenames = self._fs_migration_list() return filenames and filenames[-1] or "" def _next_fs_migration_name(self, stage="auto"): if stage not in ("auto", "data", "initial"): raise AsyncOrmMigrationError("that migration stage is not supported") target_fs_migration = self._migration_integer_number(self._latest_fs_migration()) random_hash = hashlib.sha3_512("{}{}".format(target_fs_migration, str(datetime.now())).encode("utf-8")) return "{}__{}_{}".format("0000{}".format(target_fs_migration + 1)[-5:], stage, random_hash.hexdigest()[:20]) def _get_absolute_migration(self, migration_name): return os.path.join(self.abs_path, "migrations", migration_name) def _get_migration_actions(self): actions = [] for model_name, model in self.models.items(): final_migration_state = {} current_state = model.current_state() if current_state != final_migration_state: fields, meta = {}, {} if not final_migration_state: action_type = CreateModel fields = current_state["fields"] meta = current_state["meta"] else: # not create here action_type = CreateModel for k, value in current_state["fields"].items(): if final_migration_state["fields"][k] != value: fields.update({k: value}) meta = current_state["meta"] actions.append(action_type(model_name, fields, meta)) return actions def _get_migration_depends(self): from asyncorm.models import ForeignKey, ManyToManyField depends = [] for _, model in self.models.items(): if isinstance(model.fields, (ForeignKey, ManyToManyField)): pass return depends