1)使用本方法计算矩阵乘积 $F \times B$,其中 $F$ 是 $1000 \times 1000$ 的矩阵,$B$ 是 $1000 \times 20000$ 的矩阵。实验使用一个三节点集群,每个节点分配一个 CPU 核(集群部署在虚拟机中,宿主机只有 4 个 CPU 核),每个节点配置一个 map 槽和一个 reduce 槽。完成该矩阵运算耗时约 5 分钟。 2)源码如下:
1 /** 2 * Created with IntelliJ IDEA. 3 * User: hadoop 4 * Date: 16-3-14 5 * Time: 下午3:13 6 * To change this template use File | Settings | File Templates. 7 */ 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FileSystem; 10 import java.io.IOException; 11 import java.lang.reflect.Array; 12 import java.net.URI; 13 import org.apache.hadoop.fs.Path; 14 import org.apache.hadoop.io.*; 15 import org.apache.hadoop.io.DoubleWritable; 16 import org.apache.hadoop.io.Writable; 17 import org.apache.hadoop.mapreduce.InputSplit; 18 import org.apache.hadoop.mapreduce.Job; 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 20 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 21 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 22 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 24 import org.apache.hadoop.mapreduce.Reducer; 25 import org.apache.hadoop.mapreduce.Mapper; 26 import org.apache.hadoop.filecache.DistributedCache; 27 import org.apache.hadoop.util.ReflectionUtils; 28 29 public class MutiDoubleInputMatrixProduct { 30 31 public static void initDoubleArrayWritable(int length,DoubleWritable[] doubleArrayWritable){ 32 for (int i=0;i{ 38 public DoubleArrayWritable map_value=new DoubleArrayWritable(); 39 public double[][] leftMatrix=null;/******************************************/ 40 //public Object obValue=null; 41 public DoubleWritable[] arraySum=null; 42 public DoubleWritable[] tempColumnArrayDoubleWritable=null; 43 public DoubleWritable[] tempRowArrayDoubleWritable=null; 44 public double sum=0; 45 public double uValue; 46 public int leftMatrixRowNum; 47 public int leftMatrixColumnNum; 48 public void setup(Context context) throws IOException { 49 Configuration conf=context.getConfiguration(); 50 leftMatrixRowNum=conf.getInt("leftMatrixRowNum",10); 51 leftMatrixColumnNum=conf.getInt("leftMatrixColumnNum",10); 52 
leftMatrix=new double[leftMatrixRowNum][leftMatrixColumnNum]; 53 uValue=(double)(context.getConfiguration().getFloat("u",1.0f)); 54 tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum]; 55 initDoubleArrayWritable(leftMatrixColumnNum,tempRowArrayDoubleWritable); 56 tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum]; 57 initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable); 58 System.out.println("map setup() start!"); 59 //URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration()); 60 Path[] cacheFiles=DistributedCache.getLocalCacheFiles(conf); 61 String localCacheFile="file://"+cacheFiles[0].toString(); 62 //URI[] cacheFiles=DistributedCache.getCacheFiles(conf); 63 //DistributedCache. 64 System.out.println("local path is:"+cacheFiles[0].toString()); 65 // URI[] cacheFiles=DistributedCache.getCacheFiles(context.getConfiguration()); 66 FileSystem fs =FileSystem.get(URI.create(localCacheFile), conf); 67 SequenceFile.Reader reader=null; 68 reader=new SequenceFile.Reader(fs,new Path(localCacheFile),conf); 69 IntWritable key= (IntWritable)ReflectionUtils.newInstance(reader.getKeyClass(),conf); 70 DoubleArrayWritable value= (DoubleArrayWritable)ReflectionUtils.newInstance(reader.getValueClass(),conf); 71 //int valueLength=0; 72 int rowIndex=0; 73 int index; 74 while (reader.next(key,value)){ 75 index=-1; 76 for (Writable val:value.get()){ 77 tempRowArrayDoubleWritable[++index].set(((DoubleWritable)val).get()); 78 } 79 //obValue=value.toArray(); 80 rowIndex=key.get(); 81 leftMatrix[rowIndex]=new double[leftMatrixColumnNum]; 82 //this.leftMatrix=new double[valueLength][Integer.parseInt(context.getConfiguration().get("leftMatrixColumnNum"))]; 83 for (int i=0;i {122 public DoubleWritable[] sum=null;123 // public Object obValue=null;124 public DoubleArrayWritable valueArrayWritable=new DoubleArrayWritable();125 public DoubleWritable[] tempColumnArrayDoubleWritable=null;126 // public DoubleWritable[] 
tempRowArrayDoubleWritable=null;127 //private int leftMatrixColumnNum;128 private int leftMatrixRowNum;129 130 public void setup(Context context){131 //leftMatrixColumnNum=context.getConfiguration().getInt("leftMatrixColumnNum",100);132 leftMatrixRowNum=context.getConfiguration().getInt("leftMatrixRowNum",100);133 sum=new DoubleWritable[leftMatrixRowNum];134 initDoubleArrayWritable(leftMatrixRowNum,sum);135 //tempRowArrayDoubleWritable=new DoubleWritable[leftMatrixColumnNum];136 tempColumnArrayDoubleWritable=new DoubleWritable[leftMatrixRowNum];137 initDoubleArrayWritable(leftMatrixRowNum,tempColumnArrayDoubleWritable);138 }139 140 public void reduce(IntWritable key,Iterable value,Context context) throws IOException, InterruptedException {141 //int valueLength=0;142 for(DoubleArrayWritable doubleValue:value){143 int index=-1;144 for (Writable val:doubleValue.get()){145 tempColumnArrayDoubleWritable[++index].set(((DoubleWritable)val).get());146 }147 //valueLength=Array.getLength(obValue);148 for (int i=0;i