Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bundle/src/bundle_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ pub enum VersionedBundle {
V0_7_7(BundleMetaV0_7_7),
}

impl VersionedBundle {
    /// Returns a clone of the `internal_bundled_file` stored in this bundle's metadata.
    ///
    /// Only the `V0_7_7` variant is inspected; every other variant yields `None`.
    pub fn internal_bundled_file(&self) -> Option<BundledFile> {
        if let Self::V0_7_7(meta) = self {
            meta.internal_bundled_file.clone()
        } else {
            None
        }
    }
}

#[cfg_attr(feature = "wasm", wasm_bindgen(getter_with_clone))]
#[derive(Debug, Clone)]
pub struct VersionedBundleWithBindingsReport {
Expand Down
166 changes: 94 additions & 72 deletions bundle/src/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
path::PathBuf,
};

use anyhow::Context;
use async_compression::futures::bufread::ZstdDecoder;
use async_std::{io::ReadExt, stream::StreamExt};
use async_tar_wasm::Archive;
Expand Down Expand Up @@ -138,49 +139,6 @@ impl<'a> BundlerUtil<'a> {
}
}

/// Reads and decompresses a .tar.zstd file from an input stream into multiple specified files.
///
/// `input` is wrapped in a zstd decoder and read as a tar archive. Entries are visited in
/// archive order; an entry is kept when its path matches one of `file_names` exactly.
/// Iteration stops as soon as every distinct requested name has been found.
///
/// Returns a map from entry path to that entry's bytes.
///
/// # Errors
///
/// Fails if the archive cannot be iterated, if an entry or its path cannot be read, or if
/// any requested name is absent from the archive (the error lists the missing names).
pub async fn extract_files_from_tarball<R: AsyncBufRead>(
    input: R,
    file_names: &[&str],
) -> anyhow::Result<HashMap<String, Vec<u8>>> {
    let zstd_decoder = ZstdDecoder::new(Box::pin(input));
    let archive = Archive::new(zstd_decoder);
    let mut entries = archive.entries()?;

    // De-duplicate the request so the early-exit check below counts distinct names;
    // comparing against `file_names.len()` could never succeed when a name is repeated.
    let file_names_set: std::collections::HashSet<&str> = file_names.iter().copied().collect();
    let mut extracted_files = HashMap::with_capacity(file_names_set.len());

    while let Some(entry) = entries.next().await {
        let mut owned_entry = entry?;
        // A path that is not valid UTF-8 becomes "", which only matches if "" was requested.
        let path_str = owned_entry.path()?.to_str().unwrap_or_default().to_owned();

        if !file_names_set.contains(path_str.as_str()) {
            continue;
        }

        let mut file_bytes = Vec::new();
        owned_entry.read_to_end(&mut file_bytes).await?;
        extracted_files.insert(path_str, file_bytes);

        // Everything requested has been found; skip the rest of the archive.
        if extracted_files.len() == file_names_set.len() {
            break;
        }
    }

    // Report in the caller's order so the error message is deterministic.
    let missing_files: Vec<&str> = file_names
        .iter()
        .copied()
        .filter(|name| !extracted_files.contains_key(*name))
        .collect();

    if !missing_files.is_empty() {
        anyhow::bail!("Files not found in tarball: {:?}", missing_files);
    }

    Ok(extracted_files)
}

pub fn parse_meta(meta_bytes: Vec<u8>) -> anyhow::Result<VersionedBundle> {
if let Ok(message) = serde_json::from_slice(&meta_bytes) {
return Ok(VersionedBundle::V0_7_7(message));
Expand Down Expand Up @@ -250,36 +208,46 @@ pub fn bin_parse(bin: &Vec<u8>) -> anyhow::Result<TestReport> {
/// Parses only the internal test report out of a bundle tarball stream.
///
/// NOTE(review): this span looks like a flattened diff hunk. The statements that call
/// `extract_files_from_tarball` and the statements that delegate to
/// `parse_internal_bin_and_meta_from_tarball` appear to be the before/after sides of one
/// change, shown without +/- markers. Confirm against the repository before reading
/// them as a single function body.
pub async fn parse_internal_bin_from_tarball<R: AsyncBufRead>(
input: R,
) -> anyhow::Result<TestReport> {
// First variant: extract just INTERNAL_BIN_FILENAME from the archive and decode it.
let extracted_files = extract_files_from_tarball(input, &[INTERNAL_BIN_FILENAME]).await?;
if let Some(internal_bin_bytes) = extracted_files.get(INTERNAL_BIN_FILENAME) {
return bin_parse(internal_bin_bytes);
}

Err(anyhow::anyhow!(
"No {} file found in the tarball",
INTERNAL_BIN_FILENAME
))
// Second variant: reuse the combined parser and discard the metadata half of its tuple.
let (internal_bin, _) = parse_internal_bin_and_meta_from_tarball(input).await?;
Ok(internal_bin)
}

pub async fn parse_internal_bin_and_meta_from_tarball<R: AsyncBufRead>(
input: R,
) -> anyhow::Result<(TestReport, VersionedBundle)> {
let extracted_files =
extract_files_from_tarball(input, &[META_FILENAME, INTERNAL_BIN_FILENAME]).await?;

let internal_bin_bytes = extracted_files
.get(INTERNAL_BIN_FILENAME)
.ok_or_else(|| anyhow::anyhow!("No {} file found in the tarball", INTERNAL_BIN_FILENAME))?;
let zstd_decoder = ZstdDecoder::new(Box::pin(input));
let archive = Archive::new(zstd_decoder);
let mut entries = archive.entries()?;
let mut file_entries = HashMap::new();

let test_report = bin_parse(internal_bin_bytes)
.map_err(|err| anyhow::anyhow!("Failed to decode {}: {}", INTERNAL_BIN_FILENAME, err))?;
while let Some(entry) = entries.next().await {
let mut unwrapped_entry = entry?;
let path_str = unwrapped_entry
.path()?
.to_str()
.unwrap_or_default()
.to_owned();
let mut bytes = Vec::new();
unwrapped_entry.read_to_end(&mut bytes).await?;
file_entries.insert(path_str, bytes);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we reading through every file? We know that we care about meta.json and we also know that the internal.bin will have internal in its name (whether that be the directory or the filename). Can we pre-filter based on that so we don't read everything? I'm concerned about some of our customers with large uploads.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was encountering issues passing around the entries, but this actually does cause issues for the context-py tests, so I'm going to need to switch away from it.


let meta_bytes = extracted_files
let meta_bytes = file_entries
.get(META_FILENAME)
.ok_or_else(|| anyhow::anyhow!("No {} file found in the tarball", META_FILENAME))?;

.context(format!("No {} file found in the tarball", META_FILENAME))?;
let versioned_bundle = parse_meta(meta_bytes.to_vec())?;

let internal_bin_filename = versioned_bundle
.internal_bundled_file()
.map(|bf| bf.path)
.unwrap_or(INTERNAL_BIN_FILENAME.to_string());
let internal_bin_bytes = file_entries.get(&internal_bin_filename).context(format!(
"No {} file found in the tarball",
internal_bin_filename
))?;
let test_report = bin_parse(internal_bin_bytes)
.map_err(|err| anyhow::anyhow!("Failed to decode {}: {}", internal_bin_filename, err))?;

Ok((test_report, versioned_bundle))
}

Expand All @@ -292,17 +260,21 @@ mod tests {
};

use context::repo::BundleRepo;
use proto::test_context::test_run::UploaderMetadata;
use tempfile::tempdir;

use super::*;
use crate::bundle_meta::{
BundleMeta, BundleMetaBaseProps, BundleMetaDebugProps, BundleMetaJunitProps, META_VERSION,
};
use crate::files::{FileSet, FileSetType};
use crate::Test;
use crate::{
bundle_meta::{
BundleMeta, BundleMetaBaseProps, BundleMetaDebugProps, BundleMetaJunitProps,
META_VERSION,
},
BundledFile,
};

#[tokio::test]
pub async fn test_bundle_meta_is_first_entry() {
fn create_bundle_meta(bundled_file: Option<BundledFile>) -> BundleMeta {
let mut repo = BundleRepo::default();
let upload_time_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand All @@ -312,7 +284,7 @@ mod tests {
repo.repo.name = "repo".to_string();
let mut envs: HashMap<String, String> = HashMap::new();
envs.insert("key".to_string(), "value".to_string());
let meta = BundleMeta {
BundleMeta {
junit_props: BundleMetaJunitProps::default(),
bundle_upload_id_v2: String::with_capacity(0),
debug_props: BundleMetaDebugProps {
Expand Down Expand Up @@ -348,8 +320,13 @@ mod tests {
codeowners: None,
envs,
},
internal_bundled_file: None,
};
internal_bundled_file: bundled_file,
}
}

#[tokio::test]
pub async fn test_bundle_meta_is_first_entry() {
let meta = create_bundle_meta(None);
let bundler_util = BundlerUtil::new(&meta, None);
let temp_dir = tempdir().unwrap();
let bundle_path = temp_dir.path().join(BUNDLE_FILE_NAME);
Expand All @@ -372,7 +349,10 @@ mod tests {
assert_eq!(meta.base_props.cli_version, "0.0.1");
assert_eq!(meta.base_props.bundle_upload_id, "00");
assert_eq!(meta.base_props.file_sets.len(), 1);
assert_eq!(meta.base_props.upload_time_epoch, upload_time_epoch);
assert_eq!(
meta.base_props.upload_time_epoch,
meta.base_props.upload_time_epoch
);
assert_eq!(meta.base_props.test_command, Some("exit 1".to_string()));
assert_eq!(meta.base_props.quarantined_tests.len(), 1);
assert_eq!(meta.base_props.os_info, Some(env::consts::OS.to_string()));
Expand All @@ -383,4 +363,46 @@ mod tests {
_ => panic!("Expected V0_7_7 versioned bundle"),
}
}

/// Checks that a bundle whose metadata names a non-default internal bin path round-trips
/// through `make_tarball` and `parse_internal_bin_and_meta_from_tarball`.
#[tokio::test]
pub async fn test_nondefault_internal_bin_path() {
    let workdir = tempdir().unwrap();
    let relative_bin = String::from("new_bin_file.bin");
    let absolute_bin = workdir.path().join(&relative_bin);

    // Write a minimal encoded report to the location recorded as `original_path` below.
    let expected_report = TestReport {
        test_results: vec![],
        uploader_metadata: Some(UploaderMetadata {
            version: String::from("v1"),
            origin: String::from("A test"),
            upload_time: None,
            variant: String::from("A variant"),
        }),
    };
    let mut encoded = Vec::new();
    prost::Message::encode(&expected_report, &mut encoded).unwrap();
    std::fs::write(&absolute_bin, encoded).unwrap();

    let expected_file = Some(BundledFile {
        original_path: absolute_bin.to_str().unwrap().to_string(),
        original_path_rel: None,
        path: relative_bin,
        owners: vec![],
        team: None,
    });
    let meta = create_bundle_meta(expected_file.clone());
    let bundle_path = workdir.path().join(BUNDLE_FILE_NAME);
    let bundler_util = BundlerUtil::new(&meta, None);

    assert!(bundler_util.make_tarball(&bundle_path).is_ok());
    assert!(bundle_path.exists());

    // Read the tarball back and compare both halves of the parsed result.
    let reader =
        async_std::io::BufReader::new(async_std::fs::File::open(&bundle_path).await.unwrap());
    let (actual_report, actual_meta) = parse_internal_bin_and_meta_from_tarball(reader)
        .await
        .unwrap();
    assert_eq!(actual_meta.internal_bundled_file(), expected_file);
    assert_eq!(actual_report, expected_report);
}
}
Loading