CrewAI
Integrate BlueNexus MCP tools into your CrewAI agents.
Install
pip install crewai mcp
Setup
import asyncio
from crewai import Agent, Task, Crew
from crewai.tools import tool
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
BLUENEXUS_TOKEN = "your-personal-access-token"
# Create a CrewAI tool that wraps BlueNexus
@tool("BlueNexus Agent")
def bluenexus_agent(prompt: str, connector: str = None) -> str:
"""Access user's connected services (Google, Slack, GitHub, etc.)
via the BlueNexus Universal MCP agent."""
return asyncio.run(_call_bluenexus(prompt, connector))
async def _call_bluenexus(prompt: str, connector: str = None) -> str:
headers = {"Authorization": f"Bearer {BLUENEXUS_TOKEN}"}
async with streamablehttp_client(
"https://api.bluenexus.ai/mcp",
headers=headers
) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
args = {"prompt": prompt}
if connector:
args["connector"] = connector
result = await session.call_tool("use-agent", arguments=args)
return result.content[0].text
# Define your CrewAI agent
researcher = Agent(
role="Research Assistant",
goal="Help the user with tasks across their connected services",
backstory="You have access to the user's Google, Slack, GitHub, and other services via BlueNexus.",
tools=[bluenexus_agent],
verbose=True
)
# Define a task
task = Task(
description="Find my calendar events for today and summarize them",
agent=researcher,
expected_output="A summary of today's calendar events"
)
# Run the crew
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
print(result)
Multiple Tools
You can create separate tools for different use cases:
@tool("Check Connections")
def list_connections() -> str:
"""List all active service connections for the current user."""
return asyncio.run(_list_connections())
async def _list_connections() -> str:
headers = {"Authorization": f"Bearer {BLUENEXUS_TOKEN}"}
async with streamablehttp_client(
"https://api.bluenexus.ai/mcp",
headers=headers
) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
result = await session.call_tool("list-connections", arguments={})
return result.content[0].text
Next Steps
- Custom MCP — Raw MCP SDK usage
- MCP Endpoint Reference — Full specification