Bereits in den den letzten Artikel habe ich mich mit dem Installieren von Elasticsearch und Kibana beschäftigt. Heute will ich mir mal anschauen, wie man Daten in Elasticsearch importieren kann. Ziel ist es das ganze mit Python zu realisieren.
Als Datensatz nehme ich Sensordaten von luftdaten.info. Hier habe ich mir aus dem Archiv die Daten für den SDS011 un den DHT22 Sensor heruntergeladen und entpackt. Das sind in Summe ungefähr 11 GB.
Implementieren des Python import Scripts
Wir starten damit, eine paar Libraries zu importieren und ein paar Variablen für die Ordner mit den CSV Dateien festzulegen.
1 2 3 4 5 6 7 8 9 | import csv import sys import os from elasticsearch import Elasticsearch currentDirectory = os.path.dirname(os.path.realpath(__file__)) dht22 = currentDirectory + '/dht22/' sds011 = currentDirectory + '/sds011/' |
Im nächsten Schritt wird die Klasse zum Importieren der Daten implementiert. Ich habe das ganze extra in eine eigene Klasse gepackt, um das ganze später auch in größeren Scripten verwenden zu können.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | class ElasticSearchImporter(object): def importToDb(self, directory, fileName, indexDbName, indexType="default"): csv.field_size_limit(sys.maxsize) es = Elasticsearch(['http://192.168.10.107:9200/']) if not es.ping(): raise ValueError("Connection failed") headers = [] index = 0 f = open(directory + fileName, 'rt') reader = csv.reader(f) try: for row in reader: try: if (index == 0): headers = row else: obj = {} for i, val in enumerate(row): obj[headers[i]] = val es.index(index=indexDbName, doc_type=indexType, body=obj) except Exception as e: print(index) print(e) index = index + 1 except: print('Loading Error') if not f.closed: f.close() |
Im wesentlichen definiere ich in der Klassen den Elasticsearch Server und iteriere über die CSV Datei um diese nach Elasticsearch zu übergeben.
Ganz am Ende rufe ich dann die Funktion der Klasse auf und übergebe die entsprechenden Parameter für die CSV Dateien und die indexDB auf Elastiscsearch Seite.
1 2 3 | importer = ElasticSearchImporter() importer.importToDb(dht22,"2019-01_dht22.csv", "dht22", indexType="default") importer.importToDb(sds011,"2019-01_sds011.csv", "sds011", indexType="default") |
Hier nun noch mal den kompletten Quellcode zum Kopieren.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | import csv import sys import os from elasticsearch import Elasticsearch currentDirectory = os.path.dirname(os.path.realpath(__file__)) dht22 = currentDirectory + '/dht22/' sds011 = currentDirectory + '/sds011/' class ElasticSearchImporter(object): def importToDb(self, directory, fileName, indexDbName, indexType="default"): csv.field_size_limit(sys.maxsize) es = Elasticsearch(['http://192.168.10.107:9200/']) if not es.ping(): raise ValueError("Connection failed") headers = [] index = 0 f = open(directory + fileName, 'rt') reader = csv.reader(f) try: for row in reader: try: if (index == 0): headers = row else: obj = {} for i, val in enumerate(row): obj[headers[i]] = val es.index(index=indexDbName, doc_type=indexType, body=obj) except Exception as e: print(index) print(e) index = index + 1 except: print('Loading Error') if not f.closed: f.close() importer = ElasticSearchImporter() importer.importToDb(dht22,"2019-01_dht22.csv", "dht22", indexType="default") importer.importToDb(sds011,"2019-01_sds011.csv", "sds011", indexType="default") |
Prüfen des Uploads zu Elasticsearch
Ob die Daten in Elasticsearch angekommen sind, kann mit einer Anfrage via curl geprüft werden.
1 2 3 4 5 6 7 8 | curl -XGET 192.168.10.107:9200/_cat/indices?v health status index uuid pri rep docs.count docs.deleted store.size pri.store.size red open .kibana_1 aeBnd403TWuFYIRtsyE5Lg 1 0 red open dht22 _OuocHXxRsO-BgA03zGQ4Q 5 1 red open kibana_sample_data_logs 4W_9kGB_Qfq7Gzbe3PSJvQ 1 0 red open kibana_sample_data_flights 472QoOYpSKSplvrNBbASVw 1 0 red open sds011 kIycXUzBTf2GlqxRIAkvvg 5 1 red open kibana_sample_data_ecommerce Gq61wDHEQYC-Sxqm-vn0jg 1 0 |
Wenn alles geklappt hat, dann haben wir hier jetzt Einträge für dht22 und sds011.
Schreibe einen Kommentar