Cassandra avec le connecteur spark [Scala]
Retour de la splendide Cassandre qui n’a finalement pas eu tant de mal que ça à convaincre les utilisateurs de BDD NoSQL.
Dans cet article, je vais présenter comment mettre en place Cassandra (version 2.2.3) dans votre projet Spark (version 1.5.0), le tout en Scala (version 2.11.7). Pour programmer, j’utilise Intellij Idea dans sa version 14.1.4 ainsi queSBT (version 0.13.9) pour les dépendances de librairies. Il y était question de présenter Cassandra, son fonctionnement, son langage de requête (CQL) et son fonctionnement via les outils Datastax OpsCenter et DevCenter.
Mise au point sur les mots clefs
Avant toute chose, je propose une petite mise au point sur les mots clefs. Cassandra est un système de gestion de base de données (SGBD) NoSQL orienté colonnes. Spark est un framework de développement distribué. Via son API, Spark permet la parallélisation des calculs en toute transparence (ou presque). Scala est un langage de programmation. Spark étant codé en Scala, il s’agit d’un langage très utilisé par les data scientists. DataStax est une société distribuant Cassandra. C’est eux qui se sont occupés de développer leconnecteur Cassandra pour Spark qui l’ont va utiliser. Ils participent aussi grandement au développement de Cassandra. A noter que ce connecteur est aussi développé en Scala.
Dépendances et téléchargements
Pour faire fonctionner Cassandra avec Spark, vous aurez besoins de plusieurs dépendances et de télécharger le projet Cassandra.
Dépendances
Les dépendances nécessaires sont :
- Spark (j’utilise la version 1.5.0 dans l’exemple)
- Cassandra (pour ne pas être limité, je prends cassandra-all dans l’exemple)
- Le connecteur Cassandra/Spark développé par DataStax. Dans sa version 1.5.0 (correspond à la version de Spark)
- BONUS : Commons-lang3. Dans mon exemple, j’ai fait face à une erreur que j’ai résolue en important les dépendances Apache Commons Lang3.
Je vous conseille vivement d’utiliser SBT pour charger vos dépendances. Moins vous aurez à télécharger de jar, plus votre projet sera organisé et exportable. Ainsi, mon fichier build.sbt ressemble à ça :
name := "CassandraPriseEnMain" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.5.0 libraryDependencies += "org.apache.cassandra" % "cassandra-all" % "2.2.3" libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2" libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.4"
Pour information, vous retrouverez les dépendances dans leurs dernières versions sur le site MavenRepository.
A télécharger
Personnellement, je travaille en local sur ma machine. Ainsi, j’ai besoins d’y installer Cassandra. Installer est un grand mot d’ailleurs, télécharger Cassandra suffira.
Vous trouverez Cassandra sur le site Apache, par ici. Personnellement, j’ai téléchargé « apache-cassandra-2.2.3-bin.tar.gz« . Dé-zippez l’archive où ça vous chante, à la racine de votre projet par exemple.
Enfin, pour lancer Cassandra en mode localhost, direction le dossier bin/ et lancez cassandra.bat (double clic). Une console Windows s’ouvre et Cassandra tourne sur votre machine.
Cette étape est indispensable si vous souhaitez pouvoir utiliser cassandra dans votre programme.
Utilisation concrète
Dans cette partie, je vais présenter comment configurer Cassandra avec Spark, créer une base complète directement dans le programme, ajouter et accéder à des données.
Configuration en localhost
Pour configurer Cassandra avec Spark, il ne suffit que de 3 lignes. Comme on est en local, on aura besoins que d’une seule ligne (je tire mon chapeau aux développeurs). Tout se passe dans la conf de votre sparkContext. Plutôt que d’expliquer, je vous copie colle, le morceau de code :
val conf = new SparkConf()
.setAppName("CassandraPriseEnMain")
.setMaster("local[*]")
.set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
Si vous êtes familier avec Spark, vous remarquerez uniquement l’avant dernière ligne. Elle y indique l’adresse de votre hôte Cassandra. 127.0.0.1 correspond àlocalhost. D’ailleurs, vous retrouverez cette information en lançant cqlsh.batqui se trouve dans le même dossier que cassandra.bat.
Création du keyspace et de votre table
Vous aurez besoins de l’objet CassandraConnector pour créer vos requêtes dans l’environnement Spark. Voilà à quoi ressemble la création de ma base :
CassandraConnector(sc.getConf).withSessionDo { session => session.execute("CREATE KEYSPACE IF NOT EXISTS myKeyspace WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }") session.execute("CREATE TABLE IF NOT EXISTS myKeyspace.mytable (firstColumn text PRIMARY KEY, secondColumn int)") }
Il n’y a rien de spécial à raconter ici. Pour éclaircir les actions, la première commande crée un keyspace que j’ai nommé « myKeyspace ». La seconde crée une table « myTable » au sein du keyspace que je viens de créer.
Insertion de données
Dans le même formalisme (withSessionDo), vous allez pouvoir faire des insertions dans votre base, comme par exemple :
session.execute("INSERT INTO myKeyspace.myTable (firstColumn, secondColumn) VALUES ('La Meuse', 55)") session.execute("INSERT INTO myKeyspace.myTable (firstColumn, secondColumn) VALUES ('Une bonne année', 1989)")
Lecture de données
La lecture des données est un peu différente. Vous allez vous servir de votresparkContext. Par exemple, pour récupérer l’ensemble de votre table « myTable », vous allez exécuter ce code :
val myRdd = sc.cassandraTable("myKeyspace", "words")
Et comme vous l’aurez remarqué avec le nom de la variable, le résultat est un RDD. Ainsi, vous manipulerez vos résultats de requête comme des RDD normaux. Exemple avec l’affichage des lignes de ce RDD :
rdd.foreach(println)