Source code for flask_admin.contrib.pymongo.view

import logging
import typing as t
from typing import TypeGuard  # noqa

import pymongo
from bson import ObjectId
from bson.errors import InvalidId
from flask import flash

from flask_admin._compat import string_types
from flask_admin.actions import action
from flask_admin.babel import gettext
from flask_admin.babel import lazy_gettext
from flask_admin.babel import ngettext
from flask_admin.helpers import get_form_data
from flask_admin.model import BaseModelView

from ..._types import T_FILTER
from ...model.filters import BaseFilter
from .filters import BasePyMongoFilter
from .tools import parse_like_term

# Set up logger
log = logging.getLogger("flask-admin.pymongo")


class ModelView(BaseModelView):
    """
    PyMongo model view.

    Exposes a single MongoDB collection in the admin interface. Documents
    are handled as plain dictionaries. List columns, forms and filters are
    not scaffolded automatically: ``scaffold_list_columns``,
    ``scaffold_form`` and ``scaffold_filters`` raise ``NotImplementedError``
    in this class.
    """

    column_filters: t.Collection[str | BasePyMongoFilter] | None = None
    """
        Collection of the column filters.

        Should contain instances of
        :class:`flask_admin.contrib.pymongo.filters.BasePyMongoFilter` classes.

        Filters will be grouped by name when displayed in the drop-down.

        For example::

            from flask_admin.contrib.pymongo.filters import BooleanEqualFilter

            class MyModelView(ModelView):
                column_filters = (BooleanEqualFilter(column='active', name='Active'),)

        or::

            from flask_admin.contrib.pymongo.filters import BasePyMongoFilter

            class FilterLastNameBrown(BasePyMongoFilter):
                def apply(self, query, value):
                    # `query` is the list of conditions built so far; every
                    # entry is a MongoDB query document.
                    if value == '1':
                        query.append({self.column: 'Brown'})
                    else:
                        query.append({self.column: {'$ne': 'Brown'}})
                    return query

                def operation(self):
                    return 'is Brown'

            class MyModelView(ModelView):
                column_filters = [
                    FilterLastNameBrown(
                        column='last_name',
                        name='Last Name',
                        options=(('1', 'Yes'), ('0', 'No'))
                    )
                ]
    """

    def __init__(
        self,
        coll,
        name=None,
        category=None,
        endpoint=None,
        url=None,
        menu_class_name=None,
        menu_icon_type=None,
        menu_icon_value=None,
    ):
        """
        Constructor

        :param coll:
            MongoDB collection object
        :param name:
            Display name. Defaults to a prettified version of the
            collection name.
        :param category:
            Display category
        :param endpoint:
            Endpoint. Defaults to the lower-cased collection name followed
            by ``view``.
        :param url:
            Custom URL
        :param menu_class_name:
            Optional class name for the menu item.
        :param menu_icon_type:
            Optional icon. Possible icon types:

             - `flask_admin.consts.ICON_TYPE_GLYPH` - Bootstrap glyph icon
             - `flask_admin.consts.ICON_TYPE_FONT_AWESOME` - Font Awesome icon
             - `flask_admin.consts.ICON_TYPE_IMAGE` - Image relative to Flask
               static directory
             - `flask_admin.consts.ICON_TYPE_IMAGE_URL` - Image with full URL
        :param menu_icon_value:
            Icon glyph name or URL, depending on `menu_icon_type` setting
        """
        # Must exist before the base constructor runs.
        self._search_fields = []

        if name is None:
            name = self._prettify_name(coll.name)

        if endpoint is None:
            endpoint = (f"{coll.name}view").lower()

        # There is no model class for a raw collection, hence ``None``.
        super().__init__(
            None,  # type: ignore[arg-type]
            name,
            category,
            endpoint,
            url,
            menu_class_name=menu_class_name,
            menu_icon_type=menu_icon_type,
            menu_icon_value=menu_icon_value,
        )

        self.coll = coll

    def scaffold_pk(self):
        """
        Return the name of the primary key field (always ``_id``).
        """
        return "_id"

    def get_pk_value(self, model):
        """
        Return primary key value from the model instance

        :param model:
            Model instance (a document dictionary)
        """
        return model.get("_id")

    def scaffold_list_columns(self):
        """
        Scaffold list columns

        :raises NotImplementedError:
            Always; this class does not scaffold list columns.
        """
        raise NotImplementedError()

    def scaffold_sortable_columns(self):
        """
        Return sortable columns. This implementation returns an empty list.
        """
        return []

    def scaffold_filters(self, attr):
        """
        Return filter object(s) for the field

        :param attr:
            Either field name or field instance
        :raises NotImplementedError:
            Always; this class does not scaffold filters.
        """
        raise NotImplementedError()

    def is_valid_filter(self, filter: BaseFilter | t.Any) -> TypeGuard[BaseFilter]:
        """
        Validate if it is valid PyMongo filter

        :param filter:
            Filter object
        """
        return isinstance(filter, BasePyMongoFilter)

    def scaffold_form(self):
        """
        Scaffold the model form.

        :raises NotImplementedError:
            Always; this class does not scaffold forms.
        """
        raise NotImplementedError()

    def _get_field_value(self, model, name):
        """
        Get unformatted field value from the model
        """
        return model.get(name)

    def _search(self, query: dict[str, t.Any], search_term: str) -> dict[str, t.Any]:
        """
        Combine ``query`` with conditions derived from ``search_term``.

        The term is split on single spaces. Every non-empty word must match
        (``$and``) at least one (``$or``) of the search fields via
        ``$regex``.

        :param query:
            Query built so far
        :param search_term:
            Raw search string
        :return:
            ``query`` unchanged when no search condition was produced,
            otherwise the combined query
        """
        values = search_term.split(" ")

        queries: list[dict[str, t.Any]] = []

        # Construct inner queries: one per non-empty word
        for value in values:
            if not value:
                continue

            regex = parse_like_term(value)

            stmt = [{field: {"$regex": regex}} for field in self._search_fields]

            if stmt:
                if len(stmt) == 1:
                    queries.append(stmt[0])
                else:
                    queries.append({"$or": stmt})

        # Construct final query
        if queries:
            if len(queries) == 1:
                final = queries[0]
            else:
                final = {"$and": queries}

            if query:
                query = {"$and": [query, final]}
            else:
                query = final

        return query

    def get_query(self) -> dict[str, t.Any]:
        """
        Return the base query used by ``get_list``. Empty by default.
        """
        return {}

    def get_list(  # type: ignore[override]
        self,
        page: int | None,
        sort_column: str | None,
        sort_desc: bool,
        search: str | None,
        filters: t.Sequence[T_FILTER] | None,
        execute: bool = True,
        page_size: int | None = None,
    ) -> tuple[int | None, t.Any]:
        """
        Get list of documents from the MongoDB collection

        :param page:
            Page number (zero-based: the number of skipped documents is
            ``page * page_size``)
        :param sort_column:
            Sort column
        :param sort_desc:
            Sort descending
        :param search:
            Search criteria
        :param filters:
            List of applied filters
        :param execute:
            Run query immediately or not. When false, the cursor is
            returned instead of a list.
        :param page_size:
            Number of results. Defaults to ModelView's page_size. Can be
            overridden to change the page_size limit. Removing the page_size
            limit requires setting page_size to 0 or False.
        :return:
            Tuple of (total count, results). The count is ``None`` when
            ``simple_list_pager`` is enabled.
        """
        query = self.get_query()

        # Filters
        if self._filters and filters:
            conditions: list[t.Any] = []

            for flt, _flt_name, value in filters:
                f = self._filters[flt]
                conditions = f.apply(conditions, f.clean(value))

            if conditions:
                if query:
                    # Keep the base query: filters only narrow it further.
                    query = {"$and": [query, *conditions]}
                elif len(conditions) == 1:
                    query = conditions[0]
                else:
                    query = {"$and": conditions}

        # Search
        if self._search_supported and search:
            query = self._search(query, search)

        # Get count
        count = self.coll.count_documents(query) if not self.simple_list_pager else None

        # Sorting
        sort_by = None

        if sort_column:
            sort_by = [
                (sort_column, pymongo.DESCENDING if sort_desc else pymongo.ASCENDING)
            ]
        else:
            order = self._get_default_order()

            if order:
                sort_by = [
                    (col, pymongo.DESCENDING if desc else pymongo.ASCENDING)
                    for (col, desc) in order
                ]

        # Pagination
        if page_size is None:
            page_size = self.page_size

        skip = 0

        if page and page_size:
            skip = page * page_size

        # A limit of 0 means "no limit"; normalise None/False to it.
        results = self.coll.find(
            query, sort=sort_by, skip=skip, limit=page_size or 0
        )

        if execute:
            results = list(results)

        return count, results

    def _get_valid_id(self, id):
        """
        Convert ``id`` to an ``ObjectId`` when possible.

        :param id:
            Raw identifier
        :return:
            ``ObjectId`` built from ``id``, or ``id`` itself when it cannot
            be converted
        """
        try:
            return ObjectId(id)
        except (InvalidId, TypeError):
            # InvalidId: malformed string/bytes; TypeError: unsupported type
            # (e.g. an integer id). Either way use the raw value as the key.
            return id

    def get_one(self, id):
        """
        Return single model instance by ID

        :param id:
            Model ID
        """
        return self.coll.find_one({"_id": self._get_valid_id(id)})

    def edit_form(self, obj):  # type: ignore[override]
        """
        Create edit form from the MongoDB document

        :param obj:
            Document; its fields are passed to the form class as keyword
            arguments.
        """
        return self._edit_form_class(get_form_data(), **obj)

    def create_model(self, form):
        """
        Create model helper

        :param form:
            Form instance
        :return:
            The inserted document on success, ``False`` on failure
        """
        try:
            model = form.data
            self._on_model_change(form, model, True)
            self.coll.insert_one(model)
        except Exception as ex:
            flash(gettext("Failed to create record. %(error)s", error=str(ex)), "error")
            log.exception("Failed to create record.")
            return False
        else:
            self.after_model_change(form, model, True)

        return model

    def update_model(self, form, model):
        """
        Update model helper

        :param form:
            Form instance
        :param model:
            Model instance to update
        :return:
            ``True`` on success, ``False`` on failure
        """
        try:
            model.update(form.data)
            self._on_model_change(form, model, False)

            pk = self.get_pk_value(model)
            self.coll.replace_one({"_id": pk}, model)
        except Exception as ex:
            flash(gettext("Failed to update record. %(error)s", error=str(ex)), "error")
            log.exception("Failed to update record.")
            return False
        else:
            self.after_model_change(form, model, False)

        return True

    def delete_model(self, model):
        """
        Delete model helper

        :param model:
            Model instance
        :return:
            ``True`` on success, ``False`` on failure
        """
        try:
            pk = self.get_pk_value(model)

            # Only a missing id is an error; falsy ids such as 0 are valid.
            if pk is None:
                raise ValueError("Document does not have _id")

            self.on_model_delete(model)
            self.coll.delete_one({"_id": pk})
        except Exception as ex:
            flash(gettext("Failed to delete record. %(error)s", error=str(ex)), "error")
            log.exception("Failed to delete record.")
            return False
        else:
            self.after_model_delete(model)

        return True

    # Default model actions
    def is_action_allowed(self, name):
        """
        Check whether the named action may be executed.

        :param name:
            Action name
        """
        # Check delete action permission
        if name == "delete" and not self.can_delete:
            return False

        return super().is_action_allowed(name)

    @action(
        "delete",
        lazy_gettext("Delete"),
        lazy_gettext("Are you sure you want to delete selected records?"),
    )
    def action_delete(self, ids):
        """
        Delete the selected records one by one and flash the outcome.

        :param ids:
            Iterable of primary key values
        """
        try:
            count = 0

            # TODO: Optimize me
            for pk in ids:
                model = self.get_one(pk)

                # Record is already gone; nothing to delete.
                if model is None:
                    continue

                if self.delete_model(model):
                    count += 1

            flash(
                ngettext(
                    "Record was successfully deleted.",
                    "%(count)s records were successfully deleted.",
                    count,
                    count=count,
                ),
                "success",
            )
        except Exception as ex:
            flash(
                gettext("Failed to delete records. %(error)s", error=str(ex)), "error"
            )
            log.exception("Failed to delete records.")