
Reading compressed files with Spark

Published: 2021-05-21 12:27:44

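Goal: read compressed archives with Spark and process the files inside each archive in a distributed way, at file-level granularity (each file inside an archive is one unit of work).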

Example layout (each outer archive, with the files it contains indented below it):

-| .tar.gz
   -| .gz
   -| .zip
-| .zip
   -| .gz
   -| .zip
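To make the layout concrete, here is a minimal JDK-only sketch (no Spark, no Commons Compress) that builds an outer .zip in memory containing a directory entry, one inner .gz file and one inner .zip file, then lists the inner files. The class name `ArchiveLayoutDemo` and all entry names are hypothetical. Each listed name corresponds to one unit of work at file-level granularity; directory entries are skipped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class ArchiveLayoutDemo {

    // Gzip a short text; stands in for an inner ".gz" file.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    // Build an outer .zip shaped like the layout above: a directory entry,
    // one inner .gz file and one inner .zip file (all names are made up).
    static byte[] buildSampleOuterArchive() {
        try {
            ByteArrayOutputStream innerBos = new ByteArrayOutputStream();
            try (ZipOutputStream inner = new ZipOutputStream(innerBos)) {
                inner.putNextEntry(new ZipEntry("c.txt"));
                inner.write("hello".getBytes(StandardCharsets.UTF_8));
                inner.closeEntry();
            }
            ByteArrayOutputStream outerBos = new ByteArrayOutputStream();
            try (ZipOutputStream outer = new ZipOutputStream(outerBos)) {
                outer.putNextEntry(new ZipEntry("logs/")); // directory entry
                outer.closeEntry();
                outer.putNextEntry(new ZipEntry("logs/a.gz"));
                outer.write(gzip("log line"));
                outer.closeEntry();
                outer.putNextEntry(new ZipEntry("logs/b.zip"));
                outer.write(innerBos.toByteArray());
                outer.closeEntry();
            }
            return outerBos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // List the non-directory entries of a zip: one name per unit of work.
    static List<String> listEntries(byte[] zipBytes) {
        List<String> names = new ArrayList<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (!entry.isDirectory()) {
                    names.add(entry.getName());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(listEntries(buildSampleOuterArchive())); // [logs/a.gz, logs/b.zip]
    }
}
```

Note that the inner .gz and .zip files are treated as opaque files here, exactly as the post treats them: they are entries of the outer archive, not archives to be opened recursively.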

Calling sc.binaryFiles(path) on a JavaSparkContext returns a JavaPairRDD<String, PortableDataStream>.

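The key is the path of the archive file; the PortableDataStream value is a handle to that archive's raw bytes, and each call to its open() method returns a new input stream over them.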
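Because the key is only a path, the archive format has to be inferred from it. The post does this with `path.substring(path.lastIndexOf('.'))`, which throws `StringIndexOutOfBoundsException` for a path without a dot (the exception is then swallowed). A minimal sketch of a more defensive check follows; `CompressionModes` and `Mode` are hypothetical names, and, as in the post, every `.gz` file is assumed to be a gzipped tar.

```java
import java.util.Locale;

final class CompressionModes {

    enum Mode { TAR_GZ, ZIP, UNKNOWN }

    // Only the last path segment is inspected, and a name without a known
    // extension simply yields UNKNOWN instead of throwing.
    // Assumption, as in the post: every ".gz" file is a gzipped tar.
    static Mode fromPath(String path) {
        String name = path.substring(path.lastIndexOf('/') + 1).toLowerCase(Locale.ROOT);
        if (name.endsWith(".gz")) {
            return Mode.TAR_GZ;
        }
        if (name.endsWith(".zip")) {
            return Mode.ZIP;
        }
        return Mode.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(fromPath("hdfs://nn/data/logs.tar.gz")); // TAR_GZ
        System.out.println(fromPath("file:/tmp/batch.ZIP"));        // ZIP
        System.out.println(fromPath("file:/tmp/README"));           // UNKNOWN
    }
}
```

Using `Locale.ROOT` avoids locale-sensitive lowercasing (for example, in a Turkish locale `"ZIP".toLowerCase()` does not produce `"zip"`).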

Parallelization: expand each archive into one record per inner file, so one input archive yields n records (1:n parallelism).
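The 1:n expansion can be illustrated without a cluster. In this minimal sketch, plain Java streams stand in for Spark's `flatMapToPair`, and each archive is represented only by its path and the names of its inner files; the class name `FanOutDemo` and the sample names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class FanOutDemo {

    // Input: one record per archive (archive path -> names of its inner files).
    // Output: one (archive path, inner file) record per inner file, i.e. the
    // same 1:n expansion that flatMapToPair performs, done locally with streams.
    static List<Map.Entry<String, String>> fanOut(Map<String, List<String>> archives) {
        return archives.entrySet().stream()
                .flatMap(archive -> archive.getValue().stream()
                        .map(inner -> Map.entry(archive.getKey(), inner)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, List<String>> archives = new LinkedHashMap<>();
        archives.put("a.zip", List.of("x.txt", "y.txt"));
        archives.put("b.tar.gz", List.of("z.log"));
        fanOut(archives).forEach(System.out::println);
        // a.zip=x.txt
        // a.zip=y.txt
        // b.tar.gz=z.log
    }
}
```

One Spark-specific caveat: `flatMapToPair` is a narrow transformation, so the n records produced from one archive stay in the partition that held that archive. To actually spread them over more tasks, a `repartition(...)` after the expansion is usually needed; it shuffles the records (including their `PortableDataStream` handles, which are serializable) to other executors.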

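import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.zip.GZIPInputStream;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.input.PortableDataStream;

import scala.Tuple2;

    // The methods below belong to the author's utility class. FilePropertyPojo (presumably
    // holding an entry's name and size) and getEachFileName(...) are the author's own code
    // and are not shown in the original post.

    // One archive stream maps to several records: one per file inside the archive.
    public static JavaPairRDD<PortableDataStream, FilePropertyPojo> getFileListRdd(
            JavaPairRDD<String, PortableDataStream> zipRdd) {
        return zipRdd.flatMapToPair(tuple2 -> {
            List<Tuple2<PortableDataStream, FilePropertyPojo>> targetList = new ArrayList<>();
            // First pass over the archive: collect the names (and sizes) of its inner files.
            List<FilePropertyPojo> fileNameList = getFileNameList(tuple2._2);
            for (FilePropertyPojo filePropertyPojo : fileNameList) {
                // Every record shares the same archive stream and carries one inner file.
                targetList.add(new Tuple2<>(tuple2._2, filePropertyPojo));
            }
            return targetList.iterator();
        });
    }

    // Pick a reader based on the archive's extension and list its inner files.
    private static List<FilePropertyPojo> getFileNameList(PortableDataStream portableDataStream) {
        List<FilePropertyPojo> mrPropertyPojoList = new ArrayList<>();
        String path = portableDataStream.getPath();
        int dot = path.lastIndexOf('.');
        if (dot < 0) {
            return mrPropertyPojoList; // no extension: format unknown, emit no records
        }
        String fileCompressMode = path.substring(dot).toLowerCase(Locale.ROOT);
        try {
            switch (fileCompressMode) {
            case ".gz": // handled as a gzipped tar (.tar.gz)
                getFileNameFromGz(portableDataStream, mrPropertyPojoList);
                break;
            case ".zip":
                getFileNameFromZip(portableDataStream, mrPropertyPojoList);
                break;
            default:
                // Unsupported format: emit no records.
                break;
            }
            return mrPropertyPojoList;
        } catch (Exception e) {
            // Unreadable archive: emit no records for it (logging e here is advisable).
            return new ArrayList<>();
        }
    }

    // Walk a .tar.gz archive: gunzip the stream, then read the tar entries one by one.
    private static void getFileNameFromGz(PortableDataStream portableDataStream,
            List<FilePropertyPojo> mrPropertyPojoList) {
        try (TarArchiveInputStream inputStream = new TarArchiveInputStream(
                new GZIPInputStream(portableDataStream.open()))) {
            TarArchiveEntry tarArchiveEntry;
            while ((tarArchiveEntry = inputStream.getNextTarEntry()) != null) {
                if (tarArchiveEntry.isDirectory()) {
                    continue; // only files are units of work
                }
                try {
                    getEachFileName(mrPropertyPojoList, tarArchiveEntry.getName(), tarArchiveEntry.getSize());
                } catch (Exception e) {
                    // Skip this entry and keep scanning the rest of the archive.
                }
            }
        } catch (Exception e) {
            // Corrupt or truncated archive: keep the entries collected so far.
        }
    }

    // Walk a .zip archive entry by entry in streaming mode.
    private static void getFileNameFromZip(PortableDataStream portableDataStream,
            List<FilePropertyPojo> mrPropertyPojoList) throws IOException {
        try (ZipArchiveInputStream zipArchiveInputStream = new ZipArchiveInputStream(portableDataStream.open())) {
            ZipArchiveEntry nextZipEntry;
            while ((nextZipEntry = zipArchiveInputStream.getNextZipEntry()) != null) {
                if (nextZipEntry.isDirectory()) {
                    continue; // only files are units of work
                }
                try {
                    // In streaming mode getSize() can be -1 (unknown) when the size is
                    // stored in a data descriptor after the entry's data.
                    getEachFileName(mrPropertyPojoList, nextZipEntry.getName(), nextZipEntry.getSize());
                } catch (Exception e) {
                    // Skip this entry and keep scanning the rest of the archive.
                }
            }
        }
    }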
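Downstream, each (PortableDataStream, FilePropertyPojo) record still has to be turned into that one file's bytes; the post does not show this step. One possible approach for zip archives, sketched here with the JDK only, is to reopen the archive and scan forward to the entry by name. `EntryReader`, `readEntry` and `buildZip` are hypothetical names, and a `Supplier<InputStream>` stands in for `PortableDataStream.open()` so the example runs without Spark.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

final class EntryReader {

    // Open a fresh stream over the archive and scan forward to `entryName`.
    // `opener` plays the role of PortableDataStream.open(): every call must
    // return a new stream positioned at the start of the archive.
    static Optional<byte[]> readEntry(Supplier<InputStream> opener, String entryName) {
        try (ZipInputStream zis = new ZipInputStream(opener.get())) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (!entry.isDirectory() && entry.getName().equals(entryName)) {
                    // ZipInputStream signals end-of-stream at the end of the current
                    // entry, so this returns only that entry's decompressed bytes.
                    return Optional.of(zis.readAllBytes());
                }
            }
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Build a small zip in memory from alternating name/content strings (demo data only).
    static byte[] buildZip(String... nameContentPairs) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            for (int i = 0; i + 1 < nameContentPairs.length; i += 2) {
                zos.putNextEntry(new ZipEntry(nameContentPairs[i]));
                zos.write(nameContentPairs[i + 1].getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] archive = buildZip("a.txt", "alpha", "b.txt", "beta");
        Supplier<InputStream> opener = () -> new ByteArrayInputStream(archive);
        byte[] b = readEntry(opener, "b.txt").get();
        System.out.println(new String(b, StandardCharsets.UTF_8)); // beta
    }
}
```

A design caveat: zip and tar.gz streams can only be read sequentially, so reaching the k-th entry means reading through (and inflating) every entry before it. For an archive with n inner files, the n records together therefore read on the order of n²/2 entries. For archives with many entries, extracting each entry's bytes during the first scan, instead of only its name, may be cheaper.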

Original post: https://www.cnblogs.com/carsonwuu/p/14792548.html
