import os
import platform
import sys
import time
from functools import partial

import pytest

from anyio import CancelScope, create_task_group, fail_after, to_process, wait_all_tasks_blocked

# Module-level pytestmark: applies the ``anyio`` marker to every test in this module.
pytestmark = pytest.mark.anyio


@pytest.fixture(autouse=True)
def check_compatibility(anyio_backend_name: str) -> None:
    """Skip tests on asyncio under Windows with Python < 3.8 (no subprocess support)."""
    # Only the asyncio backend is subject to this restriction
    if anyio_backend_name != 'asyncio':
        return

    unsupported_loop = platform.system() == 'Windows' and sys.version_info < (3, 8)
    if unsupported_loop:
        pytest.skip('Python < 3.8 uses SelectorEventLoop by default and it does not support '
                    'subprocesses')


async def test_run_sync_in_process_pool() -> None:
    """
    Test that the function is executed in a process other than the current one, and that a
    second call is served by the same process as the first.

    """
    parent_pid = os.getpid()
    first_pid = await to_process.run_sync(os.getpid)
    assert first_pid != parent_pid

    second_pid = await to_process.run_sync(os.getpid)
    assert second_pid == first_pid


async def test_identical_sys_path() -> None:
    """Test that sys.path evaluated via to_process.run_sync() equals the parent's sys.path."""
    # eval() is given a fixed literal expression here, never external input
    worker_sys_path = await to_process.run_sync(eval, 'sys.path')
    assert worker_sys_path == sys.path


async def test_partial() -> None:
    """Test that keyword arguments can be supplied by wrapping the function in partial()."""
    sort_descending = partial(sorted, reverse=True)
    result = await to_process.run_sync(sort_descending, ['a', 'b'])
    assert result == ['b', 'a']


async def test_exception() -> None:
    """Test that an exception raised by the function is re-raised by run_sync()."""
    # int('a') raises ValueError; no assertion on the return value is needed because the
    # call is expected never to return normally.
    with pytest.raises(ValueError, match='invalid literal for int'):
        await to_process.run_sync(int, 'a')


async def test_print() -> None:
    """Test that calling print() in the worker does not disturb parent-worker communication."""
    pid_before = await to_process.run_sync(os.getpid)
    for word in ('hello', 'world'):
        await to_process.run_sync(print, word)

    # The same worker must still be answering after the print() calls
    pid_after = await to_process.run_sync(os.getpid)
    assert pid_after == pid_before


async def test_cancel_before() -> None:
    """
    Test that calling to_process.run_sync() inside an already cancelled scope does not cause
    a worker process to be reserved.

    """
    with CancelScope() as cancelled_scope:
        cancelled_scope.cancel()
        await to_process.run_sync(os.getpid)

    # get() is expected to raise LookupError here, i.e. no worker pool has been stored
    with pytest.raises(LookupError):
        to_process._process_pool_workers.get()


async def test_cancel_during() -> None:
    """
    Test that cancelling a call that is running on the worker process causes that process to
    be killed.

    """
    original_pid = await to_process.run_sync(os.getpid)
    cancellable_run_sync = partial(to_process.run_sync, cancellable=True)

    # The timeout is shorter than the sleep, so waiting for the sleep to finish fails the test
    with fail_after(4):
        async with create_task_group() as task_group:
            task_group.start_soon(cancellable_run_sync, time.sleep, 5)
            await wait_all_tasks_blocked()
            task_group.cancel_scope.cancel()

    # The previous worker was killed, so the next call must be served by a new one
    replacement_pid = await to_process.run_sync(os.getpid)
    assert replacement_pid != original_pid
