diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..fdd19e0a --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*~ +__pycache__/ +env/ +*.egg-info/ diff --git a/Containerfile b/Containerfile new file mode 100644 index 00000000..e01bd7a2 --- /dev/null +++ b/Containerfile @@ -0,0 +1,54 @@ + +# ---------------------------------------------------------------------------- +# Build an AI container. This does the torch install which is huge, and I +# like to avoid re-doing this. +# ---------------------------------------------------------------------------- + +FROM docker.io/fedora:40 AS ai + +ENV PIP_BREAK_SYSTEM_PACKAGES=1 + +RUN dnf install -y python3 python3-pip python3-wheel python3-aiohttp \ + python3-rdflib + +RUN pip3 install torch --index-url https://download.pytorch.org/whl/cpu + +RUN pip3 install anthropic google-cloud-aiplatform langchain langchain-core \ + langchain-huggingface langchain-text-splitters langchain-community \ + pymilvus sentence-transformers transformers huggingface-hub \ + pulsar-client && \ + pip3 cache purge + +# ---------------------------------------------------------------------------- +# Build a container which contains the built Python package. The build +# creates a bunch of left-over cruft, a separate phase means this is only +# needed to support package build +# ---------------------------------------------------------------------------- + +FROM ai AS build + +ENV PACKAGE_VERSION=0.0.0 + +COPY setup.py /root/build/ +COPY README.md /root/build/ +COPY scripts/ /root/build/scripts/ +COPY trustgraph/ /root/build/trustgraph/ + +RUN (cd /root/build && pip3 wheel -w /root/wheels --no-deps .) + +# ---------------------------------------------------------------------------- +# Finally, the target container. Start with base and add the package. 
+# ---------------------------------------------------------------------------- + +FROM ai + +COPY --from=build /root/wheels /root/wheels + +RUN pip3 install /root/wheels/trustgraph-* && \ + pip3 cache purge && \ + rm -rf /root/wheels + +WORKDIR / + +CMD sleep 1000000 + diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..f7fbdc72 --- /dev/null +++ b/Makefile @@ -0,0 +1,30 @@ + +# VERSION=$(shell git describe | sed 's/^v//') +VERSION=0.1.16 + +all: container + +CONTAINER=docker.io/trustgraph/trustgraph-flow + +container: + podman build -f Containerfile -t ${CONTAINER}:${VERSION} \ + --format docker + +push: + podman push ${CONTAINER}:${VERSION} + +start: + podman run -i -t --name ${NAME} \ + -i -t \ + -p 8081:8081 \ + -v $$(pwd)/keys:/keys \ + -v $$(pwd)/configs:/configs \ + ${CONTAINER}:${VERSION} + +stop: + podman rm -f ${NAME} + +clean: + rm -rf wheels/ + +# sed -i 's/0.1.15/0.1.16/' docker-compose*.yaml diff --git a/architecture.png b/architecture.png new file mode 100644 index 00000000..deb30c7c Binary files /dev/null and b/architecture.png differ diff --git a/architecture.svg b/architecture.svg new file mode 100644 index 00000000..86a0cad5 --- /dev/null +++ b/architecture.svg @@ -0,0 +1,598 @@ + + + + + + + + + + + + + + + + + loader + + + + + pdfdecoder + + + + chunker + + + + vectorizer + + + + kgextractor + + + + llmollama + + + + graphwrite + + + + vectorstore + + + + Pulsar + + + + Ollama + + + + GPUhardware + + + + Milvus + + + + Cassandra + + + + + + + + + + + + + + + + + + Embeds + + + + + RDFedges + + + + + diff --git a/docker-compose-azure.yaml b/docker-compose-azure.yaml new file mode 100644 index 00000000..1a86ed9e --- 
/dev/null +++ b/docker-compose-azure.yaml @@ -0,0 +1,172 @@ + +volumes: + cassandra: + pulsar-conf: + pulsar-data: + etcd: + minio-data: + milvus: + +services: + + cassandra: + image: docker.io/cassandra:4.1.5 + ports: + - "9042:9042" + volumes: + - "cassandra:/var/lib/cassandra" + restart: on-failure:100 + + pulsar: + image: docker.io/apachepulsar/pulsar:3.3.0 + command: bin/pulsar standalone + ports: + - "6650:6650" + - "8080:8080" + volumes: + - "pulsar-conf:/pulsar/conf" + - "pulsar-data:/pulsar/data" + restart: on-failure:100 + + pulsar-manager: + image: docker.io/apachepulsar/pulsar-manager:v0.3.0 + ports: + - "9527:9527" + - "7750:7750" + environment: + SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties + restart: on-failure:100 + + etcd: + image: quay.io/coreos/etcd:v3.5.5 + command: + - "etcd" + - "-advertise-client-urls=http://127.0.0.1:2379" + - "-listen-client-urls" + - "http://0.0.0.0:2379" + - "--data-dir" + - "/etcd" + environment: + ETCD_AUTO_COMPACTION_MODE: revision + ETCD_AUTO_COMPACTION_RETENTION: "1000" + ETCD_QUOTA_BACKEND_BYTES: "4294967296" + ETCD_SNAPSHOT_COUNT: "50000" + ports: + - "2379:2379" + volumes: + - "etcd:/etcd" + restart: on-failure:100 + + minio: + image: docker.io/minio/minio:RELEASE.2024-07-04T14-25-45Z + command: + - "minio" + - "server" + - "/minio_data" + - "--console-address" + - ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9001:9001" + volumes: + - "minio-data:/minio_data" + restart: on-failure:100 + + milvus: + image: docker.io/milvusdb/milvus:v2.4.5 + command: + - "milvus" + - "run" + - "standalone" + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + ports: + - "9091:9091" + - "19530:19530" + volumes: + - "milvus:/var/lib/milvus" + restart: on-failure:100 + + pdf-decoder: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "pdf-decoder" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + 
+ chunker: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "chunker-recursive" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vectorize: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-vectorize" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-hf" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-definitions: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-definitions" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-relationships: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-relationships" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vector-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "vector-write-milvus" + - "-p" + - "pulsar://pulsar:6650" + - "-t" + - "http://milvus:19530" + restart: on-failure:100 + + graph-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "graph-write-cassandra" + - "-p" + - "pulsar://pulsar:6650" + - "-g" + - "cassandra" + restart: on-failure:100 + + llm: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "llm-azure-text" + - "-p" + - "pulsar://pulsar:6650" + - "-k" + - ${AZURE_TOKEN} + - "-e" + - ${AZURE_ENDPOINT} + restart: on-failure:100 + diff --git a/docker-compose-claude.yaml b/docker-compose-claude.yaml new file mode 100644 index 00000000..5999725e --- /dev/null +++ b/docker-compose-claude.yaml @@ -0,0 +1,170 @@ + +volumes: + cassandra: + pulsar-conf: + pulsar-data: + etcd: + minio-data: + milvus: + +services: + + cassandra: + image: docker.io/cassandra:4.1.5 + ports: + - "9042:9042" + volumes: + - "cassandra:/var/lib/cassandra" + restart: on-failure:100 + + pulsar: + image: docker.io/apachepulsar/pulsar:3.3.0 + command: bin/pulsar standalone + 
ports: + - "6650:6650" + - "8080:8080" + volumes: + - "pulsar-conf:/pulsar/conf" + - "pulsar-data:/pulsar/data" + restart: on-failure:100 + + pulsar-manager: + image: docker.io/apachepulsar/pulsar-manager:v0.3.0 + ports: + - "9527:9527" + - "7750:7750" + environment: + SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties + restart: on-failure:100 + + etcd: + image: quay.io/coreos/etcd:v3.5.5 + command: + - "etcd" + - "-advertise-client-urls=http://127.0.0.1:2379" + - "-listen-client-urls" + - "http://0.0.0.0:2379" + - "--data-dir" + - "/etcd" + environment: + ETCD_AUTO_COMPACTION_MODE: revision + ETCD_AUTO_COMPACTION_RETENTION: "1000" + ETCD_QUOTA_BACKEND_BYTES: "4294967296" + ETCD_SNAPSHOT_COUNT: "50000" + ports: + - "2379:2379" + volumes: + - "etcd:/etcd" + restart: on-failure:100 + + minio: + image: docker.io/minio/minio:RELEASE.2024-07-04T14-25-45Z + command: + - "minio" + - "server" + - "/minio_data" + - "--console-address" + - ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9001:9001" + volumes: + - "minio-data:/minio_data" + restart: on-failure:100 + + milvus: + image: docker.io/milvusdb/milvus:v2.4.5 + command: + - "milvus" + - "run" + - "standalone" + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + ports: + - "9091:9091" + - "19530:19530" + volumes: + - "milvus:/var/lib/milvus" + restart: on-failure:100 + + pdf-decoder: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "pdf-decoder" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + chunker: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "chunker-recursive" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vectorize: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-vectorize" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 
+ command: + - "embeddings-hf" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-definitions: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-definitions" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-relationships: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-relationships" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vector-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "vector-write-milvus" + - "-p" + - "pulsar://pulsar:6650" + - "-t" + - "http://milvus:19530" + restart: on-failure:100 + + graph-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "graph-write-cassandra" + - "-p" + - "pulsar://pulsar:6650" + - "-g" + - "cassandra" + restart: on-failure:100 + + llm: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "llm-claude-text" + - "-p" + - "pulsar://pulsar:6650" + - "-k" + - ${CLAUDE_KEY} + restart: on-failure:100 + diff --git a/docker-compose-ollama.yaml b/docker-compose-ollama.yaml new file mode 100644 index 00000000..d7d0f36f --- /dev/null +++ b/docker-compose-ollama.yaml @@ -0,0 +1,178 @@ + +volumes: + cassandra: + pulsar-conf: + pulsar-data: + etcd: + minio-data: + milvus: + +services: + + cassandra: + image: docker.io/cassandra:4.1.5 + ports: + - "9042:9042" + volumes: + - "cassandra:/var/lib/cassandra" + restart: on-failure:100 + + pulsar: + image: docker.io/apachepulsar/pulsar:3.3.0 + command: bin/pulsar standalone + ports: + - "6650:6650" + - "8080:8080" + volumes: + - "pulsar-conf:/pulsar/conf" + - "pulsar-data:/pulsar/data" + restart: on-failure:100 + + pulsar-manager: + image: docker.io/apachepulsar/pulsar-manager:v0.3.0 + ports: + - "9527:9527" + - "7750:7750" + environment: + SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties + restart: on-failure:100 + + etcd: + image: quay.io/coreos/etcd:v3.5.5 + 
command: + - "etcd" + - "-advertise-client-urls=http://127.0.0.1:2379" + - "-listen-client-urls" + - "http://0.0.0.0:2379" + - "--data-dir" + - "/etcd" + environment: + ETCD_AUTO_COMPACTION_MODE: revision + ETCD_AUTO_COMPACTION_RETENTION: "1000" + ETCD_QUOTA_BACKEND_BYTES: "4294967296" + ETCD_SNAPSHOT_COUNT: "50000" + ports: + - "2379:2379" + volumes: + - "etcd:/etcd" + restart: on-failure:100 + + minio: + image: docker.io/minio/minio:RELEASE.2024-07-04T14-25-45Z + command: + - "minio" + - "server" + - "/minio_data" + - "--console-address" + - ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9001:9001" + volumes: + - "minio-data:/minio_data" + restart: on-failure:100 + + milvus: + image: docker.io/milvusdb/milvus:v2.4.5 + command: + - "milvus" + - "run" + - "standalone" + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + ports: + - "9091:9091" + - "19530:19530" + volumes: + - "milvus:/var/lib/milvus" + restart: on-failure:100 + + pdf-decoder: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "pdf-decoder" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + chunker: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "chunker-recursive" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vectorize: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-vectorize" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-hf" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-definitions: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-definitions" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-relationships: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-relationships" + - "-p" + - 
"pulsar://pulsar:6650" + restart: on-failure:100 + + vector-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "vector-write-milvus" + - "-p" + - "pulsar://pulsar:6650" + - "-t" + - "http://milvus:19530" + restart: on-failure:100 + + graph-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "graph-write-cassandra" + - "-p" + - "pulsar://pulsar:6650" + - "-g" + - "cassandra" + restart: on-failure:100 + + llm: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "llm-ollama-text" + - "-p" + - "pulsar://pulsar:6650" + - "-r" + - "http://${OLLAMA_HOST}:11434/" + restart: on-failure:100 + + graph-rag: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "graph-rag" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + diff --git a/docker-compose-vertexai.yaml b/docker-compose-vertexai.yaml new file mode 100644 index 00000000..e7e14be2 --- /dev/null +++ b/docker-compose-vertexai.yaml @@ -0,0 +1,174 @@ + +volumes: + cassandra: + pulsar-conf: + pulsar-data: + etcd: + minio-data: + milvus: + +services: + + cassandra: + image: docker.io/cassandra:4.1.5 + ports: + - "9042:9042" + volumes: + - "cassandra:/var/lib/cassandra" + restart: on-failure:100 + + pulsar: + image: docker.io/apachepulsar/pulsar:3.3.0 + command: bin/pulsar standalone + ports: + - "6650:6650" + - "8080:8080" + volumes: + - "pulsar-conf:/pulsar/conf" + - "pulsar-data:/pulsar/data" + restart: on-failure:100 + + pulsar-manager: + image: docker.io/apachepulsar/pulsar-manager:v0.3.0 + ports: + - "9527:9527" + - "7750:7750" + environment: + SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties + restart: on-failure:100 + + etcd: + image: quay.io/coreos/etcd:v3.5.5 + command: + - "etcd" + - "-advertise-client-urls=http://127.0.0.1:2379" + - "-listen-client-urls" + - "http://0.0.0.0:2379" + - "--data-dir" + - "/etcd" + environment: + ETCD_AUTO_COMPACTION_MODE: revision + ETCD_AUTO_COMPACTION_RETENTION: 
"1000" + ETCD_QUOTA_BACKEND_BYTES: "4294967296" + ETCD_SNAPSHOT_COUNT: "50000" + ports: + - "2379:2379" + volumes: + - "etcd:/etcd" + restart: on-failure:100 + + minio: + image: docker.io/minio/minio:RELEASE.2024-07-04T14-25-45Z + command: + - "minio" + - "server" + - "/minio_data" + - "--console-address" + - ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9001:9001" + volumes: + - "minio-data:/minio_data" + restart: on-failure:100 + + milvus: + image: docker.io/milvusdb/milvus:v2.4.5 + command: + - "milvus" + - "run" + - "standalone" + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + ports: + - "9091:9091" + - "19530:19530" + volumes: + - "milvus:/var/lib/milvus" + restart: on-failure:100 + + pdf-decoder: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "pdf-decoder" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + chunker: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "chunker-recursive" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vectorize: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-vectorize" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + embeddings: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "embeddings-hf" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-definitions: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-definitions" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + kg-extract-relationships: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "kg-extract-relationships" + - "-p" + - "pulsar://pulsar:6650" + restart: on-failure:100 + + vector-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "vector-write-milvus" + - "-p" + - "pulsar://pulsar:6650" + - "-t" + - "http://milvus:19530" + restart: 
on-failure:100 + + graph-write: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "graph-write-cassandra" + - "-p" + - "pulsar://pulsar:6650" + - "-g" + - "cassandra" + restart: on-failure:100 + + llm: + image: docker.io/trustgraph/trustgraph-flow:0.1.16 + command: + - "llm-vertexai-text" + - "-p" + - "pulsar://pulsar:6650" + - "-k" + - "/vertexai/private.json" + - "-r" + - "us-west1" + volumes: + - "./vertexai:/vertexai" + restart: on-failure:100 + diff --git a/graph-clear b/graph-clear new file mode 100755 index 00000000..9633a08f --- /dev/null +++ b/graph-clear @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 + +from trustgraph import TrustGraph + +t = TrustGraph() + +t.clear() + diff --git a/graph-dump b/graph-dump new file mode 100755 index 00000000..e31a1c3b --- /dev/null +++ b/graph-dump @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 + +import pulsar +from pulsar.schema import JsonSchema, Bytes +from schema import Chunk, Triple +from langchain_huggingface import HuggingFaceEmbeddings +from langchain_community.llms import Ollama +from trustgraphETL import scholar, callmixtral, build_graph_robust +import sys +import rdflib +import uuid + +g = rdflib.Graph() + +client = pulsar.Client("pulsar://localhost:6650") + +consumer = client.subscribe( + 'graph-load', 'graph-dump', + schema=JsonSchema(Triple), +) + +g = rdflib.Graph() +count = 0 +limit = 100 + +while True: + + msg = consumer.receive() + + try: + + v = msg.value() + + if v.o.is_uri: + g.add(( + rdflib.term.URIRef(v.s.value), + rdflib.term.URIRef(v.p.value), + rdflib.term.URIRef(v.o.value), + )) + else: + g.add(( + rdflib.term.URIRef(v.s.value), + rdflib.term.URIRef(v.p.value), + rdflib.term.Literal(v.o.value), + )) + + count += 1 + + if count > limit: + + id = str(uuid.uuid4()) + path = f"graph/{id}.ttl" + g.serialize(destination=path) + g = rdflib.Graph() + print(f"Written {path}") + + count = 0 + + # Acknowledge successful processing of the message + consumer.acknowledge(msg) + + except Exception as 
e: + + print(e) + + # Message failed to be processed + consumer.negative_acknowledge(msg) + +client.close() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..7bfcf3ba --- /dev/null +++ b/requirements.txt @@ -0,0 +1,16 @@ +torch +urllib3 +transformers +sentence-transformers +rdflib +pymilvus +langchain +langchain-core +langchain-huggingface +langchain-text-splitters +langchain-community +huggingface-hub +cassandra-driver +pulsar-client +anthropic +google-cloud-aiplatform diff --git a/scripts/chunker-recursive b/scripts/chunker-recursive new file mode 100755 index 00000000..2356903d --- /dev/null +++ b/scripts/chunker-recursive @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.chunker.recursive import run + +run() + diff --git a/scripts/embeddings-hf b/scripts/embeddings-hf new file mode 100755 index 00000000..a7d84d04 --- /dev/null +++ b/scripts/embeddings-hf @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.embeddings.hf import run + +run() + diff --git a/scripts/embeddings-vectorize b/scripts/embeddings-vectorize new file mode 100755 index 00000000..3de1e3a9 --- /dev/null +++ b/scripts/embeddings-vectorize @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.embeddings.vectorize import run + +run() + diff --git a/scripts/graph-rag b/scripts/graph-rag new file mode 100755 index 00000000..a6dab1f3 --- /dev/null +++ b/scripts/graph-rag @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.rag.graph import run + +run() + diff --git a/scripts/graph-show b/scripts/graph-show new file mode 100755 index 00000000..26e6cbff --- /dev/null +++ b/scripts/graph-show @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 + +from trustgraph.trustgraph import TrustGraph + +t = TrustGraph() + +rows = t.get_all(limit=100_000_000) +for s, p, o in rows: + print(s, p, o) + diff --git a/scripts/graph-to-turtle b/scripts/graph-to-turtle new file mode 100755 index 00000000..1bd48802 --- /dev/null +++ b/scripts/graph-to-turtle @@ -0,0 
+1,37 @@ +#!/usr/bin/env python3 + +from trustgraph.trustgraph import TrustGraph +import rdflib +import sys +import io + +t = TrustGraph() + +g = rdflib.Graph() + +rows = t.get_all(limit=100_000_000) +for s, p, o in rows: + +# print(s, p, o) + sv = rdflib.term.URIRef(s) + pv = rdflib.term.URIRef(p) + + if o.startswith("https://") or o.startswith("http://"): + + # Skip malformed URLs with spaces in + if " " in o: + continue + + ov = rdflib.term.URIRef(o) + else: + ov = rdflib.term.Literal(o) + + g.add((sv, pv, ov)) + +g.serialize(destination="output.ttl", format="turtle") + +buf = io.BytesIO() + +g.serialize(destination=buf, format="turtle") + +sys.stdout.write(buf.getvalue().decode("utf-8")) diff --git a/scripts/graph-write-cassandra b/scripts/graph-write-cassandra new file mode 100755 index 00000000..7fc3d0c8 --- /dev/null +++ b/scripts/graph-write-cassandra @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.graph.cassandra_write import run + +run() + diff --git a/scripts/init-pulsar-manager b/scripts/init-pulsar-manager new file mode 100755 index 00000000..6e855ffb --- /dev/null +++ b/scripts/init-pulsar-manager @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token) + +curl \ + -H "X-XSRF-TOKEN: $CSRF_TOKEN" \ + -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \ + -H 'Content-Type: application/json' \ + -X PUT \ + http://localhost:7750/pulsar-manager/users/superuser \ + -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}' diff --git a/scripts/kg-extract-definitions b/scripts/kg-extract-definitions new file mode 100755 index 00000000..327ec06f --- /dev/null +++ b/scripts/kg-extract-definitions @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.kg.extract_definitions import run + +run() + diff --git a/scripts/kg-extract-relationships b/scripts/kg-extract-relationships new file mode 100755 index 00000000..91040589 --- /dev/null +++ 
b/scripts/kg-extract-relationships @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.kg.extract_relationships import run + +run() + diff --git a/scripts/llm-azure-text b/scripts/llm-azure-text new file mode 100755 index 00000000..cdaea4b8 --- /dev/null +++ b/scripts/llm-azure-text @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.llm.azure_text import run + +run() + diff --git a/scripts/llm-claude-text b/scripts/llm-claude-text new file mode 100755 index 00000000..496d1440 --- /dev/null +++ b/scripts/llm-claude-text @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.llm.claude_text import run + +run() + diff --git a/scripts/llm-ollama-text b/scripts/llm-ollama-text new file mode 100755 index 00000000..cb7a4ebc --- /dev/null +++ b/scripts/llm-ollama-text @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.llm.ollama_text import run + +run() + diff --git a/scripts/llm-vertexai-text b/scripts/llm-vertexai-text new file mode 100755 index 00000000..4634015f --- /dev/null +++ b/scripts/llm-vertexai-text @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.llm.vertexai_text import run + +run() + diff --git a/scripts/loader b/scripts/loader new file mode 100755 index 00000000..a6dc4450 --- /dev/null +++ b/scripts/loader @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +import pulsar +from pulsar.schema import JsonSchema, Bytes, String +from trustgraph.schema import Document, Source +import base64 +import hashlib + +# client = pulsar.Client("pulsar://localhost:6650") +host="10.89.1.246" +host="localhost" +client = pulsar.Client(f"pulsar://{host}:6650") + +producer = client.create_producer( + topic='document-load', + schema=JsonSchema(Document), + chunking_enabled=True, +) + +files=[ + "Challenger-Report-Vol1.pdf", +# "columbia-accident-investigation-board-report-volume-1.pdf", +# "Proposed_CIRCIA_Rules.pdf", +] + +for file in files: + + path = "sources/" + file + data = open(path, "rb").read() + + id = 
hashlib.sha256(path.encode("utf-8")).hexdigest()[0:8] + + r = Document( + source=Source( + source=path, + title=path, + id=id, + ), + data=base64.b64encode(data), + ) + + resp = producer.send(r) + + print(resp) + +client.close() + diff --git a/scripts/pdf-decoder b/scripts/pdf-decoder new file mode 100755 index 00000000..82b89298 --- /dev/null +++ b/scripts/pdf-decoder @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.decoder.pdf import run + +run() + diff --git a/scripts/query b/scripts/query new file mode 100755 index 00000000..5cb5c0c6 --- /dev/null +++ b/scripts/query @@ -0,0 +1,16 @@ +#!/usr/bin/env python3 + +from trustgraph.graph_rag import GraphRag +import sys + +query = " ".join(sys.argv[1:]) + +gr = GraphRag(verbose=True) + +if query == "": + query="""This knowledge graph describes the Space Shuttle disaster. +Present 20 facts which are present in the knowledge graph.""" + +resp = gr.query(query) +print(resp) + diff --git a/scripts/vector-write-milvus b/scripts/vector-write-milvus new file mode 100755 index 00000000..952e22cf --- /dev/null +++ b/scripts/vector-write-milvus @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.vector.milvus_write import run + +run() + diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..2ad4db47 --- /dev/null +++ b/setup.py @@ -0,0 +1,65 @@ +import setuptools +import os + +with open("README.md", "r") as fh: + long_description = fh.read() + +version = "0.0.0" + +setuptools.setup( + name="trustgraph", + version=version, + author="trustgraph.ai", + author_email="security@trustgraph.ai", + description="trustgraph.ai", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/trustgraph.ai/FIXME.git", + packages=setuptools.find_packages(), + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Operating System :: OS Independent", + ], + python_requires='>=3.8', 
+ download_url = "https://github.com/trustgraph.ai/FIXME.git/archive/refs/tags/v" + version + ".tar.gz", + install_requires=[ + "torch", + "urllib3", + "transformers", + "sentence-transformers", + "rdflib", + "pymilvus", + "langchain", + "langchain-core", + "langchain-huggingface", + "langchain-text-splitters", + "langchain-community", + "huggingface-hub", + "requests", + "cassandra-driver", + "pulsar-client", + "pypdf", + "anthropic", + "google-cloud-aiplatform", + ], + scripts=[ + "scripts/chunker-recursive", + "scripts/graph-show", + "scripts/graph-to-turtle", + "scripts/graph-write-cassandra", + "scripts/kg-extract-definitions", + "scripts/kg-extract-relationships", + "scripts/llm-ollama-text", + "scripts/llm-vertexai-text", + "scripts/llm-claude-text", + "scripts/llm-azure-text", + "scripts/loader", + "scripts/pdf-decoder", + "scripts/query", + "scripts/embeddings-vectorize", + "scripts/embeddings-hf", + "scripts/vector-write-milvus", + "scripts/graph-rag", + ] +) diff --git a/tests/test-embeddings b/tests/test-embeddings new file mode 100755 index 00000000..e2bcdbde --- /dev/null +++ b/tests/test-embeddings @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 + +import pulsar +from trustgraph.embeddings_client import EmbeddingsClient + +embed = EmbeddingsClient(pulsar_host="pulsar://localhost:6650") + +prompt="Write a funny limerick about a llama" + +resp = embed.request(prompt) + +print(resp) + + + diff --git a/tests/test-graph-rag b/tests/test-graph-rag new file mode 100755 index 00000000..c6abfe05 --- /dev/null +++ b/tests/test-graph-rag @@ -0,0 +1,14 @@ +#!/usr/bin/env python3 + +import pulsar +from trustgraph.graph_rag_client import GraphRagClient + +rag = GraphRagClient(pulsar_host="pulsar://localhost:6650") + +query="""This knowledge graph describes the Space Shuttle disaster. 
+Present 20 facts which are present in the knowledge graph.""" + +resp = rag.request(query) + +print(resp) + diff --git a/tests/test-llm b/tests/test-llm new file mode 100755 index 00000000..35177e81 --- /dev/null +++ b/tests/test-llm @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 + +import pulsar +from trustgraph.llm_client import LlmClient + +llm = LlmClient(pulsar_host="pulsar://localhost:6650") + +prompt="Write a funny limerick about a llama" + +resp = llm.request(prompt) + +print(resp) + +llm.close() + diff --git a/tests/test-milvus b/tests/test-milvus new file mode 100755 index 00000000..e95955dc --- /dev/null +++ b/tests/test-milvus @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 + +from langchain_huggingface import HuggingFaceEmbeddings + +from edge_map import VectorStore + +client = VectorStore() + +embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") + +text="""A cat is a small animal. A dog is a large animal. +Cats say miaow. Dogs go woof. +""" + +embeds = embeddings.embed_documents([text])[0] + +text2="""If you couldn't download the model due to network issues, as a walkaround, you can use random vectors to represent the text and still finish the example. Just note that the search result won't reflect semantic similarity as the vectors are fake ones. 
+""" + +embeds2 = embeddings.embed_documents([text2])[0] + +client.insert(embeds, "animals") +client.insert(embeds, "vectors") + +query="""What noise does a cat make?""" + +qembeds = embeddings.embed_documents([query])[0] + +res = client.search( + qembeds, + limit=2 +) + +print(res) + diff --git a/trustgraph/__init__.py b/trustgraph/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/chunker/__init__.py b/trustgraph/chunker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/chunker/recursive/__init__.py b/trustgraph/chunker/recursive/__init__.py new file mode 100644 index 00000000..3b816664 --- /dev/null +++ b/trustgraph/chunker/recursive/__init__.py @@ -0,0 +1,3 @@ + +from . chunker import * + diff --git a/trustgraph/chunker/recursive/__main__.py b/trustgraph/chunker/recursive/__main__.py new file mode 100644 index 00000000..18e14ad5 --- /dev/null +++ b/trustgraph/chunker/recursive/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . chunker import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/chunker/recursive/chunker.py b/trustgraph/chunker/recursive/chunker.py new file mode 100755 index 00000000..ba5eb939 --- /dev/null +++ b/trustgraph/chunker/recursive/chunker.py @@ -0,0 +1,164 @@ + +""" +Simple decoder, accepts text documents on input, outputs chunks from the +as text as separate output objects. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +from langchain_text_splitters import RecursiveCharacterTextSplitter +import time + +from ... schema import TextDocument, Chunk, Source +from ... 
log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(TextDocument), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(Chunk), + ) + + self.text_splitter = RecursiveCharacterTextSplitter( + chunk_size=1000, + chunk_overlap=20, + length_function=len, + is_separator_regex=False, + ) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + print(f"Chunking {v.source.id}...", flush=True) + + texts = self.text_splitter.create_documents( + [v.text.decode("utf-8")] + ) + + for ix, chunk in enumerate(texts): + + id = v.source.id + "-c" + str(ix) + + r = Chunk( + source=Source( + source=v.source.source, + id=id, + title=v.source.title + ), + chunk=chunk.page_content.encode("utf-8"), + ) + + self.producer.send(r) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + print("Done.", flush=True) + + except Exception as e: + print(e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='pdf-decoder', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'text-doc-load' + default_output_queue = 'chunk-load' + default_subscriber = 'chunker-recursive' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', 
'--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + args = parser.parse_args() + + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + + diff --git a/trustgraph/decoder/__init__.py b/trustgraph/decoder/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/decoder/pdf/__init__.py b/trustgraph/decoder/pdf/__init__.py new file mode 100644 index 00000000..0d8d9c78 --- /dev/null +++ b/trustgraph/decoder/pdf/__init__.py @@ -0,0 +1,3 @@ + +from . pdf_decoder import * + diff --git a/trustgraph/decoder/pdf/__main__.py b/trustgraph/decoder/pdf/__main__.py new file mode 100755 index 00000000..44dd026d --- /dev/null +++ b/trustgraph/decoder/pdf/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . pdf_decoder import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/decoder/pdf/pdf_decoder.py b/trustgraph/decoder/pdf/pdf_decoder.py new file mode 100755 index 00000000..f892ebac --- /dev/null +++ b/trustgraph/decoder/pdf/pdf_decoder.py @@ -0,0 +1,159 @@ + +""" +Simple decoder, accepts PDF documents on input, outputs pages from the +PDF document as text as separate output objects. +""" + +import pulsar +from pulsar.schema import JsonSchema +from langchain_community.document_loaders import PyPDFLoader +import tempfile +import base64 +import os +import argparse +import time + +from ... 
schema import Document, TextDocument, Source +from ... log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(Document), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(TextDocument), + ) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + print(f"Decoding {v.source.id}...", flush=True) + + with tempfile.NamedTemporaryFile(delete_on_close=False) as fp: + + fp.write(base64.b64decode(v.data)) + fp.close() + + with open(fp.name, mode='rb') as f: + + loader = PyPDFLoader(fp.name) + pages = loader.load() + + for ix, page in enumerate(pages): + + id = v.source.id + "-p" + str(ix) + r = TextDocument( + source=Source( + source=v.source.source, + title=v.source.title, + id=id, + ), + text=page.page_content.encode("utf-8"), + ) + + self.producer.send(r) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + print("Done.", flush=True) + + except Exception as e: + print(e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='pdf-decoder', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'document-load' + default_output_queue = 'text-doc-load' + default_subscriber = 'pdf-decoder' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: 
{default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + args = parser.parse_args() + + while True: + + try: + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/edge_map.py b/trustgraph/edge_map.py new file mode 100644 index 00000000..55d9077f --- /dev/null +++ b/trustgraph/edge_map.py @@ -0,0 +1,102 @@ + +from pymilvus import MilvusClient, CollectionSchema, FieldSchema, DataType + +class VectorStore: + + def __init__(self, uri="http://localhost:19530"): + + self.client = MilvusClient(uri=uri) + + self.collection = "edges" + self.dimension = 384 + + if not self.client.has_collection(collection_name=self.collection): + self.init_collection() + + def init_collection(self): + + pkey_field = FieldSchema( + name="id", + dtype=DataType.INT64, + is_primary=True, + auto_id=True, + ) + + vec_field = FieldSchema( + name="vector", + dtype=DataType.FLOAT_VECTOR, + dim=self.dimension, + ) + + entity_field = FieldSchema( + name="entity", + dtype=DataType.VARCHAR, + max_length=65535, + ) + + schema = CollectionSchema( + fields = [pkey_field, vec_field, entity_field], + description = "Edge map schema", + ) + + self.client.create_collection( + collection_name=self.collection, + schema=schema, + metric_type="IP", + ) + + index_params = MilvusClient.prepare_index_params() + + 
index_params.add_index( + field_name="vector", + metric_type="COSINE", + index_type="FLAT", # IVF_FLAT?! + index_name="vector_index", + params={ "nlist": 128 } + ) + + self.client.create_index( + collection_name=self.collection, + index_params=index_params + ) + + def insert(self, embeds, entity): + + data = [ + { + "vector": embeds, + "entity": entity, + } + ] + + self.client.insert(collection_name=self.collection, data=data) + + def search(self, embeds, fields=["entity"], limit=10): + + search_params = { + "metric_type": "COSINE", + "params": { + "radius": 0.1, + "range_filter": 0.8 + } + } + + self.client.load_collection( + collection_name=self.collection, +# replica_number=1 + ) + + res = self.client.search( + collection_name=self.collection, + data=[embeds], + limit=limit, + output_fields=fields, + search_params=search_params, + )[0] + + self.client.release_collection( + collection_name=self.collection, + ) + + return res + diff --git a/trustgraph/embeddings/__init__.py b/trustgraph/embeddings/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/embeddings/hf/__init__.py b/trustgraph/embeddings/hf/__init__.py new file mode 100644 index 00000000..7ccb0b37 --- /dev/null +++ b/trustgraph/embeddings/hf/__init__.py @@ -0,0 +1,3 @@ + +from . hf import * + diff --git a/trustgraph/embeddings/hf/__main__.py b/trustgraph/embeddings/hf/__main__.py new file mode 100755 index 00000000..89684e3e --- /dev/null +++ b/trustgraph/embeddings/hf/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . hf import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/embeddings/hf/hf.py b/trustgraph/embeddings/hf/hf.py new file mode 100755 index 00000000..33890483 --- /dev/null +++ b/trustgraph/embeddings/hf/hf.py @@ -0,0 +1,161 @@ + +""" +Simple LLM service, performs text prompt completion using an Ollama service. +Input is prompt, output is response. 
+""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +from langchain_huggingface import HuggingFaceEmbeddings +import time + +from ... schema import EmbeddingsRequest, EmbeddingsResponse +from ... log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + model, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(EmbeddingsRequest), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(EmbeddingsResponse), + ) + + self.embeddings = HuggingFaceEmbeddings(model_name=model) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + + # Sender-produced ID + + id = msg.properties()["id"] + + print(f"Handling input {id}...", flush=True) + + text = v.text + embeds = self.embeddings.embed_documents([text]) + + print("Send response...", flush=True) + r = EmbeddingsResponse(vectors=embeds) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + print("Closing", flush=True) + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='llm-ollama-text', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'embeddings' + default_output_queue = 'embeddings-response' + default_subscriber = 'embeddings-hf' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: 
{default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-m', '--model', + default="all-MiniLM-L6-v2", + help=f'LLM model (default: all-MiniLM-L6-v2)' + ) + + args = parser.parse_args() + + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + model=args.model, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/embeddings/vectorize/__init__.py b/trustgraph/embeddings/vectorize/__init__.py new file mode 100644 index 00000000..31596b8c --- /dev/null +++ b/trustgraph/embeddings/vectorize/__init__.py @@ -0,0 +1,3 @@ + +from . vectorize import * + diff --git a/trustgraph/embeddings/vectorize/__main__.py b/trustgraph/embeddings/vectorize/__main__.py new file mode 100755 index 00000000..a578de8a --- /dev/null +++ b/trustgraph/embeddings/vectorize/__main__.py @@ -0,0 +1,6 @@ + +from . vectorize import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/embeddings/vectorize/vectorize.py b/trustgraph/embeddings/vectorize/vectorize.py new file mode 100755 index 00000000..b362470d --- /dev/null +++ b/trustgraph/embeddings/vectorize/vectorize.py @@ -0,0 +1,167 @@ + +""" +Vectorizer, applies an embedding algorithm to a chunk. 
Input is a chunk, +output is chunk and vectors. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +import time + +from ... schema import Chunk, VectorsChunk +from ... embeddings_client import EmbeddingsClient +from ... log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + model, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(Chunk), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(VectorsChunk), + ) + + self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) + + def emit(self, source, chunk, vectors): + + r = VectorsChunk(source=source, chunk=chunk, vectors=vectors) + self.producer.send(r) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) + + chunk = v.chunk.decode("utf-8") + + try: + + vectors = self.embeddings.request(chunk) + + self.emit( + source=v.source, + chunk=chunk.encode("utf-8"), + vectors=vectors + ) + + except Exception as e: + print("Exception:", e, flush=True) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='embeddings-vectorizer', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'chunk-load' + default_output_queue = 'vectors-chunk-load' + default_subscriber = 'embeddings-vectorizer' + + 
parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-m', '--model', + default="all-MiniLM-L6-v2", + help=f'LLM model (default: all-MiniLM-L6-v2)' + ) + + args = parser.parse_args() + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + model=args.model, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/embeddings_client.py b/trustgraph/embeddings_client.py new file mode 100644 index 00000000..e464e02f --- /dev/null +++ b/trustgraph/embeddings_client.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 + +import pulsar +import _pulsar +from pulsar.schema import JsonSchema +from trustgraph.schema import EmbeddingsRequest, EmbeddingsResponse +import hashlib +import uuid + +# Ugly +ERROR=_pulsar.LoggerLevel.Error +WARN=_pulsar.LoggerLevel.Warn +INFO=_pulsar.LoggerLevel.Info +DEBUG=_pulsar.LoggerLevel.Debug + +class EmbeddingsClient: + + def __init__( + self, log_level=ERROR, client_id=None, + pulsar_host="pulsar://pulsar:6650", + ): + + if client_id == None: + client_id = str(uuid.uuid4()) + + self.client = pulsar.Client( + 
pulsar_host, + logger=pulsar.ConsoleLogger(log_level), + ) + + self.producer = self.client.create_producer( + topic='embeddings', + schema=JsonSchema(EmbeddingsRequest), + chunking_enabled=True, + ) + + self.consumer = self.client.subscribe( + 'embeddings-response', client_id, + schema=JsonSchema(EmbeddingsResponse), + ) + + def request(self, text, timeout=500): + + id = str(uuid.uuid4()) + + r = EmbeddingsRequest( + text=text + ) + self.producer.send(r, properties={ "id": id }) + + while True: + + msg = self.consumer.receive(timeout_millis=timeout * 1000) + + mid = msg.properties()["id"] + + if mid == id: + resp = msg.value().vectors + self.consumer.acknowledge(msg) + return resp + + # Ignore messages with wrong ID + self.consumer.acknowledge(msg) + + def __del__(self): + + self.producer.close() + self.consumer.close() + self.client.close() + diff --git a/trustgraph/graph/__init__.py b/trustgraph/graph/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/graph/cassandra_write/__init__.py b/trustgraph/graph/cassandra_write/__init__.py new file mode 100644 index 00000000..d891d55f --- /dev/null +++ b/trustgraph/graph/cassandra_write/__init__.py @@ -0,0 +1,3 @@ + +from . write import * + diff --git a/trustgraph/graph/cassandra_write/__main__.py b/trustgraph/graph/cassandra_write/__main__.py new file mode 100755 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph/graph/cassandra_write/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . write import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/graph/cassandra_write/write.py b/trustgraph/graph/cassandra_write/write.py new file mode 100755 index 00000000..1b56b5ab --- /dev/null +++ b/trustgraph/graph/cassandra_write/write.py @@ -0,0 +1,144 @@ + +""" +Simple decoder, accepts PDF documents on input, outputs pages from the +PDF document as text as separate output objects. 
+""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +import time + +from ... trustgraph import TrustGraph +from ... schema import Triple +from ... log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + subscriber, + log_level, + graph_host, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(Triple), + ) + + self.tg = TrustGraph([graph_host]) + + self.count = 0 + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + + self.tg.insert( + v.s.value, + v.p.value, + v.o.value + ) + + self.count += 1 + + if (self.count % 1000) == 0: + print(self.count, "...", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='graph-write-cassandra', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'graph-load' + default_subscriber = 'graph-write-cassandra' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue 
(default: info)' + ) + + parser.add_argument( + '-g', '--graph-host', + default="localhost", + help=f'Output queue (default: localhost)' + ) + + args = parser.parse_args() + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + subscriber=args.subscriber, + log_level=args.log_level, + graph_host=args.graph_host, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + + diff --git a/trustgraph/graph_rag.py b/trustgraph/graph_rag.py new file mode 100644 index 00000000..4175698e --- /dev/null +++ b/trustgraph/graph_rag.py @@ -0,0 +1,227 @@ + +from trustgraph.trustgraph import TrustGraph +from trustgraph.edge_map import VectorStore +from trustgraph.trustgraph import TrustGraph +from trustgraph.llm_client import LlmClient +from trustgraph.embeddings_client import EmbeddingsClient + +LABEL="http://www.w3.org/2000/01/rdf-schema#label" +DEFINITION="http://www.w3.org/2004/02/skos/core#definition" + +class GraphRag: + + def __init__( + self, + graph_hosts=None, + pulsar_host="pulsar://pulsar:6650", + vector_store="http://milvus:19530", + verbose=False + ): + + self.verbose=verbose + + if graph_hosts == None: + graph_hosts = ["cassandra"] + + if self.verbose: + print("Initialising...", flush=True) + + self.graph = TrustGraph(graph_hosts) + + self.embeddings = EmbeddingsClient(pulsar_host=pulsar_host) + + self.vecstore = VectorStore(vector_store) + + self.entity_limit=50 + self.query_limit=30 + self.max_sg_size=3000 + + self.label_cache = {} + + self.llm = LlmClient(pulsar_host=pulsar_host) + + if self.verbose: + print("Initialised", flush=True) + + def get_vector(self, query): + + if self.verbose: + print("Compute embeddings...", flush=True) + + qembeds = self.embeddings.request(query) + + if self.verbose: + print("Done.", flush=True) + + return qembeds + + def get_entities(self, query): + + everything = [] + + vectors = 
self.get_vector(query) + + if self.verbose: + print("Get entities...", flush=True) + + for vector in vectors: + + res = self.vecstore.search( + vector, + limit=self.entity_limit + ) + + entities = set([ + item["entity"]["entity"] + for item in res + ]) + + everything.extend(entities) + + if self.verbose: + print("Entities:", flush=True) + for ent in everything: + print(" ", ent, flush=True) + + return everything + + def maybe_label(self, e): + + if e in self.label_cache: + return self.label_cache[e] + + res = self.graph.get_sp(e, LABEL) + res = list(res) + + if len(res) == 0: + self.label_cache[e] = e + return e + + self.label_cache[e] = res[0][0] + return self.label_cache[e] + + def get_nodes(self, query): + + ents = self.get_entities(query) + + if self.verbose: + print("Get labels...", flush=True) + + nodes = [ + self.maybe_label(e) + for e in ents + ] + + if self.verbose: + print("Nodes:", flush=True) + for node in nodes: + print(" ", node, flush=True) + + return nodes + + def get_subgraph(self, query): + + entities = self.get_entities(query) + + subgraph = set() + + if self.verbose: + print("Get subgraph...", flush=True) + + for e in entities: + + res = self.graph.get_s(e, limit=self.query_limit) + for p, o in res: + subgraph.add((e, p, o)) + + res = self.graph.get_p(e, limit=self.query_limit) + for s, o in res: + subgraph.add((s, e, o)) + + res = self.graph.get_o(e, limit=self.query_limit) + for s, p in res: + subgraph.add((s, p, e)) + + subgraph = list(subgraph) + + subgraph = subgraph[0:self.max_sg_size] + + if self.verbose: + print("Subgraph:", flush=True) + for edge in subgraph: + print(" ", str(edge), flush=True) + + if self.verbose: + print("Done.", flush=True) + + return subgraph + + def get_labelgraph(self, query): + + subgraph = self.get_subgraph(query) + + sg2 = [] + + for edge in subgraph: + + if edge[1] == LABEL: + continue + + s = self.maybe_label(edge[0]) + p = self.maybe_label(edge[1]) + o = self.maybe_label(edge[2]) + + sg2.append((s, p, o)) + 
+ return sg2 + + def get_cypher(self, query): + + sg = self.get_labelgraph(query) + + sg2 = [] + + for s, p, o in sg: + + sg2.append(f"({s})-[{p}]->({o})") + + kg = "\n".join(sg2) + kg = kg.replace("\\", "-") + + return kg + + def get_graph_prompt(self, query): + + kg = self.get_cypher(query) + + prompt=f"""Study the knowledge graph provided, and use +the information to answer the question. The question should be answered +in plain English only. + + +{kg} + + +{query} + +""" + + return prompt + + def query(self, query): + + if self.verbose: + print("Construct prompt...", flush=True) + + prompt = self.get_graph_prompt(query) + + if self.verbose: + print("Invoke LLM...", flush=True) + + resp = self.llm.request(prompt) + + if self.verbose: + print("Done", flush=True) + + return resp + diff --git a/trustgraph/graph_rag_client.py b/trustgraph/graph_rag_client.py new file mode 100644 index 00000000..6f48e772 --- /dev/null +++ b/trustgraph/graph_rag_client.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 + +import pulsar +import _pulsar +from pulsar.schema import JsonSchema +from trustgraph.schema import GraphRagQuery, GraphRagResponse +import hashlib +import uuid + +# Ugly +ERROR=_pulsar.LoggerLevel.Error +WARN=_pulsar.LoggerLevel.Warn +INFO=_pulsar.LoggerLevel.Info +DEBUG=_pulsar.LoggerLevel.Debug + +class GraphRagClient: + + def __init__( + self, log_level=ERROR, client_id=None, + pulsar_host="pulsar://pulsar:6650", + ): + + if client_id == None: + client_id = str(uuid.uuid4()) + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level), + ) + + self.producer = self.client.create_producer( + topic='graph-rag-query', + schema=JsonSchema(GraphRagQuery), + chunking_enabled=True, + ) + + self.consumer = self.client.subscribe( + 'graph-rag-response', client_id, + schema=JsonSchema(GraphRagResponse), + ) + + def request(self, query, timeout=500): + + id = str(uuid.uuid4()) + + r = GraphRagQuery( + query=query + ) + self.producer.send(r, properties={ 
"id": id }) + + while True: + + msg = self.consumer.receive(timeout_millis=timeout * 1000) + + mid = msg.properties()["id"] + + if mid == id: + resp = msg.value().response + self.consumer.acknowledge(msg) + return resp + + # Ignore messages with wrong ID + self.consumer.acknowledge(msg) + + def __del__(self): + + self.client.close() + diff --git a/trustgraph/kg/__init__.py b/trustgraph/kg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/kg/extract_definitions/__init__.py b/trustgraph/kg/extract_definitions/__init__.py new file mode 100644 index 00000000..81287a3c --- /dev/null +++ b/trustgraph/kg/extract_definitions/__init__.py @@ -0,0 +1,3 @@ + +from . extract import * + diff --git a/trustgraph/kg/extract_definitions/__main__.py b/trustgraph/kg/extract_definitions/__main__.py new file mode 100755 index 00000000..403fe672 --- /dev/null +++ b/trustgraph/kg/extract_definitions/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . extract import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/kg/extract_definitions/extract.py b/trustgraph/kg/extract_definitions/extract.py new file mode 100755 index 00000000..369aef59 --- /dev/null +++ b/trustgraph/kg/extract_definitions/extract.py @@ -0,0 +1,193 @@ + +""" +Simple decoder, accepts PDF documents on input, outputs pages from the +PDF document as text as separate output objects. +""" + +import pulsar +from pulsar.schema import JsonSchema +from langchain_community.document_loaders import PyPDFLoader +import tempfile +import base64 +import os +import argparse +import rdflib +import json +import urllib.parse +import time + +from ... schema import VectorsChunk, Triple, Source, Value +from ... log_level import LogLevel +from ... llm_client import LlmClient +from ... prompts import to_definitions +from ... 
rdf import TRUSTGRAPH_ENTITIES, DEFINITION + +DEFINITION_VALUE = Value(value=DEFINITION, is_uri=True) + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(VectorsChunk), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(Triple), + ) + + self.llm = LlmClient(pulsar_host=pulsar_host) + + def to_uri(self, text): + + part = text.replace(" ", "-").lower().encode("utf-8") + quoted = urllib.parse.quote(part) + uri = TRUSTGRAPH_ENTITIES + quoted + + return uri + + def get_definitions(self, chunk): + + prompt = to_definitions(chunk) + resp = self.llm.request(prompt) + + defs = json.loads(resp) + + return defs + + def emit_edge(self, s, p, o): + + t = Triple(s=s, p=p, o=o) + self.producer.send(t) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) + + chunk = v.chunk.decode("utf-8") + + g = rdflib.Graph() + + try: + + defs = self.get_definitions(chunk) + print(json.dumps(defs, indent=4), flush=True) + + for defn in defs: + + s = defn["entity"] + s_uri = self.to_uri(s) + + o = defn["definition"] + + s_value = Value(value=str(s_uri), is_uri=True) + o_value = Value(value=str(o), is_uri=False) + + self.emit_edge(s_value, DEFINITION_VALUE, o_value) + + except Exception as e: + print("Exception: ", e, flush=True) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception: ", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + 
prog='pdf-decoder', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'vectors-chunk-load' + default_output_queue = 'graph-load' + default_subscriber = 'kg-extract-definitions' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + args = parser.parse_args() + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/kg/extract_relationships/__init__.py b/trustgraph/kg/extract_relationships/__init__.py new file mode 100644 index 00000000..81287a3c --- /dev/null +++ b/trustgraph/kg/extract_relationships/__init__.py @@ -0,0 +1,3 @@ + +from . extract import * + diff --git a/trustgraph/kg/extract_relationships/__main__.py b/trustgraph/kg/extract_relationships/__main__.py new file mode 100755 index 00000000..403fe672 --- /dev/null +++ b/trustgraph/kg/extract_relationships/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . 
extract import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/kg/extract_relationships/extract.py b/trustgraph/kg/extract_relationships/extract.py new file mode 100755 index 00000000..a47c3b6e --- /dev/null +++ b/trustgraph/kg/extract_relationships/extract.py @@ -0,0 +1,252 @@ + +""" +Simple decoder, accepts PDF documents on input, outputs pages from the +PDF document as text as separate output objects. +""" + +import pulsar +from pulsar.schema import JsonSchema +from langchain_community.document_loaders import PyPDFLoader +import tempfile +import base64 +import os +import argparse +import rdflib +import json +import urllib.parse +import time + +from ... schema import VectorsChunk, Triple, VectorsAssociation, Source, Value +from ... log_level import LogLevel +from ... llm_client import LlmClient +from ... prompts import to_relationships +from ... rdf import RDF_LABEL, TRUSTGRAPH_ENTITIES + +RDF_LABEL_VALUE = Value(value=RDF_LABEL, is_uri=True) + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + vec_queue, + subscriber, + log_level, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(VectorsChunk), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(Triple), + ) + + self.vec_prod = self.client.create_producer( + topic=vec_queue, + schema=JsonSchema(VectorsAssociation), + ) + + self.llm = LlmClient(pulsar_host=pulsar_host) + + def to_uri(self, text): + + part = text.replace(" ", "-").lower().encode("utf-8") + quoted = urllib.parse.quote(part) + uri = TRUSTGRAPH_ENTITIES + quoted + + return uri + + def get_relationships(self, chunk): + + prompt = to_relationships(chunk) + resp = self.llm.request(prompt) + + rels = json.loads(resp) + + return rels + + def emit_edge(self, s, p, o): + + t = Triple(s=s, p=p, o=o) + 
self.producer.send(t) + + def emit_vec(self, ent, vec): + + r = VectorsAssociation(entity=ent, vectors=vec) + self.vec_prod.send(r) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + print(f"Indexing {v.source.id}...", flush=True) + + chunk = v.chunk.decode("utf-8") + + g = rdflib.Graph() + + try: + + rels = self.get_relationships(chunk) + print(json.dumps(rels, indent=4), flush=True) + + for rel in rels: + + s = rel["subject"] + p = rel["predicate"] + o = rel["object"] + + s_uri = self.to_uri(s) + s_value = Value(value=str(s_uri), is_uri=True) + + p_uri = self.to_uri(p) + p_value = Value(value=str(p_uri), is_uri=True) + + if rel["object-entity"]: + o_uri = self.to_uri(o) + o_value = Value(value=str(o_uri), is_uri=True) + else: + o_value = Value(value=str(o), is_uri=False) + + self.emit_edge( + s_value, + p_value, + o_value + ) + + # Label for s + self.emit_edge( + s_value, + RDF_LABEL_VALUE, + Value(value=str(s), is_uri=False) + ) + + # Label for p + self.emit_edge( + p_value, + RDF_LABEL_VALUE, + Value(value=str(p), is_uri=False) + ) + + if rel["object-entity"]: + # Label for o + self.emit_edge( + o_value, + RDF_LABEL_VALUE, + Value(value=str(o), is_uri=False) + ) + + self.emit_vec(s_value, v.vectors) + self.emit_vec(p_value, v.vectors) + if rel["object-entity"]: + self.emit_vec(o_value, v.vectors) + + except Exception as e: + print("Exception: ", e, flush=True) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception: ", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='kg-extract-relationships', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'vectors-chunk-load' + 
default_output_queue = 'graph-load' + default_subscriber = 'kg-extract-relationships' + default_vector_queue='vectors-load' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-c', '--vector-queue', + default=default_vector_queue, + help=f'Vector output queue (default: {default_vector_queue})' + ) + + args = parser.parse_args() + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + vec_queue=args.vector_queue, + subscriber=args.subscriber, + log_level=args.log_level, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + + diff --git a/trustgraph/llm/__init__.py b/trustgraph/llm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/llm/azure_text/__init__.py b/trustgraph/llm/azure_text/__init__.py new file mode 100644 index 00000000..f2017af8 --- /dev/null +++ b/trustgraph/llm/azure_text/__init__.py @@ -0,0 +1,3 @@ + +from . 
llm import * + diff --git a/trustgraph/llm/azure_text/__main__.py b/trustgraph/llm/azure_text/__main__.py new file mode 100755 index 00000000..91342d2d --- /dev/null +++ b/trustgraph/llm/azure_text/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . llm import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/llm/azure_text/llm.py b/trustgraph/llm/azure_text/llm.py new file mode 100755 index 00000000..c4f47b6a --- /dev/null +++ b/trustgraph/llm/azure_text/llm.py @@ -0,0 +1,213 @@ + +""" +Simple LLM service, performs text prompt completion using an Ollama service. +Input is prompt, output is response. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +from langchain_community.llms import Ollama +import requests +import time +import json + +from ... schema import TextCompletionRequest, TextCompletionResponse +from ... log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + endpoint, + token, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(TextCompletionRequest), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(TextCompletionResponse), + ) + + self.endpoint = endpoint + self.token = token + + def build_prompt(self, system, content): + + data = { + "messages": [ + { + "role": "system", "content": system + }, + { + "role": "user", "content": content + } + ], + "max_tokens": 4192, + "temperature": 0.2, + "top_p": 1 + } + + body = json.dumps(data) + + return body + + def call_llm(self, body): + + url = self.endpoint + + # Replace this with the primary/secondary key, AMLToken, or + # Microsoft Entra ID token for the endpoint + api_key = self.token + + headers = { + 'Content-Type': 
'application/json', + 'Authorization': f'Bearer {api_key}' + } + + resp = requests.post(url, data=body, headers=headers) + result = resp.json() + + message_content = result['choices'][0]['message']['content'] + + return message_content + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + + # Sender-produced ID + + id = msg.properties()["id"] + + print(f"Handling prompt {id}...", flush=True) + + prompt = self.build_prompt( + "You are a helpful chatbot", + v.prompt + ) + + response = self.call_llm(prompt) + + print("Send response...", flush=True) + r = TextCompletionResponse(response=response) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='llm-ollama-text', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'llm-complete-text' + default_output_queue = 'llm-complete-text-response' + default_subscriber = 'llm-ollama-text' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + 
choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-e', '--endpoint', + help=f'LLM model endpoint' + ) + + parser.add_argument( + '-k', '--token', + help=f'LLM model token' + ) + + args = parser.parse_args() + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + endpoint=args.endpoint, + token=args.token, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + + diff --git a/trustgraph/llm/claude_text/__init__.py b/trustgraph/llm/claude_text/__init__.py new file mode 100644 index 00000000..f2017af8 --- /dev/null +++ b/trustgraph/llm/claude_text/__init__.py @@ -0,0 +1,3 @@ + +from . llm import * + diff --git a/trustgraph/llm/claude_text/__main__.py b/trustgraph/llm/claude_text/__main__.py new file mode 100755 index 00000000..91342d2d --- /dev/null +++ b/trustgraph/llm/claude_text/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . llm import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/llm/claude_text/llm.py b/trustgraph/llm/claude_text/llm.py new file mode 100755 index 00000000..85ffc9c7 --- /dev/null +++ b/trustgraph/llm/claude_text/llm.py @@ -0,0 +1,190 @@ + +""" +Simple LLM service, performs text prompt completion using Claude. +Input is prompt, output is response. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +import anthropic +import time + +from ... schema import TextCompletionRequest, TextCompletionResponse +from ... 
log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + model, + api_key, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(TextCompletionRequest), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(TextCompletionResponse), + ) + + self.model = model + + self.claude = anthropic.Anthropic(api_key=api_key) + + print("Initialised", flush=True) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + + # Sender-produced ID + + id = msg.properties()["id"] + + print(f"Handling prompt {id}...", flush=True) + + prompt = v.prompt + response = message = self.claude.messages.create( + model=self.model, + max_tokens=1000, + temperature=0.1, + system = "You are a helpful chatbot.", + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": prompt + } + ] + } + ] + ) + + resp = response.content[0].text + print(resp, flush=True) + + print("Send response...", flush=True) + r = TextCompletionResponse(response=resp) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + prog='llm-ollama-text', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'llm-complete-text' + default_output_queue = 'llm-complete-text-response' + default_subscriber = 'llm-claude-text' + + parser.add_argument( + '-p', '--pulsar-host', + 
default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-m', '--model', + default="claude-3-5-sonnet-20240620", + help=f'LLM model (default: claude-3-5-sonnet-20240620)' + ) + + parser.add_argument( + '-k', '--api-key', + help=f'Claude API key' + ) + + args = parser.parse_args() + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + model=args.model, + api_key=args.api_key, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + + diff --git a/trustgraph/llm/ollama_text/__init__.py b/trustgraph/llm/ollama_text/__init__.py new file mode 100644 index 00000000..f2017af8 --- /dev/null +++ b/trustgraph/llm/ollama_text/__init__.py @@ -0,0 +1,3 @@ + +from . llm import * + diff --git a/trustgraph/llm/ollama_text/__main__.py b/trustgraph/llm/ollama_text/__main__.py new file mode 100755 index 00000000..91342d2d --- /dev/null +++ b/trustgraph/llm/ollama_text/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . 
llm import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/llm/ollama_text/llm.py b/trustgraph/llm/ollama_text/llm.py new file mode 100755 index 00000000..9d9c7dad --- /dev/null +++ b/trustgraph/llm/ollama_text/llm.py @@ -0,0 +1,169 @@ + +""" +Simple LLM service, performs text prompt completion using an Ollama service. +Input is prompt, output is response. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +from langchain_community.llms import Ollama +import time + +from ... schema import TextCompletionRequest, TextCompletionResponse +from ... log_level import LogLevel + +class Processor: + + def __init__( + self, + pulsar_host, + input_queue, + output_queue, + subscriber, + log_level, + model, + ollama, + ): + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level.to_pulsar()) + ) + + self.consumer = self.client.subscribe( + input_queue, subscriber, + schema=JsonSchema(TextCompletionRequest), + ) + + self.producer = self.client.create_producer( + topic=output_queue, + schema=JsonSchema(TextCompletionResponse), + ) + + self.llm = Ollama(base_url=ollama, model=model) + + def run(self): + + while True: + + msg = self.consumer.receive() + + try: + + v = msg.value() + + # Sender-produced ID + + id = msg.properties()["id"] + + print(f"Handling prompt {id}...", flush=True) + + prompt = v.prompt + response = self.llm.invoke(prompt) + + print("Send response...", flush=True) + r = TextCompletionResponse(response=response) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + # Acknowledge successful processing of the message + self.consumer.acknowledge(msg) + + except Exception as e: + + print("Exception:", e, flush=True) + + # Message failed to be processed + self.consumer.negative_acknowledge(msg) + + def __del__(self): + print("Closing") + self.client.close() + +def run(): + + parser = argparse.ArgumentParser( + 
prog='llm-ollama-text', + description=__doc__, + ) + + default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') + default_input_queue = 'llm-complete-text' + default_output_queue = 'llm-complete-text-response' + default_subscriber = 'llm-ollama-text' + + parser.add_argument( + '-p', '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '-i', '--input-queue', + default=default_input_queue, + help=f'Input queue (default: {default_input_queue})' + ) + + parser.add_argument( + '-s', '--subscriber', + default=default_subscriber, + help=f'Queue subscriber name (default: {default_subscriber})' + ) + + parser.add_argument( + '-o', '--output-queue', + default=default_output_queue, + help=f'Output queue (default: {default_output_queue})' + ) + + parser.add_argument( + '-l', '--log-level', + type=LogLevel, + default=LogLevel.INFO, + choices=list(LogLevel), + help=f'Output queue (default: info)' + ) + + parser.add_argument( + '-m', '--model', + default="gemma2", + help=f'LLM model (default: gemma2)' + ) + + parser.add_argument( + '-r', '--ollama', + default="http://localhost:11434", + help=f'ollama (default: http://localhost:11434)' + ) + + args = parser.parse_args() + + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + model=args.model, + ollama=args.ollama, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/llm/vertexai_text/__init__.py b/trustgraph/llm/vertexai_text/__init__.py new file mode 100644 index 00000000..f2017af8 --- /dev/null +++ b/trustgraph/llm/vertexai_text/__init__.py @@ -0,0 +1,3 @@ + +from . 
llm import * + diff --git a/trustgraph/llm/vertexai_text/__main__.py b/trustgraph/llm/vertexai_text/__main__.py new file mode 100755 index 00000000..91342d2d --- /dev/null +++ b/trustgraph/llm/vertexai_text/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . llm import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/llm/vertexai_text/llm.py b/trustgraph/llm/vertexai_text/llm.py new file mode 100755 index 00000000..44b55e62 --- /dev/null +++ b/trustgraph/llm/vertexai_text/llm.py @@ -0,0 +1,254 @@ + +""" +Simple LLM service, performs text prompt completion using an Ollama service. +Input is prompt, output is response. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +import vertexai +import time + +from google.oauth2 import service_account +import google + +from vertexai.preview.generative_models import ( + Content, + FunctionDeclaration, + GenerativeModel, + GenerationConfig, + HarmCategory, + HarmBlockThreshold, + Part, + Tool, +) + +from ... schema import TextCompletionRequest, TextCompletionResponse +from ... 
from ... log_level import LogLevel


class Processor:
    """
    Pulsar worker servicing text-completion requests with a Google
    VertexAI generative model.

    Consumes TextCompletionRequest messages from an input topic, calls
    the model, and publishes TextCompletionResponse messages on an
    output topic, correlating request/response via the "id" message
    property.
    """

    def __init__(
        self,
        pulsar_host,
        input_queue,
        output_queue,
        subscriber,
        log_level,
        credentials,
        region,
        model,
    ):
        """
        pulsar_host: Pulsar service URL, e.g. pulsar://host:6650
        input_queue: topic carrying TextCompletionRequest messages
        output_queue: topic for TextCompletionResponse messages
        subscriber: Pulsar subscription name
        log_level: LogLevel value controlling the Pulsar client logger
        credentials: google.oauth2 service-account credentials, or None
            to use application-default credentials
        region: Google Cloud region
        model: VertexAI generative model name
        """

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(TextCompletionRequest),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(TextCompletionResponse),
        )

        # NOTE(review): this dict is never referenced in this module --
        # generation is driven by generation_config below.  Its values
        # also disagree with generation_config (top_k 32 vs 10,
        # max_output_tokens 8192 vs 8191).  Confirm which values are
        # intended before removing.
        self.parameters = {
            "temperature": 0.2,
            "top_p": 1.0,
            "top_k": 32,
            "candidate_count": 1,
            "max_output_tokens": 8192,
        }

        self.generation_config = GenerationConfig(
            temperature=0.2,
            top_p=1.0,
            top_k=10,
            candidate_count=1,
            max_output_tokens=8191,
        )

        # Block none doesn't seem to work
        block_level = HarmBlockThreshold.BLOCK_ONLY_HIGH
        # block_level = HarmBlockThreshold.BLOCK_NONE

        self.safety_settings = {
            HarmCategory.HARM_CATEGORY_HARASSMENT: block_level,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: block_level,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: block_level,
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: block_level,
        }

        print("Initialise VertexAI...", flush=True)

        if credentials:
            vertexai.init(
                location=region,
                credentials=credentials,
                project=credentials.project_id,
            )
        else:
            vertexai.init(
                location=region
            )

        print(f"Initialise model {model}", flush=True)
        self.llm = GenerativeModel(model)

        print("Initialisation complete", flush=True)

    def run(self):
        """Consume requests forever, publishing completions to the output topic."""

        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()

                # Sender-produced ID, echoed back so the caller can
                # correlate the response.
                id = msg.properties()["id"]

                print(f"Handling prompt {id}...", flush=True)

                prompt = v.prompt

                resp = self.llm.generate_content(
                    prompt, generation_config=self.generation_config,
                    safety_settings=self.safety_settings
                )

                resp = resp.text

                # Strip markdown code fencing the model sometimes wraps
                # around its output.
                resp = resp.replace("```json", "")
                resp = resp.replace("```", "")

                print("Send response...", flush=True)
                r = TextCompletionResponse(response=resp)
                self.producer.send(r, properties={"id": id})

                print("Done.", flush=True)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            # NOTE(review): this relies on google.api_core having been
            # imported as a side effect of the vertexai import; `import
            # google` alone does not load the api_core submodule --
            # confirm, or import google.api_core.exceptions explicitly.
            except google.api_core.exceptions.ResourceExhausted:

                # Quota exhausted (HTTP 429): back off, then redeliver.
                print("429, resource busy, sleeping", flush=True)
                time.sleep(15)
                self.consumer.negative_acknowledge(msg)

            except Exception as e:

                print("Exception:", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        self.client.close()


def run():
    """Command-line entry point: parse arguments and run the processor,
    restarting after a delay on failure."""

    parser = argparse.ArgumentParser(
        # Fixed: was 'llm-ollama-text', copied from the Ollama module;
        # this is the VertexAI text-completion service.
        prog='llm-vertexai-text',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'llm-complete-text'
    default_output_queue = 'llm-complete-text-response'
    default_subscriber = 'llm-vertexai-text'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # Fixed: help text said 'Output queue', copied from the -o option.
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-m', '--model',
        default="gemini-1.0-pro-001",
        help='LLM model (default: gemini-1.0-pro-001)'
    )
    # Also: text-bison-32k

    parser.add_argument(
        '-k', '--private-key',
        help='Google Cloud private JSON file'
    )

    parser.add_argument(
        '-r', '--region',
        default='us-west1',
        help='Google Cloud region (default: us-west1)',
    )

    args = parser.parse_args()

    if args.private_key:
        credentials = service_account.Credentials.from_service_account_file(
            args.private_key
        )
    else:
        credentials = None

    # Keep the service alive: rebuild the processor and retry on any
    # failure (e.g. Pulsar connection loss).
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                credentials=credentials,
                region=args.region,
                model=args.model,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)
default='us-west1', + help=f'Google Cloud region (default: us-west1)', + ) + + args = parser.parse_args() + + if args.private_key: + credentials = service_account.Credentials.from_service_account_file( + args.private_key + ) + else: + credentials = None + + while True: + + try: + + p = Processor( + pulsar_host=args.pulsar_host, + input_queue=args.input_queue, + output_queue=args.output_queue, + subscriber=args.subscriber, + log_level=args.log_level, + credentials=credentials, + region=args.region, + model=args.model, + ) + + p.run() + + except Exception as e: + + print("Exception:", e, flush=True) + print("Will retry...", flush=True) + + time.sleep(10) + diff --git a/trustgraph/llm_client.py b/trustgraph/llm_client.py new file mode 100644 index 00000000..5e0df96d --- /dev/null +++ b/trustgraph/llm_client.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 + +import pulsar +import _pulsar +from pulsar.schema import JsonSchema +from trustgraph.schema import TextCompletionRequest, TextCompletionResponse +import hashlib +import uuid + +# Ugly +ERROR=_pulsar.LoggerLevel.Error +WARN=_pulsar.LoggerLevel.Warn +INFO=_pulsar.LoggerLevel.Info +DEBUG=_pulsar.LoggerLevel.Debug + +class LlmClient: + + def __init__( + self, log_level=ERROR, client_id=None, + pulsar_host="pulsar://pulsar:6650", + ): + + if client_id == None: + client_id = str(uuid.uuid4()) + + self.client = pulsar.Client( + pulsar_host, + logger=pulsar.ConsoleLogger(log_level), + ) + + self.producer = self.client.create_producer( + topic='llm-complete-text', + schema=JsonSchema(TextCompletionRequest), + chunking_enabled=True, + ) + + self.consumer = self.client.subscribe( + 'llm-complete-text-response', client_id, + schema=JsonSchema(TextCompletionResponse), + ) + + def request(self, prompt, timeout=500): + + id = str(uuid.uuid4()) + + + r = TextCompletionRequest( + prompt=prompt + ) + self.producer.send(r, properties={ "id": id }) + + while True: + + msg = self.consumer.receive(timeout_millis=timeout * 1000) + + mid = 
msg.properties()["id"] + + if mid == id: + resp = msg.value().response + self.consumer.acknowledge(msg) + return resp + + # Ignore messages with wrong ID + self.consumer.acknowledge(msg) + + def __del__(self): + + self.producer.close() + self.consumer.close() + self.client.close() + diff --git a/trustgraph/log_level.py b/trustgraph/log_level.py new file mode 100644 index 00000000..65486b29 --- /dev/null +++ b/trustgraph/log_level.py @@ -0,0 +1,20 @@ + +from enum import Enum +import _pulsar + +class LogLevel(Enum): + DEBUG = 'debug' + INFO = 'info' + WARN = 'warn' + ERROR = 'error' + + def __str__(self): + return self.value + + def to_pulsar(self): + if self == LogLevel.DEBUG: return _pulsar.LoggerLevel.Debug + if self == LogLevel.INFO: return _pulsar.LoggerLevel.Info + if self == LogLevel.WARN: return _pulsar.LoggerLevel.Warn + if self == LogLevel.ERROR: return _pulsar.LoggerLevel.Error + raise RuntimeError("Log level mismatch") + diff --git a/trustgraph/prompts.py b/trustgraph/prompts.py new file mode 100644 index 00000000..c6b91ff2 --- /dev/null +++ b/trustgraph/prompts.py @@ -0,0 +1,138 @@ + +def turtle_extract(text): + + prompt = f""" +Study the following text and extract knowledge as +information in Turtle RDF format. +When declaring any new URIs, use prefix, +and declare appropriate namespace tags. + + + +{text} + + + +Do not use placeholders for information you do not know. +You will respond only with raw Turtle RDF data. Do not provide +explanations. Do not use special characters in the abstract text. The +abstract must be written as plain text. Do not add markdown formatting. 
+""" + + return prompt + +def scholar(text): + + # Build the prompt for Article style extraction + jsonexample = """{ + "title": "Article title here", + "abstract": "Abstract text here", + "keywords": ["keyword1", "keyword2", "keyword3"], + "people": ["person1", "person2", "person3"] +}""" + + promptscholar = f"""Your task is to read the provided text and write a scholarly abstract to fully explain all of the concepts described in the provided text. The abstract must include all conceptual details. + +{text} + + + +- Structure: For the provided text, write a title, abstract, keywords, + and people for the concepts found in the provided text. Ignore + document formatting in the provided text such as table of contents, + headers, footers, section metadata, and URLs. +- Focus on Concepts The abstract must focus on concepts found in the + provided text. The abstract must be factually accurate. Do not + write any concepts not found in the provided text. Do not + speculate. Do not omit any conceptual details. +- Completeness: The abstract must capture all topics the reader will + need to understand the concepts found in the provided text. Describe + all terms, definitions, entities, people, events, concepts, + conceptual relationships, and any other topics necessary for the + reader to understand the concepts of the provided text. + +- Format: Respond in the form of a valid JSON object. + + +{jsonexample} + + +You will respond only with the JSON object. Do not provide +explanations. Do not use special characters in the abstract text. The +abstract must be written as plain text. +""" + + return promptscholar + +def to_json_ld(text): + + prompt = f""" +Study the following text and output any facts you discover in +well-structured JSON-LD format. +Use any schema you understand from schema.org to describe the facts. + + + +{text} + + + +You will respond only with raw JSON-LD data in JSON format. Do not provide +explanations. 
Do not use special characters in the abstract text. The +abstract must be written as plain text. Do not add markdown formatting +or headers or prefixes. Do not use information which is not present in +the input text. +""" + + return prompt + + +def to_relationships(text): + + prompt = f""" +Study the following text and derive entity relationships. For each +relationship, derive the subject, predicate and object of the relationship. +Output relationships in JSON format as an arary of objects with fields: +- subject: the subject of the relationship +- predicate: the predicate +- object: the object of the relationship +- object-entity: false if the object is a simple data type: name, value or date. true if it is an entity. + + + +{text} + + + +You will respond only with raw JSON format data. Do not provide +explanations. Do not use special characters in the abstract text. The +abstract must be written as plain text. Do not add markdown formatting +or headers or prefixes. +""" + + return prompt + +def to_definitions(text): + + prompt = f""" +Study the following text and derive definitions for any discovered entities. +Do not provide definitions for entities whose definitions are incomplete +or unknown. +Output relationships in JSON format as an arary of objects with fields: +- entity: the name of the entity +- definition: English text which defines the entity + + + +{text} + + + +You will respond only with raw JSON format data. Do not provide +explanations. Do not use special characters in the abstract text. The +abstract will be written as plain text. Do not add markdown formatting +or headers or prefixes. Do not include null or unknown definitions. 
+""" + + return prompt + diff --git a/trustgraph/rag/__init__.py b/trustgraph/rag/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/trustgraph/rag/graph/__init__.py b/trustgraph/rag/graph/__init__.py new file mode 100644 index 00000000..432ae594 --- /dev/null +++ b/trustgraph/rag/graph/__init__.py @@ -0,0 +1,3 @@ + +from . rag import * + diff --git a/trustgraph/rag/graph/__main__.py b/trustgraph/rag/graph/__main__.py new file mode 100755 index 00000000..89ebb780 --- /dev/null +++ b/trustgraph/rag/graph/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . rag import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph/rag/graph/rag.py b/trustgraph/rag/graph/rag.py new file mode 100755 index 00000000..28ad298b --- /dev/null +++ b/trustgraph/rag/graph/rag.py @@ -0,0 +1,172 @@ + +""" +Simple RAG service, performs query using graph RAG an LLM. +Input is query, output is response. +""" + +import pulsar +from pulsar.schema import JsonSchema +import tempfile +import base64 +import os +import argparse +import time + +from ... schema import GraphRagQuery, GraphRagResponse +from ... log_level import LogLevel +from ... 
from ... graph_rag import GraphRag


class Processor:
    """
    Pulsar worker servicing graph-RAG queries: consumes GraphRagQuery
    messages, answers them via a GraphRag instance (graph store +
    vector store + LLM), and publishes GraphRagResponse messages,
    correlated via the "id" message property.
    """

    def __init__(
        self,
        pulsar_host,
        input_queue,
        output_queue,
        subscriber,
        log_level,
        graph_hosts,
        vector_store,
    ):
        """
        pulsar_host: Pulsar service URL
        input_queue: topic carrying GraphRagQuery messages
        output_queue: topic for GraphRagResponse messages
        subscriber: Pulsar subscription name
        log_level: LogLevel value for the Pulsar client logger
        graph_hosts: list of graph store (Cassandra) hosts
        vector_store: vector store (Milvus) URI
        """

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(GraphRagQuery),
        )

        self.producer = self.client.create_producer(
            topic=output_queue,
            schema=JsonSchema(GraphRagResponse),
        )

        self.rag = GraphRag(
            pulsar_host=pulsar_host,
            graph_hosts=graph_hosts,
            vector_store=vector_store,
            verbose=True,
        )

    def run(self):
        """Consume queries forever, publishing answers on the output topic."""

        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()

                # Sender-produced ID, echoed back for correlation.
                id = msg.properties()["id"]

                print(f"Handling input {id}...", flush=True)

                response = self.rag.query(v.query)

                print("Send response...", flush=True)
                r = GraphRagResponse(response = response)
                self.producer.send(r, properties={"id": id})

                print("Done.", flush=True)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:

                print("Exception:", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        print("Closing", flush=True)
        self.client.close()


def run():
    """Command-line entry point: parse arguments and run the processor,
    restarting after a delay on failure."""

    parser = argparse.ArgumentParser(
        # Fixed: was 'llm-ollama-text', copied from the Ollama LLM
        # module; this is the graph-RAG query service.
        prog='graph-rag',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'graph-rag-query'
    default_output_queue = 'graph-rag-response'
    default_subscriber = 'graph-rag'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-o', '--output-queue',
        default=default_output_queue,
        help=f'Output queue (default: {default_output_queue})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # Fixed: help text said 'Output queue', copied from the -o option.
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-g', '--graph-hosts',
        default='cassandra',
        help='Graph hosts, comma separated (default: cassandra)'
    )

    parser.add_argument(
        '-v', '--vector-store',
        default='http://milvus:19530',
        help='Vector host (default: http://milvus:19530)'
    )

    args = parser.parse_args()

    # Keep the service alive: rebuild the processor and retry on failure.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                output_queue=args.output_queue,
                subscriber=args.subscriber,
                log_level=args.log_level,
                graph_hosts=args.graph_hosts.split(","),
                vector_store=args.vector_store,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)

# --- trustgraph/rdf.py ---

# Well-known RDF/SKOS predicate URIs used across the package.
RDF_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
DEFINITION = "http://www.w3.org/2004/02/skos/core#definition"

# Namespace under which TrustGraph mints its own entity URIs.
TRUSTGRAPH_ENTITIES = "http://trustgraph.ai/e/"

# --- trustgraph/schema.py (head) ---

from pulsar.schema import Record, Bytes, String, Boolean, Integer, Array, Double

from enum import Enum

#class Command(Enum):
#    reindex = 1

#class IndexCommand(Record):
#    command = Command

class Value(Record):
    # A graph term: a URI (is_uri=True) or a literal with optional type.
    value = String()
    is_uri = Boolean()
    type = String()

class Source(Record):
    # Provenance of a document flowing through the pipeline.
    source = String()
    id = String()
    title = String()
class Document(Record):
    # Raw (binary) document plus provenance.
    source = Source()
    data = Bytes()

class TextDocument(Record):
    # Extracted text form of a document.
    source = Source()
    text = Bytes()

class Chunk(Record):
    # One chunk of a split document.
    source = Source()
    chunk = Bytes()

class VectorsChunk(Record):
    # A chunk together with its embedding vectors.
    source = Source()
    vectors = Array(Array(Double()))
    chunk = Bytes()

class VectorsAssociation(Record):
    # Embedding vectors associated with a single entity.
    source = Source()
    vectors = Array(Array(Double()))
    entity = Value()

class Triple(Record):
    # An RDF-style subject/predicate/object triple.
    source = Source()
    s = Value()
    p = Value()
    o = Value()

class TextCompletionRequest(Record):
    prompt = String()

class TextCompletionResponse(Record):
    response = String()

class EmbeddingsRequest(Record):
    text = String()

class EmbeddingsResponse(Record):
    vectors = Array(Array(Double()))

class GraphRagQuery(Record):
    query = String()

class GraphRagResponse(Record):
    response = String()

# --- trustgraph/trustgraph.py ---

from cassandra.cluster import Cluster
# NOTE(review): PlainTextAuthProvider is imported but unused here --
# presumably intended for authenticated clusters; confirm before removing.
from cassandra.auth import PlainTextAuthProvider

class TrustGraph:
    """
    Triple store backed by Cassandra: keyspace `trustgraph`, table
    `triples(s, p, o)` with PRIMARY KEY (s, p) and secondary indexes
    on p and o.  Query helpers return Cassandra result sets.
    """

    def __init__(self, hosts=None):
        """hosts: list of Cassandra contact points (default localhost)."""

        if hosts is None:
            hosts = ["localhost"]

        self.cluster = Cluster(hosts)
        self.session = self.cluster.connect()

        self.init()

    def clear(self):
        """Drop the whole keyspace and recreate the empty schema."""

        self.session.execute("""
            drop keyspace if exists trustgraph;
        """)

        self.init()

    def init(self):
        """Create keyspace, table and indexes if they do not exist."""

        # NOTE(review): replication_factor 1 / SimpleStrategy is a
        # single-node development configuration.
        self.session.execute("""
            create keyspace if not exists trustgraph
                with replication = {
                    'class' : 'SimpleStrategy',
                    'replication_factor' : 1
                };
        """)

        self.session.set_keyspace('trustgraph')

        self.session.execute("""
            create table if not exists triples (
                s text,
                p text,
                o text,
                PRIMARY KEY (s, p)
            );
        """)

        self.session.execute("""
            create index if not exists triples_p
                ON triples (p);
        """)

        self.session.execute("""
            create index if not exists triples_o
                ON triples (o);
        """)

    def insert(self, s, p, o):
        """Insert (upsert) a single triple."""

        self.session.execute(
            "insert into triples (s, p, o) values (%s, %s, %s)",
            (s, p, o)
        )

    def get_all(self, limit=50):
        """Return up to `limit` triples."""
        return self.session.execute(
            f"select s, p, o from triples limit {limit}"
        )

    def get_s(self, s, limit=10):
        """Return (p, o) pairs for subject `s`."""
        # Fixed: the limit parameter was accepted but never applied.
        return self.session.execute(
            f"select p, o from triples where s = %s limit {limit}",
            (s,)
        )

    def get_p(self, p, limit=10):
        """Return (s, o) pairs for predicate `p`."""
        return self.session.execute(
            f"select s, o from triples where p = %s limit {limit}",
            (p,)
        )

    def get_o(self, o, limit=10):
        """Return (s, p) pairs for object `o`."""
        return self.session.execute(
            f"select s, p from triples where o = %s limit {limit}",
            (o,)
        )

    def get_sp(self, s, p, limit=10):
        """Return objects for (subject, predicate)."""
        return self.session.execute(
            f"select o from triples where s = %s and p = %s limit {limit}",
            (s, p)
        )

    def get_po(self, p, o, limit=10):
        """Return subjects for (predicate, object)."""
        # Fixed: CQL requires LIMIT before ALLOW FILTERING; the original
        # clause order ("allow filtering limit N") is a CQL syntax error.
        return self.session.execute(
            f"select s from triples where p = %s and o = %s limit {limit} allow filtering",
            (p, o)
        )

    def get_os(self, o, s, limit=10):
        """Return subjects for (object, subject)."""
        # NOTE(review): filtering on o (indexed) plus s (partition key)
        # may also require ALLOW FILTERING on some Cassandra versions --
        # confirm against the target cluster.
        return self.session.execute(
            f"select s from triples where o = %s and s = %s limit {limit}",
            (o, s)
        )

    def get_spo(self, s, p, o, limit=10):
        """Existence-style lookup of the exact triple (s, p, o)."""
        return self.session.execute(
            f"""select s as x from triples where s = %s and p = %s and o = %s limit {limit}""",
            (s, p, o)
        )

# --- trustgraph/vector/__init__.py ---
# (empty)

# --- trustgraph/vector/milvus_write/__init__.py ---

from . write import *
# (continuation of trustgraph/vector/milvus_write/__main__.py)
from . write import run

if __name__ == '__main__':
    run()

# --- trustgraph/vector/milvus_write/write.py ---

# Fixed: the original docstring described a PDF decoder, copied from
# another module; this module writes vectors to Milvus.
"""
Vector writer: consumes VectorsAssociation messages from Pulsar and
stores each entity's embedding vectors in a Milvus vector store.
"""

import pulsar
from pulsar.schema import JsonSchema
# NOTE(review): PyPDFLoader is unused here (copied from the PDF decoder
# module) -- confirm before removing.
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import base64
import os
import argparse
import time

from ... schema import VectorsAssociation
from ... log_level import LogLevel
from ... edge_map import VectorStore

class Processor:
    """
    Pulsar worker: consumes VectorsAssociation messages and inserts
    each (vector, entity) pair into the vector store.  Write-only;
    produces no output messages.
    """

    def __init__(
        self,
        pulsar_host,
        input_queue,
        subscriber,
        store_uri,
        log_level,
    ):
        """
        pulsar_host: Pulsar service URL
        input_queue: topic carrying VectorsAssociation messages
        subscriber: Pulsar subscription name
        store_uri: Milvus store URI
        log_level: LogLevel value for the Pulsar client logger
        """

        self.client = pulsar.Client(
            pulsar_host,
            logger=pulsar.ConsoleLogger(log_level.to_pulsar())
        )

        self.consumer = self.client.subscribe(
            input_queue, subscriber,
            schema=JsonSchema(VectorsAssociation),
        )

        self.vecstore = VectorStore(store_uri)

    def run(self):
        """Consume messages forever, writing vectors to the store."""

        while True:

            msg = self.consumer.receive()

            try:

                v = msg.value()

                # Skip records with no entity name; otherwise store one
                # row per vector.
                if v.entity.value != "":
                    for vec in v.vectors:
                        self.vecstore.insert(vec, v.entity.value)

                # Acknowledge successful processing of the message
                self.consumer.acknowledge(msg)

            except Exception as e:

                print("Exception:", e, flush=True)

                # Message failed to be processed
                self.consumer.negative_acknowledge(msg)

    def __del__(self):
        self.client.close()

def run():
    """Command-line entry point: parse arguments and run the processor,
    restarting after a delay on failure."""

    parser = argparse.ArgumentParser(
        # Fixed: was 'pdf-decoder', copied from the PDF decoder module.
        prog='vector-write-milvus',
        description=__doc__,
    )

    default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
    default_input_queue = 'vectors-load'
    default_subscriber = 'vector-write-milvus'

    parser.add_argument(
        '-p', '--pulsar-host',
        default=default_pulsar_host,
        help=f'Pulsar host (default: {default_pulsar_host})',
    )

    parser.add_argument(
        '-i', '--input-queue',
        default=default_input_queue,
        help=f'Input queue (default: {default_input_queue})'
    )

    parser.add_argument(
        '-s', '--subscriber',
        default=default_subscriber,
        help=f'Queue subscriber name (default: {default_subscriber})'
    )

    parser.add_argument(
        '-l', '--log-level',
        type=LogLevel,
        default=LogLevel.INFO,
        choices=list(LogLevel),
        # Fixed: help text said 'Output queue', copied from another module.
        help='Log level (default: info)'
    )

    parser.add_argument(
        '-t', '--store-uri',
        default="http://localhost:19530",
        help='Milvus store URI (default: http://localhost:19530)'
    )

    args = parser.parse_args()

    # Keep the service alive: rebuild the processor and retry on failure.
    while True:

        try:

            p = Processor(
                pulsar_host=args.pulsar_host,
                input_queue=args.input_queue,
                subscriber=args.subscriber,
                store_uri=args.store_uri,
                log_level=args.log_level,
            )

            p.run()

        except Exception as e:

            print("Exception:", e, flush=True)
            print("Will retry...", flush=True)

            time.sleep(10)