This example demonstrates how to:
- send and receive DNS UDP packets
- add netty's built-in ChannelHandlers
- use Reactor Netty's interface to build the send and receive actions
- terminate the connection
The netty code is here, and the example uses the following libraries:
- netty 4.1.107.Final
- Project Reactor 3.6.3
- Reactor Netty 1.1.16
Code
package example;

import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.dns.*;
import io.netty.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.net.InetSocketAddress;

public class DnsUdpClient {
    private static final String SERVER_HOST = "8.8.8.8";
    private static final int SERVER_PORT = 53;
    private static final Logger LOGGER = LoggerFactory.getLogger(DnsUdpClient.class);

    // (1)
    private static void handleQueryResp(DatagramDnsResponse msg) {
        if (msg.count(DnsSection.QUESTION) > 0) {
            DnsQuestion question = msg.recordAt(DnsSection.QUESTION, 0);
            LOGGER.info("name: {}", question.name());
        }
        for (int i = 0, count = msg.count(DnsSection.ANSWER); i < count; i++) {
            DnsRecord r = msg.recordAt(DnsSection.ANSWER, i);
            if (r.type() == DnsRecordType.A) {
                // just print the IP after query
                DnsRawRecord raw = (DnsRawRecord) r;
                LOGGER.info("{}", NetUtil.bytesToIpAddress(ByteBufUtil.getBytes(raw.content())));
            }
        }
    }

    public static void main(String[] args) {
        UdpClient client = UdpClient.create()
                .host(SERVER_HOST).port(SERVER_PORT) // (2)
                .wiretap(true) // (3)
                /*
                // (4)
                .doOnChannelInit((observer, channel, remoteAddress) -> {
                    Connection c = Connection.from(channel);
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", channel.pipeline());
                });
                */
                // (5)
                .doOnConnected(c -> {
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", c.channel().pipeline());
                });

        Connection conn = client.connectNow(); // (6)

        // (7)
        conn.inbound().receiveObject()
                .doOnNext(obj -> {
                    DatagramDnsResponse r = (DatagramDnsResponse) obj;
                    LOGGER.info("response={}", obj);
                    handleQueryResp(r);
                })
                .doOnError(err -> LOGGER.error(String.valueOf(err)))
                .subscribe();

        // (8)
        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b7)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("www.google.com", DnsRecordType.A)
                        ))
                .then().subscribe();
        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b8)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("projectreactor.io", DnsRecordType.A)
                        ))
                .then().subscribe();

        // (9)
        conn.onReadIdle(5000, () -> {
            LOGGER.error("Request time out");
            conn.disposeNow();
        });

        // (10)
        conn.onDispose().block();
    }
}
Explanation
(1): Handles the decoded DNS reply: logs the question name and the IP address of every A record in the answer section.
(2): The destination IP address and port number are required when creating netty's io.netty.channel.Channel.
(3): Adds a debug ChannelHandler at the start of the Channel's pipeline. It logs all events related to the Channel at DEBUG level.
(4): doOnChannelInit() accepts a reactor.netty.ChannelPipelineConfigurer, which configures the channel pipeline while the channel is being initialized.
This ChannelPipelineConfigurer is called by reactor.netty.transport.TransportConfig.TransportChannelInitializer#initChannel:
@Override
protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();
    if (config.metricsRecorder != null) {
        //...
    }
    if (config.loggingHandler != null) {
        pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
    }
    ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);
    config.defaultOnChannelInit()
            .then(config.doOnChannelInit)
            .onChannelInit(connectionObserver, channel, remoteAddress); // <- HERE
    pipeline.remove(this);
    if (log.isDebugEnabled()) {
        log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
    }
}
To obtain a reactor.netty.Connection instance from netty's Channel instance, use the following static method:
Connection c = Connection.from(channel);
When the configurer starts executing, the pipeline contains:
pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
When the configurer finishes executing, the pipeline contains:
pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (DatagramDnsQueryEncoder = io.netty.handler.codec.dns.DatagramDnsQueryEncoder), (DatagramDnsResponseDecoder = io.netty.handler.codec.dns.DatagramDnsResponseDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
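reactor.netty.channel.ChannelOperationsHandler is the bridge that connects netty and the Reactor Netty library. It must stay at the end of the pipeline, which is why addHandlerLast() places the new handlers in front of it, as the output above shows.
DatagramDnsQueryEncoder and DatagramDnsResponseDecoder are netty's built-in ChannelHandlers. They are responsible for encoding DNS query objects into raw ByteBuf and decoding raw ByteBuf into DNS response objects.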
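To make the encoding step more concrete, the following is a minimal sketch of what a single-question DNS query looks like on the wire (RFC 1035 layout), written with the JDK only. It is not netty's implementation. The class DnsQueryWireSketch and the helper encodeQuery() are hypothetical names introduced for illustration, and the flags value 0x0100 (recursion desired) is an assumption; the bytes that DatagramDnsQueryEncoder actually emits depend on how the query object is configured.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class DnsQueryWireSketch {

    // Hypothetical helper (not part of netty): builds a DNS query with one question.
    static byte[] encodeQuery(int id, String name, int qtype) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // 12-byte header
        writeShort(out, id);      // transaction ID, e.g. 0x42b7
        writeShort(out, 0x0100);  // flags: standard query, recursion desired (assumption)
        writeShort(out, 1);       // QDCOUNT: one question
        writeShort(out, 0);       // ANCOUNT
        writeShort(out, 0);       // NSCOUNT
        writeShort(out, 0);       // ARCOUNT
        // Question name: each label is prefixed with its length, terminated by a zero byte
        for (String label : name.split("\\.")) {
            byte[] bytes = label.getBytes(StandardCharsets.US_ASCII);
            out.write(bytes.length);
            out.write(bytes, 0, bytes.length);
        }
        out.write(0);
        writeShort(out, qtype);   // QTYPE, 1 = A
        writeShort(out, 1);       // QCLASS, 1 = IN
        return out.toByteArray();
    }

    // Writes a 16-bit value in network byte order (big-endian).
    private static void writeShort(ByteArrayOutputStream out, int value) {
        out.write((value >>> 8) & 0xFF);
        out.write(value & 0xFF);
    }

    public static void main(String[] args) {
        byte[] packet = encodeQuery(0x42b7, "www.google.com", 1);
        StringBuilder hex = new StringBuilder();
        for (byte b : packet) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(packet.length + " bytes: " + hex);
    }
}
```

With the handlers from (4) or (5) in the pipeline, none of this has to be written by hand: the application only passes DatagramDnsQuery objects to sendObject().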
(5): doOnConnected() is another way to configure the channel pipeline. It does exactly the same job as (4), but it runs after the Channel has been connected instead of during channel initialization.
(6): connectNow() blocks while the connection is established and returns once it is set up. UDP is connectionless, so no handshake actually takes place.
(7): Defines the action to take when a packet is received. inbound().receiveObject() returns a Flux. Remember to call subscribe() at the end, because nothing happens until the Flux is subscribed.
(8): There are two statements, one per request sent. then() returns a Mono that completes when the packet has been sent. It must also be subscribed.
(9): Terminates the connection if no packet is received for 5 seconds.
(10): onDispose() returns a Mono that completes when the connection has been shut down successfully. block() blocks the main thread until then, so that the main thread is the last to end.
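Going back to (1): the content of an A record is simply the 4 raw bytes of an IPv4 address, which the example turns into text with NetUtil.bytesToIpAddress(). A minimal sketch of the same conversion with the JDK only is shown below. The class ARecordSketch, the helper toIpString() and the sample address 192.0.2.1 (a documentation address) are assumptions made for illustration and are not part of the example above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ARecordSketch {

    // Hypothetical helper: converts the raw RDATA of an A record (4 bytes) to dotted-decimal text.
    static String toIpString(byte[] rdata) throws UnknownHostException {
        // getByAddress() only wraps the bytes; it performs no DNS lookup.
        return InetAddress.getByAddress(rdata).getHostAddress();
    }

    public static void main(String[] args) throws UnknownHostException {
        byte[] rdata = {(byte) 192, 0, 2, 1};
        System.out.println(toIpString(rdata)); // prints 192.0.2.1
    }
}
```

An array whose length is neither 4 nor 16 bytes makes getByAddress() throw UnknownHostException, so a caller should only pass it the content of A (or AAAA) records, as the type check in handleQueryResp() does.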