文件队列 QueueFile

  1. /**
  2.  * Copyright (C) 2010 Square, Inc.
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the “License”);
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  * http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an “AS IS” BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package com.squareup.util;
  17. import com.squareup.Square;
  18. import java.io.File;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.io.RandomAccessFile;
  23. import java.nio.channels.FileChannel;
  24. import java.util.NoSuchElementException;
  25. /**
  26.  * A reliable, efficient, file-based, FIFO queue. Additions and removals are
  27.  * O(1). All operations are atomic. Writes are synchronous; data will be
  28.  * written to disk before an operation returns. The underlying file is
  29.  * structured to survive process and even system crashes. If an I/O exception
  30.  * is thrown during a mutating change, the change is aborted. It is safe to
  31.  * continue to use a {@code QueueFile} instance after an exception.
  32.  *
  33.  * <p>All operations are synchronized. In a traditional queue, the remove
  34.  * operation returns an element. In this queue, {@link #peek} and {@link
  35.  * #remove} are used in conjunction. Use {@code peek} to retrieve the first
  36.  * element, and then {@code remove} to remove it after successful processing.
  37.  * If the system crashes after {@code peek} and during processing, the element
  38.  * will remain in the queue, to be processed when the system restarts.
  39.  *
  40.  * <p><b><font color=”red”>NOTE:</font></b> The current implementation is
  41.  * built for file systems that support atomic segment writes (like YAFFS).
  42.  * Most conventional file systems don’t support this; if the power goes out
  43.  * while writing a segment, the segment will contain garbage and the file will
  44.  * be corrupt. We’ll add journaling support so this class can be used with
  45.  * more file systems later.
  46.  *
  47.  * @author Bob Lee (bob@squareup.com)
  48.  */
  49. public class QueueFile {
  50. /** Initial file size in bytes. */
  51. private static final int INITIAL_LENGTH = 4096; // one file system block
  52. /** Length of header in bytes. */
  53. static final int HEADER_LENGTH = 16;
  54. /**
  55.   * The underlying file. Uses a ring buffer to store entries. Designed so
  56.   * that a modification isn’t committed or visible until we write the header.
  57.   * The header is much smaller than a segment. So long as the underlying file
  58.   * system supports atomic segment writes, changes to the queue are atomic.
  59.   * Storing the file length ensures we can recover from a failed expansion
  60.   * (i.e. if setting the file length succeeds but the process dies before the
  61.   * data can be copied).
  62.   *
  63.   * <pre>
  64.   * Format:
  65.   * Header (16 bytes)
  66.   * Element Ring Buffer (File Length – 16 bytes)
  67.   *
  68.   * Header:
  69.   * File Length (4 bytes)
  70.   * Element Count (4 bytes)
  71.   * First Element Position (4 bytes, =0 if null)
  72.   * Last Element Position (4 bytes, =0 if null)
  73.   *
  74.   * Element:
  75.   * Length (4 bytes)
  76.   * Data (Length bytes)
  77.   * </pre>
  78.   */
  79. private final RandomAccessFile raf;
  80. /** Cached file length. Always a power of 2. */
  81. int fileLength;
  82. /** Number of elements. */
  83. private int elementCount;
  84. /** Pointer to first (or eldest) element. */
  85. private Element first;
  86. /** Pointer to last (or newest) element. */
  87. private Element last;
  88. /** In-memory buffer. Big enough to hold the header. */
  89. private final byte[] buffer = new byte[16];
  90. /**
  91.   * Constructs a new queue backed by the given file. Only one {@code QueueFile}
  92.   * instance should access a given file at a time.
  93.   */
  94. public QueueFile(File file) throws IOException {
  95. if (!file.exists()) initialize(file);
  96. raf = open(file);
  97. readHeader();
  98. }
  99. /** For testing. */
  100. QueueFile(RandomAccessFile raf) throws IOException {
  101. this.raf = raf;
  102. readHeader();
  103. }
  104. /**
  105.   * Stores int in buffer. The behavior is equivalent to calling
  106.   * {@link RandomAccessFile#writeInt}.
  107.   */
  108. private static void writeInt(byte[] buffer, int offset, int value) {
  109. buffer[offset] = (byte) (value >> 24);
  110. buffer[offset + 1] = (byte) (value >> 16);
  111. buffer[offset + 2] = (byte) (value >> 8);
  112. buffer[offset + 3] = (byte) value;
  113. }
  114. /**
  115.   * Stores int values in buffer. The behavior is equivalent to calling
  116.   * {@link RandomAccessFile#writeInt} for each value.
  117.   */
  118. private static void writeInts(byte[] buffer, int… values) {
  119. int offset = 0;
  120. for (int value : values) {
  121. writeInt(buffer, offset, value);
  122. offset += 4;
  123. }
  124. }
  125. /**
  126.   * Reads an int from a byte[].
  127.   */
  128. private static int readInt(byte[] buffer, int offset) {
  129. return ((buffer[offset] & 0xff) << 24)
  130. + ((buffer[offset + 1] & 0xff) << 16)
  131. + ((buffer[offset + 2] & 0xff) << 8)
  132. + (buffer[offset + 3] & 0xff);
  133. }
  134. /**
  135.   * Reads the header.
  136.   */
  137. private void readHeader() throws IOException {
  138. raf.seek(0);
  139. raf.readFully(buffer);
  140. fileLength = readInt(buffer, 0);
  141. elementCount = readInt(buffer, 4);
  142. int firstOffset = readInt(buffer, 8);
  143. int lastOffset = readInt(buffer, 12);
  144. first = readElement(firstOffset);
  145. last = readElement(lastOffset);
  146. }
  147. /**
  148.   * Writes header atomically. The arguments contain the updated values. The
  149.   * class member fields should not have changed yet. This only updates the
  150.   * state in the file. It’s up to the caller to update the class member
  151.   * variables *after* this call succeeds. Assumes segment writes are atomic
  152.   * in the underlying file system.
  153.   */
  154. private void writeHeader(int fileLength, int elementCount, int firstPosition,
  155. int lastPosition) throws IOException {
  156. writeInts(buffer, fileLength, elementCount, firstPosition, lastPosition);
  157. raf.seek(0);
  158. raf.write(buffer);
  159. }
  160. /**
  161.   * Returns the Element for the given offset.
  162.   */
  163. private Element readElement(int position) throws IOException {
  164. if (position == 0) return Element.NULL;
  165. raf.seek(position);
  166. return new Element(position, raf.readInt());
  167. }
  168. /** Atomically initializes a new file. */
  169. private static void initialize(File file) throws IOException {
  170. // Use a temp file so we don’t leave a partially-initialized file.
  171. File tempFile = new File(file.getPath() + “.tmp”);
  172. RandomAccessFile raf = open(tempFile);
  173. try {
  174. raf.setLength(INITIAL_LENGTH);
  175. raf.seek(0);
  176. byte[] headerBuffer = new byte[16];
  177. writeInts(headerBuffer, INITIAL_LENGTH, 0, 0, 0);
  178. raf.write(headerBuffer);
  179. } finally {
  180. raf.close();
  181. }
  182. // A rename is atomic.
  183. if (!tempFile.renameTo(file)) throw new IOException(“Rename failed!”);
  184. }
  185. /**
  186.   * Opens a random access file that writes synchronously.
  187.   */
  188. private static RandomAccessFile open(File file) throws FileNotFoundException {
  189. return new RandomAccessFile(file, “rwd”);
  190. }
  191. /**
  192.   * Wraps the position if it exceeds the end of the file.
  193.   */
  194. private int wrapPosition(int position) {
  195. return position < fileLength ? position
  196. : HEADER_LENGTH + position – fileLength;
  197. }
  198. /**
  199.   * Writes count bytes from buffer to position in file. Automatically wraps
  200.   * write if position is past the end of the file or if buffer overlaps it.
  201.   *
  202.   * @param position in file to write to
  203.   * @param buffer to write from
  204.   * @param count # of bytes to write
  205.   */
  206. private void ringWrite(int position, byte[] buffer, int offset, int count)
  207. throws IOException {
  208. position = wrapPosition(position);
  209. if (position + count <= fileLength) {
  210. raf.seek(position);
  211. raf.write(buffer, offset, count);
  212. } else {
  213. // The write overlaps the EOF.
  214. // # of bytes to write before the EOF.
  215. int beforeEof = fileLength – position;
  216. raf.seek(position);
  217. raf.write(buffer, offset, beforeEof);
  218. raf.seek(HEADER_LENGTH);
  219. raf.write(buffer, offset + beforeEof, count – beforeEof);
  220. }
  221. }
  222. /**
  223.   * Reads count bytes into buffer from file. Wraps if necessary.
  224.   *
  225.   * @param position in file to read from
  226.   * @param buffer to read into
  227.   * @param count # of bytes to read
  228.   */
  229. private void ringRead(int position, byte[] buffer, int offset, int count)
  230. throws IOException {
  231. position = wrapPosition(position);
  232. if (position + count <= fileLength) {
  233. raf.seek(position);
  234. raf.readFully(buffer, 0, count);
  235. } else {
  236. // The read overlaps the EOF.
  237. // # of bytes to read before the EOF.
  238. int beforeEof = fileLength – position;
  239. raf.seek(position);
  240. raf.readFully(buffer, offset, beforeEof);
  241. raf.seek(HEADER_LENGTH);
  242. raf.readFully(buffer, offset + beforeEof, count – beforeEof);
  243. }
  244. }
  245. /**
  246.   * Adds an element to the end of the queue.
  247.   *
  248.   * @param data to copy bytes from
  249.   */
  250. public void add(byte[] data) throws IOException {
  251. add(data, 0, data.length);
  252. }
  253. /**
  254.   * Adds an element to the end of the queue.
  255.   *
  256.   * @param data to copy bytes from
  257.   * @param offset to start from in buffer
  258.   * @param count number of bytes to copy
  259.   *
  260.   * @throws IndexOutOfBoundsException if {@code offset < 0} or
  261.   * {@code count < 0}, or if {@code offset + count} is bigger than the length
  262.   * of {@code buffer}.
  263.   */
  264. public synchronized void add(byte[] data, int offset, int count)
  265. throws IOException {
  266. Objects.nonNull(data, “buffer”);
  267. if ((offset | count) < 0 || count > data.length – offset) {
  268. throw new IndexOutOfBoundsException();
  269. }
  270. expandIfNecessary(count);
  271. // Insert a new element after the current last element.
  272. boolean wasEmpty = isEmpty();
  273. int position = wasEmpty ? HEADER_LENGTH : wrapPosition(
  274. last.position + Element.HEADER_LENGTH + last.length);
  275. Element newLast = new Element(position, count);
  276. // Write length.
  277. writeInt(buffer, 0, count);
  278. ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);
  279. // Write data.
  280. ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);
  281. // Commit the addition. If wasEmpty, first == last.
  282. int firstPosition = wasEmpty ? newLast.position : first.position;
  283. writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);
  284. last = newLast;
  285. elementCount++;
  286. if (wasEmpty) first = last; // first element
  287. }
  288. /**
  289.   * Returns the number of used bytes.
  290.   */
  291. private int usedBytes() {
  292. if (elementCount == 0) return HEADER_LENGTH;
  293. if (last.position >= first.position) {
  294. // Contiguous queue.
  295. return (last.position – first.position) // all but last entry
  296. + Element.HEADER_LENGTH + last.length // last entry
  297. + HEADER_LENGTH;
  298. } else {
  299. // tail < head. The queue wraps.
  300. return last.position // buffer front + header
  301. + Element.HEADER_LENGTH + last.length // last entry
  302. + fileLength – first.position; // buffer end
  303. }
  304. }
  305. /**
  306.   * Returns number of unused bytes.
  307.   */
  308. private int remainingBytes() {
  309. return fileLength – usedBytes();
  310. }
  311. /**
  312.   * Returns true if this queue contains no entries.
  313.   */
  314. public synchronized boolean isEmpty() {
  315. return elementCount == 0;
  316. }
  317. /**
  318.   * If necessary, expands the file to accommodate an additional element of the
  319.   * given length.
  320.   *
  321.   * @param dataLength length of data being added
  322.   */
  323. private void expandIfNecessary(int dataLength) throws IOException {
  324. int elementLength = Element.HEADER_LENGTH + dataLength;
  325. int remainingBytes = remainingBytes();
  326. if (remainingBytes >= elementLength) return;
  327. // Expand.
  328. int previousLength = fileLength;
  329. int newLength;
  330. // Double the length until we can fit the new data.
  331. do {
  332. remainingBytes += previousLength;
  333. newLength = previousLength << 1;
  334. previousLength = newLength;
  335. } while (remainingBytes < elementLength);
  336. raf.setLength(newLength);
  337. // If the buffer is split, we need to make it contiguous.
  338. if (last.position < first.position) {
  339. FileChannel channel = raf.getChannel();
  340. channel.position(fileLength); // destination position
  341. int count = last.position + Element.HEADER_LENGTH + last.length
  342. – HEADER_LENGTH;
  343. if (channel.transferTo(HEADER_LENGTH, count, channel) != count) {
  344. throw new AssertionError(“Copied insufficient number of bytes!”);
  345. }
  346. // Commit the expansion.
  347. int newLastPosition = fileLength + last.position – HEADER_LENGTH;
  348. writeHeader(newLength, elementCount, first.position, newLastPosition);
  349. last = new Element(newLastPosition, last.length);
  350. } else {
  351. writeHeader(newLength, elementCount, first.position, last.position);
  352. }
  353. fileLength = newLength;
  354. }
  355. /**
  356.   * Reads the eldest element. Returns null if the queue is empty.
  357.   */
  358. public synchronized byte[] peek() throws IOException {
  359. if (isEmpty()) return null;
  360. int length = first.length;
  361. byte[] data = new byte[length];
  362. ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
  363. return data;
  364. }
  365. /**
  366.   * Invokes reader with the eldest element, if an element is available.
  367.   */
  368. public synchronized void peek(ElementReader reader) throws IOException {
  369. if (elementCount > 0) {
  370. reader.read(new ElementInputStream(first), first.length);
  371. }
  372. }
  373. /**
  374.   * Invokes the given reader once for each element in the queue, from
  375.   * eldest to most recently added.
  376.   */
  377. public synchronized void forEach(ElementReader reader) throws IOException {
  378. int position = first.position;
  379. for (int i = 0; i < elementCount; i++) {
  380. Element current = readElement(position);
  381. reader.read(new ElementInputStream(current), current.length);
  382. position = wrapPosition(current.position + Element.HEADER_LENGTH
  383. + current.length);
  384. }
  385. }
  386. /**
  387.   * Reads a single element.
  388.   */
  389. private class ElementInputStream extends InputStream {
  390. private int position;
  391. private int remaining;
  392. private ElementInputStream(Element element) {
  393. position = wrapPosition(element.position + Element.HEADER_LENGTH);
  394. remaining = element.length;
  395. }
  396. @Override public int read(byte[] buffer, int offset, int length)
  397. throws IOException {
  398. Objects.nonNull(buffer, “buffer”);
  399. if ((offset | length) < 0 || length > buffer.length – offset) {
  400. throw new ArrayIndexOutOfBoundsException();
  401. }
  402. if (length > remaining) length = remaining;
  403. ringRead(position, buffer, offset, length);
  404. position = wrapPosition(position + length);
  405. remaining -= length;
  406. return length;
  407. }
  408. @Override public int read() throws IOException {
  409. if (remaining == 0) return -1;
  410. raf.seek(position);
  411. int b = raf.read();
  412. position = wrapPosition(position + 1);
  413. remaining–;
  414. return b;
  415. }
  416. }
  417. /**
  418.   * Returns the number of elements in this queue.
  419.   */
  420. public synchronized int size() {
  421. return elementCount;
  422. }
  423. /**
  424.   * Removes the eldest element.
  425.   *
  426.   * @throw NoSuchElementException if the queue is empty
  427.   */
  428. public synchronized void remove() throws IOException {
  429. if (isEmpty()) throw new NoSuchElementException();
  430. if (elementCount == 1) {
  431. clear();
  432. } else {
  433. // assert elementCount > 1
  434. int newFirstPosition = wrapPosition(first.position
  435. + Element.HEADER_LENGTH + first.length);
  436. ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
  437. int length = readInt(buffer, 0);
  438. writeHeader(fileLength, elementCount – 1, newFirstPosition, last.position);
  439. elementCount–;
  440. first = new Element(newFirstPosition, length);
  441. }
  442. }
  443. /**
  444.   * Clears this queue. Truncates the file to the initial size.
  445.   */
  446. public synchronized void clear() throws IOException {
  447. if (fileLength > INITIAL_LENGTH) raf.setLength(INITIAL_LENGTH);
  448. writeHeader(INITIAL_LENGTH, 0, 0, 0);
  449. elementCount = 0;
  450. first = last = Element.NULL;
  451. fileLength = INITIAL_LENGTH;
  452. }
  453. /**
  454.   * Closes the underlying file.
  455.   */
  456. public synchronized void close() throws IOException {
  457. raf.close();
  458. }
  459. @Override public String toString() {
  460. final StringBuilder builder = new StringBuilder();
  461. builder.append(getClass().getSimpleName()).append(‘[‘);
  462. builder.append(“fileLength=”).append(fileLength);
  463. builder.append(“, size=”).append(elementCount);
  464. builder.append(“, first=”).append(first);
  465. builder.append(“, last=”).append(last);
  466. builder.append(“, element lengths=[“);
  467. try {
  468. forEach(new ElementReader() {
  469. boolean first = true;
  470. public void read(InputStream in, int length) throws IOException {
  471. if (first) {
  472. first = false;
  473. } else {
  474. builder.append(“, “);
  475. }
  476. builder.append(length);
  477. }
  478. });
  479. } catch (IOException e) {
  480. Square.warning(e);
  481. }
  482. builder.append(“]]”);
  483. return builder.toString();
  484. }
  485. /** A pointer to an element. */
  486. static class Element {
  487. /** Length of element header in bytes. */
  488. static final int HEADER_LENGTH = 4;
  489. /** Null element. */
  490. static final Element NULL = new Element(0, 0);
  491. /** Position in file. */
  492. final int position;
  493. /** The length of the data. */
  494. final int length;
  495. /**
  496.   * Constructs a new element.
  497.   *
  498.   * @param position within file
  499.   * @param length of data
  500.   */
  501. Element(int position, int length) {
  502. this.position = position;
  503. this.length = length;
  504. }
  505. @Override public String toString() {
  506. return getClass().getSimpleName() + “[“
  507. + “position = ” + position
  508. + “, length = ” + length + “]”;
  509. }
  510. }
  511. /**
  512.   * Reads queue elements. Enables partial reads as opposed to reading all
  513.   * of the bytes into a byte[].
  514.   */
  515. public interface ElementReader {
  516. /*
  517.   * TODO: Support remove() call from read().
  518.   */
  519. /**
  520.   * Called once per element.
  521.   *
  522.   * @param in stream of element data. Reads as many bytes as requested,
  523.   * unless fewer than the request number of bytes remains, in which case it
  524.   * reads all the remaining bytes.
  525.   * @param length of element data in bytes
  526.   */
  527. public void read(InputStream in, int length) throws IOException;
  528. }
  529. }
  530. QueueFileTest.java:
  531. package com.squareup.util;
  532. import android.test.AndroidTestCase;
  533. import com.squareup.Square;
  534. import java.io.File;
  535. import java.io.FileNotFoundException;
  536. import java.io.IOException;
  537. import java.io.InputStream;
  538. import java.io.RandomAccessFile;
  539. import java.util.Arrays;
  540. import java.util.LinkedList;
  541. import java.util.Queue;
  542. import junit.framework.ComparisonFailure;
  543. /**
  544.  * Tests for QueueFile.
  545.  *
  546.  * @author Bob Lee (bob@squareup.com)
  547.  */
  548. public class QueueFileTest extends AndroidTestCase {
  549. /**
  550.   * Takes up 33401 bytes in the queue (N*(N+1)/2+4*N). Picked 254 instead of
  551.   * 255 so that the number of bytes isn’t a multiple of 4.
  552.   */
  553. private static int N = 254; //
  554. private static byte[][] values = new byte[N][];
  555. static {
  556. for (int i = 0; i < N; i++) {
  557. byte[] value = new byte[i];
  558. // Example: values[3] = { 3, 2, 1 }
  559. for (int ii = 0; ii < i; ii++) value[ii] = (byte) (i – ii);
  560. values[i] = value;
  561. }
  562. }
  563. private File file;
  564. @Override protected void setUp() throws Exception {
  565. file = getContext().getFileStreamPath(“test.queue”);
  566. file.delete();
  567. }
  568. @Override protected void tearDown() throws Exception {
  569. file.delete();
  570. }
  571. public void testAddOneElement() throws IOException {
  572. // This test ensures that we update ‘first’ correctly.
  573. QueueFile queue = new QueueFile(file);
  574. byte[] expected = values[253];
  575. queue.add(expected);
  576. assertEquals(expected, queue.peek());
  577. queue.close();
  578. queue = new QueueFile(file);
  579. assertEquals(expected, queue.peek());
  580. }
  581. public void testAddAndRemoveElements() throws IOException {
  582. long start = System.nanoTime();
  583. Queue<byte[]> expected = new LinkedList<byte[]>();
  584. for (int round = 0; round < 5; round++) {
  585. QueueFile queue = new QueueFile(file);
  586. for (int i = 0; i < N; i++) {
  587. queue.add(values[i]);
  588. expected.add(values[i]);
  589. }
  590. // Leave N elements in round N, 15 total for 5 rounds. Removing all the
  591. // elements would be like starting with an empty queue.
  592. for (int i = 0; i < N – round – 1; i++) {
  593. assertEquals(expected.remove(), queue.peek());
  594. queue.remove();
  595. }
  596. queue.close();
  597. }
  598. // Remove and validate remaining 15 elements.
  599. QueueFile queue = new QueueFile(file);
  600. assertEquals(15, queue.size());
  601. assertEquals(expected.size(), queue.size());
  602. while (!expected.isEmpty()) {
  603. assertEquals(expected.remove(), queue.peek());
  604. queue.remove();
  605. }
  606. queue.close();
  607. // length() returns 0, but I checked the size w/ ‘ls’, and it is correct.
  608. // assertEquals(65536, file.length());
  609. Square.debug(“Ran in ” + ((System.nanoTime() – start) / 1000000) + “ms.”);
  610. }
  611. /**
  612.   * Tests queue expansion when the data crosses EOF.
  613.   */
  614. public void testSplitExpansion() throws IOException {
  615. // This should result in 3560 bytes.
  616. int max = 80;
  617. Queue<byte[]> expected = new LinkedList<byte[]>();
  618. QueueFile queue = new QueueFile(file);
  619. for (int i = 0; i < max; i++) {
  620. expected.add(values[i]);
  621. queue.add(values[i]);
  622. }
  623. // Remove all but 1.
  624. for (int i = 1; i < max; i++) {
  625. assertEquals(expected.remove(), queue.peek());
  626. queue.remove();
  627. }
  628. // This should wrap around before expanding.
  629. for (int i = 0; i < N; i++) {
  630. expected.add(values[i]);
  631. queue.add(values[i]);
  632. }
  633. while (!expected.isEmpty()) {
  634. assertEquals(expected.remove(), queue.peek());
  635. queue.remove();
  636. }
  637. queue.close();
  638. }
  639. public void testFailedAdd() throws IOException {
  640. QueueFile queueFile = new QueueFile(file);
  641. queueFile.add(values[253]);
  642. queueFile.close();
  643. final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, “rwd”);
  644. queueFile = new QueueFile(braf);
  645. try {
  646. queueFile.add(values[252]);
  647. fail();
  648. } catch (IOException e) { /* expected */ }
  649. braf.rejectCommit = false;
  650. // Allow a subsequent add to succeed.
  651. queueFile.add(values[251]);
  652. queueFile.close();
  653. queueFile = new QueueFile(file);
  654. assertEquals(2, queueFile.size());
  655. assertEquals(values[253], queueFile.peek());
  656. queueFile.remove();
  657. assertEquals(values[251], queueFile.peek());
  658. }
  659. public void testFailedRemoval() throws IOException {
  660. QueueFile queueFile = new QueueFile(file);
  661. queueFile.add(values[253]);
  662. queueFile.close();
  663. final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, “rwd”);
  664. queueFile = new QueueFile(braf);
  665. try {
  666. queueFile.remove();
  667. fail();
  668. } catch (IOException e) { /* expected */ }
  669. queueFile.close();
  670. queueFile = new QueueFile(file);
  671. assertEquals(1, queueFile.size());
  672. assertEquals(values[253], queueFile.peek());
  673. queueFile.add(values[99]);
  674. queueFile.remove();
  675. assertEquals(values[99], queueFile.peek());
  676. }
  677. public void testFailedExpansion() throws IOException {
  678. QueueFile queueFile = new QueueFile(file);
  679. queueFile.add(values[253]);
  680. queueFile.close();
  681. final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, “rwd”);
  682. queueFile = new QueueFile(braf);
  683. try {
  684. // This should trigger an expansion which should fail.
  685. queueFile.add(new byte[8000]);
  686. fail();
  687. } catch (IOException e) { /* expected */ }
  688. queueFile.close();
  689. queueFile = new QueueFile(file);
  690. assertEquals(1, queueFile.size());
  691. assertEquals(values[253], queueFile.peek());
  692. assertEquals(4096, queueFile.fileLength);
  693. queueFile.add(values[99]);
  694. queueFile.remove();
  695. assertEquals(values[99], queueFile.peek());
  696. }
  697. public void testPeakWithElementReader() throws IOException {
  698. QueueFile queueFile = new QueueFile(file);
  699. final byte[] a = { 1, 2 };
  700. queueFile.add(a);
  701. final byte[] b = { 3, 4, 5 };
  702. queueFile.add(b);
  703. queueFile.peek(new QueueFile.ElementReader() {
  704. public void read(InputStream in, int length) throws IOException {
  705. assertEquals(length, 2);
  706. byte[] actual = new byte[length];
  707. in.read(actual);
  708. assertEquals(a, actual);
  709. }
  710. });
  711. queueFile.peek(new QueueFile.ElementReader() {
  712. public void read(InputStream in, int length) throws IOException {
  713. assertEquals(length, 2);
  714. assertEquals(1, in.read());
  715. assertEquals(2, in.read());
  716. assertEquals(-1, in.read());
  717. }
  718. });
  719. queueFile.remove();
  720. queueFile.peek(new QueueFile.ElementReader() {
  721. public void read(InputStream in, int length) throws IOException {
  722. assertEquals(length, 3);
  723. byte[] actual = new byte[length];
  724. in.read(actual);
  725. assertEquals(b, actual);
  726. }
  727. });
  728. assertEquals(b, queueFile.peek());
  729. assertEquals(1, queueFile.size());
  730. }
  731. public void testForEach() throws IOException {
  732. QueueFile queueFile = new QueueFile(file);
  733. final byte[] a = { 1, 2 };
  734. queueFile.add(a);
  735. final byte[] b = { 3, 4, 5 };
  736. queueFile.add(b);
  737. final int[] iteration = new int[] { 0 };
  738. QueueFile.ElementReader elementReader = new QueueFile.ElementReader() {
  739. public void read(InputStream in, int length) throws IOException {
  740. if (iteration[0] == 0) {
  741. assertEquals(length, 2);
  742. byte[] actual = new byte[length];
  743. in.read(actual);
  744. assertEquals(a, actual);
  745. } else if (iteration[0] == 1) {
  746. assertEquals(length, 3);
  747. byte[] actual = new byte[length];
  748. in.read(actual);
  749. assertEquals(b, actual);
  750. } else {
  751. fail();
  752. }
  753. iteration[0]++;
  754. }
  755. };
  756. queueFile.forEach(elementReader);
  757. assertEquals(a, queueFile.peek());
  758. assertEquals(2, iteration[0]);
  759. }
  760. /**
  761.   * Compares two byte[]s for equality.
  762.   */
  763. private static void assertEquals(byte[] expected, byte[] actual) {
  764. if (!Arrays.equals(expected, actual)) {
  765. throw new ComparisonFailure(null, Arrays.toString(expected),
  766. Arrays.toString(actual));
  767. }
  768. }
  769. /**
  770.   * A RandomAccessFile that can break when you go to write the COMMITTED
  771.   * status.
  772.   */
  773. static class BrokenRandomAccessFile extends RandomAccessFile {
  774. boolean rejectCommit = true;
  775. BrokenRandomAccessFile(File file, String mode)
  776. throws FileNotFoundException {
  777. super(file, mode);
  778. }
  779. @Override public void write(byte[] buffer) throws IOException {
  780. if (rejectCommit && getFilePointer() == 0) {
  781. throw new IOException(“No commit for you!”);
  782. }
  783. super.write(buffer);
  784. }
  785. }
  786. }

标签