Hadoop, mapreduce et l’apprentissage supervisé : KNN

L’idée principale derrière l’apparition de Hadoop est de stocker et de traiter une quantité massive de données qui peuvent être structurées, ayant une organisation ou un modèle prédéfini comme le cas de données relationnelles stockées dans une base de données, ou semi-structurées, ayant des propriétés organisationnelles facilitant leur analyse comme les données XML, ou non structurées n’ayant ni une organisation, ni un modèle prédéfini comme les fichiers textes ou les vidéos. La phase qui suit le stockage est d’analyser et d’explorer pour tirer de conclusion, trouver de pattern qui aide à prendre de décision ou bien faire de prédiction. Dans cet article, on va se servir de Hadoop et le paradigme de programmation mapreduce pour implémenter l’algorithme KNN.

Apprentissage supervisé : KNN

Avant se plonger dans le code, on commence par donner une définition à l’algorithme KNN ou k-nearest neighbors. Selon Wikipedia “on dispose d’une base de données d’apprentissage constituée de N couples « entrée-sortie ». Pour estimer la sortie associée à une nouvelle entrée x, la méthode des k plus proches voisins consiste à prendre en compte (de façon identique) les k échantillons d’apprentissage dont l’entrée est la plus proche de la nouvelle entrée x, selon une distance à définir. “ Brièvement, cet algorithme permet de décider à quel groupe appartient un nouvel élément. Par exemple, si on est en train de récolter trois catégories de fruits : orange, pomme rouge et pomme verte, alors à chaque fois, on récolte une nouvelle pomme verte, on va le rajouter au groupe de pommes vert et pareil pour chaque nouvelle pomme rouge ou une orange. Dans notre cas, on est basé sur le critère du couleur pour prendre déterminer la classe de la fruit à chaque fois.

KNN peut être résumé comme suit:

  • Calcule la distance entre le nouveau point de données avec les groupes de données
  • Pour calculer les mesures de distance telles que la distance euclidienne, la distance de Hamming ou la distance de Manhattan seront utilisées.
  • Le modèle sélectionne K entrées dans la base de données les plus proches du nouveau point de données.
  • Ensuite, il vote à la majorité, c’est-à-dire que la classe / étiquette la plus courante parmi ces K entrées sera la classe du nouveau point de données.

Dans cet article, on va appliquer cet algorithme sur le jeu de données Iris comprend 50 échantillons de chacune des trois espèces d’iris (Iris setosa, Iris virginica et Iris versicolor). Quatre caractéristiques ont été mesurées à partir de chaque échantillon : la longueur et la largeur des sépales et des pétales, en centimètres.

Notre tâche est de construire un modèle KNN qui classe les nouvelles espèces sur la base des mesures des sépales et des pétales.

On commence par créer un projet Maven dans IntelliJ IDEA ou Eclipse IDEA, en définissant les valeurs suivantes pour le projet:

  • GroupID: hadoop.mapreduce
  • ArtifactID: Knnalgorithm
  • Version: 1

Ensuite, on ouvre le fichier pom.xml, et on ajoute les dépendances suivantes pour Hadoop, HDFS et Map Reduce:

    <dependencies>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8.0_241</version>
            <scope>system</scope>
            <systemPath>C:/Java/jdk1.8.0_241/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>

On crée la classe KnnMapper sous le package hadoop.mapreduce.knn contenant le code suivant:

package hadoop.mapreduce.knn;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

public class KnnMapper extends Mapper<LongWritable, Text, Text, Text> {
    //Cette varaible représente le nombre des attributs sur lequels on va faire l'analyse
    public static int numoffeatures;

    //Le tableau feature représente les attributs feature[i]=0.5, i : le numero de l'attribut, 0.5: la valeur de l'attribut i
    //Aussi il stocke les valeurs du fichier d'input sur lequel on va appliquer l'algorithme KNN
    public static Float[] feature;

    //la list dist représente les résultats de calcul de distance entre les attributs
    public static ArrayList<String> dist=new ArrayList<String>();

    //Methode  pour calculer la distance euclidienne
    public static float euclideandist(Float[] test,int n) {
        float distance=0;
        for(int j=0; j<n; j++) {
            // feature [5.1 3.5 1.4 0.2]
            distance+=Math.pow((feature[j]-test[j]),2);
        }
        distance=(float)Math.sqrt(distance);
        return distance;
    }

    //Cette méthode permet de lire de paramètres à patir de l'objet context qui permet de personnaliser la logique de traitement
    public void setup(Context context) throws IOException, InterruptedException  {
        //Initialise la variable numeroffeatures qui représente le nombre des attributs sur lequels on va faire l'analyse
        numoffeatures=context.getConfiguration().getInt("numoffeatures",1);
        feature=new Float[numoffeatures];
        for(int j=0; j<numoffeatures; j++) {
            feature[j]=context.getConfiguration().getFloat("feature"+j,0);           
        }
    }


    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // la variable value représente le dataSet
        //On fait le split et on stoke le resultat dans le tableau input
        String[] input=value.toString().split(",");
        Float[] test=new Float[numoffeatures];
        for(int j=0; j<numoffeatures; j++) {
            test[j]=Float.parseFloat(input[j]);
        }

        //Cette varaible contient le nom de la classe
        String classlabel=input[numoffeatures].replace("\"","");

        // on va ajouter le resultat de la methode euclideandist contacte au nom de la classe à la list dist
        dist.add(String.valueOf(euclideandist(test,numoffeatures))+classlabel);
    }

    //Cette méthode permet de faire un clean up de ressources qui on  a utilisé en générale dans notre cas on va trier la liste dist
    public void cleanup(Context context) throws IOException, InterruptedException  {
        Collections.sort(dist);
        //Le tableau label contient le cinq premiers classes
        String[] label=new String[5];
        //La variable temp contient les cinq elements les plus proche au niveau de la liste dist, exemple 0.1222setosa
        String temp;
        //On va prendre le cin premiers les plus proches
        for(int j=0; j<5; j++)  {
            temp=dist.get(j);
            label[j]=String.valueOf(temp.replaceAll("[\\d.]",""));
            context.write(new Text("1"),new Text(label[j]));
        }
    }
}

La classe KnnMapper implémente la classe org.apache.hadoop.mapreduce.Mapper de Hadoop que l’on paramètre avec le type de la clé d’entrée (LongWritable), le type de la valeur d’entrée (Text), le type de la clé des sorties intermédiaires (Text) et enfin le type de la valeur des sorties intermédiaires ( Text). La classe KnnMapper implémente quatre méthodes qui sont:

  • euclideandist : Cette méthode permet de calcul la distance euclidienne entre l’élément qu’on cherche sa classe et les autres éléments existants dans le Dataset. Autrement, cette méthode calcule la distance entre la ligne ou le vecteur dans le fichier de input et chaque ligne de Dataset.
  • setup : Cette méthode s’exécute avant la fonction map et elle permet de lire de paramètres à patir de l’objet context et cela permet de personnaliser la logique de traitement. Dans notre cas, à l’aide de cette méthode, on a pu initialiser le variable numoffeatures qui représente le nombre totale des attributs et aussi le tableau feature qui représente la ligne de données au niveau de la fichier d’input.
  • map : Cette méthode permet de deviser la Dataset en de lignes, ou en de vecteurs de données, et de faire appel à la méthode euclideandist pour calculer la distance entre chaque vecteur de données et le vecteur de données d’entrée qui on cherche sa classe. Ensuite, elle stocke le résultat dans une liste. 
  • cleanup : Cette méthode s’exécute après la fonction map, elle permet de faire une sorte de clean up de ressources qui on a utilisé. Dans notre cas, cette méthode permet de trier la liste retourner par la fonction map contenant le résultat de comparaison.

Maintenant, On crée la classe KnnReducer sous le package hadoop.mapreduce.knn contenant le code suivant:

package hadoop.mapreduce.knn;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class KnnReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        HashMap<String,Integer> map=new HashMap<String,Integer>();
        String str=null;
        int maxvalue=-1;

        for(Text value:values)  { //Si le HashMap map ne contient pas la classe
            if(!map.containsKey(value.toString())) {   //On rajoute la classe au HashMap avec un value 1
                map.put(value.toString(),1);
                System.out.println(value.toString());
            }
            //Si la classe existe déjà au niveau de le HashMap map, on incremente la valeur de value
            else {
                 map.put(value.toString(),map.get(value.toString()+1));
                System.out.println(value.toString());
            }
        }
        System.out.println(map.size());
        Iterator it=map.entrySet().iterator();
        str=((Map.Entry)it.next()).getKey().toString();
        while(it.hasNext()) {
            Map.Entry entry=(Map.Entry)it.next();
            if(Integer.parseInt(entry.getValue().toString())>maxvalue) {
                str=entry.getKey().toString();
                maxvalue=Integer.parseInt(entry.getValue().toString());
            }
        }
        context.write(null,new Text("Class label : "+str));
    }
}

La classe KnnReducer implémente la classe org.hadoop.mapreduce.Reducer de Hadoop que l’on paramètre avec le type de la clé d’entrée (Text), le type de la valeur d’entrée (Text), le type de la clé des sorties intermédiaires (Text) et enfin le type de la valeur des sorties intermédiaires ( Text). Pour écrire le code correspondant à l’opération reduce , nous avons utilisons ici les types Text. La fonction reduce commence à remplir un HashMap en parcourant la liste values qui est le résultat de la fonction map, si l’élément n’existe pas dans le HashMap alors on le rajoute, sinon on incrémente sa valeur. Ensuite, elle retourne la classe avec le plus d’occurrence dans le HashMap qui représente la classe de vecteur de données d’entrée.

Enfin, on crée la classe KNN sous le package hadoop.mapreduce.knn contenant le code suivant:

package hadoop.mapreduce.knn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class Knn {
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {
        // Créer un job en fournissant la configuration et une description textuelle de la tâche
        Configuration conf=new Configuration();
        Job job=new Job(conf,"KNN Classification");

        //On charge le fichier inputIris.txt
        FileSystem hdfs=FileSystem.get(conf);
        BufferedReader br=new BufferedReader(new InputStreamReader(hdfs.open(new Path(args[0]))));
        String line=null;
        int numoffeatures=0;
        while((line=br.readLine())!=null) {
            String[] feature=line.split("\\ ");
            numoffeatures=feature.length;
            for(int j=0; j<numoffeatures; j++)
                conf.setFloat("feature"+j,Float.parseFloat(feature[j]));
        }
        br.close();
        hdfs.close();

        //on passe le nombre des attributs
        conf.setInt("numoffeatures",numoffeatures);

        //On definit le main class
        job.setJarByClass(Knn.class);

        //On definit le fichier d'input : le jeu de données Iris, le fichier d'output:le resultat de classification
        FileInputFormat.setInputPaths(job,new Path(args[1]));
        FileOutputFormat.setOutputPath(job,new Path(args[2]));

        //On definit la class Mapper et Reducer
        job.setMapperClass(KnnMapper.class);
        job.setReducerClass(KnnReducer.class);
        
        //Définition des types clé / valeur de notre problème
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //On lance le job
        job.waitForCompletion(true);
    }
}

La classe KNN qui contient la fonction main du programme et qui va permettre de :

  • Récupérer la configuration générale du cluster.
  • Créer un job avec le nom KNN Classification .
  • Préciser quelles sont les classes Map et Reduce du programme en utilisant les setters setMapperClass() et setReducerClass().
  • Préciser les types de clés et de valeur correspondant à notre problème à l’aide de méthodes setOutputKeyClass(), setOutputValueClass().
  • Indiquer où sont les données d’entrée et de sortie dans HDFS, à l’aide setInputPaths() et setOutputPath().
  • Définir le type de clé et de valeur pour notre problème à l’aide setOutputKeyClass() et setOutputValueClass ()
  • Lancer l’exécution de la tâche.

Tester Map Reduce en local

Dans le projet sur IntelliJ:

  • On crée un répertoire input sous le répertoire resources de projet
  • On télécharge le jeu de données Iris à partir de ce site Kaggle.
  • On crée un fichier de test: inputIris.txt dans lequel on insère ce jeu de données par exemple: 5.1 3.5 1.4 0.2
  • On crée une configuration de type Application (Run->Edit Configurations…->+->Application).
  • On définir comme Main Class: hadoop.mapreduce.wordcount.WordCount, et comme Program Arguments: src/main/resources/input/inputIris.txt src/main/resources/input/Iris.csv src/main/resources/output

On lance le programme. Un répertoire output sera créé dans le répertoire resources, contenant notamment un fichier part-r-00000, dont le contenu devrait être le suivant:

Pour plus de détails sur le paradigme de programmation MapReduce: https://static.googleusercontent.com/media/research.google.com/fr//archive/mapreduce-osdi04.pdf

Pour tester le programme reportez-vous au projet GITLab du KNN MapReduce de Soufien Jabeur.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.