前面我们详细讲了Java-NIO分析-8-Selector详解和Java-NIO分析-9-从BSD-socket到SocketChannel, 分别是NIO的事件分发器和非阻塞处理器.
为了支持Channel
的双向读写和Scatter/Gather
操作,我们还需要Buffer
,将I/O数据存储备用。普通的Buffer都是JVM堆内的Buffer, 比较好理解.
接下来我们聊聊JVM使用堆外内存的沧桑历史以及为什么要设计出DirectBuffer
。
首先我们要从没有NIO的前JDK1.4时代开始说起,那会儿大家使用的是SocketInputStream
和SocketOutputStream
.
1. Java传统的Socket是如何收发数据的?
以SocketOutputStream
为例,继承类图如下:
其实大家应该比较熟悉,java.io
包里最根本的抽象就是InputStream
和OutputStream
,其余的类抽象和实现都是一堆装饰器。
所以我们要跟踪的就是write方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// java.io.SocketOutputStream
public void write(byte b[], int off, int len) throws IOException {
// 委托给本类的socketWrite方法
socketWrite(b, off, len);
}
/**
* Writes to the socket with appropriate locking of the
* FileDescriptor.
* @param b the data to be written
* @param off the start offset in the data
* @param len the number of bytes that are written
* @exception IOException If an I/O error has occurred.
*/
private void socketWrite(byte b[], int off, int len) throws IOException {
...省略非关键代码
FileDescriptor fd = impl.acquireFD();
try {
// 上面就是做了一些参数判断,然后委托给socketWrite0
socketWrite0(fd, b, off, len);
} catch (SocketException se) {
...
} finally {
impl.releaseFD();
}
}
/**
* Writes to the socket.
* @param fd the FileDescriptor
* @param b the data to be written
* @param off the start offset in the data
* @param len the number of bytes that are written
* @exception IOException If an I/O error has occurred.
*/
private native void socketWrite0(FileDescriptor fd, byte[] b, int off,
int len) throws IOException;
|
最后调用的是一个native方法socketWrite0
,打开jdk/src/solaris/native/java/net/SocketOutputStream.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
/*
* Class: java_net_SocketOutputStream
* Method: socketWrite0
* Signature: (Ljava/io/FileDescriptor;[BII)V
*/
JNIEXPORT void JNICALL
Java_java_net_SocketOutputStream_socketWrite0(JNIEnv *env, jobject this,
jobject fdObj,
jbyteArray data,
jint off, jint len) {
char *bufP;
char BUF[MAX_BUFFER_LEN];
int buflen;
int fd;
if (IS_NULL(fdObj)) {
JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
return;
} else {
fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
/* Bug 4086704 - If the Socket associated with this file descriptor
* was closed (sysCloseFD), the the file descriptor is set to -1.
*/
if (fd == -1) {
JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
return;
}
}
if (len <= MAX_BUFFER_LEN) {
bufP = BUF;
buflen = MAX_BUFFER_LEN;
} else {
buflen = min(MAX_HEAP_BUFFER_LEN, len);
// 初始化一块直接内存
bufP = (char *)malloc((size_t)buflen);
/* if heap exhausted resort to stack buffer */
if (bufP == NULL) {
bufP = BUF;
buflen = MAX_BUFFER_LEN;
}
}
while(len > 0) {
int loff = 0;
int chunkLen = min(buflen, len);
int llen = chunkLen;
// 将堆内的data复制到刚才初始化的内存bufP里
(*env)->GetByteArrayRegion(env, data, off, chunkLen, (jbyte *)bufP);
while(llen > 0) {
// 发送数据
int n = NET_Send(fd, bufP + loff, llen, 0);
if (n > 0) {
llen -= n;
loff += n;
continue;
}
if (n == JVM_IO_INTR) {
JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);
} else {
if (errno == ECONNRESET) {
JNU_ThrowByName(env, "sun/net/ConnectionResetException",
"Connection reset");
} else {
NET_ThrowByNameWithLastError(env, "java/net/SocketException",
"Write failed");
}
}
if (bufP != BUF) {
free(bufP);
}
return;
}
len -= chunkLen;
off += chunkLen;
}
if (bufP != BUF) {
free(bufP);
}
}
|
这段比较短,就没有省略代码了。
大致分3步:
- 申请一块直接内存bufP, 长度为len
- 将堆内的data复制到刚才初始化的内存bufP里
- 调用Net_send函数发送数据
Net_send
是一个宏,定义在net_util_md.h
里
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// net_util_md.h
#define NET_Send JVM_Send
// jvm.cpp
JVM_LEAF(jint, JVM_Send(jint fd, char *buf, jint nBytes, jint flags))
JVMWrapper2("JVM_Send (0x%x)", fd);
//%note jvm_r6
return os::send(fd, buf, (size_t)nBytes, (uint)flags);
JVM_END
// os_linux.inline.hpp
inline int os::send(int fd, char* buf, size_t nBytes, uint flags) {
RESTARTABLE_RETURN_INT(::send(fd, buf, nBytes, flags));
}
|
这个宏替换的是JVM_Send
,是一个cpp封装方法,最后会调用os::send
,这个在os_linux.inline.hpp
里,可以看到,最后调用的是一个全局函数send
。
send
是我们的老朋友了,属于POSIX的标准Socket API
, 如果你在类Unix系统上,都可以通过man send
来翻看它的文档。它的函数签名如下:
1
2
3
4
|
#include <sys/socket.h>
ssize_t
send(int socket, const void *buffer, size_t length, int flags);
|
这个函数用来将首地址为buffer, 长度为length的数据发送到socket fd
.
所以传统的Java Socket编程每次发送数据的时候,都会申请一块直接内存(堆外),然后从堆内复制到堆外,最后在调用send发送
为什么要把数据从堆内复制到堆外呢?因为堆内的对象地址会随着gc改变,在send的时候会崩。
在高并发场景下,这是非常费内存的,假如每个链接发送的数据是1k, 那么堆内有1K的数据,堆外还要申请1K的数据,还要做数据拷贝,百万链接就需要2T的内存(极端场景), 无疑是瓶颈之一。
2. NIO的解决方案: DirectBuffer
既然Socket Api一定要用堆外的内存,一个解决思路就是复用这块内存,这样就不必每次都申请一块新的内存,减少系统调用损耗。
计算机世界就是解决一个问题的同时带来新的问题,如果复用了这块内存,如何在不用的时候进行回收就是一个问题了,因为这块内存在堆外,JVM的gc管不到,放着不管又迟早触发Linux的OOM Killer
.
jdk1.4以后的NIO带来了解决方案: DirectBuffer
2.1 DirectBuffer
JVM堆内的对象是带gc的,那么将JVM堆内的对象关联到堆外,在回收堆内对象的时候触发一个回收堆外内存的操作,就可以解决这个问题了。
这个设计思路的常用实现就是DirectByteBuffer
, 打开其代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// Primary constructor
//
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
// 主要是记录jdk已经使用的直接内存的数量,当分配直接内存时,需要进行增加,当释放时,需要减少
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 分配直接内存
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
// 内存清零
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
// 创建Cleaner对象
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
private ByteBuffer putShort(long a, short x) {
if (unaligned) {
short y = (x);
unsafe.putShort(a, (nativeByteOrder ? y : Bits.swap(y)));
} else {
Bits.putShort(a, x, bigEndian);
}
return this;
}
|
在构造器里就会申请一块直接内存,大小就是ByteBuffer.allocateDirect
的时候指定的。
在调用DirectByteBuffer
的PutXXX
的时候,都是写入到这块直接内存的,无需再去malloc
.
所以SocketChannel
使用DirectBuffer
进行读写的时候,性能远比SocketOutputStream
高,需要的内存也没有它多, 对SocketChannel
有疑问的可以看看前面分析的Java-NIO分析-9-从BSD-socket到SocketChannel
2.2 何时回收直接内存?
上面的构造器最后一行是使用了Cleaner
对象, 这个类是个虚引用
类,继承自PhantomReference
.
1
2
3
|
// 创建Cleaner对象
cleaner = Cleaner.create(this,
new Deallocator(base, size, cap));
|
create第一个参数是要观察的引用,第二个参数是引用被回收触发的操作。
在JVM中,虚引用是为了实现更细粒度的内存控制的手段,在创建虚引用的时候必须传入一个引用队列(ReferenceQueue),在一个对象的finalize函数被调用之后,这个对象的虚引用会被加入引用队列, 通过检查队列就可以知道对象是不是要被回收了。
sun.misc.Cleaner
就是一个自带ReferenceQueue
的类, 在create
的时候会将DirectBuffer
的引用加入观察,一旦引用被回收,JVM将会通知Cleaner去执行回收操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class Cleaner
extends PhantomReference<Object>
{
// 引用队列
private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue<>();
...
private Cleaner(Object referent, Runnable thunk) {
super(referent, dummyQueue);
this.thunk = thunk;
}
...
}
|
2.3 如何回收直接内存?
在DirectBuffer
中有个内部类Deallocator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
private static class Deallocator
implements Runnable
{
private static Unsafe unsafe = Unsafe.getUnsafe();
private long address;
private long size;
private int capacity;
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
public void run() {
if (address == 0) {
// Paranoia
return;
}
// 调用unsafe释放内存
unsafe.freeMemory(address);
address = 0;
Bits.unreserveMemory(size, capacity);
}
}
|
在回收的时候调用其run方法,可以看到就是调用sun.misc.unsafe
去释放的。这里提下,unsafe类里都是静态native方法,其实就是堆大家常用的各种free
,malloc
这样的内存操作函数的封装,因为java不能操作直接内存。
One more thing
构造DirectBuffer
的时候,有一行Bits.reserveMemory
, 功能主要是记录jdk已经使用的直接内存的数量, but会判断有没有足够多的直接内存使用,如果内存不够,会触发gc..
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// These methods should be called whenever direct memory is allocated or
// freed. They allow the user to control the amount of direct memory
// which a process may access. All sizes are specified in bytes.
static void reserveMemory(long size, int cap) {
synchronized (Bits.class) {
// 判断内存够不够, 够的话直接增加内存引用的计数
if (!memoryLimitSet && VM.isBooted()) {
maxMemory = VM.maxDirectMemory();
memoryLimitSet = true;
}
// -XX:MaxDirectMemorySize limits the total capacity rather than the
// actual memory usage, which will differ when buffers are page
// aligned.
if (cap <= maxMemory - totalCapacity) {
reservedMemory += size;
totalCapacity += cap;
count++;
return;
}
}
// 内存不够就去建议JVM gc了。。
System.gc();
try {
// 等待垃圾回收
Thread.sleep(100);
} catch (InterruptedException x) {
// Restore interrupt status
Thread.currentThread().interrupt();
}
synchronized (Bits.class) {
// 增加直接内存的计数
if (totalCapacity + cap > maxMemory)
throw new OutOfMemoryError("Direct buffer memory");
reservedMemory += size;
totalCapacity += cap;
count++;
}
}
|
3. 总结
本文从SocketStream
开始讲起,分析了祖传BIO里Socket的内存缺陷和性能瓶颈:
- 每次发送都需要
malloc
一块新直接内存,
- 要将数据从堆内copy到堆外,然后再send.
然后介绍了NIO的对堆外内存改进–即复用堆外内存, 以及复用堆外内存引出的堆外内存回收问题, 并从源码层面解读了其如何创建和回收。NIO的改进只是针对1,但是对2没改进,没准儿这是java吃内存的原因之一。
DirectBuffer
是我们常用的冰山对象
,使用的时候要慎重,如果该对象在年轻代, 容易gc, 回收的时候直接内存可以被释放掉。但是在存活了几次gc之后,被移入年老代,就不容易gc了。 其所关联的直接内存也不会被释放。
这也是常见的一个内存泄漏原因,看JVM堆内没用多少内存,但是机器上内存所剩无几,还频繁gc, 频繁触发OOM Killer
, 希望看完本文大家能意识到问题出在哪 :)