jiny

Stream parallel 병렬 처리에 List 사용 시, 충돌 문제 본문

Project/작은 지식

Stream parallel 병렬 처리에 List 사용 시, 충돌 문제

ongjiny 2023. 12. 27. 19:07

 

Stream.parallel() 을 사용하여 List 에 값을 추가하고, List를 반환하여 추가 작업을 하려는데 문제가 발생했습니다.

대략 아래와 같은 상황으로 List에 값을 추가하고 List를 그대로 반환하려고 했습니다.

@Test
public void List_는_멀티스레드에서_안전하지_않다() {
   List<Integer> list = new ArrayList<>();
   IntStream stream = IntStream.rangeClosed(0, 499);
   stream.parallel().forEach(i -> {
      list.add(i);
   });

   assertThat(list.size()).isEqualTo(500);
}

 

분명 parallel() 은 병렬 처리를 안전하게 쉽게 도와주는 것으로 알고

List 에는 500개의 요소가 있을 것으로 생각했지만, 매번 테스트 결과가 달랐습니다.

500개가 나올때도 있고, 아래처럼 495, 496개가 나올 때도 있었습니다.

 

이러한 문제는 병렬 처리 과정에서 여러 스레드에서 하나의 ArrayList 에 접근했기 때문입니다.

ArrayList 는 스레드 세이프 하지 않습니다.

 

해결방법은 CopyOnWriteArrayList와 SynchronizedList를 사용하는 방법이 있습니다.

CopyOnWriteArrayList 는 배열의 값을 세팅하고 추가하는 과정에 Lock 을 걸기 때문에 스레드 안전하다고 합니다.

SynchronizedList 는 자바의 synchronized 키워드를 사용하여 스레드 세이프하게 구현되어 있습니다.

 

 

그리고 비슷한 상황에서 stream의 collect를 사용할 수 있습니다.

아래는 List의 값들을 2배 씩하여 새로운 List에 추가하는 과정입니다.

당연하게도 ArrayList이기 때문에 테스트는 실패합니다.

@Test
public void List_는_멀티스레드에서_안전하지_않다() {
   List<Integer> tmp = new ArrayList<>();
   for(int i = 0 ; i <= 499 ; i++ ) {
      tmp.add(i);
   }

   List<Integer> list = new ArrayList<>();
   tmp.parallelStream().map(num -> num * 2 ).forEach(list::add);

   assertThat(list.size()).isEqualTo(500);
}

 

 

필터링을 거치고 collect를 사용해 새로운 List를 반환합니다

collect 는 cpu가 포크한 값들을 안전하게 모아 반환합니다.

@Test
public void List_는_멀티스레드에서_안전하지_않다() {
   List<Integer> tmp = new ArrayList<>();
   for(int i = 0 ; i <= 499 ; i++ ) {
      tmp.add(i);
   }

   List<Integer> list = tmp.parallelStream().map(num -> num * 2 ).collect(Collectors.toList());

   assertThat(list.size()).isEqualTo(500);
}

 

 

 

[참고]

모던자바인액션

https://mag1c.tistory.com/368

https://code0xff.tistory.com/182