菜单
个人主页
(当前)
写文章
浏览博文
    
搜索
登录
微信公众号
站点链接
半根蓝白个人主页
CSDN
Github
友情链接
摘繁华个人博客
博文目录
#custom-toc-container
django 分表设计实现及兼容 admin 后台
BGLB0324
2022年7月17日 03:36
最后发布:2022年7月17日 03:36
首发:2022年7月17日 03:36
139
0
博文分类:
Python
博文标签:
分表
版权声明:本文为博主[BGLB0324]原创文章,遵循
CC 4.0 BY
版权协议,转载请附上原文出处链接和本声明。
本文链接:
http://blog.bglb.work/blog/blog-detail/86
版权
# django 分表设计及兼容 admin 后台 > 其实, django 分表的业务并不是很常见,因为 django 速度上不太适合做大量的数据查询,还有就是现代的数据库(`mysql`、`pg`)其实已经很能扛了, 根本不需要分表, 百万级数据以上才可能考虑分表(我瞎说的, 因为还没接触到这种海量数据)。 一些废话:原本这篇博文计划很早了,但是由于种种原因,时间不是很充裕,就一直耽搁了(就是太懒了),终于今晚腾出来一点时间做个回顾(根本原因是,最近在做`odoo`的技术栈,不是很顺利,回来`django`找找自信!) 前言:小学生的流水账哦,大佬可以略过。 ## 分表的逻辑 - 按照固定的 `key` 进行分表 商品表: 按照商品种类分表: [水果(shop_fruit),蔬菜(shop_vegetable),......] (可能不太准确, 能理解就行) - 按照时间维度进行分表 账单详情表按年:[bill_202201, bill_202202, ......] 请求日志表按月:[request_log_2022, request_log_2023, ......] ## 自定义分表model ```python class ShareModel(models.Model): """ 分表继承的model, 按年份, 按月份, 自定义 """ @classmethod def get_model(cls, sharing_key: str, verbose=None): key = sharing_key.split('=')[-1] class Meta: managed = False db_table = '{}_{}'.format(cls._meta.db_table, key.lower()) verbose_name = verbose if verbose else f'{cls._meta.verbose_name}({cls.get_shared_keys().get(key)})' verbose_name_plural = verbose_name attrs = { 'Meta': Meta, '__module__': cls.__module__, } # 这里会报一个 RuntimeWarning: Model ... was already registered. 没办法去除 生产环境无影响 return type(''.join([item.title() for item in Meta.db_table.split('_')]), (cls,), attrs) @classmethod def table_exists(cls, sharing_key) -> bool: """ 判断是否已经在数据库 暂未实现 :param sharing_key: :return: """ # TODO: 确认表是否存在 return False @classmethod def get_shared_year(cls, start_year=2020): """ 年份: 获取所有分表的年份 """ current_year = datetime.today().year max = (current_year - start_year) + 1 year = [current_year - offset for offset in range(max)] return year @classmethod def get_shared_month(cls, first_year=2020, first_month=7): """ 所有 分表月份: 基于年份 :return: list ['202001', '202002', 。。。] """ all_month = [] years = cls.get_shared_year() for year in years: if year == first_year: start_month = first_month else: start_month = 1 range_month = datetime.today().month + 1 if year == datetime.today().year else 13 month = [str(year)+'0'+str(item) if item < 10 else str(year)+str(item) for item in range(start_month, range_month)] all_month.extend(month) return all_month def delete(self, using=None, keep_parents=False, **kwargs): if hasattr(self, 'shared_key'): delattr(self, 'shared_key') return super().delete() @classmethod def get_shared_keys(cls) -> dict: """ 获取 所有的分表 key;若分表方式不同, 重写此方法 :return: """ return {item: f'{item}年' for item in cls.get_shared_year()} class Meta: # 虚拟model, 只有通过调用 get_model 才会实例化出来一个 abstract = True ``` #### 使用示例 > 这里我们选一个中等复杂度的分表,既有固定`key`及 时间 两个维度综合的分表策略 (其实也不复杂,不过django官方没有相关的方案 就很难受) 这里我们只需要继承上面的 `model`,重写 `get_shared_key` 就可以实现动态分表啦 ```python class VideoShareModel(ShareModel): """ Video 我们假定 有很多不同频台的视频需要存储 平台:抖音,西瓜,快手 抖音: 视频量大 需要按月存 西瓜:视频量小,按年存 快手:同西瓜 """ video_source = models.CharField(max_length=10, blank=True, null=True, verbose_name='视频来源', help_text='视频来源') @classmethod def get_shared_keys(cls): all_platform = (('douyin', '抖音'), ('xigua', '西瓜'), ('kuaishou', '快手'),) all_shared_key = {} for shared_item in all_platform: if shared_item[0].lower() == 'douyin': # 抖音 按月存 shared_time = cls.get_shared_month() else: # 其他 按年存 shared_time = cls.get_shared_year(start_year=2020) shared_key = {f'{shared_item[0]}_{time_key}': f'{shared_item[1]}_{time_key}' for time_key in shared_time} all_shared_key.update(shared_key) return all_shared_key class Meta: # 这里注意下 当前 model 只是虚拟 model abstract = True db_table = 'video' verbose_name = '视频管理' verbose_name_plural = verbose_name # 半自动生成表(运行`python manage.py makemigrations` 及 `python manage.py migrate` 时会自动生成分表) for k, item in VideoShareModel.get_shared_keys().items(): VideoShareModel.get_model(k) ``` ## 分表的orm查询 ```python current_source = 'douyin' query = VideoShareModel.get_model(current_source).objects.filter(...) # 其他方法类似 ``` ## 分表的后台展示 #### 首先重写一个 分表的过滤器 因为需要查询的话必须首先得有这个key ```python class SharedKeyFilter(SimpleListFilter, ABC): """ 通用分表的过滤器 继承 此过滤器 可以在 后台选择对应的分表 进行增删改查 """ title = '选择分表' parameter_name = 'shared_key' def lookups(self, request, model_admin): return [(k, v) for k, v in model_admin.model.get_shared_keys().items()] ``` #### 然后重写一个 model admin的类 用于在后台展示数据 ```python class SharedModelAdmin(ModelAdmin): """ 通用的分表 ModelAdmin 变量命名可能不是很准确 * source_model: 默认查询的分表 model 初始化查询的 model 必须指定 * VIRTUAL_SHARE_MODEL: 所有分表 model 的基类 也就是继承 ShareModel 的类名 必须指定 """ __url = None source_model = None VIRTUAL_SHARE_MODEL = None # 优化 show_full_result_count = False # @display(description='所属分表') django2.1 没有 这个装饰器 def shared_key_readonly(self, obj): """""" if obj: vertual_db_table = f'{self.VIRTUAL_SHARE_MODEL.Meta.db_table}_' shared_key = obj._meta.db_table.replace(vertual_db_table, '').title() return self.VIRTUAL_SHARE_MODEL.get_shared_keys().get(shared_key, None) else: return '--' shared_key_readonly.short_description = '所属分表' def get_readonly_fields(self, request, obj=None): result = super().get_readonly_fields(request, obj) if obj: result = list(result) result.insert(0, 'shared_key_readonly') return set(result) return result def get_fieldsets(self, request, obj=None): """""" result = super().get_fieldsets(request, obj) if obj: result = list(result) result.pop(0) result.insert(0, ('所属分表', {'fields': ('shared_key_readonly',)})) else: result = list(result) result.insert(0, ('所属分表', {'fields': ('shared_key',)})) return result def get_queryset(self, request): """""" if 'shared_key' in request.request_data and len(request.request_data) > 1: shared_key = request.request_data.pop('shared_key') request.GET._mutable = True request.GET.pop('shared_key') # self.model = self.VIRTUAL_SHARE_MODEL.get_model(shared_key) return super().get_queryset(request) def get_form(self, request, obj=None, change=False, **kwargs): """ Return a Form class for use in the admin add view. This is used by add_view and change_view. """ self.model = self.source_model # 生成一个 假的 数据库字段 shared_key_filed = CharField(choices=((k, v) for k, v in self.VIRTUAL_SHARE_MODEL.get_shared_keys().items()), verbose_name='所属分表', help_text='所属分表', name='shared_key') # 设置本字段相关值, 骗过 django 底层的字段检测 attr_dict = { 'attname': 'shared_key', 'concrete': True, 'model': self.model, 'concrete_model': self.model, 'column': None, } for k, v in attr_dict.items(): setattr(shared_key_filed, k, v) # 添加 到 私有字段 if len(self.model._meta.private_fields) == 0: self.model._meta.private_fields.append(shared_key_filed) return super().get_form(request, obj, change, **kwargs) def render_change_form(self, request, context, add=False, change=False, form_url='', obj=None): if add or change: self.model = self.source_model return super().render_change_form(request, context, add, change, form_url, obj) def _changeform_view(self, request, object_id, form_url, extra_context): shared_key = request.request_data.get('_changelist_filters', None) or request.request_data.get('shared_key', None) shared_key = [item for item in shared_key.split('&') if 'shared_key' in item] if object_id and shared_key: self.model = self.VIRTUAL_SHARE_MODEL.get_model(shared_key[0].split('=')[-1]) return super()._changeform_view(request, object_id, form_url, extra_context) def delete_model(self, request, obj): """""" obj._meta.model = self.VIRTUAL_SHARE_MODEL.get_model(obj.shared_key) obj.delete() obj.__delattr__('shared_key') obj._meta.model = self.source_model def save_model(self, request, obj, form, change): """""" if change: shared_key = request.request_data.get('_changelist_filters', None) or request.request_data.get('shared_key', None) shared_key = ''.join([item for item in shared_key.split('&') if 'shared_key' in item]) else: shared_key = form.data.get('shared_key', '') obj._meta.model = self.VIRTUAL_SHARE_MODEL.get_model(shared_key.split('=')[-1]) obj.save() obj._meta.model = self.source_model def get_urls(self): if self.__url: return self.__url self.__url = super().get_urls() return self.__url def response_add(self, request, obj, post_url_continue=None): self.model = self.source_model return super().response_add(request, obj, post_url_continue) def response_change(self, request, obj): """""" self.model = self.source_model return super().response_change(request, obj) ``` #### 使用示例 ```python VideoModel = VideoShareModel.get_model('douyin', '视频信息') # 默认查询抖音 class VideoListFilter(SharedKeyFilter): """ video 分表过滤器 """ def queryset(self, request, queryset): q = self.value() qs = OrderList.get_model(q, ).objects.get_queryset() if q else queryset return qs @register(VideoModel) class VideoModelAdmin(SharedModelAdmin): """ video 分表后台展示 """ list_display = (..., ) search_fields = (...) fieldsets = (..., ) list_filter = (InventoryFilter, ) source_model = VideoModel VIRTUAL_SHARE_MODEL = VideoShareModel ``` 使用以上的方式, 就可以兼容后台的增删改查 ## 其他关于django 后台的适配(就是 魔改AdminModel) #### 非外键的过滤及展示 比如 有个 商品表, 里面有个 `shop_creator_id` 但是 这个没有物理外键, 你又想按照 用户名在`django`后台展示并且选择查询 就可以按照下面的方法 ```python class UserIdNotForeignKeyFilter(SimpleListFilter, ABC): """ """ title = '选择用户' parameter_name = 'user_id_filter' user_list: list = None # 这里指定当前model中的 user_id 字段值 user_id_field_name = '' @classmethod def set_user_list(cls, user_list): """ user_list: [(user_id, user_name)] :return: """ return type('UserIdNotForeignKeyFilter', (UserIdNotForeignKeyFilter,), {'user_list': user_list}) def lookups(self, request, model_admin): if not self.user_id_field_name: try: model_admin.model._meta.get_field('user_id') self.user_id_field_name = 'user_id' except Exception: self.user_id_field_name = 'add_user_id' return self.user_list def queryset(self, request, queryset): """ :param request: :param queryset: :return: """ user_id_filter = request.request_data.get(self.parameter_name, '') if user_id_filter: return queryset.filter(**{self.user_id_field_name: user_id_filter}) return queryset.filter() class UserIdNotForeignKeyMinMx(object): """ 对于 非外键 的 user_id 进行 处理 """ UserList = None def user_name(self, obj): """ 将 user_id 转换 为 user_name 显示 :param obj: :return: """ user_query = self.UserList if user_query: user_dict = dict(user_query) # 这里 应该是可以优化的 直接使用 user_id = getattr(obj, self.user_id_field_name, None) # 很久之前的代码了,需要验证哈 user_id = getattr(obj, 'user_id', None) if not user_id: user_id = getattr(obj, 'add_user_id', None) return user_dict.get(user_id) user_name.short_description = '添加用户名称' class UserIdNotForeignKeyAdmin(UserIdNotForeignKeyMinMx, ModelAdmin): def get_list_filter(self, request): self.list_filter = list(super().get_list_filter(request)) temp = UserIdNotForeignKeyFilter.set_user_list(list(self.UserList)) for item in self.list_filter: if isinstance(item, str): continue if issubclass(item, UserIdNotForeignKeyFilter): return self.list_filter self.list_filter.append(temp) return self.list_filter def get_list_display(self, request): self.list_display = list(super().get_list_display(request)) if 'user_name' not in self.list_display: self.list_display.append('user_name') return self.list_display ``` 使用示例: ```python @register(ShopModel) class ShopModelAdmin(UserIdNotForeignKeyIEAdmin): """ 商品表后台展示 """ # 这里 因为 django 使用缓存 有可能会导致 新添加的用户 不能及时添加进来,导致不能被搜索及展示 UserList = UserInfo.objects.filter().values_list('id', 'name') list_select_related = (......) list_filter = (...... ) search_fields = (......) list_display = (......, 'user_id') # 这里必须添加一个 user_id 否则 获取不到user_id 就无法进行匹配 show_full_result_count = False ``` #### 时间戳的过滤及展示 ```python class TimestampFilter(SimpleListFilter, ABC): """ 时间戳 过滤 """ title = '时间范围' parameter_name = 'add_time__range' def lookups(self, request, model_admin): return [ (Time.get_day_timestamp_range(-1), '昨天'), (Time.get_day_timestamp_range(), '今天'), (Time.get_week_timestamp_range(-1), '上周'), (Time.get_week_timestamp_range(), '本周'), (Time.get_month_timestamp_range(), '本月'), (Time.get_month_timestamp_range(-1), '上个月'), ((Time.get_month_timestamp_range(-3)[0], Time.get_month_timestamp_range(-1)[1]), '前三个月'), (Time.get_year_timestamp_range(), '今年'), (Time.get_year_timestamp_range(-1), '去年'), # 这里get__timestamp_range` 返回 (int(开始的时间戳), int(结束的时间戳)) ] def queryset(self, request, queryset): """ 根据 时间范围筛选 数据 :param request: :param queryset: :return: """ add_time__range = request.request_data.get(self.parameter_name, '') if add_time__range: # 时间太过久远了 这里使用eval的原因已经想不起来了 # 这里应该是可以优化的 # return queryset.filter(**{f'{self.timestamp_filed_name}__range': eval(add_time__range)}) return queryset.filter(add_time__range=eval(add_time__range)) return queryset.filter() class TimestampMixin(object): timestamp_filed_name = 'add_time' def datetime_format(self, obj): return Time.timestamp_format(getattr(obj, self.timestamp_filed_name)) datetime_format.short_description = '添加时间' class TimestampAdmin(TimestampMixin, ModelAdmin): def get_list_filter(self, request): self.list_filter = list(super().get_list_filter(request)) if TimestampFilter not in self.list_filter: self.list_filter.append(TimestampFilter) return self.list_filter def get_list_display(self, request): self.list_display = list(super().get_list_display(request)) if 'datetime_format' not in self.list_display: self.list_display.append('datetime_format') return self.list_display ``` 使用示例 ```python @register(TestModel) class TestTimestampAdmin(TimestampAdmin): """ 产品分类管理 """ timestamp_filed_name = 'create_timestamp' # 时间戳字段 list_display = () list_filter = (......) search_fields = ('name', ) show_full_result_count = False ``` ## 下面是一些自己封装(魔改)的通用类 下面这些 包含了上面说的 非外键字段,时间戳, 导入导出, 分表 展示过滤的通用类(适用于 admin 后台哦 ) 具体使用方法参照上面的两个示例,很简单的。 ```python # -*- coding:utf-8 -*- # @Time : 2021/12/17 14:28 # @Author : BGLB # @Software : PyCharm import datetime from abc import ABC from urllib import parse from django.contrib.admin import SimpleListFilter, ModelAdmin from django.db.models import CharField from django.utils.translation import gettext_lazy as _ from import_export.admin import ImportMixin, ExportMixin, ExportActionMixin from simpleui.admin import AjaxAdmin from utils.util import Time class SharedKeyFilter(SimpleListFilter, ABC): """ 通用分表的过滤器 继承 此过滤器 可以在 后台选择对应的分表 进行增删改查 """ title = '选择分表' parameter_name = 'shared_key' def lookups(self, request, model_admin): return [(k, v) for k, v in model_admin.model.get_shared_keys().items()] class UserIdNotForeignKeyFilter(SimpleListFilter, ABC): """ """ title = '选择用户' parameter_name = 'user_id_filter' user_list: list = None user_id_field_name = '' @classmethod def set_user_list(cls, user_list): """ :return: """ return type('UserIdNotForeignKeyFilter', (UserIdNotForeignKeyFilter,), {'user_list': user_list}) def lookups(self, request, model_admin): if not self.user_id_field_name: try: model_admin.model._meta.get_field('user_id') self.user_id_field_name = 'user_id' except Exception: self.user_id_field_name = 'add_user_id' return self.user_list def queryset(self, request, queryset): """ :param request: :param queryset: :return: """ user_id_filter = request.request_data.get(self.parameter_name, '') if user_id_filter: return queryset.filter(**{self.user_id_field_name: user_id_filter}) return queryset.filter() class TimestampFilter(SimpleListFilter, ABC): """ 时间戳 过滤 """ title = '时间范围' parameter_name = 'add_time__range' def lookups(self, request, model_admin): return [ (Time.get_day_timestamp_range(-1), '昨天'), (Time.get_day_timestamp_range(), '今天'), (Time.get_week_timestamp_range(-1), '上周'), (Time.get_week_timestamp_range(), '本周'), (Time.get_month_timestamp_range(), '本月'), (Time.get_month_timestamp_range(-1), '上个月'), ((Time.get_month_timestamp_range(-3)[0], Time.get_month_timestamp_range(-1)[1]), '前三个月'), (Time.get_year_timestamp_range(), '今年'), (Time.get_year_timestamp_range(-1), '去年'), ] def queryset(self, request, queryset): """ 根据 时间范围筛选 数据 :param request: :param queryset: :return: """ add_time__range = request.request_data.get(self.parameter_name, '') if add_time__range: return queryset.filter(add_time__range=eval(add_time__range)) return queryset.filter() class SharedModelAdmin(ModelAdmin): """ 通用的分表 ModelAdmin 变量命名可能不是很准确 * source_model: 默认查询的分表 model 初始化查询的 model 必须指定 * VIRTUAL_SHARE_MODEL: 所有分表 model 的基类 也就是继承 ShareModel 的类名 必须指定 """ __url = None source_model = None VIRTUAL_SHARE_MODEL = None # 优化 show_full_result_count = False # @display(description='所属分表') django2.1 没有 这个装饰器 def shared_key_readonly(self, obj): """""" if obj: vertual_db_table = f'{self.VIRTUAL_SHARE_MODEL.Meta.db_table}_' shared_key = obj._meta.db_table.replace(vertual_db_table, '').title() return self.VIRTUAL_SHARE_MODEL.get_shared_keys().get(shared_key, None) else: return '--' shared_key_readonly.short_description = '所属分表' def get_readonly_fields(self, request, obj=None): result = super().get_readonly_fields(request, obj) if obj: result = list(result) result.insert(0, 'shared_key_readonly') return set(result) return result def get_fieldsets(self, request, obj=None): """""" result = super().get_fieldsets(request, obj) if obj: result = list(result) result.pop(0) result.insert(0, ('所属分表', {'fields': ('shared_key_readonly',)})) else: result = list(result) result.insert(0, ('所属分表', {'fields': ('shared_key',)})) return result def get_queryset(self, request): """""" if 'shared_key' in request.request_data and len(request.request_data) > 1: shared_key = request.request_data.pop('shared_key') request.GET._mutable = True request.GET.pop('shared_key') # self.model = self.VIRTUAL_SHARE_MODEL.get_model(shared_key) return super().get_queryset(request) def get_form(self, request, obj=None, change=False, **kwargs): """ Return a Form class for use in the admin add view. This is used by add_view and change_view. """ self.model = self.source_model # 生成一个 假的 数据库字段 shared_key_filed = CharField(choices=((k, v) for k, v in self.VIRTUAL_SHARE_MODEL.get_shared_keys().items()), verbose_name='所属分表', help_text='所属分表', name='shared_key') # 设置本字段相关值, 骗过 django 底层的字段检测 attr_dict = { 'attname': 'shared_key', 'concrete': True, 'model': self.model, 'concrete_model': self.model, 'column': None, } for k, v in attr_dict.items(): setattr(shared_key_filed, k, v) # 添加 到 私有字段 if len(self.model._meta.private_fields) == 0: self.model._meta.private_fields.append(shared_key_filed) return super().get_form(request, obj, change, **kwargs) def render_change_form(self, request, context, add=False, change=False, form_url='', obj=None): if add or change: self.model = self.source_model return super().render_change_form(request, context, add, change, form_url, obj) def _changeform_view(self, request, object_id, form_url, extra_context): shared_key = request.request_data.get('_changelist_filters', None) or request.request_data.get('shared_key', None) shared_key = [item for item in shared_key.split('&') if 'shared_key' in item] if object_id and shared_key: self.model = self.VIRTUAL_SHARE_MODEL.get_model(shared_key[0].split('=')[-1]) return super()._changeform_view(request, object_id, form_url, extra_context) def delete_model(self, request, obj): """""" obj._meta.model = self.VIRTUAL_SHARE_MODEL.get_model(obj.shared_key) obj.delete() obj.__delattr__('shared_key') obj._meta.model = self.source_model def save_model(self, request, obj, form, change): """""" if change: shared_key = request.request_data.get('_changelist_filters', None) or request.request_data.get('shared_key', None) shared_key = ''.join([item for item in shared_key.split('&') if 'shared_key' in item]) else: shared_key = form.data.get('shared_key', '') obj._meta.model = self.VIRTUAL_SHARE_MODEL.get_model(shared_key.split('=')[-1]) obj.save() obj._meta.model = self.source_model def get_urls(self): if self.__url: return self.__url self.__url = super().get_urls() return self.__url def response_add(self, request, obj, post_url_continue=None): self.model = self.source_model return super().response_add(request, obj, post_url_continue) def response_change(self, request, obj): """""" self.model = self.source_model return super().response_change(request, obj) class UserIdNotForeignKeyMinMx(object): """ 对于 非外键 的 user_id 进行 处理 """ UserList = None def user_name(self, obj): """ :param obj: :return: """ user_query = self.UserList if user_query: user_dict = dict(user_query) user_id = getattr(obj, 'user_id', None) if not user_id: user_id = getattr(obj, 'add_user_id', None) return user_dict.get(user_id) user_name.short_description = '添加用户名称' class TimestampMixin(object): timestamp_filed_name = 'add_time' def datetime_format(self, obj): return Time.timestamp_format(getattr(obj, self.timestamp_filed_name)) datetime_format.short_description = '添加时间' class UserIdNotForeignKeyAdmin(UserIdNotForeignKeyMinMx, ModelAdmin): def get_list_filter(self, request): self.list_filter = list(super().get_list_filter(request)) temp = UserIdNotForeignKeyFilter.set_user_list(list(self.UserList)) for item in self.list_filter: if isinstance(item, str): continue if issubclass(item, UserIdNotForeignKeyFilter): return self.list_filter self.list_filter.append(temp) return self.list_filter def get_list_display(self, request): self.list_display = list(super().get_list_display(request)) if 'user_name' not in self.list_display: self.list_display.append('user_name') return self.list_display class TimestampAdmin(TimestampMixin, ModelAdmin): def get_list_filter(self, request): self.list_filter = list(super().get_list_filter(request)) if TimestampFilter not in self.list_filter: self.list_filter.append(TimestampFilter) return self.list_filter def get_list_display(self, request): self.list_display = list(super().get_list_display(request)) if 'datetime_format' not in self.list_display: self.list_display.append('datetime_format') return self.list_display class TimestampUserIdNotForeignKeyAdmin(TimestampMixin, UserIdNotForeignKeyMinMx, ModelAdmin): """ 适用于 1. user_id 不是真实外键 2. 有时间字段是是时间戳格式 """ def get_list_filter(self, request): self.list_filter = list(super().get_list_filter(request)) if TimestampFilter not in self.list_filter: self.list_filter.append(TimestampFilter) temp = UserIdNotForeignKeyFilter.set_user_list(list(self.UserList)) for item in self.list_filter: if isinstance(item, str): continue if issubclass(item, UserIdNotForeignKeyFilter): return self.list_filter self.list_filter.append(temp) return self.list_filter def get_list_display(self, request): self.list_display = list(super().get_list_display(request)) if 'datetime_format' not in self.list_display: self.list_display.append('datetime_format') if 'user_name' not in self.list_display: self.list_display.append('user_name') return self.list_display class ImportExportActioinMixin(ImportMixin, ExportMixin): """ 导入导出 Mixin """ def get_export_filename(self, request, queryset, file_format): """ :param request: :param queryset: :param file_format: :return: """ add_time_range = request.request_data.get('add_time__range', []) if add_time_range: add_time_range = eval(add_time_range) date_str_start = datetime.datetime.fromtimestamp(add_time_range[0]).strftime('%Y-%m-%d') date_str_end = datetime.datetime.fromtimestamp(add_time_range[1]).strftime('%Y-%m-%d') date_str = '-({0}_{1})'.format(date_str_start, date_str_end) else: date_str = '' filename = "%s%s.%s"%(self.model._meta.verbose_name, date_str, file_format.get_extension()) return parse.quote(filename) def get_actions(self, request): actions = super().get_actions(request) if self.has_export_permission(request): actions.update( export_admin_action=( ExportActionMixin.export_admin_action, "export_admin_action", _("Export selected %(verbose_name_plural)s"), ) ) return actions return actions class ImportExportAdmin(ImportExportActioinMixin, ModelAdmin): """ 导入导出继承类 """ pass class ImportExportAjaxAdmin(ImportExportActioinMixin, AjaxAdmin): pass class TimestampIEAdmin(ImportExportActioinMixin, TimestampAdmin): """ """ pass class UserIdNotForeignKeyIEAdmin(ImportExportActioinMixin, UserIdNotForeignKeyAdmin): """ 适用于 user_id 没有真实外键关系的 导入导出 """ pass class TimestampUserIdNotForeignKeyIEAdmin(ImportExportActioinMixin, TimestampUserIdNotForeignKeyAdmin): pass ``` ## 结语 > 啊!这项任务终于是完成了,感觉找回了一点自信,回顾之前的一些代码,还是有用的,可以弥补之前的一些逻辑缺陷及设计缺陷!
点赞
0
打赏
暂时没有评论
请
登录
后评论
暂时没有评论