Data operations with Cloud Spanner using Mercari Dataflow Template

Yoichi Nagai
7 min read · Jan 26, 2021

Mercari Dataflow Template (MDT) is an OSS tool for easy data processing using GCP’s distributed data processing service, Cloud Dataflow.
It is used within Merpay, Inc. to combine, process, and store data across various data sources.
In this article, I will introduce examples of using MDT to read, write, and process data in Cloud Spanner.

How to use Mercari Dataflow Template?

First, you need to deploy MDT. After that, write a configuration file (called a pipeline file) in JSON format that defines the processing you want to execute, upload it to GCS, and launch the job using the gcloud command or Dataflow’s REST API.
In this section, we will show you how to deploy MDT, define a pipeline, and execute data processing. (For more information on how to use MDT, please refer to the documentation.)

Deploy Pipeline

First, you need to clone the MDT code from the GitHub repository and deploy it with the following command. (You will need Java 11 and Maven 3 installed.)

mvn clean package -DskipTests -Dimage=gcr.io/{deploy_project}/{template_repo_name}

MDT uses a Cloud Dataflow feature called FlexTemplate, which allows users to define and execute data processing without writing programs.
The above command packages the MDT pipeline code and deploys it as a Docker image to the Container Registry (GCR) repository that you specify.
In addition, by executing the following command, a template file for launching Cloud Dataflow jobs from the image registered in GCR is registered in GCS.

gcloud dataflow flex-template build gs://{path/to/template_file} \
--image "gcr.io/{deploy_project}/{template_repo_name}" \
--sdk-language "JAVA"
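
If you want to confirm the deployment, you can check that the container image and the template file exist. The paths below are just the placeholders used in the commands above.

gcloud container images list --repository="gcr.io/{deploy_project}"
gsutil ls gs://{path/to/template_file}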

This completes the deployment of MDT. Next, we will see how to define the data processing that we want to perform in MDT.

Define Pipeline

As introduced earlier, data processing is defined in JSON format as a pipeline file. A pipeline file mainly consists of sources, transforms, and sinks. These fields describe the source modules that define where data is read from, the transform modules that define how inputs are processed, and the sink modules that define where data is written.

{
  "sources": [
    {
      "name": "input1",
      "module": "bigquery",
      "parameters": {...}
    },
    ...
  ],
  "transforms": [
    {
      "name": "trans1",
      "module": "beamsql",
      "inputs": ["input1"],
      "parameters": {...}
    },
    ...
  ],
  "sinks": [
    {
      "name": "output1",
      "module": "spanner",
      "input": "trans1",
      "parameters": {...}
    },
    ...
  ]
}

In the source, transform, and sink modules, the name and module fields are required. The module field specifies the module to use for the step (see the GitHub repository for a list of available modules). The name field specifies the name of the step in the pipeline and should be unique within the pipeline file.
Each transform step can have several inputs, taking data from sources or from other transform steps, while a sink accepts only a single input. By specifying the names of the steps you want to use as input in each step’s inputs or input field, you can chain steps together to form a pipeline.
In the parameters field, you specify configuration items specific to each module.
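
For example, the transforms section of a pipeline that chains one transform step into another might look like the following sketch (the step names, module choices, and omitted parameters here are illustrative):

"transforms": [
  {
    "name": "trans1",
    "module": "beamsql",
    "inputs": ["input1"],
    "parameters": {...}
  },
  {
    "name": "trans2",
    "module": "beamsql",
    "inputs": ["trans1"],
    "parameters": {...}
  }
]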

Execute Pipeline

With the following gcloud command, you can start a Cloud Dataflow job from the MDT template deployed as a FlexTemplate and the pipeline file you defined.

gcloud dataflow flex-template run bigquery-to-spanner \
--project=myproject \
--region=us-central1 \
--template-file-gcs-location=gs://example/template \
--parameters=config=gs://example/pipeline.json

Another way to launch a job is to call the REST API directly. (The REST API is also what the gcloud command uses.)
If you want to execute a job from Cloud Scheduler or from a program, you can use the REST API.
The following is an example of launching a job via the REST API using the curl command.

PROJECT_ID=[PROJECT_ID]
REGION=us-central1

curl -X POST \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://dataflow.googleapis.com/v1b3/projects/${PROJECT_ID}/locations/${REGION}/templates:launch?dynamicTemplate.gcsPath=gs://example/template" \
  -d '{
    "jobName": "myJobName",
    "parameters": {
      "config": "gs://example/pipeline.json"
    }
  }'

Note that the template deployment method for gcloud and the REST API is a little different.

Data Operations for Cloud Spanner using MDT

In this section, I will introduce examples of using MDT to actually read, write, and process data in Cloud Spanner.

1. Insert BigQuery query results into Cloud Spanner

Let’s start with a simple example of inserting query results from BigQuery into Spanner.
First, define the pipeline file as shown below and upload it to GCS (Google Cloud Storage).
In the pipeline file, bigquery is specified as the source of data and spanner as the destination.

{
  "sources": [
    {
      "name": "bigqueryInput",
      "module": "bigquery",
      "parameters": {
        "query": "SELECT * FROM `myproject.mydataset.mytable`"
      }
    }
  ],
  "sinks": [
    {
      "name": "spannerOutput",
      "module": "spanner",
      "input": "bigqueryInput",
      "parameters": {
        "projectId": "myproject",
        "instanceId": "myinstance",
        "databaseId": "mydatabase",
        "table": "mytable"
      }
    }
  ]
}

Next, execute the following command.
(Let’s assume that you have deployed MDT to gs://example/template and saved the pipeline file to gs://example/pipeline.json)

gcloud dataflow flex-template run bigquery-to-spanner \
--project=myproject \
--template-file-gcs-location=gs://example/template \
--parameters=config=gs://example/pipeline.json

After a few moments, the Dataflow job should start and the process will be executed as defined in the pipeline file.
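
If you want to check the status of the job, you can list Dataflow jobs with the gcloud command (the project and region here are the same example values used above):

gcloud dataflow jobs list --project=myproject --region=us-central1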

2. Insert Cloud Spanner query results into BigQuery

In contrast to the previous example, here is an example of inserting query results from Spanner into BigQuery.
You can see that the roles have been swapped: spanner is now the source and bigquery the sink.

{
  "sources": [
    {
      "name": "spanner",
      "module": "spanner",
      "parameters": {
        "projectId": "myproject",
        "instanceId": "myinstance",
        "databaseId": "mydatabase",
        "query": "SELECT * FROM mytable"
      }
    }
  ],
  "sinks": [
    {
      "name": "bigquery",
      "module": "bigquery",
      "input": "spanner",
      "parameters": {
        "table": "myproject.mydataset.mytable",
        "createDisposition": "CREATE_IF_NEEDED",
        "writeDisposition": "WRITE_TRUNCATE"
      }
    }
  ]
}

The job execution method is the same as in the previous example.
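
For example, assuming the same template and pipeline file locations as before, the command would look like this (the job name is just an example):

gcloud dataflow flex-template run spanner-to-bigquery \
--project=myproject \
--template-file-gcs-location=gs://example/template \
--parameters=config=gs://example/pipeline.json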

3. Combine data from different resources with SQL

In the previous examples we simply moved the data, but you can also use Beam SQL to perform SQL joins and process data.
The following pipeline file uses SQL to JOIN a Spanner table with the results of a BigQuery query, and stores the results in another Spanner table.
In this example, in addition to the source and sink modules, beamsql is specified as a transform module to process the data with SQL.
In the transform module, specify the names of the inputs you want to process in inputs and the SQL you want to run in the sql parameter. In the SQL, each input can be referenced as a table by the name specified in inputs.

{
  "sources": [
    {
      "name": "BigQueryInput",
      "module": "bigquery",
      "parameters": {
        "query": "SELECT BField1, BField2 FROM `myproject.mydataset.mytable`"
      }
    },
    {
      "name": "SpannerInput",
      "module": "spanner",
      "parameters": {
        "projectId": "myproject",
        "instanceId": "myinstance",
        "databaseId": "mydatabase",
        "table": "mytable",
        "fields": ["SField1","SField2"]
      }
    }
  ],
  "transforms": [
    {
      "name": "BeamsqlTransform",
      "module": "beamsql",
      "inputs": [
        "BigQueryInput",
        "SpannerInput"
      ],
      "parameters": {
        "sql": "SELECT BigQueryInput.BField1 AS Field1, IF(BigQueryInput.BField2 IS NULL, SpannerInput.SField2, BigQueryInput.BField2) AS Field2 FROM BigQueryInput LEFT JOIN SpannerInput ON BigQueryInput.BField1 = SpannerInput.SField1"
      }
    }
  ],
  "sinks": [
    {
      "name": "SpannerOutput",
      "module": "spanner",
      "input": "BeamsqlTransform",
      "parameters": {
        "projectId": "anotherproject",
        "instanceId": "anotherinstance",
        "databaseId": "anotherdatabase",
        "table": "anothertable"
      }
    }
  ]
}

If you run the same command as in the previous examples, a Dataflow job that performs this processing will be started.

4. Load data from Spanner and save it to BigQuery in micro-batch

In some cases, you may want to use the latest data from Spanner in near real-time. MDT supports fetching data from Spanner by executing queries in micro-batches.
The following configuration is an example of a micro-batch that periodically executes a query to Spanner at a specified frequency to retrieve data and insert it into BigQuery.

{
  "sources": [
    {
      "name": "spanner",
      "module": "spanner",
      "microbatch": true,
      "parameters": {
        "projectId": "myproject",
        "instanceId": "myinstance",
        "databaseId": "mydatabase",
        "query": "SELECT * FROM MyTable@{FORCE_INDEX=MyTableCreatedAtDesc} WHERE ShardCreatedAt = 1 AND CreatedAt >= TIMESTAMP_SECONDS(${__EVENT_EPOCH_SECOND_PRE__}) AND CreatedAt < TIMESTAMP_SECONDS(${__EVENT_EPOCH_SECOND__})",
        "startDatetime": "2021-01-01T00:00:00Z",
        "intervalSecond": 60,
        "gapSecond": 30,
        "maxDurationMinute": 60,
        "catchupIntervalSecond": 60
      }
    }
  ],
  "sinks": [
    {
      "name": "bigquery",
      "module": "bigquery",
      "input": "spanner",
      "parameters": {
        "table": "myproject:mydataset.mytable",
        "createDisposition": "CREATE_IF_NEEDED",
        "writeDisposition": "WRITE_APPEND"
      }
    }
  ]
}

To run queries against Spanner in micro-batches, set the microbatch parameter to true in the Spanner source module.
In the example above, the query to Spanner is written for micro-batch execution: its filter conditions use the variables __EVENT_EPOCH_SECOND_PRE__ and __EVENT_EPOCH_SECOND__, which are substituted at query execution time based on the frequency and interval specified in the configuration.
The frequency and interval of query execution are controlled by parameters such as startDatetime, intervalSecond, gapSecond, maxDurationMinute, and catchupIntervalSecond.
(For a detailed explanation of how these parameters determine the query execution timing and filter conditions, see GitHub.)
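
As a rough illustration, with startDatetime set to 2021-01-01T00:00:00Z and intervalSecond set to 60, the first micro-batch would execute the query with values substituted roughly as follows (the exact window boundaries also depend on gapSecond and the catch-up behavior described on GitHub):

SELECT * FROM MyTable@{FORCE_INDEX=MyTableCreatedAtDesc}
WHERE ShardCreatedAt = 1
AND CreatedAt >= TIMESTAMP_SECONDS(1609459200) -- 2021-01-01T00:00:00Z
AND CreatedAt < TIMESTAMP_SECONDS(1609459260)  -- 2021-01-01T00:01:00Z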

To run the Pipeline in micro-batch, run Dataflow in streaming mode with the streaming parameter as shown below.

gcloud dataflow flex-template run spanner-microbatch-to-bigquery \
--project=myproject \
--region=us-central1 \
--template-file-gcs-location=gs://example/template \
--parameters=config=gs://example/pipeline.json \
--parameters=streaming=true

Note:
When retrieving data from Spanner in micro-batches, take care to create appropriate indexes so that query execution time stays within the execution interval.
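
For example, the query in the pipeline above forces the use of an index named MyTableCreatedAtDesc. A secondary index along those lines (with the column names assumed from the example query) could be defined in Spanner DDL like this:

CREATE INDEX MyTableCreatedAtDesc ON MyTable (ShardCreatedAt, CreatedAt DESC)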

As introduced in this article, with MDT you can define the data processing you want to execute as a pipeline file and run it on Cloud Dataflow.
Other examples of pipeline files can be found on GitHub, so if you are interested, look for one that is close to the data processing you want to do.
MDT is still a developing OSS project that has just been released. If you find any bugs, have feature requests, or have questions, please feel free to let us know on GitHub.
