How-to for building high-performance python applications for Apache Ignite.

Daschinsky Ivan

me

What’s new.

  • Massive performance improvement.

  • Support of Partition Awareness

  • Support of asyncio

  • Support of cluster api (activation-deactivation)

How-to use pyignite

# Client-side binary type for rows of the SQL_PUBLIC_STUDENT table.
# GenericObjectMeta (pyignite) derives the Ignite binary-object layout from
# the declared ``schema``; instances are then created with keyword args,
# e.g. Student(name=..., login=..., age=..., gpa=...), as in the demo below.
class Student(
    metaclass=GenericObjectMeta,
    type_name='SQL_PUBLIC_STUDENT_TYPE',
    schema={
        'name': String,
        'login': String,
        'age': IntObject,
        'gpa': DoubleObject,
    }
):
    pass
# Cache properties used to create the SQL-backed cache.
# NOTE: PROP_QUERY_ENTITIES content is elided on the slide — the bare
# ``...`` is a placeholder (Ellipsis), not a runnable configuration.
SQL_CONFIG = {
    PROP_NAME: 'SQL_PUBLIC_STUDENT',
    PROP_SQL_SCHEMA: 'PUBLIC',
    PROP_QUERY_ENTITIES: [
    ...]
}

# Demo: synchronous pyignite client with partition awareness enabled.
client = Client(partition_aware=True)
addresses = [('127.0.0.1', 10800), ('127.0.0.1', 10801), ('127.0.0.1', 10802)]
with client.connect(addresses):
    # Optional: activate the cluster first if persistence is configured.
    # client.get_cluster().set_state(ClusterState.ACTIVE)
    student_cache = client.get_or_create_cache(SQL_CONFIG)

    # Bulk-insert ten identical sample rows keyed 0..9.
    batch = {}
    for key in range(10):
        batch[key] = Student(login='jdoe', name='John Doe', age=17, gpa=4.25)
    student_cache.put_all(batch)

    print(student_cache.get(1))
    print(student_cache.get_all([1, 2, 3]))

    # Server-side SQL query, streamed through a cursor context manager.
    with client.sql(r'SELECT * FROM Student', include_field_names=True) as cursor:
        for row in cursor:
            print(row)

    # Full cache scan, same cursor protocol.
    with student_cache.scan() as cursor:
        for row in cursor:
            print(row)

Benchmark environment

  • 16 x Intel® Xeon® Gold 6248R CPU @ 3.00GHz, 32Gb

  • Ubuntu 18.04.3 x86_64

  • OpenJDK 1.8.0u282, Apache Ignite 2.10.0, python 3.8.9, uvloop 0.15.2

    • 4 nodes, caches without backups

    • -Xmx8G -Xms8G -XX:+UseG1GC

    • In-memory data region size — 18Gb

Optimizations

# Value payload sizes to benchmark: 1 KiB, 4 KiB, 10 KiB, 100 KiB, 500 KiB, 1 MiB, 10 MiB.
bytearray_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024, 10 * 1024 * 1024]

def bytearray_supplier(size):
    """Build a (key, value) supplier with one fixed random payload.

    The payload is *size* random bytes wrapped in a bytearray; every call
    of the returned callable yields the same payload object, paired with
    either the explicit *key* or a random key in [0, 1024).
    """
    payload = bytearray(secrets.token_bytes(size))

    def supply(key=None):
        if key is None:
            key = random.randrange(0, 1024)
        return key, payload

    return supply

@pytest.mark.benchmark(group='bytearray_put')
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_sync_bytearray_put(benchmark, cache, size):
    """Latency of a single synchronous cache.put() of a *size*-byte bytearray."""
    supplier = bytearray_supplier(size)

    def do_put():
        key, value = supplier()
        cache.put(key, value)

    benchmark.pedantic(do_put, rounds=10, iterations=1000, warmup_rounds=10)
put bytearray
# Payload sizes for the get benchmark: 1 KiB up to 10 MiB (repeated per slide).
bytearray_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024, 10 * 1024 * 1024]

def load_data(cache, kv_supplier, key_range):
    """Pre-populate *cache* with one entry per key in [0, key_range)."""
    for idx in range(key_range):
        key, value = kv_supplier(idx)
        cache.put(key, value)

@pytest.mark.benchmark(group='bytearray_get')
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_sync_bytearray_get(benchmark, cache, size):
    """Latency of synchronous cache.get() over 1025 pre-loaded bytearray entries."""
    supplier = bytearray_supplier(size)
    load_data(cache, supplier, 1025)
    # The supplier always returns the same payload object, so the expected
    # value can be captured once outside the measured loop.
    expected = supplier()[1]

    def do_get():
        key = random.randrange(0, 1025)
        assert cache.get(key) == expected

    benchmark.pedantic(do_get, rounds=10, iterations=1000, warmup_rounds=10)
get bytearray
# Binary-object payload sizes: 1 KiB up to 1 MiB (no 10 MiB case here).
data_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024]

# Value type for the binary-object benchmarks: an int id plus an opaque
# byte payload. OrderedDict pins the field order of the binary schema.
class Data(
    metaclass=GenericObjectMeta,
    type_name='Data',
    schema=OrderedDict([
        ('id', IntObject),
        ('data', ByteArrayObject)
    ])
):
    pass

def binary_object_supplier(size):
    """Build a (key, Data) supplier whose payload is *size* fixed random bytes.

    The key defaults to a random value in [0, 1024) and is also stored in
    the Data object's ``id`` field.
    """
    payload = secrets.token_bytes(size)

    def supply(key=None):
        if key is None:
            key = random.randrange(0, 1024)
        return key, Data(id=key, data=payload)

    return supply

@pytest.mark.parametrize('size', data_sizes)
@pytest.mark.benchmark(group='binary_object_put')
def benchmark_sync_binary_put(benchmark, cache, size):
    """Latency of a single synchronous cache.put() of a Data binary object."""
    supplier = binary_object_supplier(size)

    def do_put():
        key, value = supplier()
        cache.put(key, value)

    benchmark.pedantic(do_put, rounds=10, iterations=100, warmup_rounds=10)
put binary object
# Binary-object payload sizes for the get benchmark (repeated per slide).
data_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024]

def load_data(cache, kv_supplier, key_range):
    """Seed *cache* with entries produced by kv_supplier for keys 0..key_range-1."""
    entries = (kv_supplier(idx) for idx in range(key_range))
    for key, value in entries:
        cache.put(key, value)


@pytest.mark.parametrize('size', data_sizes)
@pytest.mark.benchmark(group='binary_object_get')
def benchmark_sync_binary_get(benchmark, cache, size):
    """Latency of synchronous cache.get() over pre-loaded Data binary objects.

    Bug fix: ``load_data`` expects a supplier callable (key -> (key, value)),
    but the raw ``size`` int was being passed instead, which raised TypeError
    inside load_data before any measurement could run. Build the supplier
    first, mirroring benchmark_sync_bytearray_get.
    """
    kv_supplier = binary_object_supplier(size)
    load_data(cache, kv_supplier, 1024)

    def get():
        # Keys 0..1023 were loaded above; the stored object's id equals its key.
        k = random.randrange(0, 1024)
        v = cache.get(k)
        assert v and v.id == k

    benchmark.pedantic(get, rounds=10, iterations=100, warmup_rounds=10)
get binary object

How have we achieved this?

  • Optimization of memory consumption.

    • BytesIO

    • memoryview, bytearray

  • Hash code calculation of string, bytearray and bytes has been implemented in C

Partition awareness

pa state
# Enabling partition awareness: pass partition_aware=True and the addresses
# of all cluster nodes (the trailing ``...`` is a slide placeholder).
client = Client(partition_aware=True)
with client.connect([('192.168.0.1', 10800), ...]):
    ...
get bytearray pa
put bytearray pa
get binary object pa
put binary object pa

Asyncio

Concurrency in python

  • GIL (sic!)

  • Because of the GIL, the usefulness of multithreading is questionable

  • Solution?

Concurrency in python

  • One of the solutions — event-loop

  • It is still a single thread

  • IO is not blocking, coroutines are suspendable.

Concurrency in python, asyncio

  • There is a performance penalty for coroutine creation and execution.

  • But if you run IO-bound tasks in coroutines concurrently you can improve performance.

async def main_async():
    """Async demo mirroring the sync example: bulk put/get, SQL and scan cursors."""
    client = AioClient(partition_aware=True)
    nodes = [('127.0.0.1', 10800), ('127.0.0.1', 10801), ('127.0.0.1', 10802)]
    async with client.connect(nodes):
        student_cache = await client.get_or_create_cache(SQL_CONFIG)

        # Bulk-insert ten identical sample rows keyed 0..9.
        batch = {key: Student(login='jdoe', name='John Doe', age=17, gpa=4.25)
                 for key in range(10)}
        await student_cache.put_all(batch)

        print(await student_cache.get(1))
        print(await student_cache.get_all([1, 2, 3]))

        # Async SQL cursor: same protocol as the sync client, but awaited.
        query = r'SELECT * FROM Student ORDER BY SID'
        async with client.sql(query, include_field_names=True) as cursor:
            async for row in cursor:
                print(row)

        # Async full-cache scan.
        async with student_cache.scan() as cursor:
            async for row in cursor:
                print(row)

# Install uvloop as the event-loop implementation, then run the async demo.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.get_event_loop()

# Bug fix: the coroutine defined above is ``main_async`` — the previous call
# referenced an undefined ``main`` and would raise NameError.
loop.run_until_complete(main_async())

A few words before demonstrating the benchmarks.

  • uvloop was used in benchmarks.

    • It is much more performant than standard asyncio event loop

    • Drop-in replacement of standard event loop

    • You possibly already use it:

Benchmarks

# Payload sizes (1 KiB – 10 MiB) and sizes of concurrently-gathered coroutine batches.
bytearray_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024, 10 * 1024 * 1024]
coro_batches = [5, 10, 20]

@pytest.mark.async_bench
@pytest.mark.benchmark(group='bytearray_put')
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_async_bytearray_put(benchmark, event_loop, aio_cache, size):
    """Latency of a single async put.

    ``execute`` (defined elsewhere in the benchmark suite) presumably drives
    the coroutine factory to completion on *event_loop* — confirm against the
    shared benchmark helpers.
    """
    supplier = bytearray_supplier(size)

    async def do_put():
        key, value = supplier()
        await aio_cache.put(key, value)

    benchmark.pedantic(execute, args=(event_loop, do_put),
                       rounds=10, iterations=1000, warmup_rounds=10)


@pytest.mark.async_bench
@pytest.mark.benchmark(group='bytearray_put')
@pytest.mark.parametrize('batch', coro_batches)
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_async_bytearray_put_batched(benchmark, event_loop, aio_cache, size, batch):
    """Latency of *batch* puts executed concurrently via asyncio.gather.

    Iteration count is scaled down by the batch size so the total number of
    puts per round stays comparable to the unbatched benchmark.
    """
    supplier = bytearray_supplier(size)

    async def one_put():
        key, value = supplier()
        await aio_cache.put(key, value)

    async def batched_puts():
        coros = [one_put() for _ in range(batch)]
        await asyncio.gather(*coros)

    benchmark.pedantic(execute, args=(event_loop, batched_puts),
                       rounds=10, iterations=1000 // batch, warmup_rounds=10)
get bytearray aio
get bytearray aio large
put bytearray aio
put bytearray aio large
get binary object aio
get binary object aio large
put binary object aio
put binary object aio large

Conclusions

  • Partition awareness must be enabled when using asyncio.

  • Connection pool effects.

Future plans

  • Expiry policy (TTL) (already merged to master)

  • transactions

  • Connection pools (for asyncio).

  • Proceed with optimizing serialization/deserialization (more C code!)