亚洲最大看欧美片,亚洲图揄拍自拍另类图片,欧美精品v国产精品v呦,日本在线精品视频免费

  • 站長(zhǎng)資訊網(wǎng)
    最全最豐富的資訊網(wǎng)站

    基于事件的NIO多線程服務(wù)器

    JDK1.4的NIO有效解決了原有流式IO存在的線程開(kāi)銷的問(wèn)題,在NIO中使用多線程,主要目的已不是為了應(yīng)對(duì)每個(gè)客戶端請(qǐng)求而分配獨(dú)立的服務(wù)線程,而是通過(guò)多線程充分使用用多個(gè)CPU的處理能力和處理中的等待時(shí)間,達(dá)到提高服務(wù)能力的目的。

     

       

      JDK1.4的NIO有效解決了原有流式IO存在的線程開(kāi)銷的問(wèn)題,在NIO中使用多線程,主要目的已不是為了應(yīng)對(duì)每個(gè)客戶端請(qǐng)求而分配獨(dú)立的服務(wù)線程,而是通過(guò)多線程充分使用用多個(gè)CPU的處理能力和處理中的等待時(shí)間,達(dá)到提高服務(wù)能力的目的。

      線程模型

      NIO的選擇器采用了多路復(fù)用(Multiplexing)技術(shù),可在一個(gè)選擇器上處理多個(gè)套接字,通過(guò)獲取讀寫通道來(lái)進(jìn)行IO操作。由于網(wǎng)絡(luò)帶寬等原因,在通道的讀、寫操作中是容易出現(xiàn)等待的,所以在讀、寫操作中引入多線程,對(duì)性能提高明顯,而且可以提高客戶端的感知服務(wù)質(zhì)量。所以本文的模型將主要通過(guò)使用讀、寫線程池來(lái)提高與客戶端的數(shù)據(jù)交換能力。

      基于事件的NIO多線程服務(wù)器

      同時(shí)整個(gè)服務(wù)端的流程處理,建立于事件機(jī)制上。在 [接受連接->讀->業(yè)務(wù)處理->寫 ->關(guān)閉連接
      ]這個(gè)過(guò)程中,觸發(fā)器將觸發(fā)相應(yīng)事件,由事件處理器對(duì)相應(yīng)事件分別響應(yīng),完成服務(wù)器端的業(yè)務(wù)處理。
      下面我們就來(lái)詳細(xì)看一下這個(gè)模型的各個(gè)組成部分。

      相關(guān)事件定義 在這個(gè)模型中,我們定義了一些基本的事件:

      (1)onAccept:

      當(dāng)服務(wù)端收到客戶端連接請(qǐng)求時(shí),觸發(fā)該事件。通過(guò)該事件我們可以知道有新的客戶端呼入。該事件可用來(lái)控制服務(wù)端的負(fù)載。例如,服務(wù)器可設(shè)定同時(shí)只為一定數(shù)量客戶端提供服務(wù),當(dāng)同時(shí)請(qǐng)求數(shù)超出數(shù)量時(shí),可在響應(yīng)該事件時(shí)直接拋出異常,以拒絕新的連接。

      (2)onAccepted:

      當(dāng)客戶端請(qǐng)求被服務(wù)器接受后觸發(fā)該事件。該事件表明一個(gè)新的客戶端與服務(wù)器正式建立連接。

      (3)onRead:

      當(dāng)客戶端發(fā)來(lái)數(shù)據(jù),并已被服務(wù)器控制線程正確讀取時(shí),觸發(fā)該事件。該事件通知各事件處理器可以對(duì)客戶端發(fā)來(lái)的數(shù)據(jù)進(jìn)行實(shí)際處理了。需要注意的是,在本模型中,客戶端的數(shù)據(jù)讀取是由控制線程交由讀線程完成的,事件處理器不需要在該事件中進(jìn)行專門的讀操作,而只需將控制線程傳來(lái)的數(shù)據(jù)進(jìn)行直接處理即可。

      (4)onWrite:

      當(dāng)客戶端可以開(kāi)始接受服務(wù)端發(fā)送數(shù)據(jù)時(shí)觸發(fā)該事件,通過(guò)該事件,我們可以向客戶端發(fā)送回應(yīng)數(shù)據(jù)。在本模型中,事件處理器只需要在該事件中設(shè)置 。

      (5)onClosed:

      當(dāng)客戶端與服務(wù)器斷開(kāi)連接時(shí)觸發(fā)該事件。

      (6)onError:

      當(dāng)客戶端與服務(wù)器從連接開(kāi)始到最后斷開(kāi)連接期間發(fā)生錯(cuò)誤時(shí)觸發(fā)該事件。通過(guò)該事件我們可以知道有什么錯(cuò)誤發(fā)生。

      事件回調(diào)機(jī)制的實(shí)現(xiàn)

      在這個(gè)模型中,事件采用廣播方式,也就是所有在冊(cè)的事件處理器都能獲得事件通知。這樣可以將不同性質(zhì)的業(yè)務(wù)處理,分別用不同的處理器實(shí)現(xiàn),使每個(gè)處理器的業(yè)務(wù)功能盡可能單一。

      如下圖:整個(gè)事件模型由監(jiān)聽(tīng)器、事件適配器、事件觸發(fā)器、事件處理器組成。

      (事件模型)

      基于事件的NIO多線程服務(wù)器

      1.監(jiān)聽(tīng)器(Serverlistener):

      這是一個(gè)事件接口,定義需監(jiān)聽(tīng)的服務(wù)器事件,如果您需要定義更多的事件,可在這里進(jìn)行擴(kuò)展。

                                                       
      1. public interface Serverlistener
      2. {
      3. public void onError(String error);
      4. public void onAccept() throws Exception;
      5. public void onAccepted(Request request) throws Exception;
      6. public void onRead(Request request) throws Exception;
      7. public void onWrite(Request request, Response response) throws Exception;
      8. public void onClosed(Request request) throws Exception;
      9. }
       

      2. 事件適配器(EventAdapter):

      對(duì)Serverlistener接口實(shí)現(xiàn)一個(gè)適配器(EventAdapter),這樣的好處是最終的事件處理器可以只處理所關(guān)心的事件。

                                                                                                                                       
      1. public abstract class EventAdapter
      2. implements Serverlistener
      3. {
      4. public EventAdapter() {}
      5. public void onError(String error) {}
      6. public void onAccept() throws Exception {}
      7. public void onAccepted(Request request) throws Exception {}
      8. public void onRead(Request request) throws Exception {}
      9. public void onWrite(Request request, Response response) throws Exception {}
      10. public void onClosed(Request request) throws Exception {}
      11. }
       

      3. 事件觸發(fā)器(Notifier):

      用于在適當(dāng)?shù)臅r(shí)候通過(guò)觸發(fā)服務(wù)器事件,通知在冊(cè)的事件處理器對(duì)事件做出響應(yīng)。觸發(fā)器以Singleton模式實(shí)現(xiàn),統(tǒng)一控制整個(gè)服務(wù)器端的事件,避免造成混亂。

                                                                                                                                                                                                                                       
      1. public class Notifier
      2. {
      3. private static Arraylist listeners = null;
      4. private static Notifier instance = null;
      5. private Notifier()
      6. {
      7. listeners = new Arraylist();
      8. }
      9. /**
      10. * 獲取事件觸發(fā)器
      11. * @return 返回事件觸發(fā)器
      12. */
      13. public static synchronized Notifier
      14. getNotifier()
      15. {
      16. if (instance == null)
      17. {
      18. instance = new Notifier();
      19. return instance;
      20. }
      21. else
      22. {
      23. return instance;
      24. }
      25. }
      26. /**
      27. * 添加事件監(jiān)聽(tīng)器
      28. * @param l 監(jiān)聽(tīng)器
      29. */
      30. public void addlistener(Serverlistener l)
      31. {
      32. synchronized (listeners)
      33. {
      34. if (!listeners.contains(l))
      35. {
      36. listeners.add(l);
      37. }
      38. }
      39. }
      40. public void fireOnAccept()
      41. throws Exception
      42. {
      43. for (int i = listeners.size() – 1;
      44. i >= 0; i–)
      45. {
      46. ( (Serverlistener) listeners.
      47. get(i)).onAccept();
      48. }
      49. }
      50. // other fire method
      51. }
       

      4. 事件處理器(Handler):

      繼承事件適配器,對(duì)感興趣的事件進(jìn)行響應(yīng)處理,實(shí)現(xiàn)業(yè)務(wù)處理。以下是一個(gè)簡(jiǎn)單的事件處理器實(shí)現(xiàn),它響應(yīng)onRead事件,在終端打印出從客戶端讀取的數(shù)據(jù)。

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public class ServerHandler
      2. extends EventAdapter
      3. {
      4. public ServerHandler() {}
      5. public void onRead(Request request)
      6. throws Exception
      7. {
      8. System.out.println(“Received: “ +
      9. new String(data));
      10. }
      11. }
       

      5. 事件處理器的注冊(cè)。

      為了能讓事件處理器獲得服務(wù)線程的事件通知,事件處理器需在觸發(fā)器中注冊(cè)。

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
      1. ServerHandler handler = new ServerHandler();
      2. Notifier.addlistener(handler);
       

      實(shí)現(xiàn)NIO多線程服務(wù)器

      NIO多線程服務(wù)器主要由主控服務(wù)線程、讀線程和寫線程組成。

      基于事件的NIO多線程服務(wù)器

      1. 主控服務(wù)線程(Server):

      主控線程將創(chuàng)建讀、寫線程池,實(shí)現(xiàn)監(jiān)聽(tīng)、接受客戶端請(qǐng)求,同時(shí)將讀、寫通道提交由相應(yīng)的讀線程(Reader)和寫服務(wù)線程(Writer),由讀寫線程分別完成對(duì)客戶端數(shù)據(jù)的讀取和對(duì)客戶端的回應(yīng)操作。

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public class Server implements Runnable
      2. {
      3. private static List wpool = new LinkedList();
      4. private static Selector selector;
      5. private ServerSocketChannel sschannel;
      6. private InetSocketAddress address;
      7. protected Notifier notifier;
      8. private int port;
      9. private static int MAX_THREADS = 4;
      10. /**
      11. * Creat the main thread
      12. * @param port server port
      13. * @throws java.lang.Exception
      14. */
      15. public Server(int port) throws Exception
      16. {
      17. this.port = port;
      18. // event dispatcher
      19. notifier = Notifier.getNotifier();
      20. // create the thread pool for reading and writing
      21. for (int i = 0; i < MAX_THREADS; i++)
      22. {
      23. Thread r = new Reader();
      24. Thread w = new Writer();
      25. r.start();
      26. w.start();
      27. }
      28. // create nonblocking socket
      29. selector = Selector.open();
      30. sschannel = ServerSocketChannel.open();
      31. sschannel.configureBlocking(false);
      32. address = new InetSocketAddress(port);
      33. ServerSocket ss = sschannel.socket();
      34. ss.bind(address);
      35. sschannel.register(selector, SelectionKey.OP_ACCEPT);
      36. }
      37. public void run()
      38. {
      39. System.out.println(“Server started “);
      40. System.out.println(“Server listening on port: “ + port);
      41. while (true)
      42. {
      43. try
      44. {
      45. int num = 0;
      46. num = selector.select();
      47. if (num > 0)
      48. {
      49. Set selectedKeys = selector.selectedKeys();
      50. Iterator it = selectedKeys.iterator();
      51. while (it.hasNext())
      52. {
      53. SelectionKey key = (SelectionKey) it.next();
      54. it.remove();
      55. if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT)
      56. {
      57. // Accept the new connection
      58. ServerSocketChannel ssc =
      59. (ServerSocketChannel) key.channel();
      60. notifier.fireOnAccept();
      61. SocketChannel sc = ssc.accept();
      62. sc.configureBlocking(false);
      63. Request request = new Request(sc);
      64. notifier.fireOnAccepted(request);
      65. sc.register(selector, SelectionKey.OP_READ,request);
      66. }
      67. else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ)
      68. {
      69. Reader.processRequest(key);
      70. key.cancel();
      71. }
      72. else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
      73. {
      74. Writer.processRequest(key);
      75. key.cancel();
      76. }
      77. }
      78. }
      79. //this selector’s wakeup method is invoked
      80. else
      81. {
      82. //register new channel for writing to selector
      83. addRegister();
      84. }
      85. }
      86. catch (Exception e)
      87. {
      88. notifier.fireOnError(“Error occured in Server: “
      89. + e.getMessage());
      90. continue;
      91. }
      92. }
      93. }
      94. private void addRegister()
      95. {
      96. synchronized (wpool)
      97. {
      98. while (!wpool.isEmpty())
      99. {
      100. SelectionKey key = (SelectionKey) wpool.remove(0);
      101. SocketChannel schannel = (SocketChannel) key.channel();
      102. try
      103. {
      104. schannel.register(selector, SelectionKey.OP_WRITE, key
      105. .attachment());
      106. }
      107. catch (Exception e)
      108. {
      109. try
      110. {
      111. schannel.finishConnect();
      112. schannel.close();
      113. schannel.socket().close();
      114. notifier.fireOnClosed((Request) key.attachment());
      115. }
      116. catch (Exception e1)
      117. {
      118. }
      119. notifier.fireOnError(“Error occured in addRegister: “
      120. + e.getMessage());
      121. }
      122. }
      123. }
      124. }
      125. public static void processWriteRequest(SelectionKey key)
      126. {
      127. synchronized (wpool)
      128. {
      129. wpool.add(wpool.size(), key);
      130. wpool.notifyAll();
      131. }
      132. selector.wakeup();
      133. }
      134. }
       

      2. 讀線程(Reader):

      使用線程池技術(shù),通過(guò)多個(gè)線程讀取客戶端數(shù)據(jù),以充分利用網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)臅r(shí)間,提高讀取效率。

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public class Reader extends Thread
      2. {
      3. public void run()
      4. {
      5. while (true)
      6. {
      7. try
      8. {
      9. SelectionKey key;
      10. synchronized (pool)
      11. {
      12. while (pool.isEmpty())
      13. {
      14. pool.wait();
      15. }
      16. key = (SelectionKey) pool.remove(0);
      17. }
      18. // 讀取客戶端數(shù)據(jù),并觸發(fā)onRead事件
      19. read(key);
      20. }
      21. catch (Exception e)
      22. {
      23. continue;
      24. }
      25. }
      26. }
      27. }
       

      3. 寫線程(Writer):

      和讀操作一樣,使用線程池,負(fù)責(zé)將服務(wù)器端的數(shù)據(jù)發(fā)送回客戶端。

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public final class Writer extends Thread
      2. {
      3. public void run()
      4. {
      5. while (true)
      6. {
      7. try
      8. {
      9. SelectionKey key;
      10. synchronized (pool)
      11. {
      12. while (pool.isEmpty())
      13. {
      14. pool.wait();
      15. }
      16. key = (SelectionKey) pool.remove(0);
      17. }
      18. write(key);
      19. }
      20. catch (Exception e)
      21. {
      22. continue;
      23. }
      24. }
      25. }
      26. }
       

      具體應(yīng)用

      NIO多線程模型的實(shí)現(xiàn)告一段落,現(xiàn)在我們可以暫且將NIO的各個(gè)API和煩瑣的調(diào)用方法拋于腦后,專心于我們的實(shí)際應(yīng)用中。

      我們用一個(gè)簡(jiǎn)單的TimeServer(時(shí)間查詢服務(wù)器)來(lái)看看該模型能帶來(lái)多么簡(jiǎn)潔的開(kāi)發(fā)方式。

      在這個(gè)TimeServer中,將提供兩種語(yǔ)言(中文、英文)的時(shí)間查詢服務(wù)。我們將讀取客戶端的查詢命令(GB/EN),并回應(yīng)相應(yīng)語(yǔ)言格式的當(dāng)前時(shí)間。在應(yīng)答客戶的請(qǐng)求的同時(shí),服務(wù)器將進(jìn)行日志記錄。做為示例,對(duì)日志記錄,我們只是簡(jiǎn)單地將客戶端的訪問(wèn)時(shí)間和IP地址輸出到服務(wù)器的終端上。

      1. 實(shí)現(xiàn)時(shí)間查詢服務(wù)的事件處理器(TimeHandler):

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public class TimeHandler extends EventAdapter
      2. {
      3. public TimeHandler() {}
      4. public void onWrite(Request request, Response response) throws Exception
      5. {
      6. String command = new String(request.getDataInput());
      7. String time = null;
      8. Date date = new Date();
      9. if (command.equals(“GB”))
      10. {
      11. DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
      12. DateFormat.FulL, Locale.CHINA);
      13. time = cnDate.format(date);
      14. }
      15. else
      16. {
      17. DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
      18. DateFormat.FulL, Locale.US);
      19. time = enDate.format(date);
      20. }
      21. response.send(time.getBytes());
      22. }
      23. }
       

      2. 實(shí)現(xiàn)日志記錄服務(wù)的事件處理器(LogHandler):

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
      1. public class LogHandler extends EventAdapter
      2. {
      3. public LogHandler() {}
      4. public void onClosed(Request request)
      5. throws Exception
      6. {
      7. String log = new Date().toString() + ” from “ + request.getAddress().toString();
      8. System.out.println(log);
      9. }
      10. public void onError(String error)
      11. {
      12. System.out.println(“Error: “ + error);
      13. }
      14. }
       

      3. 啟動(dòng)程序:

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public class Start
      2. {
      3. public static void main(String[] args)
      4. {
      5. try
      6. {
      7. LogHandler loger = new LogHandler();
      8. TimeHandler timer = new TimeHandler();
      9. Notifier notifier = Notifier.getNotifier();
      10. notifier.addlistener(loger);
      11. notifier.addlistener(timer);
      12. System.out.println(“Server starting “);
      13. Server server = new Server(5100);
      14. Thread tServer = new Thread(server);
      15. tServer.start();
      16. }
      17. catch (Exception e)
      18. {
      19. System.out.println(“Server error: “ + e.getMessage());
      20. System.exit(-1);
      21. }
      22. }
      23. }
       

      小  結(jié)

      通過(guò)例子我們可以看到,基于事件回調(diào)的NIO多線程服務(wù)器模型,提供了清晰直觀的實(shí)現(xiàn)方式,可讓開(kāi)發(fā)者從NIO及多線程的技術(shù)細(xì)節(jié)中擺脫出來(lái),集中精力關(guān)注具體的業(yè)務(wù)實(shí)現(xiàn)。

      贊(0)
      分享到: 更多 (0)
      網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)