使用Aerospike的 Spark 连接器(Spark Connect)

Aerospike是一个高度可扩展的键值数据库,可提供同类产品中最佳的性能。它在实时业务环境中通常部署管理TB到PB数据量。

Aerospike通常与其他可扩展的分布式软件(例如,用于系统耦合的Kafka或用于分析的Spark)一起运行。Aerospike提供的 Aerospike Connect 附件使集成变得很容易。

本文通过使用 aerospike-ansible 讨论了 Aerospike Spark Connect 在实际中的工作方式,并提供一个全面且易于复制的端到端示例。

数据库集群设置

首先看一下 Ansible for Aerospike,它解释了如何使用 aerospike-ansible。

在此示例中,我在 vars/cluster-config.yml 中将 cluster_instance_type 设置为 c5d.18xlarge。

按照说明进行操作,直到并包括一键式设置,最后我们会运行到

ansible-playbook aws-setup-plus-aerospike-install.yml ansible-playbook aerospike-java-client-setup.yml 

这会产生一个3个节点的群集,以及一个安装了相关软件的客户端实例。

Spark 集群设置

这是通过

ansible-playbook spark-cluster-setup.yml 

该脚本创建了一个3节点的给定实例类型的Spark集群,在其中已安装并运行了Spark,它还安装了Aerospike Spark Connect。

请注意,您需要设置 enterprise:true,并通过 vars/cluster-config.yml 中的 feature_key:/your/path/to/key 提供有效的Aerospike功能密钥的路径。因此,您必须是Aerospike的授权客户,或者必须正在Aerospike企业版试用期。

在过程即将结束时,您会看到

TASK [Spark master IP & master internal url] ************************************************************************************************************************************************************************ ok: [localhost] => { "msg": "Spark master is 3.88.237.103. Spark master internal url is spark://10.0.2.122:7077." } 

记下Spark master内部网址,稍后需要。

加载数据

首先,我们获得Aerospike集群的节点的地址,稍后需要这些地址。

source ./scripts/cluster-ip-address-list.sh 
Adds cluster ips to this array- AERO_CLUSTER_IPS Use as ${ AERO_CLUSTER_IPS[index]} There are 3 entries ########################################################## cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58 cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234 cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95 
{ "name": "pkup_datetime", "value": { "column_position": 3, "type": "timestamp", "encoding": "yyyy-MM-dd hh:mm:ss", "dst_type": "integer" } } 

在 repos/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json 的仓库中提供了此功能,我们将此上传到客户端实例。

source ./scripts/client-ip-address-list.sh scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[0]}:~ 

接下来,将数据放入客户端计算机。有多种方法可以执行此操作,但是您需要进行规划,因为未压缩时数据集为7.6GB。我使用了以下命令,但是具体情况取决于您的闪存和文件系统的具体情况

./scripts/client-quick-ssh.sh # to log in, followed by sudo mkfs.ext4 /dev/nvme1n1 sudo mkdir /data sudo mount -t ext4 /dev/nvme1n1 /data sudo chmod 777 /data wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz gunzip /data/trips_xaa.csv.gz 

最后,我们使用上传的配置文件加载数据。

cd ~/aerospike-loader ./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv 

请注意,我们使用的是我们先前记录的群集IP地址之一。

使用Spark

登录到Spark的一个节点,通过aerospike-ansible中的一个工具脚本

/scripts/spark-quick-ssh.sh 

使用我们在运行Spark集群安装手册时看到的Spark主URL启动Spark Shell。

/spark/bin/spark-shell --master spark://10.0.2.122:7077 

导入相关库

import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode} import org.apache.spark.SparkConf import java.util.Date import java.text.SimpleDateFormat 

提供Aerospike配置。请注意,我们在这里使用了之前的集群IP:

spark.conf.set("aerospike.seedhost", "10.0.0.234") spark.conf.set("aerospike.namespace", "test") 

定义一个视图,以及我们将要使用的功能

val sqlContext = spark.sqlContext sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds)) val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load taxi.createOrReplaceTempView("taxi") 

最后,运行我们的查询语句

// Journeys grouped by cab type val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type") result.show() +--------+--------+ |cab_type| count| +--------+--------+ | green|20000000| +--------+--------+ // Average fare based on different passenger count val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt") result.show() +-------------+----------+ |passenger_cnt|avg_amount| +-------------+----------+ | 0| 10.86| | 1| 14.63| | 2| 15.75| | 3| 15.87| | 4| 15.85| | 5| 14.76| | 6| 15.42| | 7| 23.74| | 8| 19.52| | 9| 34.9| +-------------+----------+ // No of journeys for different numbers of passengers val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) order by passenger_cnt"); result.show() +-------------+---------+--------+ |passenger_cnt|trip_year| count| +-------------+---------+--------+ | 0| 2014| 4106| | 1| 2014|16557518| | 2| 2014| 1473578| | 3| 2014| 507862| | 4| 2014| 160714| | 5| 2014| 939276| | 6| 2014| 355846| | 7| 2014| 492| | 8| 2014| 494| | 9| 2014| 114| +-------------+---------+--------+ // Number of trips for each passenger count/distance combination // Ordered by trip count, descending val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc") result.show() +-------------+---------+--------+-------+ |passenger_cnt|trip_year|distance| trips| +-------------+---------+--------+-------+ | 1| 2014| 1.0|5321230| | 1| 2014| 2.0|3500458| | 1| 2014| 3.0|2166462| | 1| 2014| 4.0|1418494| | 1| 2014| 5.0| 918460| | 1| 2014| 0.0| 868210| | 1| 2014| 6.0| 653646| | 1| 2014| 7.0| 488416| | 2| 2014| 1.0| 433746| | 1| 2014| 8.0| 345728| | 2| 2014| 2.0| 305578| | 5| 2014| 1.0| 302120| | 1| 2014| 9.0| 226278| | 5| 2014| 2.0| 199968| | 2| 2014| 3.0| 199522| | 1| 2014| 10.0| 163928| | 3| 2014| 1.0| 145580| | 2| 2014| 4.0| 137152| | 5| 2014| 3.0| 122714| | 1| 2014| 11.0| 117570| +-------------+---------+--------+-------+ only showing top 20 rows 

这篇文章向您展示了可以很快的启动并运行一个大型数据集。该示例处理了二千万行数据,并很容易扩展到整个数据集。我们还可以看到您可以快速启动并运行 Aerospike-ansible 工具。

本网页由快兔兔AI采集器生成,目的为演示采集效果,若侵权请及时联系删除。

原文链接:https://brands.cnblogs.com/aerospike/p/1883

更多内容