This article provides example code to load data from MariaDB database using jdbc connector in PySpark. The same approach can be applied to other relational databases like MySQL, PostgreSQL, SQL Server, etc.
Prerequisites
PySpark environment
You can install Spark on you Windows or Linux machine by following this article: Install Spark 3.2.1 on Linux or WSL. For macOS, follow this one: Apache Spark 3.0.1 Installation on macOS.
For testing the sample script, you can also just use PySpark package directly without doing Spark configurations:
pip install pyspark
For Anaconda environment, you can also install PySpark using the following command:
conda install pyspark
MariaDB environment
If you don't have MariaDB environment, follow Install MariaDB Server on WSL to create one.
The user and test table is created using the following SQL statements:
MariaDB [(none)]> CREATE USER 'spark'@'localhost' IDENTIFIED BY 'kontext';
Query OK, 0 rows affected (0.001 sec)
MariaDB [(none)]> GRANT SELECT ON *.* TO 'spark'@'localhost';
Query OK, 0 rows affected (0.000 sec)
MariaDB [(none)]> create database test;
Query OK, 1 row affected (0.002 sec)
MariaDB [(none)]> use test;
Database changed
MariaDB [test]> create table customers (id int, name varchar(100));
Query OK, 0 rows affected (0.370 sec)
MariaDB [test]> insert into customers values (1,'Kontext'),(2,'Seb'),(3,'Mary');
Query OK, 3 rows affected (0.054 sec)
Records: 3 Duplicates: 0 Warnings: 0
MariaDB [test]> select * from customers;
+------+---------+
| id | name |
+------+---------+
| 1 | Kontext |
| 2 | Seb |
| 3 | Mary |
+------+---------+
3 rows in set (0.001 sec)
In the following sections, we will read data from this sample table test.customerson localhostserver with user spark and password 'kontext'.
When login with this user, we should be able to query the data:
$ mariadb -u spark -pkontext
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MariaDB connection id is 54
Server version: 10.3.34-MariaDB-0ubuntu0.20.04.1 Ubuntu 20.04
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MariaDB [(none)]> select * from test.customers;
+------+---------+
| id | name |
+------+---------+
| 1 | Kontext |
| 2 | Seb |
| 3 | Mary |
+------+---------+
3 rows in set (0.000 sec)
MariaDB JDBC driver
As we will use JDBC to connect to MariaDB, let's download the driver from the official website MariaDB Products & Tools Downloads | MariaDB. Download the right version based on your JDK version accordingly. For my WSL environment, I am using 3.0.5-GA version for Java 8+.
Save the downloaded jar file (mariadb-java-client-3.0.5.jar) to your PySpark project folder.
Read from MariaDB database
Now we can create a PySpark script (mariadb-example.py) to load data from Oracle database as DataFrame.
# mariadb-example.py
from pyspark.sql import SparkSession
appName = "PySpark Example - MariaDB Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.getOrCreate()
sql = "select * from test.customers"
database = "test"
user = "spark"
password = "kontext"
server = "localhost"
port = 3306
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"
jdbc_driver = "org.mariadb.jdbc.Driver"
# Create a data frame by reading data from Oracle via JDBC
df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("query", sql) \
.option("user", user) \
.option("password", password) \
.option("driver", jdbc_driver) \
.load()
df.show()
Run the script with the following command:
spark-submit --jars mariadb-java-client-3.0.5.jar mariadb-example.py
Argument --jars
will add the jar package to both driver and executor containers.
Output:
+---+-------+
| id| name|
+---+-------+
| 1|Kontext|
| 2| Seb|
| 3| Mary|
+---+-------+
About the JDBC URL
As Spark doesn't recognize 'jdbc:mariadb', we have to use JDBC URL like MySQL: f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme". This will work with the MariaDB JDBC driver even the scheme is specified asjdbc:mysql.
This is important otherwise you may encounter conversion issues like the following:
java.sql.SQLDataException: value 'id' cannot be decoded as Integer
at org.mariadb.jdbc.plugin.codec.IntCodec.decodeTextInt(IntCodec.java:127)
at org.mariadb.jdbc.codec.TextRowDecoder.decodeInt(TextRowDecoder.java:42)
at org.mariadb.jdbc.codec.RowDecoder.getIntValue(RowDecoder.java:145)
at org.mariadb.jdbc.client.result.Result.getInt(Result.java:399)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7(JdbcUtils.scala:431)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7$adapted(JdbcUtils.scala:430)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:367)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:349)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
You detailed message can be different but the it should follow this format: value '{column name}' cannot be decoded as {data type}. This is because MariaDB server returns column names instead of column values without the setup. For instance, the query in this example returns the following data:
+---+-------+
| id| name|
+---+-------+
| id| name|
| id| name|
| id| name|
+---+-------+
Thus the data type conversion error throw out. For more details, you can refer to this StackOverflow topic: mysql - Spark MariaDB jdbc SQL query returns column names instead of column values - Stack Overflow.