Easy Guide to Create a Custom Read Data Source in Apache Spark 3

A step-by-step guide to writing a custom read data source in Apache Spark 3.0.x with location-aware and multi-partition support

Amar Gajbhiye
Level Up Coding


Photo by Andrik Langfield on Unsplash

Apache Spark is a very powerful distributed execution engine. As we read through its documentation and examples, we find that, even with all its complex functionality, it is relatively easy to use. Yet as we dig deeper and try to solve real-life use cases, we often need to write some customizations despite its feature-packed APIs. One thing that really stands out about Apache Spark is how well it supports such customizations: it not only provides different interfaces to plug them in, it also keeps evolving those interfaces to overcome their limitations.

What will we learn?

  • What are the data source APIs, and why are they important?
  • All the interfaces needed to implement a read data source in Spark 3.0.x
  • The role of each interface and where it runs on a cluster: driver vs. executor
  • How to implement those interfaces to create a simple CSV read data source
  • Finally, how to write a slightly more complex location-aware, multi-partition read data source

So, without wasting any time, let us get started.

What is a data source?

The data source API is a very popular piece of functionality in Apache Spark. It is used extensively by developers to connect third-party applications and storage systems with Apache Spark. As of version 2.4.x, there are two variations of it: V1 and V2.

The V1 APIs were the only option before 2.3.x. The V2 APIs were introduced in 2.3.0 and revised in 2.4.0. In 3.0.0, Spark introduced major changes to the V2 APIs, while the V1 APIs are kept intact for backward compatibility.

In a previous article, I discussed how to create a custom data source with the V2 data source APIs. In this article, we will learn how to create a read data source for Apache Spark 3.0.x.

Data source interfaces

  • TableProvider
  • Table
  • ScanBuilder
  • Scan
  • Batch
  • InputPartition
  • PartitionReaderFactory
  • PartitionReader

Now, let us discuss each one of these and their purpose in detail.

TableProvider

In Spark 2.4.x, the primary interface of the data source APIs was DataSourceV2; every custom data source had to implement it along with one of its specializations such as ReadSupport or WriteSupport. This interface has been removed in 3.0.x. Instead, a new TableProvider interface is introduced. It is the base interface for all custom data sources that don't need to support DDL. An implementation of this interface must have a public, 0-argument constructor. Let us implement it for our custom CSV data source and see how it looks.
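Here is a minimal Scala sketch of such a TableProvider. The class names (CsvSourceProvider, CsvTable) and the path option are illustrative rather than the exact code from the shared gist:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CsvSourceProvider extends TableProvider with DataSourceRegister {

  // Short name for spark.read.format("simple-csv"); this also needs a
  // META-INF/services registration, otherwise use the fully qualified class name.
  override def shortName(): String = "simple-csv"

  // We rely on a user-supplied schema, so schema inference is not implemented.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    throw new UnsupportedOperationException("Please specify the schema explicitly")

  // Tell Spark that this source accepts a schema provided from outside.
  override def supportsExternalMetadata(): Boolean = true

  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table =
    new CsvTable(schema, properties.get("path"))
}
```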

Our data source can accept a schema from external sources, such as a user-specified schema passed through DataFrameReader.schema(). So we return true from supportsExternalMetadata() and don't implement inferSchema().

Table

This interface defines a single logical entity that represents structured data. It could be a file or folder for a file-system-based data source, a topic for Kafka, or a table for a JDBC data source. It can be mixed in with SupportsRead and SupportsWrite to add read and write capabilities. In our example, a CSV file will be the table. The capabilities() API returns all the capabilities of the table. For our simple read implementation, let us just return BATCH_READ.
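A minimal sketch of the corresponding Table, again with illustrative names; the CsvScanBuilder it returns is shown in the next section:

```scala
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{SupportsRead, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CsvTable(tableSchema: StructType, path: String) extends SupportsRead {

  override def name(): String = path

  override def schema(): StructType = tableSchema

  // Only plain batch reads are advertised in this simple example.
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.BATCH_READ).asJava

  // Mixed in from SupportsRead: hand back the builder for the logical scan.
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    new CsvScanBuilder(tableSchema, path)
}
```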

Scan

For our simple read data source, we returned BATCH_READ. The other read capabilities are MICRO_BATCH_READ and CONTINUOUS_READ. The SupportsRead interface has a newScanBuilder() API that returns a ScanBuilder, which in turn builds a Scan. A Scan represents the logical plan of a data source scan and can be converted into a Batch, MicroBatchStream, or ContinuousStream. For this example, we will implement the toBatch() API, since we advertised BATCH_READ in the table capabilities.
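A minimal sketch of the ScanBuilder and Scan for the CSV example; since nothing is pushed down yet, both classes stay trivial:

```scala
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.types.StructType

class CsvScanBuilder(schema: StructType, path: String) extends ScanBuilder {
  // No filter push-down or column pruning yet, so build() is trivial.
  override def build(): Scan = new CsvScan(schema, path)
}

class CsvScan(schema: StructType, path: String) extends Scan {

  override def readSchema(): StructType = schema

  // We declared BATCH_READ, so only toBatch() is implemented; the streaming
  // conversions keep their default (unsupported) behaviour.
  override def toBatch(): Batch = new CsvBatch(schema, path)
}
```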

Batch

Batch represents a physical plan: at runtime, the logical table scan is converted into a physical scan. This is the interface where the data source's partitions are planned. It also defines the factory that is sent to the executors to create a PartitionReader for each InputPartition.
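A minimal sketch of the Batch for the CSV example. To keep things simple the whole file becomes a single partition; the reader factory is shown in the next section:

```scala
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

class CsvBatch(schema: StructType, path: String) extends Batch {

  // The whole file is read as one partition in this simple example.
  override def planInputPartitions(): Array[InputPartition] =
    Array[InputPartition](new CsvInputPartition(path))

  // This factory is serialized and shipped to the executors.
  override def createReaderFactory(): PartitionReaderFactory =
    new CsvPartitionReaderFactory(schema)
}

// Carries everything an executor needs to read its slice of the data.
class CsvInputPartition(val path: String) extends InputPartition
```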

InputPartition and PartitionReader

After all the configured options are applied and runtime optimizations such as filter push-down and column pruning are taken into account, the physical plan, i.e., the Batch, is created. This involves creating the InputPartitions and the PartitionReaderFactory, which are shipped to the executors. On each executor, an instance of PartitionReader is created for each partition using the PartitionReaderFactory. The PartitionReader does the actual job of reading data from the underlying storage and converting it into InternalRow objects using the given schema.
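A minimal sketch of the executor side. The parsing is deliberately naive (no quoting or escaping, every column read as a string), and the file path is assumed to be accessible from the executors:

```scala
import scala.io.Source

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class CsvPartitionReaderFactory(schema: StructType) extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new CsvPartitionReader(partition.asInstanceOf[CsvInputPartition], schema)
}

class CsvPartitionReader(partition: CsvInputPartition, schema: StructType)
    extends PartitionReader[InternalRow] {

  private val source = Source.fromFile(partition.path)
  private val lines = source.getLines()
  private var currentLine: Array[String] = _

  override def next(): Boolean =
    if (lines.hasNext) { currentLine = lines.next().split(","); true } else false

  // Treats every column as a string to keep the sketch short; a real reader
  // would convert each value according to the schema's data types.
  override def get(): InternalRow =
    InternalRow.fromSeq(currentLine.map(field => UTF8String.fromString(field)))

  override def close(): Unit = source.close()
}
```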

So, it’s done. We have successfully defined a simple custom data source with reading capability. The complete source code of this read data source is shared here.

Now, let us use all this learning and define a location-aware, multi-partition data source.

Multi-Partition Data Source with Preferred Location

We have defined a basic CSV data source that supports external metadata and has a single partition. Now, let us see how to define a custom JDBC data source with multiple partitions, without the need for an external schema, and with preferred-location support for its partitions.

We need three things as user input:

  1. The number of partitions
  2. A table column on which data should be partitioned.
  3. Locality information for each partition

JDBC can infer a schema, so the TableProvider implementation will return the schema from its inferSchema() implementation instead of relying on external metadata.
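Here is a hedged sketch of what that inference could look like, assuming the connection URL and table name arrive as "url" and "tableName" options; the class names JdbcSourceProvider and JdbcTable are illustrative, and only a few JDBC types are mapped, with everything else falling back to string:

```scala
import java.sql.{DriverManager, Types}
import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class JdbcSourceProvider extends TableProvider {

  // Ask the database for the table's metadata instead of requiring a user schema.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
    val connection = DriverManager.getConnection(options.get("url"))
    try {
      val metadata = connection.createStatement()
        .executeQuery(s"SELECT * FROM ${options.get("tableName")} WHERE 1 = 0")
        .getMetaData
      StructType((1 to metadata.getColumnCount).map { i =>
        val dataType = metadata.getColumnType(i) match {
          case Types.INTEGER => IntegerType
          case Types.BIGINT  => LongType
          case Types.DOUBLE  => DoubleType
          case _             => StringType // fall back to string for anything else
        }
        StructField(metadata.getColumnName(i), dataType, nullable = true)
      })
    } finally connection.close()
  }

  // JdbcTable mirrors CsvTable from the CSV example, so it is not repeated here.
  override def getTable(schema: StructType,
                        partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table =
    new JdbcTable(schema, properties)
}
```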

The implementations of the SupportsRead, ScanBuilder, and Scan interfaces are similar to the CSV example. The Batch implementation, where partitions are planned, is different. JdbcBatch plans the input partitions using the given inputs, i.e., the number of partitions and the partitioning column. It fetches the partitioning column's data and divides it into the requested number of chunks, so each JdbcInputPartition instance gets one chunk of the partitioning column's values.
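A hedged sketch of such a JdbcBatch follows. The chunking strategy and option names are assumptions; JdbcInputPartition is shown in the location-awareness section below, and the per-partition host would come from the user-supplied locality input (it is simply left empty here):

```scala
import java.sql.DriverManager

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

class JdbcBatch(schema: StructType,
                options: Map[String, String],
                numPartitions: Int,
                partitionColumn: String) extends Batch {

  override def planInputPartitions(): Array[InputPartition] = {
    // Pull the distinct values of the partitioning column and split them into chunks.
    val keys = fetchPartitionColumn()
    val chunkSize = math.max(1, math.ceil(keys.size.toDouble / numPartitions).toInt)
    keys.grouped(chunkSize)
      .map[InputPartition](chunk => new JdbcInputPartition(chunk, host = None))
      .toArray
  }

  override def createReaderFactory(): PartitionReaderFactory =
    new JdbcPartitionReaderFactory(schema, options, partitionColumn)

  private def fetchPartitionColumn(): Seq[String] = {
    val connection = DriverManager.getConnection(options("url"))
    try {
      val rs = connection.createStatement()
        .executeQuery(s"SELECT DISTINCT $partitionColumn FROM ${options("tableName")}")
      val values = ArrayBuffer[String]()
      while (rs.next()) values += rs.getString(1)
      values.toSeq
    } finally connection.close()
  }
}
```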

For each JdbcInputPartition instance, a JdbcPartitionReader is created. Each reader then fetches only a part of the table's data by passing its chunk of keys as a filter. Since the partition reads run in parallel, the overall data fetch is faster.
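A hedged sketch of the reader side of the JDBC example: each reader builds an IN clause from its chunk of keys, so it pulls only its own slice of the table. Every column is read as a string purely to keep the sketch short; a real implementation would convert values according to the schema:

```scala
import java.sql.DriverManager

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class JdbcPartitionReaderFactory(schema: StructType,
                                 options: Map[String, String],
                                 partitionColumn: String) extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new JdbcPartitionReader(partition.asInstanceOf[JdbcInputPartition], schema, options, partitionColumn)
}

class JdbcPartitionReader(partition: JdbcInputPartition,
                          schema: StructType,
                          options: Map[String, String],
                          partitionColumn: String) extends PartitionReader[InternalRow] {

  private val connection = DriverManager.getConnection(options("url"))

  // Only this partition's chunk of keys is requested from the database.
  private val resultSet = connection.createStatement().executeQuery(
    s"""SELECT * FROM ${options("tableName")}
       |WHERE $partitionColumn IN (${partition.keys.map(k => s"'$k'").mkString(",")})""".stripMargin)

  override def next(): Boolean = resultSet.next()

  // Reads every column as a string to keep the sketch short.
  override def get(): InternalRow =
    InternalRow.fromSeq(schema.fields.indices.map(i => UTF8String.fromString(resultSet.getString(i + 1))))

  override def close(): Unit = connection.close()
}
```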

What is location awareness?

Assume we are writing a custom data source for a distributed, multi-node, partitioned storage system like HDFS. If we run the data read close to the HDFS partition it reads, it will perform much faster: instead of pulling data over the network, it reads it locally when it runs on the same node. Scheduling each partition's read close to the data it reads, i.e., taking the locality of the storage into account, is what makes a data source location-aware.

A partition's locality information can be specified through the preferredLocations() API on the InputPartition interface. Each entry can be either a host address or a hostname. As per the API documentation, Apache Spark tries to run the partition at one of the given locations but doesn't guarantee it; it waits for the spark.locality.wait configuration before running it on a different node in the cluster.
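A minimal sketch of a location-aware InputPartition for the JDBC example; the keys field and the optional host are assumptions carried over from the partition-planning sketch above:

```scala
import org.apache.spark.sql.connector.read.InputPartition

class JdbcInputPartition(val keys: Seq[String], host: Option[String]) extends InputPartition {
  // Hint: ask Spark to schedule this partition's reader on the given host.
  // Spark honours this only as far as spark.locality.wait allows, so the read
  // logic itself must not depend on where it actually runs.
  override def preferredLocations(): Array[String] = host.toArray
}
```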

We can use preferredLocations() to run the data fetch closer to the source and speed up the process, but locality is not guaranteed. The read logic therefore should not be location-dependent and should not fail when a partition runs on a different node.

Now, let us run this on a cluster. Steps to set up an Apache Spark standalone cluster on Windows are shared here.

The complete source code of this example is shared here.

What have we discussed so far?

We discussed how to create a single-partition read data source and how to create a multi-partition, location-aware data source.

This example can be improved further by adding filtering capability using SupportsPushDownFilters and column-pruning capability using SupportsPushDownRequiredColumns, as sketched below.
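As a hedged sketch, the ScanBuilder could mix in both interfaces roughly like this; the scan and reader would also have to honour the pruned schema and the pushed filters, which is left out here:

```scala
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

class PruningCsvScanBuilder(fullSchema: StructType, path: String)
    extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {

  private var requiredSchema: StructType = fullSchema
  private var filters: Array[Filter] = Array.empty

  // Spark tells us which columns the query actually needs.
  override def pruneColumns(requiredSchema: StructType): Unit =
    this.requiredSchema = requiredSchema

  // Record the filters; returning them all back means Spark still evaluates
  // them after the scan, so correctness does not depend on our filtering.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    this.filters = filters
    filters
  }

  // The filters we report as having been pushed to the source.
  override def pushedFilters(): Array[Filter] = filters

  override def build(): Scan = new CsvScan(requiredSchema, path)
}
```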

Thanks for reading. Do you like this article? Let me know your thoughts in the comments.

In the next article, we will discuss how to create a write data source in Apache Spark 3.
