1. åç§åæ¥æ§å¶å·¥å ·ç使ç¨
1.1 ReentrantLock
ReentrantLockæè§ä¸æ¯synchronizedçå¢å¼ºçï¼synchronizedçç¹ç¹æ¯ä½¿ç¨ç®åï¼ä¸å交ç»JVMå»å¤çï¼ä½æ¯åè½ä¸æ¯æ¯è¾èå¼±çãå¨JDK1.5ä¹åï¼ReentrantLockçæ§è½è¦å¥½äºsynchronizedï¼ç±äºå¯¹JVMè¿è¡äºä¼åï¼ç°å¨çJDKçæ¬ä¸ï¼ä¸¤è æ§è½æ¯ä¸ç¸ä¸ä¸çãå¦ææ¯ç®åçå®ç°ï¼ä¸è¦å»æå»ä½¿ç¨ReentrantLockã
ç¸æ¯äºsynchronizedï¼ReentrantLockå¨åè½ä¸æ´å 丰å¯ï¼å®å ·æå¯éå ¥ãå¯ä¸æãå¯éæ¶ãå ¬å¹³éçç¹ç¹ã
é¦å æ们éè¿ä¸ä¸ªä¾åæ¥è¯´æReentrantLockæåæ¥çç¨æ³ï¼
package test;
import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable{ public static ReentrantLock lock = new ReentrantLock(); public static int i = 0;
@Override public void run() { for (int j = 0; j < 10000000; j++)
{ lock.lock(); try
{
i++;
} finally
{ lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
Test test = new Test();
Thread t1 = new Thread(test);
Thread t2 = new Thread(test);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
æ两个线ç¨é½å¯¹iè¿è¡++æä½ï¼ä¸ºäºä¿è¯çº¿ç¨å®å ¨ï¼ä½¿ç¨äº ReentrantLockï¼ä»ç¨æ³ä¸å¯ä»¥çåºï¼ä¸ synchronizedç¸æ¯ï¼ReentrantLockå°±ç¨å¾®å¤æä¸ç¹ãå ä¸ºå¿ é¡»å¨finallyä¸è¿è¡è§£éæä½ï¼å¦æä¸å¨ finally解éï¼æå¯è½ä»£ç åºç°å¼å¸¸é没被éæ¾ï¼èsynchronizedæ¯ç±JVMæ¥éæ¾éã
é£ä¹ReentrantLockå°åºæåªäºä¼ç§çç¹ç¹å¢ï¼
1.1.1 å¯éå ¥
å线ç¨å¯ä»¥éå¤è¿å ¥ï¼ä½è¦éå¤éåº
lock.lock();
lock.lock();try{
i++;
}
finally{
lock.unlock();
lock.unlock();
}
ç±äºReentrantLockæ¯éå ¥éï¼æ以å¯ä»¥åå¤å¾å°ç¸åçä¸æéï¼å®æä¸ä¸ªä¸éç¸å ³çè·å计æ°å¨ï¼å¦ææ¥æéçæ个线ç¨å次å¾å°éï¼é£ä¹è·å计æ°å¨å°±å 1ï¼ç¶åééè¦è¢«éæ¾ä¸¤æ¬¡æè½è·å¾çæ£éæ¾(éå ¥é)ãè¿æ¨¡ä»¿äº synchronized çè¯ä¹ï¼å¦æ线ç¨è¿å ¥ç±çº¿ç¨å·²ç»æ¥æççæ§å¨ä¿æ¤ç synchronized åï¼å°±å 许线ç¨ç»§ç»è¿è¡ï¼å½çº¿ç¨éåºç¬¬äºä¸ªï¼æè åç»ï¼ synchronized åçæ¶åï¼ä¸éæ¾éï¼åªæ线ç¨éåºå®è¿å ¥ççæ§å¨ä¿æ¤ç第ä¸ä¸ªsynchronized åæ¶ï¼æéæ¾éã
public class Child extends Father implements Runnable{ final static Child child = new Child();//为äºä¿è¯éå¯ä¸
public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(child).start();
}
}
public synchronized void doSomething() {
System.out.println("1child.doSomething()");
doAnotherThing(); // è°ç¨èªå·±ç±»ä¸å
¶ä»çsynchronizedæ¹æ³
}
private synchronized void doAnotherThing() { super.doSomething(); // è°ç¨ç¶ç±»çsynchronizedæ¹æ³
System.out.println("3child.doAnotherThing()");
}
@Override
public void run() {
child.doSomething();
}
}class Father { public synchronized void doSomething() {
System.out.println("2father.doSomething()");
}
}
æ们å¯ä»¥çå°ä¸ä¸ªçº¿ç¨è¿å ¥ä¸åç synchronizedæ¹æ³ï¼æ¯ä¸ä¼éæ¾ä¹åå¾å°çéçãæ以è¾åºè¿æ¯é¡ºåºè¾åºãæ以synchronizedä¹æ¯éå ¥é
è¾åºï¼
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...
1.1.2.å¯ä¸æ
ä¸synchronizedä¸åçæ¯ï¼ReentrantLock对ä¸ææ¯æååºçãä¸æç¸å ³ç¥è¯æ¥ç[é«å¹¶åJava äº] å¤çº¿ç¨åºç¡
æ®éçlock.lock()æ¯ä¸è½ååºä¸æçï¼lock.lockInterruptibly()è½å¤ååºä¸æã
æ们模æåºä¸ä¸ªæ»éç°åºï¼ç¶åç¨ä¸ææ¥å¤çæ»é
package test;import java.lang.management.ManagementFactory;import java.lang.management.ThreadInfo;import java.lang.management.ThreadMXBean;import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable{ public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public Test(int lock)
{ this.lock = lock;
} @Override
public void run()
{ try
{ if (lock == 1)
{
lock1.lockInterruptibly(); try
{
Thread.sleep(500);
} catch (Exception e)
{ // TODO: handle exception
}
lock2.lockInterruptibly();
} else
{
lock2.lockInterruptibly(); try
{
Thread.sleep(500);
} catch (Exception e)
{ // TODO: handle exception
}
lock1.lockInterruptibly();
}
} catch (Exception e)
{ // TODO: handle exception
} finally
{ if (lock1.isHeldByCurrentThread())
{
lock1.unlock();
} if (lock2.isHeldByCurrentThread())
{
lock2.unlock();
}
System.out.println(Thread.currentThread().getId() + ":线ç¨éåº");
}
} public static void main(String[] args) throws InterruptedException {
Test t1 = new Test(1);
Test t2 = new Test(2);
Thread thread1 = new Thread(t1);
Thread thread2 = new Thread(t2);
thread1.start();
thread2.start();
Thread.sleep(1000); //DeadlockChecker.check();
} static class DeadlockChecker
{ private final static ThreadMXBean mbean = ManagementFactory
.getThreadMXBean(); final static Runnable deadlockChecker = new Runnable()
{ @Override
public void run()
{ // TODO Auto-generated method stub
while (true)
{ long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); if (deadlockedThreadIds != null)
{
ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds); for (Thread t : Thread.getAllStackTraces().keySet())
{ for (int i = 0; i < threadInfos.length; i++)
{ if(t.getId() == threadInfos[i].getThreadId())
{
t.interrupt();
}
}
}
} try
{
Thread.sleep(5000);
} catch (Exception e)
{ // TODO: handle exception
}
}
}
};
public static void check()
{
Thread t = new Thread(deadlockChecker);
t.setDaemon(true);
t.start();
}
}
}
ä¸è¿°ä»£ç æå¯è½ä¼åçæ»éï¼çº¿ç¨1å¾å°lock1ï¼çº¿ç¨2å¾å°lock2ï¼ç¶åå½¼æ¤åæ³è·å¾å¯¹æ¹çéã
æ们ç¨jstackæ¥çè¿è¡ä¸è¿°ä»£ç åçæ åµ
çç¡®åç°äºä¸ä¸ªæ»éã
DeadlockChecker.check();æ¹æ³ç¨æ¥æ£æµæ»éï¼ç¶åææ»éç线ç¨ä¸æãä¸æåï¼çº¿ç¨æ£å¸¸éåºã
1.1.3.å¯éæ¶
è¶ æ¶ä¸è½è·å¾éï¼å°±è¿åfalseï¼ä¸ä¼æ°¸ä¹ çå¾ æææ»é
使ç¨lock.tryLock(long timeout, TimeUnit unit)æ¥å®ç°å¯éæ¶éï¼åæ°ä¸ºæ¶é´ååä½ã
举个ä¾åæ¥è¯´æä¸å¯éæ¶ï¼
package test;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable{ public static ReentrantLock lock = new ReentrantLock(); @Override
public void run()
{ try
{ if (lock.tryLock(5, TimeUnit.SECONDS))
{
Thread.sleep(6000);
} else
{
System.out.println("get lock failed");
}
} catch (Exception e)
{
} finally
{ if (lock.isHeldByCurrentThread())
{
lock.unlock();
}
}
}
public static void main(String[] args)
{
Test t = new Test();
Thread t1 = new Thread(t);
Thread t2 = new Thread(t);
t1.start();
t2.start();
}
}
使ç¨ä¸¤ä¸ªçº¿ç¨æ¥äºå¤ºä¸æéï¼å½æ个线ç¨è·å¾éåï¼sleep6ç§ï¼æ¯ä¸ªçº¿ç¨é½åªå°è¯5ç§å»è·å¾éã
æä»¥å¿ å®æä¸ä¸ªçº¿ç¨æ æ³è·å¾éãæ æ³è·å¾åå°±ç´æ¥éåºäºã
è¾åºï¼
get lock failed
1.1.4.å ¬å¹³é
使ç¨æ¹å¼ï¼
public ReentrantLock(boolean fair) public static ReentrantLock fairLock = new ReentrantLock(true);
ä¸è¬æä¹ä¸çéæ¯ä¸å ¬å¹³çï¼ä¸ä¸å®å æ¥ç线ç¨è½å å¾å°éï¼åæ¥ç线ç¨å°±åå¾å°éãä¸å ¬å¹³çéå¯è½ä¼äº§ç饥饿ç°è±¡ã
å ¬å¹³éçææå°±æ¯ï¼è¿ä¸ªéè½ä¿è¯çº¿ç¨æ¯å æ¥çå å¾å°éãè½ç¶å ¬å¹³éä¸ä¼äº§ç饥饿ç°è±¡ï¼ä½æ¯å ¬å¹³éçæ§è½ä¼æ¯éå ¬å¹³éå·®å¾å¤ã
1.2 Condition
Conditionä¸ReentrantLockçå ³ç³»å°±ç±»ä¼¼äºsynchronizedä¸Object.wait()/signal()
await()æ¹æ³ä¼ä½¿å½å线ç¨çå¾
ï¼åæ¶éæ¾å½åéï¼å½å
¶ä»çº¿ç¨ä¸ä½¿ç¨signal()æ¶æè
signalAll()æ¹æ³æ¶ï¼çº¿ ç¨ä¼éæ°è·å¾é并继ç»æ§è¡ãæè
å½çº¿ç¨è¢«ä¸ææ¶ï¼ä¹è½è·³åºçå¾
ãè¿åObject.wait()æ¹æ³å¾ç¸ä¼¼ã
awaitUninterruptibly()æ¹æ³ä¸await()æ¹æ³åºæ¬ç¸åï¼ä½æ¯å®å¹¶ä¸ä¼åçå¾
è¿ç¨ä¸ååºä¸æã singal()æ¹æ³ç¨äºå¤éä¸ä¸ªå¨çå¾
ä¸ç线ç¨ãç¸å¯¹çsingalAll()æ¹æ³ä¼å¤éææå¨çå¾
ä¸ç线ç¨ãè¿åObejct.notify()æ¹æ³å¾ç±»ä¼¼ã
è¿éå°±ä¸å详ç»ä»ç»äºã举个ä¾åæ¥è¯´æï¼
package test;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class Test implements Runnable{ public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition();
@Override public void run() { try
{ lock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (Exception e)
{
e.printStackTrace();
} finally
{ lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Test t = new Test();
Thread thread = new Thread(t);
thread.start();
Thread.sleep(2000);
lock.lock();
condition.signal(); lock.unlock();
}
}
ä¸è¿°ä¾åå¾ç®åï¼è®©ä¸ä¸ªçº¿ç¨awaitä½ï¼è®©ä¸»çº¿ç¨å»å¤éå®ãcondition.await()/signalåªè½å¨å¾å°é以å使ç¨ã
1.3.Semaphore
对äºéæ¥è¯´ï¼å®æ¯äºæ¥çæä»çãææå°±æ¯ï¼åªè¦æè·å¾äºéï¼æ²¡äººè½åè·å¾äºã
è对äºSemaphoreæ¥è¯´ï¼å®å 许å¤ä¸ªçº¿ç¨åæ¶è¿å ¥ä¸´çåºãå¯ä»¥è®¤ä¸ºå®æ¯ä¸ä¸ªå ±äº«éï¼ä½æ¯å ±äº«çé¢åº¦æ¯æéå¶çï¼é¢åº¦ç¨å®äºï¼å ¶ä»æ²¡ææ¿å°é¢åº¦ç线ç¨è¿æ¯è¦é»å¡å¨ä¸´çåºå¤ãå½é¢åº¦ä¸º1æ¶ï¼å°±ç¸çäºlock
ä¸é¢ä¸¾ä¸ªä¾åï¼
package test;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class Test implements Runnable{ final Semaphore semaphore = new Semaphore(5); @Override
public void run()
{ try
{
semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done");
} catch (Exception e)
{
e.printStackTrace();
}finally {
semaphore.release();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(20); final Test t = new Test(); for (int i = 0; i < 20; i++)
{
executorService.submit(t);
}
}
}
æä¸ä¸ª20个线ç¨ç线ç¨æ± ï¼æ¯ä¸ªçº¿ç¨é½å» Semaphoreç许å¯ï¼Semaphoreç许å¯åªæ5个ï¼è¿è¡åå¯ä»¥çå°ï¼5个ä¸æ¹ï¼ä¸æ¹ä¸æ¹å°è¾åºã
å½ç¶ä¸ä¸ªçº¿ç¨ä¹å¯ä»¥ä¸æ¬¡ç³è¯·å¤ä¸ªè®¸å¯
public void acquire(int permits) throws InterruptedException
1.4 ReadWriteLock
ReadWriteLockæ¯åºååè½çéã读ååæ¯ä¸¤ç§ä¸åçåè½ï¼è¯»-读ä¸äºæ¥ï¼è¯»-åäºæ¥ï¼å-åäºæ¥ã
è¿æ ·ç设计æ¯å¹¶åéæé«äºï¼åä¿è¯äºæ°æ®å®å ¨ã
使ç¨æ¹å¼ï¼
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
详ç»ä¾åå¯ä»¥æ¥ç Javaå®ç°ç产è æ¶è´¹è é®é¢ä¸è¯»è åè é®é¢ï¼è¿éå°±ä¸å±å¼äºã
1.5 CountDownLatch
åæ°è®¡æ¶å¨
ä¸ç§å
¸åçåºæ¯å°±æ¯ç«ç®åå°ãå¨ç«ç®åå°åï¼ä¸ºäºä¿è¯ä¸æ ä¸å¤±ï¼å¾å¾è¿è¦è¿è¡å项设å¤ã仪å¨çæ£æ¥ã åªæçæææ£æ¥å®æ¯åï¼å¼ææè½ç¹ç«ãè¿ç§åºæ¯å°±é常éå使ç¨CountDownLatchãå®å¯ä»¥ä½¿å¾ç¹ç«çº¿ç¨
ï¼çå¾
æææ£æ¥çº¿ç¨å
¨é¨å®å·¥åï¼åæ§è¡
使ç¨æ¹å¼ï¼
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
示æå¾ï¼
ä¸ä¸ªç®åçä¾åï¼
package test;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Test implements Runnable{ static final CountDownLatch countDownLatch = new CountDownLatch(10); static final Test t = new Test(); @Override
public void run()
{ try
{
Thread.sleep(2000);
System.out.println("complete");
countDownLatch.countDown();
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++)
{
executorService.execute(t);
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
主线ç¨å¿ é¡»çå¾ 10个线ç¨å ¨é¨æ§è¡å®æä¼è¾åº"end"ã
1.6 CyclicBarrier
åCountDownLatchç¸ä¼¼ï¼ä¹æ¯çå¾ æäºçº¿ç¨é½åå®ä»¥ååæ§è¡ãä¸CountDownLatchåºå«å¨äºè¿ä¸ªè®¡æ°å¨å¯ä»¥åå¤ä½¿ç¨ãæ¯å¦ï¼å设æ们å°è®¡æ°å¨è®¾ç½®ä¸º10ãé£ä¹åé½ç¬¬ä¸æ¹1 0个线ç¨åï¼è®¡æ°å¨å°±ä¼å½é¶ï¼ç¶åæ¥çåé½ä¸ä¸æ¹10个线ç¨
使ç¨æ¹å¼ï¼
public CyclicBarrier(int parties, Runnable barrierAction) barrierActionå°±æ¯å½è®¡æ°å¨ä¸æ¬¡è®¡æ°å®æåï¼ç³»ç»ä¼æ§è¡çå¨ä½await()
示æå¾ï¼
ä¸é¢ä¸¾ä¸ªä¾åï¼
package test;import java.util.concurrent.CyclicBarrier;public class Test implements Runnable{ private String soldier; private final CyclicBarrier cyclic; public Test(String soldier, CyclicBarrier cyclic)
{ this.soldier = soldier; this.cyclic = cyclic;
} @Override
public void run()
{ try
{ //çå¾
ææ士å
µå°é½
cyclic.await();
dowork(); //çå¾
ææ士å
µå®æå·¥ä½
cyclic.await();
} catch (Exception e)
{ // TODO Auto-generated catch block
e.printStackTrace();
}
} private void dowork()
{ // TODO Auto-generated method stub
try
{
Thread.sleep(3000);
} catch (Exception e)
{ // TODO: handle exception
}
System.out.println(soldier + ": done");
} public static class BarrierRun implements Runnable
{ boolean flag; int n; public BarrierRun(boolean flag, int n)
{ super(); this.flag = flag; this.n = n;
} @Override
public void run()
{ if (flag)
{
System.out.println(n + "个任å¡å®æ");
} else
{
System.out.println(n + "个éåå®æ");
flag = true;
}
}
} public static void main(String[] args)
{ final int n = 10;
Thread[] threads = new Thread[n]; boolean flag = false;
CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
System.out.println("éå"); for (int i = 0; i < n; i++)
{
System.out.println(i + "æ¥é");
threads[i] = new Thread(new Test("士å
µ" + i, barrier));
threads[i].start();
}
}
}
æå°ç»æï¼
éå
士å
µ5: done士å
µ7: done士å
µ8: done士å
µ3: done士å
µ4: done士å
µ1: done士å
µ6: done士å
µ2: done士å
µ0: done士å
µ9: done10个任å¡å®æ
1.7 LockSupport
æä¾çº¿ç¨é»å¡åè¯
åsuspend类似
LockSupport.park();
LockSupport.unpark(t1);
ä¸suspendç¸æ¯ ä¸å®¹æå¼èµ·çº¿ç¨å»ç»
LockSupportçææ³å¢ï¼å Semaphoreæç¹ç¸ä¼¼ï¼å é¨æä¸ä¸ªè®¸å¯ï¼parkçæ¶åæ¿æè¿ä¸ªè®¸å¯ï¼unparkçæ¶åç³è¯·è¿ä¸ªè®¸å¯ãæ以å¦æunparkå¨parkä¹åï¼æ¯ä¸ä¼åç线ç¨å»ç»çã
ä¸é¢ç代ç æ¯[é«å¹¶åJava äº] å¤çº¿ç¨åºç¡ä¸suspend示ä¾ä»£ç ï¼å¨ä½¿ç¨suspendæ¶ä¼åçæ»éã
èä½¿ç¨ LockSupportåä¸ä¼åçæ»éã
å¦å¤
park()è½å¤ååºä¸æï¼ä½ä¸æåºå¼å¸¸ãä¸æååºçç»ææ¯ï¼park()å½æ°çè¿åï¼å¯ä»¥ä»Thread.interrupted()å¾å°ä¸ææ å¿ã
å¨JDKå½ä¸æ大éå°æ¹ä½¿ç¨å°äºparkï¼å½ç¶LockSupportçå®ç°ä¹æ¯ä½¿ç¨unsafe.park()æ¥å®ç°çã
public static void park() { unsafe.park(false, 0L);
}
1.8 ReentrantLock çå®ç°
ä¸é¢æ¥ä»ç»ä¸ReentrantLockçå®ç°ï¼ReentrantLockçå®ç°ä¸»è¦ç±3é¨åç»æï¼
CASç¶æ
çå¾ éå
park()
ReentrantLockçç¶ç±»ä¸ä¼æä¸ä¸ªstateåéæ¥è¡¨ç¤ºåæ¥çç¶æ
/**éè¿CASæä½æ¥è®¾ç½®stateæ¥è·åéï¼å¦æ设置æäº1ï¼åå°éçææè ç»å½å线ç¨
final void lock() { if (compareAndSetState(0, 1))å¦ææ¿éä¸æåï¼åä¼åä¸ä¸ªç³è¯·
public final void acquire(int arg) { if (!tryAcquire(arg) &&é¦å ï¼åå»ç³è¯·ä¸è¯è¯çtryAcquireï¼å 为æ¤æ¶å¯è½å¦ä¸ä¸ªçº¿ç¨å·²ç»éæ¾äºéã
å¦æè¿æ¯æ²¡æç³è¯·å°éï¼å°±addWaiterï¼æææ¯æèªå·±å å°çå¾ éåä¸å»
å ¶é´è¿ä¼æå¤æ¬¡å°è¯å»ç³è¯·éï¼å¦æè¿æ¯ç³è¯·ä¸å°ï¼å°±ä¼è¢«æèµ·
private final boolean parkAndCheckInterrupt() {åçï¼å¦æå¨unlockæä½ä¸ï¼å°±æ¯éæ¾äºéï¼ç¶åunparkï¼è¿éå°±ä¸å ·ä½è®²äºã
2. 并å容å¨åå ¸åæºç åæ
2.1 ConcurrentHashMap
æ们ç¥éHashMapä¸æ¯ä¸ä¸ªçº¿ç¨å®å ¨ç容å¨ï¼æç®åçæ¹å¼ä½¿HashMapåæ线ç¨å®å ¨å°±æ¯ä½¿ç¨Collections.synchronizedMapï¼å®æ¯å¯¹HashMapçä¸ä¸ªå è£
public static Map m=Collections.synchronizedMap(new HashMap());åç对äºListï¼Setä¹æä¾äºç¸ä¼¼æ¹æ³ã
ä½æ¯è¿ç§æ¹å¼åªéåäºå¹¶åéæ¯è¾å°çæ åµã
æ们æ¥çä¸synchronizedMapçå®ç°
å®ä¼å°HashMapå è£ å¨éé¢ï¼ç¶åå°HashMapçæ¯ä¸ªæä½é½å ä¸synchronizedã
ç±äºæ¯ä¸ªæ¹æ³é½æ¯è·ååä¸æé(mutex)ï¼è¿å°±æå³çï¼putåremoveçæä½æ¯äºæ¥çï¼å¤§å¤§åå°äºå¹¶åéã
ä¸é¢æ¥çä¸ConcurrentHashMapæ¯å¦ä½å®ç°ç
å¨ ConcurrentHashMapå é¨æä¸ä¸ªSegment段ï¼å®å°å¤§çHashMapååæè¥å¹²ä¸ªæ®µï¼å°çHashMapï¼ï¼ç¶å让æ°æ®å¨æ¯ä¸æ®µä¸Hashï¼è¿æ ·å¤ä¸ªçº¿ç¨å¨ä¸å段ä¸çHashæä½ä¸å®æ¯çº¿ç¨å®å ¨çï¼æ以åªéè¦åæ¥åä¸ä¸ªæ®µä¸ç线ç¨å°±å¯ä»¥äºï¼è¿æ ·å®ç°äºéçå离ï¼å¤§å¤§å¢å äºå¹¶åéã
å¨ä½¿ç¨ConcurrentHashMap.sizeæ¶ä¼æ¯è¾éº»ç¦ï¼å 为å®è¦ç»è®¡æ¯ä¸ªæ®µçæ°æ®åï¼å¨è¿ä¸ªæ¶åï¼è¦ææ¯ä¸ä¸ªæ®µé½å ä¸éï¼ç¶åååæ°æ®ç»è®¡ãè¿ä¸ªå°±æ¯æéå离åçå°å°å¼ç«¯ï¼ä½æ¯sizeæ¹æ³åºè¯¥æ¯ä¸ä¼è¢«é«é¢çè°ç¨çæ¹æ³ã
å¨å®ç°ä¸ï¼ä¸ä½¿ç¨synchronizedålock.lockèæ¯å°½é使ç¨trylockï¼åæ¶å¨HashMapçå®ç°ä¸ï¼ä¹åäºä¸ç¹ä¼åãè¿éå°±ä¸æäºã
2.2 BlockingQueue
BlockingQueueä¸æ¯ä¸ä¸ªé«æ§è½ç容å¨ãä½æ¯å®æ¯ä¸ä¸ªé常好çå ±äº«æ°æ®ç容å¨ãæ¯å ¸åçç产è åæ¶è´¹è çå®ç°ã