前言
大家好,这里是浩道linux,主要给大家分享linux、python、网络通信相关的it知识平台。
众所周知,python除了以简洁著称,其成熟的第三方库功能也是很强大的,今天浩道带大家看看如何通过python轻松处理大文件,真让人直呼yyds 。
为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。
例如,如果你正在处理一个大的csv文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。
在这篇文章中,我们将学习如何使用multiprocessing、joblib和tqdm python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。
开始
我们将使用来自 kaggle 的 us accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。
https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents
我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据导入,re、nltk和string用于文本处理。
# parallel computingimport multiprocessing as mpfrom joblib import parallel, delayedfrom tqdm.notebook import tqdm# data ingestion import pandas as pd# text processing import re from nltk.corpus import stopwordsimport string
在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers。
n_workers = 2 * mp.cpu_count()print(f{n_workers} workers are available)>>> 8 workers are available
下一步,我们将使用pandas read_csv函数读取大型csv文件。然后打印出dataframe的形状、列的名称和处理时间。
%%timefile_name=../input/us-accidents/us_accidents_dec21_updated.csvdf = pd.read_csv(file_name)print(fshape:{df.shape}column names:{df.columns})
输出:
shape:(2845342, 47)column names:index(['id', 'severity', 'start_time', 'end_time', 'start_lat', 'start_lng','end_lat', 'end_lng', 'distance(mi)', 'description', 'number', 'street','side', 'city', 'county', 'state', 'zipcode', 'country', 'timezone','airport_code', 'weather_timestamp', 'temperature(f)', 'wind_chill(f)','humidity(%)', 'pressure(in)', 'visibility(mi)', 'wind_direction','wind_speed(mph)', 'precipitation(in)', 'weather_condition', 'amenity','bump', 'crossing', 'give_way', 'junction', 'no_exit', 'railway','roundabout', 'station', 'stop', 'traffic_calming', 'traffic_signal','turning_loop', 'sunrise_sunset', 'civil_twilight', 'nautical_twilight','astronomical_twilight'],dtype='object')cpu times: user 33.9 s, sys: 3.93 s, total: 37.9 swall time: 46.9 s
处理文本
clean_text是一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。
def clean_text(text): # remove stop words stops = stopwords.words(english) text = .join([word for word in text.split() if word not in stops]) # remove special characters text = text.translate(str.maketrans('', '', string.punctuation)) # removing the extra spaces text = re.sub(' +',' ', text) return text
串行处理
对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。
我们将处理280万条记录,并将结果保存回 “description” 列中。
%%timetqdm.pandas()df['description'] = df['description'].progress_apply(clean_text)
输出
高端处理器串行处理280万行花了9分5秒。
100% 2845342/2845342 [09:05<00:00, 5724.25it/s]cpu times: user 8min 14s, sys: 53.6 s, total: 9min 7swall time: 9min 5s
多进程处理
有多种方法可以对文件进行并行处理,我们将了解所有这些方法。multiprocessing是一个内置的python包,通常用于并行处理大型文件。
我们将创建一个有8个workers的多处理池,并使用map函数来启动进程。为了显示进度条,我们将使用tqdm。
map函数由两部分组成。第一个部分需要函数,第二个部分需要一个参数或参数列表。
%%timep = mp.pool(n_workers) df['description'] = p.map(clean_text,tqdm(df['description']))
输出
我们的处理时间几乎提高了3倍。处理时间从9分5秒下降到3分51秒。
100% 2845342/2845342 [02:58<00:00, 135646.12it/s]cpu times: user 5.68 s, sys: 1.56 s, total: 7.23 swall time: 3min 51s
并行处理
我们现在将学习另一个python包来执行并行处理。在本节中,我们将使用joblib的parallel和delayed来复制map函数。
parallel需要两个参数:n_job = 8和backend = multiprocessing。
然后,我们将在delayed函数中加入clean_text。
创建一个循环,每次输入一个值。
下面的过程是相当通用的,你可以根据你的需要修改你的函数和数组。我曾用它来处理成千上万的音频和视频文件,没有任何问题。
建议:使用 try: 和 except: 添加异常处理。
def text_parallel_clean(array): result = parallel(n_jobs=n_workers,backend=multiprocessing)( delayed(clean_text) (text) for text in tqdm(array) ) return result
在text_parallel_clean()中添加“description”列。
%%timedf['description'] = text_parallel_clean(df['description'])
输出
我们的函数比多进程处理pool多花了13秒。即使如此,并行处理也比串行处理快4分59秒。
100% 2845342/2845342 [04:03>> 100% 8/8 [00:00<00:00, 280.01it/s]
运行并行批处理
最后,我们将使用parallel和delayed来处理批次。
%%timebatch_output = parallel(n_jobs=n_workers,backend=multiprocessing)( delayed(proc_batch) (batch) for batch in tqdm(batches) )df['description'] = [j for i in batch_output for j in i]
输出
我们已经改善了处理时间。这种技术在处理复杂数据和训练深度学习模型方面非常有名。
100% 8/8 [00:00<00:00, 2.19it/s]cpu times: user 3.39 s, sys: 1.42 s, total: 4.81 swall time: 3min 56s
tqdm 并发
tqdm将多处理带到了一个新的水平。它简单而强大。
process_map需要:
函数名称
dataframe 列名
max_workers
chucksize与批次大小类似。我们将用workers的数量来计算批处理的大小,或者你可以根据你的喜好来添加这个数字。
%%timefrom tqdm.contrib.concurrent import process_mapbatch = round(len(df)/n_workers)df['description'] = process_map(clean_text,df['description'], max_workers=n_workers, chunksize=batch)
输出
通过一行代码,我们得到了最好的结果:
100% 2845342/2845342 [03:48<00:00, 1426320.93it/s]cpu times: user 7.32 s, sys: 1.97 s, total: 9.29 swall time: 3min 51s
结论
我们需要找到一个平衡点,它可以是串行处理,并行处理,或批处理。如果你正在处理一个较小的、不太复杂的数据集,并行处理可能会适得其反。
在这个教程中,我们已经了解了各种处理大文件的python包,它们允许我们对数据函数进行并行处理。
如果你只处理一个表格数据集,并且想提高处理性能,那么建议你尝试dask、datatable和rapids。
如何制作一个简易的微型风力发电机模型?
4G移动通信系统的主要特点和关键技术
MP3选购必看
大功率超声波焊接发生器电源设计
三星Galaxy Buds +配置参数泄露,电池寿命将大大延长
如何通过python轻松处理大文件
新变压器投运前要做冲击试验的原因 变压器不允许反送电?
端子线的正确检测工序是怎样的
意大利开发出由大脑控制的有触感的机械手
拜占庭将军问题和比特币之间有关系吗
物联网网络层的关键技术是什么
焊板子继电器的关键商品性能的参数是怎样的
中国MEMS传感器市场分析
基于CANScope强大的CAN总线底层测试分析
基于NXP的KL26芯片PDF温度记录仪低功耗解决方案
仪表放大器的特点是什么?
如何在装修时打造全屋WiFi
OnePlus Z将运行Snapdragon 765,因此它应支持下一代连接
英特尔或将投资瑞芯微,狙击ARM阵营
3D打印产业规模第二大的华东地区中,谁是领跑者?