mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-07-05 19:32:11 +02:00
Remove unwanted stuff which should not be in the repo
This commit is contained in:
parent
740738aba3
commit
c4a218b6f5
16 changed files with 0 additions and 18422 deletions
BIN
.DS_Store
vendored
BIN
.DS_Store
vendored
Binary file not shown.
|
|
@ -1,320 +0,0 @@
|
|||
|
||||
## Getting Started
|
||||
|
||||
The `Docker Compose` files have been tested on `Linux` and `MacOS`. There are currently
|
||||
no plans for `Windows` support in the immediate future.
|
||||
|
||||
There are 4 `Docker Compose` files depending on the desired LM deployment:
|
||||
- `VertexAI` through Google Cloud
|
||||
- `Claude` through Anthropic's API
|
||||
- `AzureAI` serverless endpoint
|
||||
- Local LM deployment through `Ollama`
|
||||
|
||||
Docker Compose enables the following functions:
|
||||
- Run the required components for full e2e `Graph RAG` knowledge pipeline
|
||||
- Check processing logs
|
||||
- Load test text corpus and begin knowledge extraction
|
||||
- Verify extracted graph edges and number of edges
|
||||
- Run a query against the vector and graph stores to generate a response
|
||||
using the chosen LM
|
||||
|
||||
### Clone the Repo
|
||||
|
||||
```
|
||||
git clone https://github.com/trustgraph-ai/trustgraph trustgraph
|
||||
cd trustgraph
|
||||
```
|
||||
|
||||
### Install requirements
|
||||
|
||||
```
|
||||
python3 -m venv env
|
||||
. env/bin/activate
|
||||
pip3 install pulsar-client
|
||||
pip3 install cassandra-driver
|
||||
export PYTHON_PATH=.
|
||||
```
|
||||
|
||||
### Docker Compose files
|
||||
|
||||
Depending on your desired LM deployment, you will choose from one of the
|
||||
following `Docker Compose` files:
|
||||
|
||||
- `docker-compose-azure.yaml`: AzureAI endpoint. Set `AZURE_TOKEN` to the secret token and
|
||||
`AZURE_ENDPOINT` to the URL endpoint address for the deployed model.
|
||||
- `docker-compose-claude.yaml`: Anthropic's API. Set `CLAUDE_KEY` to your API key.
|
||||
- `docker-compose-ollama.yaml`: Local LM (currently using [Gemma2](https://ollama.com/library/gemma2) deployed through Ollama. Set `OLLAMA_HOST` to the machine running Ollama (e.g. `localhost` for Ollama running locally on your machine)
|
||||
- `docker-compose-vertexai.yaml`: VertexAI API. Requires a `private.json` authentication file to authenticate with your GCP project. Filed should stored be at path `vertexai/private.json`.
|
||||
|
||||
**NOTE**: All tokens, paths, and authentication files must be set **PRIOR** to launching a `Docker Compose` file.
|
||||
|
||||
|
||||
#### AzureAI Serverless Model Deployment
|
||||
|
||||
```
|
||||
export AZURE_ENDPOINT=https://ENDPOINT.HOST.GOES.HERE/
|
||||
export AZURE_TOKEN=TOKEN-GOES-HERE
|
||||
docker-compose -f docker-compose-azure.yaml up -d
|
||||
```
|
||||
|
||||
#### Claude through Anthropic API
|
||||
|
||||
```
|
||||
export CLAUDE_KEY=TOKEN-GOES-HERE
|
||||
docker-compose -f docker-compose-claude.yaml up -d
|
||||
```
|
||||
|
||||
#### Ollama Hosted Model Deployment
|
||||
|
||||
```
|
||||
export OLLAMA_HOST=localhost # Set to hostname of Ollama host
|
||||
docker-compose -f docker-compose-ollama.yaml up -d
|
||||
```
|
||||
|
||||
#### VertexAI through GCP
|
||||
|
||||
```
|
||||
mkdir -p vertexai
|
||||
cp {whatever} vertexai/private.json
|
||||
docker-compose -f docker-compose-vertexai.yaml up -d
|
||||
```
|
||||
|
||||
If you're running `SELinux` on Linux you may need to set the permissions on the
|
||||
VertexAI directory so that the key file can be mounted on a Docker container using
|
||||
the following command:
|
||||
|
||||
```
|
||||
chcon -Rt svirt_sandbox_file_t vertexai/
|
||||
```
|
||||
|
||||
### Verify Docker Containers
|
||||
|
||||
On first running a `Docker Compose` file, it may take a while (depending on your network connection) to pull all the necessary components. Once all of the components have been pulled, check that the TrustGraph containers are running:
|
||||
|
||||
```
|
||||
docker ps
|
||||
```
|
||||
|
||||
Any containers that have exited unexpectedly can be found by checking the `STATUS` field
|
||||
using the following:
|
||||
|
||||
```
|
||||
docker ps -a
|
||||
```
|
||||
|
||||
### Warm-Up
|
||||
|
||||
Before proceeding, allow the system to enter a stable a working state. In general
|
||||
`30 seconds` should be enough time for Pulsar to stablize.
|
||||
|
||||
The system uses Cassandra for a Graph store. Cassandra can take `60-70 seconds`
|
||||
to achieve a working state.
|
||||
|
||||
### Load a Text Corpus
|
||||
|
||||
Create a sources directory and get a test PDF file. To demonstrate the power of TrustGraph, we're using a PDF of the public [Roger's Commision Report](https://sma.nasa.gov/SignificantIncidents/assets/rogers_commission_report.pdf) from the NASA Challenger disaster. This PDF includes complex formatting, unique terms, complex concepts, unique concepts, and information not commonly found in public knowledge sources.
|
||||
|
||||
```
|
||||
mkdir sources
|
||||
curl -o sources/Challenger-Report-Vol1.pdf https://sma.nasa.gov/SignificantIncidents/assets/rogers_commission_report.pdf
|
||||
```
|
||||
|
||||
Load the file for knowledge extraction:
|
||||
|
||||
```
|
||||
scripts/loader -f sources/Challenger-Report-Vol1.pdf
|
||||
```
|
||||
|
||||
`File loaded.` indicates the PDF has been sucessfully loaded to the processing queues and extraction will begin.
|
||||
|
||||
### Processing Logs
|
||||
|
||||
At this point, many processing services are running concurrently. You can check the status of these processes with the following logs:
|
||||
|
||||
`PDF Decoder`:
|
||||
```
|
||||
docker logs trustgraph-pdf-decoder-1
|
||||
```
|
||||
|
||||
Output should look:
|
||||
```
|
||||
Decoding 1f7b7055...
|
||||
Done.
|
||||
```
|
||||
|
||||
`Chunker`:
|
||||
```
|
||||
docker logs trustgraph-chunker-1
|
||||
```
|
||||
|
||||
The output should be similiar to the output of the `Decode`, except it should be a sequence of many entries.
|
||||
|
||||
`Vectorizer`:
|
||||
```
|
||||
docker logs trustgraph-vectorize-1
|
||||
```
|
||||
|
||||
Similar output to above processes, except many entries instead.
|
||||
|
||||
|
||||
`Language Model Inference`:
|
||||
```
|
||||
docker logs trustgraph-llm-1
|
||||
```
|
||||
|
||||
Output should be a sequence of entries:
|
||||
```
|
||||
Handling prompt fa1b98ae-70ef-452b-bcbe-21a867c5e8e2...
|
||||
Send response...
|
||||
Done.
|
||||
```
|
||||
|
||||
`Knowledge Graph Definitions`:
|
||||
```
|
||||
docker logs trustgraph-kg-extract-definitions-1
|
||||
```
|
||||
|
||||
Output should be an array of JSON objects with keys `entity` and `definition`:
|
||||
|
||||
```
|
||||
Indexing 1f7b7055-p11-c1...
|
||||
[
|
||||
{
|
||||
"entity": "Orbiter",
|
||||
"definition": "A spacecraft designed for spaceflight."
|
||||
},
|
||||
{
|
||||
"entity": "flight deck",
|
||||
"definition": "The top level of the crew compartment, typically where flight controls are located."
|
||||
},
|
||||
{
|
||||
"entity": "middeck",
|
||||
"definition": "The lower level of the crew compartment, used for sleeping, working, and storing equipment."
|
||||
}
|
||||
]
|
||||
Done.
|
||||
```
|
||||
|
||||
`Knowledge Graph Relationshps`:
|
||||
```
|
||||
docker logs trustgraph-kg-extract-relationships-1
|
||||
```
|
||||
|
||||
Output should be an array of JSON objects with keys `subject`, `predicate`, `object`, and `object-entity`:
|
||||
```
|
||||
Indexing 1f7b7055-p11-c3...
|
||||
[
|
||||
{
|
||||
"subject": "Space Shuttle",
|
||||
"predicate": "carry",
|
||||
"object": "16 tons of cargo",
|
||||
"object-entity": false
|
||||
},
|
||||
{
|
||||
"subject": "friction",
|
||||
"predicate": "generated by",
|
||||
"object": "atmosphere",
|
||||
"object-entity": true
|
||||
}
|
||||
]
|
||||
Done.
|
||||
```
|
||||
|
||||
### Graph Parsing
|
||||
|
||||
To check that the knowledge graph is successfully parsing data:
|
||||
|
||||
```
|
||||
scripts/graph-show
|
||||
```
|
||||
|
||||
The output should be a set of semantic triples in [N-Triples](https://www.w3.org/TR/rdf12-n-triples/) format.
|
||||
|
||||
```
|
||||
http://trustgraph.ai/e/enterprise http://trustgraph.ai/e/was-carried to altitude and released for a gliding approach and landing at the Mojave Desert test center.
|
||||
http://trustgraph.ai/e/enterprise http://www.w3.org/2000/01/rdf-schema#label Enterprise.
|
||||
http://trustgraph.ai/e/enterprise http://www.w3.org/2004/02/skos/core#definition A prototype space shuttle orbiter used for atmospheric flight testing.
|
||||
```
|
||||
|
||||
### Number of Graph Edges
|
||||
|
||||
N-Triples format is not particularly human readable. It's more useful to know how many graph edges have successfully been extracted from the text corpus:
|
||||
```
|
||||
scripts/graph-show | wc -l
|
||||
```
|
||||
|
||||
The Challenger report has a long introduction with quite a bit of adminstrative text commonly found in official reports. The first few hundred graph edges mostly capture this document formatting knowledge. To fully test the ability to extract complex knowledge, wait until at least `1000` graph edges have been extracted. The full extraction for this PDF will extract many thousand graph edges.
|
||||
|
||||
### RAG Test Script
|
||||
```
|
||||
tests/test-graph-rag
|
||||
```
|
||||
This script forms a LM prompt asking for 20 facts regarding the Challenger disaster. Depending on how many graph edges have been extracted, the response will be similar to:
|
||||
|
||||
```
|
||||
Here are 20 facts from the provided knowledge graph about the Space Shuttle disaster:
|
||||
|
||||
1. **Space Shuttle Challenger was a Space Shuttle spacecraft.**
|
||||
2. **The third Spacelab mission was carried by Orbiter Challenger.**
|
||||
3. **Francis R. Scobee was the Commander of the Challenger crew.**
|
||||
4. **Earth-to-orbit systems are designed to transport payloads and humans from Earth's surface into orbit.**
|
||||
5. **The Space Shuttle program involved the Space Shuttle.**
|
||||
6. **Orbiter Challenger flew on mission 41-B.**
|
||||
7. **Orbiter Challenger was used on STS-7 and STS-8 missions.**
|
||||
8. **Columbia completed the orbital test.**
|
||||
9. **The Space Shuttle flew 24 successful missions.**
|
||||
10. **One possibility for the Space Shuttle was a winged but unmanned recoverable liquid-fuel vehicle based on the Saturn 5 rocket.**
|
||||
11. **A Commission was established to investigate the space shuttle Challenger accident.**
|
||||
12. **Judit h Arlene Resnik was Mission Specialist Two.**
|
||||
13. **Mission 51-L was originally scheduled for December 1985 but was delayed until January 1986.**
|
||||
14. **The Corporation's Space Transportation Systems Division was responsible for the design and development of the Space Shuttle Orbiter.**
|
||||
15. **Michael John Smith was the Pilot of the Challenger crew.**
|
||||
16. **The Space Shuttle is composed of two recoverable Solid Rocket Boosters.**
|
||||
17. **The Space Shuttle provides for the broadest possible spectrum of civil/military missions.**
|
||||
18. **Mission 51-L consisted of placing one satellite in orbit, deploying and retrieving Spartan, and conducting six experiments.**
|
||||
19. **The Space Shuttle became the focus of NASA's near-term future.**
|
||||
20. **The Commission focused its attention on safety aspects of future flights.**
|
||||
```
|
||||
|
||||
For any errors with the `RAG` proces, check the following log:
|
||||
```
|
||||
docker logs -f trustgraph-graph-rag-1
|
||||
```
|
||||
### More RAG Test Queries
|
||||
|
||||
If you want to try different RAG queries, modify the `query` in the [test script](https://github.com/trustgraph-ai/trustgraph/blob/master/tests/test-graph-rag).
|
||||
|
||||
### Shutting Down
|
||||
|
||||
When shutting down the pipeline, it's best to shut down all Docker containers and volumes. Run the `docker compose down` command that corresponds to your model deployment:
|
||||
|
||||
`AzureAI Endpoint`:
|
||||
```
|
||||
docker-compose -f docker-compose-azure.yaml down --volumes
|
||||
```
|
||||
|
||||
`Anthropic API`:
|
||||
```
|
||||
docker-compose -f docker-compose-claude.yaml down --volumes
|
||||
```
|
||||
|
||||
`Ollama`:
|
||||
```
|
||||
docker-compose -f docker-compose-ollama.yaml down --volumes
|
||||
```
|
||||
|
||||
`VertexAI API`:
|
||||
```
|
||||
docker-compose -f docker-compose-vertexai.yaml down --volumes
|
||||
```
|
||||
|
||||
To confirm all Docker containers have been shut down, check that the following list is empty:
|
||||
```
|
||||
docker ps
|
||||
```
|
||||
|
||||
To confirm all Docker volumes have been removed, check that the following list is empty:
|
||||
```
|
||||
docker volume ls
|
||||
```
|
||||
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from trustgraph.kg.extract_definitions import run
|
||||
|
||||
run()
|
||||
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from trustgraph.llm.azure_text import run
|
||||
|
||||
run()
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
File diff suppressed because it is too large
Load diff
|
|
@ -1,3 +0,0 @@
|
|||
|
||||
from . extract import *
|
||||
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from . extract import run
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
|
||||
|
|
@ -1,197 +0,0 @@
|
|||
|
||||
"""
|
||||
Simple decoder, accepts vector+text chunks input, applies entity analysis to
|
||||
get entity definitions which are output as graph edges.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
from pulsar.schema import JsonSchema
|
||||
from langchain_community.document_loaders import PyPDFLoader
|
||||
import tempfile
|
||||
import base64
|
||||
import os
|
||||
import argparse
|
||||
import rdflib
|
||||
import json
|
||||
import urllib.parse
|
||||
import time
|
||||
|
||||
from ... schema import VectorsChunk, Triple, Source, Value
|
||||
from ... log_level import LogLevel
|
||||
from ... llm_client import LlmClient
|
||||
from ... prompts import to_definitions
|
||||
from ... rdf import TRUSTGRAPH_ENTITIES, DEFINITION
|
||||
|
||||
DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True)
|
||||
|
||||
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
|
||||
default_input_queue = 'vectors-chunk-load'
|
||||
default_output_queue = 'graph-load'
|
||||
default_subscriber = 'kg-extract-definitions'
|
||||
|
||||
class Processor:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pulsar_host=default_pulsar_host,
|
||||
input_queue=default_input_queue,
|
||||
output_queue=default_output_queue,
|
||||
subscriber=default_subscriber,
|
||||
log_level=LogLevel.INFO,
|
||||
):
|
||||
|
||||
self.client = None
|
||||
|
||||
self.client = pulsar.Client(
|
||||
pulsar_host,
|
||||
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
|
||||
)
|
||||
|
||||
self.consumer = self.client.subscribe(
|
||||
input_queue, subscriber,
|
||||
schema=JsonSchema(VectorsChunk),
|
||||
)
|
||||
|
||||
self.producer = self.client.create_producer(
|
||||
topic=output_queue,
|
||||
schema=JsonSchema(Triple),
|
||||
)
|
||||
|
||||
self.llm = LlmClient(pulsar_host=pulsar_host)
|
||||
|
||||
def to_uri(self, text):
|
||||
|
||||
part = text.replace(" ", "-").lower().encode("utf-8")
|
||||
quoted = urllib.parse.quote(part)
|
||||
uri = TRUSTGRAPH_ENTITIES + quoted
|
||||
|
||||
return uri
|
||||
|
||||
def get_definitions(self, chunk):
|
||||
|
||||
prompt = to_definitions(chunk)
|
||||
resp = self.llm.request(prompt)
|
||||
|
||||
defs = json.loads(resp)
|
||||
|
||||
return defs
|
||||
|
||||
def emit_edge(self, s, p, o):
|
||||
|
||||
t = Triple(s=s, p=p, o=o)
|
||||
self.producer.send(t)
|
||||
|
||||
def run(self):
|
||||
|
||||
while True:
|
||||
|
||||
msg = self.consumer.receive()
|
||||
|
||||
try:
|
||||
|
||||
v = msg.value()
|
||||
print(f"Indexing {v.source.id}...", flush=True)
|
||||
|
||||
chunk = v.chunk.decode("utf-8")
|
||||
|
||||
g = rdflib.Graph()
|
||||
|
||||
try:
|
||||
|
||||
defs = self.get_definitions(chunk)
|
||||
print(json.dumps(defs, indent=4), flush=True)
|
||||
|
||||
for defn in defs:
|
||||
|
||||
s = defn["entity"]
|
||||
s_uri = self.to_uri(s)
|
||||
|
||||
o = defn["definition"]
|
||||
|
||||
s_value = Value(value=str(s_uri), is_uri=True)
|
||||
o_value = Value(value=str(o), is_uri=False)
|
||||
|
||||
self.emit_edge(s_value, DEFINITION_VALUE, o_value)
|
||||
|
||||
except Exception as e:
|
||||
print("Exception: ", e, flush=True)
|
||||
|
||||
print("Done.", flush=True)
|
||||
|
||||
# Acknowledge successful processing of the message
|
||||
self.consumer.acknowledge(msg)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception: ", e, flush=True)
|
||||
|
||||
# Message failed to be processed
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
def __del__(self):
|
||||
|
||||
if self.client:
|
||||
self.client.close()
|
||||
|
||||
def run():
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='pdf-decoder',
|
||||
description=__doc__,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-p', '--pulsar-host',
|
||||
default=default_pulsar_host,
|
||||
help=f'Pulsar host (default: {default_pulsar_host})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-i', '--input-queue',
|
||||
default=default_input_queue,
|
||||
help=f'Input queue (default: {default_input_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-s', '--subscriber',
|
||||
default=default_subscriber,
|
||||
help=f'Queue subscriber name (default: {default_subscriber})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-o', '--output-queue',
|
||||
default=default_output_queue,
|
||||
help=f'Output queue (default: {default_output_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-l', '--log-level',
|
||||
type=LogLevel,
|
||||
default=LogLevel.INFO,
|
||||
choices=list(LogLevel),
|
||||
help=f'Output queue (default: info)'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
while True:
|
||||
|
||||
try:
|
||||
|
||||
p = Processor(
|
||||
pulsar_host=args.pulsar_host,
|
||||
input_queue=args.input_queue,
|
||||
output_queue=args.output_queue,
|
||||
subscriber=args.subscriber,
|
||||
log_level=args.log_level,
|
||||
)
|
||||
|
||||
p.run()
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception:", e, flush=True)
|
||||
print("Will retry...", flush=True)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
|
|
@ -1,3 +0,0 @@
|
|||
|
||||
from . extract import *
|
||||
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from . extract import run
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
|
||||
|
|
@ -1,253 +0,0 @@
|
|||
|
||||
"""
|
||||
Simple decoder, accepts vector+text chunks input, applies entity
|
||||
relationship analysis to get entity relationship edges which are output as
|
||||
graph edges.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
from pulsar.schema import JsonSchema
|
||||
from langchain_community.document_loaders import PyPDFLoader
|
||||
import tempfile
|
||||
import base64
|
||||
import os
|
||||
import argparse
|
||||
import rdflib
|
||||
import json
|
||||
import urllib.parse
|
||||
import time
|
||||
|
||||
from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value
|
||||
from ... log_level import LogLevel
|
||||
from ... llm_client import LlmClient
|
||||
from ... prompts import to_relationships
|
||||
from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES
|
||||
|
||||
RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True)
|
||||
|
||||
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
|
||||
default_input_queue = 'vectors-chunk-load'
|
||||
default_output_queue = 'graph-load'
|
||||
default_subscriber = 'kg-extract-relationships'
|
||||
default_vector_queue='vectors-load'
|
||||
|
||||
class Processor:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pulsar_host=default_pulsar_host,
|
||||
input_queue=default_input_queue,
|
||||
vector_queue=default_vector_queue,
|
||||
output_queue=default_output_queue,
|
||||
subscriber=default_subscriber,
|
||||
log_level=LogLevel.INFO,
|
||||
):
|
||||
|
||||
self.client = pulsar.Client(
|
||||
pulsar_host,
|
||||
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
|
||||
)
|
||||
|
||||
self.consumer = self.client.subscribe(
|
||||
input_queue, subscriber,
|
||||
schema=JsonSchema(VectorsChunk),
|
||||
)
|
||||
|
||||
self.producer = self.client.create_producer(
|
||||
topic=output_queue,
|
||||
schema=JsonSchema(Triple),
|
||||
)
|
||||
|
||||
self.vec_prod = self.client.create_producer(
|
||||
topic=vector_queue,
|
||||
schema=JsonSchema(VectorsAssociation),
|
||||
)
|
||||
|
||||
self.llm = LlmClient(pulsar_host=pulsar_host)
|
||||
|
||||
def to_uri(self, text):
|
||||
|
||||
part = text.replace(" ", "-").lower().encode("utf-8")
|
||||
quoted = urllib.parse.quote(part)
|
||||
uri = TRUSTGRAPH_ENTITIES + quoted
|
||||
|
||||
return uri
|
||||
|
||||
def get_relationships(self, chunk):
|
||||
|
||||
prompt = to_relationships(chunk)
|
||||
resp = self.llm.request(prompt)
|
||||
|
||||
rels = json.loads(resp)
|
||||
|
||||
return rels
|
||||
|
||||
def emit_edge(self, s, p, o):
|
||||
|
||||
t = Triple(s=s, p=p, o=o)
|
||||
self.producer.send(t)
|
||||
|
||||
def emit_vec(self, ent, vec):
|
||||
|
||||
r = VectorsAssociation(entity=ent, vectors=vec)
|
||||
self.vec_prod.send(r)
|
||||
|
||||
def run(self):
|
||||
|
||||
while True:
|
||||
|
||||
msg = self.consumer.receive()
|
||||
|
||||
try:
|
||||
|
||||
v = msg.value()
|
||||
print(f"Indexing {v.source.id}...", flush=True)
|
||||
|
||||
chunk = v.chunk.decode("utf-8")
|
||||
|
||||
g = rdflib.Graph()
|
||||
|
||||
try:
|
||||
|
||||
rels = self.get_relationships(chunk)
|
||||
print(json.dumps(rels, indent=4), flush=True)
|
||||
|
||||
for rel in rels:
|
||||
|
||||
s = rel["subject"]
|
||||
p = rel["predicate"]
|
||||
o = rel["object"]
|
||||
|
||||
s_uri = self.to_uri(s)
|
||||
s_value = Value(value=str(s_uri), is_uri=True)
|
||||
|
||||
p_uri = self.to_uri(p)
|
||||
p_value = Value(value=str(p_uri), is_uri=True)
|
||||
|
||||
if rel["object-entity"]:
|
||||
o_uri = self.to_uri(o)
|
||||
o_value = Value(value=str(o_uri), is_uri=True)
|
||||
else:
|
||||
o_value = Value(value=str(o), is_uri=False)
|
||||
|
||||
self.emit_edge(
|
||||
s_value,
|
||||
p_value,
|
||||
o_value
|
||||
)
|
||||
|
||||
# Label for s
|
||||
self.emit_edge(
|
||||
s_value,
|
||||
RDF_LABEL_VALUE,
|
||||
Value(value=str(s), is_uri=False)
|
||||
)
|
||||
|
||||
# Label for p
|
||||
self.emit_edge(
|
||||
p_value,
|
||||
RDF_LABEL_VALUE,
|
||||
Value(value=str(p), is_uri=False)
|
||||
)
|
||||
|
||||
if rel["object-entity"]:
|
||||
# Label for o
|
||||
self.emit_edge(
|
||||
o_value,
|
||||
RDF_LABEL_VALUE,
|
||||
Value(value=str(o), is_uri=False)
|
||||
)
|
||||
|
||||
self.emit_vec(s_value, v.vectors)
|
||||
self.emit_vec(p_value, v.vectors)
|
||||
if rel["object-entity"]:
|
||||
self.emit_vec(o_value, v.vectors)
|
||||
|
||||
except Exception as e:
|
||||
print("Exception: ", e, flush=True)
|
||||
|
||||
print("Done.", flush=True)
|
||||
|
||||
# Acknowledge successful processing of the message
|
||||
self.consumer.acknowledge(msg)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception: ", e, flush=True)
|
||||
|
||||
# Message failed to be processed
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
def __del__(self):
|
||||
self.client.close()
|
||||
|
||||
def run():
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='kg-extract-relationships',
|
||||
description=__doc__,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-p', '--pulsar-host',
|
||||
default=default_pulsar_host,
|
||||
help=f'Pulsar host (default: {default_pulsar_host})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-i', '--input-queue',
|
||||
default=default_input_queue,
|
||||
help=f'Input queue (default: {default_input_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-s', '--subscriber',
|
||||
default=default_subscriber,
|
||||
help=f'Queue subscriber name (default: {default_subscriber})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-o', '--output-queue',
|
||||
default=default_output_queue,
|
||||
help=f'Output queue (default: {default_output_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-l', '--log-level',
|
||||
type=LogLevel,
|
||||
default=LogLevel.INFO,
|
||||
choices=list(LogLevel),
|
||||
help=f'Output queue (default: info)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-c', '--vector-queue',
|
||||
default=default_vector_queue,
|
||||
help=f'Vector output queue (default: {default_vector_queue})'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
while True:
|
||||
|
||||
try:
|
||||
|
||||
p = Processor(
|
||||
pulsar_host=args.pulsar_host,
|
||||
input_queue=args.input_queue,
|
||||
output_queue=args.output_queue,
|
||||
vector_queue=args.vector_queue,
|
||||
subscriber=args.subscriber,
|
||||
log_level=args.log_level,
|
||||
)
|
||||
|
||||
p.run()
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception:", e, flush=True)
|
||||
print("Will retry...", flush=True)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
|
|
@ -1,127 +0,0 @@
|
|||
|
||||
"""
|
||||
Simple LLM service, performs text prompt completion using the Azure
|
||||
serverless endpoint service. Input is prompt, output is response.
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
|
||||
from ... schema import TextCompletionRequest, TextCompletionResponse
|
||||
from ... log_level import LogLevel
|
||||
from ... base import ConsumerProducer
|
||||
|
||||
default_input_queue = 'llm-complete-text'
|
||||
default_output_queue = 'llm-complete-text-response'
|
||||
default_subscriber = 'llm-azure-text'
|
||||
|
||||
class Processor(ConsumerProducer):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pulsar_host=None,
|
||||
input_queue=default_input_queue,
|
||||
output_queue=default_output_queue,
|
||||
subscriber=default_subscriber,
|
||||
log_level=LogLevel.INFO,
|
||||
endpoint=None,
|
||||
token=None,
|
||||
):
|
||||
|
||||
super(Processor, self).__init__(
|
||||
pulsar_host=pulsar_host,
|
||||
log_level=log_level,
|
||||
input_queue=input_queue,
|
||||
output_queue=output_queue,
|
||||
subscriber=subscriber,
|
||||
input_schema=TextCompletionRequest,
|
||||
output_schema=TextCompletionResponse,
|
||||
)
|
||||
|
||||
self.endpoint = endpoint
|
||||
self.token = token
|
||||
|
||||
def build_prompt(self, system, content):
|
||||
|
||||
data = {
|
||||
"messages": [
|
||||
{
|
||||
"role": "system", "content": system
|
||||
},
|
||||
{
|
||||
"role": "user", "content": content
|
||||
}
|
||||
],
|
||||
"max_tokens": 4192,
|
||||
"temperature": 0.2,
|
||||
"top_p": 1
|
||||
}
|
||||
|
||||
body = json.dumps(data)
|
||||
|
||||
return body
|
||||
|
||||
def call_llm(self, body):
|
||||
|
||||
url = self.endpoint
|
||||
|
||||
# Replace this with the primary/secondary key, AMLToken, or
|
||||
# Microsoft Entra ID token for the endpoint
|
||||
api_key = self.token
|
||||
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {api_key}'
|
||||
}
|
||||
|
||||
resp = requests.post(url, data=body, headers=headers)
|
||||
result = resp.json()
|
||||
|
||||
message_content = result['choices'][0]['message']['content']
|
||||
|
||||
return message_content
|
||||
|
||||
def handle(self, msg):
|
||||
|
||||
v = msg.value()
|
||||
|
||||
# Sender-produced ID
|
||||
|
||||
id = msg.properties()["id"]
|
||||
|
||||
print(f"Handling prompt {id}...", flush=True)
|
||||
|
||||
prompt = self.build_prompt(
|
||||
"You are a helpful chatbot",
|
||||
v.prompt
|
||||
)
|
||||
|
||||
response = self.call_llm(prompt)
|
||||
|
||||
print("Send response...", flush=True)
|
||||
r = TextCompletionResponse(response=response)
|
||||
self.producer.send(r, properties={"id": id})
|
||||
|
||||
print("Done.", flush=True)
|
||||
|
||||
@staticmethod
|
||||
def add_args(parser):
|
||||
|
||||
ConsumerProducer.add_args(
|
||||
parser, default_input_queue, default_subscriber,
|
||||
default_output_queue,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-e', '--endpoint',
|
||||
help=f'LLM model endpoint'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-k', '--token',
|
||||
help=f'LLM model token'
|
||||
)
|
||||
|
||||
def run():
|
||||
|
||||
Processor.start("llm-azure-text", __doc__)
|
||||
|
|
@ -1,192 +0,0 @@
|
|||
|
||||
"""
|
||||
Simple LLM service, performs text prompt completion using Claude.
|
||||
Input is prompt, output is response.
|
||||
"""
|
||||
|
||||
import pulsar
|
||||
from pulsar.schema import JsonSchema
|
||||
import tempfile
|
||||
import base64
|
||||
import os
|
||||
import argparse
|
||||
import anthropic
|
||||
import time
|
||||
|
||||
from ... schema import TextCompletionRequest, TextCompletionResponse
|
||||
from ... log_level import LogLevel
|
||||
|
||||
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
|
||||
default_input_queue = 'llm-complete-text'
|
||||
default_output_queue = 'llm-complete-text-response'
|
||||
default_subscriber = 'llm-claude-text'
|
||||
default_model = 'claude-3-5-sonnet-20240620'
|
||||
|
||||
class Processor:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pulsar_host=default_pulsar_host,
|
||||
input_queue=default_input_queue,
|
||||
output_queue=default_output_queue,
|
||||
subscriber=default_subscriber,
|
||||
log_level=LogLevel.INFO,
|
||||
model=default_model,
|
||||
api_key,
|
||||
):
|
||||
|
||||
self.client = None
|
||||
|
||||
self.client = pulsar.Client(
|
||||
pulsar_host,
|
||||
logger=pulsar.ConsoleLogger(log_level.to_pulsar())
|
||||
)
|
||||
|
||||
self.consumer = self.client.subscribe(
|
||||
input_queue, subscriber,
|
||||
schema=JsonSchema(TextCompletionRequest),
|
||||
)
|
||||
|
||||
self.producer = self.client.create_producer(
|
||||
topic=output_queue,
|
||||
schema=JsonSchema(TextCompletionResponse),
|
||||
)
|
||||
|
||||
self.model = model
|
||||
|
||||
self.claude = anthropic.Anthropic(api_key=api_key)
|
||||
|
||||
print("Initialised", flush=True)
|
||||
|
||||
def run(self):
|
||||
|
||||
while True:
|
||||
|
||||
msg = self.consumer.receive()
|
||||
|
||||
try:
|
||||
|
||||
v = msg.value()
|
||||
|
||||
# Sender-produced ID
|
||||
|
||||
id = msg.properties()["id"]
|
||||
|
||||
print(f"Handling prompt {id}...", flush=True)
|
||||
|
||||
prompt = v.prompt
|
||||
response = message = self.claude.messages.create(
|
||||
model=self.model,
|
||||
max_tokens=1000,
|
||||
temperature=0.1,
|
||||
system = "You are a helpful chatbot.",
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": prompt
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
resp = response.content[0].text
|
||||
print(resp, flush=True)
|
||||
|
||||
print("Send response...", flush=True)
|
||||
r = TextCompletionResponse(response=resp)
|
||||
self.producer.send(r, properties={"id": id})
|
||||
|
||||
print("Done.", flush=True)
|
||||
|
||||
# Acknowledge successful processing of the message
|
||||
self.consumer.acknowledge(msg)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception:", e, flush=True)
|
||||
|
||||
# Message failed to be processed
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
def __del__(self):
|
||||
self.client.close()
|
||||
|
||||
def run():
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='llm-ollama-text',
|
||||
description=__doc__,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-p', '--pulsar-host',
|
||||
default=default_pulsar_host,
|
||||
help=f'Pulsar host (default: {default_pulsar_host})',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-i', '--input-queue',
|
||||
default=default_input_queue,
|
||||
help=f'Input queue (default: {default_input_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-s', '--subscriber',
|
||||
default=default_subscriber,
|
||||
help=f'Queue subscriber name (default: {default_subscriber})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-o', '--output-queue',
|
||||
default=default_output_queue,
|
||||
help=f'Output queue (default: {default_output_queue})'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-l', '--log-level',
|
||||
type=LogLevel,
|
||||
default=LogLevel.INFO,
|
||||
choices=list(LogLevel),
|
||||
help=f'Output queue (default: info)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-m', '--model',
|
||||
default="claude-3-5-sonnet-20240620",
|
||||
help=f'LLM model (default: claude-3-5-sonnet-20240620)'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-k', '--api-key',
|
||||
help=f'Claude API key'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
while True:
|
||||
|
||||
try:
|
||||
|
||||
p = Processor(
|
||||
pulsar_host=args.pulsar_host,
|
||||
input_queue=args.input_queue,
|
||||
output_queue=args.output_queue,
|
||||
subscriber=args.subscriber,
|
||||
log_level=args.log_level,
|
||||
model=args.model,
|
||||
api_key=args.api_key,
|
||||
)
|
||||
|
||||
p.run()
|
||||
|
||||
except Exception as e:
|
||||
|
||||
print("Exception:", e, flush=True)
|
||||
print("Will retry...", flush=True)
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue