创建中心任务的管理系统
This commit is contained in:
89
tasks/tests/test_integration.py
Normal file
89
tasks/tests/test_integration.py
Normal file
@@ -0,0 +1,89 @@
|
||||
from django.test import TestCase
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APIClient
|
||||
from tasks.models import Client, Task, TaskResult
|
||||
from tasks.tests.test_factories import ClientFactory, TaskFactory
|
||||
|
||||
class TaskFlowIntegrationTest(TestCase):
|
||||
def setUp(self):
|
||||
self.client = APIClient()
|
||||
# Create a test client with token
|
||||
self.test_client = ClientFactory()
|
||||
self.token = self.test_client.token
|
||||
# Authenticate the client
|
||||
self.client.credentials(HTTP_AUTHORIZATION=f'Token {self.token}')
|
||||
|
||||
def test_full_task_flow(self):
|
||||
"""Test the full task flow: create → claim → start → complete"""
|
||||
# 1. Create a task
|
||||
create_url = reverse('task-list')
|
||||
create_data = {
|
||||
'name': 'integration_test_task',
|
||||
'client_name': self.test_client.name,
|
||||
'script': 'echo "Integration Test"',
|
||||
'timeout_seconds': 3600
|
||||
}
|
||||
create_response = self.client.post(create_url, create_data, format='json')
|
||||
self.assertEqual(create_response.status_code, status.HTTP_201_CREATED)
|
||||
task_id = create_response.data['id']
|
||||
|
||||
# 2. Claim the task
|
||||
claim_url = reverse('task-claim')
|
||||
claim_data = {
|
||||
'client_name': self.test_client.name
|
||||
}
|
||||
claim_response = self.client.post(claim_url, claim_data, format='json')
|
||||
self.assertEqual(claim_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(claim_response.data['status'], 'assigned')
|
||||
self.assertEqual(claim_response.data['assigned_to'], self.test_client.name)
|
||||
|
||||
# 3. Start the task
|
||||
start_url = reverse('task-start', args=[task_id])
|
||||
start_response = self.client.post(start_url, format='json')
|
||||
self.assertEqual(start_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(start_response.data['status'], 'running')
|
||||
self.assertIsNotNone(start_response.data['started_at'])
|
||||
|
||||
# 4. Complete the task
|
||||
complete_url = reverse('task-complete', args=[task_id])
|
||||
complete_data = {
|
||||
'status': 'success',
|
||||
'message': 'Task completed successfully'
|
||||
}
|
||||
complete_response = self.client.post(complete_url, complete_data, format='json')
|
||||
self.assertEqual(complete_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(complete_response.data['status'], 'success')
|
||||
self.assertIsNotNone(complete_response.data['completed_at'])
|
||||
|
||||
# 5. Verify the task status in database
|
||||
task = Task.objects.get(id=task_id)
|
||||
self.assertEqual(task.status, 'success')
|
||||
self.assertEqual(task.assigned_to, self.test_client.name)
|
||||
self.assertIsNotNone(task.started_at)
|
||||
self.assertIsNotNone(task.completed_at)
|
||||
|
||||
def test_task_result_upload(self):
|
||||
"""Test that a client can upload task results"""
|
||||
# Create a test task
|
||||
task = TaskFactory()
|
||||
|
||||
# Upload a task result
|
||||
upload_url = reverse('taskresult-list')
|
||||
upload_data = {
|
||||
'task': task.id,
|
||||
'client': self.test_client.id,
|
||||
'status': 'success',
|
||||
'message': 'Result uploaded successfully'
|
||||
}
|
||||
|
||||
upload_response = self.client.post(upload_url, upload_data, format='json')
|
||||
self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED)
|
||||
|
||||
# Verify the result exists in database
|
||||
self.assertEqual(TaskResult.objects.count(), 1)
|
||||
result = TaskResult.objects.get()
|
||||
self.assertEqual(result.task, task)
|
||||
self.assertEqual(result.client, self.test_client)
|
||||
self.assertEqual(result.status, 'success')
|
||||
Reference in New Issue
Block a user