大文件上的结构化数据计算示例
在数据分析过程中,经常会处理文本文件中的结构化数据(txt,csv等),有时这些文件还会很大,计算机内存不足以一次性读入。这时,只能将数据分批读入内存,对每批数据计算出临时中间结果,分批处理完以后,再按照计算要求对分批处理结果进行恰当的汇总处理,与一次性装入内存的小文件数据计算有很大的不同。实现大文件计算需要了解一个重要的概念—游标。我们以前比较熟知的是数据库游标,在数据库中使用游标,可以每次返回部分数据,而不将所有数据同时读入内存。游标类似于一个指针,在读取时会通过移动指针的位置来从结果集中每次提取部分记录。与数据库游标类似,在读取大文件数据时,也需要实现文件游标,它具有以下特点:1、只用于获取数据,并不用来修改结果集。2、在读取数据时从前向后只遍历一次。有了游标对象,就可以把大文件计算步骤依次附加在游标对象上,在进行最后计算时,再逐条取出记录,按附加的步骤进行计算。本文将以结构化文本文件为例,给出大文件过滤、聚合计算、添加计算列、排序、分组聚合、topN 以及并行计算等目标任务的实现方法,并提供用 esProc SPL 编写的代码示例。esProc 是专业的数据计算引擎,其采用的 SPL 中有完善的游标对象及运算,处理这些运算非常方便。1. 过滤过滤就是设置一个条件表达式,然后用每条记录的数据来计算表达式的值,如果计算结果值为真则本条记录有效,需要添加到最后取数的结果集里,否则就丢弃这条记录不用取出。对大文件过滤是一种延迟计算,就是先把过滤表达式记在游标对象上,等到取某条记录时,再计算过滤表达式来决定是否将本记录加入结果集。示例:在大数据学生成绩文本文件students_scores.txt中,查找10班学生的成绩。列数据间用TAB分隔,部分数据如下图:
esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.txt").cursor@t()@t选项,把第一行读作标题2=A1.select(CLASS==10)筛选出10班的学生成绩,延迟计算3=file("E:/txt/students_scores_10.txt").export@t(A2)将过滤后的数据存入新的文件2. 聚合计算聚合计算是对大文件中的所有记录,执行某种统计计算,比如统计总和、平均值、最大值、最小值、计数等。循环遍历游标中所有记录,用每条记录数据计算出当前的聚合统计值,只把统计值存在内存中,而不用保存数据记录,就不会占据太多内存。遍历完毕后就得到最终的统计值。示例:在大数据学生成绩文本文件students_scores.csv中,列数据间用逗号分隔,部分数据如下图:
计算语文成绩总分,esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.csv").cursor@tc()@c指示分隔符是”,”2=A1.total(sum(Chinese))计算语文成绩总分计算10班语文成绩总分,esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.csv").cursor@tc()@c指示分隔符是”,”2=A1.select(CLASS==10).total(sum(Chinese))先过滤出10班的成绩,再计算语文成绩总分3. 添加计算列添加计算列是指用文件中的一列或几列经过某种指定计算,将计算结果记为一个新列的列值。这也是一种延迟计算,当读取某条记录时,再计算表达式的值,将它赋给本记录的新列。示例:在大数据学生成绩文本文件students_scores_.txt中,列数据间用|分隔,部分数据如下图:
计算每位学生的总成绩,esProc SPL脚本如下:A注释1=file("E:/txt/students_scores_.txt").cursor@t(;,"|")指定分隔符”|”2=A1.derive(English+Chinese+Math:total_score)附加derive计算总分,新增列名为total_score3=file("E:/txt/students_scores_total.txt").export@t(A2;"|")将附加了总成绩的数据存到新文件里除了用derive增加新列,也可以用new函数创建新的数据结构,同时也可以增加新的列,例如:A注释1=file("E:/txt/students_scores_.txt").cursor@t(;,"|")指定分隔符”|”2=A1.new(CLASS,NAME,English+Chinese+Math:total_score) 从A1的列中选择需要的列或设置表达式运算产生新的列,三门成绩相加得到总成绩3=file("E:/txt/students_scores_total.txt").export@t(A2;"|")将新结构数据存到新文件里当然,也可以先对数据进行过滤,再对需要的记录产生新列或生成新的结构。例如只取出10班的学生成绩并新增总成绩列,esProc SPL脚本如下:A注释1=file("E:/txt/students_scores_.txt").cursor@t(;,"|")指定分隔符”|”2=A1.select(CLASS==10)筛选出10班的学生成绩,延迟计算3=A2.derive(English+Chinese+Math:total_score)附加derive计算总分,新增列名为total_score4=file("E:/txt/students_scores_total.txt").export@t(A3;"|")将附加了总成绩的数据存到新文件里在获取到最终计算结果之前,各种基本计算,如过滤、新增列、产生新结构、改变字段值、排序等,都可以按需求先后附加到游标上。后面小节的示例中就不再一一列举这些了,只列出小节所讲的主题计算。4. 排序大文件排序因内存不足,不能读入所有数据来排序,实现的原理是这样的:先读入一批数据记录,读多少行合适要根据内存而定,将这批数据排序后存到一个临时文件,再读入下一批数据排序后存到另一个临时文件……直到所有数据处理完,最后对这些临时文件进行有序归并——读出每个临时文件的第一行,通过对排序字段值的比较,找出应该排在最前面的那一行,写入到结果文件。然后从刚才排第一的那个临时文件中再读出下一行,继续比较找出最前面的一行写入结果文件。按此方法不断进行,直到所有数据行都写入结果文件。示例:在大数据学生成绩文本文件students_scores.txt中,按语文成绩升序排列。esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.txt").cursor@t()创建游标2=A1.sortx(Chinese)按Chinese升序排序,返回游标3=file("E:/txt/students_scores_sort.txt").export@t(A2)将排序后的数据存入新文件也可以同时按多个字段排序或按表达式计算值排序,如将A2单元格改为:=A1.sortx(Chinese,Math) //按语文、数学成绩先后排序=A1.sortx(Math+English+Chinese) //按总成绩排序5. 分组聚合分组聚合是先对数据记录进行分组,对同一组的记录进行某种统计计算,最后得到每一组的统计值。大文件的分组聚合分两种情况:一是分组的结果不大(组数少),所有分组结果都能在内存中放下,称之为小分组聚合;二是分组的结果很大(组数非常多),内存中存不下所有的组,称之为大分组聚合。小分组聚合的实现原理是:把分组键值和组统计值保存在内存中,在读取每条记录时,按分组表达式计算出分组键值,在保存的组里查找此键值,找到了则将本记录的数据与组统计值汇总,没找到则新加入一个组。最后直到所有行都处理完,就得到了所有的分组和本组的统计值。示例:在大数据用户登录记录文件user_info_reg.csv中,统计各省用户的登录总次数及总时长。列数据间用逗号分隔,部分数据如下图:
esProc SPL脚本如下:A注释1=file("E:/txt/user_info_reg.csv").cursor@tc() 创建游标,@c指示分隔符是”,”2=A1.groups(id_province;count(~):cnt,sum(reg_time):total_reg) 分组后统计各省登录总次数及总时长大分组聚合因内存不足,不能把所有分组聚合的结果放在内存里,所以需要分批处理,并使用临时文件保存分批结果,最后再归并汇总,实现原理是这样的:逐行读入数据,按照小分组聚合的流程进行分组聚合,当保存的分组结果集大到一定程序时(结果集大小视内存决定),将此结果集按分组键值排序后存为临时文件,从内存中清除。继续读入数据作同样处理,当所有数据处理完以后,就得到了多个按分组键值排好序的临时文件。然后对这些临时文件的数据作有序归并(与大文件排序的归并流程相同),得到一个按分组键值排序、键值很可能有重复的大文件,最后把那些重复键值的组合并成一组,得到所有分组结果大文件,再返回用此文件创建的游标供调用者提取分组结果数据。示例:在大数据用户登录记录文件user_info_reg.csv中,统计每个用户的登录总次数及总时长。esProc SPL脚本如下:A注释1=file("E:/txt/user_info_reg.csv").cursor@tc() 创建游标,@c指示分隔符是”,”2=A1.groupx(user_id; count(~):cnt,sum(reg_time):total_reg) 分组后求各用户登录总次数及总时长,返回游标3=file("E:/txt/user_info_tj.csv").export@tc(A2)将各用户统计数据存入文件6. TopNTopN是对数据排序以后,查出前N条记录。有时需要对所有数据求前N条记录,有时还需要先对数据分组,再求每一组中的前N条记录。但计算TopN时其实不需要对所有数据排序,那样会在排序上花费很多时间,对大文件计算尤其如此。TopN的实现原理是这样的:先读出N条记录,形成一个N条记录的小数据集并排好序,再读新的记录时,与小数据集的最后一条比较,若排在它之后,则直接丢弃这条记录,若排在它之前,则将这条新记录插入到小数据集的合适位置,丢弃小数据集的最后一条记录。当所有数据都读出并处理完时,就得到了需要的前N条记录集。TopN的实现方式和聚合运算很像。示例:在大数据学生成绩文本文件students_scores.txt中,查找数学成绩排在前10的学生记录。esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.txt").cursor@t()创建游标2=A1.groups(;top(10;-Math))因为要逆序排列,所以用-Math排序top函数除了返回前N条记录,也可以返回前N个值。例如查找排在前10的数学成绩,esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.txt").cursor@t()创建游标2=A1.groups(;top(-10,Math))top函数中间的分隔符是逗号时,直接返回前10个MathTopN还可以使用到分组中,即每个组中取TopN,其计算原理也类似,只是需要为每个分组保持一个N条记录的小数据集。示例:在大数据学生成绩文本文件students_scores.txt中,查找各班数学成绩排在前10的学生记录。esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.txt").cursor@t()创建游标2=A1.groups(CLASS;top(10;-Math))因为要逆序排列,所以用-Math排序7. 并行提速并行计算就是用多个线程同时分担一个计算任务,能充分利用多核CPU提高计算性能,这对于大文件特别有用。大文件计算常常需要将数据分批计算,最后再将分批计算结果进行合并汇总。并行计算也是如此,先将大文件分段,每个线程各自用大文件计算的方式处理一段数据,最后将各线程处理的结果进行汇总。示例:在大数据用户登录记录文件user_info_reg.csv中,统计各省用户的登录总次数,用4路并行计算提高速度。esProc SPL脚本如下:A注释1=file("E:/txt/user_info_reg.csv").cursor@tcm(;4) 创建游标,@m表示并行计算,参数4表示4路并行2=A1.groups(id_province;count(~):cnt)示例:在大数据学生成绩文本文件students_scores.csv中,查询各班语文成绩在90分以上且总成绩排在前5名的学生,并用8路并行提高速度,esProc SPL脚本如下:A注释1=file("E:/txt/students_scores.csv").cursor@tcm(;8)创建游标,@m表示并行计算,参数8表示8路并行2=A1.select(Chinese>=90)筛选出语文在90分以上的学生3=A2.derive(English+Chinese+Math:total_score)附加derive计算总分,新增列名为total_score4=A3.groups(CLASS;top(-5;total_score))按班分组,查询各班总成绩前5的学生5=file("E:/txt/students_scores_total.txt").export@tc(A4)将查询数据存到新文件里通过以上示例可以看出,在SPL中使用并行提速非常容易,与单线程代码相比,仅仅多一个游标选项与参数,让用户使用并行非常方便。《SPL CookBook》中有更多敏捷计算示例。