一、样本标签确认(VirusTotal)
使用 python 完成自动化操作,结果保存成 csv 格式:
将样本文件放在 ./sample 文件夹下
API 格式介绍网站为 https://docs.virustotal.com/reference/files-scan
首先需要提交文件:
import requests
# Minimal example: upload a single APK to VirusTotal and print the raw reply.
url = "https://www.virustotal.com/api/v3/files"
apk_name = "0d49388ec8384d654d8e3383e8ea606c.apk"
headers = {
    "accept": "application/json",
    "x-apikey": "API_KEY",  # placeholder: substitute your own key
}
# Open the sample in a context manager so the handle is closed as soon as the
# upload has been sent (the inline open() was never closed).
with open(apk_name, "rb") as apk:
    files = {"file": (apk_name, apk, "application/vnd.android.package-archive")}
    response = requests.post(url, files=files, headers=headers)
print(response.text)
返回类似于
{
"data": {
"type": "analysis",
"id": "MGQ0OTM4OGVjODM4NGQ2NTRkOGUzMzgzZThlYTYwNmM6MTc0Mjc0MTgyOA==",
"links": {
"self": "https://www.virustotal.com/api/v3/analyses/MGQ0OTM4OGVjODM4NGQ2NTRkOGUzMzgzZThlYTYwNmM6MTc0Mjc0MTgyOA=="
}
}
}
然后需要根据上一步返回的分析 id 获取分析报告:
import requests
# Minimal example: fetch the analysis object created by the upload step.
# The id returned by the upload is an *analysis* id, so it has to be requested
# from the /analyses endpoint (the "self" link in the upload reply points
# there as well); /files/{id} is addressed by file hash instead.
url = "https://www.virustotal.com/api/v3/analyses/MGQ0OTM4OGVjODM4NGQ2NTRkOGUzMzgzZThlYTYwNmM6MTc0Mjc0MTgyOA%3D%3D"
headers = {
    "accept": "application/json",
    "x-apikey": "API_KEY",  # placeholder: substitute your own key
}
response = requests.get(url, headers=headers)
print(response.text)
返回结果是一个 json
{
"data": {
"id": "NGQ3ZGI3NjFjYzk0Mjg3ZWRmMmE3MmJhZTAzZTA2NGY6MTc0Mjc0NDU2MQ==",
"type": "analysis",
"links": {
"self": "https://www.virustotal.com/api/v3/analyses/NGQ3ZGI3NjFjYzk0Mjg3ZWRmMmE3MmJhZTAzZTA2NGY6MTc0Mjc0NDU2MQ==",
"item": "https://www.virustotal.com/api/v3/files/aeff479483af8badfb0f574b16598caa27c6767fafb62b381c67454730c90696"
},
"attributes": {
"results": {
"Bkav": {
"method": "blacklist",
"engine_name": "Bkav",
"engine_version": "2.0.0.1",
"engine_update": "20250323",
"category": "undetected",
"result": null
},
...
},
"stats": {
"malicious": 37,
"suspicious": 0,
"undetected": 24,
"harmless": 0,
"timeout": 7,
"confirmed-timeout": 0,
"failure": 0,
"type-unsupported": 9
},
"date": 1742744561,
"status": "completed"
}
},
"meta": {
"file_info": {
"sha256": "aeff479483af8badfb0f574b16598caa27c6767fafb62b381c67454730c90696",
"md5": "4d7db761cc94287edf2a72bae03e064f",
"sha1": "cf1652cdcaa16042d40a11e2d2d4923604019cd1",
"size": 4240055
}
}
}
之后编写 Python 自动化脚本,获取所有样本报告中的关键信息,并存储为 CSV 格式。需要注意:API 每分钟最多提交四次请求,并且每天有总量限额;此外,上传的文件会先进入队列等待分析,因此需要加入轮询逻辑,这里每分钟检查一次分析状态。
import os
import time
import requests
import csv
import hashlib
# Configuration
# The VirusTotal key is read from the environment so that no credential is
# stored in the script; export VT_API_KEY before running.
API_KEY = os.environ.get("VT_API_KEY", "")
SAMPLE_DIR = "./sample"
OUTPUT_CSV = "virustotal_report.csv"
MAX_RETRIES = 5  # maximum number of polling attempts per analysis
API_DELAY = 60  # seconds slept between API calls
def get_file_hash(filepath):
    """Return the SHA-256 hex digest of the file at *filepath*.

    The file is read in 4 KiB chunks so that a large APK never has to be
    loaded into memory in one piece.

    Args:
        filepath: Path of the file to hash.

    Returns:
        The lowercase hexadecimal SHA-256 digest as a string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    sha256 = hashlib.sha256()
    with open(filepath, "rb") as f:
        while chunk := f.read(4096):
            sha256.update(chunk)
    return sha256.hexdigest()
def upload_file(filepath):
    """Upload one file to VirusTotal and return the id of the new analysis.

    Args:
        filepath: Path of the file to submit.

    Returns:
        The ``data.id`` string from the API reply, or ``None`` when the file
        cannot be read, the request fails, or the reply lacks that field.
        Failures are reported on stdout and never raised to the caller.
    """
    url = "https://www.virustotal.com/api/v3/files"
    headers = {"x-apikey": API_KEY}
    try:
        with open(filepath, "rb") as f:
            files = {"file": (os.path.basename(filepath), f)}
            # The timeout keeps one stalled connection from hanging the batch.
            response = requests.post(
                url, files=files, headers=headers, timeout=120
            )
        response.raise_for_status()
        return response.json()['data']['id']
    except (
        OSError,                               # unreadable sample file
        requests.exceptions.RequestException,  # network / HTTP / bad JSON
        KeyError,                              # reply without data.id
        TypeError,
        ValueError,
    ) as e:
        print(f"Upload failed for {filepath}: {str(e)}")
        return None
def _summarize_analysis(attributes):
    """Reduce the attributes of a completed analysis to the reported fields."""
    stats = attributes.get('stats', {})
    results = attributes.get('results', {})
    return {
        "malicious": stats.get("malicious", 0),
        "suspicious": stats.get("suspicious", 0),
        "total_engines": sum(stats.values()),
        # "<engine>:<verdict>" for every engine that flagged the sample
        "vendors": [
            f"{k}:{v['result']}"
            for k, v in results.items()
            if v.get('category') == 'malicious'
        ],
    }


def get_analysis_report(analysis_id):
    """Poll VirusTotal until the analysis is completed and summarise it.

    The analysis is requested up to MAX_RETRIES times, sleeping API_DELAY
    seconds after every attempt that did not yield a completed report.

    Args:
        analysis_id: Analysis id as returned by the upload call.

    Returns:
        A dict with the keys ``malicious``, ``suspicious``, ``total_engines``
        and ``vendors``, or ``None`` when the id is unknown (HTTP 404), the
        reply lacks an expected field, or the analysis did not complete
        within the allowed number of attempts.
    """
    url = f"https://www.virustotal.com/api/v3/analyses/{analysis_id}"
    headers = {"x-apikey": API_KEY}
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = requests.get(url, headers=headers, timeout=60)
            response.raise_for_status()
            attributes = response.json()['data']['attributes']
            status = attributes['status']
            print(f"当前分析状态: {status}")  # progress output
            if status == 'completed':
                return _summarize_analysis(attributes)
        except KeyError as e:
            print(f"关键字段缺失: {str(e)}")
            return None
        except requests.exceptions.HTTPError as e:
            print(f"API请求失败: {e.response.status_code}")
            if e.response.status_code == 404:
                # Unknown id: retrying cannot help. Return right away instead
                # of falling through to the "retries exhausted" message.
                print("分析ID不存在,终止重试")
                return None
        except requests.exceptions.RequestException as e:
            # Connection errors, timeouts and undecodable bodies are treated
            # as transient and retried rather than aborting the whole batch.
            print(f"网络请求异常: {e}")
        time.sleep(API_DELAY)
        retries += 1
    print("超出最大重试次数")
    return None
def process_samples():
    """Submit every ``.apk`` in SAMPLE_DIR to VirusTotal and log the verdicts.

    For each sample the file is uploaded, the analysis is polled, and one row
    is appended to OUTPUT_CSV. Samples whose upload or report retrieval fails
    are reported on stdout and skipped, so the CSV may hold fewer rows than
    there are samples.
    """
    # Explicit UTF-8 so file names and vendor labels are written identically
    # on every platform.
    with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow([
            'filename', 'malicious', 'suspicious',
            'total_engines', 'result', 'detected_vendors',
            'report_link', 'sha256'
        ])
        # Sorted so the row order is reproducible between runs.
        for filename in sorted(os.listdir(SAMPLE_DIR)):
            if not filename.endswith('.apk'):
                continue
            filepath = os.path.join(SAMPLE_DIR, filename)
            print(f"Processing {filename}...")

            # Step 1: upload the file
            analysis_id = upload_file(filepath)
            if not analysis_id:
                print(f"无法获取 {filename} 的id")
                continue
            time.sleep(API_DELAY)

            # Step 2: fetch the report
            report = get_analysis_report(analysis_id)
            if not report:
                print(f"无法获取 {filename} 的完整报告")
                continue

            file_hash = get_file_hash(filepath)
            row = [
                filename,
                report.get('malicious', 0),
                report.get('suspicious', 0),
                report.get('total_engines', 0),
                'Malicious' if report.get('malicious', 0) > 0 else 'Benign',
                ";".join(report.get('vendors', [])[:3]),  # at most three vendors
                f"https://www.virustotal.com/gui/file/{file_hash}/detection",
                file_hash
            ]
            writer.writerow(row)
            # Flush per sample: a run takes minutes per file, so completed
            # rows should survive an interruption.
            csvfile.flush()
            print("report: ", row)
            time.sleep(API_DELAY)
# Script entry point: run the batch, then report where the CSV was written.
if __name__ == "__main__":
    process_samples()
    print(f"Report saved to {OUTPUT_CSV}")
最终得到
filename,malicious,suspicious,total_engines,result,detected_vendors,report_link,sha256
25be589140f73949124f08759ab5bb57b126396f1401e3bfbfdc5e5c056e0d03.25e35ca260d2ae765cfbe139277fe473.apk,42,0,77,Malicious,Lionic:Trojan.AndroidOS.KungFu.C!c;CAT-QuickHeal:Android.Kungfu.L;Skyhigh:Artemis!Trojan,https://www.virustotal.com/gui/file/25be589140f73949124f08759ab5bb57b126396f1401e3bfbfdc5e5c056e0d03/detection,25be589140f73949124f08759ab5bb57b126396f1401e3bfbfdc5e5c056e0d03
二、AndroGuard 环境配置(macOS)
brew install python
python3 -m venv androenv
source androenv/bin/activate
pip install androguard pandas numpy
如使用 uv 配置环境,pyproject.toml 内容如下:
[project]
name = "android-virus"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"androguard>=4.1.3",
"pandas>=2.2.3",
"requests>=2.32.3",
"scikit-learn>=1.6.1",
]
三、特征提取脚本
提取证书、权限和字符串信息
import os
from androguard.misc import AnalyzeAPK
import pandas as pd
import re
# Predefined catalogue of sensitive features.

# Short permission names (no package prefix) that are regarded as dangerous.
DANGEROUS_PERMISSIONS = {
    # SMS
    'SEND_SMS',
    'RECEIVE_SMS',
    'READ_SMS',
    'WRITE_SMS',
    # location
    'ACCESS_FINE_LOCATION',
    'ACCESS_COARSE_LOCATION',
    # contacts and calls
    'READ_CONTACTS',
    'WRITE_CONTACTS',
    'CALL_PHONE',
}

# Search patterns per sensitive-API category; each key becomes an
# "api_<category>" feature column.
SENSITIVE_API_PATTERNS = dict(
    telephony=['Landroid/telephony/', 'getDeviceId', 'getSimSerialNumber'],
    location=['Landroid/location/', 'requestLocationUpdates'],
    sms=['Landroid/telephony/SmsManager', 'sendTextMessage'],
    contact=['Landroid/provider/ContactsContract', 'getContactList'],
)
def analyze_apk(apk_path):
    """Extract static features from one APK with AndroGuard.

    Args:
        apk_path: Path of the APK file to analyse.

    Returns:
        A flat dict with the keys ``filename``, ``permissions`` (comma-joined),
        ``dangerous_perms_count``, ``activities``, ``services``, ``receivers``,
        one ``api_<category>`` count per entry of SENSITIVE_API_PATTERNS,
        ``cert_issuer``, ``dynamic_loading`` and ``suspicious_urls``.
    """
    a, _, dx = AnalyzeAPK(apk_path)

    # Permission analysis
    all_perms = a.get_permissions()
    dangerous_perms = [
        p for p in all_perms
        if any(dp in p for dp in DANGEROUS_PERMISSIONS)
    ]

    # API call analysis
    sensitive_apis = {}
    for category, patterns in SENSITIVE_API_PATTERNS.items():
        # The first pattern is a class-name prefix and the rest are method
        # names. They are passed by keyword because find_methods' third
        # positional parameter is the descriptor, not a second method name.
        class_prefix, *method_names = patterns
        if method_names:
            method_regex = "(?:" + "|".join(re.escape(m) for m in method_names) + ")"
        else:
            method_regex = ".*"
        matches = dx.find_methods(
            classname=re.escape(class_prefix), methodname=method_regex
        )
        # find_methods yields lazily, so count by iterating rather than len().
        sensitive_apis[f"api_{category}"] = sum(1 for _ in matches)

    # Certificate analysis: parse once, then read the issuer's organisation.
    # NOTE(review): assumes asn1crypto certificates, as returned by
    # androguard >= 3.3.5; confirm against the installed version.
    certs = a.get_certificates()
    cert_info = certs[0].issuer.native if certs else {}

    # Dynamic loading: classes whose parent is a BaseDexClassLoader.
    dynamic_loading = sum(
        1 for clazz in dx.get_classes()
        if 'BaseDexClassLoader' in clazz.extends
    )

    # String features: URLs in the manifest. get_xml() may return bytes, and
    # the scheme list has to include https as well as http.
    manifest_xml = a.get_android_manifest_axml().get_xml()
    if isinstance(manifest_xml, bytes):
        manifest_xml = manifest_xml.decode('utf-8', errors='replace')
    suspicious_strings = re.findall(r'(?:https?|ftp|ssh)://\S+', manifest_xml)

    return {
        "filename": os.path.basename(apk_path),
        "permissions": ",".join(all_perms),
        "dangerous_perms_count": len(dangerous_perms),
        "activities": len(a.get_activities()),
        "services": len(a.get_services()),
        "receivers": len(a.get_receivers()),
        **sensitive_apis,
        "cert_issuer": cert_info.get('organization_name', 'Unknown'),
        "dynamic_loading": dynamic_loading,
        "suspicious_urls": len(suspicious_strings)
    }
# Batch-process every APK in the folder and dump the features to CSV.
features = []
# Sorted so the row order of the output is reproducible between runs.
for apk_file in sorted(os.listdir("apk_folder")):
    if not apk_file.endswith(".apk"):
        continue
    try:
        features.append(analyze_apk(os.path.join("apk_folder", apk_file)))
    except Exception as e:
        # Best effort: one unparsable APK must not abort the whole batch.
        print(f"Error processing {apk_file}: {str(e)}")
pd.DataFrame(features).to_csv("enhanced_features.csv", index=False)
四、自动化大模型判断
import csv
import os
import time

import requests
# The DeepSeek key is read from the environment so that no credential is
# stored in the script; export DEEPSEEK_API_KEY before running.
API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
API_URL = "https://api.deepseek.com/v1/chat/completions"
MODEL_NAME = "deepseek-reasoner"
# Prompt sent for every sample; the three placeholders are filled per row.
PROMPT_TEMPLATE = """
根据下面的内容判断样本是否恶意,同时注意不要误报,遵循【误报最小化原则】:
【要求的权限】
{permissions}
【程序中出现的URL】
{suspicous_urls}
【证书信息】
{cert_issuer}
请根据以上信息判断样本是否恶意,然后填写结论。仅填写(恶意/正常):
"""
def main():
    """Ask the model for a verdict on every sample and write ``result.csv``.

    Reads ``local_abstract.csv`` (columns ``filename``, ``permissions``,
    ``suspicous_urls``, ``cert_issuer``), sends one chat-completion request
    per row, and writes ``filename`` plus a 0/1 ``prediction`` where 1 means
    the reply contained "恶意".

    Raises:
        requests.exceptions.RequestException: If an API call fails; rows
            written before the failure are kept.
        KeyError: If the input CSV or the API reply lacks an expected field.
    """
    # Read the input as UTF-8 to match the encoding used for the output,
    # instead of depending on the platform default.
    with open('local_abstract.csv', 'r', newline='', encoding='utf-8') as csvfile, \
            open('result.csv', 'w', newline='', encoding='utf-8') as outfile:
        reader = csv.DictReader(csvfile)
        writer = csv.DictWriter(outfile, fieldnames=['filename', 'prediction'])
        writer.writeheader()
        for row in reader:
            # Parse the row
            permissions = row['permissions'].split(',')
            suspicious_urls = row['suspicous_urls'].split(',')
            cert_issuer = row['cert_issuer']

            # Build the prompt
            content = PROMPT_TEMPLATE.format(
                permissions=permissions,
                suspicous_urls=suspicious_urls,
                cert_issuer=cert_issuer
            )
            print("=====================================")
            print(f"Analyzing {row['filename']}...")
            print(content)

            # Call the API
            response = requests.post(
                API_URL,
                headers={"Authorization": f"Bearer {API_KEY}"},
                json={
                    "model": MODEL_NAME,
                    "messages": [{"role": "user", "content": content}],
                    "temperature": 0.1
                },
                timeout=120
            )
            response.raise_for_status()
            message = response.json()['choices'][0]['message']
            prediction = message['content'].strip()
            # The reasoning trace is only logged, so tolerate replies where
            # the field is absent or null instead of failing the run.
            reasoning = (message.get('reasoning_content') or '').strip()
            print(f"Reasoning: {reasoning}")

            # Store the result
            writer.writerow({
                'filename': row['filename'],
                'prediction': 1 if '恶意' in prediction else 0
            })
            # Log progress
            print(f"Processed: {row['filename']} -> {prediction}")
            time.sleep(1)
# Script entry point.
if __name__ == "__main__":
    main()
五、自动化评估脚本
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd
# Pair ground truth and predictions by file name rather than by row position:
# the two CSVs are written by different scripts, and the VirusTotal report
# omits samples whose upload or analysis failed.
truth = pd.read_csv("virustotal_report.csv")[["filename", "result"]]
preds = pd.read_csv("result.csv")[["filename", "prediction"]]
paired = truth.merge(preds, on="filename", how="inner")
true_labels = paired["result"].map({"Malicious":1, "Benign":0})
pred_labels = paired["prediction"]
print(f"True Labels: {true_labels}")
print(f"Pred Labels: {pred_labels}")
print(f"Accuracy: {accuracy_score(true_labels, pred_labels):.2f}")
print(f"F1-Score: {f1_score(true_labels, pred_labels):.2f}")
实验结果显示,大模型的判断有较高的准确率(毕竟这些样本都没有经过混淆,特征还是非常明显的)。