Sooua
登录
返回文章列表
QRadar··2 分钟阅读

AQL 高级查询

AQL(Ariel Query Language)是 QRadar 的查询语言,用于从 Ariel 数据库检索 Event 和 Flow 数据。

什么是 AQL?

AQL(Ariel Query Language)是 QRadar 的查询语言,用于从 Ariel 数据库检索 Event 和 Flow 数据。

基础语法

SELECT 字段列表
FROM 数据源(events / flows)
WHERE 条件
GROUP BY 分组字段
ORDER BY 排序字段
LIMIT 数量
TIME RANGE

高级查询示例

1. 子查询

-- 查找与已知恶意 IP 通信的内部主机
SELECT sourceip, destinationip, COUNT(*) 
FROM flows 
WHERE destinationip IN (
    SELECT value FROM referenceSet('ThreatIntel_BadIPs')
) 
LAST 24 HOURS
GROUP BY sourceip, destinationip

2. 时间窗口函数

-- 每 5 分钟统计一次登录失败数
SELECT starttime(5 MINUTES), username, COUNT(*) 
FROM events 
WHERE QIDNAME(qid) = 'Failed Login' 
GROUP BY starttime(5 MINUTES), username
LAST 1 HOURS

3. IP 网段查询

-- 查询特定网段的事件
SELECT * FROM events 
WHERE INSUBNET(sourceip, '10.0.0.0/24') = TRUE
LAST 24 HOURS

4. 多表联合

-- 关联事件和资产信息
SELECT sourceip, ASSETUSER(sourceip), ASSETVALUE(sourceip)
FROM events
LAST 1 HOURS

5. 复杂分组统计

-- 统计每个部门的 Top 10 风险事件
SELECT ASSETPROPERTY('Department', sourceip) as dept,
       QIDNAME(qid), 
       COUNT(*) as event_count
FROM events
WHERE magnitude > 5
GROUP BY dept, QIDNAME(qid)
ORDER BY event_count DESC
LIMIT 10
LAST 7 DAYS

6. Flow 深度分析

-- 检测 beaconing 行为
SELECT sourceip, destinationip, 
       INTERVAL(firstpackettime, lastpackettime) / packetcount as avg_interval
FROM flows
WHERE destinationport = 443
  AND packetcount > 100
GROUP BY sourceip, destinationip
HAVING avg_interval BETWEEN 55 AND 65
LAST 24 HOURS

AQL 函数速查

函数说明示例
QIDNAME(qid)获取事件名称QIDNAME(qid) = 'Failed Login'
LOGSOURCENAME(id)获取日志源名称LOGSOURCENAME(logsourceid)
INSUBNET(ip, 'CIDR')IP 是否在网段INSUBNET(sourceip, '10.0.0.0/8')
ASSETUSER(ip)获取资产用户ASSETUSER(sourceip)
REFERENCESET('name')引用集合sourceip IN REFERENCESET('BadIPs')
CONCAT(a, b)字符串拼接CONCAT(username, '@', domain)
LENGTH(str)字符串长度LENGTH(username)
LOWER/UPPER(str)大小写转换LOWER(username)

API 中使用 AQL

import requests
import urllib3
urllib3.disable_warnings()
 
url = "https://qradar/api/ariel/searches"
headers = {"SEC": "<token>", "Content-Type": "application/json"}
 
query = """
SELECT sourceip, destinationip, COUNT(*) 
FROM flows 
WHERE destinationport = 443 
GROUP BY sourceip, destinationip 
LAST 1 HOURS
"""
 
# 提交查询
response = requests.post(url, headers=headers, json={"query_expression": query}, verify=False)
search_id = response.json()["search_id"]
 
# 等待查询完成并获取结果
import time
time.sleep(5)
results_url = f"{url}/{search_id}/results"
results = requests.get(results_url, headers=headers, verify=False)
print(results.json())

上一章:02 - 性能调优 下一章:04 - 自定义 DSM 开发

分享

评论

登录 后参与讨论。

加载中…

相关文章