class Student(
    metaclass=GenericObjectMeta,
    type_name='SQL_PUBLIC_STUDENT_TYPE',
    schema={
        'name': String,
        'login': String,
        'age': IntObject,
        'gpa': DoubleObject,
    }
):
    """Ignite binary-object type mapped to SQL_PUBLIC_STUDENT_TYPE.

    The field names and wire types are declared via ``schema``; the class
    body itself is intentionally empty — ``GenericObjectMeta`` generates
    everything from the declaration above.
    """
    pass
Daschinsky Ivan
![]() |
|
Massive performance improvement.
Support of Partition Awareness
Support of asyncio
Support of cluster api (activation-deactivation)
pyignite
class Student(
    metaclass=GenericObjectMeta,
    type_name='SQL_PUBLIC_STUDENT_TYPE',
    schema={
        'name': String,
        'login': String,
        'age': IntObject,
        'gpa': DoubleObject,
    }
):
    """Binary-object class for the SQL_PUBLIC_STUDENT_TYPE table rows.

    ``GenericObjectMeta`` builds the keyword constructor and serialization
    from the ``schema`` mapping, so no body is needed.
    """
    pass
# Cache configuration for the cache that backs the SQL table.
# PROP_NAME / PROP_SQL_SCHEMA / PROP_QUERY_ENTITIES are pyignite cache
# property keys; the query-entity list is elided (`...`) in this snippet.
SQL_CONFIG = {
PROP_NAME: 'SQL_PUBLIC_STUDENT',
PROP_SQL_SCHEMA: 'PUBLIC',
PROP_QUERY_ENTITIES: [
...]
}
# Synchronous demo: connect to three local nodes with partition awareness
# enabled, then exercise the cache via put/get, an SQL query, and a scan.
client = Client(partition_aware=True)
with client.connect([('127.0.0.1', 10800 + i) for i in range(3)]):
    # client.get_cluster().set_state(ClusterState.ACTIVE)
    student_cache = client.get_or_create_cache(SQL_CONFIG)
    student_cache.put_all({
        i: Student(login='jdoe', name='John Doe', age=17, gpa=4.25)
        for i in range(10)
    })
    print(student_cache.get(1))
    print(student_cache.get_all([1, 2, 3]))

    with client.sql(r'SELECT * FROM Student', include_field_names=True) as cursor:
        for row in cursor:
            print(row)

    with student_cache.scan() as cursor:
        for row in cursor:
            print(row)
16 x Intel® Xeon® Gold 6248R CPU @ 3.00GHz, 32Gb
Ubuntu 18.04.3 x86_64
OpenJDK 1.8.0u282
, Apache Ignite 2.10.0
, python 3.8.9
, uvloop 0.15.2
4 nodes, caches without backups
-Xmx8G -Xms8G -XX:+UseG1GC
In-memory data region size — 18Gb
# Payload sizes (bytes) exercised by the bytearray benchmarks.
bytearray_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024, 10 * 1024 * 1024]


def bytearray_supplier(size):
    """Return a supplier of ``(key, payload)`` pairs.

    One random ``bytearray`` of ``size`` bytes is generated up front and
    shared by every call; when ``key`` is omitted, a random key in
    ``[0, 1024)`` is drawn per call.
    """
    payload = bytearray(secrets.token_bytes(size))

    def supply(key=None):
        if key is None:
            key = random.randrange(0, 1024)
        return key, payload

    return supply
@pytest.mark.benchmark(group='bytearray_put')
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_sync_bytearray_put(benchmark, cache, size):
    """Benchmark one synchronous put of a ``size``-byte payload per call."""
    supplier = bytearray_supplier(size)

    def run_put():
        key, value = supplier()
        cache.put(key, value)

    benchmark.pedantic(run_put, rounds=10, iterations=1000, warmup_rounds=10)
# Payload sizes (bytes) exercised by the bytearray benchmarks.
bytearray_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024, 10 * 1024 * 1024]


def load_data(cache, kv_supplier, key_range):
    """Pre-populate *cache* with one entry per key in ``range(key_range)``."""
    for key in range(key_range):
        k, v = kv_supplier(key)
        cache.put(k, v)
@pytest.mark.benchmark(group='bytearray_get')
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_sync_bytearray_get(benchmark, cache, size):
    """Benchmark one synchronous get of a ``size``-byte payload per call.

    The cache is pre-loaded with keys 0..1024 so every random lookup hits.
    """
    supplier = bytearray_supplier(size)
    load_data(cache, supplier, 1025)

    def run_get():
        key = random.randrange(0, 1025)
        # supplier()[1] is the shared payload every key was loaded with
        assert cache.get(key) == supplier()[1]

    benchmark.pedantic(run_get, rounds=10, iterations=1000, warmup_rounds=10)
# Payload sizes (bytes) exercised by the binary-object benchmarks.
data_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024]


class Data(
    metaclass=GenericObjectMeta,
    type_name='Data',
    schema=OrderedDict([
        ('id', IntObject),
        ('data', ByteArrayObject)
    ])
):
    """Ignite binary object holding an integer ``id`` and a byte payload."""
    pass
def binary_object_supplier(size):
    """Return a supplier of ``(key, Data)`` pairs.

    A single random ``size``-byte payload is generated once and wrapped in a
    fresh ``Data`` object per call; the key defaults to a random value in
    ``[0, 1024)`` when not supplied.
    """
    payload = secrets.token_bytes(size)

    def supply(key=None):
        if key is None:
            key = random.randrange(0, 1024)
        return key, Data(id=key, data=payload)

    return supply
@pytest.mark.parametrize('size', data_sizes)
@pytest.mark.benchmark(group='binary_object_put')
def benchmark_sync_binary_put(benchmark, cache, size):
    """Benchmark one synchronous put of a ``Data`` binary object per call."""
    supplier = binary_object_supplier(size)

    def run_put():
        key, value = supplier()
        cache.put(key, value)

    benchmark.pedantic(run_put, rounds=10, iterations=100, warmup_rounds=10)
# Payload sizes (bytes) exercised by the binary-object benchmarks.
data_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024]


def load_data(cache, kv_supplier, key_range):
    """Pre-populate *cache* with one entry per key in ``range(key_range)``."""
    for key in range(key_range):
        k, v = kv_supplier(key)
        cache.put(k, v)
@pytest.mark.parametrize('size', data_sizes)
@pytest.mark.benchmark(group='binary_object_get')
def benchmark_sync_binary_get(benchmark, cache, size):
    """Benchmark one synchronous get of a ``Data`` binary object per call.

    The cache is pre-loaded with keys 0..1023 so every random lookup hits.
    """
    # Bug fix: the original passed the raw int ``size`` to load_data as the
    # kv supplier; load_data calls the supplier per key, so an int raises
    # TypeError. Build the supplier first, as benchmark_sync_binary_put does.
    kv_supplier = binary_object_supplier(size)
    load_data(cache, kv_supplier, 1024)

    def get():
        k = random.randrange(0, 1024)
        v = cache.get(k)
        # each stored object carries its own key in the ``id`` field
        assert v and v.id == k

    benchmark.pedantic(get, rounds=10, iterations=100, warmup_rounds=10)
Optimization of memory consumption.
BytesIO
memoryview
, bytearray
Hashcode calculation of string
, bytearray
and byte
has been implemented in C
# Create a partition-aware client and connect it to a list of node
# addresses (remaining addresses elided in this snippet).
client = Client(partition_aware=True)
with client.connect([('192.168.0.1', 10800), ...]):
    ...
GIL (sic!)
Because of the GIL, the usefulness of multithreading
is questionable
Solution?
One of the solutions — event-loop
It is still a single thread
IO is not blocking, coroutines are suspendable.
There is a performance penalty of coroutine execution and creation.
But if you run IO-bound tasks in coroutines concurrently you can improve performance.
async def main_async():
    """Async demo: put/get, SQL query and scan query via ``AioClient``."""
    client = AioClient(partition_aware=True)
    async with client.connect([('127.0.0.1', 10800 + i) for i in range(3)]):
        student_cache = await client.get_or_create_cache(SQL_CONFIG)
        await student_cache.put_all({
            i: Student(login='jdoe', name='John Doe', age=17, gpa=4.25)
            for i in range(10)
        })
        print(await student_cache.get(1))
        print(await student_cache.get_all([1, 2, 3]))

        async with client.sql(r'SELECT * FROM Student ORDER BY SID', include_field_names=True) as cursor:
            async for row in cursor:
                print(row)

        async with student_cache.scan() as cursor:
            async for row in cursor:
                print(row)
# Install uvloop as the asyncio event-loop implementation, then run the demo.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.get_event_loop()
# Bug fix: the coroutine defined above is main_async(); ``main`` is not
# defined anywhere in this file, so calling main() would raise NameError.
loop.run_until_complete(main_async())
uvloop was used in benchmarks.
It is much more performant than standard asyncio event loop
Drop-in replacement of standard event loop
You possibly already use it:
# Payload sizes (bytes) and coroutine batch sizes for the async benchmarks.
bytearray_sizes = [1024, 4096, 10 * 1024, 100 * 1024, 500 * 1024, 1024 * 1024, 10 * 1024 * 1024]
coro_batches = [5, 10, 20]


@pytest.mark.async_bench
@pytest.mark.benchmark(group='bytearray_put')
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_async_bytearray_put(benchmark, event_loop, aio_cache, size):
    """Benchmark one asynchronous put of a ``size``-byte payload per call."""
    supplier = bytearray_supplier(size)

    async def do_put():
        key, value = supplier()
        await aio_cache.put(key, value)

    benchmark.pedantic(execute, args=(event_loop, do_put), rounds=10, iterations=1000, warmup_rounds=10)
@pytest.mark.async_bench
@pytest.mark.benchmark(group='bytearray_put')
@pytest.mark.parametrize('batch', coro_batches)
@pytest.mark.parametrize('size', bytearray_sizes)
def benchmark_async_bytearray_put_batched(benchmark, event_loop, aio_cache, size, batch):
    """Benchmark ``batch`` concurrent async puts per iteration via gather.

    Iteration count is scaled down by ``batch`` so the total number of puts
    stays comparable to the unbatched benchmark.
    """
    supplier = bytearray_supplier(size)

    async def one_put():
        key, value = supplier()
        await aio_cache.put(key, value)

    async def batched_puts():
        await asyncio.gather(*(one_put() for _ in range(batch)))

    benchmark.pedantic(execute, args=(event_loop, batched_puts), rounds=10, iterations=1000 // batch, warmup_rounds=10)
Partition Awareness must be enabled with asyncio.
Connection pool effects.
Expiry policy (TTL) (already merged to master
)
transactions
Connection pools (for asyncio
).
Proceed with optimizing serialization/deserialization (more C code!)