Cheap MapReduce in Go

Here at Malwarebytes we have to deal with a huge amount of data generated by many different systems, malware research teams, telemetry, etc. That data can span several terabytes on a daily basis.

We have been using Amazon’s Elastic MapReduce (EMR) on many different occasions, but we are always looking to simplify our systems wherever we can. Before I start another flame-war about this topic and why we decided to create a MapReduce job that runs on a single machine as opposed to a cluster of EC2 instances, let me explain. Creating complex systems that leverage a cluster of machines comes with a lot of extra costs from DevOps, IT, deployments, and many other things that we then need to maintain and keep up to date. There are cases where we want to churn over tons of data but don’t need to be super fast or leverage a huge, complex infrastructure, thereby avoiding a lot of the costs and maintenance associated with it.

Other, larger MapReduce jobs still run within EMR, since we need more scalability and faster turnaround times for those, and they wouldn’t be appropriate to execute on a single machine.

The Goal

We have a bunch of different anonymous telemetry systems that collect tons of data every second, and some of our executives wanted certain data aggregated into CSV files that could be loaded into Excel and a couple of other analytics systems to generate custom Pivot Tables, so we can better understand some of the usage patterns and dig information out of our telemetry, without needing to write complicated reporting systems that would likely change a lot.

Below is a stripped-down version of our telemetry data so you can better understand what we were trying to accomplish.

{
    "Request": {
        "time": "2015-07-13 18:37:00",
        "processedTime": "2015-07-13 18:37:14",
        "uuid": "56ca2dbffc5f451285bade8e8ffef12c",
        "jobId": "ffffffc8f9af58389681e4a9749a4e6c",
        "Sender": "scanner",
        "Trigger": "update"
    },
    "App": {
        "Program": "app",
        "Build": "consumer",
        "License": "licensed",
        "Version": "2.1.8"
    },
    "Connection": {
        "Type": "broadband",
        "ISP": "Telecom Italia"
    },
    "Region": {
        "Continent": "EU",
        "Country": "IT",
        "Region": "08",
        "City": "Genoa"
    },
    "Client": {
        "OsVersion": "Windows 7 Service Pack 1",
        "Language": "it",
        "Architecture": "x64"
    }
}

As you can see, we have some simple telemetry data that our collector machines generate constantly from a few different systems. These JSON records get aggregated and saved into thousands of different files, in batches that can contain hundreds of thousands of lines each.

Each file contains some header comments with information from the collector and some metadata about the batch. Here is an example of one of these files that we need to process.

#  Type:				collector
#  Queue:				client
#  Processed At:		2015-07-13 18:37:14
#  Format:				json
#  Program Version:		2015-04-15
#  EC2 Instance Id:		i-d561a6fa
{ "Request": { "time": "2015-07-13 18:37:00", "processedTime": "2015-07-13 18:37:14", ...
{ "Request": { "time": "2015-07-13 18:37:00", "processedTime": "2015-07-13 18:37:14", ...
{ "Request": { "time": "2015-07-13 18:37:00", "processedTime": "2015-07-13 18:37:14", ...
{ "Request": { "time": "2015-07-13 18:37:00", "processedTime": "2015-07-13 18:37:14", ...

For this particular job requirement, we needed to aggregate counts of the unique combinations of some of these fields, in order to generate proper data for our Pivot Tables. This was therefore a job for a MapReduce implementation, where we would map the unique occurrences of each permutation of the data and then reduce the counts into the desired aggregation.

We wanted to ignore some fields in the JSON record, so we came up with a Telemetry structure that maps only the fields we would like to uniquely aggregate on:

type Telemetry struct {
	Request struct {
		Sender  string `json:"Sender,omitempty"`
		Trigger string `json:"Trigger,omitempty"`
	} `json:"Request,omitempty"`

	App struct {
		Program string `json:"Program,omitempty"`
		Build   string `json:"Build,omitempty"`
		License string `json:"License,omitempty"`
		Version string `json:"Version,omitempty"`
	} `json:"App,omitempty"`

	Connection struct {
		Type string `json:"Type,omitempty"`
	} `json:"Connection,omitempty"`

	Region struct {
		Continent string `json:"Continent,omitempty"`
		Country   string `json:"Country,omitempty"`
	} `json:"Region,omitempty"`

	Client struct {
		OsVersion    string `json:"OsVersion,omitempty"`
		Language     string `json:"Language,omitempty"`
		Architecture string `json:"Architecture,omitempty"`
	} `json:"Client,omitempty"`
}

Any unique permutation of the fields above generates a single line, with the corresponding occurrence count, in the final output of our MapReduce.
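
For example, a single line of the final CSV output could look like this (the trailing count here is made up for illustration):

scanner,update,app,consumer,licensed,2.1.8,broadband,EU,IT,Windows 7 Service Pack 1,it,x64,14325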

Now that we have a better understanding of what we were trying to accomplish, let’s look at how we implemented the MapReduce for this.

Enumerating Files for the Job

In order to find the files that we need to feed into our MapReduce engine, we need a function that enumerates all the files in a directory, and potentially its sub-directories. For that, we will leverage the Walk() function from the filepath package in the standard library.

Walk() takes a walk function with the following signature:

type WalkFunc func(path string, info os.FileInfo, err error) error

We have implemented the enumeration function like this:

func enumerateFiles(dirname string) chan interface{} {
	output := make(chan interface{})
	go func() {
		filepath.Walk(dirname, func(path string, f os.FileInfo, err error) error {
			// skip entries that could not be read
			if err != nil {
				return nil
			}
			if !f.IsDir() {
				output <- path
			}
			return nil
		})
		close(output)
	}()
	return output
}

This function first creates a channel that receives each file path encountered by the Walk function; that channel is consumed later when we dispatch our mapper tasks.
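
As a quick sanity check, the returned channel can be consumed with a simple range loop (a throwaway snippet, assuming a hypothetical ./data directory):

for path := range enumerateFiles("./data") {
	fmt.Println("found task file:", path)
}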

Generating Tasks for our Mapper

Another function we need is one that parses a single task file and enumerates all its JSON records. Remember, we saved each JSON record as a separate line in our task file. We also need to account for the meta headers and ignore them.

func enumerateJSON(filename string) chan string {
	output := make(chan string)
	go func() {
		// make sure the channel is always closed, even if the file
		// cannot be opened, so consumers ranging over it never block forever
		defer close(output)

		file, err := os.Open(filename)
		if err != nil {
			return
		}
		defer file.Close()

		reader := bufio.NewReader(file)
		for {
			line, err := reader.ReadString('\n')

			// ignore any meta comments on top of the JSON file,
			// as well as empty lines
			if !strings.HasPrefix(line, "#") && strings.TrimSpace(line) != "" {
				// add each JSON line to our enumeration channel
				output <- line
			}

			// ReadString returns whatever was read alongside the error,
			// so a final unterminated line is emitted above before we bail
			if err != nil {
				break
			}
		}
	}()
	return output
}

Defining our types

For our MapReduce implementation, we need to define a collector type and a few function types that we will use later in the process. Here is what we came up with:

// MapperCollector is a channel that collects the output from mapper tasks
type MapperCollector chan chan interface{}

// MapperFunc is a function that performs the mapping part of the MapReduce job
type MapperFunc func(interface{}, chan interface{})

// ReducerFunc is a function that performs the reduce part of the MapReduce job
type ReducerFunc func(chan interface{}, chan interface{})

As you can see, we are leveraging Go channels a lot here; they are the key to bridging all of this together. MapperCollector may look odd at first: it is a channel of channels, carrying each mapper task’s own output channel downstream.
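
If the channel-of-channels idea is unfamiliar, here is a minimal, self-contained sketch of the pattern (illustrative only, not part of the job): each worker sends its single result on its own channel, and the collector carries those channels to whoever drains them.

package main

import "fmt"

func main() {
	// each worker gets its own result channel;
	// the collector carries the channels themselves
	collector := make(chan chan int, 2)

	for i := 0; i < 2; i++ {
		task := make(chan int)
		go func(n int, out chan int) {
			out <- n * n // this worker's single result
		}(i, task)
		collector <- task
	}
	close(collector)

	// drain each worker's channel in dispatch order
	for task := range collector {
		fmt.Println(<-task)
	}
}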

The Mapper

On to the mapper function. The whole idea of this mapper implementation is to parse a single file, go over each JSON record that was enumerated, decode the JSON content into our Telemetry structure, and accumulate counts for each dimension (each unique permutation of the data).

func mapper(filename interface{}, output chan interface{}) {
	results := map[Telemetry]int{}

	// start the enumeration of each JSON line in the file
	for line := range enumerateJSON(filename.(string)) {

		// decode the telemetry JSON line
		dec := json.NewDecoder(strings.NewReader(line))
		var telemetry Telemetry

		// if the line cannot be JSON decoded, skip to the next one
		if err := dec.Decode(&telemetry); err != nil {
			continue
		}

		// count this Telemetry value in the mapper results map;
		// the zero value of int makes the first increment just work
		results[telemetry]++
	}

	output <- results
}

The trick in this mapper function lies in how we define the map we use to accumulate unique data: a map in which the key is our Telemetry structure itself:

results := map[Telemetry]int{}

Remember that in Go, map keys may be of any type that is comparable. The language spec defines this precisely, but in short, comparable types are boolean, numeric, string, pointer, channel, and interface types, and structs or arrays that contain only those types. Notably absent from the list are slices, maps, and functions; these types cannot be compared using == and may not be used as map keys. It’s obvious that strings, ints, and other basic types should be available as map keys, but perhaps unexpected are struct keys. Structs can be used to key data by multiple dimensions.
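
As a tiny standalone illustration (hypothetical dimension type, not from our codebase), keying counts by two dimensions at once looks like this:

type dimension struct {
	Country string
	OS      string
}

counts := map[dimension]int{}
counts[dimension{"IT", "Windows 7"}]++
counts[dimension{"IT", "Windows 7"}]++
counts[dimension{"DE", "Windows 8"}]++
// counts[dimension{"IT", "Windows 7"}] == 2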

The Reducer

Now for the reducer part of our job: we simply need to aggregate the different Telemetry dimensions that were generated by all the mappers running in parallel.

func reducer(input chan interface{}, output chan interface{}) {
	results := map[Telemetry]int{}
	for matches := range input {
		// merge each mapper's partial counts into the final results
		for key, value := range matches.(map[Telemetry]int) {
			results[key] += value
		}
	}
	output <- results
}

Dispatching Tasks

In this MapReduce implementation, we are leveraging channels for the different inputs and outputs, from file enumeration to the mappers and finally the reducer. We need a couple of dispatcher functions to bridge all of this together and invoke the next step in each case.

func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) {
	for item := range input {
		taskOutput := make(chan interface{})
		go mapper(item, taskOutput)
		collector <- taskOutput
	}
	close(collector)
}

The mapperDispatcher function is responsible for listening on the input channel that receives each filename to be processed, invoking a mapper goroutine for each file, and pushing each task’s output channel into the MapperCollector, which is consumed in the next step.

func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) {
	for output := range collector {
		reducerInput <- <-output
	}
	close(reducerInput)
}

The reducerDispatcher function is responsible for listening on the collector channel and pushing each mapper’s result into the reducer’s input channel. Note the reducerInput <- <-output line: it first receives a mapper’s result map from that task’s output channel, and then sends it on to the reducer.

Putting it all together in a MapReduce method

Now that we have all the pieces of the puzzle, it is time to put them all together into a mapReduce function.

const (
	MaxWorkers = 10
)

func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} {

	reducerInput := make(chan interface{})
	reducerOutput := make(chan interface{})
	mapperCollector := make(MapperCollector, MaxWorkers)

	go reducer(reducerInput, reducerOutput)
	go reducerDispatcher(mapperCollector, reducerInput)
	go mapperDispatcher(mapper, input, mapperCollector)

	return <-reducerOutput
}

As you can observe, we create all the required channels that serve as the conduits of this entire operation.

First, we spawn a goroutine that is responsible for executing the reducer task, listening on its input channel for data to operate on and emitting its result into the output channel when everything is done. Second, for the whole system to work, we need to start the dispatcher goroutines that bridge everything together and invoke the next steps. The mapperDispatcher is responsible for invoking the mapper function for each file, which triggers the whole MapReduce calculation.

We are limiting the number of concurrent mappers to 10 in this case: because the MapperCollector channel is buffered with MaxWorkers slots, the dispatcher stalls once that many mapper tasks are queued up, which keeps roughly MaxWorkers mappers simultaneously opening task files and aggregating data.
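
If we ever wanted an explicit bound instead of relying on the collector’s buffer, a common pattern is a semaphore channel. Here is a hypothetical variant of mapperDispatcher sketching that idea (not what we run in production):

func boundedMapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) {
	// semaphore channel: holding a slot means a mapper is running
	sem := make(chan struct{}, MaxWorkers)
	for item := range input {
		taskOutput := make(chan interface{})
		sem <- struct{}{} // acquire a slot; blocks while MaxWorkers mappers are running
		go func(item interface{}, out chan interface{}) {
			defer func() { <-sem }() // release the slot when this mapper finishes
			mapper(item, out)
		}(item, taskOutput)
		collector <- taskOutput
	}
	close(collector)
}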

Finally, we have written our main() function like this:

import (
	"bufio"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
)

...

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	fmt.Println("Processing. Please wait....")

    // start the enumeration of files to be processed into a channel
	input := enumerateFiles(".")

    // this will start the map reduce work
	results := mapReduce(mapper, reducer, input)

	// open output file
	f, err := os.Create("telemetry.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// create a CSV writer on top of the output file
	writer := csv.NewWriter(f)

	for telemetry, value := range results.(map[Telemetry]int) {

		var record []string

		record = append(record, telemetry.Request.Sender)
		record = append(record, telemetry.Request.Trigger)
		record = append(record, telemetry.App.Program)
		record = append(record, telemetry.App.Build)
		record = append(record, telemetry.App.License)
		record = append(record, telemetry.App.Version)
		record = append(record, telemetry.Connection.Type)
		record = append(record, telemetry.Region.Continent)
		record = append(record, telemetry.Region.Country)
		record = append(record, telemetry.Client.OsVersion)
		record = append(record, telemetry.Client.Language)
		record = append(record, telemetry.Client.Architecture)

        // The last field of the CSV line is the aggregate count for each occurrence
		record = append(record, strconv.Itoa(value))

		writer.Write(record)
	}

	writer.Flush()

	// surface any error that occurred while writing or flushing the CSV
	if err := writer.Error(); err != nil {
		panic(err)
	}

	fmt.Println("Done!")
}

The first step is to kick off the enumerateFiles(...) function to start enumerating the files to be processed, pushing them into an input channel. Then we invoke the mapReduce(...) function, which performs the entire job and returns a map with the end results.

As a final step, we write the entire MapReduce results into a CSV file, printing each Telemetry dimension and its respective aggregate count on each line.

Conclusion

Sometimes you don’t need overly complex infrastructures or systems to do a job well. In this case, we had been running these exact same aggregations on close to 20 EMR instances, which took a few minutes to execute the entire MapReduce job over hundreds of gigabytes of data each day.

When we decided to take another look at this problem, we rewrote the task in Go, and we now simply run it on a single 8-core machine; the whole daily execution takes about 10 minutes. We cut a lot of the costs associated with maintaining and running those EMR systems, and we just schedule this Go app to run once a day over our daily dataset.

You can find the entire code here:
https://gist.github.com/mcastilho/e051898d129b44e2f502