在我的上一篇文章中,我已经介绍了如何在Windows上设置和使用Hadoop。现在,这篇文章是关于在Windows操作系统上为Apache Spark配置一个本地开发环境。
Apache Spark是最流行的集群计算技术,为快速和可靠的计算而设计。它提供了隐含的数据并行性和默认的容错。它很容易与HIVE和HDFS集成,并提供了一个无缝的并行数据处理的经验。你可以在 https://spark.apache.org 阅读更多关于Spark的信息。
默认情况下,Spark SQL项目不在Windows操作系统上运行,需要我们先进行一些基本的设置;这就是我们在这篇文章中要讨论的,因为我没有在互联网上或书中找到很好的记录。
这篇文章也可用于在Mac或Linux上建立Spark开发环境。只要确保你从Spark的网站上下载了正确的操作系统版本。
你可以从GitHub这里参考本文中使用的Scala项目。 https://github.com/gopal-tiwari/LocalSparkSql .
在本文结束时,你应该能够在Windows操作系统上创建/运行你的 Spark SQL 项目和 spark-shell。
我把这篇文章分为三个部分。你可以根据你的具体使用情况,遵循这三种模式中的任何一种。
你可能也喜欢。 《Apache Spark大全》[教程和文章] 。
设置时间:15分钟
功能性。有限
设置时间:20分钟
功能。扩展
设置时间:40分钟
功能。
许多人可能已经尝试在Windows上运行spark,在运行项目时可能面临以下错误。
16/04/02 19:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library for
your platform... using builtin-java classes where applicable
1604/02 19:59:31 ERROR Shell: 未能在 hadoop
二进制路径中找到 winutils 二进制文件 java.io.IOException: 无法在Hadoop二进制文件中找到可执行文件 null\bin\winutils.exe
。
这是因为你的系统没有用于Windows操作系统的Hadoop二进制文件。
你可以按照我之前的 文章 建立一个,或者从 https://github.com/cdarlint/winutils 下载一个。
下面的错误也与Windows操作系统的本地Hadoop二进制文件有关。
16/04/03 19:59:10 ERROR util.Shell: 未能在
hadoop二进制路径中找到winutils二进制文件 java.io.IOException: 无法在Hadoop二进制文件中找到可执行文件
C:\hadoop\bin\winutils.exe。
解决办法是一样的。我们需要用Native Windows二进制文件来设置 HADOOP_HOME
。
所以,只要跟着这篇文章走,在本教程结束时,你应该能够摆脱所有这些错误。
对于32位(x86)操作系统,你只需要安装a.,对于64位(x64)请安装a.和b.。
对于本教程,我们假设Spark和Hadoop二进制文件在你的 C:\ 驱动器中解压。然而,你可以在你系统中的任何位置解压它们。
在我们进一步进行之前,让我们确保你的Java设置是正确的,环境变量也是根据Java的安装目录更新的。
要确认Java已经安装在你的机器上,只需打开cmd并输入 java –version
。 你应该可以看到你的系统上安装的Java的版本。
如果你得到一个错误信息,如"'java'未被识别为内部或外部命令、可操作程序或批处理文件",那么请遵循以下步骤。否则,跳过它。
JAVA_HOME
,在 变量值 字段中提供你的JDK安装路径。
java –version
,你应该可以看到你刚安装的Java的版本。 如果你的命令提示符有点像上面的图片,你就可以了。否则,你需要检查你的安装版本是否与你的操作系统架构相匹配(x86,x64)。环境变量的路径也可能不正确。
scala –version
。你的命令提示符应该如下图所示。 HADOOP_HOME
系统环境变量。 HADOOP_HOME
,打开 环境变量 对话框,在 系统变量 部分点击 新建... 按钮,填写 名称 和 值 文本框,如下图所示。 注意:如果你没有管理员权限来添加环境变量,不用担心,因为你可以在你的IDE中为每个项目单独设置它。这个过程将在下面的章节中解释。
现在,让我们创建一个名为 "LocalSparkSql "的Scala-Maven新项目。或者,你也可以从GitHub上克隆它。 https://github.com/gopal-tiwari/LocalSparkSql .
项目结构看起来像这样。
LocalSparkHiveTest.scala
package org.connected.spark
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object LocalSparkHiveTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.enableHiveSupport()
.master("local")
.appName("Demo")
.getOrCreate()
spark.sql("SHOW DATABASES").show
spark.sql("CREATE DATABASE IF NOT EXISTS sparkdemo")
spark.sql("CREATE TABLE IF NOT EXISTS sparkdemo.table1(id INT, name STRING)")
spark.sql("SHOW DATABASES").show
import spark.implicits._
val df:DataFrame = Seq(
(1, "One"),
(2, "Two"),
(3, "Three")
).toDF("id","name")
df.write.mode(SaveMode.Append).format("hive").saveAsTable("sparkdemo.table1")
//Thread.sleep(60 * 1000)
spark.sql("SELECT * FROM sparkdemo.table1").show(false)
println(spark.sql("select * from sparkdemo.table1").count)
}
}
pom.xml
4.0.0
org.connected.spark
LocalSparkSql
1.0.0-SNAPSHOT
org.apache.spark
s park-core_2.12
2.4。 4
org.apache.spark
s park-sql_2.12
2.4.4
org.apache.spark
s park-hive_2.12
2.4.4
现在,让我们右击 "LocalSparkHiveTest" 文件并点击 运行 。
线程 "main "中出现异常 org.apache.spark.sql.AnalysisException。
java.lang.RuntimeException: java.io.IOException: (null)命令字符串中的条目。
null chmod 0733 C:\tmp\hive;
如果你看到上述错误,说明你的 HADOOP_HOME
没有正确设置。
如果你由于管理权限问题而无法设置系统环境变量,你可以在项目层面上通过下面的几个步骤进行设置,否则,你可以跳到下一个指令。
现在,让我们再次尝试运行主对象
线程 "main "中出现异常 org.apache.spark.sql.AnalysisException:
java.lang.RuntimeException: java.lang.RuntimeException: root scratch dir:
HDFS上的/tmp/hive应该是可写的。当前的权限是。---------;
为了解决这个错误,我们需要打开Hive默认临时目录的权限。
在这里,我们需要打开你的Hadoop主页,然后进入 /bin 目录并执行以下命令。
winutils.exe chmod 777 /tmp/hive
授予的权限应该是 "drwxrwxrwx",你可以使用下面的命令检查权限状态
winutils.exe ls \tmp\hive
如果你的机器没有Microsoft Visual C++ 2010 Redistributable Package,你可能得到以下错误。
由于没有找到MSVCR100.dll,代码执行无法进行。
重新安装程序可能会解决这个问题。
你可以从本文的 下载 部分下载并安装C++ 2010 Redistributable Package,因为我在那里提供了直接的下载链接。
现在,让我们再次执行该项目,你应该可以毫无问题地创建一个数据库和表。
输出。
现在,你可能已经注意到在你项目的根文件夹下已经创建了两个新的目录, metastore_db 和 spark-warehouse 。
这些目录是什么?
Spark SQL使用一个Hive元存储来管理用户创建的数据库和表的元数据信息。你可以认为它是一个小型的关系型数据库,存储了实际的数据库目录路径、表结构、分区列、文件位置等信息。
默认情况下,Spark带有一个嵌入式Derby Db支持来管理元数据。关于Derby的更多信息可以在这里找到 https://db.apache.org/derby/ 。
仓库目录是一个写入表数据的位置。默认情况下,Spark将该目录创建为spark-warehouse。
你可以在这里得到更多的信息 https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hive-metastore.html 。
现在,如果我们用我们用于 LocalSparkSql 项目的相同代码创建另一个新项目,我们可以注意到,我们无法访问由我们之前的项目创建的数据库" s parkdemo" 。这种行为背后的原因是,对于每个项目, metastore_db 和 s park-warehouse 都会被重新创建,而且它们是特定于该项目的,因为它们被创建在项目的根目录下。
然而,我们可以连接到其他项目的metastore和仓库,但更好的方法是创建一个共同的独立的metastore_db和spark-warehouse目录,并通过向SparkSession添加一些额外的配置在多个项目之间共享,这就是我们将在下一节讨论的内容。
为了模拟这种行为,我们需要创建一个新的项目并尝试访问一个共同的 metastore_db 和仓库。
在这个演示中,我们将使用目录位置,如下所示。
对于metastore_db C:\tmp\hive\metastore_db
对于warehouse C:\tmp\hive\spark-warehouse
为了让Spark引用我们新的公共目录,我们需要在创建Spark会话对象的时候添加以下配置。
s park.sql.warehouse.dir = C:/tmp/hive/spark-warehouse
javax.jdo.option.ConnectionURL = jdbc:derby:;databaseName=C:/tmp/hive/metastore_db;create=true
代码应该看起来像。
val spark: SparkSession = SparkSession.builder()
.enableHiveSupport()
.master("local")
.appName("Demo")
.config("spark.sql.warehouse.dir","C:/tmp/hive/spark-warehouse")
.config("javax.jdo.option.ConnectionURL","jdbc:derby:;databaseName=C:/tmp/hive/metastore_db;create=true")
.getOrCreate()
你需要在你的每个项目中使用上述配置,让项目访问其他应用程序创建的数据库和表。
现在,为了检查上述配置的有效性,我们可以启动一个spark-shell并尝试访问一个" s parkdemo" 数据库和 "table1"。
在我们启动spark-shell之前,我们需要通过以下步骤将其配置为指向我们的公共元存储和仓库。
spark.driver.extraJavaOptions -Dderby.system.home=C:/tmp/hive
spark.sql.warehouse.dir C:/tmp/hive/spark-warehouse
spark-shell
。 现在,让我们试着通过运行 SHOW DATABASES
并在shell中选择table1的所有数据来列出所有的数据库。
上述结果验证了我们的共享元存储配置是正确的,因为我们能够访问由上述代码创建的表。
现在,如果你继续打开spark-shell,并试图在同一时间点运行你的scala项目,你会得到以下错误。
Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=C:/tmp/hive/metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'C:/tmp/hive/metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@4262fdeb, see the next exception for details.
这是因为我们使用的是Spark的嵌入式 derbyDb ,一个应用程序(即spark-shell)已经直接连接到共享的 metastore_db 数据库并获得了锁,所以另一个实例无法启动。如果我们可以使用网络服务连接到derby,而不是直接连接到数据库,这种情况是可以避免的。但是,我们没有运行一个derby服务器实例,所以在这种情况下我们不能这样做。
然而,我们可以在一个单独的关系型数据库中创建Hive元存储,并让Spark连接到该数据库以实现多连接设置。我们将在下一节讨论这个问题。
为了在这种模式下配置本地元存储,我们需要下载并安装下面列出的一些额外的组件,以及本文" 下载" 部分中指定的其他组件。
注意:直接跳到文章的这一部分并不是一个好主意,因为你可能会错过一些重要的和必须的步骤,所以请从一开始就按照文章的要求来完成配置。
CREATE DATABASE metastore_db;
USE metastore_db;
一旦这两个SQL文件在你的 metastore_db 上成功执行,你就可以配置你的项目来访问这个metastore了。所以,让我们通过以下步骤来配置spark-shell和Scala/Java项目。
在我们启动spark-shell之前,我们需要配置它来使用我们常用的MySQL元存储和仓库。请按照以下步骤来配置shell。
hive-site.xml
javax.jdo.option.ConnectionURL
jdbc:mysql://localhost:3306/metastore_db?ceateDatabaseIfNotExist=true
JDBC metastore的JDBC连接字符串
javax. jdo.option.ConnectionDriverName
com.mysql.cj.jdbc.Driver
JDBC元存储的驱动类名称
javax.jdo.option.ConnectionUserName
root. ConnectionUserName
root
javax.jdo.option.ConnectionPassword
root
hive.metastore.warehouse.dir
C:/tmp/hive/spark-warehouse
仓库默认数据库的位置
spark.sql("CREATE DATABASE IF NOT EXISTS sparkdemo")
spark.sql(
s"""
CREATE TABLE IF NOT EXISTS sparkdemo.table2
(
id INT,
name STRING
)
PARTITIONED BY(
date STRING
)
STORED AS PARQUET
""")
import org.apache.spark.sql.{DataFrame, SaveMode}
import spark.implicits._
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
val df2: DataFrame = Seq(
(1, "One","2020-01-01"),
(2, "Two","2020-01-03"),
(3, "Three","2020-01-11")
).toDF("id", "name","date")
df2.write.mode("overwrite").insertInto("sparkdemo.table2")
spark.sql("SHOW DATABASES").show
spark.sql(“SELECT * FROM sparkdemo.table2”).show
spark.sql("SHOW PARTITIONS sparkdemo.table2").show
输出。 SQL命令的输出
现在,我们需要验证我们是否可以打开多个连接到Hive元存储。然后,我们可以检查我们是否可以在本地Windows系统中同时运行多个Spark项目。让我们在下一节中尝试在不关闭当前spark-shell的情况下从一个单独的项目访问元存储。
现在,让我们在 "LocalSparkSql "项目中创建一个新的Scala对象, LocalMySQLMetastoreTest
。
在这个对象中,我们将尝试在 s parkdemo.table2 , 中插入一些更多的记录,这是我们刚刚用spark-shell创建的。
现在,我们需要提供一些与MySQL服务器地址和证书有关的额外配置,以便它可以被 SparkSession 实例用来连接到我们新的MySQL元存储。
下面给出了一个配置样本,以及完整的代码。
val spark: SparkSession = SparkSession.builder()
.enableHiveSupport()
.master("local")
...
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql:// : /metastore_db" )
.config("javax.jdo.option.ConnectionDriverName", " " )
.config("javax.jdo.option.ConnectionUserName", " " )
.config("javax.jdo.option.ConnectionPassword", " " )
...
.getOrCreate()
由于我们将使用MySQL JDBC进行连接,我们需要在 pom.xml 中添加MySQL JDBC驱动作为依赖,如下图所示。
...
mysql
mysql-connector-java
8.0.18
..
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object LocalMySQLMetastoreTest {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.enableHiveSupport()
.master("local")
.appName("Demo")
.config("spark.sql.warehouse.dir", "C:\\tmp\\hive\\spark-warehouse")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://localhost:3306/metastore_db")
.config("javax.jdo.option.ConnectionDriverName", "com.mysql.cj.jdbc.Driver")
.config("javax.jdo.option.ConnectionUserName", "root")
.config("javax.jdo.option.ConnectionPassword", "root")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.getOrCreate()
import spark.implicits._
spark.sql("CREATE DATABASE IF NOT EXISTS sparkdemo")
spark.sql(
s"""
CREATE TABLE IF NOT EXISTS sparkdemo.table2
(
id INT,
name STRING
)
PARTITIONED BY(
date STRING
)
STORED AS PARQUET
""")
val df2: DataFrame = Seq(
(4, "Four", "2020-01-13"),
(5, "Five", "2020-01-13"),
(6, "Six", "2020-01-15")
).toDF("id", "name", "date")
df2.write.mode(SaveMode.Overwrite).insertInto("sparkdemo.table2")
spark.sql("SELECT * FROM sparkdemo.table2").show
spark.sql("SHOW PARTITIONS sparkdemo.table2").show
}
}
4.0.0
org.connected.spark
LocalSparkSql
1.0.0-SNAPSHOT
org.apache.spark
s park-core_2.12
2.4.4
org. apache.spark
s park-sql_2.12
2.4.4
org.apache.spark
s park-hive_2.12
2.4.4
mysql
mysql-connector-java
8.0.18
现在,让我们运行我们的 LocalMySQLMetastoreTest 对象。其输出结果应该如下图所示。
注意:在某些情况下,你可能无法看到其他spark-shell/项目的新添加的分区或数据,因为这是Hive/Spark元存储管理的预期行为。你可以考虑执行ALTER TABLE ADD PARTITIONS或spark.catalog.refreshTable("dbname.tablame")来反映新的数据。
在用相同的数据再运行一次该项目后,当我试图在我的旧火花-shell中运行 spark.sql("SELECT * FROM sparkdemo.table2").show
时,我得到了以下异常。
java.io.FileNotFoundException。
file:/C:/tmp/hive/spark-warehouse/sparkdemo.db/table2/date=2020-01-13/part-000
00-666cc8ed-b44b-4025-9606-e5d9e660c8db.c000不存在
这背后的原因很简单,我们从shell外部添加/修改了一些额外的行/分区,所以我们spark-shell的metastore目录并不知道这些变化。
运行下面的代码将执行元数据的强制更新,并最终解决这个问题
spark.catalog.refreshTable("sparkdemo.table2")
现在,在shell中执行 spark.sql("SELECT * FROM sparkdemo.table2").show
会得到以下的更新结果。
我希望这个关于设置本地Spark开发环境的扩展演示让你对与本地设置相关的配置有一个全面深入的了解。我已经尽力涵盖了尽可能多的故障情况,但如果你有任何其他问题、疑问或建议,可以在下面的评论中分享。
感谢阅读!!