Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;

/**
* datax channel
*/
private int jobChannel;

/**
* Xms memory
*/
Expand Down Expand Up @@ -206,6 +211,14 @@ public void setJobSpeedRecord(int jobSpeedRecord) {
this.jobSpeedRecord = jobSpeedRecord;
}

public int getJobChannel() {
return jobChannel;
}

public void setJobChannel(int jobChannel) {
this.jobChannel = jobChannel;
}

public int getXms() {
return xms;
}
Expand Down Expand Up @@ -249,23 +262,24 @@ public List<ResourceInfo> getResourceFilesList() {

@Override
public String toString() {
return "DataxParameters{"
+ "customConfig=" + customConfig
+ ", json='" + json + '\''
+ ", dsType='" + dsType + '\''
+ ", dataSource=" + dataSource
+ ", dtType='" + dtType + '\''
+ ", dataTarget=" + dataTarget
+ ", sql='" + sql + '\''
+ ", targetTable='" + targetTable + '\''
+ ", preStatements=" + preStatements
+ ", postStatements=" + postStatements
+ ", jobSpeedByte=" + jobSpeedByte
+ ", jobSpeedRecord=" + jobSpeedRecord
+ ", xms=" + xms
+ ", xmx=" + xmx
+ ", resourceList=" + JSONUtils.toJsonString(resourceList)
+ '}';
return "DataxParameters{" +
"customConfig=" + customConfig +
", json='" + json + '\'' +
", dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
", sql='" + sql + '\'' +
", targetTable='" + targetTable + '\'' +
", preStatements=" + preStatements +
", postStatements=" + postStatements +
", jobSpeedByte=" + jobSpeedByte +
", jobSpeedRecord=" + jobSpeedRecord +
", jobChannel=" + jobChannel +
", xms=" + xms +
", xmx=" + xmx +
", resourceList=" + JSONUtils.toJsonString(resourceList) +
'}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -309,7 +310,7 @@ private ObjectNode buildDataxJobSettingJson() {

ObjectNode speed = JSONUtils.createObjectNode();

speed.put("channel", DATAX_CHANNEL_COUNT);
speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT));

if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
Expand All @@ -333,7 +334,7 @@ private ObjectNode buildDataxJobSettingJson() {
private ObjectNode buildDataxCoreJson() {

ObjectNode speed = JSONUtils.createObjectNode();
speed.put("channel", DATAX_CHANNEL_COUNT);
speed.put("channel", Optional.of(dataXParameters.getJobChannel()).orElse(DATAX_CHANNEL_COUNT));

if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testToString() {
dataxParameters.setDtType("MYSQL");
dataxParameters.setJobSpeedByte(1);
dataxParameters.setJobSpeedRecord(1);
dataxParameters.setJobChannel(1);
dataxParameters.setJson("json");
dataxParameters.setResourceList(resourceInfoList);

Expand All @@ -87,6 +88,7 @@ public void testToString() {
+ "postStatements=null, "
+ "jobSpeedByte=1, "
+ "jobSpeedRecord=1, "
+ "jobChannel=1, "
+ "xms=0, "
+ "xmx=-100, "
+ "resourceList=[{\"id\":null,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]"
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ export default {
datax_job_speed_byte_info: '(0 means unlimited)',
datax_job_speed_record: 'Speed(Record count)',
datax_job_speed_record_info: '(0 means unlimited)',
datax_job_channel: 'datax channel',
datax_job_runtime_memory: 'Runtime Memory Limits',
datax_job_runtime_memory_xms: 'Low Limit Value',
datax_job_runtime_memory_xmx: 'High Limit Value',
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ export default {
datax_job_speed_byte_info: '(KB,0代表不限制)',
datax_job_speed_record: '限流(记录数)',
datax_job_speed_record_info: '(0代表不限制)',
datax_job_channel: '数据管道数',
datax_job_runtime_memory: '运行内存',
datax_job_runtime_memory_xms: '最小内存',
datax_job_runtime_memory_xmx: '最大内存',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,28 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
value: 3000
}
]
const jobChannelOptions: any[] = [
{
label: '1',
value: 1
},
{
label: '3',
value: 3
},
{
label: '5',
value: 5
},
{
label: '10',
value: 10
},
{
label: '15',
value: 15
}
]
const memoryLimitOptions = [
{
label: '1G',
Expand Down Expand Up @@ -264,6 +286,14 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
options: memoryLimitOptions,
value: 1
},
{
type: 'input',
field: 'jobChannel',
name: t('project.node.datax_job_channel'),
span: jobSpeedSpan,
options: jobChannelOptions,
value: 1
},
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ export function formatParams(data: INodeData): {
taskParams.targetTable = data.targetTable
taskParams.jobSpeedByte = data.jobSpeedByte
taskParams.jobSpeedRecord = data.jobSpeedRecord
taskParams.jobChannel = data.jobChannel
taskParams.preStatements = data.preStatements
taskParams.postStatements = data.postStatements
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ interface ITaskParams {
targetTable?: string
jobSpeedByte?: number
jobSpeedRecord?: number
jobChannel?: number
xms?: number
xmx?: number
sparkParameters?: ISparkParameters
Expand Down