demos for network copilot and sql analyzer (#57)

* pulled from main branch after adding enums and made changes

* added sql_analyzer folder and built a demo for Employee stats function calling. "top_employees" and "aggregate_stats".

* sql_anayzer

* After addressing PR comments

* PR comments

* PR comments

* Addeed Network Analyzer FC Code

* Added network Analyzer code for diff timeframes

* Network Copilot and Employee Details demos are updated with their descriptions and resolved the PR comments

* Added 2nd function in network copilot

* Added 2nd function in network copilot

* Added 2nd function in network copilot

* Added 2nd function in network copilot

* Added 2nd function in network copilot
This commit is contained in:
Sampreeth Sarma 2024-09-19 11:40:31 -07:00 committed by GitHub
parent a91fbdbf1c
commit ed6a9139e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1052 additions and 1 deletions

View file

@ -0,0 +1,28 @@
# Function calling
This demo shows how you can use intelligent prompt gateway to act a copilot for calling the correct proc by capturing the required and optional parametrs from the prompt. This demo assumes you are using ollama running natively. If you want to run ollama running inside docker then please update ollama endpoint in docker-compose file.
# Startig the demo
1. Ensure that submodule is up to date
```sh
git submodule sync --recursive
```
1. Create `.env` file and set OpenAI key using env var `OPENAI_API_KEY`
1. Start services
```sh
docker compose up
```
1. Download Bolt-FC model. This demo assumes we have downloaded [Bolt-Function-Calling-1B:Q4_K_M](https://huggingface.co/katanemolabs/Bolt-Function-Calling-1B.gguf/blob/main/Bolt-Function-Calling-1B-Q4_K_M.gguf) to local folder.
1. If running ollama natively run
```sh
ollama serve
```
2. Create model file in ollama repository
```sh
ollama create Bolt-Function-Calling-1B:Q4_K_M -f Bolt-FC-1B-Q4_K_M.model_file
```
3. Navigate to http://localhost:18080/
4. You can type in queries like "show me the top 5 employees in each department with highest salary"
- You can also ask follow up questions like "just show the top 2"
5. To see metrics navigate to "http://localhost:3000/" (use admin/grafana for login)
- Open up dahsboard named "Intelligent Gateway Overview"
- On this dashboard you can see reuqest latency and number of requests

View file

@ -0,0 +1,79 @@
default_prompt_endpoint: "127.0.0.1"
load_balancing: "round_robin"
timeout_ms: 5000
overrides:
# confidence threshold for prompt target intent matching
prompt_target_intent_matching_threshold: 0.8
# should not be here
embedding_provider:
name: "bge-large-en-v1.5"
model: "BAAI/bge-large-en-v1.5"
llm_providers:
- name: open-ai-gpt-4
api_key: $OPEN_AI_API_KEY
model: gpt-4
default: true
prompt_targets:
- type: function_resolver
name: top_employees
description: |
Allows you to find the top employees in different groups, such as departments, locations, or position. You can rank the employees by different criteria, like salary, yoe, or rating. Returns the best-ranked employees for each group, helping you identify top n in the list.
parameters:
- name: grouping
description: |
Select how you'd like to group the employees. For example, you can group them by department, location, or their position. The tool will provide the top-ranked employees within each group you choose.
required: true
type: string
enum: [department, location, position]
- name: ranking_criteria
required: true
type: string
description: |
Choose how you'd like to rank the employees. You can rank them by their salary, their yoe, or their rating. The tool will sort the employees based on this ranking and return the best ones from each group.
enum: [salary, yoe, rating]
- name: top_n
required: true
type: integer
description: |
Enter how many of the top employees you want to see in each group. For example, if you enter 3, the tool will show you the top 3 employees for each group you selected.
endpoint:
cluster: databasehost
path: /top_employees
system_prompt: |
You are responsible for retrieving the top N employees per group ranked by a constraint.
- type: function_resolver
name: aggregate_stats
description: |
Calculate summary statistics for groups of employees. You can group employees by categories like department or location and then compute totals, averages, or other statistics for specific attributes such as salary or yoe.
parameters:
- name: grouping
description: |
Choose how you'd like to organize the employees. For example, you can group them by department, location, or position. The tool will calculate the summary statistics for each group.
required: true
enum: [department, location, position]
- name: aggregate_criteria
description: |
Select the specific attribute you'd like to analyze. This could be something like salary, yoe, or rating. The tool will calculate the statistic you request for this attribute.
required: true
enum: [salary, yoe, rating]
- name: aggregate_type
description: |
Choose the type of statistic you'd like to calculate for the selected attribute. For example, you can calculate the sum, average, minimum, or maximum value for each group.
required: true
enum: [SUM, AVG, MIN, MAX]
endpoint:
cluster: databasehost
path: /aggregate_stats
system_prompt: |
You help calculate summary statistics for groups of employees. First, organize the employees by the specified grouping (e.g., department, location, or position). Then, compute the requested statistic (e.g., total, average, minimum, or maximum) for a specific attribute like salary, experience, or rating.
clusters:
databasehost:
address: model_server

View file

@ -0,0 +1,127 @@
services:
config_generator:
build:
context: ../../
dockerfile: config_generator/Dockerfile
volumes:
- ../../envoyfilter/envoy.template.yaml:/usr/src/app/envoy.template.yaml
- ./bolt_config.yaml:/usr/src/app/bolt_config.yaml
- ./generated:/usr/src/app/out
bolt:
build:
context: ../../
dockerfile: envoyfilter/Dockerfile
hostname: bolt
ports:
- "10000:10000"
- "19901:9901"
volumes:
- ./generated/envoy.yaml:/etc/envoy/envoy.yaml
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
depends_on:
config_generator:
condition: service_completed_successfully
model_server:
condition: service_healthy
environment:
- LOG_LEVEL=debug
model_server:
build:
context: ../../model_server
dockerfile: Dockerfile
ports:
- "18081:80"
healthcheck:
test: ["CMD", "curl" ,"http://localhost:80/healthz"]
interval: 5s
retries: 20
volumes:
- ~/.cache/huggingface:/root/.cache/huggingface
function_resolver:
build:
context: ../../function_resolver
dockerfile: Dockerfile
ports:
- "18082:80"
healthcheck:
test: ["CMD", "curl" ,"http://localhost:80/healthz"]
interval: 5s
retries: 20
volumes:
- ~/.cache/huggingface:/root/.cache/huggingface
environment:
# use ollama endpoint that is hosted by host machine (no virtualization)
- OLLAMA_ENDPOINT=${OLLAMA_ENDPOINT:-host.docker.internal}
# uncomment following line to use ollama endpoint that is hosted by docker
# - OLLAMA_ENDPOINT=ollama
ollama:
image: ollama/ollama
container_name: ollama
volumes:
- ./ollama:/root/.ollama
restart: unless-stopped
ports:
- '11434:11434'
profiles:
- manual
open-webui:
image: ghcr.io/open-webui/open-webui:${WEBUI_DOCKER_TAG-main}
container_name: open-webui
volumes:
- ./open-webui:/app/backend/data
# depends_on:
# - ollama
ports:
- 18090:8080
environment:
- OLLAMA_BASE_URL=http://${OLLAMA_ENDPOINT:-host.docker.internal}:11434
- WEBUI_AUTH=false
extra_hosts:
- host.docker.internal:host-gateway
restart: unless-stopped
chatbot_ui:
build:
context: ../../chatbot_ui
dockerfile: Dockerfile
ports:
- "18080:8080"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY:?error}
- CHAT_COMPLETION_ENDPOINT=http://bolt:10000/v1/chat/completions
prometheus:
image: prom/prometheus
container_name: prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yaml'
ports:
- 9090:9090
restart: unless-stopped
volumes:
- ./prometheus:/etc/prometheus
- ./prom_data:/prometheus
profiles:
- monitoring
grafana:
image: grafana/grafana
container_name: grafana
ports:
- 3000:3000
restart: unless-stopped
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=grafana
volumes:
- ./grafana:/etc/grafana/provisioning/datasources
- ./grafana/dashboard.yaml:/etc/grafana/provisioning/dashboards/main.yaml
- ./grafana/dashboards:/var/lib/grafana/dashboards
profiles:
- monitoring

View file

@ -0,0 +1,28 @@
# Function calling
This demo shows how you can use intelligent prompt gateway as a network copilot that could give information about correlation between packet loss with device reboots, downs, or maintainence. This demo assumes you are using ollama running natively. If you want to run ollama running inside docker then please update ollama endpoint in docker-compose file.
# Startig the demo
1. Ensure that submodule is up to date
```sh
git submodule sync --recursive
```
1. Create `.env` file and set OpenAI key using env var `OPENAI_API_KEY`
1. Start services
```sh
docker compose up
```
1. Download Bolt-FC model. This demo assumes we have downloaded [Bolt-Function-Calling-1B:Q4_K_M](https://huggingface.co/katanemolabs/Bolt-Function-Calling-1B.gguf/blob/main/Bolt-Function-Calling-1B-Q4_K_M.gguf) to local folder.
1. If running ollama natively run
```sh
ollama serve
```
2. Create model file in ollama repository
```sh
ollama create Bolt-Function-Calling-1B:Q4_K_M -f Bolt-FC-1B-Q4_K_M.model_file
```
3. Navigate to http://localhost:18080/
4. You can type in queries like "show me any packet drops due to interface failure in the past 3 days"
- You can also ask follow up questions like "show me just the ones with maximum 200 in errors"
5. To see metrics navigate to "http://localhost:3000/" (use admin/grafana for login)
- Open up dahsboard named "Intelligent Gateway Overview"
- On this dashboard you can see reuqest latency and number of requests

View file

@ -0,0 +1,135 @@
default_prompt_endpoint: "127.0.0.1"
load_balancing: "round_robin"
timeout_ms: 5000
overrides:
# confidence threshold for prompt target intent matching
prompt_target_intent_matching_threshold: 0.7
# should not be here
embedding_provider:
name: "bge-large-en-v1.5"
model: "BAAI/bge-large-en-v1.5"
llm_providers:
- name: open-ai-gpt-4
api_key: $OPEN_AI_API_KEY
model: gpt-4
default: true
prompt_targets:
- type: function_resolver
name: interface_down_packet_drop
description: |
Checks for packet drops due to interface unavailability like reboots, shutdowns, or maintainence events. It allows filtering the results by timeframes, interface name, region, and packet error thresholds.
parameters:
- name: from_time
description: An optional natural language timeframe (e.g., "past 7 days", "since a month") to define the time range for packet drop analysis.
required: false
type: string
- name: ifname
description: An optional interface name filter to apply.
required: false
type: string
- name: region
description: An optional region filter to apply (from the device table).
required: false
type: string
- name: min_in_errors
description: Minimum number of in_errors to filter results.
required: false
type: integer
- name: max_in_errors
description: Maximum number of in_errors to filter results.
required: false
type: integer
- name: min_out_errors
description: Minimum number of out_errors to filter results.
required: false
type: integer
- name: max_out_errors
description: Maximum number of out_errors to filter results.
required: false
type: integer
- name: min_in_discards
description: Minimum number of in_discards to filter results.
required: false
type: integer
- name: max_in_discards
description: Maximum number of in_discards to filter results.
required: false
type: integer
- name: min_out_discards
description: Minimum number of out_discards to filter results.
required: false
type: integer
- name: max_out_discards
description: Maximum number of out_discards to filter results.
required: false
type: integer
endpoint:
cluster: databasehost
path: /interface_down_packet_drop
system_prompt: |
You are responsible for correlating packet drops with interface down events by analyzing packet errors from the given data.
- type: function_resolver
name: packet_errors_impact_flow
description: |
To find whether packet flows are impacted due to packet errors by correlating the timestamps between the packet errors and the flows. It allows filtering the results by timeframes, interface name, region, and packet error thresholds.
parameters:
- name: from_time
description: An optional natural language timeframe (e.g., "past 7 days", "since a month") to define the time range for the analysis.
required: false
type: string
- name: ifname
description: An optional interface name filter to apply.
required: false
type: string
- name: region
description: An optional region filter to apply (from the device table).
required: false
type: string
- name: min_in_errors
description: Minimum number of in_errors to filter results.
required: false
type: integer
- name: max_in_errors
description: Maximum number of in_errors to filter results.
required: false
type: integer
- name: min_out_errors
description: Minimum number of out_errors to filter results.
required: false
type: integer
- name: max_out_errors
description: Maximum number of out_errors to filter results.
required: false
type: integer
- name: min_in_discards
description: Minimum number of in_discards to filter results.
required: false
type: integer
- name: max_in_discards
description: Maximum number of in_discards to filter results.
required: false
type: integer
- name: min_out_discards
description: Minimum number of out_discards to filter results.
required: false
type: integer
- name: max_out_discards
description: Maximum number of out_discards to filter results.
required: false
type: integer
endpoint:
cluster: databasehost
path: /packet_errors_impact_flow
system_prompt: |
You are responsible for finding and correlating packet errors with the packet flows based on timestamps given in the data. This correlation helps identify if packet flows are impacted by packet errors.
clusters:
databasehost:
address: model_server

View file

@ -0,0 +1,127 @@
services:
config_generator:
build:
context: ../../
dockerfile: config_generator/Dockerfile
volumes:
- ../../envoyfilter/envoy.template.yaml:/usr/src/app/envoy.template.yaml
- ./bolt_config.yaml:/usr/src/app/bolt_config.yaml
- ./generated:/usr/src/app/out
bolt:
build:
context: ../../
dockerfile: envoyfilter/Dockerfile
hostname: bolt
ports:
- "10000:10000"
- "19901:9901"
volumes:
- ./generated/envoy.yaml:/etc/envoy/envoy.yaml
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
depends_on:
config_generator:
condition: service_completed_successfully
model_server:
condition: service_healthy
environment:
- LOG_LEVEL=debug
model_server:
build:
context: ../../model_server
dockerfile: Dockerfile
ports:
- "18081:80"
healthcheck:
test: ["CMD", "curl" ,"http://localhost:80/healthz"]
interval: 5s
retries: 20
volumes:
- ~/.cache/huggingface:/root/.cache/huggingface
function_resolver:
build:
context: ../../function_resolver
dockerfile: Dockerfile
ports:
- "18082:80"
healthcheck:
test: ["CMD", "curl" ,"http://localhost:80/healthz"]
interval: 5s
retries: 20
volumes:
- ~/.cache/huggingface:/root/.cache/huggingface
environment:
# use ollama endpoint that is hosted by host machine (no virtualization)
- OLLAMA_ENDPOINT=${OLLAMA_ENDPOINT:-host.docker.internal}
# uncomment following line to use ollama endpoint that is hosted by docker
# - OLLAMA_ENDPOINT=ollama
ollama:
image: ollama/ollama
container_name: ollama
volumes:
- ./ollama:/root/.ollama
restart: unless-stopped
ports:
- '11434:11434'
profiles:
- manual
open-webui:
image: ghcr.io/open-webui/open-webui:${WEBUI_DOCKER_TAG-main}
container_name: open-webui
volumes:
- ./open-webui:/app/backend/data
# depends_on:
# - ollama
ports:
- 18090:8080
environment:
- OLLAMA_BASE_URL=http://${OLLAMA_ENDPOINT:-host.docker.internal}:11434
- WEBUI_AUTH=false
extra_hosts:
- host.docker.internal:host-gateway
restart: unless-stopped
chatbot_ui:
build:
context: ../../chatbot_ui
dockerfile: Dockerfile
ports:
- "18080:8080"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY:?error}
- CHAT_COMPLETION_ENDPOINT=http://bolt:10000/v1/chat/completions
prometheus:
image: prom/prometheus
container_name: prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yaml'
ports:
- 9090:9090
restart: unless-stopped
volumes:
- ./prometheus:/etc/prometheus
- ./prom_data:/prometheus
profiles:
- monitoring
grafana:
image: grafana/grafana
container_name: grafana
ports:
- 3000:3000
restart: unless-stopped
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=grafana
volumes:
- ./grafana:/etc/grafana/provisioning/datasources
- ./grafana/dashboard.yaml:/etc/grafana/provisioning/dashboards/main.yaml
- ./grafana/dashboards:/var/lib/grafana/dashboards
profiles:
- monitoring

View file

@ -0,0 +1,59 @@
import pandas as pd
import random
import datetime
def generate_employee_data(conn):
# List of possible names, positions, departments, and locations
names = ["Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Hank", "Ivy", "Jack"]
positions = ["Manager", "Engineer", "Salesperson", "HR Specialist", "Marketing Analyst"]
departments = ["Engineering", "Marketing", "HR", "Sales", "Finance"]
locations = ["New York", "San Francisco", "Austin", "Boston", "Chicago"]
# Function to generate random hire date
def random_hire_date():
start_date = datetime.date(2000, 1, 1)
end_date = datetime.date(2023, 12, 31)
time_between_dates = end_date - start_date
days_between_dates = time_between_dates.days
random_number_of_days = random.randrange(days_between_dates)
hire_date = start_date + datetime.timedelta(days=random_number_of_days)
return hire_date
# Function to generate random employee data
def generate_employee_records(count):
employees = []
for _ in range(count):
name = random.choice(names)
position = random.choice(positions)
salary = round(random.uniform(50000, 150000), 2) # Salary between 50,000 and 150,000
department = random.choice(departments)
location = random.choice(locations)
hire_date = random_hire_date()
performance_score = round(random.uniform(1, 5), 2) # Performance score between 1.0 and 5.0
years_of_experience = random.randint(1, 30) # Years of experience between 1 and 30
employee = {
"position": position,
"name": name,
"salary": salary,
"department": department,
"location": location,
"hire_date": hire_date,
"performance_score": performance_score,
"years_of_experience": years_of_experience
}
employees.append(employee)
return employees
# Generate 10 random employee records
employee_records = generate_employee_records(200)
# Convert the list of dictionaries to a DataFrame
df = pd.DataFrame(employee_records)
df.to_sql('employees', conn, index=False)
return

View file

@ -2,6 +2,9 @@ import os
import sentence_transformers
from gliner import GLiNER
from transformers import pipeline
import sqlite3
from employee_data_generator import generate_employee_data
from network_data_generator import generate_device_data, generate_interface_stats_data, generate_flow_data
def load_transformers(models = os.getenv("MODELS", "BAAI/bge-large-en-v1.5")):
transformers = {}
@ -26,3 +29,22 @@ def load_zero_shot_models(models = os.getenv("ZERO_SHOT_MODELS", "tasksource/deb
zero_shot_models[model] = pipeline("zero-shot-classification",model=model)
return zero_shot_models
def load_sql():
# Example Usage
conn = sqlite3.connect(':memory:')
# create and load the employees table
generate_employee_data(conn)
# create and load the devices table
device_data = generate_device_data(conn)
# create and load the interface_stats table
generate_interface_stats_data(conn, device_data)
# create and load the flow table
generate_flow_data(conn, device_data)
return conn

View file

@ -2,8 +2,16 @@ import random
from fastapi import FastAPI, Response, HTTPException
from pydantic import BaseModel
from load_models import load_ner_models, load_transformers, load_zero_shot_models
from datetime import date, timedelta
from datetime import datetime, date, timedelta, timezone
import string
import pandas as pd
from load_models import load_sql
import logging
from dateparser import parse
from network_data_generator import convert_to_ago_format, load_params
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
transformers = load_transformers()
ner_models = load_ner_models()
@ -144,6 +152,163 @@ async def weather(req: WeatherRequest, res: Response):
return weather_forecast
'''
*****
Adding new functions to test the usecases - Sampreeth
*****
'''
conn = load_sql()
name_col = "name"
class TopEmployees(BaseModel):
grouping: str
ranking_criteria: str
top_n: int
@app.post("/top_employees")
async def top_employees(req: TopEmployees, res: Response):
name_col = "name"
# Check if `req.ranking_criteria` is a Text object and extract its value accordingly
logger.info(f"{'* ' * 50}\n\nCaptured Ranking Criteria: {req.ranking_criteria}\n\n{'* ' * 50}")
if req.ranking_criteria == "yoe":
req.ranking_criteria = "years_of_experience"
elif req.ranking_criteria == "rating":
req.ranking_criteria = "performance_score"
logger.info(f"{'* ' * 50}\n\nFinal Ranking Criteria: {req.ranking_criteria}\n\n{'* ' * 50}")
query = f"""
SELECT {req.grouping}, {name_col}, {req.ranking_criteria}
FROM (
SELECT {req.grouping}, {name_col}, {req.ranking_criteria},
DENSE_RANK() OVER (PARTITION BY {req.grouping} ORDER BY {req.ranking_criteria} DESC) as emp_rank
FROM employees
) ranked_employees
WHERE emp_rank <= {req.top_n};
"""
result_df = pd.read_sql_query(query, conn)
result = result_df.to_dict(orient='records')
return result
class AggregateStats(BaseModel):
grouping: str
aggregate_criteria: str
aggregate_type: str
@app.post("/aggregate_stats")
async def aggregate_stats(req: AggregateStats, res: Response):
logger.info(f"{'* ' * 50}\n\nCaptured Aggregate Criteria: {req.aggregate_criteria}\n\n{'* ' * 50}")
if req.aggregate_criteria == "yoe":
req.aggregate_criteria = "years_of_experience"
logger.info(f"{'* ' * 50}\n\nFinal Aggregate Criteria: {req.aggregate_criteria}\n\n{'* ' * 50}")
logger.info(f"{'* ' * 50}\n\nCaptured Aggregate Type: {req.aggregate_type}\n\n{'* ' * 50}")
if req.aggregate_type.lower() not in ["sum", "avg", "min", "max"]:
if req.aggregate_type.lower() == "count":
req.aggregate_type = "COUNT"
elif req.aggregate_type.lower() == "total":
req.aggregate_type = "SUM"
elif req.aggregate_type.lower() == "average":
req.aggregate_type = "AVG"
elif req.aggregate_type.lower() == "minimum":
req.aggregate_type = "MIN"
elif req.aggregate_type.lower() == "maximum":
req.aggregate_type = "MAX"
else:
raise HTTPException(status_code=400, detail="Invalid aggregate type")
logger.info(f"{'* ' * 50}\n\nFinal Aggregate Type: {req.aggregate_type}\n\n{'* ' * 50}")
query = f"""
SELECT {req.grouping}, {req.aggregate_type}({req.aggregate_criteria}) as {req.aggregate_type}_{req.aggregate_criteria}
FROM employees
GROUP BY {req.grouping};
"""
result_df = pd.read_sql_query(query, conn)
result = result_df.to_dict(orient='records')
return result
class PacketDropCorrelationRequest(BaseModel):
from_time: str = None # Optional natural language timeframe
ifname: str = None # Optional interface name filter
region: str = None # Optional region filter
min_in_errors: int = None
max_in_errors: int = None
min_out_errors: int = None
max_out_errors: int = None
min_in_discards: int = None
max_in_discards: int = None
min_out_discards: int = None
max_out_discards: int = None
@app.post("/interface_down_pkt_drop")
async def interface_down_packet_drop(req: PacketDropCorrelationRequest, res: Response):
params, filters = load_params(req)
# Join the filters using AND
where_clause = " AND ".join(filters)
if where_clause:
where_clause = "AND " + where_clause
# Step 3: Query packet errors and flows from interfacestats and ts_flow
query = f"""
SELECT
d.switchip AS device_ip_address,
i.in_errors,
i.in_discards,
i.out_errors,
i.out_discards,
i.ifname,
t.src_addr,
t.dst_addr,
t.time AS flow_time,
i.time AS interface_time
FROM
device d
INNER JOIN
interfacestats i
ON d.device_mac_address = i.device_mac_address
INNER JOIN
ts_flow t
ON d.switchip = t.sampler_address
WHERE
i.time >= :from_time -- Using the converted timestamp
{where_clause}
ORDER BY
i.time;
"""
correlated_data = pd.read_sql_query(query, conn, params=params)
if correlated_data.empty:
default_response = {
"device_ip_address": "0.0.0.0", # Placeholder IP
"in_errors": 0,
"in_discards": 0,
"out_errors": 0,
"out_discards": 0,
"ifname": req.ifname or "unknown", # Placeholder or interface provided in the request
"src_addr": "0.0.0.0", # Placeholder source IP
"dst_addr": "0.0.0.0", # Placeholder destination IP
"flow_time": str(datetime.now(timezone.utc)), # Current timestamp or placeholder
"interface_time": str(datetime.now(timezone.utc)) # Current timestamp or placeholder
}
return [default_response]
logger.info(f"Correlated Packet Drop Data: {correlated_data}")
return correlated_data.to_dict(orient='records')
class InsuranceClaimDetailsRequest(BaseModel):
policy_number: str
@ -159,3 +324,82 @@ async def insurance_claim_details(req: InsuranceClaimDetailsRequest, res: Respon
}
return claim_details
class FlowPacketErrorCorrelationRequest(BaseModel):
from_time: str = None # Optional natural language timeframe
ifname: str = None # Optional interface name filter
region: str = None # Optional region filter
min_in_errors: int = None
max_in_errors: int = None
min_out_errors: int = None
max_out_errors: int = None
min_in_discards: int = None
max_in_discards: int = None
min_out_discards: int = None
max_out_discards: int = None
@app.post("/packet_errors_impact_flow")
async def packet_errors_impact_flow(req: FlowPacketErrorCorrelationRequest, res: Response):
params, filters = load_params(req)
# Join the filters using AND
where_clause = " AND ".join(filters)
if where_clause:
where_clause = "AND " + where_clause
# Step 3: Query the packet errors and flows, correlating by timestamps
query = f"""
SELECT
d.switchip AS device_ip_address,
i.in_errors,
i.in_discards,
i.out_errors,
i.out_discards,
i.ifname,
t.src_addr,
t.dst_addr,
t.src_port,
t.dst_port,
t.packets,
t.time AS flow_time,
i.time AS error_time
FROM
device d
INNER JOIN
interfacestats i
ON d.device_mac_address = i.device_mac_address
INNER JOIN
ts_flow t
ON d.switchip = t.sampler_address
WHERE
i.time >= :from_time
AND ABS(strftime('%s', t.time) - strftime('%s', i.time)) <= 300 -- Correlate within 5 minutes
{where_clause}
ORDER BY
i.time;
"""
correlated_data = pd.read_sql_query(query, conn, params=params)
if correlated_data.empty:
default_response = {
"device_ip_address": "0.0.0.0", # Placeholder IP
"in_errors": 0,
"in_discards": 0,
"out_errors": 0,
"out_discards": 0,
"ifname": req.ifname or "unknown", # Placeholder or interface provided in the request
"src_addr": "0.0.0.0", # Placeholder source IP
"dst_addr": "0.0.0.0", # Placeholder destination IP
"src_port": 0,
"dst_port": 0,
"packets": 0,
"flow_time": str(datetime.now(timezone.utc)), # Current timestamp or placeholder
"error_time": str(datetime.now(timezone.utc)) # Current timestamp or placeholder
}
return [default_response]
# Return the correlated data if found
return correlated_data.to_dict(orient='records')

View file

@ -0,0 +1,200 @@
import pandas as pd
import random
from datetime import datetime, timedelta, timezone
import re
import logging
from dateparser import parse
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Function to convert natural language time expressions to "X {time} ago" format
def convert_to_ago_format(expression):
# Define patterns for different time units
time_units = {
r'seconds': 'seconds',
r'minutes': 'minutes',
r'mins': 'mins',
r'hrs': 'hrs',
r'hours': 'hours',
r'hour': 'hour',
r'hr': 'hour',
r'days': 'days',
r'day': 'day',
r'weeks': 'weeks',
r'week': 'week',
r'months': 'months',
r'month': 'month',
r'years': 'years',
r'yrs': 'years',
r'year': 'year',
r'yr': 'year',
}
# Iterate over each time unit and create regex for each phrase format
for pattern, unit in time_units.items():
# Handle "for the past X {unit}"
match = re.search(fr'(\d+) {pattern}', expression)
if match:
quantity = match.group(1)
return f"{quantity} {unit} ago"
# If the format is not recognized, return None or raise an error
return None
# Function to generate random MAC addresses
def random_mac():
return "AA:BB:CC:DD:EE:" + ':'.join([f"{random.randint(0, 255):02X}" for _ in range(2)])
# Function to generate random IP addresses
def random_ip():
return f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
# Generate synthetic data for the device table
def generate_device_data(conn, n=1000,):
device_data = {
'switchip': [random_ip() for _ in range(n)],
'hwsku': [f'HW{i+1}' for i in range(n)],
'hostname': [f'switch{i+1}' for i in range(n)],
'osversion': [f'v{i+1}' for i in range(n)],
'layer': ['L2' if i % 2 == 0 else 'L3' for i in range(n)],
'region': [random.choice(['US', 'EU', 'ASIA']) for _ in range(n)],
'uptime': [f'{random.randint(0, 10)} days {random.randint(0, 23)}:{random.randint(0, 59)}:{random.randint(0, 59)}' for _ in range(n)],
'device_mac_address': [random_mac() for _ in range(n)]
}
df = pd.DataFrame(device_data)
df.to_sql('device', conn, index=False)
return df
# Generate synthetic data for the interfacestats table
def generate_interface_stats_data(conn, device_df, n=1000):
interface_stats_data = []
for _ in range(n):
device_mac = random.choice(device_df['device_mac_address'])
ifname = random.choice(['eth0', 'eth1', 'eth2', 'eth3'])
time = datetime.now(timezone.utc) - timedelta(minutes=random.randint(0, 1440 * 5)) # random timestamps in the past 5 day
in_discards = random.randint(0, 1000)
in_errors = random.randint(0, 500)
out_discards = random.randint(0, 800)
out_errors = random.randint(0, 400)
in_octets = random.randint(1000, 100000)
out_octets = random.randint(1000, 100000)
interface_stats_data.append({
'device_mac_address': device_mac,
'ifname': ifname,
'time': time,
'in_discards': in_discards,
'in_errors': in_errors,
'out_discards': out_discards,
'out_errors': out_errors,
'in_octets': in_octets,
'out_octets': out_octets
})
df = pd.DataFrame(interface_stats_data)
df.to_sql('interfacestats', conn, index=False)
return
# Generate synthetic data for the ts_flow table
def generate_flow_data(conn, device_df, n=1000):
flow_data = []
for _ in range(n):
sampler_address = random.choice(device_df['switchip'])
proto = random.choice(['TCP', 'UDP'])
src_addr = random_ip()
dst_addr = random_ip()
src_port = random.randint(1024, 65535)
dst_port = random.randint(1024, 65535)
in_if = random.randint(1, 10)
out_if = random.randint(1, 10)
flow_start = int((datetime.now() - timedelta(days=random.randint(1, 30))).timestamp())
flow_end = int((datetime.now() - timedelta(days=random.randint(1, 30))).timestamp())
bytes_transferred = random.randint(1000, 100000)
packets = random.randint(1, 1000)
flow_time = datetime.now(timezone.utc) - timedelta(minutes=random.randint(0, 1440 * 5)) # random flow time
flow_data.append({
'sampler_address': sampler_address,
'proto': proto,
'src_addr': src_addr,
'dst_addr': dst_addr,
'src_port': src_port,
'dst_port': dst_port,
'in_if': in_if,
'out_if': out_if,
'flow_start': flow_start,
'flow_end': flow_end,
'bytes': bytes_transferred,
'packets': packets,
'time': flow_time
})
df = pd.DataFrame(flow_data)
df.to_sql('ts_flow', conn, index=False)
return
def load_params(req):
# Step 1: Convert the from_time natural language string to a timestamp if provided
if req.from_time:
# Use `dateparser` to parse natural language timeframes
logger.info(f"{'* ' * 50}\n\nCaptured from time: {req.from_time}\n\n")
parsed_time = parse(req.from_time, settings={'RELATIVE_BASE': datetime.now()})
if not parsed_time:
conv_time = convert_to_ago_format(req.from_time)
if conv_time:
parsed_time = parse(conv_time, settings={'RELATIVE_BASE': datetime.now()})
else:
return {"error": "Invalid from_time format. Please provide a valid time description such as 'past 7 days' or 'since last month'."}
logger.info(f"\n\nConverted from time: {parsed_time}\n\n{'* ' * 50}\n\n")
from_time = parsed_time
logger.info(f"Using parsed from_time: {from_time}")
else:
# If no from_time is provided, use a default value (e.g., the past 7 days)
from_time = datetime.now() - timedelta(days=7)
logger.info(f"Using default from_time: {from_time}")
# Step 2: Build the dynamic SQL query based on the optional filters
filters = []
params = {"from_time": from_time}
if req.ifname:
filters.append("i.ifname = :ifname")
params["ifname"] = req.ifname
if req.region:
filters.append("d.region = :region")
params["region"] = req.region
if req.min_in_errors is not None:
filters.append("i.in_errors >= :min_in_errors")
params["min_in_errors"] = req.min_in_errors
if req.max_in_errors is not None:
filters.append("i.in_errors <= :max_in_errors")
params["max_in_errors"] = req.max_in_errors
if req.min_out_errors is not None:
filters.append("i.out_errors >= :min_out_errors")
params["min_out_errors"] = req.min_out_errors
if req.max_out_errors is not None:
filters.append("i.out_errors <= :max_out_errors")
params["max_out_errors"] = req.max_out_errors
if req.min_in_discards is not None:
filters.append("i.in_discards >= :min_in_discards")
params["min_in_discards"] = req.min_in_discards
if req.max_in_discards is not None:
filters.append("i.in_discards <= :max_in_discards")
params["max_in_discards"] = req.max_in_discards
if req.min_out_discards is not None:
filters.append("i.out_discards >= :min_out_discards")
params["min_out_discards"] = req.min_out_discards
if req.max_out_discards is not None:
filters.append("i.out_discards <= :max_out_discards")
params["max_out_discards"] = req.max_out_discards
return params, filters

View file

@ -4,3 +4,5 @@ sentence-transformers
torch
uvicorn
gliner
pandas
dateparser