Complete Implementation Guide
This document provides comprehensive implementation details for the background task system, including protocol buffer definitions, Firestore schema, frontend integration, and testing strategies.
Protocol Buffer Definitions
Task Proto Schema
The Task proto message defines the complete task document structure:
message Task {
enum TaskStatus {
PENDING = 0;
PROCESSING = 1;
COMPLETE = 2;
FAILED = 3;
CANCELLED = 4;
}
string task_id = 1;
string task_type = 2;
TaskStatus status = 3;
int32 progress = 4; // 0-100
string message = 5;
string created_by = 6;
string created_at = 7;
string updated_at = 8;
string completed_at = 9;
string failed_at = 10;
string request_metadata = 11; // JSON serialized request
map<string, string> result = 12;
string project_id = 13; // For efficient filtering
MetaCostAnalysis cost_analysis = 14;
repeated ProgressStep progress_steps = 15;
}
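Client code that consumes these documents usually needs to know whether a task can still change. The TypeScript sketch below assumes the status arrives as its enum name string, as in the Firestore documents in this guide; `TaskStatusName`, `isTerminalStatus`, and `clampProgress` are illustrative names, not part of the generated proto code.

```typescript
// Names of Task.TaskStatus values, as they appear when the enum is serialized by name.
type TaskStatusName = 'PENDING' | 'PROCESSING' | 'COMPLETE' | 'FAILED' | 'CANCELLED';

// Hypothetical helper set: statuses after which no further updates are expected.
const TERMINAL_STATUSES: ReadonlySet<TaskStatusName> = new Set<TaskStatusName>([
  'COMPLETE',
  'FAILED',
  'CANCELLED',
]);

function isTerminalStatus(status: TaskStatusName): boolean {
  return TERMINAL_STATUSES.has(status);
}

// Clamp a reported progress value into the documented 0-100 range.
function clampProgress(progress: number): number {
  if (Number.isNaN(progress)) {
    return 0;
  }
  return Math.min(100, Math.max(0, Math.round(progress)));
}
```

Keeping the terminal set in one place avoids repeating the three-way status comparison wherever completion is checked.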
Task Update Proto
The TaskUpdate message carries partial updates for progress tracking:
message TaskUpdate {
Task.TaskStatus status = 1;
int32 progress = 2;
string message = 3;
string updated_at = 4;
string completed_at = 5;
string failed_at = 6;
map<string, string> result = 7;
PlanIngestionStep current_step = 8;
}
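One way to picture a partial update is as a merge in which only the fields that are present overwrite the stored task. The sketch below is a simplified TypeScript model, not the backend implementation: `TaskDoc`, `TaskUpdateDoc`, and `applyTaskUpdate` are hypothetical names, and `undefined` stands in for "not set". Proto3 scalar fields without `optional` carry no presence information, so a real implementation may need explicit presence or a field mask to tell "progress = 0" apart from "progress not provided".

```typescript
// Simplified view of the stored task (subset of the Task message).
interface TaskDoc {
  status: string;
  progress: number;
  message: string;
  updated_at: string;
  completed_at?: string;
  failed_at?: string;
  result?: Record<string, string>;
}

// Every field is optional: only fields that are present get written.
type TaskUpdateDoc = Partial<TaskDoc>;

function applyTaskUpdate(task: TaskDoc, update: TaskUpdateDoc): TaskDoc {
  const merged: TaskDoc = { ...task }; // copy, so the input is not mutated
  if (update.status !== undefined) merged.status = update.status;
  if (update.progress !== undefined) merged.progress = update.progress;
  if (update.message !== undefined) merged.message = update.message;
  if (update.updated_at !== undefined) merged.updated_at = update.updated_at;
  if (update.completed_at !== undefined) merged.completed_at = update.completed_at;
  if (update.failed_at !== undefined) merged.failed_at = update.failed_at;
  if (update.result !== undefined) {
    // Merge result entries instead of replacing the whole map.
    merged.result = { ...(task.result ?? {}), ...update.result };
  }
  return merged;
}
```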
Progress Step Tracking
The ProgressStep message records step-by-step progress, including timing and status for each step:
message ProgressStep {
enum StepStatus {
PENDING = 0;
IN_PROGRESS = 1;
COMPLETED = 2;
FAILED = 3;
SKIPPED = 4;
}
PlanIngestionStep step = 1; // Legacy enum field
string step_name = 2; // Generic step name
string step_type = 3; // Type discriminator (plan_ingestion, code_applicability, etc.)
string name = 4; // Human-readable step title
int32 step_number = 5; // Sequential order
int32 total_steps = 6; // Total expected steps
StepStatus status = 7;
string started_at = 8;
string completed_at = 9;
int64 duration_ms = 10;
string details = 11; // Optional message/error
}
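The relationship between `started_at`, `completed_at`, and `duration_ms` can be sketched as follows. This is a minimal TypeScript illustration that assumes both timestamps are ISO 8601 strings; `ProgressStepDoc` and `completeStep` are hypothetical names.

```typescript
// Simplified view of a ProgressStep document (subset of the proto fields).
interface ProgressStepDoc {
  step_name: string;
  status: 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'SKIPPED';
  started_at?: string;   // ISO 8601
  completed_at?: string; // ISO 8601
  duration_ms?: number;
}

// Marks a step as completed and derives duration_ms from the two timestamps.
function completeStep(step: ProgressStepDoc, completedAt: string): ProgressStepDoc {
  const finished: ProgressStepDoc = {
    ...step,
    status: 'COMPLETED',
    completed_at: completedAt,
  };
  if (step.started_at !== undefined) {
    const elapsed = Date.parse(completedAt) - Date.parse(step.started_at);
    // Only record a duration when both timestamps parsed and are ordered.
    if (Number.isFinite(elapsed) && elapsed >= 0) {
      finished.duration_ms = elapsed;
    }
  }
  return finished;
}
```

A step started at 10:30:10Z and completed at 10:30:45Z yields a `duration_ms` of 35000.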
Firestore Schema
Task Document Structure
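Tasks are stored in the tasks collection, one document per task, using the auto-serialized proto structure. The *_at fields appear below as ISO 8601 strings for readability; FirestoreProtoConverter stores them as Firestore Timestamp values: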
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"task_type": "code-applicability",
"status": "PROCESSING",
"progress": 65,
"message": "Analyzing section 3 of 5",
"created_by": "user@example.com",
"project_id": "R2024.0091-2024-10-14",
"created_at": "2024-01-15T10:30:00Z",
"updated_at": "2024-01-15T10:32:15Z",
"request_metadata": "{\"architectural_project_id\":\"R2024.0091\",\"page_number\":6}",
"progress_steps": [
{
"step_name": "RETRIEVE_PLAN_CONTENT",
"step_type": "code_applicability",
"name": "Retrieving plan content",
"step_number": 1,
"total_steps": 11,
"status": "COMPLETED",
"started_at": "2024-01-15T10:30:10Z",
"completed_at": "2024-01-15T10:30:45Z",
"duration_ms": 35000
},
{
"step_name": "ANALYZE_SECTIONS",
"step_type": "code_applicability",
"name": "Analyzing building code sections",
"step_number": 3,
"total_steps": 11,
"status": "IN_PROGRESS",
"started_at": "2024-01-15T10:31:30Z"
}
],
"cost_analysis": {
"total_cost_usd": 0.0125,
"model_costs": {
"gemini-1.5-flash": {
"input_tokens": 2500,
"output_tokens": 800,
"cost_usd": 0.0125
}
}
}
}
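In the document above, `total_cost_usd` equals the sum of the per-model `cost_usd` values. A consumer that wants to sanity-check this could do something like the following TypeScript sketch; the field names come from the example document, while `sumModelCosts` and `isCostConsistent` are illustrative helpers, and treating the total as the sum of model costs is an assumption.

```typescript
interface ModelCost {
  input_tokens: number;
  output_tokens: number;
  cost_usd: number;
}

interface CostAnalysisDoc {
  total_cost_usd: number;
  model_costs: Record<string, ModelCost>;
}

// Adds up cost_usd across all models listed in the analysis.
function sumModelCosts(analysis: CostAnalysisDoc): number {
  let total = 0;
  for (const modelName of Object.keys(analysis.model_costs)) {
    const cost = analysis.model_costs[modelName];
    if (cost !== undefined) {
      total += cost.cost_usd;
    }
  }
  return total;
}

// Compares with a tolerance, because the costs are floating-point values.
function isCostConsistent(analysis: CostAnalysisDoc, epsilon: number = 1e-9): boolean {
  return Math.abs(sumModelCosts(analysis) - analysis.total_cost_usd) <= epsilon;
}
```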
Firestore Proto Converter
Automatic proto-to-Firestore conversion with timestamp handling:
public class FirestoreProtoConverter {
/**
* Converts proto message to Firestore-compatible map.
* Automatically handles:
* - ISO 8601 timestamp strings -> Firestore Timestamp objects
* - Proto enums -> String values
* - Nested messages -> Nested maps
*/
public static Map<String, Object> convertToFirestore(Message protoMessage) {
try {
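// Convert proto to JSON, preserving the original snake_case field names
// (task_id, created_at, ...) so the stored document matches the schema above.
// By default, JsonFormat emits lowerCamelCase names (taskId, createdAt).
String json = JsonFormat.printer().preservingProtoFieldNames().print(protoMessage);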
// Parse to Map
Gson gson = new Gson();
Map<String, Object> map = gson.fromJson(json, Map.class);
// Convert timestamp fields
convertTimestampFields(map);
return map;
} catch (Exception e) {
logger.warning("Proto conversion failed: " + e.getMessage());
return new HashMap<>();
}
}
/**
* Converts fields matching timestamp patterns to Firestore Timestamp objects.
* Patterns: *_at, *At, *timestamp*, *Timestamp*
*/
private static void convertTimestampFields(Map<String, Object> map) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
// Check if field name indicates a timestamp
if (isTimestampField(key) && value instanceof String) {
try {
Instant instant = Instant.parse((String) value);
entry.setValue(com.google.cloud.Timestamp.ofTimeSecondsAndNanos(
instant.getEpochSecond(), instant.getNano()));
} catch (Exception e) {
// Keep as string if parsing fails
}
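} else if (value instanceof Map) {
// Recursively process nested objects
convertTimestampFields((Map<String, Object>) value);
} else if (value instanceof List) {
// Recurse into repeated messages (e.g. progress_steps), which Gson parses as lists
for (Object item : (List<?>) value) {
if (item instanceof Map) {
convertTimestampFields((Map<String, Object>) item);
}
}
}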
}
}
}
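The converter calls `isTimestampField`, which is not shown above. The TypeScript sketch below models the documented name patterns (`*_at`, `*At`, `*timestamp*`, `*Timestamp*`) together with the recursive walk, including lists. It is an illustration of the technique rather than the Java implementation: `Date` stands in for the Firestore Timestamp type, and the `ISO_INSTANT` pattern and type names are assumptions.

```typescript
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };
type ConvertedValue =
  | string | number | boolean | null | Date
  | ConvertedValue[]
  | { [key: string]: ConvertedValue };

// Accepts UTC instants such as 2024-01-15T10:30:00Z (optional fractional seconds).
const ISO_INSTANT = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$/;

// Field-name patterns from the converter's documentation.
function isTimestampField(key: string): boolean {
  return key.endsWith('_at') || key.endsWith('At') ||
    key.includes('timestamp') || key.includes('Timestamp');
}

// Returns a converted copy; strings that fail to parse are kept unchanged.
function convertTimestamps(value: JsonValue, key: string = ''): ConvertedValue {
  if (typeof value === 'string') {
    if (isTimestampField(key) && ISO_INSTANT.test(value)) {
      const parsed = new Date(value);
      if (!Number.isNaN(parsed.getTime())) {
        return parsed;
      }
    }
    return value;
  }
  if (Array.isArray(value)) {
    // List elements inherit the name of the field that holds the list.
    return value.map(item => convertTimestamps(item, key));
  }
  if (value !== null && typeof value === 'object') {
    const out: { [key: string]: ConvertedValue } = {};
    for (const [childKey, child] of Object.entries(value)) {
      out[childKey] = convertTimestamps(child, childKey);
    }
    return out;
  }
  return value; // number, boolean, or null
}
```

Name-based detection is convenient but can misfire on any string field whose name happens to match a pattern, which is why the sketch also checks the value's shape before converting.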
Frontend Implementation
Angular Service Integration
TaskService
Handles gRPC communication and task management:
@Injectable({
providedIn: 'root'
})
export class TaskService {
constructor(
private taskServiceClient: TaskServiceClient,
private codeApplicabilityClient: CodeApplicabilityServiceClient
) {}
/**
* Start code applicability analysis
*/
startCodeApplicabilityAnalysis(
projectId: string,
pageNumber: number,
iccBookId: string,
maxRelevantChapters?: number
): Observable<StartCodeApplicabilityAnalysisResponse> {
const request = new StartCodeApplicabilityAnalysisRequest();
request.setArchitecturalProjectId(projectId);
request.setPageNumber(pageNumber);
request.setIccBookId(iccBookId);
if (maxRelevantChapters) {
request.setMaxRelevantChapters(maxRelevantChapters);
}
return from(this.codeApplicabilityClient.startAsyncCodeApplicabilityAnalysis(request, {}))
.pipe(
map(response => response.toObject()),
catchError(error => {
console.error('Failed to start analysis:', error);
return throwError(() => error);
})
);
}
/**
* Get task status (one-time check)
*/
getTaskStatus(taskId: string): Observable<Task> {
const request = new GetTaskStatusRequest();
request.setTaskId(taskId);
return from(this.taskServiceClient.getTaskStatus(request, {}))
.pipe(
map(response => response.toObject().task as Task),
catchError(error => {
console.error('Failed to get task status:', error);
return throwError(() => error);
})
);
}
}
FirestoreTaskTrackingService
Real-time progress tracking via Firestore:
@Injectable({
providedIn: 'root'
})
export class FirestoreTaskTrackingService {
constructor(private firestore: Firestore) {}
/**
* Subscribe to real-time task updates
*/
trackTask(taskId: string): Observable<Task> {
const taskDoc = doc(this.firestore, `tasks/${taskId}`);
return docSnapshots(taskDoc).pipe(
map(snapshot => {
if (!snapshot.exists()) {
throw new Error(`Task ${taskId} not found`);
}
return this.convertFirestoreToTask(snapshot.data());
}),
catchError(error => {
console.error('Firestore tracking error:', error);
return throwError(() => error);
})
);
}
/**
* Track multiple tasks simultaneously
*/
trackTasks(taskIds: string[]): Observable<Task[]> {
return combineLatest(
taskIds.map(taskId => this.trackTask(taskId))
);
}
/**
* Wait for task completion
*/
waitForCompletion(taskId: string, timeoutMs: number = 300000): Observable<Task> {
return this.trackTask(taskId).pipe(
filter(task =>
task.status === 'COMPLETE' ||
task.status === 'FAILED' ||
task.status === 'CANCELLED'
),
take(1),
// The parameter is named timeoutMs so it does not shadow the RxJS timeout operator
timeout({
each: timeoutMs,
with: () => throwError(() => new Error('Task timeout'))
})
);
}
private convertFirestoreToTask(data: any): Task {
return {
taskId: data.task_id,
taskType: data.task_type,
status: data.status,
progress: data.progress || 0,
message: data.message || '',
createdBy: data.created_by,
createdAt: data.created_at,
updatedAt: data.updated_at,
completedAt: data.completed_at,
progressSteps: data.progress_steps || [],
costAnalysis: data.cost_analysis
};
}
}
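Because FirestoreProtoConverter stores the `*_at` fields as Firestore Timestamp objects, values such as `data.created_at` may reach the client as Timestamp instances rather than strings. A small normalizing helper, sketched below in TypeScript, could be applied inside `convertFirestoreToTask`. `toIsoString` and `TimestampLike` are hypothetical names; the only assumption about the SDK type is that it exposes `toDate()`, as the Firestore web SDK's Timestamp does.

```typescript
// Minimal structural type for values exposing toDate(), such as Firestore Timestamp.
interface TimestampLike {
  toDate(): Date;
}

function isTimestampLike(value: unknown): value is TimestampLike {
  return typeof value === 'object' && value !== null &&
    typeof (value as { toDate?: unknown }).toDate === 'function';
}

// Normalizes strings, Dates, and Timestamp-like objects to ISO 8601 strings.
// Anything else (including missing fields) maps to undefined.
function toIsoString(value: unknown): string | undefined {
  if (typeof value === 'string') {
    return value;
  }
  if (value instanceof Date) {
    return value.toISOString();
  }
  if (isTimestampLike(value)) {
    return value.toDate().toISOString();
  }
  return undefined;
}
```

With this in place, a mapping such as `createdAt: toIsoString(data.created_at)` behaves the same whether the field was stored as a string or as a Timestamp.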
Progress Display Component
AsyncTaskProgressComponent
Real-time progress visualization:
@Component({
selector: 'app-async-task-progress',
templateUrl: './async-task-progress.component.html',
styleUrls: ['./async-task-progress.component.scss']
})
export class AsyncTaskProgressComponent implements OnInit, OnDestroy {
@Input() taskIds: string[] = [];
@Output() taskCompleted = new EventEmitter<Task[]>();
@Output() taskFailed = new EventEmitter<Task>();
tasks$: Observable<Task[]> | null = null;
overallProgress = 0;
private destroy$ = new Subject<void>();
constructor(
private firestoreTracking: FirestoreTaskTrackingService,
private snackBar: MatSnackBar
) {}
ngOnInit(): void {
if (this.taskIds.length > 0) {
this.startTracking();
}
}
startTracking(): void {
this.tasks$ = this.firestoreTracking.trackTasks(this.taskIds).pipe(
tap(tasks => {
this.calculateOverallProgress(tasks);
this.checkCompletion(tasks);
}),
takeUntil(this.destroy$)
);
}
calculateOverallProgress(tasks: Task[]): void {
if (tasks.length === 0) {
this.overallProgress = 0;
return;
}
const totalProgress = tasks.reduce((sum, task) => sum + task.progress, 0);
this.overallProgress = Math.round(totalProgress / tasks.length);
}
checkCompletion(tasks: Task[]): void {
const allComplete = tasks.every(t => t.status === 'COMPLETE');
const anyFailed = tasks.find(t => t.status === 'FAILED');
if (allComplete) {
this.taskCompleted.emit(tasks);
this.snackBar.open('All tasks completed successfully', 'Close', { duration: 3000 });
} else if (anyFailed) {
this.taskFailed.emit(anyFailed);
this.snackBar.open(`Task failed: ${anyFailed.message}`, 'Close', { duration: 5000 });
}
}
getStatusIcon(status: string): string {
switch (status) {
case 'COMPLETE': return 'check_circle';
case 'FAILED': return 'error';
case 'PROCESSING': return 'hourglass_empty';
case 'PENDING': return 'schedule';
default: return 'help';
}
}
getStatusColor(status: string): string {
switch (status) {
case 'COMPLETE': return 'primary';
case 'FAILED': return 'warn';
case 'PROCESSING': return 'accent';
default: return '';
}
}
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
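The progress and completion logic in the component can also be expressed as pure functions, which makes edge cases easier to test. The TypeScript sketch below is one possible arrangement, not the component's actual code: `TaskView`, `Outcome`, `overallOutcome`, and `OutcomeTracker` are hypothetical names. Two choices in the sketch are assumptions: an empty task list is reported as `'empty'` rather than complete, and the tracker reports an outcome only when it changes, so repeated snapshots would not re-trigger notifications.

```typescript
interface TaskView {
  status: string;
  progress: number;
}

type Outcome = 'empty' | 'running' | 'complete' | 'failed';

// Mean progress across tasks, rounded to a whole percentage.
function overallProgress(tasks: readonly TaskView[]): number {
  if (tasks.length === 0) {
    return 0;
  }
  const total = tasks.reduce((sum, task) => sum + task.progress, 0);
  return Math.round(total / tasks.length);
}

// A failure takes precedence; completion requires every task to be COMPLETE.
function overallOutcome(tasks: readonly TaskView[]): Outcome {
  if (tasks.length === 0) return 'empty';
  if (tasks.some(task => task.status === 'FAILED')) return 'failed';
  if (tasks.every(task => task.status === 'COMPLETE')) return 'complete';
  return 'running';
}

// Remembers the last outcome and reports only transitions.
class OutcomeTracker {
  private last: Outcome | null = null;

  /** Returns the new outcome if it differs from the previous call, else null. */
  update(tasks: readonly TaskView[]): Outcome | null {
    const next = overallOutcome(tasks);
    if (next === this.last) {
      return null;
    }
    this.last = next;
    return next;
  }
}
```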
Template (async-task-progress.component.html)
<mat-card class="task-progress-card">
<mat-card-header>
<mat-card-title>Task Progress</mat-card-title>
<mat-card-subtitle>{{ overallProgress }}% Complete</mat-card-subtitle>
</mat-card-header>
<mat-card-content>
<!-- Overall Progress Bar -->
<mat-progress-bar
mode="determinate"
[value]="overallProgress"
[color]="overallProgress === 100 ? 'primary' : 'accent'">
</mat-progress-bar>
<!-- Individual Task Progress -->
<div *ngIf="tasks$ | async as tasks" class="task-list">
<div *ngFor="let task of tasks" class="task-item">
<div class="task-header">
<mat-icon [color]="getStatusColor(task.status)">
{{ getStatusIcon(task.status) }}
</mat-icon>
<span class="task-type">{{ task.taskType }}</span>
<span class="task-progress">{{ task.progress }}%</span>
</div>
<mat-progress-bar
mode="determinate"
[value]="task.progress"
[color]="getStatusColor(task.status)">
</mat-progress-bar>
<div class="task-message">{{ task.message }}</div>
<!-- Step Details -->
<div *ngIf="task.progressSteps?.length" class="steps-container">
<div *ngFor="let step of task.progressSteps" class="step-item">
<mat-icon class="step-icon" [class.step-complete]="step.status === 'COMPLETED'">
{{ step.status === 'COMPLETED' ? 'check' : 'radio_button_unchecked' }}
</mat-icon>
<span>{{ step.name }}</span>
</div>
</div>
</div>
</div>
</mat-card-content>
</mat-card>
Testing
Backend Unit Tests
Test task creation and progress updates:
@Test
public void testTaskCreationAndProgress() {
TaskServiceImpl taskService = new TaskServiceImpl();
// Create task
String taskId = taskService.createTask(
"test-task",
"{\"test\":\"data\"}",
"test@example.com",
null
);
assertNotNull(taskId);
// Update progress
taskService.updateTaskProgress(taskId, "processing", 50, "Halfway there");
// Verify in Firestore
Task task = taskService.getTask(taskId);
assertEquals(50, task.getProgress());
assertEquals("processing", task.getStatus().name().toLowerCase());
}
Frontend Integration Tests
Test real-time progress tracking:
describe('FirestoreTaskTrackingService', () => {
let service: FirestoreTaskTrackingService;
let firestore: Firestore;
beforeEach(() => {
TestBed.configureTestingModule({
providers: [
FirestoreTaskTrackingService,
{ provide: Firestore, useValue: mockFirestore }
]
});
service = TestBed.inject(FirestoreTaskTrackingService);
});
it('should track task progress', (done) => {
const taskId = 'test-task-123';
service.trackTask(taskId).subscribe(task => {
expect(task.taskId).toBe(taskId);
expect(task.progress).toBeGreaterThanOrEqual(0);
done();
});
});
it('should wait for task completion', (done) => {
const taskId = 'test-task-456';
service.waitForCompletion(taskId).subscribe(task => {
expect(['COMPLETE', 'FAILED', 'CANCELLED']).toContain(task.status);
done();
});
});
});
Deployment
Backend Dependencies
Add to pom.xml:
<dependencies>
<!-- Firestore -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-firestore</artifactId>
<version>3.25.2</version>
</dependency>
<!-- Cloud Run Jobs (optional) -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-run</artifactId>
<version>0.15.0</version>
</dependency>
</dependencies>
Environment Variables
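Required for the Cloud Run service. GOOGLE_APPLICATION_CREDENTIALS is needed only when running outside Google Cloud (for example, local development); on Cloud Run, Application Default Credentials resolve to the service's attached service account: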
GCP_PROJECT_ID=construction-code-expert-dev
GCP_LOCATION=us-central1
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
Firestore Security Rules
rules_version = '2';
service cloud.firestore {
match /databases/{database}/documents {
match /tasks/{taskId} {
// Users can read their own tasks
allow read: if request.auth != null &&
resource.data.created_by == request.auth.token.email;
// Only backend service can create/update tasks
allow write: if false; // Backend uses service account
}
}
}
Monitoring and Observability
Key Metrics to Track
- Task Duration: Average time to completion
- Success Rate: Percentage of successful vs failed tasks
- CPU Throttling Impact: ExecutorService task duration variance
- Cost per Task: LLM costs tracked in cost_analysis
- Queue Depth: Number of PENDING tasks
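Several of these metrics can be derived directly from task documents. The TypeScript sketch below computes success rate, average duration, and queue depth from an in-memory list; `TaskRecord`, `TaskMetrics`, and `computeTaskMetrics` are hypothetical names, and measuring duration from `created_at` to `completed_at` for COMPLETE tasks is an assumption about how "time to completion" is defined.

```typescript
interface TaskRecord {
  status: string;
  created_at: string;    // ISO 8601
  completed_at?: string; // ISO 8601
}

interface TaskMetrics {
  successRate: number | null;       // COMPLETE / (COMPLETE + FAILED), null if none finished
  averageDurationMs: number | null; // mean over COMPLETE tasks with usable timestamps
  queueDepth: number;               // number of PENDING tasks
}

function computeTaskMetrics(tasks: readonly TaskRecord[]): TaskMetrics {
  let succeeded = 0;
  let failed = 0;
  let queueDepth = 0;
  let durationSum = 0;
  let durationCount = 0;

  for (const task of tasks) {
    if (task.status === 'PENDING') queueDepth++;
    if (task.status === 'FAILED') failed++;
    if (task.status === 'COMPLETE') {
      succeeded++;
      if (task.completed_at !== undefined) {
        const elapsed = Date.parse(task.completed_at) - Date.parse(task.created_at);
        if (Number.isFinite(elapsed) && elapsed >= 0) {
          durationSum += elapsed;
          durationCount++;
        }
      }
    }
  }

  const finished = succeeded + failed;
  return {
    successRate: finished === 0 ? null : succeeded / finished,
    averageDurationMs: durationCount === 0 ? null : durationSum / durationCount,
    queueDepth,
  };
}
```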
Cloud Logging Queries
# View all task activity
gcloud logging read "resource.type=cloud_run_revision AND textPayload=~'Task.*progress'" --limit 100
# Filter by task ID
gcloud logging read "resource.type=cloud_run_revision AND textPayload=~'task-id-here'" --limit 50
# Find failed tasks
gcloud logging read "resource.type=cloud_run_revision AND severity=ERROR AND textPayload=~'Task.*FAILED'" --limit 20
Best Practices Summary
- Proto-First Design: Define all schemas in proto files
- Auto-Serialization: Use FirestoreProtoConverter consistently
- Step Tracking: Always update progress steps for visibility
- Cost Tracking: Accumulate LLM costs for billing transparency
- Error Handling: Log errors, update Firestore, notify users
- Idempotency: Design tasks to be safely retryable
- Graceful Degradation: Handle Cloud Run Jobs unavailability
- Comprehensive Testing: Test both happy and error paths
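As an illustration of the idempotency point, one possible guard is to ignore updates that would reopen a finished task or move progress backwards, so that a retried worker replaying earlier updates cannot corrupt the visible state. The TypeScript sketch below shows the idea; `ProgressState` and `shouldApplyUpdate` are hypothetical names, and the rule itself is an assumption rather than the system's documented behavior.

```typescript
interface ProgressState {
  status: string;
  progress: number; // 0-100
}

const FINAL_STATUSES: readonly string[] = ['COMPLETE', 'FAILED', 'CANCELLED'];

// Decides whether an incoming update may overwrite the current state.
function shouldApplyUpdate(current: ProgressState, incoming: ProgressState): boolean {
  // A finished task is never reopened by a late or replayed update.
  if (FINAL_STATUSES.indexOf(current.status) !== -1) {
    return false;
  }
  // Moving into a final state is always allowed.
  if (FINAL_STATUSES.indexOf(incoming.status) !== -1) {
    return true;
  }
  // Otherwise progress may stay the same or increase, but not regress.
  return incoming.progress >= current.progress;
}
```

Replaying an identical update is accepted, since applying it again leaves the task unchanged.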