<tbody id="86a2i"></tbody>


<dd id="86a2i"></dd>
<progress id="86a2i"><track id="86a2i"></track></progress>

<dd id="86a2i"></dd>
<em id="86a2i"><ruby id="86a2i"><u id="86a2i"></u></ruby></em>

    <dd id="86a2i"></dd>
    deali

    前言

    最近遇到一個需求,有幾十個Excel,每個的字段都不一樣,然后都差不多是第一行是表頭,后面幾千上萬的數據,需要把這些Excel中的數據全都加入某個已經上線的Django項目

    這就需要每個Excel建個表,然后一個個導入了

    這樣的效率太低,不能忍

    所以我造了個自動生成 Model 和導入腳本的*

    思路

    首先拿出 pandas,它的 DataFrame 用來處理數據很方便

    pandas 加載 Excel 之后,提取表頭,我們要通過表頭來生成數據表的字段。有些 Excel 的表頭是中文的,需要先做個轉換。

    一開始我是想用翻譯API,全都翻譯成英文,不過發現免費的很慢有限額,微軟、DeepL都要申請,很麻煩。索性用個拼音轉換庫,全都轉換成拼音得了~

    然后字段的長度也要確定,或者全部用不限制長度的 TextField

    權衡一下,我還是做一下字段長度判定的邏輯,遍歷整個表,找出各個字段最長的數據,然后再加一個偏移量,作為最大長度。

    接著生成 Model 類,這里我用 jinja2 模板語言,先把大概的模板寫好,然后根據提取出來的字段名啥的生成。

    最后生成 admin 配置和導入腳本,同理,也是用 jinja2 模板。

    實現

    簡單介紹下思路,現在開始上代碼。

    就幾行而已,Python很省代碼~

    模型

    首先定義倆模型

    字段模型

    class Field(object):
        def __init__(self, name: str, verbose_name: str, max_length: int = 128):
            self.name = name
            self.verbose_name = verbose_name
            self.max_length = max_length
    
        def __str__(self):
            return f'<Field>{self.name}:{self.verbose_name}'
    
        def __repr__(self):
            return self.__str__()
    

    Model模型

    為了符合Python關于變量的命名規范,snake_name 屬性是用正則表達式實現駝峰命名轉蛇形命名

    class Model(object):
        def __init__(self, name: str, verbose_name: str, id_field: Field, fields: List[Field]):
            self.name = name
            self.verbose_name = verbose_name
            self.id_field = id_field
            self.fields: List[Field] = fields
    
        @property
        def snake_name(self):
            import re
            pattern = re.compile(r'(?<!^)(?=[A-Z])')
            name = pattern.sub('_', self.name).lower()
            return name
    
        def __str__(self):
            return f'<Model>{self.name}:{self.verbose_name}'
    
        def __repr__(self):
            return self.__str__()
    

    代碼模板

    使用 jinja2 實現。

    本身 jinja2 是 Flask、Django 之類的框架用來渲染網頁的。

    不過單獨使用的效果也不錯,我的 DjangoStarter 框架也是用這個 jinja2 來自動生成 CRUD 代碼~

    Model模板

    # -*- coding:utf-8 -*-
    from django.db import models
    
    class {{ model.name }}(models.Model):
        """{{ model.verbose_name }}"""
        {% for field in model.fields -%}
        {{ field.name }} = models.CharField('{{ field.verbose_name }}', default='', null=True, blank=True, max_length={{ field.max_length }})
        {% endfor %}
        class Meta:
            db_table = '{{ model.snake_name }}'
            verbose_name = '{{ model.verbose_name }}'
            verbose_name_plural = verbose_name
    

    Admin配置模板

    @admin.register({{ model.name }})
    class {{ model.name }}Admin(admin.ModelAdmin):
        list_display = [{% for field in model.fields %}'{{ field.name }}', {% endfor %}]
        list_display_links = None
    
        def has_add_permission(self, request):
            return False
    
        def has_delete_permission(self, request, obj=None):
            return False
    
        def has_view_permission(self, request, obj=None):
            return False
    

    數據導入腳本

    這里做了幾件事:

    • 使用 pandas 處理空值,填充空字符串
    • 已有數據進行批量更新
    • 新數據批量插入

    更新邏輯麻煩一點,因為數據庫一般都有每次最大更新數量的限制,所以我做了分批處理,通過 update_data_once_max_lines 控制每次最多同時更新多少條數據。

    def import_{{ model.snake_name }}():
        file_path = path_proc(r'{{ excel_filepath }}')
    
        logger.info(f'讀取文件: {file_path}')
        xlsx = pd.ExcelFile(file_path)
        df = pd.read_excel(xlsx, 0, header={{ excel_header }})
        df.fillna('', inplace=True)
    
        logger.info('開始處理數據')
    
        id_field_list = {{ model.name }}.objects.values_list('{{ model.id_field.name }}', flat=True)
        item_list = list({{ model.name }}.objects.all())
    
        def get_item(id_value):
            for i in item_list:
                if i.shen_qing_ren_zheng_jian_hao_ma == id_value:
                    return i
            return None
    
        insert_data = []
        update_data_once_max_lines = 100
        update_data_sub_set_index = 0
        update_data = [[]]
        update_fields = set()
    
        for index, row in df.iterrows():
            if '{{ model.id_field.verbose_name }}' not in row:
                logger.error('id_field {} is not existed'.format('{{ model.id_field.verbose_name }}'))
                continue
    
            if row['{{ model.id_field.verbose_name }}'] in id_field_list:
                item = get_item(row['{{ model.id_field.verbose_name }}'])
                {% for field in model.fields -%}
                if '{{ field.verbose_name }}' in row:
                    if item.{{ field.name }} != row['{{ field.verbose_name }}']:
                        item.{{ field.name }} = row['{{ field.verbose_name }}']
                        update_fields.add('{{ field.name }}')
                {% endfor %}
                if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:
                    update_data_sub_set_index += 1
                    update_data.append([])
                update_data[update_data_sub_set_index].append(item)
            else:
                # {% for field in model.fields -%}{{ field.verbose_name }},{%- endfor %}
                model_obj = {{ model.name }}()
                {% for field in model.fields -%}
                if '{{ field.verbose_name }}' in row:
                    model_obj.{{ field.name }} = row['{{ field.verbose_name }}']
                {% endfor %}
                insert_data.append(model_obj)
    
        logger.info('開始批量導入')
        {{ model.name }}.objects.bulk_create(insert_data)
        logger.info('導入完成')
    
        if len(update_data[update_data_sub_set_index]) > 0:
            logger.info('開始批量更新')
            for index, update_sub in enumerate(update_data):
                logger.info(f'正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 條數據')
                {{ model.name }}.objects.bulk_update(update_sub, list(update_fields))
            logger.info('更新完成')
    

    主體代碼

    剩下的全是核心代碼了

    引用依賴

    先把用到的庫導入

    import os
    import re
    from typing import List, Optional
    
    from pypinyin import pinyin, lazy_pinyin, Style
    from jinja2 import Environment, PackageLoader, FileSystemLoader
    

    或者后面直接去我的完整代碼里面拿也行~

    老規矩,我封裝了一個類。

    構造方法需要指定 Excel 文件地址,還有表頭的行索引。

    class ExcelToModel(object):
        def __init__(self, filepath, header_index=0):
            self.filepath = filepath
            self.header_index = header_index
            self.columns = []
            self.fields: List[Field] = []
    
            self.base_dir = os.path.dirname(os.path.abspath(__file__))
            self.template_path = os.path.join(self.base_dir, 'templates')
            self.jinja2_env = Environment(loader=FileSystemLoader(self.template_path))
    
            self.load_file()
    

    這里面有個 self.load_file() 后面再貼。

    字段名中文轉拼音

    用了 pypinyin 這個庫,感覺還不錯。

    轉換后用正則表達式,去除符號,只保留英文和數字。

    代碼如下,也是放在 ExcelToModel 類里邊。

    @staticmethod
    def to_pinyin(text: str) -> str:
        pattern = r'~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
        text = re.sub(r"[%s]+" % pattern, "", text)
        return '_'.join(lazy_pinyin(text, style=Style.NORMAL))
    

    加載文件

    拿出萬能的 pandas,按照前面說的思路,提取表頭轉換成字段,并且遍歷數據確定每個字段的最大長度,我這里偏移值是32,即在當前數據最大長度基礎上加上32個字符。

    def load_file(self):
        import pandas as pd
        xlsx = pd.ExcelFile(self.filepath)
        df = pd.read_excel(xlsx, 0, header=self.header_index)
        df.fillna('', inplace=True)
        self.columns = list(df.columns)
        for col in self.columns:
            field = Field(self.to_pinyin(col), col)
            self.fields.append(field)
            for index, row in df.iterrows():
                item_len = len(str(row[col]))
                if item_len > field.max_length:
                    field.max_length = item_len + 32
    
            print(field.verbose_name, field.name, field.max_length)
    

    如果覺得這樣生成表太慢,可以把確定最大長度的這塊代碼去掉,就下面這塊代碼

    for index, row in df.iterrows():
        item_len = len(str(row[col]))
        if item_len > field.max_length:
            field.max_length = item_len + 32
    

    手動指定最大長度或者換成不限制長度的 TextField 就行。

    生成文件

    先構造個 context 然后直接用 jinja2 的 render 功能生成代碼。

    為了在導入時判斷數據存不存在,生成代碼時要指定 id_field_verbose_name,即Excel文件中類似“證件號碼”、“編號”之類的列名,注意是Excel中的表頭列名。

    def find_field_by_verbose_name(self, verbose_name) -> Optional[Field]:
        for field in self.fields:
            if field.verbose_name == verbose_name:
                return field
        return None
    
    def generate_file(self, model_name: str, verbose_name: str, id_field_verbose_name: str, output_filepath: str):
        template = self.jinja2_env.get_template('output.jinja2')
        context = {
            'model': Model(
                model_name, verbose_name,
                self.find_field_by_verbose_name(id_field_verbose_name),
                self.fields
            ),
            'excel_filepath': self.filepath,
            'excel_header': self.header_index,
        }
        with open(output_filepath, 'w+', encoding='utf-8') as f:
            render_result = template.render(context)
            f.write(render_result)
    

    使用

    看代碼。

    tool = ExcelToModel('file.xlsx')
    tool.generate_file('CitizenFertility', '房價與居民生育率', '證件號碼', 'output/citizen_fertility.py')
    

    生成出來的代碼都在一個文件里,請根據實際情況放到項目的各個位置。

    完整代碼

    發布到Github了

    地址: https://github.com/Deali-Axy/excel_to_model

    小結

    目前看來完美契合需求,極大節省工作量~

    實際跑起來,不得不吐槽 Python 羸弱的性能,占內存還大… 湊合著用吧。也許后面有時間會優化一下~

    相關文章:

    免费一级a片在线播放视频|亚洲娇小性XXXX色|曰本无码毛片道毛片视频清|亚洲一级a片视频免费观看
    <tbody id="86a2i"></tbody>

    
    
    <dd id="86a2i"></dd>
    <progress id="86a2i"><track id="86a2i"></track></progress>

    <dd id="86a2i"></dd>
    <em id="86a2i"><ruby id="86a2i"><u id="86a2i"></u></ruby></em>

      <dd id="86a2i"></dd>