Source code for tradingview_scraper.utils.ohlc_converter

from typing import List, Dict
import pkg_resources
import json
import os


[docs]class OHLCVConverter:
[docs] def __init__(self, target_timeframe: str): self.timeframes: dict = self._load_timeframes() self._validate_timeframe(target_timeframe) self.target_interval = self._timeframe_to_minutes(timeframe=target_timeframe)
def _timeframe_to_minutes(self, timeframe: str) -> int: """ Convert a given timeframe string to its equivalent in minutes. Args: timeframe (str): The timeframe to convert (e.g., '1m', '1h', '1d', '1w', '1M'). Returns: int: The equivalent value in minutes or None if conversion is not possible. """ conversion_map = { "1m": 1, "5m": 5, "15m": 15, "30m": 30, "1h": 60, "2h": 120, "4h": 240, "1d": 1440, "1w": 10080, "1M": 302400 } return conversion_map.get(timeframe) def _validate_timeframe(self, timeframe: str) -> None: """Validates the specified timeframe against the list of supported timeframes. Args: timeframe (str): The timeframe to validate. Raises: ValueError: If the specified timeframe is not in the list of supported timeframes. """ valid_timeframes = self.timeframes.keys() if timeframe not in valid_timeframes: raise ValueError("This timeframe is not supported! Please check the list of supported timeframes.")
[docs] def convert(self, data: List[Dict]) -> List[Dict]: if self.target_interval==1: return data if self.target_interval < 1: raise ValueError("Target interval must be 1 minute or greater") sorted_data = sorted(data, key=lambda x: x['timestamp']) resampled_data = [] current_group = None count = 0 idx = 0 for item in sorted_data: if count == 0: # Start a new group current_group = {'timestamp': item['timestamp']} # Initialize OHLCV if present if 'open' in item: current_group['open'] = item['open'] if 'high' in item: current_group['high'] = item['high'] if 'low' in item: current_group['low'] = item['low'] if 'close' in item: current_group['close'] = item['close'] if 'volume' in item: current_group['volume'] = item['volume'] # Add any other keys present in the item for key, value in item.items(): if key not in current_group: current_group[key] = value count = 1 else: # Update existing group if 'high' in current_group and 'high' in item: current_group['high'] = max(current_group['high'], item['high']) if 'low' in current_group and 'low' in item: current_group['low'] = min(current_group['low'], item['low']) if 'close' in item: current_group['close'] = item['close'] if 'volume' in current_group and 'volume' in item: current_group['volume'] += item['volume'] # Update other keys by setting their latest value for key, value in item.items(): if key not in {'timestamp', 'open', 'high', 'low', 'close', 'volume'}: current_group[key] = value count += 1 # Finalize the group if it reaches the target interval if count == self.target_interval: current_group['index'] = idx resampled_data.append(current_group) count = 0 idx += 1 # Add the last group if it exists if count > 0: current_group['index'] = idx resampled_data.append(current_group) return resampled_data
def _load_timeframes(self) -> dict: """Load timeframes from a specified file. Returns: dict: A dictionary of timeframes loaded from the file. Returns a dict with '1d' as default. """ path = pkg_resources.resource_filename('tradingview_scraper', 'data/timeframes.json') if not os.path.exists(path): print(f"[ERROR] Timeframe file not found at {path}.") return {"1d": None} try: with open(path, 'r', encoding="utf-8") as f: timeframes = json.load(f) return timeframes.get('indicators', {"1d": None}) except IOError as e: print(f"[ERROR] Error reading timeframe file: {e}") return {"1d": None}