public class BufferedInputStream extends FilterInputStream {

private static int defaultBufferSize = 8192;

/*内部的缓冲区*/

protected volatile byte buf[];

/**

* Atomic updater to provide compareAndSet for buf. This is necessary

* because closes can be asynchronous. We use nullness of buf[] as primary

* indicator that this stream is closed. (The "in" field is also nulled out

* on close.)

*/

private static final AtomicReferenceFieldUpdater bufUpdater = AtomicReferenceFieldUpdater

.newUpdater(BufferedInputStream.class, byte[].class, "buf");

/*缓冲区内部的有效字节数,在0到buf.length之间*/

protected int count;

/**

* The current position in the buffer. This is the index of the next

* character to be read from the buf array.

*

* This value is always in the range 0 through

* count. If it is less than count, then

* buf[pos] is the next byte to be supplied as input; if it is

* equal to count, then the next read or

* skip operation will require more bytes to be read from the

* contained input stream.

*

* @see java.io.BufferedInputStream#buf

*/

/*1. 缓冲区内部的当前位置,也就是下一次从缓冲区读取时的开始位置, 一般处于0到count之间

2. 如果pos大于count,那么下次读取将从潜在的input stream读取*/

protected int pos;

/*1. 当调用mark方法时,记录pos的值,处于-1到pos之间。

2. 如果没有marked,那么它的值为-1,

3. 如果做了marked,那么当调用reset之后,buf[markpos]成为第一个将被读取的字节

4. 如果marpos不是-1, 那么在buf[markpos]到buf[pos-1]之间的字节将被保留在缓冲区中,直到pos和markpos之间的差大于marklimit才会被丢弃

protected int markpos = -1;

/**

* The maximum read ahead allowed after a call to the mark

* method before subsequent calls to the reset method fail.

* Whenever the difference between pos and markpos

* exceeds marklimit, then the mark may be dropped by setting

* markpos to -1.

*

* @see java.io.BufferedInputStream#mark(int)

* @see java.io.BufferedInputStream#reset()

*/

/*这个参数一致不太懂,保留英文注释*/

protected int marklimit;

private InputStream getInIfOpen() throws IOException {

InputStream input = in;

if (input == null)

throw new IOException("Stream closed");

return input;

}

private byte[] getBufIfOpen() throws IOException {

byte[] buffer = buf;

if (buffer == null)

throw new IOException("Stream closed");

return buffer;

}

public BufferedInputStream(InputStream in) {

this(in, defaultBufferSize);

}

public BufferedInputStream(InputStream in, int size) {

super(in);

if (size <= 0) {

throw new IllegalArgumentException("Buffer size <= 0");

}

buf = new byte[size];

}

private void fill() throws IOException {

byte[] buffer = getBufIfOpen();

if (markpos < 0) /*没有做标记, 这个可以看做是第一次读取时的情况 记做case1*/

pos = 0;

else if (pos >= buffer.length) /*说明已经读完了buffer中的最后一个字节*/

if (markpos > 0) { /*说明做过标记,记做case2*/

int sz = pos - markpos; /*计算有多少字节需要保留)

System.arraycopy(buffer, markpos, buffer, 0, sz); /*把做了标记之后的字节拷贝到缓冲区的开头*/

pos = sz; /*把pos置为保留的那些字节之后的那个位置,也就是下一次要读取的位置*/

markpos = 0;

} else if (buffer.length >= marklimit) { /*我能想到进入到这个分支的情况是,在初始化之后,pos=0,这时调用mark(0),

/*然后一直读到buffer的最后一个字节,

/*然后再读的时候, 这时候pos>=buf.length, markpos=0, buf.length >= marklimit

markpos = -1;

pos = 0;

} else { /* grow buffer */ /*我能想到进入到这个分支的情况是,初始化之后,pos=0, 调用mark(>buf.length)

/*然后一直读到buffer的最后一个字节,

/*然后再读的时候, 这时候pos>=buf.length, markpos=0, buf.length < marklimit

/*这时需要扩展空间,new一个新buf,然后把旧buf里的字节拷贝过去,pos定位到

/*拷贝后的那个位置

int nsz = pos * 2;

if (nsz > marklimit)

nsz = marklimit;

byte nbuf[] = new byte[nsz];

System.arraycopy(buffer, 0, nbuf, 0, pos);

if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {

// Can't replace buf if there was an async close.

// Note: This would need to be changed if fill()

// is ever made accessible to multiple threads.

// But for now, the only way CAS can fail is via close.

// assert buf == null;

throw new IOException("Stream closed");

}

buffer = nbuf;

}

count = pos;

int n = getInIfOpen().read(buffer, pos, buffer.length - pos); /*从潜在的input stream中读取数据,读取的字节数是n*/

if (n > 0)

count = n + pos;

}

public synchronized int read() throws IOException {

if (pos >= count) { /*如果pos>=count,说明pos已经到了有效字节的末尾,需要重新从潜在的inputStream中读取新数据*/

fill(); /*所以要调用fill()*/

if (pos >= count)

return -1;

}

return getBufIfOpen()[pos++] & 0xff;

}

private int read1(byte[] b, int off, int len) throws IOException {

int avail = count - pos;

if (avail <= 0) {

/*

* If the requested length is at least as large as the buffer, and

* if there is no mark/reset activity, do not bother to copy the

* bytes into the local buffer. In this way buffered streams will

* cascade harmlessly.

*/

if (len >= getBufIfOpen().length && markpos < 0) {

return getInIfOpen().read(b, off, len);

}

fill();

avail = count - pos;

if (avail <= 0)

return -1;

}

int cnt = (avail < len) ? avail : len;

System.arraycopy(getBufIfOpen(), pos, b, off, cnt);

pos += cnt;

return cnt;

}

public synchronized int read(byte b[], int off, int len) throws IOException {

getBufIfOpen(); // Check for closed stream

if ((off | len | (off + len) | (b.length - (off + len))) < 0) {

throw new IndexOutOfBound***ception();

} else if (len == 0) {

return 0;

}

int n = 0;

for (;;) {

int nread = read1(b, off + n, len - n);

if (nread <= 0)

return (n == 0) ? nread : n;

n += nread;

if (n >= len)

return n;

// if not closed but no bytes available, return

InputStream input = in;

if (input != null && input.available() <= 0)

return n;

}

}

public synchronized long skip(long n) throws IOException {

getBufIfOpen(); // Check for closed stream

if (n <= 0) {

return 0;

}

long avail = count - pos; /*先计算剩余的有效字节数*/

if (avail <= 0) { /*如果没有有效字节了*/

// If no mark position set then don't keep in buffer

if (markpos < 0)

return getInIfOpen().skip(n); /*直接在潜在的inputStream中skip

// Fill in buffer to save bytes for reset

fill(); /*如果做了标记,那么就调用fill(),把标记之后的字节保存,再读取,再准备下面的skip, 记做case3

avail = count - pos; /*读取后,有效字节数和当前位置的差*/

if (avail <= 0)

return 0;

}

long skipped = (avail < n) ? avail : n; /*看看要skip的字节数和可以被skip的字节数哪个小,就用哪个*/

pos += skipped; /*定位pos到当前位置加上要skip的字节数*/

return skipped; /*返回实际skip的字节数*/

}

public synchronized int available() throws IOException {

return getInIfOpen().available() + (count - pos);

}

public synchronized void mark(int readlimit) {

marklimit = readlimit;

markpos = pos;

}

public synchronized void reset() throws IOException {

getBufIfOpen(); // Cause exception if closed

if (markpos < 0)

throw new IOException("Resetting to invalid mark");

pos = markpos;

}

public boolean markSupported() {

return true;

}

public void close() throws IOException {

byte[] buffer;

while ((buffer = buf) != null) {

if (bufUpdater.compareAndSet(this, buffer, null)) {

InputStream input = in;

in = null;

if (input != null)

input.close();

return;

}

// Else retry in case a new buf was CASed in fill()

}

}

}

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐