This repository was archived by the owner on Jul 16, 2024. It is now read-only.

Commit e8632ab

Pinecone + OpenAI demo with a convenient trait (..App) + a script to delete all Pinecone indexes
1 parent 315fc9d commit e8632ab

4 files changed: +217 -39 lines changed

src/main/scala/io/cequence/pineconeopenai/demo/CreateIndex.scala

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/scala/io/cequence/pineconeopenai/demo/DeleteAllPineconeIndexes.scala

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
package io.cequence.pineconeopenai.demo

import scala.concurrent.Future

/**
 * This demo deletes all indexes. Be careful!
 *
 * The following env. variables are expected:
 * - PINECONE_SCALA_CLIENT_API_KEY
 * - PINECONE_SCALA_CLIENT_ENV
 */
object DeleteAllPineconeIndexes extends PineconeOpenAIDemoApp {

  override protected def exec =
    pineconeIndexService.listIndexes.flatMap { indexes =>
      Future.sequence(
        indexes.map(pineconeIndexService.deleteIndex)
      )
    }.map(_ => ())
}

src/main/scala/io/cequence/pineconeopenai/demo/PineconeOpenAIDemo.scala

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
package io.cequence.pineconeopenai.demo

import akka.stream.scaladsl.{Flow, Sink, Source}
import io.cequence.openaiscala.domain.ModelId
import io.cequence.openaiscala.domain.settings.CreateEmbeddingsSettings
import io.cequence.pineconescala.domain.PVector
import io.cequence.pineconescala.domain.response.CreateResponse
import io.cequence.pineconescala.domain.settings.QuerySettings
import io.cequence.pineconescala.service.PineconeVectorService

import scala.concurrent.Future
import play.api.libs.json.Json

/**
 * Run me!
 *
 * Based on <a href="https://docs.pinecone.io/docs/openai">Pinecone OpenAI Tutorial</a>.
 *
 * The following env. variables are expected:
 * - PINECONE_SCALA_CLIENT_API_KEY
 * - PINECONE_SCALA_CLIENT_ENV
 * - OPENAI_SCALA_CLIENT_API_KEY
 * - OPENAI_SCALA_CLIENT_ORG_ID (optional)
 *
 * Note: If you can't create a new index in Pinecone because it exceeds your quota (a free account can have only one index),
 * you can run [[DeleteAllPineconeIndexes]] to start from a clean slate.
 */
object PineconeOpenAIDemo extends PineconeOpenAIDemoApp {

  private val indexName = "openai"
  private val namespace = "default"
  private val batchSize = 32 // process everything in batches of 32
  private val parallelism = 1 // no rush, do it in sequence
  private val indexSettings = DefaultSettings.CreateIndex // metric = cosine, pods = 1, podType = p1.x1

  override protected def exec = {
    for {
      embedResponse <- openAIService.createEmbeddings(
        input = Seq(
          "Sample document text goes here",
          "there will be several phrases in each batch"
        ),
        settings = CreateEmbeddingsSettings(ModelId.text_embedding_ada_002)
      )

      // extract embeddings to a list
      embeds = embedResponse.data.map(_.embedding)

      indexNames <- pineconeIndexService.listIndexes

      // check if 'openai' index already exists (only create index if not)
      _ <- if (!indexNames.contains(indexName)) {
        pineconeIndexService.createIndex(
          indexName,
          dimension = embeds(0).size,
          settings = indexSettings
        ).map(
          _ match {
            case CreateResponse.Created =>
              println(s"Index '${indexName}' successfully created.")
              println("Waiting 30 seconds for the index initialization to finish.")
              Thread.sleep(30000)

            case CreateResponse.BadRequest =>
              println(s"Index '${indexName}' creation failed. Request exceeds quota or an invalid index name.")

            case CreateResponse.AlreadyExists =>
              println(s"Index '${indexName}' with a given name already exists.")
          }
        )
      } else
        Future(())

      // create a service for a given index name
      pineconeVectorService <- createPineconeVectorService(indexName)

      // take the first 1000 entries and extract text, result is an iterator
      trecTextsIterator = scala.io.Source.fromFile("src/main/resources/trec-train.json")
        .getLines()
        .take(1000)
        .map(jsonString => (Json.parse(jsonString) \ "text").as[String].trim) // extract text

      // add ids to each text/line, chunk into batches of 32, and create a source
      textsWithIdsSource = Source.fromIterator(() => trecTextsIterator.zipWithIndex)
        .grouped(batchSize)
        // .buffer(2, OverflowStrategy.backpressure) // process in a sequence (no rush)

      // define an Akka-based processing flow
      processingFlow = Flow[Seq[(String, Int)]].mapAsyncUnordered(parallelism) { textsWithIds =>
        val texts = textsWithIds.map(_._1)

        println(s"Creating and upserting embeddings for batch: ${textsWithIds.head._2 / batchSize}")

        for {
          // create embeddings
          embedResponse <- openAIService.createEmbeddings(
            input = texts,
            settings = CreateEmbeddingsSettings(ModelId.text_embedding_ada_002)
          )

          embeds = embedResponse.data.map(_.embedding)

          // prep vectors with metadata
          vectors = embeds.zip(textsWithIds).map { case (embed, (text, id)) =>
            PVector(
              id = id.toString,
              values = embed,
              metadata = Map("text" -> text)
            )
          }

          // upsert the batch to Pinecone
          _ <- pineconeVectorService.upsert(
            vectors = vectors,
            namespace = namespace
          )
        } yield
          ()
      }

      // execute the source with the processing flow
      _ <- textsWithIdsSource.via(processingFlow).runWith(Sink.ignore)

      //////////////
      // Querying //
      //////////////

      // first query
      _ <- execQuery(
        "What caused the 1929 Great Depression?",
        pineconeVectorService
      )

      // second query
      _ <- execQuery(
        "What was the cause of the major recession in the early 20th century?",
        pineconeVectorService
      )

      // third query
      _ <- execQuery(
        "Why was there a long-term economic downturn in the early 20th century?",
        pineconeVectorService
      )
    } yield
      ()
  }

  private def execQuery(
    query: String,
    pineconeVectorService: PineconeVectorService
  ) = {
    for {
      // create embeddings for the query (only one in this case)
      embed <- openAIService.createEmbeddings(
        input = Seq(query),
        settings = CreateEmbeddingsSettings(ModelId.text_embedding_ada_002)
      ).map(_.data(0).embedding)

      // query the index
      queryResponse <- pineconeVectorService.query(
        vector = embed,
        namespace = namespace,
        settings = QuerySettings(
          topK = 5,
          includeValues = false,
          includeMetadata = true
        )
      )

      // let's print out the top_k most similar questions and their respective similarity scores.
      _ = {
        println(s"\nQuery: ${query}\n")
        queryResponse.matches.foreach { match_ =>
          println(s"${"%1.2f".format(match_.score)}: ${match_.metadata.get("text")}")
        }
      }
    } yield
      ()
  }
}
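
Note: the demo reads src/main/resources/trec-train.json, which is not part of this commit. A minimal sketch of the assumed input format: a JSON-Lines file where each line is a JSON object carrying at least a "text" field (the only field the demo reads); the sample line below is hypothetical.

import play.api.libs.json.Json

// Hypothetical line from trec-train.json; only the "text" field is assumed.
val sampleLine = """{"text": "Sample question text goes here"}"""

// The same per-line extraction the demo performs.
val text = (Json.parse(sampleLine) \ "text").as[String].trim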

src/main/scala/io/cequence/pineconeopenai/demo/PineconeOpenAIDemoApp.scala

Lines changed: 16 additions & 17 deletions
@@ -2,35 +2,34 @@ package io.cequence.pineconeopenai.demo
 
 import akka.actor.ActorSystem
 import akka.stream.Materializer
+import io.cequence.openaiscala.service.OpenAIServiceFactory
 
 import scala.concurrent.{ExecutionContext, Future}
-import io.cequence.pineconescala.service.{PineconeIndexService, PineconeIndexServiceFactory, PineconeVectorService, PineconeVectorServiceFactory}
+import io.cequence.pineconescala.service.{PineconeIndexServiceFactory, PineconeServiceConsts, PineconeVectorServiceFactory}
 
-trait PineconeOpenAIDemoApp extends App {
+trait PineconeOpenAIDemoApp extends App with PineconeServiceConsts {
 
   protected implicit val ec: ExecutionContext = ExecutionContext.global
 
   private val actorSystem: ActorSystem = ActorSystem()
-  private implicit val materializer: Materializer = Materializer(actorSystem)
+  protected implicit val materializer: Materializer = Materializer(actorSystem)
 
-  protected def execWithIndexService: PineconeIndexService => Future[_]
-  protected def execWithVectorService: (String => Future[PineconeVectorService]) => Future[_]
+  // impl hook
+  protected def exec: Future[_]
 
-  {
-    for {
-      pineconeIndexService <- Future(
-        PineconeIndexServiceFactory() // we wrap it in a Future just because of the recover block
+  protected lazy val pineconeIndexService = PineconeIndexServiceFactory()
+  protected def createPineconeVectorService(indexName: String) =
+    PineconeVectorServiceFactory(indexName).map(
+      _.getOrElse(
+        throw new Exception(s"Could not find index '${indexName}'")
       )
+    )
 
-      _ <- execWithIndexService(pineconeIndexService)
+  protected lazy val openAIService = OpenAIServiceFactory()
 
-      _ <- execWithVectorService((indexName: String) =>
-        PineconeVectorServiceFactory(indexName).map(
-          _.getOrElse(
-            throw new Exception(s"Could not find index '${indexName}'")
-          )
-        )
-      )
+  {
+    for {
+      _ <- exec
 
       _ <- actorSystem.terminate()
     } yield
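
For reference, a minimal sketch (not part of this commit) of how a demo plugs into the refactored trait: a subclass only implements the exec hook and reuses the services and actor-system shutdown provided by PineconeOpenAIDemoApp. The object name ListIndexesDemo is hypothetical.

package io.cequence.pineconeopenai.demo

import scala.concurrent.Future

// Hypothetical demo: print the names of all existing Pinecone indexes.
object ListIndexesDemo extends PineconeOpenAIDemoApp {

  override protected def exec: Future[_] =
    pineconeIndexService.listIndexes.map { indexNames =>
      indexNames.foreach(println)
    }
}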

0 commit comments
