Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bundle/src/bundle_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ pub enum VersionedBundle {
V0_7_7(BundleMetaV0_7_7),
}

impl VersionedBundle {
pub fn internal_bundled_file(&self) -> Option<BundledFile> {
match self {
Self::V0_7_7(data) => data.internal_bundled_file.clone(),
_ => None,
}
}
}

#[cfg_attr(feature = "wasm", wasm_bindgen(getter_with_clone))]
#[derive(Debug, Clone)]
pub struct VersionedBundleWithBindingsReport {
Expand Down
187 changes: 118 additions & 69 deletions bundle/src/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use async_compression::futures::bufread::ZstdDecoder;
use async_std::{io::ReadExt, stream::StreamExt};
use async_tar_wasm::Archive;
use async_tar_wasm::{Archive, Entries};
use codeowners::CodeOwners;
use context::bazel_bep::common::BepParseResult;
use futures_io::AsyncBufRead;
Expand Down Expand Up @@ -138,49 +138,6 @@ impl<'a> BundlerUtil<'a> {
}
}

/// Reads and decompresses a .tar.zstd file from an input stream into multiple specified files.
pub async fn extract_files_from_tarball<R: AsyncBufRead>(
input: R,
file_names: &[&str],
) -> anyhow::Result<HashMap<String, Vec<u8>>> {
let zstd_decoder = ZstdDecoder::new(Box::pin(input));
let archive = Archive::new(zstd_decoder);
let mut entries = archive.entries()?;

let mut extracted_files = HashMap::new();
let file_names_set: std::collections::HashSet<&str> = file_names.iter().cloned().collect();

while let Some(entry) = entries.next().await {
let mut owned_entry = entry?;
let path_str = owned_entry.path()?.to_str().unwrap_or_default().to_owned();

if file_names_set.contains(path_str.as_str()) {
let mut file_bytes = Vec::new();
owned_entry.read_to_end(&mut file_bytes).await?;
extracted_files.insert(path_str, file_bytes);

if extracted_files.len() == file_names.len() {
break;
}
}
}

let missing_files: Vec<&str> = file_names
.iter()
.filter(|&name| !extracted_files.contains_key(*name))
.cloned()
.collect();

if !missing_files.is_empty() {
return Err(anyhow::anyhow!(
"Files not found in tarball: {:?}",
missing_files
));
}

Ok(extracted_files)
}

pub fn parse_meta(meta_bytes: Vec<u8>) -> anyhow::Result<VersionedBundle> {
if let Ok(message) = serde_json::from_slice(&meta_bytes) {
return Ok(VersionedBundle::V0_7_7(message));
Expand Down Expand Up @@ -250,35 +207,74 @@ pub fn bin_parse(bin: &Vec<u8>) -> anyhow::Result<TestReport> {
pub async fn parse_internal_bin_from_tarball<R: AsyncBufRead>(
input: R,
) -> anyhow::Result<TestReport> {
let extracted_files = extract_files_from_tarball(input, &[INTERNAL_BIN_FILENAME]).await?;
if let Some(internal_bin_bytes) = extracted_files.get(INTERNAL_BIN_FILENAME) {
return bin_parse(internal_bin_bytes);
let (internal_bin, _) = parse_internal_bin_and_meta_from_tarball(input).await?;
Ok(internal_bin)
}

async fn find_internal_bin_in_entries<R: futures_io::AsyncRead + Unpin>(
mut entries: Entries<R>,
target_name: String,
) -> anyhow::Result<TestReport> {
while let Some(entry) = entries.next().await {
let mut unwrapped_entry = entry?;
let path_str = unwrapped_entry
.path()?
.to_str()
.unwrap_or_default()
.to_owned();
if path_str == target_name {
let mut bytes = Vec::new();
unwrapped_entry.read_to_end(&mut bytes).await?;
return bin_parse(&bytes)
.map_err(|err| anyhow::anyhow!("Failed to decode {}: {}", target_name, err));
}
}

Err(anyhow::anyhow!(
anyhow::Result::Err(anyhow::anyhow!(
"No {} file found in the tarball",
INTERNAL_BIN_FILENAME
target_name
))
}

pub async fn parse_internal_bin_and_meta_from_tarball<R: AsyncBufRead>(
input: R,
) -> anyhow::Result<(TestReport, VersionedBundle)> {
let extracted_files =
extract_files_from_tarball(input, &[META_FILENAME, INTERNAL_BIN_FILENAME]).await?;
let zstd_decoder = ZstdDecoder::new(Box::pin(input));
let archive = Archive::new(zstd_decoder);
let mut entries = archive.entries()?;

let internal_bin_bytes = extracted_files
.get(INTERNAL_BIN_FILENAME)
.ok_or_else(|| anyhow::anyhow!("No {} file found in the tarball", INTERNAL_BIN_FILENAME))?;
let versioned_bundle_result = if let Some(first_entry) = entries.next().await {
let mut owned_first_entry = first_entry?;
let path_str = owned_first_entry
.path()?
.to_str()
.unwrap_or_default()
.to_owned();

let test_report = bin_parse(internal_bin_bytes)
.map_err(|err| anyhow::anyhow!("Failed to decode {}: {}", INTERNAL_BIN_FILENAME, err))?;
if path_str == META_FILENAME {
let mut meta_bytes = Vec::new();
owned_first_entry.read_to_end(&mut meta_bytes).await?;

let meta_bytes = extracted_files
.get(META_FILENAME)
.ok_or_else(|| anyhow::anyhow!("No {} file found in the tarball", META_FILENAME))?;
parse_meta(meta_bytes)
} else {
anyhow::Result::Err(anyhow::anyhow!(
"No {} file found in the tarball",
META_FILENAME
))
}
} else {
anyhow::Result::Err(anyhow::anyhow!(
"No {} file found in the tarball",
META_FILENAME
))
};
let versioned_bundle = versioned_bundle_result?;

let versioned_bundle = parse_meta(meta_bytes.to_vec())?;
let internal_bin_filename = versioned_bundle
.internal_bundled_file()
.map(|bf| bf.path)
.unwrap_or(INTERNAL_BIN_FILENAME.to_string());
let test_report = find_internal_bin_in_entries(entries, internal_bin_filename).await?;

Ok((test_report, versioned_bundle))
}
Expand All @@ -292,17 +288,21 @@ mod tests {
};

use context::repo::BundleRepo;
use proto::test_context::test_run::UploaderMetadata;
use tempfile::tempdir;

use super::*;
use crate::bundle_meta::{
BundleMeta, BundleMetaBaseProps, BundleMetaDebugProps, BundleMetaJunitProps, META_VERSION,
};
use crate::files::{FileSet, FileSetType};
use crate::Test;
use crate::{
bundle_meta::{
BundleMeta, BundleMetaBaseProps, BundleMetaDebugProps, BundleMetaJunitProps,
META_VERSION,
},
BundledFile,
};

#[tokio::test]
pub async fn test_bundle_meta_is_first_entry() {
fn create_bundle_meta(bundled_file: Option<BundledFile>) -> BundleMeta {
let mut repo = BundleRepo::default();
let upload_time_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -312,7 +312,7 @@ mod tests {
repo.repo.name = "repo".to_string();
let mut envs: HashMap<String, String> = HashMap::new();
envs.insert("key".to_string(), "value".to_string());
let meta = BundleMeta {
BundleMeta {
junit_props: BundleMetaJunitProps::default(),
bundle_upload_id_v2: String::with_capacity(0),
debug_props: BundleMetaDebugProps {
Expand Down Expand Up @@ -348,8 +348,13 @@ mod tests {
codeowners: None,
envs,
},
internal_bundled_file: None,
};
internal_bundled_file: bundled_file,
}
}

#[tokio::test]
pub async fn test_bundle_meta_is_first_entry() {
let meta = create_bundle_meta(None);
let bundler_util = BundlerUtil::new(&meta, None);
let temp_dir = tempdir().unwrap();
let bundle_path = temp_dir.path().join(BUNDLE_FILE_NAME);
Expand All @@ -372,7 +377,10 @@ mod tests {
assert_eq!(meta.base_props.cli_version, "0.0.1");
assert_eq!(meta.base_props.bundle_upload_id, "00");
assert_eq!(meta.base_props.file_sets.len(), 1);
assert_eq!(meta.base_props.upload_time_epoch, upload_time_epoch);
assert_eq!(
meta.base_props.upload_time_epoch,
meta.base_props.upload_time_epoch
);
assert_eq!(meta.base_props.test_command, Some("exit 1".to_string()));
assert_eq!(meta.base_props.quarantined_tests.len(), 1);
assert_eq!(meta.base_props.os_info, Some(env::consts::OS.to_string()));
Expand All @@ -383,4 +391,45 @@ mod tests {
_ => panic!("Expected V0_7_7 versioned bundle"),
}
}

#[tokio::test]
pub async fn test_nondefault_internal_bin_path() {
let temp_dir = tempdir().unwrap();
let bin_path = "new_bin_file.bin".to_string();
let full_bin_path = temp_dir.path().join(bin_path.clone());

let test_report = TestReport {
test_results: Vec::new(),
uploader_metadata: Some(UploaderMetadata {
version: "v1".to_string(),
origin: "A test".to_string(),
upload_time: None,
variant: "A variant".to_string(),
}),
};
let mut buf = Vec::new();
prost::Message::encode(&test_report, &mut buf).unwrap();
std::fs::write(&full_bin_path, buf).unwrap();

let bundled_file = Some(BundledFile {
original_path: full_bin_path.to_str().unwrap().to_string(),
original_path_rel: None,
path: bin_path,
..Default::default()
});
let meta = create_bundle_meta(bundled_file.clone());
let bundler_util = BundlerUtil::new(&meta, None);
let bundle_path = temp_dir.path().join(BUNDLE_FILE_NAME);

assert!(bundler_util.make_tarball(&bundle_path).is_ok());
assert!(bundle_path.exists());

let tarball_file = async_std::fs::File::open(&bundle_path).await.unwrap();
let reader = async_std::io::BufReader::new(tarball_file);

let data = parse_internal_bin_and_meta_from_tarball(reader).await;
let (internal_bin, parsed_meta) = data.unwrap();
assert_eq!(parsed_meta.internal_bundled_file(), bundled_file);
assert_eq!(internal_bin, test_report);
}
}
18 changes: 15 additions & 3 deletions context-js/tests/parse_compressed_bundle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,14 @@ describe("context-js", () => {
it("decompresses and parses internal.bin", async () => {
expect.hasAssertions();

const uploadMeta = generateBundleMeta();
const metaInfoJson = JSON.stringify(
uploadMeta,
bundleMetaJsonSerializer,
2,
);
const readableStream = await compressAndUploadMeta({
metaInfoJson: "{}",
metaInfoJson,
includeInternalBin: RUBY_INTERNAL_BIN,
});

Expand Down Expand Up @@ -403,14 +409,20 @@ describe("context-js", () => {

await expect(
parse_internal_bin_and_meta_from_tarball(readableStream),
).rejects.toThrow('Files not found in tarball: ["internal.bin"]');
).rejects.toThrow("No internal.bin file found in the tarball");
});

it("correctly gets and sets variant", async () => {
expect.hasAssertions();

const uploadMeta = generateBundleMeta();
const metaInfoJson = JSON.stringify(
uploadMeta,
bundleMetaJsonSerializer,
2,
);
const readableStream = await compressAndUploadMeta({
metaInfoJson: "{}",
metaInfoJson,
includeInternalBin: VARIANT_INTERNAL_BIN,
});

Expand Down
Loading