In the article Spark - Read from BigQuery Table, I provided details about how to read data from BigQuery in PySpark using Spark 3.1.1 with GCS connector 2.2.0. This article continues the journey by reading a JSON file directly from Google Cloud Storage (GCS). You can use similar APIs to read XML or other file formats in GCS as data frames in Spark.
Prerequisites
As in the previous article, I will use a standalone PySpark installation in Cloud Shell to test the functionality. Refer to Spark - Read from BigQuery Table for the installation guide, as well as for the service account permissions and credential file setup.
Cloud storage connector
The Cloud Storage connector is required to read from GCS in Spark.
For more details, refer to hadoop-connectors/gcs at master · GoogleCloudDataproc/hadoop-connectors on GitHub.
For simplicity, we will pass the package directly as a dependency when submitting the application:
spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0
Without this connector, the following error is thrown:
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
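The --packages value is a Maven coordinate in group:artifact:version form. For this connector, the version string combines the Hadoop line and the connector release (hadoop3-2.2.0). If you launch jobs from Python, a minimal sketch that assembles the same spark-submit command might look like the following. The helper names are hypothetical, and the sketch assumes spark-submit is on your PATH:

```python
import shlex
from typing import List


def gcs_connector_coordinate(connector_version: str = "2.2.0") -> str:
    """Maven coordinate (group:artifact:version) of the Hadoop 3 GCS connector build."""
    return f"com.google.cloud.bigdataoss:gcs-connector:hadoop3-{connector_version}"


def build_submit_command(script: str, packages: List[str]) -> List[str]:
    """Argument list for spark-submit; --packages takes a comma-separated list of coordinates."""
    return ["spark-submit", "--packages", ",".join(packages), script]


if __name__ == "__main__":
    cmd = build_submit_command("pyspark-gcs.py", [gcs_connector_coordinate()])
    # Prints the command used in this article; subprocess.run(cmd, check=True) would launch it.
    print(shlex.join(cmd))
```

Building an argument list instead of a single string avoids shell quoting issues if you later pass it to subprocess.run.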
Input JSON file
The input JSON file has the following content:
[{"ID":1,"ATTR1":"ABC"},
{"ID":2,"ATTR1":"DEF"},
{"ID":3,"ATTR1":"GHI"}]
The file is uploaded to a GCS bucket.
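This file is a single JSON array spread over several lines. It is not JSON Lines (one self-contained object per line), which is the layout Spark's JSON data source expects by default. That is why the script below passes multiLine=True. The standard-library sketch here illustrates the difference: the whole text parses as one array, while every individual line fails to parse. The to_json_lines helper is a hypothetical conversion, not part of the original workflow:

```python
import json

# Same content as the input file: one JSON array spread over three lines.
CONTENT = """[{"ID":1,"ATTR1":"ABC"},
{"ID":2,"ATTR1":"DEF"},
{"ID":3,"ATTR1":"GHI"}]"""


def parse_whole(text):
    """Parse the entire text as one JSON document (what multiLine=True allows)."""
    return json.loads(text)


def lines_that_fail(text):
    """Return the lines that are not valid JSON on their own (the JSON Lines expectation)."""
    failures = []
    for line in text.splitlines():
        try:
            json.loads(line)
        except json.JSONDecodeError:
            failures.append(line)
    return failures


def to_json_lines(records):
    """Hypothetical conversion: write one compact JSON object per line."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


if __name__ == "__main__":
    records = parse_whole(CONTENT)
    print(len(records), "records;", len(lines_that_fail(CONTENT)), "lines fail on their own")
    print(to_json_lines(records))
```

Every line of the JSON Lines output is a complete object, so a file in that layout could be read with spark.read.json(path, schema) without the multiLine option.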
Create the script file
Create a Python script named pyspark-gcs.py with the following content:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
appName = "PySpark Example - Read JSON file from GCS"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
# Set up the Hadoop file system configuration for the gs:// scheme
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
# Create a schema for the dataframe
schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('ATTR1', StringType(), True)
])
# Create data frame
json_file_path = 'gs://YOUR_BUCKET/test.json'
df = spark.read.json(json_file_path, schema, multiLine=True)
print(df.schema)
df.show()
Remember to replace YOUR_BUCKET with the name of your own bucket.
In the above code, we also add the Hadoop file system configuration properties for Google Cloud Storage dynamically. If you have your own Hadoop and Spark cluster (not Dataproc), you can instead update the core-site.xml configuration file directly, as described in the GCS connector installation documentation.
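If you go the core-site.xml route, the two properties set in the script become ordinary property entries. A minimal sketch that renders them with Python's xml.etree.ElementTree follows. The helper name is hypothetical, and you would merge the output into your cluster's existing core-site.xml rather than overwrite it:

```python
import xml.etree.ElementTree as ET

# The same two properties the script sets through hadoopConfiguration().
GCS_PROPERTIES = {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
}


def render_core_site(properties):
    """Render a <configuration> element with one <property> per name/value pair."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    if hasattr(ET, "indent"):  # ET.indent is only available in Python 3.9+
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    print(render_core_site(GCS_PROPERTIES))
```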
Run the script file
Use the following command to run the script:
spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0 pyspark-gcs.py
We use GCS connector 2.2.0 for Hadoop 3, the latest version at the time of writing, to read the file from GCS.
The output looks like the following:
+---+-----+
| ID|ATTR1|
+---+-----+
|  1|  ABC|
|  2|  DEF|
|  3|  GHI|
+---+-----+
About the guava library
PySpark 3.1.1 bundles guava 14.0.1 (guava-14.0.1.jar), whereas the Hadoop 3 build of GCS connector 2.2.0 references guava 30.1-jre. This mismatch can cause errors like the following:
Caused by: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, long, long)'
To address this issue, we need to ensure the versions are consistent and compatible.
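Before swapping jars, it can help to confirm which guava version your PySpark installation actually bundles. A sketch that extracts guava versions from a list of jar file names follows. The function name is hypothetical, and the jars folder is assumed to be pyspark/jars inside your PySpark installation, such as /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars in Cloud Shell:

```python
import os
import re
import sys
from typing import Iterable, List

# Matches e.g. guava-14.0.1.jar or guava-30.1-jre.jar, but not backups such as guava-14.0.1.jar.bk.
GUAVA_JAR = re.compile(r"^guava-(?P<version>[\w.\-]+)\.jar$")


def guava_versions(filenames: Iterable[str]) -> List[str]:
    """Return the guava versions found among the given jar file names, sorted."""
    return sorted(m.group("version") for m in map(GUAVA_JAR.match, filenames) if m)


if __name__ == "__main__":
    # Hypothetical usage: pass PySpark's jars folder as the first argument.
    jars_dir = sys.argv[1] if len(sys.argv) > 1 else "."
    print(guava_versions(os.listdir(jars_dir)))
```

If the list contains only 14.0.1 while you are using the Hadoop 3 build of GCS connector 2.2.0, you are likely to hit the NoSuchMethodError shown above.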
Follow these steps to fix this issue:
Download the guava version used by the GCS connector:
wget https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar
Back up the existing guava library in PySpark:
mv /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars/guava-14.0.1.jar /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars/guava-14.0.1.jar.bk
Copy the downloaded guava jar file to the PySpark 3.1.1 jars folder:
cp guava-30.1-jre.jar /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars
My PySpark is installed in the /home/$USER/.local folder, the default location in Cloud Shell. Please change the paths accordingly (including the python3.7 part) if that is not the case in your environment.
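The backup-and-copy steps can also be scripted. Below is a minimal sketch using shutil, assuming the new jar has already been downloaded (for example with the wget command above) and that jars_dir points at PySpark's jars folder. The function name swap_guava is hypothetical. Renaming the old jar with a .bk suffix keeps it out of Spark's jars/* classpath, because Java wildcard classpath entries only pick up .jar files, while still letting you restore it later:

```python
import os
import shutil


def swap_guava(jars_dir: str, new_jar: str, old_name: str = "guava-14.0.1.jar") -> str:
    """Back up the old guava jar (append .bk) and copy new_jar into jars_dir.

    Returns the path of the copied jar.
    """
    old_path = os.path.join(jars_dir, old_name)
    if os.path.exists(old_path):
        # Equivalent to: mv guava-14.0.1.jar guava-14.0.1.jar.bk
        os.rename(old_path, old_path + ".bk")
    # Equivalent to: cp guava-30.1-jre.jar <jars_dir>
    return shutil.copy(new_jar, jars_dir)
```

For example, swap_guava(os.path.join(os.path.dirname(pyspark.__file__), "jars"), "guava-30.1-jre.jar") would target the jars folder of whichever PySpark installation is importable.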
About authentication
Authentication is done via the JSON credential file specified by the environment variable GOOGLE_APPLICATION_CREDENTIALS. Alternatively, you can specify it via Spark configurations.
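One way to specify the credential file through Spark configurations is to prefix Hadoop properties with spark.hadoop., which Spark copies into the Hadoop configuration. The property names below (google.cloud.auth.service.account.enable and google.cloud.auth.service.account.json.keyfile) are assumed from the GCS connector 2.x configuration documentation, so please check them against the connector version you use. The key file path is a placeholder:

```python
import shlex
from typing import Dict, List


def hadoop_to_spark_conf(hadoop_props: Dict[str, str]) -> Dict[str, str]:
    """Prefix Hadoop properties with 'spark.hadoop.' so Spark passes them to Hadoop."""
    return {f"spark.hadoop.{key}": value for key, value in hadoop_props.items()}


def conf_args(spark_conf: Dict[str, str]) -> List[str]:
    """Turn a Spark config dict into repeated --conf key=value arguments for spark-submit."""
    args: List[str] = []
    for key, value in spark_conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Assumed GCS connector 2.x auth properties; KEY_FILE is a placeholder path.
KEY_FILE = "/path/to/key.json"
AUTH_PROPS = {
    "google.cloud.auth.service.account.enable": "true",
    "google.cloud.auth.service.account.json.keyfile": KEY_FILE,
}

if __name__ == "__main__":
    cmd = [
        "spark-submit",
        "--packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0",
        *conf_args(hadoop_to_spark_conf(AUTH_PROPS)),
        "pyspark-gcs.py",
    ]
    print(shlex.join(cmd))
```

Inside the script itself, the same keys could instead be applied with SparkSession.builder.config(key, value) before calling getOrCreate().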
References
Read JSON file as Spark DataFrame in Python / Spark
Use the Cloud Storage connector with Apache Spark (google.com)