diff --git a/demos/employee_details_copilot/README.md b/demos/employee_details_copilot/README.md new file mode 100644 index 00000000..d63d5d85 --- /dev/null +++ b/demos/employee_details_copilot/README.md @@ -0,0 +1,28 @@ +# Function calling +This demo shows how you can use intelligent prompt gateway to act a copilot for calling the correct proc by capturing the required and optional parametrs from the prompt. This demo assumes you are using ollama running natively. If you want to run ollama running inside docker then please update ollama endpoint in docker-compose file. + +# Startig the demo +1. Ensure that submodule is up to date + ```sh + git submodule sync --recursive + ``` +1. Create `.env` file and set OpenAI key using env var `OPENAI_API_KEY` +1. Start services + ```sh + docker compose up + ``` +1. Download Bolt-FC model. This demo assumes we have downloaded [Bolt-Function-Calling-1B:Q4_K_M](https://huggingface.co/katanemolabs/Bolt-Function-Calling-1B.gguf/blob/main/Bolt-Function-Calling-1B-Q4_K_M.gguf) to local folder. +1. If running ollama natively run + ```sh + ollama serve + ``` +2. Create model file in ollama repository + ```sh + ollama create Bolt-Function-Calling-1B:Q4_K_M -f Bolt-FC-1B-Q4_K_M.model_file + ``` +3. Navigate to http://localhost:18080/ +4. You can type in queries like "show me the top 5 employees in each department with highest salary" + - You can also ask follow up questions like "just show the top 2" +5. To see metrics navigate to "http://localhost:3000/" (use admin/grafana for login) + - Open up dahsboard named "Intelligent Gateway Overview" + - On this dashboard you can see reuqest latency and number of requests diff --git a/demos/employee_details_copilot/bolt_config.yaml b/demos/employee_details_copilot/bolt_config.yaml new file mode 100644 index 00000000..875f176d --- /dev/null +++ b/demos/employee_details_copilot/bolt_config.yaml @@ -0,0 +1,79 @@ +default_prompt_endpoint: "127.0.0.1" +load_balancing: "round_robin" +timeout_ms: 5000 + +overrides: + # confidence threshold for prompt target intent matching + prompt_target_intent_matching_threshold: 0.8 + +# should not be here +embedding_provider: + name: "bge-large-en-v1.5" + model: "BAAI/bge-large-en-v1.5" + +llm_providers: + + - name: open-ai-gpt-4 + api_key: $OPEN_AI_API_KEY + model: gpt-4 + default: true + +prompt_targets: + + - type: function_resolver + name: top_employees + description: | + Allows you to find the top employees in different groups, such as departments, locations, or position. You can rank the employees by different criteria, like salary, yoe, or rating. Returns the best-ranked employees for each group, helping you identify top n in the list. + parameters: + - name: grouping + description: | + Select how you'd like to group the employees. For example, you can group them by department, location, or their position. The tool will provide the top-ranked employees within each group you choose. + required: true + type: string + enum: [department, location, position] + - name: ranking_criteria + required: true + type: string + description: | + Choose how you'd like to rank the employees. You can rank them by their salary, their yoe, or their rating. The tool will sort the employees based on this ranking and return the best ones from each group. + enum: [salary, yoe, rating] + - name: top_n + required: true + type: integer + description: | + Enter how many of the top employees you want to see in each group. For example, if you enter 3, the tool will show you the top 3 employees for each group you selected. + endpoint: + cluster: databasehost + path: /top_employees + system_prompt: | + You are responsible for retrieving the top N employees per group ranked by a constraint. + + - type: function_resolver + name: aggregate_stats + description: | + Calculate summary statistics for groups of employees. You can group employees by categories like department or location and then compute totals, averages, or other statistics for specific attributes such as salary or yoe. + parameters: + - name: grouping + description: | + Choose how you'd like to organize the employees. For example, you can group them by department, location, or position. The tool will calculate the summary statistics for each group. + required: true + enum: [department, location, position] + - name: aggregate_criteria + description: | + Select the specific attribute you'd like to analyze. This could be something like salary, yoe, or rating. The tool will calculate the statistic you request for this attribute. + required: true + enum: [salary, yoe, rating] + - name: aggregate_type + description: | + Choose the type of statistic you'd like to calculate for the selected attribute. For example, you can calculate the sum, average, minimum, or maximum value for each group. + required: true + enum: [SUM, AVG, MIN, MAX] + endpoint: + cluster: databasehost + path: /aggregate_stats + system_prompt: | + You help calculate summary statistics for groups of employees. First, organize the employees by the specified grouping (e.g., department, location, or position). Then, compute the requested statistic (e.g., total, average, minimum, or maximum) for a specific attribute like salary, experience, or rating. + +clusters: + databasehost: + address: model_server diff --git a/demos/employee_details_copilot/docker-compose.yaml b/demos/employee_details_copilot/docker-compose.yaml new file mode 100644 index 00000000..5f0b05b8 --- /dev/null +++ b/demos/employee_details_copilot/docker-compose.yaml @@ -0,0 +1,127 @@ +services: + + config_generator: + build: + context: ../../ + dockerfile: config_generator/Dockerfile + volumes: + - ../../envoyfilter/envoy.template.yaml:/usr/src/app/envoy.template.yaml + - ./bolt_config.yaml:/usr/src/app/bolt_config.yaml + - ./generated:/usr/src/app/out + + bolt: + build: + context: ../../ + dockerfile: envoyfilter/Dockerfile + hostname: bolt + ports: + - "10000:10000" + - "19901:9901" + volumes: + - ./generated/envoy.yaml:/etc/envoy/envoy.yaml + - /etc/ssl/cert.pem:/etc/ssl/cert.pem + depends_on: + config_generator: + condition: service_completed_successfully + model_server: + condition: service_healthy + environment: + - LOG_LEVEL=debug + + model_server: + build: + context: ../../model_server + dockerfile: Dockerfile + ports: + - "18081:80" + healthcheck: + test: ["CMD", "curl" ,"http://localhost:80/healthz"] + interval: 5s + retries: 20 + volumes: + - ~/.cache/huggingface:/root/.cache/huggingface + + function_resolver: + build: + context: ../../function_resolver + dockerfile: Dockerfile + ports: + - "18082:80" + healthcheck: + test: ["CMD", "curl" ,"http://localhost:80/healthz"] + interval: 5s + retries: 20 + volumes: + - ~/.cache/huggingface:/root/.cache/huggingface + environment: + # use ollama endpoint that is hosted by host machine (no virtualization) + - OLLAMA_ENDPOINT=${OLLAMA_ENDPOINT:-host.docker.internal} + # uncomment following line to use ollama endpoint that is hosted by docker + # - OLLAMA_ENDPOINT=ollama + + ollama: + image: ollama/ollama + container_name: ollama + volumes: + - ./ollama:/root/.ollama + restart: unless-stopped + ports: + - '11434:11434' + profiles: + - manual + + open-webui: + image: ghcr.io/open-webui/open-webui:${WEBUI_DOCKER_TAG-main} + container_name: open-webui + volumes: + - ./open-webui:/app/backend/data + # depends_on: + # - ollama + ports: + - 18090:8080 + environment: + - OLLAMA_BASE_URL=http://${OLLAMA_ENDPOINT:-host.docker.internal}:11434 + - WEBUI_AUTH=false + extra_hosts: + - host.docker.internal:host-gateway + restart: unless-stopped + + chatbot_ui: + build: + context: ../../chatbot_ui + dockerfile: Dockerfile + ports: + - "18080:8080" + environment: + - OPENAI_API_KEY=${OPENAI_API_KEY:?error} + - CHAT_COMPLETION_ENDPOINT=http://bolt:10000/v1/chat/completions + + prometheus: + image: prom/prometheus + container_name: prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yaml' + ports: + - 9090:9090 + restart: unless-stopped + volumes: + - ./prometheus:/etc/prometheus + - ./prom_data:/prometheus + profiles: + - monitoring + + grafana: + image: grafana/grafana + container_name: grafana + ports: + - 3000:3000 + restart: unless-stopped + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=grafana + volumes: + - ./grafana:/etc/grafana/provisioning/datasources + - ./grafana/dashboard.yaml:/etc/grafana/provisioning/dashboards/main.yaml + - ./grafana/dashboards:/var/lib/grafana/dashboards + profiles: + - monitoring diff --git a/demos/network_copilot/README.md b/demos/network_copilot/README.md new file mode 100644 index 00000000..a4a1d08c --- /dev/null +++ b/demos/network_copilot/README.md @@ -0,0 +1,28 @@ +# Function calling +This demo shows how you can use intelligent prompt gateway as a network copilot that could give information about correlation between packet loss with device reboots, downs, or maintainence. This demo assumes you are using ollama running natively. If you want to run ollama running inside docker then please update ollama endpoint in docker-compose file. + +# Startig the demo +1. Ensure that submodule is up to date + ```sh + git submodule sync --recursive + ``` +1. Create `.env` file and set OpenAI key using env var `OPENAI_API_KEY` +1. Start services + ```sh + docker compose up + ``` +1. Download Bolt-FC model. This demo assumes we have downloaded [Bolt-Function-Calling-1B:Q4_K_M](https://huggingface.co/katanemolabs/Bolt-Function-Calling-1B.gguf/blob/main/Bolt-Function-Calling-1B-Q4_K_M.gguf) to local folder. +1. If running ollama natively run + ```sh + ollama serve + ``` +2. Create model file in ollama repository + ```sh + ollama create Bolt-Function-Calling-1B:Q4_K_M -f Bolt-FC-1B-Q4_K_M.model_file + ``` +3. Navigate to http://localhost:18080/ +4. You can type in queries like "show me any packet drops due to interface failure in the past 3 days" + - You can also ask follow up questions like "show me just the ones with maximum 200 in errors" +5. To see metrics navigate to "http://localhost:3000/" (use admin/grafana for login) + - Open up dahsboard named "Intelligent Gateway Overview" + - On this dashboard you can see reuqest latency and number of requests diff --git a/demos/network_copilot/bolt_config.yaml b/demos/network_copilot/bolt_config.yaml new file mode 100644 index 00000000..f79955dc --- /dev/null +++ b/demos/network_copilot/bolt_config.yaml @@ -0,0 +1,135 @@ +default_prompt_endpoint: "127.0.0.1" +load_balancing: "round_robin" +timeout_ms: 5000 + +overrides: + # confidence threshold for prompt target intent matching + prompt_target_intent_matching_threshold: 0.7 + +# should not be here +embedding_provider: + name: "bge-large-en-v1.5" + model: "BAAI/bge-large-en-v1.5" + +llm_providers: + + - name: open-ai-gpt-4 + api_key: $OPEN_AI_API_KEY + model: gpt-4 + default: true + +prompt_targets: + + - type: function_resolver + name: interface_down_packet_drop + description: | + Checks for packet drops due to interface unavailability like reboots, shutdowns, or maintainence events. It allows filtering the results by timeframes, interface name, region, and packet error thresholds. + parameters: + - name: from_time + description: An optional natural language timeframe (e.g., "past 7 days", "since a month") to define the time range for packet drop analysis. + required: false + type: string + - name: ifname + description: An optional interface name filter to apply. + required: false + type: string + - name: region + description: An optional region filter to apply (from the device table). + required: false + type: string + - name: min_in_errors + description: Minimum number of in_errors to filter results. + required: false + type: integer + - name: max_in_errors + description: Maximum number of in_errors to filter results. + required: false + type: integer + - name: min_out_errors + description: Minimum number of out_errors to filter results. + required: false + type: integer + - name: max_out_errors + description: Maximum number of out_errors to filter results. + required: false + type: integer + - name: min_in_discards + description: Minimum number of in_discards to filter results. + required: false + type: integer + - name: max_in_discards + description: Maximum number of in_discards to filter results. + required: false + type: integer + - name: min_out_discards + description: Minimum number of out_discards to filter results. + required: false + type: integer + - name: max_out_discards + description: Maximum number of out_discards to filter results. + required: false + type: integer + endpoint: + cluster: databasehost + path: /interface_down_packet_drop + system_prompt: | + You are responsible for correlating packet drops with interface down events by analyzing packet errors from the given data. + + - type: function_resolver + name: packet_errors_impact_flow + description: | + To find whether packet flows are impacted due to packet errors by correlating the timestamps between the packet errors and the flows. It allows filtering the results by timeframes, interface name, region, and packet error thresholds. + parameters: + - name: from_time + description: An optional natural language timeframe (e.g., "past 7 days", "since a month") to define the time range for the analysis. + required: false + type: string + - name: ifname + description: An optional interface name filter to apply. + required: false + type: string + - name: region + description: An optional region filter to apply (from the device table). + required: false + type: string + - name: min_in_errors + description: Minimum number of in_errors to filter results. + required: false + type: integer + - name: max_in_errors + description: Maximum number of in_errors to filter results. + required: false + type: integer + - name: min_out_errors + description: Minimum number of out_errors to filter results. + required: false + type: integer + - name: max_out_errors + description: Maximum number of out_errors to filter results. + required: false + type: integer + - name: min_in_discards + description: Minimum number of in_discards to filter results. + required: false + type: integer + - name: max_in_discards + description: Maximum number of in_discards to filter results. + required: false + type: integer + - name: min_out_discards + description: Minimum number of out_discards to filter results. + required: false + type: integer + - name: max_out_discards + description: Maximum number of out_discards to filter results. + required: false + type: integer + endpoint: + cluster: databasehost + path: /packet_errors_impact_flow + system_prompt: | + You are responsible for finding and correlating packet errors with the packet flows based on timestamps given in the data. This correlation helps identify if packet flows are impacted by packet errors. + +clusters: + databasehost: + address: model_server diff --git a/demos/network_copilot/docker-compose.yaml b/demos/network_copilot/docker-compose.yaml new file mode 100644 index 00000000..5f0b05b8 --- /dev/null +++ b/demos/network_copilot/docker-compose.yaml @@ -0,0 +1,127 @@ +services: + + config_generator: + build: + context: ../../ + dockerfile: config_generator/Dockerfile + volumes: + - ../../envoyfilter/envoy.template.yaml:/usr/src/app/envoy.template.yaml + - ./bolt_config.yaml:/usr/src/app/bolt_config.yaml + - ./generated:/usr/src/app/out + + bolt: + build: + context: ../../ + dockerfile: envoyfilter/Dockerfile + hostname: bolt + ports: + - "10000:10000" + - "19901:9901" + volumes: + - ./generated/envoy.yaml:/etc/envoy/envoy.yaml + - /etc/ssl/cert.pem:/etc/ssl/cert.pem + depends_on: + config_generator: + condition: service_completed_successfully + model_server: + condition: service_healthy + environment: + - LOG_LEVEL=debug + + model_server: + build: + context: ../../model_server + dockerfile: Dockerfile + ports: + - "18081:80" + healthcheck: + test: ["CMD", "curl" ,"http://localhost:80/healthz"] + interval: 5s + retries: 20 + volumes: + - ~/.cache/huggingface:/root/.cache/huggingface + + function_resolver: + build: + context: ../../function_resolver + dockerfile: Dockerfile + ports: + - "18082:80" + healthcheck: + test: ["CMD", "curl" ,"http://localhost:80/healthz"] + interval: 5s + retries: 20 + volumes: + - ~/.cache/huggingface:/root/.cache/huggingface + environment: + # use ollama endpoint that is hosted by host machine (no virtualization) + - OLLAMA_ENDPOINT=${OLLAMA_ENDPOINT:-host.docker.internal} + # uncomment following line to use ollama endpoint that is hosted by docker + # - OLLAMA_ENDPOINT=ollama + + ollama: + image: ollama/ollama + container_name: ollama + volumes: + - ./ollama:/root/.ollama + restart: unless-stopped + ports: + - '11434:11434' + profiles: + - manual + + open-webui: + image: ghcr.io/open-webui/open-webui:${WEBUI_DOCKER_TAG-main} + container_name: open-webui + volumes: + - ./open-webui:/app/backend/data + # depends_on: + # - ollama + ports: + - 18090:8080 + environment: + - OLLAMA_BASE_URL=http://${OLLAMA_ENDPOINT:-host.docker.internal}:11434 + - WEBUI_AUTH=false + extra_hosts: + - host.docker.internal:host-gateway + restart: unless-stopped + + chatbot_ui: + build: + context: ../../chatbot_ui + dockerfile: Dockerfile + ports: + - "18080:8080" + environment: + - OPENAI_API_KEY=${OPENAI_API_KEY:?error} + - CHAT_COMPLETION_ENDPOINT=http://bolt:10000/v1/chat/completions + + prometheus: + image: prom/prometheus + container_name: prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yaml' + ports: + - 9090:9090 + restart: unless-stopped + volumes: + - ./prometheus:/etc/prometheus + - ./prom_data:/prometheus + profiles: + - monitoring + + grafana: + image: grafana/grafana + container_name: grafana + ports: + - 3000:3000 + restart: unless-stopped + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=grafana + volumes: + - ./grafana:/etc/grafana/provisioning/datasources + - ./grafana/dashboard.yaml:/etc/grafana/provisioning/dashboards/main.yaml + - ./grafana/dashboards:/var/lib/grafana/dashboards + profiles: + - monitoring diff --git a/model_server/app/employee_data_generator.py b/model_server/app/employee_data_generator.py new file mode 100644 index 00000000..cbe04ad6 --- /dev/null +++ b/model_server/app/employee_data_generator.py @@ -0,0 +1,59 @@ +import pandas as pd +import random +import datetime + +def generate_employee_data(conn): + # List of possible names, positions, departments, and locations + names = ["Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Hank", "Ivy", "Jack"] + positions = ["Manager", "Engineer", "Salesperson", "HR Specialist", "Marketing Analyst"] + departments = ["Engineering", "Marketing", "HR", "Sales", "Finance"] + locations = ["New York", "San Francisco", "Austin", "Boston", "Chicago"] + + # Function to generate random hire date + def random_hire_date(): + start_date = datetime.date(2000, 1, 1) + end_date = datetime.date(2023, 12, 31) + time_between_dates = end_date - start_date + days_between_dates = time_between_dates.days + random_number_of_days = random.randrange(days_between_dates) + hire_date = start_date + datetime.timedelta(days=random_number_of_days) + return hire_date + + # Function to generate random employee data + def generate_employee_records(count): + employees = [] + + for _ in range(count): + name = random.choice(names) + position = random.choice(positions) + salary = round(random.uniform(50000, 150000), 2) # Salary between 50,000 and 150,000 + department = random.choice(departments) + location = random.choice(locations) + hire_date = random_hire_date() + performance_score = round(random.uniform(1, 5), 2) # Performance score between 1.0 and 5.0 + years_of_experience = random.randint(1, 30) # Years of experience between 1 and 30 + + employee = { + "position": position, + "name": name, + "salary": salary, + "department": department, + "location": location, + "hire_date": hire_date, + "performance_score": performance_score, + "years_of_experience": years_of_experience + } + + employees.append(employee) + + return employees + + # Generate 10 random employee records + employee_records = generate_employee_records(200) + + # Convert the list of dictionaries to a DataFrame + df = pd.DataFrame(employee_records) + + df.to_sql('employees', conn, index=False) + + return diff --git a/model_server/app/load_models.py b/model_server/app/load_models.py index a0467bfc..26418007 100644 --- a/model_server/app/load_models.py +++ b/model_server/app/load_models.py @@ -2,6 +2,9 @@ import os import sentence_transformers from gliner import GLiNER from transformers import pipeline +import sqlite3 +from employee_data_generator import generate_employee_data +from network_data_generator import generate_device_data, generate_interface_stats_data, generate_flow_data def load_transformers(models = os.getenv("MODELS", "BAAI/bge-large-en-v1.5")): transformers = {} @@ -26,3 +29,22 @@ def load_zero_shot_models(models = os.getenv("ZERO_SHOT_MODELS", "tasksource/deb zero_shot_models[model] = pipeline("zero-shot-classification",model=model) return zero_shot_models + +def load_sql(): + # Example Usage + conn = sqlite3.connect(':memory:') + + # create and load the employees table + generate_employee_data(conn) + + # create and load the devices table + device_data = generate_device_data(conn) + + # create and load the interface_stats table + generate_interface_stats_data(conn, device_data) + + # create and load the flow table + generate_flow_data(conn, device_data) + + + return conn diff --git a/model_server/app/main.py b/model_server/app/main.py index 1bdb0352..2f01e461 100644 --- a/model_server/app/main.py +++ b/model_server/app/main.py @@ -2,8 +2,16 @@ import random from fastapi import FastAPI, Response, HTTPException from pydantic import BaseModel from load_models import load_ner_models, load_transformers, load_zero_shot_models -from datetime import date, timedelta +from datetime import datetime, date, timedelta, timezone import string +import pandas as pd +from load_models import load_sql +import logging +from dateparser import parse +from network_data_generator import convert_to_ago_format, load_params + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) transformers = load_transformers() ner_models = load_ner_models() @@ -144,6 +152,163 @@ async def weather(req: WeatherRequest, res: Response): return weather_forecast + +''' +***** +Adding new functions to test the usecases - Sampreeth +***** +''' + +conn = load_sql() +name_col = "name" + +class TopEmployees(BaseModel): + grouping: str + ranking_criteria: str + top_n: int + + +@app.post("/top_employees") +async def top_employees(req: TopEmployees, res: Response): + name_col = "name" + # Check if `req.ranking_criteria` is a Text object and extract its value accordingly + logger.info(f"{'* ' * 50}\n\nCaptured Ranking Criteria: {req.ranking_criteria}\n\n{'* ' * 50}") + + if req.ranking_criteria == "yoe": + req.ranking_criteria = "years_of_experience" + elif req.ranking_criteria == "rating": + req.ranking_criteria = "performance_score" + + logger.info(f"{'* ' * 50}\n\nFinal Ranking Criteria: {req.ranking_criteria}\n\n{'* ' * 50}") + + + query = f""" + SELECT {req.grouping}, {name_col}, {req.ranking_criteria} + FROM ( + SELECT {req.grouping}, {name_col}, {req.ranking_criteria}, + DENSE_RANK() OVER (PARTITION BY {req.grouping} ORDER BY {req.ranking_criteria} DESC) as emp_rank + FROM employees + ) ranked_employees + WHERE emp_rank <= {req.top_n}; + """ + result_df = pd.read_sql_query(query, conn) + result = result_df.to_dict(orient='records') + return result + + +class AggregateStats(BaseModel): + grouping: str + aggregate_criteria: str + aggregate_type: str + +@app.post("/aggregate_stats") +async def aggregate_stats(req: AggregateStats, res: Response): + logger.info(f"{'* ' * 50}\n\nCaptured Aggregate Criteria: {req.aggregate_criteria}\n\n{'* ' * 50}") + + if req.aggregate_criteria == "yoe": + req.aggregate_criteria = "years_of_experience" + + logger.info(f"{'* ' * 50}\n\nFinal Aggregate Criteria: {req.aggregate_criteria}\n\n{'* ' * 50}") + + logger.info(f"{'* ' * 50}\n\nCaptured Aggregate Type: {req.aggregate_type}\n\n{'* ' * 50}") + if req.aggregate_type.lower() not in ["sum", "avg", "min", "max"]: + if req.aggregate_type.lower() == "count": + req.aggregate_type = "COUNT" + elif req.aggregate_type.lower() == "total": + req.aggregate_type = "SUM" + elif req.aggregate_type.lower() == "average": + req.aggregate_type = "AVG" + elif req.aggregate_type.lower() == "minimum": + req.aggregate_type = "MIN" + elif req.aggregate_type.lower() == "maximum": + req.aggregate_type = "MAX" + else: + raise HTTPException(status_code=400, detail="Invalid aggregate type") + + logger.info(f"{'* ' * 50}\n\nFinal Aggregate Type: {req.aggregate_type}\n\n{'* ' * 50}") + + query = f""" + SELECT {req.grouping}, {req.aggregate_type}({req.aggregate_criteria}) as {req.aggregate_type}_{req.aggregate_criteria} + FROM employees + GROUP BY {req.grouping}; + """ + result_df = pd.read_sql_query(query, conn) + result = result_df.to_dict(orient='records') + return result + +class PacketDropCorrelationRequest(BaseModel): + from_time: str = None # Optional natural language timeframe + ifname: str = None # Optional interface name filter + region: str = None # Optional region filter + min_in_errors: int = None + max_in_errors: int = None + min_out_errors: int = None + max_out_errors: int = None + min_in_discards: int = None + max_in_discards: int = None + min_out_discards: int = None + max_out_discards: int = None + + +@app.post("/interface_down_pkt_drop") +async def interface_down_packet_drop(req: PacketDropCorrelationRequest, res: Response): + + params, filters = load_params(req) + + # Join the filters using AND + where_clause = " AND ".join(filters) + if where_clause: + where_clause = "AND " + where_clause + + # Step 3: Query packet errors and flows from interfacestats and ts_flow + query = f""" + SELECT + d.switchip AS device_ip_address, + i.in_errors, + i.in_discards, + i.out_errors, + i.out_discards, + i.ifname, + t.src_addr, + t.dst_addr, + t.time AS flow_time, + i.time AS interface_time + FROM + device d + INNER JOIN + interfacestats i + ON d.device_mac_address = i.device_mac_address + INNER JOIN + ts_flow t + ON d.switchip = t.sampler_address + WHERE + i.time >= :from_time -- Using the converted timestamp + {where_clause} + ORDER BY + i.time; + """ + + correlated_data = pd.read_sql_query(query, conn, params=params) + + if correlated_data.empty: + default_response = { + "device_ip_address": "0.0.0.0", # Placeholder IP + "in_errors": 0, + "in_discards": 0, + "out_errors": 0, + "out_discards": 0, + "ifname": req.ifname or "unknown", # Placeholder or interface provided in the request + "src_addr": "0.0.0.0", # Placeholder source IP + "dst_addr": "0.0.0.0", # Placeholder destination IP + "flow_time": str(datetime.now(timezone.utc)), # Current timestamp or placeholder + "interface_time": str(datetime.now(timezone.utc)) # Current timestamp or placeholder + } + return [default_response] + + + logger.info(f"Correlated Packet Drop Data: {correlated_data}") + + return correlated_data.to_dict(orient='records') class InsuranceClaimDetailsRequest(BaseModel): policy_number: str @@ -159,3 +324,82 @@ async def insurance_claim_details(req: InsuranceClaimDetailsRequest, res: Respon } return claim_details + + +class FlowPacketErrorCorrelationRequest(BaseModel): + from_time: str = None # Optional natural language timeframe + ifname: str = None # Optional interface name filter + region: str = None # Optional region filter + min_in_errors: int = None + max_in_errors: int = None + min_out_errors: int = None + max_out_errors: int = None + min_in_discards: int = None + max_in_discards: int = None + min_out_discards: int = None + max_out_discards: int = None + +@app.post("/packet_errors_impact_flow") +async def packet_errors_impact_flow(req: FlowPacketErrorCorrelationRequest, res: Response): + + params, filters = load_params(req) + + # Join the filters using AND + where_clause = " AND ".join(filters) + if where_clause: + where_clause = "AND " + where_clause + + # Step 3: Query the packet errors and flows, correlating by timestamps + query = f""" + SELECT + d.switchip AS device_ip_address, + i.in_errors, + i.in_discards, + i.out_errors, + i.out_discards, + i.ifname, + t.src_addr, + t.dst_addr, + t.src_port, + t.dst_port, + t.packets, + t.time AS flow_time, + i.time AS error_time + FROM + device d + INNER JOIN + interfacestats i + ON d.device_mac_address = i.device_mac_address + INNER JOIN + ts_flow t + ON d.switchip = t.sampler_address + WHERE + i.time >= :from_time + AND ABS(strftime('%s', t.time) - strftime('%s', i.time)) <= 300 -- Correlate within 5 minutes + {where_clause} + ORDER BY + i.time; + """ + + correlated_data = pd.read_sql_query(query, conn, params=params) + + if correlated_data.empty: + default_response = { + "device_ip_address": "0.0.0.0", # Placeholder IP + "in_errors": 0, + "in_discards": 0, + "out_errors": 0, + "out_discards": 0, + "ifname": req.ifname or "unknown", # Placeholder or interface provided in the request + "src_addr": "0.0.0.0", # Placeholder source IP + "dst_addr": "0.0.0.0", # Placeholder destination IP + "src_port": 0, + "dst_port": 0, + "packets": 0, + "flow_time": str(datetime.now(timezone.utc)), # Current timestamp or placeholder + "error_time": str(datetime.now(timezone.utc)) # Current timestamp or placeholder + } + return [default_response] + + # Return the correlated data if found + return correlated_data.to_dict(orient='records') diff --git a/model_server/app/network_data_generator.py b/model_server/app/network_data_generator.py new file mode 100644 index 00000000..fe6661c8 --- /dev/null +++ b/model_server/app/network_data_generator.py @@ -0,0 +1,200 @@ +import pandas as pd +import random +from datetime import datetime, timedelta, timezone +import re +import logging +from dateparser import parse + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Function to convert natural language time expressions to "X {time} ago" format +def convert_to_ago_format(expression): + # Define patterns for different time units + time_units = { + r'seconds': 'seconds', + r'minutes': 'minutes', + r'mins': 'mins', + r'hrs': 'hrs', + r'hours': 'hours', + r'hour': 'hour', + r'hr': 'hour', + r'days': 'days', + r'day': 'day', + r'weeks': 'weeks', + r'week': 'week', + r'months': 'months', + r'month': 'month', + r'years': 'years', + r'yrs': 'years', + r'year': 'year', + r'yr': 'year', + } + + # Iterate over each time unit and create regex for each phrase format + for pattern, unit in time_units.items(): + # Handle "for the past X {unit}" + match = re.search(fr'(\d+) {pattern}', expression) + if match: + quantity = match.group(1) + return f"{quantity} {unit} ago" + + # If the format is not recognized, return None or raise an error + return None + + +# Function to generate random MAC addresses +def random_mac(): + return "AA:BB:CC:DD:EE:" + ':'.join([f"{random.randint(0, 255):02X}" for _ in range(2)]) + +# Function to generate random IP addresses +def random_ip(): + return f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}" + +# Generate synthetic data for the device table +def generate_device_data(conn, n=1000,): + device_data = { + 'switchip': [random_ip() for _ in range(n)], + 'hwsku': [f'HW{i+1}' for i in range(n)], + 'hostname': [f'switch{i+1}' for i in range(n)], + 'osversion': [f'v{i+1}' for i in range(n)], + 'layer': ['L2' if i % 2 == 0 else 'L3' for i in range(n)], + 'region': [random.choice(['US', 'EU', 'ASIA']) for _ in range(n)], + 'uptime': [f'{random.randint(0, 10)} days {random.randint(0, 23)}:{random.randint(0, 59)}:{random.randint(0, 59)}' for _ in range(n)], + 'device_mac_address': [random_mac() for _ in range(n)] + } + df = pd.DataFrame(device_data) + df.to_sql('device', conn, index=False) + return df + +# Generate synthetic data for the interfacestats table +def generate_interface_stats_data(conn, device_df, n=1000): + interface_stats_data = [] + for _ in range(n): + device_mac = random.choice(device_df['device_mac_address']) + ifname = random.choice(['eth0', 'eth1', 'eth2', 'eth3']) + time = datetime.now(timezone.utc) - timedelta(minutes=random.randint(0, 1440 * 5)) # random timestamps in the past 5 day + in_discards = random.randint(0, 1000) + in_errors = random.randint(0, 500) + out_discards = random.randint(0, 800) + out_errors = random.randint(0, 400) + in_octets = random.randint(1000, 100000) + out_octets = random.randint(1000, 100000) + + interface_stats_data.append({ + 'device_mac_address': device_mac, + 'ifname': ifname, + 'time': time, + 'in_discards': in_discards, + 'in_errors': in_errors, + 'out_discards': out_discards, + 'out_errors': out_errors, + 'in_octets': in_octets, + 'out_octets': out_octets + }) + df = pd.DataFrame(interface_stats_data) + df.to_sql('interfacestats', conn, index=False) + return + +# Generate synthetic data for the ts_flow table +def generate_flow_data(conn, device_df, n=1000): + flow_data = [] + for _ in range(n): + sampler_address = random.choice(device_df['switchip']) + proto = random.choice(['TCP', 'UDP']) + src_addr = random_ip() + dst_addr = random_ip() + src_port = random.randint(1024, 65535) + dst_port = random.randint(1024, 65535) + in_if = random.randint(1, 10) + out_if = random.randint(1, 10) + flow_start = int((datetime.now() - timedelta(days=random.randint(1, 30))).timestamp()) + flow_end = int((datetime.now() - timedelta(days=random.randint(1, 30))).timestamp()) + bytes_transferred = random.randint(1000, 100000) + packets = random.randint(1, 1000) + flow_time = datetime.now(timezone.utc) - timedelta(minutes=random.randint(0, 1440 * 5)) # random flow time + + flow_data.append({ + 'sampler_address': sampler_address, + 'proto': proto, + 'src_addr': src_addr, + 'dst_addr': dst_addr, + 'src_port': src_port, + 'dst_port': dst_port, + 'in_if': in_if, + 'out_if': out_if, + 'flow_start': flow_start, + 'flow_end': flow_end, + 'bytes': bytes_transferred, + 'packets': packets, + 'time': flow_time + }) + df = pd.DataFrame(flow_data) + df.to_sql('ts_flow', conn, index=False) + return + +def load_params(req): + # Step 1: Convert the from_time natural language string to a timestamp if provided + if req.from_time: + # Use `dateparser` to parse natural language timeframes + logger.info(f"{'* ' * 50}\n\nCaptured from time: {req.from_time}\n\n") + parsed_time = parse(req.from_time, settings={'RELATIVE_BASE': datetime.now()}) + if not parsed_time: + conv_time = convert_to_ago_format(req.from_time) + if conv_time: + parsed_time = parse(conv_time, settings={'RELATIVE_BASE': datetime.now()}) + else: + return {"error": "Invalid from_time format. Please provide a valid time description such as 'past 7 days' or 'since last month'."} + logger.info(f"\n\nConverted from time: {parsed_time}\n\n{'* ' * 50}\n\n") + from_time = parsed_time + logger.info(f"Using parsed from_time: {from_time}") + else: + # If no from_time is provided, use a default value (e.g., the past 7 days) + from_time = datetime.now() - timedelta(days=7) + logger.info(f"Using default from_time: {from_time}") + + # Step 2: Build the dynamic SQL query based on the optional filters + filters = [] + params = {"from_time": from_time} + + if req.ifname: + filters.append("i.ifname = :ifname") + params["ifname"] = req.ifname + + if req.region: + filters.append("d.region = :region") + params["region"] = req.region + + if req.min_in_errors is not None: + filters.append("i.in_errors >= :min_in_errors") + params["min_in_errors"] = req.min_in_errors + + if req.max_in_errors is not None: + filters.append("i.in_errors <= :max_in_errors") + params["max_in_errors"] = req.max_in_errors + + if req.min_out_errors is not None: + filters.append("i.out_errors >= :min_out_errors") + params["min_out_errors"] = req.min_out_errors + + if req.max_out_errors is not None: + filters.append("i.out_errors <= :max_out_errors") + params["max_out_errors"] = req.max_out_errors + + if req.min_in_discards is not None: + filters.append("i.in_discards >= :min_in_discards") + params["min_in_discards"] = req.min_in_discards + + if req.max_in_discards is not None: + filters.append("i.in_discards <= :max_in_discards") + params["max_in_discards"] = req.max_in_discards + + if req.min_out_discards is not None: + filters.append("i.out_discards >= :min_out_discards") + params["min_out_discards"] = req.min_out_discards + + if req.max_out_discards is not None: + filters.append("i.out_discards <= :max_out_discards") + params["max_out_discards"] = req.max_out_discards + + return params, filters diff --git a/model_server/requirements.txt b/model_server/requirements.txt index 231aca9c..23991d57 100644 --- a/model_server/requirements.txt +++ b/model_server/requirements.txt @@ -4,3 +4,5 @@ sentence-transformers torch uvicorn gliner +pandas +dateparser