MapReduce

MapReduce ist ein vom Unternehmen Google Inc. eingeführtes Programmiermodell für nebenläufige Berechnungen über (mehrere Petabyte[1]) große Datenmengen auf Computerclustern. MapReduce ist auch der Name einer Implementierung des Programmiermodells in Form einer Software-Bibliothek. Beim MapReduce-Verfahren werden die Daten in drei Phasen verarbeitet (Map, Shuffle, Reduce), von denen zwei durch den Anwender spezifiziert werden (Map und Reduce). Dadurch lassen sich Berechnungen parallelisieren und auf mehrere Rechner verteilen. Bei sehr großen Datenmengen ist die Parallelisierung unter Umständen schon deshalb erforderlich, weil die Datenmengen für einen einzelnen Prozess (und das ausführende Rechnersystem) zu groß sind.

Das Programmiermodell wurde durch die in der funktionalen Programmierung häufig verwendeten Funktionen map und reduce inspiriert,[2] auch wenn die Arbeitsweise der Bibliothek davon abweicht.[3] 2010 wurde für MapReduce ein US-Patent erteilt.[4] Der wesentliche Beitrag von MapReduce ist jedoch das zu Grunde liegende System, das die Berechnungen stark parallelisiert, die Reorganisation der Daten im Shuffle-Schritt optimiert, und automatisch auf Fehler im Cluster reagieren kann, wie beispielsweise den Ausfall von kompletten Knoten.

Arbeitsweise

Illustration des Datenflusses

Das obige Bild illustriert den Datenfluss bei der MapReduce-Berechnung.

  • Map-Phase:
    • Die Eingabedaten (D, A, T, A) werden auf eine Menge von Map-Prozessen verteilt (illustriert durch bunte Rechtecke), welche jeweils die vom Nutzer bereitgestellte Map-Funktion berechnen.
    • Die Map-Prozesse werden idealerweise parallel ausgeführt.
    • Jede dieser Map-Instanzen legt Zwischenergebnisse ab (illustriert durch pinkfarbene Sterne).
    • Von jeder Map-Instanz fließen Daten in eventuell verschiedene Zwischenergebnisspeicher.
  • Shuffle-Phase:
    • Die Zwischenergebnisse werden gemäß den Ausgabeschlüsseln, die von der Map-Funktion produziert wurden, neu verteilt, sodass alle Zwischenergebnisse mit demselben Schlüssel im nächsten Schritt auf demselben Computersystem verarbeitet werden.
  • Reduce-Phase:
    • Für jeden Satz an Zwischenergebnissen berechnet jeweils genau ein Reduce-Prozess (illustriert durch violette Rechtecke) die vom Nutzer bereitgestellte Reduce-Funktion und damit die Ausgabedaten (illustriert durch violette Kreise X, Y und Z).
    • Die Reduce-Prozesse werden idealerweise ebenfalls parallel ausgeführt.

Definition der MapReduce-Funktion

Die MapReduce-Bibliothek realisiert eine Funktion, welche aus einer Liste von Schlüssel-Wert-Paaren (Eingabeliste) eine neue Liste von Schlüssel-Wert-Paaren (Ausgabeliste) berechnet:

Erläuterung:

  • Die Mengen und enthalten Schlüssel, die Mengen und enthalten Werte.
  • Alle Schlüssel sind vom gleichen Typ, z. B. Strings.
  • Alle Schlüssel sind vom gleichen Typ, z. B. ganze Zahlen.
  • Alle Werte sind vom gleichen Typ, z. B. Atome.
  • Alle Werte sind vom gleichen Typ, z. B. Gleitkommazahlen.
  • Wenn und Mengen sind, so ist mit die Menge aller Paare gemeint, wobei und (kartesisches Produkt).
  • Wenn eine Menge ist, so ist mit die Menge aller endlichen Listen mit Elementen aus gemeint (angelehnt an den Kleene-Stern) – die Liste kann auch leer sein.

Definition der Map- und Reduce-Funktionen

Der Nutzer konfiguriert die Bibliothek über die Bereitstellung der beiden Funktionen Map und Reduce, die wie folgt definiert sind:

bzw.

Map-Phase

  • Map bildet ein Paar, bestehend aus einem Schlüssel und einem Wert , auf eine Liste von neuen Paaren ab, welche die Rolle von Zwischenergebnissen spielen. Die Werte sind vom gleichen Typ wie die Endergebnisse .
  • Bei einem neuen Paar verweist der von Map vergebene Schlüssel dabei auf eine Liste von Zwischenergebnissen, in welcher der von Map berechnete Wert gesammelt wird.
  • Die Bibliothek ruft für jedes Paar in der Eingabeliste die Funktion Map auf.
  • All diese Map-Berechnungen sind voneinander unabhängig, so dass man sie nebenläufig und verteilt auf einem Computercluster ausführen kann.

Shuffle-Phase

  • Bevor die Reduce-Phase starten kann, müssen die Ergebnisse der Map-Phase nach ihrem neuen Schlüssel in Listen gruppiert werden.
  • Wenn Map- und Reduce-Funktionen nebenläufig und verteilt ausgeführt werden, wird hierfür ein koordinierter Datenaustausch notwendig.
  • Die Performanz eines Map-Reduce-Systems hängt maßgeblich davon ab, wie effizient die Shuffle-Phase implementiert ist.
  • Der Nutzer wird in der Regel nur über die Gestaltung der Schlüssel auf die Shuffle-Phase Einfluss nehmen. Daher reicht es, sie einmalig gut zu optimieren, und zahlreiche Anwendungen können hiervon profitieren.

Reduce-Phase

  • Sind alle Map-Aufrufe erfolgt bzw. liegen alle Zwischenergebnisse in vor, so ruft die Bibliothek für jede Zwischenwertliste die Funktion Reduce auf, welche daraus eine Liste von Ergebniswerten berechnet, die von der Bibliothek in der Ausgabeliste als Paare gesammelt werden.
  • Auch die Aufrufe von Reduce können unabhängig auf verschiedene Prozesse im Computercluster verteilt werden.

Anmerkung: Diese Darstellung war etwas vereinfacht, denn in der Regel wird die Steuerung des MapReduce Verfahrens eine Anzahl von Reduce-Prozessen anstreben, so dass, wenn es für mehr als verschiedene Schlüssel Zwischenergebnisse gibt, Zwischenergebnisse mit verschiedenen Schlüsseln in einer gemeinsamen Liste gespeichert werden. Die entsprechenden Paare werden vor der Reduce-Berechnung nach Schlüsseln sortiert.

Combine-Phase

Optional kann vor der Shuffle-Phase noch eine Combine-Phase erfolgen. Diese hat in der Regel die gleiche Funktionalität wie die Reducefunktion, wird aber auf dem gleichen Knoten wie die Map-Phase ausgeführt. Dabei geht es darum, die Datenmenge, die in der Shuffle-Phase verarbeitet werden muss, und damit die Netzwerklast zu reduzieren.[2] Der Sinn der Combine-Phase erschließt sich sofort bei der Betrachtung des Wordcount-Beispiels: Auf Grund der unterschiedlichen Häufigkeit von Wörtern in natürlicher Sprache, würde bei einem deutschen Text beispielsweise sehr oft eine Ausgabe der Form ("und", 1) erzeugt (gleiches gilt für Artikel und Hilfsverben). Durch die Combine-Phase wird nun aus 100 Nachrichten der Form ("und", 1) lediglich eine Nachricht der Form ("und", 100). Dies kann die Netzwerkbelastung signifikant reduzieren, ist aber nicht in allen Anwendungsfällen möglich.

Beispiel: Verteilte Häufigkeitsanalyse mit MapReduce

Problem

Man möchte für umfangreiche Texte herausfinden, wie oft welche Wörter vorkommen.

Angabe der Map- und Reduce-Funktionen

 map(String name, String document):
  // name: document name ("key")
  // document: document contents ("value")
  for each word w in document:
    EmitIntermediate(w, 1);

 reduce(String word, Iterator partialCounts):
  // word: a word ("key")
  // partialCounts: a list of aggregated partial counts ("values")
  //     for 'word'
  int result = 0;
  for each v in partialCounts:
    result += v;
  Emit(word, result);

Map-Phase

  • Map bekommt jeweils einen Dokumentnamen name und ein Dokument document als Zeichenkette übergeben.
  • Map durchläuft das Dokument Wort für Wort.
  • Jedes Mal, wenn ein Wort w angetroffen wird, wandert eine 1 in die w-Zwischenergebnisliste (falls diese noch nicht existiert, wird sie angelegt).
  • Ist man mit allen Wörtern durch und hat der Text insgesamt n verschiedene Wörter, so endet die Map-Phase mit n Zwischenergebnislisten, jede für ein anderes Wort sammelnd, welche so viele 1-Einträge enthält, wie das entsprechende Wort im Dokument gefunden wurde.
  • Eventuell liefen viele Map-Instanzen gleichzeitig, falls der Bibliothek mehrere Wörter und Dokumente übergeben wurden.

Shuffle-Phase

  • Die Zwischenergebnislisten von mehreren Prozessen / Systemen für das gleiche Wort w werden zusammengefasst, und auf die Systeme für die Reducer verteilt.

Reduce-Phase

  • Reduce wird für das Wort word und die Zwischenergebnisliste partialCounts aufgerufen.
  • Reduce durchläuft die Zwischenergebnisliste und addiert alle gefundenen Zahlen auf.
  • Die Summe result wird an die Bibliothek zurückgegeben, sie enthält, wie oft das Wort word in allen Dokumenten gefunden wurde.
  • Die Zwischenergebnisse konnten parallel, durch gleichzeitige Reduce-Aufrufe, berechnet werden.

Insgesamt

  • Aus einer Liste von Dokumentnamen und Dokumenten wird eine Liste von Worten und Worthäufigkeiten generiert.

Beispielhafte Berechnung

Zum Beispiel wäre folgende Berechnung auf einem klassischen Text denkbar:

 Text = "Fest gemauert in der Erden
         Steht die Form, aus Lehm gebrannt.
         Heute muß die Glocke werden,
         Frisch, Gesellen! seid zur Hand.
         Von der Stirne heiß
         Rinnen muß der Schweiß,
         Soll das Werk den Meister loben,
         Doch der Segen kommt von oben."

Der Text wird in Sätze aufgeteilt, dabei bietet sich eine Normalisierung an, indem man alles klein schreibt und die Satzzeichen entfernt:

 Eingabeliste = [ (satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt"),
                  (satz_2, "heute muß die glocke werden frisch gesellen seid zur hand"),
                  (satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen kommt von oben") ]

Die Eingabeliste hat drei Paare als Elemente, wir können daher drei Map-Prozesse starten:

 P1 = Map(satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt")
 P2 = Map(satz_2, "heute muß die glocke werden frisch gesellen seid zur hand")
 P3 = Map(satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen kommt von oben")

Die Map-Aufrufe generieren diese Zwischenergebnispaare:

 P1 = [ ("fest", 1), ("gemauert", 1), ("in", 1), ("der", 1), ("erden", 1),
        ("steht", 1), ("die", 1), ("form", 1), ("aus", 1), ("lehm, 1),
        ("gebrannt", 1) ]
 P2 = [ ("heute", 1), ("muß", 1), ("die", 1), ("glocke", 1), ("werden", 1),
        ("frisch", 1), ("gesellen", 1), ("seid", 1), ("zur", 1), ("hand", 1) ]
 P3 = [ ("von", 1), ("der", 1), ("stirne", 1), ("heiß", 1), ("rinnen", 1),
        ("muß, 1), ("der", 1), ("schweiß", 1), ("soll", 1), ("das", 1),
        ("werk", 1), ("den", 1), ("meister", 1), ("loben", 1), ("doch", 1),
        ("der", 1), ("segen", 1), ("kommt", 1), ("von", 1), ("oben", 1) ]

Die Map-Prozesse liefern ihre Paare an die MapReduce-Bibliothek, welche diese in den Zwischenergebnislisten sammelt. Parallel könnte folgendes geschehen (Die gleiche Taktung der 3 Map-Prozesse ist unrealistisch, tatsächlich überlappen sich die Ausführungen. Die T_wort-Listen sind lokal pro Map-Prozess vorhanden und werden nicht zwischen den Schritten synchronisiert):

 1. Iteration:
    P1:  T_fest     = [ 1 ]     (neu)
    P2:  T_heute    = [ 1 ]     (neu)
    P3:  T_von      = [ 1 ]     (neu)

 2. Iteration:
    P1:  T_gemauert = [ 1 ]     (neu)
    P2:  T_muß      = [ 1 ]     (neu)
    P3:  T_der      = [ 1 ]     (neu)

 3. Iteration:
    P1:  T_in       = [ 1 ]     (neu)
    P2:  T_die      = [ 1 ]     (neu)
    P3:  T_stirne   = [ 1 ]     (neu)

Im vierten Schritt sieht man, dass Zwischenergebnislisten lokal für jeden Map-Prozess existieren und nicht global wiederverwendet werden können:

 4. Iteration:
    P1:  T_der      = [ 1 ]     (neu, der 1. Map-Prozess hat noch kein T_der, nur P3)
    P2:  T_glocke   = [ 1 ]     (neu)
    P3:  T_heiss    = [ 1 ]     (neu)

 5. Iteration
    P1:  T_erden    = [ 1 ]     (neu)
    P2:  T_werden   = [ 1 ]     (neu)
    P3:  T_rinnen   = [ 1 ]     (neu)

 6. Iteration
    P1:  T_steht    = [ 1 ]     (neu)
    P2:  T_frisch   = [ 1 ]     (neu)
    P3:  T_muß      = [ 1 ]     (neu, der 3. Map-Prozess hat noch kein T_muß, nur P2)

Im siebten Schritt kommt dann zum ersten Mal vor, dass ein weiteres Vorkommen in einer bereits angelegten Zwischenergebnisliste gesammelt wird:

 7. Schritt
    P1:  T_die      = [ 1 ]     (neu, der 1. Map-Prozess hat noch kein T_die)
    P2:  T_gesellen = [ 1 ]     (neu)
    P3:  T_der      = [ 1, 1 ]  (beim 3. Map-Prozess seit Iteration 2 vorhandene Liste verwenden)

usw.

Nach 21 Schritten sind alle drei Map-Prozesse mit ihrer Arbeit fertig, die Map-Phase endet und es beginnt die Reduce-Phase. Die Zwischenergebnislisten, die von verschiedenen Map-Prozessen zu demselben Wort angelegt wurden, werden zusammengefügt. Für jede der entstandenen Zwischenergebnislisten (hier sortiert aufgeführt)

                     reduce
   T_der      = [ 1 ] ++ [ 1, 1, 1 ] -> [ 4 ]
   T_die      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
   T_fest     = [ 1 ]                -> [ 1 ]
   T_gemauert = [ 1 ]                -> [ 1 ]
   T_glocke   = [ 1 ]                -> [ 1 ]
   T_heiss    = [ 1 ]                -> [ 1 ]
   T_heute    = [ 1 ]                -> [ 1 ]
   T_in       = [ 1 ]                -> [ 1 ]
   T_muß      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
   T_stirne   = [ 1 ]                -> [ 1 ]
   T_von      = [ 1, 1 ]             -> [ 2 ]
   .
   .
   . (für alle verschiedenen T-Listen)

können wir parallel einen Reduce-Prozess starten, der jeweils die Elemente aufzählt. Das Ergebnis von MapReduce sieht in etwa so aus:

 Ausgabeliste = [ ("fest", 1), ("heute", 1), ("von", 2), ("gemauert", 1),
                  ("muß", 2), ("der", 4), ("in", 1), ("die", 2), .. ]

Weitere Beispiele

Verfahren Map-Funktion Reduce-Funktion
Verteiltes grep Gibt die gefundene Zeile (hit) in einen Zwischenergebnisspeicher Reicht durch (Identische Abbildung, genauer: Projektion auf die 2. Komponente)
Umsatzauswertung Schreibt für jeden Beleg die Artikelnummer und den Betrag in einen Zwischenspeicher Addiert für jede unterschiedliche Artikelnummer die Beträge zusammen
Datenbanksystem Liest, filtert und verarbeitet Teilmengen von Datensätzen Führt Aggregatfunktionen aus

Verallgemeinerung

Nachdem das Verfahren 2014 bereits zehn Jahre alt ist, bietet Google seit kurzem eine Erweiterung Cloud Dataflow an, die größere Flexibilität bietet und das Cloud Computing noch stärker vorantreiben soll.

Siehe auch

  • Apache Hadoop – Java-Framework basierend auf dem MapReduce-Algorithmus
Commons: MapReduce – Sammlung von Bildern, Videos und Audiodateien

Fachartikel

Software

PlasmaFS. Plasma MapReduce wurde von Gerd Stolpmann (Darmstadt) entwickelt.

  • Splunk.com Data Management und Analyse Engine für Big Data, welche auf MapReduce basiert
  • Stratosphere PACT Programmiermodell: Erweiterung und Generalisierung des MapReduce Programmiermodells

Einzelnachweise

  1. Google spotlights data center inner workings. (Memento des Originals vom 19. Oktober 2013 im Internet Archive)  Info: Der Archivlink wurde automatisch eingesetzt und noch nicht geprüft. Bitte prüfe Original- und Archivlink gemäß Anleitung und entferne dann diesen Hinweis.@1@2Vorlage:Webachiv/IABot/news.cnet.com CNET News, Tech news blog
  2. a b Jeffrey Dean, Sanjay Ghemawat: MapReduce: Simplified Data Processing on Large Clusters. Google Labs: “Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages.”
  3. Ralf Lämmel (Microsoft): Google’s MapReduce Programming Model – Revisited. (PDF)
  4. Patent US7650331B1: System and method for efficient large-scale data processing. Angemeldet am 18. Juni 2004, veröffentlicht am 19. Januar 2010, Anmelder: Google Inc, Erfinder: Jeffrey Dean, Sanjay Ghemawat.