西安电商平台网站建设,wordpress剧情网,搜索引擎有哪些,电商网站 解决方案最近#xff0c;我不得不处理一组包含逐笔历史汇率市场数据的文件#xff0c;并很快意识到使用传统的InputStream都无法将它们读取到内存中#xff0c;因为每个文件的大小都超过4 GB。 Emacs甚至无法打开它们。 在这种特殊情况下#xff0c;我可以编写一个简单的bash脚本我不得不处理一组包含逐笔历史汇率市场数据的文件并很快意识到使用传统的InputStream都无法将它们读取到内存中因为每个文件的大小都超过4 GB。 Emacs甚至无法打开它们。 在这种特殊情况下我可以编写一个简单的bash脚本将文件分成小块然后像往常一样读取它们。 但是我不希望这样因为二进制格式会使这种方法无效。 因此正确处理此问题的方法是使用内存映射文件逐步处理数据区域。 内存映射文件的优点在于它们不消耗虚拟内存或分页空间因为它们由磁盘上的文件数据支持。 Okey让我们看一下这些文件并提取一些数据。 似乎它们包含带有逗号分隔字段的ASCII文本行。 格式 [currency-pair],[timestamp],[bid-price],[ask-price] 例如 EUR/USD,20120102 00:01:30.420,1.29451,1.2949 公平地说我可以为该格式编写程序。 但是读取和解析文件是正交的概念。 因此让我们退后一步考虑一下可以在将来遇到类似问题时可以重用的通用设计。 问题归结为对一组以无限长字节数组编码的条目进行增量解码而不会耗尽内存。 示例格式以逗号/行分隔的文本编码的事实与一般解决方案无关因此很明显需要解码器接口才能处理不同的格式。 同样在处理完整个文件之前无法解析每个条目并将其保留在内存中因此我们需要一种方法来逐步移交可以在其他位置磁盘或网络写入的条目块然后再进行垃圾回收。 迭代器是处理此要求的很好的抽象方法因为它们的行为就像游标一样这正是重点。 每次迭代都会转发文件指针然后让我们对数据进行处理。 所以首先是Decoder接口。 这个想法是从MappedByteBuffer增量解码对象或者如果缓冲区中没有对象则返回null。 public interface DecoderT {public T decode(ByteBuffer buffer);
} 然后是实现Iterable的FileReader 。 每次迭代将处理下一个4096字节的数据并使用Decoder将其Decoder为对象列表。 请注意 FileReader接受文件列表这很不错因为它允许遍历数据而无需担心跨文件聚合。 顺便说一下对于较大的文件4096个字节的块可能会有点小。 public class FileReader implements IterableListT {private static final long CHUNK_SIZE 4096;private final DecoderT decoder;private IteratorFile files;private FileReader(DecoderT decoder, File... files) {this(decoder, Arrays.asList(files));}private FileReader(DecoderT decoder, ListFile files) {this.files files.iterator();this.decoder decoder;}public static T FileReaderT create(DecoderT decoder, ListFile files) {return new FileReaderT(decoder, files);}public static T FileReaderT create(DecoderT decoder, File... files) {return new FileReaderT(decoder, files);}Overridepublic IteratorListT iterator() {return new IteratorListT() {private ListT entries;private long chunkPos 0;private MappedByteBuffer buffer;private FileChannel channel;Overridepublic boolean hasNext() {if (buffer null || !buffer.hasRemaining()) {buffer nextBuffer(chunkPos);if (buffer null) {return false;}}T result null;while ((result decoder.decode(buffer)) ! null) {if (entries null) {entries new ArrayListT();}entries.add(result);}// set next MappedByteBuffer chunkchunkPos buffer.position();buffer null;if (entries ! null) {return true;} else {Closeables.closeQuietly(channel);return false;}}private MappedByteBuffer nextBuffer(long position) {try {if (channel null || channel.size() position) {if (channel ! null) {Closeables.closeQuietly(channel);channel null;}if (files.hasNext()) {File file files.next();channel new RandomAccessFile(file, r).getChannel();chunkPos 0;position 0;} else {return null;}}long chunkSize CHUNK_SIZE;if (channel.size() - position chunkSize) {chunkSize channel.size() - position;}return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);} catch (IOException e) {Closeables.closeQuietly(channel);throw new RuntimeException(e);}}Overridepublic ListT next() {ListT res entries;entries null;return res;}Overridepublic void remove() {throw new UnsupportedOperationException();}};}
} 下一个任务是编写一个Decoder 我决定为任何逗号分隔的文本文件格式实现一个通用的TextRowDecoder 接受每行的字段数和一个字段定界符并返回一个字节数组数组。 然后 TextRowDecoder可以由可能处理不同字符集的格式特定的解码器重用。 public class TextRowDecoder implements Decoderbyte[][] {private static final byte LF 10;private final int numFields;private final byte delimiter;public TextRowDecoder(int numFields, byte delimiter) {this.numFields numFields;this.delimiter delimiter;}Overridepublic byte[][] decode(ByteBuffer buffer) {int lineStartPos buffer.position();int limit buffer.limit();while (buffer.hasRemaining()) {byte b buffer.get();if (b LF) { // reached line feed so parse lineint lineEndPos buffer.position();// set positions for one row duplicationif (buffer.limit() lineEndPos 1) {buffer.position(lineStartPos).limit(lineEndPos);} else {buffer.position(lineStartPos).limit(lineEndPos 1);}byte[][] entry parseRow(buffer.duplicate());if (entry ! null) {// reset main bufferbuffer.position(lineEndPos);buffer.limit(limit);// set start after LFlineStartPos lineEndPos;}return entry;}}buffer.position(lineStartPos);return null;}public byte[][] parseRow(ByteBuffer buffer) {int fieldStartPos buffer.position();int fieldEndPos 0;int fieldNumber 0;byte[][] fields new byte[numFields][];while (buffer.hasRemaining()) {byte b buffer.get();if (b delimiter || b LF) {fieldEndPos buffer.position();// save limitint limit buffer.limit();// set positions for one row duplicationbuffer.position(fieldStartPos).limit(fieldEndPos);fields[fieldNumber] parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);fieldNumber;// reset main bufferbuffer.position(fieldEndPos);buffer.limit(limit);// set start after LFfieldStartPos fieldEndPos;}if (fieldNumber numFields) {return fields;}}return null;}private byte[] parseField(ByteBuffer buffer, int pos, int length) {byte[] field new byte[length];for (int i 0; i field.length; i) {field[i] buffer.get();}return field;}
} 这就是文件的处理方式。 每个列表包含从单个缓冲区解码的元素每个元素都是由TextRowDecoder指定的字节数组的数组。 TextRowDecoder decoder new TextRowDecoder(4, comma);
FileReaderbyte[][] reader FileReader.create(decoder, file.listFiles());
for (Listbyte[][] chunk : reader) {// do something with each chunk
} 我们可以在这里停下来但还有其他要求。 每行都包含一个时间戳记并且必须按时间段而不是按天或按小时对缓冲区进行分组。 我仍然想遍历每个批次因此立即的反应是为FileReader创建一个Iterable包装器以实现此行为。 另外一个细节是每个元素必须通过实现PeriodEntries Timestamped接口此处未显示为PeriodEntries提供其时间戳。 public class PeriodEntriesT extends Timestamped implements IterableListT {private final IteratorListT extends Timestamped entriesIt;private final long interval;private PeriodEntries(IterableListT entriesIt, long interval) {this.entriesIt entriesIt.iterator();this.interval interval;}public static T extends Timestamped PeriodEntriesT create(IterableListT entriesIt, long interval) {return new PeriodEntriesT(entriesIt, interval);}Overridepublic IteratorListT extends Timestamped iterator() {return new IteratorListT() {private QueueListT queue new LinkedListListT();private long previous;private IteratorT entryIt;Overridepublic boolean hasNext() {if (!advanceEntries()) {return false;}T entry entryIt.next();long time normalizeInterval(entry);if (previous 0) {previous time;}if (queue.peek() null) {ListT group new ArrayListT();queue.add(group);}while (previous time) {queue.peek().add(entry);if (!advanceEntries()) {break;}entry entryIt.next();time normalizeInterval(entry);}previous time;ListT result queue.peek();if (result null || result.isEmpty()) {return false;}return true;}private boolean advanceEntries() {// if there are no rows leftif (entryIt null || !entryIt.hasNext()) {// try get more rows if possibleif (entriesIt.hasNext()) {entryIt entriesIt.next().iterator();return true;} else {// no more rowsreturn false;}}return true;}private long normalizeInterval(Timestamped entry) {long time entry.getTime();int utcOffset TimeZone.getDefault().getOffset(time);long utcTime time utcOffset;long elapsed utcTime % interval;return time - elapsed;}Overridepublic ListT next() {return queue.poll();}Overridepublic void remove() {throw new UnsupportedOperationException();}};}
} 引入此功能后最终处理代码并没有太大变化只有一个干净紧凑的for循环不必关心跨文件缓冲区和句点对元素进行分组。 PeriodEntries也足够灵活可以管理间隔上的任何长度。 TrueFxDecoder decoder new TrueFxDecoder();
FileReaderTrueFxData reader FileReader.create(decoder, file.listFiles());
long periodLength TimeUnit.DAYS.toMillis(1);
PeriodEntriesTrueFxData periods PeriodEntries.create(reader, periodLength);for (ListTrueFxData entries : periods) {// data for each dayfor (TrueFxData entry : entries) {// process each entry}
} 正如您可能意识到的那样不可能用集合来解决这个问题。 选择迭代器是一项关键的设计决策它能够解析TB级的数据而不会占用太多的堆空间。 参考 Deephacks博客上的JCG合作伙伴 Kristoffer Sjogren 使用Java处理大型文件 。 翻译自: https://www.javacodegeeks.com/2013/01/processing-huge-files-with-java.html