V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
UEVdugfw
V2EX  ›  Telegram

飞机开发者 API 连接超时

  •  
  •   UEVdugfw · 3 天前 · 152 次点击

    刚接触飞机开发者 API ,写的 python 脚本用于群关键字监控,运行时报错 Attempt 1 at connecting failed: TimeoutError

    之后使用 curl -4 -L https://api.telegram.org 测试连通性,返回的是一个 html 网页,而不是 json 。

    该 IP 用于飞机登录和日常使用是没问题的。期间也换了几个,报错都一样。

    请问这问题是不是出在 IP 上?

    脚本:

    import asyncio
    from telethon import TelegramClient, events
    from telethon.network import ConnectionTcpMTProxyRandomizedIntermediate
    
    # 替换为你的 API 凭证
    api_id = 'xxxx'  # 你的 api_id
    api_hash = 'xxxx'  # 你的 api_hash
    phone_number = 'xxxx'  # 你的电话号码,例如 '+1234567890'
    group_name = 'xxxx'  # 目标群聊的用户名(例如 '@MyGroup')或 ID (例如 -100123456789 )
    
    # 定义要监控的关键字列表
    keywords = ['test', 'python']  # 替换为你想监控的关键字
    
    # 代理设置(根据你的代理类型选择 SOCKS5 或 HTTP )
    proxy = {
        'proxy_type': 'http',  # 可选值:'socks5', 'http', 或 'mtproxy'
        'addr': '127.0.0.1',  # 代理服务器地址
        'port': 7897,  # 代理服务器端口
        'username': None,  # 代理用户名(如果不需要认证,设为 None )
        'password': None  # 代理密码(如果不需要认证,设为 None )
    }
    
    # 创建 Telegram 客户端并应用代理设置
    if proxy['proxy_type'] == 'mtproxy':
        client = TelegramClient(
            'session_name',
            api_id,
            api_hash,
            connection=ConnectionTcpMTProxyRandomizedIntermediate,
            proxy=(proxy['addr'], proxy['port'], proxy.get('secret'))  # MTProxy 需要 secret
        )
    else:
        client = TelegramClient(
            'session_name',
            api_id,
            api_hash,
            proxy=(proxy['proxy_type'].upper(), proxy['addr'], proxy['port'], proxy['username'], proxy['password'])
        )
    
    # 事件处理器:监听新消息
    @client.on(events.NewMessage(chats=group_name))
    async def handler(event):
        message = event.message.text
        if not message:  # 确保消息内容不为空
            return
        message = message.lower()  # 获取消息内容并转为小写
        # 检查消息是否包含任意关键字
        for keyword in keywords:
            if keyword.lower() in message:
                print(f"检测到关键字 '{keyword}' 在消息: {event.message.text}")
                print(f"发送者: {event.message.sender_id}, 时间: {event.message.date}")
                # 将消息保存到文件
                with open('keyword_messages.txt', 'a', encoding='utf-8') as f:
                    f.write(f"关键字: {keyword}\n 消息: {event.message.text}\n 发送者: {event.message.sender_id}\n 时间: {event.message.date}\n\n")
    
    async def main():
        # 启动客户端并登录
        try:
            await client.start(phone=phone_number)
            print("正在通过代理监控群聊消息...")
            # 保持脚本运行
            await client.run_until_disconnected()
        except Exception as e:
            print(f"启动客户端失败: {e}")
    
    # 运行脚本
    if __name__ == '__main__':
        asyncio.run(main())
    
    目前尚无回复
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1253 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 23:32 · PVG 07:32 · LAX 16:32 · JFK 19:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.