
Memory leaks AsyncClient #424

@ZwickVitaly

Description


Describe the bug

I need to write a massive amount of data every day (about 2 billion rows per 24 hours).
The application makes 1M requests per city (4 cities).
Every request returns 500 rows of data.
Every 4 requests I write the rows to the ClickHouse database using clickhouse-connect, so 2,000 rows per write.
There are 8 Celery processes, each handling its own 125k-request share of the total.
Each process runs 4 requests concurrently with asyncio.gather, gets 4 lists of 500 rows each, and combines them into one 2,000-row list.
Then client.insert is called.

Every 10 minutes, swap usage grows by 50 MB, BUT the database's own swap usage always stays at the same level (70 MB).
smem shows that it is the Celery processes that grow in swap.

I was worried that aiohttp was leaking, but removing the insert and doing only the HTTP requests does not make memory grow at all,

so the problem is the insert.
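This is a minimal sketch of how I ruled out aiohttp: diff two tracemalloc snapshots around a repeated call (the `net_growth` helper and the `do_work` stand-in are my own, not part of clickhouse-connect; in the real app `do_work` would wrap the insert path in one case and the HTTP requests alone in the other):

```python
# Sketch: diff tracemalloc snapshots around a suspect callable to see
# which source line accumulates memory across iterations.
import tracemalloc

def net_growth(do_work, iterations=100):
    """Run do_work repeatedly; return per-line allocation diffs,
    largest net growth first."""
    tracemalloc.start()
    do_work()  # warm-up, so one-time caches don't count as a leak
    before = tracemalloc.take_snapshot()
    for _ in range(iterations):
        do_work()
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    return after.compare_to(before, "lineno")

leaky_store = []  # simulates state that keeps growing between calls
stats = net_growth(lambda: leaky_store.append(bytearray(1024)))
print(stats[0].size_diff > 50_000)
```

If the top entry points into the insert path and disappears when the insert is removed, that localizes the growth.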

Steps to reproduce

  1. Run a process that writes something with AsyncClient.insert in a loop
  2. Wait
  3. See swap memory growth

Expected behaviour

No swap memory growth

It seems to me that the client does not clear QueryResult or QueryResultSummary properly, but I am not smart enough to figure it out.

Code example

I made a custom async context manager for getting a connection:

from contextlib import asynccontextmanager

import clickhouse_connect
from clickhouse_connect.driver.asyncclient import AsyncClient


@asynccontextmanager
async def get_async_connection() -> AsyncClient:
    session = AsyncSession(CLICKHOUSE_CONFIG)
    async with session as client:
        yield client
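The wrapper itself can be exercised in isolation with a stub (all names below are placeholders of mine, not the clickhouse-connect API) to confirm that close() really runs even when the body raises, so the wrapper is not the thing leaking connections:

```python
import asyncio
from contextlib import asynccontextmanager

class StubClient:
    """Placeholder for AsyncClient; only records whether close() ran."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

@asynccontextmanager
async def get_stub_connection(client):
    try:
        yield client
    finally:
        client.close()  # mirrors AsyncSession.__aexit__

async def main():
    stub = StubClient()
    try:
        async with get_stub_connection(stub):
            raise RuntimeError("simulated insert failure")
    except RuntimeError:
        pass
    return stub.closed

print(asyncio.run(main()))  # True: the client is closed even on errors
```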


class AsyncSession:

    def __init__(self, clickhouse_config):
        self.config = clickhouse_config

    async def __aenter__(self) -> AsyncClient:
        self.client = await clickhouse_connect.get_async_client(**self.config)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

This is the request function:

async def get_city_result(city, date, requests, request_batch_no, client=None):  # the `client` arg is unused; it is shadowed below
    requests_list = [r for r in requests if not r[1].isdigit() or "javascript" not in r[1]]
    batch_size = 4
    prev = 0
    async with ClientSession() as http_session:
        async with get_async_connection() as client:
            for batch_i in range(batch_size, len(requests_list) + 1, batch_size):
                request_batch = requests_list[prev:batch_i]
                requests_tasks = [
                    asyncio.create_task(
                        get_r_data(
                            r=r,
                            city=city,
                            date=date,
                            http_session=http_session,
                        )
                    )
                    for r in request_batch
                ]
                product_batches: list = await asyncio.gather(*requests_tasks)
                prev = batch_i
                full_res = []
                for batch in product_batches:
                    full_res.extend(batch)
                await save_to_db(
                    items=full_res,
                    table="request_product",
                    fields=["product", "city", "date", "query", "place", "advert", "natural_place", "cpm"],
                    client=client
                )
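One thing I noticed while reducing this: the `range(batch_size, len(requests_list) + 1, batch_size)` loop silently drops a trailing partial batch whenever the list length is not a multiple of 4. A small `chunks` helper (my own name, not a library function) avoids both the `prev` bookkeeping and the dropped tail:

```python
def chunks(seq, size):
    """Yield consecutive slices of seq; the last one may be shorter."""
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

print(list(chunks(list(range(9)), 4)))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8]]
```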

And this is the save-to-db function:

async def save_to_db(items, table, fields, client: AsyncClient):
    if items:
        try:
            await client.insert(table, items, column_names=fields)
        except Exception as e:
            logger.critical(f"{e}, {items}")
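Side note on that except branch: logging the full `items` list pins the whole 2,000-row batch inside the log record. A small preview helper (hypothetical, my own) keeps the diagnostic value without holding the full batch:

```python
def preview(items, n=3):
    """Summarize a large batch for logging: row count plus first n rows."""
    head = ", ".join(repr(row) for row in items[:n])
    return f"{len(items)} rows, first {n}: [{head}]"

print(preview([(1, "a"), (2, "b"), (3, "c"), (4, "d")]))
# 4 rows, first 3: [(1, 'a'), (2, 'b'), (3, 'c')]
```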

Yes, I also tried this:

async def save_to_db(items, table, fields):
    if items:
        try:
            async with get_async_connection() as client:
                await client.insert(table, items, column_names=fields)
        except Exception as e:
            logger.critical(f"{e}, {items}")

It made it worse.
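To check whether the per-insert clients from that variant are actually released, a generic weakref check helps (FakeClient is a stand-in of mine; with the real AsyncClient, any surviving weakref would show that something still holds the old clients):

```python
import gc
import weakref

class FakeClient:
    """Stand-in for a client object created per insert."""

refs = []
for _ in range(5):
    c = FakeClient()  # created and dropped, like a per-insert client
    refs.append(weakref.ref(c))
    del c

gc.collect()  # clear cyclic garbage before counting survivors
alive = sum(1 for r in refs if r() is not None)
print(alive)  # 0 means every client was released; >0 means something holds them
```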

Configuration

Environment

  • clickhouse-connect version: 0.8.6
  • Python version: 3.12
  • Operating system: docker (python:3.12-alpine)

ClickHouse server

  • ClickHouse Server version: 22.1
  • CREATE TABLE statements for tables involved:
client.command('''CREATE TABLE IF NOT EXISTS request_product (
                        product UInt32 CODEC(LZ4HC),
                        city UInt8 CODEC(LZ4HC),
                        date UInt16 CODEC(LZ4HC),
                        query UInt32 CODEC(LZ4HC),
                        place UInt16 CODEC(LZ4HC),
                        advert FixedString(1) CODEC(LZ4HC),
                        natural_place UInt16 CODEC(LZ4HC),
                        cpm UInt16 DEFAULT 0 CODEC(LZ4HC)
                    ) ENGINE = MergeTree()
                    PRIMARY KEY (product, city, date)
                    ORDER BY (product, city, date, query);''')

Labels: bug (Something isn't working), need info (Please provide additional information)
