class: center, middle # Stream Processing en mémoire # avec HazelCast Jet ### Claire Villard [](https://twitter.com/neur0nia) neur0nia [](https://github.com/leneurone/hz-jet) leneurone/hz-jet --- # Claire Villard
Développeuse Java depuis 2011
Société V3D
Membre de Duchess France
Organisatrice du LyonJUG
--- # Qu'est-ce qu'HazelCast IMDG ?  * IMDG = In-Memory Data Grid, Grille de données en mémoire * Stockage distribué de structures de données (`Map`, `List`, `AtomicLong`, ...) * Calcul distribué sur ces structures * Clustering * Java-based * Open-source --- # Qu'est-ce qu'HazelCast Jet ?  * Solution de stream et batch processing basée sur HazelCast IMDG .imgContent[  ] .footnote[_Sauf mention contraire, les schémas proviennent de https://jet.hazelcast.org/_] --- # Modélisation * Utilise les graphes orientés acycliques (DAG, Directed Acyclic Graphs) .imgContent[  ] --- # Fonctionnalités * Nombreuses entrées / sorties * Faible latence * Contrôle du débit * _Sliding windows_ * Facilement extensible * Tolérance aux crashs de noeuds avec redémarrage des jobs * Garantie de traitement "au moins 1", "exactement 1" ou "au mieux" des messages * Support cloud (Pivotal Cloud Foundry) et Docker ??? Entrées / sorties : streams (IOT, APIs, Kafka Streams, ...), clusters IMDG, databases, HDFS, files, ... Sliding Windows: Calcul de valeurs aggrégées sur des fenêtres de temps glissantes, avec _watermark_ et Map-Reduce --- # Roadmap * Annoncé _production-ready_ depuis la v0.4 (juin 2017) * v0.6.1 sortie le 3 mai 2018. * 1.0 : prévue en 2018 * APIs instables jusqu'à la v1.0 v0.7 : * Outils de diagnostic et de monitoring * Scaling dynamique * Rolling job updates --- class: center, middle # En pratique  --- .dag-icon[] # Pool de thread Java ```java // Pool de threads Java ExecutorService executor = Executors.newFixedThreadPool(nbCPUs * 2); List
> futureList = new ArrayList<>(NB_ITEMS); for(String key : inputMap.keySet()) { futureList.add(executor.submit(new MetricLocalProcessor(hzInstance, key))); } for (Future> pendingResult : futureList) { pendingResult.get(); } ``` --- .dag-icon[] # Pool de thread Java ```java public class MetricLocalProcessor implements Runnable { private HazelcastInstance hzInstance; private String key; @Override public void run() { IMap
customerMap = hzInstance.getMap(CUSTOMER_MAP_NAME); IMap
metricMap = hzInstance.getMap(SOURCE_MAP_NAME); IMap
outputMap = hzInstance.getMap(OUTPUT_MAP_NAME); IMap
errorMap = hzInstance.getMap(ERROR_OUTPUT_MAP_NAME); Metric metric = metricMap.get(key); Customer customer = customerMap.get(metric.getCustomerId()); if (customer != null) { outputMap.set(metric.getTimestampMs(), new EnrichedMetric(metric, customer)); } else { errorMap.set(metric.getTimestampMs(), metric); }; } } ``` --- # Pool de thread Java - Performances * Intel® Core™ i7-5600U CPU @ 2.60GHz × 4 * 16Go RAM * Fedora 26 64 bits * JVM Oracle HotSpot 1.8.0_121 * Configuration d'HazelCast IMDG / Jet par défaut, 2 noeuds locaux .dag-icon[] ### Temps d'exécution - 1 million d'objets Local Thread Pool (8 threads) : **90s**, soit environ **11 100 items / seconde** ??? Scalabilité :( :( :( --- .dag-icon[] # HazelCast Processor ```java // Création de l'instance Hazelcast avec la configuration par défaut HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); // création d'un thread pool au sein d'Hazelcast IExecutorService executor = hzInstance.getExecutorService("enricherThreadPool"); // Execution du processeur sur chaque noeud du cluster Map
> pendingResults = executor.submitToAllMembers(new MetricEnrichmentIMDGProcessor()); // attente du résultat for (Map.Entry
> pendingResult : pendingResults.entrySet()) { pendingResult.getValue().get(); } ``` --- .dag-icon[] # HazelCast Processor ```java public class MetricEnrichmentIMDGProcessor implements Callable
, HazelcastInstanceAware, Serializable { private HazelcastInstance hzInstance; // [...] @Override public Void call() { IMap
customerMap = hzInstance.getMap(CUSTOMER_MAP_NAME); IMap
metricMap = hzInstance.getMap(SOURCE_MAP_NAME); IMap
outputMap = hzInstance.getMap(OUTPUT_MAP_NAME); IMap
errorMap = hzInstance.getMap(ERROR_OUTPUT_MAP_NAME); for(String metricKey : metricMap.localKeySet()) { Metric metric = metricMap.get(metricKey); Customer customer = customerMap.get(metric.getCustomerId()); if(customer != null) { outputMap.set(metric.getTimestampMs(), new EnrichedMetric(metric, customer)); } else { errorMap.set(metric.getTimestampMs(), metric); } } return null; } } ``` ??? Utilisation d'un Callable pour pouvoir attendre le résultat. Runnable = exécution asynchrone sans possibilité de savoir si elle est terminée --- # HazelCast Processor - Performances .dag-icon[] ### Temps d'exécution - 1 million d'objets **100s**, soit environ **10 000 items / seconde** --- .dag-icon[] # Pipeline ```java Pipeline p = Pipeline.create(); // Definition de la source de données de Customer BatchStage
> customerEntries = p.drawFrom(Sources.map(CUSTOMER_MAP_NAME)); // lecture de la source et enrichissement BatchStage
enrichedStage = p .drawFrom(Sources.
map(SOURCE_MAP_NAME)) .hashJoin(customerEntries, JoinClause.joinMapEntries(Metric::getCustomerId), EnrichedMetric::new); ``` ```java // les données valides vont dans une Map de sortie enrichedStage .filter((EnrichedMetric::isEnriched)) .drainTo(Sinks.map(OUTPUT_MAP_NAME)); ``` ```java // les données en erreur vont dans une Map d'erreur enrichedStage .filter(enrichedMetric -> !enrichedMetric.isEnriched()) .drainTo(Sinks.map(ERROR_OUTPUT_MAP_NAME)); ``` ```java // Execution du pipeline et attente du résultat jet.newJob(p).join(); ``` ??? Le fait de définir plusieurs opérations à une même étape crée un flux en Y. Le chargement de la Map de Customer provoque sa lecture intégrale au démarrage et son stockage sous forme de table de hashage sur chaque noeud Attention à la mémoire ! --- # Pipeline - Performances .dag-icon[] ### Batch processing - 1 million de `Metric` Jet : **22s**, soit environ **45 400 items / seconde** ??? Rappel : hazelcast processor = 100s TP local : 90s --- # Stream Processing .center[  ] --- .dag-icon[] # Stream Processing ```java public class QueuePoller extends AbstractProcessor { // ... @Override public boolean complete() { Metric metric = inputQueue.poll(20, TimeUnit.MILLISECONDS); if(metric != null) { // on envoie le message dans le pipeline de traitement tryEmit(metric); } return false; // execution infinie tant que complete() retourne false } } ``` ??? Le poller bloque le thread sur la méthode `poll()`, il ne doit pas être coopératif pour ne pas bloquer d'autres threads. Utilisation de la méthode `poll(long, TimeUnit)` pour pouvoir interrompre l'exécution avec `Future.cancel()`, ou un flag d'arrêt La métrique est null si le timeout a expiré. Si on ne parvient pas à émettre l'évènement, on attend un peu et on retente. Si le poller retourne `false` il sera rappelé par le moteur jusqu'à ce qu'il retourne `true` ou que le job soit arrêté. --- .dag-icon[] # Stream processing ```java Pipeline pipeline = Pipeline.create(); BatchStage
> customerEntries = pipeline.drawFrom(Sources.map(CUSTOMER_MAP_NAME)); StreamStage
enrichedStage = pipeline // Utilisation du poller pour lire la queue .
drawFrom(Sources.streamFromProcessor("source", ProcessorMetaSupplier.of(() -> new QueuePoller(SOURCE_QUEUE_NAME, STOP_FLAG_NAME), 1))) .hashJoin(customerEntries, JoinClause.joinMapEntries(Metric::getCustomerId), EnrichedMetric::new); ``` [...] ```java // On execute le graph sans attendre le résultat Future
job = jet.newJob(pipeline).getFuture(); ``` --- # Stream processing - Performances .dag-icon[] ### Stream processing Injection de `Metric` dans la queue et calcul de la latence totale Latence moyenne : **2 ms**, latence max. sur 100 000 éléments : **27 ms** --- # Performances comparées - Throughput .center[  ] ??? Jet v0.4 --- # Performances comparées - Latence .center[  ] ??? Jet v0.4 --- .dag-icon[] # Fenêtre glissante ```java public class Metric { // date de la métrique private long timestampMs; // identifiant du client private int customerId; // nombre d'appels passés private int numberOfCalls; // nombre d'appels ayant échoué private int numberOfErrors; } ``` Objectif : Connaître sur une fenêtre de temps glissante le nombre total d'appels et d'erreurs rencontrés par chaque client. --- .dag-icon[] # Fenêtre glissante ```java Pipeline pipeline = Pipeline.create(); pipeline.
drawFrom(Sources.streamFromProcessor("source", ProcessorMetaSupplier.of(() -> new QueuePoller(SOURCE_QUEUE_NAME, STOP_FLAG_NAME), 1))) // ajoute des timestamps et des watermarks aux évènements .addTimestamps() // détermine le type de fenêtre, sa largeur (1H) et sa fréquence de mise à jour (10 min) .window(WindowDefinition.sliding(TimeUnit.HOURS.toMillis(1), TimeUnit.MINUTES.toMillis(10))) // les évènements doivent être groupés par Customer .groupingKey(Metric::getCustomerId) // On aggrège à la fois le total et le nombre d'erreurs .aggregate(AggregateOperation .withCreate(DropCountAccumulator::new) .
andAccumulate((acc, item) -> acc.add(item.getNumberOfCalls(), item.getNumberOfErrors())) .andCombine(DropCountAccumulator::combine) .andDeduct(DropCountAccumulator::deduct) .andFinish(DropCountAccumulator::total) ) .drainTo(Sinks.map(OUTPUT_MAP_NAME)); ``` ```java Future
job = jet.newJob(pipeline).getFuture(); ``` --- .dag-icon[] # Fenêtre glissante ```java .aggregate(AggregateOperation .withCreate(DropCountAccumulator::new) // Integrer un nouvel item dans l'accumulateur // this.totalCount += newTotalCount; this.failedCount += newFailedCount; .
andAccumulate((acc, item) -> acc.add(item.getNumberOfCalls(), item.getNumberOfErrors())) // combiner 2 accumulateurs // this.totalCount += that.totalCount; this.failedCount += that.failedCount; .andCombine(DropCountAccumulator::combine) // retirer un accumulateur d'un autre (décalage de la fenêtre) // this.totalCount -= that.totalCount; this.failedCount -= that.failedCount; .andDeduct(DropCountAccumulator::deduct) // extraire la valeur de l'accumulateur lorsque la fenêtre est figée // return new Result(totalCount, failedCount); .andFinish(DropCountAccumulator::total) ) ``` --- .dag-icon[] # Fenêtre glissante Exemple de résultat :
Date
ID Client
Nombre d'appels
Nombre d'erreurs
2018-05-29 20:10:00
3
124
46
2018-05-29 20:20:00
3
179
62
2018-05-29 20:30:00
3
223
71
2018-05-29 20:10:00
11
103
31
2018-05-29 20:20:00
11
132
46
2018-05-29 20:30:00
11
216
66
```java .window(WindowDefinition.sliding(TimeUnit.HOURS.toMillis(1), TimeUnit.MINUTES.toMillis(10))) ``` "Toutes les 10 minutes, donne les valeurs agrégées sur la dernière heure" --- # Limitations  * Lisibilité des pipelines complexes * API complexe (et instable) * Supervision et monitoring * _Production ready_ ? --- class: center, middle # Merci ! [ leneurone/hz-jet](https://github.com/leneurone/hz-jet)