from longport.openapi import QuoteContext, SubType# Level-2 订阅类型SubType.Depth # 十档买卖盘SubType.Broker # 经纪商席位SubType.Trade # 逐笔成交
第一部分:十档买卖盘(Order Book)
关键指标:
from longport.openapi import Config, QuoteContext, SubType, PushDepthfrom rich.console import Consolefrom rich.table import Tablefrom rich.live import Livefrom decimal import Decimalimport timeconsole = Console()class DepthAnalyzer: """买卖盘分析器""" def __init__(self): self.config = Config( app_key="你的_app_key", app_secret="你的_app_secret", access_token="你的_access_token" ) self.ctx = QuoteContext(self.config) self.depth_data = {} def on_depth(self, symbol: str, event: PushDepth): """深度数据回调""" self.depth_data[symbol] = { "asks": event.asks, # 卖盘 [(价格, 数量), ...] "bids": event.bids, # 买盘 [(价格, 数量), ...] "timestamp": event.timestamp } def analyze_depth(self, symbol: str): """分析买卖盘结构""" if symbol not in self.depth_data: return None data = self.depth_data[symbol] asks = data["asks"] # 卖盘,价格从低到高 bids = data["bids"] # 买盘,价格从高到低 # 计算关键指标 best_ask = asks[0] if asks else (Decimal("0"), 0) # 卖一 best_bid = bids[0] if bids else (Decimal("0"), 0) # 买一 spread = best_ask[0] - best_bid[0] # 价差 spread_pct = (spread / best_bid[0] * 100) if best_bid[0] > 0 else 0 # 买卖盘总量 total_ask_vol = sum(qty for _, qty in asks) total_bid_vol = sum(qty for _, qty in bids) # 买卖比 bid_ask_ratio = total_bid_vol / total_ask_vol if total_ask_vol > 0 else 0 return { "spread": spread, "spread_pct": spread_pct, "bid_ask_ratio": bid_ask_ratio, "best_ask": best_ask, "best_bid": best_bid, "total_ask_vol": total_ask_vol, "total_bid_vol": total_bid_vol } def display_depth(self, symbol: str): """可视化展示买卖盘""" if symbol not in self.depth_data: return Table(title="无数据") data = self.depth_data[symbol] analysis = self.analyze_depth(symbol) table = Table( title=f"[bold cyan]{symbol} 十档买卖盘[/bold cyan]", box=box.ROUNDED ) table.add_column("档位", justify="center", width=8) table.add_column("买价", justify="right", style="green", width=12) table.add_column("买量", justify="right", style="green", width=10) table.add_column("卖价", justify="right", style="red", width=12) table.add_column("卖量", justify="right", style="red", width=10) asks = list(reversed(data["asks"])) # 卖盘从高到低显示 for i in range(10): row = [f"Lv{i+1}"] # 买盘 if i < len(data["bids"]): price, vol = data["bids"][i] row.extend([f"{price:.3f}", f"{vol:,}"]) else: row.extend(["-", "-"]) # 卖盘 if i < len(asks): price, vol = asks[i] row.extend([f"{price:.3f}", f"{vol:,}"]) else: row.extend(["-", "-"]) table.add_row(*row) # 添加分析摘要 if analysis: summary = ( f"价差: {analysis['spread']:.3f} ({analysis['spread_pct']:.3f}%) | " f"买卖比: {analysis['bid_ask_ratio']:.2f} | " f"买盘总量: {analysis['total_bid_vol']:,} | " f"卖盘总量: {analysis['total_ask_vol']:,}" ) table.title = f"[bold cyan]{symbol} 十档买卖盘[/bold cyan]\n[dim]{summary}[/dim]" return table def start(self, symbols): """启动监控""" self.ctx.set_on_depth(self.on_depth) self.ctx.subscribe(symbols, [SubType.Depth]) console.print(f"[bold green]🔍 启动 Level-2 买卖盘监控: {', '.join(symbols)}[/bold green]") with Live(refresh_per_second=2) as live: while True: for symbol in symbols: table = self.display_depth(symbol) live.update(table) time.sleep(0.5)# 使用if __name__ == "__main__": analyzer = DepthAnalyzer() analyzer.start(["700.HK", "TSLA.US"])
第二部分:逐笔成交(Time & Sales)
什么是逐笔成交?
Level-1 只告诉你这一分钟成交了1000股,Level-2 告诉你:
10:00:01 成交 100股,价格 423.50,主动买入
10:00:02 成交 200股,价格 423.48,主动卖出
10:00:03 成交 50股,价格 423.52,主动买入
关键判断:主动买入 vs 主动卖出
成交价 = 卖一价 → 主动买入(资金流入)
成交价 = 买一价 → 主动卖出(资金流出)
from longport.openapi import PushTradefrom collections import dequefrom datetime import datetimeclass TradeAnalyzer: """逐笔成交分析器""" def __init__(self, max_history=1000): self.trades = deque(maxlen=max_history) # 固定长度,防止内存溢出 self.buy_volume = 0 self.sell_volume = 0 def on_trade(self, symbol: str, event: PushTrade): """处理逐笔成交""" # 判断主动方向(简化版:与上一笔价格比较) trade = { "symbol": symbol, "price": event.price, "volume": event.volume, "timestamp": event.timestamp, "side": self._judge_side(event) } self.trades.append(trade) # 统计资金流向 if trade["side"] == "BUY": self.buy_volume += event.volume else: self.sell_volume += event.volume def _judge_side(self, event: PushTrade) -> str: """ 判断主动买卖方向 实际应该用当时的买卖盘数据对比,这里简化处理 """ # 如果有深度数据,对比成交价与买卖一价 # 这里简化为随机(实际项目中需要接入深度数据) return "BUY" if event.price >= event.prev_price else "SELL" def get_flow_analysis(self, window_seconds=60): """分析最近 N 秒资金流向""" cutoff = datetime.now().timestamp() - window_seconds recent_trades = [t for t in self.trades if t["timestamp"].timestamp() > cutoff] buy_vol = sum(t["volume"] for t in recent_trades if t["side"] == "BUY") sell_vol = sum(t["volume"] for t in recent_trades if t["side"] == "SELL") net_flow = buy_vol - sell_vol flow_ratio = buy_vol / (buy_vol + sell_vol) if (buy_vol + sell_vol) > 0 else 0.5 return { "buy_volume": buy_vol, "sell_volume": sell_vol, "net_flow": net_flow, "flow_ratio": flow_ratio, "signal": "流入" if net_flow > 0 else "流出" } def detect_big_orders(self, threshold=10000): """检测大单""" big_orders = [t for t in self.trades if t["volume"] >= threshold] return big_orders def display_trades(self, n=20): """展示最近 N 笔成交""" table = Table(title="逐笔成交明细") table.add_column("时间", width=12) table.add_column("价格", justify="right", width=10) table.add_column("数量", justify="right", width=10) table.add_column("方向", justify="center", width=8) table.add_column("标记", width=10) for trade in list(self.trades)[-n:]: color = "green" if trade["side"] == "BUY" else "red" marker = "🟢" if trade["side"] == "BUY" else "🔴" big_flag = " [bold]大单[/bold]" if trade["volume"] >= 10000 else "" table.add_row( trade["timestamp"].strftime("%H:%M:%S"), f"{trade['price']:.2f}", f"{trade['volume']:,}", f"[{color}]{trade['side']}[/{color}]", f"{marker}{big_flag}" ) return table
第三部分:完整整合 - Level-2 行情终端
#!/usr/bin/env python3"""LongPort Level-2 行情终端功能:实时展示十档买卖盘 + 逐笔成交 + 资金流向"""from longport.openapi import ( Config, QuoteContext, TradeContext, SubType, PushQuote, PushDepth, PushTrade)from rich.console import Consolefrom rich.layout import Layoutfrom rich.panel import Panelfrom rich.table import Tablefrom rich.live import Livefrom decimal import Decimalfrom datetime import datetimeimport timeconsole = Console()class Level2Terminal: """Level-2 行情终端""" def __init__(self): self.config = Config( app_key="你的_app_key", app_secret="你的_app_secret", access_token="你的_access_token" ) self.quote_ctx = QuoteContext(self.config) # 数据存储 self.quotes = {} # 最新报价 self.depths = {} # 买卖盘 self.trades = {} # 逐笔成交 # 统计 self.trade_stats = {} def on_quote(self, symbol: str, event: PushQuote): """报价回调""" self.quotes[symbol] = event def on_depth(self, symbol: str, event: PushDepth): """深度回调""" self.depths[symbol] = event def on_trade(self, symbol: str, event: PushTrade): """成交回调""" if symbol not in self.trades: self.trades[symbol] = [] self.trade_stats[symbol] = {"buy_vol": 0, "sell_vol": 0} self.trades[symbol].append({ "price": event.price, "volume": event.volume, "time": datetime.now() }) # 简化判断主动方向(实际应用需结合深度数据) # 这里假设价格>=昨收为主动买入 if event.price >= (self.quotes.get(symbol, event)).prev_close: self.trade_stats[symbol]["buy_vol"] += event.volume else: self.trade_stats[symbol]["sell_vol"] += event.volume def create_layout(self, symbol: str) -> Layout: """创建界面布局""" layout = Layout() # 分割为上下两部分 layout.split_column( Layout(name="upper", ratio=2), Layout(name="lower", ratio=1) ) # 上半部分左右分割:报价 + 买卖盘 layout["upper"].split_row( Layout(name="quote_info"), Layout(name="depth") ) # 下半部分:逐笔成交 layout["lower"].update(self._make_trade_panel(symbol)) # 填充数据 layout["quote_info"].update(self._make_quote_panel(symbol)) layout["depth"].update(self._make_depth_panel(symbol)) return layout def _make_quote_panel(self, symbol: str) -> Panel: """报价信息面板""" if symbol not in self.quotes: return Panel("等待数据...", title="报价") q = self.quotes[symbol] change = q.last_done - q.prev_close change_pct = (change / q.prev_close) * 100 color = "green" if change >= 0 else "red" content = f"""[bold {color}]{symbol}[/bold {color}]最新价: {q.last_done:.3f}涨跌额: {change:+.3f}涨跌幅: {change_pct:+.2f}%开盘: {q.open:.3f}最高: {q.high:.3f}最低: {q.low:.3f}成交量: {q.volume:,}时间: {q.timestamp} """ return Panel(content, title="[bold]实时报价[/bold]", border_style=color) def _make_depth_panel(self, symbol: str) -> Panel: """买卖盘面板""" if symbol not in self.depths: return Panel("等待深度数据...", title="买卖盘") d = self.depths[symbol] table = Table(show_header=False, box=None) table.add_column("买量", justify="right", style="green") table.add_column("买价", justify="right", style="green") table.add_column("卖价", justify="right", style="red") table.add_column("卖量", justify="right", style="red") # 合并显示5档 for i in range(5): bid_price = bid_vol = "-" ask_price = ask_vol = "-" if i < len(d.bids): bid_price, bid_vol = d.bids[i] bid_price = f"{bid_price:.3f}" bid_vol = f"{bid_vol:,}" if i < len(d.asks): ask_price, ask_vol = d.asks[i] ask_price = f"{ask_price:.3f}" ask_vol = f"{ask_vol:,}" table.add_row(bid_vol, bid_price, ask_price, ask_vol) return Panel(table, title="[bold]五档买卖盘[/bold]") def _make_trade_panel(self, symbol: str) -> Panel: """逐笔成交面板""" if symbol not in self.trades or not self.trades[symbol]: return Panel("等待成交数据...", title="逐笔成交") stats = self.trade_stats.get(symbol, {"buy_vol": 0, "sell_vol": 0}) total_vol = stats["buy_vol"] + stats["sell_vol"] buy_ratio = stats["buy_vol"] / total_vol if total_vol > 0 else 0.5 table = Table(title=f"主动买占比: {buy_ratio*100:.1f}%") table.add_column("时间", width=10) table.add_column("价格", justify="right", width=10) table.add_column("数量", justify="right", width=10) # 显示最近5笔 for t in self.trades[symbol][-5:]: color = "green" if t["price"] >= self.quotes.get(symbol, t).prev_close else "red" table.add_row( t["time"].strftime("%H:%M:%S"), f"[{color}]{t['price']:.2f}[/{color}]", f"{t['volume']:,}" ) return Panel(table, title="[bold]逐笔成交[/bold]") def start(self, symbols): """启动终端""" # 设置回调 self.quote_ctx.set_on_quote(self.on_quote) self.quote_ctx.set_on_depth(self.on_depth) self.quote_ctx.set_on_trade(self.on_trade) # 订阅数据 for symbol in symbols: self.quote_ctx.subscribe([symbol], [SubType.Quote, SubType.Depth, SubType.Trade]) console.print(f"[bold green]🚀 Level-2 终端启动 | 监控: {', '.join(symbols)}[/bold green]") # 实时刷新 with Live(refresh_per_second=2, screen=True) as live: while True: # 显示第一个标的(可扩展为多标的切换) layout = self.create_layout(symbols[0]) live.update(layout) time.sleep(0.5)if __name__ == "__main__": terminal = Level2Terminal() terminal.start(["700.HK", "TSLA.US"])
第四部分:Level-2 数据的应用策略
1. 盘口异动策略