# File Management - Python SDK

Upload, search, and manage files with the Python SDK.
## Upload Files

```python
from lumnisai import Client, FileScope, DuplicateHandling
client = Client(api_key="your-api-key")
# Upload a single file
file_result = client.upload_file(
    file_path="document.pdf",
    scope=FileScope.TENANT,
    user_id="user@example.com",
    tags=["documentation", "important"],
    duplicate_handling=DuplicateHandling.SUFFIX
)
print(f"File ID: {file_result.file_id}")
print(f"Status: {file_result.status}")
```

### Upload from File Content

```python
# Upload from bytes or file object
with open("document.pdf", "rb") as f:
    file_result = client.upload_file(
        file_content=f,
        file_name="document.pdf",
        scope=FileScope.USER,
        user_id="user@example.com",
        tags="documentation"
    )
```

### Wait for File Processing

```python
import time
from lumnisai import ProcessingStatus
# Upload file
file_result = client.upload_file(
    file_path="data.csv",
    user_id="user@example.com"
)

# Wait for processing to complete
while True:
    status = client.get_file_status(
        file_result.file_id,
        user_id="user@example.com"
    )
    if status.status == ProcessingStatus.COMPLETED:
        print("File processing complete!")
        break
    elif status.status == ProcessingStatus.ERROR:
        print(f"Processing failed: {status.error_message}")
        break
    print(f"Processing: {status.progress_percentage}%")
    time.sleep(2)
```
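An open-ended `while True` poll can hang indefinitely if a file never finishes processing. A reusable helper with a timeout is one way to guard against that. This is a minimal sketch using only `get_file_status` and `ProcessingStatus` from the example above; the `wait_for_processing` name and its `timeout`/`poll_interval` parameters are illustrative, not part of the SDK.

```python
import time

from lumnisai import ProcessingStatus

def wait_for_processing(client, file_id, user_id, timeout=300.0, poll_interval=2.0):
    # Hypothetical helper, not an SDK API: poll until the file completes,
    # fails, or the timeout expires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_file_status(file_id, user_id=user_id)
        if status.status == ProcessingStatus.COMPLETED:
            return status
        if status.status == ProcessingStatus.ERROR:
            raise RuntimeError(f"Processing failed: {status.error_message}")
        time.sleep(poll_interval)
    raise TimeoutError(f"File {file_id} not processed within {timeout} seconds")
```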
## Search Files

### Semantic Search

```python
# Search across files with semantic understanding
results = client.search_files(
    query="machine learning algorithms",
    user_id="user@example.com",
    limit=10,
    min_score=0.7,
    file_types=["pdf", "md"],
    tags=["research"]
)
print(f"Found {results.total_count} results")
for result in results.results:
    print(f"\nFile: {result.file.file_name}")
    print(f"Score: {result.overall_score:.2f}")
    print(f"Type: {result.file.file_type}")

    # Access matching chunks
    if result.chunks:
        print(f"Excerpt: {result.chunks[0].chunk_text[:200]}...")
```
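Because each result pairs file metadata with the chunks that matched, the top excerpts can be assembled into a single context string for downstream prompting. A minimal sketch, continuing from the `results` object above and assuming only the fields already used there (`file_name`, `overall_score`, `chunks[].chunk_text`):

```python
# Collect the best-matching excerpt from each result into one string.
# Uses only the fields shown in the search example above.
context_parts = []
for result in results.results:
    if result.chunks:
        context_parts.append(
            f"[{result.file.file_name} | score {result.overall_score:.2f}]\n"
            f"{result.chunks[0].chunk_text}"
        )
context = "\n\n".join(context_parts)
print(context[:500])
```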
## List Files

```python
from lumnisai import FileScope, ProcessingStatus
# List files with filters
files = client.list_files(
    user_id="user@example.com",
    scope=FileScope.TENANT,
    file_type="pdf",
    status=ProcessingStatus.COMPLETED,
    tags=["important"],
    page=1,
    limit=20
)
print(f"Total files: {files.total_count}")
for file in files.files:
    print(f"\n{file.file_name}")
    print(f" Type: {file.file_type}")
    print(f" Status: {file.processing_status}")
    print(f" Chunks: {file.chunks_embedded}/{file.total_chunks}")
    if file.tags:
        print(f" Tags: {', '.join(file.tags)}")
```

## File Operations
### Get File Metadata

```python
# Get file details
file = client.get_file(
    file_id="file-id",
    user_id="user@example.com"
)
print(f"File: {file.file_name}")
print(f"Size: {file.file_size} bytes")
print(f"Type: {file.file_type}")
print(f"Status: {file.processing_status}")
print(f"Uploaded: {file.created_at}")
```

### Get File Content

```python
from lumnisai import ContentType
# Get text content
content = client.get_file_content(
    file_id="file-id",
    user_id="user@example.com",
    content_type=ContentType.TEXT,
    start_line=1,
    end_line=100
)
print(content.text)
# Get markdown content
markdown = client.get_file_content(
    file_id="file-id",
    user_id="user@example.com",
    content_type=ContentType.MARKDOWN
)
print(markdown.text)
```
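The `start_line`/`end_line` parameters also allow reading a long file in fixed-size windows. A minimal pager sketch, assuming the `client` from the examples above and assuming an empty `.text` marks the end of the file (the real SDK may signal this differently):

```python
from lumnisai import ContentType

# Hypothetical pager: walk a long file in 100-line windows using the
# start_line/end_line parameters shown above. The empty-text stop
# condition is an assumption, not documented SDK behavior.
start = 1
window = 100
while True:
    chunk = client.get_file_content(
        file_id="file-id",
        user_id="user@example.com",
        content_type=ContentType.TEXT,
        start_line=start,
        end_line=start + window - 1
    )
    if not chunk.text:
        break
    print(chunk.text)
    start += window
```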
### Download File

```python
# Download original file
file_bytes = client.download_file(
    file_id="file-id",
    user_id="user@example.com"
)

# Save to disk
with open("downloaded_file.pdf", "wb") as f:
    f.write(file_bytes)

# Or specify a save path directly
client.download_file(
    file_id="file-id",
    user_id="user@example.com",
    save_path="local_file.pdf"
)
```

### Delete Files

```python
# Delete a single file
client.delete_file(
    file_id="file-id",
    user_id="user@example.com",
    hard_delete=True
)
```

Bulk deletion is exposed on the `files` resource of the async client, so it must run inside a coroutine with `AsyncClient` rather than the sync `Client`:

```python
import asyncio

from lumnisai import AsyncClient

async def bulk_delete_files():
    client = AsyncClient(api_key="your-api-key")
    async with client:
        # Delete multiple files in one call via the files resource
        await client.files.bulk_delete(
            file_ids=["id1", "id2", "id3"],
            hard_delete=True
        )

asyncio.run(bulk_delete_files())
```

## Complete File Workflow Example

```python
from lumnisai import Client, FileScope, ProcessingStatus, display_progress
from pathlib import Path
import time
# Initialize client
client = Client(api_key="your-api-key")
# Create user
user = client.create_user(
    email="analyst@example.com",
    first_name="Data",
    last_name="Analyst"
)

# Upload file
print("Uploading file...")
file = client.upload_file(
    file_path="data.csv",
    scope=FileScope.USER,
    user_id=user.email,
    tags=["analysis", "important"]
)
print(f"File uploaded: {file.file_id}")
# Wait for processing
print("Waiting for processing...")
while True:
    status = client.get_file_status(file.file_id, user_id=user.email)
    if status.status == ProcessingStatus.COMPLETED:
        print("Processing complete!")
        break
    elif status.status == ProcessingStatus.ERROR:
        print(f"Error: {status.error_message}")
        break
    print(f"Progress: {status.progress_percentage}%")
    time.sleep(2)
# Search the file content
print("\nSearching file content...")
search_results = client.search_files(
    query="important metrics",
    user_id=user.email,
    limit=5,
    min_score=0.5
)
print(f"Found {search_results.total_count} results")
for result in search_results.results:
    print(f"\n{result.file.file_name} (score: {result.overall_score:.2f})")
    if result.chunks:
        print(f"Excerpt: {result.chunks[0].chunk_text[:150]}...")
# Create AI response with file context
print("\nAnalyzing file with AI...")
for update in client.invoke(
    "Analyze the uploaded CSV and provide insights",
    stream=True,
    user_id=user.email
):
    display_progress(update)
    if update.state == "completed":
        print(f"\n\nAnalysis:\n{update.output_text}")

# List all of the user's files
files = client.list_files(user_id=user.email)
print(f"\nUser has {files.total_count} files")
```

## Bulk File Operations
### Upload Multiple Files

```python
import asyncio
from pathlib import Path
from lumnisai import AsyncClient
async def upload_directory(directory_path: str):
    client = AsyncClient(api_key="your-api-key")
    async with client:
        # Get all files
        files = list(Path(directory_path).rglob("*.pdf"))

        # Upload in batches
        batch_size = 5
        uploaded = []
        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]

            # Upload batch concurrently
            tasks = [
                client.upload_file(
                    file_path=str(f),
                    user_id="user@example.com"
                )
                for f in batch
            ]
            results = await asyncio.gather(*tasks)
            uploaded.extend(results)

            print(f"Uploaded {len(uploaded)}/{len(files)} files")
            await asyncio.sleep(1)

    return uploaded
# Run upload
uploaded_files = asyncio.run(upload_directory("./documents"))
print(f"Total uploaded: {len(uploaded_files)}")
```
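Note that a single failed upload inside a batch raises out of `asyncio.gather` and aborts the remaining batches. If partial progress is acceptable, `return_exceptions=True` lets the batch finish and separates failures afterwards. A minimal sketch of that variant, where `upload_batch` is a hypothetical helper built from the same `upload_file` call used above:

```python
import asyncio
from pathlib import Path

from lumnisai import AsyncClient

async def upload_batch(client: AsyncClient, paths: list[Path], user_id: str):
    # Hypothetical variant of the batch step above: gather collects
    # exceptions instead of raising the first one, so one bad file
    # does not abort the whole batch.
    tasks = [client.upload_file(file_path=str(p), user_id=user_id) for p in paths]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    uploaded = []
    for path, result in zip(paths, results):
        if isinstance(result, BaseException):
            print(f"Upload failed for {path}: {result}")
        else:
            uploaded.append(result)
    return uploaded
```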
## Best Practices

### Tag Files for Organization

```python
# Use tags to organize files
client.upload_file(
    file_path="report.pdf",
    user_id="user@example.com",
    tags=["Q1-2025", "finance", "confidential"]
)
# Search by tags
files = client.list_files(
    user_id="user@example.com",
    tags=["Q1-2025", "finance"]
)
```
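Tags also make housekeeping straightforward: list the files that carry a tag, then delete them. A minimal sketch using only `list_files` and `delete_file` from this page, and assuming the listed file objects expose a `file_id` attribute:

```python
# Hypothetical cleanup: remove every file tagged "temp" for a user.
# Assumes each listed file exposes a file_id attribute.
files = client.list_files(user_id="user@example.com", tags=["temp"])
for file in files.files:
    client.delete_file(
        file_id=file.file_id,
        user_id="user@example.com",
        hard_delete=True
    )
    print(f"Deleted {file.file_name}")
```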
### Handle Processing Errors

```python
from lumnisai import ProcessingStatus
file = client.upload_file(
    file_path="document.pdf",
    user_id="user@example.com"
)
status = client.get_file_status(file.file_id, user_id="user@example.com")
if status.status == ProcessingStatus.ERROR:
    print(f"Processing failed: {status.error_message}")
    # Handle error (e.g., retry, notify user)
elif status.status == ProcessingStatus.COMPLETED:
    print(f"File ready! Chunks: {status.chunks_embedded}")
```
### Use Appropriate Scope

```python
from lumnisai import FileScope
# User-specific files
client.upload_file(
    file_path="user_data.csv",
    scope=FileScope.USER,
    user_id="user@example.com"
)
# Tenant-wide files (accessible to all users)
client.upload_file(
    file_path="company_handbook.pdf",
    scope=FileScope.TENANT
)
```