叫我阿衰
3K posts


Pharos批量检查机器人
创建“wallets.txt”填入查询的钱包evm密钥
创建“bot.py”填入代码:
import argparse
import csv
import json
import os
import re
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from curl_cffi import requests
BASE_URL = "api.claim.pharos.xyz/airdrop/airdro…"
SIGNIN_URL = "api.claim.pharos.xyz/accounts/sign_…"
DEFAULT_DELAY_SECONDS = 2.0
DEFAULT_TIMEOUT_SECONDS = 10
DEFAULT_RETRIES = 3
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Origin": "claim.pharos.xyz",
"Referer": "claim.pharos.xyz",
}
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
def _strip_ansi(s: str) -> str:
return ANSI_RE.sub("", s)
def _terminal_cols() -> int:
try:
cols = os.get_terminal_size().columns
except OSError:
cols = 80
return max(60, int(cols))
def _center_line(line: str, cols: int) -> str:
plain = _strip_ansi(line).rstrip()
pad = max(0, (cols - len(plain)) // 2)
return (" " * pad) + line
def _center_block(text: str, cols: int) -> str:
return "\n".join(_center_line(line, cols) for line in text.split("\n"))
def _center_block_uniform(text: str, cols: int) -> str:
lines = text.split("\n")
lengths = [len(_strip_ansi(line).rstrip()) for line in lines]
max_len = max(lengths) if lengths else 1
pad = max(0, (cols - max_len) // 2)
return "\n".join((" " * pad) + line for line in lines)
def _ansi_color(text: str, color_code: int, bold: bool = False) -> str:
style = "1;" if bold else ""
return f"\033[{style}38;5;{color_code}m{text}\033[0m"
def _gradient_text(text: str) -> str:
# 柔和渐变色(接近 pastel)
palette = [219, 183, 153, 117, 111, 147]
out = []
idx = 0
for ch in text:
if ch.isspace():
out.append(ch)
continue
out.append(_ansi_color(ch, palette[idx % len(palette)], bold=True))
idx += 1
return "".join(out)
def _get_warning_text() -> str:
return _ansi_color(
"⚠️ 【免责声明】\n"
"此脚本仅供学习交流使用\n"
"使用本脚本产生的任何后果\n"
"由用户自行承担\n"
"作者对任何损失概不负责。",
196,
bold=True,
)
def _build_warning_box(content: str, width: int = 45) -> str:
inner = max(20, width - 2)
lines = content.split("\n")
result = [f"╭{'─' * inner}╮"]
for line in lines:
plain_len = len(_strip_ansi(line))
left = max(0, (inner - plain_len) // 2)
right = max(0, inner - plain_len - left)
result.append(f"│{' ' * left}{line}{' ' * right}│")
result.append(f"╰{'─' * inner}╯")
return "\n".join(result)
def get_banner_string() -> str:
cols = _terminal_cols()
dog = "\n".join(
[
" / ̄|",
" (ᴗ ⊃🍖",
" U ̄ ̄U",
" U U",
]
)
dog_centered = _center_block(_ansi_color(dog, 51, bold=True), cols)
title_centered = _center_block(_gradient_text("✨ Pharos 空投检测机器人 ✨"), cols)
twitter_centered = _center_line(_gradient_text("🐦 Twitter: @lumaogou_web3"), cols)
warning_box = _build_warning_box(_get_warning_text(), width=45)
warning_centered = _center_block_uniform(warning_box, cols)
line = _center_line(_ansi_color("—" * 50, 244), cols)
return (
"\n"
+ dog_centered
+ "\n"
+ title_centered
+ "\n"
+ twitter_centered
+ "\n"
+ warning_centered
+ "\n"
+ line
+ "\n"
)
def display_banner() -> None:
print(get_banner_string())
def show_banner() -> None:
display_banner()
@dataclass
class CheckResult:
wallet: str
status: str
message: str
data: Optional[Dict]
http_status: Optional[int] = None
def is_valid_wallet(wallet: str) -> bool:
return bool(re.fullmatch(r"0x[a-fA-F0-9]{40}", wallet.strip()))
def is_valid_private_key(value: str) -> bool:
return bool(re.fullmatch(r"0x[a-fA-F0-9]{64}", value.strip()))
def load_wallets_from_file(path: str) -> List[str]:
wallets: List[str] = []
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
wallets.append(line)
return wallets
def split_wallets_and_private_keys(items: List[str]) -> Tuple[List[str], List[str], List[str]]:
wallets: List[str] = []
private_keys: List[str] = []
invalid: List[str] = []
for item in items:
s = item.strip()
if is_valid_wallet(s):
wallets.append(s)
elif is_valid_private_key(s):
private_keys.append(s)
else:
invalid.append(s)
return wallets, private_keys, invalid
def parse_wallets(args_wallets: List[str], wallet_file: Optional[str]) -> List[str]:
wallets: List[str] = []
if wallet_file:
wallets.extend(load_wallets_from_file(wallet_file))
wallets.extend(args_wallets)
unique_wallets = list(dict.fromkeys(wallets))
return unique_wallets
def classify_response(wallet: str, resp_json: Dict, status_code: int) -> CheckResult:
data = resp_json.get("data")
message = str(resp_json.get("message", "")).lower()
biz_code = resp_json.get("code")
success = resp_json.get("success")
if (
data is None
and (
message == "no data"
or biz_code == 40001
or success is True
)
):
return CheckResult(
wallet=wallet,
status="not_selected",
message="未中签 (0 空投)",
data=None,
http_status=status_code,
)
return CheckResult(
wallet=wallet,
status="selected",
message="中签",
data=data if isinstance(data, dict) else {"raw_data": data},
http_status=status_code,
)
def check_airdrop(
session: requests.Session,
wallet: str,
timeout: int,
retries: int,
backoff: float,
) -> CheckResult:
params = {"address": wallet}
for attempt in range(1, retries + 1):
try:
response = session.get(BASE_URL, params=params, timeout=timeout)
status = response.status_code
if status == 200:
try:
resp_json = response.json()
except json.JSONDecodeError:
return CheckResult(
wallet=wallet,
status="error",
message="返回内容不是有效 JSON",
data=None,
http_status=status,
)
return classify_response(wallet, resp_json, status)
if status == 403:
return CheckResult(
wallet=wallet,
status="blocked",
message="被网站防火墙拦截 (403)",
data=None,
http_status=status,
)
if status == 401:
return CheckResult(
wallet=wallet,
status="unauthorized",
message="未授权 (401):认证信息缺失、错误或已过期",
data=None,
http_status=status,
)
if status == 429:
if attempt < retries:
sleep_s = backoff * attempt
print(f"⚠️ 地址: {wallet} | 429 限流,{sleep_s:.1f}s 后重试...")
time.sleep(sleep_s)
continue
return CheckResult(
wallet=wallet,
status="rate_limited",
message="查询过快被限流 (429)",
data=None,
http_status=status,
)
if 500 <= status < 600 and attempt < retries:
sleep_s = backoff * attempt
print(f"⚠️ 地址: {wallet} | 服务器错误 {status},{sleep_s:.1f}s 后重试...")
time.sleep(sleep_s)
continue
return CheckResult(
wallet=wallet,
status="http_error",
message=f"异常状态码: {status}",
data=None,
http_status=status,
)
except requests.exceptions.RequestException as e:
if attempt < retries:
sleep_s = backoff * attempt
print(f"⚠️ 地址: {wallet} | 网络错误重试中: {e},{sleep_s:.1f}s 后重试...")
time.sleep(sleep_s)
continue
return CheckResult(
wallet=wallet,
status="network_error",
message=f"网络错误: {e}",
data=None,
http_status=None,
)
return CheckResult(
wallet=wallet,
status="error",
message="未知错误",
data=None,
http_status=None,
)
def print_result(result: CheckResult) -> None:
if result.status == "selected":
print(f"🎉 地址: {result.wallet} | 结果: 中签 | 详情: {result.data}")
elif result.status == "not_selected":
print(f"❌ 地址: {result.wallet} | 结果: {result.message}")
else:
print(f"🚫 地址: {result.wallet} | 失败: {result.message}")
def build_runtime_headers(auth_bearer: Optional[str], cookie: Optional[str]) -> Dict[str, str]:
headers = dict(HEADERS)
if auth_bearer:
token = auth_bearer.strip()
if not token.lower().startswith("bearer "):
token = f"Bearer {token}"
headers["Authorization"] = token
if cookie:
headers["Cookie"] = cookie.strip()
return headers
def set_auth_header_with_token(session: requests.Session, token: str) -> None:
session.headers["Authorization"] = f"TOKEN {token.strip()}"
def build_token_header_candidates(token: str) -> List[Dict[str, str]]:
t = token.strip()
return [
{"Authorization": f"TOKEN {t}"},
{"Authorization": f"Bearer {t}"},
{"Authorization": t},
{"token": t},
{"x-token": t},
{"Cookie": f"token={t}"},
{"Cookie": f"auth_token={t}"},
{"Cookie": f"authorization={t}"},
{"Cookie": f"Authorization=Bearer {t}"},
]
def apply_working_token_header(
session: requests.Session,
token: str,
wallet: str,
timeout: int,
) -> Optional[str]:
original_headers = dict(session.headers)
for candidate in build_token_header_candidates(token):
session.headers.clear()
session.headers.update(original_headers)
session.headers.update(candidate)
try:
r = session.get(BASE_URL, params={"address": wallet}, timeout=timeout)
if r.status_code != 401:
return ", ".join(candidate.keys())
except requests.exceptions.RequestException:
continue
session.headers.clear()
session.headers.update(original_headers)
return None
def sign_in_blockchain_get_token(
session: requests.Session,
payload: Dict,
timeout: int,
) -> Optional[str]:
try:
r = session.post(SIGNIN_URL, json=payload, timeout=timeout)
except requests.exceptions.RequestException as e:
print(f"⚠️ 签名登录请求失败: {e}")
return None
if r.status_code != 200:
print(f"⚠️ 签名登录失败,HTTP 状态码: {r.status_code}")
return None
try:
resp_json = r.json()
except json.JSONDecodeError:
print("⚠️ 签名登录失败:返回不是有效 JSON")
return None
data = resp_json.get("data") if isinstance(resp_json, dict) else None
if isinstance(data, dict) and data.get("verified") is True and data.get("token"):
if session.cookies:
print(f"ℹ️ 登录后已获取 Cookie 数量: {len(session.cookies)}")
return str(data["token"])
print(f"⚠️ 签名登录未拿到 token,返回: {resp_json}")
return None
def build_signin_payload_from_private_key(private_key: str) -> Tuple[str, Dict]:
try:
from eth_account import Account
from eth_account.messages import encode_defunct
except ImportError as e:
raise RuntimeError("缺少依赖 eth-account,请先执行: pip install eth-account") from e
account = Account.from_key(private_key.strip())
address = account.address
timestamp = int(time.time() * 1000)
message = (
"Sign this message to authenticate with Pharos.\n\n"
f"Wallet: {address}\n"
f"Timestamp: {timestamp}"
)
signed = account.sign_message(encode_defunct(text=message))
signature_hex = signed.signature.hex()
if not signature_hex.startswith("0x"):
signature_hex = f"0x{signature_hex}"
payload = {
"address": address,
"message": message,
"mode": "evm",
"signature": signature_hex,
}
return address, payload
def save_as_csv(path: str, results: List[CheckResult]) -> None:
with open(path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow(["wallet", "status", "message", "http_status", "data_json"])
for r in results:
writer.writerow(
[
r.wallet,
r.status,
r.message,
r.http_status if r.http_status is not None else "",
json.dumps(r.data, ensure_ascii=False) if r.data is not None else "",
]
)
def summarize(results: List[CheckResult]) -> None:
total = len(results)
selected = sum(1 for r in results if r.status == "selected")
not_selected = sum(1 for r in results if r.status == "not_selected")
failed = total - selected - not_selected
print("\n" + "-" * 60)
print("📊 查询汇总")
print(f"总数: {total}")
print(f"中签: {selected}")
print(f"未中签: {not_selected}")
print(f"失败: {failed}")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="批量查询 Pharos 空投资格")
parser.add_argument(
"wallets",
nargs="*",
help="钱包地址或私钥列表(可直接空格分隔传入多个)",
)
parser.add_argument(
"-f",
"--wallet-file",
help="从文本文件读取地址/私钥(每行一个,支持 # 注释)",
)
parser.add_argument(
"--delay",
type=float,
default=DEFAULT_DELAY_SECONDS,
help=f"每个地址基础间隔秒数(默认 {DEFAULT_DELAY_SECONDS})",
)
parser.add_argument(
"--timeout",
type=int,
default=DEFAULT_TIMEOUT_SECONDS,
help=f"单次请求超时秒数(默认 {DEFAULT_TIMEOUT_SECONDS})",
)
parser.add_argument(
"--retries",
type=int,
default=DEFAULT_RETRIES,
help=f"每个地址最大尝试次数(默认 {DEFAULT_RETRIES})",
)
parser.add_argument(
"--backoff",
type=float,
default=1.5,
help="重试退避系数秒(默认 1.5)",
)
parser.add_argument(
"--csv",
help="将结果导出为 CSV 文件(例如 result.csv)",
)
parser.add_argument(
"--auth-bearer",
help="可选:传入 Bearer Token(也可用环境变量 PHAROS_BEARER)",
)
parser.add_argument(
"--cookie",
help="可选:传入 Cookie(也可用环境变量 PHAROS_COOKIE)",
)
parser.add_argument(
"--token",
help="可选:登录接口返回的 token(也可用环境变量 PHAROS_TOKEN)",
)
return parser
def main() -> None:
display_banner()
parser = build_parser()
args = parser.parse_args()
wallet_file = args.wallet_file
if not args.wallets and not wallet_file:
default_wallet_file = "wallets.txt"
if os.path.exists(default_wallet_file):
wallet_file = default_wallet_file
print(f"ℹ️ 未传入参数,默认读取 {default_wallet_file}")
entries = parse_wallets(args.wallets, wallet_file)
if not entries:
parser.error("请至少提供一个钱包地址或私钥,或在当前目录准备 wallets.txt。")
valid_wallets, private_keys, invalid_wallets = split_wallets_and_private_keys(entries)
if invalid_wallets:
print("⚠️ 以下地址格式无效,已跳过:")
for w in invalid_wallets:
print(f" - {w}")
if not valid_wallets and not private_keys:
print("❌ 没有可查询的有效地址/私钥。")
return
print(
f"🚀 开始批量查询,地址 {len(valid_wallets)} 个,私钥 {len(private_keys)} 个...\n"
+ "-" * 60
)
results: List[CheckResult] = []
session = requests.Session(impersonate="chrome120")
auth_bearer = args.auth_bearer or os.getenv("PHAROS_BEARER")
cookie = args.cookie or os.getenv("PHAROS_COOKIE")
token = args.token or os.getenv("PHAROS_TOKEN")
session.headers.update(build_runtime_headers(auth_bearer, cookie))
if auth_bearer:
print("ℹ️ 已使用 Authorization 认证头")
if cookie:
print("ℹ️ 已使用 Cookie 认证头")
if token and not auth_bearer and not cookie:
picked = apply_working_token_header(
session=session,
token=token,
wallet=valid_wallets[0] if valid_wallets else "0x0000000000000000000000000000000000000000",
timeout=args.timeout,
)
if picked:
print(f"ℹ️ 已使用 token 认证头: {picked}")
else:
set_auth_header_with_token(session, token)
print("⚠️ token 认证头自动探测失败,先使用 Authorization: Bearer")
if not auth_bearer and not cookie and not token and not private_keys:
print("⚠️ 当前未提供 Authorization/Cookie/token,接口大概率会返回 401。")
for i, wallet in enumerate(valid_wallets):
result = check_airdrop(
session=session,
wallet=wallet,
timeout=args.timeout,
retries=max(1, args.retries),
backoff=max(0.1, args.backoff),
)
results.append(result)
print_result(result)
if i < len(valid_wallets) - 1 or private_keys:
time.sleep(max(0.0, args.delay))
for j, private_key in enumerate(private_keys):
try:
wallet_addr, signin_payload = build_signin_payload_from_private_key(private_key)
except Exception as e:
result = CheckResult(
wallet="(private_key)",
status="sign_error",
message=f"本地签名失败: {e}",
data=None,
http_status=None,
)
results.append(result)
print_result(result)
continue
auto_token = sign_in_blockchain_get_token(
session=session,
payload=signin_payload,
timeout=args.timeout,
)
if not auto_token:
result = CheckResult(
wallet=wallet_addr,
status="signin_failed",
message="签名登录失败,未拿到 token",
data=None,
http_status=None,
)
results.append(result)
print_result(result)
else:
picked = apply_working_token_header(
session=session,
token=auto_token,
wallet=wallet_addr,
timeout=args.timeout,
)
if not picked:
set_auth_header_with_token(session, auto_token)
print(f"⚠️ 地址: {wallet_addr} | token 头自动探测失败,改用 Authorization: Bearer")
result = check_airdrop(
session=session,
wallet=wallet_addr,
timeout=args.timeout,
retries=max(1, args.retries),
backoff=max(0.1, args.backoff),
)
results.append(result)
print_result(result)
if j < len(private_keys) - 1:
time.sleep(max(0.0, args.delay))
summarize(results)
if results and all(r.http_status == 401 for r in results):
print("\n💡 当前全部为 401,通常表示接口已要求认证参数。")
print(" 请在浏览器开发者工具 Network 中复制请求头后重试:")
print(" 1) 方式A:py bot.py --auth-bearer \"你的token\"")
print(" 2) 方式B:py bot.py --cookie \"你的cookie\"")
print(" 3) 方式C:py bot.py --token \"登录接口返回token\"")
print(" 4) 或把私钥放入 wallets.txt(每行一个 0x...)走自动签名登录")
if args.csv:
save_as_csv(args.csv, results)
print(f"✅ 结果已导出: {args.csv}")
print("✅ 查询结束。")
if __name__ == "__main__":
main()
中文

打卡1177天
Somnia有新进展了!!!坚持了半年!!!
Somnia 的宝箱终于能开了 ,
网址:lootboxes.somnia.network
8 个钥匙可以在右上角里合成一个钥匙,
每人最多可以开 2 个箱子,也就需要 16 把钥匙 。
其中8把钥匙是赠送的,另外8把钥匙要购买,
钥匙购买:opensea.io/zh-CN/collecti…
挑便宜的买8 个钥匙就行,不用管是不是一样的
目前宝箱里有积分: 125,180,250,300,500
钥匙现在基本是 0.4 刀,8 个就是 3.2 刀

中文

断签了一天,可惜,别辜负我,希望能有个好的结局
Joining the @TermMaxFi Puzzle Challenge — collecting all 4 pieces to unlock the Master Badge #TermMaxPuzzleChallenge #TermMax

中文

Meet Haggle — a first-of-its-kind AI agent guarding a vault of tokens on Base.
Convince Haggle you deserve a share of the supply.
As the vault empties, it gets harder to convince.
Don’t be late.
Early wallets are being airdropped Haggle credits — your way to talk to the AI and mint tokens:
→ Follow
→ Like + RT
→ Drop your Base address 👇

English

The Gensyn Mainnet is now live with its first application.
@Delphi_fyi
gensyn@gensynai
The Gensyn Mainnet is live with its first application. Delphi launches today.
English

聊一个刚看到的项目,叫 @navaai ,4月14日刚宣布完成 830 万美金融资,核心投资方是 Polychain 和 Hack VC。
算下来才过去不到一周,属于非常早期的阶段。
在 RootData 上它被归类为「AI 代理安全基础设施」,听起来有点绕,我说一下它想干嘛:
1⃣就是在你点下一笔交易、但还没正式上链的这个中间空挡,Nava 会插一脚进来。
⭕️用 AI 帮你判断这笔交易是不是有问题——比如是不是在和黑客合约交互、是不是钓鱼、是不是假币。
如果有问题,它会拦下来,不让你提交上链。
我个人觉得这个切入点其实挺实在的。
2⃣我们平时装了一堆安全插件,OneKey、币安之前也做过类似的,但说真的,手快的时候那个弹窗根本看不仔细,直接就点过去了🥲
Nava 做的其实就是在这层做第二道 AI 审核,理论上能在你无感的情况下把风险挡掉。
3⃣发币预期这块是比较确定的。
翻了一下他们的文档,里面明确提到了 Nava Chain,也就是后续会有自己的链。
既然有链,大概率就会有代币,这个逻辑和之前 Arc、Stable 这些稳定币公链最终也都发币是一样的。
文档里还写到,交易执行前会提交给 Nava 做仲裁,返回一份带理由的「合格/不合格」签名报告,所以后续每笔交易基本都会过一遍 AI 代理审核。
4⃣团队背景这里我多说两句,还挺有意思的:
CEO 和 COO 都是前 EigenLayer(现在改名叫 EigenCloud)创始团队出来的人。
EigenLayer 大家应该都有印象,2024 年那会儿非常火,融资两亿多美金,也是 Polychain 投的。
所以 Nava 和 EigenLayer 算是同一个投资机构背景下出来的项目,可以作为一个参考维度。
5⃣但这里我要说句实话,EigenLayer 当时确实女巫了很多号,空投相对没那么大方,也算不上赚很多。
所以 Nava 大家做的时候预期别拉太高。
参与方式目前很简单:
官网填邮箱进 waitlist,就这么一步。
navalabs.ai
6⃣我的建议是多填几个邮箱,因为像 Arc 最近封号 ban 号的情况挺多的,号少了一旦被 ban 基本就没了,号多一点至少能分散点风险。
现在这个阶段不用想太多,反正就是留个邮箱的事,等后面到了要投钱、要花大量时间精力那种节点,再重新评估这个项目值不值得继续做。
7⃣总结一下我的看法:
融资还行、背景还可以、做的事情不难、有发币预期。
早期埋伏一下没什么问题。

中文

okboost今天什么策略?
距离上一次x luanch 的两个项目kat和ofc已经11天了。
这11天没有一个项目!!!这什么概念?OKboost10天一个周期,11天0个项目。4月8日、9日两天刷的兄弟,一个都没吃到,数据就已经过期了。血本无归🫠
这也就是我刷okboost一直有说的,OK没有包袱,可以随时割我们。这也是大工作室迟迟没有进入的主要原因,同时也正因此,留给了散户博弈的机会。
那怎么操作?
小量参与,观望为主,不下牌桌。别一被反撸就提币走人(除非你有更好的机会、看不上这里的三瓜俩枣)。
如何刷?47以上反佣比例的兄弟瞄准quq和koge吧,别去刷加成币了,实际成本比高反稳定币高多了。boost交易量刷500门槛就够了,大家都被反撸了,都撤了一大堆人了,不会提高门槛的,放一百个心。刷600u成本又要➕3-4u。
什么时候刷?
今天刷。监控有一个项目了,OPG价值约50w,tge时间是下周二。所以可能明天会快照。

中文

@sandyXBT @solsticefi It sounds so good, but no one has registered, and now there are less than 1w people. It's funny.
English

Good news for @solsticefi Airdrop 1 participants!
Registration has been extended to April 24th
More time for eligible wallets to lock in their SLX allocation ahead of TGE
also unclaimed SLX from those who miss the deadline will be redistributed to registered true supporters.

English












