주식 주가 데이터를 수집해보자



작성시간 :





(수정됨)

시장에서 거래되는 주식 종목들의 주가 데이터를 수집해보자.
이 분야에서는 pykrx라는 패키지가 유명한 듯 하지만, 스스로 코드를 짜는 것을 좋아하기 때문에 이와 비슷한 패키지를 직접 만들어볼 것이다.

주가 데이터는 어디서 수집할 수 있을까?

크롤링을 통해 수집하는 방법

이름네이버 증권다음 금융매일경제 증권KRX 정보데이터시스템
링크

https://finance.naver.com/sise/sise_rise.naver

https://finance.daum.net/domestic/rise_stocks

https://stock.mk.co.kr/domestic/all_stocks

http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC0201020101

장점가장 많은 사람들이 접하게 되는 데이터많은 사람들이 접하게 되는 데이터데이터 표시 방식이 비교적 간단하다정돈된 데이터
단점데이터 표시 방식이 난잡하다데이터 표시 방식이 난잡하다알 수 없는 정렬 순서여기 크롤링 해보다가 ip 밴 먹어서 현재 이용이 불가능함..

api를 통해 수집하는 방법

api 이용시 장점

  • 데이터가 일정한 형식으로 제공되고 데이터 제공이 중단되는 사태가 발생할 확률이 낮다.
  • 크롤링을 통해 접하는 데이터는 20분 정도의 지연 시간이 있지만, 증권사 api를 통해 주가 데이터를 요청하는 경우 실시간 데이터를 수신할 수 있다.

api 이용시 단점

  • 데이터를 제공해주는 곳마다 데이터 요청 방식과 형식이 다르기 때문에, 요청 방식을 익히는 것이 다소 까다롭다.
  • 증권사에 따라 api를 이용하기 위해 제시하는 조건이 있을 수 있다.
  • 증권사 api의 경우 대부분 윈도우에서만 api를 이용할 수 있다.(하나증권 제외)

공공데이터 api를 이용해보자

일단 당장 실시간 정보가 필요한 것은 아니니, 공공데이터 포털에서 제공해주는 api를 통해 간단히 주식 시세 데이터를 가져오는 코드를 만들어보기로 했다.

작성한 코드는 다음과 같다.

# 주식 시세 정보를 가져오는 코드
from decimal import Decimal
from datetime import datetime, timedelta

import requests
from bs4 import BeautifulSoup
from dateutil.parser import parse

class PRICE:
    def __init__(self, key: str) -> None:
        self.url = 'http://apis.data.go.kr/1160100/service/GetStockSecuritiesInfoService/getStockPriceInfo'

        self.params = {
            'serviceKey': key,
            'pageNo': 1,
            'resultType': 'xml',
            'basDt': '20220222',
            'numOfRows': 2,
        }

        self.dict_key = {
            'basDt': '기준일',
            'srtnCd': '종목코드', 'isinCd': 'isin', 'itmsNm': '종목명',
            'mrktCtg': '시장구분',
            'clpr': '종가', 'vs': '대비',
            'mkp': '시가', 'hipr': '고가', 'lopr': '저가',
            'trqu': '거래량', 'trPrc': '거래대금',
            'lstgStCnt': '상장주식수', 'mrktTotAmt': '시가총액',
        }

        self.one = Decimal('1')
        self.hundred = Decimal('100')

    def req(self, url: str, params: dict[str, str|int]):
        with requests.get(url, params=params) as r:
            soup = BeautifulSoup(r.text, 'xml')

        stat = soup.select_one('resultCode').text
        msg = soup.select_one('resultMsg').text

        if stat != '00':
            raise Exception(f'{stat}, {msg}')

        return soup

    def get(self, date: str|datetime=None):
        if not date: date = datetime.now() - timedelta(1)
        elif isinstance(date, str): date = parse(date)
        try: date = date.strftime('%Y%m%d')
        except:
            raise ValueError(f'date 인수는 유효한 날짜형식이어야 합니다.\n{type(date)=}')

        params = self.params
        params['basDt'] = date
        params['pageNo'] = 1
        soup = self.req(self.url, params)

        dict_code: dict[str, dict[str, str]] = {}
        for item in soup.select('items item'):
            data = {}
            for k, key in self.dict_key.items():
                data[key] = item.select_one(k).text
            data['대비'] = f"{Decimal(data['대비']):+}"
            end = Decimal(data['종가'])
            pre = end - Decimal(data['대비'])
            data['등락률'] = f"{((end / pre - self.one) * self.hundred).__round__(2):+06}"
            data['시가등락률'] = f"{((Decimal(data['시가']) / pre - self.one) * self.hundred).__round__(2):+06}"
            data['고가등락률'] = f"{((Decimal(data['고가']) / pre - self.one) * self.hundred).__round__(2):+06}"
            data['저가등락률'] = f"{((Decimal(data['저가']) / pre - self.one) * self.hundred).__round__(2):+06}"
            dict_code[data['종목코드']] = data

        return dict_code


if __name__ == '__main__':
    key = 'white.seolpyo.com'
    a = PRICE(key)
    b = a.get('20220222')
    for k, v in b.items(): print(f'  {k!r}: {v}')

>>>  '900110': {'기준일': '20220222', '종목코드': '900110', 'isin': 'HK0000057197', '종목명': '이스트아시아홀딩스',
    '시장구분': 'KOSDAQ', '종가': '161', '대비': '+37', '시가': '127', '고가': '161', '저가': '126', '거래량': '77682659', '거래대금': '11634529030',
    '상장주식수': '170480489', '시가총액': '27447358729', '등락률': '+29.84', '시가등락률': '+02.42', '고가등락률': '+29.84', '저가등락률': '+01.61'}
  '900270': {'기준일': '20220222', '종목코드': '900270', 'isin': 'HK0000214814', '종목명': '헝셩그룹',
    '시장구분': 'KOSDAQ', '종가': '391', '대비': '+1', '시가': '388', '고가': '395', '저가': '380', '거래량': '181845', '거래대금': '70033475',
    '상장주식수': '80000000', '시가총액': '31280000000', '등락률': '+00.26', '시가등락률': '-00.51', '고가등락률': '+01.28', '저가등락률': '-02.56'}

위 코드를 import하여 요청한 주식 시세 정보를 db에 저장할 수 있도록 해주는 코드를 다음과 같이 작성해보았다.

# 주가 정보를 db에 저장하는 코드
import sqlite3
from decimal import Decimal
from datetime import datetime, timedelta

from path.to.price.stock import Price as P
from path.to.calender import Calender

class Price:
    def __init__(self, db_path: str=':memory:', *, table_name='data', json_path: str=None, collect=False) -> None:
        self.hundred = Decimal('100')
        self.db = sqlite3.connect(db_path)
        self.cursor = self.db.cursor()

        self.table = table_name

        self.list_key = [
            '기준일',
            '종목코드', 'isin', '종목명',
            '시장구분',
            '소속부',
            '종가',
            '대비',
            '시가', '고가', '저가',
            '거래량', '거래대금',
            '거래량증가율', '거래대금증가율',
            '상장주식수', '시가총액',
        ]

        self.index_code = self.list_key.index('종목코드')

        self.list_data = [
            '기준일',
            '종목코드', 'isin', '종목명',
            '시장구분',
            '종가', '대비',
            '등락률',
            '시가', '고가', '저가',
            '시가등락률', '고가등락률', '저가등락률',
            '거래량', '거래대금',
            '거래량증가율', '거래대금증가율',
            '상장주식수', '시가총액',
        ]

        self.set_rate = {
            '등락률', '시가등락률', '고가등락률', '저가등락률',
        }

        # 테이블 존재 확인, 0 or 1
        check_table: int = self.cursor.execute(f"SELECT COUNT(*) FROM sqlite_master WHERE NAME='{self.table}'").fetchone()[0]
        # 테이블 생성
        if not check_table:
            keys = ','.join(f'{i} TEXT' for i in self.list_key)
            query = f'CREATE TABLE {self.table} (id INTEGER PRIMARY KEY,{keys});'
            self.cursor.execute(query)
            self.db.commit()

        self.P = P()
        self.C = Calender(db_path=json_path, collect=collect)

    def _req(self, date: datetime): return self.P.get(date)

    def load(self, date: str|datetime=None):
        date = self.P.convert_date(date)
        if self.C.is_holiday(date):
            print('=' * 30)
            print(f'\n\n휴장일입니다.\nfrom {__file__}\n\n')
            print('=' * 30)
            return {}

        keys = ','.join(self.list_key)
        dict_code: dict[str, dict[str, str]] = {}
        qeury = f"SELECT {keys} FROM {self.table} WHERE 기준일='{date.strftime('%Y.%m.%d')}';"
        for i in self.cursor.execute(qeury).fetchall():
            dict_code[i[self.index_code]] = {k: v for k, v in zip(self.list_key, i)}

        # 등락률 계산
        for code, i in dict_code.items():
            dict_code[code] = {}
            for k in self.list_data:
                if k in self.set_rate: dict_code[code][k] = ''
                else: dict_code[code][k] = i[k]

            e = Decimal(i['종가'])
            vs = Decimal(i['대비'])
            pre = e - vs
            for r in ['', '시가', '고가', '저가',]:
                p = Decimal(i[r]) if r else e
                dict_code[code][f'{r}등락률'] = f'{((p - pre) / pre * self.hundred).__round__(2):+06}'

        return dict_code

    def get(self, date: str|datetime=None):
        date = self.P.convert_date(date)
        if self.C.is_holiday(date):
            print('=' * 30)
            print(f'\n\n휴장일입니다.\nfrom {__file__}\n\n')
            print('=' * 30)
            return {}

        dict_code = self.load(date)
        # 거래량/거래대금 증가율 계산 판별자
        boolen = True
        if not dict_code: boolen = False

        if boolen: pass
        else:
            # 거래량/거래대금 증가율 계산하기
            dict_today = self._req(date)

            if dict_today:
                # 거래량/거래대금증가율 계산을 위한 이전 거래일 데이터 가져오기
                yesterday = date - timedelta(1)
                while self.C.is_holiday(yesterday): yesterday -= timedelta(1)

                dict_yesterday = self.load(yesterday)
                if not dict_yesterday: dict_yesterday = self._req(yesterday)

                # 거래량/거래대금 증가율 추가
                dict_code: dict[str, dict[str, str]] = {}
                for code, i in dict_today.items():
                    dict_code[code] = i
                    for k in ['거래량', '거래대금',]:
                        pre = Decimal(dict_yesterday.get(code, {}).get(k, '0'))
                        if pre:
                            v = Decimal(i[k])
                            dict_code[code][f'{k}증가율'] = f'{((v - pre) / pre * self.hundred).__round__(2):+}'
                        else: dict_code[code][f'{k}증가율'] = '+0.00'

                # 데이터 저장
                list_value = [','.join(f"'{i[k]}'" for k in self.list_key) for i in dict_code.values()]
                query = f"INSERT INTO {self.table}({','.join(self.list_key)}) VALUES({'),('.join(list_value)});"
                self.cursor.execute(query)
                self.db.commit()

        return dict_code


if __name__ == '__main__':
    db = 'path/to/c.sqlite'
    j = 'path/to/휴장일.txt'
    a = Price(db, table_name='data', json_path=j, collect=True)
    b = a.get('20200423')
    for k, v in b.items(): print(f'  {k!r}: {v}')

:memory:

":memory:"는 sqlite3 사용시 sqlite db를 메모리에 상주시키도록 하는 명령어다.
db 경로를 설정하지 않으면 컴퓨터 하드에 저장하지 않는다.

table_name

수동으로 쿼리문을 작성할 때 알아야 하는 테이블 명을 외부에서 지정할 수 있도록 인수로 설정하였다. 기본값은 "data"다.

Calender

별도로 작성한 코드로, 주식 휴장일 정보를 가져오는 코드다.
이 글에서는 해당 코드에 대한 내용을 공개할 예정이 없다.

등락률, 시가등락률, 고가등락률, 저가등락률

등락률과 시가등락률, 고가등락률 그리고 저가등락률은 따로 db에 저장하지 않는다.
db에 저장된 데이터로 계산할 수 있는 값이고, db 용량을 절약하기 위해서다.

1년치 주가 데이터를 db에 저장한 다음 용량을 비교해보았는데, 등락률 정보가 포함되었을 때 용량이 20~30Mb 더 많았다.
1년치 주가 데이터의 db 용량은 대략 100~130Mb이기 때문에, 대략 20~30% 정도의 용량을 절약할 수 있었다.

거래량증가율과 거래대금증가율

증권사 HTS에서 차트를 조회하면 거래량증가율과 거래대금증가율을 제공해주기 때문에 만들어보았다.
단, 등락률과 다르게 거래량증가율과 거래대금증가율 계산에는 다른 column의 데이터가 필요하기 때문에 계산후 db에 해당값을 저장한다.



태그



공유

하기






white.seolpyo.com