Apr.10

IntelliJ IDEA Hello World

intellij-15

Dans cet article, je vais présenter comment démarrer un projet Scala avec l’IDEIntellij IDEA. Je vais montrer toutes les étapes pour pouvoir écrire son premier « Hello World » dans la console Intellij.

Télécharger Intellij IDEA

La partie la plus simple est le téléchargement de l’IDE. La page de téléchargement est ici. La version Community prend en charge le développement d’application Scala, donc inutile de s’embêter avec la versionUltimate.

Une fois téléchargé et installé, lancez le logiciel.

Télécharger le plugin Scala

Pour développer en Scala, vous devez commencer par télécharger le plugin correspondant pour Intellij.

Une fois Intellij ouvert, appuyez sur CTRL + ALT + S pour ouvrir la fenêtre desSettings. Elle est aussi disponible par le chemin File -> Settings... Depuis cette fenêtre, cliquez sur « plugins » dans la colonne de gauche puis recherchez le plugin « scala » comme sur l’image ci-dessus (chez moi le plugin est déjà installé). settings

Notez qu’avec ce plugin, vous téléchargez aussi l’outillage nécessaire à faire un projet SBT (SBT est à Scala ce que Maven est à Java : un outil permettant de gérer les dépendances de votre projet).

Créer le projet Scala (avec SBT)

Une fois que tout est installé, faites File -> new -> Project. Vous arrivez sur une fenêtre similaire à celle ci-dessous.

new projet

Sélectionnez Scala dans la colonne de gauche et SBT dans celle de droite (à moins que vous ne souhaitiez pas utiliser SBT mais il n’y a pas de raison si c’est un nouveau projet).

Validez et vous arriverez sur la page suivante :

helloworld

Ici, je vous laisse y mettre vos paramètres. Personnellement, je n’utilise pas la fonction « auto-import » (qui permet d’importer les dépendances automatiquement à chaque fois que vous en ajoutez une dans votre fichierbuild.sbt) car de temps en temps, ça ne marche pas et ça a le don de m’énerver.

Validez et laissez votre projet se mettre en place. Cela peut mettre du temps. Une barre de progression est cachée en bas du logiciel.

refresh

Une fois votre projet prêt, vous devriez avoir un écran similaire au mien :

intellij-AZ

Sur la colonne de gauche, vous pouvez voir la hiérarchie de votre projet et notamment le fichier build.sbt qui vous permettra de créer vos dépendances avec d’autres APIs.

Ecrivez votre programme

Les dossiers en bleus sont les dossiers de Sources. Vous pouvez manager le rôle de vos dossiers avec le clic droit directement sur le dossier concerné.

On va créer notre premier fichier dans le dossier src -> main -> scala. Clic droit sur le dossier bleu scala puis new -> scala class. Choisissez la création d’un objet (si vous ne comprenez pas pourquoi, je vous propose ce tutoriel sur Scala) et nommez le comme vous le souhaitez.

new-hello

Votre fichier .scala s’ouvre. Remarquez qu’il ne contient pas de méthodemain(). Créez le vous-même et profitez-en pour y placer la commande println()qui a pour but d’afficher Hello World dans la console.

Code à copier/coller :

object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("Hello World")
  }
}

hellohelloe

Ne reste plus qu’à lancer votre programme. Pour ça, cliquez droit sur votre fichier depuis la hiérarchie de la colonne gauche et choisissez run ‘HelloWorld’. Vous devriez obtenir :

println

Fin!

Bigdata

Feb.13

Sc.textFile ou sc.wholeTextFiles [Spark]

Lors d’un projet sur le traitement de textes, vous avez le choix entre distribuer les lignes de vos fichiers ou distribuer les contenus entiers de ces derniers. L’article présente l’intérêt de chacun et la manière de le faire.

Distribuer les lignes d’un fichier

Pour distribuer les lignes d’un fichier, il faut créer un RDD[String] avec la méthode textFile() de votre instance SparkContext(). Cette méthode prend en paramètre le chemin vers le fichier en question (en local, hdfs, …). Le retour de cette méthode est un RDD[String] où chaque String correspond à une ligne de votre fichier.  Ainsi, vous distribuez votre analyse sur les lignes du fichiers.

val sc = new SparkContext(conf)
val mesLignes: RDD[String] = sc.textFile("cheminVersMonFichier")
mesLignes.map(maLigne => {
  // Traitements
})

Distribuer les fichiers d’un dossier

Si vos fichiers contiennent peu de lignes mais la complexité est situé au niveau du nombre de ces fichiers, il existe la méthode wholeTextFiles() de votre instance SparkContext(). Cette méthode prend en paramètre le chemin vers votre dossier conteneur. Le retour de cette méthode est un RDD[(String, String)]. Chaque ligne du RDD est un Tuple2. Le premier élément du Tuple2 est le chemin vers le fichier, le second son contenu. Ainsi, vous distribuez les fichiers et leur contenu en entier.

val sc = new SparkContext(conf)
val mesFichiers: RDD[(String, String)] = sc.wholeTextFiles("cheminVersMonDossierConteneur")
mesFichiers.map(unFichier => {
  println("Chemin : " + unFichier._1)
  println("Contenu : " + unFichier._2)
  // Traitements
})

A vous de décider si le meilleur moyen est de distribuer les contenus entiers des fichiers ou de boucler sur les fichiers et de distribuer les lignes de ces derniers.

Bigdata

Feb.10

Créer un .JAR avec Intellij Idea

Pour créer un fichier JAR à partir d’un programme (scala, java, ce que vous voulez) développé sur Intellij Idea 15, la procédure est la suivante :

Créer un Artifact

Pour créer un Artifact, allez dans File –> Project Structure (le racourci pour ça est CTRL + ALT + MAJ + S) puis allez dans la section Artifacts.

artifacts-1

Cliquez sur le « + » vert. Dans l’arborescence proposé, allez dans JAR –> From modules with dependencies.

Dans la fenêtre qui vient de s’ouvrir, cherchez votre classe d’entrée du projet (le main donc), ajoutez votre module et sélectionnez « copy to the output directory and link via manifest« . Puis validez.

Construire le JAR

Ensuite, dans le menu (File, Edit, View, etc.), allez dans Run -> Build Artifacts… Notez qu’avant d’avoir créer votre Artifact, cette option était grisée.

Dans le petit menu déroulant qui s’affiche en plein milieu de votre page, choisissez « Build » et le tour est joué.

A noter que le jar se trouve dans le dossier suivant :

IdeaProject –> Votre-Projet –> out –> artifacts –> Votre-Projet_jar –>Votre-Proket.jar

Bigdata

Dec.10

Cassandra avec le connecteur spark [Scala]

Apache Cassandra

Retour de la splendide Cassandre qui n’a finalement pas eu tant de mal que ça à convaincre les utilisateurs de BDD NoSQL. 

Dans cet article, je vais présenter comment mettre en place Cassandra (version 2.2.3) dans votre projet Spark (version 1.5.0), le tout en Scala (version 2.11.7). Pour programmer, j’utilise Intellij Idea dans sa version 14.1.4 ainsi queSBT (version 0.13.9) pour les dépendances de librairies. Il y était question de présenter Cassandra, son fonctionnement, son langage de requête (CQL) et son fonctionnement via les outils Datastax OpsCenter et DevCenter.

Mise au point sur les mots clefs

Avant toute chose, je propose une petite mise au point sur les mots clefs. Cassandra est un système de gestion de base de données (SGBD) NoSQL orienté colonnes. Spark est un framework de développement distribué. Via son API, Spark permet la parallélisation des calculs en toute transparence (ou presque). Scala est un langage de programmation. Spark étant codé en Scala, il s’agit d’un langage très utilisé par les data scientists. DataStax est une société distribuant Cassandra. C’est eux qui se sont occupés de développer leconnecteur Cassandra pour Spark qui l’ont va utiliser. Ils participent aussi grandement au développement de Cassandra. A noter que ce connecteur est aussi développé en Scala.

Dépendances et téléchargements

Pour faire fonctionner Cassandra avec Spark, vous aurez besoins de plusieurs dépendances et de télécharger le projet Cassandra.

Dépendances

Les dépendances nécessaires sont :

  • Spark (j’utilise la version 1.5.0 dans l’exemple)
  • Cassandra (pour ne pas être limité, je prends cassandra-all dans l’exemple)
  • Le connecteur Cassandra/Spark développé par DataStax. Dans sa version 1.5.0 (correspond à la version de Spark)
  • BONUS : Commons-lang3. Dans mon exemple, j’ai fait face à une erreur que j’ai résolue en important les dépendances Apache Commons Lang3.

Je vous conseille vivement d’utiliser SBT pour charger vos dépendances. Moins vous aurez à télécharger de jar, plus votre projet sera organisé et exportable. Ainsi, mon fichier build.sbt ressemble à ça :

name := "CassandraPriseEnMain"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.5.0
libraryDependencies += "org.apache.cassandra" % "cassandra-all" % "2.2.3" 
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2" 
libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.4"

Pour information, vous retrouverez les dépendances dans leurs dernières versions sur le site MavenRepository.

A télécharger

Personnellement, je travaille en local sur ma machine. Ainsi, j’ai besoins d’y installer Cassandra. Installer est un grand mot d’ailleurs, télécharger Cassandra suffira.

Vous trouverez Cassandra sur le site Apache, par ici. Personnellement, j’ai téléchargé « apache-cassandra-2.2.3-bin.tar.gz« . Dé-zippez l’archive où ça vous chante, à la racine de votre projet par exemple.

Enfin, pour lancer Cassandra en mode localhost, direction le dossier bin/ et lancez cassandra.bat (double clic). Une console Windows s’ouvre et Cassandra tourne sur votre machine.

cassandra-apache-bat

Cette étape est indispensable si vous souhaitez pouvoir utiliser cassandra dans votre programme.

Utilisation concrète

Dans cette partie, je vais présenter comment configurer Cassandra avec Spark, créer une base complète directement dans le programme, ajouter et accéder à des données.

Configuration en localhost

Pour configurer Cassandra avec Spark, il ne suffit que de 3 lignes. Comme on est en local, on aura besoins que d’une seule ligne (je tire mon chapeau aux développeurs). Tout se passe dans la conf de votre sparkContext. Plutôt que d’expliquer, je vous copie colle, le morceau de code :

val conf = new SparkConf()
  .setAppName("CassandraPriseEnMain")
 .setMaster("local[*]")
 .set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

Si vous êtes familier avec Spark, vous remarquerez uniquement l’avant dernière ligne. Elle y indique l’adresse de votre hôte Cassandra. 127.0.0.1 correspond àlocalhost. D’ailleurs, vous retrouverez cette information en lançant cqlsh.batqui se trouve dans le même dossier que cassandra.bat.

cqlsh

Création du keyspace et de votre table

Vous aurez besoins de l’objet CassandraConnector pour créer vos requêtes dans l’environnement Spark. Voilà à quoi ressemble la création de ma base :

CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS myKeyspace WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE IF NOT EXISTS myKeyspace.mytable (firstColumn text PRIMARY KEY, secondColumn int)")
}

Il n’y a rien de spécial à raconter ici. Pour éclaircir les actions, la première commande crée un keyspace que j’ai nommé « myKeyspace ». La seconde crée une table « myTable » au sein du keyspace que je viens de créer.

Insertion de données

Dans le même formalisme (withSessionDo), vous allez pouvoir faire des insertions dans votre base, comme par exemple :

session.execute("INSERT INTO myKeyspace.myTable (firstColumn, secondColumn) VALUES ('La Meuse', 55)")
session.execute("INSERT INTO myKeyspace.myTable (firstColumn, secondColumn) VALUES ('Une bonne année', 1989)")

Lecture de données

La lecture des données est un peu différente. Vous allez vous servir de votresparkContext. Par exemple, pour récupérer l’ensemble de votre table « myTable », vous allez exécuter ce code :

val myRdd = sc.cassandraTable("myKeyspace", "words")

Et comme vous l’aurez remarqué avec le nom de la variable, le résultat est un RDD. Ainsi, vous manipulerez vos résultats de requête comme des RDD normaux. Exemple avec l’affichage des lignes de ce RDD :

rdd.foreach(println)
Bigdata

Nov.01

ZooKeeper et Apache Kafka en pratique

apache kafka

Introduction

Cet article présente comment prendre en main Apache Kafka (+ ZooKeeper) en Scala. La version de Kafka utilisée est 0.8.2, la version de ZooKeeper est 3.4.6 (du moins je pense, car il s’agit de la version empaquetée dans Kafka 0.8.2), la version de Scala est 2.11.7. Enfin, j’utilise l’environnement de développement Intellij Idea dans sa version 14.1.4.

L’objectif principal de cet article est d’expliquer toutes les étapes de la mise en place d’un projet Kafka. L’application réalisée est très simple et fonctionne en local.

Qu’est ce que Kafka?

Définition globale

Kafka est un « système orienté message de type publication / souscription distribué ». Autrement dit, Kafka est un système permettant de gérer le flux de messages entre des producteurs (de messages) et des consommateurs (de messages aussi). Un message correspond à un couple clé/valeur de données. Un message peut être un log d’une machine, un vecteur de données, une liste de valeurs de capteurs à un instant t, etc.

Cet outil est de plus en plus populaire car il est bien intégré par les distributeurs de Hadoop (Hortonworks, Cloudera et compagnie). Il est également intégré à l’utilisation de Spark grâce à son connecteur spécifique. Ainsi, Kafka est le système orienté message pour le Big Data.

Un petit mot sur ZooKeeper qui est un système centralisé permettant de coordonner les processus dans un environnement distribué. ZooKeeper est essentiel au bon fonctionnement de Kafka, c’est pour cela qu’il est empaqueté dans Kafka.

Définition plus fine

Kafka a un vocabulaire bien spécifique, cette partie a pour objectif de vous présenter ce vocabulaire. Je ne vais pas rentrer dans le détail (loin de là). Le but est simplement de donner le vocabulaire et la connaissance nécessaire pour faire tourner Kafka sur votre machine.

Comme dit plus haut, Kafka fait le lien entre producteurs et consommateurs et gère les accès de chacun. Pour faciliter la compréhension, il ne faut pas voir un unique flux entre les producteurs et les consommateurs mais plusieurs flux. Ces flux sont appelés des topics. Ainsi, on va pouvoir inscrire des producteurs et des consommateurs à ces topics : « Le producteur A publie dans le topic #1 », « Le consommateur B consomme les données des topics #1 et #2 », etc. Chaque topic est partitionné. Cela permet la résistance à la pane de machines entre autres.

Ci-après le graphique que tout le monde utilise pour représenter l’anatomie d’un topic Kafka :

kafka

Voilà l’essentiel à savoir sur le fonctionnement et le vocabulaire de Kafka. Evidemment, l’affaire est bien plus complexe. Pour en savoir d’avantage (notamment sur les répliquas de partition pour des raisons de sécurité et de non perte de données), rendez-vous sur le site officiel.

Mise en pratique

Objectif du programme

Le programme présenté est très simpliste tout en étant complet. Il va être question de coder un producteur et un consommateur. Le producteur va envoyer un chiffre aléatoire entre 0 et 9 toutes les demies secondes. Le consommateur va récupérer ce chiffre et l’afficher dans sa console.

Pour faire fonctionner ce programme, il est nécessaire de télécharger Kafka. Comme dit dans l’introduction, la version utilisée dans l’exemple est 0.8.2.0. Vous pourrez la trouver sur le site officiel, ici. Téléchargez la version de votre choix et dé-zippez la où ça vous arrange (dans /lib de votre projet Intellij par exemple).

Architecture

Pour réaliser ce programme est il nécessaire de comprendre l’architecture d’un projet Kafka. Pour faire fonctionner un programme Kafka il vous faut 4 choses distinctes :

Démarrer le serveur ZooKeeper 

Pour démarrer votre ZooKeeper, ouvrez une console windows et naviguez jusqu’à la racine de votre Kafka. Ici, vous lancez le fichier zookeeper-server-start.bat en précisant le fichier properties de zookeeper en argument qui se trouve dans le dossier config/ (config/zookeeper.properties). Personnellement, je n’ai pas modifié ce fichier, mais si des contraintes s’imposent à vous, c’est ici qu’on configure zookeeper (port, hôte et compagnie).

Si vous êtes à la racine de Kafka, la commande est la suivante :

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Et vous devriez obtenir :

zookeeper-start-server

Démarrer le serveur Kafka

De la même manière que pour Zookeeper, il est nécessaire de lancer le serveur Kafka. Ouvrez une autre console, positionnez vous à la racine de Kafka et tapez dans votre console :

bin\windows\kafka-server-start.bat config\server.properties

Vous devriez obtenir :

kafka-console

Programmer le(les) producteur(s) ET le(les) consommateur(s)

Vous devez être capable de lancer votre producteur indépendamment de votre consommateur. Ainsi, créez deux programmes distincts. Pour ma part, j’ai créé deux projets Intellij différents. Cela me permet de lancer mes programmes depuis Intellij. Intellij vous proposant de gérer un projet par fenêtre, c’est parfait.

Pensez à bien importer les dépendances Kafka sur les deux projets pour pouvoir coder vos producteurs/consommateurs avec l’API. Plus loin, je donne le contenu des fichiers build.sbt qui gèrent les dépendances de mes projets.

Le programme

Les dépendances

Une fois que la théorie est à peu près intégrée, il est temps de passer à la pratique. D’ailleurs, je conseille de ne pas trop s’attarder sur la théorie. Il n’y a pas de priorité sur la mise en place du producteur ou du consommateur. L’un peut fonctionner sans l’autre et vis et versa.

Avant de commencer le code, pensez à créer les dépendances avec Kafka pour avoir accès à l’API. Utilisez la méthode de votre choix : SBT ou import à la main. Pour ma part, j’ai pris l’habitude de travailler avec SBT, je vous copie colle le contenu de mes fichiers build.sbt de mes deux projets :

build.sbt du producteur :

name := "TEST-PRODUCER"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.8.2.0"

build.sbt du consommateur :

name := "TEST-CONSUMER"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.8.2.0"

Producteur

Les properties

La première chose à faire, avant de toucher à l’API est de crée la configuration de votre producteur via la classe Properties (une classe java.util). Voilà mon morceau de code, je l’explique ensuite :

// DEFINITION DES PROPERTIES
val props = new Properties()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

J’indique que je vais faire tourner Kafka sur ma machine (localhost) et que je vais utiliser le port 9092 pour la communication. Ce port est le port généralement utilisé par Kafka. Vous le retrouverez dans le fichierconfig/server.properties de votre dossier Kafka.

J’indique aussi que mes messages (clé + valeur) qui vont transiter seront du type String. Pour cela, Kafka a un serializer tout prêt que je vous conseille d’utiliser.

Il existe de nombreuses configurations possibles. J’ai mis le minimum vital pour que ça marche dans l’exemple. Pour connaitre toutes les variables possibles, deux choix : Accédez à la classe ProducerConfig.java, elle est bien commentée ou visitez la documentation sur le site officiel, à cette page. A noter qu’il existe les « anciennes » configurations et les « nouvelles ». Les nouvelles sont plus claires car elles sont sous forme de variables globales et ainsi retrouvables facilement via l’auto-complétion.

L’objet producer

Ici, il est question de créer l’objet producer. C’est avec ce dernier que vous allez envoyer vos messages. Pour l’instancier, le producer a besoins des configurations qu’on vient de mettre en place et des types de la clé et de la valeur des messages que vous souhaitez envoyer. Comme vu dans lesproperties, la clé et la valeur seront de type String. On obtient donc :

val producer = new KafkaProducer[String, String](props)

La production et l’envoi des données

Ici, deux choses sont importantes :

  • La création d’un message via la classe ProductRecord
  • L’envoi de vos messages dans le bon topic via la méthode send() appliquée à l’objet producer.

Pour rappel, je souhaite envoyer un chiffre entre 0 et 9 toutes les demies secondes. Je vous copie colle le code :

// ON ENVOIE TOUT
while (true) {
  val str = scala.util.Random.nextInt(10).toString
  val message = new ProducerRecord[String, String]("monTopic", null, str)
  producer.send(message)
  println("MESSAGE : " + str)
  Thread.sleep(500)
}

On retrouve donc la ligne me permettant de générer un chiffre aléatoirement. On note que je le caste en String pour respecter ce que j’ai énoncé plus tôt.

La création de mon message se fait donc via la création d’un objetProducerRecord auquel j’associe les bons types pour la clé et la valeur de mon message. En argument je lui donne le topic auquel je souhaite contribuer (une chaîne de caractères suffit) ainsi que mon message.

Ensuite, j’envoie ce message via la méthode send() qui n’a pas d’argument particulier mis à part le message.

Enfin, je débug en affichant ce que j’envoie et j’attends un demie seconde avant l’envoi du prochain message.

Consommateur

Les properties

Tout comme pour le producteur, le consommateur a aussi besoins d’un minimum de properties. Cependant, il n’existe pas de variables globales comme pour le producteur (rappelez vous des « nouvelles » configurations). Voilà à quoi ressemble ma variable de configuration :

val props = new Properties()
props.put("group.idonPremierGroupID")
props.put("zookeeper.connectocalhost:2181")
props.put("auto.offset.resetmallest")

Je n’ai pas parlé du Group ID dans cet article car ça ne fait que compliquer les choses. Ce qu’il faut savoir c’est qu’on regroupe les consommateurs dans des groupes qui suivent les mêmes topics. Plus d’explications sur le site officiel. Quoiqu’il en soit, pour une phase de test, le group.id importe très peu puisque vous exécutez le code sur votre propre machine et non pas un maxi cluster.

La seconde propriété indique l’hôte et le port de ZooKeeper. Nous sommes en local dont l’hôte est localhost et le port de ZooKeeper est par convention 2181. Vous retrouverez ces informations dans le fichier config/zookeeper.properties.

Enfin, dans la dernière propriété, j’indique que je souhaite traiter les messages par ordre croissant de leur offset (numéro attribué lors de l’envoi d’un message dans le flux). Ainsi les messages de mon topic sont lus à la manière d’une fileFIFO (First In First Out).

La liste de toutes les propriétés configurables des consommateurs sont dispoici.

Création du consommateur

Pour créer le consommateur, il faut d’abord créer un objet ConsumerConfig. Rien est très compliqué ici, voilà les deux lignes nécessaires :

val config = new ConsumerConfig(props)
val connector = Consumer.create(config)

Définition des topics à suivre

Il faut ensuite définir la liste des topics à suivre (il faut voir ça comme un filtre) et donc à ajouter à notre WhiteList. Rien de compliqué non plus, voilà le code :

val filterSpec = new Whitelist(topic)

Création du stream

La partie la plus intéressante du consommateur, la création du stream. Ici, il est question de créer le stream et de réagir à chaque nouveau message.

val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder(null), new StringDecoder(null))(0)
val it = stream.iterator()
while(it.hasNext()) {
    println("REPONSE : " + it.next().message())
}

Pour créer le stream, on utilise la méthode createMessageStreamsByFilterappliquée à notre consommateur. Le filtre étant notre WhiteList, nous la fournissons en argument. Nous fournissons également les décodeurs pour la clé et le contenu du message. Et oui, pour rappel, avant de les insérer dans le flux, nous les avions sérialisés. Pour décoder en String, nous utilisons l’objetStringDecoder. A la fin de la première ligne, on indique que l’on souhaite accéder au premier élément du stream avec (0).

Ensuite, via un iterator, on va boucler sur nos messages et simplement en afficher le contenu.

Exécuter le programme

Pour exécuter le programme, pensez à lancer le serveur ZooKeeper en premier puis le serveur Kafka. Ensuite lancez indépendamment le programme du consommateur et du producteur (l’ordre du lancement n’importe en rien).

De mon coté, voilà un extrait des sorties avec à gauche le producteur et à droite le consommateur.

kafka-sortie

Bigdata