This article provides a detailed guide on using SeaTunnel to synchronize data from InfluxDB to Doris. Leveraging SeaTunnel's robust data integration capabilities, users can efficiently transfer time-series data stored in InfluxDB to Doris, making it easily accessible for data querying and analysis.
Version Information:
SeaTunnel 2.3.3
InfluxDB 2.7.6
Doris 2.1.3 rc09
Preparation
The installation process for SeaTunnel 2.3.3 is omitted here and can be found in the official documentation.
After installing SeaTunnel 2.3.3, delete two connector JAR files, connector-hudi-2.3.3.jar and connector-datahub-2.3.3.jar, as they can cause errors during database synchronization.
Then add the following JAR files to your setup: seatunnel-api-2.3.3.jar, seatunnel-transforms-v2-2.3.3.jar, mysql-connector-java-8.0.28.jar, and jersey-client-1.19.4.jar. Without these, the data synchronization script may fail with missing-class errors.
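A minimal sketch of these file changes, assuming a default SeaTunnel 2.3.3 layout (connector JARs under connectors/seatunnel/, shared JARs under lib/); adjust the paths to match your installation:

# SEATUNNEL_HOME is assumed to point at the SeaTunnel 2.3.3 installation directory
cd $SEATUNNEL_HOME

# Remove the two connectors that can break the synchronization job
rm -f connectors/seatunnel/connector-hudi-2.3.3.jar
rm -f connectors/seatunnel/connector-datahub-2.3.3.jar

# Add the JARs the job needs (source paths are placeholders)
cp /path/to/seatunnel-api-2.3.3.jar lib/
cp /path/to/seatunnel-transforms-v2-2.3.3.jar lib/
cp /path/to/mysql-connector-java-8.0.28.jar lib/
cp /path/to/jersey-client-1.19.4.jar lib/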
For InfluxDB 2.7.6, certain preliminary configurations are necessary, which will be detailed below.
To make it easier to look up field types when defining the schema fields in the synchronization config, you can use the InfluxDB Studio-0.2.0 client, which displays the field types of each measurement.
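If you prefer the command line, the same information can be pulled with InfluxQL once the v1 credentials described below are in place, for example:

SHOW FIELD KEYS FROM "prometheus_remote_write"

Each row of the result pairs a field key with its type (float, integer, string, or boolean), which maps directly onto the schema block used later in the config.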
After installing InfluxDB 2.7.6 on Linux, you can reach its UI at ip:8086 and enter your username, password, organization, and bucket information.
Synchronization Process and Troubleshooting
Configuring the InfluxDB credentials in SeaTunnel 2.3 can lead to errors, such as failures when retrieving field information. Tracing through the SeaTunnel code showed the root cause to be a persistent 401 authorization error.
Attempts to connect with the InfluxDB Studio management tool using the same username and password that work in the UI also returned 401. Further investigation revealed that the UI credentials cannot be used to authenticate directly against the database: InfluxDB 2.x keeps 1.x-style username/password authorizations separate from the UI account.
To set up database access credentials, run the following command to assign permissions:
influx v1 auth create -o orgName --read-bucket bucketId --username=username
Alternatively:
influx v1 auth create -o "Organization" --write-bucket bucketId --read-bucket bucketId --username=username --password=password
To delete an authorization, use:
influx v1 auth delete --id 'id'
where id can be obtained from the output of influx v1 auth list.
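The bucketId referenced above can be looked up with the influx CLI, and the resulting authorization can be verified the same way; this assumes the CLI already has an active config with an operator token:

# Find the ID of the bucket you want to expose to SeaTunnel
influx bucket list

# After creating the authorization, confirm it exists; the ID column is what
# `influx v1 auth delete --id` expects
influx v1 auth list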
After executing these commands and setting up the password, you should be able to log in successfully using the new credentials, allowing for data synchronization through SeaTunnel.
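Before wiring the credentials into SeaTunnel, it can save a debugging round-trip to test them directly against InfluxDB's 1.x-compatibility query endpoint, which is what these v1 authorizations are meant for. A quick check with curl (host, user, password, and bucket name are placeholders) might look like this:

curl -sS -G "http://X.X.X.X:8086/query" \
  -u seatunnel_user:yourpassword \
  --data-urlencode "db=your_bucket" \
  --data-urlencode "q=SHOW MEASUREMENTS"

# An HTTP 200 response listing measurements confirms the credentials work;
# a 401 means the v1 authorization was not created correctly.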
Data Synchronization Configuration File: v1.batch.config_tmp.template
Below is a sample SeaTunnel configuration file to synchronize data from InfluxDB to Doris.
env {
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  influxdb {
    url = "http://X.X.X.X:8086"
    token = "your_token" # optional
    org = "your_organization"
    bucket = "your_bucket" # optional
    database = "your_bucket"
    username = "your_influxdb_username"
    password = "your_influxdb_password"
    epoch = "H" # options available on InfluxDB documentation
    query_timeout_sec = 600
    measurement = "prometheus_remote_write" # data table
    fields = ["node_cpu_seconds_total", "node_memory_MemTotal_bytes"] # optional
    sql = """SELECT node_cpu_seconds_total as system_cpu_usage, cpu as process_occupy_physical_memory_size, job as create_dept, node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes, node_memory_MemAvailable_bytes as process_open_file_describe_quantity, time as create_time FROM "prometheus_remote_write" where time > now() - 1h"""
    where = " where time > now() - 1h"
    schema {
      fields {
        system_cpu_usage = FLOAT
        process_occupy_physical_memory_size = INT
        create_dept = STRING
        process_read_written_file_system_total_bytes = FLOAT
        process_open_file_describe_quantity = FLOAT
        create_time = BIGINT
      }
    }
  }
}

sink {
  Doris {
    fenodes = "X.X.X.X:8030"
    username = "username"
    password = "password"
    table.identifier = "sbyw_data_acquisition.sbyw_application_process_type_tmp"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    sink.max-retries = 3
    batch_size = 10000
    result_table_name = "sbyw_application_process_type_tmp"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "prometheus_remote_write"
    result_table_name = "sbyw_application_process_type_tmp"
    field_mapper = {
      system_cpu_usage = system_cpu_usage
      process_occupy_physical_memory_size = process_occupy_physical_memory_size
      process_read_written_file_system_total_bytes = process_read_written_file_system_total_bytes
      process_open_file_describe_quantity = process_open_file_describe_quantity
      create_time = create_time
      create_dept = create_dept
    }
  }
}
Run the data synchronization script using:
./bin/seatunnel.sh -c ./config/v1.batch.config_tmp.template
Doris Test Table and InfluxDB Data
Here is my Doris test table; its columns mirror the fields declared in the schema block above.
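A DDL along these lines would produce such a table; the key model, bucket count, and replication setting below are assumptions to adapt to your cluster:

CREATE TABLE IF NOT EXISTS sbyw_data_acquisition.sbyw_application_process_type_tmp (
  create_time BIGINT,
  system_cpu_usage FLOAT,
  process_occupy_physical_memory_size INT,
  create_dept VARCHAR(64),
  process_read_written_file_system_total_bytes FLOAT,
  process_open_file_describe_quantity FLOAT
)
DUPLICATE KEY(create_time)
DISTRIBUTED BY HASH(create_time) BUCKETS 10
PROPERTIES ("replication_num" = "1");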
Below are snapshots of the source data in InfluxDB 2.7.6 (viewed with the InfluxDB Studio-0.2.0 client) and of the synchronized results in Doris.
Note: InfluxDB 2.7.6 does not fully support SQL-style queries through this interface; only fairly simple InfluxQL is accepted. In particular, because of how InfluxDB 2 is designed, a single query cannot combine an aggregation with individual raw fields, and such queries fail with an unsupported-query error. Plain field queries and standalone aggregations, on the other hand, work without issue.
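For example, against the measurement used above, the first two queries run fine while the third is rejected (illustrative only):

-- Works: plain fields restricted by time
SELECT node_cpu_seconds_total, cpu FROM "prometheus_remote_write" WHERE time > now() - 1h

-- Works: an aggregation on its own
SELECT MEAN(node_cpu_seconds_total) FROM "prometheus_remote_write" WHERE time > now() - 1h GROUP BY time(5m)

-- Fails: mixing an aggregate with a raw field in the same SELECT
SELECT MEAN(node_cpu_seconds_total), node_memory_MemAvailable_bytes FROM "prometheus_remote_write" WHERE time > now() - 1h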
Final Results:
My Data Synchronized to Doris:
And there it is: the data has been successfully synchronized from InfluxDB to Doris with SeaTunnel!