Skip to content

Commit

Permalink
a
Browse files Browse the repository at this point in the history
  • Loading branch information
de2425c committed Nov 28, 2024
1 parent fa70058 commit 6b2aa74
Showing 1 changed file with 4 additions and 26 deletions.
30 changes: 4 additions & 26 deletions Data-Streams/data_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@
from datetime import datetime
import pytz
from websockets import connect
from google.cloud import storage

trades_filename = '/tmp/recent_trades.csv'
funding_filename = '/tmp/funding_rates.csv'
liquidations_filename = '/tmp/liquidation_events.csv'
trades_filename = 'recent_trades.csv'
funding_filename = 'funding_rates.csv'
liquidations_filename = 'liquidation_events.csv'

# Google Cloud Storage bucket name
bucket_name = "crypto-sheets-usd"

for filename, headers in [
(trades_filename, 'Event Time, Symbol, Trade ID, Price, Quantity, Trade Time, Is Buyer Maker\n'),
Expand All @@ -22,12 +19,6 @@
with open(filename, 'w') as f:
f.write(headers)

def upload_to_bucket(source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"{source_file_name} uploaded to {bucket_name}/{destination_blob_name}.")

async def track_trades(uri, filename):
async with connect(uri) as websocket:
Expand Down Expand Up @@ -92,20 +83,7 @@ async def main():
trade_tasks = [track_trades(f"wss://fstream.binance.com/ws/{symbol}@aggTrade", trades_filename) for symbol in symbols]
funding_tasks = [track_funding_rates(symbol, funding_filename) for symbol in symbols]
liquidation_task = track_liquidations("wss://fstream.binance.com/ws/!forceOrder@arr", liquidations_filename)

async def upload_files_periodically():
while True:
try:
upload_to_bucket(trades_filename, "recent_trades.csv")
upload_to_bucket(funding_filename, "funding_rates.csv")
upload_to_bucket(liquidations_filename, "liquidation_events.csv")
print("Files uploaded successfully.")
except Exception as e:
print(f"Error uploading files: {e}")
await asyncio.sleep(3600) # Upload every hour

upload_task = asyncio.create_task(upload_files_periodically())
await asyncio.gather(*trade_tasks, *funding_tasks, liquidation_task, upload_task)
await asyncio.gather(*trade_tasks, *funding_tasks, liquidation_task)

asyncio.run(main())

Expand Down

0 comments on commit 6b2aa74

Please sign in to comment.