A Simple Termux Agent

Star Dust Lv1

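Overview

Boundless (无量) is an intelligent coding assistant that runs in Termux and is built on the DeepSeek-v4-pro API. It provides streamed reasoning with live Markdown rendering, file reading and writing, real-time command output, directory switching, Bing search, web page fetching, Skill management, context management, token trimming, and conversation export.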


Installing dependencies

# Python dependencies
pip install requests beautifulsoup4 rich pyfiglet

# Optional: install readline so the arrow keys work
pkg install readline

Environment variables

The DeepSeek API key must be set before launching:

export DEEPSEEK_API_KEY="your-api-key-here"

# Optional: enable auto-confirm mode (skips every confirmation prompt)
export AUTO_CONFIRM=true
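
The following is a minimal sketch of how these two variables are interpreted, modelled on the `auto_confirm()` helper in the source listing. The `env` parameter and the `require_api_key` helper are assumptions added here so the logic can be exercised without touching the real environment. Note that only the literal value `true` (case-insensitive, surrounding whitespace ignored) turns auto-confirm on; values such as `1` or `yes` do not.

```python
import os


def auto_confirm(env=None) -> bool:
    """Return True only when AUTO_CONFIRM equals "true" (case-insensitive)."""
    env = os.environ if env is None else env  # `env` is a hypothetical test hook
    return env.get("AUTO_CONFIRM", "").strip().lower() == "true"


def require_api_key(env=None) -> str:
    """Hypothetical helper: return the API key or raise if it is missing."""
    env = os.environ if env is None else env
    key = env.get("DEEPSEEK_API_KEY", "").strip()
    if not key:
        raise RuntimeError("DEEPSEEK_API_KEY is not set")
    return key


if __name__ == "__main__":
    print("auto-confirm:", auto_confirm())
```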

Usage

# Launch directly (shows the project list or enters global mode)
python agent.py

# Create a new project
python agent.py new <project-name>

# Manage Skills
python agent.py skill list # list installed Skills
python agent.py skill install <path> # install a Skill (local path or URL)
python agent.py skill remove <name> # uninstall a Skill

# Manage the global context
python agent.py context view # view
python agent.py context clear # clear

# Manage projects
python agent.py project remove <name> # delete a project
python agent.py project rename <old> <new> # rename a project

# Enable debug logging
python agent.py --debug
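
As a rough illustration of how such command lines are split into a subcommand and its arguments, the sketch below mirrors the `argparse` setup in the source listing: one optional positional `command`, a catch-all `args` list, and a `--debug` flag. The `parse_cli` wrapper is an assumption introduced here so the parser can be called with an explicit argument list.

```python
import argparse


def parse_cli(argv):
    """Parse an argument list the way the listing's module-level parser does."""
    parser = argparse.ArgumentParser(description="Boundless Agent", add_help=False)
    parser.add_argument("command", nargs="?", default=None)
    parser.add_argument("args", nargs="*")
    parser.add_argument("--debug", action="store_true")
    # parse_known_args tolerates unrecognised options instead of exiting
    namespace, _unknown = parser.parse_known_args(argv)
    return namespace


if __name__ == "__main__":
    ns = parse_cli(["project", "rename", "old", "new"])
    print(ns.command, ns.args, ns.debug)
```

With no positional arguments, `command` stays `None`, which is what lets the program tell a plain launch apart from a subcommand.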

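In-conversation commands

| Command | Description |
| --- | --- |
| """ | Multi-line input mode; a line containing only """ ends the input |
| /clear | Clear the current conversation and start a new session |
| /backup | Manually back up the current conversation log |
| /export | Export the conversation as a Markdown file |
| /edit | Open an external editor (nano by default) to enter long text |
| /help | Show help |
| exit / q / quit | Quit the program |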
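
The multi-line mode can be sketched as below, following the `_multiline_input` method in the source listing: lines are collected until one is exactly three double quotes, then joined with newlines. Taking an iterable of lines instead of calling `input()` is an assumption made here so the behaviour is easy to test.

```python
def read_multiline(lines):
    """Collect lines until a line that is exactly three double quotes."""
    collected = []
    for line in lines:
        if line == '"""':  # terminator must be the whole line
            break
        collected.append(line)
    return "\n".join(collected)


if __name__ == "__main__":
    sample = ["def f():", "    return 1", '"""', "ignored"]
    print(read_multiline(sample))
```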

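Built-in tools the AI can call

| Tool | Function |
| --- | --- |
| read_file | Read a file's contents |
| write_file | Write a file |
| list_directory | List a directory |
| execute_command | Run a shell command (real-time output) |
| web_fetch | Fetch a web page |
| web_search | Bing search |
| change_directory | Change the working directory |
| read_skill | Read the full instructions of a given Skill |
| list_skills | List all installed Skills |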
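
A minimal sketch of how a tool call from the model might be dispatched, based on the `function_map` and `_execute_tool` method in the source listing: the tool name selects a Python callable, and the JSON argument string is decoded into keyword arguments. `list_directory_stub` is a hypothetical stand-in for the real tool so the example stays self-contained.

```python
import json


def list_directory_stub(path="."):
    """Hypothetical stand-in for the real list_directory tool."""
    return f"listing {path}"


FUNCTION_MAP = {"list_directory": list_directory_stub}


def execute_tool(name, arguments):
    """Look up a tool by name and call it with JSON-decoded keyword arguments."""
    func = FUNCTION_MAP.get(name)
    if func is None:
        return f"Unknown tool: {name}"
    try:
        kwargs = json.loads(arguments) if isinstance(arguments, str) else arguments
        return func(**kwargs)
    except Exception as exc:  # errors are returned as text so the model can react
        return f"Tool execution error: {exc}"


if __name__ == "__main__":
    print(execute_tool("list_directory", '{"path": "/tmp"}'))
```

Returning errors as strings rather than raising keeps the conversation loop alive; the model sees the failure as an ordinary tool result.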

Configuration file

Location: ~/.boundless/config.json

{
  "max_context_tokens": 102400,
  "keep_recent_rounds": 10,
  "backup_count": 3
}
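
The sketch below shows, under stated assumptions, how these settings appear to be consumed in the source listing: user values are overlaid on the defaults, and the token budget is checked with a rough character-based estimate (a CJK ideograph counts as 2, any other character as 1, plus 4 per message). `merge_config` and `over_budget` are illustrative names introduced here. The listing itself reads the file from disk and performs the trimming inside `_trim_context`.

```python
import json

DEFAULT_CONFIG = {
    "max_context_tokens": 102400,
    "keep_recent_rounds": 10,
    "backup_count": 3,
}


def merge_config(user_json: str) -> dict:
    """Overlay user settings on the defaults; keep defaults on bad input."""
    config = DEFAULT_CONFIG.copy()
    try:
        user = json.loads(user_json)
    except json.JSONDecodeError:
        return config
    if isinstance(user, dict):
        config.update(user)
    return config


def estimate_tokens(text: str) -> int:
    """Rough heuristic: CJK ideographs count as 2, everything else as 1."""
    return sum(
        2 if ("\u4e00" <= ch <= "\u9fff" or "\u3400" <= ch <= "\u4dbf") else 1
        for ch in text
    )


def over_budget(messages, config) -> bool:
    """True when the estimated usage exceeds max_context_tokens."""
    limit = config["max_context_tokens"]
    if limit <= 0:  # a non-positive limit disables trimming
        return False
    used = sum(estimate_tokens(m.get("content") or "") + 4 for m in messages)
    return used > limit


if __name__ == "__main__":
    cfg = merge_config('{"backup_count": 5}')
    print(cfg, over_budget([{"role": "user", "content": "hello"}], cfg))
```

The estimate is deliberately crude and is not a real tokenizer, so the configured limit is best treated as an approximate budget.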

Full source code

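#!/usr/bin/env python3
"""
Boundless (无量) - DeepSeek-v4-pro Code Agent for Termux
Features: streamed reasoning with live Markdown rendering, file read/write,
real-time command output, directory switching, search, web fetching,
project management, Skill management, context management, token trimming,
conversation export, and more.
"""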

import os, sys, json, subprocess, textwrap, urllib.request, urllib.parse, urllib.error
import shutil, tempfile, zipfile, tarfile, threading, time, logging, argparse
from pathlib import Path
from queue import Queue, Empty
from typing import List, Dict, Any, Optional

try:
    import readline  # imported for its side effect on input() line editing
except ImportError:
    print("💡 Tip: install readline so the arrow keys work: pkg install readline")

try:
    import requests
    from bs4 import BeautifulSoup
except ImportError:
    sys.exit("Install the dependencies first: pip install requests beautifulsoup4")

try:
    from rich.console import Console
    from rich.markdown import Markdown
    from rich.live import Live
    console = Console()
except ImportError:
    sys.exit("Install rich first: pip install rich")

try:
    from pyfiglet import Figlet
except ImportError:
    sys.exit("Install pyfiglet first: pip install pyfiglet")

parser = argparse.ArgumentParser(description="Boundless Agent", add_help=False)
parser.add_argument("command", nargs="?", default=None, help="Subcommand: new, context, skill, project")
parser.add_argument("args", nargs="*", help="Subcommand arguments")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
args, unknown = parser.parse_known_args()

if args.debug:
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%H:%M:%S",
    )
else:
    logging.basicConfig(level=logging.WARNING)

logger = logging.getLogger("boundless")

def is_termux() -> bool:
    return os.path.isdir("/data/data/com.termux/files/usr") or "TERMUX_VERSION" in os.environ

def auto_confirm() -> bool:
    return os.environ.get("AUTO_CONFIRM", "").strip().lower() == "true"

def confirm_action(prompt: str) -> bool:
    if auto_confirm():
        return True
    ans = input(f"{prompt} (Y/n): ").strip().lower()
    return ans in ("", "y")

BOUNDLESS_DIR = Path.home() / ".boundless"
SKILLS_DIR = BOUNDLESS_DIR / "skills"
CONFIG_FILE = BOUNDLESS_DIR / "config.json"
HISTORY_FILE = BOUNDLESS_DIR / "history"

def setup_readline():
    try:
        import readline
        histfile = str(HISTORY_FILE)
        try:
            readline.read_history_file(histfile)
            readline.set_history_length(1000)
        except FileNotFoundError:
            pass
        import atexit
        atexit.register(readline.write_history_file, histfile)
    except Exception:
        pass

DEFAULT_CONFIG = {
    "max_context_tokens": 102400,
    "keep_recent_rounds": 10,
    "backup_count": 3,
}

def load_config() -> dict:
    # First run: write the defaults so the user has a file to edit.
    if not CONFIG_FILE.exists():
        CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            json.dump(DEFAULT_CONFIG, f, indent=2)
        return DEFAULT_CONFIG.copy()
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            user = json.load(f)
        config = DEFAULT_CONFIG.copy()
        config.update(user)
        return config
    except Exception:
        return DEFAULT_CONFIG.copy()

def estimate_tokens(text: str) -> int:
    # Rough heuristic: a CJK ideograph counts as 2, any other character as 1.
    count = 0
    for ch in text:
        if '\u4e00' <= ch <= '\u9fff' or '\u3400' <= ch <= '\u4dbf':
            count += 2
        else:
            count += 1
    return count

def count_messages_tokens(messages: List[Dict[str, Any]]) -> int:
    total = 0
    for msg in messages:
        content = msg.get("content") or ""
        total += estimate_tokens(content)
        for tc in msg.get("tool_calls") or []:
            arguments = tc.get("function", {}).get("arguments", "")
            total += estimate_tokens(arguments)
        total += 4  # fixed per-message overhead
    return total

def show_banner():
    f = Figlet(font='slant')
    banner = f.renderText('Boundless')
    console.print(f"[bold cyan]{banner}[/bold cyan]")
    console.print("无量 · Intelligent coding assistant", style="italic")

def execute_command(command: str) -> str:
    # Crude substring blocklist; it is a safety net, not a security boundary.
    dangerous = ["rm -rf /", "dd if=", "mkfs.", ":(){ :|:& };:"]
    for pat in dangerous:
        if pat in command:
            return f"Refused dangerous command: {pat}"
    if not confirm_action(f"About to run: {command}\nContinue?"):
        return "Cancelled by user"

    console.print("[bold yellow]⚡ Live output:[/bold yellow]")
    stdout_lines = []
    stderr_lines = []
    out_queue = Queue()
    stop_event = threading.Event()

    def reader(pipe, is_stderr):
        # Forward each line from the pipe to the queue, tagged by stream.
        try:
            for line in iter(pipe.readline, ""):
                if stop_event.is_set():
                    break
                out_queue.put((is_stderr, line))
        except ValueError:
            pass
        finally:
            pipe.close()

    try:
        proc = subprocess.Popen(
            command, shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True, bufsize=1,
            executable="/data/data/com.termux/files/usr/bin/bash" if is_termux() else None,
        )
        t1 = threading.Thread(target=reader, args=(proc.stdout, False), daemon=True)
        t2 = threading.Thread(target=reader, args=(proc.stderr, True), daemon=True)
        t1.start()
        t2.start()

        while True:
            try:
                is_stderr, line = out_queue.get(timeout=0.2)
                if is_stderr:
                    # Pass the colour via style=; with markup=False a "[red]" tag would print literally.
                    console.print(line.rstrip(), style="red", markup=False, highlight=False)
                    stderr_lines.append(line)
                else:
                    console.print(line.rstrip(), markup=False, highlight=False)
                    stdout_lines.append(line)
            except Empty:
                if proc.poll() is not None and out_queue.empty():
                    break

        stop_event.set()
        t1.join(timeout=2)
        t2.join(timeout=2)
        ret = proc.wait()

    except Exception as e:
        return f"Command execution error: {e}"

    out_text = "".join(stdout_lines).rstrip()
    err_text = "".join(stderr_lines).rstrip()
    result = f"[exit code: {ret}]\n"
    if out_text:
        result += "[stdout]\n" + out_text + "\n"
    if err_text:
        result += "[stderr]\n" + err_text + "\n"
    if not out_text and not err_text:
        result += "(no output)"
    return result.strip()

def read_file(path: str) -> str:
    p = Path(path).expanduser().resolve()
    if not p.exists():
        return f"Error: file {p} does not exist"
    if p.is_dir():
        return f"{p} is a directory"
    try:
        content = p.read_text(encoding="utf-8", errors="ignore")
        lines = content.splitlines()
        # Long files are truncated to the first 100 lines.
        if len(lines) > 100:
            return f"File {p.name} has {len(lines)} lines; first 100 lines:\n" + "\n".join(lines[:100])
        return content
    except Exception as e:
        return f"Read failed: {e}"

def write_file(path: str, content: str) -> str:
    p = Path(path).expanduser().resolve()
    if p.exists() and not confirm_action(f"File {p} already exists. Overwrite?"):
        return "Write cancelled"
    try:
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content, encoding="utf-8")
        return f"Wrote {p} successfully, {len(content)} characters"
    except Exception as e:
        return f"Write failed: {e}"

def list_directory(path: str = ".") -> str:
    p = Path(path).expanduser().resolve()
    if not p.exists():
        return f"Path does not exist: {p}"
    if not p.is_dir():
        return f"{p} is not a directory"
    try:
        # Directories first, then files, each sorted case-insensitively.
        items = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))
        lines = [f" {i.name}{'/' if i.is_dir() else ''}" for i in items]
        return "\n".join(lines) if lines else "Directory is empty"
    except Exception as e:
        return f"Listing failed: {e}"

def web_fetch(url: str) -> str:
    if not url.startswith(("http://", "https://")):
        url = "https://" + url
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Termux Agent)"}
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
        # Only the first 2000 characters of the raw response are returned.
        text = resp.text[:2000]
        if len(resp.text) > 2000:
            text += "\n...(content truncated)"
        return text
    except Exception as e:
        return f"Web fetch failed: {e}"

def web_search(query: str) -> str:
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        url = f"https://cn.bing.com/search?q={urllib.parse.quote(query)}"
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        results = []
        for item in soup.select("li.b_algo"):
            title_elem = item.select_one("h2 a")
            desc_elem = item.select_one(".b_caption p")
            title = title_elem.get_text(strip=True) if title_elem else ""
            link = title_elem.get("href", "") if title_elem else ""
            desc = desc_elem.get_text(strip=True) if desc_elem else ""
            if title:
                results.append(f"📌 {title}\n {desc}\n {link}")
        # Fallback selectors in case the primary layout yields nothing.
        if not results:
            for item in soup.select("ol#b_results li.b_algo"):
                title_elem = item.select_one("h2 a")
                desc_elem = item.select_one("p")
                title = title_elem.get_text(strip=True) if title_elem else ""
                link = title_elem.get("href", "") if title_elem else ""
                desc = desc_elem.get_text(strip=True) if desc_elem else ""
                if title:
                    results.append(f"📌 {title}\n {desc}\n {link}")
        return "\n\n".join(results[:5]) if results else "No relevant search results found"
    except Exception as e:
        return f"Search failed: {e}"

def change_directory(path: str) -> str:
    try:
        target = Path(path).expanduser().resolve()
        if not target.is_dir():
            return f"Error: {target} is not a directory or does not exist"
        os.chdir(target)
        return f"Current directory changed to: {os.getcwd()}"
    except Exception as e:
        return f"Failed to change directory: {e}"

def list_installed_skills() -> list:
    if not SKILLS_DIR.exists():
        return []
    skills = []
    for d in sorted(SKILLS_DIR.iterdir()):
        if d.is_dir() and (d / "SKILL.md").exists():
            meta = parse_skill_metadata(d / "SKILL.md")
            name = meta.get("name") if meta else None
            if not name:
                name = d.name
            description = meta.get("description", "") if meta else ""
            skills.append({
                "name": name,
                "description": description,
                "path": str(d)
            })
    return skills

def parse_skill_metadata(skill_md_path: Path) -> dict:
    # Minimal front-matter reader: only "name" and "description" keys between
    # the leading "---" markers are recognised; this is not a full YAML parser.
    try:
        content = skill_md_path.read_text(encoding="utf-8")
        if content.startswith("---"):
            end = content.find("---", 3)
            if end != -1:
                yaml_str = content[3:end].strip()
                meta = {}
                for line in yaml_str.splitlines():
                    if ":" in line:
                        key, val = line.split(":", 1)
                        key = key.strip()
                        val = val.strip().strip('"').strip("'")
                        if key in ("name", "description"):
                            meta[key] = val
                return meta
    except Exception:
        pass
    return {}

def install_skill_from_source(source_path: str) -> str:
    src = source_path
    tmpdir = None

    # Remote source: download to a temporary file first.
    if src.startswith(("http://", "https://")):
        try:
            console.print(f"Downloading Skill: {src}")
            resp = requests.get(src, stream=True, timeout=60)
            resp.raise_for_status()
            content_disposition = resp.headers.get("Content-Disposition", "")
            filename = None
            if "filename=" in content_disposition:
                import re
                fname_match = re.search(r'filename[^;=\n]*=((["\']).*?\2|[^;\n]*)', content_disposition)
                if fname_match:
                    filename = fname_match.group(1).strip('"\' ')
            if not filename:
                parsed_url = urllib.parse.urlparse(src)
                filename = Path(parsed_url.path).name
            if not filename:
                filename = "skill_download"

            suffix = Path(filename).suffix or ".tmp"
            tmp_fd, tmp_path = tempfile.mkstemp(suffix=suffix)
            os.close(tmp_fd)
            with open(tmp_path, 'wb') as f:
                total_size = int(resp.headers.get('content-length', 0))
                downloaded = 0
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size:
                        percent = downloaded / total_size * 100
                        console.print(f"\rDownload progress: {percent:.0f}%", end="")
            console.print()
            src = str(Path(tmp_path))
        except Exception as e:
            return f"Failed to download Skill: {e}"

    src_path = Path(src).expanduser().resolve()
    if not src_path.exists():
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)
        return f"Source path does not exist: {src_path}"

    try:
        if src_path.suffix in ('.zip', '.gz', '.bz2', '.xz'):
            # Archive: unpack, then locate the directory that holds SKILL.md.
            tmpdir = tempfile.mkdtemp()
            if src_path.suffix == '.zip':
                with zipfile.ZipFile(src_path, 'r') as zf:
                    zf.extractall(tmpdir)
            else:
                with tarfile.open(src_path, 'r:*') as tf:
                    tf.extractall(tmpdir)
            skill_root = None
            for root, dirs, files in os.walk(tmpdir):
                if "SKILL.md" in files:
                    skill_root = Path(root)
                    break
            if not skill_root:
                shutil.rmtree(tmpdir)
                return "SKILL.md not found in the archive"
            meta = parse_skill_metadata(skill_root / "SKILL.md")
            name = meta.get("name") if meta else None
            if not name:
                name = skill_root.name
            dest = SKILLS_DIR / name
            if dest.exists():
                shutil.rmtree(tmpdir)
                return f"Skill '{name}' already exists; uninstall the old version first"
            shutil.copytree(skill_root, dest)
            shutil.rmtree(tmpdir)
            return f"Skill '{name}' installed successfully"
        elif src_path.is_dir():
            md = src_path / "SKILL.md"
            if not md.exists():
                return "SKILL.md not found in the target folder"
            meta = parse_skill_metadata(md)
            name = meta.get("name") if meta else None
            if not name:
                name = src_path.name
            dest = SKILLS_DIR / name
            if dest.exists():
                return f"Skill '{name}' already exists"
            shutil.copytree(src_path, dest)
            return f"Skill '{name}' installed successfully"
        else:
            return "Unsupported file type; provide a folder or an archive"
    except Exception as e:
        return f"Install failed: {e}"
    finally:
        # Remove the downloaded temporary file, if any.
        if source_path.startswith(("http://", "https://")) and src_path.exists():
            try:
                src_path.unlink()
            except OSError:
                pass

def remove_skill(name: str) -> str:
    target = SKILLS_DIR / name
    if not target.exists():
        return f"Skill '{name}' does not exist"
    try:
        shutil.rmtree(target)
        return f"Skill '{name}' uninstalled"
    except Exception as e:
        return f"Uninstall failed: {e}"

def preview_skill(source_path: str) -> str:
    src = source_path

    if src.startswith(("http://", "https://")):
        try:
            console.print(f"Downloading preview: {src}")
            resp = requests.get(src, timeout=30)
            resp.raise_for_status()
            parsed_url = urllib.parse.urlparse(src)
            filename = Path(parsed_url.path).name
            if any(ext in filename.lower() for ext in ('.zip', '.tar.gz', '.tgz', '.bz2', '.xz')):
                tmpfd, tmp_path = tempfile.mkstemp(suffix=Path(filename).suffix or ".tmp")
                os.close(tmpfd)
                with open(tmp_path, 'wb') as f:
                    f.write(resp.content)
                return preview_skill(tmp_path)
            else:
                return resp.text[:2000]
        except Exception as e:
            return f"Preview failed: {e}"

    src_path = Path(src).expanduser().resolve()
    if not src_path.exists():
        return "Source path does not exist"
    if src_path.suffix in ('.zip', '.gz', '.bz2', '.xz'):
        try:
            tmpdir = tempfile.mkdtemp()
            if src_path.suffix == '.zip':
                with zipfile.ZipFile(src_path, 'r') as zf:
                    zf.extractall(tmpdir)
            else:
                with tarfile.open(src_path, 'r:*') as tf:
                    tf.extractall(tmpdir)
            for root, dirs, files in os.walk(tmpdir):
                if "SKILL.md" in files:
                    md_path = Path(root) / "SKILL.md"
                    content = md_path.read_text(encoding="utf-8")
                    shutil.rmtree(tmpdir)
                    return content
            shutil.rmtree(tmpdir)
            return "SKILL.md not found in the archive"
        except Exception as e:
            return f"Preview failed: {e}"
    elif src_path.is_dir():
        md = src_path / "SKILL.md"
        if md.exists():
            return md.read_text(encoding="utf-8")
        return "SKILL.md not found in the folder"
    else:
        try:
            return src_path.read_text(encoding="utf-8", errors="ignore")[:2000]
        except OSError:
            return "Unsupported file type"

def get_base_tools():
    return [
        {"type": "function", "function": {"name": "read_file", "description": "Read a file's contents", "parameters": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}},
        {"type": "function", "function": {"name": "write_file", "description": "Write a file", "parameters": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}},
        {"type": "function", "function": {"name": "list_directory", "description": "List a directory", "parameters": {"type": "object", "properties": {"path": {"type": "string", "default": "."}}}}},
        {"type": "function", "function": {"name": "execute_command", "description": "Run a shell command (real-time output)", "parameters": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}},
        {"type": "function", "function": {"name": "web_fetch", "description": "Fetch a web page", "parameters": {"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]}}},
        {"type": "function", "function": {"name": "web_search", "description": "Bing search", "parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}}},
        {"type": "function", "function": {"name": "change_directory", "description": "Change the working directory", "parameters": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}},
    ]

def get_skill_tools():
    return [
        {"type": "function", "function": {"name": "read_skill", "description": "Read the full SKILL.md instructions of a given Skill", "parameters": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}}},
        {"type": "function", "function": {"name": "list_skills", "description": "List the names and descriptions of all installed Skills", "parameters": {"type": "object", "properties": {}}}},
    ]

DEEPSEEK_BASE = "https://api.deepseek.com/v1"

def deepseek_stream(messages, tools=None, max_retries=3):
    api_key = os.environ.get("DEEPSEEK_API_KEY")
    if not api_key:
        sys.exit("Please set the DEEPSEEK_API_KEY environment variable")
    url = f"{DEEPSEEK_BASE}/chat/completions"
    payload = {
        "model": "deepseek-v4-pro",
        "messages": messages,
        "stream": True,
        "tools": tools or [],
        "tool_choice": "auto",
        "thinking": {"type": "enabled"},
    }
    data = json.dumps(payload).encode("utf-8")
    last_exception = None
    for attempt in range(max_retries):
        try:
            req = urllib.request.Request(url, data=data, method="POST")
            req.add_header("Authorization", f"Bearer {api_key}")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "text/event-stream")
            logger.debug(f"API request (attempt {attempt+1})")
            return urllib.request.urlopen(req, timeout=180)
        except urllib.error.HTTPError as e:
            error_body = e.read().decode("utf-8")
            last_exception = Exception(f"API request failed ({e.code}): {error_body}")
            # Retry rate limits and server-side errors with exponential backoff.
            if e.code in (429, 500, 502, 503, 504):
                wait = 2 ** attempt
                console.print(f"[yellow]Server error, retrying in {wait}s...[/yellow]")
                time.sleep(wait)
            else:
                raise last_exception
        except Exception as e:
            last_exception = Exception(f"Network error: {e}")
            wait = 2 ** attempt
            console.print(f"[yellow]Network error, retrying in {wait}s...[/yellow]")
            time.sleep(wait)
    raise last_exception

class BoundlessAgent:
    def __init__(self, context_file: Path):
        if not os.environ.get("DEEPSEEK_API_KEY"):
            sys.exit("Please set the DEEPSEEK_API_KEY environment variable")
        self.context_file = context_file
        self.backup_dir = context_file.parent / (context_file.stem + "_backups")
        self.config = load_config()

        self.tools = get_base_tools() + get_skill_tools()
        # Maps tool names from the API to the Python callables that implement them.
        self.function_map = {
            "read_file": read_file,
            "write_file": write_file,
            "list_directory": list_directory,
            "execute_command": self._execute_command_paged,
            "web_fetch": web_fetch,
            "web_search": web_search,
            "change_directory": change_directory,
            "read_skill": self.read_skill,
            "list_skills": self.list_skills,
        }
        self.messages = []
        self._saved_len = 0  # number of messages already written to context_file
        self._load_context()

    def _execute_command_paged(self, command: str) -> str:
        raw_result = execute_command(command)
        out_lines = raw_result.splitlines()
        if len(out_lines) > 20 and not auto_confirm():
            console.print(f"[yellow]Output has {len(out_lines)} lines. View with less? (Y/n/skip)[/yellow] ", end="")
            ans = input().strip().lower()
            if ans in ("", "y"):
                with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8') as f:
                    f.write(raw_result)
                    tmpname = f.name
                # An argument list avoids shell quoting issues with the temp path.
                subprocess.run(["less", "-R", tmpname])
                os.unlink(tmpname)
                return "[Output viewed with less]"
            elif ans == "skip":
                return "[Output too long, skipped]"
        return raw_result

    def _load_context(self):
        if self.context_file.exists():
            try:
                with open(self.context_file, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
                if lines:
                    messages = [json.loads(line) for line in lines if line.strip()]
                    user_count = sum(1 for m in messages if m.get("role") == "user")
                    last_user_content = ""
                    for m in reversed(messages):
                        if m.get("role") == "user":
                            last_user_content = m.get("content", "")
                            break
                    preview = last_user_content[:60] + ("..." if len(last_user_content) > 60 else "")

                    console.print(f"\n📂 Found a conversation log ({self.context_file.name}):", style="bold")
                    console.print(f" Rounds: {user_count} Last question: {preview}", markup=False)
                    # markup=False keeps rich from swallowing "[n]" and "[v]" as style tags.
                    console.print(" Options: [Y] continue | [n] start a new conversation | [v] view full history", markup=False)
                    ans = input("> ").strip().lower()

                    if ans == 'v':
                        console.print("\n--- History overview ---")
                        for idx, msg in enumerate(messages):
                            role = msg.get("role", "?")
                            content = str(msg.get("content", ""))[:100]
                            console.print(f" [{idx}] {role}: {content}", markup=False)
                        ans2 = input("\nContinue this conversation? (Y/n): ").strip().lower()
                        if ans2 in ("", "y"):
                            ans = "y"
                        else:
                            ans = "n"

                    if ans in ("", "y"):
                        self.messages = messages
                        self._update_system_prompt()
                        console.print("✅ Conversation restored", style="green")
                    else:
                        console.print("Starting a new conversation; the old log is moved to backups")
                        self._backup_file(keep_original=False)
                        self.context_file.unlink(missing_ok=True)
                        self._build_system_prompt()
                        self._save_context()
                    self._saved_len = len(self.messages)
                    return
            except Exception as e:
                console.print(f"⚠️ Failed to load context: {e}; creating a new conversation")
        self._build_system_prompt()
        self._save_context()
        self._saved_len = len(self.messages)

    def _save_context(self):
        if not self.context_file.parent.exists():
            self.context_file.parent.mkdir(parents=True, exist_ok=True)
        # Append only the messages added since the last save (JSON Lines format).
        new_msgs = self.messages[self._saved_len:]
        if new_msgs:
            try:
                with open(self.context_file, 'a', encoding='utf-8') as f:
                    for msg in new_msgs:
                        f.write(json.dumps(msg, ensure_ascii=False) + '\n')
                self._saved_len = len(self.messages)
            except Exception as e:
                console.print(f"Failed to save context: {e}", style="red")

    def _backup_file(self, keep_original=True):
        if not self.context_file.exists():
            return
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        backup_name = f"{self.context_file.stem}_{timestamp}.jsonl"
        backup_path = self.backup_dir / backup_name
        try:
            shutil.copy2(self.context_file, backup_path)
            # Keep only the newest backup_count backups.
            backups = sorted(self.backup_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True)
            for old in backups[self.config.get("backup_count", 3):]:
                old.unlink()
            if not keep_original:
                self.context_file.unlink(missing_ok=True)
        except Exception as e:
            console.print(f"Backup failed: {e}", style="red")

    def _trim_context(self):
        max_tokens = self.config.get("max_context_tokens", 0)
        keep_rounds = self.config.get("keep_recent_rounds", 10)
        if max_tokens <= 0:
            return

        current_tokens = count_messages_tokens(self.messages)
        if current_tokens <= max_tokens:
            return

        system_msgs = [m for m in self.messages if m["role"] == "system"]
        user_indices = [i for i, m in enumerate(self.messages) if m["role"] == "user"]
        if len(user_indices) <= keep_rounds:
            # Few rounds: drop the oldest non-system messages until under budget.
            trimmed = system_msgs.copy()
            for msg in self.messages:
                if msg["role"] != "system":
                    trimmed.append(msg)
            while len(trimmed) > len(system_msgs) and count_messages_tokens(trimmed) > max_tokens:
                for i in range(len(trimmed)):
                    if trimmed[i]["role"] != "system":
                        del trimmed[i]
                        break
            self.messages = trimmed
        else:
            # Keep the system messages plus the most recent keep_rounds rounds.
            last_round_start = user_indices[-keep_rounds]
            trimmed = system_msgs + self.messages[last_round_start:]
            self.messages = trimmed

            while count_messages_tokens(self.messages) > max_tokens and len(self.messages) > len(system_msgs):
                for i in range(len(self.messages)):
                    if self.messages[i]["role"] != "system":
                        del self.messages[i]
                        break

        console.print("🧹 Context trimmed automatically to control token usage", style="dim")

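    def _make_system_message(self):
        skills = list_installed_skills()
        if skills:
            lines = []
            for s in skills:
                name = s['name']
                desc = s.get('description', '')
                if desc:
                    lines.append(f"- {name}: {desc}")
                else:
                    lines.append(f"- {name}")
            skill_list = "\n".join(lines)
            skill_desc = (
                f"\nInstalled Skills (use the descriptions to decide when to call read_skill for detailed instructions):\n"
                f"{skill_list}\n"
                f"Note: call read_skill only when the current task really needs it; never pre-read all Skills."
            )
        else:
            skill_desc = "\nNo Skills are installed. Install one with ag skill install."

        return {
            "role": "system",
            "content": (
                "You are Boundless, an intelligent coding assistant running in Termux.\n"
                "Tool capabilities: read/write files, run shell commands, browse directories, fetch web pages, Bing search, change directory.\n"
                "Working principles:\n"
                "1. Plan before multi-step operations, then call tools.\n"
                "2. After running a command, check the exit code and error output; on failure, analyse the cause and try to fix the code, retrying at most 3 times, then ask the user for guidance.\n"
                "3. Before modifying the system or files, state the purpose in your reasoning and wait for the user's confirmation.\n"
                "4. Before writing a file that already exists, warn the user that it may be overwritten.\n"
                "5. For technical questions, prefer web_search and use web_fetch to read documentation when needed.\n"
                "6. Use paths relative to the current working directory by default.\n"
                "7. When the user asks to 'output', 'show' or 'view' content (such as Markdown documents or code snippets), present it directly in the reply using Markdown; do not call write_file unless the user explicitly asks to save it.\n"
                "8. For installed Skills, call read_skill only when the current task really needs it; never pre-read all Skills.\n"
                f"{skill_desc}"
            )
        }

    def _build_system_prompt(self):
        self.messages = [self._make_system_message()]

    def _update_system_prompt(self):
        # Replace the first system message in place, or insert one if missing.
        new_sys = self._make_system_message()
        for i, msg in enumerate(self.messages):
            if msg.get("role") == "system":
                self.messages[i] = new_sys
                return
        self.messages.insert(0, new_sys)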

    def read_skill(self, name: str) -> str:
        path = SKILLS_DIR / name / "SKILL.md"
        if not path.exists():
            return f"Skill '{name}' does not exist"
        try:
            return path.read_text(encoding="utf-8")
        except Exception as e:
            return f"Failed to read Skill: {e}"

    def list_skills(self) -> str:
        skills = list_installed_skills()
        if not skills:
            return "No Skills are currently installed"
        lines = []
        for s in skills:
            name = s['name']
            desc = s.get('description', '')
            if desc:
                lines.append(f"{name}: {desc}")
            else:
                lines.append(name)
        return "Installed Skills:\n" + "\n".join(lines)

    def _execute_tool(self, tool_name, arguments):
        func = self.function_map.get(tool_name)
        if not func:
            return f"Unknown tool: {tool_name}"
        try:
            # Arguments arrive from the API as a JSON string.
            args = json.loads(arguments) if isinstance(arguments, str) else arguments
            return func(**args)
        except Exception as e:
            return f"Tool execution error: {e}"

    def _process_stream(self, response):
        content_buffer = ""
        tool_calls_dict = {}
        live = None
        rendered = False

        console.print("[bold yellow]🧠 Reasoning:[/bold yellow] ", end="")
        for line_bytes in response:
            # Each server-sent event line looks like "data: {json}".
            line = line_bytes.decode("utf-8").strip()
            if not line or line.startswith(":") or line == "data: [DONE]":
                continue
            if not line.startswith("data: "):
                continue
            try:
                chunk = json.loads(line[6:])
            except json.JSONDecodeError:
                continue

            choices = chunk.get("choices", [])
            if not choices:
                continue
            delta = choices[0].get("delta", {})

            reasoning = delta.get("reasoning_content") or ""
            if reasoning:
                console.print(reasoning, end="", markup=False, highlight=False)

            content = delta.get("content") or ""
            if content:
                # Start live Markdown rendering on the first content chunk.
                if live is None and not tool_calls_dict:
                    live = Live(Markdown(""), console=console, auto_refresh=False, vertical_overflow="visible")
                    live.start()
                content_buffer += content
                if live:
                    live.update(Markdown(content_buffer), refresh=True)

            for tc_delta in delta.get("tool_calls") or []:
                if live and not tool_calls_dict:
                    live.stop()
                    live = None
                idx = tc_delta.get("index")
                if idx is None:
                    continue
                if idx not in tool_calls_dict:
                    tool_calls_dict[idx] = {"id": "", "type": "function", "function": {"name": "", "arguments": ""}}
                # Tool-call fields arrive in fragments; concatenate them per index.
                tc = tool_calls_dict[idx]
                if "id" in tc_delta:
                    tc["id"] += tc_delta["id"]
                fd = tc_delta.get("function") or {}
                if "name" in fd:
                    tc["function"]["name"] += fd["name"]
                if "arguments" in fd:
                    tc["function"]["arguments"] += fd["arguments"]

        if live:
            live.stop()
            rendered = True
        console.print()

        tool_calls = []
        for idx in sorted(tool_calls_dict.keys()):
            tc = tool_calls_dict[idx]
            tool_calls.append({
                "id": tc["id"],
                "type": "function",
                "function": {"name": tc["function"]["name"], "arguments": tc["function"]["arguments"]}
            })

        if content_buffer.strip() and not tool_calls:
            return content_buffer.strip(), None, rendered
        if tool_calls:
            return None, tool_calls, False
        return None, None, False

    def _export_markdown(self):
        """Render the conversation (system prompt excluded) as Markdown."""
        md = "# Boundless Conversation Export\n\n"
        md += f"Project: {self.context_file.parent.name}\n"
        md += f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n---\n\n"
        for msg in self.messages:
            role = msg.get("role", "unknown")
            content = msg.get("content")
            if role == "system":
                continue
            elif role == "user":
                md += f"### 👤 User\n\n{content}\n\n"
            elif role == "assistant":
                if content:
                    md += f"### 🤖 Boundless\n\n{content}\n\n"
                if msg.get("tool_calls"):
                    for tc in msg["tool_calls"]:
                        fn = tc["function"]["name"]
                        args = tc["function"]["arguments"]
                        md += f"*Tool call: `{fn}`* \n```json\n{args}\n```\n\n"
            elif role == "tool":
                md += f"*Tool result (id: {msg.get('tool_call_id', '')})* \n```\n{content}\n```\n\n"
        return md

    def _handle_export(self):
        """Write the Markdown export next to the context file."""
        md = self._export_markdown()
        export_dir = self.context_file.parent
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"boundless_export_{timestamp}.md"
        path = export_dir / filename
        try:
            path.write_text(md, encoding="utf-8")
            console.print(f"📄 Conversation exported to: {path}", style="green")
        except Exception as e:
            console.print(f"Export failed: {e}", style="red")

    def _multiline_input(self):
        """Read lines until a line consisting of three double quotes."""
        console.print("[Hint] Enter multiple lines; finish with three double quotes \"\"\"", style="dim")
        lines = []
        while True:
            try:
                line = input()
            except (EOFError, KeyboardInterrupt):
                break
            if line == '"""':
                break
            lines.append(line)
        return "\n".join(lines)

    def run(self):
        console.print(f"🤖 Boundless | Directory: {os.getcwd()}", style="bold blue")
        skills = list_installed_skills()
        if skills:
            console.print("[bold]📦 Loaded Skills:[/bold]")
            for s in skills:
                name = s['name']
                desc = s.get('description', '')
                if desc:
                    console.print(f" • {name} - {desc}")
                else:
                    console.print(f" • {name}")
        else:
            console.print("[dim]No Skills loaded; install one with ag skill install[/dim]")

        if auto_confirm():
            console.print("⚡ Auto-confirm mode", style="yellow")
        console.print("[Hint] Type /help to see the command list", style="dim")

        EXIT_KEYS = {"exit", "quit", "q", "退出", "exit()", "quit()"}
        try:
            while True:
                tokens_used = count_messages_tokens(self.messages)
                max_model_tokens = 1_048_576
                percent = tokens_used / max_model_tokens * 100
                console.print(
                    f"[dim]📊 Tokens used: {tokens_used:,} / {max_model_tokens:,} ({percent:.1f}%)[/dim]"
                )
                try:
                    user_input = input("> ")
                except (EOFError, KeyboardInterrupt):
                    console.print("\nGoodbye!", style="bold")
                    break
                if user_input.strip().lower() in EXIT_KEYS:
                    console.print("Goodbye!", style="bold")
                    break
                if not user_input.strip():
                    continue

                if user_input.strip().startswith('"""'):
                    text = self._multiline_input()
                    if text:
                        self.messages.append({"role": "user", "content": text})
                    else:
                        continue
                elif user_input.strip() == "/clear":
                    self._build_system_prompt()
                    self._save_context()
                    console.print("🧹 Context cleared; starting a new conversation.", style="green")
                    continue
                elif user_input.strip() == "/backup":
                    self._save_context()
                    self._backup_file()
                    console.print("📦 Context backed up", style="green")
                    continue
                elif user_input.strip() == "/export":
                    self._handle_export()
                    continue
                elif user_input.strip() == "/edit":
                    # Imported here so this branch does not depend on the file header.
                    import shlex
                    import subprocess

                    with tempfile.NamedTemporaryFile(mode='w+', suffix='.txt', delete=False, encoding='utf-8') as f:
                        tmpname = f.name
                    editor = os.environ.get("EDITOR", "nano")
                    edited = ""
                    try:
                        # An argument list avoids re-parsing the temp path through a shell.
                        subprocess.run(shlex.split(editor) + [tmpname], check=False)
                        with open(tmpname, 'r', encoding='utf-8') as f:
                            edited = f.read().strip()
                    except OSError as e:
                        console.print(f"Failed to run editor: {e}", style="red")
                    finally:
                        if os.path.exists(tmpname):
                            os.unlink(tmpname)
                    if not edited:
                        continue
                    self.messages.append({"role": "user", "content": edited})
                elif user_input.strip() == "/help":
                    self._print_help()
                    continue
                else:
                    self.messages.append({"role": "user", "content": user_input})

                # Keep calling the model until it answers in text; tool calls loop back.
                while True:
                    try:
                        response = deepseek_stream(self.messages, self.tools)
                    except Exception as e:
                        console.print(f"API error: {e}", style="red")
                        # Drop the unanswered user message so it is not resent later.
                        if self.messages and self.messages[-1]["role"] == "user":
                            self.messages.pop()
                        break

                    final_text, tool_calls, rendered = self._process_stream(response)

                    if final_text:
                        if not rendered:
                            console.print(Markdown(final_text))
                        self.messages.append({"role": "assistant", "content": final_text})
                        self._save_context()
                        self._trim_context()
                        break

                    if tool_calls:
                        self.messages.append({"role": "assistant", "content": None, "tool_calls": tool_calls})
                        for tc in tool_calls:
                            fn_name = tc["function"]["name"]
                            fn_args = tc["function"]["arguments"]
                            console.print(f"\n{'='*40}", style="dim")
                            console.print(f"⚙️ Calling tool: [bold]{fn_name}[/bold]")
                            console.print(f" Arguments: {fn_args}")
                            result = self._execute_tool(fn_name, fn_args)
                            if fn_name == "execute_command":
                                console.print("✅ Command finished", style="green")
                            else:
                                console.print(Markdown(result))
                            console.print(f"{'='*40}", style="dim")
                            self.messages.append({
                                "role": "tool",
                                "tool_call_id": tc["id"],
                                "content": result,
                            })
                        self._save_context()
                        continue

                    console.print("Warning: no valid response", style="yellow")
                    break
        finally:
            self._save_context()
            self._backup_file()
            console.print("💾 Conversation saved", style="green")

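    def _print_help(self):
        help_text = """
[bold]Boundless command reference[/bold]

[bold]In-conversation commands:[/bold]
  /clear   - clear the current conversation and start a new session
  /backup  - back up the current conversation manually
  /export  - export the conversation as a Markdown file
  /edit    - open an external editor ($EDITOR, default nano) for long input
  \"\"\"      - multi-line input mode; finish with three double quotes
  /help    - show this help
  exit / q - quit the program

[bold]Startup subcommands (ag command):[/bold]
  ag new <name>                 - create a new project in the current directory
  ag project remove <name>      - delete a project (including its context)
  ag project rename <old> <new> - rename a project
  ag context clear/view         - manage the global context
  ag skill list/install/remove  - manage Skills (install accepts a local path or URL)
  ag --debug                    - enable debug logging

[bold]Built-in tools (callable by the AI):[/bold]
  read_file, write_file, list_directory, execute_command
  web_fetch, web_search, change_directory
  read_skill, list_skills

[bold]Configuration:[/bold]
  File: ~/.boundless/config.json
  Tunable: max_context_tokens, keep_recent_rounds, backup_count
"""
        # The text uses Rich console markup ([bold]...), which Markdown() would
        # print literally, so it is printed as markup instead.
        console.print(help_text)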

def project_remove(name: str):
    """Delete a project directory (one that contains context.jsonl)."""
    proj_dir = Path.cwd() / name
    if not proj_dir.exists() or not (proj_dir / "context.jsonl").exists():
        console.print(f"Project '{name}' does not exist or is invalid", style="red")
        return
    if confirm_action(f"Delete project '{name}' and all of its conversation history?"):
        shutil.rmtree(proj_dir)
        console.print(f"Project '{name}' deleted", style="green")
    else:
        console.print("Cancelled")


def project_rename(old: str, new: str):
    """Rename a project directory, refusing to overwrite an existing target."""
    old_dir = Path.cwd() / old
    new_dir = Path.cwd() / new
    if not old_dir.exists() or not (old_dir / "context.jsonl").exists():
        console.print(f"Project '{old}' does not exist or is invalid", style="red")
        return
    if new_dir.exists():
        console.print(f"Target '{new}' already exists", style="red")
        return
    old_dir.rename(new_dir)
    console.print(f"Project renamed: {old} -> {new}", style="green")

def main():
    setup_readline()

    if args.command == "context":
        if len(args.args) < 1:
            console.print("Usage: ag context [clear|view]", markup=False)
            return
        cmd = args.args[0]
        ctx_file = BOUNDLESS_DIR / "global_context.jsonl"
        if cmd == "clear":
            if ctx_file.exists():
                ctx_file.unlink()
                console.print("Global context file deleted.", style="green")
            else:
                console.print("Global context does not exist.")
        elif cmd == "view":
            if ctx_file.exists():
                try:
                    with open(ctx_file, 'r', encoding='utf-8') as f:
                        lines = f.readlines()
                    msgs = [json.loads(l) for l in lines if l.strip()]
                    console.print(f"Global context contains {len(msgs)} messages.")
                    for idx, msg in enumerate(msgs):
                        role = msg.get("role", "?")
                        content = str(msg.get("content", ""))[:80]
                        # markup=False keeps "[0]" and message text from being parsed as Rich markup.
                        console.print(f" [{idx}] {role}: {content}", markup=False)
                except Exception as e:
                    console.print(f"Context file is corrupted: {e}")
            else:
                console.print("Global context does not exist.")
        else:
            console.print("Unknown subcommand. Supported: clear / view")
        return

    elif args.command == "skill":
        if len(args.args) < 1:
            console.print("Usage: ag skill [list|install|remove] ...", markup=False)
            return
        cmd = args.args[0]
        if cmd == "list":
            skills = list_installed_skills()
            if not skills:
                console.print("No Skills installed")
            else:
                for s in skills:
                    desc = s.get('description', '')
                    if desc:
                        console.print(f"📦 {s['name']} - {desc}")
                    else:
                        console.print(f"📦 {s['name']}")
        elif cmd == "install":
            if len(args.args) < 2:
                console.print("Please provide a Skill source: ag skill install <path-or-URL>")
                return
            source = args.args[1]
            # Show SKILL.md before installing so the user can review it.
            console.print("SKILL.md preview:")
            preview = preview_skill(source)
            console.print(Markdown(preview))
            ans = input("Install? (Y/n): ").strip().lower()
            if ans in ("", "y", "yes"):
                result = install_skill_from_source(source)
                console.print(result)
            else:
                console.print("Installation cancelled")
        elif cmd == "remove":
            if len(args.args) < 2:
                console.print("Please provide a Skill name: ag skill remove <name>")
                return
            result = remove_skill(args.args[1])
            console.print(result)
        else:
            console.print("Unknown skill subcommand")
        return

    elif args.command == "project":
        if len(args.args) < 1:
            console.print("Usage: ag project [remove|rename] ...", markup=False)
            return
        cmd = args.args[0]
        if cmd == "remove":
            if len(args.args) < 2:
                console.print("Please provide a project name: ag project remove <name>")
                return
            project_remove(args.args[1])
        elif cmd == "rename":
            if len(args.args) < 3:
                console.print("Usage: ag project rename <old-name> <new-name>")
                return
            project_rename(args.args[1], args.args[2])
        else:
            console.print("Unknown project subcommand. Supported: remove, rename")
        return

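    elif args.command == "new":
        if len(args.args) < 1:
            console.print("Usage: ag new <project-name>")
            return
        proj_name = args.args[0]
        # Store the context as <project>/context.jsonl: that is the layout the
        # project list, `project remove`, and `project rename` all look for.
        proj_dir = Path.cwd() / proj_name
        context_file = proj_dir / "context.jsonl"
        if context_file.exists():
            console.print(f"Project '{proj_name}' already exists")
            ans = input("(C)ontinue / (O)verwrite and start a new conversation / (Q)uit [C]: ").strip().lower()
            if ans in ('o', 'overwrite'):
                backup_dir = context_file.parent / (context_file.stem + "_backups")
                backup_dir.mkdir(parents=True, exist_ok=True)
                timestamp = time.strftime("%Y%m%d_%H%M%S")
                backup_name = f"{context_file.stem}_{timestamp}.jsonl"
                shutil.copy2(context_file, backup_dir / backup_name)
                context_file.unlink()
                console.print("Old conversation backed up; starting a new session")
            elif ans in ('q', 'quit', 'cancel'):
                return
        else:
            proj_dir.mkdir(parents=True, exist_ok=True)
            console.print(f"Created project '{proj_name}' in the current directory; context file: {context_file}", style="green")
        # Work inside the project directory, as when an existing project is selected.
        os.chdir(proj_dir)
        agent = BoundlessAgent(context_file=context_file)
        agent.run()
        return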

    else:
        show_banner()
        # A project is any subdirectory that contains a context.jsonl file.
        projects = sorted(
            [d for d in Path.cwd().iterdir() if d.is_dir() and (d / "context.jsonl").exists()],
            key=lambda x: x.name
        )
        if projects:
            console.print("📁 Projects in the current directory:", style="bold")
            for i, p in enumerate(projects, 1):
                console.print(f" {i}. {p.name}")
            console.print(" Enter a number to select a project, 'g' for global mode, or 'q' to quit")
            choice = input("> ").strip()
            if choice.lower() in ('q', 'quit', 'exit'):
                return
            elif choice.lower() == 'g':
                context_file = BOUNDLESS_DIR / "global_context.jsonl"
                work_dir = Path.cwd()
            else:
                try:
                    idx = int(choice) - 1
                    # Reject 0 and negative numbers, which would otherwise index from the end.
                    if idx < 0:
                        raise IndexError(choice)
                    proj_dir = projects[idx]
                    context_file = proj_dir / "context.jsonl"
                    work_dir = proj_dir
                except (ValueError, IndexError):
                    console.print("Invalid selection", style="red")
                    return
        else:
            console.print("No projects in the current directory; create one with: ag new <project-name>")
            console.print("Entering global mode...")
            context_file = BOUNDLESS_DIR / "global_context.jsonl"
            work_dir = Path.cwd()

        os.chdir(work_dir)
        agent = BoundlessAgent(context_file=context_file)
        agent.run()

if __name__ == "__main__":
    main()
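
The least obvious part of the listing is how `_process_stream` rebuilds tool calls: each streamed chunk carries only a fragment of a call, keyed by `index`, and the fragments have to be concatenated before the arguments are valid JSON. Below is a minimal standalone sketch of that merge step. The helper name `merge_tool_call_deltas` and the sample fragments are hypothetical and only illustrate the idea; the fragment shape is assumed to match the `delta["tool_calls"]` entries handled in the listing.

```python
import json


def merge_tool_call_deltas(deltas):
    """Merge streamed tool-call fragments into complete tool calls.

    Each fragment is a dict with an `index` and optional `id`,
    `function.name`, and `function.arguments` string pieces.
    """
    calls = {}
    for d in deltas:
        idx = d.get("index")
        if idx is None:
            continue
        call = calls.setdefault(
            idx,
            {"id": "", "type": "function", "function": {"name": "", "arguments": ""}},
        )
        # `or ""` guards against fields that are present but null.
        call["id"] += d.get("id") or ""
        fn = d.get("function") or {}
        call["function"]["name"] += fn.get("name") or ""
        call["function"]["arguments"] += fn.get("arguments") or ""
    # Return the calls ordered by their stream index.
    return [calls[i] for i in sorted(calls)]


# Hypothetical fragments for one read_file call split across three chunks.
sample = [
    {"index": 0, "id": "call_1", "function": {"name": "read_file", "arguments": ""}},
    {"index": 0, "id": None, "function": {"arguments": '{"path": '}},
    {"index": 0, "function": {"arguments": '"notes.txt"}'}},
]
merged = merge_tool_call_deltas(sample)
args = json.loads(merged[0]["function"]["arguments"])
print(merged[0]["function"]["name"], args)
```

The arguments string only becomes parseable once every fragment has arrived, so `json.loads` is called after the merge rather than per chunk.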
  • Title: Simple TermuxAgent
  • Author: Star Dust
  • Created: 2026-06-18 20:15:58
  • Updated: 2026-06-18 20:24:11
  • Link: https://starblog.qzz.io/posts/483eadb4.html
  • Copyright: All rights reserved © Star Dust. Reproduction is prohibited.