博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
多线程设计模式(三):Master-Worker模式
阅读量:6173 次
发布时间:2019-06-21

本文共 3250 字,大约阅读时间需要 10 分钟。

Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。

一、什么是Master-Worker模式:

该模式的结构图:

  结构图:

 

Worker:用于实际处理一个任务;

Master:任务的分配和最终结果的合成;

Main:启动程序,调度开启Master。

 

二、代码实现:

    下面的是一个简易的Master-Worker框架实现。

(1)Master部分:

[java]   
 
  1. package MasterWorker;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.Map;  
  5. import java.util.Queue;  
  6. import java.util.concurrent.ConcurrentHashMap;  
  7. import java.util.concurrent.ConcurrentLinkedQueue;  
  8.   
  9. public class Master {  
  10.   
  11.     //任务队列  
  12.     protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();  
  13.     //Worker进程队列  
  14.     protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();  
  15.     //子任务处理结果集  
  16.     protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();  
  17.     //是否所有的子任务都结束了  
  18.     public boolean isComplete(){  
  19.         for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
  20.             if(entry.getValue().getState()!=Thread.State.TERMINATED){  
  21.                 return false;  
  22.             }  
  23.                   
  24.         }  
  25.         return true ;  
  26.     }  
  27.       
  28.     //Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量  
  29.     public Master(Worker worker,int countWorker){  
  30.           
  31.         worker.setWorkQueue(workQueue);  
  32.         worker.setResultMap(resultMap);  
  33.         for(int i=0;i<countWorker;i++){  
  34.             threadMap.put(Integer.toString(i),  new Thread(worker, Integer.toString(i)));  
  35.         }  
  36.           
  37.     }  
  38.       
  39.     //提交一个任务  
  40.     public void submit(Object job){  
  41.         workQueue.add(job);  
  42.     }  
  43.       
  44.       
  45.     //返回子任务结果集  
  46.     public Map<String ,Object> getResultMap(){  
  47.         return resultMap;  
  48.     }  
  49.       
  50.       
  51.     //开始运行所有的Worker进程,进行处理  
  52.     public  void execute(){  
  53.          for(Map.Entry<String , Thread> entry:threadMap.entrySet()){  
  54.              entry.getValue().start();  
  55.                
  56.          }  
  57.     }  
  58.       
  59.       
  60. }  

(2)Worker进程实现:

 

[java]   
 
  1. package MasterWorker;  
  2.   
  3. import java.util.Map;  
  4. import java.util.Queue;  
  5.   
  6. public class Worker  implements Runnable{  
  7.   
  8.     //任务队列,用于取得子任务  
  9.     protected Queue<Object> workQueue;  
  10.     //子任务处理结果集  
  11.     protected Map<String ,Object> resultMap;  
  12.     public void setWorkQueue(Queue<Object> workQueue){  
  13.         this.workQueue= workQueue;  
  14.     }  
  15.       
  16.     public void setResultMap(Map<String ,Object> resultMap){  
  17.         this.resultMap=resultMap;  
  18.     }  
  19.     //子任务处理的逻辑,在子类中实现具体逻辑  
  20.     public Object handle(Object input){  
  21.         return input;  
  22.     }  
  23.       
  24.       
  25.     @Override  
  26.     public void run() {  
  27.           
  28.         while(true){  
  29.             //获取子任务  
  30.             Object input= workQueue.poll();  
  31.             if(input==null){  
  32.                 break;  
  33.             }  
  34.             //处理子任务  
  35.             Object re = handle(input);  
  36.             resultMap.put(Integer.toString(input.hashCode()), re);  
  37.         }  
  38.     }  
  39.   
  40. }  

 

 

 

 

(3)运用这个小框架计算1——100的立方和,PlusWorker的实现:

 

[java]   
 
  1. package MasterWorker;  
  2.   
  3. public class PlusWorker extends Worker {  
  4.   
  5.     @Override  
  6.     public Object handle(Object input) {  
  7.           
  8.         Integer i =(Integer)input;  
  9.         return i*i*i;  
  10.     }  
  11.   
  12.       
  13. }  

 

 

 

 

(4)进行计算的Main函数:

 

[java]   
 
  1. package MasterWorker;  
  2.   
  3. import java.util.Map;  
  4. import java.util.Set;  
  5.   
  6. public class Main {  
  7.   
  8.       
  9.     /** 
  10.      * @param args 
  11.      */  
  12.     public static void main(String[] args) {  
  13.         //固定使用5个Worker,并指定Worker  
  14.         Master m = new Master(new PlusWorker(), 5);  
  15.         //提交100个子任务  
  16.         for(int i=0;i<100;i++){  
  17.             m.submit(i);  
  18.         }  
  19.         //开始计算  
  20.         m.execute();  
  21.         int re= 0;  
  22.         //保存最终结算结果  
  23.         Map<String ,Object> resultMap =m.getResultMap();  
  24.           
  25.         //不需要等待所有Worker都执行完成,即可开始计算最终结果  
  26.         while(resultMap.size()>0 || !m.isComplete()){  
  27.             Set<String> keys = resultMap.keySet();  
  28.             String key =null;  
  29.             for(String k:keys){  
  30.                 key=k;  
  31.                 break;  
  32.             }  
  33.             Integer i =null;  
  34.             if(key!=null){  
  35.                 i=(Integer)resultMap.get(key);  
  36.             }  
  37.             if(i!=null){  
  38.                 //最终结果  
  39.                 re+=i;  
  40.             }  
  41.             if(key!=null){  
  42.                 //移除已经被计算过的项  
  43.                 resultMap.remove(key);  
  44.             }  
  45.               
  46.         }  
  47.           
  48.   
  49.     }  
  50.   
  51. }  

 

 

 

 

三、总结:

    Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。

转:http://blog.csdn.net/lmdcszh/article/details/39698189

你可能感兴趣的文章
rsync服务模式+客户端访问
查看>>
我的友情链接
查看>>
Web服务之LNMMP架构及动静分离实现
查看>>
Neutron - flat模式
查看>>
SQLite中使用全文搜索FTS
查看>>
分布式文件系统FastDFS如何做到高可用
查看>>
在MAC Pro上安装Windows8.1和2012R2
查看>>
iis6 php安装 (一)
查看>>
Eclipse的使用以及后续遇到的问题(持续更新)
查看>>
关于,在Mysql中,外键是否会影响性能的问题???
查看>>
利用javascript设置图片等比例缩小
查看>>
dedeCMS如何给频道页添加缩略图
查看>>
CoreSeek快速安装
查看>>
Linux 网络性能调试工具Netstat
查看>>
我的友情链接
查看>>
报表下载SSH
查看>>
我的友情链接
查看>>
Raid磁盘阵列真的是100%的安全吗?raid有哪些常见的故障?
查看>>
Raid5两块硬盘离线解决方案 -阵列数据恢复案例
查看>>
IBM AIX存储层结构介绍 / 常用命令整理
查看>>