
需要导入的包
io.netty netty-all4.1.39.Final org.projectlombok lombok1.16.18 com.google.code.gson gson2.8.5 com.google.guava guava19.0 ch.qos.logback logback-classic1.2.3
@Slf4j
public class ThreadServerTest {
public static void main(String[] args) {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(8080));
//false为非阻塞模式
ssc.configureBlocking(false);
Selector master = Selector.open();
//注册到selector
ssc.register(master, SelectionKey.OP_ACCEPT);
//负载均衡
Worker[] workers=new Worker[3];
for (int i = 0; i <3 ; i++) {
workers[i]= new Worker("worker-"+i);
}
AtomicInteger ai=new AtomicInteger(1);
while(true){
master.select();
Set eventKeys = master.selectedKeys();
Iterator iterator = eventKeys.iterator();
while (iterator.hasNext()){
log.info("connect success");
SelectionKey event = iterator.next();
//不移除,下次还会获取相同的key,导致空指针错误
iterator.remove();
if(event.isAcceptable()){
log.info("connect create");
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.info("address:{}",sc.getRemoteAddress());
workers[ai.getAndIncrement()%3].exe(sc);
// sc.register(worker.worker,SelectionKey.OP_READ,buffer);
log.info("after connect:{}",sc.getRemoteAddress());
}
// else if(event.isReadable()){
// log.info("get into read");
// try {
// SocketChannel sc = (SocketChannel)event.channel();
// ByteBuffer buffer = (ByteBuffer)event.attachment();
// int read = sc.read(buffer);
// log.info("read count {}",read);
// if(read==-1){
// //event.cancel();
// }
//
// log.info("limit {},position {}",buffer.limit(),buffer.position());
// if(buffer.limit()==buffer.position()){
// ByteBuffer newBuffer=ByteBuffer.allocate(buffer.capacity()<<1);
// buffer.flip();
// newBuffer.put(buffer);
// event.attach(newBuffer);
// }
// buffer.flip();
// debugRead(buffer);
// }catch (Exception e){
// //event.cancel();
// System.out.println("connectint shut down");
// }
// }
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Slf4j
class Worker implements Runnable{
private Thread thread;
private volatile Boolean start=false;
private ConcurrentlinkedQueue queue;
private Selector worker;
private String name;
public Worker(String name)throws IOException {
this.name=name;
this.worker=Selector.open();
//保证顺序,可以不用,通过代码顺序来实现
queue=new ConcurrentlinkedQueue<>();
thread=new Thread(this,name);
}
public void exe(SocketChannel socketChannel)throws IOException{
// ByteBuffer buffer = ByteBuffer.allocate(16);
// socketChannel.register(worker,SelectionKey.OP_READ,buffer);
if(!start){
thread.start();
}
start=true;
queue.add(()->{
ByteBuffer buffer = ByteBuffer.allocate(16);
try {
socketChannel.register(worker,SelectionKey.OP_READ,buffer);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
worker.wakeup();
}
@Override
public void run() {
while (true){
try {
log.info("go into worker ");
worker.select();
Runnable task = queue.poll();
if(task!=null){
task.run();
}
log.info("worker running");
Set events = worker.selectedKeys();
Iterator iterator = events.iterator();
while (iterator.hasNext()){
SelectionKey event = iterator.next();
iterator.remove();
if(event.isReadable()){
SocketChannel channel = (SocketChannel)event.channel();
log.info("address:{}",channel.getRemoteAddress());
ByteBuffer buffer = (ByteBuffer)event.attachment();
channel.read(buffer);
buffer.flip();
debugRead(buffer);
}
}
}catch (IOException e){
e.printStackTrace();
}
}
}
}
worker.wakeup(); 这个方法是必须使用的,类似与lockSupport中的park和unpark的使用.如果不使用这个方法,进行阻塞唤醒,会造成,该线程处理后续其他机器的请求,不能注册到selector,需要等待
worker.select()的放行.
客户端代码,加断点调试
public static void main(String[] args) {
try (SocketChannel channel = SocketChannel.open()) {
channel.connect(new InetSocketAddress("localhost",8080));
// channel.write(Charset.defaultCharset().encode("hello4"));
System.out.println("waiting");//此行加断点
} catch (IOException e) {
e.printStackTrace();
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)