Calculating moving averages with Hadoop and Map-Reduce


Please note that the article by Attain technologies is a direct copy of this article. This article, the one you are reading, is the original and was published on 29th August 2012. I did not give permission for this and only found out by accident.

A moving average, also called a sliding mean or a rolling average, is a technique for smoothing out short-term variation in data in order to reveal trends. It has uses in signal processing, data analysis and statistics. In engineering terms it is like a finite impulse response filter, while mathematically it is a convolution. For large sets of data this smoothing can take a long time, and treating it as a big data problem may allow faster, possibly real-time, smoothing of the data.

The de facto standard for processing Big Data is Hadoop, though other frameworks may possibly be superior in terms of development time, maintenance time and throughput. Some even claim to allow real-time processing of big data on a standard laptop.

Hadoop is centred on the Map-Reduce paradigm, a paradigm that is bigger than Hadoop and can, to some extent, be implemented with queue-based thread communication. The code presented here is proof-of-concept Java code for Hadoop, developed partly as a learning exercise and partly as a way to break out of the Big Data cage in which Map-Reduce currently resides. As such, details necessary for production systems, such as the ability to choose the window length, requantise the data or perform weighted and exponential averages, have not been discussed. The code here will be updated in the light of user comments and future development.

The sliding mean

I first met the sliding mean in the context of analysing telemetry data. The data was contaminated by noise from various sources: vibration, electrical noise and so on. One solution was to smooth the data. Since the time of events was important, and the data might be subject to systematic biases so that the noise was not centred round a mean value, the smoothed data had to be “shifted” to compensate.

To be concrete, consider a sequence of a thousand noisy data points, which are integers between 0 and 128. A sliding mean with a sample window of 9 points (the reason for 9 rather than 10 is explained below) is calculated by taking the average of points 1 to 9 and placing it in location 1 of the smoothed data, then calculating the average of points 2 to 10 and putting it in location 2, and so on until the smoothed array is filled. If the data are not centred round a mean value, the average lags the latest data point by half the window length.

Sometimes the time shift implied in the last paragraph is not wanted; in that case the central moving average is computed, in which the smoothed sequence starts at location 5 and ends at location 996, with the end points normally discarded.
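
As a minimal illustration, here is a plain (non-Hadoop) Java sketch of both forms; the random data and the window of 9 points are simply stand-ins for the telemetry described above, and indices are zero-based.

	import java.util.Arrays;

	public class SlidingMeanSketch {
	    public static void main(String[] args) {
	        int window = 9;                    // sample window length, odd so it has a centre point
	        double[] data = new double[1000];  // stand-in for the noisy telemetry values
	        for (int i = 0; i < data.length; i++) data[i] = Math.random() * 128;

	        // Trailing sliding mean: the mean of points i - window + 1 .. i is stored at i,
	        // so it lags the latest data point by half the window length.
	        double[] trailing = new double[data.length];
	        for (int i = window - 1; i < data.length; i++) {
	            double sum = 0;
	            for (int j = i - window + 1; j <= i; j++) sum += data[j];
	            trailing[i] = sum / window;
	        }

	        // Central moving average: the same mean is stored at the centre of its window,
	        // so the smoothed sequence runs from window/2 to data.length - window/2 - 1
	        // and the end points are discarded.
	        double[] central = new double[data.length];
	        for (int i = window / 2; i < data.length - window / 2; i++) {
	            double sum = 0;
	            for (int j = i - window / 2; j <= i + window / 2; j++) sum += data[j];
	            central[i] = sum / window;
	        }

	        System.out.println(Arrays.toString(Arrays.copyOfRange(central, 4, 14)));
	    }
	}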

A hint of mathematics

The sliding mean is just a convolution [2] or, loosely, a “smearing” of the time series with a square pulse, that is, a function that is zero outside the sample window. As such, one way to compute the sliding mean is to take the Fast Fourier Transform (FFT) of the data, multiply it pointwise by the FFT of the square pulse (or, in the case of weighted means, by the FFT of the weighting function), and then take the inverse transform. In computational terms the effort of calculating the FFTs may outweigh the saving made by replacing division with multiplication.
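
Seen this way, the only thing that changes between the simple and the weighted moving average is the kernel the series is convolved with. Below is a minimal sketch of the direct (non-FFT) form of that convolution; the class name and the uniform square-pulse kernel are purely illustrative.

	public class ConvolutionSketch {
	    // Direct (non-FFT) convolution of a series with a normalised kernel.
	    // A uniform kernel reproduces the simple sliding mean; any other kernel
	    // gives a weighted moving average.
	    static double[] convolveCentred(double[] data, double[] kernel) {
	        int half = kernel.length / 2;
	        double[] out = new double[data.length];
	        for (int i = half; i < data.length - half; i++) {
	            double sum = 0;
	            for (int k = 0; k < kernel.length; k++) {
	                sum += kernel[k] * data[i - half + k];
	            }
	            out[i] = sum;
	        }
	        return out;
	    }

	    public static void main(String[] args) {
	        double[] data = new double[1000];
	        for (int i = 0; i < data.length; i++) data[i] = Math.random() * 128;

	        // Uniform weights summing to 1: the square pulse of the simple moving average.
	        double[] square = new double[9];
	        java.util.Arrays.fill(square, 1.0 / 9);

	        System.out.println(convolveCentred(data, square)[500]);
	    }
	}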

Pursuing the notion of the sliding mean as a convolution leads into areas like fractional calculus [3] and is outwith the scope of this note. The convolution approach does lead to the observation that the square pulse of the simple moving average has a Fourier transform that falls off rapidly at high frequencies, so the smoothing suppresses high frequencies. The larger the sample window, the lower the maximum frequency that is passed through. This is intuitively obvious, but it is nice to have it confirmed by a more rigorous analysis.

How the data arrives

There are basically two situations: one where the entire sequence is available, for example post-test data from electromechanical systems, and one where data points arrive one by one, possibly at random times. Here it is assumed the data arrive at fixed intervals. An intermediate case is where the data is stored in a buffer, with old data being removed as new data arrives. In that case the smoothing must be fast enough to ensure all data is processed before new data arrives, even if this means the system is sometimes idle waiting for the next point.

In order to develop the proof of concept rapidly, test data was stored in a text file, one value to a line, thus simulating the situation where data values arrive one by one. Because the data is not all available at once, the standard technique of calculating a total once and then moving forward by subtracting the oldest point and adding the newest (sketched below) was not possible. Instead a technique essentially similar to the add() method of a circular list was used in the Mapper class: once the list is full, each newly added point causes the list to be converted to a string and sent to the reducer. Again, as the code evolves smarter methods will be used and the code here updated.
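
For comparison, the running-total technique mentioned above looks roughly like the sketch below when the whole sequence is already in memory; it is not the approach taken in the Hadoop code that follows.

	public class RunningTotalSketch {
	    // Trailing moving average using a running total: form the first window sum once,
	    // then slide forward by subtracting the oldest point and adding the newest.
	    static double[] movingAverage(double[] data, int window) {
	        double[] out = new double[data.length];
	        double total = 0;
	        for (int i = 0; i < window; i++) total += data[i];
	        out[window - 1] = total / window;
	        for (int i = window; i < data.length; i++) {
	            total += data[i] - data[i - window];
	            out[i] = total / window;
	        }
	        return out;
	    }

	    public static void main(String[] args) {
	        double[] data = new double[1000];
	        for (int i = 0; i < data.length; i++) data[i] = Math.random() * 128;
	        System.out.println(movingAverage(data, 9)[data.length - 1]);
	    }
	}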

Sample code

	import java.io.IOException;
	import java.util.ArrayList;
	import java.util.Iterator;

	import org.apache.hadoop.fs.Path;
	import org.apache.hadoop.io.LongWritable;
	import org.apache.hadoop.io.Text;
	import org.apache.hadoop.mapred.FileInputFormat;
	import org.apache.hadoop.mapred.FileOutputFormat;
	import org.apache.hadoop.mapred.JobClient;
	import org.apache.hadoop.mapred.JobConf;
	import org.apache.hadoop.mapred.MapReduceBase;
	import org.apache.hadoop.mapred.Mapper;
	import org.apache.hadoop.mapred.OutputCollector;
	import org.apache.hadoop.mapred.Reducer;
	import org.apache.hadoop.mapred.Reporter;
	import org.apache.hadoop.mapred.TextInputFormat;
	import org.apache.hadoop.mapred.TextOutputFormat;

	public class HadoopMovingAverage {
// For production the windowlength  would be a commandline or other argument		
		 static double windowlength = 3.0;
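// thekey labels each emitted window; starting it at half the window length makes each
// key correspond to the centre point of its window, as in a central moving average.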
		 static int thekey = (int)windowlength/2;
// used for handling the circular list. 
		 static boolean initialised=false;
// Sample window
		 static ArrayList<Double> window = new ArrayList<Double>() ;

// The Map method processes the data one point at a time and passes the circular list to the 
// reducer. 
	   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
	
	     public void map(LongWritable key,
	    		         Text value,
	    		         OutputCollector<Text,
	    		         Text> output, 
	    		         Reporter reporter
	    		         ) throws IOException 
	    		         {
	       double wlen = windowlength;
// creates windows of samples and sends them to the Reducer	       
	       partitionData(value, output, wlen);
   	       
	     }

// Creates sample windows starting at each data point and sends them to the reducer
	    
		private void partitionData(Text value,
				OutputCollector<Text, Text> output, double wlen)
				throws IOException {
			String line = value.toString();
// divide each point by the window length here so the reducer only needs to sum the window contents.
			   Double ival = new Double(line)/wlen;
// Build initial sample window
			   if(window.size() < windowlength)
			      {
			        window.add(ival);
			      }
// emit first window
			  if(!initialised && window.size() == windowlength)
				  {
				  initialised = true;
				  emit(thekey,window,output);
				  thekey++;
				  return;
				  }
// Update and emit subsequent windows
			  if(initialised)
			      {
// remove oldest datum				  
			    	  window.remove(0);
// add new datum
			    	  window.add(ival);
			    	  emit(thekey,window,output);
					  thekey++;
			      }
		}
	   }

// Transform list to a string and send to reducer. Text to be replaced by ObjectWritable
// Problem: Hadoop apparently requires all output formats to be the same so
//			cannot make this output collector differ from the one the reducer uses.

	   public static void emit(int key, 
			                   ArrayList<Double> value,
			                   OutputCollector<Text,Text> output) throws IOException
	   {
		   Text tx = new Text();
		   tx.set(new Integer(key).toString());

		   String outstring = value.toString();
// remove the square brackets Java puts in
		   String tidied = outstring.substring(1,outstring.length()-1).trim();
		   
		   Text out = new Text();	   
		   out.set(tidied);
		   
		   output.collect(tx,out);		    
	   }
	
	   public static class Reduce extends MapReduceBase 
	   	implements Reducer<Text, Text, Text, Text> 
	   	{
	     public void reduce(Text key, 
	    		            Iterator<Text> values,
	    		            OutputCollector<Text,Text> output,
	    		            Reporter reporter
	    		            ) throws IOException 
		   {
		       	       
		       while (values.hasNext()) 
			       { 
				        computeAverage(key, values, output);
				        
				  }
		       	
		     }

// computes the average of each window and sends it to the output collector.
		private void computeAverage(Text key, Iterator<Text> values,
				OutputCollector<Text, Text> output)
				throws IOException {
			double sum = 0;
			String thevalue = values.next().toString();
			String[] thenumbers = thevalue.split(",");
			for( String temp: thenumbers)
			    {
			    // need to trim the string because the constructor does not trim.
			    Double ds = new Double(temp.trim());	
			    sum += ds;

			    }		       
      Text out = new Text();
      String outstring = Double.toString(sum);
      out.set(outstring);
      output.collect(key, out);
		}
	   }
	
	   public static void main(String[] args) throws Exception {
	     JobConf conf = new JobConf(HadoopMovingAverage.class);

	     conf.setJobName("movingaverage");
	
	     conf.setOutputKeyClass(Text.class);
	     conf.setOutputValueClass(Text.class);
	
	     conf.setMapperClass(Map.class);
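// Reusing the reducer as the combiner appears harmless here: each window has its own key,
// so the combiner simply pre-computes the sum the reducer would otherwise compute.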
	     conf.setCombinerClass(Reduce.class);
	     conf.setReducerClass(Reduce.class);
	
	     conf.setInputFormat(TextInputFormat.class);
	     conf.setOutputFormat(TextOutputFormat.class);
	
//	     FileInputFormat.setInputPaths(conf, new Path(args[0]));
//	     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
	     
	     FileInputFormat.setInputPaths(conf, new Path("input/movingaverage.txt"));
	     FileOutputFormat.setOutputPath(conf, new Path("output/smoothed"));
	
	     JobClient.runJob(conf);
	   }
	}

Summary

The moving average is a commonly used technique for smoothing noisy data. Unlike, for example, Kalman filtering, it is easy to understand and easy to implement. The case where data points arrive one by one is easily transformed into a Map-Reduce program. Working proof-of-concept code has been presented to show that Map-Reduce and Hadoop can be used outside the big data domain.
