Skip to content

Commit dd8b87f

Browse files
bbendepvillard31
authored andcommitted
NIFI-14550 Allow setting version control info on a non-versioned process group
- Update stopVersionControl to no longer clear versioned component ids in order to allow re-attaching later - Add new methods to service facade and PG DAO to pass the flow snapshot when setting VCI - Fix system test to ensure UP_TO_DATE after Set Version Info Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes apache#9930.
1 parent f3f1bc0 commit dd8b87f

File tree

12 files changed

+183
-24
lines changed

12 files changed

+183
-24
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p
486486

487487
final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
488488
if (remoteCoordinates == null) {
489-
group.disconnectVersionControl(false);
489+
group.disconnectVersionControl();
490490
} else {
491491
final String registryId = determineRegistryId(remoteCoordinates);
492492
final String branch = remoteCoordinates.getBranch();

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3642,15 +3642,10 @@ private VersionedProcessGroup stripContentsFromRemoteDescendantGroups(final Vers
36423642
}
36433643

36443644
@Override
3645-
public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
3645+
public void disconnectVersionControl() {
36463646
writeLock.lock();
36473647
try {
36483648
this.versionControlInfo.set(null);
3649-
3650-
if (removeVersionedComponentIds) {
3651-
// remove version component ids from each component (until another versioned PG is encountered)
3652-
applyVersionedComponentIds(this, id -> null);
3653-
}
36543649
} finally {
36553650
writeLock.unlock();
36563651
}

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1039,7 +1039,7 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
10391039
/**
10401040
* Disconnects this Process Group from version control. If not currently under version control, this method does nothing.
10411041
*/
1042-
void disconnectVersionControl(boolean removeVersionedComponentIds);
1042+
void disconnectVersionControl();
10431043

10441044
/**
10451045
* Synchronizes the Process Group with the given Flow Registry, determining whether or not the local flow

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ public void setVersionControlInformation(VersionControlInformation versionContro
725725
}
726726

727727
@Override
728-
public void disconnectVersionControl(final boolean removeVersionedComponentIds) {
728+
public void disconnectVersionControl() {
729729
this.versionControlInfo = null;
730730
}
731731

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,16 @@ Set<DocumentedTypeDTO> getControllerServiceTypes(final String serviceType, final
12781278
*/
12791279
ProcessGroupEntity updateProcessGroup(Revision revision, ProcessGroupDTO processGroupDTO);
12801280

1281+
/**
1282+
* Sets the version control info of an unversioned process group.
1283+
*
1284+
* @param revision Revision to compare with the current base version
1285+
* @param processGroupDTO The ProcessGroupDTO
1286+
* @param flowSnapshot The flow snapshot matching the given version control info
1287+
* @return the updated process group entity
1288+
*/
1289+
ProcessGroupEntity setVersionControlInformation(Revision revision, ProcessGroupDTO processGroupDTO, RegisteredFlowSnapshot flowSnapshot);
1290+
12811291
/**
12821292
* Verifies that the Process Group identified by the given DTO can be updated in the manner appropriate according
12831293
* to the DTO

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,6 +1821,21 @@ public ProcessGroupEntity updateProcessGroup(final Revision revision, final Proc
18211821
return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
18221822
}
18231823

1824+
@Override
1825+
public ProcessGroupEntity setVersionControlInformation(final Revision revision, final ProcessGroupDTO processGroupDTO, final RegisteredFlowSnapshot flowSnapshot) {
1826+
final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
1827+
final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
1828+
processGroupNode,
1829+
() -> processGroupDAO.setVersionControlInformation(processGroupDTO, flowSnapshot),
1830+
processGroup -> dtoFactory.createProcessGroupDto(processGroup));
1831+
1832+
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
1833+
final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
1834+
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
1835+
final List<BulletinEntity> bulletinEntities = getProcessGroupBulletins(processGroupNode);
1836+
return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
1837+
}
1838+
18241839
@Override
18251840
public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
18261841
if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,34 @@ public Response updateProcessGroup(
516516
updateStrategy = ProcessGroupRecursivity.valueOf(processGroupUpdateStrategy);
517517
}
518518

519+
final VersionControlInformationDTO versionControlInfo = requestProcessGroupDTO.getVersionControlInformation();
520+
if (versionControlInfo != null) {
521+
if (updateStrategy == ProcessGroupRecursivity.ALL_DESCENDANTS) {
522+
throw new IllegalArgumentException("Version Control Information cannot be specified when applying updates recursively");
523+
}
524+
525+
if (StringUtils.isBlank(versionControlInfo.getRegistryId())
526+
|| StringUtils.isBlank(versionControlInfo.getBucketId())
527+
|| StringUtils.isBlank(versionControlInfo.getFlowId())
528+
|| StringUtils.isBlank(versionControlInfo.getVersion())) {
529+
throw new IllegalArgumentException("Version Control Information must contain a registry id, bucket id, flow id, and version");
530+
}
531+
532+
final FlowSnapshotContainer flowSnapshotContainer = getFlowFromRegistry(versionControlInfo);
533+
final RegisteredFlowSnapshot flowSnapshot = flowSnapshotContainer.getFlowSnapshot();
534+
if (flowSnapshot.getFlowContents() != null) {
535+
final VersionedFlowCoordinates versionedFlowCoordinates = flowSnapshot.getFlowContents().getVersionedFlowCoordinates();
536+
if (versionedFlowCoordinates != null) {
537+
versionControlInfo.setStorageLocation(versionedFlowCoordinates.getStorageLocation());
538+
}
539+
}
540+
if (flowSnapshot.getSnapshotMetadata() != null && flowSnapshot.getSnapshotMetadata().getBranch() != null && versionControlInfo.getBranch() == null) {
541+
versionControlInfo.setBranch(flowSnapshot.getSnapshotMetadata().getBranch());
542+
}
543+
versionControlInfo.setGroupId(requestProcessGroupDTO.getId());
544+
requestProcessGroupEntity.setVersionedFlowSnapshot(flowSnapshot);
545+
}
546+
519547
final String executionEngine = requestProcessGroupDTO.getExecutionEngine();
520548
if (executionEngine != null) {
521549
try {
@@ -630,7 +658,15 @@ public Response updateProcessGroup(
630658
final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, groupDTO);
631659

632660
if (requestGroupId.equals(entity.getId())) {
633-
responseEntity = entity;
661+
final VersionControlInformationDTO vciDto = entry.getKey().getComponent().getVersionControlInformation();
662+
final RegisteredFlowSnapshot flowSnapshot = entry.getKey().getVersionedFlowSnapshot();
663+
if (vciDto != null && flowSnapshot != null) {
664+
final Revision updatedRevision = getRevision(entity.getRevision(), entity.getId());
665+
responseEntity = serviceFacade.setVersionControlInformation(updatedRevision, groupDTO, flowSnapshot);
666+
} else {
667+
responseEntity = entity;
668+
}
669+
634670
populateRemainingProcessGroupEntityContent(responseEntity);
635671

636672
// prune response as necessary

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2928,7 +2928,6 @@ public Map<String, String> createVersionControlComponentMappingDto(final Instant
29282928
return mapping;
29292929
}
29302930

2931-
29322931
/**
29332932
* Creates a ProcessGroupContentDTO from the specified ProcessGroup.
29342933
*

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.nifi.groups.ComponentAdditions;
2424
import org.apache.nifi.groups.ProcessGroup;
2525
import org.apache.nifi.groups.VersionedComponentAdditions;
26+
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
2627
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
2728
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
2829
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
@@ -136,6 +137,15 @@ public interface ProcessGroupDAO {
136137
*/
137138
ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup);
138139

140+
/**
141+
* Sets the version control info on an unversioned process group.
142+
*
143+
* @param processGroup the process group with the version control info specified
144+
* @param flowSnapshot the flow snapshot for the given version control info
145+
* @return the updated process group
146+
*/
147+
ProcessGroup setVersionControlInformation(ProcessGroupDTO processGroup, RegisteredFlowSnapshot flowSnapshot);
148+
139149
/**
140150
* Updates the process group so that it matches the proposed flow
141151
*

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@
3939
import org.apache.nifi.groups.VersionedComponentAdditions;
4040
import org.apache.nifi.parameter.ParameterContext;
4141
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
42+
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
4243
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
4344
import org.apache.nifi.registry.flow.VersionControlInformation;
45+
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
4446
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
4547
import org.apache.nifi.remote.RemoteGroupPort;
4648
import org.apache.nifi.web.ResourceNotFoundException;
@@ -124,19 +126,47 @@ public boolean hasProcessGroup(String groupId) {
124126

125127
@Override
126128
public void verifyUpdate(final ProcessGroupDTO processGroup) {
129+
final ProcessGroup group = locateProcessGroup(flowController, processGroup.getId());
130+
verifyCanSetParameterContext(processGroup, group);
131+
132+
final String executionEngine = processGroup.getExecutionEngine();
133+
if (executionEngine != null) {
134+
group.verifyCanSetExecutionEngine(ExecutionEngine.valueOf(executionEngine));
135+
}
136+
137+
final VersionControlInformationDTO versionControlInfoDTO = processGroup.getVersionControlInformation();
138+
final VersionControlInformation versionControlInformation = group.getVersionControlInformation();
139+
if (versionControlInfoDTO != null) {
140+
if (versionControlInformation != null) {
141+
throw new IllegalStateException("Cannot set Version Control Info because process group is already under version control");
142+
}
143+
verifyChildProcessGroupsNotUnderVersionControl(group.getProcessGroups());
144+
}
145+
}
146+
147+
private void verifyChildProcessGroupsNotUnderVersionControl(final Set<ProcessGroup> childGroups) {
148+
if (childGroups == null || childGroups.isEmpty()) {
149+
return;
150+
}
151+
152+
for (final ProcessGroup childGroup : childGroups) {
153+
final VersionControlInformation versionControlInformation = childGroup.getVersionControlInformation();
154+
if (versionControlInformation != null) {
155+
throw new IllegalStateException("Cannot set Version Control Info because a child process group is already under version control");
156+
}
157+
verifyChildProcessGroupsNotUnderVersionControl(childGroup.getProcessGroups());
158+
}
159+
}
160+
161+
private void verifyCanSetParameterContext(final ProcessGroupDTO processGroup, final ProcessGroup group) {
127162
final ParameterContextReferenceEntity parameterContextReference = processGroup.getParameterContext();
128163
if (parameterContextReference == null) {
129164
return;
130165
}
131166

132167
final ParameterContext parameterContext = locateParameterContext(parameterContextReference.getId());
133-
final ProcessGroup group = locateProcessGroup(flowController, processGroup.getId());
134-
group.verifyCanSetParameterContext(parameterContext);
135168

136-
final String executionEngine = processGroup.getExecutionEngine();
137-
if (executionEngine != null) {
138-
group.verifyCanSetExecutionEngine(ExecutionEngine.valueOf(executionEngine));
139-
}
169+
group.verifyCanSetParameterContext(parameterContext);
140170
}
141171

142172
private ParameterContext locateParameterContext(final String id) {
@@ -481,33 +511,48 @@ public ProcessGroup updateProcessGroup(ProcessGroupDTO processGroupDTO) {
481511
return group;
482512
}
483513

514+
@Override
515+
public ProcessGroup setVersionControlInformation(final ProcessGroupDTO processGroup, final RegisteredFlowSnapshot flowSnapshot) {
516+
final ProcessGroup group = locateProcessGroup(flowController, processGroup.getId());
517+
final VersionControlInformationDTO versionControlInfo = processGroup.getVersionControlInformation();
518+
final VersionedProcessGroup versionedProcessGroup = flowSnapshot.getFlowContents();
519+
updateVersionControlInformation(group, versionedProcessGroup, versionControlInfo, Collections.emptyMap());
520+
return group;
521+
}
522+
484523
@Override
485524
public ProcessGroup updateVersionControlInformation(final VersionControlInformationDTO versionControlInformation, final Map<String, String> versionedComponentMapping) {
486525
final String groupId = versionControlInformation.getGroupId();
487526
final ProcessGroup group = locateProcessGroup(flowController, groupId);
488527

528+
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
529+
final InstantiatedVersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getControllerServiceProvider(), flowController.getFlowManager(), false);
530+
531+
updateVersionControlInformation(group, flowSnapshot, versionControlInformation, versionedComponentMapping);
532+
group.onComponentModified();
533+
534+
return group;
535+
}
536+
537+
private void updateVersionControlInformation(final ProcessGroup group, final VersionedProcessGroup flowSnapshot,
538+
final VersionControlInformationDTO versionControlInformation, final Map<String, String> versionedComponentMapping) {
539+
489540
final String registryId = versionControlInformation.getRegistryId();
490541
final FlowRegistryClientNode flowRegistry = flowController.getFlowManager().getFlowRegistryClient(registryId);
491542
final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
492543

493-
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
494-
final VersionedProcessGroup flowSnapshot = mapper.mapProcessGroup(group, flowController.getControllerServiceProvider(), flowController.getFlowManager(), false);
495-
496544
final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
497545
.registryName(registryName)
498546
.flowSnapshot(flowSnapshot)
499547
.build();
500548

501549
group.setVersionControlInformation(vci, versionedComponentMapping);
502-
group.onComponentModified();
503-
504-
return group;
505550
}
506551

507552
@Override
508553
public ProcessGroup disconnectVersionControl(final String groupId) {
509554
final ProcessGroup group = locateProcessGroup(flowController, groupId);
510-
group.disconnectVersionControl(true);
555+
group.disconnectVersionControl();
511556
group.onComponentModified();
512557
return group;
513558
}

nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/flow/registry/FileSystemFlowRegistryClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurat
238238
populateBucket(snapshot, bucketId);
239239
populateFlow(snapshot, bucketId, flowId, versionFiles == null ? 0 : versionFiles.length);
240240

241+
final String latestVersion = getLatestVersion(context, flowVersionLocation).orElse(null);
242+
snapshot.setLatest(version.equals(latestVersion));
243+
241244
return snapshot;
242245
}
243246
}

nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.nifi.web.api.dto.ProcessorDTO;
2626
import org.apache.nifi.web.api.dto.RevisionDTO;
2727
import org.apache.nifi.web.api.dto.SnippetDTO;
28+
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
2829
import org.apache.nifi.web.api.dto.flow.FlowDTO;
2930
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
3031
import org.apache.nifi.web.api.entity.ConnectionEntity;
@@ -369,4 +370,49 @@ public void testStartVersionControlThenModifyAndRevert() throws NiFiClientExcept
369370
assertEquals("UP_TO_DATE", versionedFlowState);
370371
}
371372

373+
@Test
374+
public void testStopVersionControlThenSetVersionControlInfo() throws NiFiClientException, IOException, InterruptedException {
375+
final FlowRegistryClientEntity clientEntity = registerClient();
376+
final ProcessGroupEntity group = getClientUtil().createProcessGroup("Test Set Version Control Info", "root");
377+
378+
final VersionControlInformationEntity vci = getClientUtil().startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
379+
380+
// Retrieve PG under version control and verify the PG's VCI
381+
final ProcessGroupEntity groupAfterStartVersionControl = getNifiClient().getProcessGroupClient().getProcessGroup(group.getId());
382+
assertNotNull(groupAfterStartVersionControl);
383+
assertNotNull(groupAfterStartVersionControl.getComponent());
384+
assertNotNull(groupAfterStartVersionControl.getComponent().getVersionControlInformation());
385+
assertEquals(clientEntity.getId(), groupAfterStartVersionControl.getComponent().getVersionControlInformation().getRegistryId());
386+
assertEquals(TEST_FLOWS_BUCKET, groupAfterStartVersionControl.getComponent().getVersionControlInformation().getBucketId());
387+
assertEquals(vci.getVersionControlInformation().getFlowId(), groupAfterStartVersionControl.getComponent().getVersionControlInformation().getFlowId());
388+
assertEquals(vci.getVersionControlInformation().getVersion(), groupAfterStartVersionControl.getComponent().getVersionControlInformation().getVersion());
389+
assertEquals("UP_TO_DATE", groupAfterStartVersionControl.getComponent().getVersionControlInformation().getState());
390+
391+
// Stop version control
392+
getNifiClient().getVersionsClient().stopVersionControl(groupAfterStartVersionControl);
393+
394+
// Retrieve PG again and verify no VCI present
395+
final ProcessGroupEntity groupAfterStopVersionControl = getNifiClient().getProcessGroupClient().getProcessGroup(group.getId());
396+
assertNotNull(groupAfterStopVersionControl);
397+
assertNotNull(groupAfterStopVersionControl.getComponent());
398+
assertNull(groupAfterStopVersionControl.getComponent().getVersionControlInformation());
399+
400+
// Submit PG update specifying VCI
401+
final VersionControlInformationDTO setVersionInfo = new VersionControlInformationDTO();
402+
setVersionInfo.setRegistryId(clientEntity.getId());
403+
setVersionInfo.setBucketId(vci.getVersionControlInformation().getBucketId());
404+
setVersionInfo.setFlowId(vci.getVersionControlInformation().getFlowId());
405+
setVersionInfo.setVersion(vci.getVersionControlInformation().getVersion());
406+
groupAfterStopVersionControl.getComponent().setVersionControlInformation(setVersionInfo);
407+
408+
final ProcessGroupEntity groupAfterSetVersionInfo = getNifiClient().getProcessGroupClient().updateProcessGroup(groupAfterStopVersionControl);
409+
assertNotNull(groupAfterSetVersionInfo);
410+
assertNotNull(groupAfterSetVersionInfo.getComponent());
411+
assertNotNull(groupAfterSetVersionInfo.getComponent().getVersionControlInformation());
412+
assertEquals(clientEntity.getId(), groupAfterSetVersionInfo.getComponent().getVersionControlInformation().getRegistryId());
413+
assertEquals(TEST_FLOWS_BUCKET, groupAfterSetVersionInfo.getComponent().getVersionControlInformation().getBucketId());
414+
assertEquals(vci.getVersionControlInformation().getFlowId(), groupAfterSetVersionInfo.getComponent().getVersionControlInformation().getFlowId());
415+
assertEquals(vci.getVersionControlInformation().getVersion(), groupAfterSetVersionInfo.getComponent().getVersionControlInformation().getVersion());
416+
assertEquals("UP_TO_DATE", groupAfterSetVersionInfo.getComponent().getVersionControlInformation().getState());
417+
}
372418
}

0 commit comments

Comments
 (0)