Implement MapReduce in C

I wrote a simple MapReduce implementation in C. The code is designed to be run on a single machine, but it can be extended to work in a distributed environment. This implementation is based on the skeleton from Prof. Remzi’s project.

I also have some notes from the MapReduce paper in [[ _notes/research_paper/data/MapReduce|MapReduce ]]. Reading the paper is recommended.

Overview

Motivation

The MapReduce programming model is designed to process large amounts of data in parallel across a distributed cluster. It simplifies the process of writing parallel programs by abstracting away the complexities of data distribution, fault tolerance, and synchronization.

The model consists of two main functions: the Map function, which processes input data and produces intermediate key-value pairs, and the Reduce function, which takes those intermediate key-value pairs and combines them to produce the final output.

For example, in a word count application, given this input text:

Hello world
Hello MapReduce

The Map function would produce the following intermediate key-value pairs:

("Hello", 1)
("world", 1)
("Hello", 1)
("MapReduce", 1)

The Reduce function would then take these intermediate key-value pairs and combine them to produce the final output:

("Hello", 2)
("world", 1)
("MapReduce", 1)

Key points:

  • Data Processing: MapReduce is widely used for processing large datasets in parallel, such as log analysis, data transformation, and ETL (Extract, Transform, Load) processes.
  • Batch Processing: It is suitable for batch processing tasks where data can be processed in chunks rather than in real-time.
  • Simplicity: The programming model abstracts away the complexities of parallelization, fault tolerance, and data distribution, making it easier for developers to write distributed applications.

Limitations:

  • Latency: MapReduce is not suitable for real-time processing or low-latency applications, as it typically involves batch processing.
  • Interactive Queries: It is not designed for interactive queries or ad-hoc data analysis, as it requires a complete job to be submitted and processed before results are available.
  • Extensibility: While MapReduce is powerful, it may not be the best fit for all types of data processing tasks, as chaining multiple MapReduce workflows can be challenging. Some applications may require more specialized frameworks or libraries.

How can the user use the MapReduce library?

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include "../mapreduce.h"

void Map(char *file_name) {
    FILE *fp = fopen(file_name, "r");
    assert(fp != NULL);

    char *line = NULL;
    size_t size = 0;
    while (getline(&line, &size, fp) != -1) {
        char *token, *dummy = line;
        while ((token = strsep(&dummy, " \t\n")) != NULL) {
            if (strlen(token) > 0) {
                MR_Emit(token, "1");
            }
        }
    }

    free(line);
    fclose(fp);
}

void Reduce(char *key, Getter get_next, int partition_number) {
    int count = 0;
    char *value;
    while ((value = get_next(key, partition_number)) != NULL) {
        count += atoi(value);
        free(value);
    }
    printf("%s: %d\n", key, count);
}

/**
 * ./wordcount file1.txt file2.txt
 * @param argc
 * @param argv
 * @return
 */
int main(int argc, char *argv[]) {
    MR_Run(argc, argv, Map, 20, Reduce, 20, MR_DefaultHashPartition);
}

Users implement the Map and Reduce functions for their specific use case. The Map function processes input data and emits intermediate key-value pairs, while the Reduce function combines those pairs into the final result. In the example above, Map reads its input and calls MR_Emit for each word to hand a key-value pair to the MapReduce library. When all Map tasks are done, the library invokes the user-defined Reduce function for each key.

Everything is kicked off by MR_Run: wordcount is invoked from the CLI with the files to process, and MR_Run receives the command-line arguments together with pointers to the Map and Reduce functions. Users also choose how many threads to use for the Map and Reduce tasks. Finally, a partition function decides which partition each intermediate key-value pair goes to; MR_DefaultHashPartition is provided, but users can supply their own hash function.
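
For reference, the mapreduce.h header from the project skeleton declares roughly the interface below; I've reconstructed it from how it is used above, so minor details may differ:

#ifndef __mapreduce_h__
#define __mapreduce_h__

// Function-pointer types for the user-supplied callbacks
typedef char *(*Getter)(char *key, int partition_number);
typedef void (*Mapper)(char *file_name);
typedef void (*Reducer)(char *key, Getter get_next, int partition_number);
typedef unsigned long (*Partitioner)(char *key, int num_partitions);

// Called by Map to hand an intermediate key-value pair to the library
void MR_Emit(char *key, char *value);

// Default partitioner: hashes the key into one of num_partitions buckets
unsigned long MR_DefaultHashPartition(char *key, int num_partitions);

// Entry point: runs Map over the input files, then Reduce over each partition
void MR_Run(int argc, char *argv[],
            Mapper map, int num_mappers,
            Reducer reduce, int num_reducers,
            Partitioner partition);

#endif // __mapreduce_h__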

Implementation

You can find my implementation here.

“1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.”

As the wordcount example above shows, ./wordcount is invoked from the CLI with file1.txt and file2.txt.

Those input files are then split into N shards of roughly 16 MB each.

Instead of running worker programs on a cluster of machines, I create map and reduce workers using a thread pool implementation. Those thread pools pull tasks that execute the user's Map and Reduce functions.
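
Conceptually, a shard is just a byte range of one input file. Here is a minimal sketch of the shard and vector types assumed by the code below; the exact definitions in my implementation may differ:

// Sketch only: a shard describes a byte range of one input file
typedef struct {
    char *file_name;  // input file this shard belongs to
    long start;       // byte offset where the shard begins
    long end;         // byte offset just past the end of the shard
} shard;

// Sketch only: a growable array of shards
typedef struct {
    shard *arr;       // dynamic array of shards
    int size;         // number of shards currently stored
    int capacity;     // allocated capacity
} vector;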

void MR_Run(int argc, char *argv[], Mapper map, int num_mappers, Reducer reduce, int num_reducers,
            Partitioner partition) {
    if (argc < 2) {
        printf("Usage: ./mapreduce <input file> <input file> ...\n");
        return;
    }

    // partitioner is our partition function to split keys into files
    partitioner = partition;

    // argv[0] is the program name; the remaining argc - 1 entries are input files
    char **files = malloc((argc - 1) * sizeof(char *));
    for (int i = 0; i < argc - 1; i++) {
        files[i] = argv[i + 1];
    }

    // vector s is our array of shards file
    vector *s = malloc(sizeof(vector));
    if (s == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    init_vector(s);

    // We start sharding users' files here
    shard_file(s, files, argc - 1);

“2. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.”

After we shard the user's input into M shards, each Map worker is assigned a shard to process.

To keep the implementation simple, the main thread acts as the master: it starts the thread pools and hands out the Map and Reduce tasks that process the shards created above.

To adapt this implementation to a cluster of machines, the workers could instead run as separate processes, with gRPC used for communication between the master and the workers.
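
The thread pool interface assumed by the snippets below looks roughly like this; it's a sketch, and I'm guessing at the meaning of the last two arguments to thread_pool_init, which I call queue_size and flags here:

// Sketch only: minimal thread pool interface used by MR_Run.
// The struct fields (task queue, worker threads, mutex, ...) are omitted.
typedef struct thread_pool thread_pool;

// Spawns num_threads workers backed by a task queue
void thread_pool_init(thread_pool *pool, int num_threads, int queue_size, int flags);

// Enqueues a task; an idle worker thread eventually runs fn(arg)
void thread_pool_add(thread_pool *pool, void (*fn)(void *), void *arg);

// Blocks until every queued task has finished
void thread_pool_wait(thread_pool *pool);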

// Initializes threadpools for map tasks
thread_pool *mapper_pool = malloc(sizeof(thread_pool));
if (mapper_pool == NULL) {
    perror("malloc");
    exit(EXIT_FAILURE);
}

// Initializes threadpools for reduce tasks
thread_pool *reducer_pool = malloc(sizeof(thread_pool));
if (reducer_pool == NULL) {
    perror("malloc");
    exit(EXIT_FAILURE);
}

thread_pool_init(mapper_pool, num_mappers, 10, 0);
thread_pool_init(reducer_pool, num_reducers, 10, 0);

“3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses records out of the input data and passes each record to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.”

Once a worker is assigned a Map task, it reads the data from its assigned shard into memory and passes it to the user-defined Map function.

The user's Map code emits each record as a key-value pair via MR_Emit, and those pairs are buffered in the global KeyValueBuffer *buffers array (one buffer per partition).

// add map task to thread pool for each shard file
for (int i = 0; i < s->size; i++) {
    map_worker_task *map_t = malloc(sizeof(map_worker_task));
    *map_t = (map_worker_task){s->arr[i], map};
    thread_pool_add(mapper_pool, map_worker, map_t);
}

thread_pool_wait(mapper_pool);

// map_worker reads the assigned shard into memory and runs the user's Map on it
void map_worker(void *arg) {
    const map_worker_task t = *(map_worker_task *)arg;
    const size_t buffer_size = t.s.end - t.s.start + 1;
    char *buffer = malloc(buffer_size);
    if (buffer == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    if (read_shard(t.s, buffer) == -1) {
        fprintf(stderr, "read_shard failed\n");
        exit(EXIT_FAILURE);
    }

    // call user's defined map function
    t.map(buffer);
    free(buffer);
    free(arg);
}
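
read_shard itself isn't shown above; here is a minimal sketch, assuming the shard layout from earlier (with end as an exclusive byte offset) and that the buffer is NUL-terminated so the user's Map can treat it as a string:

// Sketch only: read the shard's byte range into buf and NUL-terminate it
int read_shard(const shard s, char *buf) {
    FILE *fp = fopen(s.file_name, "r");
    if (fp == NULL)
        return -1;

    const size_t len = s.end - s.start;
    if (fseek(fp, s.start, SEEK_SET) != 0 || fread(buf, 1, len, fp) != len) {
        fclose(fp);
        return -1;
    }
    buf[len] = '\0';

    fclose(fp);
    return 0;
}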

“4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.”

The user's Map code emits data by calling our MR_Emit function. Here, the key determines which partition the pair belongs to.
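
The default partitioner from the project skeleton is a simple string hash of the key modulo the number of partitions:

unsigned long MR_DefaultHashPartition(char *key, int num_partitions) {
    unsigned long hash = 5381;
    int c;
    while ((c = *key++) != '\0')
        hash = hash * 33 + c;     // djb2-style string hash
    return hash % num_partitions;
}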

The buffer is flushed to intermediate files periodically, or whenever it fills up.

The number of partitions (and hence intermediate files) equals the number of reducers, so the Reduce phase can process them in parallel.
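
The per-partition buffer that append_buffer manipulates below looks roughly like this; it's a sketch reconstructed from the fields the code uses, so the real struct may differ:

// Sketch only: one in-memory buffer of emitted pairs per partition
typedef struct {
    char **keys;            // buffered keys
    char **values;          // buffered values
    ul size;                // number of pairs currently buffered
    ul capacity;            // flush when size reaches this limit
    time_t last_flush;      // time of the last flush, for the time-based flush
    pthread_mutex_t mutex;  // protects the buffer across mapper threads
} KeyValueBuffer;

// One buffer per partition, allocated in MR_Run
KeyValueBuffer *buffers = NULL;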

void MR_Emit(char *key, char *value) {
    const ul partition_no = partitioner(key, num_partitions);

    append_buffer(partition_no, key, value);
}

// append_buffer flushes the buffer if it exceeds the time threshold or capacity
void append_buffer(const ul partition_number, char *key, char *value) {
    if (buffers == NULL || partition_number >= num_partitions) {
        fprintf(stderr, "[ERROR][mapreduce.c] append_buffer: invalid partition number %lu\n",
                partition_number);
        return;
    }

    //Each partition will have its buffer
    KeyValueBuffer *buffer = &buffers[partition_number];
    pthread_mutex_lock(&buffer->mutex);

    // Check if we need to flush due to capacity
    if (buffer->size == buffer->capacity) {
        flush_buffer(partition_number);
    }

    // Check if we need to flush due to time
    if (buffer->size > 0) {
        time_t current_time = time(NULL);
        if (difftime(current_time, buffer->last_flush) > flush_interval_sec) {
            flush_buffer(partition_number);
        }
    }

    char *new_key = strdup(key);
    char *new_value = strdup(value);

    if (new_key != NULL && new_value != NULL) {
        buffer->keys[buffer->size] = new_key;
        buffer->values[buffer->size] = new_value;
        buffer->size++;
    } else {
        free(new_key);
        free(new_value);
    }

    pthread_mutex_unlock(&buffer->mutex);
}
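
flush_buffer isn't shown above; here is a minimal sketch, assuming each partition appends to its own intermediate file (the intermediate_%lu.txt naming is made up for illustration) and that the caller already holds the buffer's mutex:

// Sketch only: append all buffered pairs of this partition to its intermediate
// file, then reset the buffer; assumes the caller holds buffer->mutex
void flush_buffer(const ul partition_number) {
    KeyValueBuffer *buffer = &buffers[partition_number];

    char path[64];
    snprintf(path, sizeof(path), "intermediate_%lu.txt", partition_number);

    FILE *fp = fopen(path, "a");
    if (fp == NULL) {
        perror("fopen");
        return;
    }

    for (ul i = 0; i < buffer->size; i++) {
        fprintf(fp, "%s %s\n", buffer->keys[i], buffer->values[i]);
        free(buffer->keys[i]);
        free(buffer->values[i]);
    }
    fclose(fp);

    buffer->size = 0;
    buffer->last_flush = time(NULL);
}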

“5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reducer worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task.”

Remember that MapReduce is meant for a distributed environment: the intermediate files produced by the Map tasks live on the local disk of the mapper worker that wrote them, so the Reduce tasks use remote procedure calls to read those files into memory. For simplicity, our Reducer worker reads them directly from the local machine.

After the Reducer reads all intermediate data, it sorts and shuffles those keys together.

For example, if the intermediate files contain these key-value pairs:

("Hello", 1)
("world", 1)
("Hello", 1)
("MapReduce", 1)

After the reducer reads all intermediate files, it will sort and shuffle those keys together:

("Hello", [1, 1])
("MapReduce", [1])
("world", [1])

Then the reducer will pass each key and its values to the user-defined Reduce function to produce the final output.
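
The per-partition store the reducer builds, and the get_next Getter handed to the user's Reduce function, could look roughly like the sketch below; field names such as next_value and value_count are my own, and the real implementation may differ:

// Sketch only: one entry per distinct key, holding all of its values
typedef struct {
    char *key;        // intermediate key
    char **values;    // every value emitted for this key
    ul value_count;   // number of values
    ul next_value;    // cursor used by get_next to hand values out one by one
} KeyValue;

// Sketch only: one store per partition, filled by process_key_values
typedef struct {
    KeyValue *kvs;    // array of keys with their grouped values
    ul kv_count;      // number of distinct keys
} KeyValueStore;

KeyValueStore *store = NULL;

// Sketch only: returns the next value for key in this partition, or NULL
char *get_next(char *key, int partition_number) {
    KeyValueStore *kvs = &store[partition_number];
    for (ul i = 0; i < kvs->kv_count; i++) {
        KeyValue *kv = &kvs->kvs[i];
        if (strcmp(kv->key, key) == 0) {
            if (kv->next_value >= kv->value_count)
                return NULL;  // all values for this key have been consumed
            // the wordcount Reduce above frees the value, so hand back a copy
            return strdup(kv->values[kv->next_value++]);
        }
    }
    return NULL;
}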

void reduce_worker(void *arg) {
    reduce_worker_task *t = arg;

    // our key-value store that groups all the values of each key together;
    // &store[i] is never NULL, so check the store array itself instead
    if (store == NULL) {
        fprintf(stderr, "[ERROR][mapreduce.c] reduce_worker: store not initialized\n");
        exit(EXIT_FAILURE);
    }
    KeyValueStore *kvs = &store[t->partition_number];

    // combine all those keys and values here
    process_key_values(kvs, t->partition_number);
    // sort keys for stable output
    kv_sort(kvs);

    // there's more
}
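
kv_sort can be as simple as a qsort over the distinct keys (a sketch, assuming the KeyValue layout sketched above):

// Sketch only: order the keys so the output is deterministic across runs
static int kv_cmp(const void *a, const void *b) {
    return strcmp(((const KeyValue *)a)->key, ((const KeyValue *)b)->key);
}

void kv_sort(KeyValueStore *kvs) {
    qsort(kvs->kvs, kvs->kv_count, sizeof(KeyValue), kv_cmp);
}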

“6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.”

void reduce_worker(void *arg) {
    // ... (continued from above)

    // pass each unique key to the user-defined Reduce function;
    // get_next lets the user iterate over that key's values
    for (ul i = 0; i < kvs->kv_count; i++) {
        t->reduce(kvs->kvs[i].key, get_next, t->partition_number);
    }

    free(t);
}

Conclusion

In this article, I’ve demonstrated how to implement a simplified version of MapReduce in C, based on Google’s MapReduce paper. While this implementation runs on a single machine using threads instead of distributed workers, it illustrates the key concepts of the MapReduce programming model: parallel data processing, key-value pair manipulation, and the separation of Map and Reduce phases.

The implementation includes core MapReduce features like:

  • File sharding and parallel processing
  • Intermediate key-value buffering and partitioning
  • Key sorting and shuffling
  • Thread pool management for Map and Reduce workers

Though simplified, this implementation can serve as a learning tool for understanding distributed data processing concepts and can be extended to work in a truly distributed environment by adding networking capabilities, fault tolerance, and proper distributed coordination.
