整合数据遍历所有文件夹 excel

  • 作者: tchsong
  • 分类: Pandas
  • 阅读: (91)
  • 评论(0)
  • 发表日期: 2023-11-24 10:52:24

"""
    用来遍历文件夹下(包括子目录内递归遍历)所有 excel 文件,批量执行
"""
import locale
import os, io
import pandas as pd
import datetime
import msoffcrypto

# 打印出一个目录下所有的末级文件名称
"""
    os.walk()文件目录遍历器
    语法:os.walk(top),top为需要遍历的目录地址,返回一个三元组(root,dirs,files)。
    1)root 所指的是当前正在遍历的这个文件夹的本身的地址
    2)dirs 是一个 list ,内容是该文件夹中所有的目录的名字(不包括子目录)
    3)files 同样是 list , 内容是该文件夹中所有的文件名称(不包括子目录)
    for root, dirs, files in os.walk(r"/Users/hh/Downloads/附件合集"):
    
        # print(root,'root')
        # print(dirs,'dirs')
        print(files, 'files')
"""


class MYData(object):

    def __init__(self, file_str, password, eligible_list, eligible_sheet, exclude, extensions, output_path):
        self.password = password
        # 根文件夹路径
        self.file_str = file_str
        # 输出的整理好的表格数据
        self.__dfs = pd.DataFrame()
        # 输出的执行过的文件
        self.__list_s = []
        # 有资格参与数据处理的 excel 关键字,用来判断 excel 名字里是否包含
        self.eligible_list = eligible_list
        self.eligible_sheet = eligible_sheet
        self.exclude = exclude
        self.extensions = extensions
        self.output_path = output_path

    def __is_Eligible(self, file_name):
        """
        判断 exle 是否符合特殊要求
        :param file_name:
        :return:
        """
        if file_name.startswith('~'):
            return False
        if file_name.endswith(".xlsx") or file_name.endswith(".xls"):
            # str_list = ['生产工艺', '质量文件', '引物探针', '序列']
            search_str = file_name
            if any(substring in search_str for substring in self.eligible_list):
                print('工作薄名称是否包含所需关键字:True', search_str, self.eligible_list)
                return True
            else:
                print('工作薄名称是否包含所需关键字:False', search_str, self.eligible_list)
                return False
        return False

    def __isExcelEncrypted(self, file_name):
        """
        判断 excel 是否加密
        :return: True or False
        """
        try:
            fileHandle = open(file_name, "rb")
            ofile = msoffcrypto.OfficeFile(fileHandle)
            isEncrypted = ofile.is_encrypted()
            fileHandle.close()
            return isEncrypted
        except Exception as err:
            return "Exception: " + str(format(err))

    def __add_dfs(self, df):
        """
        增加到 DataFarm 用于 pandas 处理数据
        :param df: 每个 excel 的 df
        :return:
        """
        self.__dfs = pd.concat([self.__dfs, df], axis=0, copy=True)

    def __add_list_s(self, file_name, sheetname):
        """
        增加执行的文件列表,拼接一起存入列表
        :param root_dir: 执行目录路径
        :param file: 文件名字
        :return:
        """
        self.__list_s.append(f'{file_name}|{sheetname}')

    def __processing_sheet_data(self, sheet, file_name, root_dir, file):
        """
        处理 sheet 表,读数据写到 DF 里面
        :param sheet: 传递 sheet 名字
        :param file_name: 具体路径全路径
        :param root_dir: 路径
        :param file: 文件名字
        :return:
        """
        str_list = self.eligible_sheet
        search_str = sheet

        if any(substring in search_str for substring in str_list):
            print('True  sheet是否包含关键字', search_str, str_list)
            df = pd.read_excel(file_name, sheet_name=sheet, header=1)
            print(df.columns, df.index, df.shape, df.size,
                  "header ==1 ")
            my_list = df.columns
            substring = 'Unnamed: 3'
            result = any(substring in str(word) for word in my_list)
            print(result)
            if result:
                df = pd.read_excel(file_name, sheet_name=sheet, header=2)
                print(df.columns, df.index, df.shape, df.size,
                      "header ==2 ")
                my_list = df.columns
                substring = 'Unnamed:'
                result = any(substring in str(word) for word in my_list)
                result_2 = [word for word in my_list if substring in str(word)]

                if len(result_2) > 2:
                    df = pd.read_excel(file_name, sheet_name=sheet, header=3)

                    print(df.columns, df.index, df.shape, df.size, "header ==3 ")
                    print(result_2, type(result_2), len(result_2))
                    print(len(result_2))

            df["excel_name"] = os.path.join(root_dir, file)
            df["sheet_name"] = sheet
            print(df.info())
            self.__add_dfs(df)
            self.__add_list_s(file_name=os.path.join(root_dir, file), sheetname=sheet)

        else:
            print('查找后没有不包含啊!False', f'【工作表{search_str}】', f'关键字:{self.eligible_list}')

    def __workbook_not_encrypted(self, *args, **kwargs):
        root_dir = kwargs.get("root_dir", None)
        file = kwargs.get("file", None)
        file_name = kwargs.get("file_name", None)
        # 执行遍历把 sheet 传递到函数去处理
        sheet_keys = pd.read_excel(file_name, sheet_name=None).keys()

        # 把执行过的文件保存下来到指定文件
        import shutil
        # 剪切工作表到新地址
        # os.replace(file_name, os.path.join(self.output_path, file))
        # 复制工作表到新地址
        strroot = '[' + str(root_dir).replace('/', '-').split('-')[-1] + '] '
        shutil.copy2(file_name, os.path.join(self.output_path, strroot + file))

        print('+++++++++++++++++++++++++++++++++++++++', file_name)
        for sheet in sheet_keys:

            print(sheet, '-----------', sheet_keys)
            print('+++++++++++++++++++++++++++++++++++++++')
            try:
                self.__processing_sheet_data(sheet=sheet, file_name=file_name, root_dir=root_dir, file=file)
            except Exception as e:
                print(e, '这里可是不加密 excel 的处理哦!')
                self.__add_list_s(file_name=os.path.join(root_dir, file), sheetname=e)
                return False

    def __workbook_is_encrypted(self, *args, **kwargs):
        root_dir = kwargs.get("root_dir", None)
        file = kwargs.get("file", None)
        root_dir = kwargs.get("root_dir", None)
        file_name = kwargs.get("file_name", None)
        file_temp = io.BytesIO()
        # 打开加密文件
        with open(file_name, 'rb') as f:
            fileExcel = msoffcrypto.OfficeFile(f)
            try:
                fileExcel.load_key(self.password)
                fileExcel.decrypt(file_temp)
                # 顺便把解密的文件保存到指定目录
                strroot = '[加密#' + str(root_dir).replace('/', '-').split('-')[-1] + '] '
                # 把读到了内存中的表另存到新的地址,不会变更原始位置的表
                fileExcel.decrypt(open(os.path.join(self.output_path, strroot + file), 'wb'))

                sheet_keys = pd.read_excel(io=file_temp, sheet_name=None, ).keys()
                print('------------------------------------------------', file_name)
                for sheet in sheet_keys:
                    print(sheet, '-----------', sheet_keys)
                    self.__processing_sheet_data(sheet=sheet, file_name=file_temp, root_dir=root_dir, file=file)
                return None, None
            except Exception as e:
                print(e, '加密解开错误,或许也是文件 utf-16-e的问题')
                self.__add_list_s(file_name=os.path.join(root_dir, file), sheetname=e)
                return False, e
   
    @property
    def goto_excel(self):
        # os.walk(file_path) 深度遍历file_path下的所有子文件夹及文件

        for root_dir, sub_dir, files in os.walk(self.file_str):

            sub_dir[:] = [d for d in sub_dir if d not in self.exclude]
            # os.path.splitext是Python标准库中的一个函数,它可以将一个文件路径拆分成两部分:文件名和文件扩展名
            files = [file for file in files if os.path.splitext(file)[1] in self.extensions]

            for file in files:
                # 判断哪些 excel 是否和条件的则处理, 条件罗列在初始化类的时候定义
                if self.__is_Eligible(file):
                    # 构造绝对路径
                    file_name = os.path.join(root_dir, file)

                    isExcelEncrypted = self.__isExcelEncrypted(file_name)
                    print(f'是否加密{isExcelEncrypted};当前执行的文件是:{file_name}')
                    if not isExcelEncrypted:
                        print(file_name, '++++++++++++++++++++++++++++++++++++++++++++++++++正常未加密的 excle')

                        self.__workbook_not_encrypted(file_name=file_name, file=file, root_dir=root_dir, )
                        print('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<')
                    else:
                        print(file_name, '---------------------------------------------------处理加密的 excle')
                        # 当密码解密文件不对的时候返回  一个 False,判断后记录下来
                        failed_to_verify_password, f_e = self.__workbook_is_encrypted(file_name=file_name, file=file,
                                                                                      root_dir=root_dir, )
                        print(failed_to_verify_password, f_e)
                        # if failed_to_verify_password == False:
                        #     self.__add_list_s(file_name=os.path.join(root_dir, file), sheetname=f'{f_e} 可能被重复记录')
                        print('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')



        print("---------------执行结束——————————————")
        # 执行的文件路径和名字
        print(self.__list_s)
        for l in self.__list_s:
            print(l)
        dflist = pd.DataFrame(self.__list_s)
        dflist.to_excel(f'{self.output_path}/执行文件列表.xlsx')
        self.__dfs.to_excel(f'{self.output_path}/合并数据.xlsx')


pd.set_option('display.max_columns', None)  # 设置列
pd.set_option('display.max_rows', None)  # 设置行


def main():
    # 实例化干活的类,参数file_str是执行的路径,把加密的 excel 密码传递进去,
    # 参数str_list = ['生产工艺', '引物探针', '序列'] 控制哪些 excel 进行处理

    # 在这里定义自己的路径地址,要处理的文件夹: file_str=r" 这里输入文件夹地址 "
    my_data = MYData(file_str=r"/Users/hh/Downloads/2023年日报",
                     # 锁定的工作表的密码
                     password='zchscdc',
                     # 我要的工作簿名称关键词,包含就可以
                     eligible_list=['马小云', ],
                     # 我要的 sheet 的名字关键词
                     eligible_sheet=['Sheet1', ],
                     # 排除文件夹的名字,不排除就保持空列表即可 ['文件夹名字 1', '文件夹名字 2', ]
                     exclude=[],
                     # 需要的文件类型,后缀
                     extensions=['.xlsx', '.xls'],
                     # 输出文件的文件夹,这里本程序输入 整合数据及加密表格的解密版本
                     output_path=r'/Users/hh/Downloads/电影天堂'
                     )

    # 执行goto_excel方法 ,参数output_path是指定的输出目录位置
    my_data.goto_excel
    """
        # 这种写法尅访问私有变量,私有方法(对封装的一种破解),也可以进行修改
        # my_data._MYData__list_s=[]
        # print(my_data._MYData__list_s)
    """
    [print(i, f'列标推导式循环{str(i).split("/")}') for i in my_data._MYData__list_s]


if __name__ == '__main__':
    print('main……………………………………………………………………………start………………………………………………………………………………………')
    main()
    print('over………………………………………………………………………………main……………………………………………………………………………………')
 

提交评论

您尚未登录,登录之后方可评论~ 登录 or 注册

评论列表

暂无评论