-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathplace_stop_order.py
More file actions
74 lines (60 loc) · 1.96 KB
/
place_stop_order.py
File metadata and controls
74 lines (60 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
import os
from aptos_sdk.account import Account
from aptos_sdk.ed25519 import PrivateKey
from decibel import (
TESTNET_CONFIG,
BaseSDKOptions,
DecibelWriteDex,
GasPriceManager,
PlaceOrderSuccess,
TimeInForce,
amount_to_chain_units,
)
from decibel.read import DecibelReadDex
async def main() -> None:
    """Place a BTC/USD order with a stop price using the testnet configuration.

    Reads the signing key from the ``PRIVATE_KEY`` environment variable
    (required; a missing variable raises ``KeyError``) and an optional node
    API key from ``APTOS_NODE_API_KEY``. Looks up the ``BTC/USD`` market,
    converts the hard-coded price, size and stop price to chain units using
    that market's decimals, submits the order and prints the outcome.

    The gas price manager is destroyed on every exit path once it has been
    initialized, including when a later step raises.
    """
    market_name = "BTC/USD"

    private_key = PrivateKey.from_hex(os.environ["PRIVATE_KEY"])
    account = Account.load_key(private_key.hex())
    node_api_key = os.environ.get("APTOS_NODE_API_KEY")

    gas = GasPriceManager(TESTNET_CONFIG)
    await gas.initialize()
    try:
        read = DecibelReadDex(TESTNET_CONFIG, api_key=node_api_key)
        markets = await read.markets.get_all()
        btc_market = next(
            (m for m in markets if m.market_name == market_name),
            None,
        )
        if btc_market is None:
            print(f"{market_name} market not found")
            return

        # Human-readable amounts are scaled with the market's own decimals.
        price = amount_to_chain_units(80000.0, btc_market.px_decimals)
        size = amount_to_chain_units(0.001, btc_market.sz_decimals)
        # NOTE(review): presumably the trigger price for the stop order --
        # confirm against the SDK's place_order documentation.
        stop_price = amount_to_chain_units(95000.0, btc_market.px_decimals)
        tick_size = btc_market.tick_size

        write = DecibelWriteDex(
            TESTNET_CONFIG,
            account,
            opts=BaseSDKOptions(
                node_api_key=node_api_key,
                gas_price_manager=gas,
                skip_simulate=False,
                no_fee_payer=True,
                time_delta_ms=0,
            ),
        )

        result = await write.place_order(
            market_name=market_name,
            price=price,
            size=size,
            is_buy=True,
            time_in_force=TimeInForce.GoodTillCanceled,
            is_reduce_only=False,
            stop_price=stop_price,
            tick_size=tick_size,
        )

        if isinstance(result, PlaceOrderSuccess):
            print("Stop order placed!")
            print(f"Order ID: {result.order_id}")
            print(f"Transaction hash: {result.transaction_hash}")
        else:
            print(f"Order failed: {result.error}")
    finally:
        # Always release the gas price manager, even if a step above raised;
        # the original only cleaned up on the non-exceptional paths.
        await gas.destroy()
# Script entry point: run the async example only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())