Data synchronization is one of the most common requirements in enterprise data platform development. As business volume continues to grow, full data synchronization can place increasing pressure on source databases while consuming substantial computing and storage resources. As a result, incremental synchronization has become the preferred approach in most production environments.
In this demo, we will combine Apache DolphinScheduler and Apache SeaTunnel to implement a typical offline incremental synchronization scenario. DolphinScheduler retrieves the synchronization checkpoint from the target system and passes it to SeaTunnel as a runtime parameter, enabling incremental data synchronization from MySQL to Apache Doris.
This article is based on an actual demonstration and provides a complete walkthrough of the environment setup, SeaTunnel configuration, and DolphinScheduler workflow configuration process.
For the full demo, please refer to:
1. Environment Setup
This demonstration uses the following components:
| Component | Version |
|---|---|
| Apache SeaTunnel | 2.3.9 |
| Apache DolphinScheduler | 3.x |
| MySQL | 8.4 |
| Apache Doris | 2.x |
In this architecture:
- MySQL serves as the source database.
- Doris serves as the target database.
- SeaTunnel is responsible for data synchronization.
- DolphinScheduler handles workflow orchestration and scheduling.
2. Preparing Test Data
Before configuring the synchronization task, we first prepare sample business data.
In this demonstration, a database named shopping is used as the sample database, and an orders table is created.
The orders table contains an auto-incrementing primary key column:
order_id
This field will later be used as the incremental synchronization checkpoint.
To verify synchronization results, a batch of sample records is inserted into the table. Approximately 300 order records are generated using a script.
The following information is then inspected:
- Current total number of orders
- Current maximum order ID
These values will serve as references when configuring incremental synchronization logic later.
It is worth noting that order_id is used only for demonstration purposes. In real-world production scenarios, timestamp fields such as update_time or create_time are often used as incremental synchronization conditions.
3. Incremental Synchronization Design
Before configuring SeaTunnel, let's first understand the overall synchronization strategy.
The core idea is to use data that has already been synchronized into Doris to determine the current synchronization progress.
The workflow operates as follows:
- Query the current maximum order ID in Doris.
- Use this value as the synchronization checkpoint.
- SeaTunnel reads records from MySQL whose order IDs are greater than this checkpoint.
- Newly added records are written into Doris.
- During the next execution, synchronization resumes from the latest checkpoint.
For example, if the current maximum order ID in Doris is:
300
SeaTunnel will execute the following condition:
WHERE order_id > 300
This ensures that only newly inserted records are processed during each run, preventing duplicate synchronization of existing data.
As emphasized during the demonstration, the incremental field does not necessarily have to be a primary key. Any field that can accurately identify newly added or modified data can be used.
4. Configuring the SeaTunnel Job
After defining the synchronization strategy, we can start configuring the SeaTunnel job.
Configure the JDBC Source
Since the source data resides in MySQL, JDBC Source is used to read the data.
The core query is:
SELECT *
FROM orders
WHERE order_id > ${order_id}
The most important part is:
${order_id}
This value is not hardcoded. Instead, it will be dynamically supplied by DolphinScheduler.
When the workflow runs, SeaTunnel automatically replaces this variable with the actual synchronization checkpoint, enabling incremental extraction.
Configure Parallelism
The demonstration also configures task parallelism:
parallelism = 4
Increasing parallelism can significantly improve synchronization performance.
In production environments, the appropriate value should be determined based on available server resources and database workload.
Configure Partitioned Reads
To improve performance when reading large tables, partitioned reading is also introduced.
The partition column is:
order_id
Configuration:
partition_column = "order_id"
Combined with the partition_num parameter, the dataset is divided into multiple partitions that can be processed in parallel.
This approach can greatly improve synchronization efficiency for large-scale datasets.
Configure Fetch Size
Within the JDBC Connector, fetch_size can be used to control the number of records retrieved from the database per fetch operation.
Proper configuration of this parameter can reduce database round trips and improve overall read performance.
5. Configuring the Doris Sink
After completing the Source configuration, the next step is configuring the Doris Sink.
Automatic Table Creation
The demonstration first introduces:
create_schema
This parameter enables automatic creation of target tables.
By leveraging automatic table creation, users can significantly reduce the effort required to manually maintain Doris table schemas.
Configure Write Mode
Since this example uses incremental synchronization, append mode is selected:
save_mode = APPEND_DATA
APPEND_DATA is used because each synchronization run only processes newly added records and does not need to overwrite historical data.
Enable Two-Phase Commit
To ensure data consistency, the demonstration also introduces:
enable_2pc = true
Enabling this option activates the two-phase commit mechanism, providing more reliable data writes.
It also helps guarantee Exactly-Once semantics during data synchronization.
Performance Optimization Parameters
Several performance-related parameters are also discussed, including:
batch_size
and
buffer_size
These parameters primarily control batch write behavior and can significantly improve Doris ingestion performance.
6. Configuring the DolphinScheduler Runtime Environment
After completing the SeaTunnel configuration, the next step is setting up DolphinScheduler.
Create a Tenant
First, navigate to the Security Center.
Open the Tenant Management page and create a new tenant.
The demonstration specifically emphasizes that all tasks in DolphinScheduler are ultimately executed under a tenant identity. Therefore, tenant configuration is an essential step in preparing the runtime environment.
Create a User and Associate It with the Tenant
Next, navigate to the User Management page.
Create a user and associate it with the tenant created in the previous step.
Once configured, the user will have permission to execute tasks under the corresponding tenant.
Create an Environment
Next, open the Environment Management page.
Create a runtime environment for SeaTunnel.
Configure the following environment variable:
SEATUNNEL_HOME=/soft/seatunnel
This configuration tells DolphinScheduler where SeaTunnel is installed.
When a workflow executes a SeaTunnel task, DolphinScheduler uses this path to locate the corresponding execution scripts.
The demonstration highlights that this configuration is mandatory and should not be skipped.
7. Creating the Project and Workflow
After completing the environment setup, create a new project.
In this demonstration, a project named:
shopping
is created.
After entering the project, create a new workflow.
The workflow contains two core nodes:
- SQL Task
- SeaTunnel Task
The SQL Task is responsible for retrieving the synchronization checkpoint, while the SeaTunnel Task performs the actual data synchronization.
8. Configuring the SQL Task to Retrieve the Synchronization Checkpoint
This is the most critical step in the entire solution.
First, create an SQL Task and select the Doris datasource.
The purpose of this task is to determine how far synchronization has progressed by querying the latest synchronized order ID.
The SQL statement is:
SELECT IFNULL(MAX(order_id),0) AS order_id
FROM orders;
The demonstration specifically explains why the following statement should not be used directly:
SELECT MAX(order_id)
FROM orders;
The reason is that during the initial synchronization, the Doris table may still be empty.
In this case:
MAX(order_id)
returns:
NULL
If NULL is passed directly to the downstream SeaTunnel task, it may generate an invalid query condition.
Therefore:
IFNULL(MAX(order_id),0)
is used to convert NULL values into 0.
This ensures that the initial synchronization starts correctly from the very first record.
Configure an OUT Parameter
The query result must be passed to downstream tasks.
To accomplish this, create a custom parameter within the SQL Task.
Select the parameter type:
OUT
Set the parameter name to:
order_id
The SQL query result will then be stored as a workflow variable.
The SeaTunnel task can subsequently reference this variable directly.
9. Incremental Synchronization Workflow Logic
Once the SQL Task is configured, the entire incremental synchronization pipeline is established.
When the workflow runs:
- The SQL Task queries the current maximum order_id in Doris.
- The result is stored as a workflow variable.
- SeaTunnel uses
${order_id}as the query condition. - Newly added records are extracted from MySQL.
Through this approach, offline incremental synchronization based on a business primary key can be implemented efficiently and reliably.
10. Conclusion
This example demonstrates how to implement offline incremental synchronization by combining Apache DolphinScheduler and Apache SeaTunnel.
SeaTunnel handles data extraction and loading, while DolphinScheduler manages synchronization checkpoint retrieval, parameter passing, workflow orchestration, and scheduling.
The key idea behind this solution is querying the maximum order_id from the target Doris table through an SQL Task and passing the result to SeaTunnel through an OUT parameter. SeaTunnel then uses the checkpoint to perform incremental extraction from MySQL.
For data warehouse construction, ODS synchronization, and recurring offline synchronization scenarios, this solution offers several advantages:
- Simple implementation
- Easy maintenance
- Strong extensibility
- Production-ready incremental synchronization capabilities
As a result, it provides a practical and highly valuable reference architecture for enterprise data platform development.

Top comments (0)