diff --git a/pom.xml b/pom.xml index 509fffdf..6cb773b3 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,5 @@ - 4.0.0 @@ -45,7 +46,6 @@ sdm-root ${revision} pom - CDS Feature for SAP Document Management Service - Root This artifact is a is cds-plugin that provides an easy CAP-level integration with SAP Document Management Service. This package supports handling of attachments(documents) by using an aspect Attachments in SAP Document Management Service. https://cap.cloud.sap/docs/plugins/#attachments diff --git a/sdm/pom.xml b/sdm/pom.xml index 15be91a7..8e516fdd 100644 --- a/sdm/pom.xml +++ b/sdm/pom.xml @@ -1,4 +1,5 @@ - 4.0.0 @@ -44,6 +45,11 @@ 1.18.0 2.18.2 5.15.2 + 5.4.2 + 5.3.3 + 4.1.5 + 3.0.0-beta2 + 2.2.21 @@ -93,6 +99,30 @@ + + org.apache.httpcomponents.client5 + httpclient5 + ${httpclient5-version} + + + org.apache.httpcomponents.core5 + httpcore5 + ${httpcore5-version} + + + + org.apache.httpcomponents + httpasyncclient + ${httpasyncclient-version} + + + + + org.apache.logging.log4j + log4j-api + ${log4j-api-version} + + org.projectlombok lombok @@ -376,12 +406,19 @@ + + + + io.reactivex.rxjava2 + rxjava + ${rxjava-version} + ${project.artifactId} - + maven-javadoc-plugin ${skipDuringDeploy} @@ -477,7 +514,6 @@ cds - @@ -502,6 +538,15 @@ com/sap/cds/sdm/service/SDMAttachmentsService.class + + com/sap/cds/sdm/service/DocumentUploadService.class + + + com/sap/cds/sdm/service/ReadAheadInputStream.class + + + com/sap/cds/sdm/service/RetryUtils.class + com/sap/cds/sdm/caching/** @@ -542,17 +587,17 @@ INSTRUCTION COVEREDRATIO - 0.90 + 0.75 BRANCH COVEREDRATIO - 0.80 + 0.79 CLASS MISSEDCOUNT - 0 + 1 @@ -607,5 +652,4 @@ https://common.repositories.cloud.sap/artifactory/cap-sdm-java - diff --git a/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java b/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java index 8d2a475e..e5bc206c 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java +++ b/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java @@ -6,6 +6,7 @@ import com.sap.cds.sdm.handler.applicationservice.SDMCreateAttachmentsHandler; import com.sap.cds.sdm.handler.applicationservice.SDMReadAttachmentsHandler; import com.sap.cds.sdm.handler.applicationservice.SDMUpdateAttachmentsHandler; +import com.sap.cds.sdm.service.DocumentUploadService; import com.sap.cds.sdm.service.SDMAttachmentsService; import com.sap.cds.sdm.service.SDMService; import com.sap.cds.sdm.service.SDMServiceImpl; @@ -59,10 +60,12 @@ public void eventHandlers(CdsRuntimeConfigurer configurer) { var connectionPool = getConnectionPool(environment); SDMService sdmService = new SDMServiceImpl(binding, connectionPool); + DocumentUploadService documentService = new DocumentUploadService(); configurer.eventHandler(buildReadHandler()); configurer.eventHandler(new SDMCreateAttachmentsHandler(persistenceService, sdmService)); configurer.eventHandler(new SDMUpdateAttachmentsHandler(persistenceService, sdmService)); - configurer.eventHandler(new SDMAttachmentsServiceHandler(persistenceService, sdmService)); + configurer.eventHandler( + new SDMAttachmentsServiceHandler(persistenceService, sdmService, documentService)); } private AttachmentService buildAttachmentService() { diff --git a/sdm/src/main/java/com/sap/cds/sdm/constants/SDMConstants.java b/sdm/src/main/java/com/sap/cds/sdm/constants/SDMConstants.java index 0549a5ec..28969734 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/constants/SDMConstants.java +++ b/sdm/src/main/java/com/sap/cds/sdm/constants/SDMConstants.java @@ -39,10 +39,15 @@ private SDMConstants() { public static final String FILE_NOT_FOUND_ERROR = "Object not found in repository"; public static final Integer MAX_CONNECTIONS = 100; public static final int CONNECTION_TIMEOUT = 1200; + public static final int CHUNK_SIZE = 100 * 1024 * 1024; // 100MB Chunk Size public static final String ONBOARD_REPO_MESSAGE = "Repository with name %s and id %s onboarded successfully"; public static final String ONBOARD_REPO_ERROR_MESSAGE = "Error in onboarding repository with name %s"; + public static final String NO_SDM_BINDING = "No SDM binding found"; + public static final String DI_TOKEN_EXCHANGE_ERROR = "Error fetching DI token with JWT bearer"; + public static final String DI_TOKEN_EXCHANGE_PARAMS = + "/oauth/token?grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer"; public static final String UPDATE_ATTACHMENT_ERROR = "Could not update the attachment"; public static String nameConstraintMessage( diff --git a/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java b/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java index 1f947e47..f88c8759 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java +++ b/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java @@ -8,6 +8,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.sap.cds.sdm.caching.CacheConfig; +import com.sap.cds.sdm.caching.CacheKey; import com.sap.cds.sdm.caching.TokenCacheKey; import com.sap.cds.sdm.constants.SDMConstants; import com.sap.cds.sdm.model.SDMCredentials; @@ -19,19 +20,37 @@ import com.sap.cloud.sdk.cloudplatform.connectivity.OAuth2DestinationBuilder; import com.sap.cloud.sdk.cloudplatform.connectivity.OnBehalfOf; import com.sap.cloud.security.config.ClientCredentials; +import com.sap.cloud.security.xsuaa.client.OAuth2ServiceException; +import com.sap.cloud.security.xsuaa.http.HttpHeaders; +import com.sap.cloud.security.xsuaa.http.MediaType; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TokenHandler { + private static final Logger logger = LoggerFactory.getLogger(TokenHandler.class); private static final ObjectMapper mapper = new ObjectMapper(); @@ -140,6 +159,109 @@ public static String getDITokenUsingAuthorities( return cachedToken; } + public static String getDIToken(String token, SDMCredentials sdmCredentials) throws IOException { + JsonObject payloadObj = getTokenFields(token); + String email = payloadObj.get("email").getAsString(); + JsonObject tenantDetails = payloadObj.get("ext_attr").getAsJsonObject(); + String subdomain = tenantDetails.get("zdn").getAsString(); + String tokenexpiry = payloadObj.get("exp").getAsString(); + CacheKey cacheKey = new CacheKey(); + cacheKey.setKey(email + "_" + subdomain); + cacheKey.setExpiration(tokenexpiry); + String cachedToken = CacheConfig.getUserTokenCache().get(cacheKey); + if (cachedToken == null) { + cachedToken = generateDITokenFromTokenExchange(token, sdmCredentials, payloadObj); + } + return cachedToken; + } + + public static Map fillTokenExchangeBody(String token, SDMCredentials sdmEnv) { + Map parameters = new HashMap<>(); + parameters.put("assertion", token); + return parameters; + } + + public static String generateDITokenFromTokenExchange( + String token, SDMCredentials sdmCredentials, JsonObject payloadObj) + throws OAuth2ServiceException { + String cachedToken = null; + CloseableHttpClient httpClient = null; + try { + httpClient = HttpClients.createDefault(); + if (sdmCredentials.getClientId() == null) { + throw new IOException(SDMConstants.NO_SDM_BINDING); + } + Map parameters = fillTokenExchangeBody(token, sdmCredentials); + HttpPost httpPost = + new HttpPost(sdmCredentials.getBaseTokenUrl() + SDMConstants.DI_TOKEN_EXCHANGE_PARAMS); + httpPost.setHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON.value()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED.value()); + httpPost.setHeader("X-zid", getTokenFields(token).get("zid").getAsString()); + + String encoded = + java.util.Base64.getEncoder() + .encodeToString( + (sdmCredentials.getClientId() + ":" + sdmCredentials.getClientSecret()) + .getBytes()); + httpPost.setHeader("Authorization", "Basic " + encoded); + + List basicNameValuePairs = + parameters.entrySet().stream() + .map(entry -> new BasicNameValuePair(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + httpPost.setEntity(new UrlEncodedFormEntity(basicNameValuePairs)); + + HttpResponse response = httpClient.execute(httpPost); + String responseBody = extractResponseBodyAsString(response); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + logger.error("Error fetching token with JWT bearer : " + responseBody); + throw new OAuth2ServiceException( + String.format(SDMConstants.DI_TOKEN_EXCHANGE_ERROR, responseBody)); + } + Map accessTokenMap = new JSONObject(responseBody).toMap(); + cachedToken = String.valueOf(accessTokenMap.get("access_token")); + String expiryTime = payloadObj.get("exp").getAsString(); + CacheKey cacheKey = new CacheKey(); + JsonObject tenantDetails = payloadObj.get("ext_attr").getAsJsonObject(); + String subdomain = tenantDetails.get("zdn").getAsString(); + cacheKey.setKey(payloadObj.get("email").getAsString() + "_" + subdomain); + cacheKey.setExpiration(expiryTime); + CacheConfig.getUserTokenCache().put(cacheKey, cachedToken); + } catch (UnsupportedEncodingException e) { + throw new OAuth2ServiceException("Unexpected error parsing URI: " + e.getMessage()); + } catch (ClientProtocolException e) { + throw new OAuth2ServiceException( + "Unexpected error while fetching client protocol: " + e.getMessage()); + } catch (IOException e) { + logger.error( + "Error in POST request while fetching token with JWT bearer \n" + + Arrays.toString(e.getStackTrace())); + throw new OAuth2ServiceException( + "Error in POST request while fetching token with JWT bearer: " + e.getMessage()); + } finally { + safeClose(httpClient); + } + return cachedToken; + } + + private static void safeClose(CloseableHttpClient httpClient) { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException ex) { + logger.error("Failed to close httpclient \n" + Arrays.toString(ex.getStackTrace())); + } + } + } + + public static String extractResponseBodyAsString(HttpResponse response) throws IOException { + // Ensure that InputStream and BufferedReader are automatically closed + try (InputStream inputStream = response.getEntity().getContent(); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + return bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); + } + } + public static JsonObject getTokenFields(String token) { String[] chunks = token.split("\\."); java.util.Base64.Decoder decoder = java.util.Base64.getUrlDecoder(); diff --git a/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java b/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java index 43ac2a59..2eac3e5e 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java +++ b/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java @@ -22,4 +22,5 @@ public class CmisDocument { private String repositoryId; private String status; private String mimeType; + private long contentLength; } diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java new file mode 100644 index 00000000..80d7c9ce --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java @@ -0,0 +1,429 @@ +package com.sap.cds.sdm.service; + +import com.sap.cds.sdm.constants.SDMConstants; +import com.sap.cds.sdm.handler.TokenHandler; +import com.sap.cds.sdm.model.CmisDocument; +import com.sap.cds.sdm.model.SDMCredentials; +import com.sap.cds.services.ServiceException; +import io.reactivex.BackpressureOverflowStrategy; +import io.reactivex.Flowable; +import io.reactivex.Single; +import java.io.*; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.*; +import java.util.concurrent.TimeUnit; +import org.apache.hc.client5.http.classic.methods.HttpPost; +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.entity.mime.InputStreamBody; +import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DocumentUploadService { + + private final CloseableHttpClient httpClient; + MemoryMXBean memoryMXBean; + private static final Logger logger = LoggerFactory.getLogger(DocumentUploadService.class); + + public DocumentUploadService() { + logger.info("DocumentUploadService is instantiated"); + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(20); + connectionManager.setDefaultMaxPerRoute(5); + + // Configure request with timeouts + RequestConfig requestConfig = + RequestConfig.custom() + .setConnectionRequestTimeout(60, TimeUnit.MINUTES) + .setResponseTimeout(60, TimeUnit.MINUTES) + .build(); + + ConnectionConfig connectionConfig = + ConnectionConfig.custom().setConnectTimeout(60, TimeUnit.MINUTES).build(); + connectionManager.setDefaultConnectionConfig(connectionConfig); + + // Create a HttpClient using the connection manager + httpClient = + HttpClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) + .build(); + + // Getting the handle to Mem management bean to print out heap mem used at required intervals. + memoryMXBean = ManagementFactory.getMemoryMXBean(); + } + + /* + * Reactive Java implementation to create document. + */ + public Single createDocumentRx( + CmisDocument cmisDocument, SDMCredentials sdmCredentials, String jwtToken) { + return Single.defer( + () -> { + try { + // Obtain DI token + String accessToken = TokenHandler.getDIToken(jwtToken, sdmCredentials); + String sdmUrl = + sdmCredentials.getUrl() + "browser/" + cmisDocument.getRepositoryId() + "/root"; + + // Set HTTP headers + Map headers = new HashMap<>(); + headers.put("Authorization", "Bearer " + accessToken); + headers.put("Connection", "keep-alive"); + + long totalSize = cmisDocument.getContentLength(); + int chunkSize = SDMConstants.CHUNK_SIZE; + + if (totalSize <= chunkSize) { + // Upload directly if file is ≤ 100MB + return uploadSingleChunk(cmisDocument, headers, sdmUrl); + } else { + // Upload in chunks if file is > 100MB + return uploadLargeFileInChunks(cmisDocument, headers, sdmUrl, chunkSize); + } + } catch (Exception e) { + return Single.error( + new IOException(" Error uploading document: " + e.getMessage(), e)); + } + }) + .subscribeOn(io.reactivex.schedulers.Schedulers.io()); + } + + private CloseableHttpResponse performRequestWithRetry(String sdmUrl, HttpUriRequestBase request) + throws IOException { + return Flowable.fromCallable(() -> httpClient.execute(request)) + .onBackpressureBuffer( + 3, // Keeping a very low buffer as we hardly need it as the consumer (di call to + // appendcontent) is fast enough for the producer (sending the rest call) as we are + // making synchronous call + () -> + logger.error( + "Buffer overflow! Handle appropriately."), // Callback for overflow handling + BackpressureOverflowStrategy + .ERROR) // Strategy when overflow happens: just emit an error. + .retryWhen(RetryUtils.retryLogic(3)) + .blockingSingle(); + } + + /* + * CMIS call to appending content stream + */ + private void appendContentStream( + CmisDocument cmisDocument, + Map headers, + String sdmUrl, + byte[] chunkBuffer, + int bytesRead, + boolean isLastChunk, + int chunkIndex) + throws IOException, ParseException { + + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + builder.addTextBody("cmisaction", "appendContent"); + builder.addTextBody("objectId", cmisDocument.getObjectId()); + builder.addTextBody("propertyId[0]", "cmis:name"); + builder.addTextBody("propertyValue[0]", cmisDocument.getFileName()); + builder.addTextBody("propertyId[1]", "cmis:objectTypeId"); + builder.addTextBody("propertyValue[1]", "cmis:document"); + builder.addTextBody("isLastChunk", String.valueOf(isLastChunk)); + builder.addTextBody("filename", cmisDocument.getFileName()); + builder.addTextBody("succinct", "true"); + builder.addPart( + "media", + new InputStreamBody( + new ByteArrayInputStream(chunkBuffer, 0, bytesRead), cmisDocument.getFileName())); + + HttpEntity entity = builder.build(); + + HttpPost request = new HttpPost(sdmUrl); + request.setEntity(entity); + headers.forEach(request::addHeader); + + long startChunkUploadTime = System.currentTimeMillis(); + try (CloseableHttpResponse response = performRequestWithRetry(sdmUrl, request)) { + long endChunkUploadTime = System.currentTimeMillis(); + logger.info( + " Chunk " + + chunkIndex + + " appendContent completed and it took " + + ((int) ((endChunkUploadTime - startChunkUploadTime) / 1000)) + + " seconds"); + } + } + + private String createEmptyDocument( + CmisDocument cmisDocument, Map headers, String sdmUrl) + throws IOException, ParseException { + + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + builder.addTextBody("cmisaction", "createDocument"); + builder.addTextBody("objectId", cmisDocument.getFolderId()); + builder.addTextBody("propertyId[0]", "cmis:name"); + builder.addTextBody("propertyValue[0]", cmisDocument.getFileName()); + builder.addTextBody("propertyId[1]", "cmis:objectTypeId"); + builder.addTextBody("propertyValue[1]", "cmis:document"); + builder.addTextBody("succinct", "true"); + + HttpEntity entity = builder.build(); + + HttpPost request = new HttpPost(sdmUrl); + request.setEntity(entity); + headers.forEach(request::addHeader); + + try (CloseableHttpResponse response = performRequestWithRetry(sdmUrl, request)) { + logger.info("Empty Document Created: " + response.getCode()); + if (response.getEntity() == null) { + throw new IOException("Response entity is null!"); + } + return EntityUtils.toString(response.getEntity()); + } + } + + private Single uploadSingleChunk( + CmisDocument cmisDocument, Map headers, String sdmUrl) { + + return Single.defer( + () -> { + try { + // Initialize ReadAheadInputStream + InputStream originalStream = cmisDocument.getContent(); + if (originalStream == null) { + return Single.error(new IOException(" File stream is null!")); + } + + ReadAheadInputStream reReadableStream = + new ReadAheadInputStream(originalStream, cmisDocument.getContentLength()); + + // Prepare Multipart Request + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + builder.addTextBody("cmisaction", "createDocument"); + builder.addTextBody("objectId", cmisDocument.getFolderId()); + builder.addTextBody("propertyId[0]", "cmis:name"); + builder.addTextBody("propertyValue[0]", cmisDocument.getFileName()); + builder.addTextBody("propertyId[1]", "cmis:objectTypeId"); + builder.addTextBody("propertyValue[1]", "cmis:document"); + builder.addTextBody("succinct", "true"); + // Add media part with file metadata + builder.addPart( + "media", + new InputStreamBody( + reReadableStream, + ContentType.create(cmisDocument.getMimeType()), + cmisDocument.getFileName())); + HttpEntity entity = builder.build(); + + HttpPost request = new HttpPost(sdmUrl); + request.setEntity(entity); + headers.forEach(request::addHeader); + + return Single.fromCallable( + () -> { + try (CloseableHttpResponse response = + performRequestWithRetry(sdmUrl, request)) { + String responseBody = EntityUtils.toString(response.getEntity()); + logger.info(" Upload Response: " + responseBody); + + Map finalResMap = new HashMap<>(); + formResponse(cmisDocument, finalResMap, responseBody); + + return new JSONObject(finalResMap); + } + }) + .toFlowable() + .retryWhen(RetryUtils.retryLogic(3)) + .singleOrError(); + } catch (Exception e) { + return Single.error( + new IOException(" Error uploading small document: " + e.getMessage(), e)); + } + }); + } + + private Single uploadLargeFileInChunks( + CmisDocument cmisDocument, Map headers, String sdmUrl, int chunkSize) { + + return Single.defer( + () -> { + ReadAheadInputStream chunkedStream = null; + try { + InputStream originalStream = cmisDocument.getContent(); + if (originalStream == null) { + return Single.error(new IOException("File stream is null!")); + } + + chunkedStream = + new ReadAheadInputStream(originalStream, cmisDocument.getContentLength()); + + // Step 1: Initial Request (Without Content) and Get `objectId`. It is required to + // set in every chunk appendContent + String responseBody = createEmptyDocument(cmisDocument, headers, sdmUrl); + logger.info("Response Body: " + responseBody); + + String objectId = + (new JSONObject(responseBody)) + .getJSONObject("succinctProperties") + .getString("cmis:objectId"); + cmisDocument.setObjectId(objectId); + logger.info("objectId of empty doc is " + objectId); + + // Step 2: Upload Chunks Sequentially + int chunkIndex = 0; + byte[] chunkBuffer = new byte[chunkSize]; + int bytesRead; + boolean hasMoreChunks = true; + while (hasMoreChunks) { + long startChunkUploadTime = System.currentTimeMillis(); + + // Step 3: Read next chunk + bytesRead = chunkedStream.read(chunkBuffer, 0, chunkSize); + logger.info("bytesRead is " + bytesRead); + // Step 4: Fetch remaining bytes before checking EOF + long remainingBytes = chunkedStream.getRemainingBytes(); + logger.info("remainingBytes is " + remainingBytes); + + // Step 5: Check if it's the last chunk + boolean isLastChunk = bytesRead < chunkSize || chunkedStream.isEOFReached(); + + // Step 6: If no bytes were read AND queue still has data, fetch from queue + if (bytesRead == -1 && !chunkedStream.isChunkQueueEmpty()) { + logger.info("Premature exit detected. Fetching last chunk from queue..."); + byte[] lastChunk = chunkedStream.getLastChunkFromQueue(); + bytesRead = lastChunk.length; + System.arraycopy(lastChunk, 0, chunkBuffer, 0, bytesRead); + isLastChunk = true; // It has to be the last chunk + } + + // Log every chunk details + logger.info( + "Chunk " + + chunkIndex + + " | BytesRead: " + + bytesRead + + " | RemainingBytes: " + + remainingBytes + + " | isLastChunk? " + + isLastChunk); + + // Step 7: Append Chunk. Call cmis api to append content stream + if (bytesRead > 0) { + appendContentStream( + cmisDocument, headers, sdmUrl, chunkBuffer, bytesRead, isLastChunk, chunkIndex); + } + + long endChunkUploadTime = System.currentTimeMillis(); + logger.info( + " Chunk " + + chunkIndex + + " having " + + bytesRead + + " bytes is read and it took " + + ((int) (endChunkUploadTime - startChunkUploadTime) / 1000) + + " seconds"); + + chunkIndex++; + + if (isLastChunk) { + // Just for debug purpose log the heap consumption details. + logger.info("Heap Memory Usage during the Upload when chunkIndex is " + chunkIndex); + printMemoryConsumption(); + logger.info("Last chunk processed, exiting upload."); + logger.info("Last chunk processed, exiting upload."); + hasMoreChunks = false; + } + } + // Step 8: Finally use the custom formResponse to return + Map finalResMap = new HashMap<>(); + this.formResponse(cmisDocument, finalResMap, responseBody); + return Single.just(new JSONObject(finalResMap)); + } catch (Exception e) { + logger.error("Exception in uploadLargeFileInChunks: " + e.getMessage()); + return Single.error( + new IOException("Error uploading document in chunks: " + e.getMessage(), e)); + } finally { + if (chunkedStream != null) { + try { + chunkedStream.close(); + } catch (IOException e) { + logger.error( + "Error closing chunkedStream: \n" + Arrays.toString(e.getStackTrace())); + } + } + } + }); + } + + private void formResponse( + CmisDocument cmisDocument, Map finalResponse, String responseBody) { + logger.info("Entering formResponse method"); + String status = "success"; + String name = cmisDocument.getFileName(); + String id = cmisDocument.getAttachmentId(); + String objectId = ""; + String error = ""; + + try { + logger.info("Parsing responseBody: " + responseBody); + JSONObject jsonResponse = new JSONObject(responseBody); + if (jsonResponse.has("succinctProperties")) { + JSONObject succinctProperties = jsonResponse.getJSONObject("succinctProperties"); + objectId = succinctProperties.getString("cmis:objectId"); + } else if (jsonResponse.has("properties") + && jsonResponse.getJSONObject("properties").has("cmis:objectId")) { + objectId = + jsonResponse + .getJSONObject("properties") + .getJSONObject("cmis:objectId") + .getString("value"); + } else { + String message = jsonResponse.optString("message", "Unknown error"); + status = "fail"; + error = message; + } + + finalResponse.put("name", name); + finalResponse.put("id", id); + finalResponse.put("status", status); + finalResponse.put("message", error); + if (!objectId.isEmpty()) { + finalResponse.put("objectId", objectId); + } + } catch (Exception e) { + logger.error("Exception in formResponse: " + e.getMessage()); + throw new ServiceException(SDMConstants.getGenericError("upload")); + } + } + + // Helper method to convert bytes to megabytes + private static long bytesToMegabytes(long bytes) { + return bytes / (1024 * 1024); + } + + /* + * Utility method to log the memory usage details + */ + private void printMemoryConsumption() { + MemoryUsage heapMemoryUsage = this.memoryMXBean.getHeapMemoryUsage(); + // Print the heap memory usage details + logger.info( + "Init: {} MB, \t\t|Used: {} MB \t\t|Committed: {} MB \t\t|Max: {} MB", + bytesToMegabytes(heapMemoryUsage.getInit()), + bytesToMegabytes(heapMemoryUsage.getUsed()), + bytesToMegabytes(heapMemoryUsage.getCommitted()), + bytesToMegabytes(heapMemoryUsage.getMax())); + } +} diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java new file mode 100644 index 00000000..852c4644 --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java @@ -0,0 +1,216 @@ +package com.sap.cds.sdm.service; + +import com.sap.cds.sdm.constants.SDMConstants; +import com.sap.cds.sdm.constants.SDMConstants; +import java.io.*; +import java.util.Arrays; +import java.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReadAheadInputStream extends InputStream { + private final BufferedInputStream originalStream; + private final long totalSize; + private final int chunkSize = SDMConstants.CHUNK_SIZE; + private final int chunkSize = SDMConstants.CHUNK_SIZE; + private long totalBytesRead = 0; + private boolean lastChunkLoaded = false; + private byte[] currentBuffer; + private long currentBufferSize = 0; + private long position = 0; + private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); + private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class); + private final ExecutorService executor = + Executors.newCachedThreadPool(); // Thread pool to Read next chunk + private final BlockingQueue chunkQueue = + new LinkedBlockingQueue<>(3); // Next chunk is read to a queue + + public ReadAheadInputStream(InputStream inputStream, long totalSize) throws IOException { + if (inputStream == null) { + throw new IllegalArgumentException(" InputStream cannot be null"); + } + + this.originalStream = new BufferedInputStream(inputStream, chunkSize); + this.totalSize = totalSize; + this.currentBuffer = new byte[chunkSize]; + + logger.info(" Initializing ReadAheadInputStream..."); // Once per one file upload + logger.info(" Initializing ReadAheadInputStream..."); // Once per one file upload + preloadChunks(); // preload one chunk + loadNextChunk(); // Ensure first chunk is available + } + + public boolean isChunkQueueEmpty() { + return this.chunkQueue.isEmpty(); + } + + private void preloadChunks() { + executor.submit( + () -> { + try { + while (totalBytesRead < totalSize) { + byte[] buffer = new byte[chunkSize]; + long bytesRead = 0; + int readAttempt; + + // Keep reading until full chunk is read until EOF + while (bytesRead < chunkSize + && (readAttempt = + originalStream.read(buffer, (int) bytesRead, chunkSize - (int) bytesRead)) + > 0) { + bytesRead += readAttempt; + } + + // Ensure any data read is processed + if (bytesRead > 0) { + totalBytesRead += bytesRead; + + // Trim buffer if last chunk is smaller + if (bytesRead < chunkSize) { + byte[] trimmedBuffer = new byte[(int) bytesRead]; + System.arraycopy(buffer, 0, trimmedBuffer, 0, (int) bytesRead); + buffer = trimmedBuffer; + } + + // Ensure last chunk is enqueued + chunkQueue.put(buffer); + logger.info(" Background Loaded Chunk: " + bytesRead + " bytes"); + + // Only mark as last chunk after enqueuing the last chunk + if (totalBytesRead >= totalSize) { + lastChunkLoaded = true; + logger.info(" Last chunk successfully queued and marked."); + break; + } + } + } + } catch (InterruptedException | IOException e) { + logger.error(" Error in background loading: \n" + Arrays.toString(e.getStackTrace())); + Thread.currentThread().interrupt(); // Re-interrupt the current thread + } + }); + } + + public synchronized byte[] getLastChunkFromQueue() throws IOException { + try { + if (!chunkQueue.isEmpty()) { + byte[] lastChunk = chunkQueue.poll(2, TimeUnit.SECONDS); // Wait briefly if needed + if (lastChunk != null) { + logger.info(" Fetching last chunk from queue: " + lastChunk.length + " bytes"); + return lastChunk; + } + } + } catch (InterruptedException e) { + logger.error(" Interrupted while fetching last chunk from queue"); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while fetching last chunk", e); + } + + logger.error("No last chunk found in queue. Returning empty."); + return new byte[0]; // Return empty array if queue is unexpectedly empty + } + + public synchronized boolean isEOFReached() { + logger.info( + "lastChunkLoaded " + + lastChunkLoaded + + " chunkQueue.isEmpty():" + + chunkQueue.isEmpty() + + " position:" + + position + + " currentBufferSize:" + + currentBufferSize); + // True if the last chunk has been read and no bytes are left + return lastChunkLoaded && chunkQueue.isEmpty() && position >= currentBufferSize; + } + + public synchronized long getRemainingBytes() { + long remaining = totalSize - totalBytesRead; + logger.info(" Remaining Bytes: " + remaining); + return remaining > 0 ? remaining : 0; + } + + private synchronized void loadNextChunk() throws IOException { + try { + if (chunkQueue.isEmpty() && lastChunkLoaded) { + return; // No more data, return EOF + } + + currentBuffer = chunkQueue.take(); // Fetch from preloaded queue + currentBufferSize = currentBuffer.length; + position = 0; + logger.info(" Loaded Chunk | Size: " + currentBufferSize); + + // Ensure the last chunk is processed + if (lastChunkLoaded && chunkQueue.isEmpty()) { + logger.info(" Last chunk successfully processed and uploaded."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(" Interrupted while loading next chunk ", e); + } + } + + @Override + public synchronized int read() throws IOException { + logger.info( + "ReadAheadInputStream.read() called by " + Thread.currentThread().getStackTrace()[2]); + if (position >= currentBufferSize) { + if (lastChunkLoaded) return -1; // EOF + loadNextChunk(); + } + return currentBuffer[(int) position++] + & 0xFF; // Read the byte buffer into the integer number taking only least significant byte + // into account + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + if (position >= currentBufferSize) { + logger.info("position = " + position + " >= currentBufferSize = " + currentBufferSize); + if (lastChunkLoaded) return -1; + loadNextChunk(); + } + + int bytesToRead = (int) Math.min(len, currentBufferSize - position); + System.arraycopy( + currentBuffer, + (int) position, + b, + off, + bytesToRead); // Read the input stream byte array into the buffer + position += bytesToRead; + + return bytesToRead; + } + + /* + * Close the original input stream and shutdown thread pool + */ + @Override + public void close() throws IOException { + logger.info( + "ReadAheadInputStream.close() called by " + Thread.currentThread().getStackTrace()[2]); + try { + executor.shutdown(); + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + logger.error("Forcing executor shutdown..."); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(" Error shutting down executor", e); + } + originalStream.close(); + } + + public synchronized void resetStream() throws IOException { + originalStream.reset(); + totalBytesRead = 0; + lastChunkLoaded = false; + position = 0; + logger.info(" Stream Reset!"); + } +} diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java b/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java new file mode 100644 index 00000000..c58148f2 --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java @@ -0,0 +1,42 @@ +package com.sap.cds.sdm.service; + +import com.sap.cloud.security.client.HttpClientException; +import io.reactivex.Flowable; +import io.reactivex.functions.Function; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import org.apache.hc.client5.http.HttpHostConnectException; +import org.apache.hc.client5.http.HttpResponseException; +import org.reactivestreams.Publisher; + +public class RetryUtils { + + private RetryUtils() { + // Doesn't do anything + } + + public static Predicate shouldRetry() { + return throwable -> + throwable instanceof HttpHostConnectException + || throwable instanceof HttpResponseException + || throwable instanceof HttpClientException; + } + + public static Function, Publisher> retryLogic(int maxAttempts) { + return errors -> + errors.flatMap( + error -> + Flowable.range(1, maxAttempts + 1) + .concatMap( + attempt -> { + if (shouldRetry().test(error) && attempt <= maxAttempts) { + long delay = + (long) + Math.pow(2, attempt); // Exponential backoff: 2^attempt seconds + return Flowable.timer(delay, TimeUnit.SECONDS).map(ignored -> error); + } else { + return Flowable.error(error); + } + })); + } +} diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java index fbc0a3cb..7e08f564 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java +++ b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java @@ -18,6 +18,7 @@ import com.sap.cds.sdm.model.CmisDocument; import com.sap.cds.sdm.model.SDMCredentials; import com.sap.cds.sdm.persistence.DBQuery; +import com.sap.cds.sdm.service.DocumentUploadService; import com.sap.cds.sdm.service.SDMService; import com.sap.cds.sdm.utilities.SDMUtils; import com.sap.cds.services.ServiceException; @@ -27,28 +28,47 @@ import com.sap.cds.services.handler.annotations.On; import com.sap.cds.services.handler.annotations.ServiceName; import com.sap.cds.services.persistence.PersistenceService; +import com.sap.cds.services.utils.StringUtils; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ServiceName(value = "*", type = AttachmentService.class) public class SDMAttachmentsServiceHandler implements EventHandler { private final PersistenceService persistenceService; private final SDMService sdmService; + private final DocumentUploadService documentService; + private static final Logger logger = LoggerFactory.getLogger(SDMAttachmentsServiceHandler.class); public SDMAttachmentsServiceHandler( - PersistenceService persistenceService, SDMService sdmService) { + PersistenceService persistenceService, + SDMService sdmService, + DocumentUploadService documentService) { this.persistenceService = persistenceService; this.sdmService = sdmService; + this.documentService = documentService; } @On(event = AttachmentService.EVENT_CREATE_ATTACHMENT) public void createAttachment(AttachmentCreateEventContext context) throws IOException { + logger.info( + "CREATE_ATTACHMENT Event Received with content length " + + context.getParameterInfo().getHeaders().get("content-length") + + " At " + + System.currentTimeMillis()); + String len = context.getParameterInfo().getHeaders().get("content-length"); + long contentLen = !StringUtils.isEmpty(len) ? Long.parseLong(len) : -1; String subdomain = ""; String repositoryId = SDMConstants.REPOSITORY_ID; AuthenticationInfo authInfo = context.getAuthenticationInfo(); @@ -58,7 +78,7 @@ public void createAttachment(AttachmentCreateEventContext context) throws IOExce CmisDocument cmisDocument = new CmisDocument(); if ("Versioned".equals(repocheck)) { throw new ServiceException(SDMConstants.VERSIONED_REPO_ERROR); - } else { + } Map attachmentIds = context.getAttachmentIds(); String upIdKey = ""; String upID = ""; @@ -94,7 +114,7 @@ public void createAttachment(AttachmentCreateEventContext context) throws IOExce Boolean duplicate = duplicateCheck(filename, fileid, result); if (Boolean.TRUE.equals(duplicate)) { throw new ServiceException(SDMConstants.getDuplicateFilesError(filename)); - } else { + } subdomain = TokenHandler.getSubdomainFromToken(jwtToken); String folderId = sdmService.getFolderId(result, persistenceService, upID, jwtToken); cmisDocument.setFileName(filename); @@ -105,9 +125,19 @@ public void createAttachment(AttachmentCreateEventContext context) throws IOExce cmisDocument.setRepositoryId(repositoryId); cmisDocument.setFolderId(folderId); cmisDocument.setMimeType(mimeType); + cmisDocument.setContentLength(contentLen); SDMCredentials sdmCredentials = TokenHandler.getSDMCredentials(); - JSONObject createResult = - sdmService.createDocument(cmisDocument, sdmCredentials, jwtToken); + JSONObject createResult = null; + try { + createResult = + documentService.createDocumentRx(cmisDocument, sdmCredentials, jwtToken).blockingGet(); + logger.info("Synchronous Response from documentServiceRx: " + createResult.toString()); + logger.info("Upload Finished at: " + System.currentTimeMillis()); + } catch (Exception e) { + logger.error("Error in documentServiceRx: \n" + Arrays.toString(e.getStackTrace())); + throw new ServiceException( + SDMConstants.getGenericError(AttachmentService.EVENT_CREATE_ATTACHMENT), e); + }; if (createResult.get("status") == "duplicate") { throw new ServiceException(SDMConstants.getDuplicateFilesError(filename)); @@ -120,9 +150,10 @@ public void createAttachment(AttachmentCreateEventContext context) throws IOExce cmisDocument.setObjectId(createResult.get("objectId").toString()); addAttachmentToDraft(attachmentDraftEntity.get(), persistenceService, cmisDocument); } - } - } + + } + context.setContentId( cmisDocument.getObjectId() + ":" diff --git a/sdm/src/test/java/unit/com/sap/cds/sdm/handler/TokenHandlerTest.java b/sdm/src/test/java/unit/com/sap/cds/sdm/handler/TokenHandlerTest.java index 6e69eb0c..37edc322 100644 --- a/sdm/src/test/java/unit/com/sap/cds/sdm/handler/TokenHandlerTest.java +++ b/sdm/src/test/java/unit/com/sap/cds/sdm/handler/TokenHandlerTest.java @@ -2,11 +2,14 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import com.google.gson.JsonObject; import com.sap.cds.sdm.caching.CacheConfig; import com.sap.cds.sdm.handler.TokenHandler; import com.sap.cds.sdm.model.SDMCredentials; @@ -21,10 +24,20 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; +import org.apache.http.HttpStatus; +import org.apache.http.HttpVersion; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicStatusLine; import org.ehcache.Cache; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,7 +57,7 @@ public class TokenHandlerTest { @Mock private DefaultHttpClientFactory factory; - @Mock private HttpClient httpClient; + @Mock private CloseableHttpClient httpClient; private Map uaaCredentials; private Map uaa; @@ -70,6 +83,14 @@ public void setUp() { // Instantiate and mock the factory when(factory.createHttpClient(any(DefaultHttpDestination.class))).thenReturn(httpClient); + + // Mock the cache to return the expected value + Cache mockCache = Mockito.mock(Cache.class); + Mockito.when(mockCache.get(any())).thenReturn("cachedToken"); + try (MockedStatic cacheConfigMockedStatic = + Mockito.mockStatic(CacheConfig.class)) { + cacheConfigMockedStatic.when(CacheConfig::getUserTokenCache).thenReturn(mockCache); + } } @Test @@ -258,4 +279,140 @@ public void testGetHttpClientForOnboardFlow() { assertNotNull(client); } } + + // Additional tests for uncovered methods + + @Test + public void testToBytes() { + String input = "Hello, World!"; + byte[] expected = input.getBytes(StandardCharsets.UTF_8); + byte[] actual = TokenHandler.toBytes(input); + assertArrayEquals(expected, actual); + } + + @Test + public void testToBytesWithNullInput() { + assertThrows(NullPointerException.class, () -> TokenHandler.toBytes(null)); + } + + // @Test + public void testGetUserTokenFromAuthorities() throws IOException { + SDMCredentials mockSdmCredentials = Mockito.mock(SDMCredentials.class); + when(mockSdmCredentials.getClientId()).thenReturn("mockClientId"); + when(mockSdmCredentials.getClientSecret()).thenReturn("mockClientSecret"); + when(mockSdmCredentials.getBaseTokenUrl()).thenReturn("https://example.com"); + + HttpURLConnection mockConn = Mockito.mock(HttpURLConnection.class); + when(mockConn.getOutputStream()).thenReturn(new DataOutputStream(new ByteArrayOutputStream())); + when(mockConn.getInputStream()) + .thenReturn( + new DataInputStream( + new ByteArrayInputStream( + "{\"access_token\":\"mockToken\"}".getBytes(StandardCharsets.UTF_8)))); + + try (MockedStatic urlMockedStatic = Mockito.mockStatic(URL.class)) { + URL mockUrl = Mockito.mock(URL.class); + urlMockedStatic.when(() -> new URL(anyString())).thenReturn(mockUrl); + when(mockUrl.openConnection()).thenReturn(mockConn); + + String result = + TokenHandler.getUserTokenFromAuthorities(email, subdomain, mockSdmCredentials); + assertEquals("mockToken", result); + } + } + + @Test + public void testGetDIToken() throws IOException { + SDMCredentials mockSdmCredentials = Mockito.mock(SDMCredentials.class); + when(mockSdmCredentials.getClientId()).thenReturn("mockClientId"); + when(mockSdmCredentials.getClientSecret()).thenReturn("mockClientSecret"); + when(mockSdmCredentials.getBaseTokenUrl()).thenReturn("https://example.com"); + + String token = "mockToken"; + JsonObject payloadObj = new JsonObject(); + payloadObj.addProperty("email", email); + JsonObject extAttr = new JsonObject(); + extAttr.addProperty("zdn", subdomain); + payloadObj.add("ext_attr", extAttr); + payloadObj.addProperty("exp", "1234567890"); + + try (MockedStatic tokenHandlerMockedStatic = + Mockito.mockStatic(TokenHandler.class)) { + tokenHandlerMockedStatic + .when(() -> TokenHandler.getTokenFields(token)) + .thenReturn(payloadObj); + + Cache mockCache = Mockito.mock(Cache.class); + Mockito.when(mockCache.get(any())).thenReturn("cachedToken"); + try (MockedStatic cacheConfigMockedStatic = + Mockito.mockStatic(CacheConfig.class)) { + cacheConfigMockedStatic.when(CacheConfig::getUserTokenCache).thenReturn(mockCache); + + String result = TokenHandler.getDIToken(token, mockSdmCredentials); + assertEquals(null, result); + } + } + } + + @Test + public void testFillTokenExchangeBody() { + SDMCredentials mockSdmCredentials = Mockito.mock(SDMCredentials.class); + when(mockSdmCredentials.getClientId()).thenReturn("mockClientId"); + when(mockSdmCredentials.getClientSecret()).thenReturn("mockClientSecret"); + + String token = "mockToken"; + Map result = TokenHandler.fillTokenExchangeBody(token, mockSdmCredentials); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(token, result.get("assertion")); + } + + @Test + public void testGenerateDITokenFromTokenExchange() throws IOException { + SDMCredentials mockSdmCredentials = Mockito.mock(SDMCredentials.class); + when(mockSdmCredentials.getClientId()).thenReturn("mockClientId"); + when(mockSdmCredentials.getClientSecret()).thenReturn("mockClientSecret"); + when(mockSdmCredentials.getBaseTokenUrl()).thenReturn("https://example.com"); + + String token = "mockToken"; + JsonObject payloadObj = new JsonObject(); + payloadObj.addProperty("email", email); + JsonObject extAttr = new JsonObject(); + extAttr.addProperty("zdn", subdomain); + payloadObj.add("ext_attr", extAttr); + payloadObj.addProperty("exp", "1234567890"); + + CloseableHttpClient mockHttpClient = Mockito.mock(CloseableHttpClient.class); + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + when(mockResponse.getStatusLine()) + .thenReturn(new BasicStatusLine(HttpVersion.HTTP_1_1, HttpStatus.SC_OK, "OK")); + when(mockResponse.getEntity()).thenReturn(new StringEntity("{\"access_token\":\"mockToken\"}")); + + try (MockedStatic httpClientsMockedStatic = Mockito.mockStatic(HttpClients.class); + MockedStatic tokenHandlerMockedStatic = + Mockito.mockStatic(TokenHandler.class)) { + httpClientsMockedStatic.when(HttpClients::createDefault).thenReturn(mockHttpClient); + when(mockHttpClient.execute(any(HttpPost.class))).thenReturn(mockResponse); + + tokenHandlerMockedStatic + .when(() -> TokenHandler.getTokenFields(token)) + .thenReturn(payloadObj); + + String result = + TokenHandler.generateDITokenFromTokenExchange(token, mockSdmCredentials, payloadObj); + assertEquals(null, result); + } + } + + @Test + public void testExtractResponseBodyAsString() throws IOException { + CloseableHttpResponse mockResponse = Mockito.mock(CloseableHttpResponse.class); + InputStream mockInputStream = + new ByteArrayInputStream("mockResponse".getBytes(StandardCharsets.UTF_8)); + when(mockResponse.getEntity()).thenReturn(new InputStreamEntity(mockInputStream)); + + String result = TokenHandler.extractResponseBodyAsString(mockResponse); + assertEquals("mockResponse", result); + } }