Skip to content

Add search index #4306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
371 changes: 371 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ tokio-stream = "0.1.9"
tokio-util = {version = "0.7.3", features = ["compat"] }
tower-http = { version = "0.6.2", features = ["auth", "compression-br", "compression-gzip", "cors", "set-header"] }
urlencoding = "2.1.3"
tantivy = "0.22.0"

[dev-dependencies]
criterion = "0.5.1"
Expand Down
18 changes: 18 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,24 @@ impl Index {
Ok((inscriptions, more))
}

pub fn get_inscriptions_by_sequence_range(
&self,
start_sequence: u32,
end_sequence: u32,
) -> Result<Vec<InscriptionId>> {
let rtx = self.database.begin_read()?;

let sequence_number_to_inscription_entry =
rtx.open_table(SEQUENCE_NUMBER_TO_INSCRIPTION_ENTRY)?;

let inscriptions = sequence_number_to_inscription_entry
.range(start_sequence..=end_sequence)?
.map(|result| result.map(|(_number, entry)| InscriptionEntry::load(entry.value()).id))
.collect::<Result<Vec<InscriptionId>, StorageError>>()?;

Ok(inscriptions)
}

pub fn get_inscriptions_in_block(&self, block_height: u32) -> Result<Vec<InscriptionId>> {
let rtx = self.database.begin_read()?;

Expand Down
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use {
properties::Properties,
representation::Representation,
satscard::Satscard,
search_index::SearchIndex,
settings::Settings,
signer::Signer,
subcommand::{OutputFormat, Subcommand, SubcommandResult},
Expand Down Expand Up @@ -132,6 +133,7 @@ mod re;
mod representation;
pub mod runes;
mod satscard;
mod search_index;
pub mod settings;
mod signer;
pub mod subcommand;
Expand All @@ -148,6 +150,7 @@ const TARGET_POSTAGE: Amount = Amount::from_sat(10_000);
static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
static LISTENERS: Mutex<Vec<axum_server::Handle>> = Mutex::new(Vec::new());
static INDEXER: Mutex<Option<thread::JoinHandle<()>>> = Mutex::new(None);
static SEARCH_INDEXER: Mutex<Option<thread::JoinHandle<()>>> = Mutex::new(None);

#[doc(hidden)]
#[derive(Deserialize, Serialize)]
Expand Down Expand Up @@ -279,6 +282,14 @@ fn gracefully_shut_down_indexer() {
log::warn!("Index thread panicked; join failed");
}
}

if let Some(search_indexer) = SEARCH_INDEXER.lock().unwrap().take() {
shut_down();
log::info!("Waiting for search index thread to finish...");
if search_indexer.join().is_err() {
log::warn!("Search index thread panicked; join failed");
}
Comment on lines +289 to +291
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just good for debuggability:

Suggested change
if search_indexer.join().is_err() {
log::warn!("Search index thread panicked; join failed");
}
if let Err(err) = search_indexer.join() {
log::warn!("Search index thread panicked: {err}");
}

}
}

pub fn main() {
Expand Down
2 changes: 2 additions & 0 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub struct Options {
help = "Do not index inscriptions."
)]
pub(crate) no_index_inscriptions: bool,
#[arg(long, help = "Use search index at <SEARCH_INDEX>.")]
pub(crate) search_index: Option<PathBuf>,
Comment on lines +76 to +77
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't even need to be configurable initially. We could just put it into the ord data dir.

#[arg(
long,
help = "Require basic HTTP authentication with <SERVER_PASSWORD>. Credentials are sent in cleartext. Consider using authentication in conjunction with HTTPS."
Expand Down
206 changes: 206 additions & 0 deletions src/search_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use {
super::*,
crate::subcommand::server::query,
tantivy::{
collector::{Count, TopDocs},
directory::MmapDirectory,
query::QueryParser,
schema::{
document::OwnedValue, DateOptions, DateTimePrecision, Field, Schema as TantivySchema,
INDEXED, STORED, STRING,
},
DateTime, Index as TantivyIndex, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument,
},
};

#[derive(Clone)]
struct Schema {
inscription_id: Field,
charm: Field,
sat_name: Field,
timestamp: Field,
}

impl Schema {
fn default_search_fields(&self) -> Vec<Field> {
vec![
self.inscription_id,
self.charm,
self.sat_name,
self.timestamp,
]
}

fn search_result(&self, document: &TantivyDocument) -> Option<SearchResult> {
let inscription_id = document.get_first(self.inscription_id).and_then(|value| {
if let OwnedValue::Str(id_str) = value {
Some(id_str)
} else {
None
}
})?;

Some(SearchResult {
inscription_id: inscription_id.parse().ok()?,
})
}
}

#[derive(Clone)]
pub struct SearchIndex {
ord_index: Arc<Index>,
reader: IndexReader,
schema: Schema,
search_index: TantivyIndex,
writer: Arc<Mutex<IndexWriter>>,
}

#[derive(Eq, Hash, PartialEq)]
pub struct SearchResult {
pub inscription_id: InscriptionId,
}

impl SearchIndex {
pub fn open(index: Arc<Index>, settings: &Settings) -> Result<Self> {
let mut schema_builder = TantivySchema::builder();

let schema = Schema {
inscription_id: schema_builder.add_text_field("inscription_id", STRING | STORED),
charm: schema_builder.add_text_field("charm", STRING),
sat_name: schema_builder.add_text_field("sat_name", STRING),
timestamp: schema_builder.add_date_field(
"timestamp",
DateOptions::from(INDEXED)
.set_fast()
.set_precision(DateTimePrecision::Seconds),
),
};

let path = settings.search_index().to_owned();

fs::create_dir_all(&path).snafu_context(error::Io { path: path.clone() })?;

let search_index =
TantivyIndex::open_or_create(MmapDirectory::open(path)?, schema_builder.build())?;

let reader = search_index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommitWithDelay)
.try_into()?;

let writer = search_index.writer(50_000_000)?;

Ok(Self {
ord_index: index,
reader,
schema,
search_index,
writer: Arc::new(Mutex::new(writer)),
})
}

pub fn search(&self, query: &str) -> Result<Vec<SearchResult>> {
let searcher = self.reader.searcher();

Ok(
searcher
.search(
&self.query_parser().parse_query(query)?,
&TopDocs::with_limit(100),
)?
.iter()
.filter_map(|(_score, doc_address)| {
self
.schema
.search_result(&searcher.doc::<TantivyDocument>(*doc_address).ok()?)
})
.collect(),
)
}

pub fn update(&self) -> Result {
let batch_size = 100;

let mut starting_sequence_number = 0;

let mut writer = self.writer.lock().unwrap();

loop {
let batch = self.ord_index.get_inscriptions_by_sequence_range(
starting_sequence_number,
starting_sequence_number + batch_size,
)?;

if batch.is_empty() {
return Ok(());
}

for inscription_id in batch {
self.add_inscription(inscription_id, &mut writer)?;

if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) {
writer.commit()?;
return Ok(());
}
}

writer.commit()?;

starting_sequence_number += batch_size;
}
}

fn add_inscription(&self, inscription_id: InscriptionId, writer: &mut IndexWriter) -> Result {
let searcher = self.reader.searcher();

let query = self
.query_parser()
.parse_query(&format!("inscription_id:{inscription_id}"))?;

if searcher.search(&query, &Count)? > 0 {
return Ok(());
}

let (inscription, _, _) = self
.ord_index
.inscription_info(query::Inscription::Id(inscription_id), None)?
.ok_or(anyhow!(format!(
"failed to get info for inscription with id `{inscription_id}`"
)))?;

let mut document = TantivyDocument::default();

document.add_text(self.schema.inscription_id, inscription.id.to_string());

for charm in inscription.charms {
document.add_text(self.schema.charm, charm);
}

if let Some(sat) = inscription.sat {
document.add_text(self.schema.sat_name, sat.name());
}

document.add_date(
self.schema.timestamp,
DateTime::from_timestamp_secs(inscription.timestamp),
);

writer.add_document(document)?;

log::info!(
"Added inscription with id `{}` to search index",
inscription_id
);

Ok(())
}

fn query_parser(&self) -> QueryParser {
let mut query_parser =
QueryParser::for_index(&self.search_index, self.schema.default_search_fields());

query_parser.set_conjunction_by_default();

query_parser
}
}
Loading