MySQL Lab Connection via JDBC from spark-shell

apache-spark

#1

Dear Itversity,

I am unable to resolve the error below. Are all of these options really needed while launching spark-shell?
I am totally confused.

Spark shell

spark-shell --driver-java-options "-Dlog4j.configuration=file:////home/anuvenkatesheee/log4j.properties" --master yarn --conf spark.ui.port=1100 --conf spark.driver.extraClassPath=mysql-connector-java-5.1.6.jar --packages="mysql:mysql-connector-java:5.1.6" --jars mysql-connector-java-5.1.6.jar

Code

scala> :paste
// Entering paste mode (ctrl-D to finish)
import java.util.Properties

val url = "jdbc:mysql://ms.itversity.com:3306"
val props = new Properties()
props.put("user", "retail_user")
props.put("password", "itversty")

println("connection successful")

val query = "SELECT order_status, count(1) FROM retail_db.orders GROUP BY order_status;"

val orderDF = spark.read
  .option("driver", "com.mysql.jdbc.Driver")
  .jdbc(url, s"$query", props)

orderDF.show()

println("Dataframe creation successful")

// Exiting paste mode, now interpreting.

connection successful
java.sql.SQLException: Access denied for user 'retail_user'@'gw03.itversity.com' (using password: YES)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1055)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3491)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3423)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:910)
at com.mysql.jdbc.MysqlIO.secureAuth411(MysqlIO.java:3923)
at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1273)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2031)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:718)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:46)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

And the code below works fine:

[anuvenkatesheee@gw03 ~]$ spark-shell --driver-java-options "-Dlog4j.configuration=file:////home/anuvenkatesheee/log4j.properties" --master yarn --conf spark.ui.port=1100 --conf spark.driver.extraClassPath=mysql-connector-java-5.1.6.jar --packages="mysql:mysql-connector-java:5.1.6" --jars mysql-connector-java-5.1.6.jar
SPARK_MAJOR_VERSION is set to 2, using Spark2
Ivy Default Cache set to: /home/anuvenkatesheee/.ivy2/cache
The jars for the packages stored in: /home/anuvenkatesheee/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.5.0.0-1245/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found mysql#mysql-connector-java;5.1.6 in central
:: resolution report :: resolve 1166ms :: artifacts dl 5ms
:: modules in use:
mysql#mysql-connector-java;5.1.6 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
18/04/08 04:11:18 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.16.1.113:1100
Spark context available as 'sc' (master = yarn, app id = application_1520592249193_43088).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0.2.5.0.0-1245
      /_/

Using Scala version 2.11.8 (Java HotSpot™ 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val dataframe_mysqlss = spark.read.format("jdbc").option("url",
| "jdbc:mysql://ms.itversity.com:3306/retail_db").option("driver",
| "com.mysql.jdbc.Driver").option("dbtable", "orders").option("user",
| "retail_user").option("password", "itversity").load()
dataframe_mysqlss: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]

scala>

scala> dataframe_mysqlss.show
+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|

What is wrong with my first code? I am wondering what is happening.

Regards
Venkatesh


#2

Dear Venkat,

We don’t have access to read the retail_db database.

The code below will work:

[rrajkumar9999@gw01 ~]$ spark-shell --master yarn --conf spark.ui.port=11111 --conf spark.driver.extraClassPath=Jar/mysql-connector-java-6.0.6.jar --packages="mysql:mysql-connector-java:6.0.6"

scala> val connection_url = "jdbc:mysql://nn01.itversity.com:3306/retail_import"

scala> val mysql_props = new java.util.Properties

scala> mysql_props.setProperty("user", "retail_dba")

scala> mysql_props.setProperty("password", "itversity")

scala> val departmentsDF = sqlContext.read.jdbc(connection_url, "departments", mysql_props)

scala> departmentsDF.show()
+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            2|        Fitness|
|            3|       Footwear|
|            4|        Apparel|
|            5|           Golf|
|            6|       Outdoors|
|            7|       Fan Shop|
+-------------+---------------+
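
As a side note, since SPARK_MAJOR_VERSION is set to 2 on these gateways, the same read can also be written against the Spark 2 SparkSession entry point (sqlContext is kept for backward compatibility). A minimal sketch reusing the same connection details as above:

// Same connection details as the sqlContext example above.
val connection_url = "jdbc:mysql://nn01.itversity.com:3306/retail_import"
val mysql_props = new java.util.Properties
mysql_props.setProperty("user", "retail_dba")
mysql_props.setProperty("password", "itversity")

// spark is the SparkSession that spark-shell provides in Spark 2;
// this is equivalent to the sqlContext.read.jdbc call above.
val departmentsDF = spark.read.jdbc(connection_url, "departments", mysql_props)
departmentsDF.show()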

Thanks
Raj.


#3

Dear Raj,

Thanks for the reply. Your code is good, but I am pointing to "ms.itversity.com" while you are pointing to "nn01.itversity.com".


#4

@kmln can you please have a look at this issue and correct me where I went wrong?


#5

Hello Venkatesh,

Try to read the documentation before writing code. The JDBC reader accepts a connection string, a table name, and options.

Also, your JDBC URL does not mention which database it is connecting to. It is difficult for others to always debug your code; try debugging it yourself based on the errors. Pay close attention to the errors the code is throwing, and you can then fix it from there. I have attached the working code below.

val url = "jdbc:mysql://ms.itversity.com:3306/retail_db"
val orderDF = spark.read.option("driver", "com.mysql.jdbc.Driver").jdbc(url, "orders", props)
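
For reference, here is the same read as a self-contained paste block, with the props definition included (this assumes the MySQL connector jar is already on the driver classpath, as in your spark-shell launch command):

import java.util.Properties

val props = new Properties()
props.put("user", "retail_user")
props.put("password", "itversity")

// The database (retail_db) goes into the URL, and the second argument
// to jdbc() is a plain table name, not a raw SELECT statement.
val url = "jdbc:mysql://ms.itversity.com:3306/retail_db"
val orderDF = spark.read.option("driver", "com.mysql.jdbc.Driver").jdbc(url, "orders", props)
orderDF.show()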

Regards,
Koushik


#6

@kmln
Dear Koushik,

Thanks for the reply,

Note: we do have an option to push a query down through the JDBC connection.

Databricks Document Link

Push down a query to the database engine:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#push-down-a-query-to-the-database-engine
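
Following that document, a pushdown query is wrapped in parentheses and given an alias, so it can stand anywhere a table name is expected. A sketch of that pattern against our lab database (the alias name status_counts is my own choice):

// The dbtable value is substituted where a table name would appear in the
// SQL that Spark generates, so a subquery must be parenthesized and aliased.
val pushdownQuery = "(SELECT order_status, count(1) FROM retail_db.orders GROUP BY order_status) AS status_counts"

val statusDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://ms.itversity.com:3306")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", pushdownQuery)
  .option("user", "retail_user")
  .option("password", "itversity")
  .load()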

I debugged the error: the subquery needs a table alias, which was missing in my earlier query. (I had also mistyped the password as "itversty" in my first attempt, which is likely what triggered the access-denied error.)

Here are the results:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.util.Properties

val url = "jdbc:mysql://ms.itversity.com:3306"

val props = new Properties()

props.put("user", "retail_user")

props.put("password", "itversity")

println("connection successful")
val query = "SELECT order_status, count(1) FROM retail_db.orders GROUP BY order_status"

val orderDF1 = spark.read.option("driver", "com.mysql.jdbc.Driver").jdbc(url, s"($query) as cus", props)

// Exiting paste mode, now interpreting.

connection successful
import java.util.Properties
url: String = jdbc:mysql://ms.itversity.com:3306
props: java.util.Properties = {user=retail_user, password=itversity}
query: String = SELECT order_status, count(1) FROM retail_db.orders GROUP BY order_status
orderDF1: org.apache.spark.sql.DataFrame = [order_status: string, count(1): bigint]

scala> orderDF1.show()
+---------------+--------+
|   order_status|count(1)|
+---------------+--------+
|       CANCELED|    1428|
|         CLOSED|    7556|
|       COMPLETE|   22899|
|        ON_HOLD|    3798|
| PAYMENT_REVIEW|     729|
|        PENDING|    7610|
|PENDING_PAYMENT|   15030|
|     PROCESSING|    8275|
|SUSPECTED_FRAUD|    1558|
+---------------+--------+
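
For comparison, the same counts can also be computed on the Spark side instead of being pushed down to MySQL; a sketch reusing the props from the paste block above (the trade-off is that all rows of the orders table are transferred before aggregating):

val ordersDF = spark.read
  .option("driver", "com.mysql.jdbc.Driver")
  .jdbc("jdbc:mysql://ms.itversity.com:3306/retail_db", "orders", props)

// groupBy/count runs in Spark, so the aggregate column comes back
// named "count" rather than "count(1)".
ordersDF.groupBy("order_status").count().show()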