# workflows/<name>/workflow.py
@dataclass
class MyWorkflowInput:
    """Input for the custom workflow.

    Attributes:
        code: The text to analyze, wrapped in ``Contextual``; it is passed
            to the analysis step under the ``"code"`` key.
        config: Run configuration; ``config.confidence`` is used as the
            threshold when filtering results.
    """

    code: Contextual[str]
    config: Config
# Output type of the custom workflow: a list of SARIF results.
# NOTE: the `type` alias statement requires Python 3.12 or newer.
type MyWorkflowOutput = List[sarif.Result]
# workflows/<name>/workflow.py
# Glob patterns for the files your workflow applies to
FILE_PATTERNS = [
    "*.config",
    "*.ini",
    "*.yaml",
    "*.yml",
    "*.json",
]

# Prompt templates are loaded from my_prompts.yaml, which sits next to this module
PROMPTS = PromptTemplate.from_yaml(
    os.path.join(os.path.dirname(__file__), "my_prompts.yaml")
)
@workflow('my_custom_workflow')
class MyCustomWorkflow(Workflow[MyWorkflowInput, MyWorkflowOutput]):
    """Analyzes custom configuration files for security issues"""

    def __init__(self, config: Config, *args, **kwargs):
        """Build the single LLM analysis step used by this workflow.

        Args:
            config: Run configuration; forwarded to the base class and used
                to construct the LLM.
            *args: Forwarded unchanged to the base ``Workflow`` initializer.
            **kwargs: Forwarded unchanged to the base ``Workflow`` initializer.
        """
        super().__init__(config, *args, **kwargs)

        # Construct an LLM instance
        llm = LiteLLM.from_config(config)

        # Construct the analysis step; its output is parsed as sarif.RunResults
        parser = PydanticOutputParser(sarif.RunResults)
        self.analysis_step = LLMStep(llm, PROMPTS["system"], PROMPTS["user"], parser)

    async def workflow(self, input: MyWorkflowInput) -> MyWorkflowOutput:
        """Main workflow execution.

        Args:
            input: The code to analyze plus the run configuration.

        Returns:
            The analysis results whose confidence is strictly greater than
            ``input.config.confidence``.
        """
        # 1. Analyze the configuration file
        analysis_results = await self.analysis_step.run({"code": input.code})

        # 2. Filter results by confidence threshold
        filtered_results = self.filter_results_by_confidence(
            analysis_results.results, input.config.confidence
        )

        return filtered_results

    def filter_results_by_confidence(self, results: List[sarif.Result], confidence_threshold: int) -> List[sarif.Result]:
        """Filter results by confidence.

        Args:
            results: Results to filter; each is read via
                ``result.properties.confidence``.
            confidence_threshold: Exclusive lower bound; a result whose
                confidence equals the threshold is dropped.

        Returns:
            A new list holding only the results above the threshold, in
            their original order.
        """
        return [result for result in results if result.properties.confidence > confidence_threshold]
Create my_prompts.yaml in the same directory as workflow.py:
# Prompt templates for the workflow; workflow.py reads the "system" and "user" keys.
# The text under each key must be indented to belong to its block scalar.
system: |
  You are a configuration security analyzer.
  Your job is to analyze configuration files for security misconfigurations and vulnerabilities.
  <vulnerability_types>
  Valid vulnerability types (use EXACTLY as shown):
  - Hardcoded Credentials
  - Insecure Defaults
  - Excessive Permissions
  - Unencrypted Storage
  - Weak Cryptography
  - Missing Security Headers
  - Debug Mode Enabled
  - Exposed Secrets
  - Insecure Protocols
  - Missing Access Controls
  </vulnerability_types>
  {{ output_format }}
user: |
  Analyze the following configuration file for security issues:
  {{ code }}
For a multi-step workflow, construct each step in the __init__ function of your workflow:
@dataclass
class MultiStepWorkflowInput:
    """Input for the advanced workflow.

    Attributes:
        code: The text to analyze, wrapped in ``Contextual``; it is passed
            to both the scanner and the triager steps under the ``"code"`` key.
        config: Run configuration; ``config.confidence`` is used as the
            threshold when filtering results.
    """

    code: Contextual[str]
    config: Config
# Output type of the multi-step workflow: a list of SARIF results.
# NOTE: the `type` alias statement requires Python 3.12 or newer.
type MultiStepWorkflowOutput = List[sarif.Result]
@workflow('multi_step_security')
class MultiStepSecurityWorkflow(Workflow[MultiStepWorkflowInput, MultiStepWorkflowOutput]):
    """Multi-stage security analysis workflow"""

    def __init__(self, config: Config, *args, **kwargs):
        """Build the scanner and triager steps.

        Args:
            config: Run configuration; forwarded to the base class, used to
                construct the LLM, and read for ``config.project_path``.
            *args: Forwarded unchanged to the base ``Workflow`` initializer.
            **kwargs: Forwarded unchanged to the base ``Workflow`` initializer.
        """
        super().__init__(config, *args, **kwargs)

        # Construct LLM instance
        llm = LiteLLM.from_config(config)

        # Scanner step - initial vulnerability detection
        scanner_parser = PydanticOutputParser(sarif.RunResults)
        self.scanner_step = LLMStep(llm, SCANNER_PROMPTS["system"], SCANNER_PROMPTS["user"], scanner_parser)

        # Triager step - detailed analysis with tools
        triager_tools = self.get_analysis_tools(config.project_path)
        triager_llm = llm.with_tools(triager_tools)
        # The triager output is parsed as a single sarif.Result (one per triaged finding)
        triager_parser = PydanticOutputParser(sarif.Result)
        self.triager_step = LLMStep(triager_llm, TRIAGER_PROMPTS["system"], TRIAGER_PROMPTS["user"], triager_parser)

    async def workflow(self, input: MultiStepWorkflowInput) -> MultiStepWorkflowOutput:
        """Execute multi-stage security analysis.

        Args:
            input: The code to analyze plus the run configuration.

        Returns:
            The triaged results whose confidence is strictly greater than
            ``input.config.confidence``.
        """
        # 1. Initial scan for potential vulnerabilities
        potential_vulns = await self.scanner_step.run({"code": input.code})

        # 2. Filter by confidence threshold
        high_confidence_vulns = self.filter_results_by_confidence(
            potential_vulns.results, input.config.confidence
        )

        # 3. Triage high-confidence vulnerabilities in parallel
        import asyncio

        # One triager run per surviving finding; gather awaits them together
        # and returns their results in the same order as the input list.
        triaged_vulns = await asyncio.gather(*[
            self.triager_step.run({"vulnerability": str(vuln), "code": input.code})
            for vuln in high_confidence_vulns
        ])

        # 4. Final confidence filtering after triage
        final_results = self.filter_results_by_confidence(
            triaged_vulns, input.config.confidence
        )

        return final_results

    def get_analysis_tools(self, project_path: str):
        """Get tools for detailed code analysis.

        Args:
            project_path: Path handed to ``TreeSitterTools``.

        Returns:
            The ``tools`` attribute of a ``TreeSitterTools`` built for
            ``project_path``.
        """
        # Imported here rather than at module level, as in the original snippet
        from fraim.tools.tree_sitter import TreeSitterTools

        return TreeSitterTools(project_path).tools

    def filter_results_by_confidence(self, results: List[sarif.Result], confidence_threshold: int) -> List[sarif.Result]:
        """Filter results by confidence threshold.

        Args:
            results: Results to filter; each is read via
                ``result.properties.confidence``.
            confidence_threshold: Exclusive lower bound; a result whose
                confidence equals the threshold is dropped.

        Returns:
            A new list holding only the results above the threshold, in
            their original order.
        """
        return [result for result in results if result.properties.confidence > confidence_threshold]
# test_my_workflow.py
import pytest
import types
from fraim.config import Config
from fraim.core.contextuals import Contextual
@pytest.mark.asyncio
async def test_my_custom_workflow():
    """Run MyCustomWorkflow on a small config snippet and check the findings.

    NOTE: MyCustomWorkflow and MyWorkflowInput are not imported by the import
    lines of this snippet; import them from your workflow module before
    running the test.
    """
    config = Config(confidence=7, project_path="/test/path")
    workflow = MyCustomWorkflow(config)

    # Input containing an obvious hardcoded password and API key
    code = Contextual("password=secret123\napi_key=abc123")
    input_data = MyWorkflowInput(code=code, config=config)

    results = await workflow.workflow(input_data)

    # At least one finding, and at least one message mentioning "hardcoded"
    assert len(results) > 0
    assert any("hardcoded" in result.message.text.lower() for result in results)
# Test your workflow with the fraim CLI (this is a shell command, not Python).
# `--location .` passes the current directory as the location argument.
fraim --debug my_custom_workflow --location .