IoTDBWAL解析之InputStream.available()在SingleFileLogReader的应用

database

先来看为什么我要单独查看这个方法,当我阅读IoTDB 的wal 读取方法的时候,发现读取数据的时候根据 available() 方法获取当前可读取的数据量,但是在网络编程中,应用这个方法会有个问题

查看 InputStream 的方法注释,available 是个非阻塞的操作,在网络拥堵情况下不会等待数据流全部返回以后才执行,在业务数量大的时候,服务方数据同步返回时间比较长,还未等到数据流返回,程序已经开始执行 is.available(),从而导致服务方有返回数据,而is.available()=0 

InputStream.available() 的默认实现 永远返回 0 ,注意到注释This method should be overridden by subclasses. 告诉我们子类需要复写该方法,代码如下

IoTDB 的 SingleFileLogReader 判断是否还有需要读取的执行计划的判断依据之一是根据返回的可读取数量和 LEAST_LOG_SIZE = 12  进行比较。

if (logStream.available() < LEAST_LOG_SIZE) {

return false;

}

现在我们来看看IoTDB 中使用这个方法是否会有问题?

logStream 的实例化方式见 open 方法

public void open(File logFile) throws FileNotFoundException {

close();

logStream = new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)));

logger.info("open WAL file: {} size is {}", logFile.getName(), logFile.length());

this.filepath = logFile.getPath();

idx = 0;

}

可以看到 inputStream 是通过 FileInputStream 进行初始化的,FileInputStream 又是调用了 available0 的native 方法

public int available() throws IOException {

return available0();

}

private native int available0() throws IOException;

我以 https://github.com/openjdk/jdk  在 window 的实现为例,找到  src/java.base/share/native/libjava/FileInputStream.c 文件 ,代码如下

JNIEXPORT jint JNICALL

Java_java_io_FileInputStream_available0(JNIEnv *env, jobject this) {

jlong ret;

FD fd = getFD(env, this, fis_fd);

if (fd == -1) {

JNU_ThrowIOException (env, "Stream Closed");

return 0;

}

if (IO_Available(fd, &ret)) {

if (ret > INT_MAX) {

ret = (jlong) INT_MAX;

} else if (ret < 0) {

ret = 0;

}

return jlong_to_jint(ret);

}

JNU_ThrowIOExceptionWithLastError(env, NULL);

return 0;

}

调用了 IO_Available 方法返回了文件的大小,这个方法 在 src/java.base/unix/native/libjava/io_util_md.h 头文件中做了定义

#define IO_Available handleAvailable

也就是调用了 src/java.base/unix/native/libjava/io_util_md.c 文件中的 handleAvailable 方法 代码如下

int

handleAvailable(FD fd, jlong *pbytes) {

HANDLE h = (HANDLE)fd;

DWORD type = 0;

type = GetFileType(h);

/* Handle is for keyboard or pipe */

if (type == FILE_TYPE_CHAR || type == FILE_TYPE_PIPE) {

int ret;

long lpbytes;

HANDLE stdInHandle = GetStdHandle(STD_INPUT_HANDLE);

if (stdInHandle == h) {

ret = handleStdinAvailable(fd, &lpbytes); /* keyboard */

} else {

ret = handleNonSeekAvailable(fd, &lpbytes); /* pipe */

}

(*pbytes) = (jlong)(lpbytes);

return ret;

}

/* Handle is for regular file */

if (type == FILE_TYPE_DISK) {

jlong current, end;

LARGE_INTEGER filesize;

current = handleLseek(fd, 0, SEEK_CUR);

if (current < 0) {

return FALSE;

}

if (GetFileSizeEx(h, &filesize) == 0) {

return FALSE;

}

end = long_to_jlong(filesize.QuadPart);

*pbytes = end - current;

return TRUE;

}

return FALSE;

}

查看GetFileTypeGetFileSizeEx的API文档

// FILE_TYPE_CHAR 字符文件,典型的如:打印设备或控制台

// FILE_TYPE_DISK 磁盘文件

// FILE_TYPE_PIPE 管道文件,如Socket,命名管道,匿名管道

// FILE_TYPE_REMOTE 未使用

// FILE_TYPE_UNKNOWN 未知设备,或者函数调用出错

DWORD GetFileType(

// 文件句柄

_In_ HANDLE hFile

);

BOOL GetFileSizeEx(

// 文件句柄

_In_ HANDLE hFile,

// 接收文件大小的长整型指针

_Out_ PLARGE_INTEGER lpFileSize

);

结论就是 FileInputStream 复写了 InputStream 的  available() 方法, 使用 C 通过传入的File 转换为 文件句柄 HANDLE  调用 windows  API  来获取文件大小,所以不存在网络拥堵带来的问题。

 

参考

https://stackoverflow.com/questions/5826198/inputstream-available-is-0-always

https://blog.csdn.net/qq_36918149/article/details/103022221

 

 

 

 

以上是 IoTDBWAL解析之InputStream.available()在SingleFileLogReader的应用 的全部内容, 来源链接: utcz.com/z/535456.html

回到顶部