Skip to content

Add search index #4306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
371 changes: 371 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ tokio-stream = "0.1.9"
tokio-util = {version = "0.7.3", features = ["compat"] }
tower-http = { version = "0.6.2", features = ["auth", "compression-br", "compression-gzip", "cors", "set-header"] }
urlencoding = "2.1.3"
tantivy = "0.22.0"

[dev-dependencies]
criterion = "0.5.1"
Expand Down
18 changes: 18 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,24 @@ impl Index {
Ok((inscriptions, more))
}

pub fn get_inscriptions(&self) -> Result<Vec<InscriptionId>> {
let rtx = self.database.begin_read()?;

let sequence_number_to_inscription_entry =
rtx.open_table(SEQUENCE_NUMBER_TO_INSCRIPTION_ENTRY)?;

let inscriptions = sequence_number_to_inscription_entry
.iter()?
.map(|result| {
result
.map(|(_number, entry)| InscriptionEntry::load(entry.value()).id)
.map_err(|err| anyhow!(err))
})
.collect::<Result<Vec<InscriptionId>>>()?;

Ok(inscriptions)
}

pub fn get_inscriptions_in_block(&self, block_height: u32) -> Result<Vec<InscriptionId>> {
let rtx = self.database.begin_read()?;

Expand Down
11 changes: 11 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use {
properties::Properties,
representation::Representation,
satscard::Satscard,
search_index::SearchIndex,
settings::Settings,
signer::Signer,
subcommand::{OutputFormat, Subcommand, SubcommandResult},
Expand Down Expand Up @@ -132,6 +133,7 @@ mod re;
mod representation;
pub mod runes;
mod satscard;
mod search_index;
pub mod settings;
mod signer;
pub mod subcommand;
Expand All @@ -148,6 +150,7 @@ const TARGET_POSTAGE: Amount = Amount::from_sat(10_000);
static SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);
static LISTENERS: Mutex<Vec<axum_server::Handle>> = Mutex::new(Vec::new());
static INDEXER: Mutex<Option<thread::JoinHandle<()>>> = Mutex::new(None);
static SEARCH_INDEXER: Mutex<Option<thread::JoinHandle<()>>> = Mutex::new(None);

#[doc(hidden)]
#[derive(Deserialize, Serialize)]
Expand Down Expand Up @@ -279,6 +282,14 @@ fn gracefully_shut_down_indexer() {
log::warn!("Index thread panicked; join failed");
}
}

if let Some(search_indexer) = SEARCH_INDEXER.lock().unwrap().take() {
shut_down();
log::info!("Waiting for search index thread to finish...");
if search_indexer.join().is_err() {
log::warn!("Search index thread panicked; join failed");
}
Comment on lines +289 to +291
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just good for debuggability:

Suggested change
if search_indexer.join().is_err() {
log::warn!("Search index thread panicked; join failed");
}
if let Err(err) = search_indexer.join() {
log::warn!("Search index thread panicked: {err}");
}

}
}

pub fn main() {
Expand Down
2 changes: 2 additions & 0 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub struct Options {
help = "Do not index inscriptions."
)]
pub(crate) no_index_inscriptions: bool,
#[arg(long, help = "Use search index at <SEARCH_INDEX>.")]
pub(crate) search_index: Option<PathBuf>,
Comment on lines +76 to +77
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't even need to be configurable initially. We could just put it into the ord data dir.

#[arg(
long,
help = "Require basic HTTP authentication with <SERVER_PASSWORD>. Credentials are sent in cleartext. Consider using authentication in conjunction with HTTPS."
Expand Down
210 changes: 210 additions & 0 deletions src/search_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use {
super::*,
crate::subcommand::server::query,
tantivy::{
collector::{Count, TopDocs},
directory::MmapDirectory,
query::QueryParser,
schema::{
document::OwnedValue, DateOptions, DateTimePrecision, Field, Schema as TantivySchema,
INDEXED, STORED, STRING,
},
DateTime, Index as TantivyIndex, IndexReader, IndexWriter, ReloadPolicy, TantivyDocument,
},
};

#[derive(Clone)]
struct Schema {
inscription_id: Field,
charm: Field,
sat_name: Field,
timestamp: Field,
}

impl Schema {
fn default_search_fields(&self) -> Vec<Field> {
vec![
self.inscription_id,
self.charm,
self.sat_name,
self.timestamp,
]
}

fn search_result(&self, document: &TantivyDocument) -> Option<SearchResult> {
let inscription_id = document.get_first(self.inscription_id).and_then(|value| {
if let OwnedValue::Str(id_str) = value {
Some(id_str)
} else {
None
}
})?;

let timestamp = document.get_first(self.timestamp).and_then(|value| {
if let OwnedValue::Date(date) = value {
Some(date)
} else {
None
}
})?;

Some(SearchResult {
inscription_id: inscription_id.parse().ok()?,
timestamp: *timestamp,
})
}

fn query_parser(&self, search_index: &TantivyIndex) -> QueryParser {
let mut query_parser = QueryParser::for_index(search_index, self.default_search_fields());
query_parser.set_conjunction_by_default();
query_parser
}
}

#[derive(Clone)]
pub struct SearchIndex {
ord_index: Arc<Index>,
reader: IndexReader,
schema: Schema,
search_index: TantivyIndex,
writer: Arc<Mutex<IndexWriter>>,
}

#[derive(Eq, Hash, PartialEq)]
pub struct SearchResult {
pub inscription_id: InscriptionId,
pub timestamp: DateTime,
}

impl SearchIndex {
pub fn open(index: Arc<Index>, settings: &Settings) -> Result<Self> {
let mut schema_builder = TantivySchema::builder();

let schema = Schema {
inscription_id: schema_builder.add_text_field("inscription_id", STRING | STORED),
charm: schema_builder.add_text_field("charm", STRING),
sat_name: schema_builder.add_text_field("sat_name", STRING),
timestamp: schema_builder.add_date_field(
"timestamp",
DateOptions::from(INDEXED)
.set_stored()
.set_fast()
.set_precision(DateTimePrecision::Seconds),
),
};

let path = settings.search_index().to_owned();

fs::create_dir_all(&path).snafu_context(error::Io { path: path.clone() })?;

let search_index =
TantivyIndex::open_or_create(MmapDirectory::open(path)?, schema_builder.build())?;

let reader = search_index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommitWithDelay)
.try_into()?;

let writer = search_index.writer(50_000_000)?;

Ok(Self {
ord_index: index,
reader,
schema,
search_index,
writer: Arc::new(Mutex::new(writer)),
})
}

pub fn search(&self, query: &str) -> Result<Vec<SearchResult>> {
let searcher = self.reader.searcher();

let query = self
.schema
.query_parser(&self.search_index)
.parse_query(query)?;

let unique_results = searcher
.search(&query, &TopDocs::with_limit(100))?
.iter()
.filter_map(|(_score, doc_address)| {
self
.schema
.search_result(&searcher.doc::<TantivyDocument>(*doc_address).ok()?)
})
.collect::<HashSet<SearchResult>>();

let mut results = unique_results.into_iter().collect::<Vec<SearchResult>>();

results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));

Ok(results)
}

pub(crate) fn update(&self) -> Result {
let mut indexed_inscriptions = Vec::new();

loop {
for inscription_id in self.ord_index.get_inscriptions()? {
if !indexed_inscriptions.contains(&inscription_id) {
self.index_inscription(inscription_id)?;
indexed_inscriptions.push(inscription_id);
}

if SHUTTING_DOWN.load(atomic::Ordering::Relaxed) {
return Ok(());
}
}
}
}

fn index_inscription(&self, inscription_id: InscriptionId) -> Result {
let searcher = self.reader.searcher();

let query = self
.schema
.query_parser(&self.search_index)
.parse_query(&format!("inscription_id:{inscription_id}"))?;

if searcher.search(&query, &Count)? > 0 {
return Ok(());
}

let (inscription, _, _) = self
.ord_index
.inscription_info(query::Inscription::Id(inscription_id), None)?
.ok_or(anyhow!(format!(
"failed to get info for inscription with id `{inscription_id}`"
)))?;

let mut writer = self.writer.lock().unwrap();

let mut document = TantivyDocument::default();

document.add_text(self.schema.inscription_id, inscription.id.to_string());

for charm in inscription.charms {
document.add_text(self.schema.charm, charm);
}

if let Some(sat) = inscription.sat {
document.add_text(self.schema.sat_name, sat.name());
}

document.add_date(
self.schema.timestamp,
DateTime::from_timestamp_secs(inscription.timestamp),
);

writer.add_document(document)?;

writer.commit()?;

log::info!(
"Indexed inscription with id `{}` to search index",
inscription_id
);

Ok(())
}
}
Loading
Loading