Netty의 EventLoop 특성 이해하기
오늘은 Netty의 EventLoop에 대해 이해를 높이고자 조금 찾아보고 공부한 내용을 정리한다.
Client Code
package com.example.demo.netty;
import com.example.demo.DemoApplication;
import com.example.demo.netty.handler.ClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ContextClosedEvent> {
@Value("${netty.host}")
String host;
@Value("${netty.port}")
int port;
EventLoopGroup group = new NioEventLoopGroup();
public void start() {
log.info("Netty Client start.");
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Logging
ch.pipeline().addLast(new LoggingHandler());
// Decoder
// Handler
ch.pipeline().addLast(new ClientHandler());
// Encoder
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
log.error("Client Error: {}", e.getMessage());
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
@Override
public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
try {
// sync로 shutdown 동기 처리
group.shutdownGracefully().sync();
log.info("Client Graceful shutdown.");
} catch (Exception e) {
log.error("Client Graceful shutdown failed.");
}
}
// 30초마다 Connection 확인
@Scheduled(initialDelay = 30000, fixedRate = 30000)
public void connectionChecking() {
if (!DemoApplication.connected) {
log.info("Client reconnecting...");
start();
}
}
}
이전에 작성한 Netty TCP/IP Client 코드 중 일부다.
Netty는 기본적으로 Non-blocking 방식으로 동작하고, Blocking 작업이 추가되면 엄청나게 성능이 느려질 수 있다.
Netty의 주요 컴포넌트
컴포넌트 | 역할 |
EventLoop | I/O 작업을 처리하는 단일 스레드 루프 연결된 채널의 이벤트(Read/Write... etc)를 순차적으로 처리함 |
Channel | 클라이언트와 서버 간의 연결을 나타낸다. SocketChannel, DatagramChannel(UDP) 등이 있음 |
ChannelPipeline | 이벤트가 흐르는 파이프라인 여러 Handler를 체이닝하여 이벤트 가공 또는 처리 진행 |
ChannelHandler | 실제 네트워크 이벤트를 처리하는 클래스 인바운드(ChannelInboundHandler)와 아웃바운드(ChannelOutboundHandler)로 나뉨 |
Bootstrap ServerBootstrap |
클라이언트(Bootstrap)와 서버(ServerBootstrap)를 설정하고 시작함 |
ChannelFuture | 비동기 작업의 결과를 나타내고, 작업 완료 성공 & 실패 여부를 알 수 있음 |
ByteBuf | Netty가 데이터를 읽고 쓰는데 사용하는 버퍼 NIO ByteBuffer보다 유연하고 성능이 좋음 |
이정도로 정리할 수 있다.
Netty의 이벤트 기반 처리 방식
EventLoop & EventLoopGroup
- EventLoop: 단일 스레드로 구성되어 있고, 채널의 이벤트를 처리함. 각 채널은 하나의 EventLoop에 바인딩되어, 해당 스레드에서 모든 이벤트를 처리한다.
- EventLoopGroup: 여러 EventLoop를 관리하는 그룹, 일반적으로 BossGroup(클라이언트 연결 수락), WorkerGroup(데이터 처리)로 구성함.
Channel & ChannelPipeline
- Channel: 클라이언트와 서버 간 연결을 나타내고, TCP의 경우 연결된 소켓, UDP의 경우 바인딩된 포트를 의미
- ChannelPipeline: 채널에 연결된 핸들러들의 체인, 이벤트는 파인프라인을 통해 순차적으로 핸들러를 거치며 처리
ChannelHandler
- ChannelInboundHandler: 수신 이벤트(데이터 읽기, 연결 수락 등) 처리
- ChannelOutboundHandler: 송신 이벤트(데이터 쓰기, 연결 종료 등) 처리
Netty 이벤트 흐름 다이어그램
간단하게 설명하면 ChannelPipeline은 핸들러들이 순차적으로 연결되어 있는 체인 구조이고, 이 구조를 통해 이벤트가 흐르고 핸들러가 원하는 처리를 한다.
데이터 수신
- Inbound Handler 1은 다음 핸들러로 이벤트를 넘기기 위해 ctx.fireChannelRead(msg) 같은 메서드를 호출합니다.
- 사진 기준으로 Socket.read() 이후 Inbound Handler 1부터 Inbound Handler N-1, Inbound Handler N 까지 도달한다.
데이터 전송
- Outbound Handler 1은 다음 핸들러를 넘기기 위해 ctx.write(msg) 또는 ctx.flush(msg), ctx.flushAndWrite(msg)를 호출하여 이벤트를 위로 전달합니다.
- 사진 기준으로 Channel 또는 ChannelHandlerContext로 부터 Outbound Handler 1, ..., Outbound Handler N-1, Outbound Handler N 까지 도달한 후 Socket.write() 하게 된다.
EventLoop의 기본 생성 Threads
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
기본적으로 EventLoop는 Core * 2를 따른다.
1 CPU라면 workerGroup에는 2개의 EventLoop가 생성된다.
해당 내용들을 통해 작성했던 Client Code를 조금 고도화 해보자.
개선
1. 연결 관리 구조 분리
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
서버와의 연결이 해제되면 bootstrap의 재생성을 막고(비용 감소), 전역으로 관리.
2. workerGroup Thread의 개수를 지정
EventLoopGroup group = new NioEventLoopGroup(50);
대략 50개를 지정했지만, LoopGroup이 많으면 비용이 증가하니 주의하자.
3. JDBC 통신과 같은 Blocking I/O 작업은 별도 Handler로 분리
결과
package com.example.demo.netty;
import com.example.demo.DemoApplication;
import com.example.demo.netty.handler.ClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NettyClient implements ApplicationListener<ContextClosedEvent> {
@Value("${netty.host}")
String host;
@Value("${netty.port}")
int port;
EventLoopGroup group = new NioEventLoopGroup(50);
Bootstrap bootstrap = new Bootstrap();
public void start() {
log.info("Netty Client start.");
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Logging
ch.pipeline().addLast(new LoggingHandler());
// Decoder
// Handler
ch.pipeline().addLast(new ClientHandler());
// Blocking I/O Handler
// Encoder
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
} catch (InterruptedException e) {
log.error("Client Error: {}", e.getMessage());
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
@Override
public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
try {
// sync로 shutdown 동기 처리
group.shutdownGracefully().sync();
log.info("Client Graceful shutdown.");
} catch (Exception e) {
log.error("Client Graceful shutdown failed.");
}
}
// 30초마다 Connection 확인
@Scheduled(initialDelay = 30000, fixedRate = 30000)
public void connectionChecking() {
if (!DemoApplication.connected) {
log.info("Client reconnecting...");
start();
}
}
}
대략적으로 Netty의 구조에 대해 조금 더 알 수 있게 된 경험이었다.
Blocking I/O, Non-blocking I/O 둘 다 관심갖게 되고, 특히 이벤트 기반으로 처리한다는게 조금은 더 매력적이었다.
나중에는 R2DBC도 테스트를 해보고, Armeria도 공부를 해봐야겠다.
출처
- https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html