什么是 AQL?
AQL(Ariel Query Language)是 QRadar 的查询语言,用于从 Ariel 数据库检索 Event 和 Flow 数据。
基础语法
SELECT 字段列表
FROM 数据源(events / flows)
WHERE 条件
GROUP BY 分组字段
ORDER BY 排序字段
LIMIT 数量
TIME RANGE高级查询示例
1. 子查询
-- 查找与已知恶意 IP 通信的内部主机
SELECT sourceip, destinationip, COUNT(*)
FROM flows
WHERE destinationip IN (
SELECT value FROM referenceSet('ThreatIntel_BadIPs')
)
LAST 24 HOURS
GROUP BY sourceip, destinationip2. 时间窗口函数
-- 每 5 分钟统计一次登录失败数
SELECT starttime(5 MINUTES), username, COUNT(*)
FROM events
WHERE QIDNAME(qid) = 'Failed Login'
GROUP BY starttime(5 MINUTES), username
LAST 1 HOURS3. IP 网段查询
-- 查询特定网段的事件
SELECT * FROM events
WHERE INSUBNET(sourceip, '10.0.0.0/24') = TRUE
LAST 24 HOURS4. 多表联合
-- 关联事件和资产信息
SELECT sourceip, ASSETUSER(sourceip), ASSETVALUE(sourceip)
FROM events
LAST 1 HOURS5. 复杂分组统计
-- 统计每个部门的 Top 10 风险事件
SELECT ASSETPROPERTY('Department', sourceip) as dept,
QIDNAME(qid),
COUNT(*) as event_count
FROM events
WHERE magnitude > 5
GROUP BY dept, QIDNAME(qid)
ORDER BY event_count DESC
LIMIT 10
LAST 7 DAYS6. Flow 深度分析
-- 检测 beaconing 行为
SELECT sourceip, destinationip,
INTERVAL(firstpackettime, lastpackettime) / packetcount as avg_interval
FROM flows
WHERE destinationport = 443
AND packetcount > 100
GROUP BY sourceip, destinationip
HAVING avg_interval BETWEEN 55 AND 65
LAST 24 HOURSAQL 函数速查
| 函数 | 说明 | 示例 |
|---|---|---|
QIDNAME(qid) | 获取事件名称 | QIDNAME(qid) = 'Failed Login' |
LOGSOURCENAME(id) | 获取日志源名称 | LOGSOURCENAME(logsourceid) |
INSUBNET(ip, 'CIDR') | IP 是否在网段 | INSUBNET(sourceip, '10.0.0.0/8') |
ASSETUSER(ip) | 获取资产用户 | ASSETUSER(sourceip) |
REFERENCESET('name') | 引用集合 | sourceip IN REFERENCESET('BadIPs') |
CONCAT(a, b) | 字符串拼接 | CONCAT(username, '@', domain) |
LENGTH(str) | 字符串长度 | LENGTH(username) |
LOWER/UPPER(str) | 大小写转换 | LOWER(username) |
API 中使用 AQL
import requests
import urllib3
urllib3.disable_warnings()
url = "https://qradar/api/ariel/searches"
headers = {"SEC": "<token>", "Content-Type": "application/json"}
query = """
SELECT sourceip, destinationip, COUNT(*)
FROM flows
WHERE destinationport = 443
GROUP BY sourceip, destinationip
LAST 1 HOURS
"""
# 提交查询
response = requests.post(url, headers=headers, json={"query_expression": query}, verify=False)
search_id = response.json()["search_id"]
# 等待查询完成并获取结果
import time
time.sleep(5)
results_url = f"{url}/{search_id}/results"
results = requests.get(results_url, headers=headers, verify=False)
print(results.json())上一章:02 - 性能调优 下一章:04 - 自定义 DSM 开发