Skip to content

wip(osc): introduce odc online schema change adaptor #4334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev/4.3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsStepName;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OscStepName;
import com.oceanbase.odc.service.onlineschemachange.oms.request.CommonTransferConfig;
import com.oceanbase.odc.service.onlineschemachange.oms.request.CreateOmsProjectRequest;
import com.oceanbase.odc.service.onlineschemachange.oms.request.DatabaseTransferObject;
Expand Down Expand Up @@ -83,7 +83,7 @@ public void test_json_sub_type_missing_property() {
Assert.assertNotNull(apiReturnResults);
Assert.assertTrue(CollectionUtils.isNotEmpty(apiReturnResults.getData()));
Optional<OmsProjectStepVO> first = apiReturnResults.getData().stream().filter(
a -> a.getName() == OmsStepName.FULL_TRANSFER).findFirst();
a -> a.getName() == OscStepName.FULL_TRANSFER).findFirst();
Assert.assertTrue(first.isPresent());
FullTransferStepInfoVO fullTransferStepInfoVO = (FullTransferStepInfoVO) first.get().getStepInfo();
Assert.assertSame(fullTransferStepInfoVO.getProcessedRecords(), 100L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ public enum ErrorCodes implements ErrorCode {
OmsGhanaOperateFailed,
OmsParamError,
OmsConnectivityTestFailed,
OmsPreCheckFailed,
OmsProjectExecutingFailed,
OscPreCheckFailed,
OscProjectExecutingFailed,

// resource
BuiltInResourceOperateNotAllowed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ com.oceanbase.odc.ErrorCodes.OmsDataCheckInconsistent=Data inconsistencies found
com.oceanbase.odc.ErrorCodes.OmsGhanaOperateFailed=OceanBase Migration Service (OMS) Ghana operation error.
com.oceanbase.odc.ErrorCodes.OmsParamError=Parameter error of OceanBase Migration Service (OMS).
com.oceanbase.odc.ErrorCodes.OmsConnectivityTestFailed=The connectivity test of OceanBase Migration Service (OMS) failed.
com.oceanbase.odc.ErrorCodes.OmsPreCheckFailed=The pre-check of OceanBase Migration Service (OMS) failed.
com.oceanbase.odc.ErrorCodes.OmsProjectExecutingFailed=OceanBase Migration Service (OMS) runtime failed
com.oceanbase.odc.ErrorCodes.OscPreCheckFailed=The pre-check of Online Schema Change failed.
com.oceanbase.odc.ErrorCodes.OscProjectExecutingFailed=Online Schema Change runtime failed
com.oceanbase.odc.ErrorCodes.BuiltInResourceOperateNotAllowed=The operation {0} is not allowed for the resource type {1}.
com.oceanbase.odc.ErrorCodes.BuiltInResourceNotAvailable=The built-in resource {0} has been disabled.
com.oceanbase.odc.ErrorCodes.SqlInterceptApprovalRequired=Approval is required for this SQL statement before the execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ com.oceanbase.odc.ErrorCodes.OmsDataCheckInconsistent=OceanBase Migration Servic
com.oceanbase.odc.ErrorCodes.OmsGhanaOperateFailed=OceanBase Migration Service (OMS) 管控操作失敗
com.oceanbase.odc.ErrorCodes.OmsParamError=OceanBase Migration Service (OMS) 參數錯誤
com.oceanbase.odc.ErrorCodes.OmsConnectivityTestFailed=OceanBase Migration Service (OMS) 連接測試失敗
com.oceanbase.odc.ErrorCodes.OmsPreCheckFailed=OmsOceanBase Migration Service (OMS) 預檢查失敗
com.oceanbase.odc.ErrorCodes.OmsProjectExecutingFailed=OceanBase Migration Service (OMS) 執行失敗
com.oceanbase.odc.ErrorCodes.OmsPreCheckFailed=Online Schema Change 預檢查失敗
com.oceanbase.odc.ErrorCodes.OmsProjectExecutingFailed=Online Schema Change 執行失敗
com.oceanbase.odc.ErrorCodes.BuiltInResourceOperateNotAllowed=不支持對資源類型 {1} 進行 {0} 操作
com.oceanbase.odc.ErrorCodes.BuiltInResourceNotAvailable=內置資源: {0} 已被停用
com.oceanbase.odc.ErrorCodes.SqlInterceptApprovalRequired=無法直接執行此 SQL,需要發起審批流程
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe
param.setUid(uid);
param.setRateLimitConfig(parameter.getRateLimitConfig());
param.setState(OscStates.YIELD_CONTEXT.getState());
// assign when create task
param.setUseODCMigrateTool(onlineSchemaChangeProperties.isUseOdcMigrateTool());
param.setOdcCommandURl(onlineSchemaChangeProperties.getOdcMigrateUrl());
return createScheduleTaskEntity(schedule.getId(), param);
}).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class OnlineSchemaChangeProperties {

private boolean enableFullVerify;

// if use odc migrate tool
private boolean useOdcMigrateTool = false;
// if this url provided, use provided url, or use k8s pod to create new instance
private String odcMigrateUrl = null;

@Data
public static class OmsProperties {
private String url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ public class OnlineSchemaChangeScheduleTaskParameters {
// only set when column is dropped
private List<String> filterColumns;

// if odc migrate is enabled
private boolean useODCMigrateTool = false;

// odc command url to migrate data
private String odcCommandURl;

// mapper port to access, maybe a part of odcCommandURl if k8s only access by host ip
private Integer k8sMapperPort;

// resourceID for pod if useODCMigrateTool = true, -1 means release not needed
private Long resourceID;

public String getOriginTableNameWithSchema() {
return tableNameWithSchema(originTableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.oceanbase.odc.common.json.NormalDialectTypeOutput;
import com.oceanbase.odc.core.shared.constant.DialectType;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsStepName;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OscStepName;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down Expand Up @@ -121,7 +121,10 @@ public class OnlineSchemaChangeScheduleTaskResult {
*/
private boolean manualSwapTableStarted;

private Map<OmsStepName, Long> checkFailedTime = new HashMap<>();
private Map<OscStepName, Long> checkFailedTime = new HashMap<>();

// last check failed time, it maybe not ready yet
private Long lastCheckFailedTimeSecond;

public OnlineSchemaChangeScheduleTaskResult(OnlineSchemaChangeScheduleTaskParameters taskParam) {
this.originTableName = taskParam.getOriginTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
@Getter
@AllArgsConstructor
public enum OmsStepName {
public enum OscStepName {

PRE_CHECK,
TRANSFER_PRECHECK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.oceanbase.odc.service.onlineschemachange.oms.enums.CheckerObjectStatus;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.CheckerResultType;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsProjectStatusEnum;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsStepName;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsStepStatus;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OscStepName;

/**
* @author yaobin
Expand Down Expand Up @@ -69,14 +69,14 @@ public OmsProjectStatusEnum deserialize(JsonParser jsonParser, DeserializationCo
}
}

public static class OmsStepNameDeserializer extends JsonDeserializer<OmsStepName> {
public static class OmsStepNameDeserializer extends JsonDeserializer<OscStepName> {

@Override
public OmsStepName deserialize(JsonParser jsonParser, DeserializationContext ctxt)
public OscStepName deserialize(JsonParser jsonParser, DeserializationContext ctxt)
throws IOException, JacksonException {

String status = jsonParser.readValueAs(String.class);
return getEnum(OmsStepName.class, status, OmsStepName.UNKNOWN);
return getEnum(OscStepName.class, status, OscStepName.UNKNOWN);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsStepName;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OmsStepStatus;
import com.oceanbase.odc.service.onlineschemachange.oms.enums.OscStepName;
import com.oceanbase.odc.service.onlineschemachange.oms.jackson.CustomEnumDeserializer;

import lombok.Data;
Expand All @@ -49,7 +49,7 @@ public class OmsProjectStepVO {
* 步骤名
*/
@JsonDeserialize(using = CustomEnumDeserializer.OmsStepNameDeserializer.class)
private OmsStepName name;
private OscStepName name;

/**
* 步骤描述(预检查/结构迁移/结构同步/全量迁移/全量同步/全量校验/索引迁移/增量日志拉取/增量同步/增量校验/正向切换)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public void init() {

// CREATE_DATA_TASK -> CREATE_DATA_TASK | MONITOR_DATA_TASK| COMPLETE(cnacel)
registerEvent(OscStates.CREATE_DATA_TASK.getState(),
CreateDataTaskAction.ofOMSCreateDataTaskAction(dataSourceOpenApiService, omsProjectOpenApiService,
onlineSchemaChangeProperties),
CreateDataTaskAction.createDataTaskAction(dataSourceOpenApiService, omsProjectOpenApiService,
onlineSchemaChangeProperties, systemConfigService, resourceManager),
statesTransfer,
ImmutableSet.of(OscStates.CREATE_DATA_TASK.getState(), OscStates.MONITOR_DATA_TASK.getState(),
OscStates.COMPLETE.getState()));
Expand Down Expand Up @@ -90,7 +90,7 @@ public void init() {

// CLEAN_RESOURCE -> YIELD_CONTEXT | CLEAN_RESOURCES | COMPLETE(cancel)
registerEvent(OscStates.CLEAN_RESOURCE.getState(),
CleanResourcesAction.ofOMSCleanResourcesAction(omsProjectOpenApiService), statesTransfer,
CleanResourcesAction.ofCleanResourcesAction(omsProjectOpenApiService, resourceManager), statesTransfer,
ImmutableSet.of(OscStates.YIELD_CONTEXT.getState(), OscStates.CLEAN_RESOURCE.getState(),
OscStates.COMPLETE.getState()));
// COMPLETE should not be scheduled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.oceanbase.odc.metadb.schedule.ScheduleEntity;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskRepository;
import com.oceanbase.odc.service.config.SystemConfigService;
import com.oceanbase.odc.service.connection.ConnectionService;
import com.oceanbase.odc.service.connection.model.ConnectionConfig;
import com.oceanbase.odc.service.flow.BeanInjectedClassDelegate;
Expand All @@ -49,6 +50,7 @@
import com.oceanbase.odc.service.onlineschemachange.oms.openapi.OmsProjectOpenApiService;
import com.oceanbase.odc.service.onlineschemachange.oscfms.action.ConnectionProvider;
import com.oceanbase.odc.service.onlineschemachange.oscfms.state.OscStates;
import com.oceanbase.odc.service.resource.ResourceManager;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.ScheduleTaskService;
import com.oceanbase.odc.service.session.DBSessionManageFacade;
Expand Down Expand Up @@ -87,6 +89,10 @@ public abstract class OscActionFsmBase extends ActionFsm<OscActionContext, OscAc
// ugly impl, try impl only in scheduler or flow
@Autowired
protected FlowInstanceService flowInstanceService;
@Autowired
protected ResourceManager resourceManager;
@Autowired
protected SystemConfigService systemConfigService;
// default is 432000 = 5*24*3600
@Value("${osc-task-expired-after-seconds:432000}")
protected long oscTaskExpiredAfterSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,24 @@
*/
public class ActionDelegate implements Action<OscActionContext, OscActionResult> {
// use delegate
protected Action<OscActionContext, OscActionResult> action;
protected Action<OscActionContext, OscActionResult> omsAction;
protected Action<OscActionContext, OscActionResult> odcAction;

@Override
public OscActionResult execute(OscActionContext context) throws Exception {
return action.execute(context);
return chooseAction(context).execute(context);
}

@Override
public void rollback(OscActionContext context) {
action.rollback(context);
chooseAction(context).rollback(context);
}

protected Action<OscActionContext, OscActionResult> chooseAction(OscActionContext context) {
if (context.getTaskParameter().isUseODCMigrateTool()) {
return odcAction;
} else {
return omsAction;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import javax.validation.constraints.NotNull;

import com.oceanbase.odc.service.onlineschemachange.oms.openapi.OmsProjectOpenApiService;
import com.oceanbase.odc.service.onlineschemachange.oscfms.action.odc.OdcCleanResourcesAction;
import com.oceanbase.odc.service.onlineschemachange.oscfms.action.oms.OmsCleanResourcesAction;
import com.oceanbase.odc.service.resource.ResourceManager;

import lombok.NonNull;

/**
* @author longpeng.zlp
Expand All @@ -27,10 +31,11 @@
*/
public class CleanResourcesAction extends ActionDelegate {

public static CleanResourcesAction ofOMSCleanResourcesAction(
@NotNull OmsProjectOpenApiService omsProjectOpenApiService) {
public static CleanResourcesAction ofCleanResourcesAction(
@NotNull OmsProjectOpenApiService omsProjectOpenApiService, @NonNull ResourceManager resourceManager) {
CleanResourcesAction ret = new CleanResourcesAction();
ret.action = new OmsCleanResourcesAction(omsProjectOpenApiService);
ret.omsAction = new OmsCleanResourcesAction(omsProjectOpenApiService);
ret.odcAction = new OdcCleanResourcesAction(resourceManager);
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.onlineschemachange.oscfms.action;

import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.session.ConnectionSession;
import com.oceanbase.odc.core.shared.constant.TaskErrorStrategy;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.metadb.schedule.ScheduleEntity;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
import com.oceanbase.odc.service.onlineschemachange.OscTableUtil;
import com.oceanbase.odc.service.onlineschemachange.fsm.Action;
import com.oceanbase.odc.service.onlineschemachange.model.OnlineSchemaChangeParameters;
import com.oceanbase.odc.service.onlineschemachange.model.OnlineSchemaChangeScheduleTaskParameters;
import com.oceanbase.odc.service.onlineschemachange.oscfms.OscActionContext;
import com.oceanbase.odc.service.onlineschemachange.oscfms.OscActionResult;
import com.oceanbase.odc.service.onlineschemachange.oscfms.state.OscStates;

import lombok.extern.slf4j.Slf4j;

/**
* @author longpeng.zlp
* @date 2025/3/24 14:11
*/
@Slf4j
public abstract class CleanResourcesActionBase implements Action<OscActionContext, OscActionResult> {

protected final List<TaskStatus> expectedTaskStatus = Lists.newArrayList(TaskStatus.DONE, TaskStatus.FAILED,
TaskStatus.CANCELED, TaskStatus.RUNNING, TaskStatus.ABNORMAL);

protected boolean tryDropNewTable(OscActionContext context) {
ConnectionSession connectionSession = null;
OnlineSchemaChangeScheduleTaskParameters taskParam = context.getTaskParameter();
String databaseName = taskParam.getDatabaseName();
String tableName = taskParam.getNewTableNameUnwrapped();
boolean succeed;
try {
connectionSession = context.getConnectionProvider().createConnectionSession();
OscTableUtil.dropNewTableIfExits(databaseName, tableName, connectionSession);
succeed = true;
} catch (Throwable e) {
log.warn("osc: drop table = {}.{} failed", databaseName, tableName, e);
succeed = false;
} finally {
if (connectionSession != null) {
connectionSession.expire();
}
}
return succeed;
}

@VisibleForTesting
protected OscActionResult determinateNextState(ScheduleTaskEntity scheduleTask, ScheduleEntity schedule) {
Long scheduleId = schedule.getId();
// try to dispatch to next state for done status
if (scheduleTask.getStatus() == TaskStatus.DONE) {
return new OscActionResult(OscStates.CLEAN_RESOURCE.getState(), null, OscStates.YIELD_CONTEXT.getState());
}
// if task state is in cancel state, stop and transfer to complete state
if (scheduleTask.getStatus() == TaskStatus.CANCELED) {
log.info("Because task is canceled, so delete quartz job={}", scheduleId);
// cancel as complete
return new OscActionResult(OscStates.CLEAN_RESOURCE.getState(), null, OscStates.COMPLETE.getState());
}
// remain failed and prepare state
OnlineSchemaChangeParameters onlineSchemaChangeParameters = JsonUtils.fromJson(
schedule.getJobParametersJson(), OnlineSchemaChangeParameters.class);
if (onlineSchemaChangeParameters.getErrorStrategy() == TaskErrorStrategy.CONTINUE) {
log.info("Because error strategy is continue, so schedule next task");
// try schedule next task
return new OscActionResult(OscStates.CLEAN_RESOURCE.getState(), null, OscStates.YIELD_CONTEXT.getState());
} else {
log.info("Because error strategy is abort, so delete quartz job={}", scheduleId);
// not continue for remain state, transfer to complete state
return new OscActionResult(OscStates.CLEAN_RESOURCE.getState(), null, OscStates.COMPLETE.getState());
}
}
}
Loading